Changeset 4e78517


Ignore:
Timestamp:
Sep 1, 2012 5:20:52 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
5eff26e
Parents:
10d9eb7
Message:

TunnelGateway?:

  • Limit queue sizes
  • Add stat for overflow
  • Remove some stats
  • Change pumper to LinkedHashSet? for efficiency (like NTCP Reader/Writer?)
  • Limit messages pumped per cycle to increase round-robin fairness
  • Comment out some unused code
  • Javadoc
Location:
router/java/src/net/i2p/router/tunnel
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java

    r10d9eb7 r4e78517  
    3939    private final TunnelGatewayPumper _pumper;
    4040   
     41    private static final int MAX_MSGS_PER_PUMP = 16;
     42    private static final int MAX_OB_QUEUE = 2048;
     43    private static final int MAX_IB_QUEUE = 1024;
     44
    4145    /**
    4246     * @param preprocessor this pulls Pending messages off a list, builds some
     
    4953    public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
    5054        super(context, preprocessor, sender, receiver);
    51         _prequeue = new LinkedBlockingQueue();
     55        if (getClass() == PumpedTunnelGateway.class)
     56            _prequeue = new LinkedBlockingQueue(MAX_OB_QUEUE);
     57        else  // extended by ThrottledPTG for IB
     58            _prequeue = new LinkedBlockingQueue(MAX_IB_QUEUE);
    5259        _pumper = pumper;
    5360    }
     
    6673        _messagesSent++;
    6774        Pending cur = new PendingImpl(msg, toRouter, toTunnel);
    68         _prequeue.offer(cur);
    69         _pumper.wantsPumping(this);
     75        if (_prequeue.offer(cur))
     76            _pumper.wantsPumping(this);
     77        else
     78            _context.statManager().addRateData("tunnel.dropGatewayOverflow", 1);
    7079    }
    7180
     
    8190     */
    8291    void pump(List<Pending> queueBuf) {
    83         _prequeue.drainTo(queueBuf);
     92        _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP);
    8493        if (queueBuf.isEmpty())
    8594            return;
     
    123132            _delayedFlush.reschedule(delayAmount);
    124133        }
    125         _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
    126         long complete = System.currentTimeMillis();
    127         if (_log.shouldLog(Log.DEBUG))
     134        //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
     135        if (_log.shouldLog(Log.DEBUG)) {
     136            long complete = System.currentTimeMillis();
    128137            _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
    129138                       + " delayed? " + delayedFlush + " remaining: " + remaining
     
    132141                       + " expire: " + (afterExpire-afterPreprocess)
    133142                       + " queue flush: " + (complete-afterExpire));
     143        }
    134144        queueBuf.clear();
     145        if (!_prequeue.isEmpty())
     146            _pumper.wantsPumping(this);
    135147    }
    136148   
  • router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

    r10d9eb7 r4e78517  
    3030 * various tunnels.
    3131 *
     32 *<pre>
     33 *  For each type of tunnel, it creates a chain of handlers, as follows:
     34 *
     35 *  Following tunnels are created by us:
     36 *
     37 *    Outbound Gateway > 0 hops:
     38 *       PumpedTunnelGateway
     39 *         BatchedRouterPreprocessor -> OutboundSender -> OutboundReceiver -> OutNetMessagePool
     40 *
     41 *    Outbound zero-hop Gateway+Endpoint:
     42 *       TunnelGatewayZeroHop
     43 *         OutboundMessageDistributor -> OutNetMessagePool
     44 *
     45 *    Inbound Endpoint > 0 hops:
     46 *       TunnelParticipant
     47 *        RouterFragmentHandler ->  InboundEndpointProcessor -> InboundMessageDistributor -> InNetMessagePool
     48 *
     49 *    Inbound zero-hop Gateway+Endpoint:
     50 *       TunnelGatewayZeroHop
     51 *         InboundMessageDistributor -> InNetMessagePool
     52 *
     53 *
     54 *  Following tunnels are NOT created by us:
     55 *
     56 *    Participant (not gateway or endpoint)
     57 *       TunnelParticipant
     58 *         HopProcessor -> OutNetMessagePool
     59 *
     60 *    Outbound Endpoint > 0 hops:
     61 *       OutboundTunnelEndpoint
     62 *         RouterFragmentHandler -> HopProcessor -> OutboundMessageDistributor -> OutNetMessagePool
     63 *
     64 *    Inbound Gateway > 0 hops:
     65 *       ThrottledPumpedTunnelGateway
     66 *         BatchedRouterPreprocessor -> InboundSender -> InboundGatewayReceiver -> OutNetMessagePool
     67 *
     68 *</pre>
    3269 */
    3370public class TunnelDispatcher implements Service {
     
    175212        ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 });
    176213        ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 });
     214        // following is for PumpedTunnelGateway
     215        ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 });
    177216    }
    178217
  • router/java/src/net/i2p/router/tunnel/TunnelGateway.java

    r10d9eb7 r4e78517  
    3333 * </ol>
    3434 *
     35 * Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides.
    3536 */
    3637class TunnelGateway {
     
    6465        _delayedFlush = new DelayedFlush();
    6566        _lastFlush = _context.clock().now();
    66         _context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    67         _context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     67        //_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     68        //_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    6869    }
    6970   
     
    8283     * If it is queued up past its expiration, it is silently dropped
    8384     *
     85     * UNUSED - see overrides
     86     *
    8487     * @param msg message to be sent through the tunnel
    8588     * @param toRouter router to send to after the endpoint (or null for endpoint processing)
     
    8790     */
    8891    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
     92        throw new UnsupportedOperationException("unused, right?");
     93/****
    8994        _messagesSent++;
    9095        long startAdd = System.currentTimeMillis();
     
    138143                       + " queue flush: " + (complete-afterExpire));
    139144        }
     145****/
    140146    }
    141147   
     
    313319                _lastFlush = _context.clock().now();
    314320           
    315             _context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
     321            //_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
    316322        }
    317323    }
  • router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java

    r10d9eb7 r4e78517  
    22
    33import java.util.ArrayList;
     4import java.util.Iterator;
     5import java.util.LinkedHashSet;
    46import java.util.List;
     7import java.util.Set;
    58import java.util.concurrent.BlockingQueue;
    69import java.util.concurrent.LinkedBlockingQueue;
     
    1013
    1114/**
    12  * run through the tunnel gateways that have had messages added to them and push
    13  * those messages through the preprocessing and sending process
     15 * Run through the tunnel gateways that have had messages added to them and push
     16 * those messages through the preprocessing and sending process.
     17 *
     18 * TODO do we need this many threads?
     19 * TODO this combines IBGWs and OBGWs, do we wish to separate the two
     20 * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
    1421 */
    1522class TunnelGatewayPumper implements Runnable {
    1623    private final RouterContext _context;
    17     private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
    18     private boolean _stop;
     24    private final Set<PumpedTunnelGateway> _wantsPumping;
     25    private volatile boolean _stop;
    1926    private static final int MIN_PUMPERS = 1;
    2027    private static final int MAX_PUMPERS = 4;
     
    2431    public TunnelGatewayPumper(RouterContext ctx) {
    2532        _context = ctx;
    26         _wantsPumping = new LinkedBlockingQueue();
     33        _wantsPumping = new LinkedHashSet(16);
    2734        long maxMemory = Runtime.getRuntime().maxMemory();
    2835        if (maxMemory == Long.MAX_VALUE)
     
    3643        _stop=true;
    3744        _wantsPumping.clear();
    38         PumpedTunnelGateway poison = new PoisonPTG(_context);
    39         for (int i = 0; i < _pumpers; i++)
    40             _wantsPumping.offer(poison);
     45        for (int i = 0; i < _pumpers; i++) {
     46            PumpedTunnelGateway poison = new PoisonPTG(_context);
     47            wantsPumping(poison);
     48        }
    4149        for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
    4250            try {
     
    4856   
    4957    public void wantsPumping(PumpedTunnelGateway gw) {
    50         if (!_stop)
    51             _wantsPumping.offer(gw);
     58        if (!_stop) {
     59            synchronized (_wantsPumping) {
     60                if (_wantsPumping.add(gw))
     61                    _wantsPumping.notify();
     62            }
     63        }
    5264    }
    5365   
     
    5769        while (!_stop) {
    5870            try {
    59                 gw = _wantsPumping.take();
     71                synchronized (_wantsPumping) {
     72                    if (_wantsPumping.isEmpty()) {
     73                        _wantsPumping.wait();
     74                    } else {
     75                        Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
     76                        gw = iter.next();
     77                        iter.remove();
     78                    }
     79                }
    6080            } catch (InterruptedException ie) {}
    6181            if (gw != null) {
Note: See TracChangeset for help on using the changeset viewer.