Opened 4 months ago

Closed 4 months ago

Last modified 3 months ago

#2582 closed defect (fixed)

Higher priority message in SSU causes stall

Reported by: Zlatin Balevsky Owned by: zzz
Priority: minor Milestone: 0.9.42
Component: router/transport Version: 0.9.41
Keywords: ssu Cc: jogger
Parent Tickets: Sensitive: no

Description

Consider the following code:

            // Peek at head of _outboundQueue and see if we can send it.
            // If so, pull it off, put it in _outbundMessages, test
            // again for bandwidth if necessary, and return it.
            OutboundMessageState state;
            while ((state = _outboundQueue.peek()) != null &&
                   ShouldSend.YES == locked_shouldSend(state)) {
                // we could get a different state, or null, when we poll,
                // due to AQM drops, so we test again if necessary
                OutboundMessageState dequeuedState = _outboundQueue.poll();
                if (dequeuedState != null) {
                    _outboundMessages.add(dequeuedState);
                    if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                        if (rv == null)
                            rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
                        rv.add(state);
                        if (rv.size() >= MAX_ALLOCATE_SEND)
                            return rv;
                    }
                }
            }

And note that locked_shouldSend() updates the nextSendTime to be one RTO from now. So if a higher-priority message is inserted into the queue between the peek() and the poll(), the equality check will fail, and the second check will ALWAYS fail (because the first thing locked_shouldSend() checks is the nextSendTime).

As a result, the higher-priority message will enter _outboundMessages, but the previous head of the queue will not be sent until one RTO from now, causing a stall.

The following logs illustrate the effects:

2019/08/01 19:10:49.853 DEBUG [acket pusher] router.transport.udp.PeerState: Nothing to send to [Hash: 2eCs7jCRx6XP7f0tEho-Sv8-sl~6FrmnxkVmA6HpdNs=], with 0 / 5 remaining
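The race described above can be reproduced in isolation with a plain java.util.concurrent.PriorityBlockingQueue (a standalone sketch with integers standing in for OutboundMessageState; the names here are illustrative, not from PeerState.java):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PeekPollRace {
    public static void main(String[] args) {
        // Lower number = higher priority, mirroring a priority-ordered send queue.
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(5);

        Integer state = queue.peek();          // the sender thread sees 5
        // Another thread inserts a higher-priority message
        // between the peek() and the poll():
        queue.offer(1);
        Integer dequeuedState = queue.poll();  // returns 1, not 5

        // The equality check from the allocate-send loop now fails:
        System.out.println(state.equals(dequeuedState)); // prints "false"
        // ...and 5 stays queued with its nextSendTime already pushed out one RTO.
    }
}
```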

Subtickets

Change History (14)

comment:1 Changed 4 months ago by zzz

Milestone: undecided → 0.9.42
Status: new → accepted

The above is at PeerState.java line 1709.

Agree with your analysis.

There's a bug there: the second locked_shouldSend(state) should be locked_shouldSend(dequeuedState). With that fix it will generally return YES, but not always. If it's a new message (i.e. state) and there is bandwidth available, it will send it, but it could sometimes not return YES, for example after an AQM drop. It doesn't look like we have multiple threads pulling from _outboundQueue, so things can't vanish that way, but as you say it's a priority queue, so something else could have appeared at the front, or, as the comment says, there could be an AQM drop.

Not sure what to do here, as locked_shouldSend(state) updates state as if it were going to be sent. If it's AQM-dropped, I believe the resources will be freed, so we can't just send the result of the peek() anyway.

Probably the best solution is to call locked_shouldSend(dequeuedState) anyway but ignore the result, i.e. always send the dequeuedState, even if it's != state…

Something like this? (I didn't reindent, so the diff makes it clear what changed.)

--- router/java/src/net/i2p/router/transport/udp/PeerState.java	2e2977ae95ff8c0de896cb4e068bec84a5ebeb83
+++ router/java/src/net/i2p/router/transport/udp/PeerState.java	ae4366b337f9cc1bba3505584326f5349fd1f033
@@ -1717,7 +1717,10 @@ public class PeerState {
                 OutboundMessageState dequeuedState = _outboundQueue.poll();
                 if (dequeuedState != null) {
                     _outboundMessages.add(dequeuedState);
-                    if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) {
+                    if (dequeuedState != state) {
+                        // ignore result, always send
+                        locked_shouldSend(dequeuedState);
+                    }
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                         if (rv == null)
@@ -1725,7 +1728,6 @@ public class PeerState {
                         rv.add(state);
                         if (rv.size() >= MAX_ALLOCATE_SEND)
                             return rv;
-                    }
                 }
             }
         }

comment:2 Changed 4 months ago by Zlatin Balevsky

I see a few possible approaches:

A) butcher the priority system and switch to a FIFO queue. I've been running the testnet like this and it seems to work alright
B) expose a coarse lock around the queue and lock the entire loop, guaranteeing no higher-priority message can get in
C) your approach in comment 1, with the added tweak that if dequeuedState != state, both the dequeued and the original states get sent, because window capacity has already been allocated for the original state.

comment:3 in reply to:  2 Changed 4 months ago by Zlatin Balevsky

Replying to Zlatin Balevsky:

B) expose a coarse lock around the queue and lock the entire loop, guaranteeing no higher-priority message can get in

This is not going to have adverse effects because java.util.concurrent.PriorityBlockingQueue uses a single lock

C) your approach in comment 1, with the added tweak that if dequeuedState != state, both the dequeued and the original states get sent, because window capacity has already been allocated for the original state.

On second thought, I don't know if this is possible at all. Maybe a combination of draining to a temp list and then re-adding all elements?
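For reference, the option B idea from comments 2-3 — one external lock held around offer() and across the peek()/poll() pair — can be sketched in isolation (a standalone sketch with integers standing in for OutboundMessageState; the method names are illustrative, not from PeerState.java):

```java
import java.util.concurrent.PriorityBlockingQueue;

public class CoarseLock {
    static final PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

    // Hold one external lock across both peek() and poll(), and around
    // offer(), so nothing can slip in between them. PBQ's own internal
    // ReentrantLock is separate and unaffected by this.
    static Integer takeIfReady() {
        synchronized (queue) {
            Integer head = queue.peek();
            if (head == null)
                return null;
            // Guaranteed equal to head under the external lock,
            // since offer() also synchronizes on the queue.
            return queue.poll();
        }
    }

    static void send(int msg) {
        synchronized (queue) {
            queue.offer(msg);
        }
    }

    public static void main(String[] args) {
        send(5);
        send(1);
        System.out.println(takeIfReady()); // prints "1" (highest priority first)
    }
}
```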

comment:4 Changed 4 months ago by Zlatin Balevsky

Suggested patch for option B (not reindented, so the diff is readable):

--- router/java/src/net/i2p/router/transport/udp/PeerState.java	2e2977ae95ff8c0de896cb4e068bec84a5ebeb83
+++ router/java/src/net/i2p/router/transport/udp/PeerState.java	b47f7e8cb3073c387bac031d4a20e7b89b9a2489
@@ -1482,7 +1482,10 @@ public class PeerState {
             _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
         int rv = 0;
         // will never fail for CDPQ
-        boolean fail = !_outboundQueue.offer(state);
+        boolean fail;
+        synchronized(_outboundQueue) {
+            fail = !_outboundQueue.offer(state);
+        }
 /****
         synchronized (_outboundMessages) {
             rv = _outboundMessages.size() + 1;
@@ -1709,6 +1712,7 @@ public class PeerState {
             // Peek at head of _outboundQueue and see if we can send it.
             // If so, pull it off, put it in _outbundMessages, test
             // again for bandwidth if necessary, and return it.
+            synchronized(_outboundQueue) {
             OutboundMessageState state;
             while ((state = _outboundQueue.peek()) != null &&
                    ShouldSend.YES == locked_shouldSend(state)) {
@@ -1728,6 +1732,7 @@ public class PeerState {
                     }
                 }
             }
+            }
         }
         if ( rv == null && _log.shouldLog(Log.DEBUG))
             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +

Of course the logic inside the sync block can be simplified, but this illustrates the point.

comment:5 Changed 4 months ago by zzz

Alternative for now is just the one line fix to change state to dequeuedState and see how that works.

comment:6 in reply to:  5 Changed 4 months ago by Zlatin Balevsky

Replying to zzz:

Alternative for now is just the one line fix to change state to dequeuedState and see how that works.

The problem with that fix is that if dequeuedState != state, the original head of the queue has its nextSendTime updated but stays in the queue, so the problem remains.

comment:7 Changed 4 months ago by zzz

Option B in comments 2-4 doesn't completely solve the problem, because CDPBQ.poll() may still return something different from peek(). The AQM dropping happens in poll().

Your comment 6 critique of the comment 5 suggestion is correct; comment 5 is an incremental improvement / bug fix, but not a complete solution.

Comment 6 also applies to the patch in comment 1, same problem.

So far we have several proposals but no complete solution.

Since we've already called locked_shouldSend(state), perhaps the most conservative solution is to always return BOTH state and dequeuedState in the rv list, i.e. send them both. We'd ignore the result of locked_shouldSend(dequeuedState), as in comment 1. We may also want to check that state wasn't dropped by CDPBQ (OMS.drop()), but it's not clear how to do that.

comment:8 in reply to:  7 Changed 4 months ago by Zlatin Balevsky

Replying to zzz:

Option B in comments 2-4 doesn't completely solve the problem, because CDPBQ.poll() may still return something different from peek(). The AQM dropping happens in poll().

At the moment CDPBQ is not being used; it is commented out.

Since we've already called locked_shouldSend(state), perhaps the most conservative solution is to always return BOTH state and dequeuedState in the rv list, i.e. send them both. We'd ignore the result of locked_shouldSend(dequeuedState), as in comment 1. We may also want to check that state wasn't dropped by CDPBQ (OMS.drop()), but it's not clear how to do that.

I don't know how this could be achieved, given that peek() does not remove the original state from the queue, there is no guarantee that only one higher-priority message will enter the queue, and there is no queue.remove(Element) method.

comment:9 Changed 4 months ago by zzz

Whups, I didn't see that we're using PBQ, not CDPBQ. Will go back and look at all of the above again.

comment:10 Changed 4 months ago by zzz

OK.
Given that we aren't using CDPBQ, and PBQ has a coarse lock, your fix in comment 4 looks like the best approach. We should also sync around drainTo() at line 1558.
Hah, found a second bug in the original code: it should have been rv.add(dequeuedState).

here's where I'm at now:

--- router/java/src/net/i2p/router/transport/udp/PeerState.java	2e2977ae95ff8c0de896cb4e068bec84a5ebeb83
+++ router/java/src/net/i2p/router/transport/udp/PeerState.java	fed34e072e18e0ff3400d475cfd1cedd33a0c70d
@@ -1482,7 +1482,10 @@ public class PeerState {
             _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
         int rv = 0;
         // will never fail for CDPQ
-        boolean fail = !_outboundQueue.offer(state);
+        boolean fail;
+        synchronized (_outboundQueue) {
+            fail = !_outboundQueue.offer(state);
+        }
 /****
         synchronized (_outboundMessages) {
             rv = _outboundMessages.size() + 1;
@@ -1555,7 +1558,9 @@ public class PeerState {
                     _outboundMessages.clear();
             }
             //_outboundQueue.drainAllTo(tempList);
-            _outboundQueue.drainTo(tempList);
+            synchronized (_outboundQueue) {
+                _outboundQueue.drainTo(tempList);
+            }
             for (OutboundMessageState oms : tempList) {
                 _transport.failed(oms, false);
             }
@@ -1710,19 +1715,27 @@ public class PeerState {
             // If so, pull it off, put it in _outbundMessages, test
             // again for bandwidth if necessary, and return it.
             OutboundMessageState state;
-            while ((state = _outboundQueue.peek()) != null &&
-                   ShouldSend.YES == locked_shouldSend(state)) {
-                // we could get a different state, or null, when we poll,
-                // due to AQM drops, so we test again if necessary
-                OutboundMessageState dequeuedState = _outboundQueue.poll();
-                if (dequeuedState != null) {
-                    _outboundMessages.add(dequeuedState);
-                    if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(state)) {
+            synchronized (_outboundQueue) {
+                while ((state = _outboundQueue.peek()) != null &&
+                       ShouldSend.YES == locked_shouldSend(state)) {
+                    // This is guaranteed to be the same as what we got in peek(),
+                    // due to locking and because we aren't using the dropping CDPBQ.
+                    // If we do switch to CDPBQ,
+                    // we could get a different state, or null, when we poll,
+                    // due to AQM drops, so we test again if necessary
+                    OutboundMessageState dequeuedState = _outboundQueue.poll();
+                    if (dequeuedState != null) {
+                        _outboundMessages.add(dequeuedState);
+                        // TODO if we switch to CDPBQ, see ticket #2582
+                        //if (dequeuedState != state) {
+                        //    // ignore result, always send?
+                        //    locked_shouldSend(dequeuedState);
+                        //}
                         if (_log.shouldLog(Log.DEBUG))
                             _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
                         if (rv == null)
                             rv = new ArrayList<OutboundMessageState>(MAX_ALLOCATE_SEND);
-                        rv.add(state);
+                        rv.add(dequeuedState);
                         if (rv.size() >= MAX_ALLOCATE_SEND)
                             return rv;
                     }

comment:11 Changed 4 months ago by Zlatin Balevsky

I don't think we need the sync around drainTo(), as that locks on PBQ's internal lock, but otherwise it looks good.

comment:12 Changed 4 months ago by zzz

The drainTo() could happen between the peek() and the poll(); the PBQ internal lock doesn't save us there. It isn't strictly necessary if we're keeping the null check after the poll(), but I prefer it for belt-and-suspenders reasons.
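That interleaving can be shown deterministically with a plain PriorityBlockingQueue (a standalone sketch, not the router code):

```java
import java.util.ArrayList;
import java.util.concurrent.PriorityBlockingQueue;

public class DrainRace {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(5);

        Integer state = queue.peek();          // the send-allocation thread sees 5
        // Another thread drains the queue (e.g. on a dropped peer)
        // between the peek() and the poll():
        queue.drainTo(new ArrayList<>());
        Integer dequeuedState = queue.poll();  // now null

        // PBQ's internal lock makes each call atomic, but not the pair;
        // only an external lock held across peek()+poll() prevents this.
        System.out.println(dequeuedState == null); // prints "true"
    }
}
```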

comment:13 Changed 4 months ago by zzz

Resolution: fixed
Status: accepted → closed

comment:14 Changed 3 months ago by jogger

Just as a note: I stumbled across the same thing. The TODO in the comments for CDPBQ will not work properly, because locked_shouldSend() would then allocate bandwidth twice for just one state.
