Changeset 7fa874f


Ignore:
Timestamp:
Jul 15, 2011 8:52:18 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
7ba6f5a
Parents:
55bfd6a
Message:
  • Tracking, expiration, closing of DCC tunnels
  • I2PTunnelRunner cleanups
Location:
apps/i2ptunnel/java/src/net/i2p/i2ptunnel
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java

    r55bfd6a r7fa874f  
    6060    private boolean listenerReady = false;
    6161
    62     private ServerSocket ss;
     62    protected ServerSocket ss;
    6363
    6464    private final Object startLock = new Object();
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java

    r55bfd6a r7fa874f  
    145145                _DCCServer = null;
    146146            }
     147            if (_DCCClientManager != null) {
     148                _DCCClientManager.close(forced);
     149                _DCCClientManager = null;
     150            }
    147151        }
    148152        return super.close(forced);
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java

    r55bfd6a r7fa874f  
    2121
    2222public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
    23     private final static Log _log = new Log(I2PTunnelRunner.class);
     23    private final Log _log = new Log(I2PTunnelRunner.class);
    2424
    2525    private static volatile long __runnerId;
    26     private long _runnerId;
     26    private final long _runnerId;
    2727    /**
    2828     * max bytes streamed in a packet - smaller ones might be filled
     
    3535    static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE;
    3636
    37     private Socket s;
    38     private I2PSocket i2ps;
    39     final Object slock, finishLock = new Object();
     37    private final Socket s;
     38    private final I2PSocket i2ps;
     39    private final Object slock, finishLock = new Object();
    4040    boolean finished = false;
    41     HashMap ostreams, sockets;
    42     byte[] initialI2PData;
    43     byte[] initialSocketData;
     41    private HashMap ostreams, sockets;
     42    private final byte[] initialI2PData;
     43    private final byte[] initialSocketData;
    4444    /** when the last data was sent/received (or -1 if never) */
    4545    private long lastActivityOn;
    4646    /** when the runner started up */
    47     private long startedOn;
    48     private List sockList;
     47    private final long startedOn;
     48    private final List sockList;
    4949    /** if we die before receiving any data, run this job */
    50     private Runnable onTimeout;
     50    private final Runnable onTimeout;
    5151    private long totalSent;
    5252    private long totalReceived;
     
    5757        this(s, i2ps, slock, initialI2PData, null, sockList, null);
    5858    }
     59
    5960    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList) {
    6061        this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null);
    6162    }
     63
    6264    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) {
    6365        this(s, i2ps, slock, initialI2PData, null, sockList, onTimeout);
    6466    }
     67
     68    /**
     69     *  Starts itself
     70     *
     71     *  @param initialI2PData may be null
     72     *  @param initialSocketData may be null
     73     *  @param sockList may be null
     74     *  @param onTImeout may be null
     75     */
    6576    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout) {
    6677        this.sockList = sockList;
     
    238249    private class StreamForwarder extends I2PAppThread {
    239250
    240         InputStream in;
    241         OutputStream out;
    242         String direction;
    243         private boolean _toI2P;
    244         private ByteCache _cache;
     251        private final InputStream in;
     252        private final OutputStream out;
     253        private final String direction;
     254        private final boolean _toI2P;
     255        private final ByteCache _cache;
    245256
    246257        private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) {
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java

    r55bfd6a r7fa874f  
    2525 * @since 0.8.9
    2626 */
    27 public class DCCClientManager {
    28 
     27public class DCCClientManager extends EventReceiver {
    2928    private final I2PSocketManager sockMgr;
    3029    private final EventDispatcher _dispatch;
     
    3332    private final Log _log;
    3433
    35     private final ConcurrentHashMap<Integer, I2PAddress> _incoming;
     34    private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _incoming;
     35    private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _active;
     36
    3637    // list of client tunnels?
    3738    private static long _id;
     
    3940    private static final int MAX_INCOMING_PENDING = 10;
    4041    private static final int MAX_INCOMING_ACTIVE = 10;
    41     private static final long INBOUND_EXPIRE = 30*60*1000;
     42    private static final long ACTIVE_EXPIRE = 60*60*1000;
    4243
    4344    public DCCClientManager(I2PSocketManager sktMgr, Logging logging,
     
    4950        _log = tunnel.getContext().logManager().getLog(DCCClientManager.class);
    5051        _incoming = new ConcurrentHashMap(8);
     52        _active = new ConcurrentHashMap(8);
     53    }
     54
     55    public boolean close(boolean forced) {
     56        for (I2PTunnelDCCClient c : _incoming.values()) {
     57            c.stop();
     58        }
     59        _incoming.clear();
     60        for (I2PTunnelDCCClient c : _active.values()) {
     61            c.stop();
     62        }
     63        _active.clear();
     64        return true;
    5165    }
    5266
     
    6175    public int newIncoming(String b32, int port, String type) {
    6276        expireInbound();
    63         if (_incoming.size() >= MAX_INCOMING_PENDING) {
    64             _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING);
     77        if (_incoming.size() >= MAX_INCOMING_PENDING ||
     78            _active.size() >= MAX_INCOMING_PENDING) {
     79            _log.error("Too many incoming DCC, max is " + MAX_INCOMING_PENDING +
     80                       '/' + MAX_INCOMING_ACTIVE + " pending/active");
    6581            return -1;
    6682        }
    67         I2PAddress client = new I2PAddress(b32, port, _tunnel.getContext().clock().now() + INBOUND_EXPIRE);
    6883        try {
    6984            // Transparent tunnel used for all types...
     
    7186            I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr,
    7287                                                                _dispatch, _tunnel, ++_id);
     88            cTunnel.attachEventDispatcher(this);
    7389            int lport = cTunnel.getLocalPort();
    7490            if (_log.shouldLog(Log.WARN))
    7591                _log.warn("Opened client tunnel at port " + lport +
    7692                          " pointing to " + b32 + ':' + port);
    77             _incoming.put(Integer.valueOf(lport), client);
     93            _incoming.put(Integer.valueOf(lport), cTunnel);
    7894            return lport;
    7995        } catch (IllegalArgumentException uhe) {
     
    84100    }
    85101
    86     private void expireInbound() {
    87         for (Iterator<I2PAddress> iter = _incoming.values().iterator(); iter.hasNext(); ) {
    88             I2PAddress a = iter.next();
    89             if (a.expire < _tunnel.getContext().clock().now())
    90                 iter.remove();
     102    /**
     103     *  The EventReceiver callback
     104     */
     105    public void notifyEvent(String eventName, Object args) {
     106        if (eventName.equals(I2PTunnelDCCClient.CONNECT_START_EVENT)) {
     107            try {
     108                I2PTunnelDCCClient client = (I2PTunnelDCCClient) args;
     109                connStarted(client);
     110            } catch (ClassCastException cce) {}
     111        } else if (eventName.equals(I2PTunnelDCCClient.CONNECT_STOP_EVENT)) {
     112            try {
     113                Integer port = (Integer) args;
     114                connStopped(port);
     115            } catch (ClassCastException cce) {}
    91116        }
    92117    }
    93118
    94     private static class I2PAddress {
    95         public final String dest;
    96         public final int port;
    97         public final long expire;
    98 
    99         public I2PAddress(String b32, int p, long exp) {
    100             dest = b32;
    101             port = p;
    102             expire = exp;
     119    private void connStarted(I2PTunnelDCCClient client) {
     120        Integer lport = Integer.valueOf(client.getLocalPort());
     121        I2PTunnelDCCClient c = _incoming.remove(lport);
     122        if (c != null) {
     123            _active.put(lport, client);
     124            if (_log.shouldLog(Log.WARN))
     125                _log.warn("Added client tunnel for port " + lport +
     126                          " pending count now: " + _incoming.size() +
     127                          " active count now: " + _active.size());
    103128        }
    104129    }
     130
     131    private void connStopped(Integer lport) {
     132        _incoming.remove(lport);
     133        _active.remove(lport);
     134        if (_log.shouldLog(Log.WARN))
     135            _log.warn("Removed client tunnel for port " + lport +
     136                      " pending count now: " + _incoming.size() +
     137                      " active count now: " + _active.size());
     138    }
     139
     140    private void expireInbound() {
     141        for (Iterator<I2PTunnelDCCClient> iter = _incoming.values().iterator(); iter.hasNext(); ) {
     142            I2PTunnelDCCClient c = iter.next();
     143            if (c.getExpires() < _tunnel.getContext().clock().now()) {
     144                iter.remove();
     145                c.stop();
     146            }
     147        }
     148        // shouldn't need to expire active
     149    }
    105150}
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java

    r55bfd6a r7fa874f  
    3030    private final String _dest;
    3131    private final int _remotePort;
     32    private final long _expires;
     33
     34    private static final long INBOUND_EXPIRE = 30*60*1000;
     35    public static final String CONNECT_START_EVENT = "connectionStarted";
     36    public static final String CONNECT_STOP_EVENT = "connectionStopped";
    3237
    3338    /**
     
    4247        _dest = dest;
    4348        _remotePort = remotePort;
     49        _expires = tunnel.getContext().clock().now() + INBOUND_EXPIRE;
    4450
    4551        setName("DCC send -> " + dest + ':' + remotePort);
    4652
    4753        startRunning();
    48 
    49         notifyEvent("openClientResult", "ok");
    5054    }
    5155
     56    /**
     57     *  Accept one connection only.
     58     */
    5259    protected void clientConnectionRun(Socket s) {
    5360        I2PSocket i2ps = null;
     
    5865            _log.error("Could not find leaseset for DCC connection to " + _dest + ':' + _remotePort);
    5966            closeSocket(s);
    60             // shutdown?
     67            stop();
     68            notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
    6169            return;
    6270        }
     
    6674        try {
    6775            i2ps = createI2PSocket(dest, opts);
    68             new I2PTunnelRunner(s, i2ps, sockLock, null, mySockets);
     76            new Runner(s, i2ps);
    6977        } catch (Exception ex) {
    7078            _log.error("Could not make DCC connection to " + _dest + ':' + _remotePort, ex);
     
    7381                try { i2ps.close(); } catch (IOException ioe) {}
    7482            }
     83            notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
     84        }
     85        stop();
     86    }
     87
     88    public long getExpires() {
     89        return _expires;
     90    }
     91
     92    /**
     93     *  Stop listening for new sockets.
     94     *  We can't call super.close() as it kills all sockets in the sockMgr
     95     */
     96    public void stop() {
     97        open = false;
     98        try {
     99            ss.close();
     100        } catch (IOException ioe) {}
     101    }
     102
     103    /**
     104     *  Just so we can do the callbacks
     105     */
     106    private class Runner extends I2PTunnelRunner {
     107
     108        public Runner(Socket s, I2PSocket i2ps) {
     109            // super calls start()
     110            super(s, i2ps, sockLock, null, mySockets);
     111        }
     112
     113        @Override
     114        public void run() {
     115            notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this);
     116            super.run();
     117            notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
    75118        }
    76119    }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java

    r55bfd6a r7fa874f  
    3737
    3838    private final ConcurrentHashMap<Integer, LocalAddress> _outgoing;
     39    private final ConcurrentHashMap<Integer, I2PSocket> _active;
     40
    3941    // list of client tunnels?
    4042    private static long _id;
     
    5557    private static final int MAX_OUTGOING_ACTIVE = 20;
    5658    private static final long OUTBOUND_EXPIRE = 30*60*1000;
     59    private static final long ACTIVE_EXPIRE = 60*60*1000;
    5760
    5861    /**
     
    6871        super(DUMMY, 0, sktMgr, l, notifyThis, tunnel);
    6972        _outgoing = new ConcurrentHashMap(8);
     73        _active = new ConcurrentHashMap(8);
    7074    }
    7175
     
    8286            expireOutbound();
    8387            int myPort = socket.getLocalPort();
    84             // TODO remove, add to active
    85             LocalAddress local = _outgoing.get(Integer.valueOf(myPort));
     88            // Port is a one-time-use only
     89            LocalAddress local = _outgoing.remove(Integer.valueOf(myPort));
    8690            if (local == null) {
    8791                if (_log.shouldLog(Log.WARN))
    88                     _log.warn("Incoming DCC connection for unknown port " + myPort);
     92                    _log.warn("Rejecting incoming DCC connection for unknown port " + myPort);
    8993                try {
    9094                    socket.close();
     
    97101            Socket s = new Socket(local.ia, local.port);
    98102            new I2PTunnelRunner(s, socket, slock, null, null);
     103            _active.put(Integer.valueOf(myPort), socket);
    99104        } catch (SocketException ex) {
    100105            try {
     
    106111            _log.error("Error while waiting for I2PConnections", ex);
    107112        }
     113    }
     114
     115    @Override
     116    public boolean close(boolean forced) {
     117        _outgoing.clear();
     118        _active.clear();
     119        return super.close(forced);
    108120    }
    109121
     
    118130    public int newOutgoing(byte[] ip, int port, String type) {
    119131        expireOutbound();
    120         if (_outgoing.size() >= MAX_OUTGOING_PENDING) {
    121             _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING);
     132        if (_outgoing.size() >= MAX_OUTGOING_PENDING ||
     133            _active.size() >= MAX_OUTGOING_ACTIVE) {
     134            _log.error("Too many outgoing DCC, max is " + MAX_OUTGOING_PENDING +
     135                       '/' + MAX_OUTGOING_ACTIVE + " pending/active");
    122136            return -1;
    123137        }
     
    131145        for (int i = 0; i < 10; i++) {
    132146            int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
     147            if (_active.containsKey(Integer.valueOf(iport)))
     148                continue;
    133149            LocalAddress old = _outgoing.putIfAbsent(Integer.valueOf(iport), client);
    134150            if (old != null)
     
    158174                iter.remove();
    159175        }
     176        for (Iterator<I2PSocket> iter = _active.values().iterator(); iter.hasNext(); ) {
     177            I2PSocket s = iter.next();
     178            if (s.isClosed())
     179                iter.remove();
     180        }
    160181    }
    161182
Note: See TracChangeset for help on using the changeset viewer.