Opened 9 months ago

Closed 8 months ago

#2260 closed defect (fixed)

OutboundMessageFragments._isWaiting

Reported by: zab
Owned by: zzz
Priority: minor
Milestone: 0.9.36
Component: router/transport
Version: 0.9.34
Keywords: ssu
Cc:
Parent Tickets:

Description

This flag should at least be volatile, but I would prefer that it be removed entirely. As it stands now, add() may not see the flag set and skip the notify, so new messages may be delayed by up to a second (OutboundMessageFragments.MAX_WAIT).

The intention of the flag seems to have been to avoid synchronizing on the _activePeers collection, but that collection is now concurrent, and the only time we sync on it is when we wait() for more packets to come.
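
To illustrate the alternative, here is a minimal sketch (not the I2P code) of a sender that waits on a concurrent collection while add() notifies unconditionally under the monitor. The class name WakeupSketch, the String peers, and the 100 ms producer delay are assumptions made for the example; MAX_WAIT is set to one second to mirror the delay mentioned above.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class WakeupSketch {
    // Assumed value: the ticket says the delay can be "up to a second".
    static final long MAX_WAIT = 1000;

    // Concurrent set, so add() itself needs no lock.
    private final Set<String> activePeers = ConcurrentHashMap.newKeySet();

    /** Producer side: add without locking, then always notify under the monitor. */
    public void add(String peer) {
        activePeers.add(peer);
        synchronized (activePeers) {
            activePeers.notify();
        }
    }

    /** Consumer side: wait for a peer or MAX_WAIT; returns the milliseconds spent waiting. */
    public long awaitPeer() throws InterruptedException {
        long start = System.nanoTime();
        synchronized (activePeers) {
            // The check happens while holding the monitor, so a concurrent add()
            // cannot notify before wait() has released it.
            // Production code would re-check in a loop to tolerate spurious wakeups.
            if (activePeers.isEmpty())
                activePeers.wait(MAX_WAIT);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Starts a producer that adds one peer after about 100 ms and reports how long the consumer waited. */
    public static long demo() {
        WakeupSketch s = new WakeupSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            s.add("peer-1");
        });
        producer.start();
        try {
            long waited = s.awaitPeer();
            producer.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("waited ms: " + demo());
    }
}
```

In this sketch the consumer should wake shortly after the producer's add() rather than sleeping for the full MAX_WAIT. The cost is one uncontended monitor acquisition per add().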

Change History (5)

comment:1 Changed 9 months ago by zzz

  • Milestone changed from undecided to 0.9.36

Added volatile in 34050bd5a9bab4208cd5bb0716084e0e1243d263 0.9.34-21-rc
Leaving open for additional fixes in .36 at OP's request.

comment:2 Changed 9 months ago by zzz

OP wants to remove the _isWaiting field completely; I said I would research it.
This was my code in rev 85b52acae1ec041fa4ba6703378e33b47d148f1e (2011-07-23) and it hasn't been touched since. It was part of my original concurrent-izing of SSU. It's not clear why I was trying so hard to avoid the sync and notify on _activePeers.

Proposed change below, testing now. This assumes we don't want to do the wakeup unless it's an added connection. Is this right? I also changed notifyAll() to notify(), since we know there's only one waiter.

#
# old_revision [566b1eeda884669d179397966db673e71775debb]
#
# patch "router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java"
#  from [c8785d0564c1d7450824de5ce9bba0bfc04a9be9]
#    to [8ba75a05fdba314ad6f0ff970f2f41c0b03a8008]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	c8785d0564c1d7450824de5ce9bba0bfc04a9be9
+++ router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	8ba75a05fdba314ad6f0ff970f2f41c0b03a8008
@@ -51,11 +51,6 @@ class OutboundMessageFragments {
      */
     private Iterator<PeerState> _iterator;
 
-    /**
-     *  Avoid sync in add() if possible (not 100% reliable)
-     */
-    private volatile boolean _isWaiting;
-
     private volatile boolean _alive;
     private final PacketBuilder _builder;
 
@@ -104,7 +99,7 @@ class OutboundMessageFragments {
         _alive = false;
         _activePeers.clear();
         synchronized (_activePeers) {
-            _activePeers.notifyAll();
+            _activePeers.notify();
         }
     }
 
@@ -214,9 +209,11 @@ class OutboundMessageFragments {
      * @since 0.8.9
      */
     public void add(PeerState peer) {
-        boolean wasEmpty = _activePeers.isEmpty();
         boolean added = _activePeers.add(peer);
         if (added) {
+            synchronized (_activePeers) {
+                _activePeers.notify();
+            }
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Add a new message to a new peer " + peer.getRemotePeer());
         } else {
@@ -224,16 +221,6 @@ class OutboundMessageFragments {
                 _log.debug("Add a new message to an existing peer " + peer.getRemotePeer());
         }
         _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size());
-
-        // Avoid sync if possible
-        // no, this doesn't always work.
-        // Also note that the iterator in getNextVolley may have alreay passed us,
-        // or not reflect the addition.
-        if (_isWaiting || wasEmpty) {
-            synchronized (_activePeers) {
-                _activePeers.notifyAll();
-            }
-        }
     }
 
     /**
@@ -321,7 +308,6 @@ class OutboundMessageFragments {
                     // if we've gone all the way through the loop, wait
                     // ... unless nextSendDelay says we have more ready now
                     if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
-                        _isWaiting = true;
                         peersProcessed = 0;
                         // why? we do this in the loop one at a time
                         //finishMessages();
@@ -341,7 +327,6 @@ class OutboundMessageFragments {
                                      _log.debug("Woken up while waiting");
                             }
                         }
-                        _isWaiting = false;
                     //} else {
                     //    if (_log.shouldLog(Log.DEBUG))
                     //        _log.debug("dont wait: alive=" + _alive + " state = " + state);

comment:3 Changed 9 months ago by zzz

Can't notify only if added, because the PeerState could already be in _activePeers because it's awaiting acks. We want to notify() if we can send another, but not if the window is full. New proposed patch:

#
# old_revision [95e2b9f35eac4e0ca2160d2f554bb4cdba5c074d]
#
# patch "router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java"
#  from [e242bda220d49b24bcd174f76fdc1ab892ce7435]
#    to [80422565a77256f4f4e688a8a0960d5910f7eeae]
# 
# patch "router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java"
#  from [c8785d0564c1d7450824de5ce9bba0bfc04a9be9]
#    to [e60afa24e6473455f9bb919a8fbacc538c0b2440]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java	e242bda220d49b24bcd174f76fdc1ab892ce7435
+++ router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java	80422565a77256f4f4e688a8a0960d5910f7eeae
@@ -260,7 +260,7 @@ class InboundMessageFragments /*implemen
         // By calling add(), this also is a failsafe against possible
         // races in OutboundMessageFragments.
         if (newAck && from.getOutboundMessageCount() > 0)
-            _outbound.add(from);
+            _outbound.add(from, 1);
 
         return rv;
     }
============================================================
--- router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	c8785d0564c1d7450824de5ce9bba0bfc04a9be9
+++ router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	e60afa24e6473455f9bb919a8fbacc538c0b2440
@@ -51,11 +51,6 @@ class OutboundMessageFragments {
      */
     private Iterator<PeerState> _iterator;
 
-    /**
-     *  Avoid sync in add() if possible (not 100% reliable)
-     */
-    private volatile boolean _isWaiting;
-
     private volatile boolean _alive;
     private final PacketBuilder _builder;
 
@@ -104,7 +99,7 @@ class OutboundMessageFragments {
         _alive = false;
         _activePeers.clear();
         synchronized (_activePeers) {
-            _activePeers.notifyAll();
+            _activePeers.notify();
         }
     }
 
@@ -165,7 +160,7 @@ class OutboundMessageFragments {
             // will throw IAE if peer == null
             OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
             peer.add(state);
-            add(peer);
+            add(peer, state.fragmentSize(0));
         } catch (IllegalArgumentException iae) {
             _transport.failed(msg, "Peer disconnected quickly");
             return;
@@ -182,7 +177,7 @@ class OutboundMessageFragments {
         if (peer == null)
             throw new RuntimeException("null peer for " + state);
         peer.add(state);
-        add(peer);
+        add(peer, state.fragmentSize(0));
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }
 
@@ -195,10 +190,15 @@ class OutboundMessageFragments {
         if (peer == null)
             throw new RuntimeException("null peer");
         int sz = states.size();
+        int min = peer.fragmentSize();
         for (int i = 0; i < sz; i++) {
-            peer.add(states.get(i));
+            OutboundMessageState state = states.get(i);
+            peer.add(state);
+            int fsz = state.fragmentSize(0);
+            if (fsz < min)
+                min = fsz;
         }
-        add(peer);
+        add(peer, min);
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }
 
@@ -211,10 +211,10 @@ class OutboundMessageFragments {
      * There are larger chances of adding the PeerState "behind" where
      * the iterator is now... but these issues are the same as before concurrentification.
      *
+     * @param the minimum size we can send
      * @since 0.8.9
      */
-    public void add(PeerState peer) {
-        boolean wasEmpty = _activePeers.isEmpty();
+    public void add(PeerState peer, int size) {
         boolean added = _activePeers.add(peer);
         if (added) {
             if (_log.shouldLog(Log.DEBUG))
@@ -229,9 +229,9 @@ class OutboundMessageFragments {
         // no, this doesn't always work.
         // Also note that the iterator in getNextVolley may have alreay passed us,
         // or not reflect the addition.
-        if (_isWaiting || wasEmpty) {
+        if (added || peer.getSendWindowBytesRemaining() >= size) {
             synchronized (_activePeers) {
-                _activePeers.notifyAll();
+                _activePeers.notify();
             }
         }
     }
@@ -321,7 +321,6 @@ class OutboundMessageFragments {
                     // if we've gone all the way through the loop, wait
                     // ... unless nextSendDelay says we have more ready now
                     if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
-                        _isWaiting = true;
                         peersProcessed = 0;
                         // why? we do this in the loop one at a time
                         //finishMessages();
@@ -341,7 +340,6 @@ class OutboundMessageFragments {
                                      _log.debug("Woken up while waiting");
                             }
                         }
-                        _isWaiting = false;
                     //} else {
                     //    if (_log.shouldLog(Log.DEBUG))
                     //        _log.debug("dont wait: alive=" + _alive + " state = " + state);

comment:4 Changed 9 months ago by zzz

  • Status changed from new to accepted

Version 3: always notify from the InboundMessageFragments call.

#
# old_revision [95e2b9f35eac4e0ca2160d2f554bb4cdba5c074d]
#
# patch "router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java"
#  from [e242bda220d49b24bcd174f76fdc1ab892ce7435]
#    to [987213fb2f0a4464532f521e629323ad3ed03f09]
# 
# patch "router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java"
#  from [c8785d0564c1d7450824de5ce9bba0bfc04a9be9]
#    to [38c39df7beb0ee615138416427a670dca20ed7f5]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java	e242bda220d49b24bcd174f76fdc1ab892ce7435
+++ router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java	987213fb2f0a4464532f521e629323ad3ed03f09
@@ -260,7 +260,7 @@ class InboundMessageFragments /*implemen
         // By calling add(), this also is a failsafe against possible
         // races in OutboundMessageFragments.
         if (newAck && from.getOutboundMessageCount() > 0)
-            _outbound.add(from);
+            _outbound.add(from, 0);
 
         return rv;
     }
============================================================
--- router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	c8785d0564c1d7450824de5ce9bba0bfc04a9be9
+++ router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java	38c39df7beb0ee615138416427a670dca20ed7f5
@@ -51,11 +51,6 @@ class OutboundMessageFragments {
      */
     private Iterator<PeerState> _iterator;
 
-    /**
-     *  Avoid sync in add() if possible (not 100% reliable)
-     */
-    private volatile boolean _isWaiting;
-
     private volatile boolean _alive;
     private final PacketBuilder _builder;
 
@@ -104,7 +99,7 @@ class OutboundMessageFragments {
         _alive = false;
         _activePeers.clear();
         synchronized (_activePeers) {
-            _activePeers.notifyAll();
+            _activePeers.notify();
         }
     }
 
@@ -165,7 +160,7 @@ class OutboundMessageFragments {
             // will throw IAE if peer == null
             OutboundMessageState state = new OutboundMessageState(_context, msg, peer);
             peer.add(state);
-            add(peer);
+            add(peer, state.fragmentSize(0));
         } catch (IllegalArgumentException iae) {
             _transport.failed(msg, "Peer disconnected quickly");
             return;
@@ -182,7 +177,7 @@ class OutboundMessageFragments {
         if (peer == null)
             throw new RuntimeException("null peer for " + state);
         peer.add(state);
-        add(peer);
+        add(peer, state.fragmentSize(0));
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }
 
@@ -195,10 +190,15 @@ class OutboundMessageFragments {
         if (peer == null)
             throw new RuntimeException("null peer");
         int sz = states.size();
+        int min = peer.fragmentSize();
         for (int i = 0; i < sz; i++) {
-            peer.add(states.get(i));
+            OutboundMessageState state = states.get(i);
+            peer.add(state);
+            int fsz = state.fragmentSize(0);
+            if (fsz < min)
+                min = fsz;
         }
-        add(peer);
+        add(peer, min);
         //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }
 
@@ -211,10 +211,10 @@ class OutboundMessageFragments {
      * There are larger chances of adding the PeerState "behind" where
      * the iterator is now... but these issues are the same as before concurrentification.
      *
+     * @param the minimum size we can send, or 0 to always notify
      * @since 0.8.9
      */
-    public void add(PeerState peer) {
-        boolean wasEmpty = _activePeers.isEmpty();
+    public void add(PeerState peer, int size) {
         boolean added = _activePeers.add(peer);
         if (added) {
             if (_log.shouldLog(Log.DEBUG))
@@ -229,9 +229,9 @@ class OutboundMessageFragments {
         // no, this doesn't always work.
         // Also note that the iterator in getNextVolley may have alreay passed us,
         // or not reflect the addition.
-        if (_isWaiting || wasEmpty) {
+        if (added || size <= 0 || peer.getSendWindowBytesRemaining() >= size) {
             synchronized (_activePeers) {
-                _activePeers.notifyAll();
+                _activePeers.notify();
             }
         }
     }
@@ -321,7 +321,6 @@ class OutboundMessageFragments {
                     // if we've gone all the way through the loop, wait
                     // ... unless nextSendDelay says we have more ready now
                     if (states == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
-                        _isWaiting = true;
                         peersProcessed = 0;
                         // why? we do this in the loop one at a time
                         //finishMessages();
@@ -341,7 +340,6 @@ class OutboundMessageFragments {
                                      _log.debug("Woken up while waiting");
                             }
                         }
-                        _isWaiting = false;
                     //} else {
                     //    if (_log.shouldLog(Log.DEBUG))
                     //        _log.debug("dont wait: alive=" + _alive + " state = " + state);
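
The notify condition in this version can be sketched in isolation as follows. This is a simplified illustration, not the committed code: NotifyPolicySketch and its Peer class are hypothetical stand-ins for OutboundMessageFragments and PeerState, the window is a fixed number rather than live state, and add() returns the decision instead of calling notify().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class NotifyPolicySketch {
    /** Hypothetical stand-in for PeerState; only the send window matters here. */
    public static final class Peer {
        public final String name;
        public final int windowBytesRemaining;

        public Peer(String name, int windowBytesRemaining) {
            this.name = name;
            this.windowBytesRemaining = windowBytesRemaining;
        }
    }

    private final Set<Peer> activePeers = ConcurrentHashMap.newKeySet();

    /**
     * @param size the minimum size we can send, or 0 to always notify
     * @return true if the sender thread would be woken up
     */
    public boolean add(Peer peer, int size) {
        // Set.add() returns false when the peer is already active,
        // for example because it is still awaiting acks.
        boolean added = activePeers.add(peer);
        return added || size <= 0 || peer.windowBytesRemaining >= size;
    }

    public static void main(String[] args) {
        NotifyPolicySketch s = new NotifyPolicySketch();
        Peer p = new Peer("peer-1", 500);
        System.out.println(s.add(p, 1000)); // true: newly added peer
        System.out.println(s.add(p, 1000)); // false: already active, window too small
        System.out.println(s.add(p, 400));  // true: fragment fits in the window
        System.out.println(s.add(p, 0));    // true: size 0 always notifies (the ack path)
    }
}
```

The second call shows why notifying only on "added" was not enough, and why a full window should still suppress the wakeup.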

comment:5 Changed 8 months ago by zzz

  • Resolution set to fixed
  • Status changed from accepted to closed

Above patch in b987a4e95826fc7dc83e50078c5540e833d38136 0.9.35-9
