Changeset 9ce8fce


Ignore:
Timestamp:
Nov 27, 2015 8:58:18 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
b1668bb
Parents:
8d7edaae (diff), 01d2371 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p.zzz.sam' (head b328f0edb961263d7606ea964ecb3f7c319ca1cf)

to branch 'i2p.i2p' (head 7b4c0525be182722ef2cc7b564691f27d997da3b)

Files:
6 added
24 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/build.xml

    r8d7edaae r9ce8fce  
    2222    </target>
    2323
    24     <property name="javac.compilerargs" value="" />
     24    <!-- ignored for now, we require java 7 here -->
     25    <property name="javac.compilerargs7" value="" />
     26    <!-- ignored for now, we require java 7 here -->
    2527    <property name="javac.version" value="1.6" />
    2628
     
    3133        <javac
    3234            srcdir="./src"
    33             debug="true" deprecation="on" source="${javac.version}" target="${javac.version}"
     35            debug="true" deprecation="on" source="1.7" target="1.7"
    3436            includeAntRuntime="false"
    3537            destdir="./build/obj"
    3638            classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" >
    37             <compilerarg line="${javac.compilerargs}" />
     39            <compilerarg line="${javac.compilerargs7}" />
    3840        </javac>
    3941    </target>
     
    4244        <javac
    4345            srcdir="./test"
    44             debug="true" deprecation="on" source="${javac.version}" target="${javac.version}"
     46            debug="true" deprecation="on" source="1.7" target="1.7"
    4547            includeAntRuntime="false"
    4648            destdir="./build/obj"
    4749            classpath="../../../core/java/build/i2p.jar:../../ministreaming/java/build/mstreaming.jar" >
    48             <compilerarg line="${javac.compilerargs}" />
     50            <compilerarg line="${javac.compilerargs7}" />
    4951        </javac>
    5052    </target>
  • apps/sam/java/src/net/i2p/sam/SAMBridge.java

    r8d7edaae r9ce8fce  
    1010
    1111import java.io.BufferedReader;
     12import java.io.File;
    1213import java.io.FileInputStream;
    1314import java.io.FileNotFoundException;
     
    1516import java.io.IOException;
    1617import java.io.InputStreamReader;
     18import java.net.InetAddress;
    1719import java.net.InetSocketAddress;
    1820import java.nio.channels.ServerSocketChannel;
    1921import java.nio.channels.SocketChannel;
    20 import java.nio.ByteBuffer;
    2122import java.util.Arrays;
    2223import java.util.ArrayList;
     
    2829import java.util.Set;
    2930
     31import javax.net.ssl.SSLServerSocket;
     32import javax.net.ssl.SSLServerSocketFactory;
     33
     34import gnu.getopt.Getopt;
     35
    3036import net.i2p.I2PAppContext;
    3137import net.i2p.app.*;
    3238import static net.i2p.app.ClientAppState.*;
    3339import net.i2p.data.DataFormatException;
     40import net.i2p.data.DataHelper;
    3441import net.i2p.data.Destination;
    3542import net.i2p.util.I2PAppThread;
     43import net.i2p.util.I2PSSLSocketFactory;
    3644import net.i2p.util.Log;
    3745import net.i2p.util.PortMapper;
     46import net.i2p.util.SystemVersion;
    3847
    3948/**
     
    4958    private final int _listenPort;
    5059    private final Properties i2cpProps;
     60    private final boolean _useSSL;
     61    private final File _configFile;
    5162    private volatile Thread _runner;
     63    private final Object _v3DGServerLock = new Object();
     64    private SAMv3DatagramServer _v3DGServer;
    5265
    5366    /**
     
    6679
    6780    private final ClientAppManager _mgr;
    68     private final String[] _args;
    6981    private volatile ClientAppState _state = UNINITIALIZED;
    7082
     
    7284   
    7385    public static final String DEFAULT_SAM_KEYFILE = "sam.keys";
     86    static final String DEFAULT_SAM_CONFIGFILE = "sam.config";
     87    private static final String PROP_SAM_KEYFILE = "sam.keyfile";
     88    private static final String PROP_SAM_SSL = "sam.useSSL";
    7489    public static final String PROP_TCP_HOST = "sam.tcp.host";
    7590    public static final String PROP_TCP_PORT = "sam.tcp.port";
    76     protected static final String DEFAULT_TCP_HOST = "0.0.0.0";
     91    public static final String PROP_AUTH = "sam.auth";
     92    public static final String PROP_PW_PREFIX = "sam.auth.";
     93    public static final String PROP_PW_SUFFIX = ".shash";
     94    protected static final String DEFAULT_TCP_HOST = "127.0.0.1";
    7795    protected static final String DEFAULT_TCP_PORT = "7656";
    7896   
    7997    public static final String PROP_DATAGRAM_HOST = "sam.udp.host";
    8098    public static final String PROP_DATAGRAM_PORT = "sam.udp.port";
    81     protected static final String DEFAULT_DATAGRAM_HOST = "0.0.0.0";
    82     protected static final String DEFAULT_DATAGRAM_PORT = "7655";
     99    protected static final String DEFAULT_DATAGRAM_HOST = "127.0.0.1";
     100    protected static final int DEFAULT_DATAGRAM_PORT_INT = 7655;
     101    protected static final String DEFAULT_DATAGRAM_PORT = Integer.toString(DEFAULT_DATAGRAM_PORT_INT);
    83102
    84103
     
    96115        _log = context.logManager().getLog(SAMBridge.class);
    97116        _mgr = mgr;
    98         _args = args;
    99117        Options options = getOptions(args);
    100118        _listenHost = options.host;
    101119        _listenPort = options.port;
     120        _useSSL = options.isSSL;
     121        if (_useSSL && !SystemVersion.isJava7())
     122            throw new IllegalArgumentException("SSL requires Java 7 or higher");
    102123        persistFilename = options.keyFile;
     124        _configFile = options.configFile;
    103125        nameToPrivKeys = new HashMap<String,String>(8);
    104126        _handlers = new HashSet<Handler>(8);
     
    124146     * @throws RuntimeException if a server socket can't be opened
    125147     */
    126     public SAMBridge(String listenHost, int listenPort, Properties i2cpProps, String persistFile) {
     148    public SAMBridge(String listenHost, int listenPort, boolean isSSL, Properties i2cpProps,
     149                     String persistFile, File configFile) {
    127150        _log = I2PAppContext.getGlobalContext().logManager().getLog(SAMBridge.class);
    128151        _mgr = null;
    129         _args = new String[] {listenHost, Integer.toString(listenPort) };  // placeholder
    130152        _listenHost = listenHost;
    131153        _listenPort = listenPort;
     154        _useSSL = isSSL;
     155        if (_useSSL && !SystemVersion.isJava7())
     156            throw new IllegalArgumentException("SSL requires Java 7 or higher");
     157        this.i2cpProps = i2cpProps;
    132158        persistFilename = persistFile;
     159        _configFile = configFile;
    133160        nameToPrivKeys = new HashMap<String,String>(8);
    134161        _handlers = new HashSet<Handler>(8);
     
    143170            throw new RuntimeException(e);
    144171        }
    145         this.i2cpProps = i2cpProps;
    146172        _state = INITIALIZED;
    147173    }
     
    151177     */
    152178    private void openSocket() throws IOException {
    153         if ( (_listenHost != null) && !("0.0.0.0".equals(_listenHost)) ) {
    154             serverSocket = ServerSocketChannel.open();
    155             serverSocket.socket().bind(new InetSocketAddress(_listenHost, _listenPort));
    156             if (_log.shouldLog(Log.DEBUG))
    157                 _log.debug("SAM bridge listening on "
    158                            + _listenHost + ":" + _listenPort);
     179        if (_useSSL) {
     180            SSLServerSocketFactory fact = SSLUtil.initializeFactory(i2cpProps);
     181            InetAddress addr;
     182            if (_listenHost != null && !_listenHost.equals("0.0.0.0"))
     183                addr = InetAddress.getByName(_listenHost);
     184            else
     185                addr = null;
     186            SSLServerSocket sock = (SSLServerSocket) fact.createServerSocket(_listenPort, 0, addr);
     187            I2PSSLSocketFactory.setProtocolsAndCiphers(sock);
     188            serverSocket = new SSLServerSocketChannel(sock);
    159189        } else {
    160190            serverSocket = ServerSocketChannel.open();
    161             serverSocket.socket().bind(new InetSocketAddress(_listenPort));
    162             if (_log.shouldLog(Log.DEBUG))
    163                 _log.debug("SAM bridge listening on 0.0.0.0:" + _listenPort);
     191            if (_listenHost != null && !_listenHost.equals("0.0.0.0")) {
     192                serverSocket.socket().bind(new InetSocketAddress(_listenHost, _listenPort));
     193                if (_log.shouldLog(Log.DEBUG))
     194                    _log.debug("SAM bridge listening on "
     195                               + _listenHost + ":" + _listenPort);
     196            } else {
     197                serverSocket.socket().bind(new InetSocketAddress(_listenPort));
     198                if (_log.shouldLog(Log.DEBUG))
     199                    _log.debug("SAM bridge listening on 0.0.0.0:" + _listenPort);
     200            }
    164201        }
    165202    }
     
    321358    }
    322359
     360    /**
     361     * Was a static singleton, now a singleton for this bridge.
     362     * Instantiate and start server if it doesn't exist.
     363     * We only listen on one host and port, as specified in the
     364     * sam.udp.host and sam.udp.port properties.
     365     * TODO we could have multiple servers on different hosts/ports in the future.
     366     *
     367     * @param props non-null instantiate and start server if it doesn't exist
     368     * @param return non-null
     369     * @throws IOException if can't bind to host/port, or if different than existing
     370     * @since 0.9.24
     371     */
     372    SAMv3DatagramServer getV3DatagramServer(Properties props) throws IOException {
     373        String host = props.getProperty(PROP_DATAGRAM_HOST, DEFAULT_DATAGRAM_HOST);
     374        int port;
     375        String portStr = props.getProperty(PROP_DATAGRAM_PORT, DEFAULT_DATAGRAM_PORT);
     376        try {
     377            port = Integer.parseInt(portStr);
     378        } catch (NumberFormatException e) {
     379            port = DEFAULT_DATAGRAM_PORT_INT;
     380        }
     381        synchronized (_v3DGServerLock) {
     382            if (_v3DGServer == null) {
     383                _v3DGServer = new SAMv3DatagramServer(this, host, port, props);
     384                _v3DGServer.start();
     385            } else {
     386                if (_v3DGServer.getPort() != port || !_v3DGServer.getHost().equals(host))
     387                    throw new IOException("Already have V3 DatagramServer with host=" + host + " port=" + port);
     388            }
     389            return _v3DGServer;
     390        }
     391    }
     392
     393
    323394    ////// begin ClientApp interface, use only if using correct construtor
    324395
     
    382453     */
    383454    public String getDisplayName() {
    384         return "SAM " + Arrays.toString(_args);
     455        return "SAM " + _listenHost + ':' + _listenPort;
    385456    }
    386457
     
    422493        try {
    423494            Options options = getOptions(args);
    424             SAMBridge bridge = new SAMBridge(options.host, options.port, options.opts, options.keyFile);
     495            SAMBridge bridge = new SAMBridge(options.host, options.port, options.isSSL, options.opts,
     496                                             options.keyFile, options.configFile);
    425497            bridge.startThread();
    426498        } catch (RuntimeException e) {
     
    460532        private final int port;
    461533        private final Properties opts;
    462 
    463         public Options(String host, int port, Properties opts, String keyFile) {
     534        private final boolean isSSL;
     535        private final File configFile;
     536
     537        public Options(String host, int port, boolean isSSL, Properties opts, String keyFile, File configFile) {
    464538            this.host = host; this.port = port; this.opts = opts; this.keyFile = keyFile;
     539            this.isSSL = isSSL;
     540            this.configFile = configFile;
    465541        }
    466542    }
     
    477553     * @param args [ keyfile [ listenHost ] listenPort [ name=val ]* ]
    478554     * @return non-null Options or throws Exception
     555     * @throws HelpRequestedException on command line problems
     556     * @throws IllegalArgumentException if specified config file does not exist
     557     * @throws IOException if specified config file cannot be read, or on SSL keystore problems
    479558     * @since 0.9.6
    480559     */
    481560    private static Options getOptions(String args[]) throws Exception {
    482         String keyfile = DEFAULT_SAM_KEYFILE;
    483         int port = SAM_LISTENPORT;
    484         String host = DEFAULT_TCP_HOST;
    485         Properties opts = null;
    486         if (args.length > 0) {
    487                 opts = parseOptions(args, 0);
    488                 keyfile = args[0];
    489                 int portIndex = 1;
    490                 try {
    491                         if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
    492                 } catch (NumberFormatException nfe) {
    493                         host = args[portIndex];
    494                         portIndex++;
    495                         try {
    496                                 if (args.length>portIndex) port = Integer.parseInt(args[portIndex]);
    497                         } catch (NumberFormatException nfe1) {
    498                                 port = Integer.parseInt(opts.getProperty(SAMBridge.PROP_TCP_PORT, SAMBridge.DEFAULT_TCP_PORT));
    499                                 host = opts.getProperty(SAMBridge.PROP_TCP_HOST, SAMBridge.DEFAULT_TCP_HOST);
    500                         }
    501                 }
    502         }
    503         return new Options(host, port, opts, keyfile);
    504     }
    505 
    506     private static Properties parseOptions(String args[], int startArgs) throws HelpRequestedException {
    507         Properties props = new Properties();
    508         // skip over first few options
     561        String keyfile = null;
     562        int port = -1;
     563        String host = null;
     564        boolean isSSL = false;
     565        String cfile = null;
     566        Getopt g = new Getopt("SAM", args, "hsc:");
     567        int c;
     568        while ((c = g.getopt()) != -1) {
     569          switch (c) {
     570            case 's':
     571                isSSL = true;
     572                break;
     573
     574            case 'c':
     575                cfile = g.getOptarg();
     576                break;
     577
     578            case 'h':
     579            case '?':
     580            case ':':
     581            default:
     582                throw new HelpRequestedException();
     583          }  // switch
     584        } // while
     585
     586        int startArgs = g.getOptind();
     587        // possible args before ones containing '=';
     588        // (none)
     589        // key port
     590        // key host port
     591        int startOpts;
     592        for (startOpts = startArgs; startOpts < args.length; startOpts++) {
     593            if (args[startOpts].contains("="))
     594                break;
     595        }
     596        int numArgs = startOpts - startArgs;
     597        switch (numArgs) {
     598            case 0:
     599                break;
     600
     601            case 2:
     602                keyfile = args[startArgs];
     603                try {
     604                    port = Integer.parseInt(args[startArgs + 1]);
     605                } catch (NumberFormatException nfe) {
     606                    throw new HelpRequestedException();
     607                }
     608                break;
     609
     610            case 3:
     611                keyfile = args[startArgs];
     612                host = args[startArgs + 1];
     613                try {
     614                    port = Integer.parseInt(args[startArgs + 2]);
     615                } catch (NumberFormatException nfe) {
     616                    throw new HelpRequestedException();
     617                }
     618                break;
     619
     620            default:
     621                throw new HelpRequestedException();
     622        }
     623
     624        String scfile = cfile != null ? cfile : DEFAULT_SAM_CONFIGFILE;
     625        File file = new File(scfile);
     626        if (!file.isAbsolute())
     627            file = new File(I2PAppContext.getGlobalContext().getConfigDir(), scfile);
     628
     629        Properties opts = new Properties();
     630        if (file.exists()) {
     631            DataHelper.loadProps(opts, file);
     632        } else if (cfile != null) {
     633            // only throw if specified on command line
     634            throw new IllegalArgumentException("Config file not found: " + file);
     635        }
     636        // command line trumps config file trumps defaults
     637        if (host == null)
     638            host = opts.getProperty(PROP_TCP_HOST, DEFAULT_TCP_HOST);
     639        if (port < 0) {
     640            try {
     641                port = Integer.parseInt(opts.getProperty(PROP_TCP_PORT, DEFAULT_TCP_PORT));
     642            } catch (NumberFormatException nfe) {
     643                throw new HelpRequestedException();
     644            }
     645        }
     646        if (keyfile == null)
     647            keyfile = opts.getProperty(PROP_SAM_KEYFILE, DEFAULT_SAM_KEYFILE);
     648        if (!isSSL)
     649            isSSL = Boolean.parseBoolean(opts.getProperty(PROP_SAM_SSL));
     650        if (isSSL) {
     651            // must do this before we add command line opts since we may be writing them back out
     652            boolean shouldSave = SSLUtil.verifyKeyStore(opts);
     653            if (shouldSave)
     654                DataHelper.storeProps(opts, file);
     655        }
     656
     657        int remaining = args.length - startOpts;
     658        if (remaining > 0) {
     659                parseOptions(args, startOpts, opts);
     660        }
     661        return new Options(host, port, isSSL, opts, keyfile, file);
     662    }
     663
     664    /**
     665     *  Parse key=value options starting at startArgs.
     666     *  @param props out parameter, any options found are added
     667     *  @throws HelpRequestedException on any item not of the form key=value.
     668     */
     669    private static void parseOptions(String args[], int startArgs, Properties props) throws HelpRequestedException {
    509670        for (int i = startArgs; i < args.length; i++) {
    510                 if (args[i].equals("-h")) throw new HelpRequestedException();
    511671            int eq = args[i].indexOf('=');
    512             if (eq <= 0) continue;
    513             if (eq >= args[i].length()-1) continue;
     672            if (eq <= 0)
     673                throw new HelpRequestedException();
     674            if (eq >= args[i].length()-1)
     675                throw new HelpRequestedException();
    514676            String key = args[i].substring(0, eq);
    515677            String val = args[i].substring(eq+1);
     
    518680            if ( (key.length() > 0) && (val.length() > 0) )
    519681                props.setProperty(key, val);
    520         }
    521         return props;
     682            else
     683                throw new HelpRequestedException();
     684        }
    522685    }
    523686   
    524687    private static void usage() {
    525         System.err.println("Usage: SAMBridge [keyfile [listenHost] listenPortNum[ name=val]*]");
    526         System.err.println("or:");
    527         System.err.println("       SAMBridge [ name=val ]*");
    528         System.err.println(" keyfile: location to persist private keys (default sam.keys)");
    529         System.err.println(" listenHost: interface to listen on (0.0.0.0 for all interfaces)");
    530         System.err.println(" listenPort: port to listen for SAM connections on (default 7656)");
    531         System.err.println(" name=val: options to pass when connecting via I2CP, such as ");
    532         System.err.println("           i2cp.host=localhost and i2cp.port=7654");
    533         System.err.println("");
    534         System.err.println("Host and ports of the SAM bridge can be specified with the alternate");
    535         System.err.println("form by specifying options "+SAMBridge.PROP_TCP_HOST+" and/or "+
    536                         SAMBridge.PROP_TCP_PORT);
    537         System.err.println("");
    538         System.err.println("Options "+SAMBridge.PROP_DATAGRAM_HOST+" and "+SAMBridge.PROP_DATAGRAM_PORT+
    539                         " specify the listening ip");
    540         System.err.println("range and the port of SAM datagram server. This server is");
    541         System.err.println("only launched after a client creates the first SAM datagram");
    542         System.err.println("or raw session, after a handshake with SAM version >= 3.0.");
    543         System.err.println("");
    544         System.err.println("The option loglevel=[DEBUG|WARN|ERROR|CRIT] can be used");
    545         System.err.println("for tuning the log verbosity.\n");
     688        System.err.println("Usage: SAMBridge [-s] [-c sam.config] [keyfile [listenHost] listenPortNum[ name=val]*]\n" +
     689                           "or:\n" +
     690                           "       SAMBridge [ name=val ]*\n" +
     691                           " -s: Use SSL\n" +
     692                           " -c sam.config: Specify config file\n" +
     693                           " keyfile: location to persist private keys (default sam.keys)\n" +
     694                           " listenHost: interface to listen on (0.0.0.0 for all interfaces)\n" +
     695                           " listenPort: port to listen for SAM connections on (default 7656)\n" +
     696                           " name=val: options to pass when connecting via I2CP, such as \n" +
     697                           "           i2cp.host=localhost and i2cp.port=7654\n" +
     698                           "\n" +
     699                           "Host and ports of the SAM bridge can be specified with the alternate\n" +
     700                           "form by specifying options "+SAMBridge.PROP_TCP_HOST+" and/or "+
     701                           SAMBridge.PROP_TCP_PORT +
     702                           "\n" +
     703                           "Options "+SAMBridge.PROP_DATAGRAM_HOST+" and "+SAMBridge.PROP_DATAGRAM_PORT+
     704                           " specify the listening ip\n" +
     705                           "range and the port of SAM datagram server. This server is\n" +
     706                           "only launched after a client creates the first SAM datagram\n" +
     707                           "or raw session, after a handshake with SAM version >= 3.0.\n" +
     708                           "\n" +
     709                           "The option loglevel=[DEBUG|WARN|ERROR|CRIT] can be used\n" +
     710                           "for tuning the log verbosity.");
    546711    }
    547712   
     
    622787        }
    623788    }
     789
     790    /** @since 0.9.24 */
     791    public void saveConfig() throws IOException {
     792        DataHelper.storeProps(i2cpProps, _configFile);
     793    }
    624794}
  • apps/sam/java/src/net/i2p/sam/SAMDatagramReceiver.java

    r8d7edaae r9ce8fce  
    2323     * @param sender Destination
    2424     * @param data Byte array to be received
     25     * @param proto I2CP protocol
     26     * @param fromPort I2CP from port
     27     * @param toPort I2CP to port
    2528     * @throws IOException
    2629     */
    27     public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException;
     30    public void receiveDatagramBytes(Destination sender, byte data[], int proto, int fromPort, int toPort) throws IOException;
    2831
    2932    /**
  • apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java

    r8d7edaae r9ce8fce  
    7979     * @param dest Destination
    8080     * @param data Bytes to be sent
     81     * @param proto ignored, will always use PROTO_DATAGRAM (17)
    8182     *
    8283     * @return True if the data was sent, false otherwise
     
    8485     * @throws I2PSessionException on serious error, probably session closed
    8586     */
    86     public boolean sendBytes(String dest, byte[] data) throws DataFormatException, I2PSessionException {
     87    public boolean sendBytes(String dest, byte[] data, int proto,
     88                             int fromPort, int toPort) throws DataFormatException, I2PSessionException {
    8789        if (data.length > DGRAM_SIZE_MAX)
    8890            throw new DataFormatException("Datagram size exceeded (" + data.length + ")");
     
    9193                dgram = dgramMaker.makeI2PDatagram(data);
    9294        }
    93         // TODO pass ports through
    94         return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM,
    95                                               I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     95        return sendBytesThroughMessageSession(dest, dgram, I2PSession.PROTO_DATAGRAM, fromPort, toPort);
    9696    }
    9797
    98     protected void messageReceived(byte[] msg) {
     98    protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
    9999        byte[] payload;
    100100        Destination sender;
     
    107107        } catch (DataFormatException e) {
    108108            if (_log.shouldLog(Log.DEBUG)) {
    109                 _log.debug("Dropping ill-formatted I2P repliable datagram");
     109                _log.debug("Dropping ill-formatted I2P repliable datagram", e);
    110110            }
    111111            return;
    112112        } catch (I2PInvalidDatagramException e) {
    113113            if (_log.shouldLog(Log.DEBUG)) {
    114                 _log.debug("Dropping ill-signed I2P repliable datagram");
     114                _log.debug("Dropping ill-signed I2P repliable datagram", e);
    115115            }
    116116            return;
     
    118118
    119119        try {
    120             recv.receiveDatagramBytes(sender, payload);
     120            recv.receiveDatagramBytes(sender, payload, proto, fromPort, toPort);
    121121        } catch (IOException e) {
    122122            _log.error("Error forwarding message to receiver", e);
  • apps/sam/java/src/net/i2p/sam/SAMHandler.java

    r8d7edaae r9ce8fce  
    3636    protected final SocketChannel socket;
    3737
    38     protected final int verMajor;
    39     protected final int verMinor;
     38    public final int verMajor;
     39    public final int verMinor;
    4040   
    4141    /** I2CP options configuring the I2CP connection (port, host, numHops, etc) */
     
    103103    }
    104104   
    105     static public void writeBytes(ByteBuffer data, SocketChannel out) throws IOException {
     105    /**
     106     *  Caller must synch
     107     */
     108    private static void writeBytes(ByteBuffer data, SocketChannel out) throws IOException {
    106109        while (data.hasRemaining()) out.write(data);           
    107110        out.socket().getOutputStream().flush();
     
    133136    }
    134137
    135     /** @return success */
     138    /**
     139     * Unsynchronized, use with caution
     140     * @return success
     141     */
    136142    public static boolean writeString(String str, SocketChannel out)
    137143    {
     
    159165     */
    160166    public void stopHandling() {
     167        if (_log.shouldInfo())
     168            _log.info("Stopping: " + this, new Exception("I did it"));
    161169        synchronized (stopLock) {
    162170            stopHandler = true;
  • apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java

    r8d7edaae r9ce8fce  
    1919import net.i2p.data.DataHelper;
    2020import net.i2p.util.Log;
     21import net.i2p.util.PasswordManager;
    2122import net.i2p.util.VersionComparator;
    2223
     
    2627class SAMHandlerFactory {
    2728
    28     private static final String VERSION = "3.1";
     29    private static final String VERSION = "3.2";
    2930
    3031    private static final int HELLO_TIMEOUT = 60*1000;
     
    4647        try {
    4748            Socket sock = s.socket();
    48             sock.setSoTimeout(HELLO_TIMEOUT);
    4949            sock.setKeepAlive(true);
    50             String line = DataHelper.readLine(sock.getInputStream());
     50            StringBuilder buf = new StringBuilder(128);
     51            ReadLine.readLine(sock, buf, HELLO_TIMEOUT);
     52            String line = buf.toString();
    5153            sock.setSoTimeout(0);
    52             if (line == null) {
    53                 log.debug("Connection closed by client");
    54                 return null;
    55             }
    5654            tok = new StringTokenizer(line.trim(), " ");
    5755        } catch (SocketTimeoutException e) {
     
    5957        } catch (IOException e) {
    6058            throw new SAMException("Error reading from socket", e);
    61         } catch (Exception e) {
     59        } catch (RuntimeException e) {
    6260            throw new SAMException("Unexpected error", e);
    6361        }
     
    9492            return null;
    9593        }
     94
     95        if (Boolean.parseBoolean(i2cpProps.getProperty(SAMBridge.PROP_AUTH))) {
     96            String user = props.getProperty("USER");
     97            String pw = props.getProperty("PASSWORD");
     98            if (user == null || pw == null)
     99                throw new SAMException("USER and PASSWORD required");
     100            String savedPW = i2cpProps.getProperty(SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX);
     101            if (savedPW == null)
     102                throw new SAMException("Authorization failed");
     103            PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext());
     104            if (!pm.checkHash(savedPW, pw))
     105                throw new SAMException("Authorization failed");
     106        }
     107
    96108        // Let's answer positively
    97109        if (!SAMHandler.writeString("HELLO REPLY RESULT=OK VERSION=" + ver + "\n", s))
     
    132144            VersionComparator.comp(VERSION, maxVer) <= 0)
    133145            return VERSION;
     146        if (VersionComparator.comp("3.1", minVer) >= 0 &&
     147            VersionComparator.comp("3.1", maxVer) <= 0)
     148            return "3.1";
    134149        // in VersionComparator, "3" < "3.0" so
    135150        // use comparisons carefully
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    r8d7edaae r9ce8fce  
    2020import net.i2p.client.I2PSession;
    2121import net.i2p.client.I2PSessionException;
    22 import net.i2p.client.I2PSessionListener;
     22import net.i2p.client.I2PSessionMuxedListener;
     23import net.i2p.client.SendMessageOptions;
    2324import net.i2p.data.Base64;
    2425import net.i2p.data.DataFormatException;
    2526import net.i2p.data.Destination;
    26 import net.i2p.util.HexDump;
     27//import net.i2p.util.HexDump;
    2728import net.i2p.util.I2PAppThread;
    2829import net.i2p.util.Log;
     
    9899     * @throws I2PSessionException on serious error, probably session closed
    99100     */
    100     public abstract boolean sendBytes(String dest, byte[] data) throws DataFormatException, I2PSessionException;
     101    public abstract boolean sendBytes(String dest, byte[] data, int proto,
     102                                      int fromPort, int toPort) throws DataFormatException, I2PSessionException;
    101103
    102104    /**
     
    127129
    128130    /**
     131     * Actually send bytes through the SAM message-based session I2PSession.
     132     * TODO unused, umimplemented in the sessions and handlers
     133     *
     134     * @param dest Destination
     135     * @param data Bytes to be sent
     136     * @param proto I2CP protocol
     137     * @param fromPort I2CP from port
     138     * @param toPort I2CP to port
     139     *
     140     * @return True if the data was sent, false otherwise
     141     * @throws DataFormatException on unknown / bad dest
     142     * @throws I2PSessionException on serious error, probably session closed
     143     * @since 0.9.24
     144     */
     145    protected boolean sendBytesThroughMessageSession(String dest, byte[] data,
     146                                        int proto, int fromPort, int toPort,
     147                                        boolean sendLeaseSet, int sendTags,
     148                                        int tagThreshold, long expires)
     149                                        throws DataFormatException, I2PSessionException {
     150        Destination d = SAMUtils.getDest(dest);
     151
     152        if (_log.shouldLog(Log.DEBUG)) {
     153            _log.debug("Sending " + data.length + " bytes to " + dest);
     154        }
     155        SendMessageOptions opts = new SendMessageOptions();
     156        opts.setSendLeaseSet(sendLeaseSet);
     157        opts.setTagsToSend(sendTags);
     158        opts.setTagThreshold(tagThreshold);
     159        opts.setDate(expires);
     160
     161        return session.sendMessage(d, data, 0, data.length, proto, fromPort, toPort, opts);
     162    }
     163
     164    /**
    129165     * Close a SAM message-based session.
    130166     */
     
    137173     * @param msg Message payload
    138174     */
    139     protected abstract void messageReceived(byte[] msg);
     175    protected abstract void messageReceived(byte[] msg, int proto, int fromPort, int toPort);
    140176   
    141177    /**
     
    159195     * @author human
    160196     */
    161     class SAMMessageSessionHandler implements Runnable, I2PSessionListener {
     197    class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
    162198
    163199        private final Object runningLock = new Object();
     
    188224                _log.debug("I2P session connected");
    189225
    190             session.setSessionListener(this);
     226            session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
    191227        }
    192228
     
    219255           
    220256            shutDown();
     257            session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
    221258           
    222259            try {
     
    244281        }
    245282           
    246         public void messageAvailable(I2PSession session, int msgId, long size){
     283        public void messageAvailable(I2PSession session, int msgId, long size) {
     284            messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED,
     285                             I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     286        }
     287
     288        /** @since 0.9.24 */
     289        public void messageAvailable(I2PSession session, int msgId, long size,
     290                                     int proto, int fromPort, int toPort) {
     291
    247292            if (_log.shouldLog(Log.DEBUG)) {
    248293                _log.debug("I2P message available (id: " + msgId
     
    253298                if (msg == null)
    254299                    return;
    255                 if (_log.shouldLog(Log.DEBUG)) {
    256                     _log.debug("Content of message " + msgId + ":\n"
    257                                + HexDump.dump(msg));
    258                 }
     300                //if (_log.shouldLog(Log.DEBUG)) {
     301                //    _log.debug("Content of message " + msgId + ":\n"
     302                //               + HexDump.dump(msg));
     303                //}
    259304               
    260                 messageReceived(msg);
     305                messageReceived(msg, proto, fromPort, toPort);
    261306            } catch (I2PSessionException e) {
    262307                _log.error("Error fetching I2P message", e);
  • apps/sam/java/src/net/i2p/sam/SAMRawReceiver.java

    r8d7edaae r9ce8fce  
    2121     *
    2222     * @param data Byte array to be received
     23     * @param proto I2CP protocol
     24     * @param fromPort I2CP from port
     25     * @param toPort I2CP to port
    2326     * @throws IOException
    2427     */
    25     public void receiveRawBytes(byte data[]) throws IOException;
     28    public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException;
    2629
    2730    /**
  • apps/sam/java/src/net/i2p/sam/SAMRawSession.java

    r8d7edaae r9ce8fce  
    6868     *
    6969     * @param data Bytes to be sent
     70     * @param proto if 0, will use PROTO_DATAGRAM_RAW (18)
    7071     *
    7172     * @return True if the data was sent, false otherwise
     
    7374     * @throws I2PSessionException on serious error, probably session closed
    7475     */
    75     public boolean sendBytes(String dest, byte[] data) throws DataFormatException, I2PSessionException {
     76    public boolean sendBytes(String dest, byte[] data, int proto,
     77                             int fromPort, int toPort) throws DataFormatException, I2PSessionException {
    7678        if (data.length > RAW_SIZE_MAX)
    7779            throw new DataFormatException("Data size limit exceeded (" + data.length + ")");
    78         // TODO pass ports through
    79         return sendBytesThroughMessageSession(dest, data, I2PSession.PROTO_DATAGRAM_RAW,
    80                                               I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     80        if (proto == I2PSession.PROTO_UNSPECIFIED)
     81            proto = I2PSession.PROTO_DATAGRAM_RAW;
     82        return sendBytesThroughMessageSession(dest, data, proto, fromPort, toPort);
    8183    }
    8284
    83     protected void messageReceived(byte[] msg) {
     85    protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
    8486        try {
    85             recv.receiveRawBytes(msg);
     87            recv.receiveRawBytes(msg, proto, fromPort, toPort);
    8688        } catch (IOException e) {
    8789            _log.error("Error forwarding message to receiver", e);
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    r8d7edaae r9ce8fce  
    2424import net.i2p.I2PException;
    2525import net.i2p.client.I2PClient;
     26import net.i2p.client.I2PSession;
    2627import net.i2p.client.I2PSessionException;
    2728import net.i2p.crypto.SigType;
     
    187188            if (_log.shouldLog(Log.DEBUG))
    188189                _log.debug("Caught IOException for message [" + msg + "]", e);
    189         } catch (Exception e) {
     190        } catch (SAMException e) {
     191            _log.error("Unexpected exception for message [" + msg + "]", e);
     192        } catch (RuntimeException e) {
    190193            _log.error("Unexpected exception for message [" + msg + "]", e);
    191194        } finally {
     
    439442
    440443            int size;
    441             {
    442                 String strsize = props.getProperty("SIZE");
    443                 if (strsize == null) {
    444                     if (_log.shouldLog(Log.DEBUG))
    445                         _log.debug("Size not specified in DATAGRAM SEND message");
    446                     return false;
    447                 }
     444            String strsize = props.getProperty("SIZE");
     445            if (strsize == null) {
     446                if (_log.shouldLog(Log.WARN))
     447                    _log.warn("Size not specified in DATAGRAM SEND message");
     448                return false;
     449            }
     450            try {
     451                size = Integer.parseInt(strsize);
     452            } catch (NumberFormatException e) {
     453                if (_log.shouldLog(Log.WARN))
     454                    _log.warn("Invalid DATAGRAM SEND size specified: " + strsize);
     455                return false;
     456            }
     457            if (!checkDatagramSize(size)) {
     458                if (_log.shouldLog(Log.WARN))
     459                     _log.warn("Specified size (" + size
     460                           + ") is out of protocol limits");
     461                return false;
     462            }
     463            int proto = I2PSession.PROTO_DATAGRAM;
     464            int fromPort = I2PSession.PORT_UNSPECIFIED;
     465            int toPort = I2PSession.PORT_UNSPECIFIED;
     466            String s = props.getProperty("FROM_PORT");
     467            if (s != null) {
    448468                try {
    449                     size = Integer.parseInt(strsize);
     469                    fromPort = Integer.parseInt(s);
    450470                } catch (NumberFormatException e) {
    451                     if (_log.shouldLog(Log.DEBUG))
    452                         _log.debug("Invalid DATAGRAM SEND size specified: " + strsize);
    453                     return false;
    454                 }
    455                 if (!checkDatagramSize(size)) {
    456                     if (_log.shouldLog(Log.DEBUG))
    457                          _log.debug("Specified size (" + size
    458                                + ") is out of protocol limits");
    459                     return false;
     471                    if (_log.shouldLog(Log.WARN))
     472                        _log.warn("Invalid DATAGRAM SEND port specified: " + s);
     473                }
     474            }
     475            s = props.getProperty("TO_PORT");
     476            if (s != null) {
     477                try {
     478                    toPort = Integer.parseInt(s);
     479                } catch (NumberFormatException e) {
     480                    if (_log.shouldLog(Log.WARN))
     481                        _log.warn("Invalid RAW SEND port specified: " + s);
    460482                }
    461483            }
     
    467489                in.readFully(data);
    468490
    469                 if (!getDatagramSession().sendBytes(dest, data)) {
     491                if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) {
    470492                    _log.error("DATAGRAM SEND failed");
    471493                    // a message send failure is no reason to drop the SAM session
     
    524546
    525547            int size;
    526             {
    527                 String strsize = props.getProperty("SIZE");
    528                 if (strsize == null) {
    529                     if (_log.shouldLog(Log.DEBUG))
    530                         _log.debug("Size not specified in RAW SEND message");
    531                     return false;
    532                 }
     548            String strsize = props.getProperty("SIZE");
     549            if (strsize == null) {
     550                if (_log.shouldLog(Log.WARN))
     551                    _log.warn("Size not specified in RAW SEND message");
     552                return false;
     553            }
     554            try {
     555                size = Integer.parseInt(strsize);
     556            } catch (NumberFormatException e) {
     557                if (_log.shouldLog(Log.WARN))
     558                    _log.warn("Invalid RAW SEND size specified: " + strsize);
     559                return false;
     560            }
     561            if (!checkSize(size)) {
     562                if (_log.shouldLog(Log.WARN))
     563                    _log.warn("Specified size (" + size
     564                           + ") is out of protocol limits");
     565                return false;
     566            }
     567            int proto = I2PSession.PROTO_DATAGRAM_RAW;
     568            int fromPort = I2PSession.PORT_UNSPECIFIED;
     569            int toPort = I2PSession.PORT_UNSPECIFIED;
     570            String s = props.getProperty("PROTOCOL");
     571            if (s != null) {
    533572                try {
    534                     size = Integer.parseInt(strsize);
     573                    proto = Integer.parseInt(s);
    535574                } catch (NumberFormatException e) {
    536                     if (_log.shouldLog(Log.DEBUG))
    537                         _log.debug("Invalid RAW SEND size specified: " + strsize);
    538                     return false;
    539                 }
    540                 if (!checkSize(size)) {
    541                     if (_log.shouldLog(Log.DEBUG))
    542                         _log.debug("Specified size (" + size
    543                                + ") is out of protocol limits");
    544                     return false;
     575                    if (_log.shouldLog(Log.WARN))
     576                        _log.warn("Invalid RAW SEND protocol specified: " + s);
     577                }
     578            }
     579            s = props.getProperty("FROM_PORT");
     580            if (s != null) {
     581                try {
     582                    fromPort = Integer.parseInt(s);
     583                } catch (NumberFormatException e) {
     584                    if (_log.shouldLog(Log.WARN))
     585                        _log.warn("Invalid RAW SEND port specified: " + s);
     586                }
     587            }
     588            s = props.getProperty("TO_PORT");
     589            if (s != null) {
     590                try {
     591                    toPort = Integer.parseInt(s);
     592                } catch (NumberFormatException e) {
     593                    if (_log.shouldLog(Log.WARN))
     594                        _log.warn("Invalid RAW SEND port specified: " + s);
    545595                }
    546596            }
     
    552602                in.readFully(data);
    553603
    554                 if (!getRawSession().sendBytes(dest, data)) {
     604                if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) {
    555605                    _log.error("RAW SEND failed");
    556606                    // a message send failure is no reason to drop the SAM session
     
    797847   
    798848    // SAMRawReceiver implementation
    799     public void receiveRawBytes(byte data[]) throws IOException {
     849    public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException {
    800850        if (getRawSession() == null) {
    801851            _log.error("BUG! Received raw bytes, but session is null!");
     
    803853        }
    804854
    805         ByteArrayOutputStream msg = new ByteArrayOutputStream();
    806 
    807         String msgText = "RAW RECEIVED SIZE=" + data.length + "\n";
     855        ByteArrayOutputStream msg = new ByteArrayOutputStream(64 + data.length);
     856
     857        String msgText = "RAW RECEIVED SIZE=" + data.length;
    808858        msg.write(DataHelper.getASCII(msgText));
     859        if ((verMajor == 3 && verMinor >= 2) || verMajor > 3) {
     860            msgText = " PROTOCOL=" + proto + " FROM_PORT=" + fromPort + " TO_PORT=" + toPort;
     861            msg.write(DataHelper.getASCII(msgText));
     862        }
     863        msg.write((byte) '\n');
    809864        msg.write(data);
    810865       
     
    833888
    834889    // SAMDatagramReceiver implementation
    835     public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
     890    public void receiveDatagramBytes(Destination sender, byte data[], int proto,
     891                                     int fromPort, int toPort) throws IOException {
    836892        if (getDatagramSession() == null) {
    837893            _log.error("BUG! Received datagram bytes, but session is null!");
     
    839895        }
    840896
    841         ByteArrayOutputStream msg = new ByteArrayOutputStream();
     897        ByteArrayOutputStream msg = new ByteArrayOutputStream(100 + data.length);
    842898
    843899        String msgText = "DATAGRAM RECEIVED DESTINATION=" + sender.toBase64()
    844                          + " SIZE=" + data.length + "\n";
     900                         + " SIZE=" + data.length;
    845901        msg.write(DataHelper.getASCII(msgText));
     902        if ((verMajor == 3 && verMinor >= 2) || verMajor > 3) {
     903            msgText = " FROM_PORT=" + fromPort + " TO_PORT=" + toPort;
     904            msg.write(DataHelper.getASCII(msgText));
     905        }
     906        msg.write((byte) '\n');
    846907       
    847908        if (_log.shouldLog(Log.DEBUG))
  • apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java

    r8d7edaae r9ce8fce  
    2222       
    2323        private final SAMv3Handler handler;
    24         private final SAMv3Handler.DatagramServer server;
     24        private final SAMv3DatagramServer server;
    2525        private final String nick;
    2626        private final SocketAddress clientAddress;
     
    3131         *   build a DatagramSession according to informations registered
    3232         *   with the given nickname
     33         *
    3334         * @param nick nickname of the session
    3435         * @throws IOException
     
    3637         * @throws I2PSessionException
    3738         */
    38         public SAMv3DatagramSession(String nick)
    39         throws IOException, DataFormatException, I2PSessionException, SAMException {
    40                
     39        public SAMv3DatagramSession(String nick, SAMv3DatagramServer dgServer)
     40                        throws IOException, DataFormatException, I2PSessionException, SAMException {
    4141                super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
    4242                                SAMv3Handler.sSessionsHash.get(nick).getProps(),
    4343                                null  // to be replaced by this
    4444                                );
    45                 this.nick = nick ;
    46                 this.recv = this ;  // replacement
    47                 this.server = SAMv3Handler.DatagramServer.getInstance() ;
     45                this.nick = nick;
     46                this.recv = this;  // replacement
     47                this.server = dgServer;
    4848
    4949                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    50         if ( rec==null ) throw new SAMException("Record disappeared for nickname : \""+nick+"\"") ;
     50                if (rec == null)
     51                        throw new SAMException("Record disappeared for nickname : \""+nick+"\"");
    5152
    52         this.handler = rec.getHandler();
     53                this.handler = rec.getHandler();
    5354               
    54         Properties props = rec.getProps();
    55         String portStr = props.getProperty("PORT") ;
    56         if ( portStr==null ) {
    57                 _log.debug("receiver port not specified. Current socket will be used.");
    58                 this.clientAddress = null;
    59         }
    60         else {
    61                 int port = Integer.parseInt(portStr);
    62        
    63                 String host = props.getProperty("HOST");
    64                 if ( host==null ) {             
    65                         host = rec.getHandler().getClientIP();
    66                         _log.debug("no host specified. Taken from the client socket : " + host+':'+port);
    67                 }
    68 
    69        
    70                 this.clientAddress = new InetSocketAddress(host,port);
    71         }
     55                Properties props = rec.getProps();
     56                String portStr = props.getProperty("PORT");
     57                if (portStr == null) {
     58                        if (_log.shouldDebug())
     59                                _log.debug("receiver port not specified. Current socket will be used.");
     60                        this.clientAddress = null;
     61                } else {
     62                        int port = Integer.parseInt(portStr);
     63                        String host = props.getProperty("HOST");
     64                        if (host == null) {             
     65                                host = rec.getHandler().getClientIP();
     66                                if (_log.shouldDebug())
     67                                        _log.debug("no host specified. Taken from the client socket : " + host+':'+port);
     68                        }
     69                        this.clientAddress = new InetSocketAddress(host, port);
     70                }
    7271        }
    7372
    74         public void receiveDatagramBytes(Destination sender, byte[] data) throws IOException {
     73        public void receiveDatagramBytes(Destination sender, byte[] data, int proto,
     74                                         int fromPort, int toPort) throws IOException {
    7575                if (this.clientAddress==null) {
    76                         this.handler.receiveDatagramBytes(sender, data);
     76                        this.handler.receiveDatagramBytes(sender, data, proto, fromPort, toPort);
    7777                } else {
    78                         String msg = sender.toBase64()+"\n";
     78                        StringBuilder buf = new StringBuilder(600);
     79                        buf.append(sender.toBase64());
     80                        if ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) {
     81                                buf.append(" FROM_PORT=").append(fromPort).append(" TO_PORT=").append(toPort);
     82                        }
     83                        buf.append('\n');
     84                        String msg = buf.toString();
    7985                        ByteBuffer msgBuf = ByteBuffer.allocate(msg.length()+data.length);
    8086                        msgBuf.put(DataHelper.getASCII(msg));
  • apps/sam/java/src/net/i2p/sam/SAMv3Handler.java

    r8d7edaae r9ce8fce  
    1111
    1212import java.io.ByteArrayOutputStream;
    13 import java.io.ByteArrayInputStream;
    1413import java.io.IOException;
    1514import java.io.InputStream;
     
    1716import java.net.ConnectException;
    1817import java.net.InetSocketAddress;
     18import java.net.Socket;
    1919import java.net.SocketAddress;
     20import java.net.SocketException;
     21import java.net.SocketTimeoutException;
    2022import java.net.NoRouteToHostException;
    21 import java.nio.channels.DatagramChannel;
    2223import java.nio.channels.SocketChannel;
    2324import java.nio.ByteBuffer;
     
    2930import net.i2p.I2PException;
    3031import net.i2p.client.I2PClient;
     32import net.i2p.client.I2PSession;
    3133import net.i2p.client.I2PSessionException;
    3234import net.i2p.crypto.SigType;
     
    3739import net.i2p.util.Log;
    3840import net.i2p.util.I2PAppThread;
     41import net.i2p.util.PasswordManager;
    3942
    4043/**
     
    5154        private volatile boolean stolenSocket;
    5255        private volatile boolean streamForwardingSocket;
    53 
     56        private final boolean sendPorts;
     57        private long _lastPing;
     58        private static final int READ_TIMEOUT = 3*60*1000;
    5459       
    5560        interface Session {
    5661                String getNick();
    5762                void close();
    58                 boolean sendBytes(String dest, byte[] data) throws DataFormatException, I2PSessionException;
     63                boolean sendBytes(String dest, byte[] data, int proto,
     64                                  int fromPort, int toPort) throws DataFormatException, I2PSessionException;
    5965        }
    6066       
     
    8995        {
    9096                super(s, verMajor, verMinor, i2cpProps, parent);
     97                sendPorts = (verMajor == 3 && verMinor >= 2) || verMajor > 3;
    9198                if (_log.shouldLog(Log.DEBUG))
    9299                        _log.debug("SAM version 3 handler instantiated");
     
    97104        {
    98105                return (verMajor == 3);
    99         }
    100 
    101         public static class DatagramServer  {
    102                
    103                 private static DatagramServer _instance;
    104                 private static DatagramChannel server;
    105                
    106                 public static DatagramServer getInstance() throws IOException {
    107                         return getInstance(new Properties());
    108                 }
    109                
    110                 public static DatagramServer getInstance(Properties props) throws IOException {
    111                         synchronized(DatagramServer.class) {
    112                                 if (_instance==null)
    113                                         _instance = new DatagramServer(props);
    114                                 return _instance ;
    115                         }
    116                 }
    117                
    118                 public DatagramServer(Properties props) throws IOException {
    119                         synchronized(DatagramServer.class) {
    120                                 if (server==null)
    121                                         server = DatagramChannel.open();
    122                         }
    123                        
    124                         String host = props.getProperty(SAMBridge.PROP_DATAGRAM_HOST, SAMBridge.DEFAULT_DATAGRAM_HOST);
    125                         String portStr = props.getProperty(SAMBridge.PROP_DATAGRAM_PORT, SAMBridge.DEFAULT_DATAGRAM_PORT);
    126                         int port ;
    127                         try {
    128                                 port = Integer.parseInt(portStr);
    129                         } catch (NumberFormatException e) {
    130                                 port = Integer.parseInt(SAMBridge.DEFAULT_DATAGRAM_PORT);
    131                         }
    132                        
    133                         server.socket().bind(new InetSocketAddress(host, port));
    134                         new I2PAppThread(new Listener(server), "DatagramListener").start();
    135                 }
    136                
    137                 public void send(SocketAddress addr, ByteBuffer msg) throws IOException {
    138                         server.send(msg, addr);
    139                 }
    140                
    141                 static class Listener implements Runnable {
    142                        
    143                         private final DatagramChannel server;
    144                        
    145                         public Listener(DatagramChannel server)
    146                         {
    147                                 this.server = server ;
    148                         }
    149                         public void run()
    150                         {
    151                                 ByteBuffer inBuf = ByteBuffer.allocateDirect(SAMRawSession.RAW_SIZE_MAX+1024);
    152                                
    153                                 while (!Thread.interrupted())
    154                                 {
    155                                         inBuf.clear();
    156                                         try {
    157                                                 server.receive(inBuf);
    158                                         } catch (IOException e) {
    159                                                 break ;
    160                                         }
    161                                         inBuf.flip();
    162                                         ByteBuffer outBuf = ByteBuffer.wrap(new byte[inBuf.remaining()]);
    163                                         outBuf.put(inBuf);
    164                                         outBuf.flip();
    165                                         // A new thread for every message is wildly inefficient...
    166                                         //new I2PAppThread(new MessageDispatcher(outBuf.array()), "MessageDispatcher").start();
    167                                         // inline
    168                                         // Even though we could be sending messages through multiple sessions,
    169                                         // that isn't a common use case, and blocking should be rare.
    170                                         // Inside router context, I2CP drops on overflow.
    171                                         (new MessageDispatcher(outBuf.array())).run();
    172                                 }
    173                         }
    174                 }
    175         }
    176 
    177         private static class MessageDispatcher implements Runnable
    178         {
    179                 private final ByteArrayInputStream is;
    180                
    181                 public MessageDispatcher(byte[] buf)
    182                 {
    183                         this.is = new java.io.ByteArrayInputStream(buf) ;
    184                 }
    185                
    186                 public void run() {
    187                         try {
    188                                 String header = DataHelper.readLine(is).trim();
    189                                 StringTokenizer tok = new StringTokenizer(header, " ");
    190                                 if (tok.countTokens() != 3) {
    191                                         // This is not a correct message, for sure
    192                                         //_log.debug("Error in message format");
    193                                         // FIXME log? throw?
    194                                         return;
    195                                 }
    196                                 String version = tok.nextToken();
    197                                 if (!"3.0".equals(version)) return ;
    198                                 String nick = tok.nextToken();
    199                                 String dest = tok.nextToken();
    200 
    201                                 byte[] data = new byte[is.available()];
    202                                 is.read(data);
    203                                 SessionRecord rec = sSessionsHash.get(nick);
    204                                 if (rec!=null) {
    205                                         rec.getHandler().session.sendBytes(dest,data);
    206                                 } else {
    207                                         Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMv3Handler.class);
    208                                         if (log.shouldLog(Log.WARN))
    209                                                 log.warn("Dropping datagram, no session for " + nick);
    210                                 }
    211                         } catch (Exception e) {
    212                                 Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMv3Handler.class);
    213                                 if (log.shouldLog(Log.WARN))
    214                                         log.warn("Error handling datagram", e);
    215                         }
    216                 }
    217106        }
    218107       
     
    343232        {
    344233                stolenSocket = true ;
     234                if (sendPorts) {
     235                        try {
     236                               socket.socket().setSoTimeout(0);
     237                        } catch (SocketException se) {}
     238                }
    345239                this.stopHandling();
    346240        }
     
    354248        }
    355249       
     250        /**
     251         *  For SAMv3DatagramServer
     252         *  @return may be null
     253         *  @since 0.9.24
     254         */
     255        Session getSession() {
     256                return session;
     257        }
     258       
     259        @Override
    356260        public void handle() {
    357261                String msg = null;
     
    367271
    368272                try {
    369                         InputStream in = getClientSocket().socket().getInputStream();
    370 
     273                        Socket socket = getClientSocket().socket();
     274                        InputStream in = socket.getInputStream();
     275
     276                        StringBuilder buf = new StringBuilder(1024);
    371277                        while (true) {
    372278                                if (shouldStop()) {
     
    375281                                        break;
    376282                                }
    377                                 String line = DataHelper.readLine(in) ;
     283                                String line;
     284                                if (sendPorts) {
     285                                        // client supports PING
     286                                        try {
     287                                                ReadLine.readLine(socket, buf, READ_TIMEOUT);
     288                                                line = buf.toString();
     289                                                buf.setLength(0);                                       
     290                                        } catch (SocketTimeoutException ste) {
     291                                                long now = System.currentTimeMillis();
     292                                                if (buf.length() <= 0) {
     293                                                        if (_lastPing > 0) {
     294                                                                if (now - _lastPing >= READ_TIMEOUT) {
     295                                                                        if (_log.shouldWarn())
     296                                                                                _log.warn("Failed to respond to PING");
     297                                                                        writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
     298                                                                        break;
     299                                                                }
     300                                                        } else {
     301                                                                if (_log.shouldDebug())
     302                                                                        _log.debug("Sendng PING " + now);
     303                                                                _lastPing = now;
     304                                                                if (!writeString("PING " + now + '\n'))
     305                                                                        break;
     306                                                        }
     307                                                } else {
     308                                                        if (_lastPing > 0) {
     309                                                                if (now - _lastPing >= 2*READ_TIMEOUT) {
     310                                                                        if (_log.shouldWarn())
     311                                                                                _log.warn("Failed to respond to PING");
     312                                                                        writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
     313                                                                        break;
     314                                                                }
     315                                                        } else if (_lastPing < 0) {
     316                                                                if (_log.shouldWarn())
     317                                                                        _log.warn("2nd timeout");
     318                                                                writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
     319                                                                break;
     320                                                        } else {
     321                                                                // don't clear buffer, don't send ping,
     322                                                                // go around again
     323                                                                _lastPing = -1;
     324                                                                if (_log.shouldWarn())
     325                                                                        _log.warn("timeout after partial: " + buf);
     326                                                        }
     327                                                }
     328                                                if (_log.shouldDebug())
     329                                                        _log.debug("loop after timeout");
     330                                                continue;
     331                                        }
     332                                } else {
     333                                        buf.setLength(0);                                       
     334                                        if (DataHelper.readLine(in, buf))
     335                                                line = buf.toString();
     336                                        else
     337                                                line = null;
     338                                }
    378339                                if (line==null) {
    379340                                        if (_log.shouldLog(Log.DEBUG))
     
    395356
    396357                                tok = new StringTokenizer(msg, " ");
    397                                 if (tok.countTokens() < 2) {
     358                                int count = tok.countTokens();
     359                                if (count <= 0) {
    398360                                        // This is not a correct message, for sure
    399361                                        if (_log.shouldLog(Log.DEBUG))
    400                                                 _log.debug("Error in message format");
     362                                                _log.debug("Ignoring whitespace");
     363                                        continue;
     364                                }
     365                                domain = tok.nextToken();
     366                                // these may not have a second token
     367                                if (domain.equals("PING")) {
     368                                        execPingMessage(tok);
     369                                        continue;
     370                                } else if (domain.equals("PONG")) {
     371                                        execPongMessage(tok);
     372                                        continue;
     373                                } else if (domain.equals("QUIT") || domain.equals("STOP") ||
     374                                           domain.equals("EXIT")) {
     375                                        writeString(domain + " STATUS RESULT=OK MESSAGE=bye\n");
    401376                                        break;
    402377                                }
    403                                 domain = tok.nextToken();
     378                                if (count <= 1) {
     379                                        // This is not a correct message, for sure
     380                                        if (writeString(domain + " STATUS RESULT=I2P_ERROR MESSAGE=\"command not specified\"\n"))
     381                                                continue;
     382                                        else
     383                                                break;
     384                                }
    404385                                opcode = tok.nextToken();
    405386                                if (_log.shouldLog(Log.DEBUG)) {
     
    425406                                        // TODO not yet overridden, ID is ignored, most recent RAW session is used
    426407                                        canContinue = execRawMessage(opcode, props);
     408                                } else if (domain.equals("AUTH")) {
     409                                        canContinue = execAuthMessage(opcode, props);
    427410                                } else {
    428411                                        if (_log.shouldLog(Log.DEBUG))
     
    435418                                        break;
    436419                                }
    437                         }
     420                        } // while
    438421                } catch (IOException e) {
    439422                        if (_log.shouldLog(Log.DEBUG))
    440                                 _log.debug("Caught IOException for message [" + msg + "]", e);
    441                 } catch (Exception e) {
    442                         _log.error("Unexpected exception for message [" + msg + "]", e);
     423                                _log.debug("Caught IOException in handler", e);
     424                } catch (SAMException e) {
     425                        _log.error("Unexpected exception for message [" + msg + ']', e);
     426                } catch (RuntimeException e) {
     427                        _log.error("Unexpected exception for message [" + msg + ']', e);
    443428                } finally {
    444429                        if (_log.shouldLog(Log.DEBUG))
     
    482467        @Override
    483468        public void stopHandling() {
     469            if (_log.shouldInfo())
     470                _log.info("Stopping (stolen? " + stolenSocket + "): " + this, new Exception("I did it"));
    484471            synchronized (stopLock) {
    485472                stopHandler = true;
     
    610597
    611598                                if (style.equals("RAW")) {
    612                                         DatagramServer.getInstance(i2cpProps);
    613                                         SAMv3RawSession v3 = newSAMRawSession(nick);
     599                                        SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
     600                                        SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs);
    614601                                        rawSession = v3;
    615602                                        this.session = v3;
    616603                                } else if (style.equals("DATAGRAM")) {
    617                                         DatagramServer.getInstance(i2cpProps);
    618                                         SAMv3DatagramSession v3 = newSAMDatagramSession(nick);
     604                                        SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
     605                                        SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs);
    619606                                        datagramSession = v3;
    620607                                        this.session = v3;
     
    670657        }
    671658
    672         private static SAMv3RawSession newSAMRawSession(String login )
    673                         throws IOException, DataFormatException, SAMException, I2PSessionException
    674         {
    675                 return new SAMv3RawSession( login ) ;
    676         }
    677 
    678         private static SAMv3DatagramSession newSAMDatagramSession(String login )
    679                         throws IOException, DataFormatException, SAMException, I2PSessionException
    680         {
    681                 return new SAMv3DatagramSession( login ) ;
    682         }
    683 
    684659        /* Parse and execute a STREAM message */
    685660        @Override
     
    758733        @Override
    759734        protected boolean execStreamConnect( Properties props) {
     735                // Messages are NOT sent if SILENT=true,
     736                // The specs said that they were.
     737                boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
    760738                try {
    761739                        if (props.isEmpty()) {
    762                                 notifyStreamResult(true,"I2P_ERROR","No parameters specified in STREAM CONNECT message");
     740                                notifyStreamResult(verbose, "I2P_ERROR","No parameters specified in STREAM CONNECT message");
    763741                                if (_log.shouldLog(Log.DEBUG))
    764742                                        _log.debug("No parameters specified in STREAM CONNECT message");
    765743                                return false;
    766744                        }
    767                         boolean verbose = props.getProperty("SILENT","false").equals("false");
    768745               
    769746                        String dest = props.getProperty("DESTINATION");
     
    805782        }
    806783
    807         protected boolean execStreamForwardIncoming( Properties props ) {
     784        private boolean execStreamForwardIncoming( Properties props ) {
     785                // Messages ARE sent if SILENT=true,
     786                // which is different from CONNECT and ACCEPT.
     787                // But this matched the specs.
    808788                try {
    809789                        try {
    810790                                streamForwardingSocket = true ;
    811                                 ((SAMv3StreamSession)streamSession).startForwardingIncoming(props);
     791                                ((SAMv3StreamSession)streamSession).startForwardingIncoming(props, sendPorts);
    812792                                notifyStreamResult( true, "OK", null );
    813793                                return true ;
     
    822802        }
    823803
    824         protected boolean execStreamAccept( Properties props )
     804        private boolean execStreamAccept( Properties props )
    825805        {
    826                 boolean verbose = props.getProperty( "SILENT", "false").equals("false");
     806                // Messages are NOT sent if SILENT=true,
     807                // The specs said that they were.
     808                boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
    827809                try {
    828810                        try {
     
    859841        }
    860842
    861         public void notifyStreamIncomingConnection(Destination d) throws IOException {
     843        public void notifyStreamIncomingConnection(Destination d, int fromPort, int toPort) throws IOException {
    862844            if (getStreamSession() == null) {
    863845                _log.error("BUG! Received stream connection, but session is null!");
    864846                throw new NullPointerException("BUG! STREAM session is null!");
    865847            }
    866 
    867             if (!writeString(d.toBase64() + "\n")) {
     848            StringBuilder buf = new StringBuilder(600);
     849            buf.append(d.toBase64());
     850            if (sendPorts) {
     851                buf.append(" FROM_PORT=").append(fromPort).append(" TO_PORT=").append(toPort);
     852            }
     853            buf.append('\n');
     854            if (!writeString(buf.toString())) {
    868855                throw new IOException("Error notifying connection to SAM client");
    869856            }
     
    875862            }
    876863        }
    877 
     864       
     865        /** @since 0.9.24 */
     866        public static void notifyStreamIncomingConnection(SocketChannel client, Destination d,
     867                                                          int fromPort, int toPort) throws IOException {
     868            if (!writeString(d.toBase64() + " FROM_PORT=" + fromPort + " TO_PORT=" + toPort + '\n', client)) {
     869                throw new IOException("Error notifying connection to SAM client");
     870            }
     871        }
     872
     873        /** @since 0.9.24 */
     874        private boolean execAuthMessage(String opcode, Properties props) {
     875                if (opcode.equals("ENABLE")) {
     876                        i2cpProps.setProperty(SAMBridge.PROP_AUTH, "true");
     877                } else if (opcode.equals("DISABLE")) {
     878                        i2cpProps.setProperty(SAMBridge.PROP_AUTH, "false");
     879                } else if (opcode.equals("ADD")) {
     880                        String user = props.getProperty("USER");
     881                        String pw = props.getProperty("PASSWORD");
     882                        if (user == null || pw == null)
     883                                return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER and PASSWORD required\"\n");
     884                        String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
     885                        if (i2cpProps.containsKey(prop))
     886                                return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " already exists\"\n");
     887                        PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext());
     888                        String shash = pm.createHash(pw);
     889                        i2cpProps.setProperty(prop, shash);
     890                } else if (opcode.equals("REMOVE")) {
     891                        String user = props.getProperty("USER");
     892                        if (user == null)
     893                                return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER required\"\n");
     894                        String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
     895                        if (!i2cpProps.containsKey(prop))
     896                                return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " not found\"\n");
     897                        i2cpProps.remove(prop);
     898                } else {
     899                        return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown AUTH command\"\n");
     900                }
     901                try {
     902                        bridge.saveConfig();
     903                        return writeString("AUTH STATUS RESULT=OK\n");
     904                } catch (IOException ioe) {
     905                        return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Config save failed: " + ioe + "\"\n");
     906                }
     907        }
     908
     909        /**
     910         * Handle a PING.
     911         * Send a PONG.
     912         * @since 0.9.24
     913         */
     914        private void execPingMessage(StringTokenizer tok) {
     915                StringBuilder buf = new StringBuilder();
     916                buf.append("PONG");
     917                while (tok.hasMoreTokens()) {
     918                        buf.append(' ').append(tok.nextToken());
     919                }
     920                buf.append('\n');
     921                writeString(buf.toString());
     922        }
     923
     924        /**
     925         * Handle a PONG.
     926         * @since 0.9.24
     927         */
     928        private void execPongMessage(StringTokenizer tok) {
     929                String s;
     930                if (tok.hasMoreTokens()) {
     931                        s = tok.nextToken();
     932                } else {
     933                        s = "";
     934                }
     935                if (_lastPing > 0) {
     936                        String expected = Long.toString(_lastPing);
     937                        if (expected.equals(s)) {
     938                                _lastPing = 0;
     939                                if (_log.shouldInfo())
     940                                        _log.warn("Got expected pong: " + s);
     941                        } else {
     942                                if (_log.shouldInfo())
     943                                        _log.warn("Got unexpected pong: " + s);
     944                        }
     945                } else {
     946                        if (_log.shouldWarn())
     947                                _log.warn("Pong received without a ping: " + s);
     948                }
     949        }
    878950}
    879951
  • apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java

    r8d7edaae r9ce8fce  
    1313import net.i2p.client.I2PSessionException;
    1414import net.i2p.data.DataFormatException;
     15import net.i2p.data.DataHelper;
    1516import net.i2p.util.Log;
    1617
     
    2324        private final String nick;
    2425        private final SAMv3Handler handler;
    25         private final SAMv3Handler.DatagramServer server;
     26        private final SAMv3DatagramServer server;
    2627        private final SocketAddress clientAddress;
     28        private final boolean _sendHeader;
    2729
    2830        public String getNick() { return nick; }
     
    3739         * @throws I2PSessionException
    3840         */
    39         public SAMv3RawSession(String nick)
    40         throws IOException, DataFormatException, I2PSessionException {
    41                
     41        public SAMv3RawSession(String nick, SAMv3DatagramServer dgServer)
     42                        throws IOException, DataFormatException, I2PSessionException {
    4243                super(SAMv3Handler.sSessionsHash.get(nick).getDest(),
    43                                 SAMv3Handler.sSessionsHash.get(nick).getProps(),
    44                                 SAMv3Handler.sSessionsHash.get(nick).getHandler()  // to be replaced by this
    45                                 );
     44                      SAMv3Handler.sSessionsHash.get(nick).getProps(),
     45                      SAMv3Handler.sSessionsHash.get(nick).getHandler()  // to be replaced by this
     46                );
    4647                this.nick = nick ;
    4748                this.recv = this ;  // replacement
    48                 this.server = SAMv3Handler.DatagramServer.getInstance() ;
     49                this.server = dgServer;
    4950
    5051                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    51         if ( rec==null ) throw new InterruptedIOException() ;
    52 
    53         this.handler = rec.getHandler();
    54                
    55         Properties props = rec.getProps();
    56        
    57        
    58         String portStr = props.getProperty("PORT") ;
    59         if ( portStr==null ) {
    60                 if (_log.shouldLog(Log.DEBUG))
    61                         _log.debug("receiver port not specified. Current socket will be used.");
    62                 this.clientAddress = null;
    63         }
    64         else {
    65                 int port = Integer.parseInt(portStr);
    66        
    67                 String host = props.getProperty("HOST");
    68                 if ( host==null ) {
    69                         host = rec.getHandler().getClientIP();
    70 
     52                if (rec == null)
     53                        throw new InterruptedIOException() ;
     54                this.handler = rec.getHandler();
     55                Properties props = rec.getProps();
     56                String portStr = props.getProperty("PORT") ;
     57                if (portStr == null) {
    7158                        if (_log.shouldLog(Log.DEBUG))
    72                                 _log.debug("no host specified. Taken from the client socket : " + host +':'+port);
    73                 }
    74 
    75        
    76                 this.clientAddress = new InetSocketAddress(host,port);
    77         }
     59                                _log.debug("receiver port not specified. Current socket will be used.");
     60                        this.clientAddress = null;
     61                } else {
     62                        int port = Integer.parseInt(portStr);
     63                        String host = props.getProperty("HOST");
     64                        if ( host==null ) {
     65                                host = rec.getHandler().getClientIP();
     66                                if (_log.shouldLog(Log.DEBUG))
     67                                        _log.debug("no host specified. Taken from the client socket : " + host +':'+port);
     68                        }
     69                        this.clientAddress = new InetSocketAddress(host, port);
     70                }
     71                _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) &&
     72                              Boolean.parseBoolean(props.getProperty("HEADER"));
    7873        }
    7974       
    80         public void receiveRawBytes(byte[] data) throws IOException {
     75        public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
    8176                if (this.clientAddress==null) {
    82                         this.handler.receiveRawBytes(data);
     77                        this.handler.receiveRawBytes(data, proto, fromPort, toPort);
    8378                } else {
    84                         ByteBuffer msgBuf = ByteBuffer.allocate(data.length);
     79                        ByteBuffer msgBuf;
     80                        if (_sendHeader) {
     81                                StringBuilder buf = new StringBuilder(64);
     82                                buf.append("PROTOCOL=").append(proto)
     83                                   .append(" FROM_PORT=").append(fromPort)
     84                                   .append(" TO_PORT=").append(toPort)
     85                                   .append('\n');
     86                                String msg = buf.toString();
     87                                msgBuf = ByteBuffer.allocate(msg.length()+data.length);
     88                                msgBuf.put(DataHelper.getASCII(msg));
     89                        } else {
     90                                msgBuf = ByteBuffer.allocate(data.length);
     91                        }
    8592                        msgBuf.put(data);
    8693                        msgBuf.flip();
  • apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java

    r8d7edaae r9ce8fce  
    1212import java.io.InterruptedIOException;
    1313import java.net.ConnectException;
     14import java.net.InetSocketAddress;
    1415import java.net.NoRouteToHostException;
     16import java.net.SocketTimeoutException;
     17import java.nio.channels.Channels;
     18import java.nio.channels.ReadableByteChannel;
     19import java.nio.channels.WritableByteChannel;
     20import java.nio.ByteBuffer;
     21import java.nio.channels.SocketChannel;
     22import java.security.GeneralSecurityException;
    1523import java.util.Properties;
    16 
     24import java.util.concurrent.atomic.AtomicInteger;
     25
     26import javax.net.ssl.SSLException;
     27import javax.net.ssl.SSLSocket;
     28
     29import net.i2p.I2PAppContext;
    1730import net.i2p.I2PException;
    1831import net.i2p.client.streaming.I2PServerSocket;
     
    2235import net.i2p.data.Destination;
    2336import net.i2p.util.I2PAppThread;
     37import net.i2p.util.I2PSSLSocketFactory;
    2438import net.i2p.util.Log;
    25 import java.nio.channels.Channels;
    26 import java.nio.channels.ReadableByteChannel;
    27 import java.nio.channels.WritableByteChannel;
    28 import java.nio.ByteBuffer;
    29 import java.nio.channels.SocketChannel;
    3039
    3140/**
     
    4150               
    4251                private final Object socketServerLock = new Object();
     52                /** this is ONLY set for FORWARD, not for ACCEPT */
    4353                private I2PServerSocket socketServer;
     54                /** this is the count of active ACCEPT sockets */
     55                private final AtomicInteger _acceptors = new AtomicInteger();
     56
     57                private static I2PSSLSocketFactory _sslSocketFactory;
    4458       
    4559                private final String nick ;
     
    92106                        DataFormatException, InterruptedIOException, IOException {
    93107
    94                 boolean verbose = (props.getProperty("SILENT", "false").equals("false"));
     108                boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
    95109                Destination d = SAMUtils.getDest(dest);
    96110
     
    98112                if (props.getProperty(I2PSocketOptions.PROP_CONNECT_TIMEOUT) == null)
    99113                    opts.setConnectTimeout(60 * 1000);
     114                String fromPort = props.getProperty("FROM_PORT");
     115                if (fromPort != null) {
     116                    try {
     117                        opts.setLocalPort(Integer.parseInt(fromPort));
     118                    } catch (NumberFormatException nfe) {
     119                        throw new I2PException("Bad port " + fromPort);
     120                    }
     121                }
     122                String toPort = props.getProperty("TO_PORT");
     123                if (toPort != null) {
     124                    try {
     125                        opts.setPort(Integer.parseInt(toPort));
     126                    } catch (NumberFormatException nfe) {
     127                        throw new I2PException("Bad port " + toPort);
     128                    }
     129                }
    100130
    101131                if (_log.shouldLog(Log.DEBUG))
     
    130160            /**
    131161             * Accept a single incoming STREAM on the socket stolen from the handler.
     162             * As of version 3.2 (0.9.24), multiple simultaneous accepts are allowed.
     163             * Accepts and forwarding may not be done at the same time.
    132164             *
    133165             * @param handler The handler that communicates with the requesting client
     
    146178                throws I2PException, InterruptedIOException, IOException, SAMException {
    147179
    148                 synchronized( this.socketServerLock )
    149                 {
    150                         if (this.socketServer!=null) {
    151                                 if (_log.shouldLog(Log.DEBUG))
    152                                         _log.debug("a socket server is already defined for this destination");
    153                                 throw new SAMException("a socket server is already defined for this destination");
    154                         }
    155                         this.socketServer = this.socketMgr.getServerSocket();
    156                 }
    157                
    158                 I2PSocket i2ps = this.socketServer.accept();
    159 
    160                 synchronized( this.socketServerLock )
    161                 {
    162                         this.socketServer = null ;
    163                 }
    164                
     180                synchronized(this.socketServerLock) {
     181                        if (this.socketServer != null) {
     182                                if (_log.shouldWarn())
     183                                        _log.warn("a forwarding server is already defined for this destination");
     184                                throw new SAMException("a forwarding server is already defined for this destination");
     185                        }
     186                }
     187
     188                I2PSocket i2ps;
     189                _acceptors.incrementAndGet();
     190                try {
     191                        i2ps = socketMgr.getServerSocket().accept();
     192                } finally {
     193                        _acceptors.decrementAndGet();
     194                }
     195
    165196                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    166197
    167198                if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
    168199
    169                 if (verbose)
    170                         handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
    171 
     200                if (verbose) {
     201                        handler.notifyStreamIncomingConnection(i2ps.getPeerDestination(),
     202                                                               i2ps.getPort(), i2ps.getLocalPort());
     203                }
    172204                handler.stealSocket() ;
    173205                ReadableByteChannel fromClient = handler.getClientSocket();
     
    186218
    187219           
    188             public void startForwardingIncoming( Properties props ) throws SAMException, InterruptedIOException
     220            /**
     221             *  Forward sockets from I2P to the host/port provided.
     222             *  Accepts and forwarding may not be done at the same time.
     223             */
     224            public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
    189225            {
    190226                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    191                 boolean verbose = props.getProperty("SILENT", "false").equals("false");
     227                boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT"));
    192228               
    193229                if ( rec==null ) throw new InterruptedIOException() ;
     
    207243                                _log.debug("no host specified. Taken from the client socket : " + host +':'+port);
    208244                }
    209 
    210                
    211                 synchronized( this.socketServerLock )
    212                 {
    213                         if (this.socketServer!=null) {
    214                                 if (_log.shouldLog(Log.DEBUG))
    215                                         _log.debug("a socket server is already defined for this destination");
    216                                 throw new SAMException("a socket server is already defined for this destination");
    217                         }
     245                boolean isSSL = Boolean.parseBoolean(props.getProperty("SSL"));
     246                if (_acceptors.get() > 0) {
     247                        if (_log.shouldWarn())
     248                                _log.warn("an accepting server is already defined for this destination");
     249                        throw new SAMException("an accepting server is already defined for this destination");
     250                }
     251                synchronized(this.socketServerLock) {
     252                        if (this.socketServer!=null) {
     253                                if (_log.shouldWarn())
     254                                        _log.warn("a forwarding server is already defined for this destination");
     255                                throw new SAMException("a forwarding server is already defined for this destination");
     256                        }
    218257                        this.socketServer = this.socketMgr.getServerSocket();
    219258                }
    220259               
    221                 SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose);
     260                SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts);
    222261                (new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
    223262            }
    224263           
     264            /**
     265             *  Forward sockets from I2P to the host/port provided
     266             */
    225267            private static class SocketForwarder implements Runnable
    226268            {
     
    228270                private final int port;
    229271                private final SAMv3StreamSession session;
    230                 private final boolean verbose;
    231                
    232                 SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) {
     272                private final boolean isSSL, verbose, sendPorts;
     273               
     274                SocketForwarder(String host, int port, boolean isSSL,
     275                                SAMv3StreamSession session, boolean verbose, boolean sendPorts) {
    233276                        this.host = host ;
    234277                        this.port = port ;
    235278                        this.session = session ;
    236279                        this.verbose = verbose ;
     280                        this.sendPorts = sendPorts;
     281                        this.isSSL = isSSL;
    237282                }
    238283               
     
    242287                               
    243288                                // wait and accept a connection from I2P side
    244                                 I2PSocket i2ps = null ;
     289                                I2PSocket i2ps;
    245290                                try {
    246291                                        i2ps = session.getSocketServer().accept();
    247                                 } catch (Exception e) {}
     292                                        if (i2ps == null)
     293                                                continue;
     294                                } catch (SocketTimeoutException ste) {
     295                                        continue;
     296                                } catch (ConnectException ce) {
     297                                        Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMv3StreamSession.class);
     298                                        if (log.shouldLog(Log.WARN))
     299                                                log.warn("Error accepting", ce);
     300                                        try { Thread.sleep(50); } catch (InterruptedException ie) {}
     301                                        continue;
     302                                } catch (I2PException ipe) {
     303                                        Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMv3StreamSession.class);
     304                                        if (log.shouldLog(Log.WARN))
     305                                                log.warn("Error accepting", ipe);
     306                                        break;
     307                                }
     308
     309                                // open a socket towards client
    248310                               
    249                                 if (i2ps==null) {
    250                                         continue ;
    251                                 }
    252 
    253                                 // open a socket towards client
    254                                 java.net.InetSocketAddress addr = new java.net.InetSocketAddress(host,port);
    255                                
    256                                 SocketChannel clientServerSock = null ;
     311                                SocketChannel clientServerSock;
    257312                                try {
    258                                         clientServerSock = SocketChannel.open(addr) ;
    259                                 }
    260                                 catch ( IOException e ) {
    261                                         continue ;
     313                                        if (isSSL) {
     314                                                I2PAppContext ctx =  I2PAppContext.getGlobalContext();
     315                                                synchronized(SAMv3StreamSession.class) {
     316                                                        if (_sslSocketFactory == null) {
     317                                                                try {
     318                                                                        _sslSocketFactory = new I2PSSLSocketFactory(
     319                                                                            ctx, true, "certificates/sam");
     320                                                                } catch (GeneralSecurityException gse) {
     321                                                                        Log log = ctx.logManager().getLog(SAMv3StreamSession.class);
     322                                                                        log.error("SSL error", gse);
     323                                                                        try {
     324                                                                                i2ps.close();
     325                                                                        } catch (IOException ee) {}
     326                                                                        throw new RuntimeException("SSL error", gse);
     327                                                                }
     328                                                        }
     329                                                }
     330                                                SSLSocket sock = (SSLSocket) _sslSocketFactory.createSocket(host, port);
     331                                                I2PSSLSocketFactory.verifyHostname(ctx, sock, host);
     332                                                clientServerSock = new SSLSocketChannel(sock);
     333                                        } else {
     334                                                InetSocketAddress addr = new InetSocketAddress(host, port);
     335                                                clientServerSock = SocketChannel.open(addr) ;
     336                                        }
     337                                } catch (IOException ioe) {
     338                                        Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMv3StreamSession.class);
     339                                        if (log.shouldLog(Log.WARN))
     340                                                log.warn("Error forwarding", ioe);
     341                                        try {
     342                                                i2ps.close();
     343                                        } catch (IOException ee) {}
     344                                        continue;
    262345                                }
    263346
     
    265348                                try {
    266349                                        clientServerSock.socket().setKeepAlive(true);
    267                                         if (this.verbose)
    268                                                 SAMv3Handler.notifyStreamIncomingConnection(
     350                                        if (this.verbose) {
     351                                                if (sendPorts) {
     352                                                       SAMv3Handler.notifyStreamIncomingConnection(
     353                                                                clientServerSock, i2ps.getPeerDestination(),
     354                                                                i2ps.getPort(), i2ps.getLocalPort());
     355                                                } else {
     356                                                       SAMv3Handler.notifyStreamIncomingConnection(
    269357                                                                clientServerSock, i2ps.getPeerDestination());
     358                                                }
     359                                        }
    270360                                        ReadableByteChannel fromClient = clientServerSock ;
    271361                                        ReadableByteChannel fromI2P    = Channels.newChannel(i2ps.getInputStream());
     
    348438            }
    349439           
    350             public I2PServerSocket getSocketServer()
     440            private I2PServerSocket getSocketServer()
    351441            {
    352442                synchronized ( this.socketServerLock ) {
     
    391481            }
    392482
    393             public boolean sendBytes(String s, byte[] b) throws DataFormatException
     483            /**
     484             *  Unsupported
     485             *  @throws DataFormatException always
     486             */
     487            public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException
    394488            {
    395489                throw new DataFormatException(null);
  • apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java

    r8d7edaae r9ce8fce  
    88public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener {
    99    public void destReplyReceived(String publicKey, String privateKey) {}
    10     public void helloReplyReceived(boolean ok) {}
     10    public void helloReplyReceived(boolean ok, String version) {}
    1111    public void namingReplyReceived(String name, String result, String value, String message) {}
    1212    public void sessionStatusReceived(String result, String destination, String message) {}
    13     public void streamClosedReceived(String result, int id, String message) {}
    14     public void streamConnectedReceived(String remoteDestination, int id) {}
    15     public void streamDataReceived(int id, byte[] data, int offset, int length) {}
    16     public void streamStatusReceived(String result, int id, String message) {}
     13    public void streamClosedReceived(String result, String id, String message) {}
     14    public void streamConnectedReceived(String remoteDestination, String id) {}
     15    public void streamDataReceived(String id, byte[] data, int offset, int length) {}
     16    public void streamStatusReceived(String result, String id, String message) {}
     17    public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort) {}
     18    public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol) {}
     19    public void pingReceived(String data) {}
     20    public void pongReceived(String data) {}
    1721    public void unknownMessageReceived(String major, String minor, Properties params) {}
    1822}
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    r8d7edaae r9ce8fce  
    1414public class SAMEventHandler extends SAMClientEventListenerImpl {
    1515    //private I2PAppContext _context;
    16     private Log _log;
     16    private final Log _log;
    1717    private Boolean _helloOk;
    18     private Object _helloLock = new Object();
     18    private String _version;
     19    private final Object _helloLock = new Object();
    1920    private Boolean _sessionCreateOk;
    20     private Object _sessionCreateLock = new Object();
    21     private Object _namingReplyLock = new Object();
    22     private Map<String,String> _namingReplies = new HashMap<String,String>();
     21    private Boolean _streamStatusOk;
     22    private final Object _sessionCreateLock = new Object();
     23    private final Object _namingReplyLock = new Object();
     24    private final Object _streamStatusLock = new Object();
     25    private final Map<String,String> _namingReplies = new HashMap<String,String>();
    2326
    2427    public SAMEventHandler(I2PAppContext ctx) {
     
    2730    }
    2831   
    29         @Override
    30     public void helloReplyReceived(boolean ok) {
     32    @Override
     33    public void helloReplyReceived(boolean ok, String version) {
    3134        synchronized (_helloLock) {
    3235            if (ok)
     
    3437            else
    3538                _helloOk = Boolean.FALSE;
     39            _version = version;
    3640            _helloLock.notifyAll();
    3741        }
    3842    }
    3943
    40         @Override
     44    @Override
    4145    public void sessionStatusReceived(String result, String destination, String msg) {
    4246        synchronized (_sessionCreateLock) {
     
    4953    }
    5054
    51         @Override
     55    @Override
    5256    public void namingReplyReceived(String name, String result, String value, String msg) {
    5357        synchronized (_namingReplyLock) {
     
    6064    }
    6165
    62         @Override
     66    @Override
     67    public void streamStatusReceived(String result, String id, String message) {
     68        synchronized (_streamStatusLock) {
     69            if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result))
     70                _streamStatusOk = Boolean.TRUE;
     71            else
     72                _streamStatusOk = Boolean.FALSE;
     73            _streamStatusLock.notifyAll();
     74        }
     75    }
     76
     77    @Override
    6378    public void unknownMessageReceived(String major, String minor, Properties params) {
    64         _log.error("wrt, [" + major + "] [" + minor + "] [" + params + "]");
     79        _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]");
    6580    }
    6681
     
    7186
    7287    /**
    73      * Wait for the connection to be established, returning true if everything
     88     * Wait for the connection to be established, returning the server version if everything
    7489     * went ok
    75      * @return true if everything ok
     90     * @return SAM server version if everything ok, or null on failure
    7691     */
    77     public boolean waitForHelloReply() {
     92    public String waitForHelloReply() {
    7893        while (true) {
    7994            try {
     
    8297                        _helloLock.wait();
    8398                    else
    84                         return _helloOk.booleanValue();
     99                        return _helloOk.booleanValue() ? _version : null;
    85100                }
    86             } catch (InterruptedException ie) {}
     101            } catch (InterruptedException ie) { return null; }
    87102        }
    88103    }
     
    102117                        return _sessionCreateOk.booleanValue();
    103118                }
    104             } catch (InterruptedException ie) {}
     119            } catch (InterruptedException ie) { return false; }
     120        }
     121    }
     122
     123    /**
     124     * Wait for the stream to be created, returning true if everything went ok
     125     *
     126     * @return true if everything ok
     127     */
     128    public boolean waitForStreamStatusReply() {
     129        while (true) {
     130            try {
     131                synchronized (_streamStatusLock) {
     132                    if (_streamStatusOk == null)
     133                        _streamStatusLock.wait();
     134                    else
     135                        return _streamStatusOk.booleanValue();
     136                }
     137            } catch (InterruptedException ie) { return false; }
    105138        }
    106139    }
     
    129162                    }
    130163                }
    131             } catch (InterruptedException ie) {}
     164            } catch (InterruptedException ie) { return null; }
    132165        }
    133166    }
  • apps/sam/java/src/net/i2p/sam/client/SAMReader.java

    r8d7edaae r9ce8fce  
    1717 */
    1818public class SAMReader {
    19     private Log _log;
    20     private InputStream _inRaw;
    21     private SAMClientEventListener _listener;
    22     private boolean _live;
     19    private final Log _log;
     20    private final InputStream _inRaw;
     21    private final SAMClientEventListener _listener;
     22    private volatile boolean _live;
     23    private Thread _thread;
    2324   
    2425    public SAMReader(I2PAppContext context, InputStream samIn, SAMClientEventListener listener) {
     
    2829    }
    2930   
    30     public void startReading() {
     31    public synchronized void startReading() {
     32        if (_live)
     33            throw new IllegalStateException();
    3134        _live = true;
    3235        I2PAppThread t = new I2PAppThread(new Runner(), "SAM reader");
    3336        t.start();
    34     }
    35     public void stopReading() { _live = false; }
     37        _thread = t;
     38    }
     39
     40    public synchronized void stopReading() {
     41        _live = false;
     42        if (_thread != null) {
     43            _thread.interrupt();
     44            _thread = null;
     45            try { _inRaw.close(); } catch (IOException ioe) {}
     46        }
     47    }
    3648   
    3749    /**
     
    6173        public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND";
    6274       
    63         public void helloReplyReceived(boolean ok);
     75        public void helloReplyReceived(boolean ok, String version);
    6476        public void sessionStatusReceived(String result, String destination, String message);
    65         public void streamStatusReceived(String result, int id, String message);
    66         public void streamConnectedReceived(String remoteDestination, int id);
    67         public void streamClosedReceived(String result, int id, String message);
    68         public void streamDataReceived(int id, byte data[], int offset, int length);
     77        public void streamStatusReceived(String result, String id, String message);
     78        public void streamConnectedReceived(String remoteDestination, String id);
     79        public void streamClosedReceived(String result, String id, String message);
     80        public void streamDataReceived(String id, byte data[], int offset, int length);
    6981        public void namingReplyReceived(String name, String result, String value, String message);
    7082        public void destReplyReceived(String publicKey, String privateKey);
     83        public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort);
     84        public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol);
     85        public void pingReceived(String data);
     86        public void pongReceived(String data);
    7187       
    7288        public void unknownMessageReceived(String major, String minor, Properties params);
     
    88104                    }
    89105                    if (c == -1) {
    90                         _log.error("Error reading from the SAM bridge");
    91                         return;
     106                        _log.info("EOF reading from the SAM bridge");
     107                        break;
    92108                    }
    93109                } catch (IOException ioe) {
    94110                    _log.error("Error reading from SAM", ioe);
     111                    break;
    95112                }
    96113               
     
    98115                baos.reset();
    99116               
    100                 if (line == null) {
    101                     _log.info("No more data from the SAM bridge");
     117                if (_log.shouldDebug())
     118                    _log.debug("Line read from the bridge: " + line);
     119               
     120                StringTokenizer tok = new StringTokenizer(line);
     121               
     122                if (tok.countTokens() <= 0) {
     123                    _log.error("Invalid SAM line: [" + line + "]");
    102124                    break;
    103125                }
    104126               
    105                 _log.debug("Line read from the bridge: " + line);
    106                
    107                 StringTokenizer tok = new StringTokenizer(line);
    108                
    109                 if (tok.countTokens() < 2) {
    110                     _log.error("Invalid SAM line: [" + line + "]");
    111                     _live = false;
    112                     return;
    113                 }
    114                
    115127                String major = tok.nextToken();
    116                 String minor = tok.nextToken();
     128                String minor = tok.hasMoreTokens() ? tok.nextToken() : "";
    117129               
    118130                params.clear();
     
    133145                processEvent(major, minor, params);
    134146            }
     147            _live = false;
     148            if (_log.shouldWarn())
     149                _log.warn("SAMReader exiting");
    135150        }
    136151    }
     
    145160            if ("REPLY".equals(minor)) {
    146161                String result = params.getProperty("RESULT");
    147                 if ("OK".equals(result))
    148                     _listener.helloReplyReceived(true);
     162                String version= params.getProperty("VERSION");
     163                if ("OK".equals(result) && version != null)
     164                    _listener.helloReplyReceived(true, version);
    149165                else
    150                     _listener.helloReplyReceived(false);
     166                    _listener.helloReplyReceived(false, version);
    151167            } else {
    152168                _listener.unknownMessageReceived(major, minor, params);
     
    166182                String id = params.getProperty("ID");
    167183                String msg = params.getProperty("MESSAGE");
    168                 if (id != null) {
    169                     try {
    170                         _listener.streamStatusReceived(result, Integer.parseInt(id), msg);
    171                     } catch (NumberFormatException nfe) {
    172                         _listener.unknownMessageReceived(major, minor, params);
    173                     }
    174                 } else {
    175                     _listener.unknownMessageReceived(major, minor, params);
    176                 }
     184                // id is null in v3, so pass it through regardless
     185                //if (id != null) {
     186                    _listener.streamStatusReceived(result, id, msg);
     187                //} else {
     188                //    _listener.unknownMessageReceived(major, minor, params);
     189                //}
    177190            } else if ("CONNECTED".equals(minor)) {
    178191                String dest = params.getProperty("DESTINATION");
    179192                String id = params.getProperty("ID");
    180193                if (id != null) {
    181                     try {
    182                         _listener.streamConnectedReceived(dest, Integer.parseInt(id));
    183                     } catch (NumberFormatException nfe) {
    184                         _listener.unknownMessageReceived(major, minor, params);
    185                     }
     194                    _listener.streamConnectedReceived(dest, id);
    186195                } else {
    187196                    _listener.unknownMessageReceived(major, minor, params);
     
    192201                String msg = params.getProperty("MESSAGE");
    193202                if (id != null) {
    194                     try {
    195                         _listener.streamClosedReceived(result, Integer.parseInt(id), msg);
    196                     } catch (NumberFormatException nfe) {
    197                         _listener.unknownMessageReceived(major, minor, params);
    198                     }
     203                    _listener.streamClosedReceived(result, id, msg);
    199204                } else {
    200205                    _listener.unknownMessageReceived(major, minor, params);
     
    205210                if (id != null) {
    206211                    try {
    207                         int idVal = Integer.parseInt(id);
    208212                        int sizeVal = Integer.parseInt(size);
    209213                       
     
    213217                            _listener.unknownMessageReceived(major, minor, params);
    214218                        } else {
    215                             _listener.streamDataReceived(idVal, data, 0, sizeVal);
     219                            _listener.streamDataReceived(id, data, 0, sizeVal);
    216220                        }
    217221                    } catch (NumberFormatException nfe) {
     
    222226                    }
    223227                } else {
     228                    _listener.unknownMessageReceived(major, minor, params);
     229                }
     230            } else {
     231                _listener.unknownMessageReceived(major, minor, params);
     232            }
     233        } else if ("DATAGRAM".equals(major)) {
     234            if ("RECEIVED".equals(minor)) {
     235                String dest = params.getProperty("DESTINATION");
     236                String size = params.getProperty("SIZE");
     237                String fp = params.getProperty("FROM_PORT");
     238                String tp = params.getProperty("TO_PORT");
     239                int fromPort = 0;
     240                int toPort = 0;
     241                if (dest != null) {
     242                    try {
     243                      if (fp != null)
     244                          fromPort = Integer.parseInt(fp);
     245                      if (tp != null)
     246                          toPort = Integer.parseInt(tp);
     247                        int sizeVal = Integer.parseInt(size);
     248                        byte data[] = new byte[sizeVal];
     249                        int read = DataHelper.read(_inRaw, data);
     250                        if (read != sizeVal) {
     251                            _listener.unknownMessageReceived(major, minor, params);
     252                        } else {
     253                            _listener.datagramReceived(dest, data, 0, sizeVal, fromPort, toPort);
     254                        }
     255                    } catch (NumberFormatException nfe) {
     256                        _listener.unknownMessageReceived(major, minor, params);
     257                    } catch (IOException ioe) {
     258                        _live = false;
     259                        _listener.unknownMessageReceived(major, minor, params);
     260                    }
     261                } else {
     262                    _listener.unknownMessageReceived(major, minor, params);
     263                }
     264            } else {
     265                _listener.unknownMessageReceived(major, minor, params);
     266            }
     267        } else if ("RAW".equals(major)) {
     268            if ("RECEIVED".equals(minor)) {
     269                String size = params.getProperty("SIZE");
     270                String fp = params.getProperty("FROM_PORT");
     271                String tp = params.getProperty("TO_PORT");
     272                String pr = params.getProperty("PROTOCOL");
     273                int fromPort = 0;
     274                int toPort = 0;
     275                int protocol = 18;
     276                try {
     277                    if (fp != null)
     278                        fromPort = Integer.parseInt(fp);
     279                    if (tp != null)
     280                        toPort = Integer.parseInt(tp);
     281                    if (pr != null)
     282                        protocol = Integer.parseInt(pr);
     283                    int sizeVal = Integer.parseInt(size);
     284                    byte data[] = new byte[sizeVal];
     285                    int read = DataHelper.read(_inRaw, data);
     286                    if (read != sizeVal) {
     287                        _listener.unknownMessageReceived(major, minor, params);
     288                    } else {
     289                        _listener.rawReceived(data, 0, sizeVal, fromPort, toPort, protocol);
     290                    }
     291                } catch (NumberFormatException nfe) {
     292                    _listener.unknownMessageReceived(major, minor, params);
     293                } catch (IOException ioe) {
     294                    _live = false;
    224295                    _listener.unknownMessageReceived(major, minor, params);
    225296                }
     
    245316                _listener.unknownMessageReceived(major, minor, params);
    246317            }
     318        } else if ("PING".equals(major)) {
     319            // this omits anything after a space
     320            _listener.pingReceived(minor);
     321        } else if ("PONG".equals(major)) {
     322            // this omits anything after a space
     323            _listener.pongReceived(minor);
    247324        } else {
    248325            _listener.unknownMessageReceived(major, minor, params);
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    r8d7edaae r9ce8fce  
    11package net.i2p.sam.client;
    22
     3import java.io.ByteArrayOutputStream;
     4import java.io.File;
    35import java.io.FileInputStream;
    46import java.io.IOException;
    57import java.io.InputStream;
    68import java.io.OutputStream;
     9import java.net.DatagramPacket;
     10import java.net.DatagramSocket;
     11import java.net.InetSocketAddress;
    712import java.net.Socket;
     13import java.security.GeneralSecurityException;
    814import java.util.HashMap;
    915import java.util.Map;
     16import javax.net.ssl.SSLSocket;
     17
     18import gnu.getopt.Getopt;
    1019
    1120import net.i2p.I2PAppContext;
     21import net.i2p.data.Base32;
    1222import net.i2p.data.DataHelper;
    1323import net.i2p.util.I2PAppThread;
     24import net.i2p.util.I2PSSLSocketFactory;
    1425import net.i2p.util.Log;
     26import net.i2p.util.VersionComparator;
    1527
    1628/**
    17  * Send a file to a peer
     29 * Swiss army knife tester.
     30 * Sends a file (datafile) to a peer (b64 dest in peerDestFile).
    1831 *
    19  * Usage: SAMStreamSend samHost samPort peerDestFile dataFile
     32 * Usage: SAMStreamSend [options] peerDestFile dataFile
     33 *
     34 * See apps/sam/doc/README-test.txt for info on test setup.
     35 * Sends data in one of 5 modes.
     36 * Optionally uses SSL.
     37 * Configurable SAM client version.
    2038 *
    2139 */
    2240public class SAMStreamSend {
    23     private I2PAppContext _context;
    24     private Log _log;
    25     private String _samHost;
    26     private String _samPort;
    27     private String _destFile;
    28     private String _dataFile;
     41    private final I2PAppContext _context;
     42    private final Log _log;
     43    private final String _samHost;
     44    private final String _samPort;
     45    private final String _destFile;
     46    private final String _dataFile;
    2947    private String _conOptions;
    30     private Socket _samSocket;
    31     private OutputStream _samOut;
    32     private InputStream _samIn;
    33     private SAMReader _reader;
     48    private SAMReader _reader, _reader2;
     49    private boolean _isV3;
     50    private boolean _isV32;
     51    private String _v3ID;
    3452    //private boolean _dead;
    35     private SAMEventHandler _eventHandler;
    3653    /** Connection id (Integer) to peer (Flooder) */
    37     private Map<Integer, Sender> _remotePeers;
    38    
     54    private final Map<String, Sender> _remotePeers;
     55    private static I2PSSLSocketFactory _sslSocketFactory;
     56   
     57    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
     58    private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] peerDestFile dataDir\n" +
     59                                        "       modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
     60                                        "       -s: use SSL\n" +
     61                                        "       multiple -o session options are allowed";
     62
    3963    public static void main(String args[]) {
    40         if (args.length < 4) {
    41             System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile");
     64        Getopt g = new Getopt("SAM", args, "sb:m:o:p:u:v:w:");
     65        boolean isSSL = false;
     66        int mode = STREAM;
     67        String version = "1.0";
     68        String host = "127.0.0.1";
     69        String port = "7656";
     70        String user = null;
     71        String password = null;
     72        String opts = "";
     73        int c;
     74        while ((c = g.getopt()) != -1) {
     75          switch (c) {
     76            case 's':
     77                isSSL = true;
     78                break;
     79
     80            case 'm':
     81                mode = Integer.parseInt(g.getOptarg());
     82                if (mode < 0 || mode > V1RAW) {
     83                    System.err.println(USAGE);
     84                    return;
     85                }
     86                break;
     87
     88            case 'v':
     89                version = g.getOptarg();
     90                break;
     91
     92            case 'b':
     93                host = g.getOptarg();
     94                break;
     95
     96            case 'o':
     97                opts = opts + ' ' + g.getOptarg();
     98                break;
     99
     100            case 'p':
     101                port = g.getOptarg();
     102                break;
     103
     104            case 'u':
     105                user = g.getOptarg();
     106                break;
     107
     108            case 'w':
     109                password = g.getOptarg();
     110                break;
     111
     112            case 'h':
     113            case '?':
     114            case ':':
     115            default:
     116                System.err.println(USAGE);
     117                return;
     118          }  // switch
     119        } // while
     120
     121        int startArgs = g.getOptind();
     122        if (args.length - startArgs != 2) {
     123            System.err.println(USAGE);
    42124            return;
    43125        }
    44         I2PAppContext ctx = new I2PAppContext();
    45         //String files[] = new String[args.length - 3];
    46         SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
    47         sender.startup();
     126        if ((user == null && password != null) ||
     127            (user != null && password == null)) {
     128            System.err.println("both user and password or neither");
     129            return;
     130        }
     131        if (user != null && password != null && VersionComparator.comp(version, "3.2") < 0) {
     132            System.err.println("user/password require 3.2");
     133            return;
     134        }
     135        I2PAppContext ctx = I2PAppContext.getGlobalContext();
     136        SAMStreamSend sender = new SAMStreamSend(ctx, host, port,
     137                                                      args[startArgs], args[startArgs + 1]);
     138        sender.startup(version, isSSL, mode, user, password, opts);
    48139    }
    49140   
     
    57148        _dataFile = dataFile;
    58149        _conOptions = "";
    59         _eventHandler = new SendEventHandler(_context);
    60         _remotePeers = new HashMap<Integer,Sender>();
    61     }
    62    
    63     public void startup() {
     150        _remotePeers = new HashMap<String, Sender>();
     151    }
     152   
     153    public void startup(String version, boolean isSSL, int mode, String user, String password, String sessionOpts) {
    64154        if (_log.shouldLog(Log.DEBUG))
    65155            _log.debug("Starting up");
    66         boolean ok = connect();
    67         if (_log.shouldLog(Log.DEBUG))
    68             _log.debug("Connected: " + ok);
    69         if (ok) {
    70             _reader = new SAMReader(_context, _samIn, _eventHandler);
     156        try {
     157            Socket sock = connect(isSSL);
     158            SAMEventHandler eventHandler = new SendEventHandler(_context);
     159            _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
    71160            _reader.startReading();
    72161            if (_log.shouldLog(Log.DEBUG))
    73162                _log.debug("Reader created");
    74             String ourDest = handshake();
     163            OutputStream out = sock.getOutputStream();
     164            String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
     165            if (ourDest == null)
     166                throw new IOException("handshake failed");
    75167            if (_log.shouldLog(Log.DEBUG))
    76168                _log.debug("Handshake complete.  we are " + ourDest);
    77             if (ourDest != null) {
    78                 send();
    79             }
     169            if (_isV3 && mode == STREAM) {
     170                Socket sock2 = connect(isSSL);
     171                eventHandler = new SendEventHandler(_context);
     172                _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     173                _reader2.startReading();
     174                if (_log.shouldLog(Log.DEBUG))
     175                    _log.debug("Reader2 created");
     176                out = sock2.getOutputStream();
     177                String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
     178                if (ok == null)
     179                    throw new IOException("2nd handshake failed");
     180                if (_log.shouldLog(Log.DEBUG))
     181                    _log.debug("Handshake2 complete.");
     182            }
     183            if (mode == DG || mode == RAW)
     184                out = null;
     185            send(out, eventHandler, mode);
     186        } catch (IOException e) {
     187            _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
     188            if (_reader != null)
     189                _reader.stopReading();
     190            if (_reader2 != null)
     191                _reader2.stopReading();
    80192        }
    81193    }
     
    83195    private class SendEventHandler extends SAMEventHandler {
    84196        public SendEventHandler(I2PAppContext ctx) { super(ctx); }
    85         public void streamClosedReceived(String result, int id, String message) {
     197
     198        @Override
     199        public void streamClosedReceived(String result, String id, String message) {
    86200            Sender sender = null;
    87201            synchronized (_remotePeers) {
    88                 sender = _remotePeers.remove(Integer.valueOf(id));
     202                sender = _remotePeers.remove(id);
    89203            }
    90204            if (sender != null) {
     
    93207                    _log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination());
    94208            } else {
    95                 _log.error("wtf, not connected to " + id + " but we were just closed?");
    96             }
    97         }
    98     }
    99    
    100     private boolean connect() {
    101         try {
    102             _samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
    103             _samOut = _samSocket.getOutputStream();
    104             _samIn = _samSocket.getInputStream();
    105             return true;
    106         } catch (Exception e) {
    107             _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
    108             return false;
    109         }
    110     }
    111    
    112     private String handshake() {
    113         synchronized (_samOut) {
     209                _log.error("not connected to " + id + " but we were just closed?");
     210            }
     211        }
     212    }
     213   
     214    private Socket connect(boolean isSSL) throws IOException {
     215        int port = Integer.parseInt(_samPort);
     216        if (!isSSL)
     217            return new Socket(_samHost, port);
     218        synchronized(SAMStreamSink.class) {
     219            if (_sslSocketFactory == null) {
     220                try {
     221                    _sslSocketFactory = new I2PSSLSocketFactory(
     222                        _context, true, "certificates/sam");
     223                } catch (GeneralSecurityException gse) {
     224                    throw new IOException("SSL error", gse);
     225                }
     226            }
     227        }
     228        SSLSocket sock = (SSLSocket) _sslSocketFactory.createSocket(_samHost, port);
     229        I2PSSLSocketFactory.verifyHostname(_context, sock, _samHost);
     230        return sock;
     231    }
     232   
     233    /** @return our b64 dest or null */
     234    private String handshake(OutputStream samOut, String version, boolean isMaster,
     235                             SAMEventHandler eventHandler, int mode, String user, String password,
     236                             String opts) {
     237        synchronized (samOut) {
    114238            try {
    115                 _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
    116                 _samOut.flush();
     239                if (user != null && password != null)
     240                    samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes());
     241                else
     242                    samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
     243                samOut.flush();
    117244                if (_log.shouldLog(Log.DEBUG))
    118245                    _log.debug("Hello sent");
    119                 boolean ok = _eventHandler.waitForHelloReply();
    120                 if (_log.shouldLog(Log.DEBUG))
    121                     _log.debug("Hello reply found: " + ok);
     246                String hisVersion = eventHandler.waitForHelloReply();
     247                if (_log.shouldLog(Log.DEBUG))
     248                    _log.debug("Hello reply found: " + hisVersion);
     249                if (hisVersion == null)
     250                    throw new IOException("Hello failed");
     251                if (!isMaster)
     252                    return "OK";
     253                _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     254                if (_isV3) {
     255                    _isV32 = VersionComparator.comp(hisVersion, "3.2") >= 0;
     256                    byte[] id = new byte[5];
     257                    _context.random().nextBytes(id);
     258                    _v3ID = Base32.encode(id);
     259                    _conOptions = "ID=" + _v3ID;
     260                }
     261                String style;
     262                if (mode == STREAM)
     263                    style = "STREAM";
     264                else if (mode == DG || mode == V1DG)
     265                    style = "DATAGRAM";
     266                else
     267                    style = "RAW";
     268                String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + ' ' + opts + '\n';
     269                samOut.write(req.getBytes());
     270                samOut.flush();
     271                if (_log.shouldLog(Log.DEBUG))
     272                    _log.debug("Session create sent");
     273                boolean ok = eventHandler.waitForSessionCreateReply();
    122274                if (!ok)
    123                     throw new IOException("wtf, hello failed?");
    124                 String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
    125                 _samOut.write(req.getBytes());
    126                 _samOut.flush();
    127                 if (_log.shouldLog(Log.DEBUG))
    128                     _log.debug("Session create sent");
    129                 ok = _eventHandler.waitForSessionCreateReply();
     275                    throw new IOException("Session create failed");
    130276                if (_log.shouldLog(Log.DEBUG))
    131277                    _log.debug("Session create reply found: " + ok);
    132278
    133279                req = "NAMING LOOKUP NAME=ME\n";
    134                 _samOut.write(req.getBytes());
    135                 _samOut.flush();
     280                samOut.write(req.getBytes());
     281                samOut.flush();
    136282                if (_log.shouldLog(Log.DEBUG))
    137283                    _log.debug("Naming lookup sent");
    138                 String destination = _eventHandler.waitForNamingReply("ME");
     284                String destination = eventHandler.waitForNamingReply("ME");
    139285                if (_log.shouldLog(Log.DEBUG))
    140286                    _log.debug("Naming lookup reply found: " + destination);
     
    146292                }
    147293                return destination;
    148             } catch (Exception e) {
     294            } catch (IOException e) {
    149295                _log.error("Error handshaking", e);
    150296                return null;
     
    153299    }
    154300   
    155     private void send() {
    156         Sender sender = new Sender();
     301    private void send(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException {
     302        Sender sender = new Sender(samOut, eventHandler, mode);
    157303        boolean ok = sender.openConnection();
    158304        if (ok) {
    159305            I2PAppThread t = new I2PAppThread(sender, "Sender");
    160306            t.start();
     307        } else {
     308            throw new IOException("Sender failed to connect");
    161309        }
    162310    }
    163311   
    164312    private class Sender implements Runnable {
    165         private int _connectionId;
     313        private final String _connectionId;
    166314        private String _remoteDestination;
    167315        private InputStream _in;
    168         private boolean _closed;
     316        private volatile boolean _closed;
    169317        private long _started;
    170318        private long _totalSent;
     319        private final OutputStream _samOut;
     320        private final SAMEventHandler _eventHandler;
     321        private final int _mode;
     322        private final DatagramSocket _dgSock;
     323        private final InetSocketAddress _dgSAM;
    171324       
    172         public Sender() {
    173             _closed = false;
     325        public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException {
     326            _samOut = samOut;
     327            _eventHandler = eventHandler;
     328            _mode = mode;
     329            if (mode == DG || mode == RAW) {
     330                // samOut will be null
     331                _dgSock = new DatagramSocket();
     332                _dgSAM = new InetSocketAddress(_samHost, 7655);
     333            } else {
     334                _dgSock = null;
     335                _dgSAM = null;
     336            }
     337            synchronized (_remotePeers) {
     338                if (_v3ID != null)
     339                    _connectionId = _v3ID;
     340                else
     341                    _connectionId = Integer.toString(_remotePeers.size() + 1);
     342                _remotePeers.put(_connectionId, Sender.this);
     343            }
    174344        }
    175345       
     
    182352
    183353                _remoteDestination = DataHelper.getUTF8(dest, 0, read);
    184                 synchronized (_remotePeers) {
    185                     _connectionId = _remotePeers.size() + 1;
    186                     _remotePeers.put(Integer.valueOf(_connectionId), Sender.this);
    187                 }
    188354
    189355                _context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 });
     
    191357                _context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
    192358               
    193                 byte msg[] = ("STREAM CONNECT ID=" + _connectionId + " DESTINATION=" + _remoteDestination + "\n").getBytes();
    194                 synchronized (_samOut) {
    195                     _samOut.write(msg);
    196                     _samOut.flush();
     359                if (_mode == STREAM) {
     360                    StringBuilder buf = new StringBuilder(1024);
     361                    buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination);
     362                    // not supported until 3.2 but 3.0-3.1 will ignore
     363                    if (_isV3)
     364                        buf.append(" FROM_PORT=1234 TO_PORT=5678");
     365                    buf.append('\n');
     366                    byte[] msg = DataHelper.getASCII(buf.toString());
     367                    synchronized (_samOut) {
     368                        _samOut.write(msg);
     369                        _samOut.flush();
     370                    }
     371                    _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
     372                    boolean ok = _eventHandler.waitForStreamStatusReply();
     373                    if (!ok)
     374                        throw new IOException("STREAM CONNECT failed");
    197375                }
    198376
     
    211389        }
    212390       
    213         public int getConnectionId() { return _connectionId; }
     391        public String getConnectionId() { return _connectionId; }
    214392        public String getDestination() { return _remoteDestination; }
    215393       
     
    225403            _started = _context.clock().now();
    226404            _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
    227             byte data[] = new byte[1024];
     405            final long toSend = (new File(_dataFile)).length();
     406            byte data[] = new byte[8192];
    228407            long lastSend = _context.clock().now();
    229408            while (!_closed) {
     
    240419                        lastSend = now;
    241420                       
    242                         byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
    243                         synchronized (_samOut) {
    244                             _samOut.write(msg);
    245                             _samOut.write(data, 0, read);
    246                             _samOut.flush();
     421                        if (_samOut != null) {
     422                            synchronized (_samOut) {
     423                                if (!_isV3 || _mode == V1DG || _mode == V1RAW) {
     424                                    String m;
     425                                    if (_mode == STREAM) {
     426                                        m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n";
     427                                    } else if (_mode == V1DG) {
     428                                        m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
     429                                    } else if (_mode == V1RAW) {
     430                                        m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
     431                                    } else {
     432                                        throw new IOException("unsupported mode " + _mode);
     433                                    }
     434                                    byte msg[] = DataHelper.getASCII(m);
     435                                    _samOut.write(msg);
     436                                }
     437                                _samOut.write(data, 0, read);
     438                                _samOut.flush();
     439                            }
     440                        } else {
     441                            // real datagrams
     442                            ByteArrayOutputStream baos = new ByteArrayOutputStream(read + 1024);
     443                            baos.write(DataHelper.getASCII("3.0 "));
     444                            baos.write(DataHelper.getASCII(_v3ID));
     445                            baos.write((byte) ' ');
     446                            baos.write(DataHelper.getASCII(_remoteDestination));
     447                            if (_isV32) {
     448                                // only set TO_PORT to test session setting of FROM_PORT
     449                                if (_mode == RAW)
     450                                    baos.write(DataHelper.getASCII(" PROTOCOL=123 TO_PORT=5678"));
     451                                else
     452                                    baos.write(DataHelper.getASCII(" TO_PORT=5678"));
     453                            }
     454                            baos.write((byte) '\n');
     455                            baos.write(data, 0, read);
     456                            byte[] pkt = baos.toByteArray();
     457                            DatagramPacket p = new DatagramPacket(pkt, pkt.length, _dgSAM);
     458                            _dgSock.send(p);
     459                            try { Thread.sleep(25); } catch (InterruptedException ie) {}
    247460                        }
    248461                       
     
    252465                } catch (IOException ioe) {
    253466                    _log.error("Error sending", ioe);
     467                    break;
    254468                }
    255469            }
    256470           
    257             byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
    258             try {
    259                 synchronized (_samOut) {
    260                     _samOut.write(msg);
    261                     _samOut.flush();
    262                 }
    263             } catch (IOException ioe) {
    264                 _log.error("Error closing", ioe);
     471            if (_samOut != null) {
     472                if (_isV3) {
     473                    try {
     474                        _samOut.close();
     475                    } catch (IOException ioe) {
     476                        _log.info("Error closing", ioe);
     477                    }
     478                } else {
     479                    byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes();
     480                    try {
     481                        synchronized (_samOut) {
     482                            _samOut.write(msg);
     483                            _samOut.flush();
     484                            _samOut.close();
     485                        }
     486                    } catch (IOException ioe) {
     487                        _log.info("Error closing", ioe);
     488                    }
     489                }
     490            } else if (_dgSock != null) {
     491                _dgSock.close();
    265492            }
    266493           
    267494            closed();
     495            if (_log.shouldLog(Log.DEBUG))
     496                _log.debug("Runner exiting");
     497            if (toSend != _totalSent)
     498                _log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
     499            if (_reader2 != null)
     500                _reader2.stopReading();
     501            // stop the reader, since we're only doing this once for testing
     502            // you wouldn't do this in a real application
     503            if (_isV3) {
     504                // closing the master socket too fast will kill the data socket flushing through
     505                try {
     506                    Thread.sleep(10000);
     507                } catch (InterruptedException ie) {}
     508            }
     509            _reader.stopReading();
    268510        }
    269511    }
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r8d7edaae r9ce8fce  
    11package net.i2p.sam.client;
    22
     3import java.io.ByteArrayInputStream;
    34import java.io.File;
    45import java.io.FileOutputStream;
     
    67import java.io.InputStream;
    78import java.io.OutputStream;
     9import java.net.DatagramPacket;
     10import java.net.DatagramSocket;
     11import java.net.ServerSocket;
    812import java.net.Socket;
     13import java.security.GeneralSecurityException;
    914import java.util.HashMap;
    1015import java.util.Map;
     16import java.util.Properties;
     17import javax.net.ssl.SSLSocket;
     18import javax.net.ssl.SSLServerSocket;
     19
     20import gnu.getopt.Getopt;
    1121
    1222import net.i2p.I2PAppContext;
     23import net.i2p.data.Base32;
     24import net.i2p.data.DataHelper;
     25import net.i2p.util.I2PAppThread;
     26import net.i2p.util.I2PSSLSocketFactory;
    1327import net.i2p.util.Log;
     28import net.i2p.util.VersionComparator;
    1429
    1530/**
    16  * Sit around on a SAM destination, receiving lots of data and
    17  * writing it to disk
     31 * Swiss army knife tester.
     32 * Saves our transient b64 destination to myKeyFile where SAMStreamSend can get it.
     33 * Saves received data to a file (in sinkDir).
    1834 *
    19  * Usage: SAMStreamSink samHost samPort myKeyFile sinkDir
     35 * Usage: SAMStreamSink [options] myKeyFile sinkDir
     36 *
     37 * See apps/sam/doc/README-test.txt for info on test setup.
     38 * Receives data in one of 7 modes.
     39 * Optionally uses SSL.
     40 * Configurable SAM client version.
    2041 *
    2142 */
    2243public class SAMStreamSink {
    23     private I2PAppContext _context;
    24     private Log _log;
    25     private String _samHost;
    26     private String _samPort;
    27     private String _destFile;
    28     private String _sinkDir;
     44    private final I2PAppContext _context;
     45    private final Log _log;
     46    private final String _samHost;
     47    private final String _samPort;
     48    private final String _destFile;
     49    private final String _sinkDir;
    2950    private String _conOptions;
    30     private Socket _samSocket;
    31     private OutputStream _samOut;
    32     private InputStream _samIn;
    33     private SAMReader _reader;
    34     //private boolean _dead;
    35     private SAMEventHandler _eventHandler;
     51    private SAMReader _reader, _reader2;
     52    private boolean _isV3;
     53    private boolean _isV32;
     54    private String _v3ID;
    3655    /** Connection id (Integer) to peer (Flooder) */
    37     private Map<Integer, Sink> _remotePeers;
     56    private final Map<String, Sink> _remotePeers;
     57    private static I2PSSLSocketFactory _sslSocketFactory;
    3858   
     59    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6;
     60    private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" +
     61                                        "       modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4; raw-with-headers: 5; stream-forward: 6\n" +
     62                                        "       -s: use SSL\n" +
     63                                        "       multiple -o session options are allowed";
     64    private static final int V3FORWARDPORT=9998;
     65    private static final int V3DGPORT=9999;
     66
    3967    public static void main(String args[]) {
    40         if (args.length < 4) {
    41             System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir");
     68        Getopt g = new Getopt("SAM", args, "sb:m:p:u:v:w:");
     69        boolean isSSL = false;
     70        int mode = STREAM;
     71        String version = "1.0";
     72        String host = "127.0.0.1";
     73        String port = "7656";
     74        String user = null;
     75        String password = null;
     76        String opts = "";
     77        int c;
     78        while ((c = g.getopt()) != -1) {
     79          switch (c) {
     80            case 's':
     81                isSSL = true;
     82                break;
     83
     84            case 'm':
     85                mode = Integer.parseInt(g.getOptarg());
     86                if (mode < 0 || mode > FORWARD) {
     87                    System.err.println(USAGE);
     88                    return;
     89                }
     90                break;
     91
     92            case 'v':
     93                version = g.getOptarg();
     94                break;
     95
     96            case 'b':
     97                host = g.getOptarg();
     98                break;
     99
     100            case 'o':
     101                opts = opts + ' ' + g.getOptarg();
     102                break;
     103
     104            case 'p':
     105                port = g.getOptarg();
     106                break;
     107
     108            case 'u':
     109                user = g.getOptarg();
     110                break;
     111
     112            case 'w':
     113                password = g.getOptarg();
     114                break;
     115
     116            case 'h':
     117            case '?':
     118            case ':':
     119            default:
     120                System.err.println(USAGE);
     121                return;
     122          }  // switch
     123        } // while
     124
     125        int startArgs = g.getOptind();
     126        if (args.length - startArgs != 2) {
     127            System.err.println(USAGE);
    42128            return;
    43129        }
    44         I2PAppContext ctx = new I2PAppContext();
    45         SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]);
    46         sink.startup();
     130        if ((user == null && password != null) ||
     131            (user != null && password == null)) {
     132            System.err.println("both user and password or neither");
     133            return;
     134        }
     135        if (user != null && password != null && VersionComparator.comp(version, "3.2") < 0) {
     136            System.err.println("user/password require 3.2");
     137            return;
     138        }
     139        I2PAppContext ctx = I2PAppContext.getGlobalContext();
     140        SAMStreamSink sink = new SAMStreamSink(ctx, host, port,
     141                                                    args[startArgs], args[startArgs + 1]);
     142        sink.startup(version, isSSL, mode, user, password, opts);
    47143    }
    48144   
     
    50146        _context = ctx;
    51147        _log = ctx.logManager().getLog(SAMStreamSink.class);
    52         //_dead = false;
    53148        _samHost = samHost;
    54149        _samPort = samPort;
     
    56151        _sinkDir = sinkDir;
    57152        _conOptions = "";
    58         _eventHandler = new SinkEventHandler(_context);
    59         _remotePeers = new HashMap<Integer,Sink>();
     153        _remotePeers = new HashMap<String, Sink>();
    60154    }
    61155   
    62     public void startup() {
     156    public void startup(String version, boolean isSSL, int mode, String user, String password, String sessionOpts) {
    63157        if (_log.shouldLog(Log.DEBUG))
    64158            _log.debug("Starting up");
    65         boolean ok = connect();
    66         if (_log.shouldLog(Log.DEBUG))
    67             _log.debug("Connected: " + ok);
    68         if (ok) {
    69             _reader = new SAMReader(_context, _samIn, _eventHandler);
     159        try {
     160            Socket sock = connect(isSSL);
     161            OutputStream out = sock.getOutputStream();
     162            SAMEventHandler eventHandler = new SinkEventHandler(_context, out);
     163            _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
    70164            _reader.startReading();
    71165            if (_log.shouldLog(Log.DEBUG))
    72166                _log.debug("Reader created");
    73             String ourDest = handshake();
     167            String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
     168            if (ourDest == null)
     169                throw new IOException("handshake failed");
    74170            if (_log.shouldLog(Log.DEBUG))
    75171                _log.debug("Handshake complete.  we are " + ourDest);
    76             if (ourDest != null) {
    77                 //boolean written =
    78                 writeDest(ourDest);
    79                 if (_log.shouldLog(Log.DEBUG))
    80                     _log.debug("Dest written");
     172            if (_isV32) {
     173                _log.debug("Starting pinger");
     174                Thread t = new Pinger(out);
     175                t.start();
     176            }
     177            if (_isV3 && (mode == STREAM || mode == FORWARD)) {
     178                // test multiple acceptors, only works in 3.2
     179                int acceptors = (_isV32 && mode == STREAM) ? 4 : 1;
     180                for (int i = 0; i < acceptors; i++) {
     181                    Socket sock2 = connect(isSSL);
     182                    out = sock2.getOutputStream();
     183                    eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
     184                    _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     185                    _reader2.startReading();
     186                    if (_log.shouldLog(Log.DEBUG))
     187                        _log.debug("Reader " + (2 + i) + " created");
     188                    String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
     189                    if (ok == null)
     190                        throw new IOException("handshake " + (2 + i) + " failed");
     191                    if (_log.shouldLog(Log.DEBUG))
     192                        _log.debug("Handshake " + (2 + i) + " complete.");
     193                }
     194                if (mode == FORWARD) {
     195                    // set up a listening ServerSocket
     196                    (new FwdRcvr(isSSL)).start();
     197                }
     198            } else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) {
     199                // set up a listening DatagramSocket
     200                (new DGRcvr(mode)).start();
     201            }
     202            writeDest(ourDest);
     203        } catch (IOException e) {
     204            _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
     205        }
     206    }
     207
     208    private class DGRcvr extends I2PAppThread {
     209        private final int _mode;
     210
     211        public DGRcvr(int mode) { _mode = mode; }
     212
     213        public void run() {
     214            byte[] buf = new byte[32768];
     215            try {
     216                Sink sink = new Sink("FAKE", "FAKEFROM");
     217                DatagramSocket dg = new DatagramSocket(V3DGPORT);
     218                while (true) {
     219                    DatagramPacket p = new DatagramPacket(buf, 32768);
     220                    dg.receive(p);
     221                    int len = p.getLength();
     222                    int off = p.getOffset();
     223                    byte[] data = p.getData();
     224                    _log.info("Got datagram length " + len);
     225                    if (_mode == DG || _mode == RAWHDR) {
     226                        ByteArrayInputStream bais = new ByteArrayInputStream(data, off, len);
     227                        String line = DataHelper.readLine(bais);
     228                        if (line == null) {
     229                            _log.error("DGRcvr no header line");
     230                            continue;
     231                        }
     232                        if (_mode == DG && line.length() < 516) {
     233                            _log.error("DGRcvr line too short: \"" + line + '\n');
     234                            continue;
     235                        }
     236                        String[] parts = line.split(" ");
     237                        int i = 0;
     238                        if (_mode == DG) {
     239                            String dest = parts[0];
     240                            _log.info("DG is from " + dest);
     241                            i++;
     242                        }
     243                        for ( ; i < parts.length; i++) {
     244                            _log.info("Parameter: " + parts[i]);
     245                        }
     246                        int left = bais.available();
     247                        sink.received(data, off + len - left, left);
     248                    } else {
     249                        sink.received(data, off, len);
     250                    }
     251                }
     252            } catch (IOException ioe) {
     253                _log.error("DGRcvr", ioe);
     254            }
     255        }
     256    }
     257
     258    private class FwdRcvr extends I2PAppThread {
     259        private final boolean _isSSL;
     260
     261        public FwdRcvr(boolean isSSL) {
     262            if (isSSL)
     263                throw new UnsupportedOperationException("TODO");
     264            _isSSL = isSSL;
     265        }
     266
     267        public void run() {
     268            try {
     269                ServerSocket ss;
     270                if (_isSSL) {
     271                    throw new UnsupportedOperationException("TODO");
     272                } else {
     273                    ss = new ServerSocket(V3FORWARDPORT);
     274                }
     275                while (true) {
     276                    Socket s = ss.accept();
     277                    Sink sink = new Sink("FAKE", "FAKEFROM");
     278                    try {
     279                        InputStream in = s.getInputStream();
     280                        byte[] buf = new byte[32768];
     281                        int len;
     282                        while((len = in.read(buf)) >= 0) {
     283                            sink.received(buf, 0, len);
     284                        }
     285                        sink.closed();
     286                    } catch (IOException ioe) {
     287                        _log.error("Fwdcvr", ioe);
     288                    }
     289                }
     290            } catch (IOException ioe) {
     291                _log.error("Fwdcvr", ioe);
     292            }
     293        }
     294    }
     295
     296    private static class Pinger extends I2PAppThread {
     297        private final OutputStream _out;
     298
     299        public Pinger(OutputStream out) {
     300            super("SAM Sink Pinger");
     301            setDaemon(true);
     302            _out = out;
     303        }
     304
     305        public void run() {
     306            while (true) {
     307                try {
     308                    Thread.sleep(127*1000);
     309                    synchronized(_out) {
     310                        _out.write(DataHelper.getASCII("PING " + System.currentTimeMillis() + '\n'));
     311                        _out.flush();
     312                    }
     313                } catch (InterruptedException ie) {
     314                    break;
     315                } catch (IOException ioe) {
     316                    break;
     317                }
    81318            }
    82319        }
     
    85322    private class SinkEventHandler extends SAMEventHandler {
    86323
    87         public SinkEventHandler(I2PAppContext ctx) { super(ctx); }
     324        protected final OutputStream _out;
     325
     326        public SinkEventHandler(I2PAppContext ctx, OutputStream out) {
     327            super(ctx);
     328            _out = out;
     329        }
    88330
    89331        @Override
    90         public void streamClosedReceived(String result, int id, String message) {
    91             Sink sink = null;
     332        public void streamClosedReceived(String result, String id, String message) {
     333            Sink sink;
    92334            synchronized (_remotePeers) {
    93                 sink = _remotePeers.remove(Integer.valueOf(id));
     335                sink = _remotePeers.remove(id);
    94336            }
    95337            if (sink != null) {
     
    98340                    _log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination());
    99341            } else {
    100                 _log.error("wtf, not connected to " + id + " but we were just closed?");
     342                _log.error("not connected to " + id + " but we were just closed?");
    101343            }
    102344        }
    103345
    104346        @Override
    105         public void streamDataReceived(int id, byte data[], int offset, int length) {
    106             Sink sink = null;
     347        public void streamDataReceived(String id, byte data[], int offset, int length) {
     348            Sink sink;
    107349            synchronized (_remotePeers) {
    108                 sink = _remotePeers.get(Integer.valueOf(id));
     350                sink = _remotePeers.get(id);
    109351            }
    110352            if (sink != null) {
    111353                sink.received(data, offset, length);
    112354            } else {
    113                 _log.error("wtf, not connected to " + id + " but we received " + length + "?");
     355                _log.error("not connected to " + id + " but we received " + length + "?");
    114356            }
    115357        }
    116358
    117359        @Override
    118         public void streamConnectedReceived(String dest, int id) { 
     360        public void streamConnectedReceived(String dest, String id) { 
    119361            if (_log.shouldLog(Log.DEBUG))
    120362                _log.debug("Connection " + id + " received from " + dest);
     
    123365                Sink sink = new Sink(id, dest);
    124366                synchronized (_remotePeers) {
    125                     _remotePeers.put(Integer.valueOf(id), sink);
     367                    _remotePeers.put(id, sink);
    126368                }
    127369            } catch (IOException ioe) {
     
    129371            }
    130372        }
     373
     374        @Override
     375        public void pingReceived(String data) {
     376            if (_log.shouldInfo())
     377                _log.info("Got PING " + data + ", sending PONG " + data);
     378            synchronized (_out) {
     379                try {
     380                    _out.write(("PONG " + data + '\n').getBytes());
     381                    _out.flush();
     382                } catch (IOException ioe) {
     383                    _log.error("PONG fail", ioe);
     384                }
     385            }
     386        }
     387
     388        @Override
     389        public void datagramReceived(String dest, byte[] data, int offset, int length, int fromPort, int toPort) {
     390            // just get the first
     391            Sink sink;
     392            synchronized (_remotePeers) {
     393                if (_remotePeers.isEmpty()) {
     394                    _log.error("not connected but we received datagram " + length + "?");
     395                    return;
     396                }
     397                sink = _remotePeers.values().iterator().next();
     398            }
     399            sink.received(data, offset, length);
     400        }
     401
     402        @Override
     403        public void rawReceived(byte[] data, int offset, int length, int fromPort, int toPort, int protocol) {
     404            // just get the first
     405            Sink sink;
     406            synchronized (_remotePeers) {
     407                if (_remotePeers.isEmpty()) {
     408                    _log.error("not connected but we received raw " + length + "?");
     409                    return;
     410                }
     411                sink = _remotePeers.values().iterator().next();
     412            }
     413            sink.received(data, offset, length);
     414        }
     415    }
     416
     417    private class SinkEventHandler2 extends SinkEventHandler {
     418
     419        private final InputStream _in;
     420
     421        public SinkEventHandler2(I2PAppContext ctx, InputStream in, OutputStream out) {
     422            super(ctx, out);
     423            _in = in;
     424        }
     425
     426        @Override
     427        public void streamStatusReceived(String result, String id, String message) {
     428            if (_log.shouldLog(Log.DEBUG))
     429                _log.debug("got STREAM STATUS, result=" + result);
     430            super.streamStatusReceived(result, id, message);
     431            Sink sink = null;
     432            try {
     433                String dest = "TODO_if_not_silent";
     434                sink = new Sink(_v3ID, dest);
     435                synchronized (_remotePeers) {
     436                    _remotePeers.put(_v3ID, sink);
     437                }
     438            } catch (IOException ioe) {
     439                _log.error("Error creating a new sink", ioe);
     440                try { _in.close(); } catch (IOException ioe2) {}
     441                if (sink != null)
     442                    sink.closed();
     443                return;
     444            }
     445            // inline so the reader doesn't grab the data
     446            try {
     447                boolean gotDest = false;
     448                byte[] dest = new byte[1024];
     449                int dlen = 0;
     450                byte buf[] = new byte[4096];
     451                int len;
     452                while((len = _in.read(buf)) >= 0) {
     453                    if (!gotDest) {
     454                        // eat the dest line
     455                        for (int i = 0; i < len; i++) {
     456                            byte b = buf[i];
     457                            if (b == (byte) '\n') {
     458                                gotDest = true;
     459                                if (_log.shouldInfo()) {
     460                                    try {
     461                                        _log.info("Got incoming accept from: \"" + new String(dest, 0, dlen, "ISO-8859-1") + '"');
     462                                    } catch (IOException uee) {}
     463                                }
     464                                // feed any remaining to the sink
     465                                i++;
     466                                if (i < len)
     467                                    sink.received(buf, i, len - i);
     468                                break;
     469                            } else {
     470                                if (dlen < dest.length) {
     471                                    dest[dlen++] = b;
     472                                } else if (dlen == dest.length) {
     473                                    dlen++;
     474                                    _log.error("first line overflow on accept");
     475                                }
     476                            }
     477                        }
     478                    } else {
     479                        sink.received(buf, 0, len);
     480                    }
     481                }
     482                sink.closed();
     483            } catch (IOException ioe) {
     484                _log.error("Error reading", ioe);
     485            } finally {
     486                try { _in.close(); } catch (IOException ioe) {}
     487            }
     488        }
    131489    }
    132490   
    133     private boolean connect() {
    134         try {
    135             _samSocket = new Socket(_samHost, Integer.parseInt(_samPort));
    136             _samOut = _samSocket.getOutputStream();
    137             _samIn = _samSocket.getInputStream();
    138             return true;
    139         } catch (Exception e) {
    140             _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
    141             return false;
    142         }
     491    private Socket connect(boolean isSSL) throws IOException {
     492        int port = Integer.parseInt(_samPort);
     493        if (!isSSL)
     494            return new Socket(_samHost, port);
     495        synchronized(SAMStreamSink.class) {
     496            if (_sslSocketFactory == null) {
     497                try {
     498                    _sslSocketFactory = new I2PSSLSocketFactory(
     499                        _context, true, "certificates/sam");
     500                } catch (GeneralSecurityException gse) {
     501                    throw new IOException("SSL error", gse);
     502                }
     503            }
     504        }
     505        SSLSocket sock = (SSLSocket) _sslSocketFactory.createSocket(_samHost, port);
     506        I2PSSLSocketFactory.verifyHostname(_context, sock, _samHost);
     507        return sock;
    143508    }
    144509   
    145     private String handshake() {
    146         synchronized (_samOut) {
     510    /** @return our b64 dest or null */
     511    private String handshake(OutputStream samOut, String version, boolean isMaster,
     512                             SAMEventHandler eventHandler, int mode, String user, String password,
     513                             String sopts) {
     514        synchronized (samOut) {
    147515            try {
    148                 _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
    149                 _samOut.flush();
     516                if (user != null && password != null)
     517                    samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + " USER=" + user + " PASSWORD=" + password + '\n').getBytes());
     518                else
     519                    samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
     520                samOut.flush();
    150521                if (_log.shouldLog(Log.DEBUG))
    151522                    _log.debug("Hello sent");
    152                 boolean ok = _eventHandler.waitForHelloReply();
     523                String hisVersion = eventHandler.waitForHelloReply();
    153524                if (_log.shouldLog(Log.DEBUG))
    154                     _log.debug("Hello reply found: " + ok);
    155                 if (!ok)
    156                     throw new IOException("wtf, hello failed?");
    157                 String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n";
    158                 _samOut.write(req.getBytes());
    159                 _samOut.flush();
     525                    _log.debug("Hello reply found: " + hisVersion);
     526                if (hisVersion == null)
     527                    throw new IOException("Hello failed");
     528                if (!isMaster) {
     529                    // only for v3
     530                    //String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n";
     531                    // TO_PORT not supported until 3.2 but 3.0-3.1 will ignore
     532                    String req;
     533                    if (mode == STREAM)
     534                        req = "STREAM ACCEPT SILENT=false TO_PORT=5678 ID=" + _v3ID + "\n";
     535                    else if (mode == FORWARD)
     536                        req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n';
     537                    else
     538                        throw new IllegalStateException("mode " + mode);
     539                    samOut.write(req.getBytes());
     540                    samOut.flush();
     541                    if (_log.shouldLog(Log.DEBUG))
     542                        _log.debug("STREAM ACCEPT/FORWARD sent");
     543                    if (mode == FORWARD) {
     544                        // docs were wrong, we do not get a STREAM STATUS if SILENT=true for ACCEPT
     545                        boolean ok = eventHandler.waitForStreamStatusReply();
     546                        if (!ok)
     547                            throw new IOException("Stream status failed");
     548                        if (_log.shouldLog(Log.DEBUG))
     549                            _log.debug("got STREAM STATUS, awaiting connection");
     550                    }
     551                    return "OK";
     552                }
     553                _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     554                String dest;
     555                if (_isV3) {
     556                    _isV32 = VersionComparator.comp(hisVersion, "3.2") >= 0;
     557                    // we use the filename as the name in sam.keys
     558                    // and read it in ourselves
     559                    File keys = new File("sam.keys");
     560                    if (keys.exists()) {
     561                        Properties opts = new Properties();
     562                        DataHelper.loadProps(opts, keys);
     563                        String s = opts.getProperty(_destFile);
     564                        if (s != null) {
     565                            dest = s;
     566                        } else {
     567                            dest = "TRANSIENT";
     568                            (new File(_destFile)).delete();
     569                            if (_log.shouldLog(Log.DEBUG))
     570                                _log.debug("Requesting new transient destination");
     571                        }
     572                    } else {
     573                        dest = "TRANSIENT";
     574                        (new File(_destFile)).delete();
     575                        if (_log.shouldLog(Log.DEBUG))
     576                            _log.debug("Requesting new transient destination");
     577                    }
     578                    if (isMaster) {
     579                        byte[] id = new byte[5];
     580                        _context.random().nextBytes(id);
     581                        _v3ID = Base32.encode(id);
     582                        _conOptions = "ID=" + _v3ID;
     583                    }
     584                } else {
     585                    // we use the filename as the name in sam.keys
     586                    // and give it to the SAM server
     587                    dest = _destFile;
     588                }
     589                String style;
     590                if (mode == STREAM || mode == FORWARD)
     591                    style = "STREAM";
     592                else if (mode == V1DG)
     593                    style = "DATAGRAM";
     594                else if (mode == DG)
     595                    style = "DATAGRAM PORT=" + V3DGPORT;
     596                else if (mode == V1RAW)
     597                    style = "RAW";
     598                else if (mode == RAW)
     599                    style = "RAW PORT=" + V3DGPORT;
     600                else
     601                    style = "RAW HEADER=true PORT=" + V3DGPORT;
     602                String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n';
     603                samOut.write(req.getBytes());
     604                samOut.flush();
    160605                if (_log.shouldLog(Log.DEBUG))
    161606                    _log.debug("Session create sent");
    162                 ok = _eventHandler.waitForSessionCreateReply();
    163                 if (_log.shouldLog(Log.DEBUG))
    164                     _log.debug("Session create reply found: " + ok);
    165 
     607                if (mode == STREAM) {
     608                    boolean ok = eventHandler.waitForSessionCreateReply();
     609                    if (!ok)
     610                        throw new IOException("Session create failed");
     611                    if (_log.shouldLog(Log.DEBUG))
     612                        _log.debug("Session create reply found: " + ok);
     613                }
    166614                req = "NAMING LOOKUP NAME=ME\n";
    167                 _samOut.write(req.getBytes());
    168                 _samOut.flush();
     615                samOut.write(req.getBytes());
     616                samOut.flush();
    169617                if (_log.shouldLog(Log.DEBUG))
    170618                    _log.debug("Naming lookup sent");
    171                 String destination = _eventHandler.waitForNamingReply("ME");
     619                String destination = eventHandler.waitForNamingReply("ME");
    172620                if (_log.shouldLog(Log.DEBUG))
    173621                    _log.debug("Naming lookup reply found: " + destination);
     
    175623                    _log.error("No naming lookup reply found!");
    176624                    return null;
    177                 } else {
     625                }
     626                if (_log.shouldInfo())
    178627                    _log.info(_destFile + " is located at " + destination);
     628                if (mode == V1DG || mode == V1RAW) {
     629                    // fake it so the sink starts
     630                    eventHandler.streamConnectedReceived(destination, "FAKE");
    179631                }
    180632                return destination;
    181             } catch (Exception e) {
     633            } catch (IOException e) {
    182634                _log.error("Error handshaking", e);
    183635                return null;
     
    187639   
    188640    private boolean writeDest(String dest) {
     641        File f = new File(_destFile);
     642/*
     643        if (f.exists()) {
     644            if (_log.shouldLog(Log.DEBUG))
     645                _log.debug("Destination file exists, not overwriting: " + _destFile);
     646            return false;
     647        }
     648*/
    189649        FileOutputStream fos = null;
    190650        try {
    191             fos = new FileOutputStream(_destFile);
     651            fos = new FileOutputStream(f);
    192652            fos.write(dest.getBytes());
    193         } catch (Exception e) {
     653            if (_log.shouldLog(Log.DEBUG))
     654                _log.debug("My destination written to " + _destFile);
     655        } catch (IOException e) {
    194656            _log.error("Error writing to " + _destFile, e);
    195657            return false;
     
    201663   
    202664    private class Sink {
    203         private int _connectionId;
    204         private String _remoteDestination;
    205         private boolean _closed;
    206         private long _started;
     665        private final String _connectionId;
     666        private final String _remoteDestination;
     667        private volatile boolean _closed;
     668        private final long _started;
    207669        private long _lastReceivedOn;
    208         private OutputStream _out;
     670        private final OutputStream _out;
    209671       
    210         public Sink(int conId, String remDest) throws IOException {
     672        public Sink(String conId, String remDest) throws IOException {
    211673            _connectionId = conId;
    212674            _remoteDestination = remDest;
     
    222684           
    223685            File out = File.createTempFile("sink", ".dat", sinkDir);
     686            if (_log.shouldWarn())
     687                _log.warn("outputting to " + out);
    224688            _out = new FileOutputStream(out);
     689            _started = _context.clock().now();
    225690        }
    226691       
    227         public int getConnectionId() { return _connectionId; }
     692        public String getConnectionId() { return _connectionId; }
    228693        public String getDestination() { return _remoteDestination; }
    229694       
     
    236701                _out.close();
    237702            } catch (IOException ioe) {
    238                 _log.error("Error closing", ioe);
     703                _log.info("Error closing", ioe);
    239704            }
    240705        }
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionHandler.java

    r8d7edaae r9ce8fce  
    295295        public static final int POISON_MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1;
    296296
    297         public PoisonPacket() {
    298             super(null);
    299             setOptionalDelay(POISON_MAX_DELAY_REQUEST);
     297        @Override
     298        public int getOptionalDelay() { return POISON_MAX_DELAY_REQUEST; }
     299
     300        @Override
     301        public String toString() {
     302            return "POISON";
    300303        }
    301304    }
  • build.properties

    r8d7edaae r9ce8fce  
    4747
    4848# Optional compiler args
     49# This one is for subsystems requiring Java 6
    4950# This one keeps gcj a lot quieter
    5051#javac.compilerargs=-warn:-unchecked,raw,unused,serial
     52# This one is for subsystems requiring Java 7
     53#javac.compilerargs7=
    5154
    5255#
  • core/java/src/net/i2p/util/PasswordManager.java

    r8d7edaae r9ce8fce  
    100100        if (shash == null)
    101101            return false;
     102        return checkHash(shash, pw);
     103    }
     104   
     105    /**
     106     *  Check pw against b64 salt+hash, as generated by createHash()
     107     *
     108     *  @param shash b64 string
     109     *  @param pw plain text non-null, already trimmed
     110     *  @return if pw verified
     111     *  @since 0.9.24
     112     */
     113    public boolean checkHash(String shash, String pw) {
    102114        byte[] shashBytes = Base64.decode(shash);
    103115        if (shashBytes == null || shashBytes.length != SHASH_LENGTH)
     
    111123    }
    112124   
     125    /**
     126     *  Create a salt+hash, to be saved and verified later by verifyHash().
     127     *
     128     *  @param pw plain text non-null, already trimmed
     129     *  @return salted+hash b64 string
     130     *  @since 0.9.24
     131     */
     132    public String createHash(String pw) {
     133        byte[] salt = new byte[SALT_LENGTH];
     134        _context.random().nextBytes(salt);
     135        byte[] pwHash = _context.keyGenerator().generateSessionKey(salt, DataHelper.getUTF8(pw)).getData();
     136        byte[] shashBytes = new byte[SHASH_LENGTH];
     137        System.arraycopy(salt, 0, shashBytes, 0, SALT_LENGTH);
     138        System.arraycopy(pwHash, 0, shashBytes, SALT_LENGTH, SessionKey.KEYSIZE_BYTES);
     139        return Base64.encode(shashBytes);
     140    }
     141
    113142    /**
    114143     *  Either plain or b64
  • installer/install.xml

    r8d7edaae r9ce8fce  
    1010        </authors>
    1111        <url>https://geti2p.net/</url>
    12         <javaversion>1.6</javaversion>
     12        <javaversion>1.7</javaversion>
    1313
    1414        <!-- use pack200 compression, saves about 33%
  • router/java/src/net/i2p/router/util/RouterPasswordManager.java

    r8d7edaae r9ce8fce  
    159159        if (user != null && user.length() > 0)
    160160            pfx += '.' + user;
    161         byte[] salt = new byte[SALT_LENGTH];
    162         _context.random().nextBytes(salt);
    163         byte[] pwHash = _context.keyGenerator().generateSessionKey(salt, DataHelper.getUTF8(pw)).getData();
    164         byte[] shashBytes = new byte[SHASH_LENGTH];
    165         System.arraycopy(salt, 0, shashBytes, 0, SALT_LENGTH);
    166         System.arraycopy(pwHash, 0, shashBytes, SALT_LENGTH, SessionKey.KEYSIZE_BYTES);
    167         String shash = Base64.encode(shashBytes);
     161        String shash = createHash(pw);
    168162        Map<String, String> toAdd = Collections.singletonMap(pfx + PROP_SHASH, shash);
    169163        List<String> toDel = new ArrayList<String>(4);
Note: See TracChangeset for help on using the changeset viewer.