Changeset 612e01c


Ignore:
Timestamp:
Nov 25, 2015 8:46:21 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
868e5e9
Parents:
13fd613
Message:

More SAM client cleanup and fixes, beginning of v3 support
v3 unfinished, does not work yet

Location:
apps/sam/java/src/net/i2p/sam/client
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java

    r13fd613 r612e01c  
    88public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener {
    99    public void destReplyReceived(String publicKey, String privateKey) {}
    10     public void helloReplyReceived(boolean ok) {}
     10    public void helloReplyReceived(boolean ok, String version) {}
    1111    public void namingReplyReceived(String name, String result, String value, String message) {}
    1212    public void sessionStatusReceived(String result, String destination, String message) {}
  • apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java

    r13fd613 r612e01c  
    1616    private final Log _log;
    1717    private Boolean _helloOk;
     18    private String _version;
    1819    private final Object _helloLock = new Object();
    1920    private Boolean _sessionCreateOk;
     
    2829   
    2930        @Override
    30     public void helloReplyReceived(boolean ok) {
     31    public void helloReplyReceived(boolean ok, String version) {
    3132        synchronized (_helloLock) {
    3233            if (ok)
     
    3435            else
    3536                _helloOk = Boolean.FALSE;
     37            _version = version;
    3638            _helloLock.notifyAll();
    3739        }
     
    6264        @Override
    6365    public void unknownMessageReceived(String major, String minor, Properties params) {
    64         _log.error("wrt, [" + major + "] [" + minor + "] [" + params + "]");
     66        _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]");
    6567    }
    6668
     
    7173
    7274    /**
    73      * Wait for the connection to be established, returning true if everything
     75     * Wait for the connection to be established, returning the server version if everything
    7476     * went ok
    75      * @return true if everything ok
     77     * @return SAM server version if everything ok, or null on failure
    7678     */
    77     public boolean waitForHelloReply() {
     79    public String waitForHelloReply() {
    7880        while (true) {
    7981            try {
     
    8284                        _helloLock.wait();
    8385                    else
    84                         return _helloOk.booleanValue();
     86                        return _helloOk.booleanValue() ? _version : null;
    8587                }
    8688            } catch (InterruptedException ie) {}
  • apps/sam/java/src/net/i2p/sam/client/SAMReader.java

    r13fd613 r612e01c  
    7272        public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND";
    7373       
    74         public void helloReplyReceived(boolean ok);
     74        public void helloReplyReceived(boolean ok, String version);
    7575        public void sessionStatusReceived(String result, String destination, String message);
    7676        public void streamStatusReceived(String result, int id, String message);
     
    160160            if ("REPLY".equals(minor)) {
    161161                String result = params.getProperty("RESULT");
    162                 if ("OK".equals(result))
    163                     _listener.helloReplyReceived(true);
     162                String version= params.getProperty("VERSION");
     163                if ("OK".equals(result) && version != null)
     164                    _listener.helloReplyReceived(true, version);
    164165                else
    165                     _listener.helloReplyReceived(false);
     166                    _listener.helloReplyReceived(false, version);
    166167            } else {
    167168                _listener.unknownMessageReceived(major, minor, params);
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java

    r13fd613 r612e01c  
    11package net.i2p.sam.client;
    22
     3import java.io.File;
    34import java.io.FileInputStream;
    45import java.io.IOException;
     
    1011
    1112import net.i2p.I2PAppContext;
     13import net.i2p.data.Base32;
    1214import net.i2p.data.DataHelper;
    1315import net.i2p.util.I2PAppThread;
    1416import net.i2p.util.Log;
     17import net.i2p.util.VersionComparator;
    1518
    1619/**
     
    2730    private final String _destFile;
    2831    private final String _dataFile;
    29     private final String _conOptions;
     32    private String _conOptions;
    3033    private Socket _samSocket;
    3134    private OutputStream _samOut;
     
    3942    public static void main(String args[]) {
    4043        if (args.length < 4) {
    41             System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile");
     44            System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile [version]");
    4245            return;
    4346        }
    44         I2PAppContext ctx = new I2PAppContext();
     47        I2PAppContext ctx = I2PAppContext.getGlobalContext();
    4548        //String files[] = new String[args.length - 3];
    4649        SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]);
    47         sender.startup();
     50        String version = (args.length >= 5) ? args[4] : "1.0";
     51        sender.startup(version);
    4852    }
    4953   
     
    6165    }
    6266   
    63     public void startup() {
     67    public void startup(String version) {
    6468        if (_log.shouldLog(Log.DEBUG))
    6569            _log.debug("Starting up");
     
    7276            if (_log.shouldLog(Log.DEBUG))
    7377                _log.debug("Reader created");
    74             String ourDest = handshake();
     78            String ourDest = handshake(version);
    7579            if (_log.shouldLog(Log.DEBUG))
    7680                _log.debug("Handshake complete.  we are " + ourDest);
     
    8387    private class SendEventHandler extends SAMEventHandler {
    8488        public SendEventHandler(I2PAppContext ctx) { super(ctx); }
     89
     90        @Override
    8591        public void streamClosedReceived(String result, int id, String message) {
    8692            Sender sender = null;
     
    9399                    _log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination());
    94100            } else {
    95                 _log.error("wtf, not connected to " + id + " but we were just closed?");
     101                _log.error("not connected to " + id + " but we were just closed?");
    96102            }
    97103        }
     
    110116    }
    111117   
    112     private String handshake() {
     118    private String handshake(String version) {
    113119        synchronized (_samOut) {
    114120            try {
    115                 _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
     121                _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
    116122                _samOut.flush();
    117123                if (_log.shouldLog(Log.DEBUG))
    118124                    _log.debug("Hello sent");
    119                 boolean ok = _eventHandler.waitForHelloReply();
    120                 if (_log.shouldLog(Log.DEBUG))
    121                     _log.debug("Hello reply found: " + ok);
    122                 if (!ok)
    123                     throw new IOException("wtf, hello failed?");
     125                String hisVersion = _eventHandler.waitForHelloReply();
     126                if (_log.shouldLog(Log.DEBUG))
     127                    _log.debug("Hello reply found: " + hisVersion);
     128                if (hisVersion == null)
     129                    throw new IOException("Hello failed");
     130                boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     131                if (isV3) {
     132                    byte[] id = new byte[5];
     133                    _context.random().nextBytes(id);
     134                    _conOptions = "ID=" + Base32.encode(id);
     135                }
    124136                String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n";
    125137                _samOut.write(req.getBytes());
     
    127139                if (_log.shouldLog(Log.DEBUG))
    128140                    _log.debug("Session create sent");
    129                 ok = _eventHandler.waitForSessionCreateReply();
     141                boolean ok = _eventHandler.waitForSessionCreateReply();
     142                if (!ok)
     143                    throw new IOException("Session create failed");
    130144                if (_log.shouldLog(Log.DEBUG))
    131145                    _log.debug("Session create reply found: " + ok);
     
    223237            _started = _context.clock().now();
    224238            _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0);
     239            final long toSend = (new File(_dataFile)).length();
    225240            byte data[] = new byte[1024];
    226241            long lastSend = _context.clock().now();
     
    250265                } catch (IOException ioe) {
    251266                    _log.error("Error sending", ioe);
     267                    break;
    252268                }
    253269            }
     
    260276                }
    261277            } catch (IOException ioe) {
    262                 _log.error("Error closing", ioe);
     278                _log.info("Error closing", ioe);
    263279            }
    264280           
     
    266282            if (_log.shouldLog(Log.DEBUG))
    267283                _log.debug("Runner exiting");
     284            if (toSend != _totalSent)
     285                _log.error("Only sent " + _totalSent + " of " + toSend + " bytes");
    268286            // stop the reader, since we're only doing this once for testing
    269287            // you wouldn't do this in a real application
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    r13fd613 r612e01c  
    99import java.util.HashMap;
    1010import java.util.Map;
     11import java.util.Properties;
    1112
    1213import net.i2p.I2PAppContext;
     14import net.i2p.data.Base32;
     15import net.i2p.data.DataHelper;
    1316import net.i2p.util.Log;
     17import net.i2p.util.VersionComparator;
    1418
    1519/**
     
    2731    private final String _destFile;
    2832    private final String _sinkDir;
    29     private final String _conOptions;
     33    private String _conOptions;
    3034    private Socket _samSocket;
    3135    private OutputStream _samOut;
     
    3943    public static void main(String args[]) {
    4044        if (args.length < 4) {
    41             System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir");
     45            System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir [version]");
    4246            return;
    4347        }
    44         I2PAppContext ctx = new I2PAppContext();
     48        I2PAppContext ctx = I2PAppContext.getGlobalContext();
    4549        SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]);
    46         sink.startup();
     50        String version = (args.length >= 5) ? args[4] : "1.0";
     51        sink.startup(version);
    4752    }
    4853   
     
    6065    }
    6166   
    62     public void startup() {
     67    public void startup(String version) {
    6368        if (_log.shouldLog(Log.DEBUG))
    6469            _log.debug("Starting up");
     
    7176            if (_log.shouldLog(Log.DEBUG))
    7277                _log.debug("Reader created");
    73             String ourDest = handshake();
     78            String ourDest = handshake(version);
    7479            if (_log.shouldLog(Log.DEBUG))
    7580                _log.debug("Handshake complete.  we are " + ourDest);
     
    7782                //boolean written =
    7883                writeDest(ourDest);
    79                 if (_log.shouldLog(Log.DEBUG))
    80                     _log.debug("My destination written to " + _destFile);
     84            } else {
     85                _reader.stopReading();
    8186            }
    8287        }
     
    98103                    _log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination());
    99104            } else {
    100                 _log.error("wtf, not connected to " + id + " but we were just closed?");
     105                _log.error("not connected to " + id + " but we were just closed?");
    101106            }
    102107        }
     
    111116                sink.received(data, offset, length);
    112117            } else {
    113                 _log.error("wtf, not connected to " + id + " but we received " + length + "?");
     118                _log.error("not connected to " + id + " but we received " + length + "?");
    114119            }
    115120        }
     
    143148    }
    144149   
    145     private String handshake() {
     150    private String handshake(String version) {
    146151        synchronized (_samOut) {
    147152            try {
    148                 _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes());
     153                _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes());
    149154                _samOut.flush();
    150155                if (_log.shouldLog(Log.DEBUG))
    151156                    _log.debug("Hello sent");
    152                 boolean ok = _eventHandler.waitForHelloReply();
    153                 if (_log.shouldLog(Log.DEBUG))
    154                     _log.debug("Hello reply found: " + ok);
    155                 if (!ok)
    156                     throw new IOException("wtf, hello failed?");
    157                 String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n";
     157                String hisVersion = _eventHandler.waitForHelloReply();
     158                if (_log.shouldLog(Log.DEBUG))
     159                    _log.debug("Hello reply found: " + hisVersion);
     160                if (hisVersion == null)
     161                    throw new IOException("Hello failed");
     162                boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
     163                String dest;
     164                if (isV3) {
     165                    // we use the filename as the name in sam.keys
     166                    // and read it in ourselves
     167                    File keys = new File("sam.keys");
     168                    if (keys.exists()) {
     169                        Properties opts = new Properties();
     170                        DataHelper.loadProps(opts, keys);
     171                        String s = opts.getProperty(_destFile);
     172                        if (s != null) {
     173                            dest = s;
     174                        } else {
     175                            dest = "TRANSIENT";
     176                            (new File(_destFile)).delete();
     177                            if (_log.shouldLog(Log.DEBUG))
     178                                _log.debug("Requesting new transient destination");
     179                        }
     180                    } else {
     181                        dest = "TRANSIENT";
     182                        (new File(_destFile)).delete();
     183                        if (_log.shouldLog(Log.DEBUG))
     184                            _log.debug("Requesting new transient destination");
     185                    }
     186                    byte[] id = new byte[5];
     187                    _context.random().nextBytes(id);
     188                    _conOptions = "ID=" + Base32.encode(id);
     189                } else {
     190                    // we use the filename as the name in sam.keys
     191                    // and give it to the SAM server
     192                    dest = _destFile;
     193                }
     194                String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n";
    158195                _samOut.write(req.getBytes());
    159196                _samOut.flush();
    160197                if (_log.shouldLog(Log.DEBUG))
    161198                    _log.debug("Session create sent");
    162                 ok = _eventHandler.waitForSessionCreateReply();
     199                boolean ok = _eventHandler.waitForSessionCreateReply();
     200                if (!ok)
     201                    throw new IOException("Session create failed");
    163202                if (_log.shouldLog(Log.DEBUG))
    164203                    _log.debug("Session create reply found: " + ok);
     
    176215                    return null;
    177216                } else {
    178                     _log.info(_destFile + " is located at " + destination);
     217                    if (_log.shouldInfo())
     218                        _log.info(_destFile + " is located at " + destination);
    179219                }
    180220                return destination;
     
    187227   
    188228    private boolean writeDest(String dest) {
     229        File f = new File(_destFile);
     230        if (f.exists()) {
     231            if (_log.shouldLog(Log.DEBUG))
     232                _log.debug("Destination file exists, not overwriting:" + _destFile);
     233            return false;
     234        }
    189235        FileOutputStream fos = null;
    190236        try {
    191             fos = new FileOutputStream(_destFile);
     237            fos = new FileOutputStream(f);
    192238            fos.write(dest.getBytes());
     239            if (_log.shouldLog(Log.DEBUG))
     240                _log.debug("My destination written to " + _destFile);
    193241        } catch (Exception e) {
    194242            _log.error("Error writing to " + _destFile, e);
     
    237285                _out.close();
    238286            } catch (IOException ioe) {
    239                 _log.error("Error closing", ioe);
     287                _log.info("Error closing", ioe);
    240288            }
    241289        }
Note: See TracChangeset for help on using the changeset viewer.