source: apps/sam/perl/Net/SAM.pm @ 2df4370

Last change on this file since 2df4370 was 2df4370, checked in by zzz <zzz@…>, 17 years ago

Some changes to make the SAM module never block if called on a socket
which select() says is safe to read/write or called in any case on a socket
which is O_NONBLOCK

Significant work is still required.

  • Property mode set to 100644
File size: 6.8 KB
Line 
1#!/usr/bin/perl
2
3## Copyright 2004 Brian Ristuccia. This program is Free Software;
4## You can redistribute it and/or modify it under the same terms as
5## Perl itself.
6
7package Net::SAM;
8
9@ISA = ( "IO::Socket::INET" ); 
10
11use strict;
12
13use POSIX;
14
15use Switch;
16
17use IO::Socket;
18use IO::Select;
19
20#use Net::SAM::StreamSession;
21#use Net::SAM::DatagramSession;
22#use Net::SAM::RawSession;
23
24sub new {
25    my ($class) = shift;
26    my $type = ref($class) || $class;
27    my $self = $type->SUPER::new("127.0.0.1:7656");
28
29
30    ${*$self}->{incomingraw} = [];
31
32    # Connect us to the local SAM proxy.
33    # my $samsock = IO::Socket::INET->new('127.0.0.1:7657');
34    #$self->{samsock}=$samsock;
35
36    # Say hello, read response.
37    $self->SUPER::send("HELLO VERSION MIN=1.0 MAX=1.0\n");
38   
39    while (! ${*$self}->{greeted}) { 
40        $self->readprocess();
41    }
42    print "Created SAM object\n";
43    return $self;
44}
45
46sub lookup {
47    my $self = shift;
48    my $name= shift;
49
50    $self->SUPER::send("NAMING LOOKUP NAME=$name\n");
51    undef ${*$self}->{RESULT};
52    while (! ${*$self}->{RESULT}) {
53        $self->readprocess();
54    }
55    if ( ${*$self}->{RESULT} == "OK" ) {
56        return ${*$self}->{VALUE};
57    } else {
58        return undef;
59    }
60}
61
62#sub createsession {
63#    my ($self) = shift;
64#    my ($sesstype) = shift;
65#    print $self->{samsock} "SESSION CREATE STYLE=$SESSTYPE DESTINATION=$DEST, DIRECTION=
66#}
67
68#sub waitfor {
69#    my ($self) = shift;
70#    my ($prefix) = shift;
71#    my ($response) = <$samsock>;#
72
73 #   if $response =~
74   
75
76#}
77
78
79sub readprocesswrite {
80    my $self = shift;
81    $self->readprocess();
82    $self->dowrite();
83}
84
85sub doread {
86    my $self = shift;
87    my $rv;
88    my $data;
89   
90    $rv = $self->recv($data, $POSIX::BUFSIZE, 0);
91
92    if ( defined($rv) && ( length($data) >= 1 ) ) {
93        # We received some data. Put it in our buffer.
94        ${*$self}->{inbuffer} += $data;
95    } else {
96        # No data. Either we're on a non-blocking socket, or there
97        # was an error or EOF
98        if ( $!{EAGAIN} ) {
99            return 1;
100        } else {
101            # I suppose caller can look at $! for details
102            return undef; 
103        }
104    }
105}
106
107
108sub dowrite {
109    my $self = shift;
110    my $rv;
111    my $data; 
112
113    $rv = $self->send(${*$self}->{outbuffer}, 0);
114   
115    if ( ! defined($rv) ) {
116        warn "SAM::dowrite - Couldn't write for no apparent reason.\n";
117        return undef; 
118    }
119
120    if ( $rv == length(${*$self}->{outbuffer}) || $!{EWOULDBLOCK} ) {
121        substr(${*$self}->{outbuffer},0, $rv) = ''; # Remove from buffer
122
123        # Nuke buffer if empty
124        delete ${*$self}->{outbuffer} unless length(${*$self}->{outbuffer});
125    } else {
126        # Socket closed on us or something?
127        return undef;
128    }
129}
130
131sub messages {
132    my $self = shift;
133   
134    return @{ ${*$self}->{messages} };
135}
136
137sub queuemessage {
138
139    my $self = shift;
140    my $message = shift; 
141   
142    push @{ ${*$self}->{messages} } , $message;
143}
144
145sub unqueuemessage {
146    my $self = shift;
147   
148    return unshift(@{ ${*$self}->{messages} } );
149   
150}
151
152sub readprocess {
153    my $self = shift;
154
155    $self->doread();
156    $self->process();
157}
158
159sub process {   
160    my $self = shift;
161    my %tvhash; 
162    my $payload; 
163   
164
165    # Before we can read any new messages, if an existing message has payload
166    # we must read it in. Otherwise we'll create garbage messages containing
167    # the payload of previous messages.
168
169    if ( ${*$self}->{payloadrequired} >= 1 ) {
170
171        if ( length( ${*$self}->{inbuffer} ) >= ${*$self}->{payloadrequired} ) {
172            # Scarf payload from inbuffer into $payload
173            $payload = substr(${*$self}->{inbuffer}, 0, 
174                              ${*$self}->{payloadrequired});
175           
176            # Nuke payload from inbuffer
177            substr(${*$self}->{inbuffer}, 0,
178                   ${*$self}->{payloadrequired} ) = '';
179           
180            # Put message with payload into spool
181            push @{ ${*$self}->{messages} } , 
182            ${*$self}->{messagerequiringpayload}.$payload;
183
184            # Delete the saved message requiring payload
185            delete ${*$self}->{messagerequiringpayload};
186        } else {
187            # Insufficient payload in inbuffer. Try again later.
188            return 1;
189        }
190
191    }
192
193
194    if ( ${*$self}->{inbuffer} =~ s/(.*\n)// ) {
195        %tvhash = $self->_hashtv($1); # Returns a tag/value hash
196        if ( $tvhash{SIZE} ) {
197            # We've got a message with payload on our hands. :(
198            ${*$self}->{payloadrequired} = $tvhash{SIZE}; 
199            ${*$self}->{messagerequiringpayload} = $1; 
200            return 1; # Could call ourself here, but we'll get called again.
201        } else {
202            push @{ ${*$self}->{messages} } , $1;
203        }
204    }
205    return 1; 
206}
207
208# sub junk {
209
210
211#     print "readprocess: " . $self->connected() . "\n";
212
213#     # May block if the SAM bridge gets hosed
214#     my $response = <$self>;
215
216#     print "readprocess: $!" . $self->connected() . "\n";
217
218#     chomp $response;
219#     my ($primative, $more, $extra) = split (' ', $response, 3);
220
221#     $primative = uc($primative);
222
223#     print "readprocess: " . $self->connected() . " -- $primative -- $more -- $extra\n";
224
225#     switch ($primative) {
226
227#       case "HELLO" {
228#           if ($more !~ m/REPLY/ ) { die ("Bogus HELLO response") }
229#           if ($extra =~ m/NOVERSION/ ) {
230#               die("SAM Bridge Doesn't support my version") ;
231#           }
232#           $self->_hashtv($extra);
233#           ${*$self}->{greeted} = 1;
234#       };
235#       case "SESSION" {
236#           if ( $more !~ m/STATUS/ ) {
237#               die("Bogus SESSION response");
238#           }
239#           $self->_hashtv($extra);
240#       }
241#       case "STREAM" {};
242#       case "DATAGRAM" {
243#           if ( $more !~ m/RECEIVE/ ) {
244#               die("Bogus DATAGRAM response.");
245#           }
246#           $self->_hashtv($extra);
247#           push @{ ${*$self}->{incomingdatagram } },
248#                   [ ${*$self}->{DESTINATION},
249#                     $self->_readblock(${*$self}->{SIZE}) ];
250                     
251#       };
252#       case "RAW" {
253#           if ( $more !~ m/RECEIVE/ ) {
254#               die("Bogus RAW response.");
255#           }
256#           $self->_hashtv($extra);
257
258#           push @{ $self->{incomingraw} }, $self->_readblock($self->{SIZE});
259#       };
260#       case "NAMING" {
261#           if ( $more !~ m/REPLY/ ) {
262#               die("Bogus NAMING response");
263#           }
264#           $self->_hashtv($extra);
265#       };
266#       case "DEST" {};
267#     }
268#     return 1;
269# }
270
271sub getfh {
272    # Return the FH of the SAM socket so apps can select() or poll() on it
273    my $self = shift;
274    return $self->{samsock};
275}
276
277sub _readblock {
278    my $self = shift;
279    my $size = shift; 
280    my $chunk;
281    my $payload;
282
283    while ( $size > 1 )  {
284        # XXX: May block. No error checking.
285        print "readblock: $size\n";
286        $size -= $self->SUPER::recv($chunk, $size);
287        $payload .= $chunk;
288    }
289    return $payload; 
290}
291
292sub _hashtv {
293    my $self = shift;
294    my $tvstring = shift;
295    my $tvhash;
296
297    while ( $tvstring =~ m/(\S+)=(\S+)/sg ) {
298        $tvhash->{$1}=$2;
299        print "hashtv: $1=$2\n"
300    }
301    return $tvhash;
302}
303
304sub DESTROY {
305    # Do nothing yet.
306}
307
308#sub StreamSession {
309#    my $self = shift;
310#    return Net::SAM::StreamSession->new($self);
311#}
312
313#sub DatagramSession {
314#    return Net::SAM::DatagramSession->new($self);
315#}
316
317#sub RawSession {
318#    return Net::SAM::RawSession->new($self);
319#}
320
3211;
Note: See TracBrowser for help on using the repository browser.