Changeset 4d1ea6e


Ignore:
Timestamp:
Oct 3, 2012 7:05:56 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
15a47b5
Parents:
13ef00c
Message:
  • SSU:
    • Increase max outbound establishments based on bandwidth
    • Synchronization fix for Java 5
    • Use multiple buffer sizes in OutboundMessageState? to reduce memory usage
    • Adjust skew calculation, synchronize too
    • Ping loop improvements
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r13ef00c r4d1ea6e  
     12012-10-03 zzz
     2 * NTCP: Reduce conLock contention
     3 * SSU:
     4   - Increase max outbound establishments based on bandwidth
     5   - Synchronization fix for Java 5
     6   - Use multiple buffer sizes in OutboundMessageState to
     7     reduce memory usage
     8   - Adjust skew calculation, synchronize too
     9   - Ping loop improvements
     10
    1112012-10-02 zzz
    212 * I2CP: Delay after sending disconnect message to
  • router/java/src/net/i2p/router/RouterVersion.java

    r13ef00c r4d1ea6e  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 5;
     21    public final static long BUILD = 6;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    r13ef00c r4d1ea6e  
    8787   
    8888    /** max outbound in progress - max inbound is half of this */
    89     private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 30;
     89    private final int DEFAULT_MAX_CONCURRENT_ESTABLISH;
     90    private static final int DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH = 20;
     91    private static final int DEFAULT_HIGH_MAX_CONCURRENT_ESTABLISH = 150;
    9092    private static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
    9193
     
    133135        _outboundByHash = new ConcurrentHashMap();
    134136        _activityLock = new Object();
     137        DEFAULT_MAX_CONCURRENT_ESTABLISH = Math.max(DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH,
     138                                                    Math.min(DEFAULT_HIGH_MAX_CONCURRENT_ESTABLISH,
     139                                                             ctx.bandwidthLimiter().getOutboundKBytesPerSecond() / 2));
    135140        _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
    136141        _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES);
     
    574579                }
    575580               
     581        if (_outboundStates.size() < getMaxConcurrentEstablish() && !_queuedOutbound.isEmpty()) {
     582            // in theory shouldn't need locking, but
     583            // getting IllegalStateExceptions on old Java 5,
     584            // which hoses this state.
     585            synchronized(_queuedOutbound) {
    576586                locked_admitQueued();
     587            }
     588        }
    577589            //remaining = _queuedOutbound.size();
    578590
     
    601613
    602614            Map.Entry<RemoteHostId, List<OutNetMessage>> entry = iter.next();
     615            // java 5 IllegalStateException here
    603616            iter.remove();
    604617            RemoteHostId to = entry.getKey();
     
    710723        // SimpleTimer.getInstance().addEvent(new PublishToNewInbound(peer), 10*1000);
    711724        if (_log.shouldLog(Log.INFO))
    712             _log.info("Completing to the peer after confirm: " + peer);
     725            _log.info("Completing to the peer after IB confirm: " + peer);
    713726        DeliveryStatusMessage dsm = new DeliveryStatusMessage(_context);
    714727        dsm.setArrival(Router.NETWORK_ID); // overloaded, sure, but future versions can check this
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    r13ef00c r4d1ea6e  
    6262        _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
    6363        //_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
    64         _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
     64        //_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
    6565        //_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
    6666        //_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
    67         _context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
     67        //_context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
    6868       
    6969        _alive = true;
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    r13ef00c r4d1ea6e  
    4343   
    4444    public static final int MAX_MSG_SIZE = 32 * 1024;
    45     /** is this enough for a high-bandwidth router? */
    46     private static final int MAX_ENTRIES = 64;
    47     /** would two caches, one for small and one for large messages, be better? */
    48     private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE);
     45    private static final int CACHE4_BYTES = MAX_MSG_SIZE;
     46    private static final int CACHE3_BYTES = CACHE4_BYTES / 4;
     47    private static final int CACHE2_BYTES = CACHE3_BYTES / 4;
     48    private static final int CACHE1_BYTES = CACHE2_BYTES / 4;
     49
     50    private static final int CACHE1_MAX = 256;
     51    private static final int CACHE2_MAX = CACHE1_MAX / 4;
     52    private static final int CACHE3_MAX = CACHE2_MAX / 4;
     53    private static final int CACHE4_MAX = CACHE3_MAX / 4;
     54
     55    private static final ByteCache _cache1 = ByteCache.getInstance(CACHE1_MAX, CACHE1_BYTES);
     56    private static final ByteCache _cache2 = ByteCache.getInstance(CACHE2_MAX, CACHE2_BYTES);
     57    private static final ByteCache _cache3 = ByteCache.getInstance(CACHE3_MAX, CACHE3_BYTES);
     58    private static final ByteCache _cache4 = ByteCache.getInstance(CACHE4_MAX, CACHE4_BYTES);
    4959
    5060    private static final long EXPIRATION = 10*1000;
     
    7383     *  TODO make two constructors, remove this, and make more things final
    7484     *  @return success
     85     *  @throws IAE if too big
    7586     */
    7687    public boolean initialize(I2NPMessage msg, PeerState peer) {
     
    92103     *  TODO make two constructors, remove this, and make more things final
    93104     *  @return success
     105     *  @throws IAE if too big
    94106     */
    95107    public boolean initialize(OutNetMessage m, I2NPMessage msg) {
     
    111123     *  @param m null if msg is "injected"
    112124     *  @return success
     125     *  @throws IAE if too big
    113126     */
    114127    private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
    115128        _message = m;
    116129        _peer = peer;
    117         if (_messageBuf != null) {
    118             _cache.release(_messageBuf);
    119             _messageBuf = null;
    120         }
    121 
    122         _messageBuf = _cache.acquire();
    123130        int size = msg.getRawMessageSize();
    124         if (size > _messageBuf.getData().length)
    125             throw new IllegalArgumentException("Size too large!  " + size + " in " + msg);
     131        acquireBuf(size);
    126132        try {
    127133            int len = msg.toRawByteArray(_messageBuf.getData());
     
    138144            return true;
    139145        } catch (IllegalStateException ise) {
    140             _cache.release(_messageBuf);
    141             _messageBuf = null;
    142             _released = true;
    143             return false;
    144         }
    145     }
    146    
     146            releaseBuf();
     147            return false;
     148        }
     149    }
     150   
     151    /**
     152     *  @throws IAE if too big
     153     *  @since 0.9.3
     154     */
     155    private void acquireBuf(int size) {
     156        if (_messageBuf != null)
     157            releaseBuf();
     158        if (size <= CACHE1_BYTES)
     159            _messageBuf =  _cache1.acquire();
     160        else if (size <= CACHE2_BYTES)
     161            _messageBuf = _cache2.acquire();
     162        else if (size <= CACHE3_BYTES)
     163            _messageBuf = _cache3.acquire();
     164        else if (size <= CACHE4_BYTES)
     165            _messageBuf = _cache4.acquire();
     166        else
     167            throw new IllegalArgumentException("Size too large! " + size);
     168    }
     169   
     170    /**
     171     *  @since 0.9.3
     172     */
     173    private void releaseBuf() {
     174        if (_messageBuf == null)
     175            return;
     176        int size = _messageBuf.getData().length;
     177        if (size == CACHE1_BYTES)
     178            _cache1.release(_messageBuf);
     179        else if (size == CACHE2_BYTES)
     180            _cache2.release(_messageBuf);
     181        else if (size == CACHE3_BYTES)
     182            _cache3.release(_messageBuf);
     183        else if (size == CACHE4_BYTES)
     184            _cache4.release(_messageBuf);
     185        _messageBuf = null;
     186        _released = true;
     187    }
     188
    147189    /**
    148190     *  This is synchronized with writeFragment(),
     
    152194    public synchronized void releaseResources() {
    153195        if (_messageBuf != null && !_released) {
    154             _cache.release(_messageBuf);
    155             _released = true;
     196            releaseBuf();
    156197            if (_log.shouldLog(Log.WARN))
    157198                _releasedBy = new Exception ("Released on " + new Date() + " by:");
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r13ef00c r4d1ea6e  
    7373     */
    7474    private long _clockSkew;
     75    private final Object _clockSkewLock = new Object();
    7576
    7677    /** what is the current receive second, for congestion control? */
     
    8081    /** when did we last send them a message that was ACKed */
    8182    private long _lastSendFullyTime;
     83    /** when did we last send them a ping? */
     84    private long _lastPingTime;
    8285    /** when did we last receive a packet from them? */
    8386    private long _lastReceiveTime;
     
    290293    public static final int INIT_RTT = INIT_RTO / 2;
    291294    private static final int MAX_RTO = 15*1000;
     295    private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3;
    292296   
    293297    public PeerState(RouterContext ctx, UDPTransport transport,
     
    523527     */
    524528    public void adjustClockSkew(long skew) {
    525         _clockSkew = (long) (0.9*_clockSkew + 0.1*(skew - (_rtt / 2)));
     529        // the real one-way delay is much less than RTT / 2, due to ack delays,
     530        // so add a fudge factor
     531        double adj = 0.1 * (skew + CLOCK_SKEW_FUDGE - (_rtt / 2));
     532        synchronized(_clockSkewLock) {
     533            _clockSkew = (long) (0.9*_clockSkew + adj);
     534        }
    526535    }
    527536
     
    532541    /** when did we last receive a packet from them? */
    533542    public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
     543
     544    /**
     545     *  Note ping sent. Does not update last send time.
     546     *  @since 0.9.3
     547     */
     548    public void setLastPingTime(long when) { _lastPingTime = when; }
     549
     550    /**
     551     *  Latest of last sent and last ping
     552     *  @since 0.9.3
     553     */
     554    public long getLastSendOrPingTime() { return Math.max(_lastSendTime, _lastPingTime); }
     555
    534556    /** return the smoothed send transfer rate */
    535557    public int getSendBps() { return _sendBps; }
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    r13ef00c r4d1ea6e  
    24412441        private final List<PeerState> _expireBuffer;
    24422442        private volatile boolean _alive;
     2443        private int _runCount;
     2444        // we've seen firewalls change ports after 40 seconds
     2445        private static final long PING_FIREWALL_TIME = 30*1000;
     2446        private static final long PING_FIREWALL_CUTOFF = PING_FIREWALL_TIME / 2;
     2447        // ping 1/4 of the peers every loop
     2448        private static final int SLICES = 4;
     2449        private static final long SHORT_LOOP_TIME = PING_FIREWALL_CUTOFF / (SLICES + 1);
     2450        private static final long LONG_LOOP_TIME = 25*1000;
    24432451
    24442452        public ExpirePeerEvent() {
     
    24582466            long longInactivityCutoff = now - EXPIRE_TIMEOUT;
    24592467            long pingCutoff = now - (2 * 60*60*1000);
    2460             long pingFirewallCutoff = now - (60 * 1000);
     2468            long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF;
    24612469            boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK;
    24622470            boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort();
     2471            boolean shortLoop = shouldPingFirewall;
    24632472            _expireBuffer.clear();
     2473            _runCount++;
    24642474
    24652475                for (Iterator<PeerState> iter = _expirePeers.iterator(); iter.hasNext(); ) {
     
    24752485                        iter.remove();
    24762486                    } else if (shouldPingFirewall &&
    2477                                peer.getLastSendTime() < pingFirewallCutoff &&
     2487                               ((_runCount ^ peer.hashCode()) & (SLICES - 1)) == 0 &&
     2488                               peer.getLastSendOrPingTime() < pingFirewallCutoff &&
    24782489                               peer.getLastReceiveTime() < pingFirewallCutoff) {
    24792490                        // ping if firewall is mapping the port to keep port the same...
     
    24862497                        //peer.setLastSendTime(now);
    24872498                        send(_destroyBuilder.buildPing(peer));
     2499                        peer.setLastPingTime(now);
    24882500                        // If external port is different, it may be changing the port for every
    24892501                        // session, so ping all of them. Otherwise only one.
     
    25002512
    25012513            if (_alive)
    2502                 schedule(30*1000);
     2514                schedule(shortLoop ? SHORT_LOOP_TIME : LONG_LOOP_TIME);
    25032515        }
    25042516
     
    25142526            _alive = isAlive;
    25152527            if (isAlive) {
    2516                 reschedule(30*1000);
     2528                reschedule(LONG_LOOP_TIME);
    25172529            } else {
    25182530                cancel();
Note: See TracChangeset for help on using the changeset viewer.