Changeset 1119612


Ignore:
Timestamp:
Nov 21, 2011 6:22:13 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
9d0bafb
Parents:
dc6c568
Message:
  • NTCP Pumper:
    • Ensure failsafe pumper code gets run on schedule
    • Don't copy the read buffers
    • Adjust minimum read buffers based on memory
    • New i2np.ntcp.useDirectBuffer option (default false)
    • Mark peer unreachable when read failure is during establishment
    • Change some Reader Lists to Sets to avoid linear search
    • Log tweaks, debugging, new loop stats
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    rdc6c568 r1119612  
     12011-11-21 zzz
     2  * NTCP Pumper:
     3    - Ensure failsafe pumper code gets run on schedule
     4    - Don't copy the read buffers
     5    - Adjust minimum read buffers based on memory
     6    - New i2np.ntcp.useDirectBuffer option (default false)
     7    - Mark peer unreachable when read failure is during establishment
     8    - Change some Reader Lists to Sets to avoid linear search
     9    - Log tweaks, debugging, new loop stats
     10
    1112011-11-18 zzz
    212  * NTCP:
  • router/java/src/net/i2p/router/RouterVersion.java

    rdc6c568 r1119612  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 5;
     21    public final static long BUILD = 6;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/ntcp/EstablishState.java

    rdc6c568 r1119612  
    148148    /**
    149149     * parse the contents of the buffer as part of the handshake.  if the
    150      * handshake is completed and there is more data remaining, the buffer is
    151      * updated so that the next read will be the (still encrypted) remaining
     150     * handshake is completed and there is more data remaining, the data are
     151     * copieed out so that the next read will be the (still encrypted) remaining
    152152     * data (available from getExtraBytes)
     153     *
     154     * All data must be copied out of the buffer as Reader.processRead()
     155     * will return it to the pool.
    153156     */
    154157    public void receive(ByteBuffer src) {
     
    177180     *  we are Bob, so receive these bytes as part of an inbound connection
    178181     *  This method receives messages 1 and 3, and sends messages 2 and 4.
     182     *
     183     *  All data must be copied out of the buffer as Reader.processRead()
     184     *  will return it to the pool.
    179185     */
    180186    private void receiveInbound(ByteBuffer src) {
     
    341347     *  We are Alice, so receive these bytes as part of an outbound connection.
    342348     *  This method receives messages 2 and 4, and sends message 3.
     349     *
     350     *  All data must be copied out of the buffer as Reader.processRead()
     351     *  will return it to the pool.
    343352     */
    344353    private void receiveOutbound(ByteBuffer src) {
     
    685694    }
    686695
    687     /** anything left over in the byte buffer after verification is extra */
     696    /** Anything left over in the byte buffer after verification is extra
     697     *
     698     *  All data must be copied out of the buffer as Reader.processRead()
     699     *  will return it to the pool.
     700     */
    688701    private void prepareExtra(ByteBuffer buf) {
    689702        int remaining = buf.remaining();
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    rdc6c568 r1119612  
    2020import java.util.concurrent.LinkedBlockingQueue;
    2121
     22import net.i2p.I2PAppContext;
    2223import net.i2p.data.RouterIdentity;
    2324import net.i2p.data.RouterInfo;
     
    4546    private final NTCPTransport _transport;
    4647    private long _expireIdleWriteTime;
    47    
     48    private boolean _useDirect;
     49   
     50    /**
     51     *  This probably doesn't need to be bigger than the largest typical
     52     *  message, which is a 5-slot VTBM (~2700 bytes).
     53     *  The occasional larger message can use multiple buffers.
     54     */
    4855    private static final int BUF_SIZE = 8*1024;
    4956    private static final int MAX_CACHE_SIZE = 64;
    5057
    5158    /**
     59     *  Read buffers. (write buffers use wrap())
    5260     *  Shared if there are multiple routers in the JVM
     61     *  Note that if the routers have different PROP_DIRECT settings this will have a mix,
     62     *  so don't do that.
    5363     */
    5464    private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
     
    6878    private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l;
    6979
     80    /**
     81     *  Do we use direct buffers for reading? Default false.
     82     *  @see java.nio.ByteBuffer
     83     */
     84    private static final String PROP_DIRECT = "i2np.ntcp.useDirectBuffers";
     85
     86    private static final int MIN_MINB = 4;
     87    private static final int MAX_MINB = 12;
     88    private static final int MIN_BUFS;
     89    static {
     90        long maxMemory = Runtime.getRuntime().maxMemory();
     91        if (maxMemory == Long.MAX_VALUE)
     92            maxMemory = 96*1024*1024l;
     93        MIN_BUFS = (int) Math.max(MIN_MINB, Math.min(MAX_MINB, 1 + (maxMemory / (16*1024*1024))));
     94    }
     95
    7096    public EventPumper(RouterContext ctx, NTCPTransport transport) {
    7197        _context = ctx;
     
    7399        _transport = transport;
    74100        _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME;
     101        _context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} );
     102        _context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
     103        _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
    75104    }
    76105   
     
    131160     */
    132161    public void run() {
     162        int loopCount = 0;
    133163        long lastFailsafeIteration = System.currentTimeMillis();
    134164        while (_alive && _selector.isOpen()) {
    135165            try {
     166                loopCount++;
    136167                runDelayedEvents();
    137                 int count = 0;
     168
    138169                try {
    139170                    //if (_log.shouldLog(Log.DEBUG))
    140171                    //    _log.debug("before select...");
    141                     count = _selector.select(SELECTOR_LOOP_DELAY);
     172                    int count = _selector.select(SELECTOR_LOOP_DELAY);
     173                    if (count > 0) {
     174                        //if (_log.shouldLog(Log.DEBUG))
     175                        //    _log.debug("select returned " + count);
     176                        Set<SelectionKey> selected = _selector.selectedKeys();
     177                        _context.statManager().addRateData("ntcp.pumperKeysPerLoop", selected.size());
     178                        processKeys(selected);
     179                        // does clear() do anything useful?
     180                        selected.clear();
     181                    }
     182                } catch (ClosedSelectorException cse) {
     183                    continue;
    142184                } catch (IOException ioe) {
    143185                    if (_log.shouldLog(Log.WARN))
    144186                        _log.warn("Error selecting", ioe);
    145187                }
    146                 if (count <= 0)
    147                     continue;
    148                 //if (_log.shouldLog(Log.DEBUG))
    149                 //    _log.debug("select returned " + count);
    150 
    151                 Set<SelectionKey> selected;
    152                 try {
    153                     selected = _selector.selectedKeys();
    154                 } catch (ClosedSelectorException cse) {
    155                     continue;
    156                 }
    157 
    158                 processKeys(selected);
    159                 selected.clear();
    160188               
    161189                if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) {
     
    167195                    try {
    168196                        Set<SelectionKey> all = _selector.keys();
     197                        _context.statManager().addRateData("ntcp.pumperKeySetSize", all.size());
     198                        _context.statManager().addRateData("ntcp.pumperLoopsPerSecond", loopCount / (FAILSAFE_ITERATION_FREQ / 1000));
     199                        loopCount = 0;
    169200                       
    170201                        int failsafeWrites = 0;
     
    204235                                    con.getTimeSinceCreated() > 2 * NTCPTransport.ESTABLISH_TIMEOUT) {
    205236                                    if (_log.shouldLog(Log.INFO))
    206                                         _log.info("Invalid key " + con);
     237                                        _log.info("Removing invalid key for " + con);
    207238                                    // this will cancel the key, and it will then be removed from the keyset
    208239                                    con.close();
     
    240271                    }
    241272                }
     273                // Clear the cache if the user changes the setting,
     274                // so we can test the effect.
     275                boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT);
     276                if (_useDirect != newUseDirect) {
     277                    _useDirect = newUseDirect;
     278                    _bufCache.clear();
     279                }
    242280            } catch (RuntimeException re) {
    243281                _log.error("Error in the event pumper", re);
     
    311349                if (read) {
    312350                    //_context.statManager().addRateData("ntcp.read", 1, 0);
    313                     key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    314351                    processRead(key);
    315352                }
    316353                if (write) {
    317354                    //_context.statManager().addRateData("ntcp.write", 1, 0);
    318                     key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    319355                    processWrite(key);
    320356                }
     357                //if (!(accept || connect || read || write)) {
     358                //    if (_log.shouldLog(Log.INFO))
     359                //        _log.info("key wanted nothing? con: " + key.attachment());
     360                //}
    321361            } catch (CancelledKeyException cke) {
    322362                if (_log.shouldLog(Log.DEBUG))
     
    366406        _selector.wakeup();
    367407    }
    368    
    369     private static final int MIN_BUFS = 5;
    370408
    371409    /**
     
    374412     */
    375413    private static int _numBufs = MIN_BUFS;
    376     private static int __liveBufs = 0;
    377414    private static int __consecutiveExtra;
    378415
     
    380417     *  High-frequency path in thread.
    381418     */
    382     private static ByteBuffer acquireBuf() {
     419    private ByteBuffer acquireBuf() {
    383420        ByteBuffer rv = _bufCache.poll();
    384         if (rv == null) {
    385             rv = ByteBuffer.allocate(BUF_SIZE);
    386             _numBufs = ++__liveBufs;
     421        // discard buffer if _useDirect setting changes
     422        if (rv == null || rv.isDirect() != _useDirect) {
     423            if (_useDirect)
     424                rv = ByteBuffer.allocateDirect(BUF_SIZE);
     425            else
     426                rv = ByteBuffer.allocate(BUF_SIZE);
     427            _numBufs++;
    387428            //if (_log.shouldLog(Log.DEBUG))
    388429            //    _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv);           
     
    396437   
    397438    /**
     439     *  Return a read buffer to the pool.
     440     *  These buffers must be from acquireBuf(), i.e. capacity() == BUF_SIZE.
    398441     *  High-frequency path in thread.
    399442     */
    400     private static void releaseBuf(ByteBuffer buf) {
     443    public static void releaseBuf(ByteBuffer buf) {
    401444        //if (false) return;
    402445        //if (_log.shouldLog(Log.DEBUG))
    403446        //    _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf);
     447
     448        // double check
     449        if (buf.capacity() < BUF_SIZE) {
     450            I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception());
     451            return;
     452        }
    404453        buf.clear();
    405454        int extra = _bufCache.size();
    406455        boolean cached = extra < _numBufs;
    407456
     457        // TODO always offer if direct?
    408458        if (cached) {
    409459            _bufCache.offer(buf);
    410             if (extra > 5) {
     460            if (extra > MIN_BUFS) {
    411461                __consecutiveExtra++;
    412462                if (__consecutiveExtra >= 20) {
    413                     _numBufs = Math.max(_numBufs - 1, MIN_BUFS);
     463                    if (_numBufs > MIN_BUFS)
     464                        _numBufs--;
    414465                    __consecutiveExtra = 0;
    415466                }
    416467            }
    417         } else {
    418             __liveBufs--;
    419468        }
    420469        //if (cached && _log.shouldLog(Log.DEBUG))
     
    466515            boolean connected = chan.finishConnect();
    467516            if (_log.shouldLog(Log.DEBUG))
    468                 _log.debug("processing connect for " + key + " / " + con + ": connected? " + connected);
     517                _log.debug("processing connect for " + con + ": connected? " + connected);
    469518            if (connected) {
    470519                // BUGFIX for firewalls. --Sponge
     
    479528            }
    480529        } catch (IOException ioe) {   // this is the usual failure path for a timeout or connect refused
    481             if (_log.shouldLog(Log.WARN))
    482                 _log.warn("Failed outbound connection to " + con.getRemotePeer().calculateHash(), ioe);
     530            if (_log.shouldLog(Log.INFO))
     531                _log.info("Failed outbound " + con, ioe);
    483532            con.close();
    484533            //_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE);
     
    487536        } catch (NoConnectionPendingException ncpe) {
    488537            // ignore
    489         }
    490     }
    491    
    492     /**
     538            if (_log.shouldLog(Log.WARN))
     539                _log.warn("error connecting on " + con, ncpe);
     540        }
     541    }
     542   
     543    /**
     544     *  OP_READ will always be set before this is called.
     545     *  This method will disable the interest if no more reads remain because of inbound bandwidth throttling.
    493546     *  High-frequency path in thread.
    494547     */
     
    500553            if (read == -1) {
    501554                //if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con);
    502                 _context.statManager().addRateData("ntcp.readEOF", 1);
     555                //_context.statManager().addRateData("ntcp.readEOF", 1);
    503556                con.close();
    504557                releaseBuf(buf);
     
    506559                //if (_log.shouldLog(Log.DEBUG))
    507560                //    _log.debug("nothing to read for " + con + ", but stay interested");
    508                 key.interestOps(key.interestOps() | SelectionKey.OP_READ);
     561                // stay interested
     562                //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    509563                releaseBuf(buf);
    510564            } else if (read > 0) {
    511                 byte data[] = new byte[read];
     565                // ZERO COPY. The buffer will be returned in Reader.processRead()
    512566                buf.flip();
    513                 buf.get(data);
    514                 releaseBuf(buf);
    515                 buf = null;
    516                 ByteBuffer rbuf = ByteBuffer.wrap(data);
    517567                FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
    518568                if (req.getPendingInboundRequested() > 0) {
     
    522572                    //    _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore");
    523573                    _context.statManager().addRateData("ntcp.queuedRecv", read);
    524                     con.queuedRecv(rbuf, req);
     574                    con.queuedRecv(buf, req);
    525575                } else {
    526576                    // fully allocated
    527577                    //if (_log.shouldLog(Log.DEBUG))
    528578                    //    _log.debug("not bw throttled reading for " + con);
    529                     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    530                     con.recv(rbuf);
     579                    // stay interested
     580                    //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
     581                    con.recv(buf);
     582                    _context.statManager().addRateData("ntcp.read", read);
    531583                }
    532584            }
    533585        } catch (CancelledKeyException cke) {
    534             if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke);
     586            releaseBuf(buf);
     587            if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke);
    535588            con.close();
    536589            _context.statManager().addRateData("ntcp.readError", 1);
    537             if (buf != null) releaseBuf(buf);
    538590        } catch (IOException ioe) {
    539             if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe);
     591            // common, esp. at outbound connect time
     592            releaseBuf(buf);
     593            if (_log.shouldLog(Log.INFO))
     594                _log.info("error reading on " + con, ioe);
     595            if (con.isEstablished()) {
     596                _context.statManager().addRateData("ntcp.readError", 1);
     597            } else {
     598                // Usually "connection reset by peer", probably a conn limit rejection?
     599                // although it could be a read failure during the DH handshake
     600                // Same stat as in processConnect()
     601                _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1);
     602                _transport.markUnreachable(con.getRemotePeer().calculateHash());
     603            }
    540604            con.close();
    541             _context.statManager().addRateData("ntcp.readError", 1);
    542             if (buf != null) releaseBuf(buf);
    543605        } catch (NotYetConnectedException nyce) {
     606            releaseBuf(buf);
    544607            // ???
    545         }
    546     }
    547    
    548     /**
     608            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
     609            if (_log.shouldLog(Log.WARN))
     610                _log.warn("error reading on " + con, nyce);
     611        }
     612    }
     613   
     614    /**
     615     *  OP_WRITE will always be set before this is called.
     616     *  This method will disable the interest if no more writes remain.
    549617     *  High-frequency path in thread.
    550618     */
     
    574642                        if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) {
    575643                            //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains...");
    576                             key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
     644                            // stay interested
     645                            //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    577646                        } else {
    578647                            //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains...");
     648                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    579649                        }
    580650                        break;
    581651                    } else if (buf.remaining() > 0) {
    582652                        //if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining...");
    583                         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
     653                        // stay interested
     654                        //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    584655                        break;
    585656                    } else {
     
    593664                        //buffers++;
    594665                        //if (buffer time is too much, add OP_WRITe to the interest ops and break?)
     666                        // LOOP
    595667                    }
    596668                } else {
     669                    // Nothing more to write
     670                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    597671                    break;
    598672                }
    599673            }
    600674        } catch (CancelledKeyException cke) {
    601             if (_log.shouldLog(Log.WARN)) _log.warn("error writing", cke);
     675            if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, cke);
    602676            _context.statManager().addRateData("ntcp.writeError", 1);
    603677            con.close();
    604678        } catch (IOException ioe) {
    605             if (_log.shouldLog(Log.WARN)) _log.warn("error writing", ioe);
     679            if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, ioe);
    606680            _context.statManager().addRateData("ntcp.writeError", 1);
    607681            con.close();
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    rdc6c568 r1119612  
    166166        _readBufs = new ConcurrentLinkedQueue();
    167167        _writeBufs = new ConcurrentLinkedQueue();
    168         _bwRequests = new ConcurrentHashSet(2);
     168        _bwRequests = new ConcurrentHashSet(8);
    169169        // TODO possible switch to CLQ but beware non-constant size() - see below
    170170        _outbound = new LinkedBlockingQueue();
     
    258258        for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) {
    259259            iter.next().abort();
     260            // we would like to return read ByteBuffers via EventPumper.releaseBuf(),
     261            // but we can't risk releasing it twice
    260262        }
    261263        _bwRequests.clear();
     264
     265        _writeBufs.clear();
     266        ByteBuffer bb;
     267        while ((bb = _readBufs.poll()) != null) {
     268            EventPumper.releaseBuf(bb);
     269        }
    262270
    263271        OutNetMessage msg;
     
    790798    }
    791799
     800    /**
     801     *  The FifoBandwidthLimiter.CompleteListener callback.
     802     *  Does the delayed read or write.
     803     */
    792804    public void complete(FIFOBandwidthLimiter.Request req) {
    793805        removeRequest(req);
    794806        ByteBuffer buf = (ByteBuffer)req.attachment();
    795807        if (req.getTotalInboundRequested() > 0) {
     808            if (_closed) {
     809                EventPumper.releaseBuf(buf);
     810                return;
     811            }
    796812            _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime()));
    797813            recv(buf);
     
    801817            _transport.getPumper().wantsRead(this);
    802818            //_transport.getReader().wantsRead(this);
    803         } else if (req.getTotalOutboundRequested() > 0) {
     819        } else if (req.getTotalOutboundRequested() > 0 && !_closed) {
    804820            _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime()));
    805821            write(buf);
     
    837853     * The contents of the buffer have been read and can be processed asap.
    838854     * This should not block, and the NTCP connection now owns the buffer
    839      * to do with as it pleases.
     855     * to do with as it pleases BUT it should eventually copy out the data
     856     * and call EventPumper.releaseBuf().
    840857     */
    841858    public void recv(ByteBuffer buf) {
     
    978995     * encoded as "sizeof(data)+data+pad+crc", and those are encrypted
    979996     * with the session key and the last 16 bytes of the previous encrypted
    980      * i2np message.  the contents of the buffer is owned by the EventPumper,
    981      * so data should be copied out.
     997     * i2np message.
     998     *
     999     * The NTCP connection now owns the buffer
     1000     * BUT it must copy out the data
     1001     * as reader will call EventPumper.releaseBuf().
    9821002     */
    9831003    synchronized void recvEncryptedI2NP(ByteBuffer buf) {
     
    10111031    }
    10121032   
    1013     /** _decryptBlockBuf contains another cleartext block of I2NP to parse */
     1033    /**
     1034     *  Append the next 16 bytes of cleartext to the read state.
     1035     *  _decryptBlockBuf contains another cleartext block of I2NP to parse.
     1036     *  Caller must synchronize!
     1037     *  @return success
     1038     */
    10141039    private boolean recvUnencryptedI2NP() {
    10151040        _curReadState.receiveBlock(_decryptBlockBuf);
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    rdc6c568 r1119612  
    9090        _context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES);
    9191        _context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES);
    92         _context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);
     92        //_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);
    9393        _context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", RATES);
    9494        _context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", RATES);
     
    125125        _context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES);
    126126        _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES);
    127         //_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
    128         _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);
     127        _context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);
     128        //_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);
    129129        _context.statManager().createRateStat("ntcp.readError", "", "ntcp", RATES);
    130130        _context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES);
     
    290290        if (addr == null) {
    291291            markUnreachable(peer);
    292             _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
     292            //_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);
    293293            //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE);
    294294            if (_log.shouldLog(Log.DEBUG))
  • router/java/src/net/i2p/router/transport/ntcp/Reader.java

    rdc6c568 r1119612  
    33import java.nio.ByteBuffer;
    44import java.util.ArrayList;
     5import java.util.HashSet;
    56import java.util.List;
     7import java.util.Set;
    68
    79import net.i2p.router.RouterContext;
     
    2022    // TODO change to LBQ ??
    2123    private final List<NTCPConnection> _pendingConnections;
    22     private final List<NTCPConnection> _liveReads;
    23     private final List<NTCPConnection> _readAfterLive;
     24    private final Set<NTCPConnection> _liveReads;
     25    private final Set<NTCPConnection> _readAfterLive;
    2426    private final List<Runner> _runners;
    2527   
     
    2830        _log = ctx.logManager().getLog(getClass());
    2931        _pendingConnections = new ArrayList(16);
    30         _runners = new ArrayList(5);
    31         _liveReads = new ArrayList(5);
    32         _readAfterLive = new ArrayList();
     32        _runners = new ArrayList(8);
     33        _liveReads = new HashSet(8);
     34        _readAfterLive = new HashSet(8);
    3335    }
    3436   
     
    4143        }
    4244    }
     45
    4346    public void stopReading() {
    4447        while (!_runners.isEmpty()) {
     
    5659        synchronized (_pendingConnections) {
    5760            if (_liveReads.contains(con)) {
    58                 if (!_readAfterLive.contains(con)) {
    59                     _readAfterLive.add(con);
    60                 }
     61                _readAfterLive.add(con);
    6162                already = true;
    6263            } else if (!_pendingConnections.contains(con)) {
     
    7980    private class Runner implements Runnable {
    8081        private boolean _stop;
    81         public Runner() { _stop = false; }
     82
     83        public Runner() {}
     84
    8285        public void stop() { _stop = true; }
     86
    8387        public void run() {
    8488            if (_log.shouldLog(Log.INFO)) _log.info("Starting reader");
     
    119123   
    120124    /**
    121      * process everything read
     125     * Process everything read.
     126     * Return read buffers back to the pool as we process them.
    122127     */
    123128    private void processRead(NTCPConnection con) {
     
    130135                _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
    131136            if (est == null) {
     137                EventPumper.releaseBuf(buf);
    132138                if (!con.isEstablished()) {
    133139                    // establish state is only removed when the connection is fully established,
     
    145151                    _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? "
    146152                               + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")");
     153                EventPumper.releaseBuf(buf);
    147154                break;
    148155            }
    149156            est.receive(buf);
     157            EventPumper.releaseBuf(buf);
    150158            if (est.isCorrupt()) {
    151159                if (_log.shouldLog(Log.WARN))
     
    155163                con.close();
    156164                return;
    157             } else if (buf.remaining() <= 0) {
    158                 // not necessary, getNextReadBuf() removes
    159                 //con.removeReadBuf(buf);
    160165            }
    161166            if (est.isComplete() && est.getExtraBytes() != null)
     
    170175                _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
    171176            con.recvEncryptedI2NP(buf);
    172             // not necessary, getNextReadBuf() removes
    173             //con.removeReadBuf(buf);
     177            EventPumper.releaseBuf(buf);
    174178        }
    175179    }
Note: See TracChangeset for help on using the changeset viewer.