Opened 4 months ago

Closed 4 months ago

#2718 closed enhancement (wontfix)

Streaming: TCP-Westwood

Reported by: Zlatin Balevsky Owned by:
Priority: minor Milestone: undecided
Component: streaming Version: 0.9.45
Keywords: Cc:
Parent Tickets: #2715 Sensitive: no

Description

This ticket builds upon #2715; more specifically, if the patch in that ticket is merged, then the additional effort to implement Westwood is minimal.

I've chosen the constants for the low-pass filter arbitrarily. More tuning and testnet results to follow.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..fc56ee402 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -99,6 +102,15 @@ class Connection {
     public static final long MAX_RESEND_DELAY = 45*1000;
     public static final long MIN_RESEND_DELAY = 100;
 
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 5000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
+
     /**
      *  Wait up to 5 minutes after disconnection so we can ack/close packets.
      *  Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
@@ -156,6 +168,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +178,17 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +258,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +442,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +476,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +516,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,18 +577,58 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
+            final long now = _context.clock().now();
+            updateBK(now, acked.size());
+            _virtualAckEvent.pushOut();
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(now + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = Math.max(1,now - _tAck);
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -788,6 +853,8 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1452,9 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" bkFiltered ").append(_bKFiltered);
+        buf.append(" minRTT ").append(getOptions().getMinRTT());
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1462,115 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in accordance to what Westwood paper section 3.2
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " injecting virtual ack");
+
+            final long now = _context.clock().now();
+            updateBK(now, 0);
+            pushOut();
+        }
+
+        void pushOut() {
+            forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+        }
+    }    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1606,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1635,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1675,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1682,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+
+                        if (_packet.getNumSends() == 1) {
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                            int wSize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh, wSize));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1732,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1761,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1772,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 1d48a33b3..6d0dd9c5c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -577,6 +578,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
     /**
      *  not public, use updateRTT()
      */
@@ -671,6 +677,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

Subtickets

Attachments (1)

vanilla_reno_westwood.ods (19.2 KB) - added by Zlatin Balevsky 4 months ago.
Testnet results from vanilla vs. Reno vs. Westwood (current implementation)

Download all attachments as: .zip

Change History (6)

Changed 4 months ago by Zlatin Balevsky

Attachment: vanilla_reno_westwood.ods added

Testnet results from vanilla vs. Reno vs. Westwood (current implementation)

comment:1 Changed 4 months ago by Zlatin Balevsky

Some observations after playing with this further:

  • If MAX_SLOW_START_WINDOW == MAX_WINDOW_SIZE, chokes happen and we don't seem to recover very well.
  • There needs to be a new constant, MIN_DT, for the minimum amount of time between two acks. On the live net, more than one ack sometimes arrives in the same millisecond; with the code as it is in the patch in the OP, this makes _bK explode, which makes _bKFiltered explode, which in turn makes _ssthresh max out. I'm experimenting with a cutoff of 5 ms, dropping any acks that arrive within that interval of one another.

comment:2 Changed 4 months ago by Zlatin Balevsky

Add a subticket #2719 (Streaming: TCP-Westwood+).

comment:3 Changed 4 months ago by zzz

re: chokes, see #1939 for background.

setChoked() forces the window size to 1 now. Maybe it should also set ssThresh to something, so that we recover faster after the #2708 change? This needs further testing on loopback. It may need to be part of the #2715 change if that goes in first.

comment:4 Changed 4 months ago by Zlatin Balevsky

Remove a subticket #2719 (Streaming: TCP-Westwood+).

comment:5 Changed 4 months ago by Zlatin Balevsky

Resolution: wontfix
Status: new → closed

Resolving as wontfix as it's obsoleted by #2719 (Westwood+)

Note: See TracTickets for help on using tickets.