Changeset e8a8f3c


Ignore:
Timestamp:
Sep 10, 2012 9:30:54 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
50ec279
Parents:
e0fc642
Message:
  • TunnelGateway?: Implement pushback from a backlogged transport queue to the pre-fragmentation queue
Location:
router/java/src/net/i2p/router/tunnel
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java

    re0fc642 re8a8f3c  
    11package net.i2p.router.tunnel;
    22
     3import net.i2p.data.Hash;
    34import net.i2p.data.RouterInfo;
    45import net.i2p.data.i2np.TunnelDataMessage;
     
    89
    910/**
    10  *  Handle messages at the IBGW
     11 *  Handle messages at the IBGW.
     12 *  Not used for zero-hop IBGWs.
    1113 */
    1214class InboundGatewayReceiver implements TunnelGateway.Receiver {
     
    6567    }
    6668   
     69    /**
     70     * The next hop
     71     * @return non-null
     72     * @since 0.9.3
     73     */
     74    public Hash getSendTo() {
     75        return _config.getSendTo();
     76    }
     77
    6778    private class ReceiveJob extends JobImpl {
    6879        private final byte[] _encrypted;
  • router/java/src/net/i2p/router/tunnel/OutboundReceiver.java

    re0fc642 re8a8f3c  
    11package net.i2p.router.tunnel;
    22
     3import net.i2p.data.Hash;
    34import net.i2p.data.RouterInfo;
    45import net.i2p.data.i2np.TunnelDataMessage;
     
    1314 * then forward it on to the first hop in the tunnel.
    1415 *
     16 * Not used for zero-hop OBGWs.
    1517 */
    1618class OutboundReceiver implements TunnelGateway.Receiver {
     
    5456            return -1;
    5557        }
     58    }
     59
     60    /**
     61     * The next hop
     62     * @return non-null
     63     * @since 0.9.3
     64     */
     65    public Hash getSendTo() {
     66        return _config.getPeer(1);
    5667    }
    5768
  • router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java

    re0fc642 re8a8f3c  
    4040    private final TunnelGatewayPumper _pumper;
    4141    private final boolean _isInbound;
     42    private final Hash _nextHop;
    4243   
    43     private static final int MAX_OB_MSGS_PER_PUMP = 16;
    44     private static final int MAX_IB_MSGS_PER_PUMP = 8;
     44    /**
     45     *  warning - these limit total messages per second throughput due to
     46     *  requeue delay in TunnelGatewayPumper to max * 1000 / REQUEUE_TIME
     47     */
     48    private static final int MAX_OB_MSGS_PER_PUMP = 64;
     49    private static final int MAX_IB_MSGS_PER_PUMP = 24;
    4550    private static final int INITIAL_OB_QUEUE = 64;
    4651    private static final int MAX_IB_QUEUE = 1024;
     
    5459     *                 to the first hop
    5560     */
    56     public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
     61    public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor,
     62                               Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
    5763        super(context, preprocessor, sender, receiver);
    5864        if (getClass() == PumpedTunnelGateway.class) {
    5965            // Unbounded priority queue for outbound
    6066            _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
     67            _nextHop = receiver.getSendTo();
    6168            _isInbound = false;
    62         } else {  // extended by ThrottledPTG for IB
     69        } else if (receiver != null) {  // extended by ThrottledPTG for IB
    6370            // Bounded non-priority queue for inbound
    6471            _prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
     72            _nextHop = receiver.getSendTo();
     73            _isInbound = true;
     74        } else {
     75            // Poison PTG
     76            _prequeue = null;
     77            _nextHop = null;
    6578            _isInbound = true;
    6679        }
     
    104117     * @param queueBuf Empty list for convenience, to use as a temporary buffer.
    105118     *                 Must be empty when called; will always be emptied before return.
     119     * @return true if we did not finish, and the pumper should be requeued.
    106120     */
    107     void pump(List<PendingGatewayMessage> queueBuf) {
    108         // TODO if an IBGW, and the next hop is backlogged,
    109         // drain less or none... better to let things back up here.
    110         // Don't do this for OBGWs?
    111         int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
     121    public boolean pump(List<PendingGatewayMessage> queueBuf) {
     122        // If the next hop is backlogged,
     123        // drain only a little... better to let things back up here,
     124        // before fragmentation, where we have priority queueing (for OBGW)
     125        int max;
     126        boolean backlogged = _context.commSystem().isBacklogged(_nextHop);
     127        if (backlogged && _log.shouldLog(Log.INFO))
     128            _log.info("PTG backlogged, queued to " + _nextHop + " : " + _prequeue.size() +
     129                      " IB? " + _isInbound);
     130        if (backlogged)
     131            max = _isInbound ? 1 : 2;
     132        else
     133            max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
    112134        _prequeue.drainTo(queueBuf, max);
    113135        if (queueBuf.isEmpty())
    114             return;
     136            return false;
     137        boolean rv = !_prequeue.isEmpty();
    115138
    116139        long startAdd = System.currentTimeMillis();
     
    163186        }
    164187        queueBuf.clear();
    165         if (!_prequeue.isEmpty())
    166             _pumper.wantsPumping(this);
     188        if (rv && _log.shouldLog(Log.INFO))
     189            _log.info("PTG remaining to " + _nextHop + " : " + _prequeue.size() +
     190                      " IB? " + _isInbound + " backlogged? " + backlogged);
     191        return rv;
    167192    }
    168193   
  • router/java/src/net/i2p/router/tunnel/TunnelGateway.java

    re0fc642 re8a8f3c  
    184184         */
    185185        public long receiveEncrypted(byte encrypted[]);
     186
     187        /**
     188         * The next hop
     189         * @return non-null
     190         * @since 0.9.3
     191         */
     192        public Hash getSendTo();
    186193    }
    187194
  • router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java

    re0fc642 re8a8f3c  
    22
    33import java.util.ArrayList;
     4import java.util.HashSet;
    45import java.util.Iterator;
    56import java.util.LinkedHashSet;
     
    1112import net.i2p.router.RouterContext;
    1213import net.i2p.util.I2PThread;
     14import net.i2p.util.SimpleScheduler;
     15import net.i2p.util.SimpleTimer;
    1316
    1417/**
     
    2326    private final RouterContext _context;
    2427    private final Set<PumpedTunnelGateway> _wantsPumping;
     28    private final Set<PumpedTunnelGateway> _backlogged;
    2529    private volatile boolean _stop;
    2630    private static final int MIN_PUMPERS = 1;
    2731    private static final int MAX_PUMPERS = 4;
    2832    private final int _pumpers;
     33
     34    /**
     35     *  Wait just a little, but this lets the pumper queue back up.
     36     *  See additional comments in PTG.
     37     */
     38    private static final long REQUEUE_TIME = 50;
    2939   
    3040    /** Creates a new instance of TunnelGatewayPumper */
     
    3242        _context = ctx;
    3343        _wantsPumping = new LinkedHashSet(16);
     44        _backlogged = new HashSet(16);
    3445        long maxMemory = Runtime.getRuntime().maxMemory();
    3546        if (maxMemory == Long.MAX_VALUE)
     
    5869        if (!_stop) {
    5970            synchronized (_wantsPumping) {
    60                 if (_wantsPumping.add(gw))
     71                if ((!_backlogged.contains(gw)) && _wantsPumping.add(gw))
    6172                    _wantsPumping.notify();
    6273            }
     
    6778        PumpedTunnelGateway gw = null;
    6879        List<PendingGatewayMessage> queueBuf = new ArrayList(32);
     80        boolean requeue = false;
    6981        while (!_stop) {
    7082            try {
    7183                synchronized (_wantsPumping) {
     84                    if (requeue && gw != null) {
     85                        // in case another packet came in
     86                        _wantsPumping.remove(gw);
     87                        if (_backlogged.add(gw))
     88                            _context.simpleScheduler().addEvent(new Requeue(gw), REQUEUE_TIME);
     89                    }
     90                    gw = null;
    7291                    if (_wantsPumping.isEmpty()) {
    7392                        _wantsPumping.wait();
     
    82101                if (gw.getMessagesSent() == POISON_PTG)
    83102                    break;
    84                 gw.pump(queueBuf);
    85                 gw = null;
     103                requeue = gw.pump(queueBuf);
    86104            }
    87105        }
    88106    }
     107
     108    private class Requeue implements SimpleTimer.TimedEvent {
     109        private final PumpedTunnelGateway _ptg;
     110
     111        public Requeue(PumpedTunnelGateway ptg) {
     112            _ptg = ptg;
     113        }
     114
     115        public void timeReached() {
     116            synchronized (_wantsPumping) {
     117                _backlogged.remove(_ptg);
     118                if (_wantsPumping.add(_ptg))
     119                    _wantsPumping.notify();
     120            }
     121        }
     122    }
     123
    89124
    90125    private static final int POISON_PTG = -99999;
Note: See TracChangeset for help on using the changeset viewer.