Opened 7 years ago

Closed 7 years ago

#664 closed defect (fixed)

UDPReceiver.MAX_QUEUE_PERIOD check can fail sporadically

Reported by: zab
Owned by: zzz
Priority: minor
Milestone: 0.9.2
Component: router/transport
Version: 0.9
Keywords: SSU
Cc:
Parent Tickets:

Description

If the fix to ticket #663 is applied, the check in UDPReceiver.run() that makes sure that the oldest packet in the receive queue is not older than MAX_QUEUE_PERIOD can fail. This is the execution sequence in question:

  1. UDPReceiver thread : UDPPacket head = _inboundQueue.peek() ; packet is still in queue!
  2. UDPReceiver thread : context-switched, loses cpu for some reason
  3. PacketHandler thread : receiveNext(); now the same packet is removed from queue
  4. PacketHandler thread : process packet, then release()
  5. UDPReceiver thread : gets cpu back, proceeds to call head.getLifetime() -> that calls verifyNotReleased() but packet is released!
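
The five steps above can be replayed deterministically in a single thread. The following is only a sketch: SketchPacket and ReleasedHeadRace are hypothetical stand-ins, not the real UDPPacket or UDPReceiver, and poll() is used in place of the blocking take() so the example cannot hang. It assumes that getLifetime() throws once the packet has been released, which is how the ticket describes verifyNotReleased().

```java
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for UDPPacket; only the release check is modelled. */
class SketchPacket {
    private final long begin = System.currentTimeMillis();
    private volatile boolean released;

    void release() { released = true; }

    /** Assumed pre-fix behaviour: reading the lifetime of a released packet fails. */
    long getLifetime() {
        if (released)
            throw new IllegalStateException("packet already released");
        return System.currentTimeMillis() - begin;
    }
}

class ReleasedHeadRace {
    /** Replays steps 1-5 in order; returns true if step 5 trips the check. */
    static boolean replay() {
        LinkedBlockingQueue<SketchPacket> inboundQueue = new LinkedBlockingQueue<SketchPacket>();
        inboundQueue.offer(new SketchPacket());

        SketchPacket head = inboundQueue.peek();   // step 1: packet is still queued
        // step 2: the receiver thread loses the CPU here
        SketchPacket taken = inboundQueue.poll();  // step 3: handler removes the same packet
        taken.release();                           // step 4: handler is done with it
        try {
            head.getLifetime();                    // step 5: stale reference is used
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("check tripped: " + replay());
    }
}
```

In the real router the two roles run on separate threads, so the failure depends on scheduling, which is why it only shows up sporadically.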

Subtickets

Change History (5)

comment:1 Changed 7 years ago by zab

KillYourTV managed to reproduce this scenario after 28 minutes of uptime. His report is at http://pastethis.i2p/show/1467/

Unfortunately, there is no easy way to solve this problem without losing some concurrency and moving away from BlockingQueue to a plain old LinkedList (or, if the project decides to move to Java 1.6, the much more efficient java.util.ArrayDeque collection).

The suggestion below conflicts with ticket #660, so I'm only offering it as an example. I'll post a separate suggestion on that ticket that is compatible with the other changes.

Also available with syntax highlighting at http://pastethis.i2p/show/1470/

#
# old_revision [2a403f031d73e2b624fd6cfcdde5e12a91d792f3]
#
# patch "router/java/src/net/i2p/router/transport/udp/UDPReceiver.java"
#  from [26cd34990b12acd2de0b7237b74327b618ade484]
#    to [c285e142c460ffc2bc5d91f39700bf82c3762bef]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	26cd34990b12acd2de0b7237b74327b618ade484
+++ router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	c285e142c460ffc2bc5d91f39700bf82c3762bef
@@ -2,14 +2,12 @@ import java.net.DatagramSocket;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
 
 import net.i2p.router.RouterContext;
 import net.i2p.router.transport.FIFOBandwidthLimiter;
 import net.i2p.util.I2PThread;
 import net.i2p.util.Log;
-import net.i2p.util.SimpleTimer;
 
 /**
  * Lowest level component to pull raw UDP datagrams off the wire as fast
@@ -24,7 +22,8 @@ class UDPReceiver {
     private final Log _log;
     private DatagramSocket _socket;
     private String _name;
-    private final BlockingQueue<UDPPacket> _inboundQueue;
+    /** LOCKING: itself */
+    private final LinkedList<UDPPacket> _inboundQueue;
     private boolean _keepRunning;
     private final Runner _runner;
     private final UDPTransport _transport;
@@ -37,7 +36,7 @@ class UDPReceiver {
         _log = ctx.logManager().getLog(UDPReceiver.class);
         _id = ++__id;
         _name = name;
-        _inboundQueue = new LinkedBlockingQueue();
+        _inboundQueue = new LinkedList<UDPPacket>();
         _socket = socket;
         _transport = transport;
         _runner = new Runner();
@@ -57,18 +56,20 @@ class UDPReceiver {
     
     public void shutdown() {
         _keepRunning = false;
-        _inboundQueue.clear();
-        for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
-            UDPPacket poison = UDPPacket.acquire(_context, false);
-            poison.setMessageType(TYPE_POISON);
-            _inboundQueue.offer(poison);
+        synchronized(_inboundQueue) {
+        	_inboundQueue.clear();
+        	for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
+        		UDPPacket poison = UDPPacket.acquire(_context, false);
+        		poison.setMessageType(TYPE_POISON);
+        		_inboundQueue.offer(poison);
+        	}
+        	for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
+        		try {
+        			_inboundQueue.wait(i * 50); // wait so release monitor
+        		} catch (InterruptedException ie) {}
+        	}
+        	_inboundQueue.clear();
         }
-        for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        _inboundQueue.clear();
     }
     
 /*********
@@ -158,6 +159,7 @@ class UDPReceiver {
         int queueSize = 0;
         long headPeriod = 0;
 
+        synchronized(_inboundQueue) {
             UDPPacket head = _inboundQueue.peek();
             if (head != null) {
                 headPeriod = head.getLifetime();
@@ -167,15 +169,18 @@ class UDPReceiver {
             }
             if (!rejected) {
                 _inboundQueue.offer(packet);
+                _inboundQueue.notifyAll();
                 //return queueSize + 1;
                 return 0;
             }
-        
+        }
         // rejected
         packet.release();
         _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
         if (_log.shouldLog(Log.WARN)) {
-            queueSize = _inboundQueue.size();
+        	synchronized(_inboundQueue) {
+        		queueSize = _inboundQueue.size();
+        	}
             StringBuilder msg = new StringBuilder();
             msg.append("Dropping inbound packet with ");
             msg.append(queueSize);
@@ -203,12 +208,20 @@ class UDPReceiver {
     public UDPPacket receiveNext() {
         UDPPacket rv = null;
         //int remaining = 0;
-        while (_keepRunning && rv == null) {
-            try {
-                rv = _inboundQueue.take();
-            } catch (InterruptedException ie) {}
-            if (rv != null && rv.getMessageType() == TYPE_POISON)
-                return null;
+        synchronized(_inboundQueue) {
+        	while (_keepRunning && _inboundQueue.isEmpty()) {
+        		try {
+        			_inboundQueue.wait();
+        		} catch (InterruptedException ie) {}
+        	}
+        	if (!_keepRunning)
+        		return null;
+        	rv = _inboundQueue.removeFirst();
+        	if (rv != null) { 
+        		if (rv.getMessageType() == TYPE_POISON)
+        			return null;
+        		return rv;
+        	}
         }
         //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
         return rv;

comment:2 Changed 7 years ago by zzz

  • Milestone set to 0.9.2

Seems like we could just remove verifyNotReleased() from UDPPacket.getLifetime() and that's the end of it?

comment:3 Changed 7 years ago by zab

Removing the check will work most of the time, but it leaves a problem in place: a stack-local reference to a packet that has already been processed and released is still used to decide whether the router is overloaded, which in turn affects whether an incoming packet is dropped. My personal preference is to fix the underlying problem.
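
As a rough illustration of what fixing the underlying problem could look like, here is a minimal sketch in which the age check on the head and the removal of the head share one lock, so the head cannot be taken and released between the peek and the read. GuardedInboundQueue, Entry, offerUnlessStale and pollNow are hypothetical names for this sketch only; it uses java.util.ArrayDeque and is not the patch posted in comment:1.

```java
import java.util.ArrayDeque;

/** Hypothetical queue whose age check and removal are guarded by the same monitor. */
class GuardedInboundQueue {
    /** Hypothetical stand-in for a queued packet; only the enqueue time is kept. */
    static final class Entry {
        final long enqueuedAt;
        Entry(long enqueuedAt) { this.enqueuedAt = enqueuedAt; }
    }

    private final ArrayDeque<Entry> queue = new ArrayDeque<Entry>();

    /**
     * Adds the entry unless the current head is older than maxPeriod.
     * The head is inspected while holding the lock, so no consumer can
     * remove it between the peek and the age computation.
     */
    synchronized boolean offerUnlessStale(Entry e, long now, long maxPeriod) {
        Entry head = queue.peekFirst();
        if (head != null && now - head.enqueuedAt > maxPeriod)
            return false;
        queue.addLast(e);
        notifyAll(); // wake any consumer waiting on this monitor
        return true;
    }

    /** Non-blocking removal; returns null when the queue is empty. */
    synchronized Entry pollNow() { return queue.pollFirst(); }

    synchronized int size() { return queue.size(); }
}
```

The cost is the one mentioned in comment:1: producers and consumers now contend on a single lock instead of using the finer-grained locking inside a BlockingQueue.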

comment:4 Changed 7 years ago by zzz

Disagree. We look to see the current queue delay, then use that to make a decision whether to drop a packet. We have to do one then the other. It doesn't have to be perfect. Sure, that could have been the only packet in the queue and there's nothing behind it, but more likely there are others with similar delay. But even that doesn't matter. Delay estimates / drop strategies are by design simple, fast, and a little sloppy.

There may or may not be an "underlying problem" with locking and this whole verifyNotReleased() ugliness. In fact I'm almost sure there is. But I don't think the discard code cares; it's neither the reason nor the place to fix it.
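
The "simple, fast, and a little sloppy" approach described in this comment can be sketched as follows. This is an illustration only: TolerantPacket and DropHeuristic are hypothetical classes, the clock is passed in as a parameter to keep the example deterministic, and the 2000 ms value for MAX_QUEUE_PERIOD is an assumed placeholder, not the router's actual setting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical packet whose lifetime can be read even after release. */
class TolerantPacket {
    private final long enqueuedAt;
    private volatile boolean released;

    TolerantPacket(long enqueuedAt) { this.enqueuedAt = enqueuedAt; }

    void release() { released = true; }

    boolean isReleased() { return released; }

    /** No release check: a slightly stale answer is acceptable for a drop heuristic. */
    long getLifetime(long now) { return now - enqueuedAt; }
}

class DropHeuristic {
    /** Assumed placeholder value in milliseconds. */
    static final long MAX_QUEUE_PERIOD = 2000;

    /**
     * Look at the head's age first, then decide. The head may be removed and
     * released by another thread in between; the estimate is still usable.
     */
    static boolean shouldDrop(BlockingQueue<TolerantPacket> queue, long now) {
        TolerantPacket head = queue.peek();
        if (head == null)
            return false;
        return head.getLifetime(now) > MAX_QUEUE_PERIOD;
    }

    public static void main(String[] args) {
        BlockingQueue<TolerantPacket> q = new LinkedBlockingQueue<TolerantPacket>();
        q.offer(new TolerantPacket(0));
        System.out.println(shouldDrop(q, 1000));
        System.out.println(shouldDrop(q, 3000));
    }
}
```

The trade-off is the one debated in comment:3 and here: the decision may occasionally be based on a packet that has already left the queue, in exchange for no extra locking on the receive path.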

comment:5 Changed 7 years ago by zzz

  • Resolution set to fixed
  • Status changed from new to closed

verifyNotReleased() removed from getLifetime() in 0.9.1-2. Does not include any of the other concurrency suggestions above. Closing this ticket as some of the same concurrency issues are discussed in #660.
