Changeset 807e5bf


Ignore:
Timestamp:
Nov 26, 2015 2:14:17 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e5f186f6
Parents:
868e5e9
Message:

v3 sink working

File:
1 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r868e5e9 r807e5bf  
    3232    private final String _sinkDir;
    3333    private String _conOptions;
    34     private SAMReader _reader;
     34    private SAMReader _reader, _reader2;
    3535    private boolean _isV3;
     36    private String _v3ID;
    3637    //private boolean _dead;
    37     private final SAMEventHandler _eventHandler;
    3838    /** Connection id (Integer) to peer (Flooder) */
    3939    private final Map<String, Sink> _remotePeers;
     
    5959        _sinkDir = sinkDir;
    6060        _conOptions = "";
    61         _eventHandler = new SinkEventHandler(_context);
    6261        _remotePeers = new HashMap<String, Sink>();
    6362    }
     
    6867        try {
    6968            Socket sock = connect();
    70             _reader = new SAMReader(_context, sock.getInputStream(), _eventHandler);
     69            SAMEventHandler eventHandler = new SinkEventHandler(_context);
     70            _reader = new SAMReader(_context, sock.getInputStream(), eventHandler);
    7171            _reader.startReading();
    7272            if (_log.shouldLog(Log.DEBUG))
    7373                _log.debug("Reader created");
    7474            OutputStream out = sock.getOutputStream();
    75             String ourDest = handshake(out, version, true);
     75            String ourDest = handshake(out, version, true, eventHandler);
     76            if (ourDest == null)
     77                throw new IOException("handshake failed");
    7678            if (_log.shouldLog(Log.DEBUG))
    7779                _log.debug("Handshake complete.  we are " + ourDest);
    78             if (ourDest != null) {
    79                 //boolean written =
    80                 writeDest(ourDest);
    81             } else {
    82                 _reader.stopReading();
    83             }
     80            if (_isV3) {
     81                Socket sock2 = connect();
     82                eventHandler = new SinkEventHandler2(_context, sock2.getInputStream());
     83                _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     84                _reader2.startReading();
     85                if (_log.shouldLog(Log.DEBUG))
     86                    _log.debug("Reader2 created");
     87                out = sock2.getOutputStream();
     88                String ok = handshake(out, version, false, eventHandler);
     89                if (ok == null)
     90                    throw new IOException("2nd handshake failed");
     91                if (_log.shouldLog(Log.DEBUG))
     92                    _log.debug("Handshake2 complete.");
     93            }
     94            writeDest(ourDest);
    8495        } catch (IOException e) {
    8596            _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e);
     
    134145        }
    135146    }
     147
     148    private class SinkEventHandler2 extends SinkEventHandler {
     149
     150        private final InputStream _in;
     151
     152        public SinkEventHandler2(I2PAppContext ctx, InputStream in) {
     153            super(ctx);
     154            _in = in;
     155        }
     156
     157        @Override
     158        public void streamStatusReceived(String result, String id, String message) {
     159            if (_log.shouldLog(Log.DEBUG))
     160                _log.debug("got STREAM STATUS, result=" + result);
     161            super.streamStatusReceived(result, id, message);
     162            // with SILENT=true, there's nothing else coming, so fire up the Sink
     163            Sink sink = null;
     164            try {
     165                String dest = "TODO if not silent";
     166                sink = new Sink(_v3ID, dest);
     167                synchronized (_remotePeers) {
     168                    _remotePeers.put(_v3ID, sink);
     169                }
     170            } catch (IOException ioe) {
     171                _log.error("Error creating a new sink", ioe);
     172                try { _in.close(); } catch (IOException ioe2) {}
     173                if (sink != null)
     174                    sink.closed();
     175                return;
     176            }
     177            // inline so the reader doesn't grab the data
     178            try {
     179                boolean gotDest = false;
     180                byte[] dest = new byte[1024];
     181                int dlen = 0;
     182                byte buf[] = new byte[4096];
     183                int len;
     184                while((len = _in.read(buf)) >= 0) {
     185                    if (!gotDest) {
     186                        // eat the dest line
     187                        for (int i = 0; i < len; i++) {
     188                            byte b = buf[i];
     189                            if (b == (byte) '\n') {
     190                                gotDest = true;
     191                                if (_log.shouldInfo()) {
     192                                    try {
     193                                        _log.info("Got incoming accept from: \"" + new String(dest, 0, dlen, "ISO-8859-1") + '"');
     194                                    } catch (IOException uee) {}
     195                                }
     196                                // feed any remaining to the sink
     197                                i++;
     198                                if (i < len)
     199                                    sink.received(buf, i, len - i);
     200                                break;
     201                            } else {
     202                                if (dlen < dest.length) {
     203                                    dest[dlen++] = b;
     204                                } else if (dlen == dest.length) {
     205                                    dlen++;
     206                                    _log.error("first line overflow on accept");
     207                                }
     208                            }
     209                        }
     210                    } else {
     211                        sink.received(buf, 0, len);
     212                    }
     213                }
     214                sink.closed();
     215            } catch (IOException ioe) {
     216                _log.error("Error reading", ioe);
     217            } finally {
     218                try { _in.close(); } catch (IOException ioe) {}
     219            }
     220        }
     221    }
    136222   
    137223    private Socket connect() throws IOException {
     
    140226   
    141227    /** @return our b64 dest or null */
    142     private String handshake(OutputStream samOut, String version, boolean isMaster) {
     228    private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) {
    143229        synchronized (samOut) {
    144230            try {
     
    147233                if (_log.shouldLog(Log.DEBUG))
    148234                    _log.debug("Hello sent");
    149                 String hisVersion = _eventHandler.waitForHelloReply();
     235                String hisVersion = eventHandler.waitForHelloReply();
    150236                if (_log.shouldLog(Log.DEBUG))
    151237                    _log.debug("Hello reply found: " + hisVersion);
    152238                if (hisVersion == null)
    153239                    throw new IOException("Hello failed");
     240                if (!isMaster) {
     241                    // only for v3
     242                    //String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n";
     243                    String req = "STREAM ACCEPT SILENT=false ID=" + _v3ID + "\n";
     244                    samOut.write(req.getBytes());
     245                    samOut.flush();
     246                    if (_log.shouldLog(Log.DEBUG))
     247                        _log.debug("STREAM ACCEPT sent");
     248                    // docs were wrong, we do not get a STREAM STATUS if SILENT=true
     249                    //boolean ok = eventHandler.waitForStreamStatusReply();
     250                    //if (!ok)
     251                    //    throw new IOException("Stream status failed");
     252                    if (_log.shouldLog(Log.DEBUG))
     253                        _log.debug("got STREAM STATUS, awaiting connetion");
     254                    return "OK";
     255                }
    154256                _isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
    155257                String dest;
     
    179281                        byte[] id = new byte[5];
    180282                        _context.random().nextBytes(id);
    181                         _conOptions = "ID=" + Base32.encode(id);
     283                        _v3ID = Base32.encode(id);
     284                        _conOptions = "ID=" + _v3ID;
    182285                    }
    183286                } else {
     
    191294                if (_log.shouldLog(Log.DEBUG))
    192295                    _log.debug("Session create sent");
    193                 boolean ok = _eventHandler.waitForSessionCreateReply();
     296                boolean ok = eventHandler.waitForSessionCreateReply();
    194297                if (!ok)
    195298                    throw new IOException("Session create failed");
     
    202305                if (_log.shouldLog(Log.DEBUG))
    203306                    _log.debug("Naming lookup sent");
    204                 String destination = _eventHandler.waitForNamingReply("ME");
     307                String destination = eventHandler.waitForNamingReply("ME");
    205308                if (_log.shouldLog(Log.DEBUG))
    206309                    _log.debug("Naming lookup reply found: " + destination);
Note: See TracChangeset for help on using the changeset viewer.