Changeset 4cf1047


Ignore:
Timestamp:
Sep 8, 2012 12:47:17 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
6162908
Parents:
2c866e2
Message:
Location:
router/java/src/net/i2p/router/tunnel
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/PendingGatewayMessage.java

    r2c866e2 r4cf1047  
    88import net.i2p.data.i2np.I2NPMessage;
    99import net.i2p.router.RouterContext;
     10import net.i2p.router.util.CDQEntry;
    1011
    1112/**
     
    1415 *  @since 0.9.3 refactored from TunnelGateway.Pending
    1516 */
    16 class PendingGatewayMessage {
     17class PendingGatewayMessage implements CDQEntry {
    1718    protected final Hash _toRouter;
    1819    protected final TunnelId _toTunnel;
     
    2425    protected final long _created;
    2526    private List<Long> _messageIds;
     27    private long _enqueueTime;
    2628   
    2729    public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
     
    8587        }
    8688    }
     89
     90    /**
     91     *  For CDQ
     92     *  @since 0.9.3
     93     */
     94    public void setEnqueueTime(long now) {
     95        _enqueueTime = now;
     96    }
     97
     98    /**
     99     *  For CDQ
     100     *  @since 0.9.3
     101     */
     102    public long getEnqueueTime() {
     103        return _enqueueTime;
     104    }
     105
     106    /**
     107     *  For CDQ
     108     *  @since 0.9.3
     109     */
     110    public void drop() {
     111    }
    87112   
    88113    @Override
  • router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java

    r2c866e2 r4cf1047  
    33import java.util.List;
    44import java.util.concurrent.BlockingQueue;
    5 import java.util.concurrent.LinkedBlockingQueue;
    65
    76import net.i2p.data.Hash;
     
    109import net.i2p.router.Router;
    1110import net.i2p.router.RouterContext;
     11import net.i2p.router.util.CoDelBlockingQueue;
     12import net.i2p.router.util.CoDelPriorityBlockingQueue;
    1213import net.i2p.util.Log;
    1314
     
    3839    private final BlockingQueue<PendingGatewayMessage> _prequeue;
    3940    private final TunnelGatewayPumper _pumper;
     41    private final boolean _isInbound;
    4042   
    41     private static final int MAX_MSGS_PER_PUMP = 16;
    42     private static final int MAX_OB_QUEUE = 2048;
     43    private static final int MAX_OB_MSGS_PER_PUMP = 16;
     44    private static final int MAX_IB_MSGS_PER_PUMP = 8;
     45    private static final int INITIAL_OB_QUEUE = 64;
    4346    private static final int MAX_IB_QUEUE = 1024;
    4447
     
    5356    public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
    5457        super(context, preprocessor, sender, receiver);
    55         if (getClass() == PumpedTunnelGateway.class)
    56             _prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE);
    57         else  // extended by ThrottledPTG for IB
    58             _prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE);
     58        if (getClass() == PumpedTunnelGateway.class) {
     59            // Unbounded priority queue for outbound
     60            _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
     61            _isInbound = false;
     62        } else {  // extended by ThrottledPTG for IB
     63            // Bounded non-priority queue for inbound
     64            _prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
     65            _isInbound = true;
     66        }
    5967        _pumper = pumper;
    6068    }
     
    6573     * If it is queued up past its expiration, it is silently dropped
    6674     *
     75     * This is only for OBGWs. See TPTG override for IBGWs.
     76     *
    6777     * @param msg message to be sent through the tunnel
    6878     * @param toRouter router to send to after the endpoint (or null for endpoint processing)
     
    7181    @Override
    7282    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
     83        OutboundGatewayMessage cur = new OutboundGatewayMessage(msg, toRouter, toTunnel);
     84        if (_log.shouldLog(Log.DEBUG))
     85            _log.debug("OB PTG add type " + msg.getType() + " pri " + cur.getPriority());
     86        add(cur);
     87    }
     88
     89    protected void add(PendingGatewayMessage cur) {
    7390        _messagesSent++;
    74         PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel);
    7591        if (_prequeue.offer(cur))
    7692            _pumper.wantsPumping(this);
     
    90106     */
    91107    void pump(List<PendingGatewayMessage> queueBuf) {
    92         _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP);
     108        // TODO if an IBGW, and the next hop is backlogged,
     109        // drain less or none... better to let things back up here.
     110        // Don't do this for OBGWs?
     111        int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
     112        _prequeue.drainTo(queueBuf, max);
    93113        if (queueBuf.isEmpty())
    94114            return;
  • router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java

    r2c866e2 r4cf1047  
    4343            return;
    4444        }
    45         super.add(msg, toRouter, toTunnel);
     45        add(new PendingGatewayMessage(msg, toRouter, toTunnel));
    4646    }
    4747}
Note: See TracChangeset for help on using the changeset viewer.