Changeset 2b841ad


Ignore:
Timestamp:
Nov 23, 2005 4:04:52 PM (15 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
dbb4b3d
Parents:
5e094b4
git-author:
jrandom <jrandom> (11/23/05 16:04:52)
git-committer:
zzz <zzz@…> (11/23/05 16:04:52)
Message:

2005-11-23 jrandom

  • Removed spurious streaming lib RTO increase (it wasn't helpful)
  • Streamlined the tunnel batching to schedule batch transmissions more appropriately.
  • Default tunnel pool variance to 2 +0-1 hops
Files:
1 added
9 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/Connection.java

    r5e094b4 r2b841ad  
    7373    private long _lifetimeDupMessageReceived;
    7474   
    75     public static final long MAX_RESEND_DELAY = 15*1000;
     75    public static final long MAX_RESEND_DELAY = 10*1000;
    7676    public static final long MIN_RESEND_DELAY = 2*1000;
    7777
     
    993993                       
    994994                        // setRTT has its own ceiling
    995                         getOptions().setRTT(getOptions().getRTT() + 10*1000);
     995                        //getOptions().setRTT(getOptions().getRTT() + 10*1000);
    996996                        getOptions().setWindowSize(newWindowSize);
    997997
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java

    r5e094b4 r2b841ad  
    8181                // requested choke
    8282                choke = true;
    83                 con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
     83                //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
    8484            }
    8585        }
     
    273273
    274274            // setRTT has its own ceiling
    275             con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
     275            //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
    276276            con.getOptions().setWindowSize(oldSize);
    277277
  • history.txt

    r5e094b4 r2b841ad  
    1 $Id: history.txt,v 1.325 2005/11/19 23:42:17 jrandom Exp $
     1$Id: history.txt,v 1.326 2005/11/21 09:37:10 jrandom Exp $
     2
     32005-11-23  jrandom
     4    * Removed spurious streaming lib RTO increase (it wasn't helpful)
     5    * Streamlined the tunnel batching to schedule batch transmissions more
     6      appropriately.
     7    * Default tunnel pool variance to 2 +0-1 hops
    28
    392005-11-21  jrandom
  • router/java/src/net/i2p/router/RouterVersion.java

    r5e094b4 r2b841ad  
    1616 */
    1717public class RouterVersion {
    18     public final static String ID = "$Revision: 1.293 $ $Date: 2005/11/19 23:42:17 $";
     18    public final static String ID = "$Revision: 1.294 $ $Date: 2005/11/21 09:37:09 $";
    1919    public final static String VERSION = "0.6.1.5";
    20     public final static long BUILD = 4;
     20    public final static long BUILD = 5;
    2121    public static void main(String args[]) {
    2222        System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
  • router/java/src/net/i2p/router/TunnelPoolSettings.java

    r5e094b4 r2b841ad  
    4545    public static final int     DEFAULT_DURATION = 10*60*1000;
    4646    public static final int     DEFAULT_LENGTH = 2;
    47     public static final int     DEFAULT_LENGTH_VARIANCE = -1;
     47    public static final int     DEFAULT_LENGTH_VARIANCE = 1;
    4848    public static final boolean DEFAULT_ALLOW_ZERO_HOP = true;
    4949   
  • router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java

    r5e094b4 r2b841ad  
    2020    private Log _log;
    2121    private long _pendingSince;
    22    
    23     public BatchedPreprocessor(I2PAppContext ctx) {
     22    private String _name;
     23   
     24    public BatchedPreprocessor(I2PAppContext ctx, String name) {
    2425        super(ctx);
    2526        _log = ctx.logManager().getLog(BatchedPreprocessor.class);
     27        _name = name;
    2628        _pendingSince = 0;
    2729        ctx.statManager().createRateStat("tunnel.batchMultipleCount", "How many messages are batched into a tunnel message", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    2830        ctx.statManager().createRateStat("tunnel.batchDelay", "How many messages were pending when the batching waited", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    2931        ctx.statManager().createRateStat("tunnel.batchDelaySent", "How many messages were flushed when the batching delay completed", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
     32        ctx.statManager().createRateStat("tunnel.batchCount", "How many groups of messages were flushed together", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     33        ctx.statManager().createRateStat("tunnel.batchDelayAmount", "How long we should wait before flushing the batch", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     34        ctx.statManager().createRateStat("tunnel.batchFlushRemaining", "How many messages remain after flushing", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     35        ctx.statManager().createRateStat("tunnel.writeDelay", "How long after a message reaches the gateway is it processed (lifetime is size)", "Tunnels", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
    3036    }
    3137   
     
    3844   
    3945    /* not final or private so the test code can adjust */
    40     static long DEFAULT_DELAY = 500;
     46    static long DEFAULT_DELAY = 100;
    4147    /** wait up to 2 seconds before sending a small message */
    4248    protected long getSendDelay() { return DEFAULT_DELAY; }
     49   
     50    /** if we have 50 messages queued that are too small, flush them anyway */
     51    private static final int FORCE_BATCH_FLUSH = 50;
     52   
     53    /** how long do we want to wait before flushing */
     54    public long getDelayAmount() { return getDelayAmount(true); }
     55    private long getDelayAmount(boolean shouldStat) {
     56        long rv = -1;
     57        long defaultAmount = getSendDelay();
     58        if (_pendingSince > 0)
     59            rv = _pendingSince + defaultAmount - _context.clock().now();
     60        if (rv > defaultAmount)
     61            rv = defaultAmount;
     62        if (shouldStat)
     63            _context.statManager().addRateData("tunnel.batchDelayAmount", rv, 0);
     64        return rv;
     65    }
    4366   
    4467    public boolean preprocessQueue(List pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    4568        if (_log.shouldLog(Log.DEBUG))
    4669            _log.debug("Preprocess queue with " + pending.size() + " to send");
    47        
     70
     71        if (false) {
    4872        if (DISABLE_BATCHING || getSendDelay() <= 0) {
    4973            if (_log.shouldLog(Log.INFO))
     
    6084            return false;
    6185        }
    62 
     86        }
     87
     88        int batchCount = 0;
     89        int beforeLooping = pending.size();
     90       
    6391        while (pending.size() > 0) {
    6492            int allocated = 0;
     
    79107                            _log.debug("Pushback of " + curWanted + " (message " + (i+1) + ")");
    80108                    }
    81                     if (_pendingSince > 0)
    82                         _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), 0);
     109                    if (_pendingSince > 0) {
     110                        long waited = _context.clock().now() - _pendingSince;
     111                        _context.statManager().addRateData("tunnel.batchDelaySent", pending.size(), waited);
     112                    }
    83113                    _pendingSince = 0;
    84114                    send(pending, 0, i, sender, rec);
    85115                    if (_log.shouldLog(Log.INFO))
    86116                        _log.info("Allocated=" + allocated + " so we sent " + (i+1)
    87                                   + " (last complete? " + (msg.getOffset() >= msg.getData().length) + ")");
     117                                  + " (last complete? " + (msg.getOffset() >= msg.getData().length)
     118                                  + ", off=" + msg.getOffset() + ", count=" + pending.size() + ")");
    88119
    89120                    for (int j = 0; j < i; j++) {
    90121                        TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
     122                        if (cur.getOffset() < cur.getData().length)
     123                            throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
     124                                                               + " len=" + cur.getData().length + " alloc=" + allocated);
    91125                        notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
     126                        _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
    92127                    }
    93128                    if (msg.getOffset() >= msg.getData().length) {
     
    95130                        TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
    96131                        notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
     132                        _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
    97133                    }
    98134                    if (i > 0)
    99135                        _context.statManager().addRateData("tunnel.batchMultipleCount", i+1, 0);
    100136                    allocated = 0;
    101                     break;
     137                    // don't break - we may have enough source messages for multiple full tunnel messages
     138                    //break;
     139                    batchCount++;
    102140                }
    103141            }
     142           
     143            display(allocated, pending, "after looping to clear " + (beforeLooping - pending.size()));
    104144           
    105145            if (allocated > 0) {
     
    107147                // have enough data to send a full message
    108148
    109                 if ( (_pendingSince > 0) && (_pendingSince + getSendDelay() <= _context.clock().now()) ) {
    110                     if (_log.shouldLog(Log.INFO))
    111                         _log.info("Passed through pending list, with " + allocated + "/" + pending.size()
    112                                   + " left to clean up, but we've waited, so flush");
    113 
     149                if ( (pending.size() > FORCE_BATCH_FLUSH) || ( (_pendingSince > 0) && (getDelayAmount() <= 0) ) ) {
    114150                    // not even a full message, but we want to flush it anyway
    115151                   
     
    120156                    send(pending, 0, pending.size()-1, sender, rec);
    121157                   
    122                     while (pending.size() > 0) {
    123                         TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.remove(0);
    124                         notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
    125                     }
    126                     _pendingSince = 0;
    127                     return false;
     158                    int beforeSize = pending.size();
     159                    for (int i = 0; i < pending.size(); i++) {
     160                        TunnelGateway.Pending cur = (TunnelGateway.Pending)pending.get(i);
     161                        if (cur.getOffset() >= cur.getData().length) {
     162                            pending.remove(i);
     163                            notePreprocessing(cur.getMessageId(), cur.getFragmentNumber());
     164                            _context.statManager().addRateData("tunnel.writeDelay", cur.getLifetime(), cur.getData().length);
     165                            i--;
     166                        }
     167                    }
     168                    if (pending.size() > 0) {
     169                        _pendingSince = _context.clock().now();
     170                        _context.statManager().addRateData("tunnel.batchFlushRemaining", pending.size(), beforeSize);
     171                        display(allocated, pending, "flushed, some remain");
     172                        return true;
     173                    } else {
     174                        long delayAmount = _context.clock().now() - _pendingSince;
     175                        _pendingSince = 0;
     176                        if (batchCount > 1)
     177                            _context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
     178                        display(allocated, pending, "flushed " + (beforeSize) + ", no remaining after " + delayAmount);
     179                        return false;
     180                    }
    128181                } else {
    129                     if (_log.shouldLog(Log.INFO))
    130                         _log.info("Passed through pending list, with " + allocated + "/"+ pending.size()
    131                                   + " left to clean up, but we've haven't waited, so don't flush (wait="
    132                                   + (_context.clock().now() - _pendingSince) + " / " + _pendingSince + ")");
    133182                    _context.statManager().addRateData("tunnel.batchDelay", pending.size(), 0);
    134183                    if (_pendingSince <= 0)
    135184                        _pendingSince = _context.clock().now();
     185                    if (batchCount > 1)
     186                        _context.statManager().addRateData("tunnel.batchCount", batchCount, 0);
    136187                    // not yet time to send the delayed flush
     188                    display(allocated, pending, "dont flush");
    137189                    return true;
    138190                }
     
    149201        return false;
    150202    }
     203   
     204    private void display(long allocated, List pending, String title) {
     205        if (_log.shouldLog(Log.INFO)) {
     206            long highestDelay = 0;
     207            StringBuffer buf = new StringBuffer();
     208            buf.append(_name).append(": ");
     209            buf.append(title);
     210            buf.append(" allocated: ").append(allocated);
     211            buf.append(" pending: ").append(pending.size());
     212            if (_pendingSince > 0)
     213                buf.append(" delay: ").append(getDelayAmount(false));
     214            for (int i = 0; i < pending.size(); i++) {
     215                TunnelGateway.Pending curPending = (TunnelGateway.Pending)pending.get(i);
     216                buf.append(" pending[").append(i).append("]: ");
     217                buf.append(curPending.getOffset()).append("/").append(curPending.getData().length).append('/');
     218                buf.append(curPending.getLifetime());
     219                if (curPending.getLifetime() > highestDelay)
     220                    highestDelay = curPending.getLifetime();
     221            }
     222            _log.info(buf.toString());
     223        }
     224    }
     225           
    151226   
    152227    /**
  • router/java/src/net/i2p/router/tunnel/BatchedRouterPreprocessor.java

    r5e094b4 r2b841ad  
    1919    public static final String PROP_BATCH_FREQUENCY = "batchFrequency";
    2020    public static final String PROP_ROUTER_BATCH_FREQUENCY = "router.batchFrequency";
    21     public static final int DEFAULT_BATCH_FREQUENCY = 500;
     21    public static final int DEFAULT_BATCH_FREQUENCY = 100;
    2222   
    2323    public BatchedRouterPreprocessor(RouterContext ctx) {
     
    2525    }
    2626    public BatchedRouterPreprocessor(RouterContext ctx, TunnelCreatorConfig cfg) {
    27         super(ctx);
     27        super(ctx, getName(cfg));
    2828        _routerContext = ctx;
    2929        _config = cfg;
    3030    }
    3131    public BatchedRouterPreprocessor(RouterContext ctx, HopConfig cfg) {
    32         super(ctx);
     32        super(ctx, getName(cfg));
    3333        _routerContext = ctx;
    3434        _hopConfig = cfg;
     35    }
     36   
     37    private static String getName(HopConfig cfg) {
     38        if (cfg == null) return "[unknown]";
     39        if (cfg.getReceiveTunnel() != null)
     40            return cfg.getReceiveTunnel().getTunnelId() + "";
     41        else if (cfg.getSendTunnel() != null)
     42            return cfg.getSendTunnel().getTunnelId() + "";
     43        else
     44            return "[n/a]";
     45    }
     46   
     47    private static String getName(TunnelCreatorConfig cfg) {
     48        if (cfg == null) return "[unknown]";
     49        if (cfg.getReceiveTunnelId(0) != null)
     50            return cfg.getReceiveTunnelId(0).getTunnelId() + "";
     51        else if (cfg.getSendTunnelId(0) != null)
     52            return cfg.getSendTunnelId(0).getTunnelId() + "";
     53        else
     54            return "[n/a]";
    3555    }
    3656
     
    4262            if (opts != null)
    4363                freq = opts.getProperty(PROP_BATCH_FREQUENCY);
    44         } else {
     64        }
     65        if (freq == null)
    4566            freq = _routerContext.getProperty(PROP_ROUTER_BATCH_FREQUENCY);
    46         }
    4767       
    4868        if (freq != null) {
  • router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java

    r5e094b4 r2b841ad  
    3333        _log = ctx.logManager().getLog(TrivialPreprocessor.class);
    3434    }
    35    
     35 
     36    /** how long do we want to wait before flushing */
     37    public long getDelayAmount() { return 0; }
     38 
    3639    /**
    3740     * Return true if there were messages remaining, and we should queue up
  • router/java/src/net/i2p/router/tunnel/TunnelGateway.java

    r5e094b4 r2b841ad  
    6565        _delayedFlush = new DelayedFlush();
    6666        _lastFlush = _context.clock().now();
     67        _context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     68        _context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    6769    }
    6870   
     
    8890        _messagesSent++;
    8991        boolean delayedFlush = false;
    90        
     92        long delayAmount = -1;
     93       
     94        int remaining = 0;
     95        long beforeLock = _context.clock().now();
     96        long afterAdded = -1;
    9197        Pending cur = new PendingImpl(msg, toRouter, toTunnel);
    9298        synchronized (_queue) {
    9399            _queue.add(cur);
     100            afterAdded = _context.clock().now();
    94101            delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
     102            if (delayedFlush)
     103                delayAmount = _preprocessor.getDelayAmount();
    95104            _lastFlush = _context.clock().now();
    96105           
     
    105114                }
    106115            }
     116            remaining = _queue.size();
    107117        }
    108118       
    109119        if (delayedFlush) {
    110             SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency);
    111         }
     120            FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
     121        }
     122        _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
    112123    }
    113124   
     
    132143         */
    133144        public boolean preprocessQueue(List pending, Sender sender, Receiver receiver);
     145       
     146        /** how long do we want to wait before flushing */
     147        public long getDelayAmount();
    134148    }
    135149   
     
    149163        protected int _offset;
    150164        protected int _fragmentNumber;
    151        
    152         public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
     165        protected long _created;
     166       
     167        public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
     168            this(message, toRouter, toTunnel, System.currentTimeMillis());
     169        }
     170        public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
    153171            _toRouter = toRouter;
    154172            _toTunnel = toTunnel;
     
    158176            _offset = 0;
    159177            _fragmentNumber = 0;
     178            _created = now;
    160179        }
    161180        /** may be null */
     
    171190        /** move the offset */
    172191        public void setOffset(int offset) { _offset = offset; }
     192        public long getLifetime() { return System.currentTimeMillis()-_created; }
    173193        /** which fragment are we working on (0 for the first fragment) */
    174194        public int getFragmentNumber() { return _fragmentNumber; }
     
    177197    }
    178198    private class PendingImpl extends Pending {
    179         private long _created;
    180        
    181199        public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
    182             super(message, toRouter, toTunnel);
    183             _created = _context.clock().now();
     200            super(message, toRouter, toTunnel, _context.clock().now());
    184201        }       
    185202       
     
    204221            return buf.toString();
    205222        }
     223
     224        public long getLifetime() { return _context.clock().now()-_created; }
    206225    }
    207226   
     
    209228        public void timeReached() {
    210229            boolean wantRequeue = false;
     230            int remaining = 0;
     231            long beforeLock = _context.clock().now();
     232            long afterChecked = -1;
     233            long delayAmount = -1;
     234            if (_queue.size() > 10000) // stay out of the synchronized block
     235                System.out.println("foo!");
    211236            synchronized (_queue) {
    212                 if (_queue.size() > 0)
     237                if (_queue.size() > 10000) // stay in the synchronized block
     238                    System.out.println("foo!");
     239                afterChecked = _context.clock().now();
     240                if (_queue.size() > 0) {
    213241                    wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
     242                    if (wantRequeue)
     243                        delayAmount = _preprocessor.getDelayAmount();
     244                }
     245                remaining = _queue.size();
    214246            }
    215247           
    216248            if (wantRequeue)
    217                 SimpleTimer.getInstance().addEvent(_delayedFlush, _flushFrequency);
     249                FlushTimer.getInstance().addEvent(_delayedFlush, delayAmount);
    218250            else
    219251                _lastFlush = _context.clock().now();
     252           
     253            _context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
    220254        }
    221255    }
Note: See TracChangeset for help on using the changeset viewer.