Changeset e9d0d79


Ignore:
Timestamp:
Dec 4, 2011 7:01:52 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
3fc312a6
Parents:
69cae1a
Message:
  • Tunnel RED:
    • Complete rework of participating traffic RED. Implement an accurate bandwidth tracker in FIFOBandwidthRefiller.
    • Fix drop priority of VTBM at OBEP
    • Lower drop priority of VTBRM at IBGW
    • Raise threshold from 95% to 120%
    • Remove unused things in HopConfig?

…needs more testing…

Files:
10 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r69cae1a re9d0d79  
     12011-12-04 zzz
     2  * Console:
     3    - Less icons on configclients.jsp
     4    - Fix some browsers breaking line on negative numbers
     5    - Tab CSS tweaks
     6  * i2psnark: Fix directories not always deleted when torrent is deleted
     7  * IRC Client: Lower log level for connect error to warn (thx echelon/sponge)
     8  * Tunnel RED:
     9    - Complete rework of participating traffic RED.
     10      Implement an accurate bandwidth tracker in FIFOBandwidthRefiller.
     11    - Fix drop priority of VTBM at OBEP
     12    - Lower drop priority of VTBRM at IBGW
     13    - Raise threshold from 95% to 120%
     14    - Remove unused things in HopConfig
     15  * UDP: Fix i2np.udp.allowLocal operation (thx Quizzers)
     16
    1172011-12-02 zzz
    218  * Console:
  • router/java/src/net/i2p/router/RouterVersion.java

    r69cae1a re9d0d79  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 14;
     21    public final static long BUILD = 15;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java

    r69cae1a re9d0d79  
    5252    /** how large _availableOutbound can get - aka our outbound rate during a burst */
    5353    private int _maxOutbound;
    54     /** shortcut of whether our outbound rate is unlimited */
     54    /** shortcut of whether our outbound rate is unlimited - UNUSED always false for now */
    5555    private boolean _outboundUnlimited;
    56     /** shortcut of whether our inbound rate is unlimited */
     56    /** shortcut of whether our inbound rate is unlimited - UNUSED always false for now */
    5757    private boolean _inboundUnlimited;
    5858    /** lifetime counter of bytes received */
     
    6060    /** lifetime counter of bytes sent */
    6161    private final AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
     62
    6263    /** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
    63     private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
     64    //private final AtomicLong _totalWastedInboundBytes = new AtomicLong();
    6465    /** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
    65     private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
     66    //private final AtomicLong _totalWastedOutboundBytes = new AtomicLong();
     67
    6668    private final FIFOBandwidthRefiller _refiller;
    6769    private final Thread _refillerThread;
     
    102104    public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes.get(); }
    103105    public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes.get(); }
    104     public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
    105     public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
     106    //public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
     107    //public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
    106108    //public long getMaxInboundBytes() { return _maxInboundBytes; }
    107109    //public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; }
    108110    //public long getMaxOutboundBytes() { return _maxOutboundBytes; }
    109111    //public void setMaxOutboundBytes(int numBytes) { _maxOutboundBytes = numBytes; }
    110     public boolean getInboundUnlimited() { return _inboundUnlimited; }
    111     public void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
    112     public boolean getOutboundUnlimited() { return _outboundUnlimited; }
    113     public void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; }
     112
     113    /** @deprecated unused for now, we are always limited */
     114    void setInboundUnlimited(boolean isUnlimited) { _inboundUnlimited = isUnlimited; }
     115
     116    /** @deprecated unused for now, we are always limited */
     117    void setOutboundUnlimited(boolean isUnlimited) { _outboundUnlimited = isUnlimited; }
     118
     119    /** @return smoothed one second rate */
    114120    public float getSendBps() { return _sendBps; }
     121
     122    /** @return smoothed one second rate */
    115123    public float getReceiveBps() { return _recvBps; }
     124
     125    /** @return smoothed 15 second rate */
    116126    public float getSendBps15s() { return _sendBps15s; }
     127
     128    /** @return smoothed 15 second rate */
    117129    public float getReceiveBps15s() { return _recvBps15s; }
    118130   
    119     /** These are the configured maximums, not the current rate */
     131    /** The configured maximum, not the current rate */
    120132    public int getOutboundKBytesPerSecond() { return _refiller.getOutboundKBytesPerSecond(); }
     133   
     134    /** The configured maximum, not the current rate */
    121135    public int getInboundKBytesPerSecond() { return _refiller.getInboundKBytesPerSecond(); }
     136   
     137    /** The configured maximum, not the current rate */
    122138    public int getOutboundBurstKBytesPerSecond() { return _refiller.getOutboundBurstKBytesPerSecond(); }
     139   
     140    /** The configured maximum, not the current rate */
    123141    public int getInboundBurstKBytesPerSecond() { return _refiller.getInboundBurstKBytesPerSecond(); }
    124142   
     
    147165        _unavailableInboundBurst.set(0);
    148166        _unavailableOutboundBurst.set(0);
    149         _inboundUnlimited = false;
    150         _outboundUnlimited = false;
    151     }
    152    
     167        // always limited for now
     168        //_inboundUnlimited = false;
     169        //_outboundUnlimited = false;
     170    }
     171   
     172    /**
     173     *  We sent a message.
     174     *
     175     *  @param size bytes
     176     *  @since 0.8.12
     177     */
     178    public void sentParticipatingMessage(int size) {
     179        _refiller.incrementParticipatingMessageBytes(size);
     180    }
     181
     182    /**
     183     *  Out bandwidth. Actual bandwidth, not smoothed, not bucketed.
     184     *
     185     *  @return Bps in recent period (a few seconds)
     186     *  @since 0.8.12
     187     */
     188    public int getCurrentParticipatingBandwidth() {
     189        return _refiller.getCurrentParticipatingBandwidth();
     190    }
     191
    153192    public Request createRequest() { return new SimpleRequest(); }
    154193
     
    242281     * we can
    243282     *
     283     * @param buf contains satisfied outbound requests, really just to avoid object thrash, not really used
    244284     * @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative)
    245285     * @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative)
     
    262302            _availableInbound.set(_maxInbound);
    263303            if (uib > _maxInboundBurst) {
    264                 _totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
     304                //_totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
    265305                _unavailableInboundBurst.set(_maxInboundBurst);
    266306            }
     
    293333
    294334            if (uob > _maxOutboundBurst) {
    295                 _totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
     335                //_totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
    296336                _unavailableOutboundBurst.set(_maxOutboundBurst);
    297337            }
     
    377417     * each one satisfied that the request has been granted). 
    378418     *
    379      * @param buffer returned with the satisfied outbound requests only
     419     * @param buffer Out parameter, returned with the satisfied outbound requests only
    380420     */
    381421    private final void satisfyRequests(List<Request> buffer) {
     
    386426    }
    387427   
     428    /**
     429     * @param satisfied Out parameter, returned with the satisfied requests added
     430     */
    388431    private final void satisfyInboundRequests(List<Request> satisfied) {
    389432        synchronized (_pendingInboundRequests) {
     
    530573    }
    531574   
     575    /**
     576     * @param satisfied Out parameter, returned with the satisfied requests added
     577     */
    532578    private final void satisfyOutboundRequests(List<Request> satisfied) {
    533579        synchronized (_pendingOutboundRequests) {
     
    896942        public void init(int in, int out, String target);
    897943        public void setCompleteListener(CompleteListener lsnr);
     944        /** Only supported if the request is not satisfied */
    898945        public void attach(Object obj);
    899946        public Object attachment();
     
    906953
    907954    private static final NoopRequest _noop = new NoopRequest();
     955
    908956    private static class NoopRequest implements Request {
    909         private CompleteListener _lsnr;
    910         private Object _attachment;
    911957        public void abort() {}
    912958        public boolean getAborted() { return false; }
     
    919965        public void waitForNextAllocation() {}
    920966        public void init(int in, int out, String target) {}
    921         public CompleteListener getCompleteListener() { return _lsnr; }
     967        public CompleteListener getCompleteListener() { return null; }
    922968        public void setCompleteListener(CompleteListener lsnr) {
    923             _lsnr = lsnr;
    924969            lsnr.complete(NoopRequest.this);
    925970        }
    926         public void attach(Object obj) { _attachment = obj; }
    927         public Object attachment() { return _attachment; }
     971        public void attach(Object obj) {
     972            throw new UnsupportedOperationException("Don't attach to a satisfied request");
     973        }
     974        public Object attachment() { return null; }
    928975    }
    929976}
  • router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java

    r69cae1a re9d0d79  
    33import java.util.ArrayList;
    44import java.util.List;
     5import java.util.concurrent.atomic.AtomicInteger;
    56
    67import net.i2p.I2PAppContext;
     8import net.i2p.data.DataHelper;
    79import net.i2p.util.Log;
    810
     11/**
     12 *  Thread that runs every 100 ms to "give" bandwidth to
     13 *  FIFOBandwidthLimiter.
     14 *  Instantiated by FIFOBandwidthLimiter.
     15 *
     16 *  As of 0.8.12, this also contains a counter for outbound participating bandwidth.
     17 *  This was a good place for it since we needed a 100ms thread for it.
     18 *
     19 *  Public only for the properties and defaults.
     20 */
    921public class FIFOBandwidthRefiller implements Runnable {
    1022    private final Log _log;
     
    6476    private static final long REPLENISH_FREQUENCY = 100;
    6577   
    66     public FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
     78    FIFOBandwidthRefiller(I2PAppContext context, FIFOBandwidthLimiter limiter) {
    6779        _limiter = limiter;
    6880        _context = context;
     
    7385
    7486    /** @since 0.8.8 */
    75     public void shutdown() {
     87    void shutdown() {
    7688        _isRunning = false;
    7789    }
     
    89101            }
    90102           
     103            updateParticipating(now);
    91104            boolean updated = updateQueues(buffer, now);
    92105            if (updated) {
     
    98111    }
    99112   
    100     public void reinitialize() {
     113    void reinitialize() {
    101114        _lastRefillTime = _limiter.now();
    102115        checkConfig();
     
    106119    private boolean updateQueues(List<FIFOBandwidthLimiter.Request> buffer, long now) {
    107120        long numMs = (now - _lastRefillTime);
    108         if (_log.shouldLog(Log.INFO))
    109             _log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString()
     121        if (_log.shouldLog(Log.DEBUG))
     122            _log.debug("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString()
    110123                       + " rate in="
    111124                       + _inboundKBytesPerSecond + ", out="
     
    121134            if (outboundToAdd < 0) outboundToAdd = 0;
    122135
     136         /**** Always limited for now
    123137            if (_inboundKBytesPerSecond <= 0) {
    124138                _limiter.setInboundUnlimited(true);
     
    133147                _limiter.setOutboundUnlimited(false);
    134148            }
     149         ****/
    135150           
    136151            long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000;
     
    138153            _limiter.refillBandwidthQueues(buffer, inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut);
    139154           
    140             if (_log.shouldLog(Log.DEBUG)) {
    141                 _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
    142                 _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable");
    143             }
     155            //if (_log.shouldLog(Log.DEBUG)) {
     156            //    _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable");
     157            //    _log.debug("Adding " + outboundToAdd + " bytes to outboundAvailable");
     158            //}
    144159            return true;
    145160        } else {
     
    158173        updateOutboundPeak();
    159174       
    160         if (_inboundKBytesPerSecond <= 0) {
    161             _limiter.setInboundUnlimited(true);
    162         } else {
    163             _limiter.setInboundUnlimited(false);
    164         }
    165         if (_outboundKBytesPerSecond <= 0) {
    166             _limiter.setOutboundUnlimited(true);
    167         } else {
    168             _limiter.setOutboundUnlimited(false);
    169         }
    170 
     175        // We are always limited for now
     176        //_limiter.setInboundUnlimited(_inboundKBytesPerSecond <= 0);
     177        //_limiter.setOutboundUnlimited(_outboundKBytesPerSecond <= 0);
    171178    }
    172179   
     
    186193            _inboundKBytesPerSecond = DEFAULT_INBOUND_BANDWIDTH;
    187194    }
     195
    188196    private void updateOutboundRate() {
    189197        int out = _context.getProperty(PROP_OUTBOUND_BANDWIDTH, DEFAULT_OUTBOUND_BANDWIDTH);
     
    277285    int getOutboundBurstKBytesPerSecond() { return _outboundBurstKBytesPerSecond; }
    278286    int getInboundBurstKBytesPerSecond() { return _inboundBurstKBytesPerSecond; }
     287
     288    /**
     289     *  Participating counter stuff below here
     290     *  TOTAL_TIME needs to be high enough to get a burst without dropping
     291     *  @since 0.8.12
     292     */
     293    private static final int TOTAL_TIME = 4000;
     294    private static final int PERIODS = TOTAL_TIME / (int) REPLENISH_FREQUENCY;
     295    /** count in current 100 ms period */
     296    private final AtomicInteger _currentParticipating = new AtomicInteger();
     297    private long _lastPartUpdateTime;
     298    private int _lastTotal;
     299    /** the actual length of last total period as coalesced (nominally TOTAL_TIME) */
     300    private long _lastTotalTime;
     301    private int _lastIndex;
     302    /** buffer of count per 100 ms period, last is at _lastIndex, older at higher indexes (wraps) */
     303    private final int[] _counts = new int[PERIODS];
     304    /** the actual length of the period (nominally REPLENISH_FREQUENCY) */
     305    private final long[] _times = new long[PERIODS];
     306
     307    /**
     308     *  We sent a message.
     309     *
     310     *  @param size bytes
     311     *  @since 0.8.12
     312     */
     313    void incrementParticipatingMessageBytes(int size) {
     314        _currentParticipating.addAndGet(size);
     315    }
     316
     317    /**
     318     *  Out bandwidth. Actual bandwidth, not smoothed, not bucketed.
     319     *
     320     *  @return Bps in recent period (a few seconds)
     321     *  @since 0.8.12
     322     */
     323    synchronized int getCurrentParticipatingBandwidth() {
     324        int current = _currentParticipating.get();
     325        long totalTime = (_limiter.now() - _lastPartUpdateTime) + _lastTotalTime;
     326        if (totalTime <= 0)
     327            return 0;
     328        // 1000 for ms->seconds in denominator
     329        long bw = 1000l * (current + _lastTotal) / totalTime;
     330        if (bw > Integer.MAX_VALUE)
     331            return 0;
     332        return (int) bw;
     333    }
     334
     335    /**
     336     *  Run once every 100 ms
     337     *
     338     *  @since 0.8.12
     339     */
     340    private synchronized void updateParticipating(long now) {
     341        long elapsed = now - _lastPartUpdateTime;
     342        if (elapsed <= 0) {
     343            // glitch in the matrix
     344            _lastPartUpdateTime = now;
     345            return;
     346        }
     347        _lastPartUpdateTime = now;
     348        if (--_lastIndex < 0)
     349            _lastIndex = PERIODS - 1;
     350        _counts[_lastIndex] = _currentParticipating.getAndSet(0);
     351        _times[_lastIndex] = elapsed;
     352        _lastTotal = 0;
     353        _lastTotalTime = 0;
     354        // add up total counts and times
     355        for (int i = 0; i < PERIODS; i++) {
     356            int idx = (_lastIndex + i) % PERIODS;
     357             _lastTotal += _counts[idx];
     358             _lastTotalTime += _times[idx];
     359             if (_lastTotalTime >= TOTAL_TIME)
     360                 break;
     361        }
     362        if (_lastIndex == 0 && _lastTotalTime > 0) {
     363            long bw = 1000l * _lastTotal / _lastTotalTime;
     364            _context.statManager().addRateData("tunnel.participatingBandwidthOut", bw);
     365            if (_lastTotal > 0 && _log.shouldLog(Log.INFO))
     366                _log.info(DataHelper.formatSize(_lastTotal) + " bytes out part. tunnels in last " + _lastTotalTime + " ms: " +
     367                          DataHelper.formatSize(bw) + " Bps");
     368        }
     369    }
    279370}
  • router/java/src/net/i2p/router/tunnel/HopConfig.java

    r69cae1a re9d0d79  
    2626    private long _creation;
    2727    private long _expiration;
    28     private Map _options;
     28    //private Map _options;
     29
    2930    // these 4 were longs, let's save some space
    3031    // 2 billion * 1KB / 10 minutes = 3 GBps in a single tunnel
    3132    private int _messagesProcessed;
    3233    private int _oldMessagesProcessed;
    33     private int _messagesSent;
    34     private int _oldMessagesSent;
     34    //private int _messagesSent;
     35    //private int _oldMessagesSent;
    3536   
    3637    /** IV length for {@link #getReplyIV} */
     
    4950        return _receiveTunnel;
    5051    }
     52
    5153    public void setReceiveTunnelId(byte id[]) { _receiveTunnelId = id; }
    5254    public void setReceiveTunnelId(TunnelId id) { _receiveTunnelId = DataHelper.toLong(4, id.getTunnelId()); }
     
    107109     *
    108110     */
    109     public Map getOptions() { return _options; }
    110     public void setOptions(Map options) { _options = options; }
     111    //public Map getOptions() { return _options; }
     112    //public void setOptions(Map options) { _options = options; }
    111113   
    112     /** take note of a message being pumped through this tunnel */
    113     /** "processed" is for incoming and "sent" is for outgoing (could be dropped in between) */
     114    /**
     115     *  Take note of a message being pumped through this tunnel.
     116     *  "processed" is for incoming and "sent" is for outgoing (could be dropped in between)
     117     */
    114118    public void incrementProcessedMessages() { _messagesProcessed++; }
    115119
     
    122126    }
    123127
     128    /**
     129     *  Take note of a message being pumped through this tunnel.
     130     *  "processed" is for incoming and "sent" is for outgoing (could be dropped in between)
     131     */
     132  /****
    124133    public void incrementSentMessages() { _messagesSent++; }
    125134
     
    131140        return rv;
    132141    }
     142  ****/
    133143   
     144    /** */
    134145    @Override
    135146    public String toString() {
  • router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java

    r69cae1a re9d0d79  
    4949        //if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW", encrypted.length))
    5050        //    return -1;
    51         _config.incrementSentMessages();
     51        //_config.incrementSentMessages();
     52        _context.bandwidthLimiter().sentParticipatingMessage(1024);
    5253        TunnelDataMessage msg = new TunnelDataMessage(_context);
    5354        msg.setData(encrypted);
  • router/java/src/net/i2p/router/tunnel/OutboundTunnelEndpoint.java

    r69cae1a re9d0d79  
    5151                           + (toRouter != null ? toRouter.toBase64().substring(0,4) : "")
    5252                           + (toTunnel != null ? ":" + toTunnel.getTunnelId() : ""));
     53            int size = msg.getMessageSize();
    5354            // don't drop it if we are the target
    54             if ((!_context.routerHash().equals(toRouter)) &&
    55                 _context.tunnelDispatcher().shouldDropParticipatingMessage("OBEP " + msg.getType(), msg.getMessageSize()))
     55            boolean toUs = _context.routerHash().equals(toRouter);
     56            if ((!toUs) &&
     57                _context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.OBEP, msg.getType(), size))
    5658                return;
    57             _config.incrementSentMessages();
     59            // this overstates the stat somewhat, but ok for now
     60            //int kb = (size + 1023) / 1024;
     61            //for (int i = 0; i < kb; i++)
     62            //    _config.incrementSentMessages();
     63            if (!toUs)
     64                _context.bandwidthLimiter().sentParticipatingMessage(size);
    5865            _outDistributor.distribute(msg, toRouter, toTunnel);
    5966        }
  • router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java

    r69cae1a re9d0d79  
    11package net.i2p.router.tunnel;
    2 
    3 import java.util.ArrayList;
    4 import java.util.List;
    52
    63import net.i2p.data.Hash;
     
    3936        // We assume that it's the outbound bandwidth that is the issue...
    4037        int size = Math.max(msg.getMessageSize(), 1024/2);
    41         if (_context.tunnelDispatcher().shouldDropParticipatingMessage("IBGW " + msg.getType(), size)) {
     38        if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.IBGW, msg.getType(), size)) {
    4239            // this overstates the stat somewhat, but ok for now
    4340            int kb = (size + 1023) / 1024;
     
    4643            return;
    4744        }
    48         super.add(msg, toRouter,toTunnel);
     45        super.add(msg, toRouter, toTunnel);
    4946    }
    5047}
  • router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

    r69cae1a re9d0d79  
    1313import net.i2p.data.TunnelId;
    1414import net.i2p.data.i2np.I2NPMessage;
     15import net.i2p.data.i2np.TunnelBuildMessage;
     16import net.i2p.data.i2np.TunnelBuildReplyMessage;
    1517import net.i2p.data.i2np.TunnelDataMessage;
    1618import net.i2p.data.i2np.TunnelGatewayMessage;
     19import net.i2p.data.i2np.VariableTunnelBuildMessage;
     20import net.i2p.data.i2np.VariableTunnelBuildReplyMessage;
    1721import net.i2p.router.JobImpl;
    1822import net.i2p.router.Router;
     
    4246    private final LeaveTunnel _leaveJob;
    4347    /** what is the date/time we last deliberately dropped a tunnel? **/
    44     private long _lastDropTime;
     48    //private long _lastDropTime;
    4549    private final TunnelGatewayPumper _pumper;
     50
     51    /** for shouldDropParticipatingMessage() */
     52    enum Location {OBEP, PARTICIPANT, IBGW}
    4653
    4754    private static final long[] RATES = { 10*60*1000l, 60*60*1000l, 3*60*60*1000l, 24*60*60*1000 };
     
    199206            TunnelId outId = cfg.getConfig(0).getSendTunnel();
    200207            _outboundGateways.put(outId, gw);
    201             _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0);
     208            _context.statManager().addRateData("tunnel.joinOutboundGateway", 1);
    202209            _context.messageHistory().tunnelJoined("outbound", cfg);
    203210        } else {
     
    205212            TunnelId outId = cfg.getConfig(0).getSendTunnel();
    206213            _outboundGateways.put(outId, gw);
    207             _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0);
     214            _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1);
    208215            _context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
    209216        }
     
    221228            TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
    222229            _participants.put(recvId, participant);
    223             _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0);
     230            _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1);
    224231            _context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
    225232        } else {
     
    227234            TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
    228235            _inboundGateways.put(recvId, gw);
    229             _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0);
     236            _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1);
    230237            _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
    231238        }
     
    244251        _participatingConfig.put(recvId, cfg);
    245252        _context.messageHistory().tunnelJoined("participant", cfg);
    246         _context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
     253        _context.statManager().addRateData("tunnel.joinParticipant", 1);
    247254        if (cfg.getExpiration() > _lastParticipatingExpiration)
    248255            _lastParticipatingExpiration = cfg.getExpiration();
     
    262269        _participatingConfig.put(recvId, cfg);
    263270        _context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
    264         _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
     271        _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1);
    265272
    266273        if (cfg.getExpiration() > _lastParticipatingExpiration)
     
    285292        _participatingConfig.put(recvId, cfg);
    286293        _context.messageHistory().tunnelJoined("inboundGateway", cfg);
    287         _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
     294        _context.statManager().addRateData("tunnel.joinInboundGateway", 1);
    288295
    289296        if (cfg.getExpiration() > _lastParticipatingExpiration)
     
    389396            _context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getTunnelId(), "participant");
    390397            participant.dispatch(msg, recvFrom);
    391             _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
     398            _context.statManager().addRateData("tunnel.dispatchParticipant", 1);
    392399        } else {
    393400            OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj());
     
    400407                endpoint.dispatch(msg, recvFrom);
    401408               
    402                 _context.statManager().addRateData("tunnel.dispatchEndpoint", 1, 0);
     409                _context.statManager().addRateData("tunnel.dispatchEndpoint", 1);
    403410            } else {
    404411                _context.messageHistory().droppedTunnelDataMessageUnknown(msg.getUniqueId(), msg.getTunnelId());
     
    447454            _context.messageHistory().tunnelDispatched(msg.getUniqueId(), msg.getMessage().getUniqueId(), msg.getTunnelId().getTunnelId(), "inbound gateway");
    448455            gw.add(msg);
    449             _context.statManager().addRateData("tunnel.dispatchInbound", 1, 0);
     456            _context.statManager().addRateData("tunnel.dispatchInbound", 1);
    450457        } else {
    451458            _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), msg.getTunnelId().getTunnelId());
     
    482489        dispatchOutbound(msg, outboundTunnel, null, targetPeer);
    483490    }
     491
    484492    /**
    485493     * We are the outbound tunnel gateway (we created it), so wrap up this message
     
    524532            gw.add(msg, targetPeer, targetTunnel);
    525533            if (targetTunnel == null)
    526                 _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1, 0);
     534                _context.statManager().addRateData("tunnel.dispatchOutboundPeer", 1);
    527535            else
    528                 _context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1, 0);
     536                _context.statManager().addRateData("tunnel.dispatchOutboundTunnel", 1);
    529537        } else {
    530538            _context.messageHistory().droppedTunnelGatewayMessageUnknown(msg.getUniqueId(), outboundTunnel.getTunnelId());
     
    562570     */
    563571    public void updateParticipatingStats(int ms) {
    564         List<HopConfig> participating = listParticipatingTunnels();
    565         int size = participating.size();
    566572        long count = 0;
    567573        long bw = 0;
    568         long bwOut = 0;
     574        //long bwOut = 0;
    569575        long tcount = 0;
    570576        long tooYoung = _context.clock().now() - 60*1000;
    571577        long tooOld = tooYoung - 9*60*1000;
    572         for (int i = 0; i < size; i++) {
    573             HopConfig cfg = participating.get(i);
    574             // rare NPE seen here, guess CHS.values() isn't atomic?
    575             if (cfg == null)
    576                 continue;
     578        for (HopConfig cfg : _participatingConfig.values()) {
    577579            long c = cfg.getRecentMessagesCount();
    578580            bw += c;
    579             bwOut += cfg.getRecentSentMessagesCount();
     581            //bwOut += cfg.getRecentSentMessagesCount();
    580582            long created = cfg.getCreation();
    581583            if (created > tooYoung || created < tooOld)
     
    588590        _context.statManager().addRateData("tunnel.participatingMessageCount", count, ms);
    589591        _context.statManager().addRateData("tunnel.participatingBandwidth", bw*1024/(ms/1000), ms);
    590         _context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms);
    591         _context.statManager().addRateData("tunnel.participatingTunnels", size, 0);
     592        // moved to FIFOBandwidthRefiller
     593        //_context.statManager().addRateData("tunnel.participatingBandwidthOut", bwOut*1024/(ms/1000), ms);
     594        _context.statManager().addRateData("tunnel.participatingTunnels", tcount);
    592595    }
    593596
     
    610613     * (a plain participant could be earlier or later, but on average is later)
    611614     *
    612      * @param type message hop location and type
     615     * @param loc message hop location
     616     * @param type I2NP message type
    613617     * @param length the length of the message
    614618     */
    615     public boolean shouldDropParticipatingMessage(String type, int length) {
     619    public boolean shouldDropParticipatingMessage(Location loc, int type, int length) {
    616620        if (length <= 0)
    617621            return false;
     622     /****
     623        Don't use the tunnel.participatingBandwidth stat any more. It could be up to 3 minutes old.
     624        Also, it counts inbound bandwidth, i.e. before dropping, which resulted in too many drops
     625        during a burst.
     626        We now use the bandwidth limiter to track outbound participating bandwidth
     627        over the last few seconds.
     628     ****/
     629
     630     /****
    618631        RateStat rs = _context.statManager().getRate("tunnel.participatingBandwidth");
    619632        if (rs == null)
     
    631644
    632645        int usedIn = Math.min(_context.router().get1sRateIn(), _context.router().get15sRateIn());
    633         usedIn = Math.min(usedIn, bw);
     646        if (bw < usedIn)
     647            usedIn = bw;
    634648        if (usedIn <= 0)
    635649            return false;
    636650        int usedOut = Math.min(_context.router().get1sRate(true), _context.router().get15sRate(true));
    637         usedOut = Math.min(usedOut, bw);
     651        if (bw < usedOut)
     652            usedOut = bw;
    638653        if (usedOut <= 0)
    639654            return false;
    640655        int used = Math.min(usedIn, usedOut);
     656     ****/
     657        int used = _context.bandwidthLimiter().getCurrentParticipatingBandwidth();
     658
    641659        int maxKBps = Math.min(_context.bandwidthLimiter().getInboundKBytesPerSecond(),
    642660                               _context.bandwidthLimiter().getOutboundKBytesPerSecond());
    643661        float share = (float) _context.router().getSharePercentage();
    644662
    645         // start dropping at 95% of the limit
    646         float maxBps = maxKBps * share * 1024f * 0.95f;
     663        // start dropping at 120% of the limit,
     664        // as we rely on Throttle for long-term bandwidth control by rejecting tunnels
     665        float maxBps = maxKBps * share * (1024f * 1.20f);
    647666        float pctDrop = (used - maxBps) / used;
    648667        if (pctDrop <= 0)
    649668            return false;
    650669        // increase the drop probability for OBEP,
    651         // (except lower it for tunnel build messages (type 21)),
     670        // (except lower it for tunnel build messages type 21/22/23/24),
    652671        // and lower it for IBGW, for network efficiency
    653672        double len = length;
    654         if (type.startsWith("OBEP")) {
    655             if (type.equals("OBEP 21"))
     673        if (loc == Location.OBEP) {
     674            // we don't need to check for VTBRM/TBRM as that happens at tunnel creation
     675            if (type == VariableTunnelBuildMessage.MESSAGE_TYPE || type == TunnelBuildMessage.MESSAGE_TYPE)
    656676                len /= 1.5;
    657677            else
    658678                len *= 1.5;
    659         } else if (type.startsWith("IBGW")) {
    660             len /= 1.5;
     679        } else if (loc == Location.IBGW) {
     680            // we don't need to check for VTBM/TBM as that happens at tunnel creation
     681            if (type == VariableTunnelBuildReplyMessage.MESSAGE_TYPE || type == TunnelBuildReplyMessage.MESSAGE_TYPE)
     682                len /= 1.5 * 1.5 * 1.5;
     683            else
     684                len /= 1.5;
    661685        }
    662686        // drop in proportion to size w.r.t. a standard 1024-byte message
     
    672696                _log.warn("Drop part. msg. avail/max/used " + availBps + "/" + (int) maxBps + "/"
    673697                          + used + " %Drop = " + pctDrop
    674                           + ' ' + type + ' ' + length);
     698                          + ' ' + loc + ' ' + type + ' ' + length);
    675699            }
    676             _context.statManager().addRateData("tunnel.participatingMessageDropped", 1, 0);
     700            _context.statManager().addRateData("tunnel.participatingMessageDropped", 1);
    677701        }
    678702        return reject;
  • router/java/src/net/i2p/router/tunnel/TunnelParticipant.java

    r69cae1a re9d0d79  
    182182
    183183    private void send(HopConfig config, TunnelDataMessage msg, RouterInfo ri) {
    184         if (_context.tunnelDispatcher().shouldDropParticipatingMessage("TDM", 1024))
     184        if (_context.tunnelDispatcher().shouldDropParticipatingMessage(TunnelDispatcher.Location.PARTICIPANT,
     185                                                                       TunnelDataMessage.MESSAGE_TYPE, 1024))
    185186            return;
    186         _config.incrementSentMessages();
     187        //_config.incrementSentMessages();
     188        _context.bandwidthLimiter().sentParticipatingMessage(1024);
    187189        long oldId = msg.getUniqueId();
    188190        long newId = _context.random().nextLong(I2NPMessage.MAX_ID_VALUE);
Note: See TracChangeset for help on using the changeset viewer.