Opened 8 weeks ago

Closed 6 weeks ago

#2715 closed enhancement (fixed)

Streaming: single retransmit timer per connection

Reported by: Zlatin Balevsky Owned by:
Priority: major Milestone: 0.9.46
Component: streaming Version: 0.9.45
Keywords: Cc:
Parent Tickets: #2711 Sensitive: no

Description

This patch does for streaming what #2713 does for SSU, i.e. a complete rewrite of the retransmission logic. This ticket obsoletes tickets #2708, #2709, #2710 and #2711.

I've put the relevant RFC references in the comments in the code. In short, the changes are:

  • A single SimpleTimer2.TimedEvent instance per Connection, rather than one per Packet.
  • That event is rescheduled or cancelled in accordance with the RFCs.
  • ssthresh is now an explicit variable, and the send window is adjusted according to the rules in the RFCs.

Testnet benchmarks will follow as soon as I have them.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..a48b016a4 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -156,6 +159,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = _options.getMaxWindowSize();
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +169,16 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +248,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +432,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +466,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +506,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,15 +567,28 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                final long now = _context.clock().now();
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(now + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
@@ -788,6 +816,7 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1414,7 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1422,94 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    int flightSize = _outboundPackets.size();
+                    _ssthresh = Math.max( flightSize / 2, 2 );
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1545,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1574,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1614,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1621,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+                        getOptions().setWindowSize(1);
+
+                        if (_packet.getNumSends() == 1) {
+                            int flightSize = getUnackedPacketsSent();
+                            _ssthresh = Math.max( flightSize / 2, 2 );
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1671,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1700,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1711,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

Subtickets

#2718: Streaming: TCP-Westwood (closed)

Attachments (2)

single_timer_streaming_in_loss.ods (16.3 KB) - added by Zlatin Balevsky 8 weeks ago.
Results from testing the patch in the testnet with various packet loss probabilities.
2715_window_metrics.ods (142.7 KB) - added by Zlatin Balevsky 8 weeks ago.
Window metrics for vanilla vs patched in live net, 16MB eepget download

Download all attachments as: .zip

Change History (11)

Changed 8 weeks ago by Zlatin Balevsky

Results from testing the patch in the testnet with various packet loss probabilities.

Changed 8 weeks ago by Zlatin Balevsky

Attachment: 2715_window_metrics.ods added

Window metrics for vanilla vs patched in live net, 16MB eepget download

comment:1 Changed 8 weeks ago by zzz

Parent Tickets: 2708, 2709, 2710, 2711

Setting the possibly-obsoleted tickets as parents for tracking.

comment:2 Changed 8 weeks ago by zzz

For the reasons stated in #2708, please retain the MAX_SLOW_START_WINDOW constant. We cannot allow slow start to go all the way to MAX_WINDOW_SIZE. It should be fine to increase it to perhaps 48 or even higher, after testing. If I'm proven wrong later, we can always just change it to MAX_SLOW_START_WINDOW = MAX_WINDOW_SIZE.

comment:3 Changed 8 weeks ago by Zlatin Balevsky

> please retain the MAX_SLOW_START_WINDOW constant

Attached is a patch with _ssthresh capped at MAX_SLOW_START_WINDOW, albeit in a different manner.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..c991f1d65 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -99,6 +102,8 @@ class Connection {
     public static final long MAX_RESEND_DELAY = 45*1000;
     public static final long MIN_RESEND_DELAY = 100;
 
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
     /**
      *  Wait up to 5 minutes after disconnection so we can ack/close packets.
      *  Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
@@ -156,6 +161,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +171,16 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +250,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +434,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +468,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +508,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,15 +569,28 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                final long now = _context.clock().now();
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(now + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
@@ -788,6 +818,7 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1416,7 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1424,94 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    int flightSize = _outboundPackets.size();
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1547,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1576,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1616,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1623,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+                        getOptions().setWindowSize(1);
+
+                        if (_packet.getNumSends() == 1) {
+                            int flightSize = getUnackedPacketsSent();
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1673,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1702,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1713,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

comment:4 Changed 7 weeks ago by Zlatin Balevsky

Add a subticket #2718 (Streaming: TCP-Westwood).

comment:5 Changed 7 weeks ago by zzz

Parent Tickets: 2708, 2709, 2710, 2711 → 2708, 2710, 2711

Removing #2709 as parent, fixing it separately.

comment:6 Changed 7 weeks ago by Zlatin Balevsky

Attached is an updated diff against current master with some modifications:

  1. The atomic retransmit time is removed in favor of a synchronized flag in the event.
  2. SortedMap.firstKey() is used instead of an iterator.
  3. The number of packets retransmitted when the timer fires is now capped by a constant, which I've set to 1.
  4. cwin is incremented on acks even if the ack acknowledges packets that have been retransmitted.
  5. The multiple fast retransmit modification (ticket #2711) has been removed pending further discussion.

Regarding point 3 - RFC 6298 section 5.4 states that only the earliest non-acked segment should be retransmitted, whereas RFC 5681 says no more than half of the outstanding data should be retransmitted. My opinion is that if there has been network congestion and the window was large, retransmitting half of it in one go is going to make things worse. Retransmitting half of the window is the current behavior, and I believe capping the number of retransmitted packets at 1 will reduce overall congestion in the network once deployed.

That leads to point 4. If only one segment is retransmitted when the timer expires, then an ack for that segment would not increase cwin, which will have been set to 1 as per RFC 5681. The connection would then remain at a cwin of 1 until an ack arrives that acknowledges only non-retransmitted packets, which could take a while. Incrementing cwin on such acks avoids this. In anecdotal testing on the live net, this combination performed the best of the various Reno flavors.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index ed549b8ab..4ab97e2d2 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,10 +54,11 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
-    private final Map<Long, PacketLocal> _outboundPackets;
+    private final SortedMap<Long, PacketLocal> _outboundPackets;
     private final PacketQueue _outboundQueue;
     private final ConnectionPacketHandler _handler;
     private ConnectionOptions _options;
@@ -84,6 +86,7 @@ class Connection {
     /** how many messages have been resent and not yet ACKed? */
     private final AtomicInteger _activeResends = new AtomicInteger();
     private final ConEvent _connectionEvent;
+    private final RetransmitEvent _retransmitEvent;
     private final int _randomWait;
     private final int _localPort;
     private final int _remotePort;
@@ -110,6 +113,11 @@ class Connection {
 
     public static final int MAX_WINDOW_SIZE = 128;
     private static final int UNCHOKES_TO_SEND = 8;
+
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
+    /** Maximum number of packets to retransmit when the timer hits */
+    private static final int MAX_RTX = 1;
     
 /****
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
@@ -160,6 +168,8 @@ class Connection {
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
         _lastReceivedOn = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
+        _retransmitEvent = new RetransmitEvent();
         _activityTimer = new ActivityTimer();
         _ackSinceCongestion = new AtomicBoolean(true);
         _connectLock = new Object();
@@ -170,6 +180,10 @@ class Connection {
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +253,8 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -424,7 +437,16 @@ class Connection {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+            } else {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " timer was already running");
+            }
+
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -490,6 +512,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -555,10 +578,28 @@ class Connection {
                     _log.info("All outbound packets acked, clearing " + _activeResends);
                 _activeResends.set(0);
             }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+
+
+        if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _retransmitEvent.pushBackRTO(rto);
+                
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
@@ -786,6 +827,7 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1384,6 +1426,7 @@ class Connection {
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
         
+        buf.append(" ssThresh ").append(_ssthresh);
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1391,6 +1434,115 @@ class Connection {
         return buf.toString();
     }
 
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _scheduled;
+
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public synchronized boolean cancel() {
+            _scheduled = false;
+            return super.cancel();
+        }
+
+        public synchronized boolean scheduleIfNotRunning(long delay) {
+            if (_scheduled)
+                return false;
+            _scheduled = true;
+            schedule(delay);
+            return true;
+        }
+
+        public synchronized void pushBackRTO(int rto) {
+            if (!_scheduled)
+                throw new IllegalStateException(Connection.this + " timer was not scheduled");
+            reschedule(rto, false);
+        }
+
+        @Override
+        public void timeReached() {
+
+           if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " rtx event after close or reset");
+                return;
+            }
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            pushBackRTO(getOptions().doubleRTO());
+
+            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey());
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    int flightSize = _outboundPackets.size();
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, Math.min(MAX_RTX, (toResend.size() + 1) / 2));
+            }
+
+            // 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+
+        }
+    }
+
     /**
      * fired to reschedule event notification
      */
@@ -1426,13 +1578,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1453,42 +1608,7 @@ class Connection {
                 return false;
             }
             
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
-                
-                // It's the lowest, or it's fast retransmit time. Resend the packet.
-
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1524,20 +1644,21 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
-                        
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
                         // See RFC 6298 section 5 item 5.5
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+                        getOptions().setWindowSize(1);
+
+                        if (_packet.getNumSends() == 1) {
+                            int flightSize = getUnackedPacketsSent();
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 ));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
+                            _log.info(Connection.this + " Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
                                       + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString());
 
                         windowAdjusted();
@@ -1577,19 +1698,24 @@ class Connection {
                 } else {
                     //long timeout = _options.getResendDelay() << numSends;
                     long rto = _options.getRTO();
-                    long timeout = rto << (numSends-2);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                        }
+
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1602,7 +1728,6 @@ class Connection {
                         resetActivityTimer();
                     }
 
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1614,12 +1739,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..a51ae9f8e 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -431,9 +429,9 @@ class ConnectionPacketHandler {
 
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
-            if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+            if ( (!congested) && (acked > 0) ) {
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..027996cb2 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -189,11 +189,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
         if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
             (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
             _retransmitted = true;
-            evt.reschedule(0);
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..e1863539f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_SLOW_START_WINDOW;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

comment:7 Changed 7 weeks ago by zzz

Parent Tickets: 2708, 2710, 2711 → 2711

Removed #2708 and #2710 as parents; they were fixed separately in 50c3a79c761dfbb1eb24e4af01c6e2fdef6df736 (0.9.45-9).

comment:8 Changed 7 weeks ago by Zlatin Balevsky

Rebased the patch from comment 6 against 0.9.45-9. The only other change is that, instead of throwing an IllegalStateException (ISE), a message is now logged at level ERROR.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index 1a399426b..54bf7c6d7 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,7 @@ class Connection {
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
-    private final Map<Long, PacketLocal> _outboundPackets;
+    private final SortedMap<Long, PacketLocal> _outboundPackets;
     private final PacketQueue _outboundQueue;
     private final ConnectionPacketHandler _handler;
     private ConnectionOptions _options;
@@ -70,7 +71,6 @@ class Connection {
     private final ActivityTimer _activityTimer;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
-    private volatile long _nextRetransmitTime;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -84,6 +84,7 @@ class Connection {
     /** how many messages have been resent and not yet ACKed? */
     private final AtomicInteger _activeResends = new AtomicInteger();
     private final ConEvent _connectionEvent;
+    private final RetransmitEvent _retransmitEvent;
     private final int _randomWait;
     private final int _localPort;
     private final int _remotePort;
@@ -110,6 +111,9 @@ class Connection {
 
     public static final int MAX_WINDOW_SIZE = 128;
     private static final int UNCHOKES_TO_SEND = 8;
+
+    /** Maximum number of packets to retransmit when the timer hits */
+    private static final int MAX_RTX = 1;
     
 /****
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
@@ -165,6 +169,7 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
@@ -428,10 +433,16 @@ class Connection {
             }
             
             long timeout = _options.getRTO();
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+            } else {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " timer was already running");
+            }
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -497,6 +508,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -562,11 +574,25 @@ class Connection {
                     _log.info("All outbound packets acked, clearing " + _activeResends);
                 _activeResends.set(0);
             }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
         if ((acked != null) && (!acked.isEmpty()) ) {
             _ackSinceCongestion.set(true);
-            _nextRetransmitTime = _context.clock().now() + getOptions().getRTO();
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _retransmitEvent.pushBackRTO(rto);
+                
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+
+                _retransmitEvent.cancel();
+            }
         }
         return acked;
     }
@@ -795,6 +821,7 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1397,6 +1424,119 @@ class Connection {
         return buf.toString();
     }
 
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _scheduled;
+
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        @Override
+        public synchronized boolean cancel() {
+            _scheduled = false;
+            return super.cancel();
+        }
+
+        public synchronized boolean scheduleIfNotRunning(long delay) {
+            if (_scheduled)
+                return false;
+            _scheduled = true;
+            schedule(delay);
+            return true;
+        }
+
+        public synchronized void pushBackRTO(int rto) {
+            if (!_scheduled) {
+                _log.log(Log.ERROR, Connection.this + " timer was not scheduled", new Exception());
+            }
+            reschedule(rto, false);
+        }
+
+        @Override
+        public void timeReached() {
+
+           if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " rtx event after close or reset");
+                return;
+            }
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            pushBackRTO(getOptions().doubleRTO());
+
+            // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4)
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.get(_outboundPackets.firstKey());
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    int flightSize = _outboundPackets.size();
+                    _ssthresh = Math.max( flightSize / 2, 2 );
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, Math.min(MAX_RTX, (toResend.size() + 1) / 2));
+            }
+
+            // 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+
+        }
+    }
+
+
+
     /**
      * fired to reschedule event notification
      */
@@ -1426,14 +1566,12 @@ class Connection {
     class ResendPacketEvent extends SimpleTimer2.TimedEvent {
         private final PacketLocal _packet;
         private long _nextSend;
-        private boolean _fastRetransmit;
 
         public ResendPacketEvent(PacketLocal packet, long delay) {
             super(_timer);
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
@@ -1444,7 +1582,6 @@ class Connection {
          * @since 0.9.46
          */
         void fastRetransmit() {
-            _fastRetransmit = true;
             reschedule(0);
         }
 
@@ -1469,50 +1606,7 @@ class Connection {
             }
 
             long now = _context.clock().now();
-            long nextRetransmitTime = _nextRetransmitTime;
-            if (nextRetransmitTime > now  && !_fastRetransmit) {
-                long delay = nextRetransmitTime - now;
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Resend time reached but will be delayed " + delay + " for packet " + _packet);
-                forceReschedule(delay);
-                return false;
-            }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                if ( (!isLowest) && (!_fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
-                
-                // It's the lowest, or it's fast retransmit time. Resend the packet.
-
-                if (_fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+               _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1548,17 +1642,13 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
-                        
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
                         // See RFC 6298 section 5 item 5.5
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+                        getOptions().setWindowSize(1);
 
                         if (_packet.getNumSends() == 1) {
                             int flightSize = getUnackedPacketsSent();
@@ -1606,19 +1696,24 @@ class Connection {
                 } else {
                     //long timeout = _options.getResendDelay() << numSends;
                     long rto = _options.getRTO();
-                    long timeout = rto << (numSends-2);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_retransmitEvent.scheduleIfNotRunning(timeout)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                        }
+
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Resent packet " +
-                                  (_fastRetransmit ? "(fast) " : "(timeout) ") +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1627,12 +1722,10 @@ class Connection {
                                   + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
                         _unackedPacketsReceived.set(0);
                         _lastSendTime = _context.clock().now();
-                        _fastRetransmit = false;
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
 
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1644,12 +1737,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index d6b7ba577..c2c966969 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -433,7 +433,7 @@ class ConnectionPacketHandler {
 
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
-            if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
+            if ( (!congested) && (acked > 0) ) {
                 int ssthresh = con.getSSThresh();
                 if (newWindowSize < ssthresh) {
                     // slow start - exponential growth

comment:9 Changed 6 weeks ago by zzz

Milestone: undecided → 0.9.46
Resolution: → fixed
Status: new → closed

We made significant additional changes to the above patch. These include bug fixes and a cleanup of the ResendPacketEvent timer, which is now used only for fast retransmit, along with other minor cleanups.

Committed in 8c808c3575c5ecf24ff2429174457d053dbd382a (0.9.45-11).

Note: See TracTickets for help on using tickets.