Opened 10 months ago

Closed 9 months ago

#2243 closed enhancement (fixed)

NTCP read should be in a loop and buffers enlarged

Reported by: zab
Owned by: zzz
Priority: minor
Milestone: 0.9.36
Component: router/transport
Version: 0.9.34
Keywords: ntcp nio
Cc:
Parent Tickets:

Description

Currently we make a single read() call on a socket channel per invocation of processRead(). Instead, we should read in a loop, like this:

http://zerobin.i2p/?7f68f574d1afec57#HULmG3JC6DaoT/0Cs8eCqBNBF4L0pjn92bwIcSQuI74=

(debug statement added for tracing)
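
For reference, a minimal sketch of the looping read, in case the paste is unreachable. The class and method names (ReadLoopSketch, drainChannel) are illustrative, not the actual paste contents; in the router this logic would live inside EventPumper's processRead():

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    final class ReadLoopSketch {
        /**
         * Keep calling read() until the channel is drained, instead of
         * reading once per select() wakeup.
         * @return total bytes read, or -1 on EOF with nothing read
         */
        static int drainChannel(SocketChannel chan, ByteBuffer buf) throws IOException {
            int total = 0;
            int n;
            // read() returns 0 once the socket's receive buffer is empty,
            // and -1 on EOF; either ends the loop
            while ((n = chan.read(buf)) > 0)
                total += n;
            return (n < 0 && total == 0) ? -1 : total;
        }
    }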

It's rare, but occasionally there is more than one read, and sometimes the reads fill up to half of the read buffer. So I believe it's prudent to increase the 8 KB read buffer to 16 KB.

Note that the select() call returns only when there are newly ready SocketChannels to read from, so if we don't fully drain a SocketChannel in a loop, that data just sits there and clogs the connection.

Change History (6)

comment:1 Changed 10 months ago by zab

After running a modified version on my busy router for a few hours, I've observed read bursts as high as 14480 bytes per select() call, and as many as 5 read() calls needed to drain the socket.

comment:2 Changed 10 months ago by zzz

  • Milestone changed from undecided to 0.9.36
  • Owner set to zzz
  • Status changed from new to accepted

Draining the channel sounds like the right thing to do.
What's the effect of not draining it now? Does the failsafe catch it, or does it just sit there until more data comes in?

If we fix this, the read buffer size no longer matters for draining, and increasing it becomes a separate discussion, not really related to this ticket anymore.

comment:3 Changed 9 months ago by zzz

Proposed patch below, with more changes on top of the zerobin paste from the OP:

  • Fix EOF handling
  • Loop to read into additional buffers if we filled the one before, to drain the channel

The patch is a little hard to read because of indentation changes; a condensed sketch of the new control flow is below, followed by the full patch.
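
Condensed, the new read loop looks roughly like this (a sketch using the patch's own names; logging, stats, IP blocking, and the bandwidth-limiter queuing path are omitted, see the full patch for those):

    while (true) {
        ByteBuffer buf = acquireBuf();
        int read = 0;
        int readThisTime;
        // drain the channel into this buffer
        while ((readThisTime = con.getChannel().read(buf)) > 0)
            read += readThisTime;
        if (readThisTime < 0 && read == 0)
            read = readThisTime;          // EOF with nothing buffered
        if (read < 0) {
            con.close();                  // EOF handling
            releaseBuf(buf);
            break;
        }
        if (read == 0) {
            releaseBuf(buf);              // nothing to read; stay interested
            break;
        }
        // if we filled the buffer, go around again with a fresh one
        boolean keepReading = !buf.hasRemaining();
        buf.flip();
        con.recv(buf);                    // zero-copy; buffer released downstream
        if (readThisTime < 0) {
            con.close();                  // EOF after draining buffered data
            break;
        }
        if (!keepReading)
            break;
    }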

#
# old_revision [6c6a7520cad38845cee83103c323782b426b3ff5]
#
# patch "router/java/src/net/i2p/router/transport/ntcp/EventPumper.java"
#  from [a7ac861bf187ef81eb29836e29d19595c1a063fa]
#    to [edc37824627dfbe37664fda6534ef5474eb86861]
#
============================================================
--- router/java/src/net/i2p/router/transport/ntcp/EventPumper.java	a7ac861bf187ef81eb29836e29d19595c1a063fa
+++ router/java/src/net/i2p/router/transport/ntcp/EventPumper.java	edc37824627dfbe37664fda6534ef5474eb86861
@@ -613,50 +613,68 @@ class EventPumper implements Runnable {
      */
     private void processRead(SelectionKey key) {
         NTCPConnection con = (NTCPConnection)key.attachment();
-        ByteBuffer buf = acquireBuf();
+        ByteBuffer buf = null;
         try {
-            int read = con.getChannel().read(buf);
-            if (read < 0) {
-                if (con.isInbound() && con.getMessagesReceived() <= 0) {
-                    InetAddress addr = con.getChannel().socket().getInetAddress();
-                    int count;
-                    if (addr != null) {
-                        byte[] ip = addr.getAddress();
-                        ByteArray ba = new ByteArray(ip);
-                        count = _blockedIPs.increment(ba);
-                        if (_log.shouldLog(Log.WARN))
-                            _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
+            while (true) {
+                buf = acquireBuf();
+                int read = 0;
+                int readThisTime;
+                int readCount = 0;
+                while ((readThisTime = con.getChannel().read(buf)) > 0)  {
+                    read += readThisTime;
+                    readCount++;
+                }
+                if (readThisTime < 0 && read == 0)
+                    read = readThisTime;
+                if (_log.shouldDebug())
+                    _log.debug("Read " + read + " bytes total in " + readCount + " times from " + con);
+                if (read < 0) {
+                    if (con.isInbound() && con.getMessagesReceived() <= 0) {
+                        InetAddress addr = con.getChannel().socket().getInetAddress();
+                        int count;
+                        if (addr != null) {
+                            byte[] ip = addr.getAddress();
+                            ByteArray ba = new ByteArray(ip);
+                            count = _blockedIPs.increment(ba);
+                            if (_log.shouldLog(Log.WARN))
+                                _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
+                        } else {
+                            count = 1;
+                            if (_log.shouldLog(Log.WARN))
+                                _log.warn("EOF on inbound before receiving any: " + con);
+                        }
+                        _context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
                     } else {
-                        count = 1;
-                        if (_log.shouldLog(Log.WARN))
-                            _log.warn("EOF on inbound before receiving any: " + con);
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("EOF on " + con);
                     }
-                    _context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
-                } else {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("EOF on " + con);
+                    con.close();
+                    releaseBuf(buf);
+                    break;
                 }
-                con.close();
-                releaseBuf(buf);
-            } else if (read == 0) {
-                // stay interested
-                //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
-                releaseBuf(buf);
-                // workaround for channel stuck returning 0 all the time, causing 100% CPU
-                int consec = con.gotZeroRead();
-                if (consec >= 5) {
-                    _context.statManager().addRateData("ntcp.zeroReadDrop", 1);
-                    if (_log.shouldLog(Log.WARN))
-                        _log.warn("Fail safe zero read close " + con);
-                    con.close();
-                } else {
-                    _context.statManager().addRateData("ntcp.zeroRead", consec);
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("nothing to read for " + con + ", but stay interested");
+                if (read == 0) {
+                    // stay interested
+                    //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                    releaseBuf(buf);
+                    // workaround for channel stuck returning 0 all the time, causing 100% CPU
+                    int consec = con.gotZeroRead();
+                    if (consec >= 5) {
+                        _context.statManager().addRateData("ntcp.zeroReadDrop", 1);
+                        if (_log.shouldLog(Log.WARN))
+                            _log.warn("Fail safe zero read close " + con);
+                        con.close();
+                    } else {
+                        _context.statManager().addRateData("ntcp.zeroRead", consec);
+                        if (_log.shouldLog(Log.INFO))
+                            _log.info("nothing to read for " + con + ", but stay interested");
+                    }
+                    break;
                 }
-            } else {
+                // Process the data received
                 // clear counter for workaround above
                 con.clearZeroRead();
+                // go around again if we filled the buffer (so we can read more)
+                boolean keepReading = !buf.hasRemaining();
                 // ZERO COPY. The buffer will be returned in Reader.processRead()
                 buf.flip();
                 FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
@@ -665,21 +683,31 @@ class EventPumper implements Runnable {
                     key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                     _context.statManager().addRateData("ntcp.queuedRecv", read);
                     con.queuedRecv(buf, req);
+                    break;
                 } else {
                     // stay interested
                     //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                     con.recv(buf);
                     _context.statManager().addRateData("ntcp.read", read);
+                    if (readThisTime < 0) {
+                        // EOF, we're done
+                        con.close();
+                        break;
+                    }
+                    if (!keepReading)
+                        break;
                 }
-            }
+            }  // while true
         } catch (CancelledKeyException cke) {
-            releaseBuf(buf);
+            if (buf != null)
+                releaseBuf(buf);
             if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke);
             con.close();
             _context.statManager().addRateData("ntcp.readError", 1);
         } catch (IOException ioe) {
             // common, esp. at outbound connect time
-            releaseBuf(buf);
+            if (buf != null)
+                releaseBuf(buf);
             if (con.isInbound() && con.getMessagesReceived() <= 0) {
                 InetAddress addr = con.getChannel().socket().getInetAddress();
                 int count;
@@ -712,7 +740,8 @@ class EventPumper implements Runnable {
             }
             con.close();
         } catch (NotYetConnectedException nyce) {
-            releaseBuf(buf);
+            if (buf != null)
+                releaseBuf(buf);
             // ???
             key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
             if (_log.shouldLog(Log.WARN))

comment:4 Changed 9 months ago by zab

Looks good, except maybe release the buffer in a finally clause. But I'm fine with it the way it is.

comment:5 Changed 9 months ago by zab

Actually, never mind the finally thing; we don't want to release the buffer unless there's been an exception.
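
To make the ownership point concrete, a simplified sketch of why an unconditional finally would be wrong here (condensed from the patch; the zero-copy hand-off is the key):

    ByteBuffer buf = acquireBuf();
    try {
        int read = con.getChannel().read(buf);
        if (read > 0) {
            buf.flip();
            // ZERO COPY: Reader.processRead() now owns buf and will
            // return it to the pool when finished with it.
            con.recv(buf);
        } else {
            releaseBuf(buf);   // 0 / EOF paths: we still own the buffer
        }
    } catch (IOException ioe) {
        releaseBuf(buf);       // error paths: we still own the buffer
        con.close();
    }
    // A finally { releaseBuf(buf); } would hand the buffer back to the
    // pool while the reader may still be processing it.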

comment:6 Changed 9 months ago by zzz

  • Resolution set to fixed
  • Status changed from accepted to closed

In 566b1eeda884669d179397966db673e71775debb to be 0.9.35-6

Note that we're now less sure that the previous code actually introduced much latency or "clogs" (see OP); it's hard to know exactly how the selector works. But we agreed this change is a good thing regardless.
