# Agent.pm: Class Used to Interact between Agents package RDA::Driver::Agent; # $Id: Agent.pm,v 1.20 2015/10/02 14:11:49 RDA Exp $ # ARCS: $Header: /home/cvs/cvs/RDA_8/src/scripting/lib/RDA/Driver/Agent.pm,v 1.20 2015/10/02 14:11:49 RDA Exp $ # # Change History # 20151001 MSC Improve the local mode. =head1 NAME RDA::Driver::Agent - Class Used to Interact between Agents =head1 SYNOPSIS require RDA::Driver::Agent; =head1 DESCRIPTION The objects of the C class are used to interact between agents. It is a subclass of L. The following methods are available: =cut use strict; BEGIN { use Exporter; use IO::Handle; use RDA::Text qw(get_string); use RDA::Error; use RDA::Object; use RDA::Object::Message; } # Define the global public variables use vars qw($STRINGS $VERSION @EXPORT_OK @ISA); $VERSION = sprintf('%d.%02d', q$Revision: 1.20 $ =~ /(\d+)\.(\d+)/); @EXPORT_OK = qw(get_line); @ISA = qw(RDA::Object Exporter); # Define the global private constants my $EOD = '___End_of_Data___'; my $EOL = qq{\015\012}; my $EOP = 'Agent pipe broken'; # Define the global private variables my %tb_eol = ( CR => qq{\015}, CRLF => qq{\015\012}, LF => qq{\012}, ); # Report the package version sub Version { return $VERSION; } =head2 S<$h = RDA::Driver::Agent-Enew($agt,$ifh,$ofh)> The object constructor. This method enables you to specify the agent reference, the input and output file handles as arguments. C is represented by a blessed hash reference. The following special keys are used: =over 12 =item S< B<'agt' > > Reference to the agent object =item S< B<'end' > > End command =item S< B<'eol' > > End-of-line string =item S< B<'fct' > > Optional post sent treatment =item S< B<'ifh' > > Input file handle =item S< B<'lvl' > > Trace level =item S< B<'ofh' > > Output file handle =item S< B<'rsp' > > Reference to last response with unread data =item S< B<'top' > > Reference to the master or slave agent object =item S< B<'_buf'> > Input pipe buffer =item S< B<'_eof'> > Input end-of-file indicator =item S< B<'_nxt'> > Character to skip at next read from the buffer =back Internal keys are prefixed by an underscore. =cut sub new { my ($cls, $agt, $ifh, $ofh) = @_; # Create and return the driver object return bless { agt => $agt, eol => $EOL, ifh => $ifh, lvl => $agt->get_level, ofh => $ofh, top => $agt->get_top, _buf => q{}, _eof => 0, }, ref($cls) || $cls; } =head2 S<$h-Eexec_request($msg[,$flg])> This method executes a request. When the flag is set, the data will be read by the caller. The caller must take care that the data are read completely before sending another request. =cut sub exec_request { my ($slf, $req, $flg) = @_; my ($agt, $bkp, $cmd, $err, $lin, $loc, $lvl, $msg); # Initialization $agt = $slf->{'agt'}; $lvl = $slf->{'lvl'}; # Take care that the data of the previous response is completely read if (exists($slf->{'rsp'})) { $agt->trace(get_string('T_Purge')) unless $lvl < 20; ## no critic (Unless) $slf->{'rsp'}->skip_data; delete($slf->{'rsp'}); } # Send the request and read the response $loc = $agt->get_info('loc'); $agt->incr_usage($cmd = $req->get_info('msg')) if $loc; $req->set_value('_forward', 1) if $flg; unless ($err = send_message($slf, $req)) { $agt->trace(get_string('T_Wait')) unless $lvl < 20; ## no critic (Unless) for (;;) ## no critic (Loop) { unless (defined($lin = get_line($slf, 1))) { $agt->trace(get_string('Broken')) unless $lvl < 10; ## no critic (Unless) return $req->error('Receive', 'Input pipe broken'); } $agt->trace($lin) unless $lvl < 30; ## no critic (Unless) if ($lin =~ m{^(REQ|RSP) (\d+(\/\d+)*) ((\w+)\.\w+)}) { # Treat a response if ($1 eq 'RSP') { $flg = 0 if $5 eq 'ERROR'; $agt->incr_usage($cmd, $4) if $loc; $msg = receive_message($slf, RDA::Object::Message->new_msg($1, $2, $4), $flg); $agt->extract_usage($msg); $slf->{'rsp'} = $msg if $flg && exists($msg->{'dat'}); return $msg; } # Treat a sub request $msg = receive_message($slf, RDA::Object::Message->new_msg($1, $2, $4)); last if ($err = send_message($slf, treat_request($slf, $msg))); } } } # Report any error return $req->error('Send', $err); } =head2 S<$h-Eis_busy> This method indicates whether the communication channel is busy. =cut sub is_busy { return shift->{'rsp'}; } =head2 S<$h-Ereceive_message($msg[,$flg])> This method receives a message. When the flag is set, the data are read through function calls. =cut sub receive_message ## no critic (Complex) { my ($slf, $msg, $flg) = @_; my ($lgt, $lim, $lin, $off, $ptr); # Extract the message header while (defined($lin = get_line($slf, 1))) { last unless length($lin); if ($lin =~ s/^(\w+)=\(//) { $msg->{'att'}->{$1} = $ptr = []; push(@{$ptr}, RDA::Object::decode($1)) while $lin =~ s/"(.*?)"[\,\)]//; } elsif ($lin =~ s/^(\w+)="(.*?)"//) { $msg->{'att'}->{$1} = RDA::Object::decode($2); } } # Treat attributes if (exists($msg->{'att'})) { # Treat the proxy information $slf->{'agt'}->apply_changes($msg); # Load the data if ($flg) { if (exists($msg->{'att'}->{'end'})) { $msg->{'dat'} = \&_read_line; $msg->{'arg'} = [$slf, {flg => 1, end => $msg->{'att'}->{'end'}}]; } elsif (exists($msg->{'att'}->{'size'})) { $msg->{'dat'} = \&_read_data; $msg->{'arg'} = [$slf, {siz => $msg->{'att'}->{'size'}}]; } } elsif (exists($msg->{'att'}->{'end'})) { $slf->{'agt'}->trace(get_string('Data')) unless $slf->{'lvl'} < 30; ## no critic (Unless) $lim = $msg->{'att'}->{'end'}; $msg->{'dat'} = $ptr = []; push(@{$ptr}, $lin) while defined($lin = get_line($slf, 1)) && $lin ne $lim; } elsif (exists($msg->{'att'}->{'size'}) && ($lim = $msg->{'att'}->{'size'}) > 0) { $slf->{'agt'}->trace(get_string('Data')) unless $slf->{'lvl'} < 30; ## no critic (Unless) _sync_line($slf); if ($lim > length($slf->{'_buf'})) { $lim -= ($off = length($msg->{'dat'} = $slf->{'_buf'})); $slf->{'_buf'} = q{}; while ($lim > 0 && ($lgt = sysread($slf->{'ifh'}, $msg->{'dat'}, ($lim > 1024) ? 1024 : $lim, $off))) { $off += $lgt; $lim -= $lgt; } } else { $msg->{'dat'} = substr($slf->{'_buf'}, 0, $lim); $slf->{'_buf'} = substr($slf->{'_buf'}, $lim); } } } # Dump the message $slf->{'agt'}->trace_object($msg, '- Received: ') unless $slf->{'lvl'} < 20; ## no critic (Unless) # Return the message reference return $msg; } =head2 S<$h-Esend_message($msg)> This method sends a message. =cut sub send_message ## no critic (Complex) { my ($slf, $msg) = @_; my ($buf, $cnt, $dat, $eol, $lgt, $ofh, $ref); my (@arg, @err); # Send the message eval { local $SIG{'INT'} = 'IGNORE'; local $SIG{'PIPE'} = sub { die "$EOP\n"; }; $eol = $slf->{'eol'}; $ofh = $slf->{'ofh'}; # Remove save attributes $msg->clear('^_save_'); # Compact the errors $msg->{'att'}->{'_error'} = [@err] if exists($msg->{'err'}) && (@err = RDA::Error::compact_errors(delete($msg->{'err'}), -1)); # Merge proxy information $slf->{'agt'}->merge_changes($msg); # Determine the data size if (exists($msg->{'dat'}) && defined($dat = $msg->{'dat'})) { $ref = ref($dat) || 'DATA'; if ($ref eq 'CODE') { @arg = @{$msg->{'arg'}} if exists($msg->{'arg'}); $lgt = &$dat($cnt = 0, @arg); if (!defined($lgt)) { $ref = 'NONE'; } elsif ($lgt < 0) { $ref = 'LINES'; $msg->{'att'}->{'end'} = $EOD; } else { $ref = 'BLOCKS'; $msg->{'att'}->{'size'} = $lgt; } } elsif ($ref eq 'ARRAY') { $msg->{'att'}->{'end'} = $EOD unless exists($msg->{'att'}->{'end'}); } elsif ($ref eq 'DATA') { $msg->{'att'}->{'size'} = length($dat); } } else { $ref = 'NONE'; } # Dump the message $slf->{'agt'}->trace_object($msg, '- Sent: ') unless $slf->{'lvl'} < 20; ## no critic (Unless) # Write the static part of the message $buf = $msg->{'typ'}.q{ }.$msg->get_id.q{ }.$msg->{'msg'}.$eol; if (exists($msg->{'att'})) { foreach my $key (sort keys(%{$msg->{'att'}})) { $buf .= $key.q{=}; if (ref($msg->{'att'}->{$key})) { $buf .= q{(}.join(q{,}, map {RDA::Object::encode($_, 1)} @{$msg->{'att'}->{$key}}).q{)}; } else { $buf .= RDA::Object::encode($msg->{'att'}->{$key}, 1); } $buf .= $eol; } } $buf .= $eol; if ($ref eq 'ARRAY') { $buf .= join($eol, @{$dat}, $EOD).$eol.$eol; } elsif ($ref eq 'DATA') { $buf .= $dat.$eol; } _write($ofh, $buf, length($buf)); # Treat the dynamic part of the message if ($ref eq 'BLOCKS') { while (($buf, $lgt) = &{$dat}(++$cnt, @arg)) { _write($ofh, $buf, $lgt); } _write($ofh, $eol, length($eol)); } elsif ($ref eq 'LINES') { while (defined($buf = &{$dat}(++$cnt, @arg))) { $buf =~ s/[\n\r]+$//; $buf .= $eol; _write($ofh, $buf, length($buf)); } $buf = $EOD.$eol.$eol; _write($ofh, $buf, length($buf)); } }; # Indicate the completion status return $@; } sub _write { my ($ofh, $buf, $max) = @_; my ($lgt, $off); $max = length($buf); $off = 0; while ($max) { $lgt = syswrite($ofh, $buf, $max, $off) or die get_string('ERR_WRITE', $!); $max -= $lgt; $off += $lgt; } return; } =head2 S<$h-Eset_eol(value)> =cut sub set_eol { my ($slf, $val) = @_; $slf->{'eol'} = $tb_eol{$val} if exists($tb_eol{$val}); return; } =head2 S<$h-Etreat_messages($agt,$end)> This method treats all messages received on the standard input and sends their respective responses on the standard output. =cut sub treat_messages { my ($cls, $agt, $end) = @_; my ($bkp, $err, $fct, $lin, $lvl, $msg, $req, $rsp, $slf); # Create the driver object $lvl = $agt->get_level; $agt->set_info('drv', $slf = bless { agt => $agt, end => $end, eol => $EOL, ifh => \*STDIN, lvl => $lvl, ofh => \*STDOUT, top => $agt, _buf => q{}, _eof => 0, }, ref($cls) || $cls); binmode(STDIN); binmode(STDOUT); # Get the tracing level # Protect the process about SIGINT local $SIG{'INT'} = 'IGNORE'; # Treat the submitted requests $agt->trace(get_string('Started')) unless $lvl < 10; ## no critic (Unless) for (;;) ## no critic (Loop) { # Read next line unless (defined($lin = get_line($slf, 1))) { $agt->trace(get_string('Broken')) unless $lvl < 10; ## no critic (Unless) return 1; } # Get the next message $agt->trace($lin) unless $lvl < 30; ## no critic (Unless) next unless $lin =~ m{^REQ (\d+(?:\/\d+)*) (\w+\.\w+)}; $req = receive_message($slf, RDA::Object::Message->new_msg('REQ', $1, $msg = $2)); # Treat the request $bkp = $agt->set_info('ids', $req->get_info('ids')); $rsp = $agt->submit($req->set_value('_dest'), $req)->pop_id; $agt->set_info('ids', $bkp); # Detect the exit request if ($msg eq $slf->{'end'}) { $agt->trace(get_string('Shutdown')) unless $lvl < 10; ## no critic (Unless) $agt->end($rsp, 0); } # Send the response if ($err = send_message($slf, $rsp)) { $agt->trace(get_string('Error', $err)) unless $lvl < 10; ## no critic (Unless) return 2; } # Perform the post treatement &$fct($slf, $req) if ($fct = delete($slf->{'fct'})); } } =head2 S<$h-Etreat_request($agt,$req)> This method treats a sub request. =cut sub treat_request { my ($slf, $req) = @_; my ($bkp, $rsp, $top); $top = $slf->{'top'}; $bkp = $top->set_info('ids',$req->get_info('ids')); $rsp = $top->submit($req->set_value('_dest'), $req)->pop_id; $top->set_info('ids',$bkp); return $rsp; } # --- Internal routines ------------------------------------------------------- # Extract a line from the message data sub get_line { my ($slf, $flg) = @_; my ($buf, $eol); for ($eol = $slf->{'eol'} ;; _load_buffer($slf)) ## no critic (Loop) { # Extract the first line from the buffer if (length($slf->{'_buf'})) { _sync_line($slf); return $1 if $slf->{'_buf'} =~ s/^(.*?)$eol//; if ($flg && $slf->{'_buf'} =~ s/^(.*?)([\012\015])//) { $slf->{'_nxt'} = ($2 eq "\012") ? "\015" : "\012"; return $1; } } # Accept an incomplete last line if ($slf->{'_eof'}) { return unless length($slf->{'_buf'}); ($buf, $slf->{'_buf'}) = ($slf->{'_buf'}, q{}); return $buf; } } } # Load more input in the buffer sub _load_buffer { my ($slf) = @_; my ($lgt); $lgt = sysread($slf->{'ifh'}, $slf->{'_buf'}, 1024, length($slf->{'_buf'})); die get_string('ERR_READ', $!) unless defined($lgt); $slf->{'_eof'} = 1 unless $lgt; return; } # Extract a data block from the received message sub _read_data { my ($cmd, $slf, $dat) = @_; my ($buf, $lgt, $siz); $siz = $dat->{'siz'}; # Determine the data type return $siz unless $cmd; # Extract some data if ($siz) { # Extract a data block if ($cmd > 0) { if ($lgt = length($slf->{'_buf'})) { $lgt = $siz unless $siz > $lgt; ## no critic (Unless) $buf = substr($slf->{'_buf'}, 0, $lgt); $slf->{'_buf'} = substr($slf->{'_buf'}, $lgt); } else { $lgt = sysread($slf->{'ifh'}, $buf, ($siz > 1024) ? 1024 : $siz); } delete($slf->{'rsp'}) unless ($dat->{'siz'} -= $lgt); return ($buf, $lgt) if wantarray; return $buf; } # Purge remaining data if ($siz > ($lgt = length($slf->{'_buf'}))) { $siz -= $lgt; $slf->{'_buf'} = q{}; $siz -= $lgt while $siz > 0 && ($lgt = sysread($slf->{'ifh'}, $buf, ($siz > 1024) ? 1024 : $siz)); } else { $slf->{'_buf'} = substr($slf->{'_buf'}, $siz); } delete($slf->{'rsp'}); $dat->{'siz'} = 0; } return; } # Extract a line from the received message sub _read_line { my ($cmd, $slf, $dat) = @_; my ($lin); # Determine the data type return -1 if !$cmd; # Extract line(s) if ($cmd > 0) # Get data element { return $lin if $dat->{'flg'} && defined($lin = get_line($slf, 1)) && $lin ne $dat->{'end'}; } else # Purge remaining data { 1 while $dat->{'flg'} && defined($lin = get_line($slf, 1)) && $lin ne $dat->{'end'}; } # Indicate that all lines have been extracted delete($slf->{'rsp'}); $dat->{'flg'} = 0; return; } # Skip trailing characters from previous line sub _sync_line { my ($slf) = @_; my $nxt; if (defined($nxt = delete($slf->{'_nxt'}))) { _load_buffer($slf) unless length($slf->{'_buf'}) || $slf->{'_eof'}; $slf->{'_buf'} =~ s/^$nxt?// } return; } 1; __END__ =head1 SEE ALSO L, L, L, L, L =head1 COPYRIGHT NOTICE Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved. =head1 TRADEMARK NOTICE Oracle and Java are registered trademarks of Oracle and/or its affiliates. Other names may be trademarks of their respective owners. =cut