Changeset 9b004bc


Ignore:
Timestamp:
Feb 5, 2016 4:10:04 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
270bc24
Parents:
ee1852f
Message:

SAM v3.3 master sessions.
Compiles only. Untested, not regression tested, not complete.

Location:
apps/sam/java/src/net/i2p/sam
Files:
2 added
9 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java

    ree1852f r9b004bc  
    4646     * @throws I2PSessionException
    4747     */
    48     public SAMDatagramSession(String dest, Properties props,
     48    protected SAMDatagramSession(String dest, Properties props,
    4949                              SAMDatagramReceiver recv) throws IOException,
    5050                              DataFormatException, I2PSessionException {
     
    6969                              DataFormatException, I2PSessionException {
    7070        super(destStream, props);
     71
     72        this.recv = recv;
     73        dgramMaker = new I2PDatagramMaker(getI2PSession());
     74    }
     75
     76    /**
     77     * Create a new SAM DATAGRAM session on an existing I2P session.
     78     *
     79     * @since 0.9.25
     80     */
     81    protected SAMDatagramSession(I2PSession sess, int listenPort,
     82                              SAMDatagramReceiver recv) throws IOException,
     83                              DataFormatException, I2PSessionException {
     84        super(sess, I2PSession.PROTO_DATAGRAM, listenPort);
    7185
    7286        this.recv = recv;
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    ree1852f r9b004bc  
    3434 * @author human
    3535 */
    36 abstract class SAMMessageSession implements Closeable {
     36abstract class SAMMessageSession implements SAMMessageSess {
    3737
    3838    protected final Log _log;
    3939    private final I2PSession session;
    4040    private final SAMMessageSessionHandler handler;
     41    private final int listenProtocol;
     42    private final int listenPort;
    4143
    4244    /**
     
    6971        handler = new SAMMessageSessionHandler(destStream, props);
    7072        session = handler.getSession();
     73        listenProtocol = I2PSession.PROTO_ANY;
     74        listenPort = I2PSession.PORT_ANY;
    7175        // FIXME don't start threads in constructors
    7276        Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
     
    7579
    7680    /**
     81     * Initialize a new SAM message-based session using an existing I2PSession.
     82     *
     83     * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
     84     * @param props Properties to setup the I2P session
     85     * @throws IOException
     86     * @throws DataFormatException
     87     * @throws I2PSessionException
     88     * @since 0.9.25
     89     */
     90    protected SAMMessageSession(I2PSession sess, int listenProtocol, int listenPort)
     91                            throws IOException, DataFormatException, I2PSessionException {
     92        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     93        if (_log.shouldLog(Log.DEBUG))
     94            _log.debug("Initializing SAM message-based session");
     95
     96        session = sess;
     97        handler = new SAMMessageSessionHandler(session);
     98        this.listenProtocol = listenProtocol;
     99        this.listenPort = listenPort;
     100        // FIXME don't start threads in constructors
     101        Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
     102        t.start();
     103    }
     104
     105    /**
    77106     * Get the SAM message-based session Destination.
    78107     *
     
    81110    public Destination getDestination() {
    82111        return session.getMyDestination();
     112    }
     113
     114    /**
     115     * @since 0.9.25
     116     */
     117    public int getListenProtocol() {
     118        return listenProtocol;
     119    }
     120
     121    /**
     122     * @since 0.9.25
     123     */
     124    public int getListenPort() {
     125        return listenPort;
    83126    }
    84127
     
    189232     * @author human
    190233     */
    191     class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
     234    private class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
    192235
    193236        private final I2PSession _session;
     
    199242         *
    200243         * @param destStream Input stream containing the destination keys
    201         * @param props Properties to setup the I2P session
    202         * @throws I2PSessionException
     244        * @param props Properties to setup the I2P session
     245        * @throws I2PSessionException
    203246         */
    204247        public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException {
     
    219262                _log.debug("I2P session connected");
    220263
    221             _session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
     264            _session.addMuxedSessionListener(this, listenProtocol, listenPort);
     265        }
     266               
     267        /**
     268         * Create a new SAM message-based session handler on an existing I2PSession
     269         *
     270         * @since 0.9.25
     271         */
     272        public SAMMessageSessionHandler(I2PSession sess) throws I2PSessionException {
     273            _session = sess;
     274            _session.addMuxedSessionListener(this, listenProtocol, listenPort);
    222275        }
    223276
     
    258311           
    259312            shutDown();
    260             session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
     313            session.removeListener(listenProtocol, listenPort);
    261314           
    262315            try {
  • apps/sam/java/src/net/i2p/sam/SAMRawSession.java

    ree1852f r9b004bc  
    4040     * @throws I2PSessionException
    4141     */
    42     public SAMRawSession(String dest, Properties props,
     42    protected SAMRawSession(String dest, Properties props,
    4343                         SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
    4444        super(dest, props);
     
    6161        super(destStream, props);
    6262
     63        this.recv = recv;
     64    }
     65
     66    /**
     67     * Create a new SAM RAW session on an existing I2P session.
     68     *
     69     * @since 0.9.25
     70     */
     71    protected SAMRawSession(I2PSession sess, int listenProtocol, int listenPort,
     72                            SAMRawReceiver recv) throws IOException,
     73                              DataFormatException, I2PSessionException {
     74        super(sess, listenProtocol, listenPort);
    6375        this.recv = recv;
    6476    }
  • apps/sam/java/src/net/i2p/sam/SAMStreamSession.java

    ree1852f r9b004bc  
    2929import net.i2p.I2PException;
    3030import net.i2p.client.I2PClient;
     31import net.i2p.client.I2PSession;
    3132import net.i2p.client.streaming.I2PServerSocket;
    3233import net.i2p.client.streaming.I2PSocket;
     
    4849 * @author human
    4950 */
    50 class SAMStreamSession {
     51class SAMStreamSession implements SAMMessageSess {
    5152
    5253    protected final Log _log;
     
    6970    // Can we create outgoing connections?
    7071    protected final boolean canCreate;
     72    private final int listenProtocol;
     73    private final int listenPort;
    7174
    7275    /**
     
    106109     * @throws SAMException
    107110     */
    108     public SAMStreamSession(InputStream destStream, String dir,
    109                             Properties props,  SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
     111    protected SAMStreamSession(InputStream destStream, String dir,
     112                               Properties props,  SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
    110113        this.recv = recv;
    111114        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     
    171174        forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
    172175       
     176        if (Boolean.parseBoolean(props.getProperty("i2p.streaming.enforceProtocol")))
     177            listenProtocol = I2PSession.PROTO_STREAMING;
     178        else
     179            listenProtocol = I2PSession.PROTO_ANY;
     180        listenPort = I2PSession.PORT_ANY;
     181
    173182
    174183        if (startAcceptor) {
     184            // FIXME don't start threads in constructors
    175185            server = new SAMStreamSessionServer();
    176186            Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
     
    180190            server = null;
    181191        }
     192    }
     193
     194    /**
     195     * Create a new SAM STREAM session on an existing socket manager.
     196     * v3 only.
     197     *
     198     * @param props Properties to setup the I2P session
     199     * @param recv Object that will receive incoming data
     200     * @throws IOException
     201     * @throws DataFormatException
     202     * @throws SAMException
     203     * @since 0.9.25
     204     */
     205    protected SAMStreamSession(I2PSocketManager mgr, Properties props, SAMStreamReceiver recv, int listenport)
     206                               throws IOException, DataFormatException, SAMException {
     207        this.recv = recv;
     208        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
     209        if (_log.shouldLog(Log.DEBUG))
     210            _log.debug("SAM STREAM session instantiated");
     211        canCreate = true;
     212        Properties allprops = (Properties) System.getProperties().clone();
     213        allprops.putAll(props);
     214        socketMgr = mgr;
     215        socketMgr.addDisconnectListener(new DisconnectListener());
     216        forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH));
     217        listenProtocol = I2PSession.PROTO_STREAMING;
     218        listenPort = listenport;
     219        server = null;
     220    }
     221
     222    /*
     223     * @since 0.9.25
     224     */
     225    public int getListenProtocol() {
     226        return listenProtocol;
     227    }
     228
     229    /*
     230     * @since 0.9.25
     231     */
     232    public int getListenPort() {
     233        return listenPort;
    182234    }
    183235   
     
    303355
    304356        return true;
     357    }
     358
     359    /**
     360     *  Unsupported
     361     *  @throws DataFormatException always
     362     *  @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess
     363     */
     364    public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException {
     365        throw new DataFormatException(null);
    305366    }
    306367
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    ree1852f r9b004bc  
    4141class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver {
    4242   
    43     protected SAMRawSession rawSession;
    44     protected SAMDatagramSession datagramSession;
     43    protected SAMMessageSess rawSession;
     44    protected SAMMessageSess datagramSession;
    4545    protected SAMStreamSession streamSession;
    4646
    47     protected SAMRawSession getRawSession() {return rawSession ;}
    48     protected SAMDatagramSession getDatagramSession() {return datagramSession ;}       
    49     protected SAMStreamSession getStreamSession() {return streamSession ;}
     47    protected final SAMMessageSess getRawSession() { return rawSession; }
     48    protected final SAMMessageSess getDatagramSession() { return datagramSession; }     
     49    protected final SAMStreamSession getStreamSession() { return streamSession; }
    5050
    5151    protected final long _id;
     
    200200                    _log.warn("Error closing socket", e);
    201201            }
    202             if (getRawSession() != null) {
    203                 getRawSession().close();
    204             }
    205             if (getDatagramSession() != null) {
    206                 getDatagramSession().close();
    207             }
    208             if (getStreamSession() != null) {
    209                 getStreamSession().close();
     202            if (rawSession != null) {
     203                rawSession.close();
     204            }
     205            if (datagramSession != null) {
     206                datagramSession.close();
     207            }
     208            if (streamSession != null) {
     209                streamSession.close();
    210210            }
    211211        }
     
    219219        try{
    220220            if (opcode.equals("CREATE")) {
    221                 if ((getRawSession() != null) || (getDatagramSession() != null)
    222                     || (getStreamSession() != null)) {
     221                if ((rawSession != null) || (datagramSession != null)
     222                    || (streamSession != null)) {
    223223                    if (_log.shouldLog(Log.DEBUG))
    224224                        _log.debug("Trying to create a session, but one still exists");
     
    375375            Destination dest = null ;
    376376            if (name.equals("ME")) {
    377                 if (getRawSession() != null) {
    378                     dest = getRawSession().getDestination();
    379                 } else if (getStreamSession() != null) {
    380                     dest = getStreamSession().getDestination();
    381                 } else if (getDatagramSession() != null) {
    382                     dest = getDatagramSession().getDestination();
     377                if (rawSession != null) {
     378                    dest = rawSession.getDestination();
     379                } else if (streamSession != null) {
     380                    dest = streamSession.getDestination();
     381                } else if (datagramSession != null) {
     382                    dest = datagramSession.getDestination();
    383383                } else {
    384384                    if (_log.shouldLog(Log.DEBUG))
     
    412412    /* Parse and execute a DATAGRAM message */
    413413    protected boolean execDatagramMessage(String opcode, Properties props) {
    414         if (getDatagramSession() == null) {
     414        if (datagramSession == null) {
    415415            _log.error("DATAGRAM message received, but no DATAGRAM session exists");
    416416            return false;
     
    479479                in.readFully(data);
    480480
    481                 if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) {
     481                if (!datagramSession.sendBytes(dest, data, proto, fromPort, toPort)) {
    482482                    _log.error("DATAGRAM SEND failed");
    483483                    // a message send failure is no reason to drop the SAM session
     
    516516    /* Parse and execute a RAW message */
    517517    protected boolean execRawMessage(String opcode, Properties props) {
    518         if (getRawSession() == null) {
     518        if (rawSession == null) {
    519519            _log.error("RAW message received, but no RAW session exists");
    520520            return false;
     
    592592                in.readFully(data);
    593593
    594                 if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) {
     594                if (!rawSession.sendBytes(dest, data, proto, fromPort, toPort)) {
    595595                    _log.error("RAW SEND failed");
    596596                    // a message send failure is no reason to drop the SAM session
     
    629629    /* Parse and execute a STREAM message */
    630630    protected boolean execStreamMessage(String opcode, Properties props) {
    631         if (getStreamSession() == null) {
     631        if (streamSession == null) {
    632632            _log.error("STREAM message received, but no STREAM session exists");
    633633            return false;
     
    696696
    697697        try {
    698             if (!getStreamSession().sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
     698            if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
    699699                if (_log.shouldLog(Log.WARN))
    700700                    _log.warn("STREAM SEND [" + size + "] failed");
     
    702702                // for style=stream, tell the client the stream failed, and kill the virtual connection..
    703703                boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
    704                 getStreamSession().closeConnection(id);
     704                streamSession.closeConnection(id);
    705705                return rv;
    706706            }
     
    757757        try {
    758758            try {
    759                 if (!getStreamSession().connect(id, dest, props)) {
     759                if (!streamSession.connect(id, dest, props)) {
    760760                    if (_log.shouldLog(Log.DEBUG))
    761761                        _log.debug("STREAM connection failed");
     
    818818        }
    819819
    820         boolean closed = getStreamSession().closeConnection(id);
     820        boolean closed = streamSession.closeConnection(id);
    821821        if ( (!closed) && (_log.shouldLog(Log.WARN)) )
    822822            _log.warn("Stream unable to be closed, but this is non fatal");
     
    836836    // SAMRawReceiver implementation
    837837    public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException {
    838         if (getRawSession() == null) {
     838        if (rawSession == null) {
    839839            _log.error("BUG! Received raw bytes, but session is null!");
    840840            return;
     
    862862            _log.debug("stopRawReceiving() invoked");
    863863
    864         if (getRawSession() == null) {
     864        if (rawSession == null) {
    865865            _log.error("BUG! Got raw receiving stop, but session is null!");
    866866            return;
     
    878878    public void receiveDatagramBytes(Destination sender, byte data[], int proto,
    879879                                     int fromPort, int toPort) throws IOException {
    880         if (getDatagramSession() == null) {
     880        if (datagramSession == null) {
    881881            _log.error("BUG! Received datagram bytes, but session is null!");
    882882            return;
     
    905905            _log.debug("stopDatagramReceiving() invoked");
    906906
    907         if (getDatagramSession() == null) {
     907        if (datagramSession == null) {
    908908            _log.error("BUG! Got datagram receiving stop, but session is null!");
    909909            return;
     
    922922    public void streamSendAnswer( int id, String result, String bufferState ) throws IOException
    923923    {
    924         if ( getStreamSession() == null )
     924        if ( streamSession == null )
    925925        {
    926926            _log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
     
    940940    public void notifyStreamSendBufferFree( int id ) throws IOException
    941941    {
    942         if ( getStreamSession() == null )
     942        if ( streamSession == null )
    943943        {
    944944            _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
     
    954954
    955955    public void notifyStreamIncomingConnection(int id, Destination d) throws IOException {
    956         if (getStreamSession() == null) {
     956        if (streamSession == null) {
    957957            _log.error("BUG! Received stream connection, but session is null!");
    958958            return;
     
    969969    public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
    970970    {
    971         if ( getStreamSession() == null )
     971        if ( streamSession == null )
    972972        {
    973973            _log.error ( "BUG! Received stream connection, but session is null!" );
     
    10121012 
    10131013    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
    1014         if (getStreamSession() == null) {
     1014        if (streamSession == null) {
    10151015            _log.error("Received stream bytes, but session is null!");
    10161016            return;
     
    10331033    /** @param msg may be null */
    10341034    public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
    1035         if (getStreamSession() == null) {
     1035        if (streamSession == null) {
    10361036            _log.error("BUG! Received stream disconnection, but session is null!");
    10371037            return;
     
    10481048            _log.debug("stopStreamReceiving() invoked", new Exception("stopped"));
    10491049
    1050         if (getStreamSession() == null) {
     1050        if (streamSession == null) {
    10511051            _log.error("BUG! Got stream receiving stop, but session is null!");
    10521052            return;
  • apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java

    ree1852f r9b004bc  
    77
    88import java.io.IOException;
     9import java.net.InetSocketAddress;
     10import java.net.SocketAddress ;
     11import java.nio.ByteBuffer;
    912import java.util.Properties;
    1013
     14import net.i2p.client.I2PSession;
    1115import net.i2p.client.I2PSessionException;
    1216import net.i2p.data.DataFormatException;
     
    1519import net.i2p.util.Log;
    1620
    17 import java.net.InetSocketAddress;
    18 import java.net.SocketAddress ;
    19 import java.nio.ByteBuffer;
    2021
    2122class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDatagramReceiver {
     
    5455               
    5556                Properties props = rec.getProps();
    56                 String portStr = props.getProperty("PORT");
    57                 if (portStr == null) {
    58                         if (_log.shouldDebug())
    59                                 _log.debug("receiver port not specified. Current socket will be used.");
    60                         this.clientAddress = null;
    61                 } else {
    62                         int port = Integer.parseInt(portStr);
    63                         String host = props.getProperty("HOST");
    64                         if (host == null) {             
    65                                 host = rec.getHandler().getClientIP();
    66                                 if (_log.shouldDebug())
    67                                         _log.debug("no host specified. Taken from the client socket : " + host+':'+port);
    68                         }
    69                         this.clientAddress = new InetSocketAddress(host, port);
    70                 }
     57                clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
     58        }
     59
     60        /**
     61         *   Build a Datagram Session on an existing i2p session
     62         *   registered with the given nickname
     63         *   
     64         * @param nick nickname of the session
     65         * @throws IOException
     66         * @throws DataFormatException
     67         * @throws I2PSessionException
     68         * @since 0.9.25
     69         */
     70        public SAMv3DatagramSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
     71                                    int listenPort, SAMv3DatagramServer dgServer)
     72                        throws IOException, DataFormatException, I2PSessionException {
     73                super(isess, listenPort, null);  // to be replaced by this
     74                this.nick = nick ;
     75                this.recv = this ;  // replacement
     76                this.server = dgServer;
     77                this.handler = handler;
     78                clientAddress = SAMv3RawSession.getSocketAddress(props, handler);
    7179        }
    7280
  • apps/sam/java/src/net/i2p/sam/SAMv3Handler.java

    ree1852f r9b004bc  
    4949       
    5050        private Session session;
     51        // TODO remove singleton, hang off SAMBridge like dgserver
    5152        public static final SessionsDB sSessionsHash = new SessionsDB();
    5253        private volatile boolean stolenSocket;
     
    370371
    371372                String dest = "BUG!";
    372                 String nick =  null ;
    373373                boolean ok = false ;
     374
     375                String nick = (String) props.remove("ID");
     376                if (nick == null)
     377                        return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
     378
     379                String style = (String) props.remove("STYLE");
     380                if (style == null && !opcode.equals("REMOVE"))
     381                        return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
    374382
    375383                try{
     
    419427                                }
    420428
    421 
    422                                 nick = (String) props.remove("ID");
    423                                 if (nick == null) {
    424                                         if (_log.shouldLog(Log.DEBUG))
    425                                                 _log.debug("SESSION ID parameter not specified");
    426                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
    427                                 }
    428 
    429 
    430                                 String style = (String) props.remove("STYLE");
    431                                 if (style == null) {
    432                                         if (_log.shouldLog(Log.DEBUG))
    433                                                 _log.debug("SESSION STYLE parameter not specified");
    434                                         return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
    435                                 }
    436 
    437429                                // Unconditionally override what the client may have set
    438430                                // (iMule sets BestEffort) as None is more efficient
     
    473465                                        streamSession = v3;
    474466                                        this.session = v3;
     467                                } else if (style.equals("MASTER")) {
     468                                        SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
     469                                        MasterSession v3 = new MasterSession(nick, dgs, this, allProps);
     470                                        streamSession = v3;
     471                                        datagramSession = v3;
     472                                        rawSession = v3;
     473                                        this.session = v3;
    475474                                } else {
    476475                                        if (_log.shouldLog(Log.DEBUG))
     
    481480                                return writeString("SESSION STATUS RESULT=OK DESTINATION="
    482481                                                + dest + "\n");
     482                        } else if (opcode.equals("ADD") || opcode.equals("REMOVE")) {
     483                                // prevent trouble in finally block
     484                                ok = true;
     485                                if (streamSession != null || datagramSession != null || rawSession != null)
     486                                        return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Not a MASTER session\"\n");
     487                                MasterSession msess = (MasterSession) session;
     488                                String msg;
     489                                if (opcode.equals("ADD")) {
     490                                        msg = msess.add(nick, style, props);
     491                                } else {
     492                                        msg = msess.remove(nick, props);
     493                                }
     494                                if (msg == null)
     495                                        return writeString("SESSION STATUS RESULT=OK MESSAGE=\"" + opcode + ' ' + nick + "\"\n");
     496                                else
     497                                        return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + msg + "\"\n");
    483498                        } else {
    484499                                if (_log.shouldLog(Log.DEBUG))
  • apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java

    ree1852f r9b004bc  
    1111import java.util.Properties;
    1212
     13import net.i2p.client.I2PSession;
    1314import net.i2p.client.I2PSessionException;
    1415import net.i2p.data.DataFormatException;
     
    2021 *
    2122 */
    22 class SAMv3RawSession extends SAMRawSession  implements Session, SAMRawReceiver {
     23class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver {
    2324       
    2425        private final String nick;
     
    4344                super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
    4445                      SAMv3Handler.sSessionsHash.get(nick).getProps(),
    45                       SAMv3Handler.sSessionsHash.get(nick).getHandler()  // to be replaced by this
     46                      null  // to be replaced by this
    4647                );
    4748                this.nick = nick ;
    4849                this.recv = this ;  // replacement
    4950                this.server = dgServer;
    50 
    5151                SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    5252                if (rec == null)
     
    5454                this.handler = rec.getHandler();
    5555                Properties props = rec.getProps();
     56                clientAddress = getSocketAddress(props, handler);
     57                _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
     58                              Boolean.parseBoolean(props.getProperty("HEADER"));
     59        }
     60
     61        /**
     62         *   Build a Raw Session on an existing i2p session
     63         *   registered with the given nickname
     64         *   
     65         * @param nick nickname of the session
     66         * @throws IOException
     67         * @throws DataFormatException
     68         * @throws I2PSessionException
     69         * @since 0.9.25
     70         */
     71        public SAMv3RawSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess,
     72                               int listenProtocol, int listenPort, SAMv3DatagramServer dgServer)
     73                        throws IOException, DataFormatException, I2PSessionException {
     74                super(isess, listenProtocol, listenPort, null);  // to be replace by this
     75                this.nick = nick ;
     76                this.recv = this ;  // replacement
     77                this.server = dgServer;
     78                this.handler = handler;
     79                clientAddress = getSocketAddress(props, handler);
     80                _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
     81                              Boolean.parseBoolean(props.getProperty("HEADER"));
     82        }
     83       
     84        /**
     85         *  @return null if PORT not set
     86         *  @since 0.9.25 moved from constructor
     87         */
     88        static SocketAddress getSocketAddress(Properties props, SAMv3Handler handler) {
    5689                String portStr = props.getProperty("PORT") ;
    5790                if (portStr == null) {
    58                         if (_log.shouldLog(Log.DEBUG))
    59                                 _log.debug("receiver port not specified. Current socket will be used.");
    60                         this.clientAddress = null;
     91                        return null;
    6192                } else {
    6293                        int port = Integer.parseInt(portStr);
    6394                        String host = props.getProperty("HOST");
    6495                        if ( host==null ) {
    65                                 host = rec.getHandler().getClientIP();
    66                                 if (_log.shouldLog(Log.DEBUG))
    67                                         _log.debug("no host specified. Taken from the client socket : " + host +':'+port);
     96                                host = handler.getClientIP();
    6897                        }
    69                         this.clientAddress = new InetSocketAddress(host, port);
     98                        return new InetSocketAddress(host, port);
    7099                }
    71                 _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
    72                               Boolean.parseBoolean(props.getProperty("HEADER"));
    73100        }
    74        
     101
    75102        public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
    76103                if (this.clientAddress==null) {
  • apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java

    ree1852f r9b004bc  
    2222import java.security.GeneralSecurityException;
    2323import java.util.Properties;
     24import java.util.concurrent.LinkedBlockingQueue;
    2425import java.util.concurrent.atomic.AtomicInteger;
    2526
     
    3132import net.i2p.client.streaming.I2PServerSocket;
    3233import net.i2p.client.streaming.I2PSocket;
     34import net.i2p.client.streaming.I2PSocketManager;
    3335import net.i2p.client.streaming.I2PSocketOptions;
    3436import net.i2p.data.DataFormatException;
     
    4749{
    4850
    49                 private static final int BUFFER_SIZE = 1024 ;
     51                private static final int BUFFER_SIZE = 1024;
     52                private static final int MAX_ACCEPT_QUEUE = 64;
    5053               
    5154                private final Object socketServerLock = new Object();
     
    5457                /** this is the count of active ACCEPT sockets */
    5558                private final AtomicInteger _acceptors = new AtomicInteger();
     59                /** for subsession only, null otherwise */
     60                private final LinkedBlockingQueue<I2PSocket> _acceptQueue;
    5661
    5762                private static I2PSSLSocketFactory _sslSocketFactory;
     
    8085                      getDB().get(login).getHandler());
    8186                this.nick = login ;
     87                _acceptQueue = null;
     88            }
     89
     90            /**
     91             *   Build a Datagram Session on an existing I2P session
     92             *   registered with the given nickname
     93             *   
     94             * @param nick nickname of the session
     95             * @throws IOException
     96             * @throws DataFormatException
     97             * @throws I2PSessionException
     98             * @since 0.9.25
     99             */
     100            public SAMv3StreamSession(String login, Properties props, SAMv3Handler handler, I2PSocketManager mgr,
     101                                        int listenPort) throws IOException, DataFormatException, SAMException {
     102                super(mgr, props, handler, listenPort);
     103                this.nick = login ;
     104                _acceptQueue = new LinkedBlockingQueue<I2PSocket>(MAX_ACCEPT_QUEUE);
     105            }
     106
     107            /**
     108             * Put a socket on the accept queue.
     109             * Only for subsession, throws IllegalStateException otherwise.
     110             *   
     111             * @return success, false if full
     112             * @since 0.9.25
     113             */
     114            public boolean queueSocket(I2PSocket sock) {
     115                if (_acceptQueue == null)
     116                    throw new IllegalStateException();
     117                return _acceptQueue.offer(sock);
     118            }
     119
     120            /**
     121             * Take a socket from the accept queue.
     122             * Only for subsession, throws IllegalStateException otherwise.
     123             *   
     124             * @since 0.9.25
     125             */
     126            private I2PSocket acceptSocket() throws ConnectException {
     127                if (_acceptQueue == null)
     128                    throw new IllegalStateException();
     129                try {
     130                        return _acceptQueue.take();
     131                } catch (InterruptedException ie) {
     132                        ConnectException ce = new ConnectException("interrupted");
     133                        ce.initCause(ie);
     134                        throw ce;
     135                }
    82136            }
    83137
     
    186240                }
    187241
    188                 I2PSocket i2ps;
     242                I2PSocket i2ps = null;
    189243                _acceptors.incrementAndGet();
    190244                try {
    191                         i2ps = socketMgr.getServerSocket().accept();
     245                        if (_acceptQueue != null)
     246                                i2ps = acceptSocket();
     247                        else
     248                                i2ps = socketMgr.getServerSocket().accept();
    192249                } finally {
    193250                        _acceptors.decrementAndGet();
     
    258315                }
    259316               
    260                 SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts);
     317                SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, verbose, sendPorts);
    261318                (new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
    262319            }
     
    265322             *  Forward sockets from I2P to the host/port provided
    266323             */
    267             private static class SocketForwarder implements Runnable
     324            private class SocketForwarder implements Runnable
    268325            {
    269326                private final String host;
    270327                private final int port;
    271                 private final SAMv3StreamSession session;
    272328                private final boolean isSSL, verbose, sendPorts;
    273329               
    274330                SocketForwarder(String host, int port, boolean isSSL,
    275                                 SAMv3StreamSession session, boolean verbose, boolean sendPorts) {
     331                                boolean verbose, boolean sendPorts) {
    276332                        this.host = host ;
    277333                        this.port = port ;
    278                         this.session = session ;
    279334                        this.verbose = verbose ;
    280335                        this.sendPorts = sendPorts;
     
    284339                public void run()
    285340                {
    286                         while (session.getSocketServer()!=null) {
     341                        while (getSocketServer() != null) {
    287342                               
    288343                                // wait and accept a connection from I2P side
    289344                                I2PSocket i2ps;
    290345                                try {
    291                                         i2ps = session.getSocketServer().accept();
     346                                        if (_acceptQueue != null)
     347                                                i2ps = acceptSocket();
     348                                        else
     349                                                i2ps = getSocketServer().accept();
    292350                                        if (i2ps == null)
    293351                                                continue;
     
    438496            }
    439497           
    440             private I2PServerSocket getSocketServer()
     498            protected I2PServerSocket getSocketServer()
    441499            {
    442500                synchronized ( this.socketServerLock ) {
     
    480538                socketMgr.destroySocketManager();
    481539            }
    482 
    483             /**
    484              *  Unsupported
    485              *  @throws DataFormatException always
    486              */
    487             public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException
    488             {
    489                 throw new DataFormatException(null);
    490             }
    491540}
Note: See TracChangeset for help on using the changeset viewer.