Changeset f87e3b52


Ignore:
Timestamp:
Jul 16, 2011 8:22:00 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
8a9882c
Parents:
c826f7fb
Message:

more on resume/accept, untested

Location:
apps/i2ptunnel/java/src/net/i2p/i2ptunnel
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java

    rc826f7fb rf87e3b52  
    202202
    203203    public int resumeOutgoing(int port) {
     204        DCCClientManager tracker = _DCCClientManager;
     205        if (tracker != null)
     206            return tracker.resumeOutgoing(port);
    204207        return -1;
    205208    }
    206209
    207210    public int resumeIncoming(int port) {
     211        I2PTunnelDCCServer server = _DCCServer;
     212        if (server != null)
     213            return server.resumeIncoming(port);
    208214        return -1;
    209215    }
    210216
    211217    public int acceptOutgoing(int port) {
     218        I2PTunnelDCCServer server = _DCCServer;
     219        if (server != null)
     220            return server.acceptOutgoing(port);
    212221        return -1;
    213222    }
    214223
    215224    public int acceptIncoming(int port) {
     225        DCCClientManager tracker = _DCCClientManager;
     226        if (tracker != null)
     227            return tracker.acceptIncoming(port);
    216228        return -1;
    217229    }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/DCCClientManager.java

    rc826f7fb rf87e3b52  
    1616 * <pre>
    1717 *
    18  *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
     18 *                                            direct conn
     19 *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
    1920 *   originating                                                                     responding
    2021 *   chat client                                                                     chat client
    21  *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     22 *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     23 *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     24 *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
     25 *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
    2226 *
    2327 * </pre>
     
    3236    private final Log _log;
    3337
     38    /** key is the DCC client's local port */
    3439    private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _incoming;
     40    /** key is the DCC client's local port */
    3541    private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _active;
     42    /** key is the DCC client's local port */
     43    private final ConcurrentHashMap<Integer, I2PTunnelDCCClient> _complete;
    3644
    3745    // list of client tunnels?
     
    5159        _incoming = new ConcurrentHashMap(8);
    5260        _active = new ConcurrentHashMap(8);
     61        _complete = new ConcurrentHashMap(8);
    5362    }
    5463
     
    6271        }
    6372        _active.clear();
     73        _complete.clear();
    6474        return true;
    6575    }
     
    6878     *  An incoming DCC request
    6979     *
    70      *  @param b32 remote dcc server address
    71      *  @param port remote dcc server port
     80     *  @param b32 remote dcc server b32 address
     81     *  @param port remote dcc server I2P port
    7282     *  @param type ignored
    73      *  @return local server port or -1 on error
     83     *  @return local DCC client tunnel port or -1 on error
    7484     */
    7585    public int newIncoming(String b32, int port, String type) {
     86        return newIncoming(b32, port, type, 0);
     87    }
     88
     89    /**
     90     *  @param localPort bind to port or 0; if nonzero it will be the rv
     91     */
     92    private int newIncoming(String b32, int port, String type, int localPort) {
    7693        expireInbound();
    7794        if (_incoming.size() >= MAX_INCOMING_PENDING ||
     
    84101            // Transparent tunnel used for all types...
    85102            // Do we need to do any filtering for chat?
    86             I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, port, l, sockMgr,
     103            I2PTunnelDCCClient cTunnel = new I2PTunnelDCCClient(b32, localPort, port, l, sockMgr,
    87104                                                                _dispatch, _tunnel, ++_id);
    88105            cTunnel.attachEventDispatcher(this);
     
    101118
    102119    /**
     120     *  An outgoing RESUME request
     121     *
     122     *  @param port local DCC client tunnel port
     123     *  @return remote DCC server i2p port or -1 on error
     124     */
     125    public int resumeOutgoing(int port) {
     126        Integer lport = Integer.valueOf(port);
     127        I2PTunnelDCCClient tun = _complete.get(lport);
     128        if (tun == null) {
     129            tun = _active.get(lport);
     130            if (tun == null)
     131                // shouldn't happen
     132                tun = _incoming.get(lport);
     133        }
     134        if (tun != null) {
     135            tun.stop();
     136            return tun.getLocalPort();
     137        }
     138        return -1;
     139    }
     140
     141    /**
     142     *  An incoming ACCEPT response
     143     *
     144     *  @param port remote dcc server I2P port
     145     *  @return local DCC client tunnel port or -1 on error
     146     */
     147    public int acceptIncoming(int port) {
     148        // do a reverse lookup
     149        for (I2PTunnelDCCClient tun : _complete.values()) {
     150            if (tun.getRemotePort() == port)
     151                return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
     152        }
     153        for (I2PTunnelDCCClient tun : _active.values()) {
     154            if (tun.getRemotePort() == port)
     155                return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
     156        }
     157        for (I2PTunnelDCCClient tun : _incoming.values()) {
     158            if (tun.getRemotePort() == port) {
     159                // shouldn't happen
     160                tun.stop();
     161                return newIncoming(tun.getDest(), port, "ACCEPT", tun.getLocalPort());
     162            }
     163        }
     164        return -1;
     165    }
     166
     167    /**
    103168     *  The EventReceiver callback
    104169     */
     
    125190                _log.warn("Added client tunnel for port " + lport +
    126191                          " pending count now: " + _incoming.size() +
    127                           " active count now: " + _active.size());
     192                          " active count now: " + _active.size() +
     193                          " complete count now: " + _complete.size());
    128194        }
    129195    }
    130196
    131197    private void connStopped(Integer lport) {
    132         _incoming.remove(lport);
    133         _active.remove(lport);
     198        I2PTunnelDCCClient tun = _incoming.remove(lport);
     199        if (tun != null)
     200            _complete.put(lport, tun);
     201        tun = _active.remove(lport);
     202        if (tun != null)
     203            _complete.put(lport, tun);
    134204        if (_log.shouldLog(Log.WARN))
    135205            _log.warn("Removed client tunnel for port " + lport +
    136206                      " pending count now: " + _incoming.size() +
    137                       " active count now: " + _active.size());
     207                      " active count now: " + _active.size() +
     208                      " complete count now: " + _complete.size());
    138209    }
    139210
     
    147218        }
    148219        // shouldn't need to expire active
     220        for (Iterator<I2PTunnelDCCClient> iter = _complete.values().iterator(); iter.hasNext(); ) {
     221            I2PTunnelDCCClient c = iter.next();
     222            if (c.getExpires() < _tunnel.getContext().clock().now()) {
     223                iter.remove();
     224                c.stop();
     225            }
     226        }
    149227    }
    150228}
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCClient.java

    rc826f7fb rf87e3b52  
    3030    private final String _dest;
    3131    private final int _remotePort;
    32     private final long _expires;
     32    private long _expires;
    3333
    3434    private static final long INBOUND_EXPIRE = 30*60*1000;
     35    private static final long INBOUND_STOP_EXPIRE = 30*60*1000;
    3536    public static final String CONNECT_START_EVENT = "connectionStarted";
    3637    public static final String CONNECT_STOP_EVENT = "connectionStopped";
     
    3839    /**
    3940     * @param dest the target, presumably b32
     41     * @param localPort if 0, use any port, get actual port selected with getLocalPort()
    4042     * @throws IllegalArgumentException if the I2PTunnel does not contain
    4143     *                                  valid config to contact the router
    4244     */
    43     public I2PTunnelDCCClient(String dest, int remotePort, Logging l,
     45    public I2PTunnelDCCClient(String dest, int localPort, int remotePort, Logging l,
    4446                           I2PSocketManager sktMgr, EventDispatcher notifyThis,
    4547                           I2PTunnel tunnel, long clientId) throws IllegalArgumentException {
    46         super(0, l, sktMgr, tunnel, notifyThis, clientId);
     48        super(localPort, l, sktMgr, tunnel, notifyThis, clientId);
    4749        _dest = dest;
    4850        _remotePort = remotePort;
     
    9092    }
    9193
     94    public String getDest() {
     95        return _dest;
     96    }
     97
     98    public int getRemotePort() {
     99        return _remotePort;
     100    }
     101
    92102    /**
    93103     *  Stop listening for new sockets.
     
    113123        @Override
    114124        public void run() {
     125            _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE;
    115126            notifyEvent(CONNECT_START_EVENT, I2PTunnelDCCClient.this);
    116127            super.run();
     128            _expires = getTunnel().getContext().clock().now() + INBOUND_STOP_EXPIRE;
    117129            notifyEvent(CONNECT_STOP_EVENT, Integer.valueOf(getLocalPort()));
    118130        }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/I2PTunnelDCCServer.java

    rc826f7fb rf87e3b52  
    77import java.net.UnknownHostException;
    88import java.util.Iterator;
     9import java.util.List;
    910import java.util.Map;
    1011import java.util.concurrent.ConcurrentHashMap;
     12import java.util.concurrent.CopyOnWriteArrayList;
    1113
    1214import net.i2p.client.streaming.I2PSocket;
     
    2527 * <pre>
    2628 *
    27  *                <---  I2PTunnelDCCServer <--------------- I2PTunnelDCCClient <----
     29 *                                            direct conn
     30 *                <---> I2PTunnelDCCServer <--------------->I2PTunnelDCCClient <---->
    2831 *   originating                                                                     responding
    2932 *   chat client                                                                     chat client
    30  *                ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     33 *        CHAT    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     34 *        SEND    ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
     35 *        RESUME  <--- I2PTunnelIRCClient <-- IRC server <-- I2TunnelIRCClient <-----
     36 *        ACCEPT  ---> I2PTunnelIRCClient --> IRC server --> I2TunnelIRCClient ----->
    3137 *
    3238 * </pre>
     
    3642public class I2PTunnelDCCServer extends I2PTunnelServer {
    3743
     44    /** key is the server's local I2P port */
    3845    private final ConcurrentHashMap<Integer, LocalAddress> _outgoing;
    39     private final ConcurrentHashMap<Integer, I2PSocket> _active;
     46    /** key is the server's local I2P port */
     47    private final ConcurrentHashMap<Integer, LocalAddress> _active;
     48    /** key is the server's local I2P port */
     49    private final ConcurrentHashMap<Integer, LocalAddress> _resume;
     50    private final List<I2PSocket> _sockList;
    4051
    4152    // list of client tunnels?
     
    7283        _outgoing = new ConcurrentHashMap(8);
    7384        _active = new ConcurrentHashMap(8);
     85        _resume = new ConcurrentHashMap(8);
     86        _sockList = new CopyOnWriteArrayList();
    7487    }
    7588
     
    100113                          " sending to " + local.ia + ':' + local.port);
    101114            Socket s = new Socket(local.ia, local.port);
    102             new I2PTunnelRunner(s, socket, slock, null, null);
    103             _active.put(Integer.valueOf(myPort), socket);
     115            _sockList.add(socket);
     116            new I2PTunnelRunner(s, socket, slock, null, _sockList);
     117            local.socket = socket;
     118            local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
     119            _active.put(Integer.valueOf(myPort), local);
    104120        } catch (SocketException ex) {
    105121            try {
     
    117133        _outgoing.clear();
    118134        _active.clear();
     135        for (I2PSocket s : _sockList) {
     136            try {
     137                s.close();
     138            } catch (IOException ioe) {}
     139        }
     140        _sockList.clear();
    119141        return super.close(forced);
    120142    }
     
    129151     */
    130152    public int newOutgoing(byte[] ip, int port, String type) {
     153        return newOutgoing(ip, port, type, 0);
     154    }
     155
     156    /**
     157     *  @param port local dcc server I2P port or 0 to pick one at random
     158     */
     159    private int newOutgoing(byte[] ip, int port, String type, int i2pPort) {
    131160        expireOutbound();
    132161        if (_outgoing.size() >= MAX_OUTGOING_PENDING ||
     
    142171            return -1;
    143172        }
     173        int limit = i2pPort > 0 ? 10 : 1;
    144174        LocalAddress client = new LocalAddress(ia, port, getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE);
    145         for (int i = 0; i < 10; i++) {
    146             int iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
     175        for (int i = 0; i < limit; i++) {
     176            int iport;
     177            if (i2pPort > 0)
     178                iport = i2pPort;
     179            else
     180                iport = MIN_I2P_PORT + getTunnel().getContext().random().nextInt(1 + MAX_I2P_PORT - MIN_I2P_PORT);
    147181            if (_active.containsKey(Integer.valueOf(iport)))
    148182                continue;
     
    157191    }
    158192
     193    /**
     194     *  An incoming RESUME request
     195     *
     196     *  @param port local dcc server I2P port
     197     *  @return local IRC client DCC port or -1 on error
     198     */
     199    public int resumeIncoming(int port) {
     200        Integer iport = Integer.valueOf(port);
     201        LocalAddress local = _active.remove(iport);
     202        if (local != null) {
     203            local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
     204            _resume.put(Integer.valueOf(local.port), local);
     205            return local.port;
     206        }
     207        local = _outgoing.get(iport);
     208        if (local != null) {
     209            // shouldn't happen
     210            local.expire = getTunnel().getContext().clock().now() + OUTBOUND_EXPIRE;
     211            return local.port;
     212        }
     213        return -1;
     214    }
     215
     216    /**
     217     *  An outgoing ACCEPT response
     218     *
     219     *  @param port local irc client DCC port
     220     *  @return local DCC server i2p port or -1 on error
     221     */
     222    public int acceptOutgoing(int port) {
     223        // do a reverse lookup
     224        for (Iterator<Map.Entry<Integer, LocalAddress>> iter = _resume.entrySet().iterator(); iter.hasNext(); ) {
     225            Map.Entry<Integer, LocalAddress> e = iter.next();
     226            LocalAddress local = e.getValue();
     227            if (local.port == port) {
     228                iter.remove();
     229                return newOutgoing(local.ia.getAddress(), port, "ACCEPT", e.getKey().intValue());
     230            }
     231        }
     232        return -1;
     233    }
     234
    159235    private InetAddress getListenHost(Logging l) {
    160236        try {
     
    174250                iter.remove();
    175251        }
    176         for (Iterator<I2PSocket> iter = _active.values().iterator(); iter.hasNext(); ) {
    177             I2PSocket s = iter.next();
    178             if (s.isClosed())
     252        for (Iterator<LocalAddress> iter = _active.values().iterator(); iter.hasNext(); ) {
     253            LocalAddress a = iter.next();
     254            I2PSocket s = a.socket;
     255            if (s != null && s.isClosed())
    179256                iter.remove();
    180257        }
     
    184261        public final InetAddress ia;
    185262        public final int port;
    186         public final long expire;
     263        public long expire;
     264        public I2PSocket socket;
    187265
    188266        public LocalAddress(InetAddress a, int p, long exp) {
Note: See TracChangeset for help on using the changeset viewer.