Changeset f938cc7b


Ignore:
Timestamp:
Mar 2, 2011 5:18:37 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
cb70778
Parents:
28bd180
Message:
  • BuildHandler?:
    • Limit request queue size
    • Concurrent request queue
    • Remove dead code for queued rely handling
Location:
router/java/src/net/i2p/router/tunnel
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

    r28bd180 rf938cc7b  
    710710    }
    711711
    712     private static int getShareBandwidth(RouterContext ctx) {
     712    /** @return in KBps */
     713    public static int getShareBandwidth(RouterContext ctx) {
    713714        int irateKBps = ctx.bandwidthLimiter().getInboundKBytesPerSecond();
    714715        int orateKBps = ctx.bandwidthLimiter().getOutboundKBytesPerSecond();
  • router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java

    r28bd180 rf938cc7b  
    2828class BuildExecutor implements Runnable {
    2929    private final ArrayList<Long> _recentBuildIds = new ArrayList(100);
    30     private RouterContext _context;
    31     private Log _log;
    32     private TunnelPoolManager _manager;
     30    private final RouterContext _context;
     31    private final Log _log;
     32    private final TunnelPoolManager _manager;
    3333    /** list of TunnelCreatorConfig elements of tunnels currently being built */
    3434    private final Object _currentlyBuilding;
     
    3838    private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap;
    3939    private boolean _isRunning;
    40     private BuildHandler _handler;
     40    private final BuildHandler _handler;
    4141    private boolean _repoll;
    4242    private static final int MAX_CONCURRENT_BUILDS = 10;
     
    249249       
    250250        //long loopBegin = 0;
    251         //long beforeHandleInboundReplies = 0;
    252         //long afterHandleInboundReplies = 0;
    253251        //long afterBuildZeroHop = 0;
    254252        long afterBuildReal = 0;
     
    269267                }
    270268
    271                 //beforeHandleInboundReplies = System.currentTimeMillis();
    272                 _handler.handleInboundReplies();
    273                 //afterHandleInboundReplies = System.currentTimeMillis();
    274                
    275269                // allowed() also expires timed out requests (for new style requests)
    276270                int allowed = allowed();
     
    328322                                buildTunnel(pool, cfg);
    329323                                realBuilt++;
    330                                
    331                                 // we want replies to go to the top of the queue
    332                                 _handler.handleInboundReplies();
    333324                            } else {
    334325                                i--;
     
    392383     *
    393384     */
    394     private static class TunnelPoolComparator implements Comparator {
    395         public int compare(Object l, Object r) {
    396             TunnelPool tpl = (TunnelPool) l;
    397             TunnelPool tpr = (TunnelPool) r;
     385    private static class TunnelPoolComparator implements Comparator<TunnelPool> {
     386        public int compare(TunnelPool tpl, TunnelPool tpr) {
    398387            if (tpl.getSettings().isExploratory() && !tpr.getSettings().isExploratory())
    399388                return -1;
  • router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java

    r28bd180 rf938cc7b  
    11package net.i2p.router.tunnel.pool;
    22
    3 import java.util.ArrayList;
    43import java.util.List;
     4import java.util.concurrent.LinkedBlockingQueue;
    55
    66import net.i2p.data.Base64;
     
    2828import net.i2p.router.tunnel.BuildReplyHandler;
    2929import net.i2p.router.tunnel.HopConfig;
     30import net.i2p.router.tunnel.TunnelDispatcher;
    3031import net.i2p.stat.Rate;
    3132import net.i2p.stat.RateStat;
     
    3334
    3435/**
     36 * Handle the received tunnel build message requests and replies,
     37 * including sending responsses to requests, updating the
     38 * lists of our tunnels and participating tunnels,
     39 * and updating stats.
     40 *
     41 * Replies are handled immediately on reception; requests are queued.
    3542 *
    3643 * Note that 10 minute tunnel expiration is hardcoded in here.
    37  *
    3844 */
    3945class BuildHandler {
     
    4349    private final Job _buildMessageHandlerJob;
    4450    private final Job _buildReplyMessageHandlerJob;
    45     /** list of BuildMessageState, oldest first */
    46     private final List<BuildMessageState> _inboundBuildMessages;
    47     /** list of BuildReplyMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
    48     private final List<BuildReplyMessageState> _inboundBuildReplyMessages;
    49     /** list of BuildEndMessageState, oldest first - unused unless HANDLE_REPLIES_INLINE == false */
    50     private final List<BuildEndMessageState> _inboundBuildEndMessages;
     51    private final LinkedBlockingQueue<BuildMessageState> _inboundBuildMessages;
    5152    private final BuildMessageProcessor _processor;
    5253    private final ParticipatingThrottler _throttler;
    5354
    54     private static final boolean HANDLE_REPLIES_INLINE = true;
    55    
     55    /** TODO these may be too high, review and adjust */
     56    private static final int MIN_QUEUE = 12;
     57    private static final int MAX_QUEUE = 96;
     58
    5659    public BuildHandler(RouterContext ctx, BuildExecutor exec) {
    5760        _context = ctx;
    5861        _log = ctx.logManager().getLog(getClass());
    5962        _exec = exec;
    60         _inboundBuildMessages = new ArrayList(16);
    61         if (HANDLE_REPLIES_INLINE) {
    62             _inboundBuildEndMessages = null;
    63             _inboundBuildReplyMessages = null;
    64         } else {
    65             _inboundBuildEndMessages = new ArrayList(16);
    66             _inboundBuildReplyMessages = new ArrayList(16);
    67         }
     63        // Queue size = 12 * share BW / 48K
     64        int sz = Math.min(MAX_QUEUE, Math.max(MIN_QUEUE, TunnelDispatcher.getShareBandwidth(ctx) * MIN_QUEUE / 48));
     65        _inboundBuildMessages = new LinkedBlockingQueue(sz);
    6866   
    6967        _context.statManager().createRateStat("tunnel.reject.10", "How often we reject a tunnel probabalistically", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     
    9593       
    9694        _processor = new BuildMessageProcessor(ctx);
     95        _throttler = new ParticipatingThrottler(ctx);
    9796        _buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx);
    9897        _buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx);
     
    103102        ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildMessage.MESSAGE_TYPE, tbmhjb);
    104103        ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb);
    105         _throttler = new ParticipatingThrottler(ctx);
    106104    }
    107105   
     
    111109    /**
    112110     * Blocking call to handle a few of the pending inbound requests, returning how many
    113      * requests remain after this pass
     111     * requests remain after this pass. This is called by BuildExecutor.
    114112     */
    115113    int handleInboundRequests() {
    116         int dropExpired = 0;
    117         int remaining = 0;
    118         List handled = null;
    119         long beforeFindHandled = System.currentTimeMillis();
    120         synchronized (_inboundBuildMessages) {
    121             int toHandle = _inboundBuildMessages.size();
    122             if (toHandle > 0) {
    123                 if (toHandle > MAX_HANDLE_AT_ONCE)
    124                     toHandle = MAX_HANDLE_AT_ONCE;
    125                 handled = new ArrayList(toHandle);
    126                 //if (false) {
    127                 //    for (int i = 0; i < toHandle; i++) // LIFO for lower response time (should we RED it for DoS?)
    128                 //        handled.add(_inboundBuildMessages.remove(_inboundBuildMessages.size()-1));
    129                 //} else {
    130                     // drop any expired messages
    131                     long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
    132                     do {
    133                         BuildMessageState state = (BuildMessageState)_inboundBuildMessages.get(0);
    134                         if (state.recvTime <= dropBefore) {
    135                             _inboundBuildMessages.remove(0);
    136                             dropExpired++;
    137                             if (_log.shouldLog(Log.WARN))
    138                                 _log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
    139                                            + ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
    140                             _context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
    141                         } else {
    142                             break;
    143                         }
    144                     } while (!_inboundBuildMessages.isEmpty());
    145                    
    146                     if (dropExpired > 0)
    147                         _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
    148 
    149                     // now pull off the oldest requests first (we're doing a tail-drop
    150                     // when adding)
    151                     for (int i = 0; i < toHandle && !_inboundBuildMessages.isEmpty(); i++)
    152                         handled.add(_inboundBuildMessages.remove(0));
    153                 //}
    154             }
    155             remaining = _inboundBuildMessages.size();
    156         }
    157         if (handled != null) {
     114        for (int i = 0; i < MAX_HANDLE_AT_ONCE; ) {
     115            BuildMessageState state = _inboundBuildMessages.poll();
     116            if (state == null)
     117                return 0;
     118            long dropBefore = System.currentTimeMillis() - (BuildRequestor.REQUEST_TIMEOUT/4);
     119            if (state.recvTime <= dropBefore) {
     120                if (_log.shouldLog(Log.WARN))
     121                    _log.warn("Not even trying to handle/decrypt the request " + state.msg.getUniqueId()
     122                              + ", since we received it a long time ago: " + (System.currentTimeMillis() - state.recvTime));
     123                _context.statManager().addRateData("tunnel.dropLoadDelay", System.currentTimeMillis() - state.recvTime, 0);
     124                _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Too slow"));
     125                continue;
     126            }       
     127
     128            i++;
     129            long beforeHandle = System.currentTimeMillis();
     130            long actualTime = handleRequest(state);
    158131            if (_log.shouldLog(Log.DEBUG))
    159                 _log.debug("Handling " + handled.size() + " requests (took " + (System.currentTimeMillis()-beforeFindHandled) + "ms to find them)");
    160            
    161             for (int i = 0; i < handled.size(); i++) {
    162                 BuildMessageState state = (BuildMessageState)handled.get(i);
    163                 long beforeHandle = System.currentTimeMillis();
    164                 long actualTime = handleRequest(state);
    165                 if (_log.shouldLog(Log.DEBUG))
    166                     _log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime + " (" + i + " out of " + handled.size() + " with " + remaining + " remaining)");
    167             }
    168             handled.clear();
    169         }
    170         if (!HANDLE_REPLIES_INLINE) {
    171             synchronized (_inboundBuildEndMessages) {
    172                 int toHandle = _inboundBuildEndMessages.size();
    173                 if (toHandle > 0) {
    174                     if (handled == null)
    175                         handled = new ArrayList(_inboundBuildEndMessages);
    176                     else
    177                         handled.addAll(_inboundBuildEndMessages);
    178                     _inboundBuildEndMessages.clear();
    179                 }
    180             }
    181         }
    182         if (handled != null) {
    183             if (_log.shouldLog(Log.DEBUG))
    184                 _log.debug("Handling " + handled.size() + " requests that are actually replies");
    185             // these are inbound build messages that actually contain the full replies, since
    186             // they are for inbound tunnels we have created
    187             for (int i = 0; i < handled.size(); i++) {
    188                 BuildEndMessageState state = (BuildEndMessageState)handled.get(i);
    189                 handleRequestAsInboundEndpoint(state);
    190             }
    191         }
    192        
    193         // anything else?
    194         /*
    195         synchronized (_inboundBuildMessages) {
    196             int remaining = _inboundBuildMessages.size();
    197             return remaining;
    198         }
    199          */
     132                _log.debug("Handle took " + (System.currentTimeMillis()-beforeHandle) + "/" + actualTime +
     133                           " (" + i + " with " + _inboundBuildMessages.size() + " remaining)");
     134        }
     135
     136        int remaining = _inboundBuildMessages.size();
    200137        if (remaining > 0)
    201138            _context.statManager().addRateData("tunnel.handleRemaining", remaining, 0);
    202139        return remaining;
    203     }
    204    
    205     /** Warning - noop if HANDLE_REPLIES_INLINE == true */
    206     void handleInboundReplies() {
    207         if (HANDLE_REPLIES_INLINE)
    208             return;
    209         List handled = null;
    210         synchronized (_inboundBuildReplyMessages) {
    211             int toHandle = _inboundBuildReplyMessages.size();
    212             if (toHandle > 0) {
    213                 // always handle all of them - they're replies that we were waiting for!
    214                 handled = new ArrayList(_inboundBuildReplyMessages);
    215                 _inboundBuildReplyMessages.clear();
    216             }
    217         }
    218         if (handled != null) {
    219             if (_log.shouldLog(Log.DEBUG))
    220                 _log.debug("Handling " + handled.size() + " replies");
    221            
    222             for (int i = 0; i < handled.size(); i++) {
    223                 BuildReplyMessageState state = (BuildReplyMessageState)handled.get(i);
    224                 handleReply(state);
    225             }
    226         }
    227140    }
    228141   
     
    344257    }
    345258   
    346     /** @return handle time or -1 */
     259    /** @return handle time or -1 if it wasn't completely handled */
    347260    private long handleRequest(BuildMessageState state) {
    348261        long timeSinceReceived = System.currentTimeMillis()-state.recvTime;
     
    366279        long decryptTime = System.currentTimeMillis() - beforeDecrypt;
    367280        _context.statManager().addRateData("tunnel.decryptRequestTime", decryptTime, decryptTime);
    368         if (decryptTime > 500)
     281        if (decryptTime > 500 && _log.shouldLog(Log.WARN))
    369282            _log.warn("Took too long to decrypt the request: " + decryptTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
    370283        if (req == null) {
     
    380293        RouterInfo nextPeerInfo = _context.netDb().lookupRouterInfoLocally(nextPeer);
    381294        long lookupTime = System.currentTimeMillis()-beforeLookup;
    382         if (lookupTime > 500)
     295        if (lookupTime > 500 && _log.shouldLog(Log.WARN))
    383296            _log.warn("Took too long to lookup the request: " + lookupTime + "/" + readPeerTime + " for message " + state.msg.getUniqueId() + " received " + (timeSinceReceived+decryptTime) + " ago");
    384297        if (nextPeerInfo == null) {
     
    417330   
    418331    private class HandleReq extends JobImpl {
    419         private BuildMessageState _state;
    420         private BuildRequestRecord _req;
    421         private Hash _nextPeer;
     332        private final BuildMessageState _state;
     333        private final BuildRequestRecord _req;
     334        private final Hash _nextPeer;
    422335        HandleReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
    423336            super(ctx);
     
    440353
    441354    private static class TimeoutReq extends JobImpl {
    442         private BuildMessageState _state;
    443         private BuildRequestRecord _req;
    444         private Hash _nextPeer;
     355        private final BuildMessageState _state;
     356        private final BuildRequestRecord _req;
     357        private final Hash _nextPeer;
    445358        TimeoutReq(RouterContext ctx, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) {
    446359            super(ctx);
     
    581494            if (from == null)
    582495                from = state.from.calculateHash();
    583             if (_throttler.shouldThrottle(from)) {
     496            if (from != null && _throttler.shouldThrottle(from)) {
    584497                if (_log.shouldLog(Log.WARN))
    585498                    _log.warn("Rejecting tunnel (hop throttle), previous hop: " + from);
     
    732645   
    733646    public int getInboundBuildQueueSize() {
    734         synchronized (_inboundBuildMessages) {
    735647            return _inboundBuildMessages.size();
    736         }
    737648    }
    738649   
     
    757668                }
    758669                BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage);
    759                 if (HANDLE_REPLIES_INLINE) {
    760                     handleRequestAsInboundEndpoint(state);
    761                 } else {
    762                     synchronized (_inboundBuildEndMessages) {
    763                         _inboundBuildEndMessages.add(state);
    764                     }
    765                     _exec.repoll();
    766                 }
     670                handleRequestAsInboundEndpoint(state);
    767671            } else {
    768672                if (_exec.wasRecentlyBuilding(reqId)) {
     
    772676                    _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0);
    773677                } else {
    774                     synchronized (_inboundBuildMessages) {
    775                         boolean removed = false;
    776                         int dropped = 0;
    777                         for (int i = 0; i < _inboundBuildMessages.size(); i++) {
    778                             BuildMessageState cur = (BuildMessageState)_inboundBuildMessages.get(i);
    779                             long age = System.currentTimeMillis() - cur.recvTime;
    780                             if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
    781                                 _inboundBuildMessages.remove(i);
    782                                 i--;
    783                                 dropped++;
    784                                 _context.statManager().addRateData("tunnel.dropLoad", age, _inboundBuildMessages.size());
    785                             }
    786                         }
    787                         if (dropped > 0) {
     678                    int sz = _inboundBuildMessages.size();
     679                    BuildMessageState cur = _inboundBuildMessages.peek();
     680                    boolean accept = true;
     681                    if (cur != null) {
     682                        long age = System.currentTimeMillis() - cur.recvTime;
     683                        if (age >= BuildRequestor.REQUEST_TIMEOUT/4) {
     684                            _context.statManager().addRateData("tunnel.dropLoad", age, sz);
    788685                            _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
    789686                            // if the queue is backlogged, stop adding new messages
    790                             _context.statManager().addRateData("tunnel.dropLoadBacklog", _inboundBuildMessages.size(), _inboundBuildMessages.size());
     687                            _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
     688                            accept = false;
     689                        }
     690                    }
     691                    if (accept) {
     692                        int queueTime = estimateQueueTime(sz);
     693                        float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
     694                        pDrop = (float)Math.pow(pDrop, 16); // steeeep
     695                        float f = _context.random().nextFloat();
     696                        //if ( (pDrop > f) && (allowProactiveDrop()) ) {
     697                        if (pDrop > f) {
     698                            _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
     699                            _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, sz);
    791700                        } else {
    792                             int queueTime = estimateQueueTime(_inboundBuildMessages.size());
    793                             float pDrop = queueTime/((float)BuildRequestor.REQUEST_TIMEOUT*3);
    794                             pDrop = (float)Math.pow(pDrop, 16); // steeeep
    795                             float f = _context.random().nextFloat();
    796                             if ( (pDrop > f) && (allowProactiveDrop()) ) {
    797                                 _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: Queue time"));
    798                                 _context.statManager().addRateData("tunnel.dropLoadProactive", queueTime, _inboundBuildMessages.size());
     701                            accept = _inboundBuildMessages.offer(new BuildMessageState(receivedMessage, from, fromHash));
     702                            if (accept) {
     703                                // wake up the Executor to call handleInboundRequests()
     704                                _exec.repoll();
    799705                            } else {
    800                                 _inboundBuildMessages.add(new BuildMessageState(receivedMessage, from, fromHash));
     706                                _context.throttle().setTunnelStatus(_x("Dropping tunnel requests: High load"));
     707                                _context.statManager().addRateData("tunnel.dropLoadBacklog", sz, sz);
    801708                            }
    802709                        }
    803710                    }
    804                     _exec.repoll();
    805711                }
    806712            }
     
    809715    }
    810716   
     717/****
    811718    private boolean allowProactiveDrop() {
    812         String allow = _context.getProperty("router.allowProactiveDrop", "true");
    813         boolean rv = false;
    814         if ( (allow == null) || (Boolean.valueOf(allow).booleanValue()) )
    815             rv = true;
     719        boolean rv = _context.getBooleanPropertyDefaultTrue("router.allowProactiveDrop");
    816720        if (!rv)
    817721            _context.statManager().addRateData("tunnel.dropLoadProactiveAbort", 1, 0);
    818722        return rv;
    819723    }
     724****/
    820725   
    821726    private int estimateQueueTime(int numPendingMessages) {
     
    846751                _log.debug("Receive tunnel build reply message " + receivedMessage.getUniqueId() + " from "
    847752                           + (fromHash != null ? fromHash.toBase64() : from != null ? from.calculateHash().toBase64() : "a tunnel"));
    848             if (HANDLE_REPLIES_INLINE) {
    849                 handleReply(new BuildReplyMessageState(receivedMessage));
    850             } else {
    851                 synchronized (_inboundBuildReplyMessages) {
    852                     _inboundBuildReplyMessages.add(new BuildReplyMessageState(receivedMessage));
    853                 }
    854                 _exec.repoll();
    855             }
     753            handleReply(new BuildReplyMessageState(receivedMessage));
    856754            return _buildReplyMessageHandlerJob;
    857755        }
     
    860758    /** normal inbound requests from other people */
    861759    private static class BuildMessageState {
    862         TunnelBuildMessage msg;
    863         RouterIdentity from;
    864         Hash fromHash;
    865         long recvTime;
     760        final TunnelBuildMessage msg;
     761        final RouterIdentity from;
     762        final Hash fromHash;
     763        final long recvTime;
    866764        public BuildMessageState(I2NPMessage m, RouterIdentity f, Hash h) {
    867765            msg = (TunnelBuildMessage)m;
     
    873771    /** replies for outbound tunnels that we have created */
    874772    private static class BuildReplyMessageState {
    875         TunnelBuildReplyMessage msg;
    876         long recvTime;
     773        final TunnelBuildReplyMessage msg;
     774        final long recvTime;
    877775        public BuildReplyMessageState(I2NPMessage m) {
    878776            msg = (TunnelBuildReplyMessage)m;
     
    882780    /** replies for inbound tunnels we have created */
    883781    private static class BuildEndMessageState {
    884         TunnelBuildMessage msg;
    885         PooledTunnelCreatorConfig cfg;
    886         long recvTime;
     782        final TunnelBuildMessage msg;
     783        final PooledTunnelCreatorConfig cfg;
     784        final long recvTime;
    887785        public BuildEndMessageState(PooledTunnelCreatorConfig c, I2NPMessage m) {
    888786            cfg = c;
     
    892790    }
    893791
    894     // noop
     792    /** noop */
    895793    private static class TunnelBuildMessageHandlerJob extends JobImpl {
    896794        private TunnelBuildMessageHandlerJob(RouterContext ctx) { super(ctx); }
     
    898796        public String getName() { return "Receive tunnel build message"; }
    899797    }
    900     // noop
     798
     799    /** noop */
    901800    private static class TunnelBuildReplyMessageHandlerJob extends JobImpl {
    902801        private TunnelBuildReplyMessageHandlerJob(RouterContext ctx) { super(ctx); }
     
    911810     */
    912811    private static class TunnelBuildNextHopFailJob extends JobImpl {
    913         HopConfig _cfg;
     812        final HopConfig _cfg;
    914813        private TunnelBuildNextHopFailJob(RouterContext ctx, HopConfig cfg) {
    915814            super(ctx);
Note: See TracChangeset for help on using the changeset viewer.