Opened 2 weeks ago

Last modified 10 days ago

#2432 new enhancement

Overhaul Tunnel GW Pumper

Reported by: jogger Owned by: zzz
Priority: minor Milestone: undecided
Component: router/general Version: 0.9.38
Keywords: Cc: zab
Parent Tickets:

Description

This class has confusing flow control, with a weird poison logic that never gets executed because the "_stop" control variable overrides it. For readability's sake this should be cleaned up.

Then there are two to-dos:

  • # of threads
  • prio inbound vs. outbound

I have run the Tunnel GW pumper with only one thread for more than a year now without a hitch. The requeue logic is executed less than once per hour, and the average queue length when entering pump() is less than 0.15 on a loaded ARM32. This confirms one thread is sufficient.

Then I came up with a queue priority logic for inbound vs. outbound. With outbound priority, seeding torrents was faster. With inbound priority, surfing eepsites felt snappier.

My suggestion: We fix this thing to two threads. If there are inbound as well as outbound GWs in the queue, one thread serves inbound first, the other outbound first. This way we cater to everybody's needs.

If nobody objects I will code that.

Subtickets (add)

Change History (3)

comment:1 Changed 2 weeks ago by zab

  • Cc zab added

comment:2 Changed 2 weeks ago by jogger

OK, have run this thing some time, absolutely satisfied. Feels faster.

Since this is much shorter and 90% recoded, I provide the full source, together with a supporting diff for PumpedTunnelGateway.java. This way it is easier to read. Providing a diff instead is no problem.

TunnelGatewayPumper.java

package net.i2p.router.tunnel;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SystemVersion;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Run through the tunnel gateways that have had messages added to them and push
 * those messages through the preprocessing and sending process.
 *
 * Starts two threads (one if the "i2p.dummyTunnelManager" property is set).
 * When both inbound and outbound gateways are waiting, the first pumper thread
 * to start running serves outbound gateways first and every later thread
 * serves inbound gateways first.
 *
 * All three gateway sets are guarded by the monitor of {@code _outbound}.
 */
class TunnelGatewayPumper implements Runnable {
    private final RouterContext _context;
    /** Outbound gateways waiting to be pumped, in arrival order. Guarded by _outbound. */
    private final Set<PumpedTunnelGateway> _outbound;
    /** Inbound gateways waiting to be pumped, in arrival order. Guarded by _outbound. */
    private final Set<PumpedTunnelGateway> _inbound;
    /** Gateways whose pump() asked for a requeue and that are waiting for their timer. Guarded by _outbound. */
    private final Set<PumpedTunnelGateway> _backlogged;
    private volatile boolean _stop;
    /** false until the first pumper thread has entered run(); hands out the priority roles. */
    private final AtomicBoolean _inboundFirst = new AtomicBoolean(false);

    /**
     *  Wait just a little, but this lets the pumper queue back up.
     *  See additional comments in PTG.
     */
    private static final long REQUEUE_TIME = 50;

    /**
     * Creates a new instance of TunnelGatewayPumper and starts its pumper threads.
     *
     * @param ctx router context, used for configuration and the requeue timer
     */
    public TunnelGatewayPumper(RouterContext ctx) {
        _context = ctx;
        _outbound = new LinkedHashSet<PumpedTunnelGateway>(16);
        _inbound = new LinkedHashSet<PumpedTunnelGateway>(16);
        _backlogged = new HashSet<PumpedTunnelGateway>(16);
        // look the property up once instead of on every loop iteration
        final int threads = ctx.getBooleanProperty("i2p.dummyTunnelManager") ? 1 : 2;
        for (int i = 1; i <= threads; i++)
            new I2PThread(this, "Tunnel GW pumper " + i, true).start();
    }

    /**
     * Tells all pumper threads to terminate and wakes up the ones that are
     * currently waiting for work.
     */
    public void stopPumping() {
        _stop = true;
        synchronized (_outbound) {
            _outbound.notifyAll();
        }
    }

    /**
     * Queues a gateway for pumping unless it is already queued or backlogged.
     * Ignored once {@link #stopPumping()} has been called, since no thread
     * would ever drain the queue again.
     *
     * @param gw the gateway that has messages to push
     */
    public void wantsPumping(PumpedTunnelGateway gw) {
        if (_stop)
            return;
        synchronized (_outbound) { // reentrant: Requeue.timeReached() already holds this lock
            if (!_outbound.contains(gw) && !_inbound.contains(gw) && !_backlogged.contains(gw)) {
                if (gw._isInbound)
                    _inbound.add(gw);
                else
                    _outbound.add(gw);
                _outbound.notify();
            }
        }
    }

    /**
     * Pumper thread main loop: take the next gateway according to this
     * thread's priority, pump it outside the lock, and back it off for
     * REQUEUE_TIME if pump() asks for a requeue.
     */
    @Override
    public void run() {
        // the first thread to get here sees false (outbound first), all later ones see true
        final boolean inboundFirst = _inboundFirst.getAndSet(true);
        final List<PendingGatewayMessage> queueBuf = new ArrayList<PendingGatewayMessage>(32);
        PumpedTunnelGateway gw = null;
        boolean requeue = false;
        while (!_stop) {
            try {
                synchronized (_outbound) {
                    if (requeue) { // usually happens less than 1 / hour
                        // clear the flag right away so an interrupted pass does not repeat this
                        requeue = false;
                        // in case another packet came in while we were pumping
                        _outbound.remove(gw);
                        _inbound.remove(gw);
                        // schedule only one timer per gateway
                        if (_backlogged.add(gw))
                            _context.simpleTimer2().addEvent(new Requeue(gw), REQUEUE_TIME);
                    }
                    // loop guards against spurious wakeups and against the
                    // other thread having taken the work already
                    while (_outbound.isEmpty() && _inbound.isEmpty()) {
                        _outbound.wait();
                        if (_stop)
                            return;
                    }
                    // inbound is served if it is the only work available, or if
                    // this thread prefers inbound and there is some
                    final boolean takeInbound = _outbound.isEmpty() ||
                                                (inboundFirst && !_inbound.isEmpty());
                    Iterator<PumpedTunnelGateway> iter = takeInbound ? _inbound.iterator()
                                                                     : _outbound.iterator();
                    gw = iter.next();
                    iter.remove();
                }
            } catch (InterruptedException ie) {
                // No gateway was selected on this pass: gw is either still null or
                // refers to the previously pumped one. Do not pump it; go back and
                // re-check _stop instead. The interrupt flag is deliberately not
                // restored, as that would make the next wait() throw immediately.
                continue;
            }
            requeue = gw.pump(queueBuf); // if single thread: average queue length before this < 0.15 on busy router
        }
    }

    /**
     * Timer event that takes a gateway out of the backlog and offers it to
     * the pumper again.
     */
    private class Requeue implements SimpleTimer.TimedEvent {
        private final PumpedTunnelGateway _ptg;

        public Requeue(PumpedTunnelGateway ptg) {
            _ptg = ptg;
        }

        public void timeReached() {
            // hold the lock across both steps so the gateway cannot be
            // re-added by another caller in between
            synchronized (_outbound) {
                _backlogged.remove(_ptg);
                wantsPumping(_ptg);
            }
        }
    }
}

PumpedTunnelGateway.java

--- "PumpedTunnelGateway orig.java"	2019-02-08 18:36:54.149434258 +0100
+++ "PumpedTunnelGateway patch.java"	2019-02-08 18:35:59.461682571 +0100
@@ -25,11 +25,11 @@
  *     messages or instruct the TunnelGateway to offer it the messages again in
  *     a short while (in an attempt to coallesce them).
  * <li>when the QueueProcessor accepts a TunnelGateway.Pending, it preprocesses
- *     it into fragments, forwarding each preprocessed fragment group through 
+ *     it into fragments, forwarding each preprocessed fragment group through
  *     the Sender.</li>
- * <li>the Sender then encrypts the preprocessed data and delivers it to the 
+ * <li>the Sender then encrypts the preprocessed data and delivers it to the
  *     Receiver.</li>
- * <li>the Receiver now has the encrypted message and may do with it as it 
+ * <li>the Receiver now has the encrypted message and may do with it as it
  *     pleases (e.g. wrap it as necessary and enqueue it onto the OutNetMessagePool,
  *     or if debugging, verify that it can be decrypted properly)</li>
  * </ol>
@@ -38,9 +38,9 @@
 class PumpedTunnelGateway extends TunnelGateway {
     private final BlockingQueue<PendingGatewayMessage> _prequeue;
     private final TunnelGatewayPumper _pumper;
-    private final boolean _isInbound;
+    public final boolean _isInbound;
     private final Hash _nextHop;
-    
+
     /**
      *  warning - these limit total messages per second throughput due to
      *  requeue delay in TunnelGatewayPumper to max * 1000 / REQUEUE_TIME
@@ -53,9 +53,9 @@
     /**
      * @param preprocessor this pulls Pending messages off a list, builds some
      *                     full preprocessed messages, and pumps those into the sender
-     * @param sender this takes a preprocessed message, encrypts it, and sends it to 
+     * @param sender this takes a preprocessed message, encrypts it, and sends it to
      *               the receiver
-     * @param receiver this receives the encrypted message and forwards it off 
+     * @param receiver this receives the encrypted message and forwards it off
      *                 to the first hop
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -68,20 +68,15 @@
             _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = false;
-        } else if (receiver != null) {  // extended by ThrottledPTG for IB
+        } else {  // extended by ThrottledPTG for IB
             // Bounded non-priority queue for inbound
             _prequeue = new CoDelBlockingQueue<PendingGatewayMessage>(context, "IBGW", MAX_IB_QUEUE);
             _nextHop = receiver.getSendTo();
             _isInbound = true;
-        } else {
-            // Poison PTG
-            _prequeue = null;
-            _nextHop = null;
-            _isInbound = true;
         }
         _pumper = pumper;
     }
-    
+
     /**
      * Add a message to be sent down the tunnel, either sending it now (perhaps
      * coallesced with other pending messages) or after a brief pause (_flushFrequency).
@@ -138,7 +133,7 @@
             return false;
         boolean rv = !_prequeue.isEmpty();
 
-        long startAdd = System.currentTimeMillis();
+        long startAdd = _context.clock().now();
         long beforeLock = startAdd;
         long afterAdded = -1;
         boolean delayedFlush = false;
@@ -148,15 +143,15 @@
         long afterExpire = 0;
         synchronized (_queue) {
             _queue.addAll(queueBuf);
-            afterAdded = System.currentTimeMillis();
+            afterAdded = _context.clock().now();
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Added before direct flush preprocessing for " + toString() + ": " + _queue);
             delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
-            afterPreprocess = System.currentTimeMillis();
+            afterPreprocess = _context.clock().now();
             if (delayedFlush)
                 delayAmount = _preprocessor.getDelayAmount();
             _lastFlush = _context.clock().now();
-            
+
             // expire any as necessary, even if its framented
             for (int i = 0; i < _queue.size(); i++) {
                 PendingGatewayMessage m = _queue.get(i);
@@ -167,18 +162,18 @@
                     i--;
                 }
             }
-            afterExpire = System.currentTimeMillis();
+            afterExpire = _context.clock().now();
             remaining = _queue.size();
             if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
                 _log.debug("Remaining after preprocessing: " + _queue);
         }
-        
+
         if (delayedFlush) {
             _delayedFlush.reschedule(delayAmount);
         }
         //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
         if (_log.shouldLog(Log.DEBUG)) {
-            long complete = System.currentTimeMillis();
+            long complete = _context.clock().now();
             _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
                        + " delayed? " + delayedFlush + " remaining: " + remaining
                        + " add: " + (afterAdded-beforeLock)
@@ -192,5 +187,5 @@
                       " IB? " + _isInbound + " backlogged? " + backlogged);
         return rv;
     }
-    
+
 }

comment:3 Changed 10 days ago by zzz

  • Component changed from router/transport to router/general
Note: See TracTickets for help on using tickets.