Changeset 4cdd42f


Ignore:
Timestamp:
Apr 13, 2004 5:40:07 PM (17 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
d2b09ec
Parents:
4d0b3b2
git-author:
aum <aum> (04/13/04 17:40:07)
git-committer:
zzz <zzz@…> (04/13/04 17:40:07)
Message:

Fixed build.xml to detect os, and launch 'jythonc' or 'jythonc.bat'
according to whether we're running on *nix or windoze. build.xml
should now work on your platform, as long as you have jython installed
and jython is on your execution path.

Got SAM STREAMs working - test code added to i2psamclient.py
as function demoSTREAM()

Location:
apps/sam
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/code.leo

    r4d0b3b2 r4cdd42f  
    11<?xml version="1.0" encoding="UTF-8"?>
    22<leo_file>
    3 <leo_header file_format="2" tnodes="0" max_tnode_index="216" clone_windows="0"/>
    4 <globals body_outline_ratio="0.37624999999999997">
    5         <global_window_position top="155" left="108" height="585" width="890"/>
     3<leo_header file_format="2" tnodes="0" max_tnode_index="217" clone_windows="0"/>
     4<globals body_outline_ratio="0.35262008733624456">
     5        <global_window_position top="70" left="219" height="649" width="978"/>
    66        <global_log_window_position top="0" left="0" height="0" width="0"/>
    77</globals>
     
    1414<vnodes>
    1515<v t="davidmcnab.041004143447" a="E"><vh>I2P SAM Server and Client</vh>
    16 <v t="davidmcnab.041004144338" tnodeList="davidmcnab.041004144338,davidmcnab.041004144338.1,davidmcnab.041004144338.2,davidmcnab.041004144338.4,davidmcnab.041004144338.5,davidmcnab.041004144338.6,davidmcnab.041004144338.8,davidmcnab.041004144338.9,davidmcnab.041004144338.10,davidmcnab.041004144338.11,davidmcnab.041004144338.12,davidmcnab.041004144338.13,davidmcnab.041004144338.14,davidmcnab.041004144338.15,davidmcnab.041004144338.17,davidmcnab.041004144338.18,davidmcnab.041004144338.19,davidmcnab.041004144338.20,davidmcnab.041004144338.21,davidmcnab.041004144338.22,davidmcnab.041004144338.23,davidmcnab.041004144338.24,davidmcnab.041004144338.26,davidmcnab.041004144338.27,davidmcnab.041004144338.29,davidmcnab.041004144338.30,davidmcnab.041004144338.31,davidmcnab.041004144338.32,davidmcnab.041004144338.33,davidmcnab.041004144338.34,davidmcnab.041004144338.35,davidmcnab.041004144338.36,davidmcnab.041004144338.37,davidmcnab.041004144338.38,davidmcnab.041004144338.39,davidmcnab.041004144338.40,davidmcnab.041004144338.41,davidmcnab.041004144338.42,davidmcnab.041004144338.43,davidmcnab.041004144338.44,davidmcnab.041004144338.45,davidmcnab.041004144338.46,davidmcnab.041004144338.47,davidmcnab.041004144338.49,davidmcnab.041004144338.50,davidmcnab.041004144338.51,davidmcnab.041004144338.52,davidmcnab.041004144338.53,davidmcnab.041004144338.54,davidmcnab.041004144338.55,davidmcnab.041004144338.56,davidmcnab.041004144338.57,davidmcnab.041004144338.58,davidmcnab.041004144338.59,davidmcnab.041004144338.60,davidmcnab.041004144338.62,davidmcnab.041004144338.63,davidmcnab.041004144338.64,davidmcnab.041004144338.65,davidmcnab.041004144338.66,davidmcnab.041004144338.67,davidmcnab.041004144338.68,davidmcnab.041004144338.69,davidmcnab.041004144338.70,davidmcnab.041004144338.71,davidmcnab.041004144338.72,davidmcnab.041004144338.73,davidmcnab.041004144338.74,davidmcnab.041004144338.75,davidmcnab.041004144338.76,davidmcnab.041004144338.77,davidmcnab.041004144338.78,davidmcnab.041004144338.79,davidmcnab.041004144338.80,davidmcnab.041004144338.81,davidmcnab.041004144338.82,davidmcnab.041004144338.83,davidmcnab.041004144338.84,davidmcnab.041004144338.85,davidmcnab.041004144338.86,davidmcnab.041004144338.87,davidmcnab.041004144338.88,davidmcnab.041004144338.89,davidmcnab.041004144338.90,davidmcnab.041004144338.92,davidmcnab.041004144338.93,davidmcnab.041004144338.94,davidmcnab.041004144338.95,davidmcnab.041004144338.96,davidmcnab.041004144338.97,davidmcnab.041004144338.98,davidmcnab.041004144338.99,davidmcnab.041004144338.100,davidmcnab.041004144338.101,davidmcnab.041004144338.102,davidmcnab.041004144338.103,davidmcnab.041004144338.105,davidmcnab.041004144338.106,davidmcnab.041004144338.107,davidmcnab.041004144338.108,davidmcnab.041004144338.109"><vh>@file jython/src/i2psam.py</vh>
    17 <v t="davidmcnab.041004144338.1"><vh>imports</vh></v>
    18 <v t="davidmcnab.041004144338.2"><vh>globals</vh></v>
     16<v t="davidmcnab.041004144338" a="E" tnodeList="davidmcnab.041004144338,davidmcnab.041004144338.1,davidmcnab.041004144338.2,davidmcnab.041004144338.4,davidmcnab.041004144338.5,davidmcnab.041004144338.6,davidmcnab.041004144338.8,davidmcnab.041004144338.9,davidmcnab.041004144338.10,davidmcnab.041004144338.11,davidmcnab.041004144338.12,davidmcnab.041004144338.13,davidmcnab.041004144338.14,davidmcnab.041004144338.15,davidmcnab.041004144338.17,davidmcnab.041004144338.18,davidmcnab.041004144338.19,davidmcnab.041004144338.20,davidmcnab.041004144338.21,davidmcnab.041004144338.22,davidmcnab.041004144338.23,davidmcnab.041004144338.24,davidmcnab.041004144338.26,davidmcnab.041004144338.27,davidmcnab.041004144338.29,davidmcnab.041004144338.30,davidmcnab.041004144338.31,davidmcnab.041004144338.32,davidmcnab.041004144338.33,davidmcnab.041004144338.34,davidmcnab.041004144338.35,davidmcnab.041004144338.36,davidmcnab.041004144338.37,davidmcnab.041004144338.38,davidmcnab.041004144338.39,davidmcnab.041004144338.40,davidmcnab.041004144338.41,davidmcnab.041004144338.42,davidmcnab.041004144338.43,davidmcnab.041004144338.44,davidmcnab.041004144338.45,davidmcnab.041004144338.46,davidmcnab.041004144338.47,davidmcnab.041004144338.49,davidmcnab.041004144338.50,davidmcnab.041004144338.51,davidmcnab.041004144338.52,davidmcnab.041004144338.53,davidmcnab.041004144338.54,davidmcnab.041004144338.55,davidmcnab.041004144338.56,davidmcnab.041004144338.57,davidmcnab.041004144338.58,davidmcnab.041004144338.59,davidmcnab.041004144338.60,davidmcnab.041004144338.62,davidmcnab.041004144338.63,davidmcnab.041004144338.64,davidmcnab.041004144338.65,davidmcnab.041004144338.66,davidmcnab.041004144338.67,davidmcnab.041004144338.68,davidmcnab.041004144338.69,davidmcnab.041004144338.70,davidmcnab.041004144338.71,davidmcnab.041004144338.72,davidmcnab.041004144338.73,davidmcnab.041004144338.74,davidmcnab.041004144338.75,davidmcnab.041004144338.76,davidmcnab.041004144338.77,davidmcnab.041004144338.78,davidmcnab.041004144338.79,davidmcnab.041004144338.80,davidmcnab.041004144338.81,davidmcnab.041004144338.82,davidmcnab.041004144338.83,davidmcnab.041004144338.84,davidmcnab.041304205426,davidmcnab.041004144338.85,davidmcnab.041004144338.86,davidmcnab.041004144338.87,davidmcnab.041004144338.88,davidmcnab.041004144338.89,davidmcnab.041004144338.90,davidmcnab.041004144338.92,davidmcnab.041004144338.93,davidmcnab.041004144338.94,davidmcnab.041004144338.95,davidmcnab.041004144338.96,davidmcnab.041004144338.97,davidmcnab.041004144338.98,davidmcnab.041004144338.99,davidmcnab.041004144338.100,davidmcnab.041004144338.101,davidmcnab.041004144338.105,davidmcnab.041004144338.106,davidmcnab.041004144338.107,davidmcnab.041004144338.108,davidmcnab.041004144338.102,davidmcnab.041004144338.103,davidmcnab.041004144338.109"><vh>@file jython/src/i2psam.py</vh>
     17<v t="davidmcnab.041004144338.1" a="M"><vh>imports</vh></v>
     18<v t="davidmcnab.041004144338.2" a="V"><vh>globals</vh></v>
    1919<v t="davidmcnab.041004144338.3" a="E"><vh>I2CP Interface Classes</vh>
    2020<v t="davidmcnab.041004144338.4"><vh>class JavaWrapper</vh></v>
     
    111111<v t="davidmcnab.041004144338.83"><vh>on_message</vh></v>
    112112<v t="davidmcnab.041004144338.84"><vh>threadSocketListener</vh></v>
     113<v t="davidmcnab.041304205426"><vh>threadSocketReceiver</vh></v>
    113114<v t="davidmcnab.041004144338.85"><vh>samParse</vh></v>
    114115<v t="davidmcnab.041004144338.86"><vh>samSend</vh></v>
     
    130131<v t="davidmcnab.041004144338.100"><vh>log</vh></v>
    131132<v t="davidmcnab.041004144338.101"><vh>logException</vh></v>
    132 <v t="davidmcnab.041004144338.102"><vh>usage</vh></v>
    133 <v t="davidmcnab.041004144338.103"><vh>main</vh></v>
    134 </v>
    135133<v t="davidmcnab.041004144338.104" a="E"><vh>Tests</vh>
    136134<v t="davidmcnab.041004144338.105" tnodeList="davidmcnab.041004144338.105"><vh>testdests</vh></v>
     
    139137<v t="davidmcnab.041004144338.108"><vh>testsocket</vh></v>
    140138</v>
     139<v t="davidmcnab.041004144338.102"><vh>usage</vh></v>
     140<v t="davidmcnab.041004144338.103"><vh>main</vh></v>
     141</v>
    141142<v t="davidmcnab.041004144338.109"><vh>MAINLINE</vh></v>
    142143</v>
    143 <v t="davidmcnab.041004144551" a="EV" tnodeList="davidmcnab.041004144551,davidmcnab.041004144551.1,davidmcnab.041004144551.2,davidmcnab.041004144551.3,davidmcnab.041004144551.4,davidmcnab.041004144551.5,davidmcnab.041004144551.6,davidmcnab.041004144551.7,davidmcnab.041004144551.8,davidmcnab.041004144551.9,davidmcnab.041004144551.10,davidmcnab.041004144551.12,davidmcnab.041004144551.13,davidmcnab.041004144551.14,davidmcnab.041004144551.15,davidmcnab.041004144551.16,davidmcnab.041004144551.17,davidmcnab.041004144551.18,davidmcnab.041004144551.19,davidmcnab.041004144551.20,davidmcnab.041004144551.21,davidmcnab.041004144551.22,davidmcnab.041004144551.23,davidmcnab.041004144551.24,davidmcnab.041004144551.26,davidmcnab.041004144551.27,davidmcnab.041004144551.28,davidmcnab.041004144551.29,davidmcnab.041004144551.30,davidmcnab.041004144551.31,davidmcnab.041004144551.32,davidmcnab.041004144551.33,davidmcnab.041004144551.35,davidmcnab.041004144551.36,davidmcnab.041004144551.37,davidmcnab.041004144551.38,davidmcnab.041004144551.39,davidmcnab.041004144551.40,davidmcnab.041004144551.41,davidmcnab.041004144551.42,davidmcnab.041004144551.43,davidmcnab.041004144551.45,davidmcnab.041004144551.46,davidmcnab.041004144551.47,davidmcnab.041004144551.48,davidmcnab.041004144551.49,davidmcnab.041004144551.50,davidmcnab.041004144551.51,davidmcnab.041004144551.52"><vh>@file python/src/i2psamclient.py</vh>
     144<v t="davidmcnab.041004144551" a="E" tnodeList="davidmcnab.041004144551,davidmcnab.041004144551.1,davidmcnab.041004144551.2,davidmcnab.041004144551.3,davidmcnab.041004144551.4,davidmcnab.041004144551.5,davidmcnab.041004144551.6,davidmcnab.041004144551.7,davidmcnab.041004144551.8,davidmcnab.041004144551.9,davidmcnab.041004144551.10,davidmcnab.041004144551.12,davidmcnab.041004144551.13,davidmcnab.041004144551.14,davidmcnab.041004144551.15,davidmcnab.041004144551.16,davidmcnab.041004144551.17,davidmcnab.041004144551.18,davidmcnab.041004144551.19,davidmcnab.041004144551.20,davidmcnab.041204020513,davidmcnab.041204204235,davidmcnab.041204044735,davidmcnab.041204050339,davidmcnab.041004144551.21,davidmcnab.041004144551.22,davidmcnab.041004144551.23,davidmcnab.041004144551.24,davidmcnab.041004144551.26,davidmcnab.041004144551.27,davidmcnab.041004144551.28,davidmcnab.041004144551.29,davidmcnab.041004144551.30,davidmcnab.041004144551.31,davidmcnab.041004144551.32,davidmcnab.041004144551.33,davidmcnab.041204042212,davidmcnab.041004144551.35,davidmcnab.041004144551.36,davidmcnab.041004144551.37,davidmcnab.041004144551.38,davidmcnab.041204042212.1,davidmcnab.041204042212.2,davidmcnab.041204044735.1,davidmcnab.041204050339.1,davidmcnab.041304235615,davidmcnab.041204050339.2,davidmcnab.041204050511,davidmcnab.041204044135,davidmcnab.041004144551.39,davidmcnab.041004144551.40,davidmcnab.041004144551.41,davidmcnab.041004144551.42,davidmcnab.041004144551.43,davidmcnab.041004144551.45,davidmcnab.041004144551.46,davidmcnab.041004144551.47,davidmcnab.041004144551.48,davidmcnab.041004144551.49,davidmcnab.041004144551.50,davidmcnab.041204203651,davidmcnab.041004144551.51,davidmcnab.041004144551.52"><vh>@file python/src/i2psamclient.py</vh>
    144145<v t="davidmcnab.041004144551.1"><vh>imports</vh></v>
    145146<v t="davidmcnab.041004144551.2"><vh>globals</vh></v>
     
    162163<v t="davidmcnab.041004144551.19"><vh>samDatagramCheck</vh></v>
    163164<v t="davidmcnab.041004144551.20"><vh>samDatagramReceive</vh></v>
     165<v t="davidmcnab.041204020513"><vh>samStreamConnect</vh></v>
     166<v t="davidmcnab.041204204235"><vh>samStreamAccept</vh></v>
     167<v t="davidmcnab.041204044735"><vh>samStreamSend</vh></v>
     168<v t="davidmcnab.041204050339"><vh>samStreamClose</vh></v>
    164169<v t="davidmcnab.041004144551.21"><vh>samNamingLookup</vh></v>
    165170<v t="davidmcnab.041004144551.22"><vh>samParse</vh></v>
     
    178183</v>
    179184<v t="davidmcnab.041004144551.34" a="E"><vh>Utility Methods</vh>
     185<v t="davidmcnab.041204042212"><vh>samAllocId</vh></v>
    180186<v t="davidmcnab.041004144551.35"><vh>_recvline</vh></v>
    181187<v t="davidmcnab.041004144551.36"><vh>_recvbytes</vh></v>
     
    183189<v t="davidmcnab.041004144551.38"><vh>_sendline</vh></v>
    184190</v>
     191</v>
     192<v t="davidmcnab.041204042212.1" a="E"><vh>class I2PSAMStream</vh>
     193<v t="davidmcnab.041204042212.2"><vh>__init__</vh></v>
     194<v t="davidmcnab.041204044735.1"><vh>send</vh></v>
     195<v t="davidmcnab.041204050339.1"><vh>recv</vh></v>
     196<v t="davidmcnab.041304235615"><vh>readline</vh></v>
     197<v t="davidmcnab.041204050339.2"><vh>close</vh></v>
     198<v t="davidmcnab.041204050511"><vh>__del__</vh></v>
     199<v t="davidmcnab.041204044135"><vh>_notifyIncomingData</vh></v>
    185200</v>
    186201<v t="davidmcnab.041004144551.39" a="E"><vh>class I2PRemoteSession</vh>
     
    197212<v t="davidmcnab.041004144551.49"><vh>demoDATAGRAM</vh></v>
    198213<v t="davidmcnab.041004144551.50"><vh>demoSTREAM</vh></v>
     214<v t="davidmcnab.041204203651"><vh>demoSTREAM_thread</vh></v>
    199215<v t="davidmcnab.041004144551.51"><vh>demo</vh></v>
    200216</v>
     
    245261import net.i2p
    246262import net.i2p.client # to shut up epydoc
    247 
    248 # shut up java with a few more imports
     263#import net.i2p.client.I2PClient
     264#import net.i2p.client.I2PClientFactory
     265#import net.i2p.client.I2PSessionListener
     266import net.i2p.client.naming
    249267import net.i2p.client.streaming
    250268import net.i2p.crypto
    251269import net.i2p.data
    252 import net.i2p.client.I2PClient
    253 import net.i2p.client.I2PClientFactory
    254 import net.i2p.client.naming
    255 #import net.i2p.client.I2PSessionListener
    256270
    257271# handy shorthand refs
     
    286300
    287301# 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful
    288 verbosity = 5
     302verbosity = 2
    289303
    290304# change to a filename to log there instead
     
    10181032
    10191033    if kw.has_key('sock') \
    1020             and kw.has_key('dest') \
    10211034            and kw.has_key('remdest') \
    10221035            and kw.has_key('instream') \
    10231036            and kw.has_key('outstream'):
     1037
    10241038        # wrapping an accept()'ed connection
     1039        log(4, "accept()'ed a connection, wrapping...")
     1040
    10251041        self.sock = kw['sock']
    1026         self.dest = kw['dest']
     1042        self.dest = dest
    10271043        self.remdest = kw['remdest']
    10281044        self.instream = kw['instream']
    10291045        self.outstream = kw['outstream']
    10301046    else:
     1047        log(4, "creating new I2PSocket %s" % dest)
     1048
    10311049        # process keywords
    10321050        self.host = kw.get('host', self.host)
     
    10351053        # we need a factory, don't we?
    10361054        self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory()
     1055
    10371056</t>
    10381057<t tx="davidmcnab.041004144338.52">def bind(self, dest=None):
     
    10481067    elif not self.dest:
    10491068        # create new dest, client should interrogate it at some time
    1050         self.dest = Destination()
     1069        log(4, "bind: socket has no dest, creating one")
     1070        self.dest = I2PDestination()
    10511071</t>
    10521072<t tx="davidmcnab.041004144338.53">def listen(self, *args, **kw):
     
    10591079    if not self.dest:
    10601080        raise I2PSocketError("socket is not bound to a destination")
     1081
     1082    log(4, "listening on socket")
    10611083   
    10621084    # create the socket manager
     
    10911113    """
    10921114    Connects to a remote destination
     1115
     1116    This has one totally major difference from the normal socket
     1117    paradigm, and that is that you can have n outbound connections
     1118    to different dests.
    10931119    """
    10941120    # sanity check
     
    10981124    # create whole new dest if none was provided to constructor
    10991125    if self.dest is None:
     1126        log(4, "connect: creating whole new dest")
    11001127        self.dest = I2PDestination()
    11011128
     
    11081135    opts = net.i2p.client.streaming.I2PSocketOptions()
    11091136    try:
    1110         self.sock = self.sockmgr.connect(remdest._item, opts)
     1137        log(4, "trying to connect to %s" % remdest.toBase64())
     1138        sock = self.sock = self.sockmgr.connect(remdest._item, opts)
    11111139        self.remdest = remdest
    11121140    except:
    11131141        logException(2, "apparent exception, continuing...")
    1114     self.instream = self.sock.getInputStream()
    1115     self.outstream = self.sock.getOutputStream()
     1142
     1143    self.instream = sock.getInputStream()
     1144    self.outstream = sock.getOutputStream()
     1145
     1146    sockobj = I2PSocket(dest=self.dest,
     1147                        remdest=remdest,
     1148                        sock=sock,
     1149                        instream=self.instream,
     1150                        outstream=self.outstream)
    11161151    self._connected = 1
     1152    return sockobj
    11171153</t>
    11181154<t tx="davidmcnab.041004144338.56">def recv(self, nbytes):
     
    11511187
    11521188    # and write it out
    1153     #print "send: writing '%s' to outstream..." % repr(buf)
     1189    log(4, "send: writing '%s' to outstream..." % repr(buf))
    11541190    outstream = self.outstream
    11551191    for c in buf:
     
    11571193
    11581194    # flush just in case
    1159     #print "send: flushing..."
     1195    log(4, "send: flushing...")
    11601196    self.outstream.flush()
    11611197
    1162     #print "send: done"
     1198    log(4, "send: done")
     1199
    11631200</t>
    11641201<t tx="davidmcnab.041004144338.58">def available(self):
     
    11661203    Returns the number of bytes available for recv()
    11671204    """
    1168     return self.sock.available()
     1205    #print "available: sock is %s" % repr(self.sock)
     1206
     1207    return self.instream.available()
     1208
    11691209
    11701210</t>
     
    11931233<t tx="davidmcnab.041004144338.60">def _createSockmgr(self):
    11941234
     1235    if getattr(self, 'sockmgr', None):
     1236        return
     1237
    11951238    #options = {jI2PClient.PROP_TCP_HOST: self.host,
    11961239    #           jI2PClient.PROP_TCP_PORT: self.port}
     
    13281371    self.samSessionIsOpen = 0
    13291372    self.samSessionStyle = ''
     1373
     1374    # localise the id allocator
     1375    self.samAllocId = self.server.samAllocId
    13301376
    13311377    # need a local sending lock
     
    17301776        else: # STREAM
    17311777            # no need to create session object, because we're using streaming api
     1778            log(4, "Creating STREAM session")
     1779           
     1780            # what kind of stream?
     1781            direction = args.get('DIRECTION', 'BOTH')
     1782            if direction not in ['BOTH', 'RECEIVE', 'CREATE']:
     1783                self.samSend("SESSION", "STATUS",
     1784                    RESULT="I2P_ERROR",
     1785                    MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"),
     1786                    )
     1787                return
     1788
     1789            if direction == 'BOTH':
     1790                self.canConnect = 1
     1791                self.canAccept = 1
     1792            elif direction == 'RECEIVE':
     1793                self.canConnect = 0
     1794                self.canAccept = 1
     1795            elif direction == 'CREATE':
     1796                self.canConnect = 1
     1797                self.canAccept = 0
    17321798
    17331799            # but we do need to mark it as being in use
     
    17371803            sock = self.samSock = I2PSocket(dest)
    17381804
    1739             # and we also need to fire up a socket listener
    1740             thread.start_new_thread(self.threadSocketListener, (sock, dest))
     1805            # and we also need to fire up a socket listener, if not CREATE-only
     1806            if self.canAccept:
     1807                thread.start_new_thread(self.threadSocketListener, (sock, dest))
    17411808
    17421809        # finally, we can reply with the good news
     
    17851852    if subtopic == 'CONNECT':
    17861853        # who are we connecting to again?
    1787         remdest = I2PDestionation(b64=args['DESTINATION'])
    1788         id = args['ID']
     1854        remdest = I2PDestination(base64=args['DESTINATION'])
     1855        id = int(args['ID'])
    17891856   
    17901857        try:
    1791             self.samSock.connect(remdest)
     1858            log(4, "Trying to connect to remote peer %s..." % args['DESTINATION'])
     1859            sock = self.samSock.connect(remdest)
     1860            log(4, "Connected to remote peer %s..." % args['DESTINATION'])
     1861            self.localstreams[id] = sock
    17921862            self.samSend("STREAM", "STATUS",
    17931863                         RESULT='OK',
    17941864                         ID=id,
    17951865                         )
     1866            thread.start_new_thread(self.threadSocketReceiver, (sock, id))
     1867
    17961868        except:
     1869            log(4, "Failed to connect to remote peer %s..." % args['DESTINATION'])
    17971870            self.samSend("STREAM", "STATUS",
    17981871                         RESULT='I2P_ERROR',
    1799                          MESSAGE='exception on connect',
     1872                         MESSAGE='exception_on_connect',
     1873                         ID=id,
    18001874                         )
     1875
     1876    elif subtopic == 'SEND':
     1877        # send to someone
     1878        id = int(args['ID'])
     1879        try:
     1880            sock = self.localstreams[id]
     1881            sock.send(args['DATA'])
     1882        except:
     1883            logException(4, "send failed")
     1884
     1885
     1886
    18011887
    18021888</t>
     
    19542040    destb64 = dest.toBase64()
    19552041
    1956     log(4, "Listening for connections to %s..." % destb64[:40])
     2042    log(4, "Listening for connections to %s..." % destb64)
     2043
     2044    sock.bind()
     2045    sock.listen()
     2046
    19572047    while 1:
     2048        log(4, "Awaiting next connection to %s..." % destb64)
    19582049        newsock = sock.accept()
    1959        
     2050        log(4, "Got connection to %s..." % destb64)
     2051
    19602052        # need an id, negative
    19612053        id = - self.server.samAllocId()
     
    19632055        # register it in local and global streams
    19642056        self.localstreams[id] = self.globalstreams[id] = newsock
     2057
     2058        # fire up the receiver thread
     2059        thread.start_new_thread(self.threadSocketReceiver, (newsock, id))
    19652060       
    19662061        # who is connected to us?
     
    19722067                     DESTINATION=remdest_b64,
    19732068                     ID=id)
     2069
    19742070</t>
    19752071<t tx="davidmcnab.041004144338.85">def samParse(self, flds):
     
    19892085            name, val = arg.split("=", 1)
    19902086        except:
    1991             logException(3, "failed to process %s" % repr(arg))
     2087            logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
    19922088            raise
    19932089        dargs[name] = val
     
    22252321    print "     samserver - runs as a SAM server"
    22262322    print "     test - run a suite of self-tests"
     2323    print "     testsocket - run only the socket test"
     2324    print "     testbidirsocket - run socket test in bidirectional mode"
    22272325    print
    22282326   
    22292327    sys.exit(0)
    2230 
    2231 
    2232 
    22332328</t>
    22342329<t tx="davidmcnab.041004144338.103">def main():
     
    22362331    argv = sys.argv
    22372332    argc = len(argv)
     2333
     2334    # -------------------------------------------------
     2335    # do the getopt command line parsing
    22382336
    22392337    try:
     
    22482346        usage("You entered an invalid option")
    22492347
    2250     cmd = 'samserver'
    2251 
    2252     # we prolly should pass all these parms in constructor call, but
    2253     # what the heck!
    2254     #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort
    2255    
     2348    #print "args=%s" % args
     2349
    22562350    serveropts = {}
    2257 
    22582351    for opt, val in opts:
    22592352        if opt in ['-h', '-?', '--help']:
     
    22752368            usage(0)
    22762369
     2370    # --------------------------------------------------
     2371    # now run in required mode, default is 'samserver'
     2372
    22772373    if len(args) == 0:
    22782374        cmd = 'samserver'
     
    22882384    elif cmd == 'test':
    22892385       
    2290         print "RUNNING I2P Jython TESTS"
     2386        print "RUNNING full I2PSAM Jython TEST SUITE"
    22912387        testsigs()
    22922388        testdests()
     
    22942390        testsocket()
    22952391
     2392    elif cmd == 'testsocket':
     2393       
     2394        print "RUNNING SOCKET TEST"
     2395        testsocket(0)
     2396
     2397    elif cmd == 'testbidirsocket':
     2398        print "RUNNING BIDIRECTIONAL SOCKET TEST"
     2399        testsocket(1)
     2400
    22962401    else:
     2402        # spit at unrecognised option
    22972403        usage(0)
     2404
    22982405</t>
    22992406<t tx="davidmcnab.041004144338.104"></t>
     
    24422549    print "session tests passed!"
    24432550</t>
    2444 <t tx="davidmcnab.041004144338.108">def testsocket():
     2551<t tx="davidmcnab.041004144338.108">def testsocket(bidirectional=0):
    24452552
    24462553    global d1, d2, s1, s2
     
    25022609    thread.start_new_thread(servThread, (sServer,))
    25032610
     2611    if bidirectional:
     2612        # dummy thread which accepts connections TO client socket
     2613        def threadDummy(s):
     2614            print "dummy: listening"
     2615            s.listen()
     2616            print "dummy: accepting"
     2617   
     2618            sock = s.accept()
     2619            print "dummy: got connection"
     2620   
     2621        print "test - launching dummy client accept thread"
     2622        thread.start_new_thread(threadDummy, (sClient,))
     2623
    25042624    print "client: trying to connect"
    25052625    sClient.connect(dServer)
     
    25182638    print "I2PSocket test apparently succeeded"
    25192639
     2640
    25202641</t>
    25212642<t tx="davidmcnab.041004144338.109">if __name__ == '__main__':
     
    25402661<t tx="davidmcnab.041004144551.1">import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
    25412662
     2663from pdb import set_trace
     2664
    25422665</t>
    25432666<t tx="davidmcnab.041004144551.2"># -----------------------------------------
     
    25742697    """
    25752698    pass
     2699
     2700class I2PStreamClosed(Exception):
     2701    """
     2702    Stream is not open
     2703    """
    25762704</t>
    25772705<t tx="davidmcnab.041004144551.4">class I2PSamClient:
     
    26292757    self.qDatagrams = Queue.Queue()
    26302758    self.qRawMessages = Queue.Queue()
     2759
    26312760    self.namingReplies = {}
    26322761    self.namingCache = {}
     2762
     2763    self.streams = {} # currently open streams, keyed by id
     2764    self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
     2765    self.qNewStreams = Queue.Queue() # incoming connections
     2766
     2767    self.samNextIdLock = threading.Lock()
     2768    self.samNextId = 1
     2769
    26332770    self.isRunning = 1
     2771
    26342772
    26352773    log(4, "trying connection to SAM server...")
     
    28262964   
    28272965    Keywords:
     2966        - direction - only used for STREAM sessions, can be RECEIVE,
     2967          CREATE or BOTH (default BOTH)
    28282968        - i2cphost - hostname for the SAM bridge to contact i2p router on
    28292969        - i2cpport - port for the SAM bridge to contact i2p router on
     
    28362976    kw1['STYLE'] = self.samStyle = style
    28372977    kw1['DESTINATION'] = dest
     2978    if style == 'STREAM':
     2979        direction = kw.get('direction', 'BOTH')
     2980        kw1['DIRECTION'] = direction
     2981        if direction == 'BOTH':
     2982            self.canAccept = 1
     2983            self.canConnect = 1
     2984        elif direction == 'RECEIVE':
     2985            self.canAccept = 1
     2986            self.canConnect = 0
     2987        elif direction == 'CREATE':
     2988            self.canAccept = 0
     2989            self.canConnect = 1
     2990        else:
     2991            raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
    28382992
    28392993    # stick in i2cp host/port if specified
     
    29753129            name, val = arg.split("=", 1)
    29763130        except:
    2977             logException(3, "failed to process %s" % repr(arg))
     3131            logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
    29783132            raise
    29793133        dargs[name] = val
     
    29883142
    29893143    return cmd, subcmd, dargs
     3144
     3145
    29903146
    29913147
     
    30843240    """
    30853241    Handles STREAM messages from server
    3086     """
     3242
     3243    STREAM STATUS
     3244    RESULT=$result
     3245    ID=$id
     3246    [MESSAGE=...]
     3247
     3248    STREAM CONNECTED
     3249    DESTINATION=$base64key
     3250    ID=$id
     3251
     3252    STREAM RECEIVED
     3253    ID=$id
     3254    SIZE=$numBytes\n[$numBytes of data]
     3255
     3256    STREAM CLOSED
     3257    RESULT=$result
     3258    ID=$id
     3259    [MESSAGE=...]
     3260    """
     3261    log(4, "got %s %s %s" % (topic, subtopic, args))
     3262
     3263    # which stream?
     3264    id = int(args['ID'])
     3265
     3266    # result of prior connection attempt
     3267    if subtopic == 'STATUS':
     3268        # stick it on the queue that the caller is waiting on and let the
     3269        # caller interpret the result
     3270        self.streamConnectReplies[id].put(args)
     3271        return
     3272
     3273    # notice of incoming connection
     3274    if subtopic == 'CONNECTED':
     3275
     3276        # grab details
     3277        dest = args['DESTINATION']
     3278
     3279        # wrap it in a stream obj
     3280        conn = I2PSAMStream(self, id, dest)
     3281        self.streams[id] = conn
     3282
     3283        # and put it there for anyone calling samStreamAccept()
     3284        self.qNewStreams.put(conn)
     3285
     3286        # done
     3287        return
     3288
     3289    # notice of received data
     3290    elif subtopic == 'RECEIVED':
     3291        # grab details
     3292        data = args['DATA']
     3293
     3294        # lookup the connection
     3295        conn = self.streams.get(id, None)
     3296        if not conn:
     3297            # conn not known, just ditch
     3298            log(2, "got data, but don't recall any conn with id %s" % id)
     3299            return
     3300
     3301        # and post the received data
     3302        conn._notifyIncomingData(data)
     3303
     3304        log(4, "wrote data to conn's inbound queue")
     3305       
     3306        # done
     3307        return
     3308
     3309    elif subtopic == 'CLOSED':
     3310        # lookup the connection
     3311        conn = self.streams.get(id, None)
     3312        if not conn:
     3313            # conn not known, just ditch
     3314            return
     3315
     3316        # mark conn as closed and forget it
     3317        conn._notifyIncomingData("") # special signal to close
     3318        conn.isOpen = 0
     3319        del self.streams[id]
     3320
     3321        # done
     3322        return
     3323
     3324
     3325
    30873326</t>
    30883327<t tx="davidmcnab.041004144551.30">def on_DATAGRAM(self, topic, subtopic, args):
     
    31883427</t>
    31893428<t tx="davidmcnab.041004144551.41">def send(self, peerdest, msg):
    3190 
     3429    """
     3430    """
    31913431    return self.client.send(self.dest, peerdest, msg)
    31923432</t>
     
    33613601    print
    33623602
    3363     print "Instantiating 2 more client connections..."
     3603    print "Instantiating client c6..."
     3604    c6 = I2PSamClient()
     3605
     3606    print "Creating dest for c6"
     3607    pub6, priv6 = c6.samDestGenerate()
     3608
     3609    print "Creating SAM STREAM SESSION on connection c6..."
     3610    res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
     3611    if res != 'OK':
     3612        print "Failed to create STREAM session on connection c6: %s" % repr(res)
     3613        return
     3614    print "STREAM Session on connection c6 created successfully"
     3615
     3616    print "Launching acceptor thread..."
     3617    thread.start_new_thread(demoSTREAM_thread, (c6,))
     3618
     3619    #print "sleep a while and give the server a chance..."
     3620    #time.sleep(10)
     3621
     3622    print "----------------------------------------"
     3623
     3624    print "Instantiating client c5..."
    33643625    c5 = I2PSamClient()
    3365     c6 = I2PSamClient()
    3366 
    3367     print "Creating more dests via SAM"
     3626
     3627    print "Creating dest for c5"
    33683628    pub5, priv5 = c5.samDestGenerate()
    3369     pub6, priv6 = c6.samDestGenerate()
    3370 
    3371     print "Creating SAM STREAM SESSION on connection c3..."
    3372     res = c5.samSessionCreate("STREAM", priv5)
     3629
     3630    print "Creating SAM STREAM SESSION on connection c5..."
     3631    res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
    33733632    if res != 'OK':
    33743633        print "Failed to create STREAM session on connection c5: %s" % repr(res)
     
    33763635    print "STREAM Session on connection c5 created successfully"
    33773636
    3378     print "Creating SAM STREAM SESSION on connection c6..."
    3379     res = c6.samSessionCreate("STREAM", priv6)
    3380     if res != 'OK':
    3381         print "Failed to create STREAM session on connection c4: %s" % repr(res)
     3637    print "----------------------------------------"
     3638
     3639    print "Making connection from c5 to c6..."
     3640
     3641    #set_trace()
     3642
     3643    try:
     3644        conn_c5 = c5.samStreamConnect(pub6)
     3645    except:
     3646        print "Stream Connection failed"
    33823647        return
    3383     print "STREAM Session on connection c4 created successfully"
    3384 
    3385     msg = "Hi there, this is a datagram!"
    3386     print "sending from c5 to c6: %s" % repr(msg)
    3387     c5.samStreamSend(pub6, msg)
    3388 
    3389     print "now try to receive from c6 (will block)..."
    3390     msg1 = c6.samStreamReceive()
    3391     print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest))
     3648    print "Stream connect succeeded"
     3649   
     3650    print "Receiving from c5..."
     3651    buf = conn_c5.readline()
     3652    print "Got %s" % repr(buf)
     3653
     3654    #print "Try to accept connection on c6..."
     3655    #conn_c6 = c6.sam
    33923656
    33933657    print
     
    33973661    print
    33983662
     3663
     3664
     3665
     3666
    33993667</t>
    34003668<t tx="davidmcnab.041004144551.51">def demo():
     
    34113679    print
    34123680
    3413     demoNAMING()
    3414     demoRAW()
    3415     demoDATAGRAM()
    3416     #demoSTREAM()
     3681    #demoNAMING()
     3682    #demoRAW()
     3683    #demoDATAGRAM()
     3684    demoSTREAM()
    34173685
    34183686    print
     
    34273695    demo()
    34283696</t>
     3697<t tx="davidmcnab.041204020513">def samStreamConnect(self, dest):
     3698    """
     3699    Makes a STREAM connection to a remote dest
     3700
     3701    STREAM STATUS
     3702    RESULT=$result
     3703    ID=$id
     3704    [MESSAGE=...]
     3705    """
     3706    # need an ID
     3707    id = self.samAllocId()
     3708   
     3709    # create queue for connect reply
     3710    q = self.streamConnectReplies[id] = Queue.Queue()
     3711
     3712    # send req
     3713    self.samSend("STREAM", "CONNECT",
     3714                 ID=id,
     3715                 DESTINATION=dest,
     3716                 )
     3717
     3718    # await reply - comes back as a dict
     3719    resp = q.get()
     3720
     3721    # ditch queue
     3722    del self.streamConnectReplies[id]
     3723    del q
     3724   
     3725    # check out response
     3726    result = resp['RESULT']
     3727    if result == 'OK':
     3728        conn = I2PSAMStream(self, id, dest)
     3729        self.streams[id] = conn
     3730        return conn
     3731    else:
     3732        msg = resp.get('MESSAGE', '')
     3733        raise I2PCommandFail(result, msg, "STREAM CONNECT")
     3734
     3735</t>
     3736<t tx="davidmcnab.041204042212">def samAllocId(self):
     3737    """
     3738    Allocates a new unique id as required by SAM protocol
     3739    """
     3740    self.samNextIdLock.acquire()
     3741    id = self.samNextId
     3742    self.samNextId += 1
     3743    self.samNextIdLock.release()
     3744    return id
     3745</t>
     3746<t tx="davidmcnab.041204042212.1">class I2PSAMStream:
     3747    """
     3748    Wrapper for a stream object
     3749    """
     3750    @others
     3751</t>
     3752<t tx="davidmcnab.041204042212.2">def __init__(self, client, id, dest):
     3753    """
     3754    """
     3755    self.client = client
     3756    self.id = id
     3757    self.dest = dest
     3758
     3759    self.qIncomingData = Queue.Queue()
     3760
     3761    self.inbuf = ''
     3762    self.isOpen = 1
     3763</t>
     3764<t tx="davidmcnab.041204044135">def _notifyIncomingData(self, data):
     3765    """
     3766    Called by client receiver to notify incoming data
     3767    """
     3768    log(4, "got %s" % repr(data))
     3769    self.qIncomingData.put(data)
     3770</t>
     3771<t tx="davidmcnab.041204044735">def samStreamSend(self, conn, data):
     3772    """
     3773    DO NOT CALL THIS DIRECTLY
     3774   
     3775    Invoked by an I2PSAMStream object to transfer data
     3776    Use the object's .send() method instead.
     3777   
     3778    conn is the I2PSAMStream
     3779
     3780    STREAM SEND
     3781    ID=$id
     3782    SIZE=$numBytes\n[$numBytes of data]
     3783    """
     3784    # dispatch
     3785    self.samSend("STREAM", "SEND", data, ID=conn.id)
     3786
     3787    # useless, but mimics socket paradigm
     3788    return len(data)
     3789
     3790</t>
     3791<t tx="davidmcnab.041204044735.1">def send(self, data):
     3792    """
     3793    Sends data to a stream connection
     3794    """
     3795    # barf if stream not open
     3796    if not self.isOpen:
     3797        raise I2PStreamClosed
     3798
     3799    # can send
     3800    return self.client.samStreamSend(self, data)
     3801</t>
     3802<t tx="davidmcnab.041204050339">def samStreamClose(self, conn):
     3803    """
     3804    DO NOT CALL DIRECTLY
     3805   
     3806    Invoked by I2PSAMStream to close stream
     3807    Use the object's .send() method instead.
     3808   
     3809    STREAM CLOSE
     3810    ID=$id
     3811    """
     3812    self.samSend("STREAM", "CLOSE", ID=conn.id)
     3813    del self.streams[conn.id]
     3814
     3815</t>
     3816<t tx="davidmcnab.041204050339.1">def recv(self, size):
     3817    """
     3818    Retrieves n bytes from peer
     3819    """
     3820    chunks = []
     3821
     3822    while self.isOpen and size &gt; 0:
     3823        # try internal buffer first
     3824        if self.inbuf:
     3825            chunk = self.inbuf[:size]
     3826            chunklen = len(chunk)
     3827            self.inbuf = self.inbuf[chunklen:]
     3828            chunks.append(chunk)
     3829            size -= chunklen
     3830        else:
     3831            # replenish input buffer
     3832            log(4, "I2PSAMStream.recv: replenishing input buffer")
     3833            buf = self.qIncomingData.get()
     3834            if buf == '':
     3835                # connection closed by peer
     3836                self.isOpen = 0
     3837                break
     3838            else:
     3839                # got more data
     3840                log(4, "I2PSAMStream: queue returned %s" % repr(buf))
     3841                self.inbuf += buf
     3842
     3843    # return whatever we've got, hopefully all
     3844    return "".join(chunks)
     3845
     3846
     3847</t>
     3848<t tx="davidmcnab.041204050339.2">def close(self):
     3849    """
     3850    close this stream connection
     3851    """
     3852    log(4, "closing stream")
     3853    self.client.samStreamClose(self)
     3854    log(4, "stream closed")
     3855    self.isOpen = 0
     3856
     3857    # and just to make sure...
     3858    self.qIncomingData.put("") # busts out of recv() loops
     3859
     3860</t>
     3861<t tx="davidmcnab.041204050511">def __del__(self):
     3862    """
     3863    Dropping last ref to this object closes stream
     3864    """
     3865    self.close()
     3866</t>
     3867<t tx="davidmcnab.041204203651">def demoSTREAM_thread(sess):
     3868   
     3869    while 1:
     3870        sock = sess.samStreamAccept()
     3871        log(4, "got incoming connection")
     3872
     3873        print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
     3874       
     3875        time.sleep(10)
     3876
     3877        sock.send("Hi there, what do you want?\n")
     3878
     3879        print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
     3880        time.sleep(300)
     3881        print "**ACCEPTOR CLOSING STREAM"
     3882
     3883        sock.close()
     3884
     3885</t>
     3886<t tx="davidmcnab.041204204235">def samStreamAccept(self):
     3887    """
     3888    Waits for an incoming connection, returning a wrapped conn obj
     3889    """
     3890    log(4, "waiting for connection")
     3891    conn = self.qNewStreams.get()
     3892    log(4, "got connection")
     3893    return conn
     3894</t>
     3895<t tx="davidmcnab.041304205426">def threadSocketReceiver(self, sock, id):
     3896    """
     3897    One of these gets launched each time a new stream connection
     3898    is created. Due to the lack of callback mechanism within the
     3899    ministreaming API, we have to actively poll for and send back
     3900    received data
     3901    """
     3902    while 1:
     3903        #avail = sock.available()
     3904        #if avail &lt;= 0:
     3905        #    print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail)
     3906        #    time.sleep(5)
     3907        #    continue
     3908        #log(4, "reading a byte")
     3909
     3910        try:
     3911            buf = sock.recv(1)
     3912        except:
     3913            logException(4, "Exception reading first byte")
     3914       
     3915        if buf == '':
     3916            log(4, "stream closed")
     3917
     3918            # notify a close
     3919            self.samSend("STREAM", "CLOSED",
     3920                         ID=id)
     3921            return
     3922
     3923        # grab more if there's any available
     3924        navail = sock.available()
     3925        if navail &gt; 0:
     3926            #log(4, "%d more bytes available, reading..." % navail)
     3927            rest = sock.recv(navail)
     3928            buf += rest
     3929       
     3930        # send if off
     3931        log(4, "got from peer: %s" % repr(buf))
     3932       
     3933        self.samSend("STREAM", "RECEIVED", buf,
     3934                     ID=id,
     3935                     )
     3936
     3937
     3938
     3939
     3940</t>
     3941<t tx="davidmcnab.041304235615">def readline(self):
     3942    """
     3943    Read a line of text from stream, return the line without trailing newline
     3944   
     3945    This method really shouldn't exist in a class that's trying to look a bit
     3946    like a socket object, but what the hell!
     3947    """
     3948    chars = []
     3949    while 1:
     3950        char = self.recv(1)
     3951        if char in ['', '\n']:
     3952            break
     3953        chars.append(char)
     3954    return "".join(chars)
     3955</t>
    34293956</tnodes>
    34303957</leo_file>
  • apps/sam/jython/build.xml

    r4d0b3b2 r4cdd42f  
    1212
    1313    <target name="jar">
    14         <mkdir dir="./build" />
    15         <exec executable="jythonc">
     14
     15        <condition property="jythonext" value=".bat">
     16            <os family="windows" />
     17        </condition>
     18        <condition property="jythonext" value="">
     19            <not>
     20                <os family="windows" />
     21            </not>
     22        </condition>
     23
     24        <exec executable="jythonc${jythonext}" dir=".">
    1625            <env key="CLASSPATH" path="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar"/>
    17             <arg value="--jar"/>
    18             <arg value="./build/i2psam.jar"/>
    19             <arg value="./src/i2psam.py"/>
     26            <arg value="--jar"/><arg path="./i2psam.jar"/>
     27            <arg path="./src/i2psam.py"/>
    2028        </exec>
    2129    </target>
    2230
    2331    <target name="clean">
    24         <delete dir="./build" />
     32        <delete file="i2psam.jar" />
    2533        <delete dir="./jpywork" />
    2634    </target>
  • apps/sam/jython/src/i2psam.py

    r4d0b3b2 r4cdd42f  
    8080
    8181# 1=v.quiet, 2=normal, 3=verbose, 4=debug, 5=painful
    82 verbosity = 5
     82verbosity = 2
    8383
    8484# change to a filename to log there instead
     
    852852   
    853853        if kw.has_key('sock') \
    854                 and kw.has_key('dest') \
    855854                and kw.has_key('remdest') \
    856855                and kw.has_key('instream') \
    857856                and kw.has_key('outstream'):
     857   
    858858            # wrapping an accept()'ed connection
     859            log(4, "accept()'ed a connection, wrapping...")
     860   
    859861            self.sock = kw['sock']
    860             self.dest = kw['dest']
     862            self.dest = dest
    861863            self.remdest = kw['remdest']
    862864            self.instream = kw['instream']
    863865            self.outstream = kw['outstream']
    864866        else:
     867            log(4, "creating new I2PSocket %s" % dest)
     868   
    865869            # process keywords
    866870            self.host = kw.get('host', self.host)
     
    869873            # we need a factory, don't we?
    870874            self.sockmgrFact = i2p.client.streaming.I2PSocketManagerFactory()
     875   
    871876    #@-node:__init__
    872877    #@+node:bind
     
    883888        elif not self.dest:
    884889            # create new dest, client should interrogate it at some time
    885             self.dest = Destination()
     890            log(4, "bind: socket has no dest, creating one")
     891            self.dest = I2PDestination()
    886892    #@-node:bind
    887893    #@+node:listen
     
    895901        if not self.dest:
    896902            raise I2PSocketError("socket is not bound to a destination")
     903   
     904        log(4, "listening on socket")
    897905       
    898906        # create the socket manager
     
    937945        """
    938946        # sanity check
    939         #if self.sockmgr:
    940         #    raise I2PSocketError(".sockmgr already present - have you already called listen/connect?")
     947        if self.sockmgr:
     948            raise I2PSocketError(".sockmgr already present - have you already called listen/connect?")
    941949   
    942950        # create whole new dest if none was provided to constructor
    943951        if self.dest is None:
     952            log(4, "connect: creating whole new dest")
    944953            self.dest = I2PDestination()
    945954   
     
    952961        opts = net.i2p.client.streaming.I2PSocketOptions()
    953962        try:
     963            log(4, "trying to connect to %s" % remdest.toBase64())
    954964            sock = self.sock = self.sockmgr.connect(remdest._item, opts)
    955965            self.remdest = remdest
     
    963973                            remdest=remdest,
    964974                            sock=sock,
    965                             instream=instream,
    966                             outstream=outstream)
     975                            instream=self.instream,
     976                            outstream=self.outstream)
    967977        self._connected = 1
    968978        return sockobj
     
    10051015   
    10061016        # and write it out
    1007         #print "send: writing '%s' to outstream..." % repr(buf)
     1017        log(4, "send: writing '%s' to outstream..." % repr(buf))
    10081018        outstream = self.outstream
    10091019        for c in buf:
     
    10111021   
    10121022        # flush just in case
    1013         #print "send: flushing..."
     1023        log(4, "send: flushing...")
    10141024        self.outstream.flush()
    10151025   
    1016         #print "send: done"
     1026        log(4, "send: done")
     1027   
    10171028    #@-node:send
    10181029    #@+node:available
     
    10211032        Returns the number of bytes available for recv()
    10221033        """
    1023         return self.sock.available()
     1034        #print "available: sock is %s" % repr(self.sock)
     1035   
     1036        return self.instream.available()
     1037   
    10241038   
    10251039    #@-node:available
     
    10491063    #@+node:_createSockmgr
    10501064    def _createSockmgr(self):
     1065   
     1066        if getattr(self, 'sockmgr', None):
     1067            return
    10511068   
    10521069        #options = {jI2PClient.PROP_TCP_HOST: self.host,
     
    16071624            else: # STREAM
    16081625                # no need to create session object, because we're using streaming api
     1626                log(4, "Creating STREAM session")
     1627               
     1628                # what kind of stream?
     1629                direction = args.get('DIRECTION', 'BOTH')
     1630                if direction not in ['BOTH', 'RECEIVE', 'CREATE']:
     1631                    self.samSend("SESSION", "STATUS",
     1632                        RESULT="I2P_ERROR",
     1633                        MESSAGE="Illegal_direction_keyword_%s" % direction.replace(" ","_"),
     1634                        )
     1635                    return
     1636   
     1637                if direction == 'BOTH':
     1638                    self.canConnect = 1
     1639                    self.canAccept = 1
     1640                elif direction == 'RECEIVE':
     1641                    self.canConnect = 0
     1642                    self.canAccept = 1
     1643                elif direction == 'CREATE':
     1644                    self.canConnect = 1
     1645                    self.canAccept = 0
    16091646   
    16101647                # but we do need to mark it as being in use
     
    16141651                sock = self.samSock = I2PSocket(dest)
    16151652   
    1616                 # and we also need to fire up a socket listener
    1617                 thread.start_new_thread(self.threadSocketListener, (sock, dest))
     1653                # and we also need to fire up a socket listener, if not CREATE-only
     1654                if self.canAccept:
     1655                    thread.start_new_thread(self.threadSocketListener, (sock, dest))
    16181656   
    16191657            # finally, we can reply with the good news
     
    16641702        if subtopic == 'CONNECT':
    16651703            # who are we connecting to again?
    1666             remdest = I2PDestionation(b64=args['DESTINATION'])
    1667             id = args['ID']
     1704            remdest = I2PDestination(base64=args['DESTINATION'])
     1705            id = int(args['ID'])
    16681706       
    16691707            try:
     1708                log(4, "Trying to connect to remote peer %s..." % args['DESTINATION'])
    16701709                sock = self.samSock.connect(remdest)
     1710                log(4, "Connected to remote peer %s..." % args['DESTINATION'])
    16711711                self.localstreams[id] = sock
    16721712                self.samSend("STREAM", "STATUS",
     
    16741714                             ID=id,
    16751715                             )
     1716                thread.start_new_thread(self.threadSocketReceiver, (sock, id))
     1717   
    16761718            except:
     1719                log(4, "Failed to connect to remote peer %s..." % args['DESTINATION'])
    16771720                self.samSend("STREAM", "STATUS",
    16781721                             RESULT='I2P_ERROR',
    16791722                             MESSAGE='exception_on_connect',
     1723                             ID=id,
    16801724                             )
     1725   
     1726        elif subtopic == 'SEND':
     1727            # send to someone
     1728            id = int(args['ID'])
     1729            try:
     1730                sock = self.localstreams[id]
     1731                sock.send(args['DATA'])
     1732            except:
     1733                logException(4, "send failed")
     1734   
     1735   
    16811736   
    16821737   
     
    18411896        destb64 = dest.toBase64()
    18421897   
    1843         log(4, "Listening for connections to %s..." % destb64[:40])
    1844    
     1898        log(4, "Listening for connections to %s..." % destb64)
     1899   
     1900        sock.bind()
    18451901        sock.listen()
    18461902   
    18471903        while 1:
     1904            log(4, "Awaiting next connection to %s..." % destb64)
    18481905            newsock = sock.accept()
    1849            
     1906            log(4, "Got connection to %s..." % destb64)
     1907   
    18501908            # need an id, negative
    18511909            id = - self.server.samAllocId()
     
    18531911            # register it in local and global streams
    18541912            self.localstreams[id] = self.globalstreams[id] = newsock
     1913   
     1914            # fire up the receiver thread
     1915            thread.start_new_thread(self.threadSocketReceiver, (newsock, id))
    18551916           
    18561917            # who is connected to us?
     
    18621923                         DESTINATION=remdest_b64,
    18631924                         ID=id)
     1925   
    18641926    #@-node:threadSocketListener
     1927    #@+node:threadSocketReceiver
     1928    def threadSocketReceiver(self, sock, id):
     1929        """
     1930        One of these gets launched each time a new stream connection
     1931        is created. Due to the lack of callback mechanism within the
     1932        ministreaming API, we have to actively poll for and send back
     1933        received data
     1934        """
     1935        while 1:
     1936            #avail = sock.available()
     1937            #if avail <= 0:
     1938            #    print "threadSocketReceiver: waiting for data on %s (%s avail)..." % (id, avail)
     1939            #    time.sleep(5)
     1940            #    continue
     1941            #log(4, "reading a byte")
     1942   
     1943            try:
     1944                buf = sock.recv(1)
     1945            except:
     1946                logException(4, "Exception reading first byte")
     1947           
     1948            if buf == '':
     1949                log(4, "stream closed")
     1950   
     1951                # notify a close
     1952                self.samSend("STREAM", "CLOSED",
     1953                             ID=id)
     1954                return
     1955   
     1956            # grab more if there's any available
     1957            navail = sock.available()
     1958            if navail > 0:
     1959                #log(4, "%d more bytes available, reading..." % navail)
     1960                rest = sock.recv(navail)
     1961                buf += rest
     1962           
     1963            # send if off
     1964            log(4, "got from peer: %s" % repr(buf))
     1965           
     1966            self.samSend("STREAM", "RECEIVED", buf,
     1967                         ID=id,
     1968                         )
     1969   
     1970   
     1971   
     1972   
     1973    #@-node:threadSocketReceiver
    18651974    #@+node:samParse
    18661975    def samParse(self, flds):
     
    18801989                name, val = arg.split("=", 1)
    18811990            except:
    1882                 logException(3, "failed to process %s" % repr(arg))
     1991                logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
    18831992                raise
    18841993            dargs[name] = val
     
    21092218    log(level, "%s\n%s" % (s.getvalue(), msg), 1)
    21102219#@-node:logException
     2220#@+node:testdests
     2221def testdests():
     2222    """
     2223    Demo function which tests out dest generation and import/export
     2224    """
     2225    print
     2226    print "********************************************"
     2227    print "Testing I2P destination create/export/import"
     2228    print "********************************************"
     2229    print
     2230
     2231    print "Generating a destination"
     2232    d1 = I2PDestination()
     2233
     2234    print "Exporting and importing dest1 in several forms"
     2235
     2236    print "public binary string..."
     2237    d1_bin = d1.toBin()
     2238    d2_bin = I2PDestination(bin=d1_bin)
     2239
     2240    print "public binary file..."
     2241    d1.toBinFile("temp-d1-bin")
     2242    d2_binfile = I2PDestination(binfile="temp-d1-bin")
     2243
     2244    print "private binary string..."
     2245    d1_binprivate = d1.toBinPrivate()
     2246    d2_binprivate = I2PDestination(binprivate=d1_binprivate)
     2247
     2248    print "private binary file..."
     2249    d1.toBinFilePrivate("temp-d1-bin-private")
     2250    d2_binfileprivate = I2PDestination(binfileprivate="temp-d1-bin-private")
     2251
     2252    print "public base64 string..."
     2253    d1_b64 = d1.toBase64()
     2254    d2_b64 = I2PDestination(base64=d1_b64)
     2255
     2256    print "public base64 file..."
     2257    d1.toBase64File("temp-d1-b64")
     2258    d2_b64file = I2PDestination(base64file="temp-d1-b64")
     2259
     2260    print "private base64 string..."
     2261    d1_base64private = d1.toBase64Private()
     2262    d2_b64private = I2PDestination(base64private=d1_base64private)
     2263
     2264    print "private base64 file..."
     2265    d1.toBase64FilePrivate("temp-d1-b64-private")
     2266    d2_b64fileprivate = I2PDestination(base64fileprivate="temp-d1-b64-private")
     2267
     2268    print "All destination creation/import/export tests passed!"
     2269
     2270
     2271#@-node:testdests
     2272#@+node:testsigs
     2273def testsigs():
     2274    global d1, d1pub, d1sig, d1res
     2275   
     2276    print
     2277    print "********************************************"
     2278    print "Testing I2P dest-based signatures"
     2279    print "********************************************"
     2280    print
     2281   
     2282    print "Creating dest..."
     2283    d1 = I2PDestination()
     2284
     2285    s_good = "original stuff that we're signing"
     2286    s_bad = "non-original stuff we're trying to forge"
     2287   
     2288    print "Signing some shit against d1..."
     2289    d1sig = d1.sign(s_good)
     2290
     2291    print "Creating public dest d1pub"
     2292    d1pub = I2PDestination(bin=d1.toBin())
     2293
     2294    print "Verifying original data with d1pub"
     2295    res = d1pub.verify(s_good, d1sig)
     2296    print "Result: %s (should be 1)" % repr(res)
     2297   
     2298    print "Trying to verify on a different string"
     2299    res1 = d1pub.verify(s_bad, d1sig)
     2300    print "Result: %s (should be 0)" % repr(res1)
     2301   
     2302    if res and not res1:
     2303        print "signing/verifying test passed"
     2304    else:
     2305        print "SIGNING/VERIFYING TEST FAILED"
     2306
     2307#@-node:testsigs
     2308#@+node:testsession
     2309def testsession():
     2310
     2311    global c, d1, d2, s1, s2
     2312
     2313    print
     2314    print "********************************************"
     2315    print "Testing I2P dest->dest messaging"
     2316    print "********************************************"
     2317    print
     2318   
     2319    print "Creating I2P client..."
     2320    c = I2PClient()
     2321
     2322    print "Creating destination d1..."
     2323    d1 = c.createDestination()
     2324
     2325    print "Creating destination d2..."
     2326    d2 = c.createDestination()
     2327
     2328    print "Creating destination d3..."
     2329    d3 = c.createDestination()
     2330
     2331    print "Creating session s1 on dest d1..."
     2332    s1 = c.createSession(d1, host='localhost', port=7654)
     2333
     2334    print "Creating session s2 on dest d2..."
     2335    s2 = c.createSession(d2)
     2336
     2337    print "Connecting session s1..."
     2338    s1.connect()
     2339
     2340    print "Connecting session s2..."
     2341    s2.connect()
     2342
     2343    print "Sending message from s1 to d2..."
     2344    s1.sendMessage(d2, "Hi there, s2!!")
     2345
     2346    print "Retrieving message from s2..."
     2347    print "got: %s" % repr(s2.getMessage())
     2348
     2349    print "Sending second message from s1 to d2..."
     2350    s1.sendMessage(d2, "Hi there again, s2!!")
     2351
     2352    print "Retrieving message from s2..."
     2353    print "got: %s" % repr(s2.getMessage())
     2354
     2355    print "Sending message from s1 to d3 (should take ages then fail)..."
     2356    res = s1.sendMessage(d3, "This is futile!!")
     2357    print "result of that send was %s (should have been 0)" % res
     2358
     2359    print "Destroying session s1..."
     2360    s1.destroySession()
     2361
     2362    print "Destroying session s2..."
     2363    s2.destroySession()
     2364
     2365    print "session tests passed!"
     2366#@-node:testsession
     2367#@+node:testsocket
     2368def testsocket(bidirectional=0):
     2369
     2370    global d1, d2, s1, s2
     2371
     2372    print
     2373    print "********************************************"
     2374    print "Testing I2P streaming interface"
     2375    print "********************************************"
     2376    print
     2377   
     2378    print "Creating destinations..."
     2379    dServer = I2PDestination()
     2380    dClient = I2PDestination()
     2381
     2382    print "Creating sockets..."
     2383    sServer = I2PSocket(dServer)
     2384    sClient = I2PSocket(dClient)
     2385
     2386    # server thread which simply reads a line at a time, then echoes
     2387    # that line back to the client
     2388    def servThread(s):
     2389        print "server: binding socket"
     2390        s.bind()
     2391        print "server: setting socket to listen"
     2392        s.listen()
     2393        print "server: awaiting connection"
     2394        sock = s.accept()
     2395        print "server: got connection"
     2396
     2397        sock.send("Hello, echoing...\n")
     2398        buf = ''
     2399        while 1:
     2400            c = sock.recv(1)
     2401            if c == '':
     2402                sock.close()
     2403                print "server: socket closed"
     2404                break
     2405
     2406            buf += c
     2407            if c == '\n':
     2408                sock.send("SERVER: "+buf)
     2409                buf = ''
     2410
     2411    # client thread which reads lines and prints them to stdout
     2412    def clientThread(s):
     2413        buf = ''
     2414        while 1:
     2415            c = s.recv(1)
     2416            if c == '':
     2417                s.close()
     2418                print "client: socket closed"
     2419                break
     2420            buf += c
     2421            if c == '\n':
     2422                print "client: got %s" % repr(buf)
     2423                buf = ''
     2424
     2425    print "launching server thread..."
     2426    thread.start_new_thread(servThread, (sServer,))
     2427
     2428    if bidirectional:
     2429        # dummy thread which accepts connections TO client socket
     2430        def threadDummy(s):
     2431            print "dummy: listening"
     2432            s.listen()
     2433            print "dummy: accepting"
     2434   
     2435            sock = s.accept()
     2436            print "dummy: got connection"
     2437   
     2438        print "test - launching dummy client accept thread"
     2439        thread.start_new_thread(threadDummy, (sClient,))
     2440
     2441    print "client: trying to connect"
     2442    sClient.connect(dServer)
     2443
     2444    print "client: connected, launching rx thread"
     2445    thread.start_new_thread(clientThread, (sClient,))
     2446
     2447    while 1:
     2448        line = raw_input("Enter something (q to quit)> ")
     2449        if line == 'q':
     2450            print "closing client socket"
     2451            sClient.close()
     2452            break
     2453        sClient.send(line+"\n")
     2454
     2455    print "I2PSocket test apparently succeeded"
     2456
     2457
     2458#@-node:testsocket
    21112459#@+node:usage
    21122460def usage(detailed=0):
     
    21362484    print "     samserver - runs as a SAM server"
    21372485    print "     test - run a suite of self-tests"
     2486    print "     testsocket - run only the socket test"
     2487    print "     testbidirsocket - run socket test in bidirectional mode"
    21382488    print
    21392489   
    21402490    sys.exit(0)
    2141 
    2142 
    2143 
    21442491#@-node:usage
    21452492#@+node:main
     
    21482495    argv = sys.argv
    21492496    argc = len(argv)
     2497
     2498    # -------------------------------------------------
     2499    # do the getopt command line parsing
    21502500
    21512501    try:
     
    21602510        usage("You entered an invalid option")
    21612511
    2162     cmd = 'samserver'
    2163 
    2164     # we prolly should pass all these parms in constructor call, but
    2165     # what the heck!
    2166     #global verbosity, i2psamhost, i2psamport, i2cpHost, i2cpPort
    2167    
     2512    #print "args=%s" % args
     2513
    21682514    serveropts = {}
    2169 
    21702515    for opt, val in opts:
    21712516        if opt in ['-h', '-?', '--help']:
     
    21872532            usage(0)
    21882533
     2534    # --------------------------------------------------
     2535    # now run in required mode, default is 'samserver'
     2536
    21892537    if len(args) == 0:
    21902538        cmd = 'samserver'
     
    22002548    elif cmd == 'test':
    22012549       
    2202         print "RUNNING I2P Jython TESTS"
     2550        print "RUNNING full I2PSAM Jython TEST SUITE"
    22032551        testsigs()
    22042552        testdests()
     
    22062554        testsocket()
    22072555
     2556    elif cmd == 'testsocket':
     2557       
     2558        print "RUNNING SOCKET TEST"
     2559        testsocket(0)
     2560
     2561    elif cmd == 'testbidirsocket':
     2562        print "RUNNING BIDIRECTIONAL SOCKET TEST"
     2563        testsocket(1)
     2564
    22082565    else:
     2566        # spit at unrecognised option
    22092567        usage(0)
     2568
    22102569#@-node:main
    2211 #@+node:testdests
    2212 def testdests():
    2213     """
    2214     Demo function which tests out dest generation and import/export
    2215     """
    2216     print
    2217     print "********************************************"
    2218     print "Testing I2P destination create/export/import"
    2219     print "********************************************"
    2220     print
    2221 
    2222     print "Generating a destination"
    2223     d1 = I2PDestination()
    2224 
    2225     print "Exporting and importing dest1 in several forms"
    2226 
    2227     print "public binary string..."
    2228     d1_bin = d1.toBin()
    2229     d2_bin = I2PDestination(bin=d1_bin)
    2230 
    2231     print "public binary file..."
    2232     d1.toBinFile("temp-d1-bin")
    2233     d2_binfile = I2PDestination(binfile="temp-d1-bin")
    2234 
    2235     print "private binary string..."
    2236     d1_binprivate = d1.toBinPrivate()
    2237     d2_binprivate = I2PDestination(binprivate=d1_binprivate)
    2238 
    2239     print "private binary file..."
    2240     d1.toBinFilePrivate("temp-d1-bin-private")
    2241     d2_binfileprivate = I2PDestination(binfileprivate="temp-d1-bin-private")
    2242 
    2243     print "public base64 string..."
    2244     d1_b64 = d1.toBase64()
    2245     d2_b64 = I2PDestination(base64=d1_b64)
    2246 
    2247     print "public base64 file..."
    2248     d1.toBase64File("temp-d1-b64")
    2249     d2_b64file = I2PDestination(base64file="temp-d1-b64")
    2250 
    2251     print "private base64 string..."
    2252     d1_base64private = d1.toBase64Private()
    2253     d2_b64private = I2PDestination(base64private=d1_base64private)
    2254 
    2255     print "private base64 file..."
    2256     d1.toBase64FilePrivate("temp-d1-b64-private")
    2257     d2_b64fileprivate = I2PDestination(base64fileprivate="temp-d1-b64-private")
    2258 
    2259     print "All destination creation/import/export tests passed!"
    2260 
    2261 
    2262 #@-node:testdests
    2263 #@+node:testsigs
    2264 def testsigs():
    2265     global d1, d1pub, d1sig, d1res
    2266    
    2267     print
    2268     print "********************************************"
    2269     print "Testing I2P dest-based signatures"
    2270     print "********************************************"
    2271     print
    2272    
    2273     print "Creating dest..."
    2274     d1 = I2PDestination()
    2275 
    2276     s_good = "original stuff that we're signing"
    2277     s_bad = "non-original stuff we're trying to forge"
    2278    
    2279     print "Signing some shit against d1..."
    2280     d1sig = d1.sign(s_good)
    2281 
    2282     print "Creating public dest d1pub"
    2283     d1pub = I2PDestination(bin=d1.toBin())
    2284 
    2285     print "Verifying original data with d1pub"
    2286     res = d1pub.verify(s_good, d1sig)
    2287     print "Result: %s (should be 1)" % repr(res)
    2288    
    2289     print "Trying to verify on a different string"
    2290     res1 = d1pub.verify(s_bad, d1sig)
    2291     print "Result: %s (should be 0)" % repr(res1)
    2292    
    2293     if res and not res1:
    2294         print "signing/verifying test passed"
    2295     else:
    2296         print "SIGNING/VERIFYING TEST FAILED"
    2297 
    2298 #@-node:testsigs
    2299 #@+node:testsession
    2300 def testsession():
    2301 
    2302     global c, d1, d2, s1, s2
    2303 
    2304     print
    2305     print "********************************************"
    2306     print "Testing I2P dest->dest messaging"
    2307     print "********************************************"
    2308     print
    2309    
    2310     print "Creating I2P client..."
    2311     c = I2PClient()
    2312 
    2313     print "Creating destination d1..."
    2314     d1 = c.createDestination()
    2315 
    2316     print "Creating destination d2..."
    2317     d2 = c.createDestination()
    2318 
    2319     print "Creating destination d3..."
    2320     d3 = c.createDestination()
    2321 
    2322     print "Creating session s1 on dest d1..."
    2323     s1 = c.createSession(d1, host='localhost', port=7654)
    2324 
    2325     print "Creating session s2 on dest d2..."
    2326     s2 = c.createSession(d2)
    2327 
    2328     print "Connecting session s1..."
    2329     s1.connect()
    2330 
    2331     print "Connecting session s2..."
    2332     s2.connect()
    2333 
    2334     print "Sending message from s1 to d2..."
    2335     s1.sendMessage(d2, "Hi there, s2!!")
    2336 
    2337     print "Retrieving message from s2..."
    2338     print "got: %s" % repr(s2.getMessage())
    2339 
    2340     print "Sending second message from s1 to d2..."
    2341     s1.sendMessage(d2, "Hi there again, s2!!")
    2342 
    2343     print "Retrieving message from s2..."
    2344     print "got: %s" % repr(s2.getMessage())
    2345 
    2346     print "Sending message from s1 to d3 (should take ages then fail)..."
    2347     res = s1.sendMessage(d3, "This is futile!!")
    2348     print "result of that send was %s (should have been 0)" % res
    2349 
    2350     print "Destroying session s1..."
    2351     s1.destroySession()
    2352 
    2353     print "Destroying session s2..."
    2354     s2.destroySession()
    2355 
    2356     print "session tests passed!"
    2357 #@-node:testsession
    2358 #@+node:testsocket
    2359 def testsocket():
    2360 
    2361     global d1, d2, s1, s2
    2362 
    2363     print
    2364     print "********************************************"
    2365     print "Testing I2P streaming interface"
    2366     print "********************************************"
    2367     print
    2368    
    2369     print "Creating destinations..."
    2370     dServer = I2PDestination()
    2371     dClient = I2PDestination()
    2372 
    2373     print "Creating sockets..."
    2374     sServer = I2PSocket(dServer)
    2375     sClient = I2PSocket(dClient)
    2376 
    2377     # server thread which simply reads a line at a time, then echoes
    2378     # that line back to the client
    2379     def servThread(s):
    2380         print "server: binding socket"
    2381         s.bind()
    2382         print "server: setting socket to listen"
    2383         s.listen()
    2384         print "server: awaiting connection"
    2385         sock = s.accept()
    2386         print "server: got connection"
    2387 
    2388         sock.send("Hello, echoing...\n")
    2389         buf = ''
    2390         while 1:
    2391             c = sock.recv(1)
    2392             if c == '':
    2393                 sock.close()
    2394                 print "server: socket closed"
    2395                 break
    2396 
    2397             buf += c
    2398             if c == '\n':
    2399                 sock.send("SERVER: "+buf)
    2400                 buf = ''
    2401 
    2402     # client thread which reads lines and prints them to stdout
    2403     def clientThread(s):
    2404         buf = ''
    2405         while 1:
    2406             c = s.recv(1)
    2407             if c == '':
    2408                 s.close()
    2409                 print "client: socket closed"
    2410                 break
    2411             buf += c
    2412             if c == '\n':
    2413                 print "client: got %s" % repr(buf)
    2414                 buf = ''
    2415 
    2416     print "launching server thread..."
    2417     thread.start_new_thread(servThread, (sServer,))
    2418 
    2419     print "client: trying to connect"
    2420     sClient.connect(dServer)
    2421 
    2422     print "client: connected, launching rx thread"
    2423     thread.start_new_thread(clientThread, (sClient,))
    2424 
    2425     while 1:
    2426         line = raw_input("Enter something (q to quit)> ")
    2427         if line == 'q':
    2428             print "closing client socket"
    2429             sClient.close()
    2430             break
    2431         sClient.send(line+"\n")
    2432 
    2433     print "I2PSocket test apparently succeeded"
    2434 
    2435 #@-node:testsocket
    24362570#@+node:MAINLINE
    24372571if __name__ == '__main__':
  • apps/sam/python/src/i2psamclient.py

    r4d0b3b2 r4cdd42f  
    1818import sys, os, socket, thread, threading, Queue, traceback, StringIO, time
    1919
     20from pdb import set_trace
     21
    2022#@-node:imports
    2123#@+node:globals
     
    5456    """
    5557    pass
     58
     59class I2PStreamClosed(Exception):
     60    """
     61    Stream is not open
     62    """
    5663#@-node:exceptions
    5764#@+node:class I2PSamClient
     
    111118        self.qDatagrams = Queue.Queue()
    112119        self.qRawMessages = Queue.Queue()
     120   
    113121        self.namingReplies = {}
    114122        self.namingCache = {}
     123   
     124        self.streams = {} # currently open streams, keyed by id
     125        self.streamConnectReplies = {} # holds queues awaiting connect resp, keyed by id
     126        self.qNewStreams = Queue.Queue() # incoming connections
     127   
     128        self.samNextIdLock = threading.Lock()
     129        self.samNextId = 1
     130   
    115131        self.isRunning = 1
     132   
    116133   
    117134        log(4, "trying connection to SAM server...")
     
    313330       
    314331        Keywords:
     332            - direction - only used for STREAM sessions, can be RECEIVE,
     333              CREATE or BOTH (default BOTH)
    315334            - i2cphost - hostname for the SAM bridge to contact i2p router on
    316335            - i2cpport - port for the SAM bridge to contact i2p router on
     
    323342        kw1['STYLE'] = self.samStyle = style
    324343        kw1['DESTINATION'] = dest
     344        if style == 'STREAM':
     345            direction = kw.get('direction', 'BOTH')
     346            kw1['DIRECTION'] = direction
     347            if direction == 'BOTH':
     348                self.canAccept = 1
     349                self.canConnect = 1
     350            elif direction == 'RECEIVE':
     351                self.canAccept = 1
     352                self.canConnect = 0
     353            elif direction == 'CREATE':
     354                self.canAccept = 0
     355                self.canConnect = 1
     356            else:
     357                raise I2PCommandFail("direction keyword must be one of RECEIVE, CREATE or BOTH")
    325358   
    326359        # stick in i2cp host/port if specified
     
    424457        return self.qDatagrams.get()
    425458    #@-node:samDatagramReceive
     459    #@+node:samStreamConnect
     460    def samStreamConnect(self, dest):
     461        """
     462        Makes a STREAM connection to a remote dest
     463   
     464        STREAM STATUS
     465        RESULT=$result
     466        ID=$id
     467        [MESSAGE=...]
     468        """
     469        # need an ID
     470        id = self.samAllocId()
     471       
     472        # create queue for connect reply
     473        q = self.streamConnectReplies[id] = Queue.Queue()
     474   
     475        # send req
     476        self.samSend("STREAM", "CONNECT",
     477                     ID=id,
     478                     DESTINATION=dest,
     479                     )
     480   
     481        # await reply - comes back as a dict
     482        resp = q.get()
     483   
     484        # ditch queue
     485        del self.streamConnectReplies[id]
     486        del q
     487       
     488        # check out response
     489        result = resp['RESULT']
     490        if result == 'OK':
     491            conn = I2PSAMStream(self, id, dest)
     492            self.streams[id] = conn
     493            return conn
     494        else:
     495            msg = resp.get('MESSAGE', '')
     496            raise I2PCommandFail(result, msg, "STREAM CONNECT")
     497   
     498    #@-node:samStreamConnect
     499    #@+node:samStreamAccept
     500    def samStreamAccept(self):
     501        """
     502        Waits for an incoming connection, returning a wrapped conn obj
     503        """
     504        log(4, "waiting for connection")
     505        conn = self.qNewStreams.get()
     506        log(4, "got connection")
     507        return conn
     508    #@-node:samStreamAccept
     509    #@+node:samStreamSend
     510    def samStreamSend(self, conn, data):
     511        """
     512        DO NOT CALL THIS DIRECTLY
     513       
     514        Invoked by an I2PSAMStream object to transfer data
     515        Use the object's .send() method instead.
     516       
     517        conn is the I2PSAMStream
     518   
     519        STREAM SEND
     520        ID=$id
     521        SIZE=$numBytes\n[$numBytes of data]
     522        """
     523        # dispatch
     524        self.samSend("STREAM", "SEND", data, ID=conn.id)
     525   
     526        # useless, but mimics socket paradigm
     527        return len(data)
     528   
     529    #@-node:samStreamSend
     530    #@+node:samStreamClose
     531    def samStreamClose(self, conn):
     532        """
     533        DO NOT CALL DIRECTLY
     534       
     535        Invoked by I2PSAMStream to close stream
     536        Use the object's .send() method instead.
     537       
     538        STREAM CLOSE
     539        ID=$id
     540        """
     541        self.samSend("STREAM", "CLOSE", ID=conn.id)
     542        del self.streams[conn.id]
     543   
     544    #@-node:samStreamClose
    426545    #@+node:samNamingLookup
    427546    def samNamingLookup(self, host):
     
    473592                name, val = arg.split("=", 1)
    474593            except:
    475                 logException(3, "failed to process %s" % repr(arg))
     594                logException(3, "failed to process %s in %s" % (repr(arg), repr(flds)))
    476595                raise
    477596            dargs[name] = val
     
    486605   
    487606        return cmd, subcmd, dargs
     607   
     608   
    488609   
    489610   
     
    587708        """
    588709        Handles STREAM messages from server
    589         """
     710   
     711        STREAM STATUS
     712        RESULT=$result
     713        ID=$id
     714        [MESSAGE=...]
     715   
     716        STREAM CONNECTED
     717        DESTINATION=$base64key
     718        ID=$id
     719   
     720        STREAM RECEIVED
     721        ID=$id
     722        SIZE=$numBytes\n[$numBytes of data]
     723   
     724        STREAM CLOSED
     725        RESULT=$result
     726        ID=$id
     727        [MESSAGE=...]
     728        """
     729        log(4, "got %s %s %s" % (topic, subtopic, args))
     730   
     731        # which stream?
     732        id = int(args['ID'])
     733   
     734        # result of prior connection attempt
     735        if subtopic == 'STATUS':
     736            # stick it on the queue that the caller is waiting on and let the
     737            # caller interpret the result
     738            self.streamConnectReplies[id].put(args)
     739            return
     740   
     741        # notice of incoming connection
     742        if subtopic == 'CONNECTED':
     743   
     744            # grab details
     745            dest = args['DESTINATION']
     746   
     747            # wrap it in a stream obj
     748            conn = I2PSAMStream(self, id, dest)
     749            self.streams[id] = conn
     750   
     751            # and put it there for anyone calling samStreamAccept()
     752            self.qNewStreams.put(conn)
     753   
     754            # done
     755            return
     756   
     757        # notice of received data
     758        elif subtopic == 'RECEIVED':
     759            # grab details
     760            data = args['DATA']
     761   
     762            # lookup the connection
     763            conn = self.streams.get(id, None)
     764            if not conn:
     765                # conn not known, just ditch
     766                log(2, "got data, but don't recall any conn with id %s" % id)
     767                return
     768   
     769            # and post the received data
     770            conn._notifyIncomingData(data)
     771   
     772            log(4, "wrote data to conn's inbound queue")
     773           
     774            # done
     775            return
     776   
     777        elif subtopic == 'CLOSED':
     778            # lookup the connection
     779            conn = self.streams.get(id, None)
     780            if not conn:
     781                # conn not known, just ditch
     782                return
     783   
     784            # mark conn as closed and forget it
     785            conn._notifyIncomingData("") # special signal to close
     786            conn.isOpen = 0
     787            del self.streams[id]
     788   
     789            # done
     790            return
     791   
     792   
     793   
    590794    #@-node:on_STREAM
    591795    #@+node:on_DATAGRAM
     
    628832        self.qNewDests.put(res)
    629833    #@-node:on_DEST
     834    #@+node:samAllocId
     835    def samAllocId(self):
     836        """
     837        Allocates a new unique id as required by SAM protocol
     838        """
     839        self.samNextIdLock.acquire()
     840        id = self.samNextId
     841        self.samNextId += 1
     842        self.samNextIdLock.release()
     843        return id
     844    #@-node:samAllocId
    630845    #@+node:_recvline
    631846    def _recvline(self):
     
    681896    #@-others
    682897#@-node:class I2PSamClient
     898#@+node:class I2PSAMStream
     899class I2PSAMStream:
     900    """
     901    Wrapper for a stream object
     902    """
     903    #@    @+others
     904    #@+node:__init__
     905    def __init__(self, client, id, dest):
     906        """
     907        """
     908        self.client = client
     909        self.id = id
     910        self.dest = dest
     911   
     912        self.qIncomingData = Queue.Queue()
     913   
     914        self.inbuf = ''
     915        self.isOpen = 1
     916    #@-node:__init__
     917    #@+node:send
     918    def send(self, data):
     919        """
     920        Sends data to a stream connection
     921        """
     922        # barf if stream not open
     923        if not self.isOpen:
     924            raise I2PStreamClosed
     925   
     926        # can send
     927        return self.client.samStreamSend(self, data)
     928    #@-node:send
     929    #@+node:recv
     930    def recv(self, size):
     931        """
     932        Retrieves n bytes from peer
     933        """
     934        chunks = []
     935   
     936        while self.isOpen and size > 0:
     937            # try internal buffer first
     938            if self.inbuf:
     939                chunk = self.inbuf[:size]
     940                chunklen = len(chunk)
     941                self.inbuf = self.inbuf[chunklen:]
     942                chunks.append(chunk)
     943                size -= chunklen
     944            else:
     945                # replenish input buffer
     946                log(4, "I2PSAMStream.recv: replenishing input buffer")
     947                buf = self.qIncomingData.get()
     948                if buf == '':
     949                    # connection closed by peer
     950                    self.isOpen = 0
     951                    break
     952                else:
     953                    # got more data
     954                    log(4, "I2PSAMStream: queue returned %s" % repr(buf))
     955                    self.inbuf += buf
     956   
     957        # return whatever we've got, hopefully all
     958        return "".join(chunks)
     959   
     960   
     961    #@-node:recv
     962    #@+node:readline
     963    def readline(self):
     964        """
     965        Read a line of text from stream, return the line without trailing newline
     966       
     967        This method really shouldn't exist in a class that's trying to look a bit
     968        like a socket object, but what the hell!
     969        """
     970        chars = []
     971        while 1:
     972            char = self.recv(1)
     973            if char in ['', '\n']:
     974                break
     975            chars.append(char)
     976        return "".join(chars)
     977    #@-node:readline
     978    #@+node:close
     979    def close(self):
     980        """
     981        close this stream connection
     982        """
     983        log(4, "closing stream")
     984        self.client.samStreamClose(self)
     985        log(4, "stream closed")
     986        self.isOpen = 0
     987   
     988        # and just to make sure...
     989        self.qIncomingData.put("") # busts out of recv() loops
     990   
     991    #@-node:close
     992    #@+node:__del__
     993    def __del__(self):
     994        """
     995        Dropping last ref to this object closes stream
     996        """
     997        self.close()
     998    #@-node:__del__
     999    #@+node:_notifyIncomingData
     1000    def _notifyIncomingData(self, data):
     1001        """
     1002        Called by client receiver to notify incoming data
     1003        """
     1004        log(4, "got %s" % repr(data))
     1005        self.qIncomingData.put(data)
     1006    #@-node:_notifyIncomingData
     1007    #@-others
     1008#@-node:class I2PSAMStream
    6831009#@+node:class I2PRemoteSession
    6841010class I2PRemoteSession:
     
    7021028    #@+node:send
    7031029    def send(self, peerdest, msg):
    704    
     1030        """
     1031        """
    7051032        return self.client.send(self.dest, peerdest, msg)
    7061033    #@-node:send
     
    8861213    print
    8871214
    888     print "Instantiating 2 more client connections..."
     1215    print "Instantiating client c6..."
     1216    c6 = I2PSamClient()
     1217
     1218    print "Creating dest for c6"
     1219    pub6, priv6 = c6.samDestGenerate()
     1220
     1221    print "Creating SAM STREAM SESSION on connection c6..."
     1222    res = c6.samSessionCreate("STREAM", priv6, direction="RECEIVE")
     1223    if res != 'OK':
     1224        print "Failed to create STREAM session on connection c6: %s" % repr(res)
     1225        return
     1226    print "STREAM Session on connection c6 created successfully"
     1227
     1228    print "Launching acceptor thread..."
     1229    thread.start_new_thread(demoSTREAM_thread, (c6,))
     1230
     1231    #print "sleep a while and give the server a chance..."
     1232    #time.sleep(10)
     1233
     1234    print "----------------------------------------"
     1235
     1236    print "Instantiating client c5..."
    8891237    c5 = I2PSamClient()
    890     c6 = I2PSamClient()
    891 
    892     print "Creating more dests via SAM"
     1238
     1239    print "Creating dest for c5"
    8931240    pub5, priv5 = c5.samDestGenerate()
    894     pub6, priv6 = c6.samDestGenerate()
    895 
    896     print "Creating SAM STREAM SESSION on connection c3..."
    897     res = c5.samSessionCreate("STREAM", priv5)
     1241
     1242    print "Creating SAM STREAM SESSION on connection c5..."
     1243    res = c5.samSessionCreate("STREAM", priv5, direction="CREATE")
    8981244    if res != 'OK':
    8991245        print "Failed to create STREAM session on connection c5: %s" % repr(res)
     
    9011247    print "STREAM Session on connection c5 created successfully"
    9021248
    903     print "Creating SAM STREAM SESSION on connection c6..."
    904     res = c6.samSessionCreate("STREAM", priv6)
    905     if res != 'OK':
    906         print "Failed to create STREAM session on connection c4: %s" % repr(res)
     1249    print "----------------------------------------"
     1250
     1251    print "Making connection from c5 to c6..."
     1252
     1253    #set_trace()
     1254
     1255    try:
     1256        conn_c5 = c5.samStreamConnect(pub6)
     1257    except:
     1258        print "Stream Connection failed"
    9071259        return
    908     print "STREAM Session on connection c4 created successfully"
    909 
    910     msg = "Hi there, this is a datagram!"
    911     print "sending from c5 to c6: %s" % repr(msg)
    912     c5.samStreamSend(pub6, msg)
    913 
    914     print "now try to receive from c6 (will block)..."
    915     msg1 = c6.samStreamReceive()
    916     print "Connection c6 got %s from %s..." % (repr(msg1), repr(remdest))
     1260    print "Stream connect succeeded"
     1261   
     1262    print "Receiving from c5..."
     1263    buf = conn_c5.readline()
     1264    print "Got %s" % repr(buf)
     1265
     1266    #print "Try to accept connection on c6..."
     1267    #conn_c6 = c6.sam
    9171268
    9181269    print
     
    9221273    print
    9231274
     1275
     1276
     1277
     1278
    9241279#@-node:demoSTREAM
     1280#@+node:demoSTREAM_thread
     1281def demoSTREAM_thread(sess):
     1282   
     1283    while 1:
     1284        sock = sess.samStreamAccept()
     1285        log(4, "got incoming connection")
     1286
     1287        print "**ACCEPTOR SLEEPING 10 secs BEFORE SENDING"
     1288       
     1289        time.sleep(10)
     1290
     1291        sock.send("Hi there, what do you want?\n")
     1292
     1293        print "**ACCEPTOR SLEEPING 5 MINS BEFORE CLOSING"
     1294        time.sleep(300)
     1295        print "**ACCEPTOR CLOSING STREAM"
     1296
     1297        sock.close()
     1298
     1299#@-node:demoSTREAM_thread
    9251300#@+node:demo
    9261301def demo():
     
    9371312    print
    9381313
    939     demoNAMING()
    940     demoRAW()
    941     demoDATAGRAM()
    942     #demoSTREAM()
     1314    #demoNAMING()
     1315    #demoRAW()
     1316    #demoDATAGRAM()
     1317    demoSTREAM()
    9431318
    9441319    print
Note: See TracChangeset for help on using the changeset viewer.