Opened 2 months ago

Last modified 6 weeks ago

#2620 new enhancement

NTCP reader: parallize operation

Reported by: jogger Owned by: zzz
Priority: minor Milestone: undecided
Component: router/transport Version: 0.9.42
Keywords: Cc:
Parent Tickets: Sensitive: no

Description

NTCP reader has the same unnecessary notify()´s as the writer. It is already an improvement to just apply the same changes.

However here we have the same situation as with #2617. The reader is just too slow to fill the pumper queue. The inbound code provided in #2432 comment 11 just works for the UDP message receiver, the liveReads logic here makes NTCP reader run single threaded, causing all messages to trickle one at a time through the router. Takes ages to get a 16k torrent piece to the local client.

Here the opposite is needed: if a read is in play, prioritize the same con for reading on another reader (ideally up to the number of processors). This way we have a chance to really get a bunch of messages to pump(). Load tested patch as follows:

--- i2p-0.9.42/router/java/src/net/i2p/router/transport/ntcp/Reader.java	2019-08-27 14:34:07.000000000 +0200
+++ 42dev/router/java/src/net/i2p/router/transport/ntcp/Reader.java	2019-09-12 09:07:08.000000000 +0200
@@ -7,6 +7,8 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 
 import net.i2p.router.RouterContext;
 import net.i2p.util.I2PThread;
@@ -23,17 +25,15 @@
     private final Log _log;
     // TODO change to LBQ ??
     private final Set<NTCPConnection> _pendingConnections;
-    private final Set<NTCPConnection> _liveReads;
-    private final Set<NTCPConnection> _readAfterLive;
+    private final Map<NTCPConnection, Integer> _liveReads;
     private final List<Runner> _runners;
     
     public Reader(RouterContext ctx) {
         _context = ctx;
         _log = ctx.logManager().getLog(getClass());
-        _pendingConnections = new LinkedHashSet<NTCPConnection>(16);
+        _pendingConnections = new HashSet<NTCPConnection>(16);
         _runners = new ArrayList<Runner>(8);
-        _liveReads = new HashSet<NTCPConnection>(8);
-        _readAfterLive = new HashSet<NTCPConnection>(8);
+        _liveReads = new HashMap<NTCPConnection, Integer>(8);
     }
     
     public synchronized void startReading(int numReaders) {
@@ -51,21 +51,17 @@
             r.stop();
         }
         synchronized (_pendingConnections) {
-            _readAfterLive.clear();
+            _liveReads.clear();
+            _pendingConnections.clear();
             _pendingConnections.notifyAll();
         }
     }
     
     public void wantsRead(NTCPConnection con) {
-        boolean already = false;
+        boolean already;
         synchronized (_pendingConnections) {
-            if (_liveReads.contains(con)) {
-                _readAfterLive.add(con);
-                already = true;
-            } else {
+            already = _liveReads.containsKey(con);
                 _pendingConnections.add(con);
-                // only notify here if added?
-            }
             _pendingConnections.notify();
         }
         if (_log.shouldLog(Log.DEBUG))
@@ -74,10 +70,8 @@
 
     public void connectionClosed(NTCPConnection con) {
         synchronized (_pendingConnections) {
-            _readAfterLive.remove(con);
+            _liveReads.remove(con);
             _pendingConnections.remove(con);
-            // necessary?
-            _pendingConnections.notify();
         }
     }
     
@@ -94,25 +88,30 @@
             while (!_stop) {
                 try {
                     synchronized (_pendingConnections) {
-                        boolean keepReading = (con != null) && _readAfterLive.remove(con);
-                        if (keepReading) {
-                            // keep on reading the same one
-                        } else {
-                            if (con != null) {
+                        Integer num = _liveReads.get(con);
+                        if (num != null) {
+                            int val = num.intValue();
+                            if (val <= 1)
                                 _liveReads.remove(con);
-                                con = null;
+                            else
+                                _liveReads.replace(con, Integer.valueOf(val - 1));
                             }
-                            if (_pendingConnections.isEmpty()) {
-                                _pendingConnections.wait();
-                            } else {
+                        con = null;
                                 Iterator<NTCPConnection> iter = _pendingConnections.iterator();
+                        while (iter.hasNext()) {
                                 con = iter.next();
-                                iter.remove();
-                                _liveReads.add(con);
-                            }
+                            num = _liveReads.get(con);
+                            if (num != null)
+                                break;
                         }
+                        if (con == null)
+                            _pendingConnections.wait();
+                        else
+                            _liveReads.put(con, Integer.valueOf(num == null ? 1 : num.intValue() + 1));
+                        _pendingConnections.remove(con);
                     }
                 } catch (InterruptedException ie) {}
+
                 if (!_stop && (con != null) ) {
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("begin read for " + con);

Subtickets

Change History (1)

comment:1 Changed 6 weeks ago by zzz

Patch from #2619 comment 4 (addresses OP 1st sentence) in e1163891c6249ea2bdd626fec783c1f88ad462fa 0.9.42-11-rc
Leaving ticket open for evaluation of the other changes.

Note: See TracTickets for help on using tickets.