Opened 16 months ago
Last modified 15 months ago
#2620 new enhancement
NTCP reader: parallize operation
Reported by: | jogger | Owned by: | zzz |
---|---|---|---|
Priority: | minor | Milestone: | undecided |
Component: | router/transport | Version: | 0.9.42 |
Keywords: | Cc: | ||
Parent Tickets: | Sensitive: | no |
Description
NTCP reader has the same unnecessary notify()´s as the writer. It is already an improvement to just apply the same changes.
However here we have the same situation as with #2617. The reader is just too slow to fill the pumper queue. The inbound code provided in #2432 comment 11 just works for the UDP message receiver, the liveReads logic here makes NTCP reader run single threaded, causing all messages to trickle one at a time through the router. Takes ages to get a 16k torrent piece to the local client.
Here the opposite is needed: if a read is in play, prioritize the same con for reading on another reader (ideally up to the number of processors). This way we have a chance to really get a bunch of messages to pump(). Load tested patch as follows:
--- i2p-0.9.42/router/java/src/net/i2p/router/transport/ntcp/Reader.java 2019-08-27 14:34:07.000000000 +0200 +++ 42dev/router/java/src/net/i2p/router/transport/ntcp/Reader.java 2019-09-12 09:07:08.000000000 +0200 @@ -7,6 +7,8 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.Map; +import java.util.HashMap; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; @@ -23,17 +25,15 @@ private final Log _log; // TODO change to LBQ ?? private final Set<NTCPConnection> _pendingConnections; - private final Set<NTCPConnection> _liveReads; - private final Set<NTCPConnection> _readAfterLive; + private final Map<NTCPConnection, Integer> _liveReads; private final List<Runner> _runners; public Reader(RouterContext ctx) { _context = ctx; _log = ctx.logManager().getLog(getClass()); - _pendingConnections = new LinkedHashSet<NTCPConnection>(16); + _pendingConnections = new HashSet<NTCPConnection>(16); _runners = new ArrayList<Runner>(8); - _liveReads = new HashSet<NTCPConnection>(8); - _readAfterLive = new HashSet<NTCPConnection>(8); + _liveReads = new HashMap<NTCPConnection, Integer>(8); } public synchronized void startReading(int numReaders) { @@ -51,21 +51,17 @@ r.stop(); } synchronized (_pendingConnections) { - _readAfterLive.clear(); + _liveReads.clear(); + _pendingConnections.clear(); _pendingConnections.notifyAll(); } } public void wantsRead(NTCPConnection con) { - boolean already = false; + boolean already; synchronized (_pendingConnections) { - if (_liveReads.contains(con)) { - _readAfterLive.add(con); - already = true; - } else { + already = _liveReads.containsKey(con); _pendingConnections.add(con); - // only notify here if added? - } _pendingConnections.notify(); } if (_log.shouldLog(Log.DEBUG)) @@ -74,10 +70,8 @@ public void connectionClosed(NTCPConnection con) { synchronized (_pendingConnections) { - _readAfterLive.remove(con); + _liveReads.remove(con); _pendingConnections.remove(con); - // necessary? - _pendingConnections.notify(); } } @@ -94,25 +88,30 @@ while (!_stop) { try { synchronized (_pendingConnections) { - boolean keepReading = (con != null) && _readAfterLive.remove(con); - if (keepReading) { - // keep on reading the same one - } else { - if (con != null) { + Integer num = _liveReads.get(con); + if (num != null) { + int val = num.intValue(); + if (val <= 1) _liveReads.remove(con); - con = null; + else + _liveReads.replace(con, Integer.valueOf(val - 1)); } - if (_pendingConnections.isEmpty()) { - _pendingConnections.wait(); - } else { + con = null; Iterator<NTCPConnection> iter = _pendingConnections.iterator(); + while (iter.hasNext()) { con = iter.next(); - iter.remove(); - _liveReads.add(con); - } + num = _liveReads.get(con); + if (num != null) + break; } + if (con == null) + _pendingConnections.wait(); + else + _liveReads.put(con, Integer.valueOf(num == null ? 1 : num.intValue() + 1)); + _pendingConnections.remove(con); } } catch (InterruptedException ie) {} + if (!_stop && (con != null) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("begin read for " + con);
Patch from #2619 comment 4 (addresses OP 1st sentence) in e1163891c6249ea2bdd626fec783c1f88ad462fa 0.9.42-11-rc
Leaving ticket open for evaluation of the other changes.