Changeset a4d16af


Ignore:
Timestamp:
Apr 2, 2009 8:22:31 AM (12 years ago)
Author:
mkvore-commit <mkvore-commit@…>
Branches:
master
Children:
e0dccb59
Parents:
9aa8707
Message:

SAM version 3 :

  • Raw and Datagram sessions implemented
  • option "SILENT=true" added to the stream protocol
  • java 6 warnings removed

ministreaming :

  • java 6 warnings removed

ministreaming and streaming :

  • added functions :

I2PServerSocket.waitIncoming(long timeout)
I2PServerSocket.accept(boolean block)

Location:
apps
Files:
19 added
25 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    r9aa8707 ra4d16af  
    3232
    3333    /**
     34     * accept(true) has the same behaviour as accept().
     35     * accept(false) does not wait for a socket connecting. If a socket is
     36     * available in the queue, it is accepted. Else, null is returned.
     37     *
     38     * @param true if the call should block until a socket is available
     39     *
     40     * @return a connected I2PSocket, or null
     41     *
     42     * @throws I2PException if there is a problem with reading a new socket
     43     *         from the data available (aka the I2PSession closed, etc)
     44     * @throws ConnectException if the I2PServerSocket is closed
     45     * @throws SocketTimeoutException
     46     */
     47    public I2PSocket accept(boolean blocking) throws I2PException, ConnectException, SocketTimeoutException;
     48
     49    /**
     50     * Waits until there is a socket waiting for acception or the timeout is
     51     * reached.
     52     *
     53     * @param timeoutMs timeout in ms. A negative value waits forever.
     54     *
     55     * @return true if a socket is available, false if not
     56     *
     57     * @throws I2PException if there is a problem with reading a new socket
     58     *         from the data available (aka the I2PSession closed, etc)
     59     * @throws ConnectException if the I2PServerSocket is closed
     60     */
     61    public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException;
     62
     63    /**
    3464     * Set Sock Option accept timeout
    3565     * @param x timeout in ms
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    r9aa8707 ra4d16af  
    2121    private I2PSocketManager mgr;
    2222    /** list of sockets waiting for the client to accept them */
    23     private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
     23    private List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4));
    2424   
    2525    /** have we been closed */
     
    5050    }
    5151   
    52     /**
    53      * Waits for the next socket connecting.  If a remote user tried to make a
    54      * connection and the local application wasn't .accept()ing new connections,
    55      * they should get refused (if .accept() doesnt occur in some small period -
    56      * currently 5 seconds)
    57      *
    58      * @return a connected I2PSocket
     52 
     53   
     54   
     55   
     56   
     57    /**
     58     * Waits until there is a socket waiting for acception or the timeout is
     59     * reached.
     60     *
     61     * @param timeoutMs timeout in ms. A negative value waits forever.
     62     *
     63     * @return true if a socket is available, false if not
    5964     *
    6065     * @throws I2PException if there is a problem with reading a new socket
     
    6267     * @throws ConnectException if the I2PServerSocket is closed
    6368     */
    64     public I2PSocket accept() throws I2PException, ConnectException {
    65         if (_log.shouldLog(Log.DEBUG))
    66             _log.debug("accept() called, pending: " + pendingSockets.size());
    67        
    68         I2PSocket ret = null;
    69        
    70         while ( (ret == null) && (!closing) ){
     69    public boolean waitIncoming(long timeoutMs) throws I2PException, ConnectException {
     70        if (_log.shouldLog(Log.DEBUG))
     71            _log.debug("waitIncoming() called, pending: " + pendingSockets.size());
     72       
     73        boolean isTimed = (timeoutMs>=0);
     74        if (isTimed) {
     75            Clock clock = I2PAppContext.getGlobalContext().clock();
     76            long now = clock.now();
     77            long end = now + timeoutMs;
     78            while (pendingSockets.size() <= 0 && now<end) {
     79                if (closing) throw new ConnectException("I2PServerSocket closed");
     80                try {
     81                    synchronized(socketAddedLock) {
     82                        socketAddedLock.wait(end - now);
     83                    }
     84                } catch (InterruptedException ie) {}
     85                now = clock.now();
     86            }
     87        } else {
    7188            while (pendingSockets.size() <= 0) {
    7289                if (closing) throw new ConnectException("I2PServerSocket closed");
     
    7794                } catch (InterruptedException ie) {}
    7895            }
    79             synchronized (pendingSockets) {
     96        }
     97                return (pendingSockets.size()>0);
     98        }
     99
     100   
     101    /**
     102     * accept(true) has the same behaviour as accept().
     103     * accept(false) does not wait for a socket connecting. If a socket is
     104     * available in the queue, it is accepted. Else, null is returned.
     105     *
     106     * @param true if the call should block until a socket is available
     107     *
     108     * @return a connected I2PSocket, or null
     109     *
     110     * @throws I2PException if there is a problem with reading a new socket
     111     *         from the data available (aka the I2PSession closed, etc)
     112     * @throws ConnectException if the I2PServerSocket is closed
     113     */
     114
     115        public I2PSocket accept(boolean blocking) throws I2PException, ConnectException {
     116        I2PSocket ret = null;
     117       
     118        if (blocking) {
     119                ret = accept();
     120        } else {
     121                synchronized (pendingSockets) {
    80122                if (pendingSockets.size() > 0) {
    81123                    ret = (I2PSocket)pendingSockets.remove(0);
     
    86128                    socketAcceptedLock.notifyAll();
    87129                }
    88             }
     130            }           
     131        }
     132                return ret;
     133        }
     134
     135        /**
     136     * Waits for the next socket connecting.  If a remote user tried to make a
     137     * connection and the local application wasn't .accept()ing new connections,
     138     * they should get refused (if .accept() doesnt occur in some small period -
     139     * currently 5 seconds)
     140     *
     141     * @return a connected I2PSocket
     142     *
     143     * @throws I2PException if there is a problem with reading a new socket
     144     *         from the data available (aka the I2PSession closed, etc)
     145     * @throws ConnectException if the I2PServerSocket is closed
     146     */
     147    public I2PSocket accept() throws I2PException, ConnectException {
     148        if (_log.shouldLog(Log.DEBUG))
     149            _log.debug("accept() called, pending: " + pendingSockets.size());
     150       
     151        I2PSocket ret = null;
     152       
     153        while ( (ret == null) && (!closing) ){
     154               
     155                this.waitIncoming(-1);
     156
     157                ret = accept(false);
    89158        }
    90159       
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    r9aa8707 ra4d16af  
    351351                bc.notifyAll();
    352352            }
    353             boolean timedOut = false;
    354353
    355354            while ( (read.length == 0) && (!inStreamClosed) ) {
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java

    r9aa8707 ra4d16af  
    1414import net.i2p.client.I2PSession;
    1515import net.i2p.client.I2PSessionException;
    16 import net.i2p.data.Destination;
    1716import net.i2p.util.Log;
    1817
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java

    r9aa8707 ra4d16af  
    4444    private I2PServerSocketImpl _serverSocket = null;
    4545    private Object lock = new Object(); // for locking socket lists
    46     private HashMap _outSockets;
    47     private HashMap _inSockets;
     46    private HashMap<String,I2PSocket> _outSockets;
     47    private HashMap<String,I2PSocket> _inSockets;
    4848    private I2PSocketOptions _defaultOptions;
    4949    private long _acceptTimeout;
    5050    private String _name;
    51     private List _listeners;
     51    private List<DisconnectListener> _listeners;
    5252    private static int __managerId = 0;
    5353   
     
    7777        _context = context;
    7878        _log = _context.logManager().getLog(I2PSocketManager.class);
    79         _inSockets = new HashMap(16);
    80         _outSockets = new HashMap(16);
     79        _inSockets = new HashMap<String,I2PSocket>(16);
     80        _outSockets = new HashMap<String,I2PSocket>(16);
    8181        _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
    82         _listeners = new ArrayList(1);
     82        _listeners = new ArrayList<DisconnectListener>(1);
    8383        setSession(session);
    8484        setDefaultOptions(buildOptions(opts));
     
    114114        _log.info(getName() + ": Disconnected from the session");
    115115        destroySocketManager();
    116         List listeners = null;
     116        List<DisconnectListener> listeners = null;
    117117        synchronized (_listeners) {
    118             listeners = new ArrayList(_listeners);
     118            listeners = new ArrayList<DisconnectListener>(_listeners);
    119119            _listeners.clear();
    120120        }
     
    131131    public void messageAvailable(I2PSession session, int msgId, long size) {
    132132        try {
    133             I2PSocketImpl s;
    134133            byte msg[] = session.receiveMessage(msgId);
    135134            if (msg.length == 1 && msg[0] == -1) {
     
    661660     */
    662661    public Set listSockets() {
    663         Set sockets = new HashSet(8);
     662        Set<I2PSocket> sockets = new HashSet<I2PSocket>(8);
    664663        synchronized (lock) {
    665664            sockets.addAll(_inSockets.values());
  • apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java

    r9aa8707 ra4d16af  
    2929    private boolean _dead; // unused? used elsewhere?
    3030   
     31    public void antiCompilationWarnings() {
     32        _log.debug(""+_conOptions+_dead);
     33    }
     34   
    3135    public static void main(String args[]) {
    3236        if (args.length < 1) {
     
    132136        }
    133137       
     138        public void antiCompilationWarnings() {
     139                _log.debug(""+this._lastReceived+this._lastReceivedOn+this._started);
     140        }
     141        public void antiCompilationWarnings(long x, long y) {
     142                if (false)
     143                        _log.debug(""+x+y);
     144        }
     145
    134146        public Flooder(I2PSocket socket) {
    135147            _socket = socket;
     
    155167            long value = 0;
    156168            long lastSend = _context.clock().now();
     169            this.antiCompilationWarnings(value, lastSend);
     170           
    157171            if (_socket == null) {
    158172                try {
  • apps/sam/java/src/net/i2p/sam/SAMBridge.java

    r9aa8707 ra4d16af  
    1515import java.io.IOException;
    1616import java.io.InputStreamReader;
    17 import java.net.InetAddress;
    18 import java.net.ServerSocket;
    19 import java.net.Socket;
     17import java.net.InetSocketAddress;
     18import java.nio.channels.ServerSocketChannel;
     19import java.nio.channels.SocketChannel;
     20import java.nio.ByteBuffer;
    2021import java.util.HashMap;
    2122import java.util.Iterator;
     
    3536public class SAMBridge implements Runnable {
    3637    private final static Log _log = new Log(SAMBridge.class);
    37     private ServerSocket serverSocket;
     38    private ServerSocketChannel serverSocket;
    3839    private Properties i2cpProps;
    3940    /**
     
    4647     * destination keys (Destination+PrivateKey+SigningPrivateKey)
    4748     */
    48     private Map nameToPrivKeys;
     49    private Map<String,String> nameToPrivKeys;
    4950
    5051    private boolean acceptConnections = true;
     
    5253    private static final int SAM_LISTENPORT = 7656;
    5354    public static final String DEFAULT_SAM_KEYFILE = "sam.keys";
     55    public static final String PROP_DATAGRAM_HOST = "sam.datagram.host";
     56    public static final String PROP_DATAGRAM_PORT = "sam.datagram.port";
     57    public static final String DEFAULT_DATAGRAM_HOST = "0.0.0.0";
     58    public static final String DEFAULT_DATAGRAM_PORT = "7655";
     59
    5460   
    5561    private SAMBridge() {}
     
    6571    public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
    6672        persistFilename = persistFile;
    67         nameToPrivKeys = new HashMap(8);
     73        nameToPrivKeys = new HashMap<String,String>(8);
    6874        loadKeys();
    6975        try {
    7076            if ( (listenHost != null) && !("0.0.0.0".equals(listenHost)) ) {
    71                 serverSocket = new ServerSocket(listenPort, 0, InetAddress.getByName(listenHost));
     77                serverSocket = ServerSocketChannel.open();
     78                serverSocket.socket().bind(new InetSocketAddress(listenHost, listenPort));
    7279                if (_log.shouldLog(Log.DEBUG))
    7380                    _log.debug("SAM bridge listening on "
    7481                               + listenHost + ":" + listenPort);
    7582            } else {
    76                 serverSocket = new ServerSocket(listenPort);
     83                serverSocket = ServerSocketChannel.open();
     84                serverSocket.socket().bind(new InetSocketAddress(listenPort));
    7785                if (_log.shouldLog(Log.DEBUG))
    7886                    _log.debug("SAM bridge listening on 0.0.0.0:" + listenPort);
     
    194202    /**
    195203     * Usage:
    196      *  <pre>SAMBridge [[listenHost ]listenPort[ name=val]*]</pre>
     204     *  <pre>SAMBridge [ keyfile [listenHost ] listenPort [ name=val ]* ]</pre>
    197205     *
    198206     * name=val options are passed to the I2CP code to build a session,
    199207     * allowing the bridge to specify an alternate I2CP host and port, tunnel
    200208     * depth, etc.
    201      * @param args [[listenHost ]listenPort[ name=val]*]
     209     * @param args [ keyfile [ listenHost ] listenPort [ name=val ]* ]
    202210     */
    203211    public static void main(String args[]) {
     
    267275        try {
    268276            while (acceptConnections) {
    269                 Socket s = serverSocket.accept();
     277                SocketChannel s = serverSocket.accept();
    270278                if (_log.shouldLog(Log.DEBUG))
    271279                    _log.debug("New connection from "
    272                                + s.getInetAddress().toString() + ":"
    273                                + s.getPort());
     280                               + s.socket().getInetAddress().toString() + ":"
     281                               + s.socket().getPort());
    274282
    275283                try {
     
    290298                    try {
    291299                        String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n";
    292                         s.getOutputStream().write(reply.getBytes("ISO-8859-1"));
     300                        s.write(ByteBuffer.wrap(reply.getBytes("ISO-8859-1")));
    293301                    } catch (IOException ioe) {
    294302                        if (_log.shouldLog(Log.ERROR))
  • apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java

    r9aa8707 ra4d16af  
    3131    public static int DGRAM_SIZE_MAX = 31*1024;
    3232
    33     private SAMDatagramReceiver recv = null;
     33    protected SAMDatagramReceiver recv = null;
    3434
    3535    private I2PDatagramMaker dgramMaker;
     
    8585        if (data.length > DGRAM_SIZE_MAX)
    8686            throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
    87        
    88         byte[] dgram = dgramMaker.makeI2PDatagram(data);
    89 
     87        byte[] dgram ;
     88        synchronized (dgramMaker) {
     89                dgram = dgramMaker.makeI2PDatagram(data);
     90        }
    9091        return sendBytesThroughMessageSession(dest, dgram);
    9192    }
  • apps/sam/java/src/net/i2p/sam/SAMException.java

    r9aa8707 ra4d16af  
    1616public class SAMException extends Exception {
    1717
     18        static final long serialVersionUID = 1 ;
     19
    1820    public SAMException() {
    19         super();
     21        super();
    2022    }
    2123   
    2224    public SAMException(String s) {
    23         super(s);
     25        super(s);
    2426    }
    2527}
  • apps/sam/java/src/net/i2p/sam/SAMHandler.java

    r9aa8707 ra4d16af  
    1010
    1111import java.io.IOException;
    12 import java.io.InputStream;
    13 import java.io.OutputStream;
    14 import java.net.Socket;
     12import java.nio.channels.SocketChannel;
     13import java.nio.ByteBuffer;
    1514import java.util.Properties;
    1615
     
    3332
    3433    private Object socketWLock = new Object(); // Guards writings on socket
    35     private Socket socket = null;
    36     private OutputStream socketOS = null; // Stream associated to socket
     34    protected SocketChannel socket = null;
    3735
    3836    protected int verMajor = 0;
     
    5452     * @throws IOException
    5553     */
    56     protected SAMHandler(Socket s,
     54    protected SAMHandler(SocketChannel s,
    5755                         int verMajor, int verMinor, Properties i2cpProps) throws IOException {
    5856        socket = s;
    59         socketOS = socket.getOutputStream();
    6057
    6158        this.verMajor = verMajor;
     
    8784     * @throws IOException
    8885     */
    89     protected final InputStream getClientSocketInputStream() throws IOException {
    90         return socket.getInputStream();
     86    protected final SocketChannel getClientSocket() {
     87        return socket ;
    9188    }
    9289
     
    9996     * @throws IOException
    10097     */
    101     protected final void writeBytes(byte[] data) throws IOException {
     98    protected final void writeBytes(ByteBuffer data) throws IOException {
    10299        synchronized (socketWLock) {
    103             socketOS.write(data);
    104             socketOS.flush();
     100            writeBytes(data, socket);
    105101        }
     102    }
     103   
     104    static public void writeBytes(ByteBuffer data, SocketChannel out) throws IOException {
     105        while (data.hasRemaining()) out.write(data);           
     106        out.socket().getOutputStream().flush();
    106107    }
    107108   
     
    113114     */
    114115    protected Object getWriteLock() { return socketWLock; }
    115     protected OutputStream getOut() { return socketOS; }
    116116
    117117    /**
     
    122122     * @param str A byte array to be written
    123123     *
    124      * @return True is the string was successfully written, false otherwise
     124     * @return True if the string was successfully written, false otherwise
    125125     */
    126126    protected final boolean writeString(String str) {
    127127        if (_log.shouldLog(Log.DEBUG))
    128128            _log.debug("Sending the client: [" + str + "]");
    129         try {
    130             writeBytes(str.getBytes("ISO-8859-1"));
     129        return writeString(str, socket);
     130    }
     131
     132    public static boolean writeString(String str, SocketChannel out)
     133    {
     134        try {
     135            writeBytes(ByteBuffer.wrap(str.getBytes("ISO-8859-1")), out);
    131136        } catch (IOException e) {
    132137            _log.debug("Caught IOException", e);
    133138            return false;
    134139        }
    135 
    136         return true;
     140        return true ;
    137141    }
    138 
     142   
    139143    /**
    140144     * Close the socket connected to the SAM client.
     
    179183                + "; SAM version: " + verMajor + "." + verMinor
    180184                + "; client: "
    181                 + this.socket.getInetAddress().toString() + ":"
    182                 + this.socket.getPort() + ")");
     185                + this.socket.socket().getInetAddress().toString() + ":"
     186                + this.socket.socket().getPort() + ")");
    183187    }
    184188
  • apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java

    r9aa8707 ra4d16af  
    1010
    1111import java.io.IOException;
    12 import java.io.OutputStream;
    1312import java.io.UnsupportedEncodingException;
    14 import java.net.Socket;
     13import java.nio.channels.SocketChannel;
     14import java.nio.ByteBuffer;
    1515import java.util.Properties;
    1616import java.util.StringTokenizer;
     
    3535     * @return A SAM protocol handler, or null if the client closed before the handshake
    3636     */
    37     public static SAMHandler createSAMHandler(Socket s, Properties i2cpProps) throws SAMException {
     37    public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException {
    3838        String line;
    3939        StringTokenizer tok;
    4040
    4141        try {
    42             line = DataHelper.readLine(s.getInputStream());
     42            line = DataHelper.readLine(s.socket().getInputStream());
    4343            if (line == null) {
    4444                _log.debug("Connection closed by client");
    4545                return null;
    4646            }
    47             tok = new StringTokenizer(line, " ");
     47            tok = new StringTokenizer(line.trim(), " ");
    4848        } catch (IOException e) {
    4949            throw new SAMException("Error reading from socket: "
     
    9090        // Let's answer positively
    9191        try {
    92             OutputStream out = s.getOutputStream();
    93             out.write(("HELLO REPLY RESULT=OK VERSION="
    94                        + ver + "\n").getBytes("ISO-8859-1"));
     92            s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=OK VERSION="
     93                       + ver + "\n").getBytes("ISO-8859-1")));
    9594        } catch (UnsupportedEncodingException e) {
    9695            _log.error("Caught UnsupportedEncodingException ("
     
    116115                handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
    117116                break;
     117            case 3:
     118                handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps);
     119                break;
    118120            default:
    119121                _log.error("BUG! Trying to initialize the wrong SAM version!");
     
    129131    /* Return the best version we can use, or null on failure */
    130132    private static String chooseBestVersion(String minVer, String maxVer) {
     133       
    131134        int minMajor = getMajor(minVer), minMinor = getMinor(minVer);
    132135        int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer);
     
    143146        float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ;
    144147       
     148
     149        if ( ( fminVer <=  3.0 ) && ( fmaxVer >= 3.0 ) ) return "3.0" ;
    145150
    146151        if ( ( fminVer <=  2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ;
  • apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java

    r9aa8707 ra4d16af  
    1616 */
    1717public class SAMInvalidDirectionException extends Exception {
    18 
     18        static final long serialVersionUID = 1 ;
     19       
    1920    public SAMInvalidDirectionException() {
    2021        super();
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    r9aa8707 ra4d16af  
    110110     */
    111111    protected boolean sendBytesThroughMessageSession(String dest, byte[] data) throws DataFormatException {
    112         Destination d = new Destination();
    113         d.fromBase64(dest);
     112        Destination d = SAMUtils.getDest(dest);
    114113
    115114        if (_log.shouldLog(Log.DEBUG)) {
  • apps/sam/java/src/net/i2p/sam/SAMRawSession.java

    r9aa8707 ra4d16af  
    2727    public static final int RAW_SIZE_MAX = 32*1024;
    2828
    29     private SAMRawReceiver recv = null;
     29    protected SAMRawReceiver recv = null;
    3030    /**
    3131     * Create a new SAM RAW session.
  • apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java

    r9aa8707 ra4d16af  
    1010
    1111import java.io.IOException;
     12import java.nio.ByteBuffer;
    1213
    1314import net.i2p.data.Destination;
     
    6162     * @throws IOException
    6263     */
    63     public void receiveStreamBytes(int id, byte data[], int len) throws IOException;
     64    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException;
    6465
    6566    /**
  • apps/sam/java/src/net/i2p/sam/SAMStreamSession.java

    r9aa8707 ra4d16af  
    1414import java.io.InterruptedIOException;
    1515import java.io.OutputStream;
     16import java.nio.ByteBuffer;
     17import java.nio.channels.Channels;
    1618import java.net.ConnectException;
    1719import java.net.NoRouteToHostException;
     
    5254    protected SAMStreamReceiver recv = null;
    5355
    54     private SAMStreamSessionServer server = null;
     56    protected SAMStreamSessionServer server = null;
    5557
    5658    protected I2PSocketManager socketMgr = null;
     
    5860    private Object handlersMapLock = new Object();
    5961    /** stream id (Long) to SAMStreamSessionSocketReader */
    60     private HashMap handlersMap = new HashMap();
     62    private HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>();
    6163    /** stream id (Long) to StreamSender */
    62     private HashMap sendersMap = new HashMap();
     64    private HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>();
    6365
    6466    private Object idLock = new Object();
     
    7678    public static String PROP_FORCE_FLUSH = "sam.forceFlush";
    7779    public static String DEFAULT_FORCE_FLUSH = "false";
     80   
     81    public SAMStreamSession() {
     82       
     83    }
    7884   
    7985    /**
     
    167173    }
    168174   
    169     private class DisconnectListener implements I2PSocketManager.DisconnectListener {
     175    protected class DisconnectListener implements I2PSocketManager.DisconnectListener {
    170176        public void sessionDisconnected() {
    171177            close();
     
    573579
    574580            int read = -1;
    575             byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
     581            ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
    576582
    577583            try {
     
    579585
    580586                while (stillRunning) {
    581                     read = in.read(data);
     587                        data.clear();
     588                    read = Channels.newChannel(in).read(data);
    582589                    if (read == -1) {
    583590                        _log.debug("Handler " + id + ": connection closed");
    584591                        break;
    585592                    }
    586                    
    587                     recv.receiveStreamBytes(id, data, read);
     593                    data.flip();
     594                    recv.receiveStreamBytes(id, data);
    588595                }
    589596            } catch (IOException e) {
     
    651658    protected class v1StreamSender extends StreamSender
    652659      {
    653         private List _data;
     660        private List<ByteArray> _data;
    654661        private int _id;
    655662        private ByteCache _cache;
     
    661668        public v1StreamSender ( I2PSocket s, int id ) throws IOException {
    662669            super ( s, id );
    663             _data = new ArrayList(1);
     670            _data = new ArrayList<ByteArray>(1);
    664671            _id = id;
    665672            _cache = ByteCache.getInstance(4, 32*1024);
  • apps/sam/java/src/net/i2p/sam/SAMUtils.java

    r9aa8707 ra4d16af  
    102102    }
    103103   
     104    /**
     105     * Resolve the destination from a key or a hostname
     106     *
     107     * @param s Hostname or key to be resolved
     108     *
     109     * @return the Destination for the specified hostname, or null if not found
     110     */
     111    public static Destination getDest(String s)
     112    {
     113          Destination d = new Destination() ;
     114          try {
     115                  d.fromBase64(s);
     116                  return d ;
     117          } catch (DataFormatException e) {
     118                  return  lookupHost(s, null);
     119          }
     120    }
     121
    104122    /**
    105123     * Parse SAM parameters, and put them into a Propetries object
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    r9aa8707 ra4d16af  
    1313import java.io.EOFException;
    1414import java.io.IOException;
    15 import java.io.InputStream;
    1615import java.io.InterruptedIOException;
    17 import java.io.OutputStream;
    1816import java.net.ConnectException;
    1917import java.net.NoRouteToHostException;
    20 import java.net.Socket;
     18import java.nio.channels.SocketChannel;
     19import java.nio.ByteBuffer;
    2120import java.util.Properties;
    2221import java.util.StringTokenizer;
     
    4140    private final static Log _log = new Log(SAMv1Handler.class);
    4241
    43     private final static int IN_BUFSIZE = 2048;
    44 
    45     private SAMRawSession rawSession = null;
    46     private SAMDatagramSession datagramSession = null;
    47   protected SAMStreamSession streamSession = null;
    48 
    49     private long _id;
    50     private static volatile long __id = 0;
     42    protected SAMRawSession rawSession = null;
     43    protected SAMDatagramSession datagramSession = null;
     44    protected SAMStreamSession streamSession = null;
     45    protected SAMDatagramSession getDatagramSession() {return datagramSession ;}       
     46    protected SAMRawSession getRawSession() {return rawSession ;}
     47
     48    protected long _id;
     49    protected static volatile long __id = 0;
    5150   
    5251    /**
     
    6160     * @throws IOException
    6261     */
    63     public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException {
     62    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
    6463        this(s, verMajor, verMinor, new Properties());
    6564    }
     
    7675     * @throws IOException
    7776     */
    78     public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
     77    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
    7978        super(s, verMajor, verMinor, i2cpProps);
    8079        _id = ++__id;
     
    102101
    103102        try {
    104             InputStream in = getClientSocketInputStream();
    105             int b = -1;
    106 
    107103            while (true) {
    108104                if (shouldStop()) {
     
    111107                }
    112108
    113                 msg = DataHelper.readLine(in);
     109                msg = DataHelper.readLine(getClientSocket().socket().getInputStream()).trim();
    114110                if (msg == null) {
    115111                    _log.debug("Connection closed by client");
     
    176172                _log.error("Error closing socket: " + e.getMessage());
    177173            }
    178             if (rawSession != null) {
    179                 rawSession.close();
    180             }
    181             if (datagramSession != null) {
    182                 datagramSession.close();
     174            if (getRawSession() != null) {
     175                getRawSession().close();
     176            }
     177            if (getDatagramSession() != null) {
     178                getDatagramSession().close();
    183179            }
    184180            if (streamSession != null) {
     
    189185
    190186    /* Parse and execute a SESSION message */
    191     private boolean execSessionMessage(String opcode, Properties props) {
     187    protected boolean execSessionMessage(String opcode, Properties props) {
    192188
    193189        String dest = "BUG!";
     
    195191        try{
    196192            if (opcode.equals("CREATE")) {
    197                 if ((rawSession != null) || (datagramSession != null)
     193                if ((getRawSession() != null) || (getDatagramSession() != null)
    198194                    || (streamSession != null)) {
    199195                    _log.debug("Trying to create a session, but one still exists");
     
    294290               
    295291    /* Parse and execute a DEST message*/
    296     private boolean execDestMessage(String opcode, Properties props) {
     292  protected boolean execDestMessage(String opcode, Properties props) {
    297293
    298294        if (opcode.equals("GENERATE")) {
     
    319315
    320316    /* Parse and execute a NAMING message */
    321     private boolean execNamingMessage(String opcode, Properties props) {
     317  protected boolean execNamingMessage(String opcode, Properties props) {
    322318        if (opcode.equals("LOOKUP")) {
    323319            if (props == null) {
     
    334330            Destination dest;
    335331            if (name.equals("ME")) {
    336                 if (rawSession != null) {
    337                     dest = rawSession.getDestination();
     332                if (getRawSession() != null) {
     333                    dest = getRawSession().getDestination();
    338334                } else if (streamSession != null) {
    339335                    dest = streamSession.getDestination();
    340                 } else if (datagramSession != null) {
    341                     dest = datagramSession.getDestination();
     336                } else if (getDatagramSession() != null) {
     337                    dest = getDatagramSession().getDestination();
    342338                } else {
    343339                    _log.debug("Lookup for SESSION destination, but session is null");
     
    345341                }
    346342            } else {
    347                 dest = SAMUtils.lookupHost(name, null);
     343                dest = SAMUtils.getDest(name);
    348344            }
    349345           
     
    365361
    366362    /* Parse and execute a DATAGRAM message */
    367     private boolean execDatagramMessage(String opcode, Properties props) {
    368         if (datagramSession == null) {
     363    protected boolean execDatagramMessage(String opcode, Properties props) {
     364        if (getDatagramSession() == null) {
    369365            _log.error("DATAGRAM message received, but no DATAGRAM session exists");
    370366            return false;
     
    404400
    405401            try {
    406                 DataInputStream in = new DataInputStream(getClientSocketInputStream());
     402                DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
    407403                byte[] data = new byte[size];
    408404
     
    436432
    437433    /* Parse and execute a RAW message */
    438     private boolean execRawMessage(String opcode, Properties props) {
    439         if (rawSession == null) {
     434    protected boolean execRawMessage(String opcode, Properties props) {
     435        if (getRawSession() == null) {
    440436            _log.error("RAW message received, but no RAW session exists");
    441437            return false;
     
    475471
    476472            try {
    477                 DataInputStream in = new DataInputStream(getClientSocketInputStream());
     473                DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
    478474                byte[] data = new byte[size];
    479475
    480476                in.readFully(data);
    481477
    482                 if (!rawSession.sendBytes(dest, data)) {
     478                if (!getRawSession().sendBytes(dest, data)) {
    483479                    _log.error("RAW SEND failed");
    484480                    return true;
     
    568564
    569565        try {
    570             if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
     566            if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
    571567                if (_log.shouldLog(Log.WARN))
    572568                    _log.warn("STREAM SEND [" + size + "] failed");
     
    692688    // SAMRawReceiver implementation
    693689    public void receiveRawBytes(byte data[]) throws IOException {
    694         if (rawSession == null) {
     690        if (getRawSession() == null) {
    695691            _log.error("BUG! Received raw bytes, but session is null!");
    696692            throw new NullPointerException("BUG! RAW session is null!");
     
    702698        msg.write(msgText.getBytes("ISO-8859-1"));
    703699        msg.write(data);
     700        msg.flush();
    704701       
    705702        if (_log.shouldLog(Log.DEBUG))
    706703            _log.debug("sending to client: " + msgText);
    707704
    708         writeBytes(msg.toByteArray());
     705        writeBytes(ByteBuffer.wrap(msg.toByteArray()));
    709706    }
    710707
     
    712709        _log.debug("stopRawReceiving() invoked");
    713710
    714         if (rawSession == null) {
     711        if (getRawSession() == null) {
    715712            _log.error("BUG! Got raw receiving stop, but session is null!");
    716713            throw new NullPointerException("BUG! RAW session is null!");
     
    727724    // SAMDatagramReceiver implementation
    728725    public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
    729         if (datagramSession == null) {
     726        if (getDatagramSession() == null) {
    730727            _log.error("BUG! Received datagram bytes, but session is null!");
    731728            throw new NullPointerException("BUG! DATAGRAM session is null!");
     
    741738            _log.debug("sending to client: " + msgText);
    742739        msg.write(data);
    743 
    744         writeBytes(msg.toByteArray());
     740        msg.flush();
     741        writeBytes(ByteBuffer.wrap(msg.toByteArray()));
    745742    }
    746743
     
    748745        _log.debug("stopDatagramReceiving() invoked");
    749746
    750         if (datagramSession == null) {
     747        if (getDatagramSession() == null) {
    751748            _log.error("BUG! Got datagram receiving stop, but session is null!");
    752749            throw new NullPointerException("BUG! DATAGRAM session is null!");
     
    831828    }
    832829 
    833     public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
     830    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
    834831        if (streamSession == null) {
    835832            _log.error("Received stream bytes, but session is null!");
     
    837834        }
    838835
    839         String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + len + "\n";
     836        String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n";
    840837        if (_log.shouldLog(Log.DEBUG))
    841838            _log.debug("sending to client: " + msgText);
    842839       
    843         byte prefix[] = msgText.getBytes("ISO-8859-1");
     840        ByteBuffer prefix = ByteBuffer.wrap(msgText.getBytes("ISO-8859-1"));
    844841       
    845         // dont waste so much memory
    846         //ByteArrayOutputStream msg = new ByteArrayOutputStream();
    847         //msg.write(msgText.getBytes("ISO-8859-1"));
    848         //msg.write(data, 0, len);
    849         // writeBytes(msg.toByteArray());
    850842        Object writeLock = getWriteLock();
    851         OutputStream out = getOut();
    852843        synchronized (writeLock) {
    853             out.write(prefix);
    854             out.write(data, 0, len);
    855             out.flush();
     844                while (prefix.hasRemaining()) socket.write(prefix);
     845            while (data.hasRemaining()) socket.write(data);
     846            socket.socket().getOutputStream().flush();
    856847        }
    857848    }
  • apps/sam/java/src/net/i2p/sam/SAMv2Handler.java

    r9aa8707 ra4d16af  
    1010
    1111import java.io.IOException;
    12 import java.net.Socket;
     12import java.nio.channels.SocketChannel;
    1313import java.util.Properties;
    1414
     
    3737                 * @param verMinor SAM minor version to manage
    3838                 */
    39                 public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException
     39                public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
    4040                {
    4141                        this ( s, verMajor, verMinor, new Properties() );
     
    5353                 */
    5454
    55                 public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
     55                public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
    5656                {
    5757                        super ( s, verMajor, verMinor, i2cpProps );
  • apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java

    r9aa8707 ra4d16af  
    1313import java.io.InterruptedIOException;
    1414import java.io.OutputStream;
     15import java.nio.channels.Channels;
     16import java.nio.ByteBuffer;
    1517import java.net.ConnectException;
    1618import java.net.NoRouteToHostException;
     
    141143                {
    142144
    143                                 private Object runningLock = new Object();
    144                                 private boolean stillRunning = true;
    145 
    146145                                private int id;
    147146                                private Destination      dest ;
     
    246245
    247246                {
    248                                 private List _data;
     247                                private List<ByteArray> _data;
    249248                                private int _dataSize;
    250249                                private int _id;
     
    258257                                {
    259258                                        super ( s, id );
    260                                         _data = new ArrayList ( 1 );
     259                                        _data = new ArrayList<ByteArray> ( 1 );
    261260                                        _dataSize = 0;
    262261                                        _id = id;
     
    512511
    513512                                        int read = -1;
    514                                         byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
     513                                        ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
    515514
    516515                                        try
     
    534533                                                        }
    535534                                                       
    536                                                         read = in.read ( data );
     535                                                        data.clear();
     536                                                        read = Channels.newChannel(in).read ( data );
    537537
    538538                                                        if ( read == -1 )
     
    543543
    544544                                                        totalReceived += read ;
    545                                                        
    546                                                         recv.receiveStreamBytes ( id, data, read );
     545                                                        data.flip();
     546                                                        recv.receiveStreamBytes ( id, data );
    547547                                                }
    548548                                        }
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    r9aa8707 ra4d16af  
    1313 */
    1414public class SAMEventHandler extends SAMClientEventListenerImpl {
    15     private I2PAppContext _context;
     15    //private I2PAppContext _context;
    1616    private Log _log;
    1717    private Boolean _helloOk;
     
    2020    private Object _sessionCreateLock = new Object();
    2121    private Object _namingReplyLock = new Object();
    22     private Map _namingReplies = new HashMap();
     22    private Map<String,String> _namingReplies = new HashMap<String,String>();
    2323
    2424    public SAMEventHandler(I2PAppContext ctx) {
    25         _context = ctx;
     25        //_context = ctx;
    2626        _log = ctx.logManager().getLog(getClass());
    2727    }
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    r9aa8707 ra4d16af  
    3232    private InputStream _samIn;
    3333    private SAMReader _reader;
    34     private boolean _dead;
     34    //private boolean _dead;
    3535    private SAMEventHandler _eventHandler;
    3636    /** Connection id (Integer) to peer (Flooder) */
    37     private Map _remotePeers;
     37    private Map<Integer, Sender> _remotePeers;
    3838   
    3939    public static void main(String args[]) {
     
    4343        }
    4444        I2PAppContext ctx = new I2PAppContext();
    45         String files[] = new String[args.length - 3];
     45        //String files[] = new String[args.length - 3];
    4646        SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
    4747        sender.startup();
     
    5151        _context = ctx;
    5252        _log = ctx.logManager().getLog(SAMStreamSend.class);
    53         _dead = false;
     53        //_dead = false;
    5454        _samHost = samHost;
    5555        _samPort = samPort;
     
    5858        _conOptions = "";
    5959        _eventHandler = new SendEventHandler(_context);
    60         _remotePeers = new HashMap();
     60        _remotePeers = new HashMap<Integer,Sender>();
    6161    }
    6262   
     
    208208            _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
    209209            byte data[] = new byte[1024];
    210             long value = 0;
    211210            long lastSend = _context.clock().now();
    212211            while (!_closed) {
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r9aa8707 ra4d16af  
    3232    private InputStream _samIn;
    3333    private SAMReader _reader;
    34     private boolean _dead;
     34    //private boolean _dead;
    3535    private SAMEventHandler _eventHandler;
    3636    /** Connection id (Integer) to peer (Flooder) */
    37     private Map _remotePeers;
     37    private Map<Integer, Sink> _remotePeers;
    3838   
    3939    public static void main(String args[]) {
     
    5050        _context = ctx;
    5151        _log = ctx.logManager().getLog(SAMStreamSink.class);
    52         _dead = false;
     52        //_dead = false;
    5353        _samHost = samHost;
    5454        _samPort = samPort;
     
    5757        _conOptions = "";
    5858        _eventHandler = new SinkEventHandler(_context);
    59         _remotePeers = new HashMap();
     59        _remotePeers = new HashMap<Integer,Sink>();
    6060    }
    6161   
     
    7171            _log.debug("Handshake complete.  we are " + ourDest);
    7272            if (ourDest != null) {
    73                 boolean written = writeDest(ourDest);
     73                //boolean written =
     74                        writeDest(ourDest);
    7475                _log.debug("Dest written");
    7576            }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    r9aa8707 ra4d16af  
    11package net.i2p.client.streaming;
    22
     3import java.net.ConnectException;
    34import java.util.ArrayList;
    45import java.util.List;
    56
    67import net.i2p.I2PAppContext;
     8import net.i2p.util.Clock;
    79import net.i2p.util.Log;
    810import net.i2p.util.SimpleTimer;
     
    1517    private Log _log;
    1618    private ConnectionManager _manager;
    17     private List _synQueue;
     19    private List<Packet> _synQueue;
    1820    private boolean _active;
    1921    private int _acceptTimeout;
     
    6264    }
    6365   
     66    public boolean waitSyn( long ms ) throws InterruptedException {
     67        boolean incoming = false ;
     68        boolean isTimed = (ms>=0);
     69
     70        Clock clock = I2PAppContext.getGlobalContext().clock();
     71        long now = clock.now();
     72        long end = now + ms;
     73        while (!incoming && (!isTimed || now<=end) ) {
     74                synchronized (_synQueue) {
     75
     76                        for (Packet p : _synQueue)
     77                        {
     78                                if (p.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
     79                                        incoming = true ;
     80                                        break;
     81                                }
     82                        }
     83                        if (!incoming) {
     84                                if (!isTimed) {
     85                                        _synQueue.wait();
     86                                } else {
     87                                        now = clock.now();
     88                                        if (now < end) {
     89                                                _synQueue.wait(end-now);
     90                                        }
     91                                }
     92                        }
     93                }
     94        }
     95        return incoming ;
     96    }
     97   
    6498    /**
    6599     * Receive an incoming connection (built from a received SYN)
     
    67101     * that they don't get thrown away while the SYN packet before it is queued.
    68102     *
    69      * @param timeoutMs max amount of time to wait for a connection (if less
    70      *                  than 1ms, wait indefinitely)
     103     * @param timeoutMs max amount of time to wait for a connection (if negative,
     104     *                  wait indefinitely)
    71105     * @return connection received, or null if there was a timeout or the
    72106     *                    handler was shut down
     
    78112        long expiration = timeoutMs + _context.clock().now();
    79113        while (true) {
    80             if ( (timeoutMs > 0) && (expiration < _context.clock().now()) )
    81                 return null;
    82114            if (!_active) {
    83115                // fail all the ones we had queued up
     
    98130                        _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: "
    99131                                   + _synQueue.size());
    100                     if (timeoutMs <= 0) {
     132                    if (timeoutMs < 0) {
    101133                        try { _synQueue.wait(); } catch (InterruptedException ie) {}
    102134                    } else {
     
    130162            }
    131163            // keep looping...
     164            if ( (timeoutMs >= 0) && (expiration < _context.clock().now()) )
     165                return null;
    132166        }
    133167    }
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    r9aa8707 ra4d16af  
    11package net.i2p.client.streaming;
    22
     3import java.net.ConnectException;
    34import java.net.SocketTimeoutException;
     5
     6import net.i2p.I2PAppContext;
    47import net.i2p.I2PException;
     8import net.i2p.util.Clock;
     9import net.i2p.util.Log;
    510
    611/**
     
    4651        return _socketManager;
    4752    }
     53
     54    /**
     55     * accept(true) has the same behaviour as accept().
     56     * accept(false) does not wait for a socket connecting. If a socket is
     57     * available in the queue, it is accepted. Else, null is returned.
     58     *
     59     * @param true if the call should block until a socket is available
     60     *
     61     * @return a connected I2PSocket, or null
     62     *
     63     * @throws I2PException if there is a problem with reading a new socket
     64     *         from the data available (aka the I2PSession closed, etc)
     65     * @throws SocketTimeoutException if the timeout has been reached
     66     */
     67
     68        public I2PSocket accept(boolean blocking)  throws I2PException, SocketTimeoutException {
     69                long timeout = this.getSoTimeout();
     70
     71                try {
     72                        if (blocking)
     73                        {
     74                                this.setSoTimeout(-1);
     75                        } else {
     76                                this.setSoTimeout(0);
     77                        }
     78                        try {
     79                                return this.accept();
     80                        } catch (SocketTimeoutException e) {
     81                                if (blocking) throw e;
     82                                else return null ;
     83                        }
     84                } finally {
     85                        this.setSoTimeout(timeout);
     86                }
     87        }
     88
     89        public boolean waitIncoming(long timeoutMs) throws InterruptedException {
     90        return this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
     91        }
    4892}
Note: See TracChangeset for help on using the changeset viewer.