Changeset f232775


Ignore:
Timestamp:
Sep 19, 2012 7:00:06 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
0eedc3aa
Parents:
bd57463
Message:

CoDel? for build handler

File:
1 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java

    rbd57463 rf232775  
    22
    33import java.util.List;
    4 import java.util.concurrent.LinkedBlockingQueue;
     4import java.util.concurrent.BlockingQueue;
    55
    66import net.i2p.data.Base64;
     
    2929import net.i2p.router.tunnel.HopConfig;
    3030import net.i2p.router.tunnel.TunnelDispatcher;
     31import net.i2p.router.util.CDQEntry;
     32import net.i2p.router.util.CoDelBlockingQueue;
    3133import net.i2p.stat.Rate;
    3234import net.i2p.stat.RateStat;
     
    5254    private final Job _buildMessageHandlerJob;
    5355    private final Job _buildReplyMessageHandlerJob;
    54     private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
     56    private final BlockingQueue<BuildMessageState> _inboundBuildMessages;
    5557    private final BuildMessageProcessor _processor;
    5658    private final ParticipatingThrottler _throttler;
     
    8082        // Queue size = 12 * share BW / 48K
    8183        int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
    82         _inboundBuildMessages = new LinkedBlockingQueue(sz);
     84        _inboundBuildMessages = new CoDelBlockingQueue(ctx, "BuildHandler", sz);
    8385   
    8486        _context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     
    138140        _isRunning = false;
    139141        _inboundBuildMessages.clear();
    140         BuildMessageState poison = new BuildMessageState(null, null, null);
     142        BuildMessageState poison = new BuildMessageState(_context, null, null, null);
    141143        for (int i = 0; i < numThreads; i++) {
    142144            _inboundBuildMessages.offer(poison);
     
    766768                } else {
    767769                    int sz = _inboundBuildMessages.size();
     770                    // Can probably remove this check, since CoDel is in use
    768771                    BuildMessageState cur = _inboundBuildMessages.peek();
    769772                    boolean accept = true;
     
    774777                            _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
    775778                            // if the queue is backlogged, stop adding new messages
    776                             _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
    777779                            accept = false;
    778780                        }
    779781                    }
    780782                    if (accept) {
    781                         int queueTime = estimateQueueTime(sz);
    782                         float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
    783                         pDrop = (float)Math.pow(pDrop, 16); // steeeep
    784                         float f = _context.random().nextFloat();
     783                        // This is expensive and rarely seen, use CoDel instead
     784                        //int queueTime = estimateQueueTime(sz);
     785                        //float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
     786                        //pDrop = (float)Math.pow(pDrop, 16); // steeeep
     787                        //float f = _context.random().nextFloat();
    785788                        //if ( (pDrop > f) && (allowProactiveDrop()) ) {
    786                         if (pDrop > f) {
    787                             _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
    788                             _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
    789                         } else {
    790                             accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash));
     789                        //if (pDrop > f) {
     790                        //    _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
     791                        //    _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
     792                        //} else {
     793                            accept = _inboundBuildMessages.offer(new BuildMessageState(_context, receivedMessage, from, fromHash));
    791794                            if (accept) {
    792795                                // wake up the Executor to call handleInboundRequests()
     
    796799                                _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
    797800                            }
    798                         }
     801                        //}
    799802                    }
    800803                }
     
    813816****/
    814817   
     818/****
    815819    private int estimateQueueTime(int numPendingMessages) {
    816820        int decryptTime = 200;
     
    833837        return (int)estimatedQueueTime;
    834838    }
    835    
    836    
     839****/
     840   
     841    /** */
    837842    private class TunnelBuildReplyMessageHandlerJobBuilder implements HandlerJobBuilder {
    838843        public Job createJob(I2NPMessage receivedMessage, RouterIdentity from, Hash fromHash) {
     
    846851   
    847852    /** normal inbound requests from other people */
    848     private static class BuildMessageState {
     853    private static class BuildMessageState implements CDQEntry {
     854        private final RouterContext _ctx;
    849855        final TunnelBuildMessage msg;
    850856        final RouterIdentity from;
    851857        final Hash fromHash;
    852858        final long recvTime;
    853         public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) {
     859
     860        public BuildMessageState(RouterContext ctx, I2NPMessage m, RouterIdentity f, Hash h) {
     861            _ctx = ctx;
    854862            msg = (TunnelBuildMessage)m;
    855863            from = f;
    856864            fromHash = h;
    857             recvTime = System.currentTimeMillis();
     865            recvTime = ctx.clock().now();
     866        }
     867
     868        public void setEnqueueTime(long time) {
     869            // set at instantiation, which is just before enqueueing
     870        }
     871
     872        public long getEnqueueTime() {
     873            return recvTime;
     874        }
     875
     876        public void drop() {
     877            _ctx.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
     878            _ctx.statManager().addRateData("tunnel.dropLoadProactive", _ctx.clock().now() - recvTime);
    858879        }
    859880    }
Note: See TracChangeset for help on using the changeset viewer.