Opened 2 months ago

Closed 7 weeks ago

#2708 closed defect (fixed)

Slow start not compliant with RFC 5681

Reported by: Zlatin Balevsky Owned by: zzz
Priority: major Milestone: 0.9.46
Component: streaming Version: 0.9.45
Keywords: Cc:
Parent Tickets: Sensitive: no

Description

At the moment the logic for slow start seems to be very different from what RFC 5681 says. In the attached patch the following are addressed:

  1. The slow start threshold (ssthresh) should be a variable not a constant
  2. ssthresh should be initialized with MAX_WIN size
  3. ssthresh should be reduced to half of outstanding packets on congestion
  4. Slow start should apply whenever the window is less than ssthresh, as long as there is no retransmit outstanding
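
As a rough illustration of the four points above (this is not the attached patch), here is a minimal sketch of RFC 5681-style window growth counted in packets instead of bytes. The class name, the method names and the choice to collapse the window to 1 on a timeout are assumptions made for the example; capping slow-start growth at ssthresh mirrors what the combined diff later in this ticket does.

```java
/** Illustrative sketch only: RFC 5681-style window growth, in packets. */
final class SlowStartSketch {
    private final int maxWindow;   // hypothetical upper bound, stands in for MAX_WIN
    private int window;            // congestion window, in packets
    private int ssthresh;          // point 1: a variable, not a constant
    private int ackedSinceGrowth;  // acks counted toward the next +1 in congestion avoidance

    SlowStartSketch(int initialWindow, int maxWindow) {
        this.window = initialWindow;
        this.maxWindow = maxWindow;
        this.ssthresh = maxWindow; // point 2: start as high as the window can go
    }

    /** Call when packets are newly acked and no retransmit is outstanding. */
    void onAck(int acked) {
        if (acked <= 0)
            return;
        if (window < ssthresh) {
            // point 4: slow start, grow by the number of packets acked
            window = Math.min(ssthresh, window + acked);
            ackedSinceGrowth = 0;
        } else {
            // congestion avoidance: roughly +1 packet per window's worth of acks
            ackedSinceGrowth += acked;
            if (ackedSinceGrowth >= window) {
                ackedSinceGrowth = 0;
                window++;
            }
        }
        window = Math.min(window, maxWindow);
    }

    /** Call on a retransmission timeout; flightSize = packets sent but not yet acked. */
    void onTimeout(int flightSize) {
        ssthresh = Math.max(flightSize / 2, 2); // point 3: half of the outstanding packets
        window = 1;                             // assumption: restart from a loss window of 1
        ackedSinceGrowth = 0;
    }

    int getWindow() { return window; }
    int getSsthresh() { return ssthresh; }

    public static void main(String[] args) {
        SlowStartSketch s = new SlowStartSketch(1, 128);
        for (int i = 0; i < 5; i++) {
            s.onAck(s.getWindow());
            System.out.println("window=" + s.getWindow() + " ssthresh=" + s.getSsthresh());
        }
        s.onTimeout(s.getWindow());
        System.out.println("after timeout: window=" + s.getWindow() + " ssthresh=" + s.getSsthresh());
    }
}
```

With this shape, the same `window < ssthresh` test covers both the initial ramp-up and the return to slow start after congestion.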

Attachments (2)

ssthresh.patch (5.9 KB) - added by Zlatin Balevsky 2 months ago.
window-ssthresh.ods (56.0 KB) - added by Zlatin Balevsky 2 months ago.
wsize,rtt,rto,ssthresh raw data and a graph

Change History (11)

Changed 2 months ago by Zlatin Balevsky

Attachment: ssthresh.patch added

comment:1 Changed 2 months ago by zzz

Nack on item 2. MAX_SLOW_START_WINDOW is very much on purpose, added by me in 2012, for the reasons stated in the comments at line 444 in CPH. While some of the underlying issues from that time have since been fixed, if the far-end rcv buffer gets filled ("choking") everything still grinds to a halt (temporarily) and so that has to be avoided at all costs. The underlying issues with ElG and Session Tags also still apply. This is mainly a "loopback" testing issue however. In the real net, blasting past 24 at the beginning is very rare anyway.

Items 1, 3, and 4 relate to reverting to slow-start mode after a connection is up. While I haven't researched it yet, I believe your premise that we don't do it correctly. This needs further investigation.

comment:2 Changed 2 months ago by Zlatin Balevsky

"In the real net, blasting past 24 at the beginning is very rare anyway."

Attached is a spreadsheet that tracks two downloads with eepget over the real net with ssthresh initialized to 128. wsize blasts to 128 very quickly if the first few acks don't get lost. The spreadsheet also includes rtt and rto values.

Changed 2 months ago by Zlatin Balevsky

Attachment: window-ssthresh.ods added

wsize,rtt,rto,ssthresh raw data and a graph

comment:3 Changed 2 months ago by Zlatin Balevsky

The argument for initializing ssthresh to MAX_WIN is best laid out in the rfc:

   The initial value of ssthresh SHOULD be set arbitrarily high (e.g.,
   to the size of the largest possible advertised window), but ssthresh
   MUST be reduced in response to congestion.  Setting ssthresh as high
   as possible allows the network conditions, rather than some arbitrary
   host limit, to dictate the sending rate.  In cases where the end
   systems have a solid understanding of the network path, more
   carefully setting the initial ssthresh value may have merit (e.g.,
   such that the end host does not create congestion along the path).

The caveat in the last sentence is a point in favor of making initial ssthresh configurable from the connection options. The user right now has control over the number of hops in a tunnel which impacts the network path conditions significantly. So it makes sense for them to be able to choose an initial ssthresh value as well.
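
To make the suggestion concrete, here is a minimal sketch of reading an initial ssthresh from connection options and falling back to the maximum window when it is unset or invalid. The option name `example.initialSSThresh`, the class and the method are all hypothetical; nothing like this exists in the streaming options today, and the floor of 2 is an assumption taken from the RFC's lower bound on ssthresh.

```java
import java.util.Properties;

/** Illustrative sketch only: a configurable initial ssthresh. */
final class InitialSsthreshOption {
    /** Hypothetical option name, not an existing streaming option. */
    static final String PROP_INITIAL_SSTHRESH = "example.initialSSThresh";

    /**
     * @param opts      connection options as plain properties
     * @param maxWindow largest window the sender will ever use
     * @return the requested value clamped to [2, maxWindow],
     *         or maxWindow if the option is missing or not a number
     */
    static int initialSsthresh(Properties opts, int maxWindow) {
        String val = opts.getProperty(PROP_INITIAL_SSTHRESH);
        if (val == null)
            return maxWindow; // RFC default: arbitrarily high
        try {
            int requested = Integer.parseInt(val.trim());
            return Math.max(2, Math.min(requested, maxWindow));
        } catch (NumberFormatException nfe) {
            return maxWindow; // ignore garbage, keep the default
        }
    }

    public static void main(String[] args) {
        Properties opts = new Properties();
        opts.setProperty(PROP_INITIAL_SSTHRESH, "64");
        System.out.println("initial ssthresh: " + initialSsthresh(opts, 128));
    }
}
```

Defaulting to the maximum window keeps the RFC behaviour for users who set nothing, while a user who knows their tunnels are long or slow can pick a lower starting point.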

comment:4 Changed 2 months ago by zzz

Milestone: undecided → 0.9.46
Owner: set to zzz
Priority: minor → major
Status: new → accepted
Version: 0.9.46 → 0.9.45

Interesting. So that refutes my argument that going over 24 wouldn't happen in the real net.
So it should be higher than 24. It doesn't refute my argument that it needs to be much less than 128 to prevent choking. We do have an "arbitrary host limit" of 128 at the receiver and know that things go bad if we hit it. But no need to debate it, we'll figure it out in loopback testing.

Haven't caught up on the analysis yet but this looks like something we should get in for 46.

comment:5 Changed 8 weeks ago by zzz

Add a subticket #2715 (Streaming: single retransmit timer per connection).

comment:6 Changed 7 weeks ago by zzz

Comments and clarifications after discussion with OP:

OP point 1 is incorrect. We have an implicit ssthresh variable = getLastCongestionSeenAt() / 2. That value is last window size / 2, not flight size / 2, which is OP point 3. RFC 5681 sec. 3.1 third-to-last par. says this is an "easy mistake". Agreed we should fix it.
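
To show why the distinction matters, here is a small sketch comparing the two calculations. The class and method names are made up for the example; the flight-size version with its floor of 2 follows the form used in the combined diff, and the window-based version stands in for the old implicit value.

```java
/** Illustrative sketch only: ssthresh from window size vs. from flight size. */
final class SsthreshComparison {
    /** Old implicit value: half of the window size when congestion was seen. */
    static int fromWindow(int windowSize) {
        return windowSize / 2;
    }

    /** RFC 5681 eq. (4), in packets: half of what is actually unacked, at least 2. */
    static int fromFlightSize(int flightSize) {
        return Math.max(flightSize / 2, 2);
    }

    public static void main(String[] args) {
        int windowSize = 64;  // what the sender is allowed to have outstanding
        int flightSize = 10;  // what the sender really has outstanding
        System.out.println("from window: " + fromWindow(windowSize));
        System.out.println("from flight: " + fromFlightSize(flightSize));
    }
}
```

When the sender is not filling its window, the window-based value overestimates what the path was carrying when the loss happened, so slow start resumes toward a target that is too high.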

I think we already do OP point 4, for the same reasons as above.

One function of this patch is to make an explicit ssthresh variable which makes subsequent changes e.g. #2715 #2718 #2719 and verifying compliance with RFCs easier. That's a good thing.

I think this patch leaves the _lastCongestionSeenAt variable and its getter unused, so both can be removed.

Looks like we'll probably take #2715 not this one but putting the comments here for reference.

comment:7 Changed 7 weeks ago by zzz

After discussion with OP, we're considering a plan to take this and #2710 in advance of #2715. I've created a combined diff of this and #2710 which incorporates my comments above and also some related cleanups. I'll be testing it over the next couple days. Initial tests look good.

#
# old_revision [69648178070f14ca7258f3b1e0aaa19a3ccf812f]
#
# patch "apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java"
#  from [bfbc6aa34a26cbb9dcef4d72024b56be6ba3e0d4]
#    to [967ac2231cfc9c428a0c9275f45b85c15e7556f7]
# 
# patch "apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java"
#  from [bdffc30035a7e490de6e82d2a4f02dada3e6d40d]
#    to [78d2b5e374da56fb29ff717fa885ea6dc4daef24]
# 
# patch "apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java"
#  from [cd095aae4d38fbbd2b979b976caaf4783d919a87]
#    to [fc145320fb1743d132cfa6af6193736a1c1475db]
# 
# patch "apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java"
#  from [a411789b15b4a3216dd8de139d1ad8bedef320e6]
#    to [dd77cc396e160e05eed36a770da3f88a908db451]
# 
# patch "apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java"
#  from [0dbd4001d719eac85127aab7a1864eda67f14632]
#    to [6aa5a83204c77fc7f64140569d60b38d930451f8]
#
============================================================
--- apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java	bfbc6aa34a26cbb9dcef4d72024b56be6ba3e0d4
+++ apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java	967ac2231cfc9c428a0c9275f45b85c15e7556f7
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -67,10 +68,9 @@ class Connection {
     private final AtomicLong _disconnectScheduledOn = new AtomicLong();
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
-    /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private volatile long _nextRetransmitTime;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -156,7 +156,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
-        _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
+        _ssthresh = _options.getMaxWindowSize();
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
         _lastReceivedOn = -1;
@@ -170,6 +170,13 @@ class Connection {
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    /**
+     * @since 0.9.46
+     */
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -557,8 +564,10 @@ class Connection {
             }
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
+            _nextRetransmitTime = _context.clock().now() + getOptions().getRTO();
+        }
         return acked;
     }
 
@@ -1137,13 +1146,10 @@ class Connection {
         return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn);
     }
     
-    public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
-
     private void congestionOccurred() {
         // if we hit congestion and e.g. 5 packets are resent,
         // dont set the size to (winSize >> 4).  only set the
         if (_ackSinceCongestion.compareAndSet(true,false)) {
-            _lastCongestionSeenAt = _options.getWindowSize();
             _lastCongestionTime = _context.clock().now();
             _lastCongestionHighestUnacked = _lastSendId.get();
         }
@@ -1383,7 +1389,7 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1420,6 +1426,7 @@ class Connection {
     class ResendPacketEvent extends SimpleTimer2.TimedEvent {
         private final PacketLocal _packet;
         private long _nextSend;
+        private boolean _fastRetransmit;
 
         public ResendPacketEvent(PacketLocal packet, long delay) {
             super(_timer);
@@ -1434,6 +1441,14 @@ class Connection {
         public void timeReached() { retransmit(); }
 
         /**
+         * @since 0.9.46
+         */
+        void fastRetransmit() {
+            _fastRetransmit = true;
+            reschedule(0);
+        }
+
+        /**
          * Retransmit the packet if we need to.  
          *
          * ackImmediately() above calls directly in here, so
@@ -1452,6 +1467,16 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
+
+            long now = _context.clock().now();
+            long nextRetransmitTime = _nextRetransmitTime;
+            if (nextRetransmitTime > now  && !_fastRetransmit) {
+                long delay = nextRetransmitTime - now;
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet);
+                forceReschedule(delay);
+                return false;
+            }
             
             //if (_log.shouldLog(Log.DEBUG))
             //    _log.debug("Resend period reached for " + _packet);
@@ -1469,8 +1494,7 @@ class Connection {
                     resend = true;
             }
             if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
+                if ( (!isLowest) && (!_fastRetransmit) ) {
                     // we want to resend this packet, but there are already active
                     // resends in the air and we dont want to make a bad situation 
                     // worse.  wait another second
@@ -1487,7 +1511,7 @@ class Connection {
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
+                if (_fastRetransmit)
                     _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
@@ -1536,6 +1560,11 @@ class Connection {
                         getOptions().doubleRTO();
                         getOptions().setWindowSize(newWindowSize);
 
+                        if (_packet.getNumSends() == 1) {
+                            int flightSize = getUnackedPacketsSent();
+                            _ssthresh = Math.max( flightSize / 2, 2 );
+                        }
+
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
                                       + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
@@ -1589,7 +1618,7 @@ class Connection {
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                                  (_fastRetransmit ? "(fast) " : "(timeout) ") +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1598,6 +1627,7 @@ class Connection {
                                   + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
                         _unackedPacketsReceived.set(0);
                         _lastSendTime = _context.clock().now();
+                        _fastRetransmit = false;
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
============================================================
--- apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java	bdffc30035a7e490de6e82d2a4f02dada3e6d40d
+++ apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java	78d2b5e374da56fb29ff717fa885ea6dc4daef24
@@ -122,7 +122,6 @@ class ConnectionManager {
         _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
-        _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
         // Stats for Connection
@@ -765,7 +764,6 @@ class ConnectionManager {
             _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
             _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
             _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
-            _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
             _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
             if (I2PSocketManagerFull.pcapWriter != null)
                 I2PSocketManagerFull.pcapWriter.flush();
============================================================
--- apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java	cd095aae4d38fbbd2b979b976caaf4783d919a87
+++ apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java	fc145320fb1743d132cfa6af6193736a1c1475db
@@ -204,9 +204,11 @@ class ConnectionPacketHandler {
                 // see tickets 1939 and 2584
                 con.setNextSendTime(_context.clock().now() + IMMEDIATE_ACK_DELAY);
             } else {
-                int delay = con.getOptions().getSendAckDelay();
+                int delay;
                 if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested
                     delay = packet.getOptionalDelay();
+                else
+                    delay = con.getOptions().getSendAckDelay();
                 con.setNextSendTime(delay + _context.clock().now());
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
@@ -432,8 +434,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -446,7 +448,7 @@ class ConnectionPacketHandler {
                         if (newWindowSize >= MAX_SLOW_START_WINDOW)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
@@ -483,8 +485,8 @@ class ConnectionPacketHandler {
             con.setCongestionWindowEnd(newWindowSize + lowest);
                                 
             if (_log.shouldLog(Log.INFO))
-                _log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize() + " congestionSeenAt: "
-                           + con.getLastCongestionSeenAt() + " (#resends: " + numResends 
+                _log.info("New window size " + newWindowSize + "/" + oldWindow + "/" + con.getOptions().getWindowSize()
+                           + " (#resends: " + numResends 
                            + ") for " + con);
         } else {
             if (_log.shouldLog(Log.DEBUG))
============================================================
--- apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java	a411789b15b4a3216dd8de139d1ad8bedef320e6
+++ apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java	dd77cc396e160e05eed36a770da3f88a908db451
@@ -13,7 +13,6 @@ import net.i2p.util.Log;
 import net.i2p.data.SigningPrivateKey;
 import net.i2p.client.streaming.I2PSocketException;
 import net.i2p.util.Log;
-import net.i2p.util.SimpleTimer2;
 
 /**
  * This is the class used for outbound packets.
@@ -36,7 +35,7 @@ class PacketLocal extends Packet impleme
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
     private volatile boolean _retransmitted;
-    private volatile SimpleTimer2.TimedEvent _resendEvent;
+    private volatile Connection.ResendPacketEvent _resendEvent;
     
     /** not bound to a connection */
     public PacketLocal(I2PAppContext ctx, Destination to, I2PSession session) {
@@ -133,13 +132,14 @@ class PacketLocal extends Packet impleme
     
     public long getCreatedOn() { return _createdOn; }
     public long getLifetime() { return _context.clock().now() - _createdOn; }
+
     public void incrementSends() { 
         _numSends.incrementAndGet();
         _lastSend = _context.clock().now();
     }
     
     private void cancelResend() {
-        SimpleTimer2.TimedEvent ev = _resendEvent;
+        Connection.ResendPacketEvent ev = _resendEvent;
         if (ev != null) 
             ev.cancel();
     }
@@ -166,7 +166,7 @@ class PacketLocal extends Packet impleme
             _log.debug("Cancelled! " + toString(), new Exception("cancelled"));
     }
 
-    public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; }
+    public Connection.ResendPacketEvent getResendEvent() { return _resendEvent; }
     
     /** how long after packet creation was it acked?
      * @return how long after packet creation the packet was ACKed in ms
@@ -177,6 +177,7 @@ class PacketLocal extends Packet impleme
         else
             return (int)(_ackOn - _createdOn);
     }
+
     public int getNumSends() { return _numSends.get(); }
     public long getLastSend() { return _lastSend; }
 
@@ -189,11 +190,11 @@ class PacketLocal extends Packet impleme
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
+        Connection.ResendPacketEvent evt = _resendEvent;
         if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
             (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
             _retransmitted = true;
-            evt.reschedule(0);
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
@@ -209,9 +210,10 @@ class PacketLocal extends Packet impleme
                     _log.debug(log);
         }
     }
+
     public int getNACKs() { return _nackCount.get(); }
     
-    public void setResendPacketEvent(SimpleTimer2.TimedEvent evt) { _resendEvent = evt; }
+    public void setResendPacketEvent(Connection.ResendPacketEvent evt) { _resendEvent = evt; }
 
     /**
      * Sign and write the packet to the buffer (starting at the offset) and return
============================================================
--- apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java	0dbd4001d719eac85127aab7a1864eda67f14632
+++ apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java	6aa5a83204c77fc7f64140569d60b38d930451f8
@@ -117,7 +117,7 @@ class PacketQueue implements SendMessage
             // this should not block!
             begin = _context.clock().now();
             long expires = 0;
-            Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent();
+            Connection.ResendPacketEvent rpe = packet.getResendEvent();
             if (rpe != null) {
                 // we want the router to expire it a little before we do,
                 // so if we retransmit it will use a new tunnel/lease combo

comment:8 Changed 7 weeks ago by zzz

Remove a subticket #2715 (Streaming: single retransmit timer per connection).

comment:9 Changed 7 weeks ago by zzz

Resolution: fixed
Status: accepted → closed