Changeset 97c1676b


Ignore:
Timestamp:
Jul 20, 2013 5:37:46 PM (7 years ago)
Author:
zab2 <zab2@…>
Branches:
master
Children:
d709f46
Parents:
02b92ac
Message:

Rework locking and state management of NTCP connections

trac ticket #972
up version to -4

Location:
router/java/src/net/i2p/router
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/RouterVersion.java

    r02b92ac r97c1676b  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 3;
     21    public final static long BUILD = 4;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/ntcp/EstablishState.java

    r02b92ac r97c1676b  
    6060 */
    6161class EstablishState {
     62   
     63    public static final VerifiedEstablishState VERIFIED = new VerifiedEstablishState();
     64   
    6265    private final RouterContext _context;
    6366    private final Log _log;
     
    108111    private boolean _confirmWritten;
    109112    private boolean _failedBySkew;
     113   
     114    private EstablishState() {
     115        _context = null;
     116        _log = null;
     117        _X = null;
     118        _hX_xor_bobIdentHash = null;
     119        _curDecrypted = null;
     120        _dh = null;
     121        _transport = null;
     122        _con = null;
     123    }
    110124
    111125    public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
     
    774788        }
    775789    }
     790   
     791    private static class VerifiedEstablishState extends EstablishState {
     792        @Override public boolean isComplete() { return true; }
     793    }
    776794
    777795    /** @deprecated unused */
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r02b92ac r97c1676b  
    750750            for (Iterator<NTCPConnection> iter = _wantsWrite.iterator(); iter.hasNext(); ) {
    751751                con = iter.next();
     752                SelectionKey key = con.getKey();
     753                if (key == null)
     754                    continue;
    752755                iter.remove();
    753                 SelectionKey key = con.getKey();
    754756                try {
    755757                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    r02b92ac r97c1676b  
    8383    private final Set<FIFOBandwidthLimiter.Request> _bwInRequests;
    8484    private final Set<FIFOBandwidthLimiter.Request> _bwOutRequests;
    85     private boolean _established;
    8685    private long _establishedOn;
    8786    private EstablishState _establishState;
     
    210209        _outboundListener = new OutboundListener();
    211210        initialize();
     211        _establishState = new EstablishState(ctx, transport, this);
    212212    }
    213213
     
    233233    public void setKey(SelectionKey key) { _conKey = key; }
    234234    public boolean isInbound() { return _isInbound; }
    235     public boolean isEstablished() { return _established; }
     235    public synchronized boolean isEstablished() { return _establishState.isComplete(); }
    236236
    237237    /**
     
    270270        //if (_log.shouldLog(Log.DEBUG))
    271271        //    _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
    272         _established = true;
    273272        _establishedOn = System.currentTimeMillis();
    274273        _transport.inboundEstablished(this);
    275         _establishState = null;
    276274        _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
    277275        _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
     276        _establishState = EstablishState.VERIFIED;
    278277    }
    279278
     
    283282    /** @return milliseconds */
    284283    public long getUptime() {
    285         if (!_established)
     284        if (!isEstablished())
    286285            return getTimeSinceCreated();
    287286        else
     
    334333        if (_chan != null) try { _chan.close(); } catch (IOException ioe) { }
    335334        if (_conKey != null) _conKey.cancel();
    336         _establishState = null;
     335        _establishState = EstablishState.VERIFIED;
    337336        _transport.removeCon(this);
    338337        _transport.getReader().connectionClosed(this);
     
    410409        boolean noOutbound = (_currentOutbound == null);
    411410        //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
    412         if (_established && noOutbound)
     411        if (isEstablished() && noOutbound)
    413412            _transport.getWriter().wantsWrite(this, "enqueued");
    414413    }
     
    542541            _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
    543542
    544         _established = true;
    545543        _establishedOn = System.currentTimeMillis();
    546         _establishState = null;
     544        _establishState = EstablishState.VERIFIED;
    547545        _transport.markReachable(getRemotePeer().calculateHash(), false);
    548546        //_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE);
     
    694692        //if (_log.shouldLog(Log.DEBUG))
    695693        //    _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
    696         if (!_isInbound && !_established) {
    697             if (_establishState == null) {
    698                 // shouldn't happen
    699                 _establishState = new EstablishState(_context, _transport, this);
    700                 _establishState.prepareOutbound();
    701             } else {
    702                 if (_log.shouldLog(Log.DEBUG))
    703                     _log.debug("prepare next write, but we have already prepared the first outbound and we are not yet established..." + toString());
    704             }
     694        if (!_isInbound && !isEstablished()) {
    705695            return;
    706696        }
     
    15361526               (_isInbound ? "from " : "to ") +
    15371527               (_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) +
    1538                (_established ? "" : " not established") +
     1528               (isEstablished() ? "" : " not established") +
    15391529               " created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago";
    15401530    }
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    r02b92ac r97c1676b  
    232232                    channel.configureBlocking(false);
    233233                    _pumper.registerConnect(con);
     234                    con.getEstablishState().prepareOutbound();
    234235                } catch (IOException ioe) {
    235236                    if (_log.shouldLog(Log.ERROR))
  • router/java/src/net/i2p/router/transport/ntcp/Reader.java

    r02b92ac r97c1676b  
    133133     */
    134134    private void processRead(NTCPConnection con) {
    135         if (con.isClosed())
    136             return;
    137135        ByteBuffer buf = null;
    138         while (!con.isClosed() && !con.isEstablished() && ( (buf = con.getNextReadBuf()) != null) ) {
     136        while(true) {
     137            synchronized(con) {
     138                if (con.isClosed())
     139                    return;
     140                if (con.isEstablished())
     141                    break;
     142            }
     143            if ((buf = con.getNextReadBuf()) == null)
     144                return;
    139145            EstablishState est = con.getEstablishState();
    140146            if (_log.shouldLog(Log.DEBUG))
    141147                _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
    142             if (est == null) {
    143                 EventPumper.releaseBuf(buf);
    144                 if (!con.isEstablished()) {
    145                     // establish state is only removed when the connection is fully established,
    146                     // yet if that happens, con.isEstablished() should return true...
    147                     throw new RuntimeException("connection was not established, yet the establish state is null for " + con);
    148                 } else {
    149                     // hmm, there shouldn't be a race here - only one reader should
    150                     // be running on a con at a time...
    151                     _log.error("no establishment state but " + con + " is established... race?");
    152                     break;
    153                 }
    154             }
     148           
    155149            if (est.isComplete()) {
    156150                // why is it complete yet !con.isEstablished?
    157                     _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
    158                                + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
     151                _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
     152                        + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
    159153                EventPumper.releaseBuf(buf);
    160154                break;
     
    173167                con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
    174168        }
    175         // catch race?
    176         if (!con.isEstablished())
    177             return;
    178169        while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
    179170            // decrypt the data and push it into an i2np message
Note: See TracChangeset for help on using the changeset viewer.