Changeset 1119612
- Timestamp:
- Nov 21, 2011 6:22:13 PM (9 years ago)
- Branches:
- master
- Children:
- 9d0bafb
- Parents:
- dc6c568
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
history.txt
rdc6c568 r1119612 1 2011-11-21 zzz 2 * NTCP Pumper: 3 - Ensure failsafe pumper code gets run on schedule 4 - Don't copy the read buffers 5 - Adjust minimum read buffers based on memory 6 - New i2np.ntcp.useDirectBuffer option (default false) 7 - Mark peer unreachable when read failure is during establishment 8 - Change some Reader Lists to Sets to avoid linear search 9 - Log tweaks, debugging, new loop stats 10 1 11 2011-11-18 zzz 2 12 * NTCP: -
router/java/src/net/i2p/router/RouterVersion.java
rdc6c568 r1119612 19 19 public final static String ID = "Monotone"; 20 20 public final static String VERSION = CoreVersion.VERSION; 21 public final static long BUILD = 5;21 public final static long BUILD = 6; 22 22 23 23 /** for example "-test" */ -
router/java/src/net/i2p/router/transport/ntcp/EstablishState.java
rdc6c568 r1119612 148 148 /** 149 149 * parse the contents of the buffer as part of the handshake. if the 150 * handshake is completed and there is more data remaining, the buffer is151 * updatedso that the next read will be the (still encrypted) remaining150 * handshake is completed and there is more data remaining, the data are 151 * copieed out so that the next read will be the (still encrypted) remaining 152 152 * data (available from getExtraBytes) 153 * 154 * All data must be copied out of the buffer as Reader.processRead() 155 * will return it to the pool. 153 156 */ 154 157 public void receive(ByteBuffer src) { … … 177 180 * we are Bob, so receive these bytes as part of an inbound connection 178 181 * This method receives messages 1 and 3, and sends messages 2 and 4. 182 * 183 * All data must be copied out of the buffer as Reader.processRead() 184 * will return it to the pool. 179 185 */ 180 186 private void receiveInbound(ByteBuffer src) { … … 341 347 * We are Alice, so receive these bytes as part of an outbound connection. 342 348 * This method receives messages 2 and 4, and sends message 3. 349 * 350 * All data must be copied out of the buffer as Reader.processRead() 351 * will return it to the pool. 343 352 */ 344 353 private void receiveOutbound(ByteBuffer src) { … … 685 694 } 686 695 687 /** anything left over in the byte buffer after verification is extra */ 696 /** Anything left over in the byte buffer after verification is extra 697 * 698 * All data must be copied out of the buffer as Reader.processRead() 699 * will return it to the pool. 700 */ 688 701 private void prepareExtra(ByteBuffer buf) { 689 702 int remaining = buf.remaining(); -
router/java/src/net/i2p/router/transport/ntcp/EventPumper.java
rdc6c568 r1119612 20 20 import java.util.concurrent.LinkedBlockingQueue; 21 21 22 import net.i2p.I2PAppContext; 22 23 import net.i2p.data.RouterIdentity; 23 24 import net.i2p.data.RouterInfo; … … 45 46 private final NTCPTransport _transport; 46 47 private long _expireIdleWriteTime; 47 48 private boolean _useDirect; 49 50 /** 51 * This probably doesn't need to be bigger than the largest typical 52 * message, which is a 5-slot VTBM (~2700 bytes). 53 * The occasional larger message can use multiple buffers. 54 */ 48 55 private static final int BUF_SIZE = 8*1024; 49 56 private static final int MAX_CACHE_SIZE = 64; 50 57 51 58 /** 59 * Read buffers. (write buffers use wrap()) 52 60 * Shared if there are multiple routers in the JVM 61 * Note that if the routers have different PROP_DIRECT settings this will have a mix, 62 * so don't do that. 53 63 */ 54 64 private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE); … … 68 78 private static final long MAX_EXPIRE_IDLE_TIME = 15*60*1000l; 69 79 80 /** 81 * Do we use direct buffers for reading? Default false. 82 * @see java.nio.ByteBuffer 83 */ 84 private static final String PROP_DIRECT = "i2np.ntcp.useDirectBuffers"; 85 86 private static final int MIN_MINB = 4; 87 private static final int MAX_MINB = 12; 88 private static final int MIN_BUFS; 89 static { 90 long maxMemory = Runtime.getRuntime().maxMemory(); 91 if (maxMemory == Long.MAX_VALUE) 92 maxMemory = 96*1024*1024l; 93 MIN_BUFS = (int) Math.max(MIN_MINB, Math.min(MAX_MINB, 1 + (maxMemory / (16*1024*1024)))); 94 } 95 70 96 public EventPumper(RouterContext ctx, NTCPTransport transport) { 71 97 _context = ctx; … … 73 99 _transport = transport; 74 100 _expireIdleWriteTime = MAX_EXPIRE_IDLE_TIME; 101 _context.statManager().createRateStat("ntcp.pumperKeySetSize", "", "ntcp", new long[] {10*60*1000} ); 102 _context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} ); 103 _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} ); 75 104 } 76 105 … … 131 160 */ 132 161 public void run() { 162 int loopCount = 0; 133 163 long lastFailsafeIteration = System.currentTimeMillis(); 134 164 while (_alive && _selector.isOpen()) { 135 165 try { 166 loopCount++; 136 167 runDelayedEvents(); 137 int count = 0; 168 138 169 try { 139 170 //if (_log.shouldLog(Log.DEBUG)) 140 171 // _log.debug("before select..."); 141 count = _selector.select(SELECTOR_LOOP_DELAY); 172 int count = _selector.select(SELECTOR_LOOP_DELAY); 173 if (count > 0) { 174 //if (_log.shouldLog(Log.DEBUG)) 175 // _log.debug("select returned " + count); 176 Set<SelectionKey> selected = _selector.selectedKeys(); 177 _context.statManager().addRateData("ntcp.pumperKeysPerLoop", selected.size()); 178 processKeys(selected); 179 // does clear() do anything useful? 180 selected.clear(); 181 } 182 } catch (ClosedSelectorException cse) { 183 continue; 142 184 } catch (IOException ioe) { 143 185 if (_log.shouldLog(Log.WARN)) 144 186 _log.warn("Error selecting", ioe); 145 187 } 146 if (count <= 0)147 continue;148 //if (_log.shouldLog(Log.DEBUG))149 // _log.debug("select returned " + count);150 151 Set<SelectionKey> selected;152 try {153 selected = _selector.selectedKeys();154 } catch (ClosedSelectorException cse) {155 continue;156 }157 158 processKeys(selected);159 selected.clear();160 188 161 189 if (lastFailsafeIteration + FAILSAFE_ITERATION_FREQ < System.currentTimeMillis()) { … … 167 195 try { 168 196 Set<SelectionKey> all = _selector.keys(); 197 _context.statManager().addRateData("ntcp.pumperKeySetSize", all.size()); 198 _context.statManager().addRateData("ntcp.pumperLoopsPerSecond", loopCount / (FAILSAFE_ITERATION_FREQ / 1000)); 199 loopCount = 0; 169 200 170 201 int failsafeWrites = 0; … … 204 235 con.getTimeSinceCreated() > 2 * NTCPTransport.ESTABLISH_TIMEOUT) { 205 236 if (_log.shouldLog(Log.INFO)) 206 _log.info(" Invalid key" + con);237 _log.info("Removing invalid key for " + con); 207 238 // this will cancel the key, and it will then be removed from the keyset 208 239 con.close(); … … 240 271 } 241 272 } 273 // Clear the cache if the user changes the setting, 274 // so we can test the effect. 275 boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT); 276 if (_useDirect != newUseDirect) { 277 _useDirect = newUseDirect; 278 _bufCache.clear(); 279 } 242 280 } catch (RuntimeException re) { 243 281 _log.error("Error in the event pumper", re); … … 311 349 if (read) { 312 350 //_context.statManager().addRateData("ntcp.read", 1, 0); 313 key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);314 351 processRead(key); 315 352 } 316 353 if (write) { 317 354 //_context.statManager().addRateData("ntcp.write", 1, 0); 318 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);319 355 processWrite(key); 320 356 } 357 //if (!(accept || connect || read || write)) { 358 // if (_log.shouldLog(Log.INFO)) 359 // _log.info("key wanted nothing? con: " + key.attachment()); 360 //} 321 361 } catch (CancelledKeyException cke) { 322 362 if (_log.shouldLog(Log.DEBUG)) … … 366 406 _selector.wakeup(); 367 407 } 368 369 private static final int MIN_BUFS = 5;370 408 371 409 /** … … 374 412 */ 375 413 private static int _numBufs = MIN_BUFS; 376 private static int __liveBufs = 0;377 414 private static int __consecutiveExtra; 378 415 … … 380 417 * High-frequency path in thread. 381 418 */ 382 private staticByteBuffer acquireBuf() {419 private ByteBuffer acquireBuf() { 383 420 ByteBuffer rv = _bufCache.poll(); 384 if (rv == null) { 385 rv = ByteBuffer.allocate(BUF_SIZE); 386 _numBufs = ++__liveBufs; 421 // discard buffer if _useDirect setting changes 422 if (rv == null || rv.isDirect() != _useDirect) { 423 if (_useDirect) 424 rv = ByteBuffer.allocateDirect(BUF_SIZE); 425 else 426 rv = ByteBuffer.allocate(BUF_SIZE); 427 _numBufs++; 387 428 //if (_log.shouldLog(Log.DEBUG)) 388 429 // _log.debug("creating a new read buffer " + System.identityHashCode(rv) + " with " + __liveBufs + " live: " + rv); … … 396 437 397 438 /** 439 * Return a read buffer to the pool. 440 * These buffers must be from acquireBuf(), i.e. capacity() == BUF_SIZE. 398 441 * High-frequency path in thread. 399 442 */ 400 p rivatestatic void releaseBuf(ByteBuffer buf) {443 public static void releaseBuf(ByteBuffer buf) { 401 444 //if (false) return; 402 445 //if (_log.shouldLog(Log.DEBUG)) 403 446 // _log.debug("releasing read buffer " + System.identityHashCode(buf) + " with " + __liveBufs + " live: " + buf); 447 448 // double check 449 if (buf.capacity() < BUF_SIZE) { 450 I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception()); 451 return; 452 } 404 453 buf.clear(); 405 454 int extra = _bufCache.size(); 406 455 boolean cached = extra < _numBufs; 407 456 457 // TODO always offer if direct? 408 458 if (cached) { 409 459 _bufCache.offer(buf); 410 if (extra > 5) {460 if (extra > MIN_BUFS) { 411 461 __consecutiveExtra++; 412 462 if (__consecutiveExtra >= 20) { 413 _numBufs = Math.max(_numBufs - 1, MIN_BUFS); 463 if (_numBufs > MIN_BUFS) 464 _numBufs--; 414 465 __consecutiveExtra = 0; 415 466 } 416 467 } 417 } else {418 __liveBufs--;419 468 } 420 469 //if (cached && _log.shouldLog(Log.DEBUG)) … … 466 515 boolean connected = chan.finishConnect(); 467 516 if (_log.shouldLog(Log.DEBUG)) 468 _log.debug("processing connect for " + key + " / " +con + ": connected? " + connected);517 _log.debug("processing connect for " + con + ": connected? " + connected); 469 518 if (connected) { 470 519 // BUGFIX for firewalls. --Sponge … … 479 528 } 480 529 } catch (IOException ioe) { // this is the usual failure path for a timeout or connect refused 481 if (_log.shouldLog(Log. WARN))482 _log. warn("Failed outbound connection to " + con.getRemotePeer().calculateHash(), ioe);530 if (_log.shouldLog(Log.INFO)) 531 _log.info("Failed outbound " + con, ioe); 483 532 con.close(); 484 533 //_context.shitlist().shitlistRouter(con.getRemotePeer().calculateHash(), "Error connecting", NTCPTransport.STYLE); … … 487 536 } catch (NoConnectionPendingException ncpe) { 488 537 // ignore 489 } 490 } 491 492 /** 538 if (_log.shouldLog(Log.WARN)) 539 _log.warn("error connecting on " + con, ncpe); 540 } 541 } 542 543 /** 544 * OP_READ will always be set before this is called. 545 * This method will disable the interest if no more reads remain because of inbound bandwidth throttling. 493 546 * High-frequency path in thread. 494 547 */ … … 500 553 if (read == -1) { 501 554 //if (_log.shouldLog(Log.DEBUG)) _log.debug("EOF on " + con); 502 _context.statManager().addRateData("ntcp.readEOF", 1);555 //_context.statManager().addRateData("ntcp.readEOF", 1); 503 556 con.close(); 504 557 releaseBuf(buf); … … 506 559 //if (_log.shouldLog(Log.DEBUG)) 507 560 // _log.debug("nothing to read for " + con + ", but stay interested"); 508 key.interestOps(key.interestOps() | SelectionKey.OP_READ); 561 // stay interested 562 //key.interestOps(key.interestOps() | SelectionKey.OP_READ); 509 563 releaseBuf(buf); 510 564 } else if (read > 0) { 511 byte data[] = new byte[read];565 // ZERO COPY. The buffer will be returned in Reader.processRead() 512 566 buf.flip(); 513 buf.get(data);514 releaseBuf(buf);515 buf = null;516 ByteBuffer rbuf = ByteBuffer.wrap(data);517 567 FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf); 518 568 if (req.getPendingInboundRequested() > 0) { … … 522 572 // _log.debug("bw throttled reading for " + con + ", so we don't want to read anymore"); 523 573 _context.statManager().addRateData("ntcp.queuedRecv", read); 524 con.queuedRecv( rbuf, req);574 con.queuedRecv(buf, req); 525 575 } else { 526 576 // fully allocated 527 577 //if (_log.shouldLog(Log.DEBUG)) 528 578 // _log.debug("not bw throttled reading for " + con); 529 key.interestOps(key.interestOps() | SelectionKey.OP_READ); 530 con.recv(rbuf); 579 // stay interested 580 //key.interestOps(key.interestOps() | SelectionKey.OP_READ); 581 con.recv(buf); 582 _context.statManager().addRateData("ntcp.read", read); 531 583 } 532 584 } 533 585 } catch (CancelledKeyException cke) { 534 if (_log.shouldLog(Log.WARN)) _log.warn("error reading", cke); 586 releaseBuf(buf); 587 if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke); 535 588 con.close(); 536 589 _context.statManager().addRateData("ntcp.readError", 1); 537 if (buf != null) releaseBuf(buf);538 590 } catch (IOException ioe) { 539 if (_log.shouldLog(Log.WARN)) _log.warn("error reading", ioe); 591 // common, esp. at outbound connect time 592 releaseBuf(buf); 593 if (_log.shouldLog(Log.INFO)) 594 _log.info("error reading on " + con, ioe); 595 if (con.isEstablished()) { 596 _context.statManager().addRateData("ntcp.readError", 1); 597 } else { 598 // Usually "connection reset by peer", probably a conn limit rejection? 599 // although it could be a read failure during the DH handshake 600 // Same stat as in processConnect() 601 _context.statManager().addRateData("ntcp.connectFailedTimeoutIOE", 1); 602 _transport.markUnreachable(con.getRemotePeer().calculateHash()); 603 } 540 604 con.close(); 541 _context.statManager().addRateData("ntcp.readError", 1);542 if (buf != null) releaseBuf(buf);543 605 } catch (NotYetConnectedException nyce) { 606 releaseBuf(buf); 544 607 // ??? 545 } 546 } 547 548 /** 608 key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); 609 if (_log.shouldLog(Log.WARN)) 610 _log.warn("error reading on " + con, nyce); 611 } 612 } 613 614 /** 615 * OP_WRITE will always be set before this is called. 616 * This method will disable the interest if no more writes remain. 549 617 * High-frequency path in thread. 550 618 */ … … 574 642 if ( (buf.remaining() > 0) || (!con.isWriteBufEmpty()) ) { 575 643 //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, but data remains..."); 576 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 644 // stay interested 645 //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 577 646 } else { 578 647 //if (_log.shouldLog(Log.DEBUG)) _log.debug("done writing, no data remains..."); 648 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 579 649 } 580 650 break; 581 651 } else if (buf.remaining() > 0) { 582 652 //if (_log.shouldLog(Log.DEBUG)) _log.debug("buffer data remaining..."); 583 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 653 // stay interested 654 //key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); 584 655 break; 585 656 } else { … … 593 664 //buffers++; 594 665 //if (buffer time is too much, add OP_WRITe to the interest ops and break?) 666 // LOOP 595 667 } 596 668 } else { 669 // Nothing more to write 670 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 597 671 break; 598 672 } 599 673 } 600 674 } catch (CancelledKeyException cke) { 601 if (_log.shouldLog(Log.WARN)) _log.warn("error writing ", cke);675 if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, cke); 602 676 _context.statManager().addRateData("ntcp.writeError", 1); 603 677 con.close(); 604 678 } catch (IOException ioe) { 605 if (_log.shouldLog(Log.WARN)) _log.warn("error writing ", ioe);679 if (_log.shouldLog(Log.WARN)) _log.warn("error writing on " + con, ioe); 606 680 _context.statManager().addRateData("ntcp.writeError", 1); 607 681 con.close(); -
router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java
rdc6c568 r1119612 166 166 _readBufs = new ConcurrentLinkedQueue(); 167 167 _writeBufs = new ConcurrentLinkedQueue(); 168 _bwRequests = new ConcurrentHashSet( 2);168 _bwRequests = new ConcurrentHashSet(8); 169 169 // TODO possible switch to CLQ but beware non-constant size() - see below 170 170 _outbound = new LinkedBlockingQueue(); … … 258 258 for (Iterator<FIFOBandwidthLimiter.Request> iter = _bwRequests.iterator(); iter.hasNext(); ) { 259 259 iter.next().abort(); 260 // we would like to return read ByteBuffers via EventPumper.releaseBuf(), 261 // but we can't risk releasing it twice 260 262 } 261 263 _bwRequests.clear(); 264 265 _writeBufs.clear(); 266 ByteBuffer bb; 267 while ((bb = _readBufs.poll()) != null) { 268 EventPumper.releaseBuf(bb); 269 } 262 270 263 271 OutNetMessage msg; … … 790 798 } 791 799 800 /** 801 * The FifoBandwidthLimiter.CompleteListener callback. 802 * Does the delayed read or write. 803 */ 792 804 public void complete(FIFOBandwidthLimiter.Request req) { 793 805 removeRequest(req); 794 806 ByteBuffer buf = (ByteBuffer)req.attachment(); 795 807 if (req.getTotalInboundRequested() > 0) { 808 if (_closed) { 809 EventPumper.releaseBuf(buf); 810 return; 811 } 796 812 _context.statManager().addRateData("ntcp.throttledReadComplete", (System.currentTimeMillis()-req.getRequestTime())); 797 813 recv(buf); … … 801 817 _transport.getPumper().wantsRead(this); 802 818 //_transport.getReader().wantsRead(this); 803 } else if (req.getTotalOutboundRequested() > 0 ) {819 } else if (req.getTotalOutboundRequested() > 0 && !_closed) { 804 820 _context.statManager().addRateData("ntcp.throttledWriteComplete", (System.currentTimeMillis()-req.getRequestTime())); 805 821 write(buf); … … 837 853 * The contents of the buffer have been read and can be processed asap. 838 854 * This should not block, and the NTCP connection now owns the buffer 839 * to do with as it pleases. 855 * to do with as it pleases BUT it should eventually copy out the data 856 * and call EventPumper.releaseBuf(). 840 857 */ 841 858 public void recv(ByteBuffer buf) { … … 978 995 * encoded as "sizeof(data)+data+pad+crc", and those are encrypted 979 996 * with the session key and the last 16 bytes of the previous encrypted 980 * i2np message. the contents of the buffer is owned by the EventPumper, 981 * so data should be copied out. 997 * i2np message. 998 * 999 * The NTCP connection now owns the buffer 1000 * BUT it must copy out the data 1001 * as reader will call EventPumper.releaseBuf(). 982 1002 */ 983 1003 synchronized void recvEncryptedI2NP(ByteBuffer buf) { … … 1011 1031 } 1012 1032 1013 /** _decryptBlockBuf contains another cleartext block of I2NP to parse */ 1033 /** 1034 * Append the next 16 bytes of cleartext to the read state. 1035 * _decryptBlockBuf contains another cleartext block of I2NP to parse. 1036 * Caller must synchronize! 1037 * @return success 1038 */ 1014 1039 private boolean recvUnencryptedI2NP() { 1015 1040 _curReadState.receiveBlock(_decryptBlockBuf); -
router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java
rdc6c568 r1119612 90 90 _context.statManager().createRateStat("ntcp.connectFailedInvalidPort", "", "ntcp", RATES); 91 91 _context.statManager().createRateStat("ntcp.bidRejectedLocalAddress", "", "ntcp", RATES); 92 _context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES);92 //_context.statManager().createRateStat("ntcp.bidRejectedNoNTCPAddress", "", "ntcp", RATES); 93 93 _context.statManager().createRateStat("ntcp.connectFailedTimeout", "", "ntcp", RATES); 94 94 _context.statManager().createRateStat("ntcp.connectFailedTimeoutIOE", "", "ntcp", RATES); … … 125 125 _context.statManager().createRateStat("ntcp.noBidTooLargeI2NP", "send size", "ntcp", RATES); 126 126 _context.statManager().createRateStat("ntcp.queuedRecv", "", "ntcp", RATES); 127 //_context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES);128 _context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES);127 _context.statManager().createRateStat("ntcp.read", "", "ntcp", RATES); 128 //_context.statManager().createRateStat("ntcp.readEOF", "", "ntcp", RATES); 129 129 _context.statManager().createRateStat("ntcp.readError", "", "ntcp", RATES); 130 130 _context.statManager().createRateStat("ntcp.receiveCorruptEstablishment", "", "ntcp", RATES); … … 290 290 if (addr == null) { 291 291 markUnreachable(peer); 292 _context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1);292 //_context.statManager().addRateData("ntcp.bidRejectedNoNTCPAddress", 1); 293 293 //_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "No NTCP address", STYLE); 294 294 if (_log.shouldLog(Log.DEBUG)) -
router/java/src/net/i2p/router/transport/ntcp/Reader.java
rdc6c568 r1119612 3 3 import java.nio.ByteBuffer; 4 4 import java.util.ArrayList; 5 import java.util.HashSet; 5 6 import java.util.List; 7 import java.util.Set; 6 8 7 9 import net.i2p.router.RouterContext; … … 20 22 // TODO change to LBQ ?? 21 23 private final List<NTCPConnection> _pendingConnections; 22 private final List<NTCPConnection> _liveReads;23 private final List<NTCPConnection> _readAfterLive;24 private final Set<NTCPConnection> _liveReads; 25 private final Set<NTCPConnection> _readAfterLive; 24 26 private final List<Runner> _runners; 25 27 … … 28 30 _log = ctx.logManager().getLog(getClass()); 29 31 _pendingConnections = new ArrayList(16); 30 _runners = new ArrayList( 5);31 _liveReads = new ArrayList(5);32 _readAfterLive = new ArrayList();32 _runners = new ArrayList(8); 33 _liveReads = new HashSet(8); 34 _readAfterLive = new HashSet(8); 33 35 } 34 36 … … 41 43 } 42 44 } 45 43 46 public void stopReading() { 44 47 while (!_runners.isEmpty()) { … … 56 59 synchronized (_pendingConnections) { 57 60 if (_liveReads.contains(con)) { 58 if (!_readAfterLive.contains(con)) { 59 _readAfterLive.add(con); 60 } 61 _readAfterLive.add(con); 61 62 already = true; 62 63 } else if (!_pendingConnections.contains(con)) { … … 79 80 private class Runner implements Runnable { 80 81 private boolean _stop; 81 public Runner() { _stop = false; } 82 83 public Runner() {} 84 82 85 public void stop() { _stop = true; } 86 83 87 public void run() { 84 88 if (_log.shouldLog(Log.INFO)) _log.info("Starting reader"); … … 119 123 120 124 /** 121 * process everything read 125 * Process everything read. 126 * Return read buffers back to the pool as we process them. 122 127 */ 123 128 private void processRead(NTCPConnection con) { … … 130 135 _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]"); 131 136 if (est == null) { 137 EventPumper.releaseBuf(buf); 132 138 if (!con.isEstablished()) { 133 139 // establish state is only removed when the connection is fully established, … … 145 151 _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? " 146 152 + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")"); 153 EventPumper.releaseBuf(buf); 147 154 break; 148 155 } 149 156 est.receive(buf); 157 EventPumper.releaseBuf(buf); 150 158 if (est.isCorrupt()) { 151 159 if (_log.shouldLog(Log.WARN)) … … 155 163 con.close(); 156 164 return; 157 } else if (buf.remaining() <= 0) {158 // not necessary, getNextReadBuf() removes159 //con.removeReadBuf(buf);160 165 } 161 166 if (est.isComplete() && est.getExtraBytes() != null) … … 170 175 _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)"); 171 176 con.recvEncryptedI2NP(buf); 172 // not necessary, getNextReadBuf() removes 173 //con.removeReadBuf(buf); 177 EventPumper.releaseBuf(buf); 174 178 } 175 179 }
Note: See TracChangeset
for help on using the changeset viewer.