Changeset f341e55


Ignore:
Timestamp:
Jun 10, 2015 7:14:33 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
605602e
Parents:
7b84676
Message:

Pass session in connect();
Store the session in Connection;
Don't create a new ConnectionManager? for a subsession,
now that all components track the session properly.
@since updates

Location:
apps/streaming/java/src/net/i2p/client/streaming/impl
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java

    r7b84676 rf341e55  
    2929    private final Log _log;
    3030    private final ConnectionManager _connectionManager;
     31    private final I2PSession _session;
    3132    private Destination _remotePeer;
    3233    private final AtomicLong _sendStreamId = new AtomicLong();
     
    113114     *  @param opts may be null
    114115     */
    115     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
     116    public Connection(I2PAppContext ctx, ConnectionManager manager,
     117                      I2PSession session, SchedulerChooser chooser,
    116118                      SimpleTimer2 timer,
    117119                      PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts,
     
    119121        _context = ctx;
    120122        _connectionManager = manager;
     123        _session = session;
    121124        _chooser = chooser;
    122125        _outboundQueue = queue;
     
    878881    public ConnectionManager getConnectionManager() { return _connectionManager; }
    879882
    880     public I2PSession getSession() { return _connectionManager.getSession(); }
     883    public I2PSession getSession() { return _session; }
    881884    public I2PSocketFull getSocket() { return _socket; }
    882885    public void setSocket(I2PSocketFull socket) { _socket = socket; }
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java

    r7b84676 rf341e55  
    215215        opts.setPort(synPacket.getRemotePort());
    216216        opts.setLocalPort(synPacket.getLocalPort());
    217         Connection con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, true);
     217        Connection con = new Connection(_context, this, synPacket.getSession(), _schedulerChooser,
     218                                        _timer, _outboundQueue, _conPacketHandler, opts, true);
    218219        _tcbShare.updateOptsFromShare(con);
    219220        boolean reject = false;
     
    394395     * @param peer Destination to contact
    395396     * @param opts Connection's options
     397     * @param session generally the session from the constructor, but could be a subsession
    396398     * @return new connection, or null if we have exceeded our limit
    397399     */
    398     public Connection connect(Destination peer, ConnectionOptions opts) {
     400    public Connection connect(Destination peer, ConnectionOptions opts, I2PSession session) {
    399401        Connection con = null;
    400402        long expiration = _context.clock().now();
     
    430432                    try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
    431433                } else {
    432                     con = new Connection(_context, this, _schedulerChooser, _timer,
     434                    con = new Connection(_context, this, session, _schedulerChooser, _timer,
    433435                                         _outboundQueue, _conPacketHandler, opts, false);
    434436                    con.setRemotePeer(peer);
     
    592594    public MessageHandler getMessageHandler() { return _messageHandler; }
    593595    public PacketHandler getPacketHandler() { return _packetHandler; }
     596
     597    /**
     598     * This is the primary session only
     599     */
    594600    public I2PSession getSession() { return _session; }
     601
    595602    public void updateOptsFromShare(Connection con) { _tcbShare.updateOptsFromShare(con); }
    596603    public void updateShareOpts(Connection con) { _tcbShare.updateShareOpts(con); }
  • apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java

    r7b84676 rf341e55  
    1515import java.util.Properties;
    1616import java.util.Set;
    17 import java.util.concurrent.ConcurrentHashMap;
    1817import java.util.concurrent.atomic.AtomicBoolean;
    1918import java.util.concurrent.atomic.AtomicInteger;
     
    3635import net.i2p.data.SimpleDataStructure;
    3736import net.i2p.util.ConvertToHash;
     37import net.i2p.util.ConcurrentHashSet;
    3838import net.i2p.util.Log;
    3939
     
    5252    private final Log _log;
    5353    private final I2PSession _session;
    54     private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions;
     54    private final Set<I2PSession> _subsessions;
    5555    private final I2PServerSocketFull _serverSocket;
    5656    private StandardServerSocket _realServerSocket;
     
    6262    private final AtomicBoolean _isDestroyed = new AtomicBoolean();
    6363
    64     /** @since 0.9.20 */
     64    /** @since 0.9.21 */
    6565    private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16);
    6666    private static final String[] DSA_ONLY_HASHES = {
     
    141141        _context = context;
    142142        _session = session;
    143         _subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4);
     143        _subsessions = new ConcurrentHashSet<I2PSession>(4);
    144144        _log = _context.logManager().getLog(I2PSocketManagerFull.class);
    145145       
     
    187187     *                          and different signing keys
    188188     *  @param opts subsession options if any, may be null
    189      *  @since 0.9.19
     189     *  @since 0.9.21
    190190     */
    191191    public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
     
    215215        }
    216216        I2PSession rv = _session.addSubsession(privateKeyStream, opts);
    217         ConnectionOptions defaultOptions = new ConnectionOptions(opts);
    218         ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions);
    219         ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager);
    220         if (old != null) {
     217        boolean added = _subsessions.add(rv);
     218        if (!added) {
    221219            // shouldn't happen
    222220            _session.removeSubsession(rv);
    223             connectionManager.shutdown();
    224221            throw new I2PSessionException("dup");
    225222        }
     223        ConnectionOptions defaultOptions = new ConnectionOptions(opts);
     224        int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
     225        rv.addMuxedSessionListener(_connectionManager.getMessageHandler(), protocol, defaultOptions.getLocalPort());
    226226        if (_log.shouldLog(Log.WARN))
    227227            _log.warn("Added subsession " + rv);
     
    231231    /**
    232232     *  @param opts may be null
    233      *  @since 0.9.20 copied from I2PSocketManagerFactory
     233     *  @since 0.9.21 copied from I2PSocketManagerFactory
    234234     */
    235235    private SigType getSigType(Properties opts) {
     
    253253     *  Remove the subsession
    254254     *
    255      *  @since 0.9.19
     255     *  @since 0.9.21
    256256     */
    257257    public void removeSubsession(I2PSession session) {
    258258        _session.removeSubsession(session);
    259         ConnectionManager cm = _subsessions.remove(session);
    260         if (cm != null) {
    261             cm.shutdown();
     259        boolean removed = _subsessions.remove(session);
     260        if (removed) {
    262261            if (_log.shouldLog(Log.WARN))
    263262                _log.warn("Removeed subsession " + session);
     
    270269    /**
    271270     *  @return a list of subsessions, non-null, does not include the primary session
    272      *  @since 0.9.19
     271     *  @since 0.9.21
    273272     */
    274273    public List<I2PSession> getSubsessions() {
     
    282281    /**
    283282     * The accept() call.
     283     *
     284     * This only listens on the primary session. There is no way to get
     285     * incoming connections on a subsession.
    284286     *
    285287     * @return connected I2PSocket, or null through 0.9.16, non-null as of 0.9.17
     
    302304     * Uses the ports from the default options.
    303305     *
     306     * TODO There is no way to ping on a subsession.
     307     *
    304308     * @param peer
    305309     * @param timeoutMs timeout in ms, greater than zero
     
    319323     *
    320324     * Uses the ports specified.
     325     *
     326     * TODO There is no way to ping on a subsession.
    321327     *
    322328     * @param peer Destination to ping
     
    342348     *
    343349     * Uses the ports specified.
     350     *
     351     * TODO There is no way to ping on a subsession.
    344352     *
    345353     * @param peer Destination to ping
     
    375383     *  with the setters; no need to use this method for those.
    376384     *  This does NOT update the underlying I2CP or tunnel options; use getSession().updateOptions() for that.
     385     *
     386     *  TODO There is no way to update the options on a subsession.
    377387     *
    378388     *  @param options as created from a call to buildOptions(properties), non-null
     
    389399    /**
    390400     *  Current options, not a copy, setters may be used to make changes.
     401     *
     402     *  TODO There is no facility to specify the session.
    391403     */
    392404    public I2PSocketOptions getDefaultOptions() {
     
    398410     *  This method does not throw exceptions, but methods on the returned socket
    399411     *  may throw exceptions if the socket or socket manager is closed.
     412     *
     413     *  This only listens on the primary session. There is no way to get
     414     *  incoming connections on a subsession.
    400415     *
    401416     *  @return non-null
     
    408423    /**
    409424     *  Like getServerSocket but returns a real ServerSocket for easier porting of apps.
     425     *
     426     *  This only listens on the primary session. There is no way to get
     427     *  incoming connections on a subsession.
     428     *
    410429     *  @since 0.8.4
    411430     */
     
    418437
    419438    private void verifySession() throws I2PException {
    420         verifySession(_connectionManager);
    421     }
    422 
    423     /** @since 0.9.20 */
    424     private void verifySession(ConnectionManager cm) throws I2PException {
     439        verifySession(_connectionManager.getSession());
     440    }
     441
     442    /** @since 0.9.21 */
     443    private void verifySession(I2PSession session) throws I2PException {
    425444        if (_isDestroyed.get())
    426445            throw new I2PException("Session was closed");
    427         if (!cm.getSession().isClosed())
     446        if (!session.isClosed())
    428447            return;
    429         cm.getSession().connect();
     448        session.connect();
    430449    }
    431450   
     
    458477                      + " with options: " + opts);
    459478        // pick the subsession here
    460         ConnectionManager cm = _connectionManager;
     479        I2PSession session = _session;
    461480        if (!_subsessions.isEmpty()) {
    462481            Hash h = peer.calculateHash();
    463482            if (_dsaOnly.contains(h)) {
    464483                // FIXME just taking the first one for now
    465                 for (Map.Entry<I2PSession, ConnectionManager> e : _subsessions.entrySet()) {
    466                     if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) {
    467                         cm = e.getValue();
     484                for (I2PSession sess : _subsessions) {
     485                    if (sess.getMyDestination().getSigType() == SigType.DSA_SHA1) {
     486                        session = sess;
    468487                        break;
    469488                    }
     
    471490            }
    472491        }
    473         verifySession(cm);
     492        verifySession(session);
    474493        // the following blocks unless connect delay > 0
    475         Connection con = cm.connect(peer, opts);
     494        Connection con = _connectionManager.connect(peer, opts, session);
    476495        if (con == null)
    477496            throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
     
    557576        _connectionManager.shutdown();
    558577        if (!_subsessions.isEmpty()) {
    559             for (I2PSession sess : _subsessions.keySet()) {
     578            for (I2PSession sess : _subsessions) {
    560579                 removeSubsession(sess);
    561580            }
Note: See TracChangeset for help on using the changeset viewer.