Opened 3 years ago

Last modified 10 months ago

#2427 accepted enhancement

Maintaining bandwidth correctly: PeerState.java

Reported by: jogger
Owned by: zzz
Priority: minor
Milestone: 0.9.49
Component: router/transport
Version: 0.9.38
Keywords:
Cc:
Parent Tickets: #2506
Sensitive: no

Description

I am making this a separate ticket, as it is of fundamental importance. This is to resolve the following issues:

  • Whenever available bandwidth is checked, the value returned must belong to the current time interval. getSendWindowBytesRemaining() ignores this. This is the major performance issue I2CP users complained about: they often arrive with large chunks of data, which then neither notify getNextVolley() nor trigger the immediate send that should be allowed.
  • Maintaining bandwidth over long time intervals (currently ≥ 1000 ms in allocateSendingBytes()) leads to short-term traffic bursts, as allowed traffic may concentrate at the end of one interval and the beginning of the next, possibly causing a short-term overload of the outbound queues and the network connection.

The solution is simple: move the bandwidth refill into getSendWindowBytesRemaining() and hand out bandwidth in much smaller chunks, as low as 8 bytes (unlikely, but possible on a fast CPU). I simply do this before each send, provided the system clock has advanced. The maintenance of the send window size stays untouched, as does the ACK logic.
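
For readers who want the idea without the surrounding PeerState code, here is a minimal standalone sketch of a refill that is proportional to elapsed time. The class and method names are hypothetical and only mirror the fields touched by the patch; timestamps are passed in explicitly so the behaviour can be checked without a router clock.

```java
/** Hypothetical stand-in for the send-window fields of PeerState; not the actual class. */
final class SendWindowSketch {
    private final int windowBytes; // plays the role of _sendWindowBytes
    private int remaining;         // plays the role of _sendWindowBytesRemaining
    private long lastRefill;       // plays the role of _lastSendRefill, in ms

    SendWindowSketch(int windowBytes, long now) {
        this.windowBytes = windowBytes;
        this.remaining = windowBytes;
        this.lastRefill = now;
    }

    /** Refill in proportion to the time elapsed since the last refill, then report what is left. */
    synchronized int bytesRemaining(long now) {
        long elapsed = now - lastRefill;
        if (elapsed > 0) {
            // After a full second the whole window is available again;
            // before that, hand out the rounded share for the elapsed time.
            long refill = elapsed >= 1000 ? windowBytes : (windowBytes * elapsed + 500) / 1000;
            remaining = (int) Math.min((long) remaining + refill, windowBytes);
            lastRefill = now;
        }
        return remaining;
    }

    /** Take size bytes out of the window if they fit. */
    synchronized boolean allocate(int size, long now) {
        if (size > bytesRemaining(now))
            return false;
        remaining -= size;
        return true;
    }

    public static void main(String[] args) {
        SendWindowSketch w = new SendWindowSketch(8192, 0L);
        System.out.println(w.allocate(8192, 0L)); // the whole window is spent at once
        System.out.println(w.bytesRemaining(1L)); // refill after a single millisecond
    }
}
```

With an 8192-byte window, one elapsed millisecond yields a refill of 8 bytes, which matches the smallest chunk mentioned above.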

Results are convincing:

  • I2CP traffic outbound (seeding torrents) clearly up
  • UDP packet pusher CPU usage clearly down
  • message delay seems down a bit
  • UDP send pauses smaller / more evenly distributed according to tcpdump

This is independent of #2412; however, the two make a perfect team, as the improved logic in #2412 relies heavily on the correctness of the above functions.

Hope this diff is correct.

--- "PeerState orig.java"	2019-02-06 13:21:26.571744716 +0100
+++ "PeerState patch.java"	2019-02-06 14:06:03.014869734 +0100
@@ -479,7 +479,20 @@
 
     /** how many bytes can we send to the peer in the current second */
     public int getSendWindowBytesRemaining() {
-        synchronized(_outboundMessages) {
+        synchronized(_outboundMessages) { // possibly reentrant
+            long now = _context.clock().now();
+            int duration = (int)(now - _lastSendRefill);
+            if (_sendBytes > 0 && duration > 0 || duration >= 1000) {
+                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);
+                _sendBps = (99 * _sendBps + _sendBytes + 50) / 100;
+             //if (isForACK) {
+            //    _sendACKBytes += size;
+            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
+            //}
+               _sendBytes = 0;
+            //_sendACKBytes = 0;
+                _lastSendRefill = now;
+            }
             return _sendWindowBytesRemaining;
         }
     }
@@ -694,43 +707,29 @@
     /**
      *  Caller should synch
      */
-    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
+    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
         //if (true) return true;
-        if (IGNORE_CWIN || size <= _sendWindowBytesRemaining || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
+        if (size <= getSendWindowBytesRemaining() || IGNORE_CWIN || isForACK || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
             if ( (messagePushCount == 0) && (_concurrentMessagesActive > _concurrentMessagesAllowed) ) {
                 _consecutiveRejections++;
                 _context.statManager().addRateData("udp.rejectConcurrentActive", _concurrentMessagesActive, _consecutiveRejections);
                 return false;
-            } else if (messagePushCount == 0) {
+            }
+            if (messagePushCount == 0) {
                 _context.statManager().addRateData("udp.allowConcurrentActive", _concurrentMessagesActive, _concurrentMessagesAllowed);
                 _concurrentMessagesActive++;
                 if (_consecutiveRejections > 0) 
                     _context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _concurrentMessagesActive);
                 _consecutiveRejections = 0;
             }
-            _sendWindowBytesRemaining -= size; 
+            _sendWindowBytesRemaining -= size;
             _sendBytes += size;
-            _lastSendTime = now;
-            //if (isForACK) 
+            _lastSendTime = _context.clock().now();
+            //if (isForACK)
             //    _sendACKBytes += size;
             return true;
-        } else {
-            return false;
         }
+        return false;
     }
     
     /** if we need to contact them, do we need to talk to an introducer? */
@@ -1178,22 +1177,16 @@
                         _sendWindowBytes += bytesACKed; //512; // bytesACKed;
                 //}
             }
-        } else {
-            int allow = _concurrentMessagesAllowed - 1;
-            if (allow < MIN_CONCURRENT_MSGS)
-                allow = MIN_CONCURRENT_MSGS;
-            _concurrentMessagesAllowed = allow;
-        }
+        } else
+            _concurrentMessagesAllowed = Math.max(_concurrentMessagesAllowed - 1, MIN_CONCURRENT_MSGS);
+
         if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
             _sendWindowBytes = MAX_SEND_WINDOW_BYTES;
         _lastReceiveTime = _context.clock().now();
         _lastSendFullyTime = _lastReceiveTime;
         
         //if (true) {
-            if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
-                _sendWindowBytesRemaining += bytesACKed;
-            else
-                _sendWindowBytesRemaining = _sendWindowBytes;
+            _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + bytesACKed, _sendWindowBytes);
         //}
         
         if (numSends < 2) {

Subtickets

Attachments (3)

udp_sendBps.png (149.6 KB) - added by Zlatin Balevsky 3 years ago.
sendBps_graph_2.png (347.6 KB) - added by Zlatin Balevsky 2 years ago.
Stat still broken
ssu-fragments.ods (21.7 KB) - added by Zlatin Balevsky 10 months ago.
Benchmark of 0.9.48 vanilla vs. fragments patch

Change History (28)

comment:1 Changed 3 years ago by jogger

After introducing the sliding window, there is now a cosmetic glitch in the above calculation of _sendBps, which is used for full stats only. It should be:

_sendBps = (99 * _sendBps + _sendBytes * 1000 / duration + 50) / 100;

comment:2 Changed 3 years ago by zzz

@OP reference to "the major performance issue I2CP users complained about" ? link or ticket?

related: #2424

comment:3 Changed 3 years ago by jogger

I was referring to complaints in current and previous forums about torrent send speed. The updated bandwidth calculation deals with possibly not allowing traffic that would fit the send window.

comment:4 Changed 3 years ago by zzz

Parent Tickets: 2412

This ticket is to fix a bug described in #2412 comment 21.

According to OP here, #2412 relies on this ticket, so making this one a child.

comment:5 Changed 3 years ago by Zlatin Balevsky

I haven't analyzed the patch as thoroughly as I should yet, but I did put it on my busy router and the udp.sendBps metric is way, way up. Overall traffic however is about the same, which leads me to think that metric is somehow broken by this patch.

comment:6 Changed 3 years ago by Zlatin Balevsky

You are allocating too much bandwidth in the following logic:

if (_sendBytes > 0 && duration > 0 || duration >= 1000) {
                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);

To see what I mean, imagine the initial available bytes are X. They get used up immediately, in less than 1 ms (not likely, but possible). 1 ms elapses and you allocate another X/1000 + 0.5, and so on. Total bandwidth used within that one-second interval will be up to 2X + 500.
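
The arithmetic of this worst case can be replayed with a small standalone sketch. It assumes a sender that drains the window at every millisecond tick, which is the scenario described here and not a measurement of real traffic; the class name is hypothetical.

```java
/** Replays the worst case: the window is drained at every millisecond tick. */
final class OverdraftSketch {

    /** Bytes sent in the interval [0, 1000) ms for a given window size. */
    static long bytesSentInFirstSecond(int windowBytes) {
        long remaining = windowBytes; // initial allowance X
        long sent = 0;
        for (int t = 0; t < 1000; t++) {
            if (t > 0) {
                // duration is always 1 ms, so the refill is round(X / 1000)
                long refill = (windowBytes * 1L + 500) / 1000;
                remaining = Math.min(remaining + refill, windowBytes);
            }
            sent += remaining; // drain everything that is available
            remaining = 0;
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(bytesSentInFirstSecond(8192)); // prints 16184
        System.out.println(bytesSentInFirstSecond(8500)); // prints 17491
    }
}
```

Both results stay below the 2X + 500 bound. The second shows the rounding term at work: for X = 8500 the total exceeds 2X = 17000.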

comment:7 Changed 3 years ago by jogger

Correct sendBps calculation now found in the combined patch within #2412.

My bandwidth calculation is correct. The current logic hands out too little bw when not run exactly every 1000 ms. It might hand out too much if X is transmitted twice within a second when an interval boundary falls in between.

In my logic using a sliding window, if X is available at any given point in time, this means there has been no traffic over the past second. So X can be used up immediately, beginning a new interval with bandwidth building up to X again over the next 1000 ms. The 500 mentioned vanishes statistically through integer truncation.

Changed 3 years ago by Zlatin Balevsky

Attachment: udp_sendBps.png added

comment:8 Changed 3 years ago by Zlatin Balevsky

The attachment udp_sendBps.png is with the patch from comment 32 from ticket 2412. Doesn't look fixed at all. Are you sure about that multiplication by 1000?

comment:9 Changed 3 years ago by jogger

Absolutely sure. To see that it is correct, just assume duration = 1000.

comment:10 Changed 2 years ago by jogger

Sensitive: unset

I am continuing here as zzz suggested. Full overhauled Patch below including necessary mods to the debug code. Questions raised by zab to solve: Are my bandwidth calcs correct?

sendBps is calculated as a sliding average with integer rounding:

_sendBps = (99 * _sendBps + _sendBytes * 1000 / duration + 50) / 100;

Question: Does the term "_sendBytes * 1000 / duration" deliver correct current bandwidth?
2,000 bytes sent within 50 ms: Result 40,000 - correct
100 bytes sent within 500 ms: Result 200 - correct
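
The two checks above can be reproduced with a short standalone sketch. The helper names are hypothetical. Unlike the one-line formula, the sketch does the multiplication in long arithmetic, because an int product of bytes * 1000 would overflow once more than about 2.1 MB accumulates in a single interval.

```java
/** Standalone check of the rate term and of one smoothing step; names are illustrative. */
final class RateTermSketch {

    /** Bytes per second for a number of bytes sent over durationMs milliseconds, truncated. */
    static int bytesPerSecond(int bytes, int durationMs) {
        if (durationMs <= 0)
            throw new IllegalArgumentException("duration must be positive");
        return (int) (bytes * 1000L / durationMs);
    }

    /** One step of the 99:1 integer-rounded sliding average. */
    static int smooth(int oldBps, int sampleBps) {
        return (int) ((99L * oldBps + sampleBps + 50) / 100);
    }

    public static void main(String[] args) {
        System.out.println(bytesPerSecond(2000, 50)); // prints 40000
        System.out.println(bytesPerSecond(100, 500)); // prints 200
        System.out.println(smooth(1000, 1000));       // prints 1000
    }
}
```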

_sendWindowBytesRemaining is refilled up to the max using:

_sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);

The term "(_sendWindowBytes * duration + 500) / 1000" is the integer rounded number of bytes to be sent within "duration" ms. Since "duration" has elapsed, this is the correct refill.

Question raised by zab: What if the full amount is used within the first ms?
It is true that we then hand out a small number of "_sendWindowBytes / 1000" in the next ms. This can happen only in the first second directly after the peer is initialized. Afterwards a full window means zero traffic in the past second, so such a send can be attributed to the past interval and it is completely legal to gradually refill.

If someone does not want this behaviour, _sendWindowBytesRemaining could be initialized to some lower value, even 0. Please note that the current code allows a 100% overdraft at any time by sending the full amount at the end of one interval and at the beginning of the next. There are also gaps if the code is not called every ms.

comment:11 in reply to:  10 Changed 2 years ago by Zlatin Balevsky

Replying to jogger:

Full overhauled Patch below

Can you attach a patch against current mtn trunk so I can test whether the issue with the metric is still there?

comment:12 Changed 2 years ago by jogger

Never worked with mtn. Patch against stock 0.9.41:

--- ~/i2p/i2p-0.9.41/router/java/src/net/i2p/router/transport/udp/PeerState.java	2019-07-03 14:27:37.000000000 +0200
+++ ~/i2p/41p/router/java/src/net/i2p/router/transport/udp/PeerState.java	2019-08-06 20:24:32.000000000 +0200
@@ -491,7 +491,18 @@
 
     /** how many bytes can we send to the peer in the current second */
     public int getSendWindowBytesRemaining() {
-        synchronized(_outboundMessages) {
+        synchronized(_outboundMessages) { // possibly reentrant
+            long now = _context.clock().now();
+            int duration;
+            if ((duration = (int)(now - _lastSendRefill)) <= 0)
+                return _sendWindowBytesRemaining;
+            _sendBps = (99 * _sendBps + _sendBytes * 1000 / duration + 50) / 100;
+            _lastSendRefill = now;
+            _sendBytes = 0;
+            if (duration < 1000)
+                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);
+            else
+                _sendWindowBytesRemaining = _sendWindowBytes;
             return _sendWindowBytesRemaining;
         }
     }
@@ -702,20 +713,7 @@
      *  Caller should synch
      */
     private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
+        int sendWindowBytesRemaining = getSendWindowBytesRemaining();
 
         // Ticket 2505
         // We always send all unacked fragments for a message,
@@ -741,7 +739,7 @@
             if (_sendWindowBytesRemaining < 0)
                 _sendWindowBytesRemaining = 0; 
             _sendBytes += size;
-            _lastSendTime = now;
+            _lastSendTime = _context.clock().now();
             //if (isForACK) 
             //    _sendACKBytes += size;
             return true;
@@ -1848,7 +1846,7 @@
             if (allocateSendingBytes(size, state.getPushCount())) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Allocation of " + size + " allowed with " 
-                              + getSendWindowBytesRemaining() 
+                              + _sendWindowBytesRemaining
                               + "/" + getSendWindowBytes() 
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
@@ -1873,7 +1871,7 @@
                 //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
-                              + " available=" + getSendWindowBytesRemaining()
+                              + " available=" + _sendWindowBytesRemaining
                               + " for message " + state.getMessageId() + ": " + state);
                 state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
                                       _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);

Changed 2 years ago by Zlatin Balevsky

Attachment: sendBps_graph_2.png added

Stat still broken

comment:13 Changed 2 years ago by jogger

Got it! Another decade-old bug. To see that the current code also overstates sendBps, just monitor a dying connection. I just converted the logic from floating point to integer, overlooking the false underlying assumption that all intervals are of equal length.

Now using a moving 10 sec average which gets reset if traffic drops down to one packet within 10 sec or less. Should work much better.

--- ~/i2p/i2p-0.9.41/router/java/src/net/i2p/router/transport/udp/PeerState.java	2019-07-03 14:27:37.000000000 +0200
+++ ~/i2p/41p/router/java/src/net/i2p/router/transport/udp/PeerState.java	2019-08-08 08:42:36.000000000 +0200
@@ -491,7 +491,21 @@
 
     /** how many bytes can we send to the peer in the current second */
     public int getSendWindowBytesRemaining() {
-        synchronized(_outboundMessages) {
+        synchronized(_outboundMessages) { // possibly reentrant
+            long now = _context.clock().now();
+            int duration;
+            if ((duration = (int)(now - _lastSendRefill)) <= 0)
+                return _sendWindowBytesRemaining;
+            int oldBps = _sendBps;
+            _sendBps = (_sendBytes + 5) / 10; // sendrate for 10 sec interval
+            if (duration < 10000) // add earlier traffic from rest of 10 sec
+                _sendBps += ((10000 - duration) * oldBps + 5000) / 10000;
+            _lastSendRefill = now;
+            _sendBytes = 0;
+            if (duration < 1000)
+                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);
+            else
+                _sendWindowBytesRemaining = _sendWindowBytes;
             return _sendWindowBytesRemaining;
         }
     }
@@ -702,20 +716,7 @@
      *  Caller should synch
      */
     private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
+        int sendWindowBytesRemaining = getSendWindowBytesRemaining();
 
         // Ticket 2505
         // We always send all unacked fragments for a message,
@@ -741,7 +742,7 @@
             if (_sendWindowBytesRemaining < 0)
                 _sendWindowBytesRemaining = 0; 
             _sendBytes += size;
-            _lastSendTime = now;
+            _lastSendTime = _context.clock().now();
             //if (isForACK) 
             //    _sendACKBytes += size;
             return true;
@@ -1848,7 +1849,7 @@
             if (allocateSendingBytes(size, state.getPushCount())) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("Allocation of " + size + " allowed with " 
-                              + getSendWindowBytesRemaining() 
+                              + _sendWindowBytesRemaining
                               + "/" + getSendWindowBytes() 
                               + " remaining"
                               + " for message " + state.getMessageId() + ": " + state);
@@ -1873,7 +1874,7 @@
                 //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.INFO))
                     _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
-                              + " available=" + getSendWindowBytesRemaining()
+                              + " available=" + _sendWindowBytesRemaining
                               + " for message " + state.getMessageId() + ": " + state);
                 state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
                                       _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);

comment:14 Changed 2 years ago by zzz

There's a lot above over the six months. Would you please summarize how the current code "overstates", or point to one of the above comments if it already says it? Thanks!

comment:15 Changed 2 years ago by jogger

The overstating occurs in the stats calculation

_sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));

Each interval gets the same weight, regardless of its duration. Imagine a long series of alternating intervals of 1000 ms and 10000 ms with 1k transferred in each. Actual bandwidth used is 2k/11000 ms = 182 Bps. In the long run the formula gives you the average of 1 KBps and 100 Bps = 550 Bps. That is the pitfall that zab noticed and that I overlooked in my first patch.
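
The example can be replayed with a standalone sketch that applies the quoted smoothing step to alternating 1000 ms and 10000 ms intervals. It takes 1k as 1000 bytes, consistent with the figures above; the class and method names are hypothetical.

```java
/** Replays the alternating-interval example against the quoted smoothing formula. */
final class EqualWeightAverageSketch {

    /** The smoothing step quoted above: every interval gets weight 0.1, whatever its length. */
    static int smooth(int oldBps, int bytes, long durationMs) {
        return (int) (0.9f * oldBps + 0.1f * (bytes * (1000f / durationMs)));
    }

    /** Smoothed value after the given number of (1000 ms, 10000 ms) pairs, 1000 bytes each. */
    static int smoothedAfter(int pairs) {
        int bps = 0;
        for (int i = 0; i < pairs; i++) {
            bps = smooth(bps, 1000, 1000);  // short interval: sample of 1000 Bps
            bps = smooth(bps, 1000, 10000); // long interval: sample of 100 Bps
        }
        return bps;
    }

    /** True rate of the same traffic: total bytes over total time, truncated. */
    static int actualBps() {
        return (int) (2000L * 1000 / 11000);
    }

    public static void main(String[] args) {
        System.out.println("smoothed: " + smoothedAfter(200));
        System.out.println("actual:   " + actualBps());
    }
}
```

The smoothed value settles between 500 and 600 Bps, close to the 550 Bps average of the two samples, while the true rate is about 182 Bps.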

The actual bandwidth handed out is too small of course, since there are gaps if duration > 1000.

comment:16 Changed 2 years ago by zzz

Parent Tickets: 2412 → 2412, 2506

Thank you for the explanation.
Note: #2506 now depends on this ticket.

Two things about _sendBps:

  • It's currently a smoothed average. In your example, with two data points, yes it is not exact. It's never exact, it's an approximation.
  • _sendBps is NOT used for any accept/reject logic. It's for display only, on the /peers page. Your patches above do not use _sendBps for any decision making either. So changing the calculation doesn't change anything.

There may be changes to make on the measurement window, but the _sendBps calculation is not the root cause of anything.

comment:17 Changed 12 months ago by zzz

Parent Tickets: 2412, 2506 → 2506

comment:18 Changed 12 months ago by zzz

Milestone: undecided → 0.9.49
Priority: major → minor

Now that phase 1 of the congestion control is done by #2713, we can work on phase 2 which is Westwood+. A byproduct of that will be a bandwidth estimator, which we can use for PS.getSendBps() and getReceiveBps() for display on /peers.
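
To illustrate what an ack-driven bandwidth estimator for display purposes could look like, here is a minimal sketch. It is not the SimpleBandwidthEstimator from the WIP patch and does not implement the actual Westwood+ filter: it uses a plain exponential low-pass filter, and the sampling interval, the filter gain and the explicit timestamp parameter are all assumptions made for illustration. Only the idea of recording acked bytes and reporting an estimate in bytes/ms follows the BandwidthEstimator interface in the patch.

```java
/** Illustrative ack-rate estimator; the constants and the filter are assumptions. */
final class AckRateEstimatorSketch {
    private static final long MIN_INTERVAL_MS = 50; // assumed minimum sampling interval
    private static final float GAIN = 0.125f;       // assumed low-pass filter gain

    private long intervalStart;
    private int ackedInInterval;
    private float estimate; // bytes per ms

    AckRateEstimatorSketch(long now) {
        intervalStart = now;
    }

    /** Records an arriving ack; the timestamp is passed in to keep the sketch deterministic. */
    synchronized void addSample(int acked, long now) {
        ackedInInterval += acked;
        long elapsed = now - intervalStart;
        if (elapsed < MIN_INTERVAL_MS)
            return; // keep accumulating until the interval is long enough
        float sample = ackedInInterval / (float) elapsed; // bytes per ms in this interval
        estimate += GAIN * (sample - estimate);           // exponential smoothing
        ackedInInterval = 0;
        intervalStart = now;
    }

    /** Current estimate in bytes/ms. */
    synchronized float getBandwidthEstimate() {
        return estimate;
    }

    public static void main(String[] args) {
        AckRateEstimatorSketch e = new AckRateEstimatorSketch(0L);
        for (int i = 1; i <= 200; i++)
            e.addSample(1000, i * 100L); // 1000 bytes acked every 100 ms
        System.out.println(e.getBandwidthEstimate() + " bytes/ms");
    }
}
```

Multiplying the estimate by 1000 gives a bytes-per-second figure suitable for display on /peers.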

comment:19 Changed 11 months ago by zzz

Status: new → accepted

WIP, lightly tested
probably needs more work before performance testing
contains some unrelated cleanup I might split out and apply separately
comments welcome

#
# old_revision [5f3f9fcf8ce23d55b9696463f25a99ae9153bcae]
#
# add_file "router/java/src/net/i2p/router/transport/udp/BandwidthEstimator.java"
#  content [de86099a8404efb913fa3b3854e8cc55e4130b3d]
# 
# add_file "router/java/src/net/i2p/router/transport/udp/SimpleBandwidthEstimator.java"
#  content [7ee283d139c0e5905af1ae8905154509cd8a1ea7]
# 
# patch "router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java"
#  from [c7025d6fb800e85baf233b62a965cccb85472cf2]
#    to [742a3736c04a89548a1a71b1b3b0978846ebac8a]
# 
# patch "router/java/src/net/i2p/router/transport/udp/PeerState.java"
#  from [f97884f28504ac4c07435372804b006cfe2b393b]
#    to [e7506bb175343e29bf96fccfd25202304bb314c5]
#
============================================================
--- /dev/null	
+++ router/java/src/net/i2p/router/transport/udp/BandwidthEstimator.java	de86099a8404efb913fa3b3854e8cc55e4130b3d
@@ -0,0 +1,20 @@
+package net.i2p.router.transport.udp;
+
+/**
+ *  A Westwood bandwidth estimator
+ *
+ *  @since 0.9.49 adapted from streaming
+ */
+interface BandwidthEstimator {
+
+    /**
+     * Records an arriving ack.
+     * @param acked how many bytes were acked with this ack
+     */
+    public void addSample(int acked);
+
+    /**
+     * @return the current bandwidth estimate in bytes/ms.
+     */
+    public float getBandwidthEstimate();
+}
============================================================
--- router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java	c7025d6fb800e85baf233b62a965cccb85472cf2
+++ router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java	742a3736c04a89548a1a71b1b3b0978846ebac8a
@@ -170,8 +170,6 @@ class EstablishmentManager {
         _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
============================================================
--- router/java/src/net/i2p/router/transport/udp/PeerState.java	f97884f28504ac4c07435372804b006cfe2b393b
+++ router/java/src/net/i2p/router/transport/udp/PeerState.java	e7506bb175343e29bf96fccfd25202304bb314c5
@@ -124,17 +124,10 @@ public class PeerState {
     private int _sendWindowBytes;
     /** how many bytes can we send to the peer in the current second */
     private int _sendWindowBytesRemaining;
-    private long _lastSendRefill;
+    private final BandwidthEstimator _bwEstimator;
     // smoothed value, for display only
-    private int _sendBps;
-    private int _sendBytes;
-    // smoothed value, for display only
     private int _receiveBps;
     private int _receiveBytes;
-    //private int _sendACKBps;
-    //private int _sendZACKBytes;
-    //private int _receiveACKBps;
-    //private int _receiveACKBytes;
     private long _receivePeriodBegin;
     private volatile long _lastCongestionOccurred;
     /** 
@@ -173,8 +166,6 @@ public class PeerState {
     private int _largeMTU;
     /* how many consecutive packets at or under the min MTU have been received */
     private long _consecutiveSmall;
-    /** when did we last check the MTU? */
-    //private long _mtuLastChecked;
     private int _mtuIncreases;
     private int _mtuDecreases;
     /** current round trip time estimate */
@@ -242,8 +233,6 @@ public class PeerState {
     /** Last time it was made an introducer **/
     private long _lastIntroducerTime;
 
-    private static final int DEFAULT_SEND_WINDOW_BYTES = 8*1024;
-    private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
     private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
 
     /**
@@ -347,12 +336,8 @@ public class PeerState {
         _lastReceiveTime = now;
         _currentACKs = new ConcurrentHashSet<Long>();
         _currentACKsResend = new LinkedBlockingQueue<ResendACK>();
-        _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
-        _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
         _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
-        _lastSendRefill = now;
         _receivePeriodBegin = now;
-        _lastCongestionOccurred = -1;
         _remotePort = remotePort;
         if (remoteIP.length == 4) {
             _mtu = DEFAULT_MTU;
@@ -363,7 +348,13 @@ public class PeerState {
             _mtuReceive = MIN_IPV6_MTU;
             _largeMTU = transport.getMTU(true);
         }
-        //_mtuLastChecked = -1;
+        // RFC 5681 sec. 3.1
+        if (_mtu > 1095)
+            _sendWindowBytes = 3 * _mtu;
+        else
+            _sendWindowBytes = 4 * _mtu;
+        _sendWindowBytesRemaining = _sendWindowBytes;
+
         _lastACKSend = -1;
 
         _rto = INIT_RTO;
@@ -382,6 +373,7 @@ public class PeerState {
         _remotePeer = remotePeer;
         _isInbound = isInbound;
         _remoteHostId = new RemoteHostId(remoteIP, remotePort);
+        _bwEstimator = new SimpleBandwidthEstimator(ctx, this);
     }
     
     /** 
@@ -539,13 +531,6 @@ public class PeerState {
      */
     public int getReceiveMTU() { return _mtuReceive; }
 
-    /** when did we last check the MTU? */
-  /****
-    public long getMTULastChecked() { return _mtuLastChecked; }
-    public long getMTUIncreases() { return _mtuIncreases; }
-    public long getMTUDecreases() { return _mtuDecreases; }
-  ****/
-    
     /** 
      * The AES key used to verify packets, set only after the connection is
      * established.  
@@ -642,10 +627,10 @@ public class PeerState {
     }
 
     /**
-     * An approximation, for display only
+     * The Westwood+ bandwidth estimate
      * @return the smoothed send transfer rate
      */
-    public int getSendBps() { return _sendBps; }
+    public int getSendBps() { return (int) (_bwEstimator.getBandwidthEstimate() * 1000); }
 
     /**
      * An approximation, for display only
@@ -672,17 +657,7 @@ public class PeerState {
         return now - lastActivity;
     }
     
-    /** how fast we are sending *ack* packets */
-    //public int getSendACKBps() { return _sendACKBps; }
-    //public int getReceiveACKBps() { return _receiveACKBps; }
-    
     /** 
-     * have all of the packets received in the current second requested that
-     * the previous second's ACKs be sent?
-     */
-    //public void remoteDoesNotWantPreviousACKs() { _remoteWantsPreviousACKs = false; }
-    
-    /** 
      * Decrement the remaining bytes in the current period's window,
      * returning true if the full size can be decremented, false if it
      * cannot.  If it is not decremented, the window size remaining is 
@@ -696,22 +671,9 @@ public class PeerState {
 
     /**
      *  Caller should synch
+     *  @param isForACK unused
      */
     private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount, long now) { 
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
-
         // Ticket 2505
         // We always send all unacked fragments for a message,
         // because we don't have any mechanism in OutboundMessageFragments
@@ -735,10 +697,7 @@ public class PeerState {
             _sendWindowBytesRemaining -= size; 
             if (_sendWindowBytesRemaining < 0)
                 _sendWindowBytesRemaining = 0; 
-            _sendBytes += size;
             _lastSendTime = now;
-            //if (isForACK) 
-            //    _sendACKBytes += size;
             return true;
         } else {
             return false;
@@ -763,14 +722,6 @@ public class PeerState {
      */
     void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; }
 
-    /** what is the largest packet we can send to the peer? */
-  /****
-    public void setMTU(int mtu) { 
-        _mtu = mtu; 
-        _mtuLastChecked = _context.clock().now();
-    }
-  ****/
-
     /**
      *  stat in SST column, otherwise unused,
      *  candidate for removal
@@ -850,12 +801,8 @@ public class PeerState {
         long duration = now - _receivePeriodBegin;
         if (duration >= 1000) {
             _receiveBps = (int)(0.9f*_receiveBps + 0.1f*(_receiveBytes * (1000f/duration)));
-            //if (isForACK)
-            //    _receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration)));
-            //_receiveACKBytes = 0;
             _receiveBytes = 0;
             _receivePeriodBegin = now;
-           _context.statManager().addRateData("udp.receiveBps", _receiveBps);
         }
         
         if (_wantACKSendSince <= 0)
@@ -907,32 +854,28 @@ public class PeerState {
      * either they told us to back off, or we had to resend to get 
      * the data through.  
      *  Caller should synch on this
-     *  @return true if window shrunk, but nobody uses the return value
      */
-    private boolean congestionOccurred() {
+    private void congestionOccurred() {
         long now = _context.clock().now();
         if (_lastCongestionOccurred + _rto > now)
-            return false; // only shrink once every few seconds
+            return; // only shrink once every few seconds
         _lastCongestionOccurred = now;
-        
+        // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+        // 2. cut ssthresh to bandwidth estimate, window to 1 MTU
+        // 3. Retransmit up to half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
         int congestionAt = _sendWindowBytes;
-        //if (true)
-        //    _sendWindowBytes -= 10000;
-        //else
-            _sendWindowBytes = _sendWindowBytes/2; //(_sendWindowBytes*2) / 3;
-        if (_sendWindowBytes < MINIMUM_WINDOW_BYTES)
-            _sendWindowBytes = MINIMUM_WINDOW_BYTES;
-        //if (congestionAt/2 < _slowStartThreshold)
-            _slowStartThreshold = congestionAt/2;
+        _sendWindowBytes = _mtu;
+        int oldsst = _slowStartThreshold;
+        _slowStartThreshold = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * _rtt), 2 * _mtu);
 
         int oldRto = _rto;
         long oldTimer = _retransmitTimer - now;
         _rto = Math.min(MAX_RTO, Math.max(minRTO(), _rto << 1 ));
         _retransmitTimer = now + _rto;
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now));
-
-        return true;
+        if (_log.shouldInfo())
+            _log.info(_remotePeer + " Congestion, RTO: " + oldRto + " -> " + _rto + " timer: " + oldTimer + " -> " + (_retransmitTimer - now) +
+                                    " window: " + congestionAt + " -> " + _sendWindowBytes +
+                                    " SST: " + oldsst + " -> " + _slowStartThreshold);
     }
     
     /**
@@ -1221,12 +1164,9 @@ public class PeerState {
         _lastReceiveTime = _context.clock().now();
         _lastSendFullyTime = _lastReceiveTime;
         
-        //if (true) {
-            if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
-                _sendWindowBytesRemaining += bytesACKed;
-            else
-                _sendWindowBytesRemaining = _sendWindowBytes;
-        //}
+        _sendWindowBytesRemaining += bytesACKed;
+        if (_sendWindowBytesRemaining > _sendWindowBytes)
+            _sendWindowBytesRemaining = _sendWindowBytes;
         
         if (numSends < 2) {
             // caller synchs
@@ -1258,10 +1198,9 @@ public class PeerState {
         synchronized(this) {
             locked_messageACKed(bytesACKed, lifetime, numSends, anyPending);
         }
+        _bwEstimator.addSample(bytesACKed);
         if (numSends >= 2 && _log.shouldLog(Log.INFO))
             _log.info(_remotePeer + " acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed);
-        
-        _context.statManager().addRateData("udp.sendBps", _sendBps);
     }
 
     /** This is the value specified in RFC 2988 */
============================================================
--- /dev/null	
+++ router/java/src/net/i2p/router/transport/udp/SimpleBandwidthEstimator.java	7ee283d139c0e5905af1ae8905154509cd8a1ea7
@@ -0,0 +1,159 @@
+package net.i2p.router.transport.udp;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+import net.i2p.util.Log;
+
+/**
+ *  A Westwood+ bandwidth estimator with
+ *  a first stage anti-aliasing low pass filter based on RTT,
+ *  and the time-varying Westwood filter based on inter-arrival time.
+ *
+ *  Ref: TCP Westwood: End-to-End Congestion Control for Wired/Wireless Networks
+ *  Casetti et al
+ *  (Westwood)
+ *
+ *  Ref: End-to-End Bandwidth Estimation for Congestion Control in Packet Networks
+ *  Grieco and Mascolo
+ *  (Westwood+)
+ *
+ *  Adapted from: Linux kernel tcp_westwood.c (GPLv2)
+ *
+ *  @since 0.9.49 adapted from streaming
+ */
+class SimpleBandwidthEstimator implements BandwidthEstimator {
+
+    private final I2PAppContext _context;
+    private final Log _log;
+    private final PeerState _state;
+
+    private long _tAck;
+    // bw_est, bw_ns_est
+    private float _bKFiltered, _bK_ns_est;
+    // bk
+    private int _acked;
+
+    // As in kernel tcp_westwood.c
+    // Should probably match ConnectionOptions.TCP_ALPHA
+    private static final int DECAY_FACTOR = 8;
+    private static final int WESTWOOD_RTT_MIN = 500;
+
+    SimpleBandwidthEstimator(I2PAppContext ctx, PeerState state) {
+        _log = ctx.logManager().getLog(SimpleBandwidthEstimator.class);
+        _context = ctx;
+        _state = state;
+        // assume we're about to send something
+        _tAck = ctx.clock().now();
+        _acked = -1;
+    }
+
+    /**
+     * Records an arriving ack.
+     * @param acked how many bytes were acked with this ack
+     */
+    public synchronized void addSample(int acked) {
+        long now = _context.clock().now();
+        if (_acked < 0) {
+            // first sample
+            // use time since constructed as the RTT
+            // getRTT() would return zero here.
+            long deltaT = Math.max(now - _tAck, WESTWOOD_RTT_MIN);
+            float bkdt = ((float) acked) / deltaT;
+            _bKFiltered = bkdt;
+            _bK_ns_est = bkdt;
+            _acked = 0;
+            _tAck = now;
+            if (_log.shouldDebug())
+                _log.debug("first sample bytes: " + acked + " deltaT: " + deltaT + ' ' + this);
+        } else {
+            _acked += acked;
+            // anti-aliasing filter
+            // As in kernel tcp_westwood.c
+            // and the Westwood+ paper
+            if (now - _tAck >= Math.max(_state.getRTT(), WESTWOOD_RTT_MIN))
+                computeBWE(now);
+        }
+    }
+
+    /**
+     * @return the current bandwidth estimate in bytes/ms.
+     */
+    public synchronized float getBandwidthEstimate() {
+        long now = _context.clock().now();
+        // anti-aliasing filter
+        // As in kernel tcp_westwood.c
+        // and the Westwood+ paper
+        if (now - _tAck >= Math.max(_state.getRTT(), WESTWOOD_RTT_MIN))
+            return computeBWE(now);
+        return _bKFiltered;
+    }
+
+    private synchronized float computeBWE(final long now) {
+        if (_acked < 0)
+            return 0.0f; // nothing ever sampled
+        updateBK(now, _acked);
+        _acked = 0;
+        return _bKFiltered;
+    }
+
+    /**
+     * Optimized version of updateBK with packets == 0
+     */
+    private void decay() {
+        _bK_ns_est *= (DECAY_FACTOR - 1) / (float) DECAY_FACTOR;
+        _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
+    }
+
+    /**
+     * Here we insert virtual null samples if necessary, as in Westwood,
+     * and use a very simple EWMA (exponentially weighted moving average)
+     * time-varying filter, as in kernel tcp_westwood.c
+     * 
+     * @param time the time of the measurement
+     * @param packets number of bytes acked since the last update (the name is a holdover; callers pass the byte count accumulated by addSample())
+     */
+    private void updateBK(long time, int packets) {
+        long deltaT = time - _tAck;
+        int rtt = Math.max(_state.getRTT(), WESTWOOD_RTT_MIN);
+        if (deltaT > 2 * rtt) {
+            // Decay with virtual null samples as in the Westwood paper
+            int numrtts = Math.min((int) ((deltaT / rtt) - 1), 2 * DECAY_FACTOR);
+            for (int i = 0; i < numrtts; i++) {
+                decay();
+            }
+            deltaT -= numrtts * rtt;
+            if (_log.shouldDebug())
+                _log.debug("decayed " + numrtts + " times, new _bK_ns_est: " + _bK_ns_est + ' ' + this);
+        }
+        float bkdt;
+        if (packets > 0) {
+            // As in kernel tcp_westwood.c
+            bkdt = ((float) packets) / deltaT;
+            _bK_ns_est = westwood_do_filter(_bK_ns_est, bkdt);
+            _bKFiltered = westwood_do_filter(_bKFiltered, _bK_ns_est);
+        } else {
+            bkdt = 0;
+            decay();
+        }
+        _tAck = time;
+        if (_log.shouldDebug())
+            _log.debug("computeBWE bytes: " + packets + " deltaT: " + deltaT +
+                       " bk/deltaT: " + bkdt + " _bK_ns_est: " + _bK_ns_est + ' ' + this);
+    }
+
+    /**
+     *  As in kernel tcp_westwood.c
+     */
+    private static float westwood_do_filter(float a, float b) {
+        return (((DECAY_FACTOR - 1) * a) + b) / DECAY_FACTOR;
+    }
+
+    @Override
+    public synchronized String toString() {
+        return "SBE[" +
+                " _bKFiltered " + _bKFiltered +
+                " _tAck " + _tAck + "; " +
+                DataHelper.formatSize2Decimal((long) (_bKFiltered * 1000), false) + // bytes/ms -> bytes/s; samples are already in bytes
+                "Bps]";
+    }
+}
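
For readers who do not want to trace the whole patch, the two filter stages can be tried in isolation. The sketch below is only an illustration, not the patch itself: the class name WestwoodFilterSketch and its method signatures are hypothetical, the clock and RTT are passed in as arguments instead of coming from _context and PeerState, and both the null-sample decay for long idle periods and the update-on-read in getBandwidthEstimate() are left out.

```java
/** Illustrative two-stage Westwood+ filter; all names here are hypothetical. */
final class WestwoodFilterSketch {
    private static final int DECAY_FACTOR = 8;  // same value as in the patch
    private static final int RTT_MIN_MS = 500;  // same floor as WESTWOOD_RTT_MIN

    private float nsEst;       // stage 1: smoothed raw samples, bytes/ms
    private float filtered;    // stage 2: smoothed stage 1, bytes/ms
    private long lastUpdate;   // ms timestamp of the last estimate update
    private int acked;         // bytes acked since the last update
    private boolean primed;    // false until the first sample arrives

    WestwoodFilterSketch(long now) {
        lastUpdate = now;
    }

    /** One EWMA step: 7/8 of the old value plus 1/8 of the new one. */
    static float filter(float old, float sample) {
        return ((DECAY_FACTOR - 1) * old + sample) / DECAY_FACTOR;
    }

    /** Accumulates acked bytes and updates at most once per max(rtt, 500 ms). */
    void addSample(int bytes, long now, int rttMs) {
        if (!primed) {
            // First sample seeds both stages directly.
            long dt = Math.max(now - lastUpdate, RTT_MIN_MS);
            nsEst = (float) bytes / dt;
            filtered = nsEst;
            primed = true;
            lastUpdate = now;
            return;
        }
        acked += bytes;
        long dt = now - lastUpdate;
        if (dt < Math.max(rttMs, RTT_MIN_MS))
            return;  // anti-aliasing: keep accumulating until one RTT has passed
        float sample = (float) acked / dt;
        nsEst = filter(nsEst, sample);
        filtered = filter(filtered, nsEst);
        acked = 0;
        lastUpdate = now;
    }

    /** @return the filtered estimate in bytes/ms */
    float estimate() {
        return filtered;
    }

    public static void main(String[] args) {
        WestwoodFilterSketch s = new WestwoodFilterSketch(0);
        s.addSample(5000, 1000, 100);  // first sample: 5000 bytes over 1000 ms
        s.addSample(1000, 1200, 100);  // only 200 ms elapsed, just accumulated
        s.addSample(3000, 1500, 100);  // 4000 bytes over 500 ms is folded in
        System.out.println("estimate (bytes/ms): " + s.estimate());
    }
}
```

With a first sample of 5 bytes/ms and a later raw sample of 8 bytes/ms, the first stage moves to 5.375 and the reported estimate only to about 5.05, which shows how strongly the double filter damps short bursts.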

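The new congestionOccurred() leans on getBandwidthEstimate(): the window drops to one MTU, the slow start threshold becomes the estimated bandwidth-delay product (bytes/ms times RTT in ms) with a floor of two MTUs, and the RTO doubles within its bounds. A small sketch of just that arithmetic, assuming hypothetical helper names and made-up sample values for the MTU, the RTT and the RTO limits:

```java
/** Illustrative arithmetic only; class and method names are hypothetical. */
final class CongestionResponseSketch {

    /** Bandwidth-delay product in bytes, never below two MTUs. */
    static int slowStartThreshold(float bwBytesPerMs, int rttMs, int mtu) {
        return Math.max((int) (bwBytesPerMs * rttMs), 2 * mtu);
    }

    /** Doubles the RTO and clamps it to [minRto, maxRto]. */
    static int backoffRto(int rto, int minRto, int maxRto) {
        return Math.min(maxRto, Math.max(minRto, rto << 1));
    }

    public static void main(String[] args) {
        int mtu = 1484;  // assumed example MTU
        int rtt = 300;   // assumed example RTT in ms
        // Fast path: 20 bytes/ms * 300 ms = 6000 bytes, above the 2 * MTU floor.
        System.out.println(slowStartThreshold(20f, rtt, mtu));
        // Slow path: 2 bytes/ms * 300 ms = 600 bytes, so the floor applies.
        System.out.println(slowStartThreshold(2f, rtt, mtu));
        // The RTO limits below are placeholders, not the router's constants.
        System.out.println(backoffRto(1000, 100, 60000));
        System.out.println(backoffRto(40000, 100, 60000));
    }
}
```

The floor matters on slow or freshly started links, where the estimate times the RTT can be far smaller than a single packet.
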
comment:20 Changed 11 months ago by zzz

At least one of the problems with the above patch is the reduction of DEFAULT_SEND_WINDOW_BYTES from 8K to 2480. This exacerbates a longstanding issue with not being able to send only some of the outstanding fragments, which leaves a lot of the send window unused. See #2505.

I think I may need to address this first. That will involve changes to OutboundMessageFragments (OMF), OutboundMessageState (OMS), and PeerState (PS).
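
To make the wasted-window effect concrete, here is a toy model with hypothetical names and example sizes. It ignores whatever exemptions the real allocateSendingBytes() has and only contrasts all-or-nothing sending with sending the fragments that fit:

```java
/** Toy model of send-window usage; names and sizes are hypothetical. */
final class FragmentWindowSketch {

    /** All-or-nothing: the whole message must fit, else nothing is sent. */
    static int sendAllOrNothing(int[] fragmentSizes, int windowRemaining) {
        int total = 0;
        for (int size : fragmentSizes)
            total += size;
        return total <= windowRemaining ? total : 0;
    }

    /** Subset: send fragments in order for as long as they still fit. */
    static int sendSubset(int[] fragmentSizes, int windowRemaining) {
        int sent = 0;
        for (int size : fragmentSizes) {
            if (sent + size > windowRemaining)
                break;
            sent += size;
        }
        return sent;
    }

    public static void main(String[] args) {
        int[] message = {1000, 1000, 1000};  // three example fragments
        int window = 2480;                   // the reduced default window
        System.out.println("all-or-nothing sends " + sendAllOrNothing(message, window));
        System.out.println("subset sends " + sendSubset(message, window));
    }
}
```

With a 2480-byte window and three 1000-byte fragments, the first strategy sends nothing and leaves all 2480 bytes idle, while the second sends 2000 bytes.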

So the order of things will probably be:

  • nudge() fixes (just before the 0.9.48 release)
  • /peers SSU tab enhancements for easier debugging
  • PeerState (PS) cleanup
  • send subset of fragments
  • Westwood+ bandwidth estimator (w+)

comment:21 Changed 10 months ago by zzz

The first three items from comment 20 are done.
Working on sending individual fragments now.

comment:22 Changed 10 months ago by zzz

update - done:

  • nudge() fixes (just before 48 release)
  • /peers SSU tab enhancements for easier debugging
  • PS cleanup
  • OMF loop workaround to fix issues triggered by reducing default window size, in 0.9.48-4

todo:

Changed 10 months ago by Zlatin Balevsky

Attachment: ssu-fragments.ods added

Benchmark of 0.9.48 vanilla vs. fragments patch

comment:23 Changed 10 months ago by zzz

0.9.48-4 loop fix
0.9.48-5 fragments
0.9.48-6 w+
0.9.48-7 partial ack fix

comment:24 Changed 10 months ago by zzz

41c7b7382ade20f207cbee9c619ba00095f1ce21 0.9.48-12: fast retransmit (rtx), see GitLab MR 11

comment:25 Changed 10 months ago by zzz
