# THREAD.pm: Thread Command Library

package RDA::Request::THREAD;

# $Id: THREAD.pm,v 1.3 2015/07/09 08:26:25 RDA Exp $
# ARCS: $Header: /home/cvs/cvs/RDA_8/src/scripting/lib/RDA/Request/THREAD.pm,v 1.3 2015/07/09 08:26:25 RDA Exp $
#
# Change History
# 20150707  MSC  Improve the data management.

=head1 NAME

RDA::Request::THREAD - Thread Command Library

=head1 SYNOPSIS

 require RDA::Request::THREAD;

=head1 DESCRIPTION

The objects of the C<RDA::Request::THREAD> class are used to execute the same
request in parallel on multiple agents.

The following methods are available:

=cut

use strict;

BEGIN {
    use Exporter;
    use RDA::Text qw(get_string);
    use RDA::Agent;
    use RDA::Driver::Agent qw(get_line);
    use RDA::Error;
    use RDA::Object::Message;
}

# Define the global public variables
use vars qw($STRINGS $VERSION @ISA);
$VERSION = sprintf('%d.%02d', q$Revision: 1.3 $ =~ /(\d+)\.(\d+)/);
@ISA     = qw(Exporter);

# Define the global private constants

# Define the global private variables

# Dispatch table: command name => implementing routine
my %tb_cmd = (
    'THREAD.EXEC' => \&_do_exec,
);

# Report the package version
sub Version {
    return $VERSION;
}

=head2 S<$h = RDA::Request::THREAD-E<gt>new($agt)>

The object constructor. This method enables you to specify the agent
reference as an argument.

C<RDA::Request::THREAD> is represented by a blessed hash reference. The
following special keys are used:

=over 10

=item S<    B<'_agt'> > Reference to the agent object

=item S<    B<'_ser'> > Serialization indicator

=back

Internal keys are prefixed by an underscore.

=cut

sub new {
    my ($slf, $agt) = @_;

    # Accept both class and instance invocations
    my $cls = ref($slf) || $slf;

    # Create the library object
    $slf = bless {'_agt' => $agt}, $cls;

    # Serialize the requests by default on Windows and VMS platforms. The
    # indicator is used as default value of the 'serial' attribute.
    $slf->{'_ser'} = ($^O =~ m/MSWin(32|64)|VMS|Windows_NT/) ? 1 : 0;

    # Return the object reference
    return $slf;
}

=head2 S<$h-E<gt>delete_object>

This method deletes the library object.

=cut

sub delete_object {
    # Operate on $_[0] directly: the hash is emptied and the variable of the
    # caller is undefined through the @_ alias.
    undef %{$_[0]};
    undef $_[0];
    return;
}

=head2 S<$h-E<gt>exec_command($req)>

This method executes the command specified in the message. It returns the
result of the command routine, or a C<NotImplemented> error when the command
is not known by this library.

=cut

sub exec_command {
    my ($slf, $req) = @_;

    my $cmd = $req->{'msg'};
    return $tb_cmd{$cmd}->($slf, $req) if exists($tb_cmd{$cmd});
    return $req->error('NotImplemented', get_string('BAD_COMMAND', $cmd));
}

=head1 THREAD COMMANDS

=head2 THREAD.EXEC - Exec command

This command executes the same command in parallel on multiple agents. It
supports the following attributes:

=over 14

=item B<    oid>

Lists the agent object identifiers.

=item B<    command>

Specifies the command to execute on all agents.

=item B<    prefix>

When set, prefixes property name (true by default).

=item B<    serial>

When set, forces the request serialization.

=back

To protect attributes from being interpreted directly, you can prefix the
attribute names with the corresponding agent object identifier.

It returns response status and attributes as attributes, prefixed by the
corresponding agent object identifier. It returns errors as data lines.

=cut

sub _do_exec    ## no critic (Complex)
{
    my ($slf, $msg) = @_;
    my ($agt, $alt, $aux, $cmd, $ctl, $nod, $rec, $req, $snd, @oid, %bkp,
        %tbl);

    # Get the parameters. The session management commands are rejected.
    return $msg->new('ERROR.MissingCommand')
        unless ($cmd = $msg->set_value('command'));
    return $msg->new('ERROR.InvalidCommand', command => $cmd)
        if $cmd =~ m/^[A-Z]+\.(EXIT|INIT|LOGIN)$/;
    @oid = $msg->get_value('oid');

    # Determine the functions to use: a single blocking submit per agent when
    # serialized, otherwise send to all agents first and collect afterwards
    if ($msg->get_first('serial', $slf->{'_ser'})) {
        ($snd, $rec) = (\&_submit);
    }
    else {
        ($snd, $rec) = (\&_send, \&_receive);
    }

    # Validate the object identifiers
    $agt = $slf->{'_agt'};
    $aux = RDA::Error->new(
        att => [],
        cmd => $cmd,
        cnt => 0,
        pre => $msg->get_first('prefix', 1),
        top => $agt);
    foreach my $oid (@oid) {
        $nod = uc($oid);
        if ($nod eq q{.} || $nod eq q{..} || $nod eq $agt->get_info('nod')) {
            $aux->add_error(get_string('INVALID', $oid));
        }
        elsif (exists($aux->{'dup'}->{$nod})) {
            $aux->add_error(get_string('DUPLICATED', $oid));
        }
        elsif ($ctl = $agt->get_agent($nod)) {
            # Per agent bookkeeping, keyed by the identifier as specified.
            # Only the duplicate detection uses the upper case identifier.
            $aux->{'agt'}->{$oid} = $ctl;
            $aux->{'bad'}->{$oid} = 0;
            $aux->{'drv'}->{$oid} = $ctl->get_info('drv');
            $aux->{'dup'}->{$nod} = $oid;
            $aux->{'fln'}->{$oid} = fileno($ctl->get_info('ifh'));
            $aux->{'loc'}->{$oid} = $ctl->get_info('loc');
            $aux->{'rsp'}->{$oid} = 0;
            ++$aux->{'cnt'};
        }
        else {
            $aux->add_error(get_string('UNKNOWN', $oid));
        }
    }
    $aux->add_error(get_string('MISSING')) unless $aux->{'cnt'};
    return $msg->error('BadDestination', $aux->purge_errors)
        if $aux->has_errors;

    # Treat the requests. The previous 'aux' information of the agent is
    # saved and restored at the end.
    $bkp{'aux'} = $agt->set_info('aux', $aux);
    $msg->set_info('msg', $cmd);
    $msg->set_value('oid');
    $msg->set_value('prefix');
    $msg->set_value('serial');
    foreach my $oid (@oid) {
        # Switch execution context: extract the attributes prefixed by this
        # agent identifier. The identifier is quoted to be matched literally.
        %tbl = ();
        $req = $msg->clone;
        my $own = qr{^\Q$oid\E_}i;
        foreach my $key ($req->grep($own)) {
            $alt = $key;
            $alt =~ s{$own}{};
            $tbl{$alt} = $req->set_value($key);
        }

        # Discard the attributes prefixed by any agent identifier
        foreach my $oth (@oid) {
            foreach my $key ($req->grep(qr{^\Q$oth\E_}i)) {
                $req->set_value($key);
            }
        }

        # Apply the attributes of this agent under their unprefixed names
        foreach my $key (keys(%tbl)) {
            $req->set_value($key, $tbl{$key});
        }

        # Submit the request, restoring the agent context afterwards
        $bkp{'err'} = $agt->set_info('err', $req->get_errors);
        $bkp{'ids'} = $agt->set_info('ids', $req->set_id($agt));
        $bkp{'req'} = $agt->set_info('req', $req);
        $snd->($aux, $oid, $req);
        $agt->set_info('err', $bkp{'err'});
        $agt->set_info('ids', $bkp{'ids'});
        $agt->set_info('req', $bkp{'req'});
    }
    $rec->($aux) if $rec;
    $agt->set_info('aux', $bkp{'aux'});

    # Indicate the completion status
    return $aux->has_errors
        ? $msg->error('Thread', {@{$aux->{'att'}}}, $aux->purge_errors)
        : $msg->new('OK.Thread', @{$aux->{'att'}});
}

# --- Internal routines -------------------------------------------------------

# Determine the select mask
#
# Builds the bit vector for select() from a hash associating identifiers to
# file numbers. When an identifier is specified, it is first removed from the
# hash. A defined test is used so that an identifier evaluating to false is
# removed also.
sub _mask {
    my ($tbl, $oid) = @_;
    my $msk = q{};

    delete($tbl->{$oid}) if defined($oid);
    foreach my $key (keys(%{$tbl})) {
        vec($msk, $tbl->{$key}, 1) = 1;
    }
    return $msk;
}

# Wait for the responses
#
# Loops until the agent counter reaches zero. An agent is removed from the
# wait set when its response is received, when its input pipe is broken, or
# when the reply to one of its sub requests cannot be sent.
sub _receive {
    my ($aux) = @_;
    my ($agt, $drv, $err, $flg, $lin, $lvl, $msg, $msk, $oid, $pre, $top);

    # Get the tracing level
    $top = $aux->{'top'};
    $lvl = $top->{'lvl'};

    # Read agent responses
    $msk = _mask($aux->{'fln'});
    while (1) {
        $top->trace(get_string('T_Wait'))
            unless $lvl < 20;    ## no critic (Unless)
        $oid = _select($aux, $msk);
        $agt = $aux->{'agt'}->{$oid};
        $drv = $aux->{'drv'}->{$oid};

        # Detect a broken pipe
        unless (defined($lin = get_line($drv, 1))) {
            $aux->add_error("[$oid]: Input pipe broken");
            push(@{$aux->{'att'}}, "${oid}_status", 'ERROR.Receive');
            return unless --$aux->{'cnt'};
            $msk = _mask($aux->{'fln'}, $oid);
            next;
        }
        $top->trace($lin)
            unless $agt->get_level < 30;    ## no critic (Unless)

        # Ignore the lines that are not message headers
        next unless $lin =~ m{^(REQ|RSP) (\d+(\/\d+)*) ((\w+)\.\w+)};

        # Save the captures before calling other routines
        my ($typ, $ids, $sta, $grp) = ($1, $2, $4, $5);

        if ($typ eq 'RSP') {
            # Treat a response
            $flg = exists($aux->{'sav'}->{$oid}) && $grp ne 'ERROR';
            $agt->incr_usage($aux->{'cmd'}, $sta) if $aux->{'loc'}->{$oid};
            $msg = $drv->receive_message(
                RDA::Object::Message->new_msg($typ, $ids, $sta), $flg);
            $agt->extract_usage($msg);
            push(@{$aux->{'att'}}, "${oid}_status", $msg->get_info('msg'));
            unless ($msg->is_error($aux, "[$oid]: %s")) {
                # Return the response attributes, prefixed on request
                $pre = $aux->{'pre'} ? $oid . q{_} : q{};
                push(@{$aux->{'att'}},
                    map { ($pre . $_, $msg->get_value($_)) }
                        $msg->grep(q{.}));
            }
            RDA::Agent->treat_data($msg, $aux->{'sav'}->{$oid}) if $flg;
            return unless --$aux->{'cnt'};
            $msk = _mask($aux->{'fln'}, $oid);
            next;
        }

        # Treat a sub request
        $msg = $drv->receive_message(
            RDA::Object::Message->new_msg($typ, $ids, $sta));
        if ($err = $drv->send_message($drv->treat_message($msg))) {
            # Always record the failure: the error must not depend on the
            # attribute prefixing, otherwise the command could report a
            # success although the status is 'ERROR.Send'.
            $aux->add_error("[$oid]:", $err);
            push(@{$aux->{'att'}}, "${oid}_status", 'ERROR.Send');
            return unless --$aux->{'cnt'};
            $msk = _mask($aux->{'fln'}, $oid);
            next;
        }
    }
}

# Select the agent, giving the preference to the less active agent
#
# Blocks until at least one of the file numbers of the mask is ready for
# reading. Among the ready agents, it returns the identifier with the lowest
# message count. The counts are read from $aux->{'msg'}; they are not
# maintained in this file (NOTE(review): confirm where they are updated).
sub _select {
    my ($aux, $msk) = @_;
    my ($cur, $min, $rfd, $sel);

    # Wait without timeout; only the read set is relevant
    select($rfd = $msk, undef, undef, undef);
    foreach my $oid (keys(%{$aux->{'fln'}})) {
        next unless vec($rfd, $aux->{'fln'}->{$oid}, 1) == 1;
        $cur = $aux->{'msg'}->{$oid};
        ($sel, $min) = ($oid, $cur) if !defined($min) || $cur < $min;
    }
    return $sel;
}

# Send the request without waiting for the response
#
# Errors are recorded in the auxiliary object. The response is collected
# later by _receive.
sub _send {
    my ($aux, $oid, $req) = @_;
    my $agt = $aux->{'agt'}->{$oid};
    my $err;

    # Abort when the agent is not active
    return $aux->add_error("[$oid]: Inactive agent")
        unless $agt->get_info('pid');

    # Determine the data treatment
    $aux->{'sav'}->{$oid} = $req->set_value('_save') || {skp => 1};
    $req->set_value('_forward', 1);

    # Send the request
    $agt->incr_usage($aux->{'cmd'}) if $aux->{'loc'}->{$oid};
    $err = $aux->{'drv'}->{$oid}->send_message($req);
    $aux->add_error("[$oid]:", $err) if $err;
    return;
}

# Submit a request and wait for the response
#
# Any exception raised during the execution is converted into an error in
# the auxiliary object.
sub _submit {
    my ($aux, $oid, $req) = @_;
    my ($rsp, $sav);

    eval {
        # Determine the data treatment
        $sav = $req->set_value('_save') || {skp => 1};
        $req->set_value('_forward', 1);

        # Execute the request
        $rsp = $aux->{'drv'}->{$oid}->exec_request($req, 1);
        push(@{$aux->{'att'}}, "${oid}_status", $rsp->get_info('msg'));
        $rsp->is_error($aux, "[$oid]:");

        # Treat the data
        RDA::Agent->treat_data($rsp, $sav);
    };
    $aux->add_error("[$oid]: $@") if $@;
    return;
}

1;

__END__

=head1 SEE ALSO

L<RDA::Agent>,
L<RDA::Driver::Agent>,
L<RDA::Error>,
L<RDA::Object::Message>,
L<RDA::Text>

=head1 COPYRIGHT NOTICE

Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.

=head1 TRADEMARK NOTICE

Oracle and Java are registered trademarks of Oracle and/or its affiliates.
Other names may be trademarks of their respective owners.

=cut