Changeset f3c4a26


Ignore:
Timestamp:
Jul 10, 2013 6:54:25 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
70a8ab1
Parents:
9a1e1a9
Message:
  • I2CP/I2PTunnel locking fixes (partial fixes for tickets 650. 815, 946, 947, 953):
    • I2PSocketManagerFactory: New createDisconnectedManager(), javadocs
    • I2PSessionImpl: Rewrite state management and locking, prevent multiple connect() calls, but allow disconnect() to interrupt connect()
    • I2PSimpleSession: Changes to match I2PSessionImpl
    • I2PTunnelServer: Don't connect in constructor, use createDisconnectedManager() for a final manager, finals and cleanups Lightly tested. Todo: I2PTunnelClientBase
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java

    r9a1e1a9 rf3c4a26  
    2626import net.i2p.I2PAppContext;
    2727import net.i2p.I2PException;
     28import net.i2p.client.I2PSessionException;
    2829import net.i2p.client.streaming.I2PServerSocket;
    2930import net.i2p.client.streaming.I2PSocket;
     
    3839
    3940    protected final Log _log;
    40     protected I2PSocketManager sockMgr;
     41    protected final I2PSocketManager sockMgr;
    4142    protected I2PServerSocket i2pss;
    4243
     
    4445    protected final Object slock = new Object();
    4546
    46     protected InetAddress remoteHost;
    47     protected int remotePort;
    48     private boolean _usePool;
    49 
    50     protected Logging l;
     47    protected final InetAddress remoteHost;
     48    protected final int remotePort;
     49    private final boolean _usePool;
     50    protected final Logging l;
    5151
    5252    private static final long DEFAULT_READ_TIMEOUT = 5*60*1000;
     
    5757    private static final String PROP_USE_POOL = "i2ptunnel.usePool";
    5858    private static final boolean DEFAULT_USE_POOL = true;
     59    /** apparently unused */
    5960    protected static volatile long __serverId = 0;
    6061    /** max number of threads  - this many slowlorisses will DOS this server, but too high could OOM the JVM */
     
    6667    private static final long HANDLER_KEEPALIVE_MS = 30*1000;
    6768
    68     protected I2PTunnelTask task = null;
    69     protected boolean bidir = false;
     69    protected I2PTunnelTask task;
     70    protected boolean bidir;
    7071    private ThreadPoolExecutor _executor;
    7172
     
    7576
    7677    /**
    77      * Warning, blocks in constructor while connecting to router and building tunnels;
    78      * TODO move that to startRunning()
     78     *  Non-blocking
    7979     *
    8080     * @param privData Base64-encoded private key data,
     
    8787        _log = tunnel.getContext().logManager().getLog(getClass());
    8888        ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(privData));
    89         init(host, port, bais, privData, l);
    90     }
    91 
    92     /**
    93      * Warning, blocks in constructor while connecting to router and building tunnels;
    94      * TODO move that to startRunning()
     89        this.l = l;
     90        this.remoteHost = host;
     91        this.remotePort = port;
     92        _usePool = getUsePool();
     93        sockMgr = createManager(bais);
     94    }
     95
     96    /**
     97     *  Non-blocking
    9598     *
    9699     * @param privkey file containing the private key data,
    97100     *                format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    98      * @param privkeyname the name of the privKey file, not clear why we need this too
     101     * @param privkeyname the name of the privKey file, just for logging
    99102     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    100103     *                                  badly that we cant create a socketManager
     
    104107        super("Server at " + host + ':' + port, notifyThis, tunnel);
    105108        _log = tunnel.getContext().logManager().getLog(getClass());
     109        this.l = l;
     110        this.remoteHost = host;
     111        this.remotePort = port;
     112        _usePool = getUsePool();
    106113        FileInputStream fis = null;
    107114        try {
    108115            fis = new FileInputStream(privkey);
    109             init(host, port, fis, privkeyname, l);
     116            sockMgr = createManager(fis);
    110117        } catch (IOException ioe) {
    111             _log.error("Error starting server", ioe);
     118            _log.error("Cannot read private key data for " + privkeyname, ioe);
    112119            notifyEvent("openServerResult", "error");
     120            throw new IllegalArgumentException("Error starting server", ioe);
    113121        } finally {
    114122            if (fis != null)
     
    118126
    119127    /**
    120      * Warning, blocks in constructor while connecting to router and building tunnels;
    121      * TODO move that to startRunning()
     128     *  Non-blocking
    122129     *
    123130     * @param privData stream containing the private key data,
    124131     *                 format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    125      * @param privkeyname the name of the privKey file, not clear why we need this too
     132     * @param privkeyname the name of the privKey file, just for logging
    126133     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    127134     *                                  badly that we cant create a socketManager
     
    130137        super("Server at " + host + ':' + port, notifyThis, tunnel);
    131138        _log = tunnel.getContext().logManager().getLog(getClass());
    132         init(host, port, privData, privkeyname, l);
     139        this.l = l;
     140        this.remoteHost = host;
     141        this.remotePort = port;
     142        _usePool = getUsePool();
     143        sockMgr = createManager(privData);
    133144    }
    134145
     
    146157        this.remotePort = port;
    147158        _log = tunnel.getContext().logManager().getLog(getClass());
     159        _usePool = false;
    148160        sockMgr = sktMgr;
    149161        open = true;
    150162    }
    151163
     164    /** @since 0.9.8 */
     165    private boolean getUsePool() {
     166        // extending classes default to threaded, but for a standard server, we can't get slowlorissed
     167        boolean rv = !getClass().equals(I2PTunnelServer.class);
     168        if (rv) {
     169            String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL);
     170            if (usePool != null)
     171                rv = Boolean.parseBoolean(usePool);
     172            else
     173                rv = DEFAULT_USE_POOL;
     174        }
     175        return rv;
     176    }
     177
    152178    private static final int RETRY_DELAY = 20*1000;
    153179    private static final int MAX_RETRIES = 4;
    154180
    155181    /**
    156      * Warning, blocks while connecting to router and building tunnels;
    157      * TODO move that to startRunning()
    158      *
    159      * @param privData stream containing the private key data,
    160      *                 format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    161      * @param privkeyname the name of the privKey file, not clear why we need this too
    162      * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    163      *                                  badly that we cant create a socketManager
    164      */
    165     private void init(InetAddress host, int port, InputStream privData, String privkeyname, Logging l) {
    166         this.l = l;
    167         this.remoteHost = host;
    168         this.remotePort = port;
     182     *
     183     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
     184     *                                  badly that we cant create a socketManager
     185     * @since 0.9.8
     186     */
     187    private I2PSocketManager createManager(InputStream privData) {
    169188        Properties props = new Properties();
    170189        props.putAll(getTunnel().getClientOptions());
     
    177196            }
    178197        }
    179 
    180         // copy the privData to a new BAIS, so we can always reset() it if we have to retry
    181         ByteArrayInputStream privDataCopy;
    182198        try {
    183             privDataCopy = copyOfInputStream(privData);
    184         } catch (IOException ioe) {
    185             _log.log(Log.CRIT, "Cannot read private key data for " + privkeyname, ioe);
    186             return;
    187         }
    188 
    189         // extending classes default to threaded, but for a standard server, we can't get slowlorissed
    190         _usePool = !getClass().equals(I2PTunnelServer.class);
    191         if (_usePool) {
    192             String usePool = getTunnel().getClientOptions().getProperty(PROP_USE_POOL);
    193             if (usePool != null)
    194                 _usePool = Boolean.parseBoolean(usePool);
    195             else
    196                 _usePool = DEFAULT_USE_POOL;
    197         }
    198 
    199         // Todo: Can't stop a tunnel from the UI while it's in this loop (no session yet)
     199            I2PSocketManager rv = I2PSocketManagerFactory.createDisconnectedManager(privData, getTunnel().host,
     200                                                                                    portNum, props);
     201            rv.setName("Server");
     202            getTunnel().addSession(rv.getSession());
     203            return rv;
     204        } catch (I2PSessionException ise) {
     205            throw new IllegalArgumentException("Can't create socket manager", ise);
     206        } finally {
     207            try { privData.close(); } catch (IOException ioe) {}
     208        }
     209    }
     210
     211
     212    /**
     213     * Warning, blocks while connecting to router and building tunnels;
     214     *
     215     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
     216     *                                  badly that we cant create a socketManager
     217     * @since 0.9.8
     218     */
     219    private void connectManager() {
    200220        int retries = 0;
    201         while (sockMgr == null) {
    202             synchronized (slock) {
    203                 sockMgr = I2PSocketManagerFactory.createManager(privDataCopy, getTunnel().host, portNum,
    204                                                                 props);
    205 
    206             }
    207             if (sockMgr == null) {
     221        while (sockMgr.getSession().isClosed()) {
     222            try {
     223                sockMgr.getSession().connect();
     224            } catch (I2PSessionException ise) {
    208225                // try to make this error sensible as it will happen...
     226                String portNum = getTunnel().port;
     227                if (portNum == null)
     228                    portNum = "7654";
    209229                String msg = "Unable to connect to the router at " + getTunnel().host + ':' + portNum +
    210                              " and build tunnels for the server at " + host.getHostAddress() + ':' + port;
     230                             " and build tunnels for the server at " + remoteHost.getHostAddress() + ':' + remotePort;
    211231                if (++retries < MAX_RETRIES) {
    212                     this.l.log(msg + ", retrying in " + (RETRY_DELAY / 1000) + " seconds");
    213                     _log.error(msg + ", retrying in " + (RETRY_DELAY / 1000) + " seconds");
     232                    msg += ", retrying in " + (RETRY_DELAY / 1000) + " seconds";
     233                    this.l.log(msg);
     234                    _log.error(msg);
    214235                } else {
    215                     this.l.log(msg + ", giving up");
    216                     _log.log(Log.CRIT, msg + ", giving up");
    217                     throw new IllegalArgumentException(msg);
     236                    msg += ", giving up";
     237                    this.l.log(msg);
     238                    _log.log(Log.CRIT, msg, ise);
     239                    throw new IllegalArgumentException(msg, ise);
    218240                }
    219241                try { Thread.sleep(RETRY_DELAY); } catch (InterruptedException ie) {}
    220                 privDataCopy.reset();
    221             }
    222         }
    223 
    224         sockMgr.setName("Server");
    225         getTunnel().addSession(sockMgr.getSession());
    226         l.log("Tunnels ready for server at " + host.getHostAddress() + ':' + port);
     242            }
     243        }
     244
     245        l.log("Tunnels ready for server at " + remoteHost.getHostAddress() + ':' + remotePort);
    227246        notifyEvent("openServerResult", "ok");
    228247        open = true;
     
    250269    /**
    251270     * Start running the I2PTunnelServer.
    252      *
    253      * TODO: Wait to connect to router until here.
    254      */
    255     public void startRunning() {
     271     * Warning, blocks while connecting to router and building tunnels;
     272     *
     273     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
     274     *                                  badly that we cant create a socketManager
     275     */
     276    public synchronized void startRunning() {
     277        connectManager();
    256278        // prevent JVM exit when running outside the router
    257279        boolean isDaemon = getTunnel().getContext().isRouterContext();
     
    406428    /** just to set the name and set Daemon */
    407429    private static class CustomThreadFactory implements ThreadFactory {
    408         private String _name;
     430        private final String _name;
    409431
    410432        public CustomThreadFactory(String name) {
     
    426448     */
    427449    private class Handler implements Runnable {
    428         private I2PSocket _i2ps;
     450        private final I2PSocket _i2ps;
    429451
    430452        public Handler(I2PSocket socket) {
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java

    r9a1e1a9 rf3c4a26  
    2727
    2828    public static final String PROP_MANAGER = "i2p.streaming.manager";
    29     //public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerImpl";
    3029    public static final String DEFAULT_MANAGER = "net.i2p.client.streaming.I2PSocketManagerFull";
    3130   
     
    4847     * Blocks for a long time while the router builds tunnels.
    4948     *
    50      * @param opts I2CP options
     49     * @param opts Streaming and I2CP options, may be null
    5150     * @return the newly created socket manager, or null if there were errors
    5251     */
     
    6160     * Blocks for a long time while the router builds tunnels.
    6261     *
    63      * @param host I2CP host
    64      * @param port I2CP port
     62     * @param host I2CP host null to use default
     63     * @param port I2CP port <= 0 to use default
    6564     * @return the newly created socket manager, or null if there were errors
    6665     */
     
    7574     * Blocks for a long time while the router builds tunnels.
    7675     *
    77      * @param i2cpHost I2CP host
    78      * @param i2cpPort I2CP port
    79      * @param opts I2CP options
     76     * @param i2cpHost I2CP host null to use default
     77     * @param i2cpPort I2CP port <= 0 to use default
     78     * @param opts Streaming and I2CP options, may be null
    8079     * @return the newly created socket manager, or null if there were errors
    8180     */
     
    103102     *
    104103     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     104     *                           or null for a transient destination. Caller must close.
    105105     * @return the newly created socket manager, or null if there were errors
    106106     */
     
    116116     *
    117117     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    118      * @param opts I2CP options
     118     *                           or null for a transient destination. Caller must close.
     119     * @param opts Streaming and I2CP options, may be null
    119120     * @return the newly created socket manager, or null if there were errors
    120121     */
     
    131132     *
    132133     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    133      * @param i2cpHost I2CP host
    134      * @param i2cpPort I2CP port
    135      * @param opts I2CP options
     134     *                           or null for a transient destination. Caller must close.
     135     * @param i2cpHost I2CP host null to use default
     136     * @param i2cpPort I2CP port <= 0 to use default
     137     * @param opts Streaming and I2CP options, may be null
    136138     * @return the newly created socket manager, or null if there were errors
    137139     */
    138140    public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort,
    139141                                                 Properties opts) {
     142        try {
     143            return createManager(myPrivateKeyStream, i2cpHost, i2cpPort, opts, true);
     144        } catch (I2PSessionException ise) {
     145            getLog().error("Error creating session for socket manager", ise);
     146            return null;
     147        }
     148    }
     149   
     150    /**
     151     * Create a disconnected socket manager using the destination loaded from the given private key
     152     * stream, or null for a transient destination.
     153     *
     154     * Non-blocking. Does not connect to the router or build tunnels.
     155     * For servers, caller MUST call getSession().connect() to build tunnels and start listening.
     156     * For clients, caller may do that to build tunnels in advance;
     157     * otherwise, the first call to connect() will initiate a connection to the router,
     158     * with significant delay for tunnel building.
     159     *
     160     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     161     *                           or null for a transient destination. Caller must close.
     162     * @param i2cpHost I2CP host null to use default
     163     * @param i2cpPort I2CP port <= 0 to use default
     164     * @param opts Streaming and I2CP options, may be null
     165     * @return the newly created socket manager, non-null (throws on error)
     166     * @since 0.9.8
     167     */
     168    public static I2PSocketManager createDisconnectedManager(InputStream myPrivateKeyStream, String i2cpHost,
     169                                                             int i2cpPort, Properties opts) throws I2PSessionException {
     170        if (myPrivateKeyStream == null) {
     171            I2PClient client = I2PClientFactory.createClient();
     172            ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512);
     173            try {
     174                client.createDestination(keyStream);
     175            } catch (Exception e) {
     176                throw new I2PSessionException("Error creating keys", e);
     177            }
     178            myPrivateKeyStream = new ByteArrayInputStream(keyStream.toByteArray());
     179        }
     180        return createManager(myPrivateKeyStream, i2cpHost, i2cpPort, opts, false);
     181    }
     182   
     183    /**
     184     * Create a socket manager using the destination loaded from the given private key
     185     * stream and connected to the I2CP router on the specified machine on the given
     186     * port.
     187     *
     188     * Blocks for a long time while the router builds tunnels if connect is true.
     189     *
     190     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     191     *                           non-null. Caller must close.
     192     * @param i2cpHost I2CP host null to use default
     193     * @param i2cpPort I2CP port <= 0 to use default
     194     * @param opts Streaming and I2CP options, may be null
     195     * @param connect true to connect (blocking)
     196     * @return the newly created socket manager, non-null (throws on error)
     197     * @since 0.9.7
     198     */
     199    private static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort,
     200                                                 Properties opts, boolean connect) throws I2PSessionException {
    140201        I2PClient client = I2PClientFactory.createClient();
    141202        if (opts == null)
     
    147208                opts.setProperty(name, (String) e.getValue());
    148209        }
    149         //boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER));
    150         //if (oldLib && false) {
    151             // for the old streaming lib
    152         //    opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
    153             //opts.setProperty("tunnels.depthInbound", "0");
    154         //} else {
    155             // for new streaming lib:
    156             //opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
    157             // as of 0.8.1 (I2CP default is BestEffort)
    158             if (!opts.containsKey(I2PClient.PROP_RELIABILITY))
    159                 opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE);
    160             //p.setProperty("tunnels.depthInbound", "0");
    161         //}
     210        // as of 0.8.1 (I2CP default is BestEffort)
     211        if (!opts.containsKey(I2PClient.PROP_RELIABILITY))
     212            opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE);
    162213
    163214        if (i2cpHost != null)
     
    166217            opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort);
    167218       
    168         try {
    169             I2PSession session = client.createSession(myPrivateKeyStream, opts);
     219        I2PSession session = client.createSession(myPrivateKeyStream, opts);
     220        if (connect)
    170221            session.connect();
    171             I2PSocketManager sockMgr = createManager(session, opts, "manager");
    172             return sockMgr;
    173         } catch (I2PSessionException ise) {
    174             getLog().error("Error creating session for socket manager", ise);
    175             return null;
    176         }
     222        I2PSocketManager sockMgr = createManager(session, opts, "manager");
     223        return sockMgr;
    177224    }
    178225
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    r9a1e1a9 rf3c4a26  
    8181    /** writer message queue */
    8282    protected ClientWriterRunner _writer;
    83     /** where we pipe our messages */
    84     protected /* FIXME final FIXME */OutputStream _out;
    8583
    8684    /**
    8785     *  Used for internal connections to the router.
    88      *  If this is set, _socket, _writer, and _out will be null.
     86     *  If this is set, _socket and _writer will be null.
    8987     *  @since 0.8.3
    9088     */
     
    112110    private final Object _leaseSetWait = new Object();
    113111
    114     /** whether the session connection has already been closed (or not yet opened) */
    115     protected volatile boolean _closed;
    116 
    117     /** whether the session connection is in the process of being closed */
    118     protected volatile boolean _closing;
     112    /**
     113     *  @since 0.9.8
     114     */
     115    protected enum State {
     116        OPENING,
     117        OPEN,
     118        CLOSING,
     119        CLOSED
     120    }
     121
     122    protected State _state = State.CLOSED;
     123    protected final Object _stateLock = new Object();
    119124
    120125    /** have we received the current date from the router yet? */
     
    123128    private final Object _dateReceivedLock = new Object();
    124129
    125     /** whether the session connection is in the process of being opened */
    126     protected volatile boolean _opening;
    127 
    128     /** monitor for waiting until opened */
    129     private final Object _openingWait = new Object();
    130130    /**
    131131     * thread that we tell when new messages are available who then tells us
     
    169169
    170170    private static final int BUF_SIZE = 32*1024;
    171    
     171
    172172    /**
    173173     * for extension by SimpleSession (no dest)
     
    184184        _context = context;
    185185        _log = context.logManager().getLog(getClass());
    186         _closed = true;
    187186        if (options == null)
    188187            options = (Properties) System.getProperties().clone();
     
    352351    }
    353352
    354     void setOpening(boolean ls) {
    355         _opening = ls;
    356         synchronized (_openingWait) {
    357             _openingWait.notifyAll();
    358         }
    359     }
    360 
    361     boolean getOpening() {
    362         return _opening;
     353    protected void changeState(State state) {
     354        synchronized (_stateLock) {
     355            _state = state;
     356            _stateLock.notifyAll();
     357        }
    363358    }
    364359
     
    379374     * a session is granted.
    380375     *
     376     * Should be threadsafe, other threads will block until complete.
     377     * Disconnect / destroy from another thread may be called simultaneously and
     378     * will (should?) interrupt the connect.
     379     *
    381380     * @throws I2PSessionException if there is a configuration error or the router is
    382381     *                             not reachable
    383382     */
    384383    public void connect() throws I2PSessionException {
    385         setOpening(true);
    386         _closed = false;
     384        synchronized(_stateLock) {
     385            boolean wasOpening = false;
     386            boolean loop = true;
     387            while (loop) {
     388                switch (_state) {
     389                    case CLOSED:
     390                        if (wasOpening)
     391                            throw new I2PSessionException("connect by other thread failed");
     392                        loop = false;
     393                        break;
     394                    case OPENING:
     395                        wasOpening = true;
     396                        try {
     397                            _stateLock.wait();
     398                        } catch (InterruptedException ie) {}
     399                        break;
     400                    case CLOSING:
     401                        throw new I2PSessionException("close in progress");
     402                    case OPEN:
     403                        return;
     404                }
     405            }
     406            changeState(State.OPENING);
     407        }
     408
    387409        _availabilityNotifier.stopNotifying();
    388410       
     
    393415        }
    394416           
     417        boolean success = false;
    395418        long startConnect = _context.clock().now();
    396419        try {
    397             // If we are in the router JVM, connect using the interal queue
    398             if (_context.isRouterContext()) {
    399                 // _socket, _out, and _writer remain null
    400                 InternalClientManager mgr = _context.internalClientManager();
    401                 if (mgr == null)
    402                     throw new I2PSessionException("Router is not ready for connections");
    403                 // the following may throw an I2PSessionException
    404                 _queue = mgr.connect();
    405                 _reader = new QueuedI2CPMessageReader(_queue, this);
    406             } else {
    407                 if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL)))
    408                     _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
    409                 else
    410                     _socket = new Socket(_hostname, _portNum);
    411                 // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
    412                 _out = _socket.getOutputStream();
    413                 _out.write(I2PClient.PROTOCOL_BYTE);
    414                 _out.flush();
    415                 _writer = new ClientWriterRunner(_out, this);
    416                 InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
    417                 _reader = new I2CPMessageReader(in, this);
     420            // protect w/ closeSocket()
     421            synchronized(_stateLock) {
     422                // If we are in the router JVM, connect using the interal queue
     423                if (_context.isRouterContext()) {
     424                    // _socket and _writer remain null
     425                    InternalClientManager mgr = _context.internalClientManager();
     426                    if (mgr == null)
     427                        throw new I2PSessionException("Router is not ready for connections");
     428                    // the following may throw an I2PSessionException
     429                    _queue = mgr.connect();
     430                    _reader = new QueuedI2CPMessageReader(_queue, this);
     431                } else {
     432                    if (Boolean.parseBoolean(_options.getProperty(PROP_ENABLE_SSL)))
     433                        _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
     434                    else
     435                        _socket = new Socket(_hostname, _portNum);
     436                    // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
     437                    OutputStream out = _socket.getOutputStream();
     438                    out.write(I2PClient.PROTOCOL_BYTE);
     439                    out.flush();
     440                    _writer = new ClientWriterRunner(out, this);
     441                    InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
     442                    _reader = new I2CPMessageReader(in, this);
     443                }
    418444            }
    419445            Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
     
    467493            startIdleMonitor();
    468494            startVerifyUsage();
    469              setOpening(false);
     495            success = true;
    470496        } catch (UnknownHostException uhe) {
    471             _closed = true;
    472             setOpening(false);
    473497            throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe);
    474498        } catch (IOException ioe) {
    475             _closed = true;
    476             setOpening(false);
    477499            throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
     500        } finally {
     501            changeState(success ? State.OPEN : State.CLOSED);
    478502        }
    479503    }
     
    571595     */
    572596    protected class AvailabilityNotifier implements Runnable {
    573         private final List _pendingIds;
    574         private final List _pendingSizes;
     597        private final List<Long> _pendingIds;
     598        private final List<Integer> _pendingSizes;
    575599        private volatile boolean _alive;
    576600 
     
    607631                    }
    608632                    if (!_pendingIds.isEmpty()) {
    609                         msgId = (Long)_pendingIds.remove(0);
    610                         size = (Integer)_pendingSizes.remove(0);
     633                        msgId = _pendingIds.remove(0);
     634                        size = _pendingSizes.remove(0);
    611635                    }
    612636                }
     
    696720    public void setSessionListener(I2PSessionListener lsnr) { _sessionListener = lsnr; }
    697721
    698     /** has the session been closed (or not yet connected)? */
    699     public boolean isClosed() { return _closed; }
     722    /**
     723     *  Has the session been closed (or not yet connected)?
     724     *  False when open and during transitions. Unsynchronized.
     725     */
     726    public boolean isClosed() { return _state == State.CLOSED; }
    700727
    701728    /**
     
    757784     * Tear down the session, and do NOT reconnect.
    758785     *
    759      * Blocks if session has not been fully started.
     786     * Will interrupt an open in progress.
    760787     */
    761788    public void destroySession(boolean sendDisconnect) {
    762         while (_opening) {
    763             synchronized (_openingWait) {
    764                 try {
    765                     _openingWait.wait(1000);
    766                 } catch (InterruptedException ie) { // nop
    767                 }
    768             }
    769         }
    770         if (_closed) return;
     789        synchronized(_stateLock) {
     790            if (_state == State.CLOSING || _state == State.CLOSED)
     791                return;
     792            changeState(State.CLOSING);
     793        }
    771794       
    772795        if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Destroy the session", new Exception("DestroySession()"));
    773         _closing = true;   // we use this to prevent a race
    774796        if (sendDisconnect && _producer != null) {    // only null if overridden by I2PSimpleSession
    775797            try {
     
    784806        if (_availabilityNotifier != null)
    785807            _availabilityNotifier.stopNotifying();
    786         _closed = true;
    787         _closing = false;
    788808        closeSocket();
    789809        if (_sessionListener != null) _sessionListener.disconnected(this);
     
    791811
    792812    /**
    793      * Close the socket carefully
    794      *
     813     * Close the socket carefully.
    795814     */
    796815    private void closeSocket() {
     816        synchronized(_stateLock) {
     817            changeState(State.CLOSING);
     818            locked_closeSocket();
     819            changeState(State.CLOSED);
     820        }
     821    }
     822
     823    /**
     824     * Close the socket carefully.
     825     * Caller must change state.
     826     */
     827    private void locked_closeSocket() {
    797828        if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Closing the socket", new Exception("closeSocket"));
    798         _closed = true;
    799829        if (_reader != null) {
    800830            _reader.stopReading();
     
    831861    }
    832862
     863    /**
     864     * Will interrupt a connect in progress.
     865     */
    833866    protected void disconnect() {
    834         if (_closed || _closing) return;
     867        synchronized(_stateLock) {
     868            if (_state == State.CLOSING || _state == State.CLOSED)
     869                return;
     870            changeState(State.CLOSING);
     871        }
    835872        if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Disconnect() called", new Exception("Disconnect"));
    836873        if (shouldReconnect()) {
     
    843880
    844881        if (_log.shouldLog(Log.ERROR))
    845             _log.error(getPrefix() + "Disconned from the router, and not trying to reconnect further.  I hope you're not hoping anything else will happen");
     882            _log.error(getPrefix() + "Disconned from the router, and not trying to reconnect");
    846883        if (_sessionListener != null) _sessionListener.disconnected(this);
    847884
    848         _closed = true;
    849885        closeSocket();
     886        changeState(State.CLOSED);
    850887    }
    851888
     
    9711008                return rv;
    9721009        }
    973         if (_closed)
     1010        if (isClosed())
    9741011            return null;
    9751012        LookupWaiter waiter = new LookupWaiter(h);
     
    9971034     */
    9981035    public int[] bandwidthLimits() throws I2PSessionException {
    999         if (_closed)
     1036        if (isClosed())
    10001037            return null;
    10011038        sendMessage(new GetBandwidthLimitsMessage());
  • core/java/src/net/i2p/client/I2PSimpleSession.java

    r9a1e1a9 rf3c4a26  
    99import java.io.IOException;
    1010import java.io.InputStream;
     11import java.io.OutputStream;
    1112import java.net.Socket;
    1213import java.net.UnknownHostException;
     
    4647     * a session is granted.
    4748     *
     49     * NOT threadsafe, do not call from multiple threads.
     50     *
    4851     * @throws I2PSessionException if there is a configuration error or the router is
    4952     *                             not reachable
     
    5154    @Override
    5255    public void connect() throws I2PSessionException {
    53         _closed = false;
    54        
     56        changeState(State.OPENING);
     57        boolean success = false;
    5558        try {
    56             // If we are in the router JVM, connect using the interal queue
    57             if (_context.isRouterContext()) {
    58                 // _socket, _out, and _writer remain null
    59                 InternalClientManager mgr = _context.internalClientManager();
    60                 if (mgr == null)
    61                     throw new I2PSessionException("Router is not ready for connections");
    62                 // the following may throw an I2PSessionException
    63                 _queue = mgr.connect();
    64                 _reader = new QueuedI2CPMessageReader(_queue, this);
    65             } else {
    66                 if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL)))
    67                     _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
    68                 else
    69                     _socket = new Socket(_hostname, _portNum);
    70                 _out = _socket.getOutputStream();
    71                 _out.write(I2PClient.PROTOCOL_BYTE);
    72                 _out.flush();
    73                 _writer = new ClientWriterRunner(_out, this);
    74                 InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
    75                 _reader = new I2CPMessageReader(in, this);
     59            // protect w/ closeSocket()
     60            synchronized(_stateLock) {
     61                // If we are in the router JVM, connect using the interal queue
     62                if (_context.isRouterContext()) {
     63                    // _socket and _writer remain null
     64                    InternalClientManager mgr = _context.internalClientManager();
     65                    if (mgr == null)
     66                        throw new I2PSessionException("Router is not ready for connections");
     67                    // the following may throw an I2PSessionException
     68                    _queue = mgr.connect();
     69                    _reader = new QueuedI2CPMessageReader(_queue, this);
     70                } else {
     71                    if (Boolean.parseBoolean(getOptions().getProperty(PROP_ENABLE_SSL)))
     72                        _socket = I2CPSSLSocketFactory.createSocket(_context, _hostname, _portNum);
     73                    else
     74                        _socket = new Socket(_hostname, _portNum);
     75                    OutputStream out = _socket.getOutputStream();
     76                    out.write(I2PClient.PROTOCOL_BYTE);
     77                    out.flush();
     78                    _writer = new ClientWriterRunner(out, this);
     79                    InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
     80                    _reader = new I2CPMessageReader(in, this);
     81                }
    7682            }
    7783            // we do not receive payload messages, so we do not need an AvailabilityNotifier
    7884            // ... or an Idle timer, or a VerifyUsage
    7985            _reader.startReading();
    80 
     86            success = true;
    8187        } catch (UnknownHostException uhe) {
    82             _closed = true;
    8388            throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe);
    8489        } catch (IOException ioe) {
    85             _closed = true;
    8690            throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe);
     91        } finally {
     92            changeState(success ? State.OPEN : State.CLOSED);
    8793        }
    8894    }
Note: See TracChangeset for help on using the changeset viewer.