Changeset 2df4370 for apps/sam/perl


Ignore:
Timestamp:
May 19, 2004 1:26:02 AM (16 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
0942a7f
Parents:
7243963
git-author:
brianr <brianr> (05/19/04 01:26:02)
git-committer:
zzz <zzz@…> (05/19/04 01:26:02)
Message:

Some changes to make the SAM module never block if called on a socket
which select() says is safe to read/write or called in any case on a socket
which is O_NONBLOCK

Significant work is still required.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/perl/Net/SAM.pm

    r7243963 r2df4370  
    1010
    1111use strict;
     12
     13use POSIX;
    1214
    1315use Switch;
     
    7476#}
    7577
    76 sub readprocess {   
    77     my $self = shift;
    78     my $chunk;
     78
     79sub readprocesswrite {
     80    my $self = shift;
     81    $self->readprocess();
     82    $self->dowrite();
     83}
     84
     85sub doread {
     86    my $self = shift;
     87    my $rv;
     88    my $data;
     89   
     90    $rv = $self->recv($data, $POSIX::BUFSIZE, 0);
     91
     92    if ( defined($rv) && ( length($data) >= 1 ) ) {
     93        # We received some data. Put it in our buffer.
     94        ${*$self}->{inbuffer} += $data;
     95    } else {
     96        # No data. Either we're on a non-blocking socket, or there
     97        # was an error or EOF
     98        if ( $!{EAGAIN} ) {
     99            return 1;
     100        } else {
     101            # I suppose caller can look at $! for details
     102            return undef;
     103        }
     104    }
     105}
     106
     107
     108sub dowrite {
     109    my $self = shift;
     110    my $rv;
     111    my $data;
     112
     113    $rv = $self->send(${*$self}->{outbuffer}, 0);
     114   
     115    if ( ! defined($rv) ) {
     116        warn "SAM::dowrite - Couldn't write for no apparent reason.\n";
     117        return undef;
     118    }
     119
     120    if ( $rv == length(${*$self}->{outbuffer}) || $!{EWOULDBLOCK} ) {
     121        substr(${*$self}->{outbuffer},0, $rv) = ''; # Remove from buffer
     122
     123        # Nuke buffer if empty
     124        delete ${*$self}->{outbuffer} unless length(${*$self}->{outbuffer});
     125    } else {
     126        # Socket closed on us or something?
     127        return undef;
     128    }
     129}
     130
     131sub messages {
     132    my $self = shift;
     133   
     134    return @{ ${*$self}->{messages} };
     135}
     136
     137sub queuemessage {
     138
     139    my $self = shift;
     140    my $message = shift;
     141   
     142    push @{ ${*$self}->{messages} } , $message;
     143}
     144
     145sub unqueuemessage {
     146    my $self = shift;
     147   
     148    return unshift(@{ ${*$self}->{messages} } );
     149   
     150}
     151
     152sub readprocess {
     153    my $self = shift;
     154
     155    $self->doread();
     156    $self->process();
     157}
     158
     159sub process {   
     160    my $self = shift;
     161    my %tvhash;
    79162    my $payload;
    80 
    81     print "readprocess: " . $self->connected() . "\n";
    82 
    83     # May block if the SAM bridge gets hosed
    84     my $response = <$self>;
    85 
    86     print "readprocess: $!" . $self->connected() . "\n";
    87 
    88     chomp $response;
    89     my ($primative, $more, $extra) = split (' ', $response, 3);
    90 
    91     $primative = uc($primative);
    92 
    93     print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
    94 
    95     switch ($primative) {
    96 
    97         case "HELLO" {
    98             if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
    99             if ($extra =~ m/NOVERSION/ ) {
    100                 die("SAM Bridge Doesn't support my version") ;
    101             }
    102             $self->_hashtv($extra);
    103             ${*$self}->{greeted} = 1;
    104         };
    105         case "SESSION" {
    106             if ( $more !~ m/STATUS/ ) {
    107                 die("Bogus SESSION response");
    108             }
    109             $self->_hashtv($extra);
     163   
     164
     165    # Before we can read any new messages, if an existing message has payload
     166    # we must read it in. Otherwise we'll create garbage messages containing
     167    # the payload of previous messages.
     168
     169    if ( ${*$self}->{payloadrequired} >= 1 ) {
     170
     171        if ( length( ${*$self}->{inbuffer} ) >= ${*$self}->{payloadrequired} ) {
     172            # Scarf payload from inbuffer into $payload
     173            $payload = substr(${*$self}->{inbuffer}, 0,
     174                              ${*$self}->{payloadrequired});
     175           
     176            # Nuke payload from inbuffer
     177            substr(${*$self}->{inbuffer}, 0,
     178                   ${*$self}->{payloadrequired} ) = '';
     179           
     180            # Put message with payload into spool
     181            push @{ ${*$self}->{messages} } ,
     182            ${*$self}->{messagerequiringpayload}.$payload;
     183
     184            # Delete the saved message requiring payload
     185            delete ${*$self}->{messagerequiringpayload};
     186        } else {
     187            # Insufficient payload in inbuffer. Try again later.
     188            return 1;
    110189        }
    111         case "STREAM" {};
    112         case "DATAGRAM" {
    113             if ( $more !~ m/RECEIVE/ ) {
    114                 die("Bogus DATAGRAM response.");
    115             }
    116             $self->_hashtv($extra);
    117             push @{ ${*$self}->{incomingdatagram } },
    118                     [ ${*$self}->{DESTINATION},
    119                       $self->_readblock(${*$self}->{SIZE}) ];
     190
     191    }
     192
     193
     194    if ( ${*$self}->{inbuffer} =~ s/(.*\n)// ) {
     195        %tvhash = $self->_hashtv($1); # Returns a tag/value hash
     196        if ( $tvhash{SIZE} ) {
     197            # We've got a message with payload on our hands. :(
     198            ${*$self}->{payloadrequired} = $tvhash{SIZE};
     199            ${*$self}->{messagerequiringpayload} = $1;
     200            return 1; # Could call ourself here, but we'll get called again.
     201        } else {
     202            push @{ ${*$self}->{messages} } , $1;
     203        }
     204    }
     205    return 1;
     206}
     207
     208# sub junk {
     209
     210
     211#     print "readprocess: " . $self->connected() . "\n";
     212
     213#     # May block if the SAM bridge gets hosed
     214#     my $response = <$self>;
     215
     216#     print "readprocess: $!" . $self->connected() . "\n";
     217
     218#     chomp $response;
     219#     my ($primative, $more, $extra) = split (' ', $response, 3);
     220
     221#     $primative = uc($primative);
     222
     223#     print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
     224
     225#     switch ($primative) {
     226
     227#       case "HELLO" {
     228#           if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
     229#           if ($extra =~ m/NOVERSION/ ) {
     230#               die("SAM Bridge Doesn't support my version") ;
     231#           }
     232#           $self->_hashtv($extra);
     233#           ${*$self}->{greeted} = 1;
     234#       };
     235#       case "SESSION" {
     236#           if ( $more !~ m/STATUS/ ) {
     237#               die("Bogus SESSION response");
     238#           }
     239#           $self->_hashtv($extra);
     240#       }
     241#       case "STREAM" {};
     242#       case "DATAGRAM" {
     243#           if ( $more !~ m/RECEIVE/ ) {
     244#               die("Bogus DATAGRAM response.");
     245#           }
     246#           $self->_hashtv($extra);
     247#           push @{ ${*$self}->{incomingdatagram } },
     248#                   [ ${*$self}->{DESTINATION},
     249#                     $self->_readblock(${*$self}->{SIZE}) ];
    120250                     
    121         };
    122         case "RAW" {
    123             if ( $more !~ m/RECEIVE/ ) {
    124                 die("Bogus RAW response.");
    125             }
    126             $self->_hashtv($extra);
    127 
    128             push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
    129         };
    130         case "NAMING" {
    131             if ( $more !~ m/REPLY/ ) {
    132                 die("Bogus NAMING response");
    133             }
    134             $self->_hashtv($extra);
    135         };
    136         case "DEST" {};
    137     }
    138     return 1;
    139 }
     251#       };
     252#       case "RAW" {
     253#           if ( $more !~ m/RECEIVE/ ) {
     254#               die("Bogus RAW response.");
     255#           }
     256#           $self->_hashtv($extra);
     257
     258#           push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
     259#       };
     260#       case "NAMING" {
     261#           if ( $more !~ m/REPLY/ ) {
     262#               die("Bogus NAMING response");
     263#           }
     264#           $self->_hashtv($extra);
     265#       };
     266#       case "DEST" {};
     267#     }
     268#     return 1;
     269# }
    140270
    141271sub getfh {
     
    162292sub _hashtv {
    163293    my $self = shift;
    164     my $extra = shift;
    165 
    166     while ( $extra=~ m/(\S+)=(\S+)/sg ) {
    167         ${*$self}->{$1}=$2;
    168         print "$1=$2\n"
    169     }
     294    my $tvstring = shift;
     295    my $tvhash;
     296
     297    while ( $tvstring =~ m/(\S+)=(\S+)/sg ) {
     298        $tvhash->{$1}=$2;
     299        print "hashtv: $1=$2\n"
     300    }
     301    return $tvhash;
    170302}
    171303
Note: See TracChangeset for help on using the changeset viewer.