Opened 7 years ago

Closed 6 years ago

Last modified 6 years ago

#660 closed defect (fixed)

The same PacketHandler thread should handle packets from the same host

Reported by: zab
Owned by: zzz
Priority: major
Milestone: 0.9.2
Component: router/transport
Version: 0.9
Keywords: SSU
Cc:
Parent Tickets:

Description

Background: post #8 on http://zzz.i2p/topics/1198

Since multiple PacketHandler threads handle incoming packets, packets from the same host that arrive in quick succession can be handled out of order by different threads. That in turn causes failed session establishments and probably other problems.

Instead of having all packet handler threads wait on the same packet queue, there should be one queue per packet handler thread and the UDPReceiver thread should select which queue to offer() a packet to based on the source ip of the packet.

Suggested approach:
1 have each PacketHandler thread poll its own queue
2 when a packet arrives, UDPReceiver thread takes a hash of the ip:port
3 mods that by the number of packet handlers
4 offers the packet to the appropriate queue.
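
The four steps above can be sketched in isolation as follows. This is a minimal illustration, not code from the I2P tree: the class name HostAffinityDispatcher, the generic packet type P, the per-queue capacity, and the use of java.net.InetSocketAddress as the ip:port key are all assumptions made for the example.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration only; not a class from the I2P source tree. */
final class HostAffinityDispatcher<P> {
    private final List<BlockingQueue<P>> queues = new ArrayList<BlockingQueue<P>>();

    HostAffinityDispatcher(int handlerCount, int capacityPerQueue) {
        if (handlerCount <= 0)
            throw new IllegalArgumentException("need at least one handler");
        for (int i = 0; i < handlerCount; i++)
            queues.add(new LinkedBlockingQueue<P>(capacityPerQueue));
    }

    /** Steps 2 and 3: hash the ip:port, then reduce it to a queue index. */
    int indexFor(InetSocketAddress from) {
        // Clear the sign bit so the remainder can never be negative.
        return (from.hashCode() & 0x7fffffff) % queues.size();
    }

    /** Step 4: offer to the selected queue; false means the queue was full. */
    boolean dispatch(InetSocketAddress from, P packet) {
        return queues.get(indexFor(from)).offer(packet);
    }

    /** Step 1: each handler thread takes packets only from its own queue. */
    BlockingQueue<P> queueFor(int handlerIndex) {
        return queues.get(handlerIndex);
    }

    public static void main(String[] args) {
        HostAffinityDispatcher<String> d = new HostAffinityDispatcher<String>(4, 16);
        InetSocketAddress peer = new InetSocketAddress("10.0.0.1", 4567);
        d.dispatch(peer, "first");
        d.dispatch(peer, "second");
        // Both packets sit in the same queue, in arrival order.
        System.out.println(d.queueFor(d.indexFor(peer)));
    }
}
```

Because every packet from one ip:port maps to the same queue and each queue has exactly one consumer, per-host ordering is preserved as long as the receiver thread itself sees the packets in order.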

Change History (9)

comment:1 Changed 7 years ago by zab

One of many possible implementations of the suggestion above.

#
# old_revision [47f04ff21e8edd00134a0fd68219f86fd3caba36]
#
# patch "router/java/src/net/i2p/router/transport/udp/PacketHandler.java"
#  from [d9181b2ecca9c8b6dcc0f9193bd97a72f3876867]
#    to [e876356ac8782f71c17f4035e36b064ecfe78f4c]
# 
# patch "router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java"
#  from [e29656f40cec05c0c99f792c3a75689b164d8c0c]
#    to [f7611726d957c60fa0377779ca4c499e75aed9c3]
# 
# patch "router/java/src/net/i2p/router/transport/udp/UDPReceiver.java"
#  from [26cd34990b12acd2de0b7237b74327b618ade484]
#    to [f550bd5304f7f67be6e6d4fb79ab46e686ad9095]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/PacketHandler.java	d9181b2ecca9c8b6dcc0f9193bd97a72f3876867
+++ router/java/src/net/i2p/router/transport/udp/PacketHandler.java	e876356ac8782f71c17f4035e36b064ecfe78f4c
@@ -1,6 +1,7 @@ import java.util.Date;
 package net.i2p.router.transport.udp;
 
 import java.util.Date;
+import java.util.concurrent.BlockingQueue;
 
 import net.i2p.router.Router;
 import net.i2p.router.RouterContext;
@@ -136,11 +137,19 @@ class PacketHandler {
         
         public void run() {
             _state = 1;
+            BlockingQueue<UDPPacket> myQ = _endpoint.registerProcessor();
             while (_keepReading) {
                 _state = 2;
-                UDPPacket packet = _endpoint.receive();
+                UDPPacket packet;
+                try {
+                	packet = myQ.take();
+                } catch (InterruptedException timeToDie) {
+                	if (_log.shouldLog(Log.INFO))
+                		_log.info("I was told to die",timeToDie);
+                	return;
+                }
                 _state = 3;
-                if (packet == null) break; // keepReading is probably false, or bind failed...
+                if (packet == null || packet.getMessageType() == UDPReceiver.TYPE_POISON) break; // keepReading is probably false, or bind failed...
 
                 packet.received();
                 if (_log.shouldLog(Log.INFO))
============================================================
--- router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java	e29656f40cec05c0c99f792c3a75689b164d8c0c
+++ router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java	f7611726d957c60fa0377779ca4c499e75aed9c3
@@ -3,6 +3,8 @@ import java.net.SocketException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.SocketException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 import net.i2p.router.RouterContext;
 import net.i2p.util.Log;
@@ -60,6 +62,14 @@ class UDPEndpoint {
     }
     
     public void setListenPort(int newPort) { _listenPort = newPort; }
+    
+    /**
+     * Registers a new packet processor with this endpoint 
+     * @return a BlockingQueue to be poll()-ed for new packets.
+     */
+    BlockingQueue<UDPPacket> registerProcessor() {
+    	return _receiver.registerReceiver();
+    }
 
 /*******
     public void updateListenPort(int newPort) {
@@ -145,13 +155,4 @@ class UDPEndpoint {
         return _sender.add(packet); 
     }
     
-    /**
-     * Blocking call to receive the next inbound UDP packet from any peer.
-     * @return null if we have shut down
-     */
-    public UDPPacket receive() { 
-        if (_receiver == null)
-            return null;
-        return _receiver.receiveNext(); 
-    }
 }
============================================================
--- router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	26cd34990b12acd2de0b7237b74327b618ade484
+++ router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	f550bd5304f7f67be6e6d4fb79ab46e686ad9095
@@ -2,7 +2,13 @@ import java.net.DatagramSocket;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import net.i2p.router.RouterContext;
@@ -24,20 +30,22 @@ class UDPReceiver {
     private final Log _log;
     private DatagramSocket _socket;
     private String _name;
-    private final BlockingQueue<UDPPacket> _inboundQueue;
+    private final List<BlockingQueue<UDPPacket>> _inboundQueues;
     private boolean _keepRunning;
     private final Runner _runner;
     private final UDPTransport _transport;
     private static int __id;
     private final int _id;
-    private static final int TYPE_POISON = -99999;
+    static final int TYPE_POISON = -99999;
+    private final Set<Thread> _processorThreads;
     
     public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
         _context = ctx;
         _log = ctx.logManager().getLog(UDPReceiver.class);
         _id = ++__id;
         _name = name;
-        _inboundQueue = new LinkedBlockingQueue();
+        _inboundQueues = new ArrayList<BlockingQueue<UDPPacket>>();
+        _processorThreads = new HashSet<Thread>();
         _socket = socket;
         _transport = transport;
         _runner = new Runner();
@@ -55,22 +63,27 @@ class UDPReceiver {
         t.start();
     }
     
-    public void shutdown() {
-        _keepRunning = false;
-        _inboundQueue.clear();
-        for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
-            UDPPacket poison = UDPPacket.acquire(_context, false);
-            poison.setMessageType(TYPE_POISON);
-            _inboundQueue.offer(poison);
-        }
-        for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        _inboundQueue.clear();
+    public synchronized void shutdown() {
+    	_keepRunning = false;
+    	for (BlockingQueue<UDPPacket> receiverQueue : _inboundQueues) {
+    		UDPPacket poison = UDPPacket.acquire(_context, false);
+    		poison.setMessageType(TYPE_POISON);
+    		receiverQueue.offer(poison);
+    	}
+    	_inboundQueues.clear();
+    	_processorThreads.clear();
     }
     
+    synchronized BlockingQueue<UDPPacket> registerReceiver() {
+    	Thread me = Thread.currentThread();
+    	if (!_processorThreads.add(me))
+    		throw new IllegalStateException("Already registered thread "+me);
+    	BlockingQueue<UDPPacket> rv = new LinkedBlockingQueue<UDPPacket>(); // shouldn't be unbound, fix later --zab
+    	if (!_inboundQueues.add(rv))
+    		throw new IllegalStateException("Not possible");
+    	return rv;
+    }
+    
 /*********
     private void adjustDropProbability() {
         String p = _context.getProperty("i2np.udp.dropProbability");
@@ -157,25 +170,30 @@ class UDPReceiver {
         boolean rejected = false;
         int queueSize = 0;
         long headPeriod = 0;
+        final int sourceHash = from.hashCode(); // speed up that hashcode later --zab
+        BlockingQueue<UDPPacket> q;
+        synchronized(this) {
+        	 q = _inboundQueues.get(sourceHash % _inboundQueues.size());
+        }
 
-            UDPPacket head = _inboundQueue.peek();
-            if (head != null) {
-                headPeriod = head.getLifetime();
-                if (headPeriod > MAX_QUEUE_PERIOD) {
-                    rejected = true;
-                }
-            }
-            if (!rejected) {
-                _inboundQueue.offer(packet);
-                //return queueSize + 1;
-                return 0;
-            }
+	     UDPPacket head = q.peek();
+	     if (head != null) {
+	     	headPeriod = head.getLifetime();
+	     	if (headPeriod > MAX_QUEUE_PERIOD) {
+	     		rejected = true;
+	     	}
+	     }
+	     if (!rejected) {
+	     	q.offer(packet);
+	     	//return queueSize + 1;
+	     	return 0;
+	     }
         
         // rejected
         packet.release();
         _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
         if (_log.shouldLog(Log.WARN)) {
-            queueSize = _inboundQueue.size();
+            queueSize = q.size();
             StringBuilder msg = new StringBuilder();
             msg.append("Dropping inbound packet with ");
             msg.append(queueSize);
@@ -195,25 +213,6 @@ class UDPReceiver {
     }
   ****/
     
-    /**
-     * Blocking call to retrieve the next inbound packet, or null if we have
-     * shut down.
-     *
-     */
-    public UDPPacket receiveNext() {
-        UDPPacket rv = null;
-        //int remaining = 0;
-        while (_keepRunning && rv == null) {
-            try {
-                rv = _inboundQueue.take();
-            } catch (InterruptedException ie) {}
-            if (rv != null && rv.getMessageType() == TYPE_POISON)
-                return null;
-        }
-        //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
-        return rv;
-    }
-    
     private class Runner implements Runnable {
         private boolean _socketChanged;
         public void run() {

comment:2 Changed 7 years ago by zab

oops, the sync statement inside UDPReceiver.doReceive should look like:

        final int sourceHash = from.hashCode(); // speed up that hashcode later --zab
        final BlockingQueue<UDPPacket> q;
        synchronized(this) {
        	int nQueues = _inboundQueues.size();
        	if (nQueues == 0) 
        		q = null;
        	else 
        		q = _inboundQueues.get(sourceHash % nQueues);
        }
        
        if (q == null) {
        	// either shutdown or nobody registered yet, recycle & drop
        	packet.release();
        	return 0;
        }

comment:3 Changed 7 years ago by zzz

  • Milestone set to 0.9.2
  • Status changed from new to accepted

interesting.

But if we don't handle out-of-order packets in a new session correctly (subsequent packets should be queued until the first packet is processed), then the above is only a partial (albeit probably 99%) fix, as packets could still arrive out of order due to network routing. Whether that's worth worrying about, I don't know.
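
As a rough sketch of what "queue subsequent packets until the first packet is processed" could look like: the class PendingSessionBuffer, its type parameters, and the per-peer cap below are hypothetical and do not come from the I2P source. The sketch also omits expiry of held packets, which a real implementation would presumably need to bound memory use.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical illustration only; not a class from the I2P source tree. */
final class PendingSessionBuffer<K, P> {
    private final int maxHeldPerPeer;
    private final Map<K, Queue<P>> held = new HashMap<K, Queue<P>>();

    PendingSessionBuffer(int maxHeldPerPeer) {
        this.maxHeldPerPeer = maxHeldPerPeer;
    }

    /**
     * Holds a packet that arrived before its session was established.
     * Returns false if the per-peer cap is reached and the packet is dropped.
     */
    synchronized boolean hold(K peer, P packet) {
        Queue<P> q = held.get(peer);
        if (q == null) {
            q = new ArrayDeque<P>();
            held.put(peer, q);
        }
        if (q.size() >= maxHeldPerPeer)
            return false;
        return q.add(packet);
    }

    /** Called once establishment completes; returns held packets in arrival order. */
    synchronized List<P> release(K peer) {
        Queue<P> q = held.remove(peer);
        if (q == null)
            return Collections.<P>emptyList();
        return new ArrayList<P>(q);
    }

    public static void main(String[] args) {
        PendingSessionBuffer<String, String> buf = new PendingSessionBuffer<String, String>(2);
        buf.hold("peerA", "data-1"); // arrived before the handshake finished
        buf.hold("peerA", "data-2");
        System.out.println(buf.release("peerA")); // replay after establishment
    }
}
```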

comment:4 Changed 7 years ago by zab

At some point down the arms race a DPI operator could deliberately shuffle packets to detect I2P nodes, so eventually this will need a real 100% solution.

comment:5 Changed 7 years ago by zab

An epic fail on my side, the hashcode up there should be

final int sourceHash = Math.abs(from.hashCode());
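
One edge case remains with Math.abs: Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE, which is still negative, so the later modulo can produce a negative index. A minimal sketch of one way around it, clearing the sign bit before taking the remainder, is below. The helper class QueueIndex is hypothetical; on Java 8 or later, Math.floorMod(hash, nQueues) is an alternative.

```java
/** Hypothetical helper for illustration; not part of the I2P source tree. */
final class QueueIndex {
    private QueueIndex() {}

    /** Maps any int hash, including Integer.MIN_VALUE, to an index in [0, nQueues). */
    static int forHash(int hash, int nQueues) {
        if (nQueues <= 0)
            throw new IllegalArgumentException("nQueues must be positive");
        // Clearing the sign bit yields a non-negative value for every input.
        return (hash & 0x7fffffff) % nQueues;
    }

    public static void main(String[] args) {
        int worstCase = Integer.MIN_VALUE;
        // Math.abs leaves this one value negative.
        System.out.println("Math.abs result negative: " + (Math.abs(worstCase) < 0));
        // The masked version stays within bounds.
        System.out.println("masked index: " + forHash(worstCase, 3));
    }
}
```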

comment:6 Changed 6 years ago by zab

Unfortunately, as described in ticket #663, it may not be feasible to use blocking queues. The modified suggestion below switches to LinkedLists instead.

Available with pretty colors here http://pastethis.i2p/show/1471/

#
# old_revision [2a403f031d73e2b624fd6cfcdde5e12a91d792f3]
#
# patch "router/java/src/net/i2p/router/transport/udp/PacketHandler.java"
#  from [d9181b2ecca9c8b6dcc0f9193bd97a72f3876867]
#    to [ed810b5931995bf2f1dac7643a1a0d125c6e6521]
# 
# patch "router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java"
#  from [e29656f40cec05c0c99f792c3a75689b164d8c0c]
#    to [d8e32c63a48256711afbe3e075eabc00979c374e]
# 
# patch "router/java/src/net/i2p/router/transport/udp/UDPReceiver.java"
#  from [26cd34990b12acd2de0b7237b74327b618ade484]
#    to [7457aec8ffdc0b6092a7bc258af864021852d490]
#
============================================================
--- router/java/src/net/i2p/router/transport/udp/PacketHandler.java	d9181b2ecca9c8b6dcc0f9193bd97a72f3876867
+++ router/java/src/net/i2p/router/transport/udp/PacketHandler.java	ed810b5931995bf2f1dac7643a1a0d125c6e6521
@@ -1,6 +1,7 @@ import java.util.Date;
 package net.i2p.router.transport.udp;
 
 import java.util.Date;
+import java.util.LinkedList;
 
 import net.i2p.router.Router;
 import net.i2p.router.RouterContext;
@@ -136,11 +137,23 @@ class PacketHandler {
         
         public void run() {
             _state = 1;
+            LinkedList<UDPPacket> myQ = _endpoint.registerProcessor();
             while (_keepReading) {
                 _state = 2;
-                UDPPacket packet = _endpoint.receive();
+                UDPPacket packet;
+                try {
+                	synchronized(myQ) {
+                		while(myQ.isEmpty())
+                			myQ.wait();
+                		packet = myQ.removeFirst();
+                	}
+                } catch (InterruptedException timeToDie) {
+                	if (_log.shouldLog(Log.INFO))
+                		_log.info("I was told to die",timeToDie);
+                	return;
+                }
                 _state = 3;
-                if (packet == null) break; // keepReading is probably false, or bind failed...
+                if (packet == null || packet.getMessageType() == UDPReceiver.TYPE_POISON) break; // keepReading is probably false, or bind failed...
 
                 packet.received();
                 if (_log.shouldLog(Log.INFO))
============================================================
--- router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java	e29656f40cec05c0c99f792c3a75689b164d8c0c
+++ router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java	d8e32c63a48256711afbe3e075eabc00979c374e
@@ -3,6 +3,7 @@ import java.net.SocketException;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.SocketException;
+import java.util.LinkedList;
 
 import net.i2p.router.RouterContext;
 import net.i2p.util.Log;
@@ -60,6 +61,14 @@ class UDPEndpoint {
     }
     
     public void setListenPort(int newPort) { _listenPort = newPort; }
+    
+    /**
+     * Registers a new packet processor with this endpoint 
+     * @return a BlockingQueue to be poll()-ed for new packets.
+     */
+    LinkedList<UDPPacket> registerProcessor() {
+    	return _receiver.registerReceiver();
+    }
 
 /*******
     public void updateListenPort(int newPort) {
@@ -145,13 +154,4 @@ class UDPEndpoint {
         return _sender.add(packet); 
     }
     
-    /**
-     * Blocking call to receive the next inbound UDP packet from any peer.
-     * @return null if we have shut down
-     */
-    public UDPPacket receive() { 
-        if (_receiver == null)
-            return null;
-        return _receiver.receiveNext(); 
-    }
 }
============================================================
--- router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	26cd34990b12acd2de0b7237b74327b618ade484
+++ router/java/src/net/i2p/router/transport/udp/UDPReceiver.java	7457aec8ffdc0b6092a7bc258af864021852d490
@@ -2,14 +2,16 @@ import java.net.DatagramSocket;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Set;
 
 import net.i2p.router.RouterContext;
 import net.i2p.router.transport.FIFOBandwidthLimiter;
 import net.i2p.util.I2PThread;
 import net.i2p.util.Log;
-import net.i2p.util.SimpleTimer;
 
 /**
  * Lowest level component to pull raw UDP datagrams off the wire as fast
@@ -24,20 +26,22 @@ class UDPReceiver {
     private final Log _log;
     private DatagramSocket _socket;
     private String _name;
-    private final BlockingQueue<UDPPacket> _inboundQueue;
+    private final List<LinkedList<UDPPacket>> _inboundQueues;
     private boolean _keepRunning;
     private final Runner _runner;
     private final UDPTransport _transport;
     private static int __id;
     private final int _id;
-    private static final int TYPE_POISON = -99999;
+    static final int TYPE_POISON = -99999;
+    private final Set<Thread> _processorThreads;
     
     public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
         _context = ctx;
         _log = ctx.logManager().getLog(UDPReceiver.class);
         _id = ++__id;
         _name = name;
-        _inboundQueue = new LinkedBlockingQueue();
+        _inboundQueues = new ArrayList<LinkedList<UDPPacket>>();
+        _processorThreads = new HashSet<Thread>();
         _socket = socket;
         _transport = transport;
         _runner = new Runner();
@@ -55,22 +59,30 @@ class UDPReceiver {
         t.start();
     }
     
-    public void shutdown() {
-        _keepRunning = false;
-        _inboundQueue.clear();
-        for (int i = 0; i < _transport.getPacketHandlerCount(); i++) {
-            UDPPacket poison = UDPPacket.acquire(_context, false);
-            poison.setMessageType(TYPE_POISON);
-            _inboundQueue.offer(poison);
-        }
-        for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
-            try {
-                Thread.sleep(i * 50);
-            } catch (InterruptedException ie) {}
-        }
-        _inboundQueue.clear();
+    public synchronized void shutdown() {
+    	_keepRunning = false;
+    	for (LinkedList<UDPPacket> receiverQueue : _inboundQueues) {
+    		UDPPacket poison = UDPPacket.acquire(_context, false);
+    		poison.setMessageType(TYPE_POISON);
+    		synchronized(receiverQueue) {
+    			receiverQueue.offer(poison);
+    			receiverQueue.notifyAll();
+    		}
+    	}
+    	_inboundQueues.clear();
+    	_processorThreads.clear();
     }
     
+    synchronized LinkedList<UDPPacket> registerReceiver() {
+    	Thread me = Thread.currentThread();
+    	if (!_processorThreads.add(me))
+    		throw new IllegalStateException("Already registered thread "+me);
+    	LinkedList<UDPPacket> rv = new LinkedList<UDPPacket>(); // shouldn't be unbound, fix later --zab
+    	if (!_inboundQueues.add(rv))
+    		throw new IllegalStateException("Not possible");
+    	return rv;
+    }
+    
 /*********
     private void adjustDropProbability() {
         String p = _context.getProperty("i2np.udp.dropProbability");
@@ -157,25 +169,42 @@ class UDPReceiver {
         boolean rejected = false;
         int queueSize = 0;
         long headPeriod = 0;
+        final int sourceHash = Math.abs(from.hashCode()); // speed up that hashcode later --zab
+        final LinkedList<UDPPacket> q;
+        synchronized(this) {
+        	int size = _inboundQueues.size();
+        	if (size == 0)
+        		q = null;
+        	else
+        		q = _inboundQueues.get(sourceHash % _inboundQueues.size());
+        }
+        
+        if (q == null) {
+        	packet.release();
+        	return 0;
+        }
 
-            UDPPacket head = _inboundQueue.peek();
-            if (head != null) {
-                headPeriod = head.getLifetime();
-                if (headPeriod > MAX_QUEUE_PERIOD) {
-                    rejected = true;
-                }
-            }
-            if (!rejected) {
-                _inboundQueue.offer(packet);
-                //return queueSize + 1;
-                return 0;
-            }
+        synchronized(q) {
+	     UDPPacket head = q.peek();
+	     if (head != null) {
+	     	headPeriod = head.getLifetime();
+	     	if (headPeriod > MAX_QUEUE_PERIOD) {
+	     		rejected = true;
+	     	}
+	     }
+	     if (!rejected) {
+	     	q.offer(packet);
+	     	q.notifyAll();
+	     	//return queueSize + 1;
+	     	return 0;
+	     }
+        }
         
         // rejected
         packet.release();
         _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
         if (_log.shouldLog(Log.WARN)) {
-            queueSize = _inboundQueue.size();
+            synchronized(q) { queueSize = q.size(); }
             StringBuilder msg = new StringBuilder();
             msg.append("Dropping inbound packet with ");
             msg.append(queueSize);
@@ -195,25 +224,6 @@ class UDPReceiver {
     }
   ****/
     
-    /**
-     * Blocking call to retrieve the next inbound packet, or null if we have
-     * shut down.
-     *
-     */
-    public UDPPacket receiveNext() {
-        UDPPacket rv = null;
-        //int remaining = 0;
-        while (_keepRunning && rv == null) {
-            try {
-                rv = _inboundQueue.take();
-            } catch (InterruptedException ie) {}
-            if (rv != null && rv.getMessageType() == TYPE_POISON)
-                return null;
-        }
-        //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
-        return rv;
-    }
-    
     private class Runner implements Runnable {
         private boolean _socketChanged;
         public void run() {

comment:7 Changed 6 years ago by zzz

ref: http://www.i2p2.i2p/udp http://www.i2p2.i2p/udp_spec

Don't understand this fully. The standard session establishment is a 1-2-3 handshake and thus can't get out of order. In the logs in the zzz.i2p post #8, the first one is a 64-byte packet, which could possibly be a RelayResponse? But it's followed by a Data packet? Not sure exactly how this could happen. Unless the two are unrelated. Or maybe it's a clue for the bigger problem.

The SessionConfirmed packet could, I suppose, get out of order with the first Data packet.

Not really happy with the proposed solutions but it does seem like we have a problem. I don't think it's in session establishment though as you posit in the OP. Session establishment seems solid to me.

comment:8 Changed 6 years ago by zzz

  • Resolution set to fixed
  • Status changed from accepted to closed

PacketHandler thread count fixed at 1 in 0.9.1-3. Based on an initial scan of the code, it shouldn't block, at least not for very long. If it doesn't work well due to blocking somewhere, we'll have to reopen this ticket.

As I said above in my first comment, the network could still reorder packets. That's a problem for another day.

comment:9 Changed 6 years ago by zzz

Just a note: another example of affinity between connections and threads is in NTCP's Reader.java. Not necessarily a good example. It has a lot of locking and was never rewritten to use java.util.concurrent.
