Changeset ee2fd32


Ignore:
Timestamp:
Sep 25, 2008 11:31:57 PM (12 years ago)
Author:
sponge <sponge@…>
Branches:
master
Children:
dd7d993
Parents:
fa5c721
Message:

disapproval of revision 'bd09bb36a90e766b3a406d78055d427a6200dd41'

Files:
1 deleted
12 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java

    rfa5c721 ree2fd32  
    1313import java.net.Socket;
    1414import java.net.SocketException;
    15 import java.net.SocketTimeoutException;
    1615import java.util.Iterator;
    1716import java.util.Properties;
     
    221220                        _log.error("Error accepting", ce);
    222221                    // not killing the server..
    223                 } catch(SocketTimeoutException ste) {
    224                         // ignored, we never set the timeout
    225                 }
     222                }
    226223            }
    227224        }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    rfa5c721 ree2fd32  
    33import java.net.ConnectException;
    44
    5 import java.net.SocketTimeoutException;
    65import net.i2p.I2PException;
    76
     
    1110 */
    1211public interface I2PServerSocket {
     12    /**
     13     * Closes the socket.
     14     */
     15    public void close() throws I2PException;
    1316
    14         /**
    15          * Closes the socket.
    16          */
    17         public void close() throws I2PException;
     17    /**
     18     * Waits for the next socket connecting.  If a remote user tried to make a
     19     * connection and the local application wasn't .accept()ing new connections,
     20     * they should get refused (if .accept() doesnt occur in some small period)
     21     *
     22     * @return a connected I2PSocket
     23     *
     24     * @throws I2PException if there is a problem with reading a new socket
     25     *         from the data available (aka the I2PSession closed, etc)
     26     * @throws ConnectException if the I2PServerSocket is closed
     27     */
     28    public I2PSocket accept() throws I2PException, ConnectException;
    1829
    19         /**
    20          * Waits for the next socket connecting.  If a remote user tried to make a
    21          * connection and the local application wasn't .accept()ing new connections,
    22          * they should get refused (if .accept() doesnt occur in some small period)
    23          *
    24          * @return a connected I2PSocket
    25          *
    26          * @throws I2PException if there is a problem with reading a new socket
    27          *         from the data available (aka the I2PSession closed, etc)
    28          * @throws ConnectException if the I2PServerSocket is closed
    29          * @throws SocketTimeoutException
    30          */
    31         public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
    32 
    33         /**
    34          * Set Sock Option accept timeout
    35          * @param x
    36          */
    37         public void setSoTimeout(long x);
    38 
    39         /**
    40          * Get Sock Option accept timeout
    41          * @return timeout
    42          */
    43         public long getSoTimeout();
    44 
    45         /**
    46          * Access the manager which is coordinating the server socket
    47          */
    48         public I2PSocketManager getManager();
     30    /**
     31     * Access the manager which is coordinating the server socket
     32     */
     33    public I2PSocketManager getManager();
    4934}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    rfa5c721 ree2fd32  
    1818 */
    1919class I2PServerSocketImpl implements I2PServerSocket {
    20 
    21         private final static Log _log = new Log(I2PServerSocketImpl.class);
    22         private I2PSocketManager mgr;
    23         /** list of sockets waiting for the client to accept them */
    24         private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
    25         /** have we been closed */
    26         private volatile boolean closing = false;
    27         /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
    28         private Object socketAcceptedLock = new Object();
    29         /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
    30         private Object socketAddedLock = new Object();
    31 
    32         /**
    33          * Set Sock Option accept timeout stub, does nothing
    34          * @param x
    35          */
    36         public void setSoTimeout(long x) {
    37         }
    38 
    39         /**
    40          * Get Sock Option accept timeout stub, does nothing
    41          * @return timeout
    42          */
    43         public long getSoTimeout() {
    44                 return -1;
    45         }
    46 
    47         public I2PServerSocketImpl(I2PSocketManager mgr) {
    48                 this.mgr = mgr;
    49         }
    50 
    51         /**
    52          * Waits for the next socket connecting.  If a remote user tried to make a
    53          * connection and the local application wasn't .accept()ing new connections,
    54          * they should get refused (if .accept() doesnt occur in some small period -
    55          * currently 5 seconds)
    56          *
    57          * @return a connected I2PSocket
    58          *
    59          * @throws I2PException if there is a problem with reading a new socket
    60          *         from the data available (aka the I2PSession closed, etc)
    61          * @throws ConnectException if the I2PServerSocket is closed
    62          */
    63         public I2PSocket accept() throws I2PException, ConnectException {
    64                 if(_log.shouldLog(Log.DEBUG)) {
    65                         _log.debug("accept() called, pending: " + pendingSockets.size());
    66                 }
    67                 I2PSocket ret = null;
    68 
    69                 while((ret == null) && (!closing)) {
    70                         while(pendingSockets.size() <= 0) {
    71                                 if(closing) {
    72                                         throw new ConnectException("I2PServerSocket closed");
    73                                 }
    74                                 try {
    75                                         synchronized(socketAddedLock) {
    76                                                 socketAddedLock.wait();
    77                                         }
    78                                 } catch(InterruptedException ie) {
    79                                 }
    80                         }
    81                         synchronized(pendingSockets) {
    82                                 if(pendingSockets.size() > 0) {
    83                                         ret = (I2PSocket)pendingSockets.remove(0);
    84                                 }
    85                         }
    86                         if(ret != null) {
    87                                 synchronized(socketAcceptedLock) {
    88                                         socketAcceptedLock.notifyAll();
    89                                 }
    90                         }
    91                 }
    92 
    93                 if(_log.shouldLog(Log.DEBUG)) {
    94                         _log.debug("TIMING: handed out accept result " + ret.hashCode());
    95                 }
    96                 return ret;
    97         }
    98 
    99         /**
    100          * Make the socket available and wait until the client app accepts it, or until
    101          * the given timeout elapses.  This doesn't have any limits on the queue size -
    102          * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
    103          *
    104          * @param timeoutMs how long to wait until accept
    105          * @return true if the socket was accepted, false if the timeout expired
    106          *         or the socket was closed
    107          */
    108         public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
    109                 if(_log.shouldLog(Log.DEBUG)) {
    110                         _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
    111                 }
    112                 if(closing) {
    113                         if(_log.shouldLog(Log.WARN)) {
    114                                 _log.warn("Already closing the socket");
    115                         }
    116                         return false;
    117                 }
    118 
    119                 Clock clock = I2PAppContext.getGlobalContext().clock();
    120                 long start = clock.now();
    121                 long end = start + timeoutMs;
    122                 pendingSockets.add(s);
    123                 synchronized(socketAddedLock) {
    124                         socketAddedLock.notifyAll();
    125                 }
    126 
    127                 // keep looping until the socket has been grabbed by the accept()
    128                 // (or the expiration passes, or the socket is closed)
    129                 while(pendingSockets.contains(s)) {
    130                         long now = clock.now();
    131                         if(now >= end) {
    132                                 if(_log.shouldLog(Log.INFO)) {
    133                                         _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
    134                                 }
    135                                 pendingSockets.remove(s);
    136                                 return false;
    137                         }
    138                         if(closing) {
    139                                 if(_log.shouldLog(Log.WARN)) {
    140                                         _log.warn("Server socket closed while waiting for accept");
    141                                 }
    142                                 pendingSockets.remove(s);
    143                                 return false;
    144                         }
    145                         long remaining = end - now;
    146                         try {
    147                                 synchronized(socketAcceptedLock) {
    148                                         socketAcceptedLock.wait(remaining);
    149                                 }
    150                         } catch(InterruptedException ie) {
    151                         }
    152                 }
    153                 long now = clock.now();
    154                 if(_log.shouldLog(Log.DEBUG)) {
    155                         _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
    156                 }
    157                 return true;
    158         }
    159 
    160         public void close() {
    161                 closing = true;
    162                 // let anyone .accept()ing know to fsck off
    163                 synchronized(socketAddedLock) {
    164                         socketAddedLock.notifyAll();
    165                 }
    166                 // let anyone addWaitForAccept()ing know to fsck off
    167                 synchronized(socketAcceptedLock) {
    168                         socketAcceptedLock.notifyAll();
    169                 }
    170         }
    171 
    172         public I2PSocketManager getManager() {
    173                 return mgr;
    174         }
     20    private final static Log _log = new Log(I2PServerSocketImpl.class);
     21    private I2PSocketManager mgr;
     22    /** list of sockets waiting for the client to accept them */
     23    private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
     24   
     25    /** have we been closed */
     26    private volatile boolean closing = false;
     27   
     28    /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
     29    private Object socketAcceptedLock = new Object();
     30    /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
     31    private Object socketAddedLock = new Object();
     32   
     33    public I2PServerSocketImpl(I2PSocketManager mgr) {
     34        this.mgr = mgr;
     35    }
     36   
     37    /**
     38     * Waits for the next socket connecting.  If a remote user tried to make a
     39     * connection and the local application wasn't .accept()ing new connections,
     40     * they should get refused (if .accept() doesnt occur in some small period -
     41     * currently 5 seconds)
     42     *
     43     * @return a connected I2PSocket
     44     *
     45     * @throws I2PException if there is a problem with reading a new socket
     46     *         from the data available (aka the I2PSession closed, etc)
     47     * @throws ConnectException if the I2PServerSocket is closed
     48     */
     49    public I2PSocket accept() throws I2PException, ConnectException {
     50        if (_log.shouldLog(Log.DEBUG))
     51            _log.debug("accept() called, pending: " + pendingSockets.size());
     52       
     53        I2PSocket ret = null;
     54       
     55        while ( (ret == null) && (!closing) ){
     56            while (pendingSockets.size() <= 0) {
     57                if (closing) throw new ConnectException("I2PServerSocket closed");
     58                try {
     59                    synchronized(socketAddedLock) {
     60                        socketAddedLock.wait();
     61                    }
     62                } catch (InterruptedException ie) {}
     63            }
     64            synchronized (pendingSockets) {
     65                if (pendingSockets.size() > 0) {
     66                    ret = (I2PSocket)pendingSockets.remove(0);
     67                }
     68            }
     69            if (ret != null) {
     70                synchronized (socketAcceptedLock) {
     71                    socketAcceptedLock.notifyAll();
     72                }
     73            }
     74        }
     75       
     76        if (_log.shouldLog(Log.DEBUG))
     77            _log.debug("TIMING: handed out accept result " + ret.hashCode());
     78        return ret;
     79    }
     80   
     81    /**
     82     * Make the socket available and wait until the client app accepts it, or until
     83     * the given timeout elapses.  This doesn't have any limits on the queue size -
     84     * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
     85     *
     86     * @param timeoutMs how long to wait until accept
     87     * @return true if the socket was accepted, false if the timeout expired
     88     *         or the socket was closed
     89     */
     90    public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
     91        if (_log.shouldLog(Log.DEBUG))
     92            _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
     93       
     94        if (closing) {
     95            if (_log.shouldLog(Log.WARN))
     96                _log.warn("Already closing the socket");
     97            return false;
     98        }
     99       
     100        Clock clock = I2PAppContext.getGlobalContext().clock();
     101        long start = clock.now();
     102        long end = start + timeoutMs;
     103        pendingSockets.add(s);
     104        synchronized (socketAddedLock) {
     105            socketAddedLock.notifyAll();
     106        }
     107       
     108        // keep looping until the socket has been grabbed by the accept()
     109        // (or the expiration passes, or the socket is closed)
     110        while (pendingSockets.contains(s)) {
     111            long now = clock.now();
     112            if (now >= end) {
     113                if (_log.shouldLog(Log.INFO))
     114                    _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
     115                pendingSockets.remove(s);
     116                return false;
     117            }
     118            if (closing) {
     119                if (_log.shouldLog(Log.WARN))
     120                    _log.warn("Server socket closed while waiting for accept");
     121                pendingSockets.remove(s);
     122                return false;
     123            }
     124            long remaining = end - now;
     125            try {
     126                synchronized (socketAcceptedLock) {
     127                    socketAcceptedLock.wait(remaining);
     128                }
     129            } catch (InterruptedException ie) {}
     130        }
     131        long now = clock.now();
     132        if (_log.shouldLog(Log.DEBUG))
     133            _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
     134        return true;
     135    }
     136   
     137    public void close() {
     138        closing = true;
     139        // let anyone .accept()ing know to fsck off
     140        synchronized (socketAddedLock) {
     141            socketAddedLock.notifyAll();
     142        }
     143        // let anyone addWaitForAccept()ing know to fsck off
     144        synchronized (socketAcceptedLock) {
     145            socketAcceptedLock.notifyAll();
     146        }
     147    }
     148   
     149    public I2PSocketManager getManager() { return mgr; }
    175150}
  • apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java

    rfa5c721 ree2fd32  
    66import java.io.InputStream;
    77import java.net.ConnectException;
    8 import java.net.SocketTimeoutException;
    98import java.util.Properties;
    109
     
    2221 */
    2322public class StreamSinkServer {
    24 
    25         private Log _log;
    26         private String _sinkDir;
    27         private String _destFile;
    28         private String _i2cpHost;
    29         private int _i2cpPort;
    30         private int _handlers;
    31 
    32         /**
    33          * Create but do not start the streaming server. 
    34          *
    35          * @param sinkDir Directory to store received files in
    36          * @param ourDestFile filename to write our binary destination to
    37          */
    38         public StreamSinkServer(String sinkDir, String ourDestFile) {
    39                 this(sinkDir, ourDestFile, null, -1, 3);
    40         }
    41 
    42         public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
    43                 _sinkDir = sinkDir;
    44                 _destFile = ourDestFile;
    45                 _i2cpHost = i2cpHost;
    46                 _i2cpPort = i2cpPort;
    47                 _handlers = handlers;
    48                 _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
    49         }
    50 
    51         /**
    52          * Actually fire up the server - this call blocks forever (or until the server
    53          * socket closes)
    54          *
    55          */
    56         public void runServer() {
    57                 I2PSocketManager mgr = null;
    58                 if(_i2cpHost != null) {
    59                         mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
    60                 } else {
    61                         mgr = I2PSocketManagerFactory.createManager();
    62                 }
    63                 Destination dest = mgr.getSession().getMyDestination();
    64                 if(_log.shouldLog(Log.INFO)) {
    65                         _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
    66                 }
    67                 FileOutputStream fos = null;
    68                 try {
    69                         fos = new FileOutputStream(_destFile);
    70                         dest.writeBytes(fos);
    71                 } catch(IOException ioe) {
    72                         _log.error("Error writing out our destination to " + _destFile, ioe);
    73                         return;
    74                 } catch(DataFormatException dfe) {
    75                         _log.error("Error formatting the destination", dfe);
    76                         return;
    77                 } finally {
    78                         if(fos != null) {
    79                                 try {
    80                                         fos.close();
    81                                 } catch(IOException ioe) {
    82                                 }
    83                         }
    84                 }
    85 
    86                 I2PServerSocket sock = mgr.getServerSocket();
    87                 startup(sock);
    88         }
    89 
    90         public void startup(I2PServerSocket sock) {
    91                 for(int i = 0; i < _handlers; i++) {
    92                         I2PThread t = new I2PThread(new ClientRunner(sock));
    93                         t.setName("Handler " + i);
    94                         t.setDaemon(false);
    95                         t.start();
    96                 }
    97         }
    98 
    99         /**
    100          * Actually deal with a client - pull anything they send us and write it to a file.
    101          *
    102          */
    103         private class ClientRunner implements Runnable {
    104 
    105                 private I2PServerSocket _socket;
    106 
    107                 public ClientRunner(I2PServerSocket socket) {
    108                         _socket = socket;
    109                 }
    110 
    111                 public void run() {
    112                         while(true) {
    113                                 try {
    114                                         I2PSocket socket = _socket.accept();
    115                                         if(socket != null) {
    116                                                 handle(socket);
    117                                         }
    118                                 } catch(I2PException ie) {
    119                                         _log.error("Error accepting connection", ie);
    120                                         return;
    121                                 } catch(ConnectException ce) {
    122                                         _log.error("Connection already dropped", ce);
    123                                         return;
    124                                 } catch(SocketTimeoutException ste) {
    125                                         // ignored
    126                                 }
    127                         }
    128                 }
    129 
    130                 private void handle(I2PSocket sock) {
    131                         FileOutputStream fos = null;
    132                         try {
    133                                 File sink = new File(_sinkDir);
    134                                 if(!sink.exists()) {
    135                                         sink.mkdirs();
    136                                 }
    137                                 File cur = File.createTempFile("clientSink", ".dat", sink);
    138                                 fos = new FileOutputStream(cur);
    139                                 if(_log.shouldLog(Log.DEBUG)) {
    140                                         _log.debug("Writing to " + cur.getAbsolutePath());
    141                                 }
    142                         } catch(IOException ioe) {
    143                                 _log.error("Error creating sink", ioe);
    144                                 return;
    145                         }
    146 
    147                         long start = System.currentTimeMillis();
    148                         try {
    149                                 InputStream in = sock.getInputStream();
    150                                 byte buf[] = new byte[4096];
    151                                 long written = 0;
    152                                 int read = 0;
    153                                 while((read = in.read(buf)) != -1) {
    154                                         //_fos.write(buf, 0, read);
    155                                         written += read;
    156                                         if(_log.shouldLog(Log.DEBUG)) {
    157                                                 _log.debug("read and wrote " + read + " (" + written + ")");
    158                                         }
    159                                 }
    160                                 fos.write(("written: [" + written + "]\n").getBytes());
    161                                 long lifetime = System.currentTimeMillis() - start;
    162                                 _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
    163                         } catch(IOException ioe) {
    164                                 _log.error("Error writing the sink", ioe);
    165                         } finally {
    166                                 if(fos != null) {
    167                                         try {
    168                                                 fos.close();
    169                                         } catch(IOException ioe) {
    170                                         }
    171                                 }
    172                                 if(sock != null) {
    173                                         try {
    174                                                 sock.close();
    175                                         } catch(IOException ioe) {
    176                                         }
    177                                 }
    178                                 _log.debug("Client socket closed");
    179                         }
    180                 }
    181         }
    182 
    183         /**
    184          * Fire up the streaming server.  <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
    185          * <ul>
    186          *  <li><b>sinkDir</b>: Directory to store received files in</li>
    187          *  <li><b>ourDestFile</b>: filename to write our binary destination to</li>
    188          *  <li><b>numHandlers</b>: how many concurrent connections to handle</li>
    189          * </ul>
    190          */
    191         public static void main(String args[]) {
    192                 StreamSinkServer server = null;
    193                 switch(args.length) {
    194                         case 0:
    195                                 server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
    196                                 break;
    197                         case 2:
    198                                 server = new StreamSinkServer(args[0], args[1]);
    199                                 break;
    200                         case 4:
    201                         case 5:
    202                                 int handlers = 3;
    203                                 if(args.length == 5) {
    204                                         try {
    205                                                 handlers = Integer.parseInt(args[4]);
    206                                         } catch(NumberFormatException nfe) {
    207                                         }
    208                                 }
    209                                 try {
    210                                         int port = Integer.parseInt(args[1]);
    211                                         server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
    212                                 } catch(NumberFormatException nfe) {
    213                                         System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
    214                                 }
    215                                 break;
    216                         default:
    217                                 System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
    218                 }
    219                 if(server != null) {
    220                         server.runServer();
    221                 }
    222         }
     23    private Log _log;
     24    private String _sinkDir;
     25    private String _destFile;
     26    private String _i2cpHost;
     27    private int _i2cpPort;
     28    private int _handlers;
     29   
     30    /**
     31     * Create but do not start the streaming server. 
     32     *
     33     * @param sinkDir Directory to store received files in
     34     * @param ourDestFile filename to write our binary destination to
     35     */
     36    public StreamSinkServer(String sinkDir, String ourDestFile) {
     37        this(sinkDir, ourDestFile, null, -1, 3);
     38    }
     39    public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
     40        _sinkDir = sinkDir;
     41        _destFile = ourDestFile;
     42        _i2cpHost = i2cpHost;
     43        _i2cpPort = i2cpPort;
     44        _handlers = handlers;
     45        _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
     46    }
     47   
     48    /**
     49     * Actually fire up the server - this call blocks forever (or until the server
     50     * socket closes)
     51     *
     52     */
     53    public void runServer() {
     54        I2PSocketManager mgr = null;
     55        if (_i2cpHost != null)
     56            mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
     57        else
     58            mgr = I2PSocketManagerFactory.createManager();
     59        Destination dest = mgr.getSession().getMyDestination();
     60        if (_log.shouldLog(Log.INFO))
     61            _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
     62        FileOutputStream fos = null;
     63        try {
     64            fos = new FileOutputStream(_destFile);
     65            dest.writeBytes(fos);
     66        } catch (IOException ioe) {
     67            _log.error("Error writing out our destination to " + _destFile, ioe);
     68            return;
     69        } catch (DataFormatException dfe) {
     70            _log.error("Error formatting the destination", dfe);
     71            return;
     72        } finally {
     73            if (fos != null) try { fos.close(); } catch (IOException ioe) {}
     74        }
     75       
     76        I2PServerSocket sock = mgr.getServerSocket();
     77        startup(sock);
     78    }
     79   
     80    public void startup(I2PServerSocket sock) {
     81        for (int i = 0; i < _handlers; i++) {
     82            I2PThread t = new I2PThread(new ClientRunner(sock));
     83            t.setName("Handler " + i);
     84            t.setDaemon(false);
     85            t.start();
     86        }
     87    }
     88   
     89    /**
     90     * Actually deal with a client - pull anything they send us and write it to a file.
     91     *
     92     */
     93    private class ClientRunner implements Runnable {
     94        private I2PServerSocket _socket;
     95        public ClientRunner(I2PServerSocket socket) {
     96            _socket = socket;
     97        }
     98        public void run() {
     99            while (true) {
     100                try {
     101                    I2PSocket socket = _socket.accept();
     102                    if (socket != null)
     103                        handle(socket);
     104                } catch (I2PException ie) {
     105                    _log.error("Error accepting connection", ie);
     106                    return;
     107                } catch (ConnectException ce) {
     108                    _log.error("Connection already dropped", ce);
     109                    return;
     110                }       
     111            }
     112        }
     113       
     114        private void handle(I2PSocket sock) {
     115            FileOutputStream fos = null;
     116            try {
     117                File sink = new File(_sinkDir);
     118                if (!sink.exists())
     119                    sink.mkdirs();
     120                File cur = File.createTempFile("clientSink", ".dat", sink);
     121                fos = new FileOutputStream(cur);
     122                if (_log.shouldLog(Log.DEBUG))
     123                    _log.debug("Writing to " + cur.getAbsolutePath());
     124            } catch (IOException ioe) {
     125                _log.error("Error creating sink", ioe);
     126                return;
     127            }
     128           
     129            long start = System.currentTimeMillis();
     130            try {
     131                InputStream in = sock.getInputStream();
     132                byte buf[] = new byte[4096];
     133                long written = 0;
     134                int read = 0;
     135                while ( (read = in.read(buf)) != -1) {
     136                    //_fos.write(buf, 0, read);
     137                    written += read;
     138                    if (_log.shouldLog(Log.DEBUG))
     139                        _log.debug("read and wrote " + read + " (" + written + ")");
     140                }
     141                fos.write(("written: [" + written + "]\n").getBytes());
     142                long lifetime = System.currentTimeMillis() - start;
     143                _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
     144            } catch (IOException ioe) {
     145                _log.error("Error writing the sink", ioe);
     146            } finally {
     147                if (fos != null) try { fos.close(); } catch (IOException ioe) {}
     148                if (sock != null) try { sock.close(); } catch (IOException ioe) {}
     149                _log.debug("Client socket closed");
     150            }
     151        }
     152    }
     153   
     154    /**
     155     * Fire up the streaming server.  <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
     156     * <ul>
     157     *  <li><b>sinkDir</b>: Directory to store received files in</li>
     158     *  <li><b>ourDestFile</b>: filename to write our binary destination to</li>
     159     *  <li><b>numHandlers</b>: how many concurrent connections to handle</li>
     160     * </ul>
     161     */
     162    public static void main(String args[]) {
     163        StreamSinkServer server = null;
     164        switch (args.length) {
     165            case 0:
     166                server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
     167                break;
     168            case 2:
     169                server = new StreamSinkServer(args[0], args[1]);
     170                break;
     171            case 4:
     172            case 5:
     173                int handlers = 3;
     174                if (args.length == 5) {
     175                    try {
     176                        handlers = Integer.parseInt(args[4]);
     177                    } catch (NumberFormatException nfe) {}
     178                }
     179                try {
     180                    int port = Integer.parseInt(args[1]);
     181                    server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
     182                } catch (NumberFormatException nfe) {
     183                    System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
     184                }
     185                break;
     186            default:
     187                System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
     188        }
     189        if (server != null)
     190            server.runServer();
     191    }
    223192}
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    rfa5c721 ree2fd32  
    11package net.i2p.client.streaming;
    22
    3 import java.net.SocketTimeoutException;
    43import java.util.ArrayList;
    54import java.util.List;
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    rfa5c721 ree2fd32  
    2222 */
    2323public class ConnectionManager {
    24 
    25         private I2PAppContext _context;
    26         private Log _log;
    27         private I2PSession _session;
    28         private MessageHandler _messageHandler;
    29         private PacketHandler _packetHandler;
    30         private ConnectionHandler _connectionHandler;
    31         private PacketQueue _outboundQueue;
    32         private SchedulerChooser _schedulerChooser;
    33         private ConnectionPacketHandler _conPacketHandler;
    34         /** Inbound stream ID (Long) to Connection map */
    35         private Map _connectionByInboundId;
    36         /** Ping ID (Long) to PingRequest */
    37         private Map _pendingPings;
    38         private boolean _allowIncoming;
    39         private int _maxConcurrentStreams;
    40         private ConnectionOptions _defaultOptions;
    41         private volatile int _numWaiting;
    42         private Object _connectionLock;
    43         private long SoTimeout;
    44 
    45         public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
    46                 _context = context;
    47                 _log = context.logManager().getLog(ConnectionManager.class);
    48                 _connectionByInboundId = new HashMap(32);
    49                 _pendingPings = new HashMap(4);
    50                 _connectionLock = new Object();
    51                 _messageHandler = new MessageHandler(context, this);
    52                 _packetHandler = new PacketHandler(context, this);
    53                 _connectionHandler = new ConnectionHandler(context, this);
    54                 _schedulerChooser = new SchedulerChooser(context);
    55                 _conPacketHandler = new ConnectionPacketHandler(context);
    56                 _session = session;
    57                 session.setSessionListener(_messageHandler);
    58                 _outboundQueue = new PacketQueue(context, session, this);
    59                 _allowIncoming = false;
    60                 _maxConcurrentStreams = maxConcurrent;
    61                 _defaultOptions = defaultOptions;
    62                 _numWaiting = 0;
    63                 /** Socket timeout for accept() */
    64                 SoTimeout = -1;
    65 
    66                 _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    67                 _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    68                 _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    69                 _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    70                 _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    71                 _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    72                 _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    73                 _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    74                 _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    75                 _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
    76         }
    77 
    78         Connection getConnectionByInboundId(long id) {
    79                 synchronized(_connectionLock) {
    80                         return (Connection)_connectionByInboundId.get(new Long(id));
    81                 }
    82         }
    83 
    84         /**
    85          * not guaranteed to be unique, but in case we receive more than one packet
    86          * on an inbound connection that we havent ack'ed yet...
    87          */
    88         Connection getConnectionByOutboundId(long id) {
    89                 synchronized(_connectionLock) {
    90                         for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
    91                                 Connection con = (Connection)iter.next();
    92                                 if(DataHelper.eq(con.getSendStreamId(), id)) {
    93                                         return con;
    94                                 }
    95                         }
    96                 }
    97                 return null;
    98         }
    99 
    100         /**
    101          * Set the socket accept() timeout.
    102          * @param x
    103          */
    104         public void MsetSoTimeout(long x) {
    105                 SoTimeout = x;
    106         }
    107 
    108         /**
    109          * Get the socket accept() timeout.
    110          * @return
    111          */
    112         public long MgetSoTimeout() {
    113                 return SoTimeout;
    114         }
    115 
    116         public void setAllowIncomingConnections(boolean allow) {
    117                 _connectionHandler.setActive(allow);
    118         }
    119 
    120         /** should we acceot connections, or just reject everyone? */
    121         public boolean getAllowIncomingConnections() {
    122                 return _connectionHandler.getActive();
    123         }
    124 
    125         /**
    126          * Create a new connection based on the SYN packet we received.
    127          *
    128          * @return created Connection with the packet's data already delivered to
    129          *         it, or null if the syn's streamId was already taken
    130          */
    131         public Connection receiveConnection(Packet synPacket) {
    132                 Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
    133                 long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
    134                 boolean reject = false;
    135                 int active = 0;
    136                 int total = 0;
    137                 synchronized(_connectionLock) {
    138                         total = _connectionByInboundId.size();
    139                         for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
    140                                 if(((Connection)iter.next()).getIsConnected()) {
    141                                         active++;
    142                                 }
    143                         }
    144                         if(locked_tooManyStreams()) {
    145                                 reject = true;
    146                         } else {
    147                                 while(true) {
    148                                         Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
    149                                         if(oldCon == null) {
    150                                                 break;
    151                                         } else {
    152                                                 _connectionByInboundId.put(new Long(receiveId), oldCon);
    153                                                 // receiveId already taken, try another
    154                                                 receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
    155                                         }
    156                                 }
    157                         }
    158                 }
    159 
    160                 _context.statManager().addRateData("stream.receiveActive", active, total);
    161 
    162                 if(reject) {
    163                         if(_log.shouldLog(Log.WARN)) {
    164                                 _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
    165                         }
    166                         PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
    167                         reply.setFlag(Packet.FLAG_RESET);
    168                         reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
    169                         reply.setAckThrough(synPacket.getSequenceNum());
    170                         reply.setSendStreamId(synPacket.getReceiveStreamId());
    171                         reply.setReceiveStreamId(0);
    172                         reply.setOptionalFrom(_session.getMyDestination());
    173                         // this just sends the packet - no retries or whatnot
    174                         _outboundQueue.enqueue(reply);
    175                         return null;
    176                 }
    177 
    178                 con.setReceiveStreamId(receiveId);
    179                 try {
    180                         con.getPacketHandler().receivePacket(synPacket, con);
    181                 } catch(I2PException ie) {
    182                         synchronized(_connectionLock) {
    183                                 _connectionByInboundId.remove(new Long(receiveId));
    184                         }
    185                         return null;
    186                 }
    187 
    188                 _context.statManager().addRateData("stream.connectionReceived", 1, 0);
    189                 return con;
    190         }
    191         private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000;
    192 
    193         /**
    194          * Build a new connection to the given peer.  This blocks if there is no
    195          * connection delay, otherwise it returns immediately.
    196          *
    197          * @return new connection, or null if we have exceeded our limit
    198          */
    199         public Connection connect(Destination peer, ConnectionOptions opts) {
    200                 Connection con = null;
    201                 long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
    202                 long expiration = _context.clock().now() + opts.getConnectTimeout();
    203                 if(opts.getConnectTimeout() <= 0) {
    204                         expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
    205                 }
    206                 _numWaiting++;
    207                 while(true) {
    208                         long remaining = expiration - _context.clock().now();
    209                         if(remaining <= 0) {
    210                                 if(_log.shouldLog(Log.WARN)) {
    211                                         _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
    212                                 }
    213                                 _numWaiting--;
    214                                 return null;
    215                         }
    216                         boolean reject = false;
    217                         synchronized(_connectionLock) {
    218                                 if(locked_tooManyStreams()) {
    219                                         // allow a full buffer of pending/waiting streams
    220                                         if(_numWaiting > _maxConcurrentStreams) {
    221                                                 if(_log.shouldLog(Log.WARN)) {
    222                                                         _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
    223                                                 }
    224                                                 _numWaiting--;
    225                                                 return null;
    226                                         }
    227 
    228                                         // no remaining streams, lets wait a bit
    229                                         try {
    230                                                 _connectionLock.wait(remaining);
    231                                         } catch(InterruptedException ie) {
    232                                         }
    233                                 } else {
    234                                         con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
    235                                         con.setRemotePeer(peer);
    236 
    237                                         while(_connectionByInboundId.containsKey(new Long(receiveId))) {
    238                                                 receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
    239                                         }
    240                                         _connectionByInboundId.put(new Long(receiveId), con);
    241                                         break; // stop looping as a psuedo-wait
    242                                 }
    243                         }
    244                 }
    245 
    246                 // ok we're in...
    247                 con.setReceiveStreamId(receiveId);
    248                 con.eventOccurred();
    249 
    250                 _log.debug("Connect() conDelay = " + opts.getConnectDelay());
    251                 if(opts.getConnectDelay() <= 0) {
    252                         con.waitForConnect();
    253                 }
    254                 if(_numWaiting > 0) {
    255                         _numWaiting--;
    256                 }
    257                 _context.statManager().addRateData("stream.connectionCreated", 1, 0);
    258                 return con;
    259         }
    260 
    261         private boolean locked_tooManyStreams() {
    262                 if(_maxConcurrentStreams <= 0) {
    263                         return false;
    264                 }
    265                 if(_connectionByInboundId.size() < _maxConcurrentStreams) {
    266                         return false;
    267                 }
    268                 int active = 0;
    269                 for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
    270                         Connection con = (Connection)iter.next();
    271                         if(con.getIsConnected()) {
    272                                 active++;
    273                         }
    274                 }
    275 
    276                 if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
    277                         _log.info("More than 100 connections!  " + active + " total: " + _connectionByInboundId.size());
    278                 }
    279                 return (active >= _maxConcurrentStreams);
    280         }
    281 
    282         public MessageHandler getMessageHandler() {
    283                 return _messageHandler;
    284         }
    285 
    286         public PacketHandler getPacketHandler() {
    287                 return _packetHandler;
    288         }
    289 
    290         public ConnectionHandler getConnectionHandler() {
    291                 return _connectionHandler;
    292         }
    293 
    294         public I2PSession getSession() {
    295                 return _session;
    296         }
    297 
    298         public PacketQueue getPacketQueue() {
    299                 return _outboundQueue;
    300         }
    301 
    302         /**
    303          * Something b0rked hard, so kill all of our connections without mercy.
    304          * Don't bother sending close packets.
    305          *
    306          */
    307         public void disconnectAllHard() {
    308                 synchronized(_connectionLock) {
    309                         for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
    310                                 Connection con = (Connection)iter.next();
    311                                 con.disconnect(false, false);
    312                         }
    313                         _connectionByInboundId.clear();
    314                         _connectionLock.notifyAll();
    315                 }
    316         }
    317 
    318         /**
    319          * Drop the (already closed) connection on the floor.
    320          *
    321          */
    322         public void removeConnection(Connection con) {
    323                 boolean removed = false;
    324                 synchronized(_connectionLock) {
    325                         Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
    326                         removed = (o == con);
    327                         if(_log.shouldLog(Log.DEBUG)) {
    328                                 _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
    329                         }
    330                         if(!removed && _log.shouldLog(Log.DEBUG)) {
    331                                 _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
    332                         }
    333                         _connectionLock.notifyAll();
    334                 }
    335                 if(removed) {
    336                         _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
    337                         _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
    338                         _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
    339                         _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
    340                         _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
    341                         _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
    342                         _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
    343                         _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
    344                         _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
    345                 }
    346         }
    347 
    348         /** return a set of Connection objects */
    349         public Set listConnections() {
    350                 synchronized(_connectionLock) {
    351                         return new HashSet(_connectionByInboundId.values());
    352                 }
    353         }
    354 
    355         public boolean ping(Destination peer, long timeoutMs) {
    356                 return ping(peer, timeoutMs, true);
    357         }
    358 
    359         public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
    360                 return ping(peer, timeoutMs, blocking, null, null, null);
    361         }
    362 
    363         public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
    364                 Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1);
    365                 PacketLocal packet = new PacketLocal(_context, peer);
    366                 packet.setSendStreamId(id.longValue());
    367                 packet.setFlag(Packet.FLAG_ECHO);
    368                 packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
    369                 packet.setOptionalFrom(_session.getMyDestination());
    370                 if((keyToUse != null) && (tagsToSend != null)) {
    371                         packet.setKeyUsed(keyToUse);
    372                         packet.setTagsSent(tagsToSend);
    373                 }
    374 
    375                 PingRequest req = new PingRequest(peer, packet, notifier);
    376 
    377                 synchronized(_pendingPings) {
    378                         _pendingPings.put(id, req);
    379                 }
    380 
    381                 _outboundQueue.enqueue(packet);
    382                 packet.releasePayload();
    383 
    384                 if(blocking) {
    385                         synchronized(req) {
    386                                 if(!req.pongReceived()) {
    387                                         try {
    388                                                 req.wait(timeoutMs);
    389                                         } catch(InterruptedException ie) {
    390                                         }
    391                                 }
    392                         }
    393 
    394                         synchronized(_pendingPings) {
    395                                 _pendingPings.remove(id);
    396                         }
    397                 } else {
    398                         SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
    399                 }
    400 
    401                 boolean ok = req.pongReceived();
    402                 return ok;
    403         }
    404 
    405         interface PingNotifier {
    406 
    407                 public void pingComplete(boolean ok);
    408         }
    409 
    410         private class PingFailed implements SimpleTimer.TimedEvent {
    411 
    412                 private Long _id;
    413                 private PingNotifier _notifier;
    414 
    415                 public PingFailed(Long id, PingNotifier notifier) {
    416                         _id = id;
    417                         _notifier = notifier;
    418                 }
    419 
    420                 public void timeReached() {
    421                         boolean removed = false;
    422                         synchronized(_pendingPings) {
    423                                 Object o = _pendingPings.remove(_id);
    424                                 if(o != null) {
    425                                         removed = true;
    426                                 }
    427                         }
    428                         if(removed) {
    429                                 if(_notifier != null) {
    430                                         _notifier.pingComplete(false);
    431                                 }
    432                                 if(_log.shouldLog(Log.INFO)) {
    433                                         _log.info("Ping failed");
    434                                 }
    435                         }
    436                 }
    437         }
    438 
    439         private class PingRequest {
    440 
    441                 private boolean _ponged;
    442                 private Destination _peer;
    443                 private PacketLocal _packet;
    444                 private PingNotifier _notifier;
    445 
    446                 public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
    447                         _ponged = false;
    448                         _peer = peer;
    449                         _packet = packet;
    450                         _notifier = notifier;
    451                 }
    452 
    453                 public void pong() {
    454                         _log.debug("Ping successful");
    455                         _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
    456                         synchronized(ConnectionManager.PingRequest.this) {
    457                                 _ponged = true;
    458                                 ConnectionManager.PingRequest.this.notifyAll();
    459                         }
    460                         if(_notifier != null) {
    461                                 _notifier.pingComplete(true);
    462                         }
    463                 }
    464 
    465                 public boolean pongReceived() {
    466                         return _ponged;
    467                 }
    468         }
    469 
    470         void receivePong(long pingId) {
    471                 PingRequest req = null;
    472                 synchronized(_pendingPings) {
    473                         req = (PingRequest)_pendingPings.remove(new Long(pingId));
    474                 }
    475                 if(req != null) {
    476                         req.pong();
    477                 }
    478         }
     24    private I2PAppContext _context;
     25    private Log _log;
     26    private I2PSession _session;
     27    private MessageHandler _messageHandler;
     28    private PacketHandler _packetHandler;
     29    private ConnectionHandler _connectionHandler;
     30    private PacketQueue _outboundQueue;
     31    private SchedulerChooser _schedulerChooser;
     32    private ConnectionPacketHandler _conPacketHandler;
     33    /** Inbound stream ID (Long) to Connection map */
     34    private Map _connectionByInboundId;
     35    /** Ping ID (Long) to PingRequest */
     36    private Map _pendingPings;
     37    private boolean _allowIncoming;
     38    private int _maxConcurrentStreams;
     39    private ConnectionOptions _defaultOptions;
     40    private volatile int _numWaiting;
     41    private Object _connectionLock;
     42   
     43    public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
     44        _context = context;
     45        _log = context.logManager().getLog(ConnectionManager.class);
     46        _connectionByInboundId = new HashMap(32);
     47        _pendingPings = new HashMap(4);
     48        _connectionLock = new Object();
     49        _messageHandler = new MessageHandler(context, this);
     50        _packetHandler = new PacketHandler(context, this);
     51        _connectionHandler = new ConnectionHandler(context, this);
     52        _schedulerChooser = new SchedulerChooser(context);
     53        _conPacketHandler = new ConnectionPacketHandler(context);
     54        _session = session;
     55        session.setSessionListener(_messageHandler);
     56        _outboundQueue = new PacketQueue(context, session, this);
     57        _allowIncoming = false;
     58        _maxConcurrentStreams = maxConcurrent;
     59        _defaultOptions = defaultOptions;
     60        _numWaiting = 0;
     61        _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     62        _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     63        _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     64        _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     65        _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     66        _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     67        _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     68        _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     69        _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     70        _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     71    }
     72   
     73    Connection getConnectionByInboundId(long id) {
     74        synchronized (_connectionLock) {
     75            return (Connection)_connectionByInboundId.get(new Long(id));
     76        }
     77    }
     78    /**
     79     * not guaranteed to be unique, but in case we receive more than one packet
     80     * on an inbound connection that we havent ack'ed yet...
     81     */
     82    Connection getConnectionByOutboundId(long id) {
     83        synchronized (_connectionLock) {
     84            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     85                Connection con = (Connection)iter.next();
     86                if (DataHelper.eq(con.getSendStreamId(), id))
     87                    return con;
     88            }
     89        }
     90        return null;
     91    }
     92   
     93    public void setAllowIncomingConnections(boolean allow) {
     94        _connectionHandler.setActive(allow);
     95    }
     96    /** should we acceot connections, or just reject everyone? */
     97    public boolean getAllowIncomingConnections() {
     98        return _connectionHandler.getActive();
     99    }
     100   
     101    /**
     102     * Create a new connection based on the SYN packet we received.
     103     *
     104     * @return created Connection with the packet's data already delivered to
     105     *         it, or null if the syn's streamId was already taken
     106     */
     107    public Connection receiveConnection(Packet synPacket) {
     108        Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
     109        long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
     110        boolean reject = false;
     111        int active = 0;
     112        int total = 0;
     113        synchronized (_connectionLock) {
     114            total = _connectionByInboundId.size();
     115            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     116                if ( ((Connection)iter.next()).getIsConnected() )
     117                    active++;
     118            }
     119            if (locked_tooManyStreams()) {
     120                reject = true;
     121            } else {
     122                while (true) {
     123                    Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
     124                    if (oldCon == null) {
     125                        break;
     126                    } else {
     127                        _connectionByInboundId.put(new Long(receiveId), oldCon);
     128                        // receiveId already taken, try another
     129                        receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
     130                    }
     131                }
     132            }
     133        }
     134       
     135        _context.statManager().addRateData("stream.receiveActive", active, total);
     136       
     137        if (reject) {
     138            if (_log.shouldLog(Log.WARN))
     139                _log.warn("Refusing connection since we have exceeded our max of "
     140                          + _maxConcurrentStreams + " connections");
     141            PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
     142            reply.setFlag(Packet.FLAG_RESET);
     143            reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
     144            reply.setAckThrough(synPacket.getSequenceNum());
     145            reply.setSendStreamId(synPacket.getReceiveStreamId());
     146            reply.setReceiveStreamId(0);
     147            reply.setOptionalFrom(_session.getMyDestination());
     148            // this just sends the packet - no retries or whatnot
     149            _outboundQueue.enqueue(reply);
     150            return null;
     151        }
     152       
     153        con.setReceiveStreamId(receiveId);
     154        try {
     155            con.getPacketHandler().receivePacket(synPacket, con);
     156        } catch (I2PException ie) {
     157            synchronized (_connectionLock) {
     158                _connectionByInboundId.remove(new Long(receiveId));
     159            }
     160            return null;
     161        }
     162       
     163        _context.statManager().addRateData("stream.connectionReceived", 1, 0);
     164        return con;
     165    }
     166   
     167    private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
     168   
     169    /**
     170     * Build a new connection to the given peer.  This blocks if there is no
     171     * connection delay, otherwise it returns immediately.
     172     *
     173     * @return new connection, or null if we have exceeded our limit
     174     */
     175    public Connection connect(Destination peer, ConnectionOptions opts) {
     176        Connection con = null;
     177        long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
     178        long expiration = _context.clock().now() + opts.getConnectTimeout();
     179        if (opts.getConnectTimeout() <= 0)
     180            expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
     181        _numWaiting++;
     182        while (true) {
     183            long remaining = expiration - _context.clock().now();
     184            if (remaining <= 0) {
     185                if (_log.shouldLog(Log.WARN))
     186                _log.warn("Refusing to connect since we have exceeded our max of "
     187                          + _maxConcurrentStreams + " connections");
     188                _numWaiting--;
     189                return null;
     190            }
     191            boolean reject = false;
     192            synchronized (_connectionLock) {
     193                if (locked_tooManyStreams()) {
     194                    // allow a full buffer of pending/waiting streams
     195                    if (_numWaiting > _maxConcurrentStreams) {
     196                        if (_log.shouldLog(Log.WARN))
     197                            _log.warn("Refusing connection since we have exceeded our max of "
     198                                      + _maxConcurrentStreams + " and there are " + _numWaiting
     199                                      + " waiting already");
     200                        _numWaiting--;
     201                        return null;
     202                    }
     203                   
     204                    // no remaining streams, lets wait a bit
     205                    try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
     206                } else {
     207                    con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
     208                    con.setRemotePeer(peer);
     209           
     210                    while (_connectionByInboundId.containsKey(new Long(receiveId))) {
     211                        receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
     212                    }
     213                    _connectionByInboundId.put(new Long(receiveId), con);
     214                    break; // stop looping as a psuedo-wait
     215                }
     216            }
     217        }
     218
     219        // ok we're in...
     220        con.setReceiveStreamId(receiveId);       
     221        con.eventOccurred();
     222       
     223        _log.debug("Connect() conDelay = " + opts.getConnectDelay());
     224        if (opts.getConnectDelay() <= 0) {
     225            con.waitForConnect();
     226        }
     227        if (_numWaiting > 0)
     228            _numWaiting--;
     229       
     230        _context.statManager().addRateData("stream.connectionCreated", 1, 0);
     231        return con;
     232    }
     233
     234    private boolean locked_tooManyStreams() {
     235        if (_maxConcurrentStreams <= 0) return false;
     236        if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
     237        int active = 0;
     238        for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     239            Connection con = (Connection)iter.next();
     240            if (con.getIsConnected())
     241                active++;
     242        }
     243       
     244        if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
     245            _log.info("More than 100 connections!  " + active
     246                      + " total: " + _connectionByInboundId.size());
     247
     248        return (active >= _maxConcurrentStreams);
     249    }
     250   
     251    public MessageHandler getMessageHandler() { return _messageHandler; }
     252    public PacketHandler getPacketHandler() { return _packetHandler; }
     253    public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
     254    public I2PSession getSession() { return _session; }
     255    public PacketQueue getPacketQueue() { return _outboundQueue; }
     256   
     257    /**
     258     * Something b0rked hard, so kill all of our connections without mercy.
     259     * Don't bother sending close packets.
     260     *
     261     */
     262    public void disconnectAllHard() {
     263        synchronized (_connectionLock) {
     264            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     265                Connection con = (Connection)iter.next();
     266                con.disconnect(false, false);
     267            }
     268            _connectionByInboundId.clear();
     269            _connectionLock.notifyAll();
     270        }
     271    }
     272   
     273    /**
     274     * Drop the (already closed) connection on the floor.
     275     *
     276     */
     277    public void removeConnection(Connection con) {
     278        boolean removed = false;
     279        synchronized (_connectionLock) {
     280            Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
     281            removed = (o == con);
     282            if (_log.shouldLog(Log.DEBUG))
     283                _log.debug("Connection removed? " + removed + " remaining: "
     284                           + _connectionByInboundId.size() + ": " + con);
     285            if (!removed && _log.shouldLog(Log.DEBUG))
     286                _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
     287            _connectionLock.notifyAll();
     288        }
     289        if (removed) {
     290            _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
     291            _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
     292            _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
     293            _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
     294            _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
     295            _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
     296            _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
     297            _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
     298            _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
     299        }
     300    }
     301   
     302    /** return a set of Connection objects */
     303    public Set listConnections() {
     304        synchronized (_connectionLock) {
     305            return new HashSet(_connectionByInboundId.values());
     306        }
     307    }
     308   
     309    public boolean ping(Destination peer, long timeoutMs) {
     310        return ping(peer, timeoutMs, true);
     311    }
     312    public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
     313        return ping(peer, timeoutMs, blocking, null, null, null);
     314    }
     315    public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
     316        Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
     317        PacketLocal packet = new PacketLocal(_context, peer);
     318        packet.setSendStreamId(id.longValue());
     319        packet.setFlag(Packet.FLAG_ECHO);
     320        packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
     321        packet.setOptionalFrom(_session.getMyDestination());
     322        if ( (keyToUse != null) && (tagsToSend != null) ) {
     323            packet.setKeyUsed(keyToUse);
     324            packet.setTagsSent(tagsToSend);
     325        }
     326       
     327        PingRequest req = new PingRequest(peer, packet, notifier);
     328       
     329        synchronized (_pendingPings) {
     330            _pendingPings.put(id, req);
     331        }
     332       
     333        _outboundQueue.enqueue(packet);
     334        packet.releasePayload();
     335       
     336        if (blocking) {
     337            synchronized (req) {
     338                if (!req.pongReceived())
     339                    try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
     340            }
     341           
     342            synchronized (_pendingPings) {
     343                _pendingPings.remove(id);
     344            }
     345        } else {
     346            SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
     347        }
     348       
     349        boolean ok = req.pongReceived();
     350        return ok;
     351    }
     352
     353    interface PingNotifier {
     354        public void pingComplete(boolean ok);
     355    }
     356   
     357    private class PingFailed implements SimpleTimer.TimedEvent {
     358        private Long _id;
     359        private PingNotifier _notifier;
     360        public PingFailed(Long id, PingNotifier notifier) {
     361            _id = id;
     362            _notifier = notifier;
     363        }
     364       
     365        public void timeReached() {
     366            boolean removed = false;
     367            synchronized (_pendingPings) {
     368                Object o = _pendingPings.remove(_id);
     369                if (o != null)
     370                    removed = true;
     371            }
     372            if (removed) {
     373                if (_notifier != null)
     374                    _notifier.pingComplete(false);
     375                if (_log.shouldLog(Log.INFO))
     376                    _log.info("Ping failed");
     377            }
     378        }
     379    }
     380   
     381    private class PingRequest {
     382        private boolean _ponged;
     383        private Destination _peer;
     384        private PacketLocal _packet;
     385        private PingNotifier _notifier;
     386        public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
     387            _ponged = false;
     388            _peer = peer;
     389            _packet = packet;
     390            _notifier = notifier;
     391        }
     392        public void pong() {
     393            _log.debug("Ping successful");
     394            _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
     395            synchronized (ConnectionManager.PingRequest.this) {
     396                _ponged = true;
     397                ConnectionManager.PingRequest.this.notifyAll();
     398            }
     399            if (_notifier != null)
     400                _notifier.pingComplete(true);
     401        }
     402        public boolean pongReceived() { return _ponged; }
     403    }
     404   
     405    void receivePong(long pingId) {
     406        PingRequest req = null;
     407        synchronized (_pendingPings) {
     408            req = (PingRequest)_pendingPings.remove(new Long(pingId));
     409        }
     410        if (req != null)
     411            req.pong();
     412    }
    479413}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    rfa5c721 ree2fd32  
    11package net.i2p.client.streaming;
    22
    3 import java.net.SocketTimeoutException;
    4 import java.util.logging.Level;
    5 import java.util.logging.Logger;
    63import net.i2p.I2PException;
    74
     
    118 */
    129public class I2PServerSocketFull implements I2PServerSocket {
    13 
    14         private I2PSocketManagerFull _socketManager;
    15 
    16         /**
    17          *
    18          * @param mgr
    19          */
    20         public I2PServerSocketFull(I2PSocketManagerFull mgr) {
    21                 _socketManager = mgr;
    22         }
    23 
    24         /**
    25          *
    26          * @return
    27          * @throws net.i2p.I2PException
    28          * @throws SocketTimeoutException
    29          */
    30         public I2PSocket accept() throws I2PException, SocketTimeoutException {
    31                         return _socketManager.receiveSocket();
    32         }
    33 
    34         public long getSoTimeout() {
    35                 return _socketManager.getConnectionManager().MgetSoTimeout();
    36         }
    37 
    38         public void setSoTimeout(long x) {
    39                 _socketManager.getConnectionManager().MsetSoTimeout(x);
    40         }
    41         /**
    42          * Close the connection.
    43          */
    44         public void close() {
    45                 _socketManager.getConnectionManager().setAllowIncomingConnections(false);
    46         }
    47 
    48         /**
    49          *
    50          * @return _socketManager
    51          */
    52         public I2PSocketManager getManager() {
    53                 return _socketManager;
    54         }
     10    private I2PSocketManagerFull _socketManager;
     11   
     12    public I2PServerSocketFull(I2PSocketManagerFull mgr) {
     13        _socketManager = mgr;
     14    }
     15   
     16    public I2PSocket accept() throws I2PException {
     17        return _socketManager.receiveSocket();
     18    }
     19   
     20    public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
     21   
     22    public I2PSocketManager getManager() { return _socketManager; }
    5523}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java

    rfa5c721 ree2fd32  
    1212 */
    1313public class I2PSocketFull implements I2PSocket {
    14 
    15         private Connection _connection;
    16         private I2PSocket.SocketErrorListener _listener;
    17         private Destination _remotePeer;
    18         private Destination _localPeer;
    19 
    20         public I2PSocketFull(Connection con) {
    21                 _connection = con;
    22                 if(con != null) {
    23                         _remotePeer = con.getRemotePeer();
    24                         _localPeer = con.getSession().getMyDestination();
    25                 }
    26         }
    27 
    28 
    29         public void close() throws IOException {
    30                 Connection c = _connection;
    31                 if(c == null) {
    32                         return;
    33                 }
    34                 if(c.getIsConnected()) {
    35                         OutputStream out = c.getOutputStream();
    36                         if(out != null) {
    37                                 try {
    38                                         out.close();
    39                                 } catch(IOException ioe) {
    40                                         // ignore any write error, as we want to keep on and kill the
    41                                         // con (thanks Complication!)
    42                                 }
    43                         }
    44                         c.disconnect(true);
    45                 } else {
    46                         //throw new IOException("Not connected");
    47                 }
    48                 destroy();
    49         }
    50 
    51         Connection getConnection() {
    52                 return _connection;
    53         }
    54 
    55         public InputStream getInputStream() {
    56                 Connection c = _connection;
    57                 if(c != null) {
    58                         return c.getInputStream();
    59                 } else {
    60                         return null;
    61                 }
    62         }
    63 
    64         public I2PSocketOptions getOptions() {
    65                 Connection c = _connection;
    66                 if(c != null) {
    67                         return c.getOptions();
    68                 } else {
    69                         return null;
    70                 }
    71         }
    72 
    73         public OutputStream getOutputStream() throws IOException {
    74                 Connection c = _connection;
    75                 if(c != null) {
    76                         return c.getOutputStream();
    77                 } else {
    78                         return null;
    79                 }
    80         }
    81 
    82         public Destination getPeerDestination() {
    83                 return _remotePeer;
    84         }
    85 
    86         public long getReadTimeout() {
    87                 I2PSocketOptions opts = getOptions();
    88                 if(opts != null) {
    89                         return opts.getReadTimeout();
    90                 } else {
    91                         return -1;
    92                 }
    93         }
    94 
    95         public Destination getThisDestination() {
    96                 return _localPeer;
    97         }
    98 
    99         public void setOptions(I2PSocketOptions options) {
    100                 Connection c = _connection;
    101                 if(c == null) {
    102                         return;
    103                 }
    104                 if(options instanceof ConnectionOptions) {
    105                         c.setOptions((ConnectionOptions)options);
    106                 } else {
    107                         c.setOptions(new ConnectionOptions(options));
    108                 }
    109         }
    110 
    111         public void setReadTimeout(long ms) {
    112                 Connection c = _connection;
    113                 if(c == null) {
    114                         return;
    115                 }
    116                 c.getInputStream().setReadTimeout((int)ms);
    117                 c.getOptions().setReadTimeout(ms);
    118         }
    119 
    120         public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
    121                 _listener = lsnr;
    122         }
    123 
    124         public boolean isClosed() {
    125                 Connection c = _connection;
    126                 return ((c == null) ||
    127                         (!c.getIsConnected()) ||
    128                         (c.getResetReceived()) ||
    129                         (c.getResetSent()));
    130         }
    131 
    132         void destroy() {
    133                 Connection c = _connection;
    134                 _connection = null;
    135                 _listener = null;
    136                 if(c != null) {
    137                         c.disconnectComplete();
    138                 }
    139         }
    140 
    141         public String toString() {
    142                 Connection c = _connection;
    143                 if(c == null) {
    144                         return super.toString();
    145                 } else {
    146                         return c.toString();
    147                 }
    148         }
     14    private Connection _connection;
     15    private I2PSocket.SocketErrorListener _listener;
     16    private Destination _remotePeer;
     17    private Destination _localPeer;
     18   
     19    public I2PSocketFull(Connection con) {
     20        _connection = con;
     21        if (con != null) {
     22            _remotePeer = con.getRemotePeer();
     23            _localPeer = con.getSession().getMyDestination();
     24        }
     25    }
     26   
     27    public void close() throws IOException {
     28        Connection c = _connection;
     29        if (c == null) return;
     30        if (c.getIsConnected()) {
     31            OutputStream out = c.getOutputStream();
     32            if (out != null) {
     33                try {
     34                    out.close();
     35                } catch (IOException ioe) {
     36                    // ignore any write error, as we want to keep on and kill the
     37                    // con (thanks Complication!)
     38                }
     39            }
     40            c.disconnect(true);
     41        } else {
     42            //throw new IOException("Not connected");
     43        }
     44        destroy();
     45    }
     46   
     47    Connection getConnection() { return _connection; }
     48   
     49    public InputStream getInputStream() {
     50        Connection c = _connection;
     51        if (c != null)
     52            return c.getInputStream();
     53        else
     54            return null;
     55    }
     56   
     57    public I2PSocketOptions getOptions() {
     58        Connection c = _connection;
     59        if (c != null)
     60            return c.getOptions();
     61        else
     62            return null;
     63    }
     64   
     65    public OutputStream getOutputStream() throws IOException {
     66        Connection c = _connection;
     67        if (c != null)
     68            return c.getOutputStream();
     69        else
     70            return null;
     71    }
     72   
     73    public Destination getPeerDestination() { return _remotePeer; }
     74   
     75    public long getReadTimeout() {
     76        I2PSocketOptions opts = getOptions();
     77        if (opts != null)
     78            return opts.getReadTimeout();
     79        else
     80            return -1;
     81    }
     82   
     83    public Destination getThisDestination() { return _localPeer; }
     84   
     85    public void setOptions(I2PSocketOptions options) {
     86        Connection c = _connection;
     87        if (c == null) return;
     88       
     89        if (options instanceof ConnectionOptions)
     90            c.setOptions((ConnectionOptions)options);
     91        else
     92            c.setOptions(new ConnectionOptions(options));
     93    }
     94   
     95    public void setReadTimeout(long ms) {
     96        Connection c = _connection;
     97        if (c == null) return;
     98       
     99        c.getInputStream().setReadTimeout((int)ms);
     100        c.getOptions().setReadTimeout(ms);
     101    }
     102   
     103    public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
     104        _listener = lsnr;
     105    }
     106   
     107    public boolean isClosed() {
     108        Connection c = _connection;
     109        return ((c == null) ||
     110                (!c.getIsConnected()) ||
     111                (c.getResetReceived()) ||
     112                (c.getResetSent()));
     113    }
     114   
     115    void destroy() {
     116        Connection c = _connection;
     117        _connection = null;
     118        _listener = null;
     119        if (c != null)
     120            c.disconnectComplete();
     121    }
     122    public String toString() {
     123        Connection c = _connection;
     124        if (c == null)
     125            return super.toString();
     126        else
     127            return c.toString();
     128    }
    149129}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java

    rfa5c721 ree2fd32  
    22
    33import java.net.NoRouteToHostException;
    4 import java.net.SocketTimeoutException;
    54import java.util.HashSet;
    65import java.util.Iterator;
     
    1413import net.i2p.data.Destination;
    1514import net.i2p.util.Log;
     15
    1616
    1717/**
     
    2424 */
    2525public class I2PSocketManagerFull implements I2PSocketManager {
    26 
    27         private I2PAppContext _context;
    28         private Log _log;
    29         private I2PSession _session;
    30         private I2PServerSocketFull _serverSocket;
    31         private ConnectionOptions _defaultOptions;
    32         private long _acceptTimeout;
    33         private String _name;
    34         private int _maxStreams;
    35         private static int __managerId = 0;
    36         private ConnectionManager _connectionManager;
    37         /**
    38          * How long to wait for the client app to accept() before sending back CLOSE?
    39          * This includes the time waiting in the queue.  Currently set to 5 seconds.
    40          */
    41         private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
    42 
    43         /**
    44          *
    45          */
    46         public I2PSocketManagerFull() {
    47                 _context = null;
    48                 _session = null;
    49         }
    50 
    51         /**
    52          *
    53          * @param context
    54          * @param session
    55          * @param opts
    56          * @param name
    57          */
    58         public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
    59                 this();
    60                 init(context, session, opts, name);
    61         }
    62         /** how many streams will we allow at once?  */
    63         public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
    64 
    65         /**
    66          *
    67          *
    68          * @param context
    69          * @param session
    70          * @param opts
    71          * @param name
    72          */
    73         public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
    74                 _context = context;
    75                 _session = session;
    76                 _log = _context.logManager().getLog(I2PSocketManagerFull.class);
    77 
    78                 _maxStreams = -1;
    79                 try {
    80                         String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
    81                         _maxStreams = Integer.parseInt(num);
    82                 } catch(NumberFormatException nfe) {
    83                         if(_log.shouldLog(Log.WARN)) {
    84                                 _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
    85                         }
    86                         _maxStreams = -1;
    87                 }
    88                 _name = name + " " + (++__managerId);
    89                 _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
    90                 _defaultOptions = new ConnectionOptions(opts);
    91                 _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
    92                 _serverSocket = new I2PServerSocketFull(this);
    93 
    94                 if(_log.shouldLog(Log.INFO)) {
    95                         _log.info("Socket manager created.  \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
    96                 }
    97         }
    98 
    99         /**
    100          *
    101          * @return
    102          */
    103         public I2PSocketOptions buildOptions() {
    104                 return buildOptions(null);
    105         }
    106 
    107         /**
    108          *
    109          * @param opts
    110          * @return
    111          */
    112         public I2PSocketOptions buildOptions(Properties opts) {
    113                 ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
    114                 curOpts.setProperties(opts);
    115                 return curOpts;
    116         }
    117 
    118         /**
    119          *
    120          * @return
    121          */
    122         public I2PSession getSession() {
    123                 return _session;
    124         }
    125 
    126         /**
    127          *
    128          * @return
    129          */
    130         public ConnectionManager getConnectionManager() {
    131                 return _connectionManager;
    132         }
    133 
    134         /**
    135          *
    136          * @return
    137          * @throws net.i2p.I2PException
    138          * @throws java.net.SocketTimeoutException
    139          */
    140         public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
    141                 verifySession();
    142                 Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
    143                 if(_log.shouldLog(Log.DEBUG)) {
    144                         _log.debug("receiveSocket() called: " + con);
    145                 }
    146                 if(con != null) {
    147                         I2PSocketFull sock = new I2PSocketFull(con);
    148                         con.setSocket(sock);
    149                         return sock;
    150                 } else {
    151                         if(_connectionManager.MgetSoTimeout() == -1) {
    152                                 return null;
    153                         }
    154                         throw new SocketTimeoutException("I2PSocket timed out");
    155                 }
    156         }
    157 
    158         /**
    159          * Ping the specified peer, returning true if they replied to the ping within
    160          * the timeout specified, false otherwise.  This call blocks.
    161          *
    162          *
    163          * @param peer
    164          * @param timeoutMs
    165          * @return
    166          */
    167         public boolean ping(Destination peer, long timeoutMs) {
    168                 return _connectionManager.ping(peer, timeoutMs);
    169         }
    170 
    171         /**
    172          * How long should we wait for the client to .accept() a socket before
    173          * sending back a NACK/Close? 
    174          *
    175          * @param ms milliseconds to wait, maximum
    176          */
    177         public void setAcceptTimeout(long ms) {
    178                 _acceptTimeout = ms;
    179         }
    180 
    181         /**
    182          *
    183          * @return
    184          */
    185         public long getAcceptTimeout() {
    186                 return _acceptTimeout;
    187         }
    188 
    189         /**
    190          *
    191          * @param options
    192          */
    193         public void setDefaultOptions(I2PSocketOptions options) {
    194                 _defaultOptions = new ConnectionOptions((ConnectionOptions)options);
    195         }
    196 
    197         /**
    198          *
    199          * @return
    200          */
    201         public I2PSocketOptions getDefaultOptions() {
    202                 return _defaultOptions;
    203         }
    204 
    205         /**
    206          *
    207          * @return
    208          */
    209         public I2PServerSocket getServerSocket() {
    210                 _connectionManager.setAllowIncomingConnections(true);
    211                 return _serverSocket;
    212         }
    213 
    214         private void verifySession() throws I2PException {
    215                 if(!_connectionManager.getSession().isClosed()) {
    216                         return;
    217                 }
    218                 _connectionManager.getSession().connect();
    219         }
    220 
    221         /**
    222          * Create a new connected socket (block until the socket is created)
    223          *
    224          * @param peer Destination to connect to
    225          * @param options I2P socket options to be used for connecting
    226          *
    227          * @throws NoRouteToHostException if the peer is not found or not reachable
    228          * @throws I2PException if there is some other I2P-related problem
    229          */
    230         public I2PSocket connect(Destination peer, I2PSocketOptions options)
    231                 throws I2PException, NoRouteToHostException {
    232                 verifySession();
    233                 if(options == null) {
    234                         options = _defaultOptions;
    235                 }
    236                 ConnectionOptions opts = null;
    237                 if(options instanceof ConnectionOptions) {
    238                         opts = new ConnectionOptions((ConnectionOptions)options);
    239                 } else {
    240                         opts = new ConnectionOptions(options);
    241                 }
    242                 if(_log.shouldLog(Log.INFO)) {
    243                         _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
    244                 }
    245                 Connection con = _connectionManager.connect(peer, opts);
    246                 if(con == null) {
    247                         throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
    248                 }
    249                 I2PSocketFull socket = new I2PSocketFull(con);
    250                 con.setSocket(socket);
    251                 if(con.getConnectionError() != null) {
    252                         con.disconnect(false);
    253                         throw new NoRouteToHostException(con.getConnectionError());
    254                 }
    255                 return socket;
    256         }
    257 
    258         /**
    259          * Create a new connected socket (block until the socket is created)
    260          *
    261          * @param peer Destination to connect to
    262          *
    263          * @return
    264          * @throws NoRouteToHostException if the peer is not found or not reachable
    265          * @throws I2PException if there is some other I2P-related problem
    266          */
    267         public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
    268                 return connect(peer, _defaultOptions);
    269         }
    270 
    271         /**
    272          * Destroy the socket manager, freeing all the associated resources.  This
    273          * method will block untill all the managed sockets are closed.
    274          *
    275          */
    276         public void destroySocketManager() {
    277                 _connectionManager.disconnectAllHard();
    278                 _connectionManager.setAllowIncomingConnections(false);
    279                 // should we destroy the _session too?
    280                 // yes, since the old lib did (and SAM wants it to, and i dont know why not)
    281                 if((_session != null) && (!_session.isClosed())) {
    282                         try {
    283                                 _session.destroySession();
    284                         } catch(I2PSessionException ise) {
    285                                 _log.warn("Unable to destroy the session", ise);
    286                         }
    287                 }
    288         }
    289 
    290         /**
    291          * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
    292          *
    293          *
    294          * @return
    295          */
    296         public Set listSockets() {
    297                 Set connections = _connectionManager.listConnections();
    298                 Set rv = new HashSet(connections.size());
    299                 for(Iterator iter = connections.iterator(); iter.hasNext();) {
    300                         Connection con = (Connection)iter.next();
    301                         if(con.getSocket() != null) {
    302                                 rv.add(con.getSocket());
    303                         }
    304                 }
    305                 return rv;
    306         }
    307 
    308         /**
    309          *
    310          * @return
    311          */
    312         public String getName() {
    313                 return _name;
    314         }
    315 
    316         /**
    317          *
    318          * @param name
    319          */
    320         public void setName(String name) {
    321                 _name = name;
    322         }
    323 
    324         /**
    325          *
    326          * @param lsnr
    327          */
    328         public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    329                 _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
    330         }
    331 
    332         /**
    333          *
    334          * @param lsnr
    335          */
    336         public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    337                 _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
    338         }
     26    private I2PAppContext _context;
     27    private Log _log;
     28    private I2PSession _session;
     29    private I2PServerSocketFull _serverSocket;
     30    private ConnectionOptions _defaultOptions;
     31    private long _acceptTimeout;
     32    private String _name;
     33    private int _maxStreams;
     34    private static int __managerId = 0;
     35    private ConnectionManager _connectionManager;
     36   
     37    /**
     38     * How long to wait for the client app to accept() before sending back CLOSE?
     39     * This includes the time waiting in the queue.  Currently set to 5 seconds.
     40     */
     41    private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
     42   
     43    public I2PSocketManagerFull() {
     44        _context = null;
     45        _session = null;
     46    }
     47    public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
     48        this();
     49        init(context, session, opts, name);
     50    }
     51   
     52    /** how many streams will we allow at once?  */
     53    public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
     54   
     55    /**
     56     *
     57     */
     58    public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
     59        _context = context;
     60        _session = session;
     61        _log = _context.logManager().getLog(I2PSocketManagerFull.class);
     62       
     63        _maxStreams = -1;
     64        try {
     65            String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
     66            _maxStreams = Integer.parseInt(num);
     67        } catch (NumberFormatException nfe) {
     68            if (_log.shouldLog(Log.WARN))
     69                _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
     70            _maxStreams = -1;
     71        }
     72        _name = name + " " + (++__managerId);
     73        _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
     74        _defaultOptions = new ConnectionOptions(opts);
     75        _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
     76        _serverSocket = new I2PServerSocketFull(this);
     77       
     78        if (_log.shouldLog(Log.INFO)) {
     79            _log.info("Socket manager created.  \ndefault options: " + _defaultOptions
     80                      + "\noriginal properties: " + opts);
     81        }
     82    }
     83
     84    public I2PSocketOptions buildOptions() { return buildOptions(null); }
     85    public I2PSocketOptions buildOptions(Properties opts) {
     86        ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
     87        curOpts.setProperties(opts);
     88        return curOpts;
     89    }
     90   
     91    public I2PSession getSession() {
     92        return _session;
     93    }
     94   
     95    public ConnectionManager getConnectionManager() {
     96        return _connectionManager;
     97    }
     98
     99    public I2PSocket receiveSocket() throws I2PException {
     100        verifySession();
     101        Connection con = _connectionManager.getConnectionHandler().accept(-1);
     102        if (_log.shouldLog(Log.DEBUG))
     103            _log.debug("receiveSocket() called: " + con);
     104        if (con != null) {
     105            I2PSocketFull sock = new I2PSocketFull(con);
     106            con.setSocket(sock);
     107            return sock;
     108        } else {
     109            return null;
     110        }
     111    }
     112   
     113    /**
     114     * Ping the specified peer, returning true if they replied to the ping within
     115     * the timeout specified, false otherwise.  This call blocks.
     116     *
     117     */
     118    public boolean ping(Destination peer, long timeoutMs) {
     119        return _connectionManager.ping(peer, timeoutMs);
     120    }
     121
     122    /**
     123     * How long should we wait for the client to .accept() a socket before
     124     * sending back a NACK/Close? 
     125     *
     126     * @param ms milliseconds to wait, maximum
     127     */
     128    public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
     129    public long getAcceptTimeout() { return _acceptTimeout; }
     130
     131    public void setDefaultOptions(I2PSocketOptions options) {
     132        _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
     133    }
     134
     135    public I2PSocketOptions getDefaultOptions() {
     136        return _defaultOptions;
     137    }
     138
     139    public I2PServerSocket getServerSocket() {
     140        _connectionManager.setAllowIncomingConnections(true);
     141        return _serverSocket;
     142    }
     143
     144    private void verifySession() throws I2PException {
     145        if (!_connectionManager.getSession().isClosed())
     146            return;
     147        _connectionManager.getSession().connect();
     148    }
     149   
     150    /**
     151     * Create a new connected socket (block until the socket is created)
     152     *
     153     * @param peer Destination to connect to
     154     * @param options I2P socket options to be used for connecting
     155     *
     156     * @throws NoRouteToHostException if the peer is not found or not reachable
     157     * @throws I2PException if there is some other I2P-related problem
     158     */
     159    public I2PSocket connect(Destination peer, I2PSocketOptions options)
     160                             throws I2PException, NoRouteToHostException {
     161        verifySession();
     162        if (options == null)
     163            options = _defaultOptions;
     164        ConnectionOptions opts = null;
     165        if (options instanceof ConnectionOptions)
     166            opts = new ConnectionOptions((ConnectionOptions)options);
     167        else
     168            opts = new ConnectionOptions(options);
     169       
     170        if (_log.shouldLog(Log.INFO))
     171            _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
     172                      + " with options: " + opts);
     173        Connection con = _connectionManager.connect(peer, opts);
     174        if (con == null)
     175            throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
     176        I2PSocketFull socket = new I2PSocketFull(con);
     177        con.setSocket(socket);
     178        if (con.getConnectionError() != null) {
     179            con.disconnect(false);
     180            throw new NoRouteToHostException(con.getConnectionError());
     181        }
     182        return socket;
     183    }
     184
     185    /**
     186     * Create a new connected socket (block until the socket is created)
     187     *
     188     * @param peer Destination to connect to
     189     *
     190     * @throws NoRouteToHostException if the peer is not found or not reachable
     191     * @throws I2PException if there is some other I2P-related problem
     192     */
     193    public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
     194        return connect(peer, _defaultOptions);
     195    }
     196
     197    /**
     198     * Destroy the socket manager, freeing all the associated resources.  This
     199     * method will block untill all the managed sockets are closed.
     200     *
     201     */
     202    public void destroySocketManager() {
     203        _connectionManager.disconnectAllHard();
     204        _connectionManager.setAllowIncomingConnections(false);
     205        // should we destroy the _session too?
     206        // yes, since the old lib did (and SAM wants it to, and i dont know why not)
     207        if ( (_session != null) && (!_session.isClosed()) ) {
     208            try {
     209                _session.destroySession();
     210            } catch (I2PSessionException ise) {
     211                _log.warn("Unable to destroy the session", ise);
     212            }
     213        }
     214    }
     215
     216    /**
     217     * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
     218     *
     219     */
     220    public Set listSockets() {
     221        Set connections = _connectionManager.listConnections();
     222        Set rv = new HashSet(connections.size());
     223        for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
     224            Connection con = (Connection)iter.next();
     225            if (con.getSocket() != null)
     226                rv.add(con.getSocket());
     227        }
     228        return rv;
     229    }
     230
     231    public String getName() { return _name; }
     232    public void setName(String name) { _name = name; }
     233   
     234   
     235    public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
     236        _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
     237    }
     238    public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
     239        _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
     240    }
    339241}
  • apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java

    rfa5c721 ree2fd32  
    66 *
    77 */
    8 public class RetransmissionTimer extends SimpleTimer {
     8class RetransmissionTimer extends SimpleTimer {
    99    private static final RetransmissionTimer _instance = new RetransmissionTimer();
    1010    public static final SimpleTimer getInstance() { return _instance; }
  • core/java/src/net/i2p/util/Executor.java

    rfa5c721 ree2fd32  
    66
    77class Executor implements Runnable {
     8    private I2PAppContext _context;
     9    private Log _log;
     10    private List _readyEvents;
     11    public Executor(I2PAppContext ctx, Log log, List events) {
     12        _context = ctx;
     13        _readyEvents = events;
     14    }
     15    public void run() {
     16        while (true) {
     17            SimpleTimer.TimedEvent evt = null;
     18            synchronized (_readyEvents) {
     19                if (_readyEvents.size() <= 0)
     20                    try { _readyEvents.wait(); } catch (InterruptedException ie) {}
     21                if (_readyEvents.size() > 0)
     22                    evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
     23            }
    824
    9         private I2PAppContext _context;
    10         private Log _log;
    11         private List _readyEvents;
    12         private SimpleStore runn;
    13 
    14         public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
    15                 _context = ctx;
    16                 _readyEvents = events;
    17                 runn = x;
    18         }
    19 
    20         public void run() {
    21                 while(runn.getAnswer()) {
    22                         SimpleTimer.TimedEvent evt = null;
    23                         synchronized(_readyEvents) {
    24                                 if(_readyEvents.size() <= 0) {
    25                                         try {
    26                                                 _readyEvents.wait();
    27                                         } catch(InterruptedException ie) {
    28                                         }
    29                                 }
    30                                 if(_readyEvents.size() > 0) {
    31                                         evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
    32                                 }
    33                         }
    34 
    35                         if(evt != null) {
    36                                 long before = _context.clock().now();
    37                                 try {
    38                                         evt.timeReached();
    39                                 } catch(Throwable t) {
    40                                         log("wtf, event borked: " + evt, t);
    41                                 }
    42                                 long time = _context.clock().now() - before;
    43                                 if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
    44                                         _log.warn("wtf, event execution took " + time + ": " + evt);
    45                                 }
    46                         }
    47                 }
    48         }
    49 
    50         /**
    51          *
    52          * @param msg
    53          * @param t
    54          */
    55         private void log(String msg, Throwable t) {
    56                 synchronized(this) {
    57                         if(_log == null) {
    58                                 _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
    59                         }
    60                 }
    61                 _log.log(Log.CRIT, msg, t);
    62         }
     25            if (evt != null) {
     26                long before = _context.clock().now();
     27                try {
     28                    evt.timeReached();
     29                } catch (Throwable t) {
     30                    log("wtf, event borked: " + evt, t);
     31                }
     32                long time = _context.clock().now() - before;
     33                if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
     34                    _log.warn("wtf, event execution took " + time + ": " + evt);
     35            }
     36        }
     37    }
     38   
     39    private void log(String msg, Throwable t) {
     40        synchronized (this) {
     41            if (_log == null)
     42                _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
     43        }
     44        _log.log(Log.CRIT, msg, t);
     45    }
    6346}
  • core/java/src/net/i2p/util/SimpleTimer.java

    rfa5c721 ree2fd32  
    1717 */
    1818public class SimpleTimer {
    19 
    20         private static final SimpleTimer _instance = new SimpleTimer();
    21 
    22         public static SimpleTimer getInstance() {
    23                 return _instance;
    24         }
    25         private I2PAppContext _context;
    26         private Log _log;
    27         /** event time (Long) to event (TimedEvent) mapping */
    28         private TreeMap _events;
    29         /** event (TimedEvent) to event time (Long) mapping */
    30         private Map _eventTimes;
    31         private List _readyEvents;
    32         private SimpleStore runn;
    33 
    34         /**
    35          *
    36          */
    37         protected SimpleTimer() {
    38                 this("SimpleTimer");
    39         }
    40 
    41         /**
    42          *
    43          * @param name
    44          */
    45         protected SimpleTimer(String name) {
    46                 runn = new SimpleStore(true);
    47                 _context = I2PAppContext.getGlobalContext();
    48                 _log = _context.logManager().getLog(SimpleTimer.class);
    49                 _events = new TreeMap();
    50                 _eventTimes = new HashMap(256);
    51                 _readyEvents = new ArrayList(4);
    52                 I2PThread runner = new I2PThread(new SimpleTimerRunner());
    53                 runner.setName(name);
    54                 runner.setDaemon(true);
    55                 runner.start();
    56                 for(int i = 0; i < 3; i++) {
    57                         I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
    58                         executor.setName(name + "Executor " + i);
    59                         executor.setDaemon(true);
    60                         executor.start();
    61                 }
    62         }
    63 
    64         /**
    65          * Removes the SimpleTimer.
    66          */
    67         public void removeSimpleTimer() {
    68                 synchronized(_events) {
    69                         runn.setAnswer(false);
    70                         _events.notifyAll();
    71                 }
    72         }
    73 
    74         /**
    75          *
    76          * @param event
    77          * @param timeoutMs
    78          */
    79         public void reschedule(TimedEvent event, long timeoutMs) {
    80                 addEvent(event, timeoutMs, false);
    81         }
    82 
    83         /**
    84          * Queue up the given event to be fired no sooner than timeoutMs from now.
    85          * However, if this event is already scheduled, the event will be scheduled
    86          * for the earlier of the two timeouts, which may be before this stated
    87          * timeout.  If this is not the desired behavior, call removeEvent first.
    88          *
    89          * @param event
    90          * @param timeoutMs
    91          */
    92         public void addEvent(TimedEvent event, long timeoutMs) {
    93                 addEvent(event, timeoutMs, true);
    94         }
    95 
    96         /**
    97          * @param event
    98          * @param timeoutMs
    99          * @param useEarliestTime if its already scheduled, use the earlier of the
    100          *                        two timeouts, else use the later
    101          */
    102         public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
    103                 int totalEvents = 0;
    104                 long now = System.currentTimeMillis();
    105                 long eventTime = now + timeoutMs;
    106                 Long time = new Long(eventTime);
    107                 synchronized(_events) {
    108                         // remove the old scheduled position, then reinsert it
    109                         Long oldTime = (Long)_eventTimes.get(event);
    110                         if(oldTime != null) {
    111                                 if(useEarliestTime) {
    112                                         if(oldTime.longValue() < eventTime) {
    113                                                 _events.notifyAll();
    114                                                 return; // already scheduled for sooner than requested
    115                                         } else {
    116                                                 _events.remove(oldTime);
    117                                         }
    118                                 } else {
    119                                         if(oldTime.longValue() > eventTime) {
    120                                                 _events.notifyAll();
    121                                                 return; // already scheduled for later than the given period
    122                                         } else {
    123                                                 _events.remove(oldTime);
    124                                         }
    125                                 }
    126                         }
    127                         while(_events.containsKey(time)) {
    128                                 time = new Long(time.longValue() + 1);
    129                         }
    130                         _events.put(time, event);
    131                         _eventTimes.put(event, time);
    132 
    133                         if((_events.size() != _eventTimes.size())) {
    134                                 _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
    135                                 for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) {
    136                                         TimedEvent evt = (TimedEvent)iter.next();
    137                                         Long when = (Long)_eventTimes.get(evt);
    138                                         TimedEvent cur = (TimedEvent)_events.get(when);
    139                                         if(cur != evt) {
    140                                                 _log.error("event " + evt + " @ " + when + ": " + cur);
    141                                         }
    142                                 }
    143                         }
    144 
    145                         totalEvents = _events.size();
    146                         _events.notifyAll();
    147                 }
    148                 if(time.longValue() > eventTime + 100) {
    149                         if(_log.shouldLog(Log.WARN)) {
    150                                 _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
    151                         }
    152                 }
    153                 long timeToAdd = System.currentTimeMillis() - now;
    154                 if(timeToAdd > 50) {
    155                         if(_log.shouldLog(Log.WARN)) {
    156                                 _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
    157                         }
    158                 }
    159 
    160         }
    161 
    162         /**
    163          *
    164          * @param evt
    165          * @return
    166          */
    167         public boolean removeEvent(TimedEvent evt) {
    168                 if(evt == null) {
    169                         return false;
    170                 }
    171                 synchronized(_events) {
    172                         Long when = (Long)_eventTimes.remove(evt);
    173                         if(when != null) {
    174                                 _events.remove(when);
    175                         }
    176                         return null != when;
    177                 }
    178         }
    179 
    180         /**
    181          * Simple interface for events to be queued up and notified on expiration
    182          */
    183         public interface TimedEvent {
    184 
    185                 /**
    186                  * the time requested has been reached (this call should NOT block,
    187                  * otherwise the whole SimpleTimer gets backed up)
    188                  *
    189                  */
    190                 public void timeReached();
    191         }
    192         private long _occurredTime;
    193         private long _occurredEventCount;
    194         // not used
    195         //  private TimedEvent _recentEvents[] = new TimedEvent[5];
    196         private class SimpleTimerRunner implements Runnable {
    197 
    198                 public void run() {
    199                         List eventsToFire = new ArrayList(1);
    200                         while(runn.getAnswer()) {
    201                                 try {
    202                                         synchronized(_events) {
    203                                                 //if (_events.size() <= 0)
    204                                                 //    _events.wait();
    205                                                 //if (_events.size() > 100)
    206                                                 //    _log.warn("> 100 events!  " + _events.values());
    207                                                 long now = System.currentTimeMillis();
    208                                                 long nextEventDelay = -1;
    209                                                 Object nextEvent = null;
    210                                                 while(runn.getAnswer()) {
    211                                                         if(_events.size() <= 0) {
    212                                                                 break;
    213                                                         }
    214                                                         Long when = (Long)_events.firstKey();
    215                                                         if(when.longValue() <= now) {
    216                                                                 TimedEvent evt = (TimedEvent)_events.remove(when);
    217                                                                 if(evt != null) {
    218                                                                         _eventTimes.remove(evt);
    219                                                                         eventsToFire.add(evt);
    220                                                                 }
    221                                                         } else {
    222                                                                 nextEventDelay = when.longValue() - now;
    223                                                                 nextEvent = _events.get(when);
    224                                                                 break;
    225                                                         }
    226                                                 }
    227                                                 if(eventsToFire.size() <= 0) {
    228                                                         if(nextEventDelay != -1) {
    229                                                                 if(_log.shouldLog(Log.DEBUG)) {
    230                                                                         _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
    231                                                                 }
    232                                                                 _events.wait(nextEventDelay);
    233                                                         } else {
    234                                                                 _events.wait();
    235                                                         }
    236                                                 }
    237                                         }
    238                                 } catch(InterruptedException ie) {
    239                                         // ignore
    240                                 } catch(Throwable t) {
    241                                         if(_log != null) {
    242                                                 _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
    243                                         } else {
    244                                                 System.err.println("Uncaught exception in SimpleTimer");
    245                                                 t.printStackTrace();
    246                                         }
    247                                 }
    248 
    249                                 long now = System.currentTimeMillis();
    250                                 now = now - (now % 1000);
    251 
    252                                 synchronized(_readyEvents) {
    253                                         for(int i = 0; i < eventsToFire.size(); i++) {
    254                                                 _readyEvents.add(eventsToFire.get(i));
    255                                         }
    256                                         _readyEvents.notifyAll();
    257                                 }
    258 
    259                                 if(_occurredTime == now) {
    260                                         _occurredEventCount += eventsToFire.size();
    261                                 } else {
    262                                         _occurredTime = now;
    263                                         if(_occurredEventCount > 2500) {
    264                                                 StringBuffer buf = new StringBuffer(128);
    265                                                 buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
    266                                                 buf.append(") in a second!");
    267                                                 _log.log(Log.WARN, buf.toString());
    268                                         }
    269                                         _occurredEventCount = 0;
    270                                 }
    271 
    272                                 eventsToFire.clear();
    273                         }
    274                 }
    275         }
     19    private static final SimpleTimer _instance = new SimpleTimer();
     20    public static SimpleTimer getInstance() { return _instance; }
     21    private I2PAppContext _context;
     22    private Log _log;
     23    /** event time (Long) to event (TimedEvent) mapping */
     24    private TreeMap _events;
     25    /** event (TimedEvent) to event time (Long) mapping */
     26    private Map _eventTimes;
     27    private List _readyEvents;
     28   
     29    protected SimpleTimer() { this("SimpleTimer"); }
     30    protected SimpleTimer(String name) {
     31        _context = I2PAppContext.getGlobalContext();
     32        _log = _context.logManager().getLog(SimpleTimer.class);
     33        _events = new TreeMap();
     34        _eventTimes = new HashMap(256);
     35        _readyEvents = new ArrayList(4);
     36        I2PThread runner = new I2PThread(new SimpleTimerRunner());
     37        runner.setName(name);
     38        runner.setDaemon(true);
     39        runner.start();
     40        for (int i = 0; i < 3; i++) {
     41            I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
     42            executor.setName(name + "Executor " + i);
     43            executor.setDaemon(true);
     44            executor.start();
     45        }
     46    }
     47   
     48    public void reschedule(TimedEvent event, long timeoutMs) {
     49        addEvent(event, timeoutMs, false);
     50    }
     51   
     52    /**
     53     * Queue up the given event to be fired no sooner than timeoutMs from now.
     54     * However, if this event is already scheduled, the event will be scheduled
     55     * for the earlier of the two timeouts, which may be before this stated
     56     * timeout.  If this is not the desired behavior, call removeEvent first.
     57     *
     58     */
     59    public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
     60    /**
     61     * @param useEarliestTime if its already scheduled, use the earlier of the
     62     *                        two timeouts, else use the later
     63     */
     64    public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
     65        int totalEvents = 0;
     66        long now = System.currentTimeMillis();
     67        long eventTime = now + timeoutMs;
     68        Long time = new Long(eventTime);
     69        synchronized (_events) {
     70            // remove the old scheduled position, then reinsert it
     71            Long oldTime = (Long)_eventTimes.get(event);
     72            if (oldTime != null) {
     73                if (useEarliestTime) {
     74                    if (oldTime.longValue() < eventTime) {
     75                        _events.notifyAll();
     76                        return; // already scheduled for sooner than requested
     77                    } else {
     78                        _events.remove(oldTime);
     79                    }
     80                } else {
     81                    if (oldTime.longValue() > eventTime) {
     82                        _events.notifyAll();
     83                        return; // already scheduled for later than the given period
     84                    } else {
     85                        _events.remove(oldTime);
     86                    }
     87                }
     88            }
     89            while (_events.containsKey(time))
     90                time = new Long(time.longValue() + 1);
     91            _events.put(time, event);
     92            _eventTimes.put(event, time);
     93           
     94            if ( (_events.size() != _eventTimes.size()) ) {
     95                _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
     96                for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
     97                    TimedEvent evt = (TimedEvent)iter.next();
     98                    Long when = (Long)_eventTimes.get(evt);
     99                    TimedEvent cur = (TimedEvent)_events.get(when);
     100                    if (cur != evt) {
     101                        _log.error("event " + evt + " @ " + when + ": " + cur);
     102                    }
     103                }
     104            }
     105           
     106            totalEvents = _events.size();
     107            _events.notifyAll();
     108        }
     109        if (time.longValue() > eventTime + 100) {
     110            if (_log.shouldLog(Log.WARN))
     111                _log.warn("Lots of timer congestion, had to push " + event + " back "
     112                           + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
     113        }
     114        long timeToAdd = System.currentTimeMillis() - now;
     115        if (timeToAdd > 50) {
     116            if (_log.shouldLog(Log.WARN))
     117                _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
     118        }
     119           
     120    }
     121   
     122    public boolean removeEvent(TimedEvent evt) {
     123        if (evt == null) return false;
     124        synchronized (_events) {
     125            Long when = (Long)_eventTimes.remove(evt);
     126            if (when != null)
     127                _events.remove(when);
     128            return null != when;
     129        }
     130    }
     131   
     132    /**
     133     * Simple interface for events to be queued up and notified on expiration
     134     */
     135    public interface TimedEvent {
     136        /**
     137         * the time requested has been reached (this call should NOT block,
     138         * otherwise the whole SimpleTimer gets backed up)
     139         *
     140         */
     141        public void timeReached();
     142    }
     143   
     144    private long _occurredTime;
     145    private long _occurredEventCount;
     146    private TimedEvent _recentEvents[] = new TimedEvent[5];
     147   
     148    private class SimpleTimerRunner implements Runnable {
     149        public void run() {
     150            List eventsToFire = new ArrayList(1);
     151            while (true) {
     152                try {
     153                    synchronized (_events) {
     154                        //if (_events.size() <= 0)
     155                        //    _events.wait();
     156                        //if (_events.size() > 100)
     157                        //    _log.warn("> 100 events!  " + _events.values());
     158                        long now = System.currentTimeMillis();
     159                        long nextEventDelay = -1;
     160                        Object nextEvent = null;
     161                        while (true) {
     162                            if (_events.size() <= 0) break;
     163                            Long when = (Long)_events.firstKey();
     164                            if (when.longValue() <= now) {
     165                                TimedEvent evt = (TimedEvent)_events.remove(when);
     166                                if (evt != null) {                           
     167                                    _eventTimes.remove(evt);
     168                                    eventsToFire.add(evt);
     169                                }
     170                            } else {
     171                                nextEventDelay = when.longValue() - now;
     172                                nextEvent = _events.get(when);
     173                                break;
     174                            }
     175                        }
     176                        if (eventsToFire.size() <= 0) {
     177                            if (nextEventDelay != -1) {
     178                                if (_log.shouldLog(Log.DEBUG))
     179                                    _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
     180                                _events.wait(nextEventDelay);
     181                            } else {
     182                                _events.wait();
     183                            }
     184                        }
     185                    }
     186                } catch (ThreadDeath td) {
     187                    return; // die
     188                } catch (InterruptedException ie) {
     189                    // ignore
     190                } catch (Throwable t) {
     191                    if (_log != null) {
     192                        _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
     193                    } else {
     194                        System.err.println("Uncaught exception in SimpleTimer");
     195                        t.printStackTrace();
     196                    }
     197                }
     198               
     199                long now = System.currentTimeMillis();
     200                now = now - (now % 1000);
     201
     202                synchronized (_readyEvents) {
     203                    for (int i = 0; i < eventsToFire.size(); i++)
     204                        _readyEvents.add(eventsToFire.get(i));
     205                    _readyEvents.notifyAll();
     206                }
     207
     208                if (_occurredTime == now) {
     209                    _occurredEventCount += eventsToFire.size();
     210                } else {
     211                    _occurredTime = now;
     212                    if (_occurredEventCount > 2500) {
     213                        StringBuffer buf = new StringBuffer(128);
     214                        buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
     215                        buf.append(") in a second!");
     216                        _log.log(Log.WARN, buf.toString());
     217                    }
     218                    _occurredEventCount = 0;
     219                }
     220
     221                eventsToFire.clear();
     222            }
     223        }
     224    }
    276225}
    277226
Note: See TracChangeset for help on using the changeset viewer.