Opened 10 months ago

Last modified 5 weeks ago

#2412 accepted enhancement

Unbrake getNextVolley

Reported by: jogger
Owned by: zzz
Priority: major
Milestone: undecided
Component: router/transport
Version: 0.9.38
Keywords:
Cc:
Parent Tickets:
Sensitive: no

Description

No matter how I tried to optimize the UDP packet pusher, I saw throughput go up but peak CPU usage go down (I have a cron job that assigns a dedicated CPU core to the UDP packet pusher, GC aside of course, whenever i2p is restarted).

So I clicked around in good old jconsole and saw getNextVolley frequently waiting. WHAT? > 1 MBps UDP outbound and waiting? What for? It turns out that there is a forced 10 ms wait at the end of getNextVolley. That might have been OK in the past, but if a send delay is calculated it should be used. Will look deeper into whether it is properly set beforehand.
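For reference, this is the clamp in question, as it appears in the 0.9.38 code quoted further down in this ticket (shown out of context, purely as an illustration of the floor):

    // 0.9.38 behaviour: even if peer.getNextDelay() says a peer is ready in 1 ms,
    // the pusher still sleeps at least 10 ms before the next pass.
    private static final int MAX_WAIT = 1000;

    private static int clampWait(int nextSendDelay) {
        // wait a min of 10 and a max of MAX_WAIT ms no matter what getNextDelay() says
        return Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
    }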

Diff and results tomorrow. There is also some useless code I have cleaned up.

Subtickets

#2427: Maintaining bandwidth correctly: PeerState.java (new, owned by zzz)

Attachments (1)

bandwidth-jogger.png (263.0 KB) - added by Zlatin Balevsky 9 months ago.
Bandwidth with this patch


Change History (54)

comment:1 Changed 10 months ago by Zlatin Balevsky

I'm going to guess it's defensive programming against bugs in calculating nextSendDelay. Yes, this would affect throughput quite a bit, at the expense of … ?

comment:2 Changed 10 months ago by jogger

Have been digging deeper. getNextVolley is fundamentally flawed and really hard to read. It will fall into wait() too often, with a wrong nextSendDelay and despite packets outstanding.

Currently testing a much shorter, cleaned-up version. What the "defensive programming" achieved is that it made the bandwidth limiters work without anyone realizing it. Will look into this. Easiest will be to insert a 1 ms wait() into allocateSend() if not a single packet can be sent due to limits.

Results to come.

comment:3 Changed 10 months ago by jogger

It's getting easier.

First a quick win: reduced MAX_WAIT to 50. Now the job queue runner (I have only 1 configured) starts much faster, along with the entire router. A 1000 ms wait between UDP packets was just too slow.

Now let's completely ditch the CPU-intensive getNextDelay() in favour of a self-tuning idle timer that decrements when we send something and increments otherwise. This way we save CPU and automatically adapt to any load changes AND automagically integrate the bandwidth logic with timed waiting within getNextVolley(), which would have needed complex coding otherwise.
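A rough sketch of the idea (illustration only, names are made up here; the exact formula is in the patch below):

    // Self-tuning idle timer: shrink it when we just sent something, grow it by
    // one tick plus the time the last pass took when nothing was ready to send.
    private int _idleWait = 0;                 // ms to sleep when a pass finds nothing
    private static final int MAX_WAIT = 50;    // reduced failsafe proposed above

    private void adjustIdleWait(boolean sentSomething, long lastPassMillis) {
        if (sentSomething)
            _idleWait = Math.max(0, _idleWait - 1);                               // busy: wake sooner
        else
            _idleWait = Math.min(MAX_WAIT, _idleWait + 1 + (int) lastPassMillis); // idle: back off
    }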

comment:4 Changed 10 months ago by Zlatin Balevsky

The more intrusive the change you're proposing, the more time it will take to analyze it properly. I can almost immediately support reducing the minimum wait of 10ms, but what you're talking about in comment 3 is another story.

I don't have the equipment to test a CPU-constrained scenario; my Xeon is too fast, and my ISP router is consumer-grade so it will choke if I try to increase the number of connections. So I'm relying on the numbers you give us, but if you're testing radical rather than incremental ideas then the numbers don't mean as much.

comment:5 Changed 10 months ago by jogger

You are right, but I have analyzed this deeply and we have to do it. It is not radical. The new version will be cleaned up and shorter. After testing I will come up with a longer writeup and a patch with detailed comments.

This should be upgraded to a defect because of the following:
DEFECT: During the final loop before waiting, concurrent calls to peer.add() issue a useless notify(). The loop does not catch the adds in most cases. getNextVolley() goes into wait() with packets pending, without a chance of being notified. This is the reason for seeing frequent waits despite higher load (see OP); a generic illustration of the race follows below.
Minor: nextSendDelay gets cleared out at some random time within the loop.
Performance: Information from peer.add() gets lost even if this peer is the only one ready to send, leading to useless further looping.
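The DEFECT above is the classic lost-wakeup race: notify() only wakes a thread that is already inside wait(), so a notify() issued while the pusher is still iterating is simply dropped. A generic illustration with hypothetical names (the actual fix proposed below uses the _targetPeer field for the same purpose):

    import java.util.concurrent.atomic.AtomicReference;

    // Generic sketch, not the router code: add() publishes the peer and then notifies;
    // the pusher re-checks the published peer before sleeping, so a notify() that fired
    // while it was still iterating over the peer set cannot be lost.
    class LostWakeupGuard {
        private final Object _lock = new Object();
        private final AtomicReference<PeerState> _pending = new AtomicReference<PeerState>();

        void add(PeerState peer) {
            _pending.set(peer);                       // publish the work first
            synchronized (_lock) { _lock.notify(); }  // then wake the pusher
        }

        PeerState awaitWork(long maxWaitMs) throws InterruptedException {
            PeerState p = _pending.getAndSet(null);
            if (p != null)
                return p;                             // an add() slipped in: don't sleep
            synchronized (_lock) {
                if (_pending.get() == null)           // re-check under the lock
                    _lock.wait(maxWaitMs);
            }
            return _pending.getAndSet(null);          // may be null after a plain timeout
        }
    }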

comment:6 Changed 10 months ago by jogger

Why the current wait logic must be replaced:

  1. Severe design error: When we fall into wait() with unsent packets due to b/w limits, using a wait time based on future unsent packets is plain ridiculous.
  2. Illegal implicit assumptions about traffic shape: Packets arrive from the internet at random points in time. Ideally they would travel at near-constant speed through the router to the packet pusher. If that were the case, at 1000 packets/s we would see wait(1) frequently and 2 or 3 occasionally. A peek at jconsole (stock 0.9.38) compared with top -H shows much longer wait cycles. The same goes for my simplified redesign currently in testing. So the packets arrive at the packet pusher in bursts with idle time thereafter, and the dev designed for that by looking at future packets. The reason (locking, blocking, some congestion elsewhere) does not matter. We need a design that works for any traffic shape and adapts to future code changes elsewhere. These bursts also make the above defect more dangerous than it looks, because we might miss an entire burst and then wait() for a long time.

Why the CPU currently spent in getNextDelay() is needed elsewhere:

Apart from the results being discarded most of the time and some design flaws, we need that CPU to drive through the iterator faster. If the packets appear in bursts we need approx. one pass to clear them out and one pass to fall into wait(). If they appeared at a near-constant rate we would need (given the current design) an average of half a pass to clear and one for the wait(). So: a burst of 3 packets = 2 iterations, 3 single packets = 4.5 iterations. Since the function has no side effects it can be left out without harm.

My redesign will do away with getNextDelay() and adapt wait time based on send frequency, CPU speed and frequency of fruitless looping.

comment:7 Changed 10 months ago by jogger

Put several days into this, have tested many scenarios, and would like to see it go in soon. First used tcpdump to confirm that outbound traffic is much more bursty than inbound. This also holds true for NTCP. So these long waits may go away any day when something else within i2p changes.

Description of proposed patch:

The current getNextVolley() loop is simplified and shortened, stripping out getNextDelay(). Much easier to read, with all flow control reduced to single conditions.

We introduce an adaptive wait timer (_idleWait, complete description in the source).
The problem of concurrent add() is solved by always updating a hot send candidate in a long-lived _targetPeer. This also provides an additional performance boost.

Test results for effective wait times

Mac mini server, 200 tunnels on low 40 KBps total traffic: average wait time unchanged at 38 ms.

ARM 32 medium traffic, 200 tunnels, 64 I2CP tunnels, 185 packets/s UDP out, 320 KBps average total output, 1,000 UDP peers: average wait time 14 ms, up from 12 before, 67 waits/sec unchanged; this means reduced CPU.

Same but on restricted bandwidth of 200 KBps:
12 tunnels, 64 I2CP tunnels, 110 packets/s UDP out, 173 KBps total output, 250 UDP peers, 66 waits/sec, 10 ms average wait.

ARM 32 high traffic, >2,000 tunnels, 70 I2CP tunnels, 800 packets/s UDP out, 1.65 MBps average total output, 2,000 UDP peers: average wait 3 ms, 205 waits/sec.

Same but running CPU-constrained on 3 slower cores only:
near 2,000 tunnels, 70 I2CP tunnels, 660 packets/s UDP out, 1.3 MBps total output, 1,800 UDP peers, 66 waits/sec, 10 ms average wait.

These values appear totally sane to me: because of the notify(), any higher traffic interrupts longer waits, so the current algorithm with its 10 ms minimum does plain nothing in that case.

Here's the patch against 0.9.38 for OutboundMessageFragments.java:

64c64,68
<     private static final int MAX_WAIT = 1000;
---
>     private static final int MAX_WAIT = 50; // for a fast router start
> 
>     private int _idleWait;
>     private volatile PeerState _targetPeer; // holds latest concurrent add or hot send candidate otherwise
>         // this one we immediately handle in getNextVolley loop
71a76
>         _iterator = _activePeers.iterator();
73a79,80
>         _idleWait = 0;
>         _targetPeer = null;
197,199c204
<             int fsz = state.fragmentSize(0);
<             if (fsz < min)
<                 min = fsz;
---
>             min = Math.min(state.fragmentSize(0), min);
235a241
>             _targetPeer = peer; // caught in getNextVolley loop
271,274c277,280
<         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
<         int peersProcessed = 0;
<         while (_alive && (states == null) ) {
<             int nextSendDelay = Integer.MAX_VALUE;
---
> 
>         while ((states == null) && _alive) {
>             // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
>             int peersProcessed = 0;
278,283d283
<                     // do we need a new long-lived iterator?
<                     if (_iterator == null ||
<                         ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
<                         _iterator = _activePeers.iterator();
<                     }
< 
290,298c290,310
<                     while (_iterator.hasNext()) {
<                         peer = _iterator.next();
<                         int remaining = peer.finishMessages(now);
<                         if (remaining <= 0) {
<                             // race with add()
<                             _iterator.remove();
<                             if (_log.shouldLog(Log.DEBUG))
<                                 _log.debug("No more pending messages for " + peer.getRemotePeer());
<                             continue;
---
>                     long size = _activePeers.size();
>                     while (peersProcessed < size) {
>                         // we will catch concurrent adds through _targetPeer
>                         // no need to update size
>                         // may check _targetPeer twice, unlikely
>                         if (_targetPeer != null) {
>                             peer = _targetPeer;
>                             // don´t not call finishMessages for this hot peer
>                             _targetPeer = null;
>                         } else {
>                             if (!_iterator.hasNext())
>                                 _iterator = _activePeers.iterator();
>                             peer = _iterator.next();
>                             peersProcessed++;
>                             if (peer.finishMessages(now) <= 0) {
>                                 // race with add()
>                                 _iterator.remove();
>                                 if (_log.shouldLog(Log.DEBUG))
>                                     _log.debug("No more pending messages for " + peer.getRemotePeer());
>                                 continue;
>                             }
300c312
<                         peersProcessed++;
---
> 
305,314d316
<                         } else if (peersProcessed >= _activePeers.size()) {
<                             // we've gone all the way around, time to sleep
<                             break;
<                         } else {
<                             // Update the minimum delay for all peers
<                             // which will be used if we found nothing to send across all peers
<                             int delay = peer.getNextDelay();
<                             if (delay < nextSendDelay)
<                                 nextSendDelay = delay;
<                             peer = null;
318c320
<                     //if (peer != null && _log.shouldLog(Log.DEBUG))
---
>                     //if (states != null && _log.shouldLog(Log.DEBUG))
322,334c324,327
<                     // if we've gone all the way through the loop, wait
<                     // ... unless nextSendDelay says we have more ready now
<                     if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
<                         peersProcessed = 0;
<                         // why? we do this in the loop one at a time
<                         //finishMessages();
<                         // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
<                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
<                         // gets called regularly
<                         int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
<                         //if (_log.shouldLog(Log.DEBUG))
<                         //    _log.debug("wait for " + toWait);
<                         // wait.. or somethin'
---
>                     if (states == null) {
>                         // sleep 1 if many consecutive sends before, add up recent loop time otherwise
>                         _idleWait = Math.min(Math.max(1, ++_idleWait + (int)(_context.clock().now() - now)), MAX_WAIT);
>                         // adapts wait time to load and CPU speed
337c330
<                                 _activePeers.wait(toWait);
---
>                                 _activePeers.wait(_idleWait);
339c332,333
<                                 // noop
---
>                                 //Oops, we waited too long
>                                 _idleWait = 0;  // turns negative when we send to the guy that woke us up
341c335
<                                      _log.debug("Woken up while waiting");
---
>                                     _log.debug("Woken up while waiting");
348d341
< 
354a348,353
> 
>         if (packets != null) {
>             if (_targetPeer == null) // no recent peer.add()? Give current peer a second chance
>                 _targetPeer = peer; // overwritten by peer.add(). Performance !! this guy may have more
>             _idleWait--; // we are busy so reduce next wait
>         }

comment:8 Changed 10 months ago by Zlatin Balevsky

Two quick comments:

  • _targetPeer should be a LinkedHashSet, not a single field, as multiple hot peers may be added while the pusher thread is iterating through all peers
  • in ::add(PeerState, int), _targetPeer should be written inside the synchronized(_activePeers) block even if it's volatile, because the adding thread may get preempted between the notify() call and the write to the field, which will result in the added hot peer being missed (see the sketch below)
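A minimal sketch of the second point (assumed shape only, not the actual patch): doing the write inside the same block that issues the notify() means the adding thread cannot be preempted between the two.

    // Illustration only: publish the hot peer and notify under the same lock.
    private volatile PeerState _targetPeer;

    private void publishHotPeer(PeerState peer) {
        synchronized (_activePeers) {
            _targetPeer = peer;        // visible before/when the pusher returns from wait()
            _activePeers.notify();
        }
    }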

And one not-so-quick comment: In very high load scenarios this approach may cause one or more busy peers to "starve" slower peers. The existing logic errs on the opposite side - i.e. busy peers may be starved by many slower peers.

comment:9 Changed 10 months ago by jogger

Already thought of your comments. I just want to make sure not to fall into wait() when there is at least one recent missed add. I do not care whether it is overwritten; I need just one that works. All others will be found by the next round-robin pass. Look at the code: with every ready peer, getNextVolley() breaks out of the loop to send, and afterwards a new round starts that finds all recent add() calls. Nothing is missed anymore.

In those high-load scenarios this is exactly what I aim at: find them all during the next pass through the iterator, which will at the same time cover any other, less busy peers that need service. This is the reason for not collecting and servicing all recent add() calls -> that would be a LIFO with the detrimental effect you described.

I am already making progress on the packet burst phenomenon.

comment:10 Changed 10 months ago by Zlatin Balevsky

Ok, makes more sense. The second quick comment from comment 8 still stands. Also, I suggest making _targetPeer an AtomicReference and rewriting the hot path to

PeerState tmp = _targetPeer.getAndSet(null);
if (tmp != null) {
    peer = tmp;
} else {
...

then when you're about to decrement _idleWait

if (packets != null) {
    _targetPeer.compareAndSet(null,peer);
    _idleWait--;
}

(although I think this latter logic unnecessarily gives weight to busy peers)

This way you avoid many possible races between the thread(s) that call add(…) and the packet pusher. Also, it should help performance at least on x86/64 systems where compareAndSet translates to LOCK CMPXCHG. No idea how it will affect ARM though.

comment:11 Changed 10 months ago by Zlatin Balevsky

The second quick comment from comment 8 still stands.

On second thought, you want to always set _targetPeer regardless of whether it's already in the set of active peers.

I've put together a version of your patch that does that and uses AtomicReference here http://paste.crypthost.i2p/?33dc49624b5397a6#FgvvIEGh/LYYzrimIU9Kpv7scNuxmlL6Au04bMnv7iU=

comment:12 Changed 10 months ago by jogger

Yep, looks good to me. As a last change, already tested (the other cases trigger nothing):

    public void add(PeerState peer, int size) {
        boolean willSend = peer.getSendWindowBytesRemaining() >= size;
        if (willSend)
            _targetPeer.set(peer); // caught in getNextVolley loop
...
        if (willSend) {
            synchronized (_activePeers) {
                _activePeers.notify();
            }
        }
Last edited 10 months ago by jogger

comment:13 Changed 10 months ago by jogger

Last comment regarding the final

    _targetPeer.compareAndSet(null,peer);
    _idleWait--;

This is just in case the message to the peer is larger than a volley holds. Unless overridden while sending, this peer deserves a second chance.

comment:14 Changed 10 months ago by Zlatin Balevsky

Ok, I think I understand. To be sure, please review this cumulative patch http://paste.crypthost.i2p/?70144192aac939ac#gQroOX6nuJUbf2yEzWM5N4kGU6C9CCJkBrvxij9mc6s=

I'm going to start testing this on my busy router as 0.9.38-6-jogger3 and will post some graphs in a few days. I would ask you to do the same and pay attention to all variables; I noticed somewhat increased CPU usage with the patch in comment 11.

comment:15 Changed 10 months ago by jogger

Yep, this is aimed at reducing latency and increasing throughput. The previously missed peers alone should cost you half a round-robin pass each on average. CPU is driven by the number of peers configured.

On the other hand it performs very well under heavy load: many hits during the round-robin pass (finishMessages() and allocateSend() do little under lower load, but are called frequently). So there is only a short trip through _activePeers to the next hit, and then plenty of CPU to spend on preparing and sending packets.

Or to put it the other way round: CPU/packet goes down drastically under load. Look at the constrained test scenario above: only a 20% speed loss when running on 3 of the slower cores instead of all 8.

I have a performance mod for the Tunnel GW pumper that has been in testing for three hours and already brings more tunnels and peers than ever; a good opportunity to see this one perform under higher load.

comment:16 Changed 10 months ago by jogger

Thought again about your scenario; this is the one I did not test.

You must usually be going packet-at-a-time with waiting thereafter, resetting the timer and then building it up slowly, because the clock portion of the formula does not add much on fast machines.

I suggest we cut the timer value in half after wakeup instead of resetting it.

comment:17 Changed 10 months ago by Zlatin Balevsky

I suggest we cut the timer value in half after wakeup instead of resetting it.

Right now it's not being reset except in the case of InterruptedException which should only happen on router shutdown. If you want to reset it or cut it in half, that needs to be right after the wait() call, but not inside the "catch" statement. Is that how you intended it?

comment:18 Changed 10 months ago by Zlatin Balevsky

Actually it's a bit more complicated - there is no way to know whether you're out of wait(…) because someone notify()-ed you or because the wait timer elapsed (unless you check for how long you've waited but I assume you don't want to be adding more calls to now()).

InterruptedException is thrown only if someone calls interrupt() which is meant to be used for exceptional circumstances like when you want to stop a thread.
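For what it's worth, a sketch of the "check how long you've waited" approach combined with the halve-on-wakeup idea from comment 16 (illustration only, hypothetical helper; it does add the extra clock read, and an early return from wait() is only a hint of a notify(), not a guarantee):

    // Halve the idle timer only when we appear to have been woken early by a notify();
    // leave it alone when the full timeout elapsed.
    private void waitAndAdapt() throws InterruptedException {
        long toWait = Math.max(1, _idleWait);        // never wait(0), which blocks forever
        long before = System.currentTimeMillis();
        synchronized (_activePeers) {
            _activePeers.wait(toWait);
        }
        long waited = System.currentTimeMillis() - before;
        if (waited < toWait)                         // woken early: traffic probably arrived
            _idleWait = Math.max(1, _idleWait / 2);  // cut in half instead of resetting
    }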

comment:19 Changed 10 months ago by jogger

Yep, you are right; it was late last night when I put in the final patch. Disregard my comment for now.

Actually, after running this for a couple of days, I am pleased with CPU usage at all throughput levels. If you watch yours, I would be surprised if you really see a CPU hike larger than the bit attributable to now catching the previously missed peers.

That would have needed to be done in the old code anyway. If it is really significant we should think again.

comment:20 Changed 9 months ago by zzz

It sounds like this is still a work in progress…

@OP when you're ready for me to review and test, please provide a current, unreversed, unified diff

comment:21 Changed 9 months ago by jogger

After reviewing: the unified diff in comment 14 does it all, no further mods necessary. zab is testing, as am I.

On the other hand, some of the supporting functions in PeerState.java are incorrect, at least getSendWindowBytesRemaining() and allocateSendingBytes(), as they improperly handle the send window.

Just testing a shorter, corrected version of both functions with a very simple sliding window. Looks good now and could be one to make our torrent friends happy, but I currently have no idea how it will affect getNextVolley() CPU usage. Hopefully further downward in terms of CPU/packet.

Will provide a diff as requested.

comment:22 Changed 9 months ago by zzz

ok thank you, I will review and test the OMF patch from comment 14, and will await your PeerState diff.

Changed 9 months ago by Zlatin Balevsky

Attachment: bandwidth-jogger.png added

Bandwidth with this patch

comment:23 Changed 9 months ago by Zlatin Balevsky

Hi,
In the bandwidth graph in the attached screenshot, jogger3 is with the patch from comment 14; jogger4 is that patch plus zzz's patch for EstablishmentManager. Nice spikes!

comment:24 Changed 9 months ago by jogger

The spikes are inbound?!

Would be nice if you would try my mod for PeerState too. It should smooth out the sends.

If tried with comparable output, this one also gives you a good indication of actual wait intervals:

sudo tcpdump -c 800 -ttt -Q out -n -B 1024 -q -i eth# udp port <port> | grep -v "00:00:00.00"

comment:25 Changed 9 months ago by Zlatin Balevsky

The spikes could be due to anything, but the magnitude shows that the router is performing well under that type of load. If it were to hit some kind of barrier, the shape of the graph would be smoother.

The only possible relevance to the patch in this ticket is that without the forced 10 ms wait in PacketPusher, smaller RTT values get computed and as a result SSU links transfer faster (but I'm really reaching here).

comment:26 Changed 9 months ago by zzz

Comments on the patch in comment 14:

  • The goal in this and similar changes in other tickets should be to _increase_ MAX_WAIT. If our notification/wakeup logic and our calculation of the next send delay are working correctly, then no failsafe should be necessary, and MAX_WAIT in theory could be infinity. For power consumption on Android, for example, we don't want a bunch of threads waking up 20 times a second.
  • You've abandoned the use of peer.getNextDelay() to calculate how long to sleep, and just changed it to an adaptive counter, idleWait, that gets incremented or decremented based on how busy we are, with a max of 50 ms. I think this is going in the wrong direction. We should be using getNextDelay() and if it's not working right then fix the wakeup logic. The goal is to sleep for exactly the right time, and wakeup early if something new happens.
  • nextSendDelay is now unused but still defined. But from above, I don't think it should be unused.
  • I think there's a race where the new iterator created at line 303 could be empty, and then at line 304 _iterator.next() will throw an exception; you're not calling hasNext()? Not sure.
  • still awaiting the PeerState diff promised in comment 21

I believe your test results that this patch makes things work better for high-bandwidth routers, but it's at the expense of power and CPU on the low end. I think it needs to be completely rethought and I don't think it's likely to get into 0.9.39

comment:27 Changed 9 months ago by zzz

To make it even more clear:

basic logic for any outbound transport should be:

while (true) {
   send everything that's ready
   sleep(min(time to earliest retransmission timer, large failsafe time))
}

your proposed patch is:

while (true) {
    send everything that's ready, starting with the one that woke us up
    sleep(some very small time, adjusted based on previous traffic,
          but independent of retransmission timers)
}

it's the sleep time that's a step in the wrong direction
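For comparison, a hedged sketch of the first loop shape in Java, with hypothetical helpers for the parts left abstract above:

    // Sleep exactly until the earliest retransmission timer (capped by a large
    // failsafe, not floored at 10 ms), and rely on notify() for early wakeup.
    private static final long FAILSAFE_WAIT = 1000;   // ms; failsafe only, not a floor

    private void pusherLoop() throws InterruptedException {
        while (_alive) {
            sendEverythingReady();                        // hypothetical: drain all peers due now
            long delay = earliestRetransmissionDelay();   // hypothetical: min over all peers
            if (delay > 0) {
                synchronized (_activePeers) {
                    _activePeers.wait(Math.min(delay, FAILSAFE_WAIT));
                }
            }
        }
    }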

Last edited 9 months ago by zzz

comment:28 Changed 9 months ago by jogger

@zzz
My PeerState diff has been in #2427 for some days now.

I think you are missing some important points:

  • The patch fixes an important bug that 0.9.39 should not ship with. The current code falls into wait() with packets outstanding for transmission. That's what the _targetPeer logic we discussed above is for.
  • The patch drives CPU down considerably, not up. So wait is up, not down. I would not claim a 50% CPU reduction on slower machines, but it is close to that.
  • Results from getNextDelay are currently discarded in 99% of cases, so it just eats lots of CPU. The CPU it needs results in longer iterations through the loop, which means less wait() instead of more. Everything is discarded every time we send something, and also when the result is <= 10 or when a notify() occurs.
  • getNextDelay() is fundamentally flawed anyway by using relative time. To be useful on slower machines, where time runs away while driving through the loop, it would need to be reworked to use absolute time.
  • The adaptive wait logic times the loop, so slower machines get much more wait with very simple logic that does not eat CPU.
  • There is always work to do in the loop: finishing messages, ACKing out, etc. Within the current code this is postponed until something is ready to send, which will then be delayed by this housekeeping. My wait timer grows slowly on fast CPUs, making sure they are ready for an immediate send if something is due.
  • The size <= 0 case is covered by willSend.
  • The iterator cannot be empty: size is checked above.

So I would ask that someone besides zab and me test this (we both got impressive results), and if the tests confirm that CPU is down, wait is up and traffic is up, then I see no reason not to ship this.

comment:29 Changed 9 months ago by jogger

Addition: Now that we have covered the missed-peer bug, it would not cost much latency to leave MAX_WAIT at 1000 and keep the previous minimum of 10 ms.

comment:30 Changed 9 months ago by zzz

Add a subticket #2427.

comment:31 Changed 9 months ago by zzz

Can you provide a patch that fixes only the important bug (bullet 1 in comment 28)? Then we can review and test that for 39 while we discuss the other topics.

On the other topics: if getNextDelay() is flawed then we should fix it, not discard it, according to the principles in my comments 26 and 27. I see the failsafe of 100 ms in there; it's been there for 6 years, and that looks like a big flaw for sure.

comment:32 Changed 9 months ago by jogger

In this discussion we all overlooked a design fault in the original code. getNextDelay() was executed in the hot path, with results only useful when there was nothing to send and discarded in most cases. So the solution is to drive a second loop through getNextDelay() when we are unable to send. I have reworked getNextDelay() to use absolute time and guarded the whole thing against concurrent add() and time running away.

This is a small change to the patch zab has posted and that we have successfully tested for some time. Most of the above patch is needed anyway to fix the missed-peer issue. So I have a diff ready that is in sync with zzz's comments, plus a combined diff for PeerState.

--- "PeerState orig.java"	2019-02-06 13:21:26.571744716 +0100
+++ "PeerState patch.java"	2019-02-14 10:00:10.765045369 +0100
@@ -479,7 +479,20 @@
 
     /** how many bytes can we send to the peer in the current second */
     public int getSendWindowBytesRemaining() {
-        synchronized(_outboundMessages) {
+        synchronized(_outboundMessages) { // possibly reentrant
+            long now = _context.clock().now();
+            int duration = (int)(now - _lastSendRefill);
+            if (_sendBytes > 0 && duration > 0 || duration >= 1000) {
+                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);
+                _sendBps = (99 * _sendBps + _sendBytes * 1000 / duration + 50) / 100;
+             //if (isForACK) {
+            //    _sendACKBytes += size;
+            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
+            //}
+               _sendBytes = 0;
+            //_sendACKBytes = 0;
+                _lastSendRefill = now;
+            }
             return _sendWindowBytesRemaining;
         }
     }
@@ -695,27 +708,14 @@
      *  Caller should synch
      */
     private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
         //if (true) return true;
-        if (IGNORE_CWIN || size <= _sendWindowBytesRemaining || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
+        if (size <= getSendWindowBytesRemaining() || IGNORE_CWIN || isForACK || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
             if ( (messagePushCount == 0) && (_concurrentMessagesActive > _concurrentMessagesAllowed) ) {
                 _consecutiveRejections++;
                 _context.statManager().addRateData("udp.rejectConcurrentActive", _concurrentMessagesActive, _consecutiveRejections);
                 return false;
-            } else if (messagePushCount == 0) {
+            }
+            if (messagePushCount == 0) {
                 _context.statManager().addRateData("udp.allowConcurrentActive", _concurrentMessagesActive, _concurrentMessagesAllowed);
                 _concurrentMessagesActive++;
                 if (_consecutiveRejections > 0) 
@@ -724,13 +724,12 @@
             }
             _sendWindowBytesRemaining -= size; 
             _sendBytes += size;
-            _lastSendTime = now;
+            _lastSendTime = _context.clock().now();
             //if (isForACK) 
             //    _sendACKBytes += size;
             return true;
-        } else {
-            return false;
         }
+        return false;
     }
     
     /** if we need to contact them, do we need to talk to an introducer? */
@@ -1178,22 +1177,16 @@
                         _sendWindowBytes += bytesACKed; //512; // bytesACKed;
                 //}
             }
-        } else {
-            int allow = _concurrentMessagesAllowed - 1;
-            if (allow < MIN_CONCURRENT_MSGS)
-                allow = MIN_CONCURRENT_MSGS;
-            _concurrentMessagesAllowed = allow;
-        }
+        } else
+            _concurrentMessagesAllowed = Math.max(_concurrentMessagesAllowed - 1, MIN_CONCURRENT_MSGS);
+
         if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
             _sendWindowBytes = MAX_SEND_WINDOW_BYTES;
         _lastReceiveTime = _context.clock().now();
         _lastSendFullyTime = _lastReceiveTime;
         
         //if (true) {
-            if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
-                _sendWindowBytesRemaining += bytesACKed;
-            else
-                _sendWindowBytesRemaining = _sendWindowBytes;
+            _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + bytesACKed, _sendWindowBytes);
         //}
         
         if (numSends < 2) {
@@ -1708,30 +1701,19 @@
      * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
      * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
      *
-     * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
-     *         If ready now, will return 0 or a negative value.
+     * @return first send time (may already be in the past), or Long.MAX_VALUE if we have nothing to send.
      */
-    public int getNextDelay() {
-        int rv = Integer.MAX_VALUE;
+    public long getNextDelay() {
+        Long rv = Long.MAX_VALUE;
         if (_dead) return rv;
-        long now = _context.clock().now();
         synchronized (_outboundMessages) {
-            if (_retransmitter != null) {
-                rv = (int)(_retransmitter.getNextSendTime() - now);
-                return rv;
-            }
-            for (OutboundMessageState state : _outboundMessages) {
-                int delay = (int)(state.getNextSendTime() - now);
-                // short circuit once we hit something ready to go
-                if (delay <= 0)
-                    return delay;
-                if (delay < rv)
-                    rv = delay;
-            }
+            if (_retransmitter != null)
+                return _retransmitter.getNextSendTime();
+            for (OutboundMessageState state : _outboundMessages)
+                rv = Math.min(rv, state.getNextSendTime());
+                // we short circuit in getNextVolley
         }
-        // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
+        // no failsafe, we work on _outboundQueue elsewhere
         return rv;
     }
--- "OutboundMessageFragments orig.java"	2019-02-14 10:26:33.357827848 +0100
+++ "OutboundMessageFragments patch.java"	2019-02-14 10:38:11.286700990 +0100
@@ -8,6 +8,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
@@ -63,12 +64,16 @@
     static final int MAX_VOLLEYS = 10;
     private static final int MAX_WAIT = 1000;
 
+    private final AtomicReference<PeerState> _targetPeer = new AtomicReference<PeerState>(); // holds latest concurrent add or hot send candidate otherwise
+        // this one we immediately handle in getNextVolley loop
+
     public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
         _context = ctx;
         _log = ctx.logManager().getLog(OutboundMessageFragments.class);
         _transport = transport;
         // _throttle = throttle;
         _activePeers = new ConcurrentHashSet<PeerState>(256);
+        _iterator = _activePeers.iterator();
         _builder = new PacketBuilder(ctx, transport);
         _alive = true;
         // _allowExcess = false;
@@ -194,9 +199,7 @@
         for (int i = 0; i < sz; i++) {
             OutboundMessageState state = states.get(i);
             peer.add(state);
-            int fsz = state.fragmentSize(0);
-            if (fsz < min)
-                min = fsz;
+            min = Math.min(state.fragmentSize(0), min);
         }
         add(peer, min);
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
@@ -215,6 +218,9 @@
      * @since 0.8.9
      */
     public void add(PeerState peer, int size) {
+        boolean willSend = peer.getSendWindowBytesRemaining() >= size;
+        if (willSend)
+            _targetPeer.set(peer);
         boolean added = _activePeers.add(peer);
         if (added) {
             if (_log.shouldLog(Log.DEBUG))
@@ -229,7 +235,7 @@
         // no, this doesn't always work.
         // Also note that the iterator in getNextVolley may have alreay passed us,
         // or not reflect the addition.
-        if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
+        if (willSend) {
             synchronized (_activePeers) {
                 _activePeers.notify();
             }
@@ -268,73 +274,71 @@
     public List<UDPPacket> getNextVolley() {
         PeerState peer = null;
         List<OutboundMessageState> states = null;
+
+        while ((states == null) && _alive) {
         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
         int peersProcessed = 0;
-        while (_alive && (states == null) ) {
-            int nextSendDelay = Integer.MAX_VALUE;
             // no, not every time - O(n**2) - do just before waiting below
             //finishMessages();
 
-                    // do we need a new long-lived iterator?
-                    if (_iterator == null ||
-                        ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
-                        _iterator = _activePeers.iterator();
-                    }
-
                     // Go through all the peers that we are actively sending messages to.
                     // Call finishMessages() for each one, and remove them from the iterator
                     // if there is nothing left to send.
                     // Otherwise, return the volley to be sent.
                     // Otherwise, wait()
+                    long size = _activePeers.size();
                     long now = _context.clock().now();
-                    while (_iterator.hasNext()) {
+                    while (peersProcessed < size) {
+                        // we will catch concurrent adds through _targetPeer
+                        // no need to update size
+                        // may check _targetPeer twice, unlikely
+                        PeerState tmp = _targetPeer.getAndSet(null);
+                        if (tmp != null) {
+                            peer = tmp;
+                            // don´t not call finishMessages for this hot peer
+                        } else {
+                            if (!_iterator.hasNext())
+                                _iterator = _activePeers.iterator();
                         peer = _iterator.next();
-                        int remaining = peer.finishMessages(now);
-                        if (remaining <= 0) {
+                            peersProcessed++;
+                            if (peer.finishMessages(now) <= 0) {
                             // race with add()
                             _iterator.remove();
                             if (_log.shouldLog(Log.DEBUG))
                                 _log.debug("No more pending messages for " + peer.getRemotePeer());
                             continue;
                         }
-                        peersProcessed++;
+                        }
+
                         states = peer.allocateSend();
                         if (states != null) {
                             // we have something to send and we will be returning it
                             break;
-                        } else if (peersProcessed >= _activePeers.size()) {
-                            // we've gone all the way around, time to sleep
-                            break;
-                        } else {
-                            // Update the minimum delay for all peers
-                            // which will be used if we found nothing to send across all peers
-                            int delay = peer.getNextDelay();
-                            if (delay < nextSendDelay)
-                                nextSendDelay = delay;
-                            peer = null;
                         }
                     }
 
-                    //if (peer != null && _log.shouldLog(Log.DEBUG))
+                    //if (states != null && _log.shouldLog(Log.DEBUG))
                     //    _log.debug("Done looping, next peer we are sending for: " +
                     //               peer.getRemotePeer());
 
-                    // if we've gone all the way through the loop, wait
-                    // ... unless nextSendDelay says we have more ready now
-                    if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
-                        peersProcessed = 0;
-                        // why? we do this in the loop one at a time
-                        //finishMessages();
+                    if (states == null) {
+                        Long nextSendTime = Long.MAX_VALUE;
+                        Iterator<PeerState> delayIter = _activePeers.iterator();
+                        now = _context.clock().now();
+                        while (_targetPeer.get() == null && delayIter.hasNext() && nextSendTime > now) {
+                            peer = delayIter.next();
+                            nextSendTime = Math.min(nextSendTime, peer.getNextDelay());
+                        }
+
+                        if (_targetPeer.get() != null || nextSendTime <= (now = _context.clock().now()))
+                            return null;
+
+                        synchronized (_activePeers) {
+                            try {
                         // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
                         // gets called regularly
-                        int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("wait for " + toWait);
-                        // wait.. or somethin'
-                        synchronized (_activePeers) {
-                            try {
-                                _activePeers.wait(toWait);
+                                _activePeers.wait(Math.min(Math.max(nextSendTime - now, 10), MAX_WAIT));
                             } catch (InterruptedException ie) {
                                 // noop
                                 if (_log.shouldLog(Log.DEBUG))
@@ -353,6 +356,11 @@
 
         List<UDPPacket> packets = preparePackets(states, peer);
 
+        if (packets != null)
+           // no recent peer.add()? Give current peer a second chance
+           // overwritten by peer.add(). Performance !! this guy may have more
+            _targetPeer.compareAndSet(null,peer);
+
       /****
         if ( (state != null) && (state.getMessage() != null) ) {
             int valid = 0;

comment:33 Changed 9 months ago by Zlatin Balevsky

I don't have a strong opinion on the adaptive vs. computed sleep, and I agree with the use of the AtomicReference.

However, I can't possibly justify the 10ms floor in the sleep. If we're going to compute the wait time based on the retransmission timer, why are we saying it can never be less than 10ms?

comment:34 Changed 9 months ago by jogger

You should discuss the floor with zzz, who wanted to keep the current logic. Otherwise I agree with you.

comment:35 Changed 9 months ago by zzz

Sorry for the delay. I'm reviewing your responses in comment 28, and I'm reviewing and testing only the 2nd patch (OutboundMessageFragments) from comment 32, for possible inclusion in 0.9.39. The indenting is a little wonky and I'm pulling out some unnecessary changes just so I can see what I'm looking at for the real changes. I'm also rereading your comments at the top of comment 32 to understand what the patch is doing.

I don't think we need to call _targetPeer.get() every time in the while loop through the 2nd iterator. I'm concerned about the 2nd iterator but if there's nothing to send, then we may as well just do it in the getNextDelay() call in the first iterator like we do now.

In comment 32 you say that the result is "discarded in most cases". Do you have any data on how often something is sent, vs. how often the thread is woken up but does nothing?

I'm going to keep staring at this and try to rework it to my liking, but it's getting late to get it in for 39.

comment:36 Changed 9 months ago by zzz

Status: new → accepted

In comment 35 I thought we could take only the OMF part of the patch in comment 32, and not the PS part, but I realize that the OMF part relies on changing getNextDelay() in PS to return a time instead of a delay. I've reworked the patches to vastly simplify them, remove minor changes that make the diff harder to analyze, and do some cleanups and fixes.

The changes from what's in comment 32 are:

  • Don't change PS.getNextDelay() from delay to time; leave it as a delay
  • Only check _targetPeer twice, once at the beginning and once before sleeping
  • Pass "now" to PS.getNextDelay()
  • Change getNextDelay() failsafe from 100 to 5000
  • Make check if new iterator is empty
  • Break out of inner loop to calculate delay once delay <= 0; break outer loop rather than returning null if we found something
  • Indenting and style fixes
  • Remove unnecessary minor changes that just made the patch bigger

So what's left are the following major changes:

Lightly tested so far, please review and test

#
# old_revision [7ad38b17696cea2dee16e912d982b44adb252ef0]
#
# patch "router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java"
#  from [a54d3ea26437efe1cf0eddbaac566747e811bb04]
#    to [c61bb959b4fd1b3db06e8b21c501deac9ea7ab7f]
# 
# patch "router/java/src/net/i2p/router/transport/udp/PeerState.java"
#  from [abaccaf0d9394344fd53bd784b3163355be3c4e7]
#    to [b08eea31f687b172e405d166f72003f13c3ada3a]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	a54d3ea26437efe1cf0eddbaac566747e811bb04
+++ router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	c61bb959b4fd1b3db06e8b21c501deac9ea7ab7f
@@ -8,6 +8,7 @@ import java.util.Set;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
@@ -63,12 +64,17 @@ class OutboundMessageFragments {
     static final int MAX_VOLLEYS = 10;
     private static final int MAX_WAIT = 1000;
 
+    // holds latest concurrent add or hot send candidate otherwise
+    // this one we immediately handle in getNextVolley loop
+    private final AtomicReference<PeerState> _targetPeer = new AtomicReference<PeerState>();
+
     public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
         _context = ctx;
         _log = ctx.logManager().getLog(OutboundMessageFragments.class);
         _transport = transport;
         // _throttle = throttle;
         _activePeers = new ConcurrentHashSet<PeerState>(256);
+        _iterator = _activePeers.iterator();
         _builder = new PacketBuilder(ctx, transport);
         _alive = true;
         // _allowExcess = false;
@@ -215,6 +221,9 @@ class OutboundMessageFragments {
      * @since 0.8.9
      */
     public void add(PeerState peer, int size) {
+        boolean willSend = peer.getSendWindowBytesRemaining() >= size;
+        if (willSend)
+            _targetPeer.compareAndSet(null, peer);
         boolean added = _activePeers.add(peer);
         if (added) {
             if (_log.shouldLog(Log.DEBUG))
@@ -229,7 +238,7 @@ class OutboundMessageFragments {
         // no, this doesn't always work.
         // Also note that the iterator in getNextVolley may have alreay passed us,
         // or not reflect the addition.
-        if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
+        if (willSend) {
             synchronized (_activePeers) {
                 _activePeers.notify();
             }
@@ -266,28 +275,30 @@ class OutboundMessageFragments {
      * @return null only on shutdown
      */
     public List<UDPPacket> getNextVolley() {
-        PeerState peer = null;
-        List<OutboundMessageState> states = null;
+        // hot peer via add()
+        PeerState peer = _targetPeer.getAndSet(null);
+        List<OutboundMessageState> states = (peer != null) ? peer.allocateSend() : null;
+
         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
         int peersProcessed = 0;
         while (_alive && (states == null) ) {
-            int nextSendDelay = Integer.MAX_VALUE;
             // no, not every time - O(n**2) - do just before waiting below
             //finishMessages();
 
-                    // do we need a new long-lived iterator?
-                    if (_iterator == null ||
-                        ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
-                        _iterator = _activePeers.iterator();
-                    }
-
                     // Go through all the peers that we are actively sending messages to.
                     // Call finishMessages() for each one, and remove them from the iterator
                     // if there is nothing left to send.
                     // Otherwise, return the volley to be sent.
                     // Otherwise, wait()
+                    long size = _activePeers.size();
                     long now = _context.clock().now();
-                    while (_iterator.hasNext()) {
+                    while (peersProcessed < size) {
+                        if (!_iterator.hasNext()) {
+                            _iterator = _activePeers.iterator();
+                            if (!_iterator.hasNext())
+                                break;
+                        }
+                        peersProcessed++;
                         peer = _iterator.next();
                         int remaining = peer.finishMessages(now);
                         if (remaining <= 0) {
@@ -297,21 +308,11 @@ class OutboundMessageFragments {
                                 _log.debug("No more pending messages for " + peer.getRemotePeer());
                             continue;
                         }
-                        peersProcessed++;
+
                         states = peer.allocateSend();
                         if (states != null) {
                             // we have something to send and we will be returning it
                             break;
-                        } else if (peersProcessed >= _activePeers.size()) {
-                            // we've gone all the way around, time to sleep
-                            break;
-                        } else {
-                            // Update the minimum delay for all peers
-                            // which will be used if we found nothing to send across all peers
-                            int delay = peer.getNextDelay();
-                            if (delay < nextSendDelay)
-                                nextSendDelay = delay;
-                            peer = null;
                         }
                     }
 
@@ -321,10 +322,33 @@ class OutboundMessageFragments {
 
                     // if we've gone all the way through the loop, wait
                     // ... unless nextSendDelay says we have more ready now
-                    if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
+                    if (states == null) {
                         peersProcessed = 0;
-                        // why? we do this in the loop one at a time
-                        //finishMessages();
+                        int nextSendDelay = Integer.MAX_VALUE;
+                        for (PeerState peer2 : _activePeers) {
+                            int delay = peer2.getNextDelay(now);
+                            if (delay < nextSendDelay) {
+                                nextSendDelay = delay;
+                                if (nextSendDelay <= 0) {
+                                    peer = peer2;
+                                    states = peer.allocateSend();
+                                    break;
+                                }
+                            }
+                        }
+
+                        // peer found in loop above is ready to send
+                        if (states != null)
+                            break;
+
+                        // check the hot peer one more time before sleeping
+                        peer = _targetPeer.getAndSet(null);
+                        if (peer != null) {
+                            states = peer.allocateSend();
+                            if (states != null)
+                                break;
+                        }
+
                         // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
                         // gets called regularly
@@ -353,6 +377,12 @@ class OutboundMessageFragments {
 
         List<UDPPacket> packets = preparePackets(states, peer);
 
+        if (packets != null) {
+           // no recent peer.add()? Give current peer a second chance
+           // overwritten by peer.add(). Performance !! this guy may have more
+            _targetPeer.compareAndSet(null, peer);
+        }
+
       /****
         if ( (state != null) && (state.getMessage() != null) ) {
             int valid = 0;
============================================================
--- router/java/src/net/i2p/router/transport/udp/PeerState.java	abaccaf0d9394344fd53bd784b3163355be3c4e7
+++ router/java/src/net/i2p/router/transport/udp/PeerState.java	b08eea31f687b172e405d166f72003f13c3ada3a
@@ -1718,10 +1718,9 @@ public class PeerState {
      * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
      *         If ready now, will return 0 or a negative value.
      */
-    public int getNextDelay() {
+    public int getNextDelay(long now) {
         int rv = Integer.MAX_VALUE;
         if (_dead) return rv;
-        long now = _context.clock().now();
         synchronized (_outboundMessages) {
             if (_retransmitter != null) {
                 rv = (int)(_retransmitter.getNextSendTime() - now);
@@ -1737,8 +1736,8 @@ public class PeerState {
             }
         }
         // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
+        if (rv > 5000 && !_outboundQueue.isEmpty())
+            rv = 5000;
         return rv;
     }
 

comment:37 Changed 9 months ago by zzz

Not sure I'm happy with the 'hot peer' setting at the bottom of OMF, combined with my change at the top to set(null, peer). This makes the 'hot peer' win as long as it has data to send and prevents the normal round-robin behavior. Actually, there's not even a test at the bottom to see if it has more data to send.

The code at the bottom with the "performance!" note is originally from comment 7. Does OP have any comments on the importance of that code, or the tradeoffs of performance vs. round-robin behavior?

comment:38 Changed 9 months ago by Zlatin Balevsky

I did a very quick and simple study of how big the calculated delay is and to verify the following statement:

results from getNextDelay are currently discarded in 99% of cases

If by that you mean results are discarded because they fall outside the 10ms-1000ms window, my results show the opposite. Out of 422029 loops, only 5874 computed a delay less than 10ms. That is 1.3%.

comment:39 Changed 9 months ago by jogger

This discussion is pointless to some degree, as the latest patch includes getNextDelay(). However I am counting it differently, as I clearly refer to individual calls to the function, not to aggregate values fed into wait(). For me a result is discarded if any of the following is true:

  • a value <= 10 is computed
  • a value > 10 is computed and a packet is sent afterwards
  • a value > 10 is computed, no packet is sent afterwards, and the minimum of all is <= 10
  • a value > 10 is computed, no packet is sent afterwards, the minimum of all is > 10, and a notify() occurs before 10 ms elapse.

With an average wait() of < 3 ms on my systems even at moderate traffic, it is rare for none of the above conditions to strike.

comment:40 Changed 9 months ago by jogger

Regarding comment #37:

  • the "hot peer" does not always win; it will intentionally be overridden at any time by a fresh add().
  • The test whether there is something to send occurs when getNextVolley() is entered again.
  • If the "hot peer" is the only one ready to send, that statement saves a full round robin.
  • Since it is only one of > 1000 peers, the round-robin behaviour is changed only to a very minimal degree (a rough sketch of the intended flow follows below).
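To make the intended control flow concrete, here is a rough sketch of how the hand-off could look at the top of the getNextVolley() loop. Only the _targetPeer.compareAndSet(null, peer) at the bottom of OMF is from the patch above; the getAndSet() consumption and the early return are assumptions for illustration:

    // consume the 'hot peer' hint, if any; a concurrent add() may have
    // overwritten it with a fresher peer in the meantime
    PeerState hot = _targetPeer.getAndSet(null);
    if (hot != null) {
        List<OutboundMessageState> states = hot.allocateSend();
        if (states != null)
            return preparePackets(states, hot);   // hot peer still had data to send
        // otherwise fall through to the normal round robin
    }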

comment:41 Changed 8 months ago by zzz

We ran out of time for .39 but haven't given up on this; maybe we can get something in for .40. Has anybody tested the patch in comment 36? Or is it fatally flawed, or won't it have much if any effect? Not sure where we're at right now.

comment:42 Changed 8 months ago by Zlatin Balevsky

I ran for a while with the patch in comment 36; nothing unusual to report in the logs, and the graphs look unchanged. But I stopped running after you said you weren't happy with it in comment 37, in anticipation of a new patch.

comment:43 Changed 8 months ago by jogger

I actually missed the patch in comment 36. Will analyze. Comment 40 still stands. But I can already add that comment 36 again assumes that time does not advance, delivering inaccurate results in the case of GC, context switches, or a slower CPU.

comment:44 Changed 8 months ago by jogger

Have now gone through comment 36. Will work, but I think my patch offers clear advantages:

  • My patch includes a corrected send window calculation in PeerState.java, which also makes for more evenly distributed outgoing traffic.
  • Frequent checking of targetPeer does not defeat the round robin; there are enough rounds, as the results in comment 7 show. It does, however, address the fact that the UDP packet pusher usually uses the most CPU (provided the NTCP Pumper has not locked up), and I want to drive that down, because it can max out a single core under load and multiple threads are not an option. On fast CPUs this means that almost all packets go out immediately (near FIFO); other platforms benefit from the CPU savings. The round robin is not FIFO anyway.
  • Use of relative time has disadvantages, see the previous comment.
  • I can't see how the second if (!_iterator.hasNext()) could ever strike; it looks like dead code.
  • Multiple use of allocateSend() is of course beneficial.

comment:45 Changed 6 months ago by jogger

I have now done detailed measuring and logging of both zzz's patch and mine, counting loops, waits, and calls to the functions on the critical path. As a baseline I used stock 0.9.40 with the #2505 fix.

Stock i2p makes slightly fewer calls to allocateSend than to finishMessages, and in turn slightly fewer calls to getNextDelay, which is clear from reading the code. Comment 36 saves about 35% on getNextDelay. Comment 32 prioritizes allocateSend (2% more calls than finishMessages) over the round robin by immediately serving the hot peer and thus sends more often. It reproducibly attracts around 20% more traffic (router total, possibly #2512 related) and saves less than 25% on getNextDelay. Since there are more round robins than waits, which occur at rates of 100-150/s for any version over a wide range of traffic, prioritizing the hot peer causes a possible shift of 1 ms for other traffic on my 32-bit hardware.

My handling of getNextDelay is robust against the various reasons for the clock moving forward, especially under higher load. getNextDelay with relative time is not, and checking for a peer ready to send just after all allocateSend() calls have failed is a bit pointless.

I therefore stand by my OMF and getNextDelay patches in comment 32, or in comments 11/12 (lighter on CPU) for that matter.

Comments 11/12, 32 and 36 all solve the missed-peer bug as described. Detailed logging shows the bug is more serious than that. Even with my sliding send window, allocateSendingBytes returns false quite often because the send window is used up. This means that setting the willsend boolean from an outdated value of _sendWindowBytesRemaining will produce lots of false negatives, and these false negatives are also potential missed peers.
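To illustrate the false-negative pattern described above (variable names other than _sendWindowBytesRemaining and allocateSendingBytes are assumptions, and the exact signature shown is not the real one):

    // decision taken from a snapshot at the start of the pass -- by the time
    // the peer is visited, ACKs may have freed up window again, so a 'false'
    // here is a potential missed peer
    boolean willsend = sendWindowSnapshot >= fragmentSize;

    // re-checking (and reserving against) the live value at send time avoids that
    boolean canSend = peer.allocateSendingBytes(fragmentSize);   // signature assumed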

Therefore my PeerState patch above is an integral part of the solution. However, it contains a rare and hard-to-spot integer overflow affecting slow connections only. Will add a new revision soon.


comment:46 Changed 3 months ago by jogger

Sensitive: unset

Glad there is some momentum with SSU right now. This one is not going forward. Too complex? Maybe we could slice it up and roll out one improvement per release? Subtickets per issue?

The following topics could be addressed independently:

  • Correct bandwidth window (PeerState only)
  • #2506 as a follow-up for the bandwidth window (PeerState only)
  • Code cleanup / missed peer (PeerState, OMF)
  • Handling of getNextDelay. Currently I favor a valid, simple statistical approach to substitute for getNextDelay, very light on the CPU. (OMF only)
  • Rework finishMessages(). New; biggest benefit, a double-digit improvement for all GC parameters, described below. (PeerState, OMF or something else, UDPTransport)

Minor: the rv calculation does not need to be synchronized, just return _outboundMessages.size() + _outboundQueue.size(); an absolutely minimal reduction of missed peers (a sketch follows below).
Minor: given the current constants, (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) can never strike and can safely be removed.
Major: Consolidate the thousands of succeeded and failed ArrayLists into 2 final lists at the UDPTransport level and clean them up there. I am currently cleaning them out just before waiting, but packet pusher CPU could be reduced further by doing this asynchronously from some other thread.
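For the first "Minor" point, a minimal sketch of what the unsynchronized count could look like (the method name and the _dead check are assumptions; the two collections are the ones named above, and a momentarily stale size is the accepted trade-off):

    // no lock needed: a slightly stale count is fine here
    public int getOutboundMessageCount() {
        if (_dead)
            return 0;
        return _outboundMessages.size() + _outboundQueue.size();
    }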

comment:47 Changed 3 months ago by zzz

Yes, there are several reasons why we're stuck:

  • lack of time
  • technical disagreement
  • complexity
  • inability to reproduce your results
  • things that clearly need fixing mixed in with nice-to-haves, minor cleanups, or things that don't make sense to me
  • need to do the work early in a release cycle so there's sufficient time for testing

The original point of this ticket was to fix OMF.getNextVolley() and we all agree it needs fixing.

The reason there's other activity in SSU right now is that zlatinb has set up a test network and is investigating various bottlenecks and bugs. Perhaps that could help us with this ticket.

If we can avoid changes that affect OMF and PS at the same time, that would be helpful.

I never fully understood your bandwidth measurement changes. You started #2427 and zlatinb had some comments, but then you abandoned that ticket and rolled the changes back into this one? Splitting that back out to #2427, addressing zlatinb's concerns, and having me look at it once you two are happy would be a big help.

comment:48 Changed 3 months ago by jogger

OK, I have posted solutions for the first two: #2427 and #2506.

The other 3 address issues on the critical path, so they are definitely more than "nice to have". I am running UDP packet pusher with 6 unreleased performance patches and it still exceeds 50% CPU on a 2 GHz core.

If the goal is to avoid changing multiple files at the same time, that advice should have been heeded by the original coders, who put the related add() and remove() into different files.

comment:49 Changed 3 months ago by zzz

This is a 17-year-old code base, so it's a challenge for everybody, maintainers and patch submitters alike. Our guidance is not 'only change one file at a time' but 'minimize patches to address one issue at a time', as that will help avoid the issues listed in comment 47 above.

We're two weeks from the checkin deadline for .42 so the more complex changes will have to wait, but we can keep the discussions going.

comment:50 Changed 3 months ago by jogger

Reviewing this discussion and the comments in the source, I came to the conclusion that we all ignored basic wisdom and tried to optimize code based on a data structure that does not fit the task.

Changing data structures of course causes extensive code changes, but here it is well worth it.

To record concurrent add() calls for new traffic we only need a set with a time-weighted size below 1.

Of all other peers we are only interested in the one that might want to send next, but we currently iterate through the entire set. The appropriate data structure for this task is a heap sorted by nextSendTime. This lowers the effort for identifying the next peer to work on from O(n) to O(log n) and delivers the wait time at no extra cost.
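A minimal sketch of the heap idea, assuming the PeerState.getNextSendTime() accessor mentioned below; this is illustration only, not the proposed patch. Because the ordering key changes whenever a peer sends or its window changes, a peer has to be removed before its nextSendTime is updated and re-inserted afterwards, otherwise the ordering silently breaks:

    // peers ordered by when they next want to send; O(log n) insert/remove
    PriorityQueue<PeerState> activePeers = new PriorityQueue<PeerState>(1024,
        new Comparator<PeerState>() {
            public int compare(PeerState a, PeerState b) {
                return Long.compare(a.getNextSendTime(), b.getNextSendTime());
            }
        });

    // the head of the heap is the only peer we need to look at
    PeerState next = activePeers.peek();
    long waitTime = (next == null) ? MAX_WAIT
                                   : next.getNextSendTime() - _context.clock().now();

(Uses java.util.PriorityQueue and java.util.Comparator; the sorted-ArrayList variant in comment 52 below trades the heap for a fully sorted list maintained by binary search.)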

I tried that with a heap implemented on top of an ArrayList, and CPU time for the getNextVolley() loop nearly disappears. Packet Pusher then spends most of its time in preparePackets(). getNextDelay() is transformed into a getNextSendTime(); some minor changes are needed elsewhere. #2506 is helpful.

In my testbed, total CPU for Packet Pusher is down from the 30% area to the 11% area with no minimum wait().

I would really like to see this fly some time this year; I could send heavily commented source to anyone willing to test.

comment:51 Changed 2 months ago by zzz

Not sure how that would work. You'd have to sort every time anyway, because the sort is not stable - the sort order keeps changing. That's why we go through the whole thing. I'm also not sure what a 'sorted heap on top of an ArrayList' looks like; that sounds like an odd data structure. You may as well post the code you have; that would probably be easier than trying to explain it.

comment:52 Changed 2 months ago by jogger

Yep, the sort order changes all the time, but we know when. Once I saw how darn fast this O(log n) stuff is, I thought of alternative implementations that are easier to understand than lengthy siftup() and siftdown() functions. I cut the code down by 70% and ended up with a totally sorted list, maintained by binary search. The surprisingly short code goes like this:

        private void insert(PeerState peer) {
            // this only works if we only fiddle with peers outside _activePeers
            // be sure to call remove() first
            long nextSendTime = peer.getNextSendTime();
            int left = 0;
            int right = _activePeers.size() - 1;
            int newpos;
            for (newpos = (left + right + 1) / 2; left <= right; newpos = (left + right + 1) / 2) {
                long tmp = _activePeers.get(newpos).getNextSendTime();
                if (nextSendTime >= tmp)
                    left = newpos + 1;
                if (nextSendTime <= tmp)
                    right = newpos - 1;
            }
            _activePeers.add(newpos, peer);
        }

This way _activePeers.get(0) has all we need. What we win:

  • lots of CPU, most from saved calls on finishMessages() and allocateSend()
  • a bit faster sending

What we lose:

  • sending may be delayed a bit in the very rare case of races, or when allocateSend() returns null with traffic pending, because without a round robin we stick to the recorded send times.

The adapted getNextVolley() loop looks like this (I have left out the tweaks to the PeerState functions, where I am also caching nextSendTime):

    private final Set<PeerState> _senders;
    private final List<PeerState> _activePeers;
    private PeerState _savedPeer;

...

        _activePeers = new ArrayList<PeerState>(1024);
        _senders = new ConcurrentHashSet<PeerState>(8);
        _iterator = _senders.iterator();
        _savedPeer = null;

...

    public List<UDPPacket> getNextVolley() {
        if (_savedPeer != null) { // we sent to this peer after removing
            insert(_savedPeer);
            _savedPeer = null;
        }
        PeerState peer = null;
        List<OutboundMessageState> states = null;
        long now = _context.clock().now();
        while ((states == null) && _alive) {
            if (!_senders.isEmpty()) {
                // these are fresh concurrent add()s: 99% case
                if (!_iterator.hasNext())
                    _iterator = _senders.iterator();
                peer = _iterator.next();
                int position = _activePeers.indexOf(peer);
                if (position >= 0) // we are already transmitting
                    peer.finishMessages(now); // free up send window
                states = peer.allocateSend();
                if (states != null) // we have something to send and we will be returning it
                    break;
                _iterator.remove(); // safe, in very small chance of race still caught through _activePeers
                // could not send so put into place
                if (position >= 0) // existing peer
                    _activePeers.remove(position);
                insert(peer);
                // We send until all senders fail to send and are migrated into _activePeers if new
                // this is a bit more looping than needed but lets us clear out any burst
                continue;
            }
            // found no sender ready so try the list
            long waittime;
            if (_activePeers.size() > 0) {
                peer = _activePeers.get(0); // peek at next candidate
                long nextSendTime = peer.getNextSendTime();
                if (nextSendTime <= now) { // maybe this guy is ready to send
                    _activePeers.remove(0);
                    if (peer.finishMessages(now) <= 0) {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("No more pending messages for " + peer.getRemotePeer());
                        continue;
                    }
                    states = peer.allocateSend(); // resend or retry after no b/w: 1% case
                    if (states != null) { // we have something to send and we will be returning it
                        _savedPeer = peer; // we have to sort after sending
                        break;
                    }
                    insert(peer); // we could not send, so re-insert
                    nextSendTime = _activePeers.get(0).getNextSendTime();
                }
                // nothing to send, update clock for correct wait()
                now = _context.clock().now();
                waittime = nextSendTime - now;
                if (waittime <= 0 || !_senders.isEmpty())
                    continue; // someone has become ready
            } else
                waittime = MAX_WAIT; // we have zero active peers
            synchronized (_activePeers) {
                try {
                    _activePeers.wait(waittime);
                } catch (InterruptedException ie) {
                    // noop
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("Woken up while waiting");
                }
            }
        } // while alive && state == null
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("Sending " + DataHelper.toString(states));
        List<UDPPacket> packets = preparePackets(states, peer);
        return packets;
    }

comment:53 Changed 5 weeks ago by jogger

Since zzz requested test results: one can look at CPU usage (column 5 in #2619 comment 10). Much lower CPU for the packet pusher at 50% more UDP traffic (as seen from the other UDP threads).

The above approach reduces CPU for getNextVolley() by a factor of 100 to more than 1000. So I would really like to know how we can get this released.

Use case for the current code: send a 16k torrent chunk through UDP. On a crowded router with activePeers.size() in the 700 range, the round robin goes through all message states 2000 times, for a total of 16000 iterations, because we are limited to 2k at a time.

The above approach would need one call to finishMessages(), one call to allocateSend(), and about 10 calls to getNextSendTime() (partially cached) for the insert. That is 12 calls in total if we raise the limits to allow 16k in one shot, and 96 (12 calls for each of the 8 volleys of 2k) otherwise. For me this is so convincing that we should get this flying.
