Changeset 241bb38


Ignore:
Timestamp:
Feb 8, 2016 9:24:06 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e402bfa
Parents:
84b9436 (diff), 55addfc (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p.zzz.sam' (head d5c193915251826fe4f5dcd58c36f74714495fd4)

to branch 'i2p.i2p' (head 5ad07e5b5ef68fddeec919c04c6c49178b6a6b31)

Location:
apps/sam/java/src/net/i2p/sam
Files:
5 added
16 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java

    r84b9436 r241bb38  
    3333    // FIXME make final after fixing SAMv3DatagramSession override
    3434    protected SAMDatagramReceiver recv;
    35 
    3635    private final I2PDatagramMaker dgramMaker;
    3736    private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
     37
    3838    /**
    3939     * Create a new SAM DATAGRAM session.
     
    4646     * @throws I2PSessionException
    4747     */
    48     public SAMDatagramSession(String dest, Properties props,
     48    protected SAMDatagramSession(String dest, Properties props,
    4949                              SAMDatagramReceiver recv) throws IOException,
    5050                              DataFormatException, I2PSessionException {
    5151        super(dest, props);
    52 
    5352        this.recv = recv;
    5453        dgramMaker = new I2PDatagramMaker(getI2PSession());
     
    5756    /**
    5857     * Create a new SAM DATAGRAM session.
     58     *
     59     * Caller MUST call start().
    5960     *
    6061     * @param destStream Input stream containing the destination keys
     
    6970                              DataFormatException, I2PSessionException {
    7071        super(destStream, props);
     72        this.recv = recv;
     73        dgramMaker = new I2PDatagramMaker(getI2PSession());
     74    }
    7175
     76    /**
     77     * Create a new SAM DATAGRAM session on an existing I2P session.
     78     *
     79     * @param props unused for now
     80     * @since 0.9.25
     81     */
     82    protected SAMDatagramSession(I2PSession sess, Properties props, int listenPort,
     83                              SAMDatagramReceiver recv) throws IOException,
     84                              DataFormatException, I2PSessionException {
     85        super(sess, I2PSession.PROTO_DATAGRAM, listenPort);
    7286        this.recv = recv;
    7387        dgramMaker = new I2PDatagramMaker(getI2PSession());
     
    91105        byte[] dgram ;
    92106        synchronized (dgramMaker) {
    93                 dgram = dgramMaker.makeI2PDatagram(data);
     107            dgram = dgramMaker.makeI2PDatagram(data);
    94108        }
    95109        return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
     110    }
     111
     112    /**
     113     * Send bytes through a SAM DATAGRAM session.
     114     *
     115     * @since 0.9.25
     116     */
     117    public boolean sendBytes(String dest, byte[] data, int proto,
     118                             int fromPort, int toPort,
     119                             boolean sendLeaseSet, int sendTags,
     120                             int tagThreshold, int expiration)
     121                                 throws DataFormatException, I2PSessionException {
     122        if (data.length > DGRAM_SIZE_MAX)
     123            throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
     124        byte[] dgram ;
     125        synchronized (dgramMaker) {
     126            dgram = dgramMaker.makeI2PDatagram(data);
     127        }
     128        return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort,
     129                                              sendLeaseSet, sendTags,tagThreshold, expiration);
    96130    }
    97131
  • apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java

    r84b9436 r241bb38  
    2626class SAMHandlerFactory {
    2727
    28     private static final String VERSION = "3.2";
     28    private static final String VERSION = "3.3";
    2929
    3030    private static final int HELLO_TIMEOUT = 60*1000;
     
    140140            VersionComparator.comp(VERSION, maxVer) <= 0)
    141141            return VERSION;
     142        if (VersionComparator.comp("3.2", minVer) >= 0 &&
     143            VersionComparator.comp("3.2", maxVer) <= 0)
     144            return "3.2";
    142145        if (VersionComparator.comp("3.1", minVer) >= 0 &&
    143146            VersionComparator.comp("3.1", maxVer) <= 0)
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    r84b9436 r241bb38  
    3434 * @author human
    3535 */
    36 abstract class SAMMessageSession implements Closeable {
     36abstract class SAMMessageSession implements SAMMessageSess {
    3737
    3838    protected final Log _log;
    39     private I2PSession session;
    40     private SAMMessageSessionHandler handler;
     39    private final I2PSession session;
     40    protected final boolean _isOwnSession;
     41    private final SAMMessageSessionHandler handler;
     42    private final int listenProtocol;
     43    private final int listenPort;
    4144
    4245    /**
     
    5053     */
    5154    protected SAMMessageSession(String dest, Properties props) throws IOException, DataFormatException, I2PSessionException {
    52         _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
    53         ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(dest));
    54         initSAMMessageSession(bais, props);
     55        this(new ByteArrayInputStream(Base64.decode(dest)), props);
    5556    }
    5657
     
    6566     */
    6667    protected SAMMessageSession(InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
    67         _log = new Log(getClass());
    68         initSAMMessageSession(destStream, props);
    69     }
    70 
    71     private void initSAMMessageSession (InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
     68        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
    7269        if (_log.shouldLog(Log.DEBUG))
    7370            _log.debug("Initializing SAM message-based session");
    74 
     71        listenProtocol = I2PSession.PROTO_ANY;
     72        listenPort = I2PSession.PORT_ANY;
     73        _isOwnSession = true;
    7574        handler = new SAMMessageSessionHandler(destStream, props);
    76 
    77         // FIXME don't start threads in constructors
     75        session = handler.getSession();
     76    }
     77
     78    /**
     79     * Initialize a new SAM message-based session using an existing I2PSession.
     80     *
     81     * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
     82     * @param props Properties to setup the I2P session
     83     * @throws IOException
     84     * @throws DataFormatException
     85     * @throws I2PSessionException
     86     * @since 0.9.25
     87     */
     88    protected SAMMessageSession(I2PSession sess, int listenProtocol, int listenPort)
     89                            throws IOException, DataFormatException, I2PSessionException {
     90        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     91        if (_log.shouldLog(Log.DEBUG))
     92            _log.debug("Initializing SAM message-based session");
     93        this.listenProtocol = listenProtocol;
     94        this.listenPort = listenPort;
     95        _isOwnSession = false;
     96        session = sess;
     97        handler = new SAMMessageSessionHandler(session);
     98    }
     99
     100    /*
     101     * @since 0.9.25
     102     */
     103    public void start() {
    78104        Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
    79105        t.start();
     
    87113    public Destination getDestination() {
    88114        return session.getMyDestination();
     115    }
     116
     117    /**
     118     * @since 0.9.25
     119     */
     120    public int getListenProtocol() {
     121        return listenProtocol;
     122    }
     123
     124    /**
     125     * @since 0.9.25
     126     */
     127    public int getListenPort() {
     128        return listenPort;
    89129    }
    90130
     
    129169
    130170    /**
    131      * Actually send bytes through the SAM message-based session I2PSession.
    132      * TODO unused, umimplemented in the sessions and handlers
     171     * Actually send bytes through the SAM message-based session I2PSession,
     172     * using per-message extended options.
     173     * For efficiency, use the method without all the extra options if they are all defaults.
    133174     *
    134175     * @param dest Destination
     
    137178     * @param fromPort I2CP from port
    138179     * @param toPort I2CP to port
     180     * @param sendLeaseSet true is the usual setting and the I2CP default
     181     * @param sendTags 0 to leave as default
     182     * @param tagThreshold 0 to leave as default
     183     * @param expiration SECONDS from now, NOT absolute time, 0 to leave as default
    139184     *
    140185     * @return True if the data was sent, false otherwise
     
    146191                                        int proto, int fromPort, int toPort,
    147192                                        boolean sendLeaseSet, int sendTags,
    148                                         int tagThreshold, long expires)
     193                                        int tagThreshold, int expiration)
    149194                                        throws DataFormatException, I2PSessionException {
    150195        Destination d = SAMUtils.getDest(dest);
     
    154199        }
    155200        SendMessageOptions opts = new SendMessageOptions();
    156         opts.setSendLeaseSet(sendLeaseSet);
    157         opts.setTagsToSend(sendTags);
    158         opts.setTagThreshold(tagThreshold);
    159         opts.setDate(expires);
     201        if (!sendLeaseSet)
     202            opts.setSendLeaseSet(false);
     203        if (sendTags > 0)
     204            opts.setTagsToSend(sendTags);
     205        if (tagThreshold > 0)
     206            opts.setTagThreshold(tagThreshold);
     207        if (expiration > 0)
     208            opts.setDate(I2PAppContext.getGlobalContext().clock().now() + (expiration * 1000));
    160209
    161210        return session.sendMessage(d, data, 0, data.length, proto, fromPort, toPort, opts);
     
    195244     * @author human
    196245     */
    197     class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
    198 
     246    private class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
     247
     248        private final I2PSession _session;
    199249        private final Object runningLock = new Object();
    200250        private volatile boolean stillRunning = true;
     
    204254         *
    205255         * @param destStream Input stream containing the destination keys
    206         * @param props Properties to setup the I2P session
    207         * @throws I2PSessionException
     256        * @param props Properties to setup the I2P session
     257        * @throws I2PSessionException
    208258         */
    209259        public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException {
     
    216266                props.setProperty("outbound.nickname", "SAM UDP Client");
    217267            }
    218             session = client.createSession(destStream, props);
     268            _session = client.createSession(destStream, props);
    219269
    220270            if (_log.shouldLog(Log.DEBUG))
    221271                _log.debug("Connecting I2P session...");
    222             session.connect();
     272            _session.connect();
    223273            if (_log.shouldLog(Log.DEBUG))
    224274                _log.debug("I2P session connected");
    225275
    226             session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
     276            _session.addMuxedSessionListener(this, listenProtocol, listenPort);
     277        }
     278               
     279        /**
     280         * Create a new SAM message-based session handler on an existing I2PSession
     281         *
     282         * @since 0.9.25
     283         */
     284        public SAMMessageSessionHandler(I2PSession sess) throws I2PSessionException {
     285            _session = sess;
     286            _session.addMuxedSessionListener(this, listenProtocol, listenPort);
     287        }
     288
     289        /**
     290         * The session.
     291         * @since 0.9.25
     292         */
     293        public final I2PSession getSession() {
     294            return _session;
    227295        }
    228296
     
    255323           
    256324            shutDown();
    257             session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
     325            session.removeListener(listenProtocol, listenPort);
    258326           
    259             try {
    260                 if (_log.shouldLog(Log.DEBUG))
    261                     _log.debug("Destroying I2P session...");
    262                 session.destroySession();
    263                 if (_log.shouldLog(Log.DEBUG))
    264                     _log.debug("I2P session destroyed");
    265             } catch (I2PSessionException e) {
    266                     _log.error("Error destroying I2P session", e);
     327            if (_isOwnSession) {
     328                try {
     329                    if (_log.shouldLog(Log.DEBUG))
     330                        _log.debug("Destroying I2P session...");
     331                    session.destroySession();
     332                    if (_log.shouldLog(Log.DEBUG))
     333                        _log.debug("I2P session destroyed");
     334                } catch (I2PSessionException e) {
     335                        _log.error("Error destroying I2P session", e);
     336                }
    267337            }
    268338        }
  • apps/sam/java/src/net/i2p/sam/SAMRawSession.java

    r84b9436 r241bb38  
    4040     * @throws I2PSessionException
    4141     */
    42     public SAMRawSession(String dest, Properties props,
     42    protected SAMRawSession(String dest, Properties props,
    4343                         SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
    4444        super(dest, props);
    45 
    4645        this.recv = recv;
    4746    }
     
    4948    /**
    5049     * Create a new SAM RAW session.
     50     *
     51     * Caller MUST call start().
    5152     *
    5253     * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
     
    6061                         SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
    6162        super(destStream, props);
     63        this.recv = recv;
     64    }
    6265
     66    /**
     67     * Create a new SAM RAW session on an existing I2P session.
     68     *
     69     * @param props unused for now
     70     * @since 0.9.25
     71     */
     72    protected SAMRawSession(I2PSession sess, Properties props, int listenProtocol, int listenPort,
     73                            SAMRawReceiver recv) throws IOException,
     74                              DataFormatException, I2PSessionException {
     75        super(sess, listenProtocol, listenPort);
    6376        this.recv = recv;
    6477    }
     
    8396    }
    8497
     98    /**
     99     * Send bytes through a SAM RAW session.
     100     *
     101     * @since 0.9.25
     102     */
     103    public boolean sendBytes(String dest, byte[] data, int proto,
     104                             int fromPort, int toPort,
     105                             boolean sendLeaseSet, int sendTags,
     106                             int tagThreshold, int expiration)
     107                                 throws DataFormatException, I2PSessionException {
     108        if (data.length > RAW_SIZE_MAX)
     109            throw new DataFormatException("Data size limit exceeded (" + data.length + ")");
     110        if (proto == I2PSession.PROTO_UNSPECIFIED)
     111            proto = I2PSession.PROTO_DATAGRAM_RAW;
     112        return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort,
     113                                              sendLeaseSet, sendTags,tagThreshold, expiration);
     114    }
     115
    85116    protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
    86117        try {
  • apps/sam/java/src/net/i2p/sam/SAMStreamSession.java

    r84b9436 r241bb38  
    2929import net.i2p.I2PException;
    3030import net.i2p.client.I2PClient;
     31import net.i2p.client.I2PSession;
     32import net.i2p.client.I2PSessionException;
    3133import net.i2p.client.streaming.I2PServerSocket;
    3234import net.i2p.client.streaming.I2PSocket;
     
    4850 * @author human
    4951 */
    50 class SAMStreamSession {
     52class SAMStreamSession implements SAMMessageSess {
    5153
    5254    protected final Log _log;
    53 
    5455    protected final static int SOCKET_HANDLER_BUF_SIZE = 32768;
    55 
    5656    protected final SAMStreamReceiver recv;
    57 
    5857    protected final SAMStreamSessionServer server;
    59 
    6058    protected final I2PSocketManager socketMgr;
    6159
     
    6967    // Can we create outgoing connections?
    7068    protected final boolean canCreate;
     69    private final int listenProtocol;
     70    private final int listenPort;
     71    protected final boolean _isOwnSession;
    7172
    7273    /**
     
    8182    /**
    8283     * Create a new SAM STREAM session.
     84     *
     85     * Caller MUST call start().
    8386     *
    8487     * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
     
    106109     * @throws SAMException
    107110     */
    108     public SAMStreamSession(InputStream destStream, String dir,
    109                             Properties props,  SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
     111    protected SAMStreamSession(InputStream destStream, String dir,
     112                               Properties props,  SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
    110113        this.recv = recv;
    111114        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     
    157160        }
    158161
     162        _isOwnSession = true;
    159163        if (_log.shouldLog(Log.DEBUG))
    160164            _log.debug("Creating I2PSocketManager...");
    161         socketMgr = I2PSocketManagerFactory.createManager(destStream,
    162                                                           i2cpHost,
    163                                                           i2cpPort,
    164                                                           allprops);
    165         if (socketMgr == null) {
    166             throw new SAMException("Error creating I2PSocketManager");
     165        try {
     166            // we do it this way so we get exceptions
     167            socketMgr = I2PSocketManagerFactory.createDisconnectedManager(destStream,
     168                                                            i2cpHost, i2cpPort, allprops);
     169            socketMgr.getSession().connect();
     170        } catch (I2PSessionException ise) {
     171            throw new SAMException("Error creating I2PSocketManager: " + ise.getMessage(), ise);
    167172        }
    168173       
     
    171176        forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
    172177       
     178        if (Boolean.parseBoolean(props.getProperty("i2p.streaming.enforceProtocol")))
     179            listenProtocol = I2PSession.PROTO_STREAMING;
     180        else
     181            listenProtocol = I2PSession.PROTO_ANY;
     182        listenPort = I2PSession.PORT_ANY;
     183
    173184
    174185        if (startAcceptor) {
    175186            server = new SAMStreamSessionServer();
    176             Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
    177 
    178             t.start();
    179187        } else {
    180188            server = null;
    181189        }
     190    }
     191
     192    /**
     193     * Create a new SAM STREAM session on an existing socket manager.
     194     * v3 only.
     195     *
     196     * @param props Properties to setup the I2P session
     197     * @param recv Object that will receive incoming data
     198     * @throws IOException
     199     * @throws DataFormatException
     200     * @throws SAMException
     201     * @since 0.9.25
     202     */
     203    protected SAMStreamSession(I2PSocketManager mgr, Properties props, SAMStreamReceiver recv, int listenport)
     204                               throws IOException, DataFormatException, SAMException {
     205        this.recv = recv;
     206        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     207        if (_log.shouldLog(Log.DEBUG))
     208            _log.debug("SAM STREAM session instantiated");
     209        canCreate = true;
     210        Properties allprops = (Properties) System.getProperties().clone();
     211        allprops.putAll(props);
     212        _isOwnSession = false;
     213        socketMgr = mgr;
     214        socketMgr.addDisconnectListener(new DisconnectListener());
     215        forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
     216        listenProtocol = I2PSession.PROTO_STREAMING;
     217        listenPort = listenport;
     218        server = null;
     219    }
     220
     221    /*
     222     * @since 0.9.25
     223     */
     224    public void start() {
     225        if (server != null) {
     226            Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
     227            t.start();
     228        }
     229    }
     230
     231    /*
     232     * @since 0.9.25
     233     */
     234    public int getListenProtocol() {
     235        return listenProtocol;
     236    }
     237
     238    /*
     239     * @since 0.9.25
     240     */
     241    public int getListenPort() {
     242        return listenPort;
    182243    }
    183244   
     
    285346        removeAllSocketHandlers();
    286347        recv.stopStreamReceiving();
    287         socketMgr.destroySocketManager();
     348        if (_isOwnSession)
     349            socketMgr.destroySocketManager();
    288350    }
    289351
     
    303365
    304366        return true;
     367    }
     368
     369    /**
     370     *  Unsupported
     371     *  @throws I2PSessionException always
     372     *  @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess
     373     */
     374    public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws I2PSessionException {
     375        throw new I2PSessionException("Unsupported in STREAM or MASTER session");
     376    }
     377
     378    /**
     379     *  Unsupported
     380     *  @throws I2PSessionException always
     381     *  @since 0.9.25
     382     */
     383    public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp,
     384                             boolean sendLeaseSet, int sendTags,
     385                             int tagThreshold, int expiration)
     386                                 throws I2PSessionException {
     387        throw new I2PSessionException("Unsupported in STREAM or MASTER session");
    305388    }
    306389
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    r84b9436 r241bb38  
    4141class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver {
    4242   
    43     protected SAMRawSession rawSession;
    44     protected SAMDatagramSession datagramSession;
     43    protected SAMMessageSess rawSession;
     44    protected SAMMessageSess datagramSession;
    4545    protected SAMStreamSession streamSession;
    4646
    47     protected SAMRawSession getRawSession() {return rawSession ;}
    48     protected SAMDatagramSession getDatagramSession() {return datagramSession ;}       
    49     protected SAMStreamSession getStreamSession() {return streamSession ;}
     47    protected final SAMMessageSess getRawSession() { return rawSession; }
     48    protected final SAMMessageSess getDatagramSession() { return datagramSession; }     
     49    protected final SAMStreamSession getStreamSession() { return streamSession; }
    5050
    5151    protected final long _id;
    5252    private static final AtomicLong __id = new AtomicLong();
    5353    private static final int FIRST_READ_TIMEOUT = 60*1000;
     54    protected static final String SESSION_ERROR = "SESSION STATUS RESULT=I2P_ERROR";
    5455   
    5556    /**
     
    133134                    sock.setSoTimeout(0);
    134135                } catch (SocketTimeoutException ste) {
    135                     writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
     136                    writeString(SESSION_ERROR, "command timeout, bye");
    136137                    break;
    137138                }
     
    200201                    _log.warn("Error closing socket", e);
    201202            }
    202             if (getRawSession() != null) {
    203                 getRawSession().close();
    204             }
    205             if (getDatagramSession() != null) {
    206                 getDatagramSession().close();
    207             }
    208             if (getStreamSession() != null) {
    209                 getStreamSession().close();
     203            if (rawSession != null) {
     204                rawSession.close();
     205            }
     206            if (datagramSession != null) {
     207                datagramSession.close();
     208            }
     209            if (streamSession != null) {
     210                streamSession.close();
    210211            }
    211212        }
     
    219220        try{
    220221            if (opcode.equals("CREATE")) {
    221                 if ((getRawSession() != null) || (getDatagramSession() != null)
    222                     || (getStreamSession() != null)) {
     222                if ((rawSession != null) || (datagramSession != null)
     223                    || (streamSession != null)) {
    223224                    if (_log.shouldLog(Log.DEBUG))
    224225                        _log.debug("Trying to create a session, but one still exists");
    225                     return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
     226                    return writeString(SESSION_ERROR, "Session already exists");
    226227                }
    227228                if (props.isEmpty()) {
    228229                    if (_log.shouldLog(Log.DEBUG))
    229230                        _log.debug("No parameters specified in SESSION CREATE message");
    230                     return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
     231                    return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
    231232                }
    232233               
    233                 dest = props.getProperty("DESTINATION");
     234                dest = (String) props.remove("DESTINATION");
    234235                if (dest == null) {
    235236                    if (_log.shouldLog(Log.DEBUG))
    236237                        _log.debug("SESSION DESTINATION parameter not specified");
    237                     return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
    238                 }
    239                 props.remove("DESTINATION");
     238                    return writeString(SESSION_ERROR, "DESTINATION not specified");
     239                }
    240240               
    241241                String destKeystream = null;
     
    262262                }
    263263               
    264                 String style = props.getProperty("STYLE");
     264                String style = (String) props.remove("STYLE");
    265265                if (style == null) {
    266266                    if (_log.shouldLog(Log.DEBUG))
    267267                        _log.debug("SESSION STYLE parameter not specified");
    268                     return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
    269                 }
    270                 props.remove("STYLE");
     268                    return writeString(SESSION_ERROR, "No SESSION STYLE specified");
     269                }
    271270               
    272271                // Unconditionally override what the client may have set
     
    277276                if (style.equals("RAW")) {
    278277                    rawSession = new SAMRawSession(destKeystream, props, this);
     278                    rawSession.start();
    279279                } else if (style.equals("DATAGRAM")) {
    280280                    datagramSession = new SAMDatagramSession(destKeystream, props,this);
     281                    datagramSession.start();
    281282                } else if (style.equals("STREAM")) {
    282                     String dir = props.getProperty("DIRECTION");
     283                    String dir = (String) props.remove("DIRECTION");
    283284                    if (dir == null) {
    284285                        if (_log.shouldLog(Log.DEBUG))
     
    289290                        if (_log.shouldLog(Log.DEBUG))
    290291                            _log.debug("Unknown DIRECTION parameter value: [" + dir + "]");
    291                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown DIRECTION parameter\"\n");
    292                     } else {
    293                         props.remove("DIRECTION");
     292                        return writeString(SESSION_ERROR, "Unknown DIRECTION parameter");
    294293                    }
    295294               
    296295                    streamSession = newSAMStreamSession(destKeystream, dir,props);
     296                    streamSession.start();
    297297                } else {
    298298                    if (_log.shouldLog(Log.DEBUG))
    299299                        _log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
    300                     return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
     300                    return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
    301301                }
    302302                return writeString("SESSION STATUS RESULT=OK DESTINATION="
     
    306306                    _log.debug("Unrecognized SESSION message opcode: \""
    307307                           + opcode + "\"");
    308                 return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
     308                return writeString(SESSION_ERROR, "Unrecognized opcode");
    309309            }
    310310        } catch (DataFormatException e) {
    311311            if (_log.shouldLog(Log.DEBUG))
    312312                _log.debug("Invalid destination specified");
    313             return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     313            return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
    314314        } catch (I2PSessionException e) {
    315315            if (_log.shouldLog(Log.DEBUG))
    316316                _log.debug("I2P error when instantiating session", e);
    317             return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     317            return writeString(SESSION_ERROR, e.getMessage());
    318318        } catch (SAMException e) {
    319319            _log.error("Unexpected SAM error", e);
    320             return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     320            return writeString(SESSION_ERROR, e.getMessage());
    321321        } catch (IOException e) {
    322322            _log.error("Unexpected IOException", e);
    323             return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     323            return writeString(SESSION_ERROR, e.getMessage());
    324324        }
    325325    }
     
    379379            Destination dest = null ;
    380380            if (name.equals("ME")) {
    381                 if (getRawSession() != null) {
    382                     dest = getRawSession().getDestination();
    383                 } else if (getStreamSession() != null) {
    384                     dest = getStreamSession().getDestination();
    385                 } else if (getDatagramSession() != null) {
    386                     dest = getDatagramSession().getDestination();
     381                if (rawSession != null) {
     382                    dest = rawSession.getDestination();
     383                } else if (streamSession != null) {
     384                    dest = streamSession.getDestination();
     385                } else if (datagramSession != null) {
     386                    dest = datagramSession.getDestination();
    387387                } else {
    388388                    if (_log.shouldLog(Log.DEBUG))
     
    416416    /* Parse and execute a DATAGRAM message */
    417417    protected boolean execDatagramMessage(String opcode, Properties props) {
    418         if (getDatagramSession() == null) {
     418        if (datagramSession == null) {
    419419            _log.error("DATAGRAM message received, but no DATAGRAM session exists");
    420420            return false;
    421421        }
    422 
     422        return execDgOrRawMessage(false, opcode, props);
     423    }
     424
     425    /* Parse and execute a RAW message */
     426    protected boolean execRawMessage(String opcode, Properties props) {
     427        if (rawSession == null) {
     428            _log.error("RAW message received, but no RAW session exists");
     429            return false;
     430        }
     431        return execDgOrRawMessage(true, opcode, props);
     432    }
     433
     434
     435    /*
     436     * Parse and execute a RAW or DATAGRAM SEND message.
     437     * This is for v1/v2 compatible sending only.
     438     * For v3 sending, see SAMv3DatagramServer.
     439     *
     440     * Note that props are from the command line only.
     441     * Session defaults from CREATE are NOT honored here.
     442     * FIXME if we care, but nobody's probably using v3.2 options for v1/v2 sending.
     443     *
     444     * @since 0.9.25 consolidated from execDatagramMessage() and execRawMessage()
     445     */
     446    private boolean execDgOrRawMessage(boolean isRaw, String opcode, Properties props) {
    423447        if (opcode.equals("SEND")) {
    424448            if (props.isEmpty()) {
    425449                if (_log.shouldLog(Log.DEBUG))
    426                     _log.debug("No parameters specified in DATAGRAM SEND message");
     450                    _log.debug("No parameters specified in SEND message");
    427451                return false;
    428452            }
     
    431455            if (dest == null) {
    432456                if (_log.shouldLog(Log.DEBUG))
    433                     _log.debug("Destination not specified in DATAGRAM SEND message");
     457                    _log.debug("Destination not specified in SEND message");
    434458                return false;
    435459            }
     
    439463            if (strsize == null) {
    440464                if (_log.shouldLog(Log.WARN))
    441                     _log.warn("Size not specified in DATAGRAM SEND message");
     465                    _log.warn("Size not specified in SEND message");
    442466                return false;
    443467            }
     
    446470            } catch (NumberFormatException e) {
    447471                if (_log.shouldLog(Log.WARN))
    448                     _log.warn("Invalid DATAGRAM SEND size specified: " + strsize);
    449                 return false;
    450             }
    451             if (!checkDatagramSize(size)) {
     472                    _log.warn("Invalid SEND size specified: " + strsize);
     473                return false;
     474            }
     475            boolean ok = isRaw ? checkSize(size) : checkDatagramSize(size);
     476            if (!ok) {
    452477                if (_log.shouldLog(Log.WARN))
    453                      _log.warn("Specified size (" + size
     478                    _log.warn("Specified size (" + size
    454479                           + ") is out of protocol limits");
    455480                return false;
    456481            }
    457             int proto = I2PSession.PROTO_DATAGRAM;
    458482            int fromPort = I2PSession.PORT_UNSPECIFIED;
    459483            int toPort = I2PSession.PORT_UNSPECIFIED;
     484            int proto;
     485            if (isRaw) {
     486                proto = I2PSession.PROTO_DATAGRAM_RAW;
     487                String s = props.getProperty("PROTOCOL");
     488                if (s != null) {
     489                    try {
     490                        proto = Integer.parseInt(s);
     491                    } catch (NumberFormatException e) {
     492                        if (_log.shouldLog(Log.WARN))
     493                            _log.warn("Invalid SEND protocol specified: " + s);
     494                    }
     495                }
     496            } else {
     497                proto = I2PSession.PROTO_DATAGRAM;
     498            }
    460499            String s = props.getProperty("FROM_PORT");
    461500            if (s != null) {
     
    464503                } catch (NumberFormatException e) {
    465504                    if (_log.shouldLog(Log.WARN))
    466                         _log.warn("Invalid DATAGRAM SEND port specified: " + s);
     505                        _log.warn("Invalid SEND port specified: " + s);
    467506                }
    468507            }
     
    473512                } catch (NumberFormatException e) {
    474513                    if (_log.shouldLog(Log.WARN))
    475                         _log.warn("Invalid RAW SEND port specified: " + s);
     514                        _log.warn("Invalid SEND port specified: " + s);
    476515                }
    477516            }
     
    483522                in.readFully(data);
    484523
    485                 if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) {
    486                     _log.error("DATAGRAM SEND failed");
     524                SAMMessageSess sess = isRaw ? rawSession : datagramSession;
     525                if (sess.sendBytes(dest, data, proto, fromPort, toPort)) {
     526                    _log.error("SEND failed");
    487527                    // a message send failure is no reason to drop the SAM session
    488528                    // for raw and repliable datagrams, just carry on our merry way
     
    493533            } catch (EOFException e) {
    494534                if (_log.shouldLog(Log.DEBUG))
    495                     _log.debug("Too few bytes with DATAGRAM SEND message (expected: "
     535                    _log.debug("Too few bytes with SEND message (expected: "
    496536                           + size);
    497537                return false;
    498538            } catch (IOException e) {
    499539                if (_log.shouldLog(Log.DEBUG))
    500                     _log.debug("Caught IOException while parsing DATAGRAM SEND message",
     540                    _log.debug("Caught IOException while parsing SEND message",
    501541                           e);
    502542                return false;
    503543            } catch (DataFormatException e) {
    504544                if (_log.shouldLog(Log.DEBUG))
    505                     _log.debug("Invalid key specified with DATAGRAM SEND message",
     545                    _log.debug("Invalid key specified with SEND message",
    506546                           e);
    507547                return false;
    508548            } catch (I2PSessionException e) {
    509                 _log.error("Session error with DATAGRAM SEND message", e);
     549                _log.error("Session error with SEND message", e);
    510550                return false;
    511551            }
    512552        } else {
    513553            if (_log.shouldLog(Log.DEBUG))
    514                 _log.debug("Unrecognized DATAGRAM message opcode: \""
    515                        + opcode + "\"");
    516             return false;
    517         }
    518     }
    519 
    520     /* Parse and execute a RAW message */
    521     protected boolean execRawMessage(String opcode, Properties props) {
    522         if (getRawSession() == null) {
    523             _log.error("RAW message received, but no RAW session exists");
    524             return false;
    525         }
    526 
    527         if (opcode.equals("SEND")) {
    528             if (props.isEmpty()) {
    529                 if (_log.shouldLog(Log.DEBUG))
    530                     _log.debug("No parameters specified in RAW SEND message");
    531                 return false;
    532             }
    533            
    534             String dest = props.getProperty("DESTINATION");
    535             if (dest == null) {
    536                 if (_log.shouldLog(Log.DEBUG))
    537                     _log.debug("Destination not specified in RAW SEND message");
    538                 return false;
    539             }
    540 
    541             int size;
    542             String strsize = props.getProperty("SIZE");
    543             if (strsize == null) {
    544                 if (_log.shouldLog(Log.WARN))
    545                     _log.warn("Size not specified in RAW SEND message");
    546                 return false;
    547             }
    548             try {
    549                 size = Integer.parseInt(strsize);
    550             } catch (NumberFormatException e) {
    551                 if (_log.shouldLog(Log.WARN))
    552                     _log.warn("Invalid RAW SEND size specified: " + strsize);
    553                 return false;
    554             }
    555             if (!checkSize(size)) {
    556                 if (_log.shouldLog(Log.WARN))
    557                     _log.warn("Specified size (" + size
    558                            + ") is out of protocol limits");
    559                 return false;
    560             }
    561             int proto = I2PSession.PROTO_DATAGRAM_RAW;
    562             int fromPort = I2PSession.PORT_UNSPECIFIED;
    563             int toPort = I2PSession.PORT_UNSPECIFIED;
    564             String s = props.getProperty("PROTOCOL");
    565             if (s != null) {
    566                 try {
    567                     proto = Integer.parseInt(s);
    568                 } catch (NumberFormatException e) {
    569                     if (_log.shouldLog(Log.WARN))
    570                         _log.warn("Invalid RAW SEND protocol specified: " + s);
    571                 }
    572             }
    573             s = props.getProperty("FROM_PORT");
    574             if (s != null) {
    575                 try {
    576                     fromPort = Integer.parseInt(s);
    577                 } catch (NumberFormatException e) {
    578                     if (_log.shouldLog(Log.WARN))
    579                         _log.warn("Invalid RAW SEND port specified: " + s);
    580                 }
    581             }
    582             s = props.getProperty("TO_PORT");
    583             if (s != null) {
    584                 try {
    585                     toPort = Integer.parseInt(s);
    586                 } catch (NumberFormatException e) {
    587                     if (_log.shouldLog(Log.WARN))
    588                         _log.warn("Invalid RAW SEND port specified: " + s);
    589                 }
    590             }
    591 
    592             try {
    593                 DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
    594                 byte[] data = new byte[size];
    595 
    596                 in.readFully(data);
    597 
    598                 if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) {
    599                     _log.error("RAW SEND failed");
    600                     // a message send failure is no reason to drop the SAM session
    601                     // for raw and repliable datagrams, just carry on our merry way
    602                     return true;
    603                 }
    604 
    605                 return true;
    606             } catch (EOFException e) {
    607                 if (_log.shouldLog(Log.DEBUG))
    608                     _log.debug("Too few bytes with RAW SEND message (expected: "
    609                            + size);
    610                 return false;
    611             } catch (IOException e) {
    612                 if (_log.shouldLog(Log.DEBUG))
    613                     _log.debug("Caught IOException while parsing RAW SEND message",
    614                            e);
    615                 return false;
    616             } catch (DataFormatException e) {
    617                 if (_log.shouldLog(Log.DEBUG))
    618                     _log.debug("Invalid key specified with RAW SEND message",
    619                            e);
    620                 return false;
    621             } catch (I2PSessionException e) {
    622                 _log.error("Session error with RAW SEND message", e);
    623                 return false;
    624             }
    625         } else {
    626             if (_log.shouldLog(Log.DEBUG))
    627                 _log.debug("Unrecognized RAW message opcode: \""
     554                _log.debug("Unrecognized message opcode: \""
    628555                       + opcode + "\"");
    629556            return false;
     
    633560    /* Parse and execute a STREAM message */
    634561    protected boolean execStreamMessage(String opcode, Properties props) {
    635         if (getStreamSession() == null) {
     562        if (streamSession == null) {
    636563            _log.error("STREAM message received, but no STREAM session exists");
    637564            return false;
     
    646573        } else {
    647574            if (_log.shouldLog(Log.DEBUG))
    648                 _log.debug("Unrecognized RAW message opcode: \""
     575                _log.debug("Unrecognized STREAM message opcode: \""
    649576                       + opcode + "\"");
    650577            return false;
     
    700627
    701628        try {
    702             if (!getStreamSession().sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
     629            if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
    703630                if (_log.shouldLog(Log.WARN))
    704631                    _log.warn("STREAM SEND [" + size + "] failed");
     
    706633                // for style=stream, tell the client the stream failed, and kill the virtual connection..
    707634                boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
    708                 getStreamSession().closeConnection(id);
     635                streamSession.closeConnection(id);
    709636                return rv;
    710637            }
     
    728655            if (_log.shouldLog(Log.DEBUG))
    729656                _log.debug("No parameters specified in STREAM CONNECT message");
     657            return false;
     658        }
     659
     660        int id;
     661        {
     662            String strid = (String) props.remove("ID");
     663            if (strid == null) {
     664                if (_log.shouldLog(Log.DEBUG))
     665                    _log.debug("ID not specified in STREAM SEND message");
     666                return false;
     667            }
     668            try {
     669                id = Integer.parseInt(strid);
     670            } catch (NumberFormatException e) {
     671                if (_log.shouldLog(Log.DEBUG))
     672                    _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
     673                return false;
     674            }
     675            if (id < 1) {
     676                if (_log.shouldLog(Log.DEBUG))
     677                    _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
     678                return false;
     679            }
     680        }
     681
     682        String dest = (String) props.remove("DESTINATION");
     683        if (dest == null) {
     684            _log.debug("Destination not specified in RAW SEND message");
     685            return false;
     686        }
     687
     688        try {
     689            try {
     690                if (!streamSession.connect(id, dest, props)) {
     691                    if (_log.shouldLog(Log.DEBUG))
     692                        _log.debug("STREAM connection failed");
     693                    return false;
     694                }
     695            } catch (DataFormatException e) {
     696                if (_log.shouldLog(Log.DEBUG))
     697                    _log.debug("Invalid destination in STREAM CONNECT message");
     698                notifyStreamOutgoingConnection ( id, "INVALID_KEY", null );
     699            } catch (SAMInvalidDirectionException e) {
     700                if (_log.shouldLog(Log.DEBUG))
     701                    _log.debug("STREAM CONNECT failed", e);
     702                notifyStreamOutgoingConnection ( id, "INVALID_DIRECTION", null );
     703            } catch (ConnectException e) {
     704                if (_log.shouldLog(Log.DEBUG))
     705                    _log.debug("STREAM CONNECT failed", e);
     706                notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", null );
     707            } catch (NoRouteToHostException e) {
     708                if (_log.shouldLog(Log.DEBUG))
     709                    _log.debug("STREAM CONNECT failed", e);
     710                notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", null );
     711            } catch (InterruptedIOException e) {
     712                if (_log.shouldLog(Log.DEBUG))
     713                    _log.debug("STREAM CONNECT failed", e);
     714                notifyStreamOutgoingConnection ( id, "TIMEOUT", null );
     715            } catch (I2PException e) {
     716                if (_log.shouldLog(Log.DEBUG))
     717                    _log.debug("STREAM CONNECT failed", e);
     718                notifyStreamOutgoingConnection ( id, "I2P_ERROR", null );
     719            }
     720        } catch (IOException e) {
     721            return false ;
     722        }
     723   
     724        return true ;
     725    }
     726   
     727  protected boolean execStreamClose(Properties props) {
     728        if (props.isEmpty()) {
     729            if (_log.shouldLog(Log.DEBUG))
     730                _log.debug("No parameters specified in STREAM CLOSE message");
    730731            return false;
    731732        }
     
    736737            if (strid == null) {
    737738                if (_log.shouldLog(Log.DEBUG))
    738                     _log.debug("ID not specified in STREAM SEND message");
     739                    _log.debug("ID not specified in STREAM CLOSE message");
    739740                return false;
    740741            }
     
    743744            } catch (NumberFormatException e) {
    744745                if (_log.shouldLog(Log.DEBUG))
    745                     _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
    746                 return false;
    747             }
    748             if (id < 1) {
    749                 if (_log.shouldLog(Log.DEBUG))
    750                     _log.debug("Invalid STREAM CONNECT ID specified: " +strid);
    751                 return false;
    752             }
    753             props.remove("ID");
    754         }
    755 
    756         String dest = props.getProperty("DESTINATION");
    757         if (dest == null) {
    758             _log.debug("Destination not specified in RAW SEND message");
    759             return false;
    760         }
    761         props.remove("DESTINATION");
    762 
    763         try {
    764             try {
    765                 if (!getStreamSession().connect(id, dest, props)) {
    766                     if (_log.shouldLog(Log.DEBUG))
    767                         _log.debug("STREAM connection failed");
    768                     return false;
    769                 }
    770             } catch (DataFormatException e) {
    771                 if (_log.shouldLog(Log.DEBUG))
    772                     _log.debug("Invalid destination in STREAM CONNECT message");
    773                 notifyStreamOutgoingConnection ( id, "INVALID_KEY", null );
    774             } catch (SAMInvalidDirectionException e) {
    775                 if (_log.shouldLog(Log.DEBUG))
    776                     _log.debug("STREAM CONNECT failed", e);
    777                 notifyStreamOutgoingConnection ( id, "INVALID_DIRECTION", null );
    778             } catch (ConnectException e) {
    779                 if (_log.shouldLog(Log.DEBUG))
    780                     _log.debug("STREAM CONNECT failed", e);
    781                 notifyStreamOutgoingConnection ( id, "CONNECTION_REFUSED", null );
    782             } catch (NoRouteToHostException e) {
    783                 if (_log.shouldLog(Log.DEBUG))
    784                     _log.debug("STREAM CONNECT failed", e);
    785                 notifyStreamOutgoingConnection ( id, "CANT_REACH_PEER", null );
    786             } catch (InterruptedIOException e) {
    787                 if (_log.shouldLog(Log.DEBUG))
    788                     _log.debug("STREAM CONNECT failed", e);
    789                 notifyStreamOutgoingConnection ( id, "TIMEOUT", null );
    790             } catch (I2PException e) {
    791                 if (_log.shouldLog(Log.DEBUG))
    792                     _log.debug("STREAM CONNECT failed", e);
    793                 notifyStreamOutgoingConnection ( id, "I2P_ERROR", null );
    794             }
    795         } catch (IOException e) {
    796             return false ;
    797         }
    798    
    799         return true ;
    800     }
    801    
    802   protected boolean execStreamClose(Properties props) {
    803         if (props.isEmpty()) {
    804             if (_log.shouldLog(Log.DEBUG))
    805                 _log.debug("No parameters specified in STREAM CLOSE message");
    806             return false;
    807         }
    808 
    809         int id;
    810         {
    811             String strid = props.getProperty("ID");
    812             if (strid == null) {
    813                 if (_log.shouldLog(Log.DEBUG))
    814                     _log.debug("ID not specified in STREAM CLOSE message");
    815                 return false;
    816             }
    817             try {
    818                 id = Integer.parseInt(strid);
    819             } catch (NumberFormatException e) {
    820                 if (_log.shouldLog(Log.DEBUG))
    821746                    _log.debug("Invalid STREAM CLOSE ID specified: " +strid);
    822747                return false;
     
    824749        }
    825750
    826         boolean closed = getStreamSession().closeConnection(id);
     751        boolean closed = streamSession.closeConnection(id);
    827752        if ( (!closed) && (_log.shouldLog(Log.WARN)) )
    828753            _log.warn("Stream unable to be closed, but this is non fatal");
     
    842767    // SAMRawReceiver implementation
    843768    public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException {
    844         if (getRawSession() == null) {
     769        if (rawSession == null) {
    845770            _log.error("BUG! Received raw bytes, but session is null!");
    846771            return;
     
    868793            _log.debug("stopRawReceiving() invoked");
    869794
    870         if (getRawSession() == null) {
     795        if (rawSession == null) {
    871796            _log.error("BUG! Got raw receiving stop, but session is null!");
    872797            return;
     
    884809    public void receiveDatagramBytes(Destination sender, byte data[], int proto,
    885810                                     int fromPort, int toPort) throws IOException {
    886         if (getDatagramSession() == null) {
     811        if (datagramSession == null) {
    887812            _log.error("BUG! Received datagram bytes, but session is null!");
    888813            return;
     
    911836            _log.debug("stopDatagramReceiving() invoked");
    912837
    913         if (getDatagramSession() == null) {
     838        if (datagramSession == null) {
    914839            _log.error("BUG! Got datagram receiving stop, but session is null!");
    915840            return;
     
    928853    public void streamSendAnswer( int id, String result, String bufferState ) throws IOException
    929854    {
    930         if ( getStreamSession() == null )
     855        if ( streamSession == null )
    931856        {
    932857            _log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
     
    946871    public void notifyStreamSendBufferFree( int id ) throws IOException
    947872    {
    948         if ( getStreamSession() == null )
     873        if ( streamSession == null )
    949874        {
    950875            _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
     
    960885
    961886    public void notifyStreamIncomingConnection(int id, Destination d) throws IOException {
    962         if (getStreamSession() == null) {
     887        if (streamSession == null) {
    963888            _log.error("BUG! Received stream connection, but session is null!");
    964889            return;
     
    975900    public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
    976901    {
    977         if ( getStreamSession() == null )
     902        if ( streamSession == null )
    978903        {
    979904            _log.error ( "BUG! Received stream connection, but session is null!" );
     
    1016941        return rv;
    1017942    }
     943
     944    /**
     945     * Write a string and message, escaping the message.
     946     * Writes s + createMessageString(msg) + \n
     947     *
     948     * @param s The string, non-null
     949     * @param s The message may be null
     950     * @since 0.9.25
     951     */
     952    protected boolean writeString(String s, String msg) {
     953        return writeString(s + createMessageString(msg) + '\n');
     954    }
    1018955 
    1019956    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
    1020         if (getStreamSession() == null) {
     957        if (streamSession == null) {
    1021958            _log.error("Received stream bytes, but session is null!");
    1022959            return;
     
    1039976    /** @param msg may be null */
    1040977    public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
    1041         if (getStreamSession() == null) {
     978        if (streamSession == null) {
    1042979            _log.error("BUG! Received stream disconnection, but session is null!");
    1043980            return;
     
    1054991            _log.debug("stopStreamReceiving() invoked", new Exception("stopped"));
    1055992
    1056         if (getStreamSession() == null) {
     993        if (streamSession == null) {
    1057994            _log.error("BUG! Got stream receiving stop, but session is null!");
    1058995            return;
  • apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java

    r84b9436 r241bb38  
    3737 * @author mkvore
    3838 */
    39 
    4039class SAMv2StreamSession extends SAMStreamSession
    4140{
    42 
    4341                /**
    4442                 * Create a new SAM STREAM session.
     43                 *
     44                 * Caller MUST call start().
    4545                 *
    4646                 * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
     
    6161                 * Create a new SAM STREAM session.
    6262                 *
     63                 * Caller MUST call start().
     64                 *
    6365                 * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
    6466                 * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
     
    8789                 * @return true if the communication with the SAM client is ok
    8890                 */
    89 
    9091                @Override
    9192                public boolean connect ( int id, String dest, Properties props )
     
    121122                }
    122123
    123 
    124 
    125 
    126                 /**
    127                                 * SAM STREAM socket connecter, running in its own thread. 
    128                                 *
    129                                 * @author mkvore
    130                 */
    131 
     124                /**
     125                 * SAM STREAM socket connecter, running in its own thread. 
     126                 *
     127                 * @author mkvore
     128                 */
    132129                private class StreamConnector implements Runnable
    133130                {
    134 
    135131                                private final int id;
    136132                                private final Destination      dest ;
     
    138134
    139135                                /**
    140                                                 * Create a new SAM STREAM session socket reader
    141                                                 *
    142                                                 * @param id   Unique id assigned to the handler
    143                                                 * @param dest Destination to reach
    144                                                 * @param opts Socket options (I2PSocketOptions)
     136                                 * Create a new SAM STREAM session socket reader
     137                                 *
     138                                 * @param id   Unique id assigned to the handler
     139                                 * @param dest Destination to reach
     140                                 * @param opts Socket options (I2PSocketOptions)
    145141                                */
    146 
    147142
    148143                                public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException
     
    156151                                }
    157152
    158 
    159153                                public void run()
    160154                                {
     
    216210                }
    217211
    218 
    219 
    220                 /**
    221                                 * Lets us push data through the stream without blocking, (even after exceeding
    222                                 * the I2PSocket's buffer)
     212                /**
     213                 * Lets us push data through the stream without blocking, (even after exceeding
     214                 * the I2PSocket's buffer)
    223215                 *
    224216                 * @param s I2PSocket
     
    227219                 * @throws IOException
    228220                 */
    229 
    230221                @Override
    231222                protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException
     
    242233
    243234                private class V2StreamSender extends StreamSender
    244 
    245235                {
    246236                                private final List<ByteArray> _data;
     
    261251
    262252                                /**
    263                                                 * Send bytes through the SAM STREAM session socket sender
    264                                                 *
     253                                 * Send bytes through the SAM STREAM session socket sender
     254                                 *
    265255                                 * @param in Data stream of data to send
    266256                                 * @param size Count of bytes to send
    267257                                 * @throws IOException if the client didnt provide enough data
    268                                 */
     258                                 */
    269259                                @Override
    270260                                public void sendBytes ( InputStream in, int size ) throws IOException
     
    308298
    309299                                /**
    310                                                 * Stop a SAM STREAM session socket sender thread immediately
    311                                                 *
    312                                 */
     300                                 * Stop a SAM STREAM session socket sender thread immediately
     301                                 *
     302                                 */
    313303                                @Override
    314304                                public void stopRunning()
     
    343333
    344334                                /**
    345                                                 * Stop a SAM STREAM session socket sender gracefully: stop the
    346                                                 * sender thread once all pending data has been sent.
    347                                 */
     335                                 * Stop a SAM STREAM session socket sender gracefully: stop the
     336                                 * sender thread once all pending data has been sent.
     337                                 */
    348338                                @Override
    349339                                public void shutDownGracefully()
     
    431421                                }
    432422                }
    433 
    434 
    435423
    436424                /**
     
    460448                }
    461449
    462 
    463                 /**
    464                                 * SAM STREAM socket reader, running in its own thread.  It forwards
    465                                 * forward data to/from an I2P socket.
    466                                 *
    467                                 * @author human
     450                /**
     451                 * SAM STREAM socket reader, running in its own thread.  It forwards
     452                 * forward data to/from an I2P socket.
     453                 *
     454                 * @author human
    468455                */
    469 
    470                
    471 
    472456                public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader
    473457                {
    474 
    475458                                protected boolean nolimit       ;
    476459                                protected long    limit         ;
    477460                                protected long    totalReceived ;
    478461
    479 
    480462                                /**
    481                                                 * Create a new SAM STREAM session socket reader
    482                                                 *
    483                                                 * @param s Socket to be handled
    484                                                 * @param id Unique id assigned to the handler
    485                                 */
     463                                 * Create a new SAM STREAM session socket reader
     464                                 *
     465                                 * @param s Socket to be handled
     466                                 * @param id Unique id assigned to the handler
     467                                 */
    486468                                public SAMv2StreamSessionSocketReader ( I2PSocket s, int id ) throws IOException
    487469                                {
     
    582564                                }
    583565                }
    584 
    585 
    586 
    587566}
  • apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java

    r84b9436 r241bb38  
    138138        private static class MessageDispatcher implements Runnable {
    139139                private final ByteArrayInputStream is;
     140                private static final int MAX_LINE_LENGTH = 2*1024;
    140141       
    141142                public MessageDispatcher(byte[] buf) {
     
    145146                public void run() {
    146147                        try {
    147                                 String header = DataHelper.readLine(is).trim();
     148                                // not UTF-8
     149                                //String header = DataHelper.readLine(is).trim();
    148150                                // we cannot use SAMUtils.parseParams() here
     151                                final UTF8Reader reader = new UTF8Reader(is);
     152                                final StringBuilder buf = new StringBuilder(MAX_LINE_LENGTH);
     153                                int c;
     154                                int i = 0;
     155                                while ((c = reader.read()) != -1) {
     156                                        if (++i > MAX_LINE_LENGTH)
     157                                                throw new IOException("Line too long - max " + MAX_LINE_LENGTH);
     158                                        if (c == '\n')
     159                                                break;
     160                                        buf.append((char)c);
     161                                }
     162                                String header = buf.toString();
    149163                                StringTokenizer tok = new StringTokenizer(header, " ");
    150164                                if (tok.countTokens() < 3) {
     
    161175                                String dest = tok.nextToken();
    162176
    163                                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     177                                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    164178                                if (rec!=null) {
    165179                                        Properties sprops = rec.getProps();
     180                                        // 3.2 props
    166181                                        String pr = sprops.getProperty("PROTOCOL");
    167182                                        String fp = sprops.getProperty("FROM_PORT");
    168183                                        String tp = sprops.getProperty("TO_PORT");
     184                                        // 3.3 props
     185                                        // If this is a straight DATAGRAM or RAW session, we
     186                                        // don't need to send these, the router already got them in
     187                                        // the options, but if a subsession, we must, so just
     188                                        // do it all the time.
     189                                        String st = sprops.getProperty("crypto.tagsToSend");
     190                                        String tt = sprops.getProperty("crypto.lowTagThreshold");
     191                                        String sl = sprops.getProperty("shouldBundleReplyInfo");
     192                                        String exms = sprops.getProperty("clientMessageTimeout");  // ms
     193                                        String exs = null;                                         // seconds
    169194                                        while (tok.hasMoreTokens()) {
    170195                                                String t = tok.nextToken();
     196                                                // 3.2 props
    171197                                                if (t.startsWith("PROTOCOL="))
    172198                                                        pr = t.substring("PROTOCOL=".length());
     
    175201                                                else if (t.startsWith("TO_PORT="))
    176202                                                        tp = t.substring("TO_PORT=".length());
     203                                                // 3.3 props
     204                                                else if (t.startsWith("SEND_TAGS="))
     205                                                        st = t.substring("SEND_TAGS=".length());
     206                                                else if (t.startsWith("TAG_THRESHOLD="))
     207                                                        tt = t.substring("TAG_THRESHOLD=".length());
     208                                                else if (t.startsWith("EXPIRES="))
     209                                                        exs = t.substring("EXPIRES=".length());
     210                                                else if (t.startsWith("SEND_LEASESET="))
     211                                                        sl = t.substring("SEND_LEASESET=".length());
    177212                                        }
    178213
     214                                        // 3.2 props
    179215                                        int proto = I2PSession.PROTO_UNSPECIFIED;
    180216                                        int fromPort = I2PSession.PORT_UNSPECIFIED;
    181217                                        int toPort = I2PSession.PORT_UNSPECIFIED;
    182                                         if (pr != null) {
    183                                                 try {
     218                                        // 3.3 props
     219                                        int sendTags = 0;
     220                                        int tagThreshold = 0;
     221                                        int expires = 0; // seconds
     222                                        boolean sendLeaseSet = true;
     223                                        try {
     224                                                // 3.2 props
     225                                                if (pr != null)
    184226                                                        proto = Integer.parseInt(pr);
    185                                                 } catch (NumberFormatException nfe) {
    186                                                         warn("Bad datagram header received");
    187                                                         return;
    188                                                 }
    189                                         }
    190                                         if (fp != null) {
    191                                                 try {
     227                                                if (fp != null)
    192228                                                        fromPort = Integer.parseInt(fp);
    193                                                 } catch (NumberFormatException nfe) {
    194                                                         warn("Bad datagram header received");
    195                                                         return;
    196                                                 }
    197                                         }
    198                                         if (tp != null) {
    199                                                 try {
     229                                                if (tp != null)
    200230                                                        toPort = Integer.parseInt(tp);
    201                                                 } catch (NumberFormatException nfe) {
    202                                                         warn("Bad datagram header received");
    203                                                         return;
    204                                                 }
     231                                                // 3.3 props
     232                                                if (st != null)
     233                                                        sendTags = Integer.parseInt(st);
     234                                                if (tt != null)
     235                                                        tagThreshold = Integer.parseInt(tt);
     236                                                if (exs != null)
     237                                                        expires = Integer.parseInt(exs);
     238                                                else if (exms != null)
     239                                                        expires = Integer.parseInt(exms) / 1000;
     240                                                if (sl != null)
     241                                                        sendLeaseSet = Boolean.parseBoolean(sl);
     242                                        } catch (NumberFormatException nfe) {
     243                                                warn("Bad datagram header received");
     244                                                return;
    205245                                        }
    206246                                        // TODO too many allocations and copies. One here and one in Listener above.
    207247                                        byte[] data = new byte[is.available()];
    208248                                        is.read(data);
    209                                         SAMv3Handler.Session sess = rec.getHandler().getSession();
    210                                         if (sess != null)
    211                                                 sess.sendBytes(dest, data, proto, fromPort, toPort);
    212                                         else
     249                                        Session sess = rec.getHandler().getSession();
     250                                        if (sess != null) {
     251                                                if (sendTags > 0 || tagThreshold > 0 || expires > 0 || !sendLeaseSet) {
     252                                                        sess.sendBytes(dest, data, proto, fromPort, toPort,
     253                                                                       sendLeaseSet, sendTags, tagThreshold, expires);
     254                                                } else {
     255                                                        sess.sendBytes(dest, data, proto, fromPort, toPort);
     256                                                }
     257                                        } else {
    213258                                                warn("Dropping datagram, no session for " + nick);
     259                                        }
    214260                                } else {
    215261                                        warn("Dropping datagram, no session for " + nick);
  • apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java

    r84b9436 r241bb38  
    77
    88import java.io.IOException;
     9import java.net.InetSocketAddress;
     10import java.net.SocketAddress ;
     11import java.nio.ByteBuffer;
    912import java.util.Properties;
    1013
     14import net.i2p.client.I2PSession;
    1115import net.i2p.client.I2PSessionException;
    1216import net.i2p.data.DataFormatException;
     
    1519import net.i2p.util.Log;
    1620
    17 import java.net.InetSocketAddress;
    18 import java.net.SocketAddress ;
    19 import java.nio.ByteBuffer;
    2021
    21 class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Session, SAMDatagramReceiver {
     22class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDatagramReceiver {
    2223       
    2324        private final SAMv3Handler handler;
     
    3132         *   build a DatagramSession according to informations registered
    3233         *   with the given nickname
     34         *
     35         * Caller MUST call start().
    3336         *
    3437         * @param nick nickname of the session
     
    4750                this.server = dgServer;
    4851
    49                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     52                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    5053                if (rec == null)
    5154                        throw new SAMException("Record disappeared for nickname : \""+nick+"\"");
     
    5457               
    5558                Properties props = rec.getProps();
    56                 String portStr = props.getProperty("PORT");
    57                 if (portStr == null) {
    58                         if (_log.shouldDebug())
    59                                 _log.debug("receiver port not specified. Current socket will be used.");
    60                         this.clientAddress = null;
    61                 } else {
    62                         int port = Integer.parseInt(portStr);
    63                         String host = props.getProperty("HOST");
    64                         if (host == null) {             
    65                                 host = rec.getHandler().getClientIP();
    66                                 if (_log.shouldDebug())
    67                                         _log.debug("no host specified. Taken from the client socket : " + host+':'+port);
    68                         }
    69                         this.clientAddress = new InetSocketAddress(host, port);
    70                 }
     59                clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
     60        }
     61
     62        /**
     63         *   Build a Datagram Session on an existing i2p session
     64         *   registered with the given nickname
     65         *   
     66         * Caller MUST call start().
     67         *
     68         * @param nick nickname of the session
     69         * @throws IOException
     70         * @throws DataFormatException
     71         * @throws I2PSessionException
     72         * @since 0.9.25
     73         */
     74        public SAMv3DatagramSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
     75                                    int listenPort, SAMv3DatagramServer dgServer)
     76                        throws IOException, DataFormatException, I2PSessionException {
     77                super(isess, props, listenPort, null);  // to be replaced by this
     78                this.nick = nick ;
     79                this.recv = this ;  // replacement
     80                this.server = dgServer;
     81                this.handler = handler;
     82                clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
    7183        }
    7284
  • apps/sam/java/src/net/i2p/sam/SAMv3Handler.java

    r84b9436 r241bb38  
    2424import java.nio.ByteBuffer;
    2525import java.util.Properties;
    26 import java.util.HashMap;
    2726
    2827import net.i2p.I2PAppContext;
     
    5049       
    5150        private Session session;
     51        // TODO remove singleton, hang off SAMBridge like dgserver
    5252        public static final SessionsDB sSessionsHash = new SessionsDB();
    5353        private volatile boolean stolenSocket;
     
    5757        private static final int FIRST_READ_TIMEOUT = 60*1000;
    5858        private static final int READ_TIMEOUT = 3*60*1000;
    59        
    60         interface Session {
    61                 String getNick();
    62                 void close();
    63                 boolean sendBytes(String dest, byte[] data, int proto,
    64                                   int fromPort, int toPort) throws DataFormatException, I2PSessionException;
    65         }
     59        private static final String AUTH_ERROR = "AUTH STATUS RESULT=I2P_ERROR";
    6660       
    6761        /**
     
    10599                return (verMajor == 3);
    106100        }
    107        
    108         /**
    109          *  The values in the SessionsDB
    110          */
    111         public static class SessionRecord
    112         {
    113                 private final String m_dest ;
    114                 private final Properties m_props ;
    115                 private ThreadGroup m_threadgroup ;
    116                 private final SAMv3Handler m_handler ;
    117 
    118                 public SessionRecord( String dest, Properties props, SAMv3Handler handler )
    119                 {
    120                         m_dest = dest;
    121                         m_props = new Properties() ;
    122                         m_props.putAll(props);
    123                         m_handler = handler ;
    124                 }
    125 
    126                 public SessionRecord( SessionRecord in )
    127                 {
    128                         m_dest = in.getDest();
    129                         m_props = in.getProps();
    130                         m_threadgroup = in.getThreadGroup();
    131                         m_handler = in.getHandler();
    132                 }
    133 
    134                 public String getDest()
    135                 {
    136                         return m_dest;
    137                 }
    138 
    139                 synchronized public Properties getProps()
    140                 {
    141                         Properties p = new Properties();
    142                         p.putAll(m_props);
    143                         return m_props;
    144                 }
    145 
    146                 public SAMv3Handler getHandler()
    147                 {
    148                         return m_handler ;
    149                 }
    150 
    151                 synchronized public ThreadGroup getThreadGroup()
    152                 {
    153                         return m_threadgroup ;
    154                 }
    155 
    156                 synchronized public void createThreadGroup(String name)
    157                 {
    158                         if (m_threadgroup == null)
    159                                 m_threadgroup = new ThreadGroup(name);
    160                 }
    161         }
    162 
    163         /**
    164          *  basically a HashMap from String to SessionRecord
    165          */
    166         public static class SessionsDB
    167         {
    168                 private static final long serialVersionUID = 0x1;
    169 
    170                 static class ExistingIdException extends Exception {
    171                         private static final long serialVersionUID = 0x1;
    172                 }
    173 
    174                 static class ExistingDestException extends Exception {
    175                         private static final long serialVersionUID = 0x1;
    176                 }
    177                
    178                 private final HashMap<String, SessionRecord> map;
    179 
    180                 public SessionsDB() {
    181                         map = new HashMap<String, SessionRecord>() ;
    182                 }
    183 
    184                 /** @return success */
    185                 synchronized public boolean put( String nick, SessionRecord session )
    186                         throws ExistingIdException, ExistingDestException
    187                 {
    188                         if ( map.containsKey(nick) ) {
    189                                 throw new ExistingIdException();
    190                         }
    191                         for ( SessionRecord r : map.values() ) {
    192                                 if (r.getDest().equals(session.getDest())) {
    193                                         throw new ExistingDestException();
    194                                 }
    195                         }
    196 
    197                         if ( !map.containsKey(nick) ) {
    198                                 session.createThreadGroup("SAM session "+nick);
    199                                 map.put(nick, session) ;
    200                                 return true ;
    201                         }
    202                         else
    203                                 return false ;
    204                 }
    205 
    206                 /** @return true if removed */
    207                 synchronized public boolean del( String nick )
    208                 {
    209                         return map.remove(nick) != null;
    210                 }
    211 
    212                 synchronized public SessionRecord get(String nick)
    213                 {
    214                         return map.get(nick);
    215                 }
    216 
    217                 synchronized public boolean containsKey( String nick )
    218                 {
    219                         return map.containsKey(nick);
    220                 }
    221         }
    222101
    223102        public String getClientIP()
     
    256135                return session;
    257136        }
    258        
     137
     138        /**
     139         *  For subsessions created by MasterSession
     140         *  @since 0.9.25
     141         */
     142        void setSession(SAMv3RawSession sess) {
     143                rawSession = sess; session = sess;
     144        }
     145
     146        /**
     147         *  For subsessions created by MasterSession
     148         *  @since 0.9.25
     149         */
     150        void setSession(SAMv3DatagramSession sess) {
     151                datagramSession = sess; session = sess;
     152        }       
     153
     154        /**
     155         *  For subsessions created by MasterSession
     156         *  @since 0.9.25
     157         */
     158        void setSession(SAMv3StreamSession sess) {
     159                streamSession = sess; session = sess;
     160        }       
     161
    259162        @Override
    260163        public void handle() {
     
    295198                                                                        if (_log.shouldWarn())
    296199                                                                                _log.warn("Failed to respond to PING");
    297                                                                         writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
     200                                                                        writeString(SESSION_ERROR, "PONG timeout");
    298201                                                                        break;
    299202                                                                }
     
    310213                                                                        if (_log.shouldWarn())
    311214                                                                                _log.warn("Failed to respond to PING");
    312                                                                         writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
     215                                                                        writeString(SESSION_ERROR, "PONG timeout");
    313216                                                                        break;
    314217                                                                }
     
    316219                                                                if (_log.shouldWarn())
    317220                                                                        _log.warn("2nd timeout");
    318                                                                 writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
     221                                                                writeString(SESSION_ERROR, "command timeout, bye");
    319222                                                                break;
    320223                                                        } else {
     
    337240                                                socket.setSoTimeout(0);
    338241                                        } catch (SocketTimeoutException ste) {
    339                                                 writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
     242                                                writeString(SESSION_ERROR, "command timeout, bye");
    340243                                                break;
    341244                                        }
     
    374277                                if (opcode == null) {
    375278                                        // This is not a correct message, for sure
    376                                         if (writeString(domain + " STATUS RESULT=I2P_ERROR MESSAGE=\"command not specified\"\n"))
     279                                        if (writeString(domain + " STATUS RESULT=I2P_ERROR", "command not specified"))
    377280                                                continue;
    378281                                        else
     
    412315                        if (_log.shouldLog(Log.DEBUG))
    413316                                _log.debug("Caught IOException in handler", e);
     317                        writeString(SESSION_ERROR, e.getMessage());
    414318                } catch (SAMException e) {
    415319                        _log.error("Unexpected exception for message [" + msg + ']', e);
     320                        writeString(SESSION_ERROR, e.getMessage());
    416321                } catch (RuntimeException e) {
    417322                        _log.error("Unexpected exception for message [" + msg + ']', e);
     323                        writeString(SESSION_ERROR, e.getMessage());
    418324                } finally {
    419325                        if (_log.shouldLog(Log.DEBUG))
     
    493399
    494400                String dest = "BUG!";
    495                 String nick =  null ;
    496401                boolean ok = false ;
     402
     403                String nick = (String) props.remove("ID");
     404                if (nick == null)
     405                        return writeString(SESSION_ERROR, "ID not specified");
     406
     407                String style = (String) props.remove("STYLE");
     408                if (style == null && !opcode.equals("REMOVE"))
     409                        return writeString(SESSION_ERROR, "No SESSION STYLE specified");
    497410
    498411                try{
     
    502415                                        if (_log.shouldLog(Log.DEBUG))
    503416                                                _log.debug("Trying to create a session, but one still exists");
    504                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
     417                                        return writeString(SESSION_ERROR, "Session already exists");
    505418                                }
    506419                                if (props.isEmpty()) {
    507420                                        if (_log.shouldLog(Log.DEBUG))
    508421                                                _log.debug("No parameters specified in SESSION CREATE message");
    509                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
    510                                 }
    511 
    512                                 dest = props.getProperty("DESTINATION");
     422                                        return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
     423                                }
     424
     425                                dest = (String) props.remove("DESTINATION");
    513426                                if (dest == null) {
    514427                                        if (_log.shouldLog(Log.DEBUG))
    515428                                                _log.debug("SESSION DESTINATION parameter not specified");
    516                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
    517                                 }
    518                                 props.remove("DESTINATION");
     429                                        return writeString(SESSION_ERROR, "DESTINATION not specified");
     430                                }
    519431
    520432                                if (dest.equals("TRANSIENT")) {
    521433                                        if (_log.shouldLog(Log.DEBUG))
    522434                                                _log.debug("TRANSIENT destination requested");
    523                                         String sigTypeStr = props.getProperty("SIGNATURE_TYPE");
     435                                        String sigTypeStr = (String) props.remove("SIGNATURE_TYPE");
    524436                                        SigType sigType;
    525437                                        if (sigTypeStr != null) {
    526438                                                sigType = SigType.parseSigType(sigTypeStr);
    527439                                                if (sigType == null) {
    528                                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"SIGNATURE_TYPE "
    529                                                                            + sigTypeStr + " unsupported\"\n");
     440                                                        return writeString(SESSION_ERROR, "SIGNATURE_TYPE "
     441                                                                           + sigTypeStr + " unsupported");
    530442                                                }
    531                                                 props.remove("SIGNATURE_TYPE");
    532443                                        } else {
    533444                                                sigType = SigType.DSA_SHA1;
     
    544455                                }
    545456
    546 
    547                                 nick = props.getProperty("ID");
    548                                 if (nick == null) {
    549                                         if (_log.shouldLog(Log.DEBUG))
    550                                                 _log.debug("SESSION ID parameter not specified");
    551                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
    552                                 }
    553                                 props.remove("ID");
    554 
    555 
    556                                 String style = props.getProperty("STYLE");
    557                                 if (style == null) {
    558                                         if (_log.shouldLog(Log.DEBUG))
    559                                                 _log.debug("SESSION STYLE parameter not specified");
    560                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
    561                                 }
    562                                 props.remove("STYLE");
    563 
    564457                                // Unconditionally override what the client may have set
    565458                                // (iMule sets BestEffort) as None is more efficient
     
    571464                                allProps.putAll(i2cpProps);
    572465                                allProps.putAll(props);
    573                                
     466
     467                                if (style.equals("MASTER")) {
     468                                        // We must put these here, as SessionRecord.getProps() makes a copy,
     469                                        // and the socket manager is instantiated in the
     470                                        // SAMStreamSession constructor.
     471                                        allProps.setProperty("i2p.streaming.enforceProtocol", "true");
     472                                        allProps.setProperty("i2cp.dontPublishLeaseSet", "false");
     473                                }
    574474
    575475                                try {
     
    591491                                        rawSession = v3;
    592492                                        this.session = v3;
     493                                        v3.start();
    593494                                } else if (style.equals("DATAGRAM")) {
    594495                                        SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
     
    596497                                        datagramSession = v3;
    597498                                        this.session = v3;
     499                                        v3.start();
    598500                                } else if (style.equals("STREAM")) {
    599501                                        SAMv3StreamSession v3 = newSAMStreamSession(nick);
    600502                                        streamSession = v3;
    601503                                        this.session = v3;
     504                                        v3.start();
     505                                } else if (style.equals("MASTER")) {
     506                                        SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
     507                                        MasterSession v3 = new MasterSession(nick, dgs, this, allProps);
     508                                        streamSession = v3;
     509                                        datagramSession = v3;
     510                                        rawSession = v3;
     511                                        this.session = v3;
     512                                        v3.start();
    602513                                } else {
    603514                                        if (_log.shouldLog(Log.DEBUG))
    604515                                                _log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
    605                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
     516                                        return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
    606517                                }
    607518                                ok = true ;
    608519                                return writeString("SESSION STATUS RESULT=OK DESTINATION="
    609520                                                + dest + "\n");
     521                        } else if (opcode.equals("ADD") || opcode.equals("REMOVE")) {
     522                                // prevent trouble in finally block
     523                                ok = true;
     524                                if (streamSession == null || datagramSession == null || rawSession == null)
     525                                        return writeString(SESSION_ERROR, "Not a MASTER session");
     526                                MasterSession msess = (MasterSession) session;
     527                                String msg;
     528                                if (opcode.equals("ADD")) {
     529                                        msg = msess.add(nick, style, props);
     530                                } else {
     531                                        msg = msess.remove(nick, props);
     532                                }
     533                                if (msg == null)
     534                                        return writeString("SESSION STATUS RESULT=OK ID=\"" + nick + '"', opcode + ' ' + nick);
     535                                else
     536                                        return writeString(SESSION_ERROR + " ID=\"" + nick + '"', msg);
    610537                        } else {
    611538                                if (_log.shouldLog(Log.DEBUG))
    612539                                        _log.debug("Unrecognized SESSION message opcode: \""
    613540                                                + opcode + "\"");
    614                                 return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
     541                                return writeString(SESSION_ERROR, "Unrecognized opcode");
    615542                        }
    616543                } catch (DataFormatException e) {
    617544                        if (_log.shouldLog(Log.DEBUG))
    618545                                _log.debug("Invalid destination specified");
    619                         return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     546                        return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
    620547                } catch (I2PSessionException e) {
    621548                        if (_log.shouldLog(Log.DEBUG))
    622549                                _log.debug("I2P error when instantiating session", e);
    623                         return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     550                        return writeString(SESSION_ERROR, e.getMessage());
    624551                } catch (SAMException e) {
    625552                        if (_log.shouldLog(Log.INFO))
    626553                                _log.info("Funny SAM error", e);
    627                         return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     554                        return writeString(SESSION_ERROR, e.getMessage());
    628555                } catch (IOException e) {
    629556                        _log.error("Unexpected IOException", e);
    630                         return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
     557                        return writeString(SESSION_ERROR, e.getMessage());
    631558                } finally {
    632559                        // unregister the session if it has not been created
     
    656583                if ( session != null )
    657584                {
    658                         _log.error ( "STREAM message received, but this session is a master session" );
    659                        
     585                        _log.error("v3 control socket cannot be used for STREAM");
    660586                        try {
    661                                 notifyStreamResult(true, "I2P_ERROR", "master session cannot be used for streams");
     587                                notifyStreamResult(true, "I2P_ERROR", "v3 control socket cannot be used for STREAM");
    662588                        } catch (IOException e) {}
    663589                        return false;
    664590                }
    665591
    666                 nick = props.getProperty("ID");
     592                nick = (String) props.remove("ID");
    667593                if (nick == null) {
    668594                        if (_log.shouldLog(Log.DEBUG))
     
    673599                        return false ;
    674600                }
    675                 props.remove("ID");
    676601
    677602                rec = sSessionsHash.get(nick);
    678 
    679603                if ( rec==null ) {
    680604                        if (_log.shouldLog(Log.DEBUG))
    681605                                _log.debug("STREAM SESSION ID does not exist");
    682606                        try {
    683                                 notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID does not exist");
     607                                notifyStreamResult(true, "INVALID_ID", "STREAM SESSION ID " + nick + " does not exist");
    684608                        } catch (IOException e) {}
    685609                        return false ;
     
    687611               
    688612                streamSession = rec.getHandler().streamSession ;
    689                
    690613                if (streamSession==null) {
    691614                        if (_log.shouldLog(Log.DEBUG))
    692615                                _log.debug("specified ID is not a stream session");
    693616                        try {
    694                                 notifyStreamResult(true, "I2P_ERROR",  "specified ID is not a STREAM session");
     617                                notifyStreamResult(true, "I2P_ERROR",  "specified ID " + nick + " is not a STREAM session");
    695618                        } catch (IOException e) {}
    696619                        return false ;
     
    734657                        }
    735658               
    736                         String dest = props.getProperty("DESTINATION");
     659                        String dest = (String) props.remove("DESTINATION");
    737660                        if (dest == null) {
    738661                                notifyStreamResult(verbose, "I2P_ERROR", "Destination not specified in STREAM CONNECT message");
     
    741664                                return false;
    742665                        }
    743                         props.remove("DESTINATION");
    744666
    745667                        try {
     
    749671                                if (_log.shouldLog(Log.DEBUG))
    750672                                        _log.debug("Invalid destination in STREAM CONNECT message");
    751                                 notifyStreamResult ( verbose, "INVALID_KEY", null );
     673                                notifyStreamResult ( verbose, "INVALID_KEY", e.getMessage());
    752674                        } catch (ConnectException e) {
    753675                                if (_log.shouldLog(Log.DEBUG))
    754676                                        _log.debug("STREAM CONNECT failed", e);
    755                                 notifyStreamResult ( verbose, "CONNECTION_REFUSED", null );
     677                                notifyStreamResult ( verbose, "CONNECTION_REFUSED", e.getMessage());
    756678                        } catch (NoRouteToHostException e) {
    757679                                if (_log.shouldLog(Log.DEBUG))
    758680                                        _log.debug("STREAM CONNECT failed", e);
    759                                 notifyStreamResult ( verbose, "CANT_REACH_PEER", null );
     681                                notifyStreamResult ( verbose, "CANT_REACH_PEER", e.getMessage());
    760682                        } catch (InterruptedIOException e) {
    761683                                if (_log.shouldLog(Log.DEBUG))
    762684                                        _log.debug("STREAM CONNECT failed", e);
    763                                 notifyStreamResult ( verbose, "TIMEOUT", null );
     685                                notifyStreamResult ( verbose, "TIMEOUT", e.getMessage());
    764686                        } catch (I2PException e) {
    765687                                if (_log.shouldLog(Log.DEBUG))
     
    813735                                if (_log.shouldLog(Log.DEBUG))
    814736                                        _log.debug("STREAM ACCEPT failed", e);
    815                                 notifyStreamResult ( verbose, "ALREADY_ACCEPTING", null );
     737                                notifyStreamResult ( verbose, "ALREADY_ACCEPTING", e.getMessage());
    816738                        }
    817739                } catch (IOException e) {
     
    821743       
    822744
     745        /**
     746         * @param verbose if false, does nothing
     747         * @param result non-null
     748         * @param message may be null
     749         */
    823750        public void notifyStreamResult(boolean verbose, String result, String message) throws IOException {
    824751                if (!verbose) return ;
     
    871798                        String pw = props.getProperty("PASSWORD");
    872799                        if (user == null || pw == null)
    873                                 return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER and PASSWORD required\"\n");
     800                                return writeString(AUTH_ERROR, "USER and PASSWORD required");
    874801                        String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
    875802                        if (i2cpProps.containsKey(prop))
    876                                 return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " already exists\"\n");
     803                                return writeString(AUTH_ERROR, "user " + user + " already exists");
    877804                        PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext());
    878805                        String shash = pm.createHash(pw);
     
    881808                        String user = props.getProperty("USER");
    882809                        if (user == null)
    883                                 return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER required\"\n");
     810                                return writeString(AUTH_ERROR, "USER required");
    884811                        String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
    885812                        if (!i2cpProps.containsKey(prop))
    886                                 return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " not found\"\n");
     813                                return writeString(AUTH_ERROR, "user " + user + " not found");
    887814                        i2cpProps.remove(prop);
    888815                } else {
    889                         return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown AUTH command\"\n");
     816                        return writeString(AUTH_ERROR, "Unknown AUTH command");
    890817                }
    891818                try {
     
    893820                        return writeString("AUTH STATUS RESULT=OK\n");
    894821                } catch (IOException ioe) {
    895                         return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Config save failed: " + ioe + "\"\n");
     822                        return writeString(AUTH_ERROR, "Config save failed: " + ioe);
    896823                }
    897824        }
  • apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java

    r84b9436 r241bb38  
    1111import java.util.Properties;
    1212
     13import net.i2p.client.I2PSession;
    1314import net.i2p.client.I2PSessionException;
    1415import net.i2p.data.DataFormatException;
     
    2021 *
    2122 */
    22 class SAMv3RawSession extends SAMRawSession  implements SAMv3Handler.Session, SAMRawReceiver {
     23class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver {
    2324       
    2425        private final String nick;
     
    3435         *   registered with the given nickname
    3536         *   
     37         * Caller MUST call start().
     38         *
    3639         * @param nick nickname of the session
    3740         * @throws IOException
     
    4346                super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
    4447                      SAMv3Handler.sSessionsHash.get(nick).getProps(),
    45                       SAMv3Handler.sSessionsHash.get(nick).getHandler()  // to be replaced by this
     48                      null  // to be replaced by this
    4649                );
    4750                this.nick = nick ;
    4851                this.recv = this ;  // replacement
    4952                this.server = dgServer;
    50 
    51                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     53                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    5254                if (rec == null)
    5355                        throw new InterruptedIOException() ;
    5456                this.handler = rec.getHandler();
    5557                Properties props = rec.getProps();
     58                clientAddress = getSocketAddress(props, handler);
     59                _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
     60                              Boolean.parseBoolean(props.getProperty("HEADER"));
     61        }
     62
     63        /**
     64         *   Build a Raw Session on an existing i2p session
     65         *   registered with the given nickname
     66         *   
     67         * Caller MUST call start().
     68         *
     69         * @param nick nickname of the session
     70         * @throws IOException
     71         * @throws DataFormatException
     72         * @throws I2PSessionException
     73         * @since 0.9.25
     74         */
     75        public SAMv3RawSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
     76                               int listenProtocol, int listenPort, SAMv3DatagramServer dgServer)
     77                        throws IOException, DataFormatException, I2PSessionException {
     78                super(isess, props, listenProtocol, listenPort, null);  // to be replaced by this
     79                this.nick = nick ;
     80                this.recv = this ;  // replacement
     81                this.server = dgServer;
     82                this.handler = handler;
     83                clientAddress = getSocketAddress(props, handler);
     84                _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
     85                              Boolean.parseBoolean(props.getProperty("HEADER"));
     86        }
     87       
     88        /**
     89         *  @return null if PORT not set
     90         *  @since 0.9.25 moved from constructor
     91         */
     92        static SocketAddress getSocketAddress(Properties props, SAMv3Handler handler) {
    5693                String portStr = props.getProperty("PORT") ;
    5794                if (portStr == null) {
    58                         if (_log.shouldLog(Log.DEBUG))
    59                                 _log.debug("receiver port not specified. Current socket will be used.");
    60                         this.clientAddress = null;
     95                        return null;
    6196                } else {
    6297                        int port = Integer.parseInt(portStr);
    6398                        String host = props.getProperty("HOST");
    6499                        if ( host==null ) {
    65                                 host = rec.getHandler().getClientIP();
    66                                 if (_log.shouldLog(Log.DEBUG))
    67                                         _log.debug("no host specified. Taken from the client socket : " + host +':'+port);
     100                                host = handler.getClientIP();
    68101                        }
    69                         this.clientAddress = new InetSocketAddress(host, port);
     102                        return new InetSocketAddress(host, port);
    70103                }
    71                 _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
    72                               Boolean.parseBoolean(props.getProperty("HEADER"));
    73104        }
    74        
     105
    75106        public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
    76107                if (this.clientAddress==null) {
  • apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java

    r84b9436 r241bb38  
    2222import java.security.GeneralSecurityException;
    2323import java.util.Properties;
     24import java.util.concurrent.LinkedBlockingQueue;
    2425import java.util.concurrent.atomic.AtomicInteger;
    2526
     
    3132import net.i2p.client.streaming.I2PServerSocket;
    3233import net.i2p.client.streaming.I2PSocket;
     34import net.i2p.client.streaming.I2PSocketManager;
    3335import net.i2p.client.streaming.I2PSocketOptions;
    3436import net.i2p.data.DataFormatException;
     
    4446 */
    4547
    46 class SAMv3StreamSession  extends SAMStreamSession implements SAMv3Handler.Session
     48class SAMv3StreamSession  extends SAMStreamSession implements Session
    4749{
    4850
    49                 private static final int BUFFER_SIZE = 1024 ;
     51                private static final int BUFFER_SIZE = 1024;
     52                private static final int MAX_ACCEPT_QUEUE = 64;
    5053               
    5154                private final Object socketServerLock = new Object();
     
    5457                /** this is the count of active ACCEPT sockets */
    5558                private final AtomicInteger _acceptors = new AtomicInteger();
     59                /** for subsession only, null otherwise */
     60                private final LinkedBlockingQueue<I2PSocket> _acceptQueue;
    5661
    5762                private static I2PSSLSocketFactory _sslSocketFactory;
     
    6772             * registered with the given nickname
    6873             *
     74             * Caller MUST call start().
     75             *
    6976             * @param login The nickname
    7077             * @throws IOException
     
    8087                      getDB().get(login).getHandler());
    8188                this.nick = login ;
    82             }
    83 
    84             public static SAMv3Handler.SessionsDB getDB()
     89                _acceptQueue = null;
     90            }
     91
     92            /**
     93             *   Build a Stream Session on an existing I2P session
     94             *   registered with the given nickname
     95             *   
     96             * Caller MUST call start().
     97             *
     98             * @param nick nickname of the session
     99             * @throws IOException
     100             * @throws DataFormatException
     101             * @throws I2PSessionException
     102             * @since 0.9.25
     103             */
     104            public SAMv3StreamSession(String login, Properties props, SAMv3Handler handler, I2PSocketManager mgr,
     105                                        int listenPort) throws IOException, DataFormatException, SAMException {
     106                super(mgr, props, handler, listenPort);
     107                this.nick = login ;
     108                _acceptQueue = new LinkedBlockingQueue<I2PSocket>(MAX_ACCEPT_QUEUE);
     109            }
     110
     111            /**
     112             * Put a socket on the accept queue.
     113             * Only for subsession, throws IllegalStateException otherwise.
     114             *   
     115             * @return success, false if full
     116             * @since 0.9.25
     117             */
     118            public boolean queueSocket(I2PSocket sock) {
     119                if (_acceptQueue == null)
     120                    throw new IllegalStateException();
     121                return _acceptQueue.offer(sock);
     122            }
     123
     124            /**
     125             * Take a socket from the accept queue.
     126             * Only for subsession, throws IllegalStateException otherwise.
     127             *   
     128             * @since 0.9.25
     129             */
     130            private I2PSocket acceptSocket() throws ConnectException {
     131                if (_acceptQueue == null)
     132                    throw new IllegalStateException();
     133                try {
     134                        // TODO there's no CoDel or expiration in this queue
     135                        return _acceptQueue.take();
     136                } catch (InterruptedException ie) {
     137                        ConnectException ce = new ConnectException("interrupted");
     138                        ce.initCause(ie);
     139                        throw ce;
     140                }
     141            }
     142
     143            public static SessionsDB getDB()
    85144            {
    86145                return SAMv3Handler.sSessionsHash ;
     
    136195                I2PSocket i2ps = socketMgr.connect(d, opts);
    137196
    138                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     197                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    139198               
    140199                if ( rec==null ) throw new InterruptedIOException() ;
     
    186245                }
    187246
    188                 I2PSocket i2ps;
     247                I2PSocket i2ps = null;
    189248                _acceptors.incrementAndGet();
    190249                try {
    191                         i2ps = socketMgr.getServerSocket().accept();
     250                        if (_acceptQueue != null)
     251                                i2ps = acceptSocket();
     252                        else
     253                                i2ps = socketMgr.getServerSocket().accept();
    192254                } finally {
    193255                        _acceptors.decrementAndGet();
    194256                }
    195257
    196                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     258                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    197259
    198260                if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
     
    224286            public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
    225287            {
    226                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     288                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    227289                boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
    228290               
     
    258320                }
    259321               
    260                 SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts);
     322                SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, verbose, sendPorts);
    261323                (new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
    262324            }
     
    265327             *  Forward sockets from I2P to the host/port provided
    266328             */
    267             private static class SocketForwarder implements Runnable
     329            private class SocketForwarder implements Runnable
    268330            {
    269331                private final String host;
    270332                private final int port;
    271                 private final SAMv3StreamSession session;
    272333                private final boolean isSSL, verbose, sendPorts;
    273334               
    274335                SocketForwarder(String host, int port, boolean isSSL,
    275                                 SAMv3StreamSession session, boolean verbose, boolean sendPorts) {
     336                                boolean verbose, boolean sendPorts) {
    276337                        this.host = host ;
    277338                        this.port = port ;
    278                         this.session = session ;
    279339                        this.verbose = verbose ;
    280340                        this.sendPorts = sendPorts;
     
    284344                public void run()
    285345                {
    286                         while (session.getSocketServer()!=null) {
     346                        while (getSocketServer() != null) {
    287347                               
    288348                                // wait and accept a connection from I2P side
    289349                                I2PSocket i2ps;
    290350                                try {
    291                                         i2ps = session.getSocketServer().accept();
     351                                        if (_acceptQueue != null)
     352                                                i2ps = acceptSocket();
     353                                        else
     354                                                i2ps = getSocketServer().accept();
    292355                                        if (i2ps == null)
    293356                                                continue;
     
    438501            }
    439502           
    440             private I2PServerSocket getSocketServer()
     503            protected I2PServerSocket getSocketServer()
    441504            {
    442505                synchronized ( this.socketServerLock ) {
     
    451514            public void stopForwardingIncoming() throws SAMException, InterruptedIOException
    452515            {
    453                 SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
     516                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    454517               
    455518                if ( rec==null ) throw new InterruptedIOException() ;
     
    475538            /**
    476539             * Close the stream session
     540             * TODO Why do we override?
    477541             */
    478542            @Override
    479543            public void close() {
    480                 socketMgr.destroySocketManager();
    481             }
    482 
    483             /**
    484              *  Unsupported
    485              *  @throws DataFormatException always
    486              */
    487             public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException
    488             {
    489                 throw new DataFormatException(null);
     544                if (_isOwnSession)
     545                        socketMgr.destroySocketManager();
    490546            }
    491547}
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    r84b9436 r241bb38  
    1919    private final Object _helloLock = new Object();
    2020    private Boolean _sessionCreateOk;
     21    private Boolean _sessionAddOk;
    2122    private Boolean _streamStatusOk;
    2223    private final Object _sessionCreateLock = new Object();
     
    4243    }
    4344
     45    /** may be called twice, first for CREATE and second for ADD */
    4446    @Override
    4547    public void sessionStatusReceived(String result, String destination, String msg) {
    4648        synchronized (_sessionCreateLock) {
     49            Boolean ok;
    4750            if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
    48                 _sessionCreateOk = Boolean.TRUE;
     51                ok = Boolean.TRUE;
    4952            else
    50                 _sessionCreateOk = Boolean.FALSE;
     53                ok = Boolean.FALSE;
     54            if (_sessionCreateOk == null)
     55                _sessionCreateOk = ok;
     56            else if (_sessionAddOk == null)
     57                _sessionAddOk = ok;
    5158            _sessionCreateLock.notifyAll();
    5259        }
     
    122129
    123130    /**
     131     * Wait for the session to be added, returning true if everything went ok
     132     *
     133     * @return true if everything ok
     134     * @since 0.9.25
     135     */
     136    public boolean waitForSessionAddReply() {
     137        while (true) {
     138            try {
     139                synchronized (_sessionCreateLock) {
     140                    if (_sessionAddOk == null)
     141                        _sessionCreateLock.wait();
     142                    else
     143                        return _sessionAddOk.booleanValue();
     144                }
     145            } catch (InterruptedException ie) { return false; }
     146        }
     147    }
     148
     149    /**
    124150     * Wait for the stream to be created, returning true if everything went ok
    125151     *
  • apps/sam/java/src/net/i2p/sam/client/SAMReader.java

    r84b9436 r241bb38  
    138138                        String name = pair.substring(0, eq);
    139139                        String val = pair.substring(eq+1);
     140                        if (val.length() <= 0) {
     141                            _log.error("Empty value for " + name);
     142                            continue;
     143                        }
    140144                        while ( (val.charAt(0) == '\"') && (val.length() > 0) )
    141145                            val = val.substring(1);
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    r84b9436 r241bb38  
    5656   
    5757    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
    58     private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
     58    private static final int MASTER=8;
     59    private static final String USAGE = "Usage: SAMStreamSend [-s] [-x] [-m mode] [-v version] [-b samHost] [-p samPort]\n" +
     60                                        "                     [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
    5961                                        "       modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
     62                                        "              default is stream\n" +
    6063                                        "       -s: use SSL\n" +
     64                                        "       -x: use master session (forces -v 3.3)\n" +
    6165                                        "       multiple -o session options are allowed";
    6266
    6367    public static void main(String args[]) {
    64         Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:");
     68        Getopt g = new Getopt("SAM", args, "sxhb:m:o:p:u:v:w:");
    6569        boolean isSSL = false;
     70        boolean isMaster = false;
    6671        int mode = STREAM;
    67         String version = "1.0";
     72        String version = "3.3";
    6873        String host = "127.0.0.1";
    6974        String port = "7656";
     
    7883                break;
    7984
     85            case 'x':
     86                isMaster = true;
     87                break;
     88
    8089            case 'm':
    8190                mode = Integer.parseInt(g.getOptarg());
     
    123132            System.err.println(USAGE);
    124133            return;
     134        }
     135        if (isMaster) {
     136            mode += MASTER;
     137            version = "3.3";
    125138        }
    126139        if ((user == null && password != null) ||
     
    163176            OutputStream out = sock.getOutputStream();
    164177            String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
     178            if (mode >= MASTER)
     179                mode -= MASTER;
    165180            if (ourDest == null)
    166181                throw new IOException("handshake failed");
     
    231246    }
    232247   
    233     /** @return our b64 dest or null */
     248    /**
     249     * @param isMaster is this the control socket
     250     * @return our b64 dest or null
     251     */
    234252    private String handshake(OutputStream samOut, String version, boolean isMaster,
    235253                             SAMEventHandler eventHandler, int mode, String user, String password,
     
    262280                    _conOptions = "ID=" + _v3ID;
    263281                }
     282                boolean masterMode;  // are we using v3.3 master session
     283                String command;
     284                if (mode >= MASTER) {
     285                    masterMode = true;
     286                    command = "ADD";
     287                    mode -= MASTER;
     288                } else {
     289                    masterMode = false;
     290                    command = "CREATE DESTINATION=TRANSIENT";
     291                }
    264292                String style;
    265293                if (mode == STREAM)
     
    267295                else if (mode == DG || mode == V1DG)
    268296                    style = "DATAGRAM";
    269                 else
     297                else   // RAW or V1RAW
    270298                    style = "RAW";
    271                 String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n';
     299
     300                if (masterMode) {
     301                    if (mode == V1DG || mode == V1RAW)
     302                        throw new IllegalArgumentException("v1 dg/raw incompatible with master session");
     303                    String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=masterSend " + opts + '\n';
     304                    samOut.write(req.getBytes("UTF-8"));
     305                    samOut.flush();
     306                    if (_log.shouldLog(Log.DEBUG))
     307                        _log.debug("SESSION CREATE STYLE=MASTER sent");
     308                    boolean ok = eventHandler.waitForSessionCreateReply();
     309                    if (!ok)
     310                        throw new IOException("SESSION CREATE STYLE=MASTER failed");
     311                    if (_log.shouldLog(Log.DEBUG))
     312                        _log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
     313                    // PORT required even if we aren't listening for this test
     314                    if (mode != STREAM)
     315                        opts += " PORT=9999";
     316                }
     317                String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + opts + '\n';
    272318                samOut.write(req.getBytes("UTF-8"));
    273319                samOut.flush();
    274320                if (_log.shouldLog(Log.DEBUG))
    275                     _log.debug("Session create sent");
    276                 boolean ok = eventHandler.waitForSessionCreateReply();
     321                    _log.debug("SESSION " + command + " sent");
     322                boolean ok;
     323                if (masterMode)
     324                    ok = eventHandler.waitForSessionAddReply();
     325                else
     326                    ok = eventHandler.waitForSessionCreateReply();
    277327                if (!ok)
    278                     throw new IOException("Session create failed");
    279                 if (_log.shouldLog(Log.DEBUG))
    280                     _log.debug("Session create reply found: " + ok);
    281 
     328                    throw new IOException("SESSION " + command + " failed");
     329                if (_log.shouldLog(Log.DEBUG))
     330                    _log.debug("SESSION " + command + " reply found: " + ok);
     331
     332                if (masterMode) {
     333                    // do a bunch more
     334                    req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n";
     335                    samOut.write(req.getBytes("UTF-8"));
     336                    req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n";
     337                    samOut.write(req.getBytes("UTF-8"));
     338                    req = "SESSION REMOVE ID=stream99\n";
     339                    samOut.write(req.getBytes("UTF-8"));
     340                    samOut.flush();
     341                }
    282342                req = "NAMING LOOKUP NAME=ME\n";
    283343                samOut.write(req.getBytes("UTF-8"));
     
    454514                                else
    455515                                    baos.write(DataHelper.getUTF8(" TO_PORT=5678"));
     516                                baos.write(DataHelper.getUTF8(" SEND_TAGS=19 TAG_THRESHOLD=13 EXPIRES=33 SEND_LEASESET=true"));
    456517                            }
    457518                            baos.write((byte) '\n');
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r84b9436 r241bb38  
    5959   
    6060    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6, FORWARDSSL=7;
     61    private static final int MASTER=8;
    6162    private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort]\n" +
    6263                                        "                     [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" +
     
    6465                                        "              raw: 3; v1raw: 4; raw-with-headers: 5;\n" +
    6566                                        "              stream-forward: 6; stream-forward-ssl: 7\n" +
     67                                        "              default is stream\n" +
    6668                                        "       -s: use SSL to connect to bridge\n" +
     69                                        "       -x: use master session (forces -v 3.3)\n" +
    6770                                        "       multiple -o session options are allowed";
    6871    private static final int V3FORWARDPORT=9998;
     
    7073
    7174    public static void main(String args[]) {
    72         Getopt g = new Getopt("SAM", args, "sb:m:p:u:v:w:");
     75        Getopt g = new Getopt("SAM", args, "sxhb:m:p:u:v:w:");
    7376        boolean isSSL = false;
     77        boolean isMaster = false;
    7478        int mode = STREAM;
    75         String version = "1.0";
     79        String version = "3.3";
    7680        String host = "127.0.0.1";
    7781        String port = "7656";
     
    8690                break;
    8791
     92            case 'x':
     93                isMaster = true;
     94                break;
     95
    8896            case 'm':
    8997                mode = Integer.parseInt(g.getOptarg());
     
    131139            System.err.println(USAGE);
    132140            return;
     141        }
     142        if (isMaster) {
     143            mode += MASTER;
     144            version = "3.3";
    133145        }
    134146        if ((user == null && password != null) ||
     
    170182                _log.debug("Reader created");
    171183            String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
     184            if (mode >= MASTER)
     185                mode -= MASTER;
    172186            if (ourDest == null)
    173187                throw new IOException("handshake failed");
     
    561575    }
    562576   
    563     /** @return our b64 dest or null */
     577    /**
     578     * @param isMaster is this the control socket
     579     * @return our b64 dest or null
     580     */
    564581    private String handshake(OutputStream samOut, String version, boolean isMaster,
    565582                             SAMEventHandler eventHandler, int mode, String user, String password,
     
    642659                    dest = _destFile;
    643660                }
     661                boolean masterMode;  // are we using v3.3 master session
     662                String command;
     663                if (mode >= MASTER) {
     664                    masterMode = true;
     665                    command = "ADD";
     666                    mode -= MASTER;
     667                } else {
     668                    masterMode = false;
     669                    command = "CREATE DESTINATION=" + dest;
     670                }
    644671                String style;
    645672                if (mode == STREAM || mode == FORWARD || mode == FORWARDSSL)
     
    655682                else
    656683                    style = "RAW HEADER=true PORT=" + V3DGPORT;
    657                 String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n';
     684
     685                if (masterMode) {
     686                    if (mode == V1DG || mode == V1RAW)
     687                        throw new IllegalArgumentException("v1 dg/raw incompatible with master session");
     688                    String req = "SESSION CREATE DESTINATION=" + dest + " STYLE=MASTER ID=masterSink " + sopts + '\n';
     689                    samOut.write(req.getBytes("UTF-8"));
     690                    samOut.flush();
     691                    if (_log.shouldLog(Log.DEBUG))
     692                        _log.debug("SESSION CREATE STYLE=MASTER sent");
     693                    boolean ok = eventHandler.waitForSessionCreateReply();
     694                    if (!ok)
     695                        throw new IOException("SESSION CREATE STYLE=MASTER failed");
     696                    if (_log.shouldLog(Log.DEBUG))
     697                        _log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
     698                }
     699
     700                String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + sopts + '\n';
    658701                samOut.write(req.getBytes("UTF-8"));
    659702                samOut.flush();
    660703                if (_log.shouldLog(Log.DEBUG))
    661                     _log.debug("Session create sent");
    662                 if (mode == STREAM) {
    663                     boolean ok = eventHandler.waitForSessionCreateReply();
     704                    _log.debug("SESSION " + command + " sent");
     705                //if (mode == STREAM) {
     706                    boolean ok;
     707                    if (masterMode)
     708                        ok = eventHandler.waitForSessionAddReply();
     709                    else
     710                        ok = eventHandler.waitForSessionCreateReply();
    664711                    if (!ok)
    665                         throw new IOException("Session create failed");
     712                        throw new IOException("SESSION " + command + " failed");
    666713                    if (_log.shouldLog(Log.DEBUG))
    667                         _log.debug("Session create reply found: " + ok);
     714                        _log.debug("SESSION " + command + " reply found: " + ok);
     715                //}
     716                if (masterMode) {
     717                    // do a bunch more
     718                    req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n";
     719                    samOut.write(req.getBytes("UTF-8"));
     720                    req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n";
     721                    samOut.write(req.getBytes("UTF-8"));
     722                    req = "SESSION ADD STYLE=DATAGRAM PORT=9997 LISTEN_PORT=97 ID=dg97\n";
     723                    samOut.write(req.getBytes("UTF-8"));
     724                    req = "SESSION ADD STYLE=DATAGRAM PORT=9996 FROM_PORT=96 ID=dg96\n";
     725                    samOut.write(req.getBytes("UTF-8"));
     726                    req = "SESSION ADD STYLE=RAW PORT=9995 LISTEN_PORT=95 ID=raw95\n";
     727                    samOut.write(req.getBytes("UTF-8"));
     728                    req = "SESSION ADD STYLE=RAW PORT=9994 FROM_PORT=94 LISTEN_PROTOCOL=222 ID=raw94\n";
     729                    samOut.write(req.getBytes("UTF-8"));
     730                    req = "SESSION REMOVE ID=stream99\n";
     731                    samOut.write(req.getBytes("UTF-8"));
     732                    req = "SESSION REMOVE ID=raw95\n";
     733                    samOut.write(req.getBytes("UTF-8"));
     734                    req = "SESSION REMOVE ID=notfound\n";
     735                    samOut.write(req.getBytes("UTF-8"));
     736                    req = "SESSION REMOVE ID=masterSink\n"; // shouldn't remove ourselves
     737                    samOut.write(req.getBytes("UTF-8"));
     738                    samOut.flush();
    668739                }
    669740                req = "NAMING LOOKUP NAME=ME\n";
Note: See TracChangeset for help on using the changeset viewer.