Opened 10 months ago

Last modified 3 months ago

#2432 new enhancement

Overhaul Tunnel GW Pumper

Reported by: jogger Owned by: zzz
Priority: minor Milestone: undecided
Component: router/general Version: 0.9.38
Keywords: Cc: Zlatin Balevsky
Parent Tickets: #2617 Sensitive: no

Description

This thing has confusing flow control, with a weird poison logic that never gets executed because the "_stop" control variable rules it all. For readability's sake this should be cleaned up.

Then there are two to-dos:

  • # of threads
  • prio inbound vs. outbound

I have run the T GW pumper with one thread only for more than a year now without a hitch. The requeue logic is executed less than once per hour, and the average queue length when entering pump() is less than 0.15 on a loaded ARM32. This confirms one thread is sufficient.

Then I came up with a queue priority logic for inbound vs. outbound. With outbound priority, seeding torrents was faster; with inbound priority, surfing eepsites felt snappier.

My suggestion: we fix this thing at two threads. If there are inbound as well as outbound GWs in the queue, one serves inbound first and the other outbound first. This way we cater to everybody's needs.
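The selection rule described above can be distilled into a tiny sketch. This is illustrative only (the names `pick`, `inboundFirst` and the string queues are mine, not the router's API): each pumper thread drains its preferred queue first and falls back to the other one when its preferred queue is empty.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class PrioritySketch {
    /** which queue should a thread with the given priority drain next? */
    static String pick(boolean inboundFirst, Set<String> inbound, Set<String> outbound) {
        // take inbound if outbound has nothing, or if this thread
        // prefers inbound and inbound actually has work
        boolean useInbound = outbound.isEmpty() || (inboundFirst && !inbound.isEmpty());
        return useInbound ? "inbound" : "outbound";
    }

    public static void main(String[] args) {
        Set<String> in = new LinkedHashSet<>(); in.add("ibgw");
        Set<String> out = new LinkedHashSet<>(); out.add("obgw");
        // with both queues non-empty, each thread serves its own priority first
        System.out.println(pick(true, in, out));   // inbound-first thread -> inbound
        System.out.println(pick(false, in, out));  // outbound-first thread -> outbound
        // an empty preferred queue falls through to the other one
        in.clear();
        System.out.println(pick(true, in, out));   // -> outbound
    }
}
```

With two threads holding opposite `inboundFirst` values, both traffic directions get a dedicated head-of-line server whenever both queues are busy.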

If nobody objects I will code that.

Subtickets

Change History (21)

comment:1 Changed 10 months ago by Zlatin Balevsky

Cc: Zlatin Balevsky added

comment:2 Changed 10 months ago by jogger

OK, I have run this for some time and am absolutely satisfied. Feels faster.

Since this is much shorter and 90% recoded, I provide the source plus a supporting diff to PumpedTunnelGateway.java. This way it is easier to read. A full diff is no problem if preferred.

TunnelGatewayPumper.java

package net.i2p.router.tunnel;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SystemVersion;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Run through the tunnel gateways that have had messages added to them and push
 * those messages through the preprocessing and sending process.
 *
 * Starts two threads: one with inbound priority and one with outbound priority.
 */
class TunnelGatewayPumper implements Runnable {
    private final RouterContext _context;
    private final Set<PumpedTunnelGateway> _outbound;
    private final Set<PumpedTunnelGateway> _inbound;
    private final Set<PumpedTunnelGateway> _backlogged;
    private volatile boolean _stop;
    private final AtomicBoolean _inboundFirst = new AtomicBoolean(false);

    /**
     *  Wait just a little, but this lets the pumper queue back up.
     *  See additional comments in PTG.
     */
    private static final long REQUEUE_TIME = 50;

    /** Creates a new instance of TunnelGatewayPumper */
    public TunnelGatewayPumper(RouterContext ctx) {
        _context = ctx;
        _outbound = new LinkedHashSet<PumpedTunnelGateway>(16);
        _inbound = new LinkedHashSet<PumpedTunnelGateway>(16);
        _backlogged = new HashSet<PumpedTunnelGateway>(16);
        for (int i = 1; i <= ((ctx.getBooleanProperty("i2p.dummyTunnelManager")) ? 1 : 2); i++)
            new I2PThread(this, "Tunnel GW pumper " + i, true).start();
    }

    public void stopPumping() {
        _stop = true;
        synchronized (_outbound) {
            _outbound.notifyAll();
        }
    }

    public void wantsPumping(PumpedTunnelGateway gw) {
        synchronized (_outbound) { // lock is reentrant when called from Requeue.timeReached()
            if (!_outbound.contains(gw) && !_inbound.contains(gw) && !_backlogged.contains(gw)) {
                if (gw._isInbound)
                    _inbound.add(gw);
                else
                    _outbound.add(gw);
                _outbound.notify();
            }
        }
    }

    public void run() {
        boolean inboundFirst = _inboundFirst.getAndSet(true);
        PumpedTunnelGateway gw = null;
        List<PendingGatewayMessage> queueBuf = new ArrayList<PendingGatewayMessage>(32);
        boolean requeue = false;
        while (!_stop) {
            try {
                synchronized (_outbound) {
                    if (requeue) { // usually happens less than 1 / hour
                        // in case another packet came in
                        _outbound.remove(gw);
                        _inbound.remove(gw);
                        if (_backlogged.add(gw))  // no duplicate events; we hold the _outbound lock
                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
                    }
                    while (_outbound.isEmpty() && _inbound.isEmpty()) { // guard against spurious wakeup
                        _outbound.wait();
                        if (_stop)
                            return;
                    }
                    Iterator<PumpedTunnelGateway> iter = (_outbound.isEmpty() ||
                        inboundFirst && !_inbound.isEmpty()) ? _inbound.iterator() : _outbound.iterator();
                    gw = iter.next();
                    iter.remove();
                }
            } catch (InterruptedException ie) { continue; } // gw may still be null after an interrupt
            requeue = gw.pump(queueBuf); // if single thread: average queue length before this < 0.15 on busy router
        }
    }

    private class Requeue implements SimpleTimer.TimedEvent {
        private final PumpedTunnelGateway _ptg;

        public Requeue(PumpedTunnelGateway ptg) {
            _ptg = ptg;
        }

        public void timeReached() {
            synchronized (_outbound) {
                _backlogged.remove(_ptg);
                wantsPumping(_ptg);
            }
        }
    }
}

PumpedTunnelGateway.java

--- "PumpedTunnelGateway orig.java"	2019-02-08 18:36:54.149434258 +0100
+++ "PumpedTunnelGateway patch.java"	2019-02-08 18:35:59.461682571 +0100
@@ -25,11 +25,11 @@
  *     messages or instruct the TunnelGateway to offer it the messages again in
  *     a short while (in an attempt to coallesce them).
  * <li>when the QueueProcessor accepts a TunnelGateway.Pending, it preprocesses
- *     it into fragments, forwarding each preprocessed fragment group through 
+ *     it into fragments, forwarding each preprocessed fragment group through
  *     the Sender.</li>
- * <li>the Sender then encrypts the preprocessed data and delivers it to the 
+ * <li>the Sender then encrypts the preprocessed data and delivers it to the
  *     Receiver.</li>
- * <li>the Receiver now has the encrypted message and may do with it as it 
+ * <li>the Receiver now has the encrypted message and may do with it as it
  *     pleases (e.g. wrap it as necessary and enqueue it onto the OutNetMessagePool,
  *     or if debugging, verify that it can be decrypted properly)</li>
  * </ol>
@@ -38,9 +38,9 @@
 class PumpedTunnelGateway extends TunnelGateway {
     private final BlockingQueue<PendingGatewayMessage> _prequeue;
     private final TunnelGatewayPumper _pumper;
-    private final boolean _isInbound;
+    public final boolean _isInbound;
     private final Hash _nextHop;
-    
+
     /**
      *  warning - these limit total messages per second throughput due to
      *  requeue delay in TunnelGatewayPumper to max * 1000 / REQUEUE_TIME
@@ -53,9 +53,9 @@
     /**
      * @param preprocessor this pulls Pending messages off a list, builds some
      *                     full preprocessed messages, and pumps those into the sender
-     * @param sender this takes a preprocessed message, encrypts it, and sends it to 
+     * @param sender this takes a preprocessed message, encrypts it, and sends it to
      *               the receiver
-     * @param receiver this receives the encrypted message and forwards it off 
+     * @param receiver this receives the encrypted message and forwards it off
      *                 to the first hop
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -68,20 +68,15 @@
             _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = false;
-        } else if (receiver != null) {  // extended by ThrottledPTG for IB
+        } else {  // extended by ThrottledPTG for IB
             // Bounded non-priority queue for inbound
             _prequeue = new CoDelBlockingQueue<PendingGatewayMessage>(context, "IBGW", MAX_IB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = true;
-        } else {
-            // Poison PTG
-            _prequeue = null;
-            _nextHop = null;
-            _isInbound = true;
         }
         _pumper = pumper;
     }
-    
+
     /**
      * Add a message to be sent down the tunnel, either sending it now (perhaps
      * coallesced with other pending messages) or after a brief pause (_flushFrequency).
@@ -138,7 +133,7 @@
             return false;
         boolean rv = !_prequeue.isEmpty();
 
-        long startAdd = System.currentTimeMillis();
+        long startAdd = _context.clock().now();
         long beforeLock = startAdd;
         long afterAdded = -1;
         boolean delayedFlush = false;
@@ -148,15 +143,15 @@
         long afterExpire = 0;
         synchronized (_queue) {
             _queue.addAll(queueBuf);
-            afterAdded = System.currentTimeMillis();
+            afterAdded = _context.clock().now();
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Added before direct flush preprocessing for " + toString() + ": " + _queue);
             delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
-            afterPreprocess = System.currentTimeMillis();
+            afterPreprocess = _context.clock().now();
             if (delayedFlush)
                 delayAmount = _preprocessor.getDelayAmount();
             _lastFlush = _context.clock().now();
-            
+
             // expire any as necessary, even if its framented
             for (int i = 0; i < _queue.size(); i++) {
                 PendingGatewayMessage m = _queue.get(i);
@@ -167,18 +162,18 @@
                     i--;
                 }
             }
-            afterExpire = System.currentTimeMillis();
+            afterExpire = _context.clock().now();
             remaining = _queue.size();
             if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
                 _log.debug("Remaining after preprocessing: " + _queue);
         }
-        
+
         if (delayedFlush) {
             _delayedFlush.reschedule(delayAmount);
         }
         //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
         if (_log.shouldLog(Log.DEBUG)) {
-            long complete = System.currentTimeMillis();
+            long complete = _context.clock().now();
             _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
                        + " delayed? " + delayedFlush + " remaining: " + remaining
                        + " add: " + (afterAdded-beforeLock)
@@ -192,5 +187,5 @@
                       " IB? " + _isInbound + " backlogged? " + backlogged);
         return rv;
     }
-    
+
 }

comment:3 Changed 10 months ago by zzz

Component: router/transport → router/general

comment:4 Changed 4 months ago by jogger

Sensitive: unset

Discussion here was quiet; apparently nobody likes the double-priority pumping. I therefore propose to fix the # of threads at 0, as is done with direct dispatch for participating traffic. This gives a much higher degree of concurrency, as we distribute the load over all those readers and receivers.

--- i2p-0.9.41/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
+++ 41p/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
@@ -6,30 +6,19 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import net.i2p.util.ConcurrentHashSet;
+import net.i2p.util.TryCache;
 import net.i2p.router.RouterContext;
-import net.i2p.util.I2PThread;
 import net.i2p.util.SimpleTimer;
-import net.i2p.util.SystemVersion;
 
 /**
- * Run through the tunnel gateways that have had messages added to them and push
- * those messages through the preprocessing and sending process.
- *
- * TODO do we need this many threads?
- * TODO this combines IBGWs and OBGWs, do we wish to separate the two
- * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
- */
-class TunnelGatewayPumper implements Runnable {
+ * straight pumping
+**/
+
+class TunnelGatewayPumper {
     private final RouterContext _context;
-    private final Set<PumpedTunnelGateway> _wantsPumping;
     private final Set<PumpedTunnelGateway> _backlogged;
-    private final List<Thread> _threads;
     private volatile boolean _stop;
-    private static final int MIN_PUMPERS = 1;
-    private static final int MAX_PUMPERS = 4;
-    private final int _pumpers;
 
     /**
      *  Wait just a little, but this lets the pumper queue back up.
@@ -37,89 +26,34 @@
      */
     private static final long REQUEUE_TIME = 50;
     
+    private static final TryCache<List<PendingGatewayMessage>> _bufferCache = new TryCache<>(new BufferFactory(), 16);
+
+    private static class BufferFactory implements TryCache.ObjectFactory<List<PendingGatewayMessage>> {
+        public List<PendingGatewayMessage> newInstance() {
+            return new ArrayList<PendingGatewayMessage>(32);
+        }
+    }
+
     /** Creates a new instance of TunnelGatewayPumper */
     public TunnelGatewayPumper(RouterContext ctx) {
         _context = ctx;
-        _wantsPumping = new LinkedHashSet<PumpedTunnelGateway>(16);
-        _backlogged = new HashSet<PumpedTunnelGateway>(16);
-        _threads = new CopyOnWriteArrayList<Thread>();
-        if (ctx.getBooleanProperty("i2p.dummyTunnelManager")) {
-            _pumpers = 1;
-        } else {
-            long maxMemory = SystemVersion.getMaxMemory();
-            _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
-        }
-        for (int i = 0; i < _pumpers; i++) {
-            Thread t = new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true);
-            _threads.add(t);
-            t.start();
-        }
+        _backlogged = new ConcurrentHashSet<PumpedTunnelGateway>(16);
     }
 
     public void stopPumping() {
-        _stop=true;
-        _wantsPumping.clear();
-        for (int i = 0; i < _pumpers; i++) {
-            PumpedTunnelGateway poison = new PoisonPTG(_context);
-            wantsPumping(poison);
-        }
-        for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        for (Thread t : _threads) {
-            t.interrupt();
-        }
-        _threads.clear();
-        _wantsPumping.clear();
+        _stop = true;
     }
     
     public void wantsPumping(PumpedTunnelGateway gw) {
-        if (!_stop) {
-            synchronized (_wantsPumping) {
-                if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw))
-                    _wantsPumping.notify();
+        if (!_backlogged.contains(gw) && !_stop) {
+            List<PendingGatewayMessage> queueBuf = _bufferCache.acquire();
+            synchronized (gw) {
+                if (gw.pump(queueBuf)) {
+                    _backlogged.add(gw);
+                    _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
+                }
             }
-        }
-    }
-    
-    public void run() {
-        try {
-            run2();
-        } finally {
-            _threads.remove(Thread.currentThread());
-        }
-    }
-
-    private void run2() {
-        PumpedTunnelGateway gw = null;
-        List<PendingGatewayMessage> queueBuf = new ArrayList<PendingGatewayMessage>(32);
-        boolean requeue = false;
-        while (!_stop) {
-            try {
-                synchronized (_wantsPumping) {
-                    if (requeue && gw != null) {
-                        // in case another packet came in
-                        _wantsPumping.remove(gw);
-                        if (_backlogged.add(gw))
-                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
-                    }
-                    gw = null;
-                    if (_wantsPumping.isEmpty()) {
-                        _wantsPumping.wait();
-                    } else {
-                        Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
-                        gw = iter.next();
-                        iter.remove();
-                    }
-                }
-            } catch (InterruptedException ie) {}
-            if (gw != null) {
-                if (gw.getMessagesSent() == POISON_PTG)
-                    break;
-                requeue = gw.pump(queueBuf);
-            }
+            _bufferCache.release(queueBuf);
         }
     }
 
@@ -131,23 +65,8 @@
         }
 
         public void timeReached() {
-            synchronized (_wantsPumping) {
-                _backlogged.remove(_ptg);
-                if (_wantsPumping.add(_ptg))
-                    _wantsPumping.notify();
-            }
+            _backlogged.remove(_ptg);
+            wantsPumping(_ptg);
         }
     }
-
-
-    private static final int POISON_PTG = -99999;
-
-    private static class PoisonPTG extends PumpedTunnelGateway {
-        public PoisonPTG(RouterContext ctx) {
-            super(ctx, null, null, null, null);
-        }
-
-        @Override
-        public int getMessagesSent() { return POISON_PTG; }
-    }
 }
--- i2p-0.9.41/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
+++ 41p/router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java
@@ -68,15 +68,10 @@
             _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = false;
-        } else if (receiver != null) {  // extended by ThrottledPTG for IB
+        } else {  // extended by ThrottledPTG for IB
             // Bounded non-priority queue for inbound
             _prequeue = new CoDelBlockingQueue<PendingGatewayMessage>(context, "IBGW", MAX_IB_QUEUE);
             _nextHop = receiver.getSendTo();
-            _isInbound = true;
-        } else {
-            // Poison PTG
-            _prequeue = null;
-            _nextHop = null;
             _isInbound = true;
         }
         _pumper = pumper;
@@ -138,7 +133,7 @@
             return false;
         boolean rv = !_prequeue.isEmpty();
 
-        long startAdd = System.currentTimeMillis();
+        long startAdd = _context.clock().now();
         long beforeLock = startAdd;
         long afterAdded = -1;
         boolean delayedFlush = false;
@@ -148,11 +143,11 @@
         long afterExpire = 0;
         synchronized (_queue) {
             _queue.addAll(queueBuf);
-            afterAdded = System.currentTimeMillis();
+            afterAdded = _context.clock().now();
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Added before direct flush preprocessing for " + toString() + ": " + _queue);
             delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
-            afterPreprocess = System.currentTimeMillis();
+            afterPreprocess = _context.clock().now();
             if (delayedFlush)
                 delayAmount = _preprocessor.getDelayAmount();
             _lastFlush = _context.clock().now();
@@ -167,7 +162,7 @@
                     i--;
                 }
             }
-            afterExpire = System.currentTimeMillis();
+            afterExpire = _context.clock().now();
             remaining = _queue.size();
             if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
                 _log.debug("Remaining after preprocessing: " + _queue);
@@ -178,7 +173,7 @@
         }
         //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
         if (_log.shouldLog(Log.DEBUG)) {
-            long complete = System.currentTimeMillis();
+            long complete = _context.clock().now();
             _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
                        + " delayed? " + delayedFlush + " remaining: " + remaining
                        + " add: " + (afterAdded-beforeLock)

comment:5 Changed 4 months ago by zzz

I don't think you can assume anything about what people like or don't like; it doesn't look like anybody has looked at this at all. However, I was testing the first patch in the Feb-March 2019 timeframe, but I can't find any notes about the results.

The TGWP javadocs contain notes from me in 2012 questioning the thread count and possibly separating into OB and IB, as you note in the OP. While you've removed those docs in your 2nd patch, the issue is not resolved.

I think what we need is some statement of what problem we're trying to solve, other than just that the code looks ugly. If the separate threads are a source of latency and the changes make things "snappier" then we should be able to measure that, and that will help us prioritize this ticket.

comment:6 Changed 4 months ago by jogger

OK, I will try to make it clearer what comment 4 does:

Elimination of this thread pool eliminates the associated context switching and other overhead.
The NTCP/UDP/I2CP readers then do the same direct dispatching for local traffic that already happens for participating traffic. As a result we are parallelizing more than before.
I thought it would be a no-brainer that this must be faster.
It will be difficult to measure, since I2CP readers can disappear at any time, but it may be worth a try.

comment:7 Changed 4 months ago by jogger

Fortunately I did not need to measure. Through code analysis I found a bug I would classify as major, where all T GW Pumper threads can be temporarily locked up by a single fast tunnel. This is easiest to see when comparing with the livewrites logic of the NTCP Writer. The lockup occurs within pump() over _queue.

Comment 4 solves this for the single-threaded I2CP readers and eases it a bit for the multithreaded inbound receivers. I am already testing a "livepumps" enhancement to comment 4 and will publish it here once finished.
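The lockup mechanism described above can be modeled in a few lines. This is a toy illustration with made-up names, not router code: all "pumper" threads funnel into the same per-gateway monitor (standing in for the `_queue` lock inside pump()), so a single busy gateway serializes every pumper thread.

```java
public class LockupSketch {
    static final Object queueLock = new Object(); // stands in for PTG._queue's monitor
    static int inside = 0, maxInside = 0;

    static void pump() {
        synchronized (queueLock) {           // every pumper contends here
            inside++;
            maxInside = Math.max(maxInside, inside);
            try { Thread.sleep(5); } catch (InterruptedException ignored) {}
            inside--;
        }
    }

    public static void main(String[] args) {
        Thread[] pumpers = new Thread[4];
        for (int i = 0; i < pumpers.length; i++)
            pumpers[i] = new Thread(LockupSketch::pump, "pumper-" + i);
        for (Thread t : pumpers) t.start();
        for (Thread t : pumpers) {
            try { t.join(); } catch (InterruptedException ignored) {}
        }
        // despite 4 threads, at most one was ever inside pump() at a time
        System.out.println("max concurrent pumpers in pump(): " + maxInside);
    }
}
```

Four threads were started, yet `maxInside` never exceeds 1: while one gateway is being pumped, the other three pumpers contribute nothing, which is the stall described in the comment.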

comment:8 Changed 4 months ago by zzz

I just made some changes to PTG to only do the timing calls for log.shouldDebug(), so that should speed things up a little.

I glanced at what the timers were doing, and TunnelGateway$DelayedFlush is a heavy user of them. I wonder if we could do something there to reduce the timer usage.

As far as removing the pumper threads goes, the important questions are whether the "pumping" part can block, how long it takes, which threads this work would end up in if we got rid of the pumpers, and what the impact of them running longer or blocking would be.

comment:9 Changed 4 months ago by zzz

There are definitely some cleanups that could be done here. But I don't think removing all the threads and queues is a good idea. The point of having a queue is to allow some batching, so that the preprocessor can efficiently pack I2NP message fragments into tunnel messages. By short-circuiting all of that, the preprocessor never has more than one message to process, preventing efficient packing. This could dramatically reduce the bandwidth efficiency of the tunnels. So I'm inclined to NAK all of this.

We just changed the batch time in #2586 and we can do more experiments to tune things further, and especially to measure the efficiency - but the basic structure is sound.
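The packing argument above can be made concrete with rough arithmetic. The sizes here are assumptions for illustration (roughly 1000 usable payload bytes per fixed-size tunnel message), not the exact I2NP framing overhead, and real fragmentation can also split one message across tunnel messages; the point is only the ratio between batched and unbatched flushing.

```java
public class BatchingSketch {
    static final int PAYLOAD = 1000; // assumed usable bytes per tunnel message

    /** tunnel messages needed when all pending messages are packed together */
    static int batched(int[] msgSizes) {
        int total = 0;
        for (int s : msgSizes) total += s;
        return (total + PAYLOAD - 1) / PAYLOAD; // ceiling division
    }

    /** tunnel messages needed when each message is flushed on its own */
    static int unbatched(int[] msgSizes) {
        int count = 0;
        for (int s : msgSizes) count += (s + PAYLOAD - 1) / PAYLOAD;
        return count;
    }

    public static void main(String[] args) {
        int[] burst = {300, 300, 300, 300}; // four small messages for one tunnel
        System.out.println("batched:   " + batched(burst));   // 2 tunnel messages
        System.out.println("unbatched: " + unbatched(burst)); // 4 tunnel messages
    }
}
```

With the queue in place the preprocessor sees the whole burst and emits 2 tunnel messages; handed one message at a time it emits 4, i.e. half the bandwidth efficiency for this burst.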

comment:10 Changed 4 months ago by jogger

I am currently running long-term tests of the current code against possible improvements in my testbed.

At an average of 300 KBps local traffic outbound I see 1.2 context switches plus >300 µs spent idle in the processor run queue per pump. Including overhead this amounts to well above 2 ms of in/out latency, which is what I am targeting.

As for inbound, the code in comment 4 changes nothing and does not defeat batching as long as the inbound receivers are multithreaded; it just moves the logic to the inbound readers without context switching. Since the average queue length before pump() is 0.15 for a single pumper thread (comment 2), there is no point in discussing 1 or 2 messages.

For the use case of 4 consecutive messages for the same tunnel, the current code as well as comment 4 works like this:

  • pumper 1 pumps 1 msg
  • pumper 2 blocks
  • pumper 3 blocks
  • pumper 4 blocks - entire pumper stalled
  • pumper 1 finishes
  • pumper 2 pumps 3 msgs in a batch and finishes
  • pumpers 3+4 pump on an empty queue

This is why comment 2 with 2 threads was already an improvement, and why I suggested a "livepumps" logic in comment 7. It makes the first caller of pump() loop and batch all pending pumps; all other callers return immediately - no context switch. The basic structure is untouched by just executing the same logic from a different thread. Patch below.

As for outbound, zzz has a point. Comment 4 lets the single I2CP receivers loop read-pump-read-pump. I would have been willing to trade that CPU for other savings. Now consider the use case of 2 tunnels with 4 consecutive messages each: for maximum batching at the lowest CPU, a single pumper thread would be best, pumping the first message and then batches of 3 and 4 afterwards. Patch below.

I have had this mod running for quite some time; total thread activations are down from 315/sec for the clumsy logic illustrated above to 65/sec for the outbound-only thread, on 30% higher outbound local traffic. The patch below thus pumps an average of 6.5K per pump outbound and consumes 15% CPU per 500 kBps on ARM32. That is enough headroom and really good batching.
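The "livepumps" idea can be distilled as follows. This is a hypothetical sketch with made-up names, not the actual patch: the first caller that wins an atomic flag becomes the live pump and drains everything queued meanwhile; later callers just enqueue and return without blocking. (The real patch also re-checks for work after clearing the flag to close a small race; that is omitted here for brevity.)

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class LivePumpSketch {
    final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
    final AtomicBoolean pumping = new AtomicBoolean(false);
    int drainPasses = 0, messagesPumped = 0;

    /** called by any reader thread that has produced a message */
    void wantsPumping(String msg) {
        pending.add(msg);
        if (!pumping.compareAndSet(false, true))
            return; // somebody else is already the live pump: no block, no context switch
        try {
            drainPasses++; // this caller loops and batches everything queued so far
            while (pending.poll() != null)
                messagesPumped++; // stands in for gw.pump(queueBuf)
        } finally {
            pumping.set(false);
        }
    }

    public static void main(String[] args) {
        LivePumpSketch p = new LivePumpSketch();
        p.pumping.set(true);     // simulate another thread currently pumping:
        p.wantsPumping("msg-1"); // this call only enqueues and returns early
        p.pumping.set(false);    // the live pump finished; next caller drains
        p.wantsPumping("msg-2"); // one drain pass batches both pending messages
        System.out.println(p.drainPasses + " drain pass, " + p.messagesPumped + " messages");
    }
}
```

One drain pass handles both messages: the batching happens in the caller's thread, so no dedicated pumper thread has to wake up for inbound traffic.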

comment:11 Changed 4 months ago by jogger

--- i2p-0.9.41/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
+++ 41p/router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java
@@ -6,30 +6,24 @@
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import net.i2p.util.ConcurrentHashSet;
+import net.i2p.util.TryCache;
 import net.i2p.router.RouterContext;
 import net.i2p.util.I2PThread;
 import net.i2p.util.SimpleTimer;
-import net.i2p.util.SystemVersion;
 
 /**
- * Run through the tunnel gateways that have had messages added to them and push
- * those messages through the preprocessing and sending process.
- *
- * TODO do we need this many threads?
- * TODO this combines IBGWs and OBGWs, do we wish to separate the two
- * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
- */
+ * straight pumping for multithreaded inbound receivers
+ * queueing for outbound I2CP receivers
+**/
+
 class TunnelGatewayPumper implements Runnable {
     private final RouterContext _context;
-    private final Set<PumpedTunnelGateway> _wantsPumping;
     private final Set<PumpedTunnelGateway> _backlogged;
-    private final List<Thread> _threads;
+    private final Set<PumpedTunnelGateway> _livepumps;
+    private final Set<PumpedTunnelGateway> _inbound;
+    private final Set<PumpedTunnelGateway> _outbound;
     private volatile boolean _stop;
-    private static final int MIN_PUMPERS = 1;
-    private static final int MAX_PUMPERS = 4;
-    private final int _pumpers;
 
     /**
      *  Wait just a little, but this lets the pumper queue back up.
@@ -37,89 +31,89 @@
      */
     private static final long REQUEUE_TIME = 50;
     
+    private static final TryCache<List<PendingGatewayMessage>> _bufferCache = new TryCache<>(new BufferFactory(), 16);
+
+    private static class BufferFactory implements TryCache.ObjectFactory<List<PendingGatewayMessage>> {
+        public List<PendingGatewayMessage> newInstance() {
+            return new ArrayList<PendingGatewayMessage>(32);
+        }
+    }
+
     /** Creates a new instance of TunnelGatewayPumper */
     public TunnelGatewayPumper(RouterContext ctx) {
         _context = ctx;
-        _wantsPumping = new LinkedHashSet<PumpedTunnelGateway>(16);
-        _backlogged = new HashSet<PumpedTunnelGateway>(16);
-        _threads = new CopyOnWriteArrayList<Thread>();
-        if (ctx.getBooleanProperty("i2p.dummyTunnelManager")) {
-            _pumpers = 1;
-        } else {
-            long maxMemory = SystemVersion.getMaxMemory();
-            _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
+        _backlogged = new ConcurrentHashSet<PumpedTunnelGateway>(16);
+        _livepumps = new ConcurrentHashSet<PumpedTunnelGateway>(16);
+        _inbound = new ConcurrentHashSet<PumpedTunnelGateway>(16);
+        _outbound = new LinkedHashSet<PumpedTunnelGateway>(16);
+        new I2PThread(this, "Tunnel GW pumper ", true).start();
+    }
+
+    public void stopPumping() {
+        _stop = true;
+        synchronized (_outbound) {
+            _outbound.notify();
         }
-        for (int i = 0; i < _pumpers; i++) {
-            Thread t = new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true);
-            _threads.add(t);
-            t.start();
+        _backlogged.clear();
+        _livepumps.clear();
+        _inbound.clear();
+        _outbound.clear();
+    }
+    
+    public void wantsPumping(PumpedTunnelGateway gw) {
+        if (!_backlogged.contains(gw) && !_stop) {
+            if (gw._isInbound) {
+                if (_inbound.add(gw)) { // not queued up already
+                    // in the extremely unlikely case of a race
+                    // we will have an additional empty pump() blocking shortly
+                    // not as expensive as complicated logic here every time
+                    if (!_livepumps.add(gw)) // let others return early
+                        return; // somebody else working already
+                    List<PendingGatewayMessage> queueBuf = _bufferCache.acquire();
+                    while (_inbound.remove(gw) && !_stop) {
+                        _livepumps.add(gw);
+                        if (gw.pump(queueBuf)) { // extremely unlikely chance of race, pump() will block
+                            _backlogged.add(gw);
+                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
+                        }
+                        _livepumps.remove(gw); // _inbound added first, removed last.
+                    }
+                    _bufferCache.release(queueBuf);
+                }
+            } else {
+                 synchronized (_outbound) { // used reentrant
+                     if (_outbound.add(gw))
+                         _outbound.notify();
+                }
+            }
         }
     }
 
-    public void stopPumping() {
-        _stop=true;
-        _wantsPumping.clear();
-        for (int i = 0; i < _pumpers; i++) {
-            PumpedTunnelGateway poison = new PoisonPTG(_context);
-            wantsPumping(poison);
-        }
-        for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        for (Thread t : _threads) {
-            t.interrupt();
-        }
-        _threads.clear();
-        _wantsPumping.clear();
-    }
-    
-    public void wantsPumping(PumpedTunnelGateway gw) {
-        if (!_stop) {
-            synchronized (_wantsPumping) {
-                if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw))
-                    _wantsPumping.notify();
-            }
-        }
-    }
-    
-    public void run() {
-        try {
-            run2();
-        } finally {
-            _threads.remove(Thread.currentThread());
-        }
-    }
-
-    private void run2() {
+   public void run() {
+        // this also needs a livepumps logic if it were multi-threaded
         PumpedTunnelGateway gw = null;
-        List<PendingGatewayMessage> queueBuf = new ArrayList<PendingGatewayMessage>(32);
+        List<PendingGatewayMessage> queueBuf = _bufferCache.acquire();
         boolean requeue = false;
         while (!_stop) {
             try {
-                synchronized (_wantsPumping) {
-                    if (requeue && gw != null) {
+                synchronized (_outbound) {
+                    if (requeue) { // usually happens less than 1 / hour
                         // in case another packet came in
-                        _wantsPumping.remove(gw);
-                        if (_backlogged.add(gw))
-                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
+                        _outbound.remove(gw);
+                        _backlogged.add(gw);
+                        _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
                     }
-                    gw = null;
-                    if (_wantsPumping.isEmpty()) {
-                        _wantsPumping.wait();
-                    } else {
-                        Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
-                        gw = iter.next();
-                        iter.remove();
+                    while (_outbound.isEmpty()) { // spurious wakeup
+                        _outbound.wait();
+                        if (_stop)
+                            return;
                     }
+                    Iterator<PumpedTunnelGateway> iter = _outbound.iterator();
+                    gw = iter.next();
+                    iter.remove();
                 }
             } catch (InterruptedException ie) {}
-            if (gw != null) {
-                if (gw.getMessagesSent() == POISON_PTG)
-                    break;
-                requeue = gw.pump(queueBuf);
-            }
+            requeue = gw.pump(queueBuf); // if single thread: average queue length before this < 0.15 on busy router
         }
     }
 
@@ -131,23 +125,8 @@
         }
 
         public void timeReached() {
-            synchronized (_wantsPumping) {
-                _backlogged.remove(_ptg);
-                if (_wantsPumping.add(_ptg))
-                    _wantsPumping.notify();
-            }
+            _backlogged.remove(_ptg);
+            wantsPumping(_ptg);
         }
     }
-
-
-    private static final int POISON_PTG = -99999;
-
-    private static class PoisonPTG extends PumpedTunnelGateway {
-        public PoisonPTG(RouterContext ctx) {
-            super(ctx, null, null, null, null);
-        }
-
-        @Override
-        public int getMessagesSent() { return POISON_PTG; }
-    }
 }

comment:12 Changed 3 months ago by jogger

Following up on the discussion in #2617, here is a shorter, non-blocking version that eliminates the pumper thread, adds a livepumps logic, and batches messages for inbound and outbound together. The PTG patch above still applies.

package net.i2p.router.tunnel;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.TryCache;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.SimpleTimer;

/**
 * straight pumping for inbound and outbound I2CP receivers
**/

class TunnelGatewayPumper {
    private final RouterContext _context;
    private final Set<PumpedTunnelGateway> _backlogged;
    private final Set<PumpedTunnelGateway> _livepumps;
    private final Set<PumpedTunnelGateway> _wantsPumping;
    private volatile boolean _stop;

    /**
     *  Wait just a little, but this lets the pumper queue back up.
     *  See additional comments in PTG.
     */
    private static final long REQUEUE_TIME = 50;

    private static final TryCache<List<PendingGatewayMessage>> _bufferCache = new TryCache<>(new BufferFactory(), 16);

    private static class BufferFactory implements TryCache.ObjectFactory<List<PendingGatewayMessage>> {
        public List<PendingGatewayMessage> newInstance() {
            return new ArrayList<PendingGatewayMessage>(32);
        }
    }

    /** Creates a new instance of TunnelGatewayPumper */
    public TunnelGatewayPumper(RouterContext ctx) {
        _context = ctx;
        _backlogged = new ConcurrentHashSet<PumpedTunnelGateway>(16);
        _livepumps = new HashSet<PumpedTunnelGateway>(16);
        _wantsPumping = new HashSet<PumpedTunnelGateway>(16);
    }

    public void stopPumping() {
        _stop = true;
        _backlogged.clear();
        synchronized (_wantsPumping) {
            _livepumps.clear();
            _wantsPumping.clear();
        }
    }

    public void wantsPumping(PumpedTunnelGateway gw) {
        if (_backlogged.contains(gw) || _stop)
            return;
        synchronized (_wantsPumping) {
            if (!_wantsPumping.add(gw) || !_livepumps.add(gw))
                return; // let others return early if somebody else is already working
            _wantsPumping.remove(gw);
        }
        boolean keepPumping = true;
        List<PendingGatewayMessage> queueBuf = _bufferCache.acquire();
        while (keepPumping && !_stop) {
            if (gw.pump(queueBuf)) {  // once per hour
                _backlogged.add(gw);
                _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
                synchronized (_wantsPumping) {
                    _wantsPumping.remove(gw);
                }
            }
            synchronized (_wantsPumping) {
                if (!(keepPumping = _wantsPumping.remove(gw)))
                    _livepumps.remove(gw);
            }
        }
        _bufferCache.release(queueBuf);
    }

    private class Requeue implements SimpleTimer.TimedEvent {
        private final PumpedTunnelGateway _ptg;

        public Requeue(PumpedTunnelGateway ptg) {
            _ptg = ptg;
        }

        public void timeReached() {
            _backlogged.remove(_ptg);
            wantsPumping(_ptg);
        }
    }
}
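For readers unfamiliar with the buffer-pooling pattern above, here is a minimal JDK-only sketch of the same idea, with a `ConcurrentLinkedQueue` standing in for `net.i2p.util.TryCache`. The class and element type are illustrative only, not router code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal stand-in for the TryCache<List<PendingGatewayMessage>> above:
// a bounded pool of reusable buffers, so each pump() call does not have
// to allocate a fresh ArrayList.
class BufferPool {
    private final ConcurrentLinkedQueue<List<String>> pool = new ConcurrentLinkedQueue<>();
    private final int maxSize;

    BufferPool(int maxSize) { this.maxSize = maxSize; }

    List<String> acquire() {
        List<String> buf = pool.poll();
        return (buf != null) ? buf : new ArrayList<>(32);
    }

    void release(List<String> buf) {
        buf.clear();               // never hand stale messages back out
        if (pool.size() < maxSize) // cap the pool; let extras be GC'd
            pool.offer(buf);
    }
}
```

The point of the cap (16 in the code above) is that the pool never grows without bound under a burst; excess buffers are simply dropped for the garbage collector.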

comment:13 Changed 3 months ago by zzz

To clarify, your current proposal is the PTG patch from the 2nd half of comment 4, together with the TGP replacement in comment 12?

comment:14 Changed 3 months ago by jogger

Yes, correct.

comment:15 Changed 3 months ago by Zlatin Balevsky

Ok, if someone puts together a *single* cumulative patch for this ticket + #2617 I will benchmark it. I'm afraid to pick and choose from different comments and tickets as chances are I'll get it wrong.

comment:16 Changed 3 months ago by zzz

Parent Tickets: 2617

As discussed in #2617, that ticket depends on this one.

comment:17 Changed 3 months ago by Zlatin Balevsky

Tested the combined patch that jogger sent me by email, don't see any real difference in throughput from the "patched" column in the spreadsheet attached to #2617.

My test is arguably very limited and open to a lot of criticism. I would prefer if my results are not taken into consideration when deciding whether to merge these changes to trunk or not.

comment:18 Changed 3 months ago by jogger

@zab: I am afraid you did not test my patches. If you are eepget-ting from an in-JVM connection, then you execute the following unchanged code from ClientManager.java:

            //_ctx.jobQueue().addJob(j);
            j.runJob();

So no parallelization. One should look into changing this as well, to see whether the local apps can fill the queue fast enough.

I clearly talked about the I2CP Readers and the respective code, not the internal ones. The test scenario I referred to averaged 500 KBps of outbound torrent traffic from 6 I2CP Readers through > 200 tunnels. That is where I get the batching and the reduction in context switches.

From a user perspective, this change keeps local traffic from being choked under higher load: thousands of participating tunnels get direct dispatch, while local traffic would otherwise trickle through one-at-a-time across multiple threads.
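To make the inline-vs-queued distinction concrete, here is an illustrative JDK-only sketch (none of this is router code; an `ExecutorService` stands in for the router's job queue): a job run inline executes on the caller, i.e. the I2CP reader thread, while a queued job runs on a pool thread and frees the reader immediately.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the two dispatch styles: j.runJob() vs _ctx.jobQueue().addJob(j).
public class DispatchSketch {
    // Inline dispatch: executes directly on the caller (the I2CP reader).
    static void dispatchInline(Runnable job) {
        job.run();
    }

    // Queued dispatch: executes on a pool thread; returns whether the job
    // (unexpectedly) ran on the calling thread.
    static boolean dispatchQueued(ExecutorService queue) {
        final Thread caller = Thread.currentThread();
        try {
            return queue.submit(() -> Thread.currentThread() == caller).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService jobQueue = Executors.newFixedThreadPool(2);
        dispatchInline(() -> { /* deliver payload; reader blocked meanwhile */ });
        System.out.println("queued job ran on reader thread? " + dispatchQueued(jobQueue)); // false
        jobQueue.shutdown();
    }
}
```

Whether queuing helps depends on whether the producer can keep the pool busy; if it cannot, the extra hand-off only buys context switches, which is the effect discussed in the following comments.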

comment:19 Changed 3 months ago by Zlatin Balevsky

My setup is:

Alice runs the built-in Jetty instance and hosts a file
Bob runs eepget for that file

Bob is definitely going through the I2CP reader because eepget runs in a separate JVM. I don't know about Alice; that is probably in-JVM, but then so would be i2psnark in most cases.

I'll change ClientManager and see if that makes a difference.

comment:20 Changed 3 months ago by Zlatin Balevsky

Ok, with the change to parallelize the internal I2CP handler, the throughput dropped even more drastically, by about 50%. Here are the 20 readings from eepget:

 083.26
 054.57
 049.45
 059.55
 067.70
 041.87
 053.95
 056.95
 064.81
 040.40
 048.99
 043.35
 060.76
 046.51
 062.62
 048.01
 043.09
 040.09
 051.26
 043.38
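For reference, the mean of the 20 readings above works out to about 53 (units as reported by eepget); a trivial check, added here by the editor:

```java
// Mean of the 20 eepget readings listed above.
public class ReadingsMean {
    static final double[] READINGS = {
        83.26, 54.57, 49.45, 59.55, 67.70, 41.87, 53.95, 56.95, 64.81, 40.40,
        48.99, 43.35, 60.76, 46.51, 62.62, 48.01, 43.09, 40.09, 51.26, 43.38
    };

    static double mean(double[] v) {
        double sum = 0;
        for (double x : v) sum += x;
        return sum / v.length;
    }

    public static void main(String[] args) {
        System.out.printf("mean = %.2f%n", mean(READINGS)); // mean = 53.03
    }
}
```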

And here is the full patch against monotone that I'm testing:

#
# old_revision [906b285eec98c87136f87ae590b8ca41c6da9a4c]
#
# patch "router/java/src/net/i2p/router/ClientMessagePool.java"
#  from [917ef181d8aa052bc151900a9a40d1aad31f97f9]
#    to [4113fd89de739ba217365ecd0099e929cf1832b2]
# 
# patch "router/java/src/net/i2p/router/JobQueue.java"
#  from [bdc45c30d66a1269fb4c28c6fa8cf35a55e4d6c6]
#    to [800a4b43d12f2d2a2dc816e40b74ab1868fcfb58]
# 
# patch "router/java/src/net/i2p/router/client/ClientManager.java"
#  from [464d0797f3292ede2eab427e71295938ab026fbb]
#    to [2ccd4daadc98a6550cea52248422e3e806df172d]
# 
# patch "router/java/src/net/i2p/router/networkdb/reseed/ReseedChecker.java"
#  from [b2f49c2cef1bfef6da22c4b3fac423731c7ed49d]
#    to [29336b429c3f102deeefd431de6e9c622c77e855]
# 
# patch "router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java"
#  from [1c32139248fa7c48523f091113298951a8519c8c]
#    to [bcdf76db3826d18650c76942a252bd5e88faf8d2]
# 
# patch "router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java"
#  from [71a49516c47f296061b7767c57a84a551549f805]
#    to [eef9e6e5db2ab586a7eb9b1b4edc642cec4455ee]
#
============================================================
--- router/java/src/net/i2p/router/ClientMessagePool.java	917ef181d8aa052bc151900a9a40d1aad31f97f9
+++ router/java/src/net/i2p/router/ClientMessagePool.java	4113fd89de739ba217365ecd0099e929cf1832b2
@@ -73,7 +73,7 @@ public class ClientMessagePool {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Adding message for remote delivery");
             OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, _cache, msg);
-            if (true) // blocks the I2CP reader for a nontrivial period of time
+            if (false) // blocks the I2CP reader for a nontrivial period of time
                 j.runJob();
             else
                 _context.jobQueue().addJob(j);
============================================================
--- router/java/src/net/i2p/router/JobQueue.java	bdc45c30d66a1269fb4c28c6fa8cf35a55e4d6c6
+++ router/java/src/net/i2p/router/JobQueue.java	800a4b43d12f2d2a2dc816e40b74ab1868fcfb58
@@ -68,11 +68,11 @@ public class JobQueue {
     static {
         long maxMemory = SystemVersion.getMaxMemory();
         if (maxMemory < 64*1024*1024)
-            RUNNERS = 3;
+            RUNNERS = 8;
         else if (maxMemory < 256*1024*1024)
-            RUNNERS = 4;
+            RUNNERS = 8;
         else
-            RUNNERS = 5;
+            RUNNERS = 8;
     }
 
     /** default max # job queue runners operating */
============================================================
--- router/java/src/net/i2p/router/client/ClientManager.java	464d0797f3292ede2eab427e71295938ab026fbb
+++ router/java/src/net/i2p/router/client/ClientManager.java	2ccd4daadc98a6550cea52248422e3e806df172d
@@ -443,8 +443,8 @@ class ClientManager {
             }
             // run this inline so we don't clog up the job queue
             Job j = new DistributeLocal(toDest, runner, sender, fromDest, payload, msgId, messageNonce);
-            //_ctx.jobQueue().addJob(j);
-            j.runJob();
+            _ctx.jobQueue().addJob(j);
+            //j.runJob();
         } else if (!_metaDests.isEmpty() && _metaDests.contains(toDest)) {
             // meta dests don't have runners but are local, and you can't send to them
             ClientConnectionRunner sender = getRunner(fromDest);
============================================================
--- router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java	1c32139248fa7c48523f091113298951a8519c8c
+++ router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java	bcdf76db3826d18650c76942a252bd5e88faf8d2
@@ -68,16 +68,11 @@ class PumpedTunnelGateway extends Tunnel
             _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = false;
-        } else if (receiver != null) {  // extended by ThrottledPTG for IB
+        } else {  // extended by ThrottledPTG for IB
             // Bounded non-priority queue for inbound
             _prequeue = new CoDelBlockingQueue<PendingGatewayMessage>(context, "IBGW", MAX_IB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = true;
-        } else {
-            // Poison PTG
-            _prequeue = null;
-            _nextHop = null;
-            _isInbound = true;
         }
         _pumper = pumper;
     }
============================================================
--- router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java	71a49516c47f296061b7767c57a84a551549f805
+++ router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java	eef9e6e5db2ab586a7eb9b1b4edc642cec4455ee
@@ -6,121 +6,78 @@ import java.util.Set;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
+import net.i2p.util.ConcurrentHashSet;
+import net.i2p.util.TryCache;
 import net.i2p.router.RouterContext;
 import net.i2p.util.I2PThread;
 import net.i2p.util.SimpleTimer;
-import net.i2p.util.SystemVersion;
 
 /**
- * Run through the tunnel gateways that have had messages added to them and push
- * those messages through the preprocessing and sending process.
- *
- * TODO do we need this many threads?
- * TODO this combines IBGWs and OBGWs, do we wish to separate the two
- * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
- */
-class TunnelGatewayPumper implements Runnable {
+ * straight pumping for inbound and outbound I2CP receivers
+**/
+
+class TunnelGatewayPumper {
     private final RouterContext _context;
-    private final Set<PumpedTunnelGateway> _wantsPumping;
     private final Set<PumpedTunnelGateway> _backlogged;
-    private final List<Thread> _threads;
+    private final Set<PumpedTunnelGateway> _livepumps;
+    private final Set<PumpedTunnelGateway> _wantsPumping;
     private volatile boolean _stop;
-    private static final int MIN_PUMPERS = 1;
-    private static final int MAX_PUMPERS = 4;
-    private final int _pumpers;
 
     /**
      *  Wait just a little, but this lets the pumper queue back up.
      *  See additional comments in PTG.
      */
     private static final long REQUEUE_TIME = 50;
-    
+
+    private static final TryCache<List<PendingGatewayMessage>> _bufferCache = new TryCache<>(new BufferFactory(), 16);
+
+    private static class BufferFactory implements TryCache.ObjectFactory<List<PendingGatewayMessage>> {
+        public List<PendingGatewayMessage> newInstance() {
+            return new ArrayList<PendingGatewayMessage>(32);
+        }
+    }
+
     /** Creates a new instance of TunnelGatewayPumper */
     public TunnelGatewayPumper(RouterContext ctx) {
         _context = ctx;
-        _wantsPumping = new LinkedHashSet<PumpedTunnelGateway>(16);
-        _backlogged = new HashSet<PumpedTunnelGateway>(16);
-        _threads = new CopyOnWriteArrayList<Thread>();
-        if (ctx.getBooleanProperty("i2p.dummyTunnelManager")) {
-            _pumpers = 1;
-        } else {
-            long maxMemory = SystemVersion.getMaxMemory();
-            _pumpers = (int) Math.max(MIN_PUMPERS, Math.min(MAX_PUMPERS, 1 + (maxMemory / (32*1024*1024))));
-        }
-        for (int i = 0; i < _pumpers; i++) {
-            Thread t = new I2PThread(this, "Tunnel GW pumper " + (i+1) + '/' + _pumpers, true);
-            _threads.add(t);
-            t.start();
-        }
+        _backlogged = new ConcurrentHashSet<PumpedTunnelGateway>(16);
+        _livepumps = new HashSet<PumpedTunnelGateway>(16);
+        _wantsPumping = new HashSet<PumpedTunnelGateway>(16);
     }
 
     public void stopPumping() {
-        _stop=true;
-        _wantsPumping.clear();
-        for (int i = 0; i < _pumpers; i++) {
-            PumpedTunnelGateway poison = new PoisonPTG(_context);
-            wantsPumping(poison);
+        _stop = true;
+        _backlogged.clear();
+        synchronized (_wantsPumping) {
+            _livepumps.clear();
+            _wantsPumping.clear();
         }
-        for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        for (Thread t : _threads) {
-            t.interrupt();
-        }
-        _threads.clear();
-        _wantsPumping.clear();
     }
-    
+
     public void wantsPumping(PumpedTunnelGateway gw) {
-        if (!_stop) {
-            synchronized (_wantsPumping) {
-                if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw))
-                    _wantsPumping.notify();
-            }
+        if (_backlogged.contains(gw) || _stop)
+            return;
+        synchronized (_wantsPumping) {
+            if (!_wantsPumping.add(gw) || !_livepumps.add(gw))
+                return; // let others return early if somebody else is already working
+            _wantsPumping.remove(gw);
         }
-    }
-    
-    public void run() {
-        try {
-            run2();
-        } finally {
-            _threads.remove(Thread.currentThread());
-        }
-    }
-
-    private void run2() {
-        PumpedTunnelGateway gw = null;
-        List<PendingGatewayMessage> queueBuf = new ArrayList<PendingGatewayMessage>(32);
-        boolean requeue = false;
-        while (!_stop) {
-            try {
+        boolean keepPumping = true;
+        List<PendingGatewayMessage> queueBuf = _bufferCache.acquire();
+        while (keepPumping && !_stop) {
+            if (gw.pump(queueBuf)) {  // once per hour
+                _backlogged.add(gw);
+                _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
                 synchronized (_wantsPumping) {
-                    if (requeue && gw != null) {
-                        // in case another packet came in
-                        _wantsPumping.remove(gw);
-                        if (_backlogged.add(gw))
-                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
-                    }
-                    gw = null;
-                    if (_wantsPumping.isEmpty()) {
-                        _wantsPumping.wait();
-                    } else {
-                        Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
-                        gw = iter.next();
-                        iter.remove();
-                    }
+                    _wantsPumping.remove(gw);
                 }
-            } catch (InterruptedException ie) {}
-            if (gw != null) {
-                if (gw.getMessagesSent() == POISON_PTG)
-                    break;
-                requeue = gw.pump(queueBuf);
             }
+            synchronized (_wantsPumping) {
+                if (!(keepPumping = _wantsPumping.remove(gw)))
+                    _livepumps.remove(gw);
+            }
         }
+        _bufferCache.release(queueBuf);
     }
 
     private class Requeue implements SimpleTimer.TimedEvent {
@@ -131,23 +88,8 @@ class TunnelGatewayPumper implements Run
         }
 
         public void timeReached() {
-            synchronized (_wantsPumping) {
-                _backlogged.remove(_ptg);
-                if (_wantsPumping.add(_ptg))
-                    _wantsPumping.notify();
-            }
+            _backlogged.remove(_ptg);
+            wantsPumping(_ptg);
         }
     }
-
-
-    private static final int POISON_PTG = -99999;
-
-    private static class PoisonPTG extends PumpedTunnelGateway {
-        public PoisonPTG(RouterContext ctx) {
-            super(ctx, null, null, null, null);
-        }
-
-        @Override
-        public int getMessagesSent() { return POISON_PTG; }
-    }
 }
============================================================
--- router/java/src/net/i2p/router/networkdb/reseed/ReseedChecker.java	b2f49c2cef1bfef6da22c4b3fac423731c7ed49d
+++ router/java/src/net/i2p/router/networkdb/reseed/ReseedChecker.java	29336b429c3f102deeefd431de6e9c622c77e855
@@ -33,7 +33,7 @@ public class ReseedChecker {
     private volatile String _lastError = "";
     private volatile boolean _networkLogged;
 
-    public static final int MINIMUM = 50;
+    public static final int MINIMUM = 2;
     private static final long STATUS_CLEAN_TIME = 20*60*1000;
 
     /**

comment:21 Changed 3 months ago by jogger

I have reviewed your patch to ClientManager.java. As said before, I was not sure; now I am convinced we should not do that. The Internal I2CP Reader now runs so little code that it is faster than the web app feeding it. The queue runs dry, and you should see many more context switches.

If we wanted to change this, the code would need to abandon the Internal I2CP Reader together with its queue, and have the web apps put their work directly on the job queue. That could be a boost; I will look into it, but not for .43.

So I was successful with external BitTorrent clients via I2CP; your test would need an external web server via I2CP.

Also, for whatever test, have you made sure that the TGW backlogged code does not trigger? On the live net it triggers only about once per hour on average under high load, so a single occurrence would invalidate your test results.
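One way to verify that is a simple counter that a benchmark run dumps afterwards. This is purely illustrative: the class and method names below are made up, and in the real pumper the hook would sit next to the `_backlogged.add(gw)` call.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical instrumentation: count how often the backlogged path fires
// during a benchmark, so a run can be discarded if the count is nonzero.
class BacklogStats {
    private final AtomicLong requeues = new AtomicLong();

    // call this wherever the pumper marks a gateway as backlogged
    void onBacklogged() {
        requeues.incrementAndGet();
    }

    long requeueCount() {
        return requeues.get();
    }
}
```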
