Changeset bd048b0


Ignore:
Timestamp:
Nov 26, 2015 8:55:10 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
6373c8a
Parents:
b9ab933
Message:

Fix ReadLine? bug that buffered and lost input;
can't handle UTF-8 for now.
Start support of datagrams and raw in the client

Location:
apps/sam/java/src/net/i2p/sam
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/ReadLine.java

    rb9ab933 rbd048b0  
    3535            throw new SocketTimeoutException();
    3636        long expires = System.currentTimeMillis() + timeout;
    37         InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8");
     37        // this reads and buffers extra bytes, so we can't use it
     38        // unless we're going to decode UTF-8 on-the-fly, we're stuck with ASCII
     39        //InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8");
     40        InputStream in = socket.getInputStream();
    3841        int c;
    3942        int i = 0;
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    rb9ab933 rbd048b0  
    4040    private final Map<String, Sender> _remotePeers;
    4141   
    42     private static final String USAGE = "Usage: SAMStreamSend [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir";
     42    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
     43    private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir\n" +
     44                                        "       modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
     45                                        "       -s: use SSL";
    4346
    4447    public static void main(String args[]) {
    45         Getopt g = new Getopt("SAM", args, "drsb:p:v:");
     48        Getopt g = new Getopt("SAM", args, "sb:m:p:v:");
    4649        boolean isSSL = false;
    47         int mode = 0; // stream
     50        int mode = STREAM;
    4851        String version = "1.0";
    4952        String host = "127.0.0.1";
     
    5659                break;
    5760
    58             case 'd':
    59                 mode = 1;  // datagram
    60                 break;
    61 
    62             case 'r':
    63                 mode = 2;  // raw
     61            case 'm':
     62                mode = Integer.parseInt(g.getOptarg());
     63                if (mode < 0 || mode > V1RAW) {
     64                    System.err.println(USAGE);
     65                    return;
     66                }
    6467                break;
    6568
     
    9396        SAMStreamSend sender = new SAMStreamSend(ctx, host, port,
    9497                                                      args[startArgs], args[startArgs + 1]);
    95         sender.startup(version);
     98        sender.startup(version, isSSL, mode);
    9699    }
    97100   
     
    108111    }
    109112   
    110     public void startup(String version) {
     113    public void startup(String version, boolean isSSL, int mode) {
    111114        if (_log.shouldLog(Log.DEBUG))
    112115            _log.debug("Starting up");
    113116        try {
    114             Socket sock = connect();
     117            Socket sock = connect(isSSL);
    115118            SAMEventHandler eventHandler = new SendEventHandler(_context);
    116119            _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
     
    119122                _log.debug("Reader created");
    120123            OutputStream out = sock.getOutputStream();
    121             String ourDest = handshake(out, version, true, eventHandler);
     124            String ourDest = handshake(out, version, true, eventHandler, mode);
    122125            if (ourDest == null)
    123126                throw new IOException("handshake failed");
    124127            if (_log.shouldLog(Log.DEBUG))
    125128                _log.debug("Handshake complete.  we are " + ourDest);
    126             if (_isV3) {
    127                 Socket sock2 = connect();
     129            if (_isV3 && mode != V1DG && mode != V1RAW) {
     130                Socket sock2 = connect(isSSL);
    128131                eventHandler = new SendEventHandler(_context);
    129132                _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     
    132135                    _log.debug("Reader2 created");
    133136                out = sock2.getOutputStream();
    134                 String ok = handshake(out, version, false, eventHandler);
     137                String ok = handshake(out, version, false, eventHandler, mode);
    135138                if (ok == null)
    136139                    throw new IOException("2nd handshake failed");
     
    139142            }
    140143            if (ourDest != null) {
    141                 send(out, eventHandler);
     144                send(out, eventHandler, mode);
    142145            }
    143146        } catch (IOException e) {
     
    169172    }
    170173   
    171     private Socket connect() throws IOException {
     174    private Socket connect(boolean isSSL) throws IOException {
    172175        return new Socket(_samHost, Integer.parseInt(_samPort));
    173176    }
    174177   
    175178    /** @return our b64 dest or null */
    176     private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
     179    private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) {
    177180        synchronized (samOut) {
    178181            try {
     
    195198                    _conOptions = "ID=" + _v3ID;
    196199                }
    197                 String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
     200                String style;
     201                if (mode == STREAM)
     202                    style = "STREAM";
     203                else if (mode == DG || mode == V1DG)
     204                    style = "DATAGRAM";
     205                else
     206                    style = "RAW";
     207                String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + "\n";
    198208                samOut.write(req.getBytes());
    199209                samOut.flush();
     
    228238    }
    229239   
    230     private void send(OutputStream samOut, SAMEventHandler eventHandler) throws IOException {
    231         Sender sender = new Sender(samOut, eventHandler);
     240    private void send(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException {
     241        Sender sender = new Sender(samOut, eventHandler, mode);
    232242        boolean ok = sender.openConnection();
    233243        if (ok) {
     
    248258        private final OutputStream _samOut;
    249259        private final SAMEventHandler _eventHandler;
     260        private final int _mode;
    250261       
    251         public Sender(OutputStream samOut, SAMEventHandler eventHandler) {
     262        public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) {
    252263            _samOut = samOut;
    253264            _eventHandler = eventHandler;
     265            _mode = mode;
    254266            synchronized (_remotePeers) {
    255267                if (_v3ID != null)
     
    274286                _context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 });
    275287               
    276                 StringBuilder buf = new StringBuilder(1024);
    277                 buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination);
    278                 // not supported until 3.2 but 3.0-3.1 will ignore
    279                 if (_isV3)
    280                     buf.append(" FROM_PORT=1234 TO_PORT=5678");
    281                 buf.append('\n');
    282                 byte[] msg = DataHelper.getASCII(buf.toString());
    283                 synchronized (_samOut) {
    284                     _samOut.write(msg);
    285                     _samOut.flush();
    286                 }
    287                 _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
    288                 boolean ok = _eventHandler.waitForStreamStatusReply();
    289                 if (!ok)
    290                     throw new IOException("STREAM CONNECT failed");
     288                if (_mode == STREAM) {
     289                    StringBuilder buf = new StringBuilder(1024);
     290                    buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination);
     291                    // not supported until 3.2 but 3.0-3.1 will ignore
     292                    if (_isV3)
     293                        buf.append(" FROM_PORT=1234 TO_PORT=5678");
     294                    buf.append('\n');
     295                    byte[] msg = DataHelper.getASCII(buf.toString());
     296                    synchronized (_samOut) {
     297                        _samOut.write(msg);
     298                        _samOut.flush();
     299                    }
     300                    _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS...");
     301                    boolean ok = _eventHandler.waitForStreamStatusReply();
     302                    if (!ok)
     303                        throw new IOException("STREAM CONNECT failed");
     304                }
    291305
    292306                _in = new FileInputStream(_dataFile);
     
    319333            _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
    320334            final long toSend = (new File(_dataFile)).length();
    321             byte data[] = new byte[1024];
     335            byte data[] = new byte[8192];
    322336            long lastSend = _context.clock().now();
    323337            while (!_closed) {
     
    335349                       
    336350                        synchronized (_samOut) {
    337                             if (!_isV3) {
    338                                 byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes();
     351                            if (!_isV3 || _mode == V1DG || _mode == V1RAW) {
     352                                String m;
     353                                if (_mode == STREAM)
     354                                    m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n";
     355                                else if (_mode == V1DG)
     356                                    m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
     357                                else if (_mode == V1RAW)
     358                                    m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n";
     359                                else
     360                                    throw new IOException("unsupported mode " + _mode);
     361                                byte msg[] = DataHelper.getASCII(m);
    339362                                _samOut.write(msg);
    340363                            }
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    rb9ab933 rbd048b0  
    4141    private final Map<String, Sink> _remotePeers;
    4242   
    43     private static final String USAGE = "Usage: SAMStreamSink [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir";
     43    private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4;
     44    private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir\n" +
     45                                        "       modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" +
     46                                        "       -s: use SSL";
    4447
    4548    public static void main(String args[]) {
    46         Getopt g = new Getopt("SAM", args, "drsb:p:v:");
     49        Getopt g = new Getopt("SAM", args, "sb:m:p:v:");
    4750        boolean isSSL = false;
    48         int mode = 0; // stream
     51        int mode = STREAM;
    4952        String version = "1.0";
    5053        String host = "127.0.0.1";
     
    5760                break;
    5861
    59             case 'd':
    60                 mode = 1;  // datagram
    61                 break;
    62 
    63             case 'r':
    64                 mode = 2;  // raw
     62            case 'm':
     63                mode = Integer.parseInt(g.getOptarg());
     64                if (mode < 0 || mode > V1RAW) {
     65                    System.err.println(USAGE);
     66                    return;
     67                }
    6568                break;
    6669
     
    9497        SAMStreamSink sink = new SAMStreamSink(ctx, host, port,
    9598                                                    args[startArgs], args[startArgs + 1]);
    96         sink.startup(version);
     99        sink.startup(version, isSSL, mode);
    97100    }
    98101   
     
    109112    }
    110113   
    111     public void startup(String version) {
     114    public void startup(String version, boolean isSSL, int mode) {
    112115        if (_log.shouldLog(Log.DEBUG))
    113116            _log.debug("Starting up");
    114117        try {
    115             Socket sock = connect();
     118            Socket sock = connect(isSSL);
    116119            OutputStream out = sock.getOutputStream();
    117120            SAMEventHandler eventHandler = new SinkEventHandler(_context, out);
     
    120123            if (_log.shouldLog(Log.DEBUG))
    121124                _log.debug("Reader created");
    122             String ourDest = handshake(out, version, true, eventHandler);
     125            String ourDest = handshake(out, version, true, eventHandler, mode);
    123126            if (ourDest == null)
    124127                throw new IOException("handshake failed");
     
    126129                _log.debug("Handshake complete.  we are " + ourDest);
    127130            if (_isV3) {
    128                 Socket sock2 = connect();
     131                Socket sock2 = connect(isSSL);
    129132                out = sock2.getOutputStream();
    130133                eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
     
    133136                if (_log.shouldLog(Log.DEBUG))
    134137                    _log.debug("Reader2 created");
    135                 String ok = handshake(out, version, false, eventHandler);
     138                String ok = handshake(out, version, false, eventHandler, mode);
    136139                if (ok == null)
    137140                    throw new IOException("2nd handshake failed");
     
    286289    }
    287290   
    288     private Socket connect() throws IOException {
     291    private Socket connect(boolean isSSL) throws IOException {
    289292        return new Socket(_samHost, Integer.parseInt(_samPort));
    290293    }
    291294   
    292295    /** @return our b64 dest or null */
    293     private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
     296    private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) {
    294297        synchronized (samOut) {
    295298            try {
     
    355358                    dest = _destFile;
    356359                }
    357                 String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n";
     360                String style;
     361                if (mode == STREAM)
     362                    style = "STREAM";
     363                else if (mode == DG || mode == V1DG)
     364                    style = "DATAGRAM";
     365                else
     366                    style = "RAW";
     367                String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + " " + _conOptions + "\n";
    358368                samOut.write(req.getBytes());
    359369                samOut.flush();
Note: See TracChangeset for help on using the changeset viewer.