Changeset 18a475e


Ignore:
Timestamp:
Apr 8, 2009 6:41:53 AM (11 years ago)
Author:
mkvore-commit <mkvore-commit@…>
Branches:
master
Children:
0f1f33e
Parents:
ed259ac (diff), 54255ca (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

merge of '9666f5fdfc24a7fc2ca3a99a95ea5dfef5583b1b'

and 'e76b1962963aa7cadb74aacc32f90adf31db3761'

Location:
apps
Files:
19 added
26 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java

    red259ac r18a475e  
    3232
    3333    /**
     34     * accept(timeout) waits timeout ms for a socket connecting. If a socket is
     35     * not available during the timeout, return null. accept(0) behaves like accept()
     36     *
     37     * @param timeout in ms
     38     *
     39     * @return a connected I2PSocket, or null
     40     *
     41     * @throws I2PException if there is a problem with reading a new socket
     42     *         from the data available (aka the I2PSession closed, etc)
     43     * @throws ConnectException if the I2PServerSocket is closed
     44     * @throws InterruptedException if thread is interrupted while waiting
     45     */
     46    public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException;
     47
     48    /**
     49     * Wait until there is a socket waiting for acception or the timeout is
     50     * reached.
     51     *
     52     * @param timeoutMs timeout in ms. If ms is 0, wait forever.
     53     *
     54     * @return true if a socket is available, false if not
     55     *
     56     * @throws I2PException if there is a problem with reading a new socket
     57     *         from the data available (aka the I2PSession closed, etc)
     58     * @throws ConnectException if the I2PServerSocket is closed
     59     * @throws InterruptedException if the thread is interrupted before
     60     *         completion
     61     */
     62    public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException;
     63
     64    /**
    3465     * Set Sock Option accept timeout
    3566     * @param x timeout in ms
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    red259ac r18a475e  
    22
    33import java.net.ConnectException;
     4import java.net.SocketTimeoutException;
    45import java.util.ArrayList;
    56import java.util.Collections;
     
    2122    private I2PSocketManager mgr;
    2223    /** list of sockets waiting for the client to accept them */
    23     private List pendingSockets = Collections.synchronizedList(new ArrayList(4));
     24    private List<I2PSocket> pendingSockets = Collections.synchronizedList(new ArrayList<I2PSocket>(4));
    2425   
    2526    /** have we been closed */
     
    5152   
    5253    /**
    53      * Waits for the next socket connecting.  If a remote user tried to make a
    54      * connection and the local application wasn't .accept()ing new connections,
    55      * they should get refused (if .accept() doesnt occur in some small period -
    56      * currently 5 seconds)
    57      *
    58      * @return a connected I2PSocket
     54     * Waits until there is a socket waiting for acception or the timeout is
     55     * reached.
     56     *
     57     * @param timeoutMs timeout in ms. A negative value waits forever.
    5958     *
    6059     * @throws I2PException if there is a problem with reading a new socket
    6160     *         from the data available (aka the I2PSession closed, etc)
    6261     * @throws ConnectException if the I2PServerSocket is closed
    63      */
    64     public I2PSocket accept() throws I2PException, ConnectException {
    65         if (_log.shouldLog(Log.DEBUG))
    66             _log.debug("accept() called, pending: " + pendingSockets.size());
    67        
    68         I2PSocket ret = null;
    69        
    70         while ( (ret == null) && (!closing) ){
     62     * @throws InterruptedException if thread is interrupted while waiting
     63     */
     64    public void waitIncoming(long timeoutMs) throws I2PException, ConnectException, InterruptedException {
     65        if (_log.shouldLog(Log.DEBUG))
     66            _log.debug("waitIncoming() called, pending: " + pendingSockets.size());
     67       
     68        boolean isTimed = (timeoutMs>0);
     69        if (isTimed) {
     70            Clock clock = I2PAppContext.getGlobalContext().clock();
     71            long now = clock.now();
     72            long end = now + timeoutMs;
     73            while (pendingSockets.size() <= 0 && now<end) {
     74                if (closing) throw new ConnectException("I2PServerSocket closed");
     75                synchronized(socketAddedLock) {
     76                    socketAddedLock.wait(end - now);
     77                }
     78                now = clock.now();
     79            }
     80        } else {
    7181            while (pendingSockets.size() <= 0) {
    7282                if (closing) throw new ConnectException("I2PServerSocket closed");
     
    7787                } catch (InterruptedException ie) {}
    7888            }
    79             synchronized (pendingSockets) {
    80                 if (pendingSockets.size() > 0) {
    81                     ret = (I2PSocket)pendingSockets.remove(0);
    82                 }
     89        }
     90        }
     91
     92    /**
     93     * accept(timeout) waits timeout ms for a socket connecting. If a socket is
     94     * not available during the timeout, return null. accept(0) behaves like accept()
     95     *
     96     * @param timeout in ms
     97     *
     98     * @return a connected I2PSocket, or null
     99     *
     100     * @throws I2PException if there is a problem with reading a new socket
     101     *         from the data available (aka the I2PSession closed, etc)
     102     * @throws ConnectException if the I2PServerSocket is closed
     103     * @throws InterruptedException if thread is interrupted while waiting
     104     */
     105        public I2PSocket accept(long timeout) throws I2PException, ConnectException, InterruptedException {
     106        I2PSocket ret = null;
     107       
     108        if (timeout<=0) {
     109                ret = accept();
     110        } else {
     111                long now  = I2PAppContext.getGlobalContext().clock().now();
     112            long expiration = timeout + now ;
     113                synchronized (pendingSockets) {
     114                while (pendingSockets.size() == 0 && expiration>now) {
     115                        pendingSockets.wait(expiration-now);
     116                        now  = I2PAppContext.getGlobalContext().clock().now();
     117                }
     118                ret = (I2PSocket)pendingSockets.remove(0);
    83119            }
    84120            if (ret != null) {
     
    86122                    socketAcceptedLock.notifyAll();
    87123                }
    88             }
     124            }           
     125        }
     126                return ret;
     127        }
     128
     129        /**
     130     * Waits for the next socket connecting.  If a remote user tried to make a
     131     * connection and the local application wasn't .accept()ing new connections,
     132     * they should get refused (if .accept() doesnt occur in some small period -
     133     * currently 5 seconds)
     134     *
     135     * @return a connected I2PSocket
     136     *
     137     * @throws I2PException if there is a problem with reading a new socket
     138     *         from the data available (aka the I2PSession closed, etc)
     139     * @throws ConnectException if the I2PServerSocket is closed
     140     */
     141    public I2PSocket accept() throws I2PException, ConnectException {
     142        if (_log.shouldLog(Log.DEBUG))
     143            _log.debug("accept() called, pending: " + pendingSockets.size());
     144       
     145        I2PSocket ret = null;
     146       
     147        while ( (ret == null) && (!closing) ){
     148                try {
     149                        this.waitIncoming(0);
     150                        ret = accept(1);
     151                } catch (InterruptedException e) {
     152                        throw new I2PException("Thread interrupted") ;
     153                }
    89154        }
    90155       
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    red259ac r18a475e  
    351351                bc.notifyAll();
    352352            }
    353             boolean timedOut = false;
    354353
    355354            while ( (read.length == 0) && (!inStreamClosed) ) {
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java

    red259ac r18a475e  
    1414import net.i2p.client.I2PSession;
    1515import net.i2p.client.I2PSessionException;
    16 import net.i2p.data.Destination;
    1716import net.i2p.util.Log;
    1817
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerImpl.java

    red259ac r18a475e  
    4444    private I2PServerSocketImpl _serverSocket = null;
    4545    private Object lock = new Object(); // for locking socket lists
    46     private HashMap _outSockets;
    47     private HashMap _inSockets;
     46    private HashMap<String,I2PSocket> _outSockets;
     47    private HashMap<String,I2PSocket> _inSockets;
    4848    private I2PSocketOptions _defaultOptions;
    4949    private long _acceptTimeout;
    5050    private String _name;
    51     private List _listeners;
     51    private List<DisconnectListener> _listeners;
    5252    private static int __managerId = 0;
    5353   
     
    7777        _context = context;
    7878        _log = _context.logManager().getLog(I2PSocketManager.class);
    79         _inSockets = new HashMap(16);
    80         _outSockets = new HashMap(16);
     79        _inSockets = new HashMap<String,I2PSocket>(16);
     80        _outSockets = new HashMap<String,I2PSocket>(16);
    8181        _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
    82         _listeners = new ArrayList(1);
     82        _listeners = new ArrayList<DisconnectListener>(1);
    8383        setSession(session);
    8484        setDefaultOptions(buildOptions(opts));
     
    114114        _log.info(getName() + ": Disconnected from the session");
    115115        destroySocketManager();
    116         List listeners = null;
     116        List<DisconnectListener> listeners = null;
    117117        synchronized (_listeners) {
    118             listeners = new ArrayList(_listeners);
     118            listeners = new ArrayList<DisconnectListener>(_listeners);
    119119            _listeners.clear();
    120120        }
     
    131131    public void messageAvailable(I2PSession session, int msgId, long size) {
    132132        try {
    133             I2PSocketImpl s;
    134133            byte msg[] = session.receiveMessage(msgId);
    135134            if (msg.length == 1 && msg[0] == -1) {
     
    661660     */
    662661    public Set listSockets() {
    663         Set sockets = new HashSet(8);
     662        Set<I2PSocket> sockets = new HashSet<I2PSocket>(8);
    664663        synchronized (lock) {
    665664            sockets.addAll(_inSockets.values());
  • apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java

    red259ac r18a475e  
    2929    private boolean _dead; // unused? used elsewhere?
    3030   
     31    public void antiCompilationWarnings() {
     32        _log.debug(""+_conOptions+_dead);
     33    }
     34   
    3135    public static void main(String args[]) {
    3236        if (args.length < 1) {
     
    132136        }
    133137       
     138        public void antiCompilationWarnings() {
     139                _log.debug(""+this._lastReceived+this._lastReceivedOn+this._started);
     140        }
     141        public void antiCompilationWarnings(long x, long y) {
     142                if (false)
     143                        _log.debug(""+x+y);
     144        }
     145
    134146        public Flooder(I2PSocket socket) {
    135147            _socket = socket;
     
    155167            long value = 0;
    156168            long lastSend = _context.clock().now();
     169            this.antiCompilationWarnings(value, lastSend);
     170           
    157171            if (_socket == null) {
    158172                try {
  • apps/sam/java/build.xml

    red259ac r18a475e  
    55    <target name="builddep">
    66        <ant dir="../../ministreaming/java/" target="build" />
     7        <ant dir="../../streaming/java/" target="build" />
    78        <!-- ministreaming will build core -->
    89    </target>
     
    1920                <pathelement location="../../../core/java/build/obj" />
    2021                <pathelement location="../../ministreaming/java/build/obj" />
     22                <pathelement location="../../streaming/java/build/obj" />
    2123            </classpath>
    2224        </depend>
     
    2931            debug="true" deprecation="on" source="1.5" target="1.5"
    3032            destdir="./build/obj"
    31             classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
     33            classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar:../../streaming/java/build/streaming.jar" />
    3234    </target>
    3335    <target name="compileTest" depends="compile">
     
    3638            debug="true" deprecation="on" source="1.5" target="1.5"
    3739            destdir="./build/obj"
    38             classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" />
     40            classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar:../../streaming/java/build/streaming.jar" />
    3941    </target>
    4042    <target name="jar" depends="compile">
     
    4244            <manifest>
    4345                <attribute name="Main-Class" value="net.i2p.sam.SAMBridge" />
    44                 <attribute name="Class-Path" value="i2p.jar mstreaming.jar" />
     46                <attribute name="Class-Path" value="i2p.jar mstreaming.jar streaming.jar" />
    4547            </manifest>
    4648        </jar>
     
    5355        <mkdir dir="./build/javadoc" />
    5456        <javadoc
    55             sourcepath="./src:../../../core/java/src:../../ministreaming/java/src" destdir="./build/javadoc"
     57            sourcepath="./src:../../../core/java/src:../../ministreaming/java/src:../../streaming/java/src" destdir="./build/javadoc"
    5658            packagenames="*"
    5759            use="true"
     
    6567        <!-- ministreaming will clean core -->
    6668        <ant dir="../../ministreaming/java/" target="distclean" />
     69        <ant dir="../../streaming/java/" target="distclean" />
    6770    </target>
    6871    <target name="distclean" depends="clean">
    6972        <!-- ministreaming will clean core -->
    7073        <ant dir="../../ministreaming/java/" target="distclean" />
     74        <ant dir="../../streaming/java/" target="distclean" />
    7175    </target>
    7276</project>
  • apps/sam/java/src/net/i2p/sam/SAMBridge.java

    red259ac r18a475e  
    1515import java.io.IOException;
    1616import java.io.InputStreamReader;
    17 import java.net.InetAddress;
    18 import java.net.ServerSocket;
    19 import java.net.Socket;
     17import java.net.InetSocketAddress;
     18import java.nio.channels.ServerSocketChannel;
     19import java.nio.channels.SocketChannel;
     20import java.nio.ByteBuffer;
    2021import java.util.HashMap;
    2122import java.util.Iterator;
     
    3536public class SAMBridge implements Runnable {
    3637    private final static Log _log = new Log(SAMBridge.class);
    37     private ServerSocket serverSocket;
     38    private ServerSocketChannel serverSocket;
    3839    private Properties i2cpProps;
    3940    /**
     
    4647     * destination keys (Destination+PrivateKey+SigningPrivateKey)
    4748     */
    48     private Map nameToPrivKeys;
     49    private Map<String,String> nameToPrivKeys;
    4950
    5051    private boolean acceptConnections = true;
    5152
    5253    private static final int SAM_LISTENPORT = 7656;
     54   
    5355    public static final String DEFAULT_SAM_KEYFILE = "sam.keys";
     56    public static final String PROP_TCP_HOST = "sam.tcp.host";
     57    public static final String PROP_TCP_PORT = "sam.tcp.port";
     58    protected static final String DEFAULT_TCP_HOST = "0.0.0.0";
     59    protected static final String DEFAULT_TCP_PORT = "7656";
     60   
     61    public static final String PROP_DATAGRAM_HOST = "sam.udp.host";
     62    public static final String PROP_DATAGRAM_PORT = "sam.udp.port";
     63    protected static final String DEFAULT_DATAGRAM_HOST = "0.0.0.0";
     64    protected static final String DEFAULT_DATAGRAM_PORT = "7655";
     65
    5466   
    5567    private SAMBridge() {}
     
    6577    public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
    6678        persistFilename = persistFile;
    67         nameToPrivKeys = new HashMap(8);
     79        nameToPrivKeys = new HashMap<String,String>(8);
    6880        loadKeys();
    6981        try {
    7082            if ( (listenHost != null) && !("0.0.0.0".equals(listenHost)) ) {
    71                 serverSocket = new ServerSocket(listenPort, 0, InetAddress.getByName(listenHost));
     83                serverSocket = ServerSocketChannel.open();
     84                serverSocket.socket().bind(new InetSocketAddress(listenHost, listenPort));
    7285                if (_log.shouldLog(Log.DEBUG))
    7386                    _log.debug("SAM bridge listening on "
    7487                               + listenHost + ":" + listenPort);
    7588            } else {
    76                 serverSocket = new ServerSocket(listenPort);
     89                serverSocket = ServerSocketChannel.open();
     90                serverSocket.socket().bind(new InetSocketAddress(listenPort));
    7791                if (_log.shouldLog(Log.DEBUG))
    7892                    _log.debug("SAM bridge listening on 0.0.0.0:" + listenPort);
     
    192206    }
    193207   
     208    static class HelpRequested extends Exception {static final long serialVersionUID=0x1;}
     209   
    194210    /**
    195211     * Usage:
    196      *  <pre>SAMBridge [[listenHost ]listenPort[ name=val]*]</pre>
    197      *
     212     *  <pre>SAMBridge [ keyfile [listenHost ] listenPort [ name=val ]* ]</pre>
     213     * or:
     214     *  <pre>SAMBridge [ name=val ]* </pre>
     215     * 
    198216     * name=val options are passed to the I2CP code to build a session,
    199217     * allowing the bridge to specify an alternate I2CP host and port, tunnel
    200218     * depth, etc.
    201      * @param args [[listenHost ]listenPort[ name=val]*]
     219     * @param args [ keyfile [ listenHost ] listenPort [ name=val ]* ]
    202220     */
    203221    public static void main(String args[]) {
    204222        String keyfile = DEFAULT_SAM_KEYFILE;
    205223        int port = SAM_LISTENPORT;
    206         String host = "0.0.0.0";
     224        String host = DEFAULT_TCP_HOST;
    207225        Properties opts = null;
    208226        if (args.length > 0) {
    209             keyfile = args[0];
    210             int portIndex = 1;
    211             try {
    212                 port = Integer.parseInt(args[portIndex]);
    213             } catch (NumberFormatException nfe) {
    214                 host = args[1];
    215                 portIndex++;
    216                 try {
    217                     port = Integer.parseInt(args[portIndex]);
    218                 } catch (NumberFormatException nfe1) {
    219                     usage();
    220                     return;
    221                 }
    222             }
    223             opts = parseOptions(args, portIndex+1);
     227                try {
     228                        opts = parseOptions(args, 0);
     229                        keyfile = args[0];
     230                        int portIndex = 1;
     231                        try {
     232                                if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
     233                        } catch (NumberFormatException nfe) {
     234                                host = args[portIndex];
     235                                portIndex++;
     236                                try {
     237                                        if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
     238                                } catch (NumberFormatException nfe1) {
     239                                        try {
     240                                                port = Integer.parseInt(opts.getProperty(SAMBridge.PROP_TCP_PORT, SAMBridge.DEFAULT_TCP_PORT));
     241                                                host = opts.getProperty(SAMBridge.PROP_TCP_HOST, SAMBridge.DEFAULT_TCP_HOST);
     242                                        } catch (NumberFormatException e) {
     243                                                usage();
     244                                                return;
     245                                        }
     246                                }
     247                        }
     248                } catch (HelpRequested e) {
     249                        usage();
     250                        return;
     251                }
    224252        }
    225253        SAMBridge bridge = new SAMBridge(host, port, opts, keyfile);
     
    237265    }
    238266
    239     private static Properties parseOptions(String args[], int startArgs) {
     267    private static Properties parseOptions(String args[], int startArgs) throws HelpRequested {
    240268        Properties props = new Properties();
    241269        // skip over first few options
    242270        for (int i = startArgs; i < args.length; i++) {
     271                if (args[i].equals("-h")) throw new HelpRequested();
    243272            int eq = args[i].indexOf('=');
    244273            if (eq <= 0) continue;
     
    256285    private static void usage() {
    257286        System.err.println("Usage: SAMBridge [keyfile [listenHost] listenPortNum[ name=val]*]");
     287        System.err.println("or:");
     288        System.err.println("       SAMBridge [ name=val ]*");
    258289        System.err.println(" keyfile: location to persist private keys (default sam.keys)");
    259290        System.err.println(" listenHost: interface to listen on (0.0.0.0 for all interfaces)");
     
    261292        System.err.println(" name=val: options to pass when connecting via I2CP, such as ");
    262293        System.err.println("           i2cp.host=localhost and i2cp.port=7654");
     294        System.err.println("");
     295        System.err.println("Host and ports of the SAM bridge can be specified with the alternate");
     296        System.err.println("form by specifying options "+SAMBridge.PROP_TCP_HOST+" and/or "+
     297                        SAMBridge.PROP_TCP_PORT);
     298        System.err.println("");
     299        System.err.println("Options "+SAMBridge.PROP_DATAGRAM_HOST+" and "+SAMBridge.PROP_DATAGRAM_PORT+
     300                        " specify the listening ip");
     301        System.err.println("range and the port of SAM datagram server. This server is");
     302        System.err.println("only launched after a client creates the first SAM datagram");
     303        System.err.println("or raw session, after a handshake with SAM version >= 3.0.");
     304        System.err.println("");
     305        System.err.println("The option loglevel=[DEBUG|WARN|ERROR|CRIT] can be used");
     306        System.err.println("for tuning the log verbosity.\n");
    263307    }
    264308   
     
    267311        try {
    268312            while (acceptConnections) {
    269                 Socket s = serverSocket.accept();
     313                SocketChannel s = serverSocket.accept();
    270314                if (_log.shouldLog(Log.DEBUG))
    271315                    _log.debug("New connection from "
    272                                + s.getInetAddress().toString() + ":"
    273                                + s.getPort());
    274 
    275                 try {
    276                     SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
    277                     if (handler == null) {
    278                         if (_log.shouldLog(Log.DEBUG))
    279                             _log.debug("SAM handler has not been instantiated");
     316                               + s.socket().getInetAddress().toString() + ":"
     317                               + s.socket().getPort());
     318
     319                class HelloHandler implements Runnable {
     320                        SocketChannel s ;
     321                        SAMBridge parent ;
     322                        HelloHandler(SocketChannel s, SAMBridge parent) {
     323                                this.s = s ;
     324                                this.parent = parent ;
     325                        }
     326                        public void run() {
    280327                        try {
    281                             s.close();
    282                         } catch (IOException e) {}
    283                         continue;
    284                     }
    285                     handler.setBridge(this);
    286                     handler.startHandling();
    287                 } catch (SAMException e) {
    288                     if (_log.shouldLog(Log.ERROR))
    289                         _log.error("SAM error: " + e.getMessage(), e);
    290                     try {
    291                         String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n";
    292                         s.getOutputStream().write(reply.getBytes("ISO-8859-1"));
    293                     } catch (IOException ioe) {
    294                         if (_log.shouldLog(Log.ERROR))
    295                             _log.error("SAM Error sending error reply", ioe);
    296                     }
    297                     try { s.close(); } catch (IOException ioe) {}
    298                 } catch (Exception ee) {
    299                     try { s.close(); } catch (IOException ioe) {}
    300                     _log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
    301                 }
     328                            SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
     329                            if (handler == null) {
     330                                if (_log.shouldLog(Log.DEBUG))
     331                                    _log.debug("SAM handler has not been instantiated");
     332                                try {
     333                                    s.close();
     334                                } catch (IOException e) {}
     335                                return;
     336                            }
     337                            handler.setBridge(parent);
     338                            handler.startHandling();
     339                        } catch (SAMException e) {
     340                            if (_log.shouldLog(Log.ERROR))
     341                                _log.error("SAM error: " + e.getMessage(), e);
     342                            try {
     343                                String reply = "HELLO REPLY RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n";
     344                                s.write(ByteBuffer.wrap(reply.getBytes("ISO-8859-1")));
     345                            } catch (IOException ioe) {
     346                                if (_log.shouldLog(Log.ERROR))
     347                                    _log.error("SAM Error sending error reply", ioe);
     348                            }
     349                            try { s.close(); } catch (IOException ioe) {}
     350                        } catch (Exception ee) {
     351                            try { s.close(); } catch (IOException ioe) {}
     352                            _log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
     353                        }                               
     354                        }
     355                }
     356                new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start();
    302357            }
    303358        } catch (Exception e) {
  • apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java

    red259ac r18a475e  
    3131    public static int DGRAM_SIZE_MAX = 31*1024;
    3232
    33     private SAMDatagramReceiver recv = null;
     33    protected SAMDatagramReceiver recv = null;
    3434
    3535    private I2PDatagramMaker dgramMaker;
     
    8585        if (data.length > DGRAM_SIZE_MAX)
    8686            throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
    87        
    88         byte[] dgram = dgramMaker.makeI2PDatagram(data);
    89 
     87        byte[] dgram ;
     88        synchronized (dgramMaker) {
     89                dgram = dgramMaker.makeI2PDatagram(data);
     90        }
    9091        return sendBytesThroughMessageSession(dest, dgram);
    9192    }
  • apps/sam/java/src/net/i2p/sam/SAMException.java

    red259ac r18a475e  
    1616public class SAMException extends Exception {
    1717
     18        static final long serialVersionUID = 1 ;
     19
    1820    public SAMException() {
    19         super();
     21        super();
    2022    }
    2123   
    2224    public SAMException(String s) {
    23         super(s);
     25        super(s);
    2426    }
    2527}
  • apps/sam/java/src/net/i2p/sam/SAMHandler.java

    red259ac r18a475e  
    1010
    1111import java.io.IOException;
    12 import java.io.InputStream;
    13 import java.io.OutputStream;
    14 import java.net.Socket;
     12import java.nio.channels.SocketChannel;
     13import java.nio.ByteBuffer;
    1514import java.util.Properties;
    1615
     
    3332
    3433    private Object socketWLock = new Object(); // Guards writings on socket
    35     private Socket socket = null;
    36     private OutputStream socketOS = null; // Stream associated to socket
     34    protected SocketChannel socket = null;
    3735
    3836    protected int verMajor = 0;
     
    5452     * @throws IOException
    5553     */
    56     protected SAMHandler(Socket s,
     54    protected SAMHandler(SocketChannel s,
    5755                         int verMajor, int verMinor, Properties i2cpProps) throws IOException {
    5856        socket = s;
    59         socketOS = socket.getOutputStream();
    6057
    6158        this.verMajor = verMajor;
     
    8784     * @throws IOException
    8885     */
    89     protected final InputStream getClientSocketInputStream() throws IOException {
    90         return socket.getInputStream();
     86    protected final SocketChannel getClientSocket() {
     87        return socket ;
    9188    }
    9289
     
    9996     * @throws IOException
    10097     */
    101     protected final void writeBytes(byte[] data) throws IOException {
     98    protected final void writeBytes(ByteBuffer data) throws IOException {
    10299        synchronized (socketWLock) {
    103             socketOS.write(data);
    104             socketOS.flush();
     100            writeBytes(data, socket);
    105101        }
     102    }
     103   
     104    static public void writeBytes(ByteBuffer data, SocketChannel out) throws IOException {
     105        while (data.hasRemaining()) out.write(data);           
     106        out.socket().getOutputStream().flush();
    106107    }
    107108   
     
    113114     */
    114115    protected Object getWriteLock() { return socketWLock; }
    115     protected OutputStream getOut() { return socketOS; }
    116116
    117117    /**
     
    122122     * @param str A byte array to be written
    123123     *
    124      * @return True is the string was successfully written, false otherwise
     124     * @return True if the string was successfully written, false otherwise
    125125     */
    126126    protected final boolean writeString(String str) {
    127127        if (_log.shouldLog(Log.DEBUG))
    128128            _log.debug("Sending the client: [" + str + "]");
    129         try {
    130             writeBytes(str.getBytes("ISO-8859-1"));
     129        return writeString(str, socket);
     130    }
     131
     132    public static boolean writeString(String str, SocketChannel out)
     133    {
     134        try {
     135            writeBytes(ByteBuffer.wrap(str.getBytes("ISO-8859-1")), out);
    131136        } catch (IOException e) {
    132137            _log.debug("Caught IOException", e);
    133138            return false;
    134139        }
    135 
    136         return true;
     140        return true ;
    137141    }
    138 
     142   
    139143    /**
    140144     * Close the socket connected to the SAM client.
     
    179183                + "; SAM version: " + verMajor + "." + verMinor
    180184                + "; client: "
    181                 + this.socket.getInetAddress().toString() + ":"
    182                 + this.socket.getPort() + ")");
     185                + this.socket.socket().getInetAddress().toString() + ":"
     186                + this.socket.socket().getPort() + ")");
    183187    }
    184188
  • apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java

    red259ac r18a475e  
    1010
    1111import java.io.IOException;
    12 import java.io.OutputStream;
    1312import java.io.UnsupportedEncodingException;
    14 import java.net.Socket;
     13import java.nio.channels.SocketChannel;
     14import java.nio.ByteBuffer;
    1515import java.util.Properties;
    1616import java.util.StringTokenizer;
     
    3535     * @return A SAM protocol handler, or null if the client closed before the handshake
    3636     */
    37     public static SAMHandler createSAMHandler(Socket s, Properties i2cpProps) throws SAMException {
     37    public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException {
    3838        String line;
    3939        StringTokenizer tok;
    4040
    4141        try {
    42             line = DataHelper.readLine(s.getInputStream());
     42            line = DataHelper.readLine(s.socket().getInputStream());
    4343            if (line == null) {
    4444                _log.debug("Connection closed by client");
    4545                return null;
    4646            }
    47             tok = new StringTokenizer(line, " ");
     47            tok = new StringTokenizer(line.trim(), " ");
    4848        } catch (IOException e) {
    4949            throw new SAMException("Error reading from socket: "
     
    8585
    8686        String ver = chooseBestVersion(minVer, maxVer);
    87         if (ver == null)
    88             throw new SAMException("No version specified");
    8987
    90         // Let's answer positively
    9188        try {
    92             OutputStream out = s.getOutputStream();
    93             out.write(("HELLO REPLY RESULT=OK VERSION="
    94                        + ver + "\n").getBytes("ISO-8859-1"));
     89            if (ver == null) {
     90                s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=NOVERSION\n").getBytes("ISO-8859-1")));
     91                return null ;
     92            }
     93            // Let's answer positively
     94            s.write(ByteBuffer.wrap(("HELLO REPLY RESULT=OK VERSION="
     95                       + ver + "\n").getBytes("ISO-8859-1")));
    9596        } catch (UnsupportedEncodingException e) {
    9697            _log.error("Caught UnsupportedEncodingException ("
     
    116117                handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
    117118                break;
     119            case 3:
     120                handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps);
     121                break;
    118122            default:
    119123                _log.error("BUG! Trying to initialize the wrong SAM version!");
     
    129133    /* Return the best version we can use, or null on failure */
    130134    private static String chooseBestVersion(String minVer, String maxVer) {
     135       
    131136        int minMajor = getMajor(minVer), minMinor = getMinor(minVer);
    132137        int maxMajor = getMajor(maxVer), maxMinor = getMinor(maxVer);
     
    143148        float fmaxVer = (float) maxMajor + (float) maxMinor / 10 ;
    144149       
     150
     151        if ( ( fminVer <=  3.0 ) && ( fmaxVer >= 3.0 ) ) return "3.0" ;
    145152
    146153        if ( ( fminVer <=  2.0 ) && ( fmaxVer >= 2.0 ) ) return "2.0" ;
  • apps/sam/java/src/net/i2p/sam/SAMInvalidDirectionException.java

    red259ac r18a475e  
    1616 */
    1717public class SAMInvalidDirectionException extends Exception {
    18 
     18        static final long serialVersionUID = 1 ;
     19       
    1920    public SAMInvalidDirectionException() {
    2021        super();
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    red259ac r18a475e  
    110110     */
    111111    protected boolean sendBytesThroughMessageSession(String dest, byte[] data) throws DataFormatException {
    112         Destination d = new Destination();
    113         d.fromBase64(dest);
     112        Destination d = SAMUtils.getDest(dest);
    114113
    115114        if (_log.shouldLog(Log.DEBUG)) {
  • apps/sam/java/src/net/i2p/sam/SAMRawSession.java

    red259ac r18a475e  
    2727    public static final int RAW_SIZE_MAX = 32*1024;
    2828
    29     private SAMRawReceiver recv = null;
     29    protected SAMRawReceiver recv = null;
    3030    /**
    3131     * Create a new SAM RAW session.
  • apps/sam/java/src/net/i2p/sam/SAMStreamReceiver.java

    red259ac r18a475e  
    1010
    1111import java.io.IOException;
     12import java.nio.ByteBuffer;
    1213
    1314import net.i2p.data.Destination;
     
    6162     * @throws IOException
    6263     */
    63     public void receiveStreamBytes(int id, byte data[], int len) throws IOException;
     64    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException;
    6465
    6566    /**
  • apps/sam/java/src/net/i2p/sam/SAMStreamSession.java

    red259ac r18a475e  
    1414import java.io.InterruptedIOException;
    1515import java.io.OutputStream;
     16import java.nio.ByteBuffer;
     17import java.nio.channels.Channels;
    1618import java.net.ConnectException;
    1719import java.net.NoRouteToHostException;
     
    5254    protected SAMStreamReceiver recv = null;
    5355
    54     private SAMStreamSessionServer server = null;
     56    protected SAMStreamSessionServer server = null;
    5557
    5658    protected I2PSocketManager socketMgr = null;
     
    5860    private Object handlersMapLock = new Object();
    5961    /** stream id (Long) to SAMStreamSessionSocketReader */
    60     private HashMap handlersMap = new HashMap();
     62    private HashMap<Integer,SAMStreamSessionSocketReader> handlersMap = new HashMap<Integer,SAMStreamSessionSocketReader>();
    6163    /** stream id (Long) to StreamSender */
    62     private HashMap sendersMap = new HashMap();
     64    private HashMap<Integer,StreamSender> sendersMap = new HashMap<Integer,StreamSender>();
    6365
    6466    private Object idLock = new Object();
     
    7678    public static String PROP_FORCE_FLUSH = "sam.forceFlush";
    7779    public static String DEFAULT_FORCE_FLUSH = "false";
     80   
     81    public SAMStreamSession() {
     82       
     83    }
    7884   
    7985    /**
     
    167173    }
    168174   
    169     private class DisconnectListener implements I2PSocketManager.DisconnectListener {
     175    protected class DisconnectListener implements I2PSocketManager.DisconnectListener {
    170176        public void sessionDisconnected() {
    171177            close();
     
    573579
    574580            int read = -1;
    575             byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
     581            ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
    576582
    577583            try {
     
    579585
    580586                while (stillRunning) {
    581                     read = in.read(data);
     587                        data.clear();
     588                    read = Channels.newChannel(in).read(data);
    582589                    if (read == -1) {
    583590                        _log.debug("Handler " + id + ": connection closed");
    584591                        break;
    585592                    }
    586                    
    587                     recv.receiveStreamBytes(id, data, read);
     593                    data.flip();
     594                    recv.receiveStreamBytes(id, data);
    588595                }
    589596            } catch (IOException e) {
     
    651658    protected class v1StreamSender extends StreamSender
    652659      {
    653         private List _data;
     660        private List<ByteArray> _data;
    654661        private int _id;
    655662        private ByteCache _cache;
     
    661668        public v1StreamSender ( I2PSocket s, int id ) throws IOException {
    662669            super ( s, id );
    663             _data = new ArrayList(1);
     670            _data = new ArrayList<ByteArray>(1);
    664671            _id = id;
    665672            _cache = ByteCache.getInstance(4, 32*1024);
  • apps/sam/java/src/net/i2p/sam/SAMUtils.java

    red259ac r18a475e  
    99 */
    1010
     11import java.io.ByteArrayInputStream;
    1112import java.io.IOException;
    1213import java.io.OutputStream;
     
    2021import net.i2p.client.I2PClientFactory;
    2122import net.i2p.client.naming.NamingService;
     23import net.i2p.data.Base64;
    2224import net.i2p.data.DataFormatException;
    2325import net.i2p.data.Destination;
     26import net.i2p.data.PrivateKey;
     27import net.i2p.data.SigningPrivateKey;
    2428import net.i2p.util.Log;
    2529
     
    7478        }
    7579    }
     80   
     81    public static class InvalidDestination extends Exception {
     82        static final long serialVersionUID = 0x1 ;
     83    }
     84    public static void checkPrivateDestination(String dest) throws InvalidDestination {
     85        ByteArrayInputStream destKeyStream = new ByteArrayInputStream(Base64.decode(dest));
     86
     87        try {
     88                new Destination().readBytes(destKeyStream);
     89                new PrivateKey().readBytes(destKeyStream);
     90                new SigningPrivateKey().readBytes(destKeyStream);
     91        } catch (Exception e) {
     92                throw new InvalidDestination();
     93        }
     94    }
     95
    7696
    7797    /**
     
    102122    }
    103123   
     124    /**
     125     * Resolve the destination from a key or a hostname
     126     *
     127     * @param s Hostname or key to be resolved
     128     *
     129     * @return the Destination for the specified hostname, or null if not found
     130     */
     131    public static Destination getDest(String s) throws DataFormatException
     132    {
     133        Destination d = new Destination() ;
     134        try {
     135                d.fromBase64(s);
     136        } catch (DataFormatException e) {
     137                d = lookupHost(s, null);
     138                if ( d==null ) {
     139                        throw e ;
     140                }
     141        }
     142        return d ;
     143    }
     144
    104145    /**
    105146     * Parse SAM parameters, and put them into a Propetries object
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    red259ac r18a475e  
    1313import java.io.EOFException;
    1414import java.io.IOException;
    15 import java.io.InputStream;
    1615import java.io.InterruptedIOException;
    17 import java.io.OutputStream;
    1816import java.net.ConnectException;
    1917import java.net.NoRouteToHostException;
    20 import java.net.Socket;
     18import java.nio.channels.SocketChannel;
     19import java.nio.ByteBuffer;
    2120import java.util.Properties;
    2221import java.util.StringTokenizer;
     
    4140    private final static Log _log = new Log(SAMv1Handler.class);
    4241
    43     private final static int IN_BUFSIZE = 2048;
    44 
    45     private SAMRawSession rawSession = null;
    46     private SAMDatagramSession datagramSession = null;
    47   protected SAMStreamSession streamSession = null;
    48 
    49     private long _id;
    50     private static volatile long __id = 0;
     42    protected SAMRawSession rawSession = null;
     43    protected SAMDatagramSession datagramSession = null;
     44    protected SAMStreamSession streamSession = null;
     45    protected SAMDatagramSession getDatagramSession() {return datagramSession ;}       
     46    protected SAMRawSession getRawSession() {return rawSession ;}
     47
     48    protected long _id;
     49    protected static volatile long __id = 0;
    5150   
    5251    /**
     
    6160     * @throws IOException
    6261     */
    63     public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException {
     62    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
    6463        this(s, verMajor, verMinor, new Properties());
    6564    }
     
    7675     * @throws IOException
    7776     */
    78     public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
     77    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
    7978        super(s, verMajor, verMinor, i2cpProps);
    8079        _id = ++__id;
     
    102101
    103102        try {
    104             InputStream in = getClientSocketInputStream();
    105             int b = -1;
    106 
    107103            while (true) {
    108104                if (shouldStop()) {
     
    111107                }
    112108
    113                 msg = DataHelper.readLine(in);
     109                msg = DataHelper.readLine(getClientSocket().socket().getInputStream()).trim();
    114110                if (msg == null) {
    115111                    _log.debug("Connection closed by client");
     
    176172                _log.error("Error closing socket: " + e.getMessage());
    177173            }
    178             if (rawSession != null) {
    179                 rawSession.close();
    180             }
    181             if (datagramSession != null) {
    182                 datagramSession.close();
     174            if (getRawSession() != null) {
     175                getRawSession().close();
     176            }
     177            if (getDatagramSession() != null) {
     178                getDatagramSession().close();
    183179            }
    184180            if (streamSession != null) {
     
    189185
    190186    /* Parse and execute a SESSION message */
    191     private boolean execSessionMessage(String opcode, Properties props) {
     187    protected boolean execSessionMessage(String opcode, Properties props) {
    192188
    193189        String dest = "BUG!";
     
    195191        try{
    196192            if (opcode.equals("CREATE")) {
    197                 if ((rawSession != null) || (datagramSession != null)
     193                if ((getRawSession() != null) || (getDatagramSession() != null)
    198194                    || (streamSession != null)) {
    199195                    _log.debug("Trying to create a session, but one still exists");
     
    294290               
    295291    /* Parse and execute a DEST message*/
    296     private boolean execDestMessage(String opcode, Properties props) {
     292  protected boolean execDestMessage(String opcode, Properties props) {
    297293
    298294        if (opcode.equals("GENERATE")) {
     
    319315
    320316    /* Parse and execute a NAMING message */
    321     private boolean execNamingMessage(String opcode, Properties props) {
     317  protected boolean execNamingMessage(String opcode, Properties props) {
    322318        if (opcode.equals("LOOKUP")) {
    323319            if (props == null) {
     
    332328            }
    333329
    334             Destination dest;
     330            Destination dest = null ;
    335331            if (name.equals("ME")) {
    336                 if (rawSession != null) {
    337                     dest = rawSession.getDestination();
     332                if (getRawSession() != null) {
     333                    dest = getRawSession().getDestination();
    338334                } else if (streamSession != null) {
    339335                    dest = streamSession.getDestination();
    340                 } else if (datagramSession != null) {
    341                     dest = datagramSession.getDestination();
     336                } else if (getDatagramSession() != null) {
     337                    dest = getDatagramSession().getDestination();
    342338                } else {
    343339                    _log.debug("Lookup for SESSION destination, but session is null");
     
    345341                }
    346342            } else {
    347                 dest = SAMUtils.lookupHost(name, null);
     343                try {
     344                        dest = SAMUtils.getDest(name);
     345                } catch (DataFormatException e) {
     346                }
    348347            }
    349348           
     
    365364
    366365    /* Parse and execute a DATAGRAM message */
    367     private boolean execDatagramMessage(String opcode, Properties props) {
    368         if (datagramSession == null) {
     366    protected boolean execDatagramMessage(String opcode, Properties props) {
     367        if (getDatagramSession() == null) {
    369368            _log.error("DATAGRAM message received, but no DATAGRAM session exists");
    370369            return false;
     
    404403
    405404            try {
    406                 DataInputStream in = new DataInputStream(getClientSocketInputStream());
     405                DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
    407406                byte[] data = new byte[size];
    408407
     
    436435
    437436    /* Parse and execute a RAW message */
    438     private boolean execRawMessage(String opcode, Properties props) {
    439         if (rawSession == null) {
     437    protected boolean execRawMessage(String opcode, Properties props) {
     438        if (getRawSession() == null) {
    440439            _log.error("RAW message received, but no RAW session exists");
    441440            return false;
     
    475474
    476475            try {
    477                 DataInputStream in = new DataInputStream(getClientSocketInputStream());
     476                DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
    478477                byte[] data = new byte[size];
    479478
    480479                in.readFully(data);
    481480
    482                 if (!rawSession.sendBytes(dest, data)) {
     481                if (!getRawSession().sendBytes(dest, data)) {
    483482                    _log.error("RAW SEND failed");
    484483                    return true;
     
    568567
    569568        try {
    570             if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
     569            if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
    571570                if (_log.shouldLog(Log.WARN))
    572571                    _log.warn("STREAM SEND [" + size + "] failed");
     
    692691    // SAMRawReceiver implementation
    693692    public void receiveRawBytes(byte data[]) throws IOException {
    694         if (rawSession == null) {
     693        if (getRawSession() == null) {
    695694            _log.error("BUG! Received raw bytes, but session is null!");
    696695            throw new NullPointerException("BUG! RAW session is null!");
     
    702701        msg.write(msgText.getBytes("ISO-8859-1"));
    703702        msg.write(data);
     703        msg.flush();
    704704       
    705705        if (_log.shouldLog(Log.DEBUG))
    706706            _log.debug("sending to client: " + msgText);
    707707
    708         writeBytes(msg.toByteArray());
     708        writeBytes(ByteBuffer.wrap(msg.toByteArray()));
    709709    }
    710710
     
    712712        _log.debug("stopRawReceiving() invoked");
    713713
    714         if (rawSession == null) {
     714        if (getRawSession() == null) {
    715715            _log.error("BUG! Got raw receiving stop, but session is null!");
    716716            throw new NullPointerException("BUG! RAW session is null!");
     
    727727    // SAMDatagramReceiver implementation
    728728    public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
    729         if (datagramSession == null) {
     729        if (getDatagramSession() == null) {
    730730            _log.error("BUG! Received datagram bytes, but session is null!");
    731731            throw new NullPointerException("BUG! DATAGRAM session is null!");
     
    741741            _log.debug("sending to client: " + msgText);
    742742        msg.write(data);
    743 
    744         writeBytes(msg.toByteArray());
     743        msg.flush();
     744        writeBytes(ByteBuffer.wrap(msg.toByteArray()));
    745745    }
    746746
     
    748748        _log.debug("stopDatagramReceiving() invoked");
    749749
    750         if (datagramSession == null) {
     750        if (getDatagramSession() == null) {
    751751            _log.error("BUG! Got datagram receiving stop, but session is null!");
    752752            throw new NullPointerException("BUG! DATAGRAM session is null!");
     
    831831    }
    832832 
    833     public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
     833    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
    834834        if (streamSession == null) {
    835835            _log.error("Received stream bytes, but session is null!");
     
    837837        }
    838838
    839         String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + len + "\n";
     839        String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n";
    840840        if (_log.shouldLog(Log.DEBUG))
    841841            _log.debug("sending to client: " + msgText);
    842842       
    843         byte prefix[] = msgText.getBytes("ISO-8859-1");
     843        ByteBuffer prefix = ByteBuffer.wrap(msgText.getBytes("ISO-8859-1"));
    844844       
    845         // dont waste so much memory
    846         //ByteArrayOutputStream msg = new ByteArrayOutputStream();
    847         //msg.write(msgText.getBytes("ISO-8859-1"));
    848         //msg.write(data, 0, len);
    849         // writeBytes(msg.toByteArray());
    850845        Object writeLock = getWriteLock();
    851         OutputStream out = getOut();
    852846        synchronized (writeLock) {
    853             out.write(prefix);
    854             out.write(data, 0, len);
    855             out.flush();
     847                while (prefix.hasRemaining()) socket.write(prefix);
     848            while (data.hasRemaining()) socket.write(data);
     849            socket.socket().getOutputStream().flush();
    856850        }
    857851    }
  • apps/sam/java/src/net/i2p/sam/SAMv2Handler.java

    red259ac r18a475e  
    1010
    1111import java.io.IOException;
    12 import java.net.Socket;
     12import java.nio.channels.SocketChannel;
    1313import java.util.Properties;
    1414
     
    3737                 * @param verMinor SAM minor version to manage
    3838                 */
    39                 public SAMv2Handler ( Socket s, int verMajor, int verMinor ) throws SAMException, IOException
     39                public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
    4040                {
    4141                        this ( s, verMajor, verMinor, new Properties() );
     
    5353                 */
    5454
    55                 public SAMv2Handler ( Socket s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
     55                public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
    5656                {
    5757                        super ( s, verMajor, verMinor, i2cpProps );
  • apps/sam/java/src/net/i2p/sam/SAMv2StreamSession.java

    red259ac r18a475e  
    1313import java.io.InterruptedIOException;
    1414import java.io.OutputStream;
     15import java.nio.channels.Channels;
     16import java.nio.ByteBuffer;
    1517import java.net.ConnectException;
    1618import java.net.NoRouteToHostException;
     
    141143                {
    142144
    143                                 private Object runningLock = new Object();
    144                                 private boolean stillRunning = true;
    145 
    146145                                private int id;
    147146                                private Destination      dest ;
     
    246245
    247246                {
    248                                 private List _data;
     247                                private List<ByteArray> _data;
    249248                                private int _dataSize;
    250249                                private int _id;
     
    258257                                {
    259258                                        super ( s, id );
    260                                         _data = new ArrayList ( 1 );
     259                                        _data = new ArrayList<ByteArray> ( 1 );
    261260                                        _dataSize = 0;
    262261                                        _id = id;
     
    512511
    513512                                        int read = -1;
    514                                         byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
     513                                        ByteBuffer data = ByteBuffer.allocateDirect(SOCKET_HANDLER_BUF_SIZE);
    515514
    516515                                        try
     
    534533                                                        }
    535534                                                       
    536                                                         read = in.read ( data );
     535                                                        data.clear();
     536                                                        read = Channels.newChannel(in).read ( data );
    537537
    538538                                                        if ( read == -1 )
     
    543543
    544544                                                        totalReceived += read ;
    545                                                        
    546                                                         recv.receiveStreamBytes ( id, data, read );
     545                                                        data.flip();
     546                                                        recv.receiveStreamBytes ( id, data );
    547547                                                }
    548548                                        }
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    red259ac r18a475e  
    1313 */
    1414public class SAMEventHandler extends SAMClientEventListenerImpl {
    15     private I2PAppContext _context;
     15    //private I2PAppContext _context;
    1616    private Log _log;
    1717    private Boolean _helloOk;
     
    2020    private Object _sessionCreateLock = new Object();
    2121    private Object _namingReplyLock = new Object();
    22     private Map _namingReplies = new HashMap();
     22    private Map<String,String> _namingReplies = new HashMap<String,String>();
    2323
    2424    public SAMEventHandler(I2PAppContext ctx) {
    25         _context = ctx;
     25        //_context = ctx;
    2626        _log = ctx.logManager().getLog(getClass());
    2727    }
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    red259ac r18a475e  
    3232    private InputStream _samIn;
    3333    private SAMReader _reader;
    34     private boolean _dead;
     34    //private boolean _dead;
    3535    private SAMEventHandler _eventHandler;
    3636    /** Connection id (Integer) to peer (Flooder) */
    37     private Map _remotePeers;
     37    private Map<Integer, Sender> _remotePeers;
    3838   
    3939    public static void main(String args[]) {
     
    4343        }
    4444        I2PAppContext ctx = new I2PAppContext();
    45         String files[] = new String[args.length - 3];
     45        //String files[] = new String[args.length - 3];
    4646        SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
    4747        sender.startup();
     
    5151        _context = ctx;
    5252        _log = ctx.logManager().getLog(SAMStreamSend.class);
    53         _dead = false;
     53        //_dead = false;
    5454        _samHost = samHost;
    5555        _samPort = samPort;
     
    5858        _conOptions = "";
    5959        _eventHandler = new SendEventHandler(_context);
    60         _remotePeers = new HashMap();
     60        _remotePeers = new HashMap<Integer,Sender>();
    6161    }
    6262   
     
    208208            _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
    209209            byte data[] = new byte[1024];
    210             long value = 0;
    211210            long lastSend = _context.clock().now();
    212211            while (!_closed) {
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    red259ac r18a475e  
    3232    private InputStream _samIn;
    3333    private SAMReader _reader;
    34     private boolean _dead;
     34    //private boolean _dead;
    3535    private SAMEventHandler _eventHandler;
    3636    /** Connection id (Integer) to peer (Flooder) */
    37     private Map _remotePeers;
     37    private Map<Integer, Sink> _remotePeers;
    3838   
    3939    public static void main(String args[]) {
     
    5050        _context = ctx;
    5151        _log = ctx.logManager().getLog(SAMStreamSink.class);
    52         _dead = false;
     52        //_dead = false;
    5353        _samHost = samHost;
    5454        _samPort = samPort;
     
    5757        _conOptions = "";
    5858        _eventHandler = new SinkEventHandler(_context);
    59         _remotePeers = new HashMap();
     59        _remotePeers = new HashMap<Integer,Sink>();
    6060    }
    6161   
     
    7171            _log.debug("Handshake complete.  we are " + ourDest);
    7272            if (ourDest != null) {
    73                 boolean written = writeDest(ourDest);
     73                //boolean written =
     74                        writeDest(ourDest);
    7475                _log.debug("Dest written");
    7576            }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    red259ac r18a475e  
    33import java.util.concurrent.LinkedBlockingQueue;
    44import java.util.concurrent.TimeUnit;
    5 import java.util.ArrayList;
    6 import java.util.List;
    75
    86import net.i2p.I2PAppContext;
     
    2422    private ConnectionManager _manager;
    2523    private LinkedBlockingQueue<Packet> _synQueue;
     24    private Object _synSignal;
    2625    private boolean _active;
    2726    private int _acceptTimeout;
     
    4241        _log = context.logManager().getLog(ConnectionHandler.class);
    4342        _manager = mgr;
    44         _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
     43        _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
     44        _synSignal= new Object();
    4545        _active = false;
    4646        _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
     
    8282        if (success) {
    8383            SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout);
     84            // advertise the new syn packet to threads that could be waiting
     85            // (by calling waitSyn(long)
     86            if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE))
     87                synchronized (this._synSignal) {this._synSignal.notifyAll();}
    8488        } else {
    8589            if (_log.shouldLog(Log.WARN))
     
    8892                sendReset(packet);
    8993        }
     94    }
     95   
     96    /**
     97     * Wait until some SYN packet is available
     98     * @param ms max amount of time to wait for a connection (if negative or null,
     99     *                wait indefinitely)
     100     * @throws InterruptedException
     101     */
     102    public void waitSyn( long ms ) throws InterruptedException {
     103        synchronized (this._synSignal)
     104        {
     105                long now = this._context.clock().now() ;
     106                long expiration = now + ms ;
     107                while ( expiration > now || ms<=0 ) {
     108                        // check if there is a SYN packet in the queue
     109                        for ( Packet p : this._synQueue ) {
     110                                if ( p.isFlagSet(Packet.FLAG_SYNCHRONIZE) ) return ;
     111                        }
     112                        // wait until a SYN is signaled
     113                        if ( ms == 0) {
     114                                this._synSignal.wait();
     115                        } else {
     116                                this._synSignal.wait(expiration-now);
     117                                now = this._context.clock().now();
     118                        }
     119                }
     120        }
    90121    }
    91122   
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    red259ac r18a475e  
    22
    33import java.net.SocketTimeoutException;
     4
     5import net.i2p.I2PAppContext;
    46import net.i2p.I2PException;
    57
     
    4648        return _socketManager;
    4749    }
     50
     51    /**
     52     * accept(timeout) waits timeout ms for a socket connecting. If a socket is
     53     * not available during the timeout, return null. accept(0) behaves like accept()
     54     *
     55     * @param timeout in ms
     56     *
     57     * @return a connected I2PSocket, or null
     58     *
     59     * @throws I2PException if there is a problem with reading a new socket
     60     *         from the data available (aka the I2PSession closed, etc)
     61     */
     62
     63        public I2PSocket accept(long timeout)  throws I2PException {
     64                long reset_timeout = this.getSoTimeout();
     65
     66                try {
     67                        this.setSoTimeout(timeout);
     68                        return this.accept();
     69                } catch (SocketTimeoutException e) {
     70                        return null ;
     71                } finally {
     72                        this.setSoTimeout(reset_timeout);
     73                }
     74        }
     75
     76        /**
     77         * block until a SYN packet is detected or the timeout is reached. If timeout is 0,
     78         * block until a SYN packet is detected.
     79         *
     80         * @param timeoutMs
     81         * @throws InterruptedException
     82         * @throws I2PException
     83         */
     84        public void waitIncoming(long timeoutMs) throws I2PException, InterruptedException {
     85                if (this._socketManager.getConnectionManager().getSession().isClosed())
     86                        throw new I2PException("Session is closed");
     87        this._socketManager.getConnectionManager().getConnectionHandler().waitSyn(timeoutMs);
     88        }
    4889}
Note: See TracChangeset for help on using the changeset viewer.