Changeset fa5c721


Ignore:
Timestamp:
Sep 25, 2008 6:55:04 AM (12 years ago)
Author:
sponge <sponge@…>
Branches:
master
Children:
ee2fd32
Parents:
8d78a77
Message:

Added {get,set}SOTimeout() to the ServerSocket? API,
and fixed all the broken mainstream applications depending on it.
Fixed a grave bug in SimpleTimer?.
Fixed Steraming Timer to be public.
Fixed a pile of JavaDoc? comments, and reformatted the files I touched.

Files:
1 added
12 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java

    r8d78a77 rfa5c721  
    1313import java.net.Socket;
    1414import java.net.SocketException;
     15import java.net.SocketTimeoutException;
    1516import java.util.Iterator;
    1617import java.util.Properties;
     
    220221                        _log.error("Error accepting", ce);
    221222                    // not killing the server..
    222                 }
     223                } catch(SocketTimeoutException ste) {
     224                        // ignored, we never set the timeout
     225                }
    223226            }
    224227        }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    r8d78a77 rfa5c721  
    33import java.net.ConnectException;
    44
     5import java.net.SocketTimeoutException;
    56import net.i2p.I2PException;
    67
     
    1011 */
    1112public interface I2PServerSocket {
    12     /**
    13      * Closes the socket.
    14      */
    15     public void close() throws I2PException;
    1613
    17     /**
    18      * Waits for the next socket connecting.  If a remote user tried to make a
    19      * connection and the local application wasn't .accept()ing new connections,
    20      * they should get refused (if .accept() doesnt occur in some small period)
    21      *
    22      * @return a connected I2PSocket
    23      *
    24      * @throws I2PException if there is a problem with reading a new socket
    25      *         from the data available (aka the I2PSession closed, etc)
    26      * @throws ConnectException if the I2PServerSocket is closed
    27      */
    28     public I2PSocket accept() throws I2PException, ConnectException;
     14        /**
     15         * Closes the socket.
     16         */
     17        public void close() throws I2PException;
    2918
    30     /**
    31      * Access the manager which is coordinating the server socket
    32      */
    33     public I2PSocketManager getManager();
     19        /**
     20         * Waits for the next socket connecting.  If a remote user tried to make a
     21         * connection and the local application wasn't .accept()ing new connections,
     22         * they should get refused (if .accept() doesnt occur in some small period)
     23         *
     24         * @return a connected I2PSocket
     25         *
     26         * @throws I2PException if there is a problem with reading a new socket
     27         *         from the data available (aka the I2PSession closed, etc)
     28         * @throws ConnectException if the I2PServerSocket is closed
     29         * @throws SocketTimeoutException
     30         */
     31        public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
     32
     33        /**
     34         * Set Sock Option accept timeout
     35         * @param x
     36         */
     37        public void setSoTimeout(long x);
     38
     39        /**
     40         * Get Sock Option accept timeout
     41         * @return timeout
     42         */
     43        public long getSoTimeout();
     44
     45        /**
     46         * Access the manager which is coordinating the server socket
     47         */
     48        public I2PSocketManager getManager();
    3449}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    r8d78a77 rfa5c721  
    1818 */
    1919class I2PServerSocketImpl implements I2PServerSocket {
    20     private final static Log _log = new Log(I2PServerSocketImpl.class);
    21     private I2PSocketManager mgr;
    22     /** list of sockets waiting for the client to accept them */
    23     private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
    24    
    25     /** have we been closed */
    26     private volatile boolean closing = false;
    27    
    28     /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
    29     private Object socketAcceptedLock = new Object();
    30     /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
    31     private Object socketAddedLock = new Object();
    32    
    33     public I2PServerSocketImpl(I2PSocketManager mgr) {
    34         this.mgr = mgr;
    35     }
    36    
    37     /**
    38      * Waits for the next socket connecting.  If a remote user tried to make a
    39      * connection and the local application wasn't .accept()ing new connections,
    40      * they should get refused (if .accept() doesnt occur in some small period -
    41      * currently 5 seconds)
    42      *
    43      * @return a connected I2PSocket
    44      *
    45      * @throws I2PException if there is a problem with reading a new socket
    46      *         from the data available (aka the I2PSession closed, etc)
    47      * @throws ConnectException if the I2PServerSocket is closed
    48      */
    49     public I2PSocket accept() throws I2PException, ConnectException {
    50         if (_log.shouldLog(Log.DEBUG))
    51             _log.debug("accept() called, pending: " + pendingSockets.size());
    52        
    53         I2PSocket ret = null;
    54        
    55         while ( (ret == null) && (!closing) ){
    56             while (pendingSockets.size() <= 0) {
    57                 if (closing) throw new ConnectException("I2PServerSocket closed");
    58                 try {
    59                     synchronized(socketAddedLock) {
    60                         socketAddedLock.wait();
    61                     }
    62                 } catch (InterruptedException ie) {}
    63             }
    64             synchronized (pendingSockets) {
    65                 if (pendingSockets.size() > 0) {
    66                     ret = (I2PSocket)pendingSockets.remove(0);
    67                 }
    68             }
    69             if (ret != null) {
    70                 synchronized (socketAcceptedLock) {
    71                     socketAcceptedLock.notifyAll();
    72                 }
    73             }
    74         }
    75        
    76         if (_log.shouldLog(Log.DEBUG))
    77             _log.debug("TIMING: handed out accept result " + ret.hashCode());
    78         return ret;
    79     }
    80    
    81     /**
    82      * Make the socket available and wait until the client app accepts it, or until
    83      * the given timeout elapses.  This doesn't have any limits on the queue size -
    84      * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
    85      *
    86      * @param timeoutMs how long to wait until accept
    87      * @return true if the socket was accepted, false if the timeout expired
    88      *         or the socket was closed
    89      */
    90     public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
    91         if (_log.shouldLog(Log.DEBUG))
    92             _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
    93        
    94         if (closing) {
    95             if (_log.shouldLog(Log.WARN))
    96                 _log.warn("Already closing the socket");
    97             return false;
    98         }
    99        
    100         Clock clock = I2PAppContext.getGlobalContext().clock();
    101         long start = clock.now();
    102         long end = start + timeoutMs;
    103         pendingSockets.add(s);
    104         synchronized (socketAddedLock) {
    105             socketAddedLock.notifyAll();
    106         }
    107        
    108         // keep looping until the socket has been grabbed by the accept()
    109         // (or the expiration passes, or the socket is closed)
    110         while (pendingSockets.contains(s)) {
    111             long now = clock.now();
    112             if (now >= end) {
    113                 if (_log.shouldLog(Log.INFO))
    114                     _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
    115                 pendingSockets.remove(s);
    116                 return false;
    117             }
    118             if (closing) {
    119                 if (_log.shouldLog(Log.WARN))
    120                     _log.warn("Server socket closed while waiting for accept");
    121                 pendingSockets.remove(s);
    122                 return false;
    123             }
    124             long remaining = end - now;
    125             try {
    126                 synchronized (socketAcceptedLock) {
    127                     socketAcceptedLock.wait(remaining);
    128                 }
    129             } catch (InterruptedException ie) {}
    130         }
    131         long now = clock.now();
    132         if (_log.shouldLog(Log.DEBUG))
    133             _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
    134         return true;
    135     }
    136    
    137     public void close() {
    138         closing = true;
    139         // let anyone .accept()ing know to fsck off
    140         synchronized (socketAddedLock) {
    141             socketAddedLock.notifyAll();
    142         }
    143         // let anyone addWaitForAccept()ing know to fsck off
    144         synchronized (socketAcceptedLock) {
    145             socketAcceptedLock.notifyAll();
    146         }
    147     }
    148    
    149     public I2PSocketManager getManager() { return mgr; }
     20
     21        private final static Log _log = new Log(I2PServerSocketImpl.class);
     22        private I2PSocketManager mgr;
     23        /** list of sockets waiting for the client to accept them */
     24        private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
     25        /** have we been closed */
     26        private volatile boolean closing = false;
     27        /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
     28        private Object socketAcceptedLock = new Object();
     29        /** lock on this when adding a new socket to the pending list, and wait on it accordingly */
     30        private Object socketAddedLock = new Object();
     31
     32        /**
     33         * Set Sock Option accept timeout stub, does nothing
     34         * @param x
     35         */
     36        public void setSoTimeout(long x) {
     37        }
     38
     39        /**
     40         * Get Sock Option accept timeout stub, does nothing
     41         * @return timeout
     42         */
     43        public long getSoTimeout() {
     44                return -1;
     45        }
     46
     47        public I2PServerSocketImpl(I2PSocketManager mgr) {
     48                this.mgr = mgr;
     49        }
     50
     51        /**
     52         * Waits for the next socket connecting.  If a remote user tried to make a
     53         * connection and the local application wasn't .accept()ing new connections,
     54         * they should get refused (if .accept() doesnt occur in some small period -
     55         * currently 5 seconds)
     56         *
     57         * @return a connected I2PSocket
     58         *
     59         * @throws I2PException if there is a problem with reading a new socket
     60         *         from the data available (aka the I2PSession closed, etc)
     61         * @throws ConnectException if the I2PServerSocket is closed
     62         */
     63        public I2PSocket accept() throws I2PException, ConnectException {
     64                if(_log.shouldLog(Log.DEBUG)) {
     65                        _log.debug("accept() called, pending: " + pendingSockets.size());
     66                }
     67                I2PSocket ret = null;
     68
     69                while((ret == null) && (!closing)) {
     70                        while(pendingSockets.size() <= 0) {
     71                                if(closing) {
     72                                        throw new ConnectException("I2PServerSocket closed");
     73                                }
     74                                try {
     75                                        synchronized(socketAddedLock) {
     76                                                socketAddedLock.wait();
     77                                        }
     78                                } catch(InterruptedException ie) {
     79                                }
     80                        }
     81                        synchronized(pendingSockets) {
     82                                if(pendingSockets.size() > 0) {
     83                                        ret = (I2PSocket)pendingSockets.remove(0);
     84                                }
     85                        }
     86                        if(ret != null) {
     87                                synchronized(socketAcceptedLock) {
     88                                        socketAcceptedLock.notifyAll();
     89                                }
     90                        }
     91                }
     92
     93                if(_log.shouldLog(Log.DEBUG)) {
     94                        _log.debug("TIMING: handed out accept result " + ret.hashCode());
     95                }
     96                return ret;
     97        }
     98
     99        /**
     100         * Make the socket available and wait until the client app accepts it, or until
     101         * the given timeout elapses.  This doesn't have any limits on the queue size -
     102         * perhaps it should add some choking (e.g. after 5 waiting for accept, refuse)
     103         *
     104         * @param timeoutMs how long to wait until accept
     105         * @return true if the socket was accepted, false if the timeout expired
     106         *         or the socket was closed
     107         */
     108        public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
     109                if(_log.shouldLog(Log.DEBUG)) {
     110                        _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
     111                }
     112                if(closing) {
     113                        if(_log.shouldLog(Log.WARN)) {
     114                                _log.warn("Already closing the socket");
     115                        }
     116                        return false;
     117                }
     118
     119                Clock clock = I2PAppContext.getGlobalContext().clock();
     120                long start = clock.now();
     121                long end = start + timeoutMs;
     122                pendingSockets.add(s);
     123                synchronized(socketAddedLock) {
     124                        socketAddedLock.notifyAll();
     125                }
     126
     127                // keep looping until the socket has been grabbed by the accept()
     128                // (or the expiration passes, or the socket is closed)
     129                while(pendingSockets.contains(s)) {
     130                        long now = clock.now();
     131                        if(now >= end) {
     132                                if(_log.shouldLog(Log.INFO)) {
     133                                        _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
     134                                }
     135                                pendingSockets.remove(s);
     136                                return false;
     137                        }
     138                        if(closing) {
     139                                if(_log.shouldLog(Log.WARN)) {
     140                                        _log.warn("Server socket closed while waiting for accept");
     141                                }
     142                                pendingSockets.remove(s);
     143                                return false;
     144                        }
     145                        long remaining = end - now;
     146                        try {
     147                                synchronized(socketAcceptedLock) {
     148                                        socketAcceptedLock.wait(remaining);
     149                                }
     150                        } catch(InterruptedException ie) {
     151                        }
     152                }
     153                long now = clock.now();
     154                if(_log.shouldLog(Log.DEBUG)) {
     155                        _log.info("Socket accepted after " + (now - start) + "ms for socket " + s.toString());
     156                }
     157                return true;
     158        }
     159
     160        public void close() {
     161                closing = true;
     162                // let anyone .accept()ing know to fsck off
     163                synchronized(socketAddedLock) {
     164                        socketAddedLock.notifyAll();
     165                }
     166                // let anyone addWaitForAccept()ing know to fsck off
     167                synchronized(socketAcceptedLock) {
     168                        socketAcceptedLock.notifyAll();
     169                }
     170        }
     171
     172        public I2PSocketManager getManager() {
     173                return mgr;
     174        }
    150175}
  • apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java

    r8d78a77 rfa5c721  
    66import java.io.InputStream;
    77import java.net.ConnectException;
     8import java.net.SocketTimeoutException;
    89import java.util.Properties;
    910
     
    2122 */
    2223public class StreamSinkServer {
    23     private Log _log;
    24     private String _sinkDir;
    25     private String _destFile;
    26     private String _i2cpHost;
    27     private int _i2cpPort;
    28     private int _handlers;
    29    
    30     /**
    31      * Create but do not start the streaming server. 
    32      *
    33      * @param sinkDir Directory to store received files in
    34      * @param ourDestFile filename to write our binary destination to
    35      */
    36     public StreamSinkServer(String sinkDir, String ourDestFile) {
    37         this(sinkDir, ourDestFile, null, -1, 3);
    38     }
    39     public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
    40         _sinkDir = sinkDir;
    41         _destFile = ourDestFile;
    42         _i2cpHost = i2cpHost;
    43         _i2cpPort = i2cpPort;
    44         _handlers = handlers;
    45         _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
    46     }
    47    
    48     /**
    49      * Actually fire up the server - this call blocks forever (or until the server
    50      * socket closes)
    51      *
    52      */
    53     public void runServer() {
    54         I2PSocketManager mgr = null;
    55         if (_i2cpHost != null)
    56             mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
    57         else
    58             mgr = I2PSocketManagerFactory.createManager();
    59         Destination dest = mgr.getSession().getMyDestination();
    60         if (_log.shouldLog(Log.INFO))
    61             _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
    62         FileOutputStream fos = null;
    63         try {
    64             fos = new FileOutputStream(_destFile);
    65             dest.writeBytes(fos);
    66         } catch (IOException ioe) {
    67             _log.error("Error writing out our destination to " + _destFile, ioe);
    68             return;
    69         } catch (DataFormatException dfe) {
    70             _log.error("Error formatting the destination", dfe);
    71             return;
    72         } finally {
    73             if (fos != null) try { fos.close(); } catch (IOException ioe) {}
    74         }
    75        
    76         I2PServerSocket sock = mgr.getServerSocket();
    77         startup(sock);
    78     }
    79    
    80     public void startup(I2PServerSocket sock) {
    81         for (int i = 0; i < _handlers; i++) {
    82             I2PThread t = new I2PThread(new ClientRunner(sock));
    83             t.setName("Handler " + i);
    84             t.setDaemon(false);
    85             t.start();
    86         }
    87     }
    88    
    89     /**
    90      * Actually deal with a client - pull anything they send us and write it to a file.
    91      *
    92      */
    93     private class ClientRunner implements Runnable {
    94         private I2PServerSocket _socket;
    95         public ClientRunner(I2PServerSocket socket) {
    96             _socket = socket;
    97         }
    98         public void run() {
    99             while (true) {
    100                 try {
    101                     I2PSocket socket = _socket.accept();
    102                     if (socket != null)
    103                         handle(socket);
    104                 } catch (I2PException ie) {
    105                     _log.error("Error accepting connection", ie);
    106                     return;
    107                 } catch (ConnectException ce) {
    108                     _log.error("Connection already dropped", ce);
    109                     return;
    110                 }       
    111             }
    112         }
    113        
    114         private void handle(I2PSocket sock) {
    115             FileOutputStream fos = null;
    116             try {
    117                 File sink = new File(_sinkDir);
    118                 if (!sink.exists())
    119                     sink.mkdirs();
    120                 File cur = File.createTempFile("clientSink", ".dat", sink);
    121                 fos = new FileOutputStream(cur);
    122                 if (_log.shouldLog(Log.DEBUG))
    123                     _log.debug("Writing to " + cur.getAbsolutePath());
    124             } catch (IOException ioe) {
    125                 _log.error("Error creating sink", ioe);
    126                 return;
    127             }
    128            
    129             long start = System.currentTimeMillis();
    130             try {
    131                 InputStream in = sock.getInputStream();
    132                 byte buf[] = new byte[4096];
    133                 long written = 0;
    134                 int read = 0;
    135                 while ( (read = in.read(buf)) != -1) {
    136                     //_fos.write(buf, 0, read);
    137                     written += read;
    138                     if (_log.shouldLog(Log.DEBUG))
    139                         _log.debug("read and wrote " + read + " (" + written + ")");
    140                 }
    141                 fos.write(("written: [" + written + "]\n").getBytes());
    142                 long lifetime = System.currentTimeMillis() - start;
    143                 _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
    144             } catch (IOException ioe) {
    145                 _log.error("Error writing the sink", ioe);
    146             } finally {
    147                 if (fos != null) try { fos.close(); } catch (IOException ioe) {}
    148                 if (sock != null) try { sock.close(); } catch (IOException ioe) {}
    149                 _log.debug("Client socket closed");
    150             }
    151         }
    152     }
    153    
    154     /**
    155      * Fire up the streaming server.  <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
    156      * <ul>
    157      *  <li><b>sinkDir</b>: Directory to store received files in</li>
    158      *  <li><b>ourDestFile</b>: filename to write our binary destination to</li>
    159      *  <li><b>numHandlers</b>: how many concurrent connections to handle</li>
    160      * </ul>
    161      */
    162     public static void main(String args[]) {
    163         StreamSinkServer server = null;
    164         switch (args.length) {
    165             case 0:
    166                 server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
    167                 break;
    168             case 2:
    169                 server = new StreamSinkServer(args[0], args[1]);
    170                 break;
    171             case 4:
    172             case 5:
    173                 int handlers = 3;
    174                 if (args.length == 5) {
    175                     try {
    176                         handlers = Integer.parseInt(args[4]);
    177                     } catch (NumberFormatException nfe) {}
    178                 }
    179                 try {
    180                     int port = Integer.parseInt(args[1]);
    181                     server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
    182                 } catch (NumberFormatException nfe) {
    183                     System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
    184                 }
    185                 break;
    186             default:
    187                 System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
    188         }
    189         if (server != null)
    190             server.runServer();
    191     }
     24
     25        private Log _log;
     26        private String _sinkDir;
     27        private String _destFile;
     28        private String _i2cpHost;
     29        private int _i2cpPort;
     30        private int _handlers;
     31
     32        /**
     33         * Create but do not start the streaming server. 
     34         *
     35         * @param sinkDir Directory to store received files in
     36         * @param ourDestFile filename to write our binary destination to
     37         */
     38        public StreamSinkServer(String sinkDir, String ourDestFile) {
     39                this(sinkDir, ourDestFile, null, -1, 3);
     40        }
     41
     42        public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
     43                _sinkDir = sinkDir;
     44                _destFile = ourDestFile;
     45                _i2cpHost = i2cpHost;
     46                _i2cpPort = i2cpPort;
     47                _handlers = handlers;
     48                _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class);
     49        }
     50
     51        /**
     52         * Actually fire up the server - this call blocks forever (or until the server
     53         * socket closes)
     54         *
     55         */
     56        public void runServer() {
     57                I2PSocketManager mgr = null;
     58                if(_i2cpHost != null) {
     59                        mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
     60                } else {
     61                        mgr = I2PSocketManagerFactory.createManager();
     62                }
     63                Destination dest = mgr.getSession().getMyDestination();
     64                if(_log.shouldLog(Log.INFO)) {
     65                        _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
     66                }
     67                FileOutputStream fos = null;
     68                try {
     69                        fos = new FileOutputStream(_destFile);
     70                        dest.writeBytes(fos);
     71                } catch(IOException ioe) {
     72                        _log.error("Error writing out our destination to " + _destFile, ioe);
     73                        return;
     74                } catch(DataFormatException dfe) {
     75                        _log.error("Error formatting the destination", dfe);
     76                        return;
     77                } finally {
     78                        if(fos != null) {
     79                                try {
     80                                        fos.close();
     81                                } catch(IOException ioe) {
     82                                }
     83                        }
     84                }
     85
     86                I2PServerSocket sock = mgr.getServerSocket();
     87                startup(sock);
     88        }
     89
     90        public void startup(I2PServerSocket sock) {
     91                for(int i = 0; i < _handlers; i++) {
     92                        I2PThread t = new I2PThread(new ClientRunner(sock));
     93                        t.setName("Handler " + i);
     94                        t.setDaemon(false);
     95                        t.start();
     96                }
     97        }
     98
     99        /**
     100         * Actually deal with a client - pull anything they send us and write it to a file.
     101         *
     102         */
     103        private class ClientRunner implements Runnable {
     104
     105                private I2PServerSocket _socket;
     106
     107                public ClientRunner(I2PServerSocket socket) {
     108                        _socket = socket;
     109                }
     110
     111                public void run() {
     112                        while(true) {
     113                                try {
     114                                        I2PSocket socket = _socket.accept();
     115                                        if(socket != null) {
     116                                                handle(socket);
     117                                        }
     118                                } catch(I2PException ie) {
     119                                        _log.error("Error accepting connection", ie);
     120                                        return;
     121                                } catch(ConnectException ce) {
     122                                        _log.error("Connection already dropped", ce);
     123                                        return;
     124                                } catch(SocketTimeoutException ste) {
     125                                        // ignored
     126                                }
     127                        }
     128                }
     129
     130                private void handle(I2PSocket sock) {
     131                        FileOutputStream fos = null;
     132                        try {
     133                                File sink = new File(_sinkDir);
     134                                if(!sink.exists()) {
     135                                        sink.mkdirs();
     136                                }
     137                                File cur = File.createTempFile("clientSink", ".dat", sink);
     138                                fos = new FileOutputStream(cur);
     139                                if(_log.shouldLog(Log.DEBUG)) {
     140                                        _log.debug("Writing to " + cur.getAbsolutePath());
     141                                }
     142                        } catch(IOException ioe) {
     143                                _log.error("Error creating sink", ioe);
     144                                return;
     145                        }
     146
     147                        long start = System.currentTimeMillis();
     148                        try {
     149                                InputStream in = sock.getInputStream();
     150                                byte buf[] = new byte[4096];
     151                                long written = 0;
     152                                int read = 0;
     153                                while((read = in.read(buf)) != -1) {
     154                                        //_fos.write(buf, 0, read);
     155                                        written += read;
     156                                        if(_log.shouldLog(Log.DEBUG)) {
     157                                                _log.debug("read and wrote " + read + " (" + written + ")");
     158                                        }
     159                                }
     160                                fos.write(("written: [" + written + "]\n").getBytes());
     161                                long lifetime = System.currentTimeMillis() - start;
     162                                _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]");
     163                        } catch(IOException ioe) {
     164                                _log.error("Error writing the sink", ioe);
     165                        } finally {
     166                                if(fos != null) {
     167                                        try {
     168                                                fos.close();
     169                                        } catch(IOException ioe) {
     170                                        }
     171                                }
     172                                if(sock != null) {
     173                                        try {
     174                                                sock.close();
     175                                        } catch(IOException ioe) {
     176                                        }
     177                                }
     178                                _log.debug("Client socket closed");
     179                        }
     180                }
     181        }
     182
     183        /**
     184         * Fire up the streaming server.  <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br />
     185         * <ul>
     186         *  <li><b>sinkDir</b>: Directory to store received files in</li>
     187         *  <li><b>ourDestFile</b>: filename to write our binary destination to</li>
     188         *  <li><b>numHandlers</b>: how many concurrent connections to handle</li>
     189         * </ul>
     190         */
     191        public static void main(String args[]) {
     192                StreamSinkServer server = null;
     193                switch(args.length) {
     194                        case 0:
     195                                server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3);
     196                                break;
     197                        case 2:
     198                                server = new StreamSinkServer(args[0], args[1]);
     199                                break;
     200                        case 4:
     201                        case 5:
     202                                int handlers = 3;
     203                                if(args.length == 5) {
     204                                        try {
     205                                                handlers = Integer.parseInt(args[4]);
     206                                        } catch(NumberFormatException nfe) {
     207                                        }
     208                                }
     209                                try {
     210                                        int port = Integer.parseInt(args[1]);
     211                                        server = new StreamSinkServer(args[2], args[3], args[0], port, handlers);
     212                                } catch(NumberFormatException nfe) {
     213                                        System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
     214                                }
     215                                break;
     216                        default:
     217                                System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
     218                }
     219                if(server != null) {
     220                        server.runServer();
     221                }
     222        }
    192223}
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    r8d78a77 rfa5c721  
    11package net.i2p.client.streaming;
    22
     3import java.net.SocketTimeoutException;
    34import java.util.ArrayList;
    45import java.util.List;
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    r8d78a77 rfa5c721  
    2222 */
    2323public class ConnectionManager {
    24     private I2PAppContext _context;
    25     private Log _log;
    26     private I2PSession _session;
    27     private MessageHandler _messageHandler;
    28     private PacketHandler _packetHandler;
    29     private ConnectionHandler _connectionHandler;
    30     private PacketQueue _outboundQueue;
    31     private SchedulerChooser _schedulerChooser;
    32     private ConnectionPacketHandler _conPacketHandler;
    33     /** Inbound stream ID (Long) to Connection map */
    34     private Map _connectionByInboundId;
    35     /** Ping ID (Long) to PingRequest */
    36     private Map _pendingPings;
    37     private boolean _allowIncoming;
    38     private int _maxConcurrentStreams;
    39     private ConnectionOptions _defaultOptions;
    40     private volatile int _numWaiting;
    41     private Object _connectionLock;
    42    
    43     public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
    44         _context = context;
    45         _log = context.logManager().getLog(ConnectionManager.class);
    46         _connectionByInboundId = new HashMap(32);
    47         _pendingPings = new HashMap(4);
    48         _connectionLock = new Object();
    49         _messageHandler = new MessageHandler(context, this);
    50         _packetHandler = new PacketHandler(context, this);
    51         _connectionHandler = new ConnectionHandler(context, this);
    52         _schedulerChooser = new SchedulerChooser(context);
    53         _conPacketHandler = new ConnectionPacketHandler(context);
    54         _session = session;
    55         session.setSessionListener(_messageHandler);
    56         _outboundQueue = new PacketQueue(context, session, this);
    57         _allowIncoming = false;
    58         _maxConcurrentStreams = maxConcurrent;
    59         _defaultOptions = defaultOptions;
    60         _numWaiting = 0;
    61         _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    62         _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    63         _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    64         _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    65         _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    66         _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    67         _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    68         _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    69         _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    70         _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    71     }
    72    
    73     Connection getConnectionByInboundId(long id) {
    74         synchronized (_connectionLock) {
    75             return (Connection)_connectionByInboundId.get(new Long(id));
    76         }
    77     }
    78     /**
    79      * not guaranteed to be unique, but in case we receive more than one packet
    80      * on an inbound connection that we havent ack'ed yet...
    81      */
    82     Connection getConnectionByOutboundId(long id) {
    83         synchronized (_connectionLock) {
    84             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    85                 Connection con = (Connection)iter.next();
    86                 if (DataHelper.eq(con.getSendStreamId(), id))
    87                     return con;
    88             }
    89         }
    90         return null;
    91     }
    92    
    93     public void setAllowIncomingConnections(boolean allow) {
    94         _connectionHandler.setActive(allow);
    95     }
    96     /** should we acceot connections, or just reject everyone? */
    97     public boolean getAllowIncomingConnections() {
    98         return _connectionHandler.getActive();
    99     }
    100    
    101     /**
    102      * Create a new connection based on the SYN packet we received.
    103      *
    104      * @return created Connection with the packet's data already delivered to
    105      *         it, or null if the syn's streamId was already taken
    106      */
    107     public Connection receiveConnection(Packet synPacket) {
    108         Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
    109         long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    110         boolean reject = false;
    111         int active = 0;
    112         int total = 0;
    113         synchronized (_connectionLock) {
    114             total = _connectionByInboundId.size();
    115             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    116                 if ( ((Connection)iter.next()).getIsConnected() )
    117                     active++;
    118             }
    119             if (locked_tooManyStreams()) {
    120                 reject = true;
    121             } else {
    122                 while (true) {
    123                     Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
    124                     if (oldCon == null) {
    125                         break;
    126                     } else {
    127                         _connectionByInboundId.put(new Long(receiveId), oldCon);
    128                         // receiveId already taken, try another
    129                         receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    130                     }
    131                 }
    132             }
    133         }
    134        
    135         _context.statManager().addRateData("stream.receiveActive", active, total);
    136        
    137         if (reject) {
    138             if (_log.shouldLog(Log.WARN))
    139                 _log.warn("Refusing connection since we have exceeded our max of "
    140                           + _maxConcurrentStreams + " connections");
    141             PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
    142             reply.setFlag(Packet.FLAG_RESET);
    143             reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
    144             reply.setAckThrough(synPacket.getSequenceNum());
    145             reply.setSendStreamId(synPacket.getReceiveStreamId());
    146             reply.setReceiveStreamId(0);
    147             reply.setOptionalFrom(_session.getMyDestination());
    148             // this just sends the packet - no retries or whatnot
    149             _outboundQueue.enqueue(reply);
    150             return null;
    151         }
    152        
    153         con.setReceiveStreamId(receiveId);
    154         try {
    155             con.getPacketHandler().receivePacket(synPacket, con);
    156         } catch (I2PException ie) {
    157             synchronized (_connectionLock) {
    158                 _connectionByInboundId.remove(new Long(receiveId));
    159             }
    160             return null;
    161         }
    162        
    163         _context.statManager().addRateData("stream.connectionReceived", 1, 0);
    164         return con;
    165     }
    166    
    167     private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
    168    
    169     /**
    170      * Build a new connection to the given peer.  This blocks if there is no
    171      * connection delay, otherwise it returns immediately.
    172      *
    173      * @return new connection, or null if we have exceeded our limit
    174      */
    175     public Connection connect(Destination peer, ConnectionOptions opts) {
    176         Connection con = null;
    177         long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    178         long expiration = _context.clock().now() + opts.getConnectTimeout();
    179         if (opts.getConnectTimeout() <= 0)
    180             expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
    181         _numWaiting++;
    182         while (true) {
    183             long remaining = expiration - _context.clock().now();
    184             if (remaining <= 0) {
    185                 if (_log.shouldLog(Log.WARN))
    186                 _log.warn("Refusing to connect since we have exceeded our max of "
    187                           + _maxConcurrentStreams + " connections");
    188                 _numWaiting--;
    189                 return null;
    190             }
    191             boolean reject = false;
    192             synchronized (_connectionLock) {
    193                 if (locked_tooManyStreams()) {
    194                     // allow a full buffer of pending/waiting streams
    195                     if (_numWaiting > _maxConcurrentStreams) {
    196                         if (_log.shouldLog(Log.WARN))
    197                             _log.warn("Refusing connection since we have exceeded our max of "
    198                                       + _maxConcurrentStreams + " and there are " + _numWaiting
    199                                       + " waiting already");
    200                         _numWaiting--;
    201                         return null;
    202                     }
    203                    
    204                     // no remaining streams, lets wait a bit
    205                     try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
    206                 } else {
    207                     con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
    208                     con.setRemotePeer(peer);
    209            
    210                     while (_connectionByInboundId.containsKey(new Long(receiveId))) {
    211                         receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    212                     }
    213                     _connectionByInboundId.put(new Long(receiveId), con);
    214                     break; // stop looping as a psuedo-wait
    215                 }
    216             }
    217         }
    218 
    219         // ok we're in...
    220         con.setReceiveStreamId(receiveId);       
    221         con.eventOccurred();
    222        
    223         _log.debug("Connect() conDelay = " + opts.getConnectDelay());
    224         if (opts.getConnectDelay() <= 0) {
    225             con.waitForConnect();
    226         }
    227         if (_numWaiting > 0)
    228             _numWaiting--;
    229        
    230         _context.statManager().addRateData("stream.connectionCreated", 1, 0);
    231         return con;
    232     }
    233 
    234     private boolean locked_tooManyStreams() {
    235         if (_maxConcurrentStreams <= 0) return false;
    236         if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
    237         int active = 0;
    238         for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    239             Connection con = (Connection)iter.next();
    240             if (con.getIsConnected())
    241                 active++;
    242         }
    243        
    244         if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
    245             _log.info("More than 100 connections!  " + active
    246                       + " total: " + _connectionByInboundId.size());
    247 
    248         return (active >= _maxConcurrentStreams);
    249     }
    250    
    251     public MessageHandler getMessageHandler() { return _messageHandler; }
    252     public PacketHandler getPacketHandler() { return _packetHandler; }
    253     public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
    254     public I2PSession getSession() { return _session; }
    255     public PacketQueue getPacketQueue() { return _outboundQueue; }
    256    
    257     /**
    258      * Something b0rked hard, so kill all of our connections without mercy.
    259      * Don't bother sending close packets.
    260      *
    261      */
    262     public void disconnectAllHard() {
    263         synchronized (_connectionLock) {
    264             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    265                 Connection con = (Connection)iter.next();
    266                 con.disconnect(false, false);
    267             }
    268             _connectionByInboundId.clear();
    269             _connectionLock.notifyAll();
    270         }
    271     }
    272    
    273     /**
    274      * Drop the (already closed) connection on the floor.
    275      *
    276      */
    277     public void removeConnection(Connection con) {
    278         boolean removed = false;
    279         synchronized (_connectionLock) {
    280             Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
    281             removed = (o == con);
    282             if (_log.shouldLog(Log.DEBUG))
    283                 _log.debug("Connection removed? " + removed + " remaining: "
    284                            + _connectionByInboundId.size() + ": " + con);
    285             if (!removed && _log.shouldLog(Log.DEBUG))
    286                 _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
    287             _connectionLock.notifyAll();
    288         }
    289         if (removed) {
    290             _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
    291             _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
    292             _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
    293             _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
    294             _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
    295             _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
    296             _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
    297             _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
    298             _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
    299         }
    300     }
    301    
    302     /** return a set of Connection objects */
    303     public Set listConnections() {
    304         synchronized (_connectionLock) {
    305             return new HashSet(_connectionByInboundId.values());
    306         }
    307     }
    308    
    309     public boolean ping(Destination peer, long timeoutMs) {
    310         return ping(peer, timeoutMs, true);
    311     }
    312     public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
    313         return ping(peer, timeoutMs, blocking, null, null, null);
    314     }
    315     public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
    316         Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
    317         PacketLocal packet = new PacketLocal(_context, peer);
    318         packet.setSendStreamId(id.longValue());
    319         packet.setFlag(Packet.FLAG_ECHO);
    320         packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
    321         packet.setOptionalFrom(_session.getMyDestination());
    322         if ( (keyToUse != null) && (tagsToSend != null) ) {
    323             packet.setKeyUsed(keyToUse);
    324             packet.setTagsSent(tagsToSend);
    325         }
    326        
    327         PingRequest req = new PingRequest(peer, packet, notifier);
    328        
    329         synchronized (_pendingPings) {
    330             _pendingPings.put(id, req);
    331         }
    332        
    333         _outboundQueue.enqueue(packet);
    334         packet.releasePayload();
    335        
    336         if (blocking) {
    337             synchronized (req) {
    338                 if (!req.pongReceived())
    339                     try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
    340             }
    341            
    342             synchronized (_pendingPings) {
    343                 _pendingPings.remove(id);
    344             }
    345         } else {
    346             SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
    347         }
    348        
    349         boolean ok = req.pongReceived();
    350         return ok;
    351     }
    352 
    353     interface PingNotifier {
    354         public void pingComplete(boolean ok);
    355     }
    356    
    357     private class PingFailed implements SimpleTimer.TimedEvent {
    358         private Long _id;
    359         private PingNotifier _notifier;
    360         public PingFailed(Long id, PingNotifier notifier) {
    361             _id = id;
    362             _notifier = notifier;
    363         }
    364        
    365         public void timeReached() {
    366             boolean removed = false;
    367             synchronized (_pendingPings) {
    368                 Object o = _pendingPings.remove(_id);
    369                 if (o != null)
    370                     removed = true;
    371             }
    372             if (removed) {
    373                 if (_notifier != null)
    374                     _notifier.pingComplete(false);
    375                 if (_log.shouldLog(Log.INFO))
    376                     _log.info("Ping failed");
    377             }
    378         }
    379     }
    380    
    381     private class PingRequest {
    382         private boolean _ponged;
    383         private Destination _peer;
    384         private PacketLocal _packet;
    385         private PingNotifier _notifier;
    386         public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
    387             _ponged = false;
    388             _peer = peer;
    389             _packet = packet;
    390             _notifier = notifier;
    391         }
    392         public void pong() {
    393             _log.debug("Ping successful");
    394             _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
    395             synchronized (ConnectionManager.PingRequest.this) {
    396                 _ponged = true;
    397                 ConnectionManager.PingRequest.this.notifyAll();
    398             }
    399             if (_notifier != null)
    400                 _notifier.pingComplete(true);
    401         }
    402         public boolean pongReceived() { return _ponged; }
    403     }
    404    
    405     void receivePong(long pingId) {
    406         PingRequest req = null;
    407         synchronized (_pendingPings) {
    408             req = (PingRequest)_pendingPings.remove(new Long(pingId));
    409         }
    410         if (req != null)
    411             req.pong();
    412     }
     24
     25        private I2PAppContext _context;
     26        private Log _log;
     27        private I2PSession _session;
     28        private MessageHandler _messageHandler;
     29        private PacketHandler _packetHandler;
     30        private ConnectionHandler _connectionHandler;
     31        private PacketQueue _outboundQueue;
     32        private SchedulerChooser _schedulerChooser;
     33        private ConnectionPacketHandler _conPacketHandler;
     34        /** Inbound stream ID (Long) to Connection map */
     35        private Map _connectionByInboundId;
     36        /** Ping ID (Long) to PingRequest */
     37        private Map _pendingPings;
     38        private boolean _allowIncoming;
     39        private int _maxConcurrentStreams;
     40        private ConnectionOptions _defaultOptions;
     41        private volatile int _numWaiting;
     42        private Object _connectionLock;
     43        private long SoTimeout;
     44
     45        public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
     46                _context = context;
     47                _log = context.logManager().getLog(ConnectionManager.class);
     48                _connectionByInboundId = new HashMap(32);
     49                _pendingPings = new HashMap(4);
     50                _connectionLock = new Object();
     51                _messageHandler = new MessageHandler(context, this);
     52                _packetHandler = new PacketHandler(context, this);
     53                _connectionHandler = new ConnectionHandler(context, this);
     54                _schedulerChooser = new SchedulerChooser(context);
     55                _conPacketHandler = new ConnectionPacketHandler(context);
     56                _session = session;
     57                session.setSessionListener(_messageHandler);
     58                _outboundQueue = new PacketQueue(context, session, this);
     59                _allowIncoming = false;
     60                _maxConcurrentStreams = maxConcurrent;
     61                _defaultOptions = defaultOptions;
     62                _numWaiting = 0;
     63                /** Socket timeout for accept() */
     64                SoTimeout = -1;
     65
     66                _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     67                _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     68                _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     69                _context.statManager().createRateStat("stream.con.lifetimeBytesReceived", "How many bytes do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     70                _context.statManager().createRateStat("stream.con.lifetimeDupMessagesSent", "How many duplicate messages do we send on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     71                _context.statManager().createRateStat("stream.con.lifetimeDupMessagesReceived", "How many duplicate messages do we receive on a stream?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     72                _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     73                _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     74                _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     75                _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] {60 * 60 * 1000, 24 * 60 * 60 * 1000});
     76        }
     77
     78        Connection getConnectionByInboundId(long id) {
     79                synchronized(_connectionLock) {
     80                        return (Connection)_connectionByInboundId.get(new Long(id));
     81                }
     82        }
     83
     84        /**
     85         * not guaranteed to be unique, but in case we receive more than one packet
     86         * on an inbound connection that we havent ack'ed yet...
     87         */
     88        Connection getConnectionByOutboundId(long id) {
     89                synchronized(_connectionLock) {
     90                        for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
     91                                Connection con = (Connection)iter.next();
     92                                if(DataHelper.eq(con.getSendStreamId(), id)) {
     93                                        return con;
     94                                }
     95                        }
     96                }
     97                return null;
     98        }
     99
     100        /**
     101         * Set the socket accept() timeout.
     102         * @param x
     103         */
     104        public void MsetSoTimeout(long x) {
     105                SoTimeout = x;
     106        }
     107
     108        /**
     109         * Get the socket accept() timeout.
     110         * @return
     111         */
     112        public long MgetSoTimeout() {
     113                return SoTimeout;
     114        }
     115
     116        public void setAllowIncomingConnections(boolean allow) {
     117                _connectionHandler.setActive(allow);
     118        }
     119
     120        /** should we acceot connections, or just reject everyone? */
     121        public boolean getAllowIncomingConnections() {
     122                return _connectionHandler.getActive();
     123        }
     124
     125        /**
     126         * Create a new connection based on the SYN packet we received.
     127         *
     128         * @return created Connection with the packet's data already delivered to
     129         *         it, or null if the syn's streamId was already taken
     130         */
     131        public Connection receiveConnection(Packet synPacket) {
     132                Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
     133                long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
     134                boolean reject = false;
     135                int active = 0;
     136                int total = 0;
     137                synchronized(_connectionLock) {
     138                        total = _connectionByInboundId.size();
     139                        for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
     140                                if(((Connection)iter.next()).getIsConnected()) {
     141                                        active++;
     142                                }
     143                        }
     144                        if(locked_tooManyStreams()) {
     145                                reject = true;
     146                        } else {
     147                                while(true) {
     148                                        Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
     149                                        if(oldCon == null) {
     150                                                break;
     151                                        } else {
     152                                                _connectionByInboundId.put(new Long(receiveId), oldCon);
     153                                                // receiveId already taken, try another
     154                                                receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
     155                                        }
     156                                }
     157                        }
     158                }
     159
     160                _context.statManager().addRateData("stream.receiveActive", active, total);
     161
     162                if(reject) {
     163                        if(_log.shouldLog(Log.WARN)) {
     164                                _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
     165                        }
     166                        PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
     167                        reply.setFlag(Packet.FLAG_RESET);
     168                        reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
     169                        reply.setAckThrough(synPacket.getSequenceNum());
     170                        reply.setSendStreamId(synPacket.getReceiveStreamId());
     171                        reply.setReceiveStreamId(0);
     172                        reply.setOptionalFrom(_session.getMyDestination());
     173                        // this just sends the packet - no retries or whatnot
     174                        _outboundQueue.enqueue(reply);
     175                        return null;
     176                }
     177
     178                con.setReceiveStreamId(receiveId);
     179                try {
     180                        con.getPacketHandler().receivePacket(synPacket, con);
     181                } catch(I2PException ie) {
     182                        synchronized(_connectionLock) {
     183                                _connectionByInboundId.remove(new Long(receiveId));
     184                        }
     185                        return null;
     186                }
     187
     188                _context.statManager().addRateData("stream.connectionReceived", 1, 0);
     189                return con;
     190        }
     191        private static final long DEFAULT_STREAM_DELAY_MAX = 10 * 1000;
     192
     193        /**
     194         * Build a new connection to the given peer.  This blocks if there is no
     195         * connection delay, otherwise it returns immediately.
     196         *
     197         * @return new connection, or null if we have exceeded our limit
     198         */
     199        public Connection connect(Destination peer, ConnectionOptions opts) {
     200                Connection con = null;
     201                long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
     202                long expiration = _context.clock().now() + opts.getConnectTimeout();
     203                if(opts.getConnectTimeout() <= 0) {
     204                        expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
     205                }
     206                _numWaiting++;
     207                while(true) {
     208                        long remaining = expiration - _context.clock().now();
     209                        if(remaining <= 0) {
     210                                if(_log.shouldLog(Log.WARN)) {
     211                                        _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
     212                                }
     213                                _numWaiting--;
     214                                return null;
     215                        }
     216                        boolean reject = false;
     217                        synchronized(_connectionLock) {
     218                                if(locked_tooManyStreams()) {
     219                                        // allow a full buffer of pending/waiting streams
     220                                        if(_numWaiting > _maxConcurrentStreams) {
     221                                                if(_log.shouldLog(Log.WARN)) {
     222                                                        _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
     223                                                }
     224                                                _numWaiting--;
     225                                                return null;
     226                                        }
     227
     228                                        // no remaining streams, lets wait a bit
     229                                        try {
     230                                                _connectionLock.wait(remaining);
     231                                        } catch(InterruptedException ie) {
     232                                        }
     233                                } else {
     234                                        con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
     235                                        con.setRemotePeer(peer);
     236
     237                                        while(_connectionByInboundId.containsKey(new Long(receiveId))) {
     238                                                receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1;
     239                                        }
     240                                        _connectionByInboundId.put(new Long(receiveId), con);
     241                                        break; // stop looping as a psuedo-wait
     242                                }
     243                        }
     244                }
     245
     246                // ok we're in...
     247                con.setReceiveStreamId(receiveId);
     248                con.eventOccurred();
     249
     250                _log.debug("Connect() conDelay = " + opts.getConnectDelay());
     251                if(opts.getConnectDelay() <= 0) {
     252                        con.waitForConnect();
     253                }
     254                if(_numWaiting > 0) {
     255                        _numWaiting--;
     256                }
     257                _context.statManager().addRateData("stream.connectionCreated", 1, 0);
     258                return con;
     259        }
     260
     261        private boolean locked_tooManyStreams() {
     262                if(_maxConcurrentStreams <= 0) {
     263                        return false;
     264                }
     265                if(_connectionByInboundId.size() < _maxConcurrentStreams) {
     266                        return false;
     267                }
     268                int active = 0;
     269                for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
     270                        Connection con = (Connection)iter.next();
     271                        if(con.getIsConnected()) {
     272                                active++;
     273                        }
     274                }
     275
     276                if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
     277                        _log.info("More than 100 connections!  " + active + " total: " + _connectionByInboundId.size());
     278                }
     279                return (active >= _maxConcurrentStreams);
     280        }
     281
     282        public MessageHandler getMessageHandler() {
     283                return _messageHandler;
     284        }
     285
     286        public PacketHandler getPacketHandler() {
     287                return _packetHandler;
     288        }
     289
     290        public ConnectionHandler getConnectionHandler() {
     291                return _connectionHandler;
     292        }
     293
     294        public I2PSession getSession() {
     295                return _session;
     296        }
     297
     298        public PacketQueue getPacketQueue() {
     299                return _outboundQueue;
     300        }
     301
     302        /**
     303         * Something b0rked hard, so kill all of our connections without mercy.
     304         * Don't bother sending close packets.
     305         *
     306         */
     307        public void disconnectAllHard() {
     308                synchronized(_connectionLock) {
     309                        for(Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext();) {
     310                                Connection con = (Connection)iter.next();
     311                                con.disconnect(false, false);
     312                        }
     313                        _connectionByInboundId.clear();
     314                        _connectionLock.notifyAll();
     315                }
     316        }
     317
     318        /**
     319         * Drop the (already closed) connection on the floor.
     320         *
     321         */
     322        public void removeConnection(Connection con) {
     323                boolean removed = false;
     324                synchronized(_connectionLock) {
     325                        Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
     326                        removed = (o == con);
     327                        if(_log.shouldLog(Log.DEBUG)) {
     328                                _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
     329                        }
     330                        if(!removed && _log.shouldLog(Log.DEBUG)) {
     331                                _log.debug("Failed to remove " + con + "\n" + _connectionByInboundId.values());
     332                        }
     333                        _connectionLock.notifyAll();
     334                }
     335                if(removed) {
     336                        _context.statManager().addRateData("stream.con.lifetimeMessagesSent", con.getLastSendId(), con.getLifetime());
     337                        _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", con.getHighestAckedThrough(), con.getLifetime());
     338                        _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime());
     339                        _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime());
     340                        _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime());
     341                        _context.statManager().addRateData("stream.con.lifetimeDupMessagesReceived", con.getLifetimeDupMessagesReceived(), con.getLifetime());
     342                        _context.statManager().addRateData("stream.con.lifetimeRTT", con.getOptions().getRTT(), con.getLifetime());
     343                        _context.statManager().addRateData("stream.con.lifetimeCongestionSeenAt", con.getLastCongestionSeenAt(), con.getLifetime());
     344                        _context.statManager().addRateData("stream.con.lifetimeSendWindowSize", con.getOptions().getWindowSize(), con.getLifetime());
     345                }
     346        }
     347
     348        /** return a set of Connection objects */
     349        public Set listConnections() {
     350                synchronized(_connectionLock) {
     351                        return new HashSet(_connectionByInboundId.values());
     352                }
     353        }
     354
     355        public boolean ping(Destination peer, long timeoutMs) {
     356                return ping(peer, timeoutMs, true);
     357        }
     358
     359        public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
     360                return ping(peer, timeoutMs, blocking, null, null, null);
     361        }
     362
     363        public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
     364                Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID - 1) + 1);
     365                PacketLocal packet = new PacketLocal(_context, peer);
     366                packet.setSendStreamId(id.longValue());
     367                packet.setFlag(Packet.FLAG_ECHO);
     368                packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
     369                packet.setOptionalFrom(_session.getMyDestination());
     370                if((keyToUse != null) && (tagsToSend != null)) {
     371                        packet.setKeyUsed(keyToUse);
     372                        packet.setTagsSent(tagsToSend);
     373                }
     374
     375                PingRequest req = new PingRequest(peer, packet, notifier);
     376
     377                synchronized(_pendingPings) {
     378                        _pendingPings.put(id, req);
     379                }
     380
     381                _outboundQueue.enqueue(packet);
     382                packet.releasePayload();
     383
     384                if(blocking) {
     385                        synchronized(req) {
     386                                if(!req.pongReceived()) {
     387                                        try {
     388                                                req.wait(timeoutMs);
     389                                        } catch(InterruptedException ie) {
     390                                        }
     391                                }
     392                        }
     393
     394                        synchronized(_pendingPings) {
     395                                _pendingPings.remove(id);
     396                        }
     397                } else {
     398                        SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
     399                }
     400
     401                boolean ok = req.pongReceived();
     402                return ok;
     403        }
     404
     405        interface PingNotifier {
     406
     407                public void pingComplete(boolean ok);
     408        }
     409
     410        private class PingFailed implements SimpleTimer.TimedEvent {
     411
     412                private Long _id;
     413                private PingNotifier _notifier;
     414
     415                public PingFailed(Long id, PingNotifier notifier) {
     416                        _id = id;
     417                        _notifier = notifier;
     418                }
     419
     420                public void timeReached() {
     421                        boolean removed = false;
     422                        synchronized(_pendingPings) {
     423                                Object o = _pendingPings.remove(_id);
     424                                if(o != null) {
     425                                        removed = true;
     426                                }
     427                        }
     428                        if(removed) {
     429                                if(_notifier != null) {
     430                                        _notifier.pingComplete(false);
     431                                }
     432                                if(_log.shouldLog(Log.INFO)) {
     433                                        _log.info("Ping failed");
     434                                }
     435                        }
     436                }
     437        }
     438
     439        private class PingRequest {
     440
     441                private boolean _ponged;
     442                private Destination _peer;
     443                private PacketLocal _packet;
     444                private PingNotifier _notifier;
     445
     446                public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
     447                        _ponged = false;
     448                        _peer = peer;
     449                        _packet = packet;
     450                        _notifier = notifier;
     451                }
     452
     453                public void pong() {
     454                        _log.debug("Ping successful");
     455                        _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
     456                        synchronized(ConnectionManager.PingRequest.this) {
     457                                _ponged = true;
     458                                ConnectionManager.PingRequest.this.notifyAll();
     459                        }
     460                        if(_notifier != null) {
     461                                _notifier.pingComplete(true);
     462                        }
     463                }
     464
     465                public boolean pongReceived() {
     466                        return _ponged;
     467                }
     468        }
     469
     470        void receivePong(long pingId) {
     471                PingRequest req = null;
     472                synchronized(_pendingPings) {
     473                        req = (PingRequest)_pendingPings.remove(new Long(pingId));
     474                }
     475                if(req != null) {
     476                        req.pong();
     477                }
     478        }
    413479}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    r8d78a77 rfa5c721  
    11package net.i2p.client.streaming;
    22
     3import java.net.SocketTimeoutException;
     4import java.util.logging.Level;
     5import java.util.logging.Logger;
    36import net.i2p.I2PException;
    47
     
    811 */
    912public class I2PServerSocketFull implements I2PServerSocket {
    10     private I2PSocketManagerFull _socketManager;
    11    
    12     public I2PServerSocketFull(I2PSocketManagerFull mgr) {
    13         _socketManager = mgr;
    14     }
    15    
    16     public I2PSocket accept() throws I2PException {
    17         return _socketManager.receiveSocket();
    18     }
    19    
    20     public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
    21    
    22     public I2PSocketManager getManager() { return _socketManager; }
     13
     14        private I2PSocketManagerFull _socketManager;
     15
     16        /**
     17         *
     18         * @param mgr
     19         */
     20        public I2PServerSocketFull(I2PSocketManagerFull mgr) {
     21                _socketManager = mgr;
     22        }
     23
     24        /**
     25         *
     26         * @return
     27         * @throws net.i2p.I2PException
     28         * @throws SocketTimeoutException
     29         */
     30        public I2PSocket accept() throws I2PException, SocketTimeoutException {
     31                        return _socketManager.receiveSocket();
     32        }
     33
     34        public long getSoTimeout() {
     35                return _socketManager.getConnectionManager().MgetSoTimeout();
     36        }
     37
     38        public void setSoTimeout(long x) {
     39                _socketManager.getConnectionManager().MsetSoTimeout(x);
     40        }
     41        /**
     42         * Close the connection.
     43         */
     44        public void close() {
     45                _socketManager.getConnectionManager().setAllowIncomingConnections(false);
     46        }
     47
     48        /**
     49         *
     50         * @return _socketManager
     51         */
     52        public I2PSocketManager getManager() {
     53                return _socketManager;
     54        }
    2355}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java

    r8d78a77 rfa5c721  
    1212 */
    1313public class I2PSocketFull implements I2PSocket {
    14     private Connection _connection;
    15     private I2PSocket.SocketErrorListener _listener;
    16     private Destination _remotePeer;
    17     private Destination _localPeer;
    18    
    19     public I2PSocketFull(Connection con) {
    20         _connection = con;
    21         if (con != null) {
    22             _remotePeer = con.getRemotePeer();
    23             _localPeer = con.getSession().getMyDestination();
    24         }
    25     }
    26    
    27     public void close() throws IOException {
    28         Connection c = _connection;
    29         if (c == null) return;
    30         if (c.getIsConnected()) {
    31             OutputStream out = c.getOutputStream();
    32             if (out != null) {
    33                 try {
    34                     out.close();
    35                 } catch (IOException ioe) {
    36                     // ignore any write error, as we want to keep on and kill the
    37                     // con (thanks Complication!)
    38                 }
    39             }
    40             c.disconnect(true);
    41         } else {
    42             //throw new IOException("Not connected");
    43         }
    44         destroy();
    45     }
    46    
    47     Connection getConnection() { return _connection; }
    48    
    49     public InputStream getInputStream() {
    50         Connection c = _connection;
    51         if (c != null)
    52             return c.getInputStream();
    53         else
    54             return null;
    55     }
    56    
    57     public I2PSocketOptions getOptions() {
    58         Connection c = _connection;
    59         if (c != null)
    60             return c.getOptions();
    61         else
    62             return null;
    63     }
    64    
    65     public OutputStream getOutputStream() throws IOException {
    66         Connection c = _connection;
    67         if (c != null)
    68             return c.getOutputStream();
    69         else
    70             return null;
    71     }
    72    
    73     public Destination getPeerDestination() { return _remotePeer; }
    74    
    75     public long getReadTimeout() {
    76         I2PSocketOptions opts = getOptions();
    77         if (opts != null)
    78             return opts.getReadTimeout();
    79         else
    80             return -1;
    81     }
    82    
    83     public Destination getThisDestination() { return _localPeer; }
    84    
    85     public void setOptions(I2PSocketOptions options) {
    86         Connection c = _connection;
    87         if (c == null) return;
    88        
    89         if (options instanceof ConnectionOptions)
    90             c.setOptions((ConnectionOptions)options);
    91         else
    92             c.setOptions(new ConnectionOptions(options));
    93     }
    94    
    95     public void setReadTimeout(long ms) {
    96         Connection c = _connection;
    97         if (c == null) return;
    98        
    99         c.getInputStream().setReadTimeout((int)ms);
    100         c.getOptions().setReadTimeout(ms);
    101     }
    102    
    103     public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
    104         _listener = lsnr;
    105     }
    106    
    107     public boolean isClosed() {
    108         Connection c = _connection;
    109         return ((c == null) ||
    110                 (!c.getIsConnected()) ||
    111                 (c.getResetReceived()) ||
    112                 (c.getResetSent()));
    113     }
    114    
    115     void destroy() {
    116         Connection c = _connection;
    117         _connection = null;
    118         _listener = null;
    119         if (c != null)
    120             c.disconnectComplete();
    121     }
    122     public String toString() {
    123         Connection c = _connection;
    124         if (c == null)
    125             return super.toString();
    126         else
    127             return c.toString();
    128     }
     14
     15        private Connection _connection;
     16        private I2PSocket.SocketErrorListener _listener;
     17        private Destination _remotePeer;
     18        private Destination _localPeer;
     19
     20        public I2PSocketFull(Connection con) {
     21                _connection = con;
     22                if(con != null) {
     23                        _remotePeer = con.getRemotePeer();
     24                        _localPeer = con.getSession().getMyDestination();
     25                }
     26        }
     27
     28
     29        public void close() throws IOException {
     30                Connection c = _connection;
     31                if(c == null) {
     32                        return;
     33                }
     34                if(c.getIsConnected()) {
     35                        OutputStream out = c.getOutputStream();
     36                        if(out != null) {
     37                                try {
     38                                        out.close();
     39                                } catch(IOException ioe) {
     40                                        // ignore any write error, as we want to keep on and kill the
     41                                        // con (thanks Complication!)
     42                                }
     43                        }
     44                        c.disconnect(true);
     45                } else {
     46                        //throw new IOException("Not connected");
     47                }
     48                destroy();
     49        }
     50
     51        Connection getConnection() {
     52                return _connection;
     53        }
     54
     55        public InputStream getInputStream() {
     56                Connection c = _connection;
     57                if(c != null) {
     58                        return c.getInputStream();
     59                } else {
     60                        return null;
     61                }
     62        }
     63
     64        public I2PSocketOptions getOptions() {
     65                Connection c = _connection;
     66                if(c != null) {
     67                        return c.getOptions();
     68                } else {
     69                        return null;
     70                }
     71        }
     72
     73        public OutputStream getOutputStream() throws IOException {
     74                Connection c = _connection;
     75                if(c != null) {
     76                        return c.getOutputStream();
     77                } else {
     78                        return null;
     79                }
     80        }
     81
     82        public Destination getPeerDestination() {
     83                return _remotePeer;
     84        }
     85
     86        public long getReadTimeout() {
     87                I2PSocketOptions opts = getOptions();
     88                if(opts != null) {
     89                        return opts.getReadTimeout();
     90                } else {
     91                        return -1;
     92                }
     93        }
     94
     95        public Destination getThisDestination() {
     96                return _localPeer;
     97        }
     98
     99        public void setOptions(I2PSocketOptions options) {
     100                Connection c = _connection;
     101                if(c == null) {
     102                        return;
     103                }
     104                if(options instanceof ConnectionOptions) {
     105                        c.setOptions((ConnectionOptions)options);
     106                } else {
     107                        c.setOptions(new ConnectionOptions(options));
     108                }
     109        }
     110
     111        public void setReadTimeout(long ms) {
     112                Connection c = _connection;
     113                if(c == null) {
     114                        return;
     115                }
     116                c.getInputStream().setReadTimeout((int)ms);
     117                c.getOptions().setReadTimeout(ms);
     118        }
     119
     120        public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
     121                _listener = lsnr;
     122        }
     123
     124        public boolean isClosed() {
     125                Connection c = _connection;
     126                return ((c == null) ||
     127                        (!c.getIsConnected()) ||
     128                        (c.getResetReceived()) ||
     129                        (c.getResetSent()));
     130        }
     131
     132        void destroy() {
     133                Connection c = _connection;
     134                _connection = null;
     135                _listener = null;
     136                if(c != null) {
     137                        c.disconnectComplete();
     138                }
     139        }
     140
     141        public String toString() {
     142                Connection c = _connection;
     143                if(c == null) {
     144                        return super.toString();
     145                } else {
     146                        return c.toString();
     147                }
     148        }
    129149}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java

    r8d78a77 rfa5c721  
    22
    33import java.net.NoRouteToHostException;
     4import java.net.SocketTimeoutException;
    45import java.util.HashSet;
    56import java.util.Iterator;
     
    1314import net.i2p.data.Destination;
    1415import net.i2p.util.Log;
    15 
    1616
    1717/**
     
    2424 */
    2525public class I2PSocketManagerFull implements I2PSocketManager {
    26     private I2PAppContext _context;
    27     private Log _log;
    28     private I2PSession _session;
    29     private I2PServerSocketFull _serverSocket;
    30     private ConnectionOptions _defaultOptions;
    31     private long _acceptTimeout;
    32     private String _name;
    33     private int _maxStreams;
    34     private static int __managerId = 0;
    35     private ConnectionManager _connectionManager;
    36    
    37     /**
    38      * How long to wait for the client app to accept() before sending back CLOSE?
    39      * This includes the time waiting in the queue.  Currently set to 5 seconds.
    40      */
    41     private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
    42    
    43     public I2PSocketManagerFull() {
    44         _context = null;
    45         _session = null;
    46     }
    47     public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
    48         this();
    49         init(context, session, opts, name);
    50     }
    51    
    52     /** how many streams will we allow at once?  */
    53     public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
    54    
    55     /**
    56      *
    57      */
    58     public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
    59         _context = context;
    60         _session = session;
    61         _log = _context.logManager().getLog(I2PSocketManagerFull.class);
    62        
    63         _maxStreams = -1;
    64         try {
    65             String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
    66             _maxStreams = Integer.parseInt(num);
    67         } catch (NumberFormatException nfe) {
    68             if (_log.shouldLog(Log.WARN))
    69                 _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
    70             _maxStreams = -1;
    71         }
    72         _name = name + " " + (++__managerId);
    73         _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
    74         _defaultOptions = new ConnectionOptions(opts);
    75         _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
    76         _serverSocket = new I2PServerSocketFull(this);
    77        
    78         if (_log.shouldLog(Log.INFO)) {
    79             _log.info("Socket manager created.  \ndefault options: " + _defaultOptions
    80                       + "\noriginal properties: " + opts);
    81         }
    82     }
    83 
    84     public I2PSocketOptions buildOptions() { return buildOptions(null); }
    85     public I2PSocketOptions buildOptions(Properties opts) {
    86         ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
    87         curOpts.setProperties(opts);
    88         return curOpts;
    89     }
    90    
    91     public I2PSession getSession() {
    92         return _session;
    93     }
    94    
    95     public ConnectionManager getConnectionManager() {
    96         return _connectionManager;
    97     }
    98 
    99     public I2PSocket receiveSocket() throws I2PException {
    100         verifySession();
    101         Connection con = _connectionManager.getConnectionHandler().accept(-1);
    102         if (_log.shouldLog(Log.DEBUG))
    103             _log.debug("receiveSocket() called: " + con);
    104         if (con != null) {
    105             I2PSocketFull sock = new I2PSocketFull(con);
    106             con.setSocket(sock);
    107             return sock;
    108         } else {
    109             return null;
    110         }
    111     }
    112    
    113     /**
    114      * Ping the specified peer, returning true if they replied to the ping within
    115      * the timeout specified, false otherwise.  This call blocks.
    116      *
    117      */
    118     public boolean ping(Destination peer, long timeoutMs) {
    119         return _connectionManager.ping(peer, timeoutMs);
    120     }
    121 
    122     /**
    123      * How long should we wait for the client to .accept() a socket before
    124      * sending back a NACK/Close? 
    125      *
    126      * @param ms milliseconds to wait, maximum
    127      */
    128     public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
    129     public long getAcceptTimeout() { return _acceptTimeout; }
    130 
    131     public void setDefaultOptions(I2PSocketOptions options) {
    132         _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
    133     }
    134 
    135     public I2PSocketOptions getDefaultOptions() {
    136         return _defaultOptions;
    137     }
    138 
    139     public I2PServerSocket getServerSocket() {
    140         _connectionManager.setAllowIncomingConnections(true);
    141         return _serverSocket;
    142     }
    143 
    144     private void verifySession() throws I2PException {
    145         if (!_connectionManager.getSession().isClosed())
    146             return;
    147         _connectionManager.getSession().connect();
    148     }
    149    
    150     /**
    151      * Create a new connected socket (block until the socket is created)
    152      *
    153      * @param peer Destination to connect to
    154      * @param options I2P socket options to be used for connecting
    155      *
    156      * @throws NoRouteToHostException if the peer is not found or not reachable
    157      * @throws I2PException if there is some other I2P-related problem
    158      */
    159     public I2PSocket connect(Destination peer, I2PSocketOptions options)
    160                              throws I2PException, NoRouteToHostException {
    161         verifySession();
    162         if (options == null)
    163             options = _defaultOptions;
    164         ConnectionOptions opts = null;
    165         if (options instanceof ConnectionOptions)
    166             opts = new ConnectionOptions((ConnectionOptions)options);
    167         else
    168             opts = new ConnectionOptions(options);
    169        
    170         if (_log.shouldLog(Log.INFO))
    171             _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
    172                       + " with options: " + opts);
    173         Connection con = _connectionManager.connect(peer, opts);
    174         if (con == null)
    175             throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
    176         I2PSocketFull socket = new I2PSocketFull(con);
    177         con.setSocket(socket);
    178         if (con.getConnectionError() != null) {
    179             con.disconnect(false);
    180             throw new NoRouteToHostException(con.getConnectionError());
    181         }
    182         return socket;
    183     }
    184 
    185     /**
    186      * Create a new connected socket (block until the socket is created)
    187      *
    188      * @param peer Destination to connect to
    189      *
    190      * @throws NoRouteToHostException if the peer is not found or not reachable
    191      * @throws I2PException if there is some other I2P-related problem
    192      */
    193     public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
    194         return connect(peer, _defaultOptions);
    195     }
    196 
    197     /**
    198      * Destroy the socket manager, freeing all the associated resources.  This
    199      * method will block untill all the managed sockets are closed.
    200      *
    201      */
    202     public void destroySocketManager() {
    203         _connectionManager.disconnectAllHard();
    204         _connectionManager.setAllowIncomingConnections(false);
    205         // should we destroy the _session too?
    206         // yes, since the old lib did (and SAM wants it to, and i dont know why not)
    207         if ( (_session != null) && (!_session.isClosed()) ) {
    208             try {
    209                 _session.destroySession();
    210             } catch (I2PSessionException ise) {
    211                 _log.warn("Unable to destroy the session", ise);
    212             }
    213         }
    214     }
    215 
    216     /**
    217      * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
    218      *
    219      */
    220     public Set listSockets() {
    221         Set connections = _connectionManager.listConnections();
    222         Set rv = new HashSet(connections.size());
    223         for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
    224             Connection con = (Connection)iter.next();
    225             if (con.getSocket() != null)
    226                 rv.add(con.getSocket());
    227         }
    228         return rv;
    229     }
    230 
    231     public String getName() { return _name; }
    232     public void setName(String name) { _name = name; }
    233    
    234    
    235     public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    236         _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
    237     }
    238     public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    239         _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
    240     }
     26
     27        private I2PAppContext _context;
     28        private Log _log;
     29        private I2PSession _session;
     30        private I2PServerSocketFull _serverSocket;
     31        private ConnectionOptions _defaultOptions;
     32        private long _acceptTimeout;
     33        private String _name;
     34        private int _maxStreams;
     35        private static int __managerId = 0;
     36        private ConnectionManager _connectionManager;
     37        /**
     38         * How long to wait for the client app to accept() before sending back CLOSE?
     39         * This includes the time waiting in the queue.  Currently set to 5 seconds.
     40         */
     41        private static final long ACCEPT_TIMEOUT_DEFAULT = 5 * 1000;
     42
     43        /**
     44         *
     45         */
     46        public I2PSocketManagerFull() {
     47                _context = null;
     48                _session = null;
     49        }
     50
     51        /**
     52         *
     53         * @param context
     54         * @param session
     55         * @param opts
     56         * @param name
     57         */
     58        public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
     59                this();
     60                init(context, session, opts, name);
     61        }
     62        /** how many streams will we allow at once?  */
     63        public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
     64
     65        /**
     66         *
     67         *
     68         * @param context
     69         * @param session
     70         * @param opts
     71         * @param name
     72         */
     73        public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
     74                _context = context;
     75                _session = session;
     76                _log = _context.logManager().getLog(I2PSocketManagerFull.class);
     77
     78                _maxStreams = -1;
     79                try {
     80                        String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
     81                        _maxStreams = Integer.parseInt(num);
     82                } catch(NumberFormatException nfe) {
     83                        if(_log.shouldLog(Log.WARN)) {
     84                                _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
     85                        }
     86                        _maxStreams = -1;
     87                }
     88                _name = name + " " + (++__managerId);
     89                _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
     90                _defaultOptions = new ConnectionOptions(opts);
     91                _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
     92                _serverSocket = new I2PServerSocketFull(this);
     93
     94                if(_log.shouldLog(Log.INFO)) {
     95                        _log.info("Socket manager created.  \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
     96                }
     97        }
     98
     99        /**
     100         *
     101         * @return
     102         */
     103        public I2PSocketOptions buildOptions() {
     104                return buildOptions(null);
     105        }
     106
     107        /**
     108         *
     109         * @param opts
     110         * @return
     111         */
     112        public I2PSocketOptions buildOptions(Properties opts) {
     113                ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
     114                curOpts.setProperties(opts);
     115                return curOpts;
     116        }
     117
     118        /**
     119         *
     120         * @return
     121         */
     122        public I2PSession getSession() {
     123                return _session;
     124        }
     125
     126        /**
     127         *
     128         * @return
     129         */
     130        public ConnectionManager getConnectionManager() {
     131                return _connectionManager;
     132        }
     133
     134        /**
     135         *
     136         * @return
     137         * @throws net.i2p.I2PException
     138         * @throws java.net.SocketTimeoutException
     139         */
     140        public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
     141                verifySession();
     142                Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
     143                if(_log.shouldLog(Log.DEBUG)) {
     144                        _log.debug("receiveSocket() called: " + con);
     145                }
     146                if(con != null) {
     147                        I2PSocketFull sock = new I2PSocketFull(con);
     148                        con.setSocket(sock);
     149                        return sock;
     150                } else {
     151                        if(_connectionManager.MgetSoTimeout() == -1) {
     152                                return null;
     153                        }
     154                        throw new SocketTimeoutException("I2PSocket timed out");
     155                }
     156        }
     157
     158        /**
     159         * Ping the specified peer, returning true if they replied to the ping within
     160         * the timeout specified, false otherwise.  This call blocks.
     161         *
     162         *
     163         * @param peer
     164         * @param timeoutMs
     165         * @return
     166         */
     167        public boolean ping(Destination peer, long timeoutMs) {
     168                return _connectionManager.ping(peer, timeoutMs);
     169        }
     170
     171        /**
     172         * How long should we wait for the client to .accept() a socket before
     173         * sending back a NACK/Close? 
     174         *
     175         * @param ms milliseconds to wait, maximum
     176         */
     177        public void setAcceptTimeout(long ms) {
     178                _acceptTimeout = ms;
     179        }
     180
     181        /**
     182         *
     183         * @return
     184         */
     185        public long getAcceptTimeout() {
     186                return _acceptTimeout;
     187        }
     188
     189        /**
     190         *
     191         * @param options
     192         */
     193        public void setDefaultOptions(I2PSocketOptions options) {
     194                _defaultOptions = new ConnectionOptions((ConnectionOptions)options);
     195        }
     196
     197        /**
     198         *
     199         * @return
     200         */
     201        public I2PSocketOptions getDefaultOptions() {
     202                return _defaultOptions;
     203        }
     204
     205        /**
     206         *
     207         * @return
     208         */
     209        public I2PServerSocket getServerSocket() {
     210                _connectionManager.setAllowIncomingConnections(true);
     211                return _serverSocket;
     212        }
     213
     214        private void verifySession() throws I2PException {
     215                if(!_connectionManager.getSession().isClosed()) {
     216                        return;
     217                }
     218                _connectionManager.getSession().connect();
     219        }
     220
     221        /**
     222         * Create a new connected socket (block until the socket is created)
     223         *
     224         * @param peer Destination to connect to
     225         * @param options I2P socket options to be used for connecting
     226         *
     227         * @throws NoRouteToHostException if the peer is not found or not reachable
     228         * @throws I2PException if there is some other I2P-related problem
     229         */
     230        public I2PSocket connect(Destination peer, I2PSocketOptions options)
     231                throws I2PException, NoRouteToHostException {
     232                verifySession();
     233                if(options == null) {
     234                        options = _defaultOptions;
     235                }
     236                ConnectionOptions opts = null;
     237                if(options instanceof ConnectionOptions) {
     238                        opts = new ConnectionOptions((ConnectionOptions)options);
     239                } else {
     240                        opts = new ConnectionOptions(options);
     241                }
     242                if(_log.shouldLog(Log.INFO)) {
     243                        _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
     244                }
     245                Connection con = _connectionManager.connect(peer, opts);
     246                if(con == null) {
     247                        throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
     248                }
     249                I2PSocketFull socket = new I2PSocketFull(con);
     250                con.setSocket(socket);
     251                if(con.getConnectionError() != null) {
     252                        con.disconnect(false);
     253                        throw new NoRouteToHostException(con.getConnectionError());
     254                }
     255                return socket;
     256        }
     257
     258        /**
     259         * Create a new connected socket (block until the socket is created)
     260         *
     261         * @param peer Destination to connect to
     262         *
     263         * @return
     264         * @throws NoRouteToHostException if the peer is not found or not reachable
     265         * @throws I2PException if there is some other I2P-related problem
     266         */
     267        public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException {
     268                return connect(peer, _defaultOptions);
     269        }
     270
     271        /**
     272         * Destroy the socket manager, freeing all the associated resources.  This
     273         * method will block untill all the managed sockets are closed.
     274         *
     275         */
     276        public void destroySocketManager() {
     277                _connectionManager.disconnectAllHard();
     278                _connectionManager.setAllowIncomingConnections(false);
     279                // should we destroy the _session too?
     280                // yes, since the old lib did (and SAM wants it to, and i dont know why not)
     281                if((_session != null) && (!_session.isClosed())) {
     282                        try {
     283                                _session.destroySession();
     284                        } catch(I2PSessionException ise) {
     285                                _log.warn("Unable to destroy the session", ise);
     286                        }
     287                }
     288        }
     289
     290        /**
     291         * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
     292         *
     293         *
     294         * @return
     295         */
     296        public Set listSockets() {
     297                Set connections = _connectionManager.listConnections();
     298                Set rv = new HashSet(connections.size());
     299                for(Iterator iter = connections.iterator(); iter.hasNext();) {
     300                        Connection con = (Connection)iter.next();
     301                        if(con.getSocket() != null) {
     302                                rv.add(con.getSocket());
     303                        }
     304                }
     305                return rv;
     306        }
     307
     308        /**
     309         *
     310         * @return
     311         */
     312        public String getName() {
     313                return _name;
     314        }
     315
     316        /**
     317         *
     318         * @param name
     319         */
     320        public void setName(String name) {
     321                _name = name;
     322        }
     323
     324        /**
     325         *
     326         * @param lsnr
     327         */
     328        public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
     329                _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
     330        }
     331
     332        /**
     333         *
     334         * @param lsnr
     335         */
     336        public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
     337                _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
     338        }
    241339}
  • apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java

    r8d78a77 rfa5c721  
    66 *
    77 */
    8 class RetransmissionTimer extends SimpleTimer {
     8public class RetransmissionTimer extends SimpleTimer {
    99    private static final RetransmissionTimer _instance = new RetransmissionTimer();
    1010    public static final SimpleTimer getInstance() { return _instance; }
  • core/java/src/net/i2p/util/Executor.java

    r8d78a77 rfa5c721  
    66
    77class Executor implements Runnable {
    8     private I2PAppContext _context;
    9     private Log _log;
    10     private List _readyEvents;
    11     public Executor(I2PAppContext ctx, Log log, List events) {
    12         _context = ctx;
    13         _readyEvents = events;
    14     }
    15     public void run() {
    16         while (true) {
    17             SimpleTimer.TimedEvent evt = null;
    18             synchronized (_readyEvents) {
    19                 if (_readyEvents.size() <= 0)
    20                     try { _readyEvents.wait(); } catch (InterruptedException ie) {}
    21                 if (_readyEvents.size() > 0)
    22                     evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
    23             }
    248
    25             if (evt != null) {
    26                 long before = _context.clock().now();
    27                 try {
    28                     evt.timeReached();
    29                 } catch (Throwable t) {
    30                     log("wtf, event borked: " + evt, t);
    31                 }
    32                 long time = _context.clock().now() - before;
    33                 if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
    34                     _log.warn("wtf, event execution took " + time + ": " + evt);
    35             }
    36         }
    37     }
    38    
    39     private void log(String msg, Throwable t) {
    40         synchronized (this) {
    41             if (_log == null)
    42                 _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
    43         }
    44         _log.log(Log.CRIT, msg, t);
    45     }
     9        private I2PAppContext _context;
     10        private Log _log;
     11        private List _readyEvents;
     12        private SimpleStore runn;
     13
     14        public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
     15                _context = ctx;
     16                _readyEvents = events;
     17                runn = x;
     18        }
     19
     20        public void run() {
     21                while(runn.getAnswer()) {
     22                        SimpleTimer.TimedEvent evt = null;
     23                        synchronized(_readyEvents) {
     24                                if(_readyEvents.size() <= 0) {
     25                                        try {
     26                                                _readyEvents.wait();
     27                                        } catch(InterruptedException ie) {
     28                                        }
     29                                }
     30                                if(_readyEvents.size() > 0) {
     31                                        evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
     32                                }
     33                        }
     34
     35                        if(evt != null) {
     36                                long before = _context.clock().now();
     37                                try {
     38                                        evt.timeReached();
     39                                } catch(Throwable t) {
     40                                        log("wtf, event borked: " + evt, t);
     41                                }
     42                                long time = _context.clock().now() - before;
     43                                if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
     44                                        _log.warn("wtf, event execution took " + time + ": " + evt);
     45                                }
     46                        }
     47                }
     48        }
     49
     50        /**
     51         *
     52         * @param msg
     53         * @param t
     54         */
     55        private void log(String msg, Throwable t) {
     56                synchronized(this) {
     57                        if(_log == null) {
     58                                _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
     59                        }
     60                }
     61                _log.log(Log.CRIT, msg, t);
     62        }
    4663}
  • core/java/src/net/i2p/util/SimpleTimer.java

    r8d78a77 rfa5c721  
    1717 */
    1818public class SimpleTimer {
    19     private static final SimpleTimer _instance = new SimpleTimer();
    20     public static SimpleTimer getInstance() { return _instance; }
    21     private I2PAppContext _context;
    22     private Log _log;
    23     /** event time (Long) to event (TimedEvent) mapping */
    24     private TreeMap _events;
    25     /** event (TimedEvent) to event time (Long) mapping */
    26     private Map _eventTimes;
    27     private List _readyEvents;
    28    
    29     protected SimpleTimer() { this("SimpleTimer"); }
    30     protected SimpleTimer(String name) {
    31         _context = I2PAppContext.getGlobalContext();
    32         _log = _context.logManager().getLog(SimpleTimer.class);
    33         _events = new TreeMap();
    34         _eventTimes = new HashMap(256);
    35         _readyEvents = new ArrayList(4);
    36         I2PThread runner = new I2PThread(new SimpleTimerRunner());
    37         runner.setName(name);
    38         runner.setDaemon(true);
    39         runner.start();
    40         for (int i = 0; i < 3; i++) {
    41             I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
    42             executor.setName(name + "Executor " + i);
    43             executor.setDaemon(true);
    44             executor.start();
    45         }
    46     }
    47    
    48     public void reschedule(TimedEvent event, long timeoutMs) {
    49         addEvent(event, timeoutMs, false);
    50     }
    51    
    52     /**
    53      * Queue up the given event to be fired no sooner than timeoutMs from now.
    54      * However, if this event is already scheduled, the event will be scheduled
    55      * for the earlier of the two timeouts, which may be before this stated
    56      * timeout.  If this is not the desired behavior, call removeEvent first.
    57      *
    58      */
    59     public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
    60     /**
    61      * @param useEarliestTime if its already scheduled, use the earlier of the
    62      *                        two timeouts, else use the later
    63      */
    64     public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
    65         int totalEvents = 0;
    66         long now = System.currentTimeMillis();
    67         long eventTime = now + timeoutMs;
    68         Long time = new Long(eventTime);
    69         synchronized (_events) {
    70             // remove the old scheduled position, then reinsert it
    71             Long oldTime = (Long)_eventTimes.get(event);
    72             if (oldTime != null) {
    73                 if (useEarliestTime) {
    74                     if (oldTime.longValue() < eventTime) {
    75                         _events.notifyAll();
    76                         return; // already scheduled for sooner than requested
    77                     } else {
    78                         _events.remove(oldTime);
    79                     }
    80                 } else {
    81                     if (oldTime.longValue() > eventTime) {
    82                         _events.notifyAll();
    83                         return; // already scheduled for later than the given period
    84                     } else {
    85                         _events.remove(oldTime);
    86                     }
    87                 }
    88             }
    89             while (_events.containsKey(time))
    90                 time = new Long(time.longValue() + 1);
    91             _events.put(time, event);
    92             _eventTimes.put(event, time);
    93            
    94             if ( (_events.size() != _eventTimes.size()) ) {
    95                 _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
    96                 for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
    97                     TimedEvent evt = (TimedEvent)iter.next();
    98                     Long when = (Long)_eventTimes.get(evt);
    99                     TimedEvent cur = (TimedEvent)_events.get(when);
    100                     if (cur != evt) {
    101                         _log.error("event " + evt + " @ " + when + ": " + cur);
    102                     }
    103                 }
    104             }
    105            
    106             totalEvents = _events.size();
    107             _events.notifyAll();
    108         }
    109         if (time.longValue() > eventTime + 100) {
    110             if (_log.shouldLog(Log.WARN))
    111                 _log.warn("Lots of timer congestion, had to push " + event + " back "
    112                            + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
    113         }
    114         long timeToAdd = System.currentTimeMillis() - now;
    115         if (timeToAdd > 50) {
    116             if (_log.shouldLog(Log.WARN))
    117                 _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
    118         }
    119            
    120     }
    121    
    122     public boolean removeEvent(TimedEvent evt) {
    123         if (evt == null) return false;
    124         synchronized (_events) {
    125             Long when = (Long)_eventTimes.remove(evt);
    126             if (when != null)
    127                 _events.remove(when);
    128             return null != when;
    129         }
    130     }
    131    
    132     /**
    133      * Simple interface for events to be queued up and notified on expiration
    134      */
    135     public interface TimedEvent {
    136         /**
    137          * the time requested has been reached (this call should NOT block,
    138          * otherwise the whole SimpleTimer gets backed up)
    139          *
    140          */
    141         public void timeReached();
    142     }
    143    
    144     private long _occurredTime;
    145     private long _occurredEventCount;
    146     private TimedEvent _recentEvents[] = new TimedEvent[5];
    147    
    148     private class SimpleTimerRunner implements Runnable {
    149         public void run() {
    150             List eventsToFire = new ArrayList(1);
    151             while (true) {
    152                 try {
    153                     synchronized (_events) {
    154                         //if (_events.size() <= 0)
    155                         //    _events.wait();
    156                         //if (_events.size() > 100)
    157                         //    _log.warn("> 100 events!  " + _events.values());
    158                         long now = System.currentTimeMillis();
    159                         long nextEventDelay = -1;
    160                         Object nextEvent = null;
    161                         while (true) {
    162                             if (_events.size() <= 0) break;
    163                             Long when = (Long)_events.firstKey();
    164                             if (when.longValue() <= now) {
    165                                 TimedEvent evt = (TimedEvent)_events.remove(when);
    166                                 if (evt != null) {                           
    167                                     _eventTimes.remove(evt);
    168                                     eventsToFire.add(evt);
    169                                 }
    170                             } else {
    171                                 nextEventDelay = when.longValue() - now;
    172                                 nextEvent = _events.get(when);
    173                                 break;
    174                             }
    175                         }
    176                         if (eventsToFire.size() <= 0) {
    177                             if (nextEventDelay != -1) {
    178                                 if (_log.shouldLog(Log.DEBUG))
    179                                     _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
    180                                 _events.wait(nextEventDelay);
    181                             } else {
    182                                 _events.wait();
    183                             }
    184                         }
    185                     }
    186                 } catch (ThreadDeath td) {
    187                     return; // die
    188                 } catch (InterruptedException ie) {
    189                     // ignore
    190                 } catch (Throwable t) {
    191                     if (_log != null) {
    192                         _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
    193                     } else {
    194                         System.err.println("Uncaught exception in SimpleTimer");
    195                         t.printStackTrace();
    196                     }
    197                 }
    198                
    199                 long now = System.currentTimeMillis();
    200                 now = now - (now % 1000);
    201 
    202                 synchronized (_readyEvents) {
    203                     for (int i = 0; i < eventsToFire.size(); i++)
    204                         _readyEvents.add(eventsToFire.get(i));
    205                     _readyEvents.notifyAll();
    206                 }
    207 
    208                 if (_occurredTime == now) {
    209                     _occurredEventCount += eventsToFire.size();
    210                 } else {
    211                     _occurredTime = now;
    212                     if (_occurredEventCount > 2500) {
    213                         StringBuffer buf = new StringBuffer(128);
    214                         buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
    215                         buf.append(") in a second!");
    216                         _log.log(Log.WARN, buf.toString());
    217                     }
    218                     _occurredEventCount = 0;
    219                 }
    220 
    221                 eventsToFire.clear();
    222             }
    223         }
    224     }
     19
     20        private static final SimpleTimer _instance = new SimpleTimer();
     21
     22        public static SimpleTimer getInstance() {
     23                return _instance;
     24        }
     25        private I2PAppContext _context;
     26        private Log _log;
     27        /** event time (Long) to event (TimedEvent) mapping */
     28        private TreeMap _events;
     29        /** event (TimedEvent) to event time (Long) mapping */
     30        private Map _eventTimes;
     31        private List _readyEvents;
     32        private SimpleStore runn;
     33
     34        /**
     35         *
     36         */
     37        protected SimpleTimer() {
     38                this("SimpleTimer");
     39        }
     40
     41        /**
     42         *
     43         * @param name
     44         */
     45        protected SimpleTimer(String name) {
     46                runn = new SimpleStore(true);
     47                _context = I2PAppContext.getGlobalContext();
     48                _log = _context.logManager().getLog(SimpleTimer.class);
     49                _events = new TreeMap();
     50                _eventTimes = new HashMap(256);
     51                _readyEvents = new ArrayList(4);
     52                I2PThread runner = new I2PThread(new SimpleTimerRunner());
     53                runner.setName(name);
     54                runner.setDaemon(true);
     55                runner.start();
     56                for(int i = 0; i < 3; i++) {
     57                        I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
     58                        executor.setName(name + "Executor " + i);
     59                        executor.setDaemon(true);
     60                        executor.start();
     61                }
     62        }
     63
     64        /**
     65         * Removes the SimpleTimer.
     66         */
     67        public void removeSimpleTimer() {
     68                synchronized(_events) {
     69                        runn.setAnswer(false);
     70                        _events.notifyAll();
     71                }
     72        }
     73
     74        /**
     75         *
     76         * @param event
     77         * @param timeoutMs
     78         */
     79        public void reschedule(TimedEvent event, long timeoutMs) {
     80                addEvent(event, timeoutMs, false);
     81        }
     82
     83        /**
     84         * Queue up the given event to be fired no sooner than timeoutMs from now.
     85         * However, if this event is already scheduled, the event will be scheduled
     86         * for the earlier of the two timeouts, which may be before this stated
     87         * timeout.  If this is not the desired behavior, call removeEvent first.
     88         *
     89         * @param event
     90         * @param timeoutMs
     91         */
     92        public void addEvent(TimedEvent event, long timeoutMs) {
     93                addEvent(event, timeoutMs, true);
     94        }
     95
     96        /**
     97         * @param event
     98         * @param timeoutMs
     99         * @param useEarliestTime if its already scheduled, use the earlier of the
     100         *                        two timeouts, else use the later
     101         */
     102        public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) {
     103                int totalEvents = 0;
     104                long now = System.currentTimeMillis();
     105                long eventTime = now + timeoutMs;
     106                Long time = new Long(eventTime);
     107                synchronized(_events) {
     108                        // remove the old scheduled position, then reinsert it
     109                        Long oldTime = (Long)_eventTimes.get(event);
     110                        if(oldTime != null) {
     111                                if(useEarliestTime) {
     112                                        if(oldTime.longValue() < eventTime) {
     113                                                _events.notifyAll();
     114                                                return; // already scheduled for sooner than requested
     115                                        } else {
     116                                                _events.remove(oldTime);
     117                                        }
     118                                } else {
     119                                        if(oldTime.longValue() > eventTime) {
     120                                                _events.notifyAll();
     121                                                return; // already scheduled for later than the given period
     122                                        } else {
     123                                                _events.remove(oldTime);
     124                                        }
     125                                }
     126                        }
     127                        while(_events.containsKey(time)) {
     128                                time = new Long(time.longValue() + 1);
     129                        }
     130                        _events.put(time, event);
     131                        _eventTimes.put(event, time);
     132
     133                        if((_events.size() != _eventTimes.size())) {
     134                                _log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
     135                                for(Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext();) {
     136                                        TimedEvent evt = (TimedEvent)iter.next();
     137                                        Long when = (Long)_eventTimes.get(evt);
     138                                        TimedEvent cur = (TimedEvent)_events.get(when);
     139                                        if(cur != evt) {
     140                                                _log.error("event " + evt + " @ " + when + ": " + cur);
     141                                        }
     142                                }
     143                        }
     144
     145                        totalEvents = _events.size();
     146                        _events.notifyAll();
     147                }
     148                if(time.longValue() > eventTime + 100) {
     149                        if(_log.shouldLog(Log.WARN)) {
     150                                _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
     151                        }
     152                }
     153                long timeToAdd = System.currentTimeMillis() - now;
     154                if(timeToAdd > 50) {
     155                        if(_log.shouldLog(Log.WARN)) {
     156                                _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
     157                        }
     158                }
     159
     160        }
     161
     162        /**
     163         *
     164         * @param evt
     165         * @return
     166         */
     167        public boolean removeEvent(TimedEvent evt) {
     168                if(evt == null) {
     169                        return false;
     170                }
     171                synchronized(_events) {
     172                        Long when = (Long)_eventTimes.remove(evt);
     173                        if(when != null) {
     174                                _events.remove(when);
     175                        }
     176                        return null != when;
     177                }
     178        }
     179
     180        /**
     181         * Simple interface for events to be queued up and notified on expiration
     182         */
     183        public interface TimedEvent {
     184
     185                /**
     186                 * the time requested has been reached (this call should NOT block,
     187                 * otherwise the whole SimpleTimer gets backed up)
     188                 *
     189                 */
     190                public void timeReached();
     191        }
     192        private long _occurredTime;
     193        private long _occurredEventCount;
     194        // not used
     195        //  private TimedEvent _recentEvents[] = new TimedEvent[5];
     196        private class SimpleTimerRunner implements Runnable {
     197
     198                public void run() {
     199                        List eventsToFire = new ArrayList(1);
     200                        while(runn.getAnswer()) {
     201                                try {
     202                                        synchronized(_events) {
     203                                                //if (_events.size() <= 0)
     204                                                //    _events.wait();
     205                                                //if (_events.size() > 100)
     206                                                //    _log.warn("> 100 events!  " + _events.values());
     207                                                long now = System.currentTimeMillis();
     208                                                long nextEventDelay = -1;
     209                                                Object nextEvent = null;
     210                                                while(runn.getAnswer()) {
     211                                                        if(_events.size() <= 0) {
     212                                                                break;
     213                                                        }
     214                                                        Long when = (Long)_events.firstKey();
     215                                                        if(when.longValue() <= now) {
     216                                                                TimedEvent evt = (TimedEvent)_events.remove(when);
     217                                                                if(evt != null) {
     218                                                                        _eventTimes.remove(evt);
     219                                                                        eventsToFire.add(evt);
     220                                                                }
     221                                                        } else {
     222                                                                nextEventDelay = when.longValue() - now;
     223                                                                nextEvent = _events.get(when);
     224                                                                break;
     225                                                        }
     226                                                }
     227                                                if(eventsToFire.size() <= 0) {
     228                                                        if(nextEventDelay != -1) {
     229                                                                if(_log.shouldLog(Log.DEBUG)) {
     230                                                                        _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
     231                                                                }
     232                                                                _events.wait(nextEventDelay);
     233                                                        } else {
     234                                                                _events.wait();
     235                                                        }
     236                                                }
     237                                        }
     238                                } catch(InterruptedException ie) {
     239                                        // ignore
     240                                } catch(Throwable t) {
     241                                        if(_log != null) {
     242                                                _log.log(Log.CRIT, "Uncaught exception in the SimpleTimer!", t);
     243                                        } else {
     244                                                System.err.println("Uncaught exception in SimpleTimer");
     245                                                t.printStackTrace();
     246                                        }
     247                                }
     248
     249                                long now = System.currentTimeMillis();
     250                                now = now - (now % 1000);
     251
     252                                synchronized(_readyEvents) {
     253                                        for(int i = 0; i < eventsToFire.size(); i++) {
     254                                                _readyEvents.add(eventsToFire.get(i));
     255                                        }
     256                                        _readyEvents.notifyAll();
     257                                }
     258
     259                                if(_occurredTime == now) {
     260                                        _occurredEventCount += eventsToFire.size();
     261                                } else {
     262                                        _occurredTime = now;
     263                                        if(_occurredEventCount > 2500) {
     264                                                StringBuffer buf = new StringBuffer(128);
     265                                                buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
     266                                                buf.append(") in a second!");
     267                                                _log.log(Log.WARN, buf.toString());
     268                                        }
     269                                        _occurredEventCount = 0;
     270                                }
     271
     272                                eventsToFire.clear();
     273                        }
     274                }
     275        }
    225276}
    226277
Note: See TracChangeset for help on using the changeset viewer.