Changeset 3fbf60ee


Ignore:
Timestamp:
Sep 1, 2012 1:21:25 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
8bfbe855
Parents:
6bfd916f
Message:

Codel:

Location:
router/java/src/net/i2p/router/util
Files:
2 added
1 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/util/CoDelBlockingQueue.java

    r6bfd916f r3fbf60ee  
    11package net.i2p.router.util;
    22
     3import java.util.Collection;
    34import java.util.concurrent.LinkedBlockingQueue;
    45import java.util.concurrent.TimeUnit;
     
    1516 *  Input: add(), offer(), and put() are overridden to add a timestamp.
    1617 *
    17  *  Output : take() is overridden to implement AQM and drop entries
    18  *  if necessary. poll(), peek(), and remove() are NOT overridden, and do
     18 *  Output : take(), poll(), and drainTo() are overridden to implement AQM and drop entries
     19 *  if necessary. peek(), and remove() are NOT overridden, and do
    1920 *  NOT implement AQM or update stats.
    2021 *
     
    5859    private final String STAT_DROP;
    5960    private final String STAT_DELAY;
     61    private static final long[] RATES = {5*60*1000};
    6062
    6163    /**
     
    6567        super(capacity);
    6668        _context = ctx;
    67         STAT_DROP = "router.codel." + name + ".drop";
    68         STAT_DELAY = "router.codel." + name + ".delay";
    69         ctx.statManager().createRequiredRateStat(STAT_DROP, "drop rate", "Router", new long[] { 10*60*1000 });
    70         ctx.statManager().createRequiredRateStat(STAT_DELAY, "delay", "Router", new long[] { 10*60*1000 });
     69        STAT_DROP = "codel." + name + ".drop";
     70        STAT_DELAY = "codel." + name + ".delay";
     71        ctx.statManager().createRequiredRateStat(STAT_DROP, "AQM drop events", "Router", RATES);
     72        ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES);
    7173    }
    7274
     
    115117    }
    116118
     119    @Override
     120    public E poll() {
     121        E rv = super.poll();
     122        return codel(rv);
     123    }
     124
     125    @Override
     126    public int drainTo(Collection<? super E> c) {
     127        int rv = 0;
     128        E e;
     129        while ((e = poll()) != null) {
     130            c.add(e);
     131            rv++;
     132        }
     133        return rv;
     134    }
     135
     136    @Override
     137    public int drainTo(Collection<? super E> c, int maxElements) {
     138        int rv = 0;
     139        E e;
     140        while ((e = poll()) != null && rv++ < maxElements) {
     141            c.add(e);
     142        }
     143        return rv;
     144    }
     145
    117146    /////// private below here
    118147
     
    122151     */
    123152    private boolean updateVars(E entry) {
    124         // This is a helper routine the does the actual packet dequeue and tracks whether the sojourn time
     153        // This is a helper routine that tracks whether the sojourn time
    125154        // is above or below target and, if above, if it has remained above continuously for at least interval.
    126         // It returns two values, a Boolean indicating whether it is OK to drop (sojourn time above target
    127         // for at least interval) and the packet dequeued.
    128         _now = _context.clock().now();
     155        // It returns a boolean indicating whether it is OK to drop (sojourn time above target
     156        // for at least interval)
    129157        if (entry == null) {
    130158            _first_above_time = 0;
    131159            return false;
    132160        }
     161        _now = _context.clock().now();
    133162        boolean ok_to_drop = false;
    134163        long sojurn = _now - entry.getEnqueueTime();
     
    154183    private E deque() throws InterruptedException {
    155184        E rv = super.take();
     185        return codel(rv);
     186    }
     187
     188
     189    /**
     190     *  @param rv may be null
     191     *  @return rv or a subequent entry or null if dropped
     192     */
     193    private E codel(E rv) {
    156194        synchronized (this) {
    157195            // non-blocking inside this synchronized block
     
    177215                        // I2P - we poll here instead of lock so we don't get stuck
    178216                        // inside the lock. If empty, deque() will be called again.
    179                         rv = poll();
     217                        rv = super.poll();
    180218                        ok_to_drop = updateVars(rv);
    181219                        if (!ok_to_drop) {
     
    201239                // I2P - we poll here instead of lock so we don't get stuck
    202240                // inside the lock. If empty, deque() will be called again.
    203                 rv = poll();
     241                rv = super.poll();
    204242                updateVars(rv);
    205243                _dropping = true;
Note: See TracChangeset for help on using the changeset viewer.