Opened 4 weeks ago

Last modified 6 days ago

#2412 new enhancement

Unbrake getNextVolley

Reported by: jogger Owned by: zzz
Priority: major Milestone: undecided
Component: router/transport Version: 0.9.38
Keywords: Cc:
Parent Tickets:

Description

No matter how I tried to optimize the UDP packet pusher, I saw throughput go up but peak CPU go down (whenever i2p is restarted, a cron job of mine assigns a dedicated CPU core to the UDP packet pusher, shared only with GC of course).

So, clicking around in good old jconsole, I saw getNextVolley frequently waiting. WHAT? > 1 MBps UDP outbound and waiting? What for? It turns out there is a forced 10 ms minimum wait at the end of getNextVolley. That might have been OK in the past, but if a send delay is calculated it should be used. Will look deeper into whether it is properly set beforehand.
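
For reference, the clamp in question (quoted from the 0.9.38 code, which also appears as context lines in the diffs further down; MAX_WAIT was 1000 at that point):

    // end of getNextVolley() in stock 0.9.38: never waits less than 10 ms
    int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
    synchronized (_activePeers) {
        try {
            _activePeers.wait(toWait);
        } catch (InterruptedException ie) {
            // noop
        }
    }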

Diff and results tomorrow. There is also some useless code I have cleaned up.

Subtickets

#2427: Maintaining bandwidth correctly: PeerState.java (enhancement, new, assigned to zzz)

Attachments (1)

bandwidth-jogger.png (263.0 KB) - added by zab 2 weeks ago.
Bandwidth with this patch


Change History (35)

comment:1 Changed 4 weeks ago by zab

I'm going to guess defensive programming against bugs in calculating nextSendDelay. Yes, this would affect throughput quite a bit, at the expense of ... ?

comment:2 Changed 4 weeks ago by jogger

Have been digging deeper. getNextVolley is fundamentally flawed. Really hard to read. It will fall into wait() too often, with a wrong nextSendDelay and despite packets outstanding.

Currently testing a much shorter, cleaned-up version. What the "defensive programming" achieved, without the author knowing it, is that it made the bandwidth limiters work. Will look into this. The easiest fix would be to insert a 1 ms wait() into allocateSend() if not a single packet can be sent due to limits.

Results to come.

comment:3 Changed 4 weeks ago by jogger

It's getting easier.

First a quick win: reduced MAX_WAIT to 50. Now the job queue runner (I have only 1 configured) starts much faster, along with the entire router. A 1000 ms wait between UDP packets was just too slow.

Now let's completely ditch the CPU-intensive getNextDelay() in favour of a self-tuning idle timer that decrements when we send something and increments otherwise. This way we save CPU, automatically adapt to any load changes, AND automagically integrate the bandwidth logic with the timed waiting within getNextVolley(), which would otherwise have needed complex coding.
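
A compact sketch of the idea; the names and the formula are taken from the patch posted later in this ticket (comment 7):

    if (states == null) {
        // nothing ready to send: grow the wait by 1 ms plus however long this
        // pass through the peers took, capped at MAX_WAIT (reduced to 50 in the patch)
        _idleWait = Math.min(Math.max(1, ++_idleWait + (int) (_context.clock().now() - now)), MAX_WAIT);
        synchronized (_activePeers) {
            try {
                _activePeers.wait(_idleWait);
            } catch (InterruptedException ie) {
                _idleWait = 0;   // reset on interrupt
            }
        }
    }
    // ... and after something was actually sent (later in getNextVolley() in the patch):
    _idleWait--;                 // we are busy, so shorten the next wait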

comment:4 Changed 4 weeks ago by zab

The more intrusive the change you're proposing, the more time it will take to analyze it properly. I can almost immediately support reducing the minimum wait of 10ms, but what you're talking about in comment 3 is another story.

I don't have the equipment to test a CPU-constrained scenario: my Xeon is too fast, and my ISP router is consumer-grade, so it will choke if I try to increase the number of connections. So I'm relying on the numbers you give us, but if you're testing radical rather than incremental ideas then the numbers don't mean as much.

comment:5 Changed 4 weeks ago by jogger

You are right, but I have analyzed this deeply and we have to do this. It is not radical. The new version will be cleaned up and shorter. After testing I will come up with a longer writeup and a patch with detailed comments.

This should be upgraded to a defect because of the following:
DEFECT: During the final loop before waiting, concurrent calls to peer.add() issue a useless notify(). The loop does not catch the adds in most cases, so getNextVolley() goes into wait() with packets pending and without a chance of being notified. This is the reason for the frequent waits despite higher load (see OP); a schematic sketch follows after this list.
Minor: nextSendDelay gets cleared at some random point within the loop.
Performance: Information from peer.add() gets lost even if that peer is the only one ready to send, leading to useless further looping.
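
To make the DEFECT above concrete, here is a minimal standalone schematic of the lost-notify interleaving (illustrative only; the class and names are hypothetical, not I2P code):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Schematic of the race: the "is there work?" check happens on a stale snapshot,
    // the notify() arrives while nobody is waiting, and the consumer then sleeps the
    // full timeout even though work is pending.
    public class LostNotifyDemo {
        private final Queue<String> pending = new ArrayDeque<String>();
        private final Object lock = new Object();

        // stands in for peer.add() + _activePeers.notify()
        void add(String packet) {
            synchronized (lock) {
                pending.add(packet);
                lock.notify();              // a no-op if nobody is waiting yet
            }
        }

        // stands in for the final pass of getNextVolley() before wait()
        void pusherPass() throws InterruptedException {
            boolean sawWork;
            synchronized (lock) {
                sawWork = !pending.isEmpty();
            }
            // <-- add() can run right here: its notify() is lost and sawWork is stale
            if (!sawWork) {
                synchronized (lock) {
                    lock.wait(1000);        // sleeps despite a packet now being pending
                }
            }
        }
    }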

comment:6 Changed 4 weeks ago by jogger

Why the current wait logic must be replaced:

  1. Severe design error: when we fall into wait() with unsent packets due to bandwidth limits, using a wait time based on future unsent packets is plain ridiculous.
  2. Invalid implicit assumptions about traffic shape. Packets arrive from the internet at random points in time. Ideally they travel at near-constant speed through the router to the packet pusher. If that were the case, at 1000 packets/s (an average gap of only 1 ms) we would see wait(1) frequently and 2 or 3 occasionally. A peek at jconsole (stock 0.9.38) compared with top -H shows much longer wait cycles. The same goes for my simplified redesign currently in testing. So the packets arrive at the packet pusher in bursts with idle time thereafter, and the dev designed for that by looking at future packets. The reason (locking, blocking, some congestion elsewhere) does not matter. We need a design that works for any traffic shape and adapts to future code changes elsewhere. These bursts also make the above defect more dangerous than it looks, because we might miss an entire burst and then wait() for a long time.

Why the CPU currently spent in getNextDelay() is needed elsewhere:

Apart from its results being discarded most of the time, and some design flaws, we need that CPU to drive through the iterator faster. If the packets appear in bursts, we need approx. one pass to clear them out and one pass to fall into wait(). If they appeared at a near-constant rate we would need (given the current design) an average of half a pass per packet to clear it plus one full pass for the wait(). So: a burst of 3 packets = 2 iterations, 3 single packets = 3 x (0.5 + 1) = 4.5 iterations. Since the function has no side effects it can be left out without harm.

My redesign will do away with getNextDelay() and adapt wait time based on send frequency, CPU speed and frequency of fruitless looping.

comment:7 Changed 3 weeks ago by jogger

Put several days into this, have tested many scenarios, and would like to see this go in soon. First I used tcpdump to confirm that outbound traffic is much more bursty than inbound. This also holds true for NTCP. So these long waits may go away any day when something else within i2p changes.

Description of proposed patch:

The current getNextVolley() loop is simplified and shortened, stripping out getNextDelay(). Much easier to read; all flow control reduced to single conditions.

We introduce an adaptive wait timer (_idleWait; complete description in the source).
The problem of concurrent add() is solved by always updating a hot send candidate in a long-lived _targetPeer. This also provides an additional performance boost.

Test results for effective wait times

Mac mini server, 200 tunnels at a low 40 KBps total traffic: average wait time unchanged at 38 ms.

ARM 32, medium traffic: 200 tunnels, 64 I2CP tunnels, 185 packets/s UDP out, 320 KBps average total output, 1,000 UDP peers. Average wait time 14 ms, up from 12 ms before; 67 waits/sec unchanged; this means reduced CPU.

same but on restricted bandwidth of 200 KBps:
12 tunnels, 64 I2CP tunnels, 110 packets/s UDP out, 173 KBps total output, 250 UDP peers, 66 waits/sec, 10 ms average wait.

ARM 32, high traffic: >2,000 tunnels, 70 I2CP tunnels, 800 packets/s UDP out, 1.65 MBps average total output, 2,000 UDP peers. Average wait 3 ms, 205 waits/sec.

same but running CPU-constrained on 3 slower cores only:
near 2,000 tunnels, 70 I2CP tunnels, 660 packets/s UDP out, 1.3 MBps total output, 1,800 UDP peers, 66 waits/sec, 10 ms average wait.

These values appear totally sane to me: because of the notify(), any higher traffic interrupts longer waits, so the current algorithm with its 10 ms minimum does plain nothing in that case.

Here's the patch against 0.9.38 for OutboundMessageFragments.java:

64c64,68
<     private static final int MAX_WAIT = 1000;
---
>     private static final int MAX_WAIT = 50; // for a fast router start
> 
>     private int _idleWait;
>     private volatile PeerState _targetPeer; // holds latest concurrent add or hot send candidate otherwise
>         // this one we immediately handle in getNextVolley loop
71a76
>         _iterator = _activePeers.iterator();
73a79,80
>         _idleWait = 0;
>         _targetPeer = null;
197,199c204
<             int fsz = state.fragmentSize(0);
<             if (fsz < min)
<                 min = fsz;
---
>             min = Math.min(state.fragmentSize(0), min);
235a241
>             _targetPeer = peer; // caught in getNextVolley loop
271,274c277,280
<         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
<         int peersProcessed = 0;
<         while (_alive && (states == null) ) {
<             int nextSendDelay = Integer.MAX_VALUE;
---
> 
>         while ((states == null) && _alive) {
>             // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
>             int peersProcessed = 0;
278,283d283
<                     // do we need a new long-lived iterator?
<                     if (_iterator == null ||
<                         ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
<                         _iterator = _activePeers.iterator();
<                     }
< 
290,298c290,310
<                     while (_iterator.hasNext()) {
<                         peer = _iterator.next();
<                         int remaining = peer.finishMessages(now);
<                         if (remaining <= 0) {
<                             // race with add()
<                             _iterator.remove();
<                             if (_log.shouldLog(Log.DEBUG))
<                                 _log.debug("No more pending messages for " + peer.getRemotePeer());
<                             continue;
---
>                     long size = _activePeers.size();
>                     while (peersProcessed < size) {
>                         // we will catch concurrent adds through _targetPeer
>                         // no need to update size
>                         // may check _targetPeer twice, unlikely
>                         if (_targetPeer != null) {
>                             peer = _targetPeer;
>                             // don't call finishMessages for this hot peer
>                             _targetPeer = null;
>                         } else {
>                             if (!_iterator.hasNext())
>                                 _iterator = _activePeers.iterator();
>                             peer = _iterator.next();
>                             peersProcessed++;
>                             if (peer.finishMessages(now) <= 0) {
>                                 // race with add()
>                                 _iterator.remove();
>                                 if (_log.shouldLog(Log.DEBUG))
>                                     _log.debug("No more pending messages for " + peer.getRemotePeer());
>                                 continue;
>                             }
300c312
<                         peersProcessed++;
---
> 
305,314d316
<                         } else if (peersProcessed >= _activePeers.size()) {
<                             // we've gone all the way around, time to sleep
<                             break;
<                         } else {
<                             // Update the minimum delay for all peers
<                             // which will be used if we found nothing to send across all peers
<                             int delay = peer.getNextDelay();
<                             if (delay < nextSendDelay)
<                                 nextSendDelay = delay;
<                             peer = null;
318c320
<                     //if (peer != null && _log.shouldLog(Log.DEBUG))
---
>                     //if (states != null && _log.shouldLog(Log.DEBUG))
322,334c324,327
<                     // if we've gone all the way through the loop, wait
<                     // ... unless nextSendDelay says we have more ready now
<                     if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
<                         peersProcessed = 0;
<                         // why? we do this in the loop one at a time
<                         //finishMessages();
<                         // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
<                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
<                         // gets called regularly
<                         int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
<                         //if (_log.shouldLog(Log.DEBUG))
<                         //    _log.debug("wait for " + toWait);
<                         // wait.. or somethin'
---
>                     if (states == null) {
>                         // sleep 1 if many consecutive sends before, add up recent loop time otherwise
>                         _idleWait = Math.min(Math.max(1, ++_idleWait + (int)(_context.clock().now() - now)), MAX_WAIT);
>                         // adapts wait time to load and CPU speed
337c330
<                                 _activePeers.wait(toWait);
---
>                                 _activePeers.wait(_idleWait);
339c332,333
<                                 // noop
---
>                                 //Oops, we waited too long
>                                 _idleWait = 0;  // turns negative when we send to the guy that woke us up
341c335
<                                      _log.debug("Woken up while waiting");
---
>                                     _log.debug("Woken up while waiting");
348d341
< 
354a348,353
> 
>         if (packets != null) {
>             if (_targetPeer == null) // no recent peer.add()? Give current peer a second chance
>                 _targetPeer = peer; // overwritten by peer.add(). Performance !! this guy may have more
>             _idleWait--; // we are busy so reduce next wait
>         }

comment:8 Changed 3 weeks ago by zab

Two quick comments:

  • _targetPeer should be a LinkedHashSet, not a single field, as multiple hot peers may be added while the pusher thread is iterating through all peers.
  • in ::add(PeerState, int), _targetPeer should be written inside the synchronized(_activePeers) block even if it's volatile, because the adding thread may get preempted between the notify() call and the write to the field, which would result in the added hot peer being missed.

And one not-so-quick comment: In very high load scenarios this approach may cause one or more busy peers to "starve" slower peers. The existing logic errs on the opposite side - i.e. busy peers may be starved by many slower peers.
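
A minimal sketch of the ordering the second bullet asks for (field names and the condition follow the comment-7 patch; this was later superseded by the AtomicReference approach in comment 11):

    public void add(PeerState peer, int size) {
        boolean added = _activePeers.add(peer);
        // ... existing logging / stats ...
        if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
            synchronized (_activePeers) {
                _targetPeer = peer;      // write the hot candidate inside the lock ...
                _activePeers.notify();   // ... so the pusher cannot see the notify without it
            }
        }
    }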

comment:9 Changed 3 weeks ago by jogger

Already thought of your comments. I just want to make sure not to fall into wait() when there is at least one recent missed add. I do not care whether it is overwritten; I need just one that works. All others will be found by the next round-robin pass. Look at the code: for every ready peer, getNextVolley() breaks out of the loop to send, and afterwards a new round starts that finds all recent add() calls. Nothing is missed anymore.

In those high-load scenarios this is exactly what I aim at: find them all during the next pass through the iterator, which at the same time also covers any other not-so-busy peers that need service. This is the reason for not collecting and servicing all recent add() calls --> that would be a LIFO with the detrimental effect you described.

I am already making progress on the packet burst phenomenon.

comment:10 Changed 3 weeks ago by zab

Ok, makes more sense. The second quick comment from comment 8 still stands. Also, I suggest making _targetPeer an AtomicReference and rewriting the hot path to

PeerState tmp = _targetPeer.getAndSet(null);
if (tmp != null) {
    peer = tmp;
} else {
...

then when you're about to decrement _idleWait

if (packets != null) {
    _targetPeer.compareAndSet(null,peer);
    _idleWait--;
}

(although I think this latter logic unnecessarily gives weight to busy peers)

This way you avoid many possible races between the thread(s) that call add(...) and the packet pusher. Also, it should help performance at least on x86/64 systems where compareAndSet translates to LOCK CMPXCHG. No idea how it will affect ARM though.

comment:11 Changed 3 weeks ago by zab

The second quick comment from comment 8 still stands.

On second thought, you want to always set _targetPeer regardless of whether it's already in the set of active peers.

I've put together a version of your patch that does that and uses AtomicReference here http://paste.crypthost.i2p/?33dc49624b5397a6#FgvvIEGh/LYYzrimIU9Kpv7scNuxmlL6Au04bMnv7iU=

comment:12 Changed 3 weeks ago by jogger

Yep, looks good to me. As a last change, already tested (the other cases trigger nothing):

    public void add(PeerState peer, int size) {
        boolean willSend = peer.getSendWindowBytesRemaining() >= size;
        if (willSend)
            _targetPeer.set(peer); // caught in getNextVolley loop
...
        if (willSend) {
            synchronized (_activePeers) {
                _activePeers.notify();
            }
        }
Last edited 3 weeks ago by jogger

comment:13 Changed 3 weeks ago by jogger

Last comment regarding the final

    _targetPeer.compareAndSet(null,peer);
    _idleWait--;

This is just in case the message to the peer is larger than one volley holds. Unless overridden while sending, it deserves a second chance.

comment:14 Changed 3 weeks ago by zab

Ok, I think I understand. To be sure, please review this cumulative patch http://paste.crypthost.i2p/?70144192aac939ac#gQroOX6nuJUbf2yEzWM5N4kGU6C9CCJkBrvxij9mc6s=

I'm going to start testing this on my busy router as 0.9.38-6-jogger3 and will post some graphs in a few days. I would ask you to do the same and pay attention to all variables; I noticed somewhat increased CPU usage with the patch in comment 11.

comment:15 Changed 3 weeks ago by jogger

Yep, this is aimed at reducing latency and increasing throughput. The previously missed peers alone should cost you half a round-robin pass each on average. CPU is driven by the number of peers configured.

On the other hand it performs very well under heavy load: many hits during the round-robin pass (finishMessages() and allocateSend() do little under lower load, but are called frequently). So a short trip through _activePeers to the next hit, and then plenty of CPU to spend on preparing and sending packets.

Or put the other way round: CPU per packet goes down drastically under load. Look at the constrained test scenario above: only 20% speed loss when running on 3 of the slower cores instead of all 8.

I have had a performance mod for the Tunnel GW pumper in testing for three hours now; it already brings more tunnels and peers than ever, so this is a good opportunity to see this one perform under higher load.

comment:16 Changed 3 weeks ago by jogger

Thought again about your scenario, this is the one I did not test.

You must usually be going packet-at-a-time with a wait thereafter, resetting the timer and then building it up slowly, because the clock portion of the formula does not add much on fast machines.

I suggest we cut the timer value in half after wakeup instead of resetting it.

comment:17 Changed 3 weeks ago by zab

I suggest we cut the timer value in half after wakeup instead of resetting it.

Right now it's not being reset except in the case of InterruptedException which should only happen on router shutdown. If you want to reset it or cut it in half, that needs to be right after the wait() call, but not inside the "catch" statement. Is that how you intended it?

comment:18 Changed 3 weeks ago by zab

Actually it's a bit more complicated - there is no way to know whether you're out of wait(...) because someone notify()-ed you or because the wait timer elapsed (unless you check for how long you've waited but I assume you don't want to be adding more calls to now()).

InterruptedException is thrown only if someone calls interrupt() which is meant to be used for exceptional circumstances like when you want to stop a thread.
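
For illustration only: if one did want to halve the timer on an early wakeup, the adjustment would sit right after the wait() call, and distinguishing a notify() from a timeout needs the extra clock read mentioned above (a sketch reusing names from the comment-7 patch, not a proposed change):

    long before = _context.clock().now();   // the additional now() call
    synchronized (_activePeers) {
        try {
            _activePeers.wait(_idleWait);
        } catch (InterruptedException ie) {
            // router shutdown; not the place to adjust the timer
        }
    }
    if (_context.clock().now() - before < _idleWait) {
        // woken early by a notify(): there is new work, so back the timer off
        _idleWait = Math.max(1, _idleWait / 2);
    }
    // if the full timeout elapsed, leave _idleWait alone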

comment:19 Changed 3 weeks ago by jogger

Yep, you are right; it was late last night after I put in the final patch. Disregard my comment for now.

Actually, after running this for a couple of days, I am pleased with CPU usage at all throughput levels. If you watch yours, I would be surprised if you really see a CPU hike beyond the bit attributable to now catching the previously missed peers.

It would have needed to be done in the old code anyway. If it is really significant we should think again.

comment:20 Changed 3 weeks ago by zzz

It sounds like this is still a work in progress...

@OP when you're ready for me to review and test, please provide a current, unreversed, unified diff

comment:21 Changed 2 weeks ago by jogger

After reviewing: the unified diff in comment 14 does it all; no further mods necessary. zab is testing, as am I.

On the other hand, some of the supporting functions in PeerState.java are incorrect, at least getSendWindowBytesRemaining() and allocateSendingBytes(), as they improperly handle the send window.

Just testing a shorter, corrected version of both functions with a very simple sliding window. Looks good now, and could be one to make our torrent friends happy, but I currently have no idea how it will affect getNextVolley() CPU usage. Hopefully further downward in terms of CPU per packet.

Will provide a diff as requested.

comment:22 Changed 2 weeks ago by zzz

ok thank you, I will review and test the OMF patch from comment 14, and will await your PeerState diff.

Changed 2 weeks ago by zab

Bandwidth with this patch

comment:23 Changed 2 weeks ago by zab

Hi,
in the bandwidth graph in the attached screenshot, jogger3 is with the patch from comment 14, and jogger4 is that patch plus zzz's patch for EstablishmentManager. Nice spikes!

comment:24 Changed 2 weeks ago by jogger

The spikes are inbound?!

Would be nice if you would try my mod for PeerState too. It should smooth out the sends.

If tried with comparable output, this one also gives you a good indication of actual wait intervals.

sudo tcpdump -c 800 -ttt -Q out -n -B 1024 -q -i eth# udp port <port> | grep -v "00:00:00.00"

comment:25 Changed 2 weeks ago by zab

The spikes could be due to anything, but the magnitude shows that the router is performing well under that type of load. If it were to hit some kind of barrier, the shape of the graph would be smoother.

The only possible relevance to the patch in this ticket is that without the forced 10 ms wait in PacketPusher, smaller RTT values get computed and as a result SSU links transfer faster (but I'm really reaching here).

comment:26 Changed 11 days ago by zzz

Comments on the patch in comment 14:

  • The goal in this and similar changes in other tickets should be to _increase_ MAX_WAIT. If our notification/wakeup logic and calculation of the next send delay are working correctly, then no failsafe should be necessary, and MAX_WAIT could in theory be infinity. For power consumption on Android, for example, we don't want a bunch of threads waking up 20 times a second.
  • You've abandoned the use of peer.getNextDelay() to calculate how long to sleep, and just changed it to an adaptive counter, _idleWait, that gets incremented or decremented based on how busy we are, with a max of 50 ms. I think this is going in the wrong direction. We should be using getNextDelay(), and if it's not working right then fix the wakeup logic. The goal is to sleep for exactly the right time, and wake up early if something new happens.
  • nextSendDelay is now unused but still defined. But from the above, I don't think it should be unused.
  • I think there's a race where the new iterator created at line 303 could be empty, and then at line 304 _iterator.next() will throw an exception; you're not calling hasNext()? Not sure.
  • still awaiting the PeerState diff promised in comment 21

I believe your test results showing that this patch makes things work better for high-bandwidth routers, but it's at the expense of power and CPU on the low end. I think it needs to be completely rethought, and I don't think it's likely to get into 0.9.39.

comment:27 Changed 11 days ago by zzz

To make it even more clear:

basic logic for any outbound transport should be:

while (true) {
   send everything that's ready
   sleep(min(time to earliest retransmission timer, large failsafe time))
}

your proposed patch is:

while(true) {
    send everything that's ready, starting with the one that woke us up
    sleep(some very small time, adjusted based on previous traffic,
          but independent of retransmission timers)
}

it's the sleep time that's a step in the wrong direction

Last edited 11 days ago by zzz

comment:28 Changed 11 days ago by jogger

@zzz
My PeerState diff has been in #2427 for some days now.

I think you are missing some important points:

  • The patch fixes an important bug that 0.9.39 should not ship with. The current code falls into wait() with packets outstanding for transmission. That's what the _targetPeer logic we discussed above is for.
  • The patch drives CPU down considerably, not up. So wait is up, not down. I would not claim a 50% CPU reduction on slower machines, but it is close to that.
  • Results from getNextDelay() are currently discarded in 99% of cases, so it just eats lots of CPU. The CPU it needs results in longer iterations through the loop, which means less wait() instead of more. Everything is discarded whenever we send something, and also when the result is <= 10 or when a notify() occurs.
  • getNextDelay() is fundamentally flawed anyway by using relative time. To be useful on slower machines, where time runs away while driving through the loop, it would need to be reworked to use absolute time.
  • The adaptive wait logic times the loop, so slower machines get much more wait from a very simple logic that does not eat CPU.
  • There is always work to do in the loop: finishing messages, sending ACKs, etc. In the current code this is postponed until something is ready to send, which is then delayed by this housekeeping. My wait timer grows slowly on fast CPUs, making sure they are ready for an immediate send if something is due.
  • The size <= 0 case is covered by willSend.
  • The iterator cannot be empty: the size is checked above.

So I would ask that someone besides zab and me test this (we both got impressive results), and if the tests confirm CPU down / wait up / traffic up, then I see no reason not to ship this.

comment:29 Changed 11 days ago by jogger

Addition: now that we have covered the missed-peer bug, it would not cost much latency to leave MAX_WAIT at 1000 and keep the previous minimum of 10 ms.

comment:30 Changed 10 days ago by zzz

Add a subticket #2427.

comment:31 Changed 10 days ago by zzz

Can you provide a patch that fixes only the important bug (bullet 1 in comment 28)? Then we can review and test that for 0.9.39 while we discuss the other topics.

On the other topics: if getNextDelay() is flawed then we should fix it, not discard it, according to the principles in my comments 26 and 27. I do see the 100 ms failsafe in there; it's been there for 6 years, and that looks like a big flaw for sure.

comment:32 Changed 9 days ago by jogger

In this discussion we all overlooked a design fault in the original code: getNextDelay() was executed in the hot path, with results only useful when there is nothing to send, and discarded in most cases. So the solution is to drive a second loop through getNextDelay() when we are unable to send. I have reworked getNextDelay() to use absolute time and guarded the whole thing against concurrent add() and against time running away.

This is a small change to the patch zab posted, which we have successfully tested for some time. Most of the above patch is needed anyway to fix the missed-peer issue. So I have a diff ready that is in sync with zzz's comments, plus a combined diff for PeerState.

--- "PeerState orig.java"	2019-02-06 13:21:26.571744716 +0100
+++ "PeerState patch.java"	2019-02-14 10:00:10.765045369 +0100
@@ -479,7 +479,20 @@
 
     /** how many bytes can we send to the peer in the current second */
     public int getSendWindowBytesRemaining() {
-        synchronized(_outboundMessages) {
+        synchronized(_outboundMessages) { // possibly reentrant
+            long now = _context.clock().now();
+            int duration = (int)(now - _lastSendRefill);
+            if (_sendBytes > 0 && duration > 0 || duration >= 1000) {
+                _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + (_sendWindowBytes * duration + 500) / 1000, _sendWindowBytes);
+                _sendBps = (99 * _sendBps + _sendBytes * 1000 / duration + 50) / 100;
+             //if (isForACK) {
+            //    _sendACKBytes += size;
+            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
+            //}
+               _sendBytes = 0;
+            //_sendACKBytes = 0;
+                _lastSendRefill = now;
+            }
             return _sendWindowBytesRemaining;
         }
     }
@@ -695,27 +708,14 @@
      *  Caller should synch
      */
     private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) { 
-        long now = _context.clock().now();
-        long duration = now - _lastSendRefill;
-        if (duration >= 1000) {
-            _sendWindowBytesRemaining = _sendWindowBytes;
-            _sendBytes += size;
-            _sendBps = (int)(0.9f*_sendBps + 0.1f*(_sendBytes * (1000f/duration)));
-            //if (isForACK) {
-            //    _sendACKBytes += size;
-            //    _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration)));
-            //}
-            _sendBytes = 0;
-            //_sendACKBytes = 0;
-            _lastSendRefill = now;
-        }
         //if (true) return true;
-        if (IGNORE_CWIN || size <= _sendWindowBytesRemaining || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
+        if (size <= getSendWindowBytesRemaining() || IGNORE_CWIN || isForACK || (ALWAYS_ALLOW_FIRST_PUSH && messagePushCount == 0)) {
             if ( (messagePushCount == 0) && (_concurrentMessagesActive > _concurrentMessagesAllowed) ) {
                 _consecutiveRejections++;
                 _context.statManager().addRateData("udp.rejectConcurrentActive", _concurrentMessagesActive, _consecutiveRejections);
                 return false;
-            } else if (messagePushCount == 0) {
+            }
+            if (messagePushCount == 0) {
                 _context.statManager().addRateData("udp.allowConcurrentActive", _concurrentMessagesActive, _concurrentMessagesAllowed);
                 _concurrentMessagesActive++;
                 if (_consecutiveRejections > 0) 
@@ -724,13 +724,12 @@
             }
             _sendWindowBytesRemaining -= size; 
             _sendBytes += size;
-            _lastSendTime = now;
+            _lastSendTime = _context.clock().now();
             //if (isForACK) 
             //    _sendACKBytes += size;
             return true;
-        } else {
-            return false;
         }
+        return false;
     }
     
     /** if we need to contact them, do we need to talk to an introducer? */
@@ -1178,22 +1177,16 @@
                         _sendWindowBytes += bytesACKed; //512; // bytesACKed;
                 //}
             }
-        } else {
-            int allow = _concurrentMessagesAllowed - 1;
-            if (allow < MIN_CONCURRENT_MSGS)
-                allow = MIN_CONCURRENT_MSGS;
-            _concurrentMessagesAllowed = allow;
-        }
+        } else
+            _concurrentMessagesAllowed = Math.max(_concurrentMessagesAllowed - 1, MIN_CONCURRENT_MSGS);
+
         if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
             _sendWindowBytes = MAX_SEND_WINDOW_BYTES;
         _lastReceiveTime = _context.clock().now();
         _lastSendFullyTime = _lastReceiveTime;
         
         //if (true) {
-            if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes)
-                _sendWindowBytesRemaining += bytesACKed;
-            else
-                _sendWindowBytesRemaining = _sendWindowBytes;
+            _sendWindowBytesRemaining = Math.min(_sendWindowBytesRemaining + bytesACKed, _sendWindowBytes);
         //}
         
         if (numSends < 2) {
@@ -1708,30 +1701,19 @@
      * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
      * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
      *
-     * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
-     *         If ready now, will return 0 or a negative value.
+     * @return first send time (may already be in the past), or Long.MAX_VALUE if we have nothing to send.
      */
-    public int getNextDelay() {
-        int rv = Integer.MAX_VALUE;
+    public long getNextDelay() {
+        Long rv = Long.MAX_VALUE;
         if (_dead) return rv;
-        long now = _context.clock().now();
         synchronized (_outboundMessages) {
-            if (_retransmitter != null) {
-                rv = (int)(_retransmitter.getNextSendTime() - now);
-                return rv;
-            }
-            for (OutboundMessageState state : _outboundMessages) {
-                int delay = (int)(state.getNextSendTime() - now);
-                // short circuit once we hit something ready to go
-                if (delay <= 0)
-                    return delay;
-                if (delay < rv)
-                    rv = delay;
-            }
+            if (_retransmitter != null)
+                return _retransmitter.getNextSendTime();
+            for (OutboundMessageState state : _outboundMessages)
+                rv = Math.min(rv, state.getNextSendTime());
+                // we short circuit in getNextVolley
         }
-        // failsafe... is this OK?
-        if (rv > 100 && !_outboundQueue.isEmpty())
-            rv = 100;
+        // no failsafe, we work on _outboundQueue elsewhere
         return rv;
     }
--- "OutboundMessageFragments orig.java"	2019-02-14 10:26:33.357827848 +0100
+++ "OutboundMessageFragments patch.java"	2019-02-14 10:38:11.286700990 +0100
@@ -8,6 +8,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import net.i2p.data.DataHelper;
 import net.i2p.data.Hash;
@@ -63,12 +64,16 @@
     static final int MAX_VOLLEYS = 10;
     private static final int MAX_WAIT = 1000;
 
+    private final AtomicReference<PeerState> _targetPeer = new AtomicReference<PeerState>(); // holds latest concurrent add or hot send candidate otherwise
+        // this one we immediately handle in getNextVolley loop
+
     public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
         _context = ctx;
         _log = ctx.logManager().getLog(OutboundMessageFragments.class);
         _transport = transport;
         // _throttle = throttle;
         _activePeers = new ConcurrentHashSet<PeerState>(256);
+        _iterator = _activePeers.iterator();
         _builder = new PacketBuilder(ctx, transport);
         _alive = true;
         // _allowExcess = false;
@@ -194,9 +199,7 @@
         for (int i = 0; i < sz; i++) {
             OutboundMessageState state = states.get(i);
             peer.add(state);
-            int fsz = state.fragmentSize(0);
-            if (fsz < min)
-                min = fsz;
+            min = Math.min(state.fragmentSize(0), min);
         }
         add(peer, min);
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
@@ -215,6 +218,9 @@
      * @since 0.8.9
      */
     public void add(PeerState peer, int size) {
+        boolean willSend = peer.getSendWindowBytesRemaining() >= size;
+        if (willSend)
+            _targetPeer.set(peer);
         boolean added = _activePeers.add(peer);
         if (added) {
             if (_log.shouldLog(Log.DEBUG))
@@ -229,7 +235,7 @@
         // no, this doesn't always work.
         // Also note that the iterator in getNextVolley may have alreay passed us,
         // or not reflect the addition.
-        if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
+        if (willSend) {
             synchronized (_activePeers) {
                 _activePeers.notify();
             }
@@ -268,73 +274,71 @@
     public List<UDPPacket> getNextVolley() {
         PeerState peer = null;
         List<OutboundMessageState> states = null;
+
+        while ((states == null) && _alive) {
         // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
         int peersProcessed = 0;
-        while (_alive && (states == null) ) {
-            int nextSendDelay = Integer.MAX_VALUE;
             // no, not every time - O(n**2) - do just before waiting below
             //finishMessages();
 
-                    // do we need a new long-lived iterator?
-                    if (_iterator == null ||
-                        ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
-                        _iterator = _activePeers.iterator();
-                    }
-
                     // Go through all the peers that we are actively sending messages to.
                     // Call finishMessages() for each one, and remove them from the iterator
                     // if there is nothing left to send.
                     // Otherwise, return the volley to be sent.
                     // Otherwise, wait()
+                    long size = _activePeers.size();
                     long now = _context.clock().now();
-                    while (_iterator.hasNext()) {
+                    while (peersProcessed < size) {
+                        // we will catch concurrent adds through _targetPeer
+                        // no need to update size
+                        // may check _targetPeer twice, unlikely
+                        PeerState tmp = _targetPeer.getAndSet(null);
+                        if (tmp != null) {
+                            peer = tmp;
+                            // don't call finishMessages for this hot peer
+                        } else {
+                            if (!_iterator.hasNext())
+                                _iterator = _activePeers.iterator();
                         peer = _iterator.next();
-                        int remaining = peer.finishMessages(now);
-                        if (remaining <= 0) {
+                            peersProcessed++;
+                            if (peer.finishMessages(now) <= 0) {
                             // race with add()
                             _iterator.remove();
                             if (_log.shouldLog(Log.DEBUG))
                                 _log.debug("No more pending messages for " + peer.getRemotePeer());
                             continue;
                         }
-                        peersProcessed++;
+                        }
+
                         states = peer.allocateSend();
                         if (states != null) {
                             // we have something to send and we will be returning it
                             break;
-                        } else if (peersProcessed >= _activePeers.size()) {
-                            // we've gone all the way around, time to sleep
-                            break;
-                        } else {
-                            // Update the minimum delay for all peers
-                            // which will be used if we found nothing to send across all peers
-                            int delay = peer.getNextDelay();
-                            if (delay < nextSendDelay)
-                                nextSendDelay = delay;
-                            peer = null;
                         }
                     }
 
-                    //if (peer != null && _log.shouldLog(Log.DEBUG))
+                    //if (states != null && _log.shouldLog(Log.DEBUG))
                     //    _log.debug("Done looping, next peer we are sending for: " +
                     //               peer.getRemotePeer());
 
-                    // if we've gone all the way through the loop, wait
-                    // ... unless nextSendDelay says we have more ready now
-                    if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
-                        peersProcessed = 0;
-                        // why? we do this in the loop one at a time
-                        //finishMessages();
+                    if (states == null) {
+                        Long nextSendTime = Long.MAX_VALUE;
+                        Iterator<PeerState> delayIter = _activePeers.iterator();
+                        now = _context.clock().now();
+                        while (_targetPeer.get() == null && delayIter.hasNext() && nextSendTime > now) {
+                            peer = delayIter.next();
+                            nextSendTime = Math.min(nextSendTime, peer.getNextDelay());
+                        }
+
+                        if (_targetPeer.get() != null || nextSendTime <= (now = _context.clock().now()))
+                            return null;
+
+                        synchronized (_activePeers) {
+                            try {
                         // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
                         // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
                         // gets called regularly
-                        int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("wait for " + toWait);
-                        // wait.. or somethin'
-                        synchronized (_activePeers) {
-                            try {
-                                _activePeers.wait(toWait);
+                                _activePeers.wait(Math.min(Math.max(nextSendTime - now, 10), MAX_WAIT));
                             } catch (InterruptedException ie) {
                                 // noop
                                 if (_log.shouldLog(Log.DEBUG))
@@ -353,6 +356,11 @@
 
         List<UDPPacket> packets = preparePackets(states, peer);
 
+        if (packets != null)
+           // no recent peer.add()? Give current peer a second chance
+           // overwritten by peer.add(). Performance !! this guy may have more
+            _targetPeer.compareAndSet(null,peer);
+
       /****
         if ( (state != null) && (state.getMessage() != null) ) {
             int valid = 0;

comment:33 Changed 7 days ago by zab

I don't have a strong opinion on the adaptive vs. computed sleep, and I agree with the use of the AtomicReference.

However, I can't possibly justify the 10ms floor in the sleep. If we're going to compute the wait time based on the retransmission timer, why are we saying it can never be less than 10ms?

comment:34 Changed 6 days ago by jogger

You should discuss the floor with zzz, who wanted to keep the current logic. Otherwise I agree with you.
