Opened 7 months ago

Closed 6 days ago

#2713 closed enhancement (fixed)

SSU: single streaming timer, reset after ack

Reported by: Zlatin Balevsky Owned by: zzz
Priority: major Milestone: 0.9.48
Component: router/transport Version: 0.9.45
Keywords: ssu Cc:
Parent Tickets: Sensitive: no

Description (last modified by Zlatin Balevsky)

This is the SSU equivalent of #2710. There we have the same problem - each message has its own timer, so to speak, whereas there should be a single timer per connection.

The patch below replaces the nextSendTime variable in OutboundMessageState with a single variable in PeerState. There are a few other changes to bring the implementation more in line with RFCs 6298 and 5681: up to half of the outstanding window can be retransmitted during congestion, as opposed to just a single message as it is now.

The patch also includes the patch to #2714

Testnet results in synthetic (no loss or delay) and realistic (70ms delay, 0.5% loss) environments will be attached soon.

diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index 2159b7707..ad8e7f2a7 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -270,8 +270,8 @@ class OutboundMessageFragments {
         List<OutboundMessageState> states = null;
         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
         int peersProcessed = 0;
+        int nextSendDelay = Integer.MAX_VALUE;
         while (_alive && (states == null) ) {
-            int nextSendDelay = Integer.MAX_VALUE;
             // no, not every time - O(n**2) - do just before waiting below
             //finishMessages();
 
@@ -288,30 +288,29 @@ class OutboundMessageFragments {
                     // Otherwise, wait()
                     long now = _context.clock().now();
                     while (_iterator.hasNext()) {
-                        peer = _iterator.next();
-                        int remaining = peer.finishMessages(now);
+                        PeerState p = _iterator.next();
+                        int remaining = p.finishMessages(now);
                         if (remaining <= 0) {
                             // race with add()
                             _iterator.remove();
                             if (_log.shouldLog(Log.DEBUG))
-                                _log.debug("No more pending messages for " + peer.getRemotePeer());
+                                _log.debug("No more pending messages for " + p.getRemotePeer());
                             continue;
                         }
                         peersProcessed++;
-                        states = peer.allocateSend();
+                        states = p.allocateSend();
                         if (states != null) {
+                            peer = p;
                             // we have something to send and we will be returning it
                             break;
-                        } else if (peersProcessed >= _activePeers.size()) {
+                        } 
+                        int delay = p.getNextDelay();
+                        if (delay < nextSendDelay)
+                            nextSendDelay = delay;
+                        
+                        if (peersProcessed >= _activePeers.size()) {
                             // we've gone all the way around, time to sleep
                             break;
-                        } else {
-                            // Update the minimum delay for all peers
-                            // which will be used if we found nothing to send across all peers
-                            int delay = peer.getNextDelay();
-                            if (delay < nextSendDelay)
-                                nextSendDelay = delay;
-                            peer = null;
                         }
                     }
 
@@ -329,8 +328,10 @@ class OutboundMessageFragments {
                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
                         // gets called regularly
                         int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("wait for " + toWait);
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("wait for " + toWait);
+
+                        nextSendDelay = Integer.MAX_VALUE;
                         // wait.. or somethin'
                         synchronized (_activePeers) {
                             try {
@@ -370,6 +371,12 @@ class OutboundMessageFragments {
         return packets;
     }
 
+    void nudge() {
+        synchronized(_activePeers) {
+            _activePeers.notifyAll();
+        }
+    }
+
     /**
      *  @return null if state or peer is null
      */
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
index f5a42ee27..f010e4dd3 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
@@ -31,7 +31,6 @@ class OutboundMessageState implements CDPQEntry {
     private long _fragmentAcks;
     private final int _numFragments;
     private final long _startedOn;
-    private long _nextSendTime;
     private int _pushCount;
     private int _maxSends;
     // we can't use the ones in _message since it is null for injections
@@ -77,7 +76,6 @@ class OutboundMessageState implements CDPQEntry {
         _i2npMessage = msg;
         _peer = peer;
         _startedOn = _context.clock().now();
-        _nextSendTime = _startedOn;
         _expiration = _startedOn + EXPIRATION;
         //_expiration = msg.getExpiration();
 
@@ -166,9 +164,6 @@ class OutboundMessageState implements CDPQEntry {
         return isComplete();
     }
     
-    public long getNextSendTime() { return _nextSendTime; }
-    public void setNextSendTime(long when) { _nextSendTime = when; }
-
     /**
      *  The max number of sends for any fragment, which is the
      *  same as the push count, at least as it's coded now.
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index d67390e5b..44a27395c 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -219,8 +219,8 @@ public class PeerState {
     //private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
     private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
 
-    /** which outbound message is currently being retransmitted */
-    private OutboundMessageState _retransmitter;
+    /** when the retransmit timer is about to trigger */
+    private volatile long _retransmitTimer;
     
     private final UDPTransport _transport;
     
@@ -246,9 +246,6 @@ public class PeerState {
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
 
-    /** max number of msgs returned from allocateSend() */
-    private static final int MAX_ALLOCATE_SEND = 2;
-
     /**
      *  Was 32 before 0.9.2, but since the streaming lib goes up to 128,
      *  we would just drop our own msgs right away during slow start.
@@ -928,6 +925,14 @@ public class PeerState {
             _sendWindowBytes = MINIMUM_WINDOW_BYTES;
         //if (congestionAt/2 < _slowStartThreshold)
             _slowStartThreshold = congestionAt/2;
+
+        int oldRto = _rto;
+        long oldTimer = _retransmitTimer - now;
+        _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 ));
+        _retransmitTimer = now + _rto;
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+
         return true;
     }
     
@@ -1186,7 +1191,7 @@ public class PeerState {
      *  We sent a message which was ACKed containing the given # of bytes.
      *  Caller should synch on this
      */
-    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         _consecutiveFailedSends = 0;
         // _lastFailedSendPeriod = -1;
         if (numSends < 2) {
@@ -1231,17 +1236,31 @@ public class PeerState {
                 adjustMTU();
             //}
         }
+        
+        if (!anyPending) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(_remotePeer + " nothing pending, cancelling timer");
+            _retransmitTimer = 0;
+        } else {
+            // any time new data gets acked, push out the timer
+            long now = _context.clock().now();
+            long oldTimer = _retransmitTimer - now;
+            _retransmitTimer = now + getRTO();
+            if (_log.shouldLog(Log.DEBUG))
+               _log.debug(_remotePeer + " ACK, timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+        }
+        _transport.getOMF().nudge();
     }
 
     /**
      *  We sent a message which was ACKed containing the given # of bytes.
      */
-    private void messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         synchronized(this) {
-            locked_messageACKed(bytesACKed, lifetime, numSends);
+            locked_messageACKed(bytesACKed, lifetime, numSends, anyPending);
         }
         if (numSends >= 2 && _log.shouldLog(Log.INFO))
-            _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
+            _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
         
         _context.statManager().addRateData("udp.sendBps", _sendBps);
     }
@@ -1548,7 +1567,6 @@ public class PeerState {
 
             List<OutboundMessageState> tempList;
             synchronized (_outboundMessages) {
-                    _retransmitter = null;
                     tempList = new ArrayList<OutboundMessageState>(_outboundMessages);
                     _outboundMessages.clear();
             }
@@ -1610,21 +1628,15 @@ public class PeerState {
                 OutboundMessageState state = iter.next();
                 if (state.isComplete()) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     if (succeeded == null) succeeded = new ArrayList<OutboundMessageState>(4);
                     succeeded.add(state);
                 } else if (state.isExpired(now)) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
                 } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
                     iter.remove();
-                    if (state == _retransmitter)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
@@ -1667,9 +1679,28 @@ public class PeerState {
      * @return allocated messages to send (never empty), or null if no messages or no resources
      */
     List<OutboundMessageState> allocateSend() {
+        long now = _context.clock().now();
+        List<OutboundMessageState> rv = allocateSend2(now >= _retransmitTimer);
+        if (rv != null && !rv.isEmpty()) {
+            synchronized(this) {
+                long old = _retransmitTimer;
+                if (_retransmitTimer == 0)
+                    _retransmitTimer = now + getRTO();
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
+            }
+        }
+        return rv;
+    }
+
+    /**
+     * @param canSendOld if any already sent messages can be sent.  If false, only new messages will be considered
+     */
+    private List<OutboundMessageState> allocateSend2(boolean canSendOld) {
         if (_dead) return null;
         List<OutboundMessageState> rv = null;
         synchronized (_outboundMessages) {
+            if (canSendOld) {
             for (OutboundMessageState state : _outboundMessages) {
                 // We have 3 return values, because if allocateSendingBytes() returns false,
                 // then we can stop iterating.
@@ -1686,18 +1717,22 @@ public class PeerState {
                     }
                      */
                     if (rv == null)
-                        rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
+                        rv = new ArrayList<OutboundMessageState>(_outboundMessages.size());
                     rv.add(state);
-                    if (rv.size() >= MAX_ALLOCATE_SEND)
+                    if (rv.size() >= _outboundMessages.size() / 2)
                         return rv;
                 } else if (should == ShouldSend.NO_BW) {
                     // no more bandwidth available
                     // we don't bother looking for a smaller msg that would fit.
                     // By not looking further, we keep strict sending order, and that allows
                     // some efficiency in acked() below.
-                    if (rv == null && _log.shouldLog(Log.DEBUG))
-                        _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
-                                   " / " + _outboundQueue.size() + " remaining");
+                    if (_log.shouldLog(Log.DEBUG)) {
+                        if (rv == null)
+                            _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
+                                       " / " + _outboundQueue.size() + " remaining");
+                        else 
+                           _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
+                    }
                     return rv;
                 } /* else {
                     OutNetMessage msg = state.getMessage();
@@ -1705,7 +1740,7 @@ public class PeerState {
                         msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
                 } */
             }
-
+            }
             // Peek at head of _outboundQueue and see if we can send it.
             // If so, pull it off, put it in _outbundMessages, test
             // again for bandwidth if necessary, and return it.
@@ -1729,9 +1764,9 @@ public class PeerState {
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                         if (rv == null)
-                            rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
+                            rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed);
                         rv.add(dequeuedState);
-                        if (rv.size() >= MAX_ALLOCATE_SEND)
+                        if (rv.size() >= _concurrentMessagesAllowed)
                             return rv;
                     }
                 }
@@ -1739,7 +1774,8 @@ public class PeerState {
         }
         if ( rv == null && _log.shouldLog(Log.DEBUG))
             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
-                       " / " + _outboundQueue.size() + " remaining");
+                       " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now()));
+
         return rv;
     }
     
@@ -1755,23 +1791,10 @@ public class PeerState {
         int rv = Integer.MAX_VALUE;
         if (_dead) return rv;
         long now = _context.clock().now();
-        synchronized (_outboundMessages) {
-            if (_retransmitter != null) {
-                rv = (int)(_retransmitter.getNextSendTime() - now);
-                return rv;
-            }
-            for (OutboundMessageState state : _outboundMessages) {
-                int delay = (int)(state.getNextSendTime() - now);
-                // short circuit once we hit something ready to go
-                if (delay <= 0)
-                    return delay;
-                if (delay < rv)
-                    rv = delay;
-            }
+        synchronized(this) {
+            if (_retransmitTimer >= now) 
+                return (int) (_retransmitTimer - now);
         }
-        // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
         return rv;
     }
 
@@ -1827,52 +1850,19 @@ public class PeerState {
      */
     private ShouldSend locked_shouldSend(OutboundMessageState state) {
         long now = _context.clock().now();
-        if (state.getNextSendTime() <= now) {
-            OutboundMessageState retrans = _retransmitter;
-            if ( (retrans != null) && ( (retrans.isExpired(now) || retrans.isComplete()) ) ) {
-                _retransmitter = null;
-                retrans = null;
-	    }
-            
-            if ( (retrans != null) && (retrans != state) ) {
-                // choke it, since there's already another message retransmitting to this
-                // peer.
-                _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted);
-                int max = state.getMaxSends();
-                if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
-                } else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("choked, with another message retransmitting");
-                    return ShouldSend.NO;
-                } else {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");                    
-                }
-            }
-
             int size = state.getUnackedSize();
             if (allocateSendingBytes(size, state.getPushCount())) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Allocation of " + size + " allowed with " 
+                    _log.debug(_remotePeer + " Allocation of " + size + " allowed with " 
                               + getSendWindowBytesRemaining() 
                               + "/" + getSendWindowBytes() 
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
 
-                int rto = getRTO();
-                if (state.getPushCount() > 0) {
-                    _retransmitter = state;
-                    rto = Math.min(MAX_RTO, rto << state.getPushCount()); // Section 5.5 RFC 6298
-                }
 
                 if (state.push())
                     _messagesSent++;
             
-                // messages with multiple fragments need more time
-                state.setNextSendTime(now + rto + ((state.getFragmentCount() - 1) * ACKSender.ACK_FREQUENCY));
-
                 //if (peer.getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(peer.getRemotePeer());
                 return ShouldSend.YES;
@@ -1881,13 +1871,9 @@ public class PeerState {
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.INFO))
-                    _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+                    _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
                               + " available=" + getSendWindowBytesRemaining()
                               + " for message " + state.getMessageId() + ": " + state);
-                state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
-                                      _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
                 //_throttle.choke(peer.getRemotePeer());
 
                 //if (state.getMessage() != null)
@@ -1896,9 +1882,7 @@ public class PeerState {
                 //                                 + getSendWindowBytesRemaining());
                 return ShouldSend.NO_BW;
             }
-        } // nextTime <= now 
 
-        return ShouldSend.NO;
     }
     
     /**
@@ -1910,6 +1894,7 @@ public class PeerState {
     boolean acked(long messageId) {
         if (_dead) return false;
         OutboundMessageState state = null;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1925,8 +1910,7 @@ public class PeerState {
                     state = null;
                 }
             }
-            if ( (state != null) && (state == _retransmitter) )
-                _retransmitter = null;
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -1948,7 +1932,7 @@ public class PeerState {
             _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
             _transport.succeeded(state);
             // this adjusts the rtt/rto/window/etc
-            messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+            messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
             //if (getSendWindowBytesRemaining() > 0)
             //    _throttle.unchoke(peer.getRemotePeer());
             
@@ -1976,6 +1960,7 @@ public class PeerState {
     
         OutboundMessageState state = null;
         boolean isComplete = false;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1984,8 +1969,6 @@ public class PeerState {
                     if (complete) {
                         isComplete = true;
                         iter.remove();
-                        if (state == _retransmitter)
-                            _retransmitter = null;
                     }
                     break;
                 } else if (state.getPushCount() <= 0) {
@@ -1997,6 +1980,7 @@ public class PeerState {
                     state = null;
                 }
             }
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -2020,7 +2004,7 @@ public class PeerState {
                 _transport.succeeded(state);
                 
                 // this adjusts the rtt/rto/window/etc
-                messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+                messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
                 //if (state.getPeer().getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(state.getPeer().getRemotePeer());
 
@@ -2085,13 +2069,10 @@ public class PeerState {
         synchronized (oldPeer._outboundMessages) {
             tmp2.addAll(oldPeer._outboundMessages);
             oldPeer._outboundMessages.clear();
-            retransmitter = oldPeer._retransmitter;
-            oldPeer._retransmitter = null;
         }
         if (!_dead) {
             synchronized (_outboundMessages) {
                 _outboundMessages.addAll(tmp2);
-                _retransmitter = retransmitter;
             }
         }
     }
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index ad841f374..22a263843 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -341,6 +341,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
 
         _context.simpleTimer2().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4);
     }
+
+    OutboundMessageFragments getOMF() {
+        return _fragments;
+    }
     
     /**
      *  Pick a port if not previously configured, so that TransportManager may

Subtickets

Attachments (3)

ssu_streaming_in_loss.ods (22.1 KB) - added by Zlatin Balevsky 7 months ago.
Results from testing the ssu and streaming patches in the testnet with various packet loss probabilities.
2713-loss.ods (11.9 KB) - added by Zlatin Balevsky 2 weeks ago.
Benchmark of recent (0.9.47) router with this patch in lossy conditions.
2713-loss-2.ods (25.9 KB) - added by Zlatin Balevsky 10 days ago.
Benchmark of recent (0.9.47) router with this patch in lossy conditions part 2

Download all attachments as: .zip

Change History (15)

comment:1 Changed 7 months ago by Zlatin Balevsky

Description: modified (diff)

comment:2 Changed 7 months ago by Zlatin Balevsky

Description: modified (diff)

comment:3 Changed 7 months ago by Zlatin Balevsky

Description: modified (diff)

comment:4 Changed 7 months ago by Zlatin Balevsky

Description: modified (diff)

comment:5 Changed 7 months ago by Zlatin Balevsky

Description: modified (diff)

Changed 7 months ago by Zlatin Balevsky

Attachment: ssu_streaming_in_loss.ods added

Results from testing the ssu and streaming patches in the testnet with various packet loss probabilities.

comment:6 Changed 7 months ago by zzz

This patch conflicts with the pending rolled-up jogger patch (4330-line preliminary version received January 23 2020), which purports to fix the following tickets:

#657 #774 #1613 #2259 #2386 #2412 #2427 #2506 #2512 #2576 #2609 #2613 #2640 #2641 #2642 #2646 #2647 #2648 #2649 #2650 #2653 #2654 #2655 #2656 #2657 #2658 #2660 #2664 #2668 #2675

To this date we have not come to an agreement on that patch. We have not agreed to take it as-is and he has not agreed to split it up into smaller patches.

It is worth reviewing his patch for an alternative way of fixing this.
Accepting the patch here would probably seal a decision not to take the jogger patch.

It's also worth finding which of the above tickets may overlap with this one.

Changed 2 weeks ago by Zlatin Balevsky

Attachment: 2713-loss.ods added

Benchmark of recent (0.9.47) router with this patch in lossy conditions.

comment:7 Changed 11 days ago by zzz

Milestone: undecided0.9.48
Priority: minormajor

Looks good, and the tl;dr of your test results is a 2x speedup, which is fantastic.
The 'perfect' (presumably no delay/drop) tests aren't really useful.
The results are consistent with some of jogger's tickets and comments.

At some point, I'd like to see additional tests with other drop percentages -
perhaps 0.1, 0.25, and 1.0 - but those can probably wait until the patch
is almost finalized.

Given the current test results, I think we should try to get this in for .48,
no need to depend on a follow-on patch for montgomery.

comments and questions - nothing major:

  • Pass now as arg to allocateSend()
  • Reindent in PS.allocateSend2() now that review is done
  • javadocs with @since on new methods nudge(), getOMF(), allocateSend2()
  • PS.locked_shouldSend() never returns ShouldSend?.NO any more. Not sure if any of the checks you deleted should be reinstated. The state.getMaxSends() check may need to be there… but it's the same as state.getPushCount() which is odd. Not sure what's going on there. If we're certain that we never need ShouldSend?.NO anywhere, we should fix things up
  • In PS, THROTTLE_INITIAL_SEND and THROTTLE_RESENDS are now unused, can be deleted
  • In PS, _retransmitTimer is volatile. There's only one spot where it's outside of a sync - that's the 2nd line in allocateSend(). Volatile doesn't fix sync problems for a long on a 32 bit machine; need to switch to an AtomicLong?, or get the last spot inside the sync, or something
  • In PS, canSendOld should be now ≥ timer && !_outboundMessages.isEmpty() to avoid creating the iterator on an empty list. Or maybe timer > 0 && now ≥ timer. Or maybe check inside the sync block in allocateSend2()
  • In allocateSend2() in theory could return both retransmitted and new states. Not sure if that's really the case, or if we want to do that? We should make it explicit if not.
  • In OMF.nudge(), we can do notify(), not notifyAll(), there's only one waiter. We use notify() elsewhere.
  • It appears, as you said in OP, that the patch pulls in #2714 unchanged. So for sanity, let's not do that, and apply #2714 first then have a simplified patch here that goes on top of #2714.
  • This ticket would appear to resolve jogger's tickets #2412 #2649 and #2654 - agreed? Probably worth reviewing 2412 again.
  • This fix appears to prevent iterating through all the pending messages in allocateSend() except on timer expiration. So it partially fixes the TODO in several of the PS javadocs about iterating 3 times. Those TODOs should be updated.

comment:8 Changed 11 days ago by Zlatin Balevsky

ACK on all points, with following caveats:

  • PS.locked_shouldSend() never returning NO: this return value is an artifact of each message having its own nextSendTime variable. The javadoc of _outboundMessages says that some may not have been sent yet. This is no longer true, so this enum can be replaced with a boolean
  • _outboundMessages is a CachedIteratorCollection so there is no iterator object created
  • the RFC prohibits mixing of already sent with new messages when the retransmit timer hits.
  • as I mention in #2174, I need that fix in order to run my single eepget test in the testnet, as most of the time only a single peer has anything to send.

I suggest we start with #2714 as that is very straight forward. Once that is merged I'll rebase with your comments incorporated.

One thing I'm not very clear on, and the RFC doesn't help here, is how much to resend when the number of unacked messages is not divisible by 2. Let's say we have 3 unacked messages; right now this patch will resend only 1, but do we want to resend 2?

comment:9 Changed 10 days ago by zzz

Haven't looked but I'd expect kernels to do the easiest thing possible which would be right shift to divide by two (round down). Rounding down would work, as long as we handle the n=1 case, which we do, since the check is after calling rv.add(state).

I looked to see what we did in streaming:

   toResend = toResend.subList(0, Math.min(MAX_RTX, (toResend.size() + 1) / 2));

so that looks like rounding up.

I would say that RFC 5681 IS clear:

the number of segments transmitted in each RTT MUST be no more than half...

so I think rounding down is correct, with the n=1 exception.

comment:10 Changed 10 days ago by Zlatin Balevsky

Below is a rebased patch after #2714 was merged. I believe I've addressed all points from comment 7.

diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index 03ec420dc..b8c34533b 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -298,13 +298,13 @@ class OutboundMessageFragments {
                             continue;
                         }
                         peersProcessed++;
-                        states = p.allocateSend();
+                        states = p.allocateSend(now);
                         if (states != null) {
                             peer = p;
                             // we have something to send and we will be returning it
                             break;
                         } 
-                        int delay = p.getNextDelay();
+                        int delay = p.getNextDelay(now);
                         if (delay < nextSendDelay)
                             nextSendDelay = delay;
                         
@@ -371,6 +371,16 @@ class OutboundMessageFragments {
         return packets;
     }
 
+    /**
+     * Wakes up the packet pusher thread.
+     * @since 0.9.48
+     */
+    void nudge() {
+        synchronized(_activePeers) {
+            _activePeers.notify();
+        }
+    }
+
     /**
      *  @return null if state or peer is null
      */
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
index f5a42ee27..f010e4dd3 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
@@ -31,7 +31,6 @@ class OutboundMessageState implements CDPQEntry {
     private long _fragmentAcks;
     private final int _numFragments;
     private final long _startedOn;
-    private long _nextSendTime;
     private int _pushCount;
     private int _maxSends;
     // we can't use the ones in _message since it is null for injections
@@ -77,7 +76,6 @@ class OutboundMessageState implements CDPQEntry {
         _i2npMessage = msg;
         _peer = peer;
         _startedOn = _context.clock().now();
-        _nextSendTime = _startedOn;
         _expiration = _startedOn + EXPIRATION;
         //_expiration = msg.getExpiration();
 
@@ -166,9 +164,6 @@ class OutboundMessageState implements CDPQEntry {
         return isComplete();
     }
     
-    public long getNextSendTime() { return _nextSendTime; }
-    public void setNextSendTime(long when) { _nextSendTime = when; }
-
     /**
      *  The max number of sends for any fragment, which is the
      *  same as the push count, at least as it's coded now.
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index d67390e5b..6f64dffb8 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -219,8 +219,8 @@ public class PeerState {
     //private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
     private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
 
-    /** which outbound message is currently being retransmitted */
-    private OutboundMessageState _retransmitter;
+    /** when the retransmit timer is about to trigger */
+    private long _retransmitTimer;
     
     private final UDPTransport _transport;
     
@@ -246,9 +246,6 @@ public class PeerState {
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
 
-    /** max number of msgs returned from allocateSend() */
-    private static final int MAX_ALLOCATE_SEND = 2;
-
     /**
      *  Was 32 before 0.9.2, but since the streaming lib goes up to 128,
      *  we would just drop our own msgs right away during slow start.
@@ -693,15 +690,14 @@ public class PeerState {
      *
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(int size, int messagePushCount) {
-        return allocateSendingBytes(size, false, messagePushCount);
+    private boolean allocateSendingBytes(int size, int messagePushCount, long now) {
+        return allocateSendingBytes(size, false, messagePushCount, now);
     }
 
     /**
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
+    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount, long now) { 
         long duration = now - _lastSendRefill;
         if (duration >= 1000) {
             _sendWindowBytesRemaining = _sendWindowBytes;
@@ -928,6 +924,14 @@ public class PeerState {
             _sendWindowBytes = MINIMUM_WINDOW_BYTES;
         //if (congestionAt/2 < _slowStartThreshold)
             _slowStartThreshold = congestionAt/2;
+
+        int oldRto = _rto;
+        long oldTimer = _retransmitTimer - now;
+        _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 ));
+        _retransmitTimer = now + _rto;
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+
         return true;
     }
     
@@ -1186,7 +1190,7 @@ public class PeerState {
      *  We sent a message which was ACKed containing the given # of bytes.
      *  Caller should synch on this
      */
-    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         _consecutiveFailedSends = 0;
         // _lastFailedSendPeriod = -1;
         if (numSends < 2) {
@@ -1231,17 +1235,31 @@ public class PeerState {
                 adjustMTU();
             //}
         }
+        
+        if (!anyPending) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(_remotePeer + " nothing pending, cancelling timer");
+            _retransmitTimer = 0;
+        } else {
+            // any time new data gets acked, push out the timer
+            long now = _context.clock().now();
+            long oldTimer = _retransmitTimer - now;
+            _retransmitTimer = now + getRTO();
+            if (_log.shouldLog(Log.DEBUG))
+               _log.debug(_remotePeer + " ACK, timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+        }
+        _transport.getOMF().nudge();
     }
 
     /**
      *  We sent a message which was ACKed containing the given # of bytes.
      */
-    private void messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         synchronized(this) {
-            locked_messageACKed(bytesACKed, lifetime, numSends);
+            locked_messageACKed(bytesACKed, lifetime, numSends, anyPending);
         }
         if (numSends >= 2 && _log.shouldLog(Log.INFO))
-            _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
+            _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
         
         _context.statManager().addRateData("udp.sendBps", _sendBps);
     }
@@ -1548,7 +1566,6 @@ public class PeerState {
 
             List<OutboundMessageState> tempList;
             synchronized (_outboundMessages) {
-                    _retransmitter = null;
                     tempList = new ArrayList<OutboundMessageState>(_outboundMessages);
                     _outboundMessages.clear();
             }
@@ -1610,21 +1627,15 @@ public class PeerState {
                 OutboundMessageState state = iter.next();
                 if (state.isComplete()) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     if (succeeded == null) succeeded = new ArrayList<OutboundMessageState>(4);
                     succeeded.add(state);
                 } else if (state.isExpired(now)) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
                 } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
                     iter.remove();
-                    if (state == _retransmitter)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
@@ -1660,59 +1671,83 @@ public class PeerState {
     
     /**
      * Pick one or more messages we want to send and allocate them out of our window
+     * Adjusts the retransmit timer if necessary.
      * High usage -
      * OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned &gt; 0.
-     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
+     * TODO combine finishMessages() and allocateSend() so we don't iterate 2 times.
      *
      * @return allocated messages to send (never empty), or null if no messages or no resources
      */
-    List<OutboundMessageState> allocateSend() {
+    List<OutboundMessageState> allocateSend(long now) {
+        long retransmitTimer;
+        synchronized(this) {
+            retransmitTimer = _retransmitTimer;
+        }
+        List<OutboundMessageState> rv = allocateSend2(now >= retransmitTimer, now);
+        if (rv != null && !rv.isEmpty()) {
+            synchronized(this) {
+                long old = _retransmitTimer;
+                if (_retransmitTimer == 0)
+                    _retransmitTimer = now + getRTO();
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
+            }
+        }
+        return rv;
+    }
+
+    /**
+     * Pick one or more messages to send.
+     * @param canSendOld if any already sent messages can be sent.  If false, only new messages will be considered
+     * @param now what time is it now
+     * @since 0.9.48
+     */
+    private List<OutboundMessageState> allocateSend2(boolean canSendOld, long now) {
         if (_dead) return null;
         List<OutboundMessageState> rv = null;
         synchronized (_outboundMessages) {
-            for (OutboundMessageState state : _outboundMessages) {
-                // We have 3 return values, because if allocateSendingBytes() returns false,
-                // then we can stop iterating.
-                ShouldSend should = locked_shouldSend(state);
-                if (should == ShouldSend.YES) {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
-                    /*
-                    while (iter.hasNext()) {
-                        OutboundMessageState later = (OutboundMessageState)iter.next();
-                        OutNetMessage msg = later.getMessage();
-                        if (msg != null)
-                            msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
-                    }
-                     */
-                    if (rv == null)
-                        rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
-                    rv.add(state);
-                    if (rv.size() >= MAX_ALLOCATE_SEND)
+            if (canSendOld) {
+                for (OutboundMessageState state : _outboundMessages) {
+                    boolean should = locked_shouldSend(state, now);
+                    if (should) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
+                        /*
+                        while (iter.hasNext()) {
+                            OutboundMessageState later = (OutboundMessageState)iter.next();
+                            OutNetMessage msg = later.getMessage();
+                            if (msg != null)
+                                msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
+                        }
+                         */
+                        if (rv == null)
+                            rv = new ArrayList<OutboundMessageState>(_outboundMessages.size());
+                        rv.add(state);
+                        if (rv.size() >= _outboundMessages.size() / 2)
+                            return rv;
+                    } else {
+                        // no more bandwidth available
+                        // we don't bother looking for a smaller msg that would fit.
+                        // By not looking further, we keep strict sending order, and that allows
+                        // some efficiency in acked() below.
+                        if (_log.shouldLog(Log.DEBUG)) {
+                            if (rv == null)
+                                _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
+                                       " / " + _outboundQueue.size() + " remaining");
+                            else 
+                               _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
+                        }
                         return rv;
-                } else if (should == ShouldSend.NO_BW) {
-                    // no more bandwidth available
-                    // we don't bother looking for a smaller msg that would fit.
-                    // By not looking further, we keep strict sending order, and that allows
-                    // some efficiency in acked() below.
-                    if (rv == null && _log.shouldLog(Log.DEBUG))
-                        _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
-                                   " / " + _outboundQueue.size() + " remaining");
-                    return rv;
-                } /* else {
-                    OutNetMessage msg = state.getMessage();
-                    if (msg != null)
-                        msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
-                } */
+                    } 
+                }
             }
-
             // Peek at head of _outboundQueue and see if we can send it.
             // If so, pull it off, put it in _outbundMessages, test
             // again for bandwidth if necessary, and return it.
             OutboundMessageState state;
             synchronized (_outboundQueue) {
                 while ((state = _outboundQueue.peek()) != null &&
-                       ShouldSend.YES == locked_shouldSend(state)) {
+                       locked_shouldSend(state, now)) {
                     // This is guaranted to be the same as what we got in peek(),
                     // due to locking and because we aren't using the dropping CDPBQ.
                     // If we do switch to CDPBQ,
@@ -1729,9 +1764,9 @@ public class PeerState {
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                         if (rv == null)
-                            rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
+                            rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed);
                         rv.add(dequeuedState);
-                        if (rv.size() >= MAX_ALLOCATE_SEND)
+                        if (rv.size() >= _concurrentMessagesAllowed)
                             return rv;
                     }
                 }
@@ -1739,39 +1774,27 @@ public class PeerState {
         }
         if ( rv == null && _log.shouldLog(Log.DEBUG))
             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
-                       " / " + _outboundQueue.size() + " remaining");
+                       " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now()));
+
         return rv;
     }
     
     /**
      * High usage -
      * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
-     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
+     * TODO combine finishMessages(), allocateSend() so we don't iterate 2 times.
      *
+     * @param now what time it is now
      * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
      *         If ready now, will return 0 or a negative value.
      */
-    int getNextDelay() {
+    int getNextDelay(long now) {
         int rv = Integer.MAX_VALUE;
         if (_dead) return rv;
-        long now = _context.clock().now();
-        synchronized (_outboundMessages) {
-            if (_retransmitter != null) {
-                rv = (int)(_retransmitter.getNextSendTime() - now);
-                return rv;
-            }
-            for (OutboundMessageState state : _outboundMessages) {
-                int delay = (int)(state.getNextSendTime() - now);
-                // short circuit once we hit something ready to go
-                if (delay <= 0)
-                    return delay;
-                if (delay < rv)
-                    rv = delay;
-            }
+        synchronized(this) {
+            if (_retransmitTimer >= now) 
+                return (int) (_retransmitTimer - now);
         }
-        // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
         return rv;
     }
 
@@ -1782,20 +1805,6 @@ public class PeerState {
         return _dead || _outboundQueue.isBacklogged();
     }
 
-    /**
-     * If set to true, we should throttle retransmissions of all but the first message in
-     * flight to a peer.  If set to false, we will only throttle the initial flight of a
-     * message to a peer while a retransmission is going on.
-     */
-    private static final boolean THROTTLE_RESENDS = true;
-    /** 
-     * if true, throttle the initial volley of a message if there is a resend in progress.
-     * if false, always send the first volley, regardless of retransmissions (but keeping in
-     * mind bw/cwin throttle, etc)
-     *
-     */
-    private static final boolean THROTTLE_INITIAL_SEND = true;
-    
     /**
      *  Always leave room for this many explicit acks.
      *  Only for data packets. Does not affect ack-only packets.
@@ -1817,88 +1826,42 @@ public class PeerState {
                MIN_ACK_SIZE;
     }
     
-    private enum ShouldSend { YES, NO, NO_BW };
-
     /**
-     *  Have 3 return values, because if allocateSendingBytes() returns false,
-     *  then allocateSend() can stop iterating
-     *
      *  Caller should synch
      */
-    private ShouldSend locked_shouldSend(OutboundMessageState state) {
-        long now = _context.clock().now();
-        if (state.getNextSendTime() <= now) {
-            OutboundMessageState retrans = _retransmitter;
-            if ( (retrans != null) && ( (retrans.isExpired(now) || retrans.isComplete()) ) ) {
-                _retransmitter = null;
-                retrans = null;
-	    }
-            
-            if ( (retrans != null) && (retrans != state) ) {
-                // choke it, since there's already another message retransmitting to this
-                // peer.
-                _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted);
-                int max = state.getMaxSends();
-                if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
-                } else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("choked, with another message retransmitting");
-                    return ShouldSend.NO;
-                } else {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");                    
-                }
-            }
-
+    private boolean locked_shouldSend(OutboundMessageState state, long now) {
             int size = state.getUnackedSize();
-            if (allocateSendingBytes(size, state.getPushCount())) {
+            if (allocateSendingBytes(size, state.getPushCount(), now)) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Allocation of " + size + " allowed with " 
+                    _log.debug(_remotePeer + " Allocation of " + size + " allowed with " 
                               + getSendWindowBytesRemaining() 
                               + "/" + getSendWindowBytes() 
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
 
-                int rto = getRTO();
-                if (state.getPushCount() > 0) {
-                    _retransmitter = state;
-                    rto = Math.min(MAX_RTO, rto << state.getPushCount()); // Section 5.5 RFC 6298
-                }
 
                 if (state.push())
                     _messagesSent++;
             
-                // messages with multiple fragments need more time
-                state.setNextSendTime(now + rto + ((state.getFragmentCount() - 1) * ACKSender.ACK_FREQUENCY));
-
                 //if (peer.getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(peer.getRemotePeer());
-                return ShouldSend.YES;
+                return true;
             } else {
                 _context.statManager().addRateData("udp.sendRejected", state.getPushCount());
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.INFO))
-                    _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+                    _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
                               + " available=" + getSendWindowBytesRemaining()
                               + " for message " + state.getMessageId() + ": " + state);
-                state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
-                                      _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
                 //_throttle.choke(peer.getRemotePeer());
 
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("choked, not enough available, wsize=" 
                 //                                 + getSendWindowBytes() + " available="
                 //                                 + getSendWindowBytesRemaining());
-                return ShouldSend.NO_BW;
+                return false;
             }
-        } // nextTime <= now 
-
-        return ShouldSend.NO;
     }
     
     /**
@@ -1910,6 +1873,7 @@ public class PeerState {
     boolean acked(long messageId) {
         if (_dead) return false;
         OutboundMessageState state = null;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1925,8 +1889,7 @@ public class PeerState {
                     state = null;
                 }
             }
-            if ( (state != null) && (state == _retransmitter) )
-                _retransmitter = null;
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -1948,7 +1911,7 @@ public class PeerState {
             _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
             _transport.succeeded(state);
             // this adjusts the rtt/rto/window/etc
-            messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+            messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
             //if (getSendWindowBytesRemaining() > 0)
             //    _throttle.unchoke(peer.getRemotePeer());
             
@@ -1976,6 +1939,7 @@ public class PeerState {
     
         OutboundMessageState state = null;
         boolean isComplete = false;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1984,8 +1948,6 @@ public class PeerState {
                     if (complete) {
                         isComplete = true;
                         iter.remove();
-                        if (state == _retransmitter)
-                            _retransmitter = null;
                     }
                     break;
                 } else if (state.getPushCount() <= 0) {
@@ -1997,6 +1959,7 @@ public class PeerState {
                     state = null;
                 }
             }
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -2020,7 +1983,7 @@ public class PeerState {
                 _transport.succeeded(state);
                 
                 // this adjusts the rtt/rto/window/etc
-                messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+                messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
                 //if (state.getPeer().getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(state.getPeer().getRemotePeer());
 
@@ -2085,13 +2048,10 @@ public class PeerState {
         synchronized (oldPeer._outboundMessages) {
             tmp2.addAll(oldPeer._outboundMessages);
             oldPeer._outboundMessages.clear();
-            retransmitter = oldPeer._retransmitter;
-            oldPeer._retransmitter = null;
         }
         if (!_dead) {
             synchronized (_outboundMessages) {
                 _outboundMessages.addAll(tmp2);
-                _retransmitter = retransmitter;
             }
         }
     }
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 37617a876..370cbda7f 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -345,6 +345,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
 
         _context.simpleTimer2().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4);
     }
+
+    /**
+     * @returns the instance of OutboundMessageFragments
+     * @since 0.9.48
+     */
+    OutboundMessageFragments getOMF() {
+        return _fragments;
+    }
     
     /**
      *  Pick a port if not previously configured, so that TransportManager may

Changed 10 days ago by Zlatin Balevsky

Attachment: 2713-loss-2.ods added

Benchmark of recent (0.9.47) router with this patch in lossy conditions part 2

comment:11 Changed 6 days ago by Zlatin Balevsky

Patch updated with latest comments from zzz

diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index 03ec420dc..c94f2d1ce 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -294,17 +294,17 @@ class OutboundMessageFragments {
                             // race with add()
                             _iterator.remove();
                             if (_log.shouldLog(Log.DEBUG))
-                                _log.debug("No more pending messages for " + peer.getRemotePeer());
+                                _log.debug("No more pending messages for " + p.getRemotePeer());
                             continue;
                         }
                         peersProcessed++;
-                        states = p.allocateSend();
+                        states = p.allocateSend(now);
                         if (states != null) {
                             peer = p;
                             // we have something to send and we will be returning it
                             break;
                         } 
-                        int delay = p.getNextDelay();
+                        int delay = p.getNextDelay(now);
                         if (delay < nextSendDelay)
                             nextSendDelay = delay;
                         
@@ -371,6 +371,16 @@ class OutboundMessageFragments {
         return packets;
     }
 
+    /**
+     * Wakes up the packet pusher thread.
+     * @since 0.9.48
+     */
+    void nudge() {
+        synchronized(_activePeers) {
+            _activePeers.notify();
+        }
+    }
+
     /**
      *  @return null if state or peer is null
      */
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
index f5a42ee27..f010e4dd3 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java
@@ -31,7 +31,6 @@ class OutboundMessageState implements CDPQEntry {
     private long _fragmentAcks;
     private final int _numFragments;
     private final long _startedOn;
-    private long _nextSendTime;
     private int _pushCount;
     private int _maxSends;
     // we can't use the ones in _message since it is null for injections
@@ -77,7 +76,6 @@ class OutboundMessageState implements CDPQEntry {
         _i2npMessage = msg;
         _peer = peer;
         _startedOn = _context.clock().now();
-        _nextSendTime = _startedOn;
         _expiration = _startedOn + EXPIRATION;
         //_expiration = msg.getExpiration();
 
@@ -166,9 +164,6 @@ class OutboundMessageState implements CDPQEntry {
         return isComplete();
     }
     
-    public long getNextSendTime() { return _nextSendTime; }
-    public void setNextSendTime(long when) { _nextSendTime = when; }
-
     /**
      *  The max number of sends for any fragment, which is the
      *  same as the push count, at least as it's coded now.
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index d67390e5b..4b03005d8 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -219,8 +219,8 @@ public class PeerState {
     //private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
     private final PriBlockingQueue<OutboundMessageState> _outboundQueue;
 
-    /** which outbound message is currently being retransmitted */
-    private OutboundMessageState _retransmitter;
+    /** when the retransmit timer is about to trigger */
+    private long _retransmitTimer;
     
     private final UDPTransport _transport;
     
@@ -246,9 +246,6 @@ public class PeerState {
     private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
 
-    /** max number of msgs returned from allocateSend() */
-    private static final int MAX_ALLOCATE_SEND = 2;
-
     /**
      *  Was 32 before 0.9.2, but since the streaming lib goes up to 128,
      *  we would just drop our own msgs right away during slow start.
@@ -693,15 +690,14 @@ public class PeerState {
      *
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(int size, int messagePushCount) {
-        return allocateSendingBytes(size, false, messagePushCount);
+    private boolean allocateSendingBytes(int size, int messagePushCount, long now) {
+        return allocateSendingBytes(size, false, messagePushCount, now);
     }
 
     /**
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
+    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount, long now) { 
         long duration = now - _lastSendRefill;
         if (duration >= 1000) {
             _sendWindowBytesRemaining = _sendWindowBytes;
@@ -928,6 +924,14 @@ public class PeerState {
             _sendWindowBytes = MINIMUM_WINDOW_BYTES;
         //if (congestionAt/2 < _slowStartThreshold)
             _slowStartThreshold = congestionAt/2;
+
+        int oldRto = _rto;
+        long oldTimer = _retransmitTimer - now;
+        _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 ));
+        _retransmitTimer = now + _rto;
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+
         return true;
     }
     
@@ -1186,7 +1190,7 @@ public class PeerState {
      *  We sent a message which was ACKed containing the given # of bytes.
      *  Caller should synch on this
      */
-    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void locked_messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         _consecutiveFailedSends = 0;
         // _lastFailedSendPeriod = -1;
         if (numSends < 2) {
@@ -1231,17 +1235,31 @@ public class PeerState {
                 adjustMTU();
             //}
         }
+        
+        if (!anyPending) {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(_remotePeer + " nothing pending, cancelling timer");
+            _retransmitTimer = 0;
+        } else {
+            // any time new data gets acked, push out the timer
+            long now = _context.clock().now();
+            long oldTimer = _retransmitTimer - now;
+            _retransmitTimer = now + getRTO();
+            if (_log.shouldLog(Log.DEBUG))
+               _log.debug(_remotePeer + " ACK, timer: " + oldTimer + " -> " + (_retransmitTimer - now));
+        }
+        _transport.getOMF().nudge();
     }
 
     /**
      *  We sent a message which was ACKed containing the given # of bytes.
      */
-    private void messageACKed(int bytesACKed, long lifetime, int numSends) {
+    private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending) {
         synchronized(this) {
-            locked_messageACKed(bytesACKed, lifetime, numSends);
+            locked_messageACKed(bytesACKed, lifetime, numSends, anyPending);
         }
         if (numSends >= 2 && _log.shouldLog(Log.INFO))
-            _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
+            _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
         
         _context.statManager().addRateData("udp.sendBps", _sendBps);
     }
@@ -1548,7 +1566,6 @@ public class PeerState {
 
             List<OutboundMessageState> tempList;
             synchronized (_outboundMessages) {
-                    _retransmitter = null;
                     tempList = new ArrayList<OutboundMessageState>(_outboundMessages);
                     _outboundMessages.clear();
             }
@@ -1610,21 +1627,15 @@ public class PeerState {
                 OutboundMessageState state = iter.next();
                 if (state.isComplete()) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     if (succeeded == null) succeeded = new ArrayList<OutboundMessageState>(4);
                     succeeded.add(state);
                 } else if (state.isExpired(now)) {
                     iter.remove();
-                    if (_retransmitter == state)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
                 } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
                     iter.remove();
-                    if (state == _retransmitter)
-                        _retransmitter = null;
                     _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount());
                     if (failed == null) failed = new ArrayList<OutboundMessageState>(4);
                     failed.add(state);
@@ -1660,59 +1671,84 @@ public class PeerState {
     
     /**
      * Pick one or more messages we want to send and allocate them out of our window
+     * Adjusts the retransmit timer if necessary.
      * High usage -
      * OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned &gt; 0.
-     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
+     * TODO combine finishMessages() and allocateSend() so we don't iterate 2 times.
      *
      * @return allocated messages to send (never empty), or null if no messages or no resources
      */
-    List<OutboundMessageState> allocateSend() {
+    List<OutboundMessageState> allocateSend(long now) {
+        long retransmitTimer;
+        synchronized(this) {
+            retransmitTimer = _retransmitTimer;
+        }
+        List<OutboundMessageState> rv = allocateSend2(retransmitTimer > 0 && now >= retransmitTimer, now);
+        if (rv != null && !rv.isEmpty()) {
+            synchronized(this) {
+                long old = _retransmitTimer;
+                if (_retransmitTimer == 0)
+                    _retransmitTimer = now + getRTO();
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(_remotePeer + " allocated " + rv.size() + " pushing retransmitter from " + old + " to " + _retransmitTimer);
+            }
+        }
+        return rv;
+    }
+
+    /**
+     * Pick one or more messages to send.  This will allocate either old or new messages, but not both.
+     * @param canSendOld if any already sent messages can be sent.  If false, only new messages will be considered
+     * @param now what time is it now
+     * @since 0.9.48
+     */
+    private List<OutboundMessageState> allocateSend2(boolean canSendOld, long now) {
         if (_dead) return null;
         List<OutboundMessageState> rv = null;
         synchronized (_outboundMessages) {
-            for (OutboundMessageState state : _outboundMessages) {
-                // We have 3 return values, because if allocateSendingBytes() returns false,
-                // then we can stop iterating.
-                ShouldSend should = locked_shouldSend(state);
-                if (should == ShouldSend.YES) {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
-                    /*
-                    while (iter.hasNext()) {
-                        OutboundMessageState later = (OutboundMessageState)iter.next();
-                        OutNetMessage msg = later.getMessage();
-                        if (msg != null)
-                            msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
-                    }
-                     */
-                    if (rv == null)
-                        rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
-                    rv.add(state);
-                    if (rv.size() >= MAX_ALLOCATE_SEND)
+            if (canSendOld) {
+                for (OutboundMessageState state : _outboundMessages) {
+                    boolean should = locked_shouldSend(state, now);
+                    if (should) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
+                        /*
+                        while (iter.hasNext()) {
+                            OutboundMessageState later = (OutboundMessageState)iter.next();
+                            OutNetMessage msg = later.getMessage();
+                            if (msg != null)
+                                msg.timestamp("not reached for allocation " + msgs.size() + " other peers");
+                        }
+                         */
+                        if (rv == null)
+                            rv = new ArrayList<OutboundMessageState>(_outboundMessages.size());
+                        rv.add(state);
+                        if (rv.size() >= _outboundMessages.size() / 2)
+                            return rv;
+                    } else {
+                        // no more bandwidth available
+                        // we don't bother looking for a smaller msg that would fit.
+                        // By not looking further, we keep strict sending order, and that allows
+                        // some efficiency in acked() below.
+                        if (_log.shouldLog(Log.DEBUG)) {
+                            if (rv == null)
+                                _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
+                                       " / " + _outboundQueue.size() + " remaining");
+                            else 
+                               _log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
+                        }
                         return rv;
-                } else if (should == ShouldSend.NO_BW) {
-                    // no more bandwidth available
-                    // we don't bother looking for a smaller msg that would fit.
-                    // By not looking further, we keep strict sending order, and that allows
-                    // some efficiency in acked() below.
-                    if (rv == null && _log.shouldLog(Log.DEBUG))
-                        _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
-                                   " / " + _outboundQueue.size() + " remaining");
-                    return rv;
-                } /* else {
-                    OutNetMessage msg = state.getMessage();
-                    if (msg != null)
-                        msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
-                } */
+                    } 
+                }
+                return null;
             }
-
             // Peek at head of _outboundQueue and see if we can send it.
             // If so, pull it off, put it in _outbundMessages, test
             // again for bandwidth if necessary, and return it.
             OutboundMessageState state;
             synchronized (_outboundQueue) {
                 while ((state = _outboundQueue.peek()) != null &&
-                       ShouldSend.YES == locked_shouldSend(state)) {
+                       locked_shouldSend(state, now)) {
                     // This is guaranted to be the same as what we got in peek(),
                     // due to locking and because we aren't using the dropping CDPBQ.
                     // If we do switch to CDPBQ,
@@ -1729,9 +1765,9 @@ public class PeerState {
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                         if (rv == null)
-                            rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
+                            rv = new ArrayList<OutboundMessageState>(_concurrentMessagesAllowed);
                         rv.add(dequeuedState);
-                        if (rv.size() >= MAX_ALLOCATE_SEND)
+                        if (rv.size() >= _concurrentMessagesAllowed)
                             return rv;
                     }
                 }
@@ -1739,39 +1775,27 @@ public class PeerState {
         }
         if ( rv == null && _log.shouldLog(Log.DEBUG))
             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
-                       " / " + _outboundQueue.size() + " remaining");
+                       " / " + _outboundQueue.size() + " remaining, rtx timer in " + (_retransmitTimer - _context.clock().now()));
+
         return rv;
     }
     
     /**
      * High usage -
      * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
-     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
+     * TODO combine finishMessages() and allocateSend() so we don't iterate 2 times.
      *
+     * @param now what time it is now
      * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
      *         If ready now, will return 0 or a negative value.
      */
-    int getNextDelay() {
+    int getNextDelay(long now) {
         int rv = Integer.MAX_VALUE;
         if (_dead) return rv;
-        long now = _context.clock().now();
-        synchronized (_outboundMessages) {
-            if (_retransmitter != null) {
-                rv = (int)(_retransmitter.getNextSendTime() - now);
-                return rv;
-            }
-            for (OutboundMessageState state : _outboundMessages) {
-                int delay = (int)(state.getNextSendTime() - now);
-                // short circuit once we hit something ready to go
-                if (delay <= 0)
-                    return delay;
-                if (delay < rv)
-                    rv = delay;
-            }
+        synchronized(this) {
+            if (_retransmitTimer >= now) 
+                return (int) (_retransmitTimer - now);
         }
-        // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
         return rv;
     }
 
@@ -1782,20 +1806,6 @@ public class PeerState {
         return _dead || _outboundQueue.isBacklogged();
     }
 
-    /**
-     * If set to true, we should throttle retransmissions of all but the first message in
-     * flight to a peer.  If set to false, we will only throttle the initial flight of a
-     * message to a peer while a retransmission is going on.
-     */
-    private static final boolean THROTTLE_RESENDS = true;
-    /** 
-     * if true, throttle the initial volley of a message if there is a resend in progress.
-     * if false, always send the first volley, regardless of retransmissions (but keeping in
-     * mind bw/cwin throttle, etc)
-     *
-     */
-    private static final boolean THROTTLE_INITIAL_SEND = true;
-    
     /**
      *  Always leave room for this many explicit acks.
      *  Only for data packets. Does not affect ack-only packets.
@@ -1817,88 +1827,42 @@ public class PeerState {
                MIN_ACK_SIZE;
     }
     
-    private enum ShouldSend { YES, NO, NO_BW };
-
     /**
-     *  Have 3 return values, because if allocateSendingBytes() returns false,
-     *  then allocateSend() can stop iterating
-     *
      *  Caller should synch
      */
-    private ShouldSend locked_shouldSend(OutboundMessageState state) {
-        long now = _context.clock().now();
-        if (state.getNextSendTime() <= now) {
-            OutboundMessageState retrans = _retransmitter;
-            if ( (retrans != null) && ( (retrans.isExpired(now) || retrans.isComplete()) ) ) {
-                _retransmitter = null;
-                retrans = null;
-	    }
-            
-            if ( (retrans != null) && (retrans != state) ) {
-                // choke it, since there's already another message retransmitting to this
-                // peer.
-                _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted);
-                int max = state.getMaxSends();
-                if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley...");
-                } else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("choked, with another message retransmitting");
-                    return ShouldSend.NO;
-                } else {
-                    //if (state.getMessage() != null)
-                    //    state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");                    
-                }
-            }
-
+    private boolean locked_shouldSend(OutboundMessageState state, long now) {
             int size = state.getUnackedSize();
-            if (allocateSendingBytes(size, state.getPushCount())) {
+            if (allocateSendingBytes(size, state.getPushCount(), now)) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Allocation of " + size + " allowed with " 
+                    _log.debug(_remotePeer + " Allocation of " + size + " allowed with " 
                               + getSendWindowBytesRemaining() 
                               + "/" + getSendWindowBytes() 
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
 
-                int rto = getRTO();
-                if (state.getPushCount() > 0) {
-                    _retransmitter = state;
-                    rto = Math.min(MAX_RTO, rto << state.getPushCount()); // Section 5.5 RFC 6298
-                }
 
                 if (state.push())
                     _messagesSent++;
             
-                // messages with multiple fragments need more time
-                state.setNextSendTime(now + rto + ((state.getFragmentCount() - 1) * ACKSender.ACK_FREQUENCY));
-
                 //if (peer.getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(peer.getRemotePeer());
-                return ShouldSend.YES;
+                return true;
             } else {
                 _context.statManager().addRateData("udp.sendRejected", state.getPushCount());
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.INFO))
-                    _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
+                    _log.info(_remotePeer + " Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
                               + " available=" + getSendWindowBytesRemaining()
                               + " for message " + state.getMessageId() + ": " + state);
-                state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
-                                      _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
                 //_throttle.choke(peer.getRemotePeer());
 
                 //if (state.getMessage() != null)
                 //    state.getMessage().timestamp("choked, not enough available, wsize=" 
                 //                                 + getSendWindowBytes() + " available="
                 //                                 + getSendWindowBytesRemaining());
-                return ShouldSend.NO_BW;
+                return false;
             }
-        } // nextTime <= now 
-
-        return ShouldSend.NO;
     }
     
     /**
@@ -1910,6 +1874,7 @@ public class PeerState {
     boolean acked(long messageId) {
         if (_dead) return false;
         OutboundMessageState state = null;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1925,8 +1890,7 @@ public class PeerState {
                     state = null;
                 }
             }
-            if ( (state != null) && (state == _retransmitter) )
-                _retransmitter = null;
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -1948,7 +1912,7 @@ public class PeerState {
             _context.statManager().addRateData("udp.sendConfirmVolley", numSends);
             _transport.succeeded(state);
             // this adjusts the rtt/rto/window/etc
-            messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+            messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
             //if (getSendWindowBytesRemaining() > 0)
             //    _throttle.unchoke(peer.getRemotePeer());
             
@@ -1976,6 +1940,7 @@ public class PeerState {
     
         OutboundMessageState state = null;
         boolean isComplete = false;
+        boolean anyPending;
         synchronized (_outboundMessages) {
             for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
@@ -1984,8 +1949,6 @@ public class PeerState {
                     if (complete) {
                         isComplete = true;
                         iter.remove();
-                        if (state == _retransmitter)
-                            _retransmitter = null;
                     }
                     break;
                 } else if (state.getPushCount() <= 0) {
@@ -1997,6 +1960,7 @@ public class PeerState {
                     state = null;
                 }
             }
+            anyPending = !_outboundMessages.isEmpty();
         }
         
         if (state != null) {
@@ -2020,7 +1984,7 @@ public class PeerState {
                 _transport.succeeded(state);
                 
                 // this adjusts the rtt/rto/window/etc
-                messageACKed(state.getMessageSize(), state.getLifetime(), numSends);
+                messageACKed(state.getMessageSize(), state.getLifetime(), numSends, anyPending);
                 //if (state.getPeer().getSendWindowBytesRemaining() > 0)
                 //    _throttle.unchoke(state.getPeer().getRemotePeer());
 
@@ -2085,13 +2049,10 @@ public class PeerState {
         synchronized (oldPeer._outboundMessages) {
             tmp2.addAll(oldPeer._outboundMessages);
             oldPeer._outboundMessages.clear();
-            retransmitter = oldPeer._retransmitter;
-            oldPeer._retransmitter = null;
         }
         if (!_dead) {
             synchronized (_outboundMessages) {
                 _outboundMessages.addAll(tmp2);
-                _retransmitter = retransmitter;
             }
         }
     }
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 37617a876..370cbda7f 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -345,6 +345,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
 
         _context.simpleTimer2().addPeriodicEvent(new PingIntroducers(), MIN_EXPIRE_TIMEOUT * 3 / 4);
     }
+
+    /**
+     * @return the instance of OutboundMessageFragments
+     * @since 0.9.48
+     */
+    OutboundMessageFragments getOMF() {
+        return _fragments;
+    }
     
     /**
      *  Pick a port if not previously configured, so that TransportManager may

comment:12 Changed 6 days ago by zzz

Resolution: fixed
Status: newclosed
Note: See TracTickets for help on using tickets.