Opened 10 months ago

Closed 9 months ago

#2719 closed enhancement (fixed)

Streaming: TCP-Westwood+

Reported by: Zlatin Balevsky Owned by:
Priority: minor Milestone: 0.9.46
Component: streaming Version: 0.9.45
Keywords: Cc:
Parent Tickets: Sensitive: no

Description

This builds on top of ticket #2718 and implements "anti-aliased" sampling window. According to this paper https://c3lab.poliba.it/images/c/cd/QoSIP03.pdf should provide better estimate than regular Westwood.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..d4730d12c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -99,6 +102,17 @@ class Connection {
     public static final long MAX_RESEND_DELAY = 45*1000;
     public static final long MIN_RESEND_DELAY = 100;
 
+    public static final int MAX_WINDOW_SIZE = 128;
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 5000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private final AtomicInteger _acksThisRtt = new AtomicInteger();
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
+
     /**
      *  Wait up to 5 minutes after disconnection so we can ack/close packets.
      *  Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
@@ -108,7 +122,6 @@ class Connection {
     public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000;
     private static final long MAX_CONNECT_TIMEOUT = 2*60*1000;
 
-    public static final int MAX_WINDOW_SIZE = 128;
     private static final int UNCHOKES_TO_SEND = 8;
     
 /****
@@ -156,6 +169,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +179,17 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +259,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +443,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +477,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +517,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,18 +578,57 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
+            _acksThisRtt.addAndGet(acked.size());
+            _virtualAckEvent.scheduleFirst();
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(_context.clock().now() + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = now - _tAck;
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -788,6 +853,8 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1452,9 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" bkFiltered ").append(_bKFiltered);
+        buf.append(" minRTT ").append(getOptions().getMinRTT());
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1462,143 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in accordance to what Westwood paper section 3.2
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _firstScheduled;
+        private boolean _virtual;
+        private long _nextRttSampleTime;
+
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public synchronized void timeReached() {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual);
+
+            final long now = _context.clock().now();
+
+            if (_virtual) {
+                updateBK(now, 0);
+            } else {
+                updateBK(now, _acksThisRtt.getAndSet(0));
+                _nextRttSampleTime = now + getOptions().getRTT();
+            }
+
+            scheduleNext(now);
+        }
+
+        synchronized void scheduleFirst() {
+            if (_firstScheduled)
+                return;
+            _firstScheduled = true;
+            final long now = _context.clock().now();
+            _nextRttSampleTime = now + getOptions().getRTT();
+            scheduleNext(now);
+        }
+
+        private void scheduleNext(long now) {
+            int timeToRtt = (int)(_nextRttSampleTime - now);
+            if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) {
+                _virtual = false;
+                forceReschedule(timeToRtt);
+            } else {
+                _virtual = true;
+                forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+            }
+        }
+    }    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1634,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1663,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1703,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1710,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+
+                        if (_packet.getNumSends() == 1) {
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                            int wSize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh, wSize));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1760,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1789,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1800,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 1d48a33b3..6d0dd9c5c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -577,6 +578,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
     /**
      *  not public, use updateRTT()
      */
@@ -671,6 +677,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

Subtickets

Change History (5)

comment:1 Changed 9 months ago by Zlatin Balevsky

Rebasing against -10. Other notes:

  • MAX_RTX is 1, pending research
  • Multiple fast retransmit removed, pending ticket #2711
  • TAU reduced to 1 second without good reason tbh, just "feels" better in loopback tests
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index ce414faf2..f817fc240 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,7 @@ class Connection {
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
-    private final Map<Long, PacketLocal> _outboundPackets;
+    private final SortedMap<Long, PacketLocal> _outboundPackets;
     private final PacketQueue _outboundQueue;
     private final ConnectionPacketHandler _handler;
     private ConnectionOptions _options;
@@ -70,7 +71,6 @@ class Connection {
     private final ActivityTimer _activityTimer;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
-    private volatile long _nextRetransmitTime;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -84,6 +84,7 @@ class Connection {
     /** how many messages have been resent and not yet ACKed? */
     private final AtomicInteger _activeResends = new AtomicInteger();
     private final ConEvent _connectionEvent;
+    private final RetransmitEvent _retransmitEvent;
     private final int _randomWait;
     private final int _localPort;
     private final int _remotePort;
@@ -110,6 +111,17 @@ class Connection {
 
     public static final int MAX_WINDOW_SIZE = 128;
     private static final int UNCHOKES_TO_SEND = 8;
+
+    /** Maximum number of packets to retransmit when the timer hits */
+    private static final int MAX_RTX = 1;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 1000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private final AtomicInteger _acksThisRtt = new AtomicInteger();
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
     
 /****
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
@@ -165,6 +177,8 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
@@ -431,7 +445,15 @@ class Connection {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+            } else {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " timer was already running");
+            }
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -497,6 +519,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -562,15 +585,58 @@ class Connection {
                     _log.info("All outbound packets acked, clearing " + _activeResends);
                 _activeResends.set(0);
             }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
         if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
-            _nextRetransmitTime = _context.clock().now() + getOptions().getRTO();
+            _acksThisRtt.addAndGet(acked.size());
+            _virtualAckEvent.scheduleFirst();
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _retransmitEvent.pushBackRTO(rto);
+                
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+
+                _retransmitEvent.cancel();
+            }
+
         }
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = now - _tAck;
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -795,6 +861,8 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1390,6 +1458,8 @@ class Connection {
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
         buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" bKFiltered ").append(_bKFiltered);
+        buf.append(" minRTT ").append(getOptions().getMinRTT());
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1397,6 +1467,214 @@ class Connection {
         return buf.toString();
     }
 
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _scheduled;
+
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        @Override
+        public synchronized boolean cancel() {
+            _scheduled = false;
+            return super.cancel();
+        }
+
+        public synchronized boolean scheduleIfNotRunning(long delay) {
+            if (_scheduled)
+                return false;
+            _scheduled = true;
+            schedule(delay);
+            return true;
+        }
+
+        public synchronized void pushBackRTO(int rto) {
+            if (!_scheduled) {
+                _log.log(Log.ERROR, Connection.this + " timer was not scheduled", new Exception());
+            }
+            reschedule(rto, false);
+        }
+
+        @Override
+        public void timeReached() {
+
+           if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " rtx event after close or reset");
+                return;
+            }
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            pushBackRTO(getOptions().doubleRTO());
+
+            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey());
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 );
+                    _ssthresh = Math.min( ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh );
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, Math.min(MAX_RTX, (toResend.size() + 1) / 2));
+            }
+
+            // 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else {
+                    
+                    if (_isChoking) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug(Connection.this + " packet is choking " + packet);
+                        packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
+                        packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
+                    } else if (_unchokesToSend.decrementAndGet() > 0) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug(Connection.this + " packet is unchoking " + packet);
+                        // don't worry about wrapping around
+                        packet.setOptionalDelay(0);
+                        packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
+                    } else {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug(Connection.this + " packet clearing flag " + packet);
+                        // clear flag
+                        packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false);
+                    }
+
+                    // this seems unnecessary to send the MSS again:
+                    //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
+                    // bugfix release 0.7.8, we weren't dividing by 1000
+                    packet.setResendDelay(getOptions().getResendDelay() / 1000);
+                    if (packet.getReceiveStreamId() <= 0)
+                        packet.setReceiveStreamId(_receiveStreamId.get());
+                    if (packet.getSendStreamId() <= 0)
+                        packet.setSendStreamId(_sendStreamId.get());
+
+                    ResendPacketEvent rpe = packet.getResendEvent();
+                    if (rpe != null) 
+                        rpe._nextSend = now + getOptions().getRTO();
+                    
+
+                    if (_outboundQueue.enqueue(packet)) {
+                        if (_log.shouldLog(Log.INFO)) 
+                            _log.info(Connection.this + " resent packet " + packet);
+                        if (nResends == 1)
+                            _activeResends.incrementAndGet();
+                        sentAny = true;
+                    } else if (_log.shouldLog(Log.DEBUG)) 
+                       _log.debug(Connection.this + " could not resend packet " + packet);
+                }
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+                windowAdjusted();
+            }
+
+        }
+    }
+
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _firstScheduled;
+        private boolean _virtual;
+        private long _nextRttSampleTime;
+
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public synchronized void timeReached() {
+
+           if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " virtual ack event after close or reset");
+                return;
+            }
+
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual);
+
+            final long now = _context.clock().now();
+
+            if (_virtual) {
+                updateBK(now, 0);
+            } else {
+                updateBK(now, _acksThisRtt.getAndSet(0));
+                _nextRttSampleTime = now + getOptions().getRTT();
+            }
+
+            scheduleNext(now);
+        }
+
+        synchronized void scheduleFirst() {
+            if (_firstScheduled)
+                return;
+            _firstScheduled = true;
+            final long now = _context.clock().now();
+            _nextRttSampleTime = now + getOptions().getRTT();
+            scheduleNext(now);
+        }
+
+        private void scheduleNext(long now) {
+            int timeToRtt = (int)(_nextRttSampleTime - now);
+            if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) {
+                _virtual = false;
+                forceReschedule(timeToRtt);
+            } else {
+                _virtual = true;
+                forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+            }
+        }
+    }    
+
+
+
     /**
      * fired to reschedule event notification
      */
@@ -1426,14 +1704,12 @@ class Connection {
     class ResendPacketEvent extends SimpleTimer2.TimedEvent {
         private final PacketLocal _packet;
         private long _nextSend;
-        private boolean _fastRetransmit;
 
         public ResendPacketEvent(PacketLocal packet, long delay) {
             super(_timer);
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
@@ -1444,7 +1720,6 @@ class Connection {
          * @since 0.9.46
          */
         void fastRetransmit() {
-            _fastRetransmit = true;
             reschedule(0);
         }
 
@@ -1469,49 +1744,6 @@ class Connection {
             }
 
             long now = _context.clock().now();
-            long nextRetransmitTime = _nextRetransmitTime;
-            if (nextRetransmitTime > now  && !_fastRetransmit) {
-                long delay = nextRetransmitTime - now;
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet);
-                forceReschedule(delay);
-                return false;
-            }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                if ( (!isLowest) && (!_fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
-                
-                // It's the lowest, or it's fast retransmit time. Resend the packet.
-
-                if (_fastRetransmit)
                     _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
@@ -1548,21 +1780,18 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
-                        
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
                         // See RFC 6298 section 5 item 5.5
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
 
                         if (_packet.getNumSends() == 1) {
-                            int flightSize = getUnackedPacketsSent();
-                            _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                            _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 );
+                            _ssthresh = Math.min( ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh );
+                            int wSize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh, wSize));
                         }
 
                         if (_log.shouldLog(Log.INFO))
@@ -1606,19 +1835,24 @@ class Connection {
                 } else {
                     //long timeout = _options.getResendDelay() << numSends;
                     long rto = _options.getRTO();
-                    long timeout = rto << (numSends-2);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                        }
+
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Resent packet " +
-                                  (_fastRetransmit ? "(fast) " : "(timeout) ") +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1627,12 +1861,10 @@ class Connection {
                                   + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
                         _unackedPacketsReceived.set(0);
                         _lastSendTime = _context.clock().now();
-                        _fastRetransmit = false;
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
 
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1644,12 +1876,7 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
+            
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index f69f2ed31..0235863c6 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -577,6 +578,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
+
     /**
      *  not public, use updateRTT()
      */
@@ -676,6 +683,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index eb9b65392..dee04479b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -433,7 +433,7 @@ class ConnectionPacketHandler {
 
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
-            if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
+            if ( (!congested) && (acked > 0) ) {
                 int ssthresh = con.getSSThresh();
                 if (newWindowSize < ssthresh) {
                     // slow start - exponential growth

comment:2 Changed 9 months ago by Zlatin Balevsky

Parent Tickets: 2718

Removing #2718 as parent ticket as it will not be implemented.

comment:3 Changed 9 months ago by Zlatin Balevsky

Rebasing against 0.9.45-11:

No changes from patch in previous comment.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index 558f31c15..f0a63811f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -114,6 +114,14 @@ class Connection {
 
     /** Maximum number of packets to retransmit when the timer hits */
     private static final int MAX_RTX = 16;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 1000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private final AtomicInteger _acksThisRtt = new AtomicInteger();
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
     
 /****
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
@@ -170,6 +178,7 @@ class Connection {
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
         _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
@@ -579,6 +588,8 @@ class Connection {
         }
         if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
+            _acksThisRtt.addAndGet(acked.size());
+            _virtualAckEvent.scheduleFirst();
             if (anyLeft) {
                 // RFC 6298 section 5.3
                 int rto = getOptions().getRTO();
@@ -597,6 +608,32 @@ class Connection {
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = now - _tAck;
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -822,6 +859,7 @@ class Connection {
         _receiver.destroy();
         _activityTimer.cancel();
         _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1416,7 +1454,9 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" ssThresh ").append(_ssthresh);
+        buf.append(" bKFiltered ").append(_bKFiltered); 
+        buf.append(" minRTT ").append(getOptions().getMinRTT()); 
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1477,7 +1517,7 @@ class Connection {
             final long now = _context.clock().now();
             pushBackRTO(getOptions().doubleRTO());
 
-            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            // 2. cut ssthresh to BWE * minRTT and window to 1 ( Westwood paper, section 3.2)
             List<PacketLocal> toResend = null;
             synchronized(_outboundPackets) {
                 if (_outboundPackets.isEmpty()) {
@@ -1490,8 +1530,8 @@ class Connection {
                 if (oldest.getNumSends() == 1) {
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug(Connection.this + " cutting ssthresh and window");
-                    int flightSize = _outboundPackets.size();
-                    _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                    _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 );
+                    _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh );
                     getOptions().setWindowSize(1);
                 } else if (_log.shouldLog(Log.DEBUG))
                     _log.debug(Connection.this + " not cutting ssthresh and window");
@@ -1575,7 +1615,64 @@ class Connection {
         }
     }
 
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _firstScheduled;
+        private boolean _virtual;
+        private long _nextRttSampleTime;
+
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public synchronized void timeReached() {
 
+           if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " virtual ack event after close or reset");
+                return;
+            }
+
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual);
+
+            final long now = _context.clock().now();
+
+            if (_virtual) {
+                updateBK(now, 0);
+            } else {
+                updateBK(now, _acksThisRtt.getAndSet(0));
+                _nextRttSampleTime = now + getOptions().getRTT();
+            }
+
+            scheduleNext(now);
+        }
+
+        synchronized void scheduleFirst() {
+            if (_firstScheduled)
+                return;
+            _firstScheduled = true;
+            final long now = _context.clock().now();
+            _nextRttSampleTime = now + getOptions().getRTT();
+            scheduleNext(now);
+        }
+
+        private void scheduleNext(long now) {
+            int timeToRtt = (int)(_nextRttSampleTime - now);
+            if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) {
+                _virtual = false;
+                forceReschedule(timeToRtt);
+            } else {
+                _virtual = true;
+                forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+            }
+        }
+    } 
 
     /**
      * fired to reschedule event notification
@@ -1693,11 +1790,13 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(1);
 
+                        // cut ssthresh to BWE * minRTT, wsize to ssthresh (Westwood paper section 3.1)
                         if (_packet.getNumSends() == 1) {
-                            int flightSize = getUnackedPacketsSent();
-                            _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                            _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 );
+                            _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh);
+                            int wsize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh,wsize));
                         }
 
                         if (_log.shouldLog(Log.INFO))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 4d9ae6ef5..d0774e108 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -578,6 +579,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
+
     /**
      *  not public, use updateRTT()
      */
@@ -677,6 +684,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;

comment:4 Changed 9 months ago by Zlatin Balevsky

A different approach to implementing Westwood[+] is to not use a timer, but instead keep history of incoming acks and compute the Bandwidth Estimate (BWE) on demand. The following patch and two new files implement one such approach.

  1. Patch to the apps/streaming package:
    diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/
    streaming/impl/Connection.java
    index 558f31c15..615da2b47 100644
    --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
    +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
    @@ -114,6 +114,9 @@ class Connection {
     
         /** Maximum number of packets to retransmit when the timer hits */
         private static final int MAX_RTX = 16;
    +
    +
    +    private final BandwidthEstimator _bwEstimator;
         
     /****
         public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
    @@ -170,6 +173,7 @@ class Connection {
             _nextSendLock = new Object();
             _connectionEvent = new ConEvent();
             _retransmitEvent = new RetransmitEvent();
    +        _bwEstimator = new WestwoodBandwidthEstimator(ctx, 8 * MAX_WINDOW_SIZE, 1000, 2, 10);
             _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
             // all createRateStats in ConnectionManager
             if (_log.shouldLog(Log.INFO))
    @@ -579,6 +583,7 @@ class Connection {
             }
             if ((acked != null) && (!acked.isEmpty()) ) {
                 _ackSinceCongestion.set(true);
    +            _bwEstimator.addSample(acked.size());
                 if (anyLeft) {
                     // RFC 6298 section 5.3
                     int rto = getOptions().getRTO();
    @@ -1477,7 +1482,7 @@ class Connection {
                 final long now = _context.clock().now();
                 pushBackRTO(getOptions().doubleRTO());
     
    -            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
    +            // 2. cut ssthresh to bandwidth estimate, window to 1
                 List<PacketLocal> toResend = null;
                 synchronized(_outboundPackets) {
                     if (_outboundPackets.isEmpty()) {
    @@ -1490,8 +1495,8 @@ class Connection {
                     if (oldest.getNumSends() == 1) {
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug(Connection.this + " cutting ssthresh and window");
    -                    int flightSize = _outboundPackets.size();
    -                    _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
    +                    _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 );
    +                    _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh);
                         getOptions().setWindowSize(1);
                     } else if (_log.shouldLog(Log.DEBUG))
                         _log.debug(Connection.this + " not cutting ssthresh and window");
    @@ -1693,11 +1698,12 @@ class Connection {
                             // This prevents being stuck at a window size of 1, retransmitting every packet,
                             // never updating the RTT or RTO.
                             getOptions().doubleRTO();
    -                        getOptions().setWindowSize(1);
     
                             if (_packet.getNumSends() == 1) {
    -                            int flightSize = getUnackedPacketsSent();
    -                            _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
    +                            _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 );
    +                            _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh);
    +                            int wsize = getOptions().getWindowSize();
    +                            getOptions().setWindowSize(Math.min(_ssthresh, wsize));
                             }
     
                             if (_log.shouldLog(Log.INFO))
    diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
    index 4d9ae6ef5..d0774e108 100644
    --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
    +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
    @@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
         private int _receiveWindow;
         private int _profile;
         private int _rtt;
    +    private int _minRtt = Integer.MAX_VALUE;
         private int _rttDev;
         private int _rto = INITIAL_RTO;
         private int _resendDelay;
    @@ -578,6 +579,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
          */
         public synchronized int getRTT() { return _rtt; }
     
    +    /**
    +     * @return minimum RTT observed over the life of the connection
    +     */
    +    public synchronized int getMinRTT() {return _minRtt; }
    +
    +
         /**
          *  not public, use updateRTT()
          */
    @@ -677,6 +684,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
          *  @param measuredValue must be positive
          */
         public synchronized void updateRTT(int measuredValue) {
    +        _minRtt = Math.min(_minRtt, measuredValue);
             switch(_initState) {
             case INIT:
                 _initState = AckInit.FIRST;
    
    
  1. The BandwidthEstimator base class
package net.i2p.client.streaming.impl;

import java.util.Queue;
import java.util.ArrayDeque;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

import net.i2p.I2PAppContext;
import net.i2p.util.Log;

abstract class BandwidthEstimator {

    private final I2PAppContext _context;
    private final Log _log;
    private final BlockingQueue<AckSample> _samples;
    private final int _maxSamples;

    BandwidthEstimator(I2PAppContext ctx, int maxSamples) {
        _context = ctx;
        _log = ctx.logManager().getLog(BandwidthEstimator.class);
        _samples = new ArrayBlockingQueue<>(maxSamples);
        _maxSamples = maxSamples;
    }

    /**
     * Records an arriving ack.
     * @param acked how many packets were acked with this ack
     */
    public final void addSample(int acked) {
        long now = _context.clock().now();
        AckSample sample = new AckSample(now, acked);

        while (!_samples.offer(sample)) {
            if (_log.shouldLog(Log.INFO))
                _log.info("Samples overflow, discarding earliest");
            AckSample toDiscard = _samples.poll();
            discard(toDiscard);
        }
    }

    /**
     * Notification that a sample was discarded
     * Can be overriden by subclasses
     */
    protected void discard(AckSample discarded) {}

    /**
     * @return the current bandwidth estimate in packets/ms.
     */
    public final double getBandwidthEstimate() {
        final long now = _context.clock().now();

        Queue<AckSample> copy = new ArrayDeque<>(_maxSamples);
        _samples.drainTo(copy);

        int nSamples = copy.size();
        double rv = computeBWE(now, copy);

        if (_log.shouldLog(Log.DEBUG))
            _log.debug("computed " + rv + " from samples " + nSamples);

        return rv;
    }

    /**
     * Performs the actual computation.  To be implemented by subclasses
     * @param now what time is it now
     * @param samples list of AckSamples to base the computation on
     * @return the current bandwidth estimate in packets/ms
     */
    protected abstract double computeBWE(long now, Queue<AckSample> samples);

    protected static class AckSample {
        final long timestamp;
        final int acked;

        AckSample(long timestamp, int acked) {
            this.timestamp = timestamp;
            this.acked = acked;
        }

        @Override
        public String toString() {
            return "AckSample[" + timestamp + ":" + acked + "]";
        }
    }
}
  1. The WestwoodBandwidthEstimator implementation of regular Westwood with fixed aliasing interval
package net.i2p.client.streaming.impl;


import java.util.Queue;
import java.util.ArrayDeque;

import net.i2p.I2PAppContext;
import net.i2p.util.Log;


class WestwoodBandwidthEstimator extends BandwidthEstimator {

    private final Log _log;

    private final int _tau;
    private final int _m;
    private final int _tolerance;

    private long _tAck;
    private double _bKFiltered, _bK;

    /**
     * @param tau - westwood TAU constant
     * @param m - westwood M constant
     * @param tolerance - if samples are received within this many milliseconds, alias them
     */
    WestwoodBandwidthEstimator(I2PAppContext ctx, int maxSamples, int tau, int m, int tolerance) {
        super(ctx, maxSamples);
        this._log = ctx.logManager().getLog(WestwoodBandwidthEstimator.class);
        this._tau = tau;
        this._m = m;
        this._tolerance = tolerance;
    }

    @Override
    protected synchronized void discard(AckSample discarded) {
        // start fresh
        _tAck = 0;
        _bKFiltered = 0;
        _bK = 0;
    }

    @Override
    protected synchronized double computeBWE(final long now, Queue<AckSample> samples) {

        if (_log.shouldLog(Log.DEBUG))
            _log.debug(this + " computeBWE now:" + now + " samples:" + samples);

        if (samples.isEmpty() && _tAck == 0)
            return 0.0; // nothing ever sampled


        // 1. pre-process queue by aliasing any samples within the tolerance
        int samplesSize = samples.size();
        Queue<AckSample> aliased = new ArrayDeque(samples.size());
        while(!samples.isEmpty()) {
            AckSample pivot = samples.poll();
            int packets = pivot.acked;
            final long time = pivot.timestamp;
            AckSample sample;
            while((sample = samples.peek()) != null) {
                if (sample.timestamp <= time + _tolerance) {

                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(this + " aliasing " + sample + " with " + pivot);

                    packets += sample.acked;
                    samples.poll();
                } else
                    break;
            }
            AckSample processed = new AckSample(time, packets);
            aliased.offer(processed);
        }

        if (_log.shouldLog(Log.DEBUG)) {
            _log.debug(this + " Aliasing: " + samplesSize + " -> " + aliased.size());
            _log.debug(this + " Aliased samples:" + aliased);
        }

        // init with first sample if necessary
        if (_tAck == 0) {
            AckSample first = aliased.poll();
            _tAck = first.timestamp;
            _bKFiltered = first.acked;
            _bK = first.acked;
        }

        // go through the samples and compute _bKFiltered
        while(!aliased.isEmpty()) {
            AckSample first = aliased.peek();
            if (first.timestamp - _tAck <= _tau / _m ) {
                // consume the sample
                aliased.poll();
                updateBK(first.timestamp, first.acked);
            } else {
                // virtual sample
                updateBK(_tAck + _tau / _m, 0);
            }
        }

        // fill up with virtual samples past the last real sample
        int virtualSamples = 0;
        while (_tAck < now - _tau / _m) {
            virtualSamples++;
            updateBK(_tAck + _tau / _m, 0);
        }

        if (_log.shouldLog(Log.DEBUG))
            _log.debug(this + " padded virtual samples at the end:" + virtualSamples);

        return _bKFiltered;
    }

    /**
     * 
     * @param time the time of the measurement
     * @param packets number of packets acked, 0 for virtual samples
     */
    private void updateBK(long time, int packets) {
        long deltaT = time - _tAck;
        double bk = packets * 1.0 / deltaT;
        double alphaK = (2.0 * _tau - deltaT) / (2.0 * _tau + deltaT);
        _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * (bk + _bK) / 2;
        _bK = bk;
        _tAck = time;
    }

    @Override
    public synchronized String toString() {
        return "WBE[" +
                " _bKFiltered " + _bKFiltered +
                " _tAck " + _tAck +
                "]";
    }
}

comment:5 Changed 9 months ago by zzz

Milestone: undecided0.9.46
Resolution: fixed
Status: newclosed

A reworked and simplified version of the patch in comment 4 above, based on Linux kernel's implementation, in 090d47ac035674bcc624dbde67d094699592f626 0.9.45-14. Still calculated on-demand, but the 'history' is now just a counter. BandwidthEstimator? is now a simple interface, to support testing and comparison of different implementations. Testing will continue. Early results show up to 40% improvement on lossy connections, in line with the Westwood+ paper.

Note: See TracTickets for help on using tickets.