Changeset dd7d993


Ignore:
Timestamp:
Sep 25, 2008 11:59:01 PM (12 years ago)
Author:
sponge <sponge@…>
Branches:
master
Children:
b0313bd6
Parents:
ee2fd32
Message:

Added Simple true/false storage class to the utilities
Added socketSoTimeout
CHANGED RetransmissionTimer? is now public
FIXED SimpleTimer? has a way to be stopped, and reap it's children
FIXED Lots of javadoc additions, where I could
CLEANUP all code that needed to catch the timeout exception for socketSoTimeout

Files:
1 added
12 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java

    ree2fd32 rdd7d993  
    1313import java.net.Socket;
    1414import java.net.SocketException;
     15import java.net.SocketTimeoutException;
    1516import java.util.Iterator;
    1617import java.util.Properties;
     
    220221                        _log.error("Error accepting", ce);
    221222                    // not killing the server..
     223                } catch(SocketTimeoutException ste) {
     224                        // ignored, we never set the timeout
    222225                }
    223226            }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    ree2fd32 rdd7d993  
    33import java.net.ConnectException;
    44
     5import java.net.SocketTimeoutException;
    56import net.i2p.I2PException;
    67
     
    1011 */
    1112public interface I2PServerSocket {
     13
    1214    /**
    1315     * Closes the socket.
     
    2527     *         from the data available (aka the I2PSession closed, etc)
    2628     * @throws ConnectException if the I2PServerSocket is closed
     29         * @throws SocketTimeoutException
     30         */
     31        public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException;
     32
     33        /**
     34         * Set Sock Option accept timeout
     35         * @param x
     36         */
     37        public void setSoTimeout(long x);
     38
     39        /**
     40         * Get Sock Option accept timeout
     41         * @return timeout
    2742     */
    28     public I2PSocket accept() throws I2PException, ConnectException;
     43        public long getSoTimeout();
    2944
    3045    /**
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    ree2fd32 rdd7d993  
    1818 */
    1919class I2PServerSocketImpl implements I2PServerSocket {
     20
    2021    private final static Log _log = new Log(I2PServerSocketImpl.class);
    2122    private I2PSocketManager mgr;
    2223    /** list of sockets waiting for the client to accept them */
    2324    private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
    24    
    2525    /** have we been closed */
    2626    private volatile boolean closing = false;
    27    
    2827    /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */
    2928    private Object socketAcceptedLock = new Object();
     
    3130    private Object socketAddedLock = new Object();
    3231   
     32        /**
     33         * Set Sock Option accept timeout stub, does nothing
     34         * @param x
     35         */
     36        public void setSoTimeout(long x) {
     37        }
     38
     39        /**
     40         * Get Sock Option accept timeout stub, does nothing
     41         * @return timeout
     42         */
     43        public long getSoTimeout() {
     44                return -1;
     45        }
     46
    3347    public I2PServerSocketImpl(I2PSocketManager mgr) {
    3448        this.mgr = mgr;
     
    4862     */
    4963    public I2PSocket accept() throws I2PException, ConnectException {
    50         if (_log.shouldLog(Log.DEBUG))
     64                if(_log.shouldLog(Log.DEBUG)) {
    5165            _log.debug("accept() called, pending: " + pendingSockets.size());
    52        
     66                }
    5367        I2PSocket ret = null;
    5468       
    5569        while ( (ret == null) && (!closing) ){
    5670            while (pendingSockets.size() <= 0) {
    57                 if (closing) throw new ConnectException("I2PServerSocket closed");
     71                                if(closing) {
     72                                        throw new ConnectException("I2PServerSocket closed");
     73                                }
    5874                try {
    5975                    synchronized(socketAddedLock) {
    6076                        socketAddedLock.wait();
    6177                    }
    62                 } catch (InterruptedException ie) {}
     78                                } catch(InterruptedException ie) {
     79                                }
    6380            }
    6481            synchronized (pendingSockets) {
     
    7491        }
    7592       
    76         if (_log.shouldLog(Log.DEBUG))
     93                if(_log.shouldLog(Log.DEBUG)) {
    7794            _log.debug("TIMING: handed out accept result " + ret.hashCode());
     95                }
    7896        return ret;
    7997    }
     
    89107     */
    90108    public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
    91         if (_log.shouldLog(Log.DEBUG))
     109                if(_log.shouldLog(Log.DEBUG)) {
    92110            _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
    93        
     111                }
    94112        if (closing) {
    95             if (_log.shouldLog(Log.WARN))
     113                        if(_log.shouldLog(Log.WARN)) {
    96114                _log.warn("Already closing the socket");
     115                        }
    97116            return false;
    98117        }
     
    111130            long now = clock.now();
    112131            if (now >= end) {
    113                 if (_log.shouldLog(Log.INFO))
     132                                if(_log.shouldLog(Log.INFO)) {
    114133                    _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
     134                                }
    115135                pendingSockets.remove(s);
    116136                return false;
    117137            }
    118138            if (closing) {
    119                 if (_log.shouldLog(Log.WARN))
     139                                if(_log.shouldLog(Log.WARN)) {
    120140                    _log.warn("Server socket closed while waiting for accept");
     141                                }
    121142                pendingSockets.remove(s);
    122143                return false;
     
    127148                    socketAcceptedLock.wait(remaining);
    128149                }
    129             } catch (InterruptedException ie) {}
     150                        } catch(InterruptedException ie) {
     151                        }
    130152        }
    131153        long now = clock.now();
    132         if (_log.shouldLog(Log.DEBUG))
     154                if(_log.shouldLog(Log.DEBUG)) {
    133155            _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
     156                }
    134157        return true;
    135158    }
     
    147170    }
    148171   
    149     public I2PSocketManager getManager() { return mgr; }
     172        public I2PSocketManager getManager() {
     173                return mgr;
     174        }
    150175}
  • apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java

    ree2fd32 rdd7d993  
    66import java.io.InputStream;
    77import java.net.ConnectException;
     8import java.net.SocketTimeoutException;
    89import java.util.Properties;
    910
     
    2122 */
    2223public class StreamSinkServer {
     24
    2325    private Log _log;
    2426    private String _sinkDir;
     
    3739        this(sinkDir, ourDestFile, null, -1, 3);
    3840    }
     41
    3942    public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) {
    4043        _sinkDir = sinkDir;
     
    5356    public void runServer() {
    5457        I2PSocketManager mgr = null;
    55         if (_i2cpHost != null)
     58                if(_i2cpHost != null) {
    5659            mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties());
    57         else
     60                } else {
    5861            mgr = I2PSocketManagerFactory.createManager();
     62                }
    5963        Destination dest = mgr.getSession().getMyDestination();
    60         if (_log.shouldLog(Log.INFO))
     64                if(_log.shouldLog(Log.INFO)) {
    6165            _log.info("Listening for connections on: " + dest.calculateHash().toBase64());
     66                }
    6267        FileOutputStream fos = null;
    6368        try {
     
    7176            return;
    7277        } finally {
    73             if (fos != null) try { fos.close(); } catch (IOException ioe) {}
     78                        if(fos != null) {
     79                                try {
     80                                        fos.close();
     81                                } catch(IOException ioe) {
     82                                }
     83                        }
    7484        }
    7585       
     
    92102     */
    93103    private class ClientRunner implements Runnable {
     104
    94105        private I2PServerSocket _socket;
     106
    95107        public ClientRunner(I2PServerSocket socket) {
    96108            _socket = socket;
    97109        }
     110
    98111        public void run() {
    99112            while (true) {
    100113                try {
    101114                    I2PSocket socket = _socket.accept();
    102                     if (socket != null)
     115                                        if(socket != null) {
    103116                        handle(socket);
     117                                        }
    104118                } catch (I2PException ie) {
    105119                    _log.error("Error accepting connection", ie);
     
    108122                    _log.error("Connection already dropped", ce);
    109123                    return;
     124                                } catch(SocketTimeoutException ste) {
     125                                        // ignored
    110126                }       
    111127            }
     
    116132            try {
    117133                File sink = new File(_sinkDir);
    118                 if (!sink.exists())
     134                                if(!sink.exists()) {
    119135                    sink.mkdirs();
     136                                }
    120137                File cur = File.createTempFile("clientSink", ".dat", sink);
    121138                fos = new FileOutputStream(cur);
    122                 if (_log.shouldLog(Log.DEBUG))
     139                                if(_log.shouldLog(Log.DEBUG)) {
    123140                    _log.debug("Writing to " + cur.getAbsolutePath());
     141                                }
    124142            } catch (IOException ioe) {
    125143                _log.error("Error creating sink", ioe);
     
    136154                    //_fos.write(buf, 0, read);
    137155                    written += read;
    138                     if (_log.shouldLog(Log.DEBUG))
     156                                        if(_log.shouldLog(Log.DEBUG)) {
    139157                        _log.debug("read and wrote " + read + " (" + written + ")");
    140158                }
     159                                }
    141160                fos.write(("written: [" + written + "]\n").getBytes());
    142161                long lifetime = System.currentTimeMillis() - start;
     
    145164                _log.error("Error writing the sink", ioe);
    146165            } finally {
    147                 if (fos != null) try { fos.close(); } catch (IOException ioe) {}
    148                 if (sock != null) try { sock.close(); } catch (IOException ioe) {}
     166                                if(fos != null) {
     167                                        try {
     168                                                fos.close();
     169                                        } catch(IOException ioe) {
     170                                        }
     171                                }
     172                                if(sock != null) {
     173                                        try {
     174                                                sock.close();
     175                                        } catch(IOException ioe) {
     176                                        }
     177                                }
    149178                _log.debug("Client socket closed");
    150179            }
     
    175204                    try {
    176205                        handlers = Integer.parseInt(args[4]);
    177                     } catch (NumberFormatException nfe) {}
     206                                        } catch(NumberFormatException nfe) {
     207                                        }
    178208                }
    179209                try {
     
    187217                System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]");
    188218        }
    189         if (server != null)
     219                if(server != null) {
    190220            server.runServer();
    191221    }
    192222}
     223}
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    ree2fd32 rdd7d993  
    11package net.i2p.client.streaming;
    22
     3import java.net.SocketTimeoutException;
    34import java.util.ArrayList;
    45import java.util.List;
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    ree2fd32 rdd7d993  
    2222 */
    2323public class ConnectionManager {
     24
    2425    private I2PAppContext _context;
    2526    private Log _log;
     
    4041    private volatile int _numWaiting;
    4142    private Object _connectionLock;
     43        private long SoTimeout;
    4244   
    4345    public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
     
    5961        _defaultOptions = defaultOptions;
    6062        _numWaiting = 0;
     63                /** Socket timeout for accept() */
     64                SoTimeout = -1;
     65
    6166        _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    6267        _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     
    7681        }
    7782    }
     83
    7884    /**
    7985     * not guaranteed to be unique, but in case we receive more than one packet
     
    8490            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    8591                Connection con = (Connection)iter.next();
    86                 if (DataHelper.eq(con.getSendStreamId(), id))
     92                                if(DataHelper.eq(con.getSendStreamId(), id)) {
    8793                    return con;
    8894            }
    8995        }
     96                }
    9097        return null;
    9198    }
    9299   
     100        /**
     101         * Set the socket accept() timeout.
     102         * @param x
     103         */
     104        public void MsetSoTimeout(long x) {
     105                SoTimeout = x;
     106        }
     107
     108        /**
     109         * Get the socket accept() timeout.
     110         * @return
     111         */
     112        public long MgetSoTimeout() {
     113                return SoTimeout;
     114        }
     115
    93116    public void setAllowIncomingConnections(boolean allow) {
    94117        _connectionHandler.setActive(allow);
    95118    }
     119
    96120    /** should we acceot connections, or just reject everyone? */
    97121    public boolean getAllowIncomingConnections() {
     
    114138            total = _connectionByInboundId.size();
    115139            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    116                 if ( ((Connection)iter.next()).getIsConnected() )
     140                                if(((Connection)iter.next()).getIsConnected()) {
    117141                    active++;
    118142            }
     143                        }
    119144            if (locked_tooManyStreams()) {
    120145                reject = true;
     
    136161       
    137162        if (reject) {
    138             if (_log.shouldLog(Log.WARN))
    139                 _log.warn("Refusing connection since we have exceeded our max of "
    140                           + _maxConcurrentStreams + " connections");
     163                        if(_log.shouldLog(Log.WARN)) {
     164                                _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections");
     165                        }
    141166            PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom());
    142167            reply.setFlag(Packet.FLAG_RESET);
     
    164189        return con;
    165190    }
    166    
    167191    private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000;
    168192   
     
    177201        long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    178202        long expiration = _context.clock().now() + opts.getConnectTimeout();
    179         if (opts.getConnectTimeout() <= 0)
     203                if(opts.getConnectTimeout() <= 0) {
    180204            expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
     205                }
    181206        _numWaiting++;
    182207        while (true) {
    183208            long remaining = expiration - _context.clock().now();
    184209            if (remaining <= 0) {
    185                 if (_log.shouldLog(Log.WARN))
    186                 _log.warn("Refusing to connect since we have exceeded our max of "
    187                           + _maxConcurrentStreams + " connections");
     210                                if(_log.shouldLog(Log.WARN)) {
     211                                        _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections");
     212                                }
    188213                _numWaiting--;
    189214                return null;
     
    194219                    // allow a full buffer of pending/waiting streams
    195220                    if (_numWaiting > _maxConcurrentStreams) {
    196                         if (_log.shouldLog(Log.WARN))
    197                             _log.warn("Refusing connection since we have exceeded our max of "
    198                                       + _maxConcurrentStreams + " and there are " + _numWaiting
    199                                       + " waiting already");
     221                                                if(_log.shouldLog(Log.WARN)) {
     222                                                        _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already");
     223                                                }
    200224                        _numWaiting--;
    201225                        return null;
     
    203227                   
    204228                    // no remaining streams, lets wait a bit
    205                     try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
     229                                        try {
     230                                                _connectionLock.wait(remaining);
     231                                        } catch(InterruptedException ie) {
     232                                        }
    206233                } else {
    207234                    con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
     
    225252            con.waitForConnect();
    226253        }
    227         if (_numWaiting > 0)
     254                if(_numWaiting > 0) {
    228255            _numWaiting--;
    229        
     256                }
    230257        _context.statManager().addRateData("stream.connectionCreated", 1, 0);
    231258        return con;
     
    233260
    234261    private boolean locked_tooManyStreams() {
    235         if (_maxConcurrentStreams <= 0) return false;
    236         if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
     262                if(_maxConcurrentStreams <= 0) {
     263                        return false;
     264                }
     265                if(_connectionByInboundId.size() < _maxConcurrentStreams) {
     266                        return false;
     267                }
    237268        int active = 0;
    238269        for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    239270            Connection con = (Connection)iter.next();
    240             if (con.getIsConnected())
     271                        if(con.getIsConnected()) {
    241272                active++;
    242273        }
    243        
    244         if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
    245             _log.info("More than 100 connections!  " + active
    246                       + " total: " + _connectionByInboundId.size());
    247 
     274                }
     275       
     276                if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) {
     277                        _log.info("More than 100 connections!  " + active + " total: " + _connectionByInboundId.size());
     278                }
    248279        return (active >= _maxConcurrentStreams);
    249280    }
    250281   
    251     public MessageHandler getMessageHandler() { return _messageHandler; }
    252     public PacketHandler getPacketHandler() { return _packetHandler; }
    253     public ConnectionHandler getConnectionHandler() { return _connectionHandler; }
    254     public I2PSession getSession() { return _session; }
    255     public PacketQueue getPacketQueue() { return _outboundQueue; }
     282        public MessageHandler getMessageHandler() {
     283                return _messageHandler;
     284        }
     285
     286        public PacketHandler getPacketHandler() {
     287                return _packetHandler;
     288        }
     289
     290        public ConnectionHandler getConnectionHandler() {
     291                return _connectionHandler;
     292        }
     293
     294        public I2PSession getSession() {
     295                return _session;
     296        }
     297
     298        public PacketQueue getPacketQueue() {
     299                return _outboundQueue;
     300        }
    256301   
    257302    /**
     
    280325            Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
    281326            removed = (o == con);
    282             if (_log.shouldLog(Log.DEBUG))
    283                 _log.debug("Connection removed? " + removed + " remaining: "
    284                            + _connectionByInboundId.size() + ": " + con);
    285             if (!removed && _log.shouldLog(Log.DEBUG))
     327                        if(_log.shouldLog(Log.DEBUG)) {
     328                                _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con);
     329                        }
     330                        if(!removed && _log.shouldLog(Log.DEBUG)) {
    286331                _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
     332                        }
    287333            _connectionLock.notifyAll();
    288334        }
     
    310356        return ping(peer, timeoutMs, true);
    311357    }
     358
    312359    public boolean ping(Destination peer, long timeoutMs, boolean blocking) {
    313360        return ping(peer, timeoutMs, blocking, null, null, null);
    314361    }
     362
    315363    public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
    316364        Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
     
    336384        if (blocking) {
    337385            synchronized (req) {
    338                 if (!req.pongReceived())
    339                     try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
     386                                if(!req.pongReceived()) {
     387                                        try {
     388                                                req.wait(timeoutMs);
     389                                        } catch(InterruptedException ie) {
     390                                        }
     391                                }
    340392            }
    341393           
     
    352404
    353405    interface PingNotifier {
     406
    354407        public void pingComplete(boolean ok);
    355408    }
    356409   
    357410    private class PingFailed implements SimpleTimer.TimedEvent {
     411
    358412        private Long _id;
    359413        private PingNotifier _notifier;
     414
    360415        public PingFailed(Long id, PingNotifier notifier) {
    361416            _id = id;
     
    367422            synchronized (_pendingPings) {
    368423                Object o = _pendingPings.remove(_id);
    369                 if (o != null)
     424                                if(o != null) {
    370425                    removed = true;
    371426            }
     427                        }
    372428            if (removed) {
    373                 if (_notifier != null)
     429                                if(_notifier != null) {
    374430                    _notifier.pingComplete(false);
    375                 if (_log.shouldLog(Log.INFO))
     431                                }
     432                                if(_log.shouldLog(Log.INFO)) {
    376433                    _log.info("Ping failed");
    377434            }
    378435        }
    379436    }
     437        }
    380438   
    381439    private class PingRequest {
     440
    382441        private boolean _ponged;
    383442        private Destination _peer;
    384443        private PacketLocal _packet;
    385444        private PingNotifier _notifier;
     445
    386446        public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) {
    387447            _ponged = false;
     
    390450            _notifier = notifier;
    391451        }
     452
    392453        public void pong() {
    393454            _log.debug("Ping successful");
     
    397458                ConnectionManager.PingRequest.this.notifyAll();
    398459            }
    399             if (_notifier != null)
     460                        if(_notifier != null) {
    400461                _notifier.pingComplete(true);
    401462        }
    402         public boolean pongReceived() { return _ponged; }
     463                }
     464
     465                public boolean pongReceived() {
     466                        return _ponged;
     467                }
    403468    }
    404469   
     
    408473            req = (PingRequest)_pendingPings.remove(new Long(pingId));
    409474        }
    410         if (req != null)
     475                if(req != null) {
    411476            req.pong();
    412477    }
    413478}
     479}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    ree2fd32 rdd7d993  
    11package net.i2p.client.streaming;
    22
     3import java.net.SocketTimeoutException;
     4import java.util.logging.Level;
     5import java.util.logging.Logger;
    36import net.i2p.I2PException;
    47
     
    811 */
    912public class I2PServerSocketFull implements I2PServerSocket {
     13
    1014    private I2PSocketManagerFull _socketManager;
    1115   
     16        /**
     17         *
     18         * @param mgr
     19         */
    1220    public I2PServerSocketFull(I2PSocketManagerFull mgr) {
    1321        _socketManager = mgr;
    1422    }
    1523   
    16     public I2PSocket accept() throws I2PException {
     24        /**
     25         *
     26         * @return
     27         * @throws net.i2p.I2PException
     28         * @throws SocketTimeoutException
     29         */
     30        public I2PSocket accept() throws I2PException, SocketTimeoutException {
    1731        return _socketManager.receiveSocket();
    1832    }
    1933   
    20     public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); }
     34        public long getSoTimeout() {
     35                return _socketManager.getConnectionManager().MgetSoTimeout();
     36        }
    2137   
    22     public I2PSocketManager getManager() { return _socketManager; }
     38        public void setSoTimeout(long x) {
     39                _socketManager.getConnectionManager().MsetSoTimeout(x);
     40        }
     41        /**
     42         * Close the connection.
     43         */
     44        public void close() {
     45                _socketManager.getConnectionManager().setAllowIncomingConnections(false);
     46        }
     47
     48        /**
     49         *
     50         * @return _socketManager
     51         */
     52        public I2PSocketManager getManager() {
     53                return _socketManager;
     54        }
    2355}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java

    ree2fd32 rdd7d993  
    1212 */
    1313public class I2PSocketFull implements I2PSocket {
     14
    1415    private Connection _connection;
    1516    private I2PSocket.SocketErrorListener _listener;
     
    2526    }
    2627   
     28
    2729    public void close() throws IOException {
    2830        Connection c = _connection;
    29         if (c == null) return;
     31                if(c == null) {
     32                        return;
     33                }
    3034        if (c.getIsConnected()) {
    3135            OutputStream out = c.getOutputStream();
     
    4549    }
    4650   
    47     Connection getConnection() { return _connection; }
     51        Connection getConnection() {
     52                return _connection;
     53        }
    4854   
    4955    public InputStream getInputStream() {
    5056        Connection c = _connection;
    51         if (c != null)
     57                if(c != null) {
    5258            return c.getInputStream();
    53         else
     59                } else {
    5460            return null;
    5561    }
     62        }
    5663   
    5764    public I2PSocketOptions getOptions() {
    5865        Connection c = _connection;
    59         if (c != null)
     66                if(c != null) {
    6067            return c.getOptions();
    61         else
     68                } else {
    6269            return null;
    6370    }
     71        }
    6472   
    6573    public OutputStream getOutputStream() throws IOException {
    6674        Connection c = _connection;
    67         if (c != null)
     75                if(c != null) {
    6876            return c.getOutputStream();
    69         else
     77                } else {
    7078            return null;
    7179    }
     80        }
    7281   
    73     public Destination getPeerDestination() { return _remotePeer; }
     82        public Destination getPeerDestination() {
     83                return _remotePeer;
     84        }
    7485   
    7586    public long getReadTimeout() {
    7687        I2PSocketOptions opts = getOptions();
    77         if (opts != null)
     88                if(opts != null) {
    7889            return opts.getReadTimeout();
    79         else
     90                } else {
    8091            return -1;
    8192    }
     93        }
    8294   
    83     public Destination getThisDestination() { return _localPeer; }
     95        public Destination getThisDestination() {
     96                return _localPeer;
     97        }
    8498   
    8599    public void setOptions(I2PSocketOptions options) {
    86100        Connection c = _connection;
    87         if (c == null) return;
    88        
    89         if (options instanceof ConnectionOptions)
     101                if(c == null) {
     102                        return;
     103                }
     104                if(options instanceof ConnectionOptions) {
    90105            c.setOptions((ConnectionOptions)options);
    91         else
     106                } else {
    92107            c.setOptions(new ConnectionOptions(options));
    93108    }
     109        }
    94110   
    95111    public void setReadTimeout(long ms) {
    96112        Connection c = _connection;
    97         if (c == null) return;
    98        
     113                if(c == null) {
     114                        return;
     115                }
    99116        c.getInputStream().setReadTimeout((int)ms);
    100117        c.getOptions().setReadTimeout(ms);
     
    117134        _connection = null;
    118135        _listener = null;
    119         if (c != null)
     136                if(c != null) {
    120137            c.disconnectComplete();
    121138    }
     139        }
     140
    122141    public String toString() {
    123142        Connection c = _connection;
    124         if (c == null)
     143                if(c == null) {
    125144            return super.toString();
    126         else
     145                } else {
    127146            return c.toString();
    128147    }
    129148}
     149}
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java

    ree2fd32 rdd7d993  
    22
    33import java.net.NoRouteToHostException;
     4import java.net.SocketTimeoutException;
    45import java.util.HashSet;
    56import java.util.Iterator;
     
    1314import net.i2p.data.Destination;
    1415import net.i2p.util.Log;
    15 
    1616
    1717/**
     
    2424 */
    2525public class I2PSocketManagerFull implements I2PSocketManager {
     26
    2627    private I2PAppContext _context;
    2728    private Log _log;
     
    3435    private static int __managerId = 0;
    3536    private ConnectionManager _connectionManager;
    36    
    3737    /**
    3838     * How long to wait for the client app to accept() before sending back CLOSE?
     
    4141    private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000;
    4242   
     43        /**
     44         *
     45         */
    4346    public I2PSocketManagerFull() {
    4447        _context = null;
    4548        _session = null;
    4649    }
     50
     51        /**
     52         *
     53         * @param context
     54         * @param session
     55         * @param opts
     56         * @param name
     57         */
    4758    public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) {
    4859        this();
    4960        init(context, session, opts, name);
    5061    }
    51    
    5262    /** how many streams will we allow at once?  */
    5363    public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
     
    5565    /**
    5666     *
     67         *
     68         * @param context
     69         * @param session
     70         * @param opts
     71         * @param name
    5772     */
    5873    public void init(I2PAppContext context, I2PSession session, Properties opts, String name) {
     
    6681            _maxStreams = Integer.parseInt(num);
    6782        } catch (NumberFormatException nfe) {
    68             if (_log.shouldLog(Log.WARN))
     83                        if(_log.shouldLog(Log.WARN)) {
    6984                _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
     85                        }
    7086            _maxStreams = -1;
    7187        }
     
    7793       
    7894        if (_log.shouldLog(Log.INFO)) {
    79             _log.info("Socket manager created.  \ndefault options: " + _defaultOptions
    80                       + "\noriginal properties: " + opts);
    81         }
    82     }
    83 
    84     public I2PSocketOptions buildOptions() { return buildOptions(null); }
     95                        _log.info("Socket manager created.  \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts);
     96        }
     97    }
     98
     99        /**
     100         *
     101         * @return
     102         */
     103        public I2PSocketOptions buildOptions() {
     104                return buildOptions(null);
     105        }
     106
     107        /**
     108         *
     109         * @param opts
     110         * @return
     111         */
    85112    public I2PSocketOptions buildOptions(Properties opts) {
    86113        ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions);
     
    89116    }
    90117   
     118        /**
     119         *
     120         * @return
     121         */
    91122    public I2PSession getSession() {
    92123        return _session;
    93124    }
    94125   
     126        /**
     127         *
     128         * @return
     129         */
    95130    public ConnectionManager getConnectionManager() {
    96131        return _connectionManager;
    97132    }
    98133
    99     public I2PSocket receiveSocket() throws I2PException {
     134        /**
     135         *
     136         * @return
     137         * @throws net.i2p.I2PException
     138         * @throws java.net.SocketTimeoutException
     139         */
     140        public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException {
    100141        verifySession();
    101         Connection con = _connectionManager.getConnectionHandler().accept(-1);
    102         if (_log.shouldLog(Log.DEBUG))
     142                Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout());
     143                if(_log.shouldLog(Log.DEBUG)) {
    103144            _log.debug("receiveSocket() called: " + con);
     145                }
    104146        if (con != null) {
    105147            I2PSocketFull sock = new I2PSocketFull(con);
     
    107149            return sock;
    108150        } else {
     151                        if(_connectionManager.MgetSoTimeout() == -1) {
    109152            return null;
    110153        }
     154                        throw new SocketTimeoutException("I2PSocket timed out");
     155                }
    111156    }
    112157   
     
    115160     * the timeout specified, false otherwise.  This call blocks.
    116161     *
     162         *
     163         * @param peer
     164         * @param timeoutMs
     165         * @return
    117166     */
    118167    public boolean ping(Destination peer, long timeoutMs) {
     
    126175     * @param ms milliseconds to wait, maximum
    127176     */
    128     public void setAcceptTimeout(long ms) { _acceptTimeout = ms; }
    129     public long getAcceptTimeout() { return _acceptTimeout; }
    130 
     177        public void setAcceptTimeout(long ms) {
     178                _acceptTimeout = ms;
     179        }
     180
     181        /**
     182         *
     183         * @return
     184         */
     185        public long getAcceptTimeout() {
     186                return _acceptTimeout;
     187        }
     188
     189        /**
     190         *
     191         * @param options
     192         */
    131193    public void setDefaultOptions(I2PSocketOptions options) {
    132194        _defaultOptions = new ConnectionOptions((ConnectionOptions) options);
    133195    }
    134196
     197        /**
     198         *
     199         * @return
     200         */
    135201    public I2PSocketOptions getDefaultOptions() {
    136202        return _defaultOptions;
    137203    }
    138204
     205        /**
     206         *
     207         * @return
     208         */
    139209    public I2PServerSocket getServerSocket() {
    140210        _connectionManager.setAllowIncomingConnections(true);
     
    143213
    144214    private void verifySession() throws I2PException {
    145         if (!_connectionManager.getSession().isClosed())
     215                if(!_connectionManager.getSession().isClosed()) {
    146216            return;
     217                }
    147218        _connectionManager.getSession().connect();
    148219    }
     
    160231                             throws I2PException, NoRouteToHostException {
    161232        verifySession();
    162         if (options == null)
     233                if(options == null) {
    163234            options = _defaultOptions;
     235                }
    164236        ConnectionOptions opts = null;
    165         if (options instanceof ConnectionOptions)
     237                if(options instanceof ConnectionOptions) {
    166238            opts = new ConnectionOptions((ConnectionOptions)options);
    167         else
     239                } else {
    168240            opts = new ConnectionOptions(options);
    169        
    170         if (_log.shouldLog(Log.INFO))
    171             _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
    172                       + " with options: " + opts);
     241                }
     242                if(_log.shouldLog(Log.INFO)) {
     243                        _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts);
     244                }
    173245        Connection con = _connectionManager.connect(peer, opts);
    174         if (con == null)
     246                if(con == null) {
    175247            throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
     248                }
    176249        I2PSocketFull socket = new I2PSocketFull(con);
    177250        con.setSocket(socket);
     
    188261     * @param peer Destination to connect to
    189262     *
     263         * @return
    190264     * @throws NoRouteToHostException if the peer is not found or not reachable
    191265     * @throws I2PException if there is some other I2P-related problem
     
    217291     * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely.
    218292     *
     293         *
     294         * @return
    219295     */
    220296    public Set listSockets() {
     
    223299        for (Iterator iter = connections.iterator(); iter.hasNext(); ) {
    224300            Connection con = (Connection)iter.next();
    225             if (con.getSocket() != null)
     301                        if(con.getSocket() != null) {
    226302                rv.add(con.getSocket());
    227303        }
     304                }
    228305        return rv;
    229306    }
    230307
    231     public String getName() { return _name; }
    232     public void setName(String name) { _name = name; }
    233    
    234    
     308        /**
     309         *
     310         * @return
     311         */
     312        public String getName() {
     313                return _name;
     314        }
     315   
     316        /**
     317         *
     318         * @param name
     319         */
     320        public void setName(String name) {
     321                _name = name;
     322        }
     323   
     324        /**
     325         *
     326         * @param lsnr
     327         */
    235328    public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    236329        _connectionManager.getMessageHandler().addDisconnectListener(lsnr);
    237330    }
     331
     332        /**
     333         *
     334         * @param lsnr
     335         */
    238336    public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    239337        _connectionManager.getMessageHandler().removeDisconnectListener(lsnr);
  • apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java

    ree2fd32 rdd7d993  
    66 *
    77 */
    8 class RetransmissionTimer extends SimpleTimer {
     8public class RetransmissionTimer extends SimpleTimer {
    99    private static final RetransmissionTimer _instance = new RetransmissionTimer();
    1010    public static final SimpleTimer getInstance() { return _instance; }
  • core/java/src/net/i2p/util/Executor.java

    ree2fd32 rdd7d993  
    66
    77class Executor implements Runnable {
     8
    89    private I2PAppContext _context;
    910    private Log _log;
    1011    private List _readyEvents;
    11     public Executor(I2PAppContext ctx, Log log, List events) {
     12        private SimpleStore runn;
     13
     14        public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
    1215        _context = ctx;
    1316        _readyEvents = events;
     17                runn = x;
    1418    }
     19
    1520    public void run() {
    16         while (true) {
     21                while(runn.getAnswer()) {
    1722            SimpleTimer.TimedEvent evt = null;
    1823            synchronized (_readyEvents) {
    19                 if (_readyEvents.size() <= 0)
    20                     try { _readyEvents.wait(); } catch (InterruptedException ie) {}
    21                 if (_readyEvents.size() > 0)
     24                                if(_readyEvents.size() <= 0) {
     25                                        try {
     26                                                _readyEvents.wait();
     27                                        } catch(InterruptedException ie) {
     28                                        }
     29                                }
     30                                if(_readyEvents.size() > 0) {
    2231                    evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
    2332            }
     33                        }
    2434
    2535            if (evt != null) {
     
    3141                }
    3242                long time = _context.clock().now() - before;
    33                 if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
     43                                if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) {
    3444                    _log.warn("wtf, event execution took " + time + ": " + evt);
    3545            }
    3646        }
    3747    }
     48        }
    3849   
     50        /**
     51         *
     52         * @param msg
     53         * @param t
     54         */
    3955    private void log(String msg, Throwable t) {
    4056        synchronized (this) {
    41             if (_log == null)
     57                        if(_log == null) {
    4258                _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
    4359        }
     60                }
    4461        _log.log(Log.CRIT, msg, t);
    4562    }
  • core/java/src/net/i2p/util/SimpleTimer.java

    ree2fd32 rdd7d993  
    1717 */
    1818public class SimpleTimer {
     19
    1920    private static final SimpleTimer _instance = new SimpleTimer();
    20     public static SimpleTimer getInstance() { return _instance; }
     21
     22        public static SimpleTimer getInstance() {
     23                return _instance;
     24        }
    2125    private I2PAppContext _context;
    2226    private Log _log;
     
    2630    private Map _eventTimes;
    2731    private List _readyEvents;
    28    
    29     protected SimpleTimer() { this("SimpleTimer"); }
     32        private SimpleStore runn;
     33
     34        /**
     35         *
     36         */
     37        protected SimpleTimer() {
     38                this("SimpleTimer");
     39        }
     40   
     41        /**
     42         *
     43         * @param name
     44         */
    3045    protected SimpleTimer(String name) {
     46                runn = new SimpleStore(true);
    3147        _context = I2PAppContext.getGlobalContext();
    3248        _log = _context.logManager().getLog(SimpleTimer.class);
     
    3955        runner.start();
    4056        for (int i = 0; i < 3; i++) {
    41             I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents));
     57                        I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn));
    4258            executor.setName(name + "Executor " + i);
    4359            executor.setDaemon(true);
     
    4662    }
    4763   
     64        /**
     65         * Removes the SimpleTimer.
     66         */
     67        public void removeSimpleTimer() {
     68                synchronized(_events) {
     69                        runn.setAnswer(false);
     70                        _events.notifyAll();
     71                }
     72        }
     73
     74        /**
     75         *
     76         * @param event
     77         * @param timeoutMs
     78         */
    4879    public void reschedule(TimedEvent event, long timeoutMs) {
    4980        addEvent(event, timeoutMs, false);
     
    5687     * timeout.  If this is not the desired behavior, call removeEvent first.
    5788     *
     89         * @param event
     90         * @param timeoutMs
    5891     */
    59     public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); }
     92        public void addEvent(TimedEvent event, long timeoutMs) {
     93                addEvent(event, timeoutMs, true);
     94        }
     95
    6096    /**
     97         * @param event
     98         * @param timeoutMs
    6199     * @param useEarliestTime if its already scheduled, use the earlier of the
    62100     *                        two timeouts, else use the later
     
    87125                }
    88126            }
    89             while (_events.containsKey(time))
     127                        while(_events.containsKey(time)) {
    90128                time = new Long(time.longValue() + 1);
     129                        }
    91130            _events.put(time, event);
    92131            _eventTimes.put(event, time);
     
    108147        }
    109148        if (time.longValue() > eventTime + 100) {
    110             if (_log.shouldLog(Log.WARN))
    111                 _log.warn("Lots of timer congestion, had to push " + event + " back "
    112                            + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")");
     149                        if(_log.shouldLog(Log.WARN)) {
     150                                _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")");
     151                        }
    113152        }
    114153        long timeToAdd = System.currentTimeMillis() - now;
    115154        if (timeToAdd > 50) {
    116             if (_log.shouldLog(Log.WARN))
     155                        if(_log.shouldLog(Log.WARN)) {
    117156                _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued");
    118157        }
     158                }
    119159           
    120160    }
    121161   
     162        /**
     163         *
     164         * @param evt
     165         * @return
     166         */
    122167    public boolean removeEvent(TimedEvent evt) {
    123         if (evt == null) return false;
     168                if(evt == null) {
     169                        return false;
     170                }
    124171        synchronized (_events) {
    125172            Long when = (Long)_eventTimes.remove(evt);
    126             if (when != null)
     173                        if(when != null) {
    127174                _events.remove(when);
     175                        }
    128176            return null != when;
    129177        }
     
    134182     */
    135183    public interface TimedEvent {
     184
    136185        /**
    137186         * the time requested has been reached (this call should NOT block,
     
    141190        public void timeReached();
    142191    }
    143    
    144192    private long _occurredTime;
    145193    private long _occurredEventCount;
    146     private TimedEvent _recentEvents[] = new TimedEvent[5];
    147    
     194        // not used
     195        //  private TimedEvent _recentEvents[] = new TimedEvent[5];
    148196    private class SimpleTimerRunner implements Runnable {
     197
    149198        public void run() {
    150199            List eventsToFire = new ArrayList(1);
    151             while (true) {
     200                        while(runn.getAnswer()) {
    152201                try {
    153202                    synchronized (_events) {
     
    159208                        long nextEventDelay = -1;
    160209                        Object nextEvent = null;
    161                         while (true) {
    162                             if (_events.size() <= 0) break;
     210                                                while(runn.getAnswer()) {
     211                                                        if(_events.size() <= 0) {
     212                                                                break;
     213                                                        }
    163214                            Long when = (Long)_events.firstKey();
    164215                            if (when.longValue() <= now) {
     
    176227                        if (eventsToFire.size() <= 0) {
    177228                            if (nextEventDelay != -1) {
    178                                 if (_log.shouldLog(Log.DEBUG))
     229                                                                if(_log.shouldLog(Log.DEBUG)) {
    179230                                    _log.debug("Next event in " + nextEventDelay + ": " + nextEvent);
     231                                                                }
    180232                                _events.wait(nextEventDelay);
    181233                            } else {
     
    184236                        }
    185237                    }
    186                 } catch (ThreadDeath td) {
    187                     return; // die
    188238                } catch (InterruptedException ie) {
    189239                    // ignore
     
    201251
    202252                synchronized (_readyEvents) {
    203                     for (int i = 0; i < eventsToFire.size(); i++)
     253                                        for(int i = 0; i < eventsToFire.size(); i++) {
    204254                        _readyEvents.add(eventsToFire.get(i));
     255                                        }
    205256                    _readyEvents.notifyAll();
    206257                }
Note: See TracChangeset for help on using the changeset viewer.