Opened 6 days ago

Last modified 2 days ago

#2619 new defect

NTCP Writer: Do not run dry

Reported by: jogger
Owned by: zzz
Priority: minor
Milestone: undecided
Component: router/transport
Version: 0.9.42
Keywords:
Cc:
Parent Tickets:
Sensitive: no

Description

Writer.java has a comment in wantsWrite() that points at the bug. If a con is already transmitting, the next write to the same con wakes up another writer thread (Runner), which immediately falls back into wait().

Moving the notify() to where the comment is solves the issue. NTCP Writer then runs less frequently, with a longer average time slice, and CPU usage is clearly down. The code is a bit hard to read; using only two sets would make it much more readable.
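
The conditional notify described above can be sketched as follows. This is a minimal toy model, not the actual Writer code: connections are plain strings, and NotifySketch, markLive() and the wakeups counter are hypothetical names added only so the effect can be observed.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of the scheduling sets; connections are plain strings here. */
class NotifySketch {
    private final Set<String> pending = new LinkedHashSet<>();
    private final Set<String> live = new HashSet<>();
    private int wakeups; // counts notify() calls, for illustration only

    /** Queue a write; wake a runner only if none is already handling con. */
    void wantsWrite(String con) {
        synchronized (pending) {
            boolean alreadyLive = live.contains(con);
            pending.add(con);
            if (!alreadyLive) {
                // A runner that is already writing con will pick up the new
                // data itself, so waking another runner would be wasted work.
                pending.notify();
                wakeups++;
            }
        }
    }

    /** Hypothetical helper: mark con as currently being written by a runner. */
    void markLive(String con) {
        synchronized (pending) {
            pending.remove(con);
            live.add(con);
        }
    }

    int wakeups() {
        synchronized (pending) {
            return wakeups;
        }
    }
}
```

In this model a second wantsWrite() for a con that is already live queues the data but does not issue a notify(), so no idle runner is woken just to go back to wait().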

Change History (3)

comment:1 Changed 5 days ago by zzz

This would be easier if you supplied a patch, but I think you want to move the notify() call from line 67 to line 65?

I don't understand your "two sets" comment or whether it's related at all to the notify() thing.

comment:2 Changed 4 days ago by jogger

Yes, that's line 65, and there is another unnecessary notify() in connectionClosed(). Patch below. I was referring to the fact that having three sets for just four states (inactive, pending, active, or both active and pending) makes the code hard to read, which might be the reason nobody ever resolved this. So I removed one of them.
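
To make the "two sets, four states" point concrete, here is a small sketch of how membership in two sets is enough to distinguish all four states. The class, enum and method names are hypothetical and do not appear in Writer.java; connections are plain strings.

```java
import java.util.Set;

/** Illustration only: two membership bits give four connection states. */
class TwoSetStates {
    enum State { INACTIVE, PENDING, ACTIVE, ACTIVE_AND_PENDING }

    /** Derive the state of con from the pending and live sets. */
    static State stateOf(String con, Set<String> pending, Set<String> live) {
        boolean isPending = pending.contains(con);
        boolean isLive = live.contains(con);
        if (isLive && isPending) return State.ACTIVE_AND_PENDING;
        if (isLive) return State.ACTIVE;
        if (isPending) return State.PENDING;
        return State.INACTIVE;
    }
}
```

A third set is then redundant, since "being written, with more data queued" is simply the case where the con is in both sets.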

A reason should also be given for retaining the _liveWrites logic, as it partially defeats multithreading. NTCP Writer runs for a much shorter time than the minimum Linux time slice, so looping over the same con avoids context switching and cache invalidation. Also, under load another thread would spend about as much time waiting on the processor run queue before running as the current writer needs to process the packet.
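
The "keep looping over the same con" behaviour can be sketched as a single-threaded model of the selection step. This is an illustration under assumptions, not the patch itself: PickNextSketch and next() are hypothetical names, connections are plain strings, and the sketch assumes the intended invariant is that a con stays in the live set for as long as a runner keeps writing it.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

/** Single-threaded toy model of how a runner picks its next connection. */
class PickNextSketch {
    final Set<String> pending = new LinkedHashSet<>(); // queued cons, oldest first
    final Set<String> live = new HashSet<>();          // cons some runner is writing

    /**
     * Called by a runner that just finished writing prev (null on the first call).
     * Returns the con to write next, or null where a real runner would wait().
     */
    synchronized String next(String prev) {
        String con = null;
        if (prev != null && pending.remove(prev)) {
            // More data was queued for prev meanwhile: stay on it and leave it
            // live, so no other runner is switched in for the same con.
            con = prev;
        } else {
            if (prev != null)
                live.remove(prev);
            // Take the oldest pending con that no other runner is writing.
            for (String candidate : pending) {
                if (live.add(candidate)) {
                    con = candidate;
                    break;
                }
            }
            if (con != null)
                pending.remove(con);
        }
        return con;
    }
}
```

A runner that finds new data for its current con returns to it without handing over to another thread, which is the context-switch saving argued for above.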

--- i2p-0.9.42/router/java/src/net/i2p/router/transport/ntcp/Writer.java
+++ 42dev/router/java/src/net/i2p/router/transport/ntcp/Writer.java
@@ -21,17 +21,15 @@
     private final Log _log;
     private final Set<NTCPConnection> _pendingConnections;
     private final Set<NTCPConnection> _liveWrites;
-    private final Set<NTCPConnection> _writeAfterLive;
     private final List<Runner> _runners;
-    
+
     public Writer(RouterContext ctx) {
         _log = ctx.logManager().getLog(getClass());
         _pendingConnections = new LinkedHashSet<NTCPConnection>(16);
         _runners = new ArrayList<Runner>(5);
         _liveWrites = new HashSet<NTCPConnection>(5);
-        _writeAfterLive = new HashSet<NTCPConnection>(5);
     }
-    
+
     public synchronized void startWriting(int numWriters) {
         for (int i = 1; i <=numWriters; i++) {
             Runner r = new Runner();
@@ -46,25 +44,22 @@
             r.stop();
         }
         synchronized (_pendingConnections) {
-            _writeAfterLive.clear();
+            _liveWrites.clear();
+            _pendingConnections.clear();
             _pendingConnections.notifyAll();
         }
     }
-    
+
     public void wantsWrite(NTCPConnection con, String source) {
         //if (con.getCurrentOutbound() != null)
         //    throw new RuntimeException("Current outbound message already in play on " + con);
-        boolean already = false;
-        boolean pending = false;
+        boolean already;
+        boolean pending;
         synchronized (_pendingConnections) {
-            if (_liveWrites.contains(con)) {
-                _writeAfterLive.add(con);
-                already = true;
-            } else {
-                pending = _pendingConnections.add(con);
-                // only notify here if added?
-            }
-            _pendingConnections.notify();
+            already = _liveWrites.contains(con);
+            pending = _pendingConnections.add(con);
+            if (!already)
+                _pendingConnections.notify();
         }
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("wantsWrite: " + con + " already live? " + already + " added to pending? " + pending + ": " + source);
@@ -72,18 +67,15 @@
 
     public void connectionClosed(NTCPConnection con) {
         synchronized (_pendingConnections) {
-            _writeAfterLive.remove(con);
+            _liveWrites.remove(con);
             _pendingConnections.remove(con);
-            // necessary?
-            _pendingConnections.notify();
         }
     }
-    
+
     private class Runner implements Runnable {
-        
+
         /** a scratch space to serialize and encrypt messages */
         private final NTCPConnection.PrepBuffer _prepBuffer;
-        
+
         private volatile boolean _stop;
 
         public Runner() {
@@ -98,27 +90,31 @@
             while (!_stop) {
                 try {
                     synchronized (_pendingConnections) {
-                        boolean keepWriting = (con != null) && _writeAfterLive.remove(con);
+                        _liveWrites.remove(con);
+                        boolean keepWriting = (con != null) && _pendingConnections.contains(con);
                         if (keepWriting) {
                             // keep on writing the same one
                             if (_log.shouldLog(Log.DEBUG))
                                 _log.debug("Keep writing on the same connection: " + con);
                         } else {
-                            _liveWrites.remove(con);
                             con = null;
                             if (_pendingConnections.isEmpty()) {
-                                if (_log.shouldLog(Log.DEBUG))
-                                    _log.debug("Done writing, but nothing pending, so wait");
-                                _pendingConnections.wait();
-                            } else {
-                                Iterator<NTCPConnection> iter = _pendingConnections.iterator();
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("Done writing, but nothing pending, so wait");
+                                _pendingConnections.wait();
+                            }
+                            Iterator<NTCPConnection> iter = _pendingConnections.iterator();
+                            while (iter.hasNext()) {
                                 con = iter.next();
-                                iter.remove();
-                                _liveWrites.add(con);
-                                if (_log.shouldLog(Log.DEBUG))
-                                    _log.debug("Switch to writing on: " + con);
+                                if (_liveWrites.add(con)) {
+                                    if (_log.shouldLog(Log.DEBUG))
+                                        _log.debug("Switch to writing on: " + con);
+                                    break;
+                                }
+                                con = null;
                             }
                         }
+                        _pendingConnections.remove(con);
                     }
                 } catch (InterruptedException ie) {}
                 if (!_stop && (con != null)) {

comment:3 Changed 2 days ago by zzz

I'm testing only the notify() move, in both Writer and Reader. Haven't looked closely at the other changes yet.

related: #2620
