Changeset ae8779e


Ignore:
Timestamp:
Jun 26, 2018 4:47:53 PM (2 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
c826efd3
Parents:
49221ad
Message:

NTCP2: Establishment and data phase

  • Address generation and validation fixes to match proposal changes
  • Fixes for persistence of static s and iv
  • Add methods for keygen and getters for static s and iv
  • Add OutboundNTCP2State for outbound establishment
  • Add support to InboundEstablishState?
  • Add data phase support to NTCPConnection
  • Refactor NTCPConnection for multiple protocols
  • Support concurrent pending outbound messages in NTCPConnection

NTCP1: Cleanups and performance improvements

  • EventPumper? tweaks and logging
  • Eliminate extra data copy in NTCPConnection message sending
  • Remove _meta field in NTCPConnection
  • Locking changes in NTCPConnection and EstablishState? classes
  • Zero out DH keys when done
  • Fix read when buffer position nonzero in NTCPConnection
  • NTCPConnection make more methods package private
  • Do AES decryption in data phase all at once when possible
  • Drop expired outbound messages in NTCPConnection before sending
  • Pass extra data from EstablishState? directly to NTCPConnection to avoid race, remove getExtraBytes() method
  • Remove getException, getError, getFailedBySkew methods and calls from Reader
Location:
router/java/src/net/i2p/router/transport
Files:
1 added
9 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/TransportManager.java

    r49221ad rae8779e  
    3838import static net.i2p.router.transport.Transport.AddressSource.*;
    3939import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
     40import net.i2p.router.transport.crypto.X25519KeyFactory;
    4041import net.i2p.router.transport.ntcp.NTCPTransport;
    4142import net.i2p.router.transport.udp.UDPTransport;
     
    6970    private final UPnPManager _upnpManager;
    7071    private final DHSessionKeyBuilder.PrecalcRunner _dhThread;
     72    private final X25519KeyFactory _xdhThread;
    7173
    7274    /** default true */
     
    7678    /** default true */
    7779    public final static String PROP_ENABLE_UPNP = "i2np.upnp.enable";
     80
     81    private static final String PROP_NTCP2_ENABLE = "i2np.ntcp2.enable";
     82    private static final boolean DEFAULT_NTCP2_ENABLE = false;
    7883
    7984    private static final String PROP_ADVANCED = "routerconsole.advanced";
     
    99104            _upnpManager = null;
    100105        _dhThread = new DHSessionKeyBuilder.PrecalcRunner(context);
     106        boolean enableNTCP2 = isNTCPEnabled(context) &&
     107                              context.getProperty(PROP_NTCP2_ENABLE, DEFAULT_NTCP2_ENABLE);
     108        _xdhThread = enableNTCP2 ? new X25519KeyFactory(context) : null;
    101109    }
    102110
     
    173181        }
    174182        if (isNTCPEnabled(_context)) {
    175             Transport ntcp = new NTCPTransport(_context, _dhThread);
     183            Transport ntcp = new NTCPTransport(_context, _dhThread, _xdhThread);
    176184            addTransport(ntcp);
    177185            initializeAddress(ntcp);
     
    310318        if (_dhThread.getState() == Thread.State.NEW)
    311319            _dhThread.start();
     320        if (_xdhThread != null && _xdhThread.getState() == Thread.State.NEW)
     321            _xdhThread.start();
    312322        // For now, only start UPnP if we have no publicly-routable addresses
    313323        // so we don't open the listener ports to the world.
     
    720730            } else if (unreachableTransports >= _transports.size() && countActivePeers() > 0) {
    721731                // Don't banlist if we aren't talking to anybody, as we may have a network connection issue
     732                // TODO if we are IPv6 only, ban for longer
    722733                boolean incompat = false;
    723734                RouterInfo us = _context.router().getRouterInfo();
  • router/java/src/net/i2p/router/transport/ntcp/EstablishBase.java

    r49221ad rae8779e  
    1010
    1111/**
     12 * Inbound NTCP 1 or 2. Outbound NTCP 1 only.
     13 * OutboundNTCP2State does not extend this.
     14 *
     15 * NTCP 1 establishement overview:
     16 *
    1217 * Handle the 4-phase establishment, which is as follows:
    1318 *
     
    3439 *    H(): 32 byte SHA256 Hash
    3540 *    E(data, session key, IV): AES256 Encrypt
    36  *    S(): 40 byte DSA Signature
     41 *    S(): 40 byte DSA Signature, or length as implied by sig type
    3742 *    tsA, tsB: timestamps (4 bytes, seconds since epoch)
    3843 *    sk: 32 byte Session key
     
    8691    /** bytes received so far */
    8792    protected int _received;
    88     private byte _extra[];
    8993
    9094    protected final DHSessionKeyBuilder _dh;
     
    9296    protected final NTCPTransport _transport;
    9397    protected final NTCPConnection _con;
    94     /** error causing the corruption */
    95     private String _err;
    96     /** exception causing the error */
    97     private Exception _e;
    98     private boolean _failedBySkew;
    9998   
    10099    protected static final int MIN_RI_SIZE = 387;
     
    133132        /** got 1, sent 2, got 3 */
    134133        IB_GOT_RI,
     134
     135        /**
     136         * Next state IB_NTCP2_GOT_X
     137         * @since 0.9.36
     138         */
     139        IB_NTCP2_INIT,
     140        /**
     141         * Got Noise part of msg 1
     142         * Next state IB_NTCP2_GOT_PADDING or IB_NTCP2_READ_RANDOM on fail
     143         * @since 0.9.36
     144         */
     145        IB_NTCP2_GOT_X,
     146        /**
     147         * Got msg 1 incl. padding
     148         * Next state IB_NTCP2_SENT_Y
     149         * @since 0.9.36
     150         */
     151        IB_NTCP2_GOT_PADDING,
     152        /**
     153         * Sent msg 2 and padding
     154         * Next state IB_NTCP2_GOT_RI
     155         * @since 0.9.36
     156         */
     157        IB_NTCP2_SENT_Y,
     158        /**
     159         * Got msg 3
     160         * Next state VERIFIED
     161         * @since 0.9.36
     162         */
     163        IB_NTCP2_GOT_RI,
     164        /**
     165         * Got msg 1 and failed AEAD
     166         * Next state CORRUPT
     167         * @since 0.9.36
     168         */
     169        IB_NTCP2_READ_RANDOM,
    135170
    136171        /** OB: got and verified 4; IB: got and verified 3 and sent 4 */
     
    179214
    180215    /**
    181      * parse the contents of the buffer as part of the handshake.  if the
    182      * handshake is completed and there is more data remaining, the data are
    183      * copieed out so that the next read will be the (still encrypted) remaining
    184      * data (available from getExtraBytes)
     216     * Parse the contents of the buffer as part of the handshake.
    185217     *
    186218     * All data must be copied out of the buffer as Reader.processRead()
    187219     * will return it to the pool.
     220     *
     221     * If there are additional data in the buffer after the handshake is complete,
     222     * the EstablishState is responsible for passing it to NTCPConnection.
    188223     */
    189224    public synchronized void receive(ByteBuffer src) {
     
    202237     */
    203238    public void prepareOutbound() {}
    204 
    205     /**
    206      *  Was this connection failed because of clock skew?
    207      */
    208     public synchronized boolean getFailedBySkew() { return _failedBySkew; }
    209239
    210240    /** did the handshake fail for some reason? */
     
    234264     */
    235265    public abstract int getVersion();
    236 
    237     /** Anything left over in the byte buffer after verification is extra
    238      *
    239      *  All data must be copied out of the buffer as Reader.processRead()
    240      *  will return it to the pool.
    241      *
    242      *  State must be VERIFIED.
    243      *  Caller must synch.
    244      */
    245     protected void prepareExtra(ByteBuffer buf) {
    246         int remaining = buf.remaining();
    247         if (remaining > 0) {
    248             _extra = new byte[remaining];
    249             buf.get(_extra);
    250             _received += remaining;
    251         }
    252         if (_log.shouldLog(Log.DEBUG))
    253             _log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
    254     }
    255 
    256     /**
    257      * if complete, this will contain any bytes received as part of the
    258      * handshake that were after the actual handshake.  This may return null.
    259      */
    260     public synchronized byte[] getExtraBytes() { return _extra; }
    261266
    262267    /**
     
    282287            changeState(State.CORRUPT);
    283288        }
    284         _failedBySkew = bySkew;
    285         _err = reason;
    286         _e = e;
    287289        if (_log.shouldLog(Log.WARN))
    288             _log.warn(prefix()+"Failed to establish: " + _err, e);
     290            _log.warn(prefix() + "Failed to establish: " + reason, e);
     291        if (!bySkew)
     292            _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
    289293        releaseBufs(false);
     294        // con.close()?
    290295    }
    291296
     
    304309    }
    305310
    306     public synchronized String getError() { return _err; }
    307 
    308     public synchronized Exception getException() { return _e; }
    309    
    310311    /**
    311312     *  XOR a into b. Modifies b. a is unmodified.
     
    329330        else
    330331            buf.append("OBES ");
    331         buf.append(System.identityHashCode(this));
     332        buf.append(_con.toString());
    332333        buf.append(' ').append(_state);
    333334        if (_con.isEstablished()) buf.append(" established");
     
    348349        public int getVersion() { return 1; }
    349350
     351        /*
     352         * @throws IllegalStateException always
     353         */
     354        @Override
     355        public void receive(ByteBuffer src) {
     356            throw new IllegalStateException("receive() " + src.remaining() + " on verified state, doing nothing!");
     357        }
     358
     359        /*
     360         * @throws IllegalStateException always
     361         */
    350362        @Override
    351363        public void prepareOutbound() {
    352             Log log = RouterContext.getCurrentContext().logManager().getLog(VerifiedEstablishState.class);
    353             log.warn("prepareOutbound() on verified state, doing nothing!");
     364            throw new IllegalStateException("prepareOutbound() on verified state, doing nothing!");
    354365        }
    355366
     
    370381        public int getVersion() { return 1; }
    371382
     383        /*
     384         * @throws IllegalStateException always
     385         */
     386        @Override
     387        public void receive(ByteBuffer src) {
     388            throw new IllegalStateException("receive() " + src.remaining() + " on failed state, doing nothing!");
     389        }
     390
     391        /*
     392         * @throws IllegalStateException always
     393         */
    372394        @Override
    373395        public void prepareOutbound() {
    374             Log log = RouterContext.getCurrentContext().logManager().getLog(VerifiedEstablishState.class);
    375             log.warn("prepareOutbound() on verified state, doing nothing!");
     396            throw new IllegalStateException("prepareOutbound() on failed state, doing nothing!");
    376397        }
    377398
  • router/java/src/net/i2p/router/transport/ntcp/EstablishState.java

    r49221ad rae8779e  
    1010   
    1111    /**
    12      * parse the contents of the buffer as part of the handshake.  if the
    13      * handshake is completed and there is more data remaining, the data are
    14      * copieed out so that the next read will be the (still encrypted) remaining
    15      * data (available from getExtraBytes)
     12     * Parse the contents of the buffer as part of the handshake.
    1613     *
    1714     * All data must be copied out of the buffer as Reader.processRead()
    1815     * will return it to the pool.
     16     *
     17     * If there are additional data in the buffer after the handshake is complete,
     18     * the EstablishState is responsible for passing it to NTCPConnection.
     19     *
     20     * @throws IllegalStateException
    1921     */
    2022    public void receive(ByteBuffer src);
     
    2426     * We are establishing an outbound connection, so prepare ourselves by
    2527     * queueing up the write of the first part of the handshake
     28     *
     29     * @throws IllegalStateException
    2630     */
    2731    public void prepareOutbound();
    28 
    29     /**
    30      *  Was this connection failed because of clock skew?
    31      */
    32     public boolean getFailedBySkew();
    3332
    3433    /** did the handshake fail for some reason? */
     
    4342     */
    4443    public boolean isComplete();
    45 
    46     /**
    47      * if complete, this will contain any bytes received as part of the
    48      * handshake that were after the actual handshake.  This may return null.
    49      */
    50     public byte[] getExtraBytes();
    5144
    5245    /**
     
    6457    public void close(String reason, Exception e);
    6558
    66     public String getError();
    67 
    68     public Exception getException();
    6959}
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r49221ad rae8779e  
    284284                                    // we haven't sent or received anything in a really long time, so lets just close 'er up
    285285                                    con.close();
     286                                    if (_log.shouldInfo())
     287                                        _log.info("Failsafe or expire close for " + con);
    286288                                    failsafeCloses++;
    287289                                }
     
    301303                } else {
    302304                    // another 100% CPU workaround
     305                    // TODO remove or only if we appear to be looping with no interest ops
    303306                    if ((loopCount % 512) == 511) {
    304307                        if (_log.shouldLog(Log.INFO))
     
    550553
    551554            SelectionKey ckey = chan.register(_selector, SelectionKey.OP_READ);
    552             new NTCPConnection(_context, _transport, chan, ckey);
     555            NTCPConnection con = new NTCPConnection(_context, _transport, chan, ckey);
     556            ckey.attach(con);
     557            _transport.establishing(con);
    553558        } catch (IOException ioe) {
    554559            _log.error("Error accepting", ioe);
     
    566571                if (shouldSetKeepAlive(chan))
    567572                    chan.socket().setKeepAlive(true);
     573                // key was already set when the channel was created, why do it again here?
    568574                con.setKey(key);
    569575                con.outboundConnected();
     
    620626                        count = _blockedIPs.increment(ba);
    621627                        if (_log.shouldLog(Log.WARN))
    622                             _log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
     628                            _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
    623629                    } else {
    624630                        count = 1;
     
    683689                    count = _blockedIPs.increment(ba);
    684690                    if (_log.shouldLog(Log.WARN))
    685                         _log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
     691                        _log.warn("Blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con, ioe);
    686692                } else {
    687693                    count = 1;
    688694                    if (_log.shouldLog(Log.WARN))
    689                         _log.warn("IOE on inbound before receiving any: " + con);
     695                        _log.warn("IOE on inbound before receiving any: " + con, ioe);
    690696                }
    691697                _context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
  • router/java/src/net/i2p/router/transport/ntcp/InboundEstablishState.java

    r49221ad rae8779e  
    66import java.net.InetAddress;
    77import java.nio.ByteBuffer;
     8import java.security.GeneralSecurityException;
     9import java.util.Arrays;
     10import java.util.EnumSet;
     11import java.util.List;
     12import java.util.Set;
     13
     14import com.southernstorm.noise.protocol.CipherState;
     15import com.southernstorm.noise.protocol.CipherStatePair;
     16import com.southernstorm.noise.protocol.HandshakeState;
    817
    918import net.i2p.crypto.SigType;
    1019import net.i2p.data.Base64;
     20import net.i2p.data.ByteArray;
    1121import net.i2p.data.DataFormatException;
    1222import net.i2p.data.DataHelper;
    1323import net.i2p.data.Hash;
     24import net.i2p.data.SessionKey;
     25import net.i2p.data.Signature;
     26import net.i2p.data.i2np.I2NPMessage;
     27import net.i2p.data.i2np.I2NPMessageException;
     28import net.i2p.data.router.RouterAddress;
    1429import net.i2p.data.router.RouterIdentity;
    15 import net.i2p.data.Signature;
     30import net.i2p.data.router.RouterInfo;
    1631import net.i2p.router.Router;
    1732import net.i2p.router.RouterContext;
     33import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
    1834import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
     35import static net.i2p.router.transport.ntcp.OutboundNTCP2State.*;
     36import net.i2p.util.ByteCache;
    1937import net.i2p.util.Log;
    2038import net.i2p.util.SimpleByteCache;
     
    2644 *  @since 0.9.35 pulled out of EstablishState
    2745 */
    28 class InboundEstablishState extends EstablishBase {
     46class InboundEstablishState extends EstablishBase implements NTCP2Payload.PayloadCallback {
    2947
    3048    /** current encrypted block we are reading (IB only) or an IV buf used at the end for OB */
     
    4058    private int _sz_aliceIdent_tsA_padding_aliceSigSize;
    4159
     60    //// NTCP2 things
     61
     62    private HandshakeState _handshakeState;
     63    private int _padlen1;
     64    private int _msg3p2len;
     65    private int _msg3p2FailReason = -1;
     66    private ByteArray _msg3tmp;
     67    private NTCP2Options _hisPadding;
     68
     69    // same as I2PTunnelRunner
     70    private static final int BUFFER_SIZE = 4*1024;
     71    private static final int MAX_DATA_READ_BUFS = 32;
     72    private static final ByteCache _dataReadBufs = ByteCache.getInstance(MAX_DATA_READ_BUFS, BUFFER_SIZE);
     73
    4274    private static final int NTCP1_MSG1_SIZE = XY_SIZE + HXY_SIZE;
     75    // 287 - 64 = 223
     76    private static final int PADDING1_MAX = TOTAL1_MAX - MSG1_SIZE;
     77    private static final int PADDING1_FAIL_MAX = 128;
     78    private static final int PADDING2_MAX = 64;
     79    // DSA RI, no options, no addresses
     80    private static final int RI_MIN = 387 + 8 + 1 + 1 + 2 + 40;
     81    private static final int MSG3P2_MIN = 1 + 2 + 1 + RI_MIN + MAC_SIZE;
     82    // absolute max, let's enforce less
     83    //private static final int MSG3P2_MAX = BUFFER_SIZE - MSG3P1_SIZE;
     84    private static final int MSG3P2_MAX = 6000;
     85
     86    private static final Set<State> STATES_NTCP2 =
     87        EnumSet.of(State.IB_NTCP2_INIT, State.IB_NTCP2_GOT_X, State.IB_NTCP2_GOT_PADDING,
     88                   State.IB_NTCP2_SENT_Y, State.IB_NTCP2_GOT_RI, State.IB_NTCP2_READ_RANDOM);
     89
    4390   
    4491    public InboundEstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
     
    5198
    5299    /**
    53      * parse the contents of the buffer as part of the handshake.  if the
    54      * handshake is completed and there is more data remaining, the data are
    55      * copieed out so that the next read will be the (still encrypted) remaining
    56      * data (available from getExtraBytes)
     100     * Parse the contents of the buffer as part of the handshake.
    57101     *
    58102     * All data must be copied out of the buffer as Reader.processRead()
    59103     * will return it to the pool.
     104     *
     105     * If there are additional data in the buffer after the handshake is complete,
     106     * the EstablishState is responsible for passing it to NTCPConnection.
    60107     */
    61108    @Override
     
    78125            if (_state == State.IB_INIT)
    79126                return 0;
    80             // TODO NTCP2 states
     127            if (STATES_NTCP2.contains(_state))
     128                return 2;
    81129            return 1;
    82130        }
     
    92140     *  Caller must synch.
    93141     *
    94      *  FIXME none of the _state comparisons use _stateLock, but whole thing
    95      *  is synchronized, should be OK. See isComplete()
    96142     */
    97143    private void receiveInbound(ByteBuffer src) {
     144        if (STATES_NTCP2.contains(_state)) {
     145            receiveInboundNTCP2(src);
     146            return;
     147        }
     148        // TODO if less than 64, buffer and decide later?
    98149        if (_state == State.IB_INIT && src.hasRemaining()) {
    99150            int remaining = src.remaining();
    100             //if (remaining < NTCP1_MSG1_SIZE && _transport.isNTCP2Enabled()) {
    101             //    // NTCP2
    102             //}
     151            if (remaining < NTCP1_MSG1_SIZE && _transport.isNTCP2Enabled()) {
     152                // NTCP2
     153                // TODO can't change our mind later if we get more than 287
     154                _con.setVersion(2);
     155                changeState(State.IB_NTCP2_INIT);
     156                receiveInboundNTCP2(src);
     157                // releaseBufs() will return the unused DH
     158                return;
     159            }
    103160            int toGet = Math.min(remaining, XY_SIZE - _received);
    104161            src.get(_X, _received, toGet);
     
    189246                    fail("Invalid X", e);
    190247                    return;
     248                } catch (IllegalStateException ise) {
     249                    // setPeerPublicValue()
     250                    fail("reused keys?", ise);
     251                    return;
    191252                }
    192253
     
    282343                            if (_log.shouldLog(Log.DEBUG))
    283344                                _log.debug(prefix() + "got the sig");
    284                             verifyInbound();
    285                             if (_state == State.VERIFIED && src.hasRemaining())
    286                                 prepareExtra(src);
     345                            verifyInbound(src);
    287346                            if (_log.shouldLog(Log.DEBUG))
    288347                                _log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size()
     
    292351                            return;
    293352                    }
    294                 } else {
    295353                }
     354        }
     355
     356        // check for remaining data
     357        if ((_state == State.VERIFIED || _state == State.CORRUPT) && src.hasRemaining()) {
     358            if (_log.shouldWarn())
     359                _log.warn("Received unexpected " + src.remaining() + " on " + this, new Exception());
    296360        }
    297361
     
    344408    /**
    345409     * We are Bob. Verify message #3 from Alice, then send message #4 to Alice.
     410     * NTCP 1 only.
    346411     *
    347412     * _aliceIdentSize and _aliceIdent must be set.
     
    359424     *
    360425     *  State must be IB_GOT_RI.
     426     *  This will always change the state to VERIFIED or CORRUPT.
    361427     *  Caller must synch.
    362      */
    363     private void verifyInbound() {
     428     *
     429     *  @param buf possibly containing "extra" data for data phase
     430     */
     431    private void verifyInbound(ByteBuffer buf) {
    364432        byte b[] = _sz_aliceIdent_tsA_padding_aliceSig.toByteArray();
    365433        try {
     
    394462            Signature sig = new Signature(type, s);
    395463            boolean ok = _context.dsa().verifySignature(sig, toVerify, _aliceIdent.getSigningPublicKey());
     464            Hash aliceHash = _aliceIdent.calculateHash();
    396465            if (ok) {
    397                 // get inet-addr
    398                 InetAddress addr = this._con.getChannel().socket().getInetAddress();
    399                 byte[] ip = (addr == null) ? null : addr.getAddress();
    400                 if (_context.banlist().isBanlistedForever(_aliceIdent.calculateHash())) {
    401                     if (_log.shouldLog(Log.WARN))
    402                         _log.warn("Dropping inbound connection from permanently banlisted peer: " + _aliceIdent.calculateHash());
    403                     // So next time we will not accept the con from this IP,
    404                     // rather than doing the whole handshake
    405                     if(ip != null)
    406                        _context.blocklist().add(ip);
    407                     fail("Peer is banlisted forever: " + _aliceIdent.calculateHash());
    408                     return;
    409                 }
    410                 if(ip != null)
    411                    _transport.setIP(_aliceIdent.calculateHash(), ip);
    412                 if (_log.shouldLog(Log.DEBUG))
    413                     _log.debug(prefix() + "verification successful for " + _con);
    414 
    415                 long diff = 1000*Math.abs(_peerSkew);
    416                 if (!_context.clock().getUpdatedSuccessfully()) {
    417                     // Adjust the clock one time in desperation
    418                     // This isn't very likely, outbound will do it first
    419                     // We are Bob, she is Alice, adjust to match Alice
    420                     _context.clock().setOffset(1000 * (0 - _peerSkew), true);
    421                     _peerSkew = 0;
    422                     if (diff != 0)
    423                         _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
    424                 } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
    425                     _context.statManager().addRateData("ntcp.invalidInboundSkew", diff);
    426                     _transport.markReachable(_aliceIdent.calculateHash(), true);
    427                     // Only banlist if we know what time it is
    428                     _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
    429                                                        _aliceIdent.calculateHash(),
    430                                                        _x("Excessive clock skew: {0}"));
    431                     _transport.setLastBadSkew(_peerSkew);
    432                     fail("Clocks too skewed (" + diff + " ms)", null, true);
    433                     return;
    434                 } else if (_log.shouldLog(Log.DEBUG)) {
    435                     _log.debug(prefix()+"Clock skew: " + diff + " ms");
    436                 }
    437 
     466                ok = verifyInbound(aliceHash);
     467            }
     468            if (ok) {
    438469                _con.setRemotePeer(_aliceIdent);
    439                 sendInboundConfirm(_aliceIdent, tsA);
     470                sendInboundConfirm(aliceHash, tsA);
    440471                if (_log.shouldLog(Log.DEBUG))
    441472                    _log.debug(prefix()+"e_bobSig is " + _e_bobSig.length + " bytes long");
     
    444475                // this does not copy the IV, do not release to cache
    445476                // We are Bob, she is Alice, clock skew is Alice-Bob
    446                 _con.finishInboundEstablishment(_dh.getSessionKey(), _peerSkew, iv, _prevEncrypted); // skew in seconds
     477                // skew in seconds
     478                _con.finishInboundEstablishment(_dh.getSessionKey(), _peerSkew, iv, _prevEncrypted);
     479                changeState(State.VERIFIED);
     480                if (buf.hasRemaining()) {
     481                    // process "extra" data
     482                    // This is unlikely for inbound, as we must reply with message 4
     483                    if (_log.shouldWarn())
     484                        _log.warn("extra data " + buf.remaining() + " on " + this);
     485                     _con.recvEncryptedI2NP(buf);
     486                }
    447487                releaseBufs(true);
    448488                if (_log.shouldLog(Log.INFO))
    449                     _log.info(prefix()+"Verified remote peer as " + _aliceIdent.calculateHash());
    450                 changeState(State.VERIFIED);
     489                    _log.info(prefix()+"Verified remote peer as " + aliceHash);
    451490            } else {
    452491                _context.statManager().addRateData("ntcp.invalidInboundSignature", 1);
    453                 fail("Peer verification failed - spoof of " + _aliceIdent.calculateHash() + "?");
     492                // verifyInbound(aliceHash) called fail()
    454493            }
    455494        } catch (IOException ioe) {
     
    460499
    461500    /**
     501     *  Common validation things for both NTCP 1 and 2.
     502     *  Call after receiving Alice's RouterIdentity (in message 3).
     503     *  _peerSkew must be set.
     504     *
     505     *  Side effect: sets _msg3p2FailReason when returning false
     506     *
     507     *  @return success or calls fail() and returns false
     508     *  @since 0.9.36 pulled out of verifyInbound()
     509     */
     510    private boolean verifyInbound(Hash aliceHash) {
     511        // get inet-addr
     512        InetAddress addr = this._con.getChannel().socket().getInetAddress();
     513        byte[] ip = (addr == null) ? null : addr.getAddress();
     514        if (_context.banlist().isBanlistedForever(aliceHash)) {
     515            if (_log.shouldLog(Log.WARN))
     516                _log.warn("Dropping inbound connection from permanently banlisted peer: " + aliceHash);
     517            // So next time we will not accept the con from this IP,
     518            // rather than doing the whole handshake
     519            if(ip != null)
     520               _context.blocklist().add(ip);
     521            fail("Peer is banlisted forever: " + aliceHash);
     522            _msg3p2FailReason = NTCPConnection.REASON_BANNED;
     523            return false;
     524        }
     525        if(ip != null)
     526           _transport.setIP(aliceHash, ip);
     527        if (_log.shouldLog(Log.DEBUG))
     528            _log.debug(prefix() + "verification successful for " + _con);
     529
     530        long diff = 1000*Math.abs(_peerSkew);
     531        if (!_context.clock().getUpdatedSuccessfully()) {
     532            // Adjust the clock one time in desperation
     533            // This isn't very likely, outbound will do it first
     534            // We are Bob, she is Alice, adjust to match Alice
     535            _context.clock().setOffset(1000 * (0 - _peerSkew), true);
     536            _peerSkew = 0;
     537            if (diff != 0)
     538                _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
     539        } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
     540            _context.statManager().addRateData("ntcp.invalidInboundSkew", diff);
     541            _transport.markReachable(aliceHash, true);
     542            // Only banlist if we know what time it is
     543            _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
     544                                             aliceHash,
     545                                               _x("Excessive clock skew: {0}"));
     546            _transport.setLastBadSkew(_peerSkew);
     547            fail("Clocks too skewed (" + diff + " ms)", null, true);
     548            _msg3p2FailReason = NTCPConnection.REASON_SKEW;
     549            return false;
     550        } else if (_log.shouldLog(Log.DEBUG)) {
     551            _log.debug(prefix()+"Clock skew: " + diff + " ms");
     552        }
     553        return true;
     554    }
     555
     556    /**
    462557     *  We are Bob. Send message #4 to Alice.
    463558     *
    464559     *  State must be VERIFIED.
    465560     *  Caller must synch.
    466      */
    467     private void sendInboundConfirm(RouterIdentity alice, long tsA) {
     561     *
     562     *  @param h Alice's Hash
     563     */
     564    private void sendInboundConfirm(Hash h, long tsA) {
    468565        // send Alice E(S(X+Y+Alice.identHash+tsA+tsB), sk, prev)
    469566        byte toSign[] = new byte[XY_SIZE + XY_SIZE + 32+4+4];
     
    471568        System.arraycopy(_X, 0, toSign, off, XY_SIZE); off += XY_SIZE;
    472569        System.arraycopy(_Y, 0, toSign, off, XY_SIZE); off += XY_SIZE;
    473         Hash h = alice.calculateHash();
    474570        System.arraycopy(h.getData(), 0, toSign, off, 32); off += 32;
    475571        DataHelper.toLong(toSign, off, 4, tsA); off += 4;
     
    497593    }
    498594
     595    //// NTCP2 below here
     596
     597    /**
     598     *  NTCP2 only. State must be one of IB_NTCP2_*
     599     *
     600     *  we are Bob, so receive these bytes as part of an inbound connection
     601     *  This method receives messages 1 and 3, and sends message 2.
     602     *
     603     *  All data must be copied out of the buffer as Reader.processRead()
     604     *  will return it to the pool.
     605     *
     606     *  @since 0.9.36
     607     */
     608    private synchronized void receiveInboundNTCP2(ByteBuffer src) {
     609        if (_state == State.IB_NTCP2_INIT && src.hasRemaining()) {
     610            // use _X for the buffer
     611            int toGet = Math.min(src.remaining(), MSG1_SIZE - _received);
     612            src.get(_X, _received, toGet);
     613            _received += toGet;
     614            if (_received < MSG1_SIZE) {
     615                // TODO if we got less than 64 should we even be here?
     616                if (_log.shouldWarn())
     617                    _log.warn("Short buffer got " + toGet + " total now " + _received);
     618                return;
     619            }
     620            changeState(State.IB_NTCP2_GOT_X);
     621            _received = 0;
     622
     623            // replay check using encrypted key
     624            if (!_transport.isHXHIValid(_X)) {
     625                _context.statManager().addRateData("ntcp.replayHXxorBIH", 1);
     626                fail("Replay msg 1, eX = " + Base64.encode(_X, 0, KEY_SIZE));
     627                return;
     628            }
     629
     630            try {
     631                _handshakeState = new HandshakeState(HandshakeState.RESPONDER, _transport.getXDHFactory());
     632            } catch (GeneralSecurityException gse) {
     633                throw new IllegalStateException("bad proto", gse);
     634            }
     635            _handshakeState.getLocalKeyPair().setPublicKey(_transport.getNTCP2StaticPubkey(), 0);
     636            _handshakeState.getLocalKeyPair().setPrivateKey(_transport.getNTCP2StaticPrivkey(), 0);
     637            Hash h = _context.routerHash();
     638            SessionKey bobHash = new SessionKey(h.getData());
     639            // save encrypted data for CBC for msg 2
     640            System.arraycopy(_X, KEY_SIZE - IV_SIZE, _prevEncrypted, 0, IV_SIZE);
     641            _context.aes().decrypt(_X, 0, _X, 0, bobHash, _transport.getNTCP2StaticIV(), KEY_SIZE);
     642            if (DataHelper.eqCT(_X, 0, ZEROKEY, 0, KEY_SIZE)) {
     643                fail("Bad msg 1, X = 0");
     644                return;
     645            }
     646            byte options[] = new byte[OPTIONS1_SIZE];
     647            try {
     648                _handshakeState.start();
     649                if (_log.shouldWarn())
     650                    _log.warn("After start: " + _handshakeState.toString());
     651                _handshakeState.readMessage(_X, 0, MSG1_SIZE, options, 0);
     652            } catch (GeneralSecurityException gse) {
     653                // Read a random number of bytes, store wanted in _padlen1
     654                _padlen1 = _context.random().nextInt(PADDING1_FAIL_MAX) - src.remaining();
     655                if (_padlen1 > 0) {
     656                    // delayed fail for probing resistance
     657                    // need more bytes before failure
     658                    if (_log.shouldWarn())
     659                        _log.warn("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE) + " with " + src.remaining() +
     660                                  " more bytes, waiting for " + _padlen1 + " more bytes", gse);
     661                    changeState(State.IB_NTCP2_READ_RANDOM);
     662                } else {
     663                    // got all we need, fail now
     664                    fail("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE) + " remaining = " + src.remaining(), gse);
     665                }
     666                return;
     667            } catch (RuntimeException re) {
     668                fail("Bad msg 1, X = " + Base64.encode(_X, 0, KEY_SIZE), re);
     669                return;
     670            }
     671            if (_log.shouldWarn())
     672                _log.warn("After msg 1: " + _handshakeState.toString());
     673            int v = options[1] & 0xff;
     674            if (v != NTCPTransport.NTCP2_INT_VERSION) {
     675                fail("Bad version: " + v);
     676                return;
     677            }
     678            _padlen1 = (int) DataHelper.fromLong(options, 2, 2);
     679            _msg3p2len = (int) DataHelper.fromLong(options, 4, 2);
     680            long tsA = DataHelper.fromLong(options, 8, 4);
     681            long now = _context.clock().now();
     682            // In NTCP1, timestamp comes in msg 3 so we know the RTT.
     683            // In NTCP2, it comes in msg 1, so just guess.
     684            // We could defer this to msg 3 to calculate the RTT?
     685            long rtt = 250;
     686            _peerSkew = (now - (tsA * 1000) - (rtt / 2) + 500) / 1000;
     687            if ((_peerSkew > MAX_SKEW || _peerSkew < 0 - MAX_SKEW) &&
     688                !_context.clock().getUpdatedSuccessfully()) {
     689                // If not updated successfully, allow it.
     690                // This isn't very likely, outbound will do it first
     691                // See verifyInbound() above.
     692                fail("Clock Skew: " + _peerSkew, null, true);
     693                return;
     694            }
     695            if (_padlen1 > PADDING1_MAX) {
     696                fail("bad msg 1 padlen: " + _padlen1);
     697                return;
     698            }
     699            if (_msg3p2len < MSG3P2_MIN || _msg3p2len > MSG3P2_MAX) {
     700                fail("bad msg3p2 len: " + _msg3p2len);
     701                return;
     702            }
     703            if (_padlen1 <= 0) {
     704                // No padding specified, go straight to sending msg 2
     705                changeState(State.IB_NTCP2_GOT_PADDING);
     706                if (src.hasRemaining()) {
     707                    // Inbound conn can never have extra data after msg 1
     708                    fail("Extra data after msg 1: " + src.remaining());
     709                } else {
     710                    // write msg 2
     711                    prepareOutbound2();
     712                }
     713                return;
     714            }
     715        }
     716
     717        // delayed fail for probing resistance
     718        if (_state == State.IB_NTCP2_READ_RANDOM && src.hasRemaining()) {
     719            // read more bytes before failing
     720            _received += src.remaining();
     721            if (_received < _padlen1) {
     722                if (_log.shouldWarn())
     723                    _log.warn("Bad msg 1, got " + src.remaining() +
     724                              " more bytes, waiting for " + (_padlen1 - _received) + " more bytes");
     725            } else {
     726                fail("Bad msg 1, failing after getting " + src.remaining() + " more bytes");
     727            }
     728            return;
     729        }
     730
     731        if (_state == State.IB_NTCP2_GOT_X && src.hasRemaining()) {
     732            // skip this if _padlen1 == 0;
     733            // use _X for the buffer
     734            int toGet = Math.min(src.remaining(), _padlen1 - _received);
     735            src.get(_X, _received, toGet);
     736            _received += toGet;
     737            if (_received < _padlen1)
     738                return;
     739            changeState(State.IB_NTCP2_GOT_PADDING);
     740            _handshakeState.mixHash(_X, 0, _padlen1);
     741            if (_log.shouldWarn())
     742                _log.warn("After mixhash padding " + _padlen1 + " msg 1: " + _handshakeState.toString());
     743            _received = 0;
     744            if (src.hasRemaining()) {
     745                // Inbound conn can never have extra data after msg 1
     746                fail("Extra data after msg 1: " + src.remaining());
     747            } else {
     748                // write msg 2
     749                prepareOutbound2();
     750            }
     751            return;
     752        }
     753
     754        if (_state == State.IB_NTCP2_SENT_Y && src.hasRemaining()) {
     755            int msg3tot = MSG3P1_SIZE + _msg3p2len;
     756            if (_msg3tmp == null)
     757                _msg3tmp = _dataReadBufs.acquire();
     758            // use _X for the buffer FIXME too small
     759            byte[] tmp = _msg3tmp.getData();
     760            int toGet = Math.min(src.remaining(), msg3tot - _received);
     761            src.get(tmp, _received, toGet);
     762            _received += toGet;
     763            if (_received < msg3tot)
     764                return;
     765            changeState(State.IB_NTCP2_GOT_RI);
     766            _received = 0;
     767            ByteArray ptmp = _dataReadBufs.acquire();
     768            byte[] payload = ptmp.getData();
     769            try {
     770                _handshakeState.readMessage(tmp, 0, msg3tot, payload, 0);
     771            } catch (GeneralSecurityException gse) {
     772                // TODO delayed failure per spec, as in NTCPConnection.delayedClose()
     773                _dataReadBufs.release(ptmp, false);
     774                fail("Bad msg 3, part 1 is:\n" + net.i2p.util.HexDump.dump(tmp, 0, MSG3P1_SIZE), gse);
     775                return;
     776            } catch (RuntimeException re) {
     777                _dataReadBufs.release(ptmp, false);
     778                fail("Bad msg 3", re);
     779                return;
     780            }
     781            if (_log.shouldWarn())
     782                _log.warn("After msg 3: " + _handshakeState.toString());
     783            try {
     784                // calls callbacks below
     785                NTCP2Payload.processPayload(_context, this, payload, 0, _msg3p2len - MAC_SIZE, true);
     786            } catch (IOException ioe) {
     787                fail("Bad msg 3 payload", ioe);
     788                // probably payload frame/block problems
     789                // setDataPhase() will send termination
     790                if (_msg3p2FailReason < 0)
     791                    _msg3p2FailReason = NTCPConnection.REASON_FRAMING;
     792            } catch (DataFormatException dfe) {
     793                fail("Bad msg 3 payload", dfe);
     794                // probably RI problems
     795                // setDataPhase() will send termination
     796                if (_msg3p2FailReason < 0)
     797                    _msg3p2FailReason = NTCPConnection.REASON_SIGFAIL;
     798                _context.statManager().addRateData("ntcp.invalidInboundSignature", 1);
     799            } catch (I2NPMessageException ime) {
     800                // shouldn't happen, no I2NP msgs in msg3p2
     801                fail("Bad msg 3 payload", ime);
     802                // setDataPhase() will send termination
     803                if (_msg3p2FailReason < 0)
     804                    _msg3p2FailReason = 0;
     805            } finally {
     806                _dataReadBufs.release(ptmp, false);
     807            }
     808
     809            // pass buffer for processing of "extra" data
     810            setDataPhase(src);
     811        }
     812        // TODO check for remaining data and log/throw
     813    }
     814
     815    /**
     816     *  Write the 2nd NTCP2 message.
     817     *  IV (CBC from msg 1) must be in _prevEncrypted
     818     *
     819     *  @since 0.9.36
     820     */
     821    private synchronized void prepareOutbound2() {
     822        // create msg 2 payload
     823        byte[] options2 = new byte[OPTIONS2_SIZE];
     824        int padlen2 = _context.random().nextInt(PADDING2_MAX);
     825        DataHelper.toLong(options2, 2, 2, padlen2);
     826        long now = _context.clock().now() / 1000;
     827        DataHelper.toLong(options2, 8, 4, now);
     828        byte[] tmp = new byte[MSG2_SIZE + padlen2];
     829        try {
     830            _handshakeState.writeMessage(tmp, 0, options2, 0, OPTIONS2_SIZE);
     831        } catch (GeneralSecurityException gse) {
     832            // buffer length error
     833            if (!_log.shouldWarn())
     834                _log.error("Bad msg 2 out", gse);
     835            fail("Bad msg 2 out", gse);
     836            return;
     837        } catch (RuntimeException re) {
     838            if (!_log.shouldWarn())
     839                _log.error("Bad msg 2 out", re);
     840            fail("Bad msg 2 out", re);
     841            return;
     842        }
     843        if (_log.shouldWarn())
     844            _log.warn("After msg 2: " + _handshakeState.toString());
     845        Hash h = _context.routerHash();
     846        SessionKey bobHash = new SessionKey(h.getData());
     847        _context.aes().encrypt(tmp, 0, tmp, 0, bobHash, _prevEncrypted, KEY_SIZE);
     848        if (padlen2 > 0) {
     849            _context.random().nextBytes(tmp, MSG2_SIZE, padlen2);
     850            _handshakeState.mixHash(tmp, MSG2_SIZE, padlen2);
     851            if (_log.shouldWarn())
     852                _log.warn("After mixhash padding " + padlen2 + " msg 2: " + _handshakeState.toString());
     853        }
     854
     855        changeState(State.IB_NTCP2_SENT_Y);
     856        // send it all at once
     857        _transport.getPumper().wantsWrite(_con, tmp);
     858    }
     859
     860    /**
     861     *  KDF for NTCP2 data phase,
     862     *  then calls con.finishInboundEstablishment(),
     863     *  passing over the final keys and states to the con.
     864     *
     865     *  This changes the state to VERIFIED.
     866     *
     867     *  @param buf possibly containing "extra" data for data phase
     868     *  @since 0.9.36
     869     */
     870    private synchronized void setDataPhase(ByteBuffer buf) {
     871        // Data phase ChaChaPoly keys
     872        CipherStatePair ckp = _handshakeState.split();
     873        CipherState rcvr = ckp.getReceiver();
     874        CipherState sender = ckp.getSender();
     875        byte[] k_ab = rcvr.getKey();
     876        byte[] k_ba = sender.getKey();
     877
     878        // Data phase SipHash keys
     879        byte[][] sipkeys = generateSipHashKeys(_context, _handshakeState);
     880        byte[] sip_ab = sipkeys[0];
     881        byte[] sip_ba = sipkeys[1];
     882
     883        if (_msg3p2FailReason >= 0) {
     884            if (_log.shouldWarn())
     885                _log.warn("Failed msg3p2, code " + _msg3p2FailReason + " for " + this);
     886            _con.failInboundEstablishment(sender, sip_ba, _msg3p2FailReason);
     887        } else {
     888            if (_log.shouldWarn()) {
     889                _log.warn("Finished establishment for " + this +
     890                          "\nGenerated ChaCha key for A->B: " + Base64.encode(k_ab) +
     891                          "\nGenerated ChaCha key for B->A: " + Base64.encode(k_ba) +
     892                          "\nGenerated SipHash key for A->B: " + Base64.encode(sip_ab) +
     893                          "\nGenerated SipHash key for B->A: " + Base64.encode(sip_ba));
     894            }
     895            // skew in seconds
     896            _con.finishInboundEstablishment(sender, rcvr, sip_ba, sip_ab, _peerSkew, _hisPadding);
     897            changeState(State.VERIFIED);
     898            if (buf.hasRemaining()) {
     899                // process "extra" data
     900                // This is very likely for inbound, as data should come right after message 3
     901                if (_log.shouldInfo())
     902                    _log.info("extra data " + buf.remaining() + " on " + this);
     903                 _con.recvEncryptedI2NP(buf);
     904            }
     905        }
     906        // zero out everything
     907        releaseBufs(true);
     908        _handshakeState.destroy();
     909        Arrays.fill(sip_ab, (byte) 0);
     910        Arrays.fill(sip_ba, (byte) 0);
     911    }
     912
     913    //// PayloadCallbacks
     914
     915    /**
     916     *  Get "s" static key out of RI, compare to what we got in the handshake.
     917     *  Tell NTCPConnection who it is.
     918     *
     919     *  @param isHandshake always true
     920     *  @throws DataFormatException on bad sig, unknown SigType, no static key,
     921     *                                 static key mismatch, IP checks in verifyInbound()
     922     *  @since 0.9.36
     923     */
     924    public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
     925        // Validate Alice static key
     926        String s = null;
     927        // find address with matching version
     928        List<RouterAddress> addrs = ri.getTargetAddresses(NTCPTransport.STYLE, NTCPTransport.STYLE2);
     929        for (RouterAddress addr : addrs) {
     930            String v = addr.getOption("v");
     931            if (v == null ||
     932                (!v.equals(NTCPTransport.NTCP2_VERSION) && !v.startsWith(NTCPTransport.NTCP2_VERSION_ALT))) {
     933                 continue;
     934            }
     935            s = addr.getOption("s");
     936            if (s != null)
     937                break;
     938        }
     939        if (s == null) {
     940            _msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
     941            throw new DataFormatException("no s in RI");
     942        }
     943        byte[] sb = Base64.decode(s);
     944        if (sb == null || sb.length != KEY_SIZE) {
     945            _msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
     946            throw new DataFormatException("bad s in RI");
     947        }
     948        byte[] nb = new byte[32];
     949        // compare to the _handshakeState
     950        _handshakeState.getRemotePublicKey().getPublicKey(nb, 0);
     951        if (!DataHelper.eqCT(sb, 0, nb, 0, KEY_SIZE)) {
     952            _msg3p2FailReason = NTCPConnection.REASON_S_MISMATCH;
     953            throw new DataFormatException("s mismatch in RI");
     954        }
     955        _aliceIdent = ri.getIdentity();
     956        Hash h = _aliceIdent.calculateHash();
     957        // this sets the reason
     958        boolean ok = verifyInbound(h);
     959        if (!ok)
     960            throw new DataFormatException("NTCP2 verifyInbound() fail");
     961        try {
     962            RouterInfo old = _context.netDb().store(h, ri);
     963            if (flood && !ri.equals(old)) {
     964                FloodfillNetworkDatabaseFacade fndf = (FloodfillNetworkDatabaseFacade) _context.netDb();
     965                if (fndf.floodConditional(ri)) {
     966                    if (_log.shouldLog(Log.WARN))
     967                        _log.warn("Flooded the RI: " + h);
     968                } else {
     969                    if (_log.shouldLog(Log.WARN))
     970                        _log.warn("Flood request but we didn't: " + h);
     971                }
     972            }
     973        } catch (IllegalArgumentException iae) {
     974            // hash collision?
     975            _msg3p2FailReason = NTCPConnection.REASON_UNSPEC;
     976            throw new DataFormatException("RI store fail", iae);
     977        }
     978        _con.setRemotePeer(_aliceIdent);
     979    }
     980
     981    /** @since 0.9.36 */
     982    public void gotOptions(byte[] options, boolean isHandshake) {
     983        if (options.length < 12) {
     984            if (_log.shouldWarn())
     985                _log.warn("Got options length " + options.length + " on: " + this);
     986            return;
     987        }
     988        float tmin = (options[0] & 0xff) / 16.0f;
     989        float tmax = (options[1] & 0xff) / 16.0f;
     990        float rmin = (options[2] & 0xff) / 16.0f;
     991        float rmax = (options[3] & 0xff) / 16.0f;
     992        int tdummy = (int) DataHelper.fromLong(options, 4, 2);
     993        int rdummy = (int) DataHelper.fromLong(options, 6, 2);
     994        int tdelay = (int) DataHelper.fromLong(options, 8, 2);
     995        int rdelay = (int) DataHelper.fromLong(options, 10, 2);
     996        _hisPadding = new NTCP2Options(tmin, tmax, rmin, rmax,
     997                                       tdummy, rdummy, tdelay, rdelay);
     998    }
     999
     1000    /** @since 0.9.36 */
     1001    public void gotPadding(int paddingLength, int frameLength) {}
     1002
     1003    // Following 4 are illegal in handshake, we will never get them
     1004
     1005    /** @since 0.9.36 */
     1006    public void gotTermination(int reason, long lastReceived) {}
     1007    /** @since 0.9.36 */
     1008    public void gotUnknown(int type, int len) {}
     1009    /** @since 0.9.36 */
     1010    public void gotDateTime(long time) {}
     1011    /** @since 0.9.36 */
     1012    public void gotI2NP(I2NPMessage msg) {}
     1013
     1014    /**
     1015     *  @since 0.9.16
     1016     */
     1017    @Override
     1018    protected synchronized void fail(String reason, Exception e, boolean bySkew) {
     1019        super.fail(reason, e, bySkew);
     1020        if (_handshakeState != null) {
     1021            if (_log.shouldWarn())
     1022                _log.warn("State at failure: " + _handshakeState.toString());
     1023            _handshakeState.destroy();
     1024        }
     1025    }
     1026
    4991027    /**
    5001028     *  Only call once. Caller must synch.
     
    5081036        if (!isVerified)
    5091037            SimpleByteCache.release(_curEncrypted);
     1038        Arrays.fill(_X, (byte) 0);
    5101039        SimpleByteCache.release(_X);
     1040        if (_msg3tmp != null) {
     1041            _dataReadBufs.release(_msg3tmp, false);
     1042            _msg3tmp = null;
     1043        }
    5111044    }
    5121045}
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    r49221ad rae8779e  
    77import java.nio.channels.SelectionKey;
    88import java.nio.channels.SocketChannel;
     9import java.security.GeneralSecurityException;
     10import java.util.Arrays;
    911import java.util.ArrayList;
     12import java.util.Collections;
    1013import java.util.List;
    1114import java.util.Queue;
     
    1821import java.util.zip.Adler32;
    1922
     23import com.southernstorm.noise.protocol.CipherState;
     24
     25import net.i2p.crypto.SipHashInline;
    2026import net.i2p.data.Base64;
    2127import net.i2p.data.ByteArray;
     28import net.i2p.data.DataFormatException;
    2229import net.i2p.data.DataHelper;
     30import net.i2p.data.Hash;
    2331import net.i2p.data.router.RouterAddress;
    2432import net.i2p.data.router.RouterIdentity;
     
    3240import net.i2p.router.Router;
    3341import net.i2p.router.RouterContext;
     42import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
    3443import net.i2p.router.transport.FIFOBandwidthLimiter;
    3544import net.i2p.router.transport.FIFOBandwidthLimiter.Request;
     45import net.i2p.router.transport.ntcp.NTCP2Payload.Block;
    3646import net.i2p.router.util.PriBlockingQueue;
    3747import net.i2p.util.ByteCache;
     
    3949import net.i2p.util.HexDump;
    4050import net.i2p.util.Log;
     51import net.i2p.util.SimpleTimer2;
    4152import net.i2p.util.SystemVersion;
    4253import net.i2p.util.VersionComparator;
     
    4455/**
    4556 * Coordinate the connection to a single peer.
     57 * NTCP 1 or 2.
    4658 *
    4759 * Public only for UI peers page. Not a public API, not for external use.
     
    104116    private final PriBlockingQueue<OutNetMessage> _outbound;
    105117    /**
    106      *  current prepared OutNetMessage, or null - synchronize on _outbound to modify or read
    107      *  FIXME why do we need this???
    108      */
    109     private OutNetMessage _currentOutbound;
     118     *  current prepared OutNetMessages, or empty - synchronize to modify or read
     119     */
     120    private final List<OutNetMessage> _currentOutbound;
    110121    private SessionKey _sessionKey;
    111     /** encrypted block of the current I2NP message being read */
    112     private byte _curReadBlock[];
    113     /** next byte to which data should be placed in the _curReadBlock */
    114     private int _curReadBlockIndex;
    115     private final byte _decryptBlockBuf[];
    116     /** last AES block of the encrypted I2NP message (to serve as the next block's IV) */
    117     private byte _prevReadBlock[];
    118122    private byte _prevWriteEnd[];
    119123    /** current partially read I2NP message */
    120     private final ReadState _curReadState;
     124    private ReadState _curReadState;
    121125    private final AtomicInteger _messagesRead = new AtomicInteger();
    122126    private final AtomicInteger _messagesWritten = new AtomicInteger();
     
    127131    // prevent sending meta before established
    128132    private long _nextMetaTime = Long.MAX_VALUE;
    129     private int _consecutiveZeroReads;
     133    private final AtomicInteger _consecutiveZeroReads = new AtomicInteger();
    130134
    131135    private static final int BLOCK_SIZE = 16;
    132136    private static final int META_SIZE = BLOCK_SIZE;
    133137
    134     /** unencrypted outbound metadata buffer */
    135     private final byte _meta[] = new byte[META_SIZE];
    136138    private boolean _sendingMeta;
    137139    /** how many consecutive sends were failed due to (estimated) send queue time */
     
    171173    private final long _connID = __connID.incrementAndGet();
    172174   
    173     /**
    174      * Create an inbound connected (though not established) NTCP connection
    175      *
     175    //// NTCP2 things
     176
     177    private static final int PADDING_RAND_MIN = 16;
     178    private static final int PADDING_MAX = 64;
     179    private static final int SIP_IV_LENGTH = 8;
     180    private static final int NTCP2_FAIL_READ = 1024;
     181    private static final long NTCP2_FAIL_TIMEOUT = 10*1000;
     182    private static final long NTCP2_TERMINATION_CLOSE_DELAY = 50;
     183    static final int REASON_UNSPEC = 0;
     184    static final int REASON_TERMINATION = 1;
     185    static final int REASON_TIMEOUT = 2;
     186    static final int REASON_AEAD = 4;
     187    static final int REASON_OPTIONS = 5;
     188    static final int REASON_SIGTYPE = 6;
     189    static final int REASON_SKEW = 7;
     190    static final int REASON_PADDING = 8;
     191    static final int REASON_FRAMING = 9;
     192    static final int REASON_PAYLOAD = 10;
     193    static final int REASON_MSG1 = 11;
     194    static final int REASON_MSG2 = 12;
     195    static final int REASON_MSG3 = 13;
     196    static final int REASON_FRAME_TIMEOUT = 14;
     197    static final int REASON_SIGFAIL = 15;
     198    static final int REASON_S_MISMATCH = 16;
     199    static final int REASON_BANNED = 17;
     200    static final int PADDING_MIN_DEFAULT_INT = 1;
     201    static final int PADDING_MAX_DEFAULT_INT = 2;
     202    private static final float PADDING_MIN_DEFAULT = PADDING_MIN_DEFAULT_INT / 16.0f;
     203    private static final float PADDING_MAX_DEFAULT = PADDING_MAX_DEFAULT_INT / 16.0f;
     204    static final int DUMMY_DEFAULT = 0;
     205    static final int DELAY_DEFAULT = 0;
     206    private static final NTCP2Options OUR_PADDING = new NTCP2Options(PADDING_MIN_DEFAULT, PADDING_MAX_DEFAULT,
     207                                                                     PADDING_MIN_DEFAULT, PADDING_MAX_DEFAULT,
     208                                                                     DUMMY_DEFAULT, DUMMY_DEFAULT,
     209                                                                     DELAY_DEFAULT, DELAY_DEFAULT);
     210    private static final int MIN_PADDING_RANGE = 64;
     211    private NTCP2Options _paddingConfig;
     212    private int _version;
     213    private CipherState _sender;
     214    private long _sendSipk1, _sendSipk2;
     215    private byte[] _sendSipIV;
     216
     217
     218    /**
     219     * Create an inbound connected (though not established) NTCP connection.
     220     * Caller MUST call transport.establishing(this) after construction.
     221     * Caller MUST key.attach(this) after construction.
    176222     */
    177223    public NTCPConnection(RouterContext ctx, NTCPTransport transport, SocketChannel chan, SelectionKey key) {
     224        this(ctx, transport, null, true);
     225        _chan = chan;
     226        _version = 1;
     227        _conKey = key;
     228        _establishState = new InboundEstablishState(ctx, transport, this);
     229    }
     230
     231    /**
     232     * Create an outbound unconnected NTCP connection.
     233     * Caller MUST call transport.establishing(this) after construction.
     234     *
     235     * @param version must be 1 or 2
     236     */
     237    public NTCPConnection(RouterContext ctx, NTCPTransport transport, RouterIdentity remotePeer,
     238                          RouterAddress remAddr, int version) {
     239        this(ctx, transport, remAddr, false);
     240        _remotePeer = remotePeer;
     241        _version = version;
     242        if (version == 1)
     243            _establishState = new OutboundEstablishState(ctx, transport, this);
     244        else
     245            _establishState = new OutboundNTCP2State(ctx, transport, this);
     246    }
     247
     248    /**
     249     * Base constructor in/out
     250     * @since 0.9.36
     251     */
     252    private NTCPConnection(RouterContext ctx, NTCPTransport transport, RouterAddress remAddr, boolean isIn) {
    178253        _context = ctx;
    179254        _log = ctx.logManager().getLog(getClass());
    180255        _created = ctx.clock().now();
    181256        _transport = transport;
    182         _remAddr = null;
    183         _chan = chan;
     257        _remAddr = remAddr;
     258        _lastSendTime = _created;
     259        _lastReceiveTime = _created;
     260        _lastRateUpdated = _created;
    184261        _readBufs = new ConcurrentLinkedQueue<ByteBuffer>();
    185262        _writeBufs = new ConcurrentLinkedQueue<ByteBuffer>();
     
    188265        //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    189266        _outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
    190         _isInbound = true;
    191         _decryptBlockBuf = new byte[BLOCK_SIZE];
    192         _curReadState = new ReadState();
    193         _establishState = new InboundEstablishState(ctx, transport, this);
    194         _conKey = key;
    195         _conKey.attach(this);
     267        _currentOutbound = new ArrayList<OutNetMessage>(1);
     268        _isInbound = isIn;
    196269        _inboundListener = new InboundListener();
    197270        _outboundListener = new OutboundListener();
    198         initialize();
    199     }
    200 
    201     /**
    202      * Create an outbound unconnected NTCP connection
    203      *
    204      * @param version must be 1 or 2
    205      */
    206     public NTCPConnection(RouterContext ctx, NTCPTransport transport, RouterIdentity remotePeer,
    207                           RouterAddress remAddr, int version) {
    208         _context = ctx;
    209         _log = ctx.logManager().getLog(getClass());
    210         _created = ctx.clock().now();
    211         _transport = transport;
    212         _remotePeer = remotePeer;
    213         _remAddr = remAddr;
    214         _readBufs = new ConcurrentLinkedQueue<ByteBuffer>();
    215         _writeBufs = new ConcurrentLinkedQueue<ByteBuffer>();
    216         _bwInRequests = new ConcurrentHashSet<Request>(2);
    217         _bwOutRequests = new ConcurrentHashSet<Request>(8);
    218         //_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    219         _outbound = new PriBlockingQueue<OutNetMessage>(ctx, "NTCP-Connection", 32);
    220         _isInbound = false;
    221         //if (version == 1)
    222             _establishState = new OutboundEstablishState(ctx, transport, this);
    223         //else
    224         //    _establishState = // TODO
    225         _decryptBlockBuf = new byte[BLOCK_SIZE];
    226         _curReadState = new ReadState();
    227         _inboundListener = new InboundListener();
    228         _outboundListener = new OutboundListener();
    229         initialize();
    230     }
    231 
    232     private void initialize() {
    233         _lastSendTime = _created;
    234         _lastReceiveTime = _created;
    235         _lastRateUpdated = _created;
    236         _curReadBlock = new byte[BLOCK_SIZE];
    237         _prevReadBlock = new byte[BLOCK_SIZE];
    238         _transport.establishing(this);
    239271    }
    240272
     
    250282    public void setChannel(SocketChannel chan) { _chan = chan; }
    251283    public void setKey(SelectionKey key) { _conKey = key; }
     284
    252285    public boolean isInbound() { return _isInbound; }
    253286    public boolean isEstablished() { return _establishState.isComplete(); }
     
    262295
    263296    /**
    264      *  Only valid during establishment; null later
    265      */
    266     public EstablishState getEstablishState() { return _establishState; }
     297     *  Only valid during establishment;
     298     *  replaced with EstablishState.VERIFIED or FAILED afterward
     299     */
     300    EstablishState getEstablishState() { return _establishState; }
    267301
    268302    /**
     
    275309     */
    276310    public RouterIdentity getRemotePeer() { return _remotePeer; }
     311
     312    /**
     313     *  Valid for outbound; valid for inbound after handshake
     314     */
    277315    public void setRemotePeer(RouterIdentity ident) { _remotePeer = ident; }
    278316
    279317    /**
    280      * We are Bob.
     318     * We are Bob. NTCP1 only.
     319     *
     320     * Caller MUST call recvEncryptedI2NP() after, for any remaining bytes in receive buffer
    281321     *
    282322     * @param clockSkew OUR clock minus ALICE's clock in seconds (may be negative, obviously, but |val| should
    283323     *                  be under 1 minute)
    284      * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
    285      * @param prevReadEnd 16 or more bytes, last 16 bytes copied
    286      */
    287     public void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
     324     * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt, the write AES IV
     325     * @param prevReadEnd 16 or more bytes, last 16 bytes copied as the read AES IV
     326     */
     327    void finishInboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
    288328        NTCPConnection toClose = locked_finishInboundEstablishment(key, clockSkew, prevWriteEnd, prevReadEnd);
    289329        if (toClose != null) {
     
    297337   
    298338    /**
    299      * We are Bob.
     339     * We are Bob. NTCP1 only.
    300340     *
    301341     * @param clockSkew OUR clock minus ALICE's clock in seconds (may be negative, obviously, but |val| should
    302342     *                  be under 1 minute)
    303      * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt
    304      * @param prevReadEnd 16 or more bytes, last 16 bytes copied
     343     * @param prevWriteEnd exactly 16 bytes, not copied, do not corrupt, the write AES IV
     344     * @param prevReadEnd 16 or more bytes, last 16 bytes copied as the read AES IV
    305345     * @return old conn to be closed by caller, or null
    306346     */
    307347    private synchronized NTCPConnection locked_finishInboundEstablishment(
    308348            SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
     349        if (_establishState == EstablishBase.VERIFIED) {
     350            IllegalStateException ise = new IllegalStateException("Already finished on " + this);
     351            _log.error("Already finished", ise);
     352            throw ise;
     353        }
     354        byte[] prevReadBlock = new byte[BLOCK_SIZE];
     355        System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, prevReadBlock, 0, BLOCK_SIZE);
     356        _curReadState = new NTCP1ReadState(prevReadBlock);
    309357        _sessionKey = key;
    310358        _clockSkew = clockSkew;
    311359        _prevWriteEnd = prevWriteEnd;
    312         System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
    313360        _establishedOn = _context.clock().now();
    314361        NTCPConnection rv = _transport.inboundEstablished(this);
     
    338385
    339386    public int getOutboundQueueSize() {
    340             int queued;
    341             synchronized(_outbound) {
    342                 queued = _outbound.size();
    343                 if (getCurrentOutbound() != null)
    344                     queued++;
     387            int queued = _outbound.size();
     388            synchronized(_currentOutbound) {
     389                queued += _currentOutbound.size();
    345390            }
    346391            return queued;
    347392    }
     393
     394    /** @since 0.9.36 */
     395    private boolean hasCurrentOutbound() {
     396        synchronized(_currentOutbound) {
     397            return ! _currentOutbound.isEmpty();
     398        }
     399    }
    348400   
    349     private OutNetMessage getCurrentOutbound() {
    350         synchronized(_outbound) {
    351             return _currentOutbound;
    352         }
    353     }
    354 
    355401    /** @return milliseconds */
    356402    public long getTimeSinceSend() { return _context.clock().now()-_lastSendTime; }
     
    369415
    370416    /**
     417     *  The NTCP2 version, for the console.
     418     *  For outbound, will not change.
     419     *  For inbound, defaults to 1, may change to 2 after establishment.
     420     *
     421     *  @return the version, 1 or 2
     422     *  @since 0.9.36
     423     */
     424    public int getVersion() { return _version; }
     425
     426    /**
     427     *  Set version 2 from InboundEstablishState.
     428     *  Just for logging, so we know before finishInboundEstablish() is called.
     429     *
     430     *  @since 0.9.36
     431     */
     432    public void setVersion(int ver) { _version = ver; }
     433
     434    /**
    371435     * Sets to true.
    372436     * @since 0.9.24
     
    383447     *  @since 0.8.12
    384448     */
    385     public void clearZeroRead() {
    386         _consecutiveZeroReads = 0;
     449    void clearZeroRead() {
     450        _consecutiveZeroReads.set(0);
    387451    }
    388452
     
    392456     *  @since 0.8.12
    393457     */
    394     public int gotZeroRead() {
    395         return ++_consecutiveZeroReads;
     458    int gotZeroRead() {
     459        return _consecutiveZeroReads.incrementAndGet();
    396460    }
    397461
     
    405469            return;
    406470        }
    407         if (_log.shouldLog(Log.INFO))
     471        if (_version == 2) {
     472            // for debugging
     473            if (_log.shouldWarn())
     474                _log.warn("Closing connection " + toString(), new Exception("cause"));
     475        } else if (_log.shouldLog(Log.INFO)) {
    408476            _log.info("Closing connection " + toString(), new Exception("cause"));
     477        }
    409478        NTCPConnection toClose = locked_close(allowRequeue);
    410479        if (toClose != null && toClose != this) {
     
    421490     *  @since 0.9.16
    422491     */
    423     public void closeOnTimeout(String cause, Exception e) {
     492    void closeOnTimeout(String cause, Exception e) {
    424493        EstablishState es = _establishState;
    425494        close();
     
    458527        //_outbound.drainAllTo(pending);
    459528        _outbound.drainTo(pending);
    460         for (OutNetMessage msg : pending)
     529        synchronized(_currentOutbound) {
     530            if (!_currentOutbound.isEmpty())
     531                pending.addAll(_currentOutbound);
     532            _currentOutbound.clear();
     533        }
     534        for (OutNetMessage msg : pending) {
    461535            _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
    462 
    463         OutNetMessage msg = getCurrentOutbound();
    464         if (msg != null)
    465             _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
    466        
     536        }
     537        // zero out everything we can
     538        if (_curReadState != null) {
     539            _curReadState.destroy();
     540            _curReadState = null;
     541        }
     542        if (_sender != null) {
     543            _sender.destroy();
     544            _sender = null;
     545        }
     546        _sendSipk1 = 0;
     547        _sendSipk2 = 0;
     548        if (_sendSipIV != null) {
     549            Arrays.fill(_sendSipIV, (byte) 0);
     550            _sendSipIV = null;
     551        }
    467552        return old;
    468553    }
     
    478563            return;
    479564        }
    480         boolean noOutbound = (getCurrentOutbound() == null);
    481         if (isEstablished() && noOutbound)
     565        if (isEstablished() && !hasCurrentOutbound())
    482566            _transport.getWriter().wantsWrite(this, "enqueued");
    483567    }
     
    494578            int size = _outbound.size();
    495579            if (_log.shouldLog(Log.WARN)) {
    496                 int writeBufs = _writeBufs.size();
    497                 boolean currentOutboundSet = getCurrentOutbound() != null;
     580                int writeBufs = _writeBufs.size();
     581                boolean currentOutboundSet;
     582                long seq;
     583                synchronized(_currentOutbound) {
     584                    currentOutboundSet = !_currentOutbound.isEmpty();
     585                    seq = currentOutboundSet ? _currentOutbound.get(0).getSeqNum() : -1;
     586                }
    498587                try {
    499588                    _log.warn("Too backlogged: size is " + size
    500                           + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
    501                           + ", currentOut set? " + currentOutboundSet
    502                           + ", writeBufs: " + writeBufs + " on " + toString());
     589                              + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
     590                              + ", currentOut set? " + currentOutboundSet
     591                              + ", id: " + seq
     592                              + ", writeBufs: " + writeBufs + " on " + toString());
    503593                } catch (RuntimeException e) {}  // java.nio.channels.CancelledKeyException
    504594            }
     
    510600   
    511601    /**
    512      *  Inject a DatabaseStoreMessage with our RouterInfo
    513      */
    514     public void enqueueInfoMessage() {
     602     *  Inject a DatabaseStoreMessage with our RouterInfo. NTCP 1 or 2.
     603     *
     604     *  Externally, this is only called by NTCPTransport for outbound cons,
     605     *  before the con is established, but we know what version it is.
     606     *
     607     *  Internally, may be called for outbound or inbound, but only after the
     608     *  con is established, so we know what the version is.
     609     */
     610    void enqueueInfoMessage() {
     611        if (_version == 1) {
     612            enqueueInfoMessageNTCP1();
     613            // may change to 2 for inbound
     614        } else if (_isInbound) {
     615            // TODO or if outbound and it's not right at the beginning
     616            // TODO flood
     617            sendOurRouterInfo(false);
     618        }
     619        // don't need to send for NTCP 2 outbound, it's in msg 3
     620    }
     621   
     622    /**
     623     *  Inject a DatabaseStoreMessage with our RouterInfo. NTCP 1 only.
     624     */
     625    private void enqueueInfoMessageNTCP1() {
    515626        int priority = INFO_PRIORITY;
    516627        if (_log.shouldLog(Log.INFO))
     
    525636
    526637    /**
    527      * We are Alice.
     638     * We are Alice. NTCP1 only.
     639     *
     640     * Caller MUST call recvEncryptedI2NP() after, for any remaining bytes in receive buffer
    528641     *
    529642     * @param clockSkew OUR clock minus BOB's clock in seconds (may be negative, obviously, but |val| should
     
    532645     * @param prevReadEnd 16 or more bytes, last 16 bytes copied
    533646     */
    534     public synchronized void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
    535         if (_log.shouldLog(Log.DEBUG))
    536             _log.debug("outbound established (key=" + key + " skew=" + clockSkew + " prevWriteEnd=" + Base64.encode(prevWriteEnd) + ")");
     647    synchronized void finishOutboundEstablishment(SessionKey key, long clockSkew, byte prevWriteEnd[], byte prevReadEnd[]) {
     648        if (_establishState == EstablishBase.VERIFIED) {
     649            IllegalStateException ise = new IllegalStateException("Already finished on " + this);
     650            _log.error("Already finished", ise);
     651            throw ise;
     652        }
     653        byte[] prevReadBlock = new byte[BLOCK_SIZE];
     654        System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, prevReadBlock, 0, BLOCK_SIZE);
     655        _curReadState = new NTCP1ReadState(prevReadBlock);
    537656        _sessionKey = key;
    538657        _clockSkew = clockSkew;
    539658        _prevWriteEnd = prevWriteEnd;
    540         System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
    541659        if (_log.shouldLog(Log.DEBUG))
    542             _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
     660            _log.debug("outbound established (key=" + key + " skew=" + clockSkew +
     661                       " prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadBlock: " + Base64.encode(prevReadBlock));
    543662
    544663        _establishedOn = _context.clock().now();
    545664        _establishState = EstablishBase.VERIFIED;
    546665        _transport.markReachable(getRemotePeer().calculateHash(), false);
    547         boolean msgs = !_outbound.isEmpty();
    548666        _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
    549667        _nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
    550         if (msgs)
     668        if (!_outbound.isEmpty())
    551669            _transport.getWriter().wantsWrite(this, "outbound established");
    552670    }
    553671   
    554672    /**
    555      * prepare the next i2np message for transmission.  this should be run from
    556      * the Writer thread pool.
     673     * Prepare the next I2NP message for transmission.  This should be run from
     674     * the Writer thread pool. NTCP 1 or 2.
     675     *
     676     * This is the entry point as called from Writer.Runner.run()
    557677     *
    558678     * @param prep an instance of PrepBuffer to use as scratch space
     
    560680     */
    561681    synchronized void prepareNextWrite(PrepBuffer prep) {
    562             prepareNextWriteFast(prep);
    563     }
    564 
    565     /**
    566      * prepare the next i2np message for transmission.  this should be run from
    567      * the Writer thread pool.
    568      *
    569      * Caller must synchronize.
    570      * @param buf a PrepBuffer to use as scratch space
    571      *
    572      */
    573     private void prepareNextWriteFast(PrepBuffer buf) {
    574682        if (_closed.get())
    575683            return;
     
    581689            return;
    582690        }
    583        
     691        if (_version == 1)
     692            prepareNextWriteFast(prep);
     693        else
     694            prepareNextWriteNTCP2(prep);
     695    }
     696
     697    /**
     698     * Prepare the next I2NP message for transmission.  This should be run from
     699     * the Writer thread pool. NTCP 1 only.
     700     *
     701     * Caller must synchronize.
     702     * @param buf a PrepBuffer to use as scratch space
     703     *
     704     */
     705    private void prepareNextWriteFast(PrepBuffer buf) {
    584706        long now = _context.clock().now();
    585707        if (_nextMetaTime <= now) {
     
    588710        }
    589711     
    590         OutNetMessage msg = null;
    591         // this is synchronized only for _currentOutbound
    592         // Todo: figure out how to remove the synchronization
    593         synchronized (_outbound) {
    594             if (_currentOutbound != null) {
     712        OutNetMessage msg;
     713        synchronized (_currentOutbound) {
     714            if (!_currentOutbound.isEmpty()) {
    595715                if (_log.shouldLog(Log.INFO))
    596                     _log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
     716                    _log.info("attempt for multiple outbound messages with " + _currentOutbound.size() + " already waiting and " + _outbound.size() + " queued");
    597717                return;
    598718            }
     719            while (true) {
    599720                msg = _outbound.poll();
    600721                if (msg == null)
    601722                    return;
    602             _currentOutbound = msg;
    603         }
    604        
    605         bufferedPrepare(msg,buf);
     723                if (msg.getExpiration() >= now)
     724                    break;
     725                if (_log.shouldWarn())
     726                    _log.warn("dropping message expired on queue: " + msg + " on " + this);
     727                _transport.afterSend(msg, false, false, msg.getLifetime());
     728            }
     729            _currentOutbound.add(msg);
     730        }
     731
     732        bufferedPrepare(msg, buf);
    606733        _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
    607734        System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
     
    627754    private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) {
    628755        I2NPMessage m = msg.getMessage();
    629         buf.baseLength = m.toByteArray(buf.base);
    630         int sz = buf.baseLength;
     756        // 2 offset for size
     757        int sz = m.toByteArray(buf.unencrypted, 2) - 2;
    631758        int min = 2 + sz + 4;
    632759        int rem = min % 16;
     
    634761        if (rem > 0)
    635762            padding = 16 - rem;
    636        
    637763        buf.unencryptedLength = min+padding;
    638764        DataHelper.toLong(buf.unencrypted, 0, 2, sz);
    639         System.arraycopy(buf.base, 0, buf.unencrypted, 2, buf.baseLength);
    640765        if (padding > 0) {
    641766            _context.random().nextBytes(buf.unencrypted, 2+sz, padding);
     
    658783    }
    659784
    660     public static class PrepBuffer {
     785    static class PrepBuffer {
    661786        final byte unencrypted[];
    662787        int unencryptedLength;
    663         final byte base[];
    664         int baseLength;
    665788        final Adler32 crc;
    666789        byte encrypted[];
     
    668791        public PrepBuffer() {
    669792            unencrypted = new byte[BUFFER_SIZE];
    670             base = new byte[BUFFER_SIZE];
    671793            crc = new Adler32();
    672794        }
     
    674796        public void init() {
    675797            unencryptedLength = 0;
    676             baseLength = 0;
    677798            encrypted = null;
    678799            crc.reset();
    679800        }
     801    }
     802
     803    /**
     804     * Prepare the next I2NP message for transmission.  This should be run from
     805     * the Writer thread pool.
     806     *
     807     * Caller must synchronize.
     808     *
     809     * @param buf we use buf.enencrypted only
     810     * @since 0.9.36
     811     */
     812    private void prepareNextWriteNTCP2(PrepBuffer buf) {
     813        int size = OutboundNTCP2State.MAC_SIZE;
     814        List<Block> blocks = new ArrayList<Block>(4);
     815        long now = _context.clock().now();
     816        synchronized (_currentOutbound) {
     817            if (!_currentOutbound.isEmpty()) {
     818                if (_log.shouldLog(Log.INFO))
     819                    _log.info("attempt for multiple outbound messages with " + _currentOutbound.size() + " already waiting and " + _outbound.size() + " queued");
     820                return;
     821            }
     822            OutNetMessage msg;
     823            while (true) {
     824                msg = _outbound.poll();
     825                if (msg == null)
     826                    return;
     827                if (msg.getExpiration() >= now)
     828                    break;
     829                if (_log.shouldWarn())
     830                    _log.warn("dropping message expired on queue: " + msg + " on " + this);
     831                _transport.afterSend(msg, false, false, msg.getLifetime());
     832            }
     833            _currentOutbound.add(msg);
     834            // don't make combined msgs too big to minimize latency
     835            final int MAX_MSG_SIZE = 5000;
     836            I2NPMessage m = msg.getMessage();
     837            Block block = new NTCP2Payload.I2NPBlock(m);
     838            blocks.add(block);
     839            size += block.getTotalLength();
     840            // now add more (maybe)
     841            if (size < MAX_MSG_SIZE) {
     842                // keep adding as long as we will be under 5 KB
     843                while (true) {
     844                    msg = _outbound.peek();
     845                    if (msg == null)
     846                        break;
     847                    m = msg.getMessage();
     848                    int msz = m.getMessageSize() - 7;
     849                    if (size + msz > MAX_MSG_SIZE)
     850                        break;
     851                    OutNetMessage msg2 = _outbound.poll();
     852                    if (msg2 == null)
     853                        break;
     854                    if (msg2 != msg) {
     855                        // if it wasn't the one we sized, put it back
     856                        _outbound.offer(msg2);
     857                        break;
     858                    }
     859                    if (msg.getExpiration() >= now) {
     860                        block = new NTCP2Payload.I2NPBlock(m);
     861                        blocks.add(block);
     862                        size += NTCP2Payload.BLOCK_HEADER_SIZE + msz;
     863                    } else {
     864                        if (_log.shouldWarn())
     865                            _log.warn("dropping message expired on queue: " + msg + " on " + this);
     866                        _transport.afterSend(msg, false, false, msg.getLifetime());
     867                    }
     868                }
     869            }
     870        }
     871        if (_nextMetaTime <= now && size + (NTCP2Payload.BLOCK_HEADER_SIZE + 4) <= BUFFER_SIZE) {
     872            Block block = new NTCP2Payload.DateTimeBlock(_context);
     873            blocks.add(block);
     874            size += block.getTotalLength();
     875            _nextMetaTime = now + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY / 2);
     876            if (_log.shouldLog(Log.DEBUG))
     877                _log.debug("Sending NTCP2 datetime block");
     878        }
     879        // 1024 is an estimate, do final check below
     880        if (_nextInfoTime <= now && size + 1024 <= BUFFER_SIZE) {
     881            RouterInfo ri = _context.router().getRouterInfo();
     882            Block block = new NTCP2Payload.RIBlock(ri, false);
     883            int sz = block.getTotalLength();
     884            if (size + sz <= BUFFER_SIZE) {
     885                blocks.add(block);
     886                size += sz;
     887                _nextInfoTime = now + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
     888                if (_log.shouldLog(Log.INFO))
     889                    _log.info("SENDING NTCP2 RI block");
     890            } // else wait until next time
     891        }
     892        int availForPad = BUFFER_SIZE - (size + NTCP2Payload.BLOCK_HEADER_SIZE);
     893        if (availForPad > 0) {
     894            // what we want to send, calculated in proportion to data size
     895            int minSend = (int) (size * _paddingConfig.getSendMin());
     896            int maxSend = (int) (size * _paddingConfig.getSendMax());
     897            // the absolute min and max we can send
     898            int min = Math.min(minSend, availForPad);
     899            int max = Math.min(maxSend, availForPad);
     900            int range = max - min;
     901            if (range < MIN_PADDING_RANGE) {
     902                // reduce min to enforce minimum range if possible
     903                min = Math.max(0, min - (MIN_PADDING_RANGE - range));
     904                range = max - min;
     905            }
     906            int padlen = min;
     907            if (range > 0)
     908                padlen += _context.random().nextInt(1 + range);
     909            if (_log.shouldWarn())
     910                _log.warn("Padding params:" +
     911                          " size: " + size +
     912                          " avail: " + availForPad +
     913                          " minSend: " + minSend +
     914                          " maxSend: " + maxSend +
     915                          " min: " + min +
     916                          " max: " + max +
     917                          " range: " + range +
     918                          " padlen: " + padlen);
     919            // all zeros is fine here
     920            //Block block = new NTCP2Payload.PaddingBlock(_context, padlen);
     921            Block block = new NTCP2Payload.PaddingBlock(padlen);
     922            blocks.add(block);
     923            size += block.getTotalLength();
     924        }
     925        sendNTCP2(buf.unencrypted, blocks);
     926    }
     927
     928    /**
     929     *  NTCP2 only
     930     *
     931     *  @since 0.9.36
     932     */
     933    private void sendOurRouterInfo(boolean shouldFlood) {
     934        sendRouterInfo(_context.router().getRouterInfo(), shouldFlood);
     935    }
     936
     937    /**
     938     *  NTCP2 only
     939     *
     940     *  @since 0.9.36
     941     */
     942    private void sendRouterInfo(RouterInfo ri, boolean shouldFlood) {
     943        // no synch needed, sendNTCP2() is synched
     944        if (_log.shouldWarn())
     945            _log.warn("Sending router info for: " + ri.getHash() + " flood? " + shouldFlood);
     946        List<Block> blocks = new ArrayList<Block>(2);
     947        int plen = 2;
     948        Block block = new NTCP2Payload.RIBlock(ri, shouldFlood);
     949        plen += block.getTotalLength();
     950        blocks.add(block);
     951        int padlen = 1 + _context.random().nextInt(PADDING_MAX);
     952        // all zeros is fine here
     953        //block = new NTCP2Payload.PaddingBlock(_context, padlen);
     954        block = new NTCP2Payload.PaddingBlock(padlen);
     955        plen += block.getTotalLength();
     956        blocks.add(block);
     957        byte[] tmp = new byte[plen];
     958        sendNTCP2(tmp, blocks);
     959    }
     960
     961    /**
     962     *  NTCP2 only
     963     *
     964     *  @since 0.9.36
     965     */
     966    private void sendTermination(int reason, int validFramesRcvd) {
     967        // TODO add param to clear queues?
     968        // no synch needed, sendNTCP2() is synched
     969        if (_log.shouldWarn())
     970            _log.warn("Sending termination, reason: " + reason + ", vaild frames rcvd: " + validFramesRcvd);
     971        List<Block> blocks = new ArrayList<Block>(2);
     972        int plen = 2;
     973        Block block = new NTCP2Payload.TerminationBlock(reason, validFramesRcvd);
     974        plen += block.getTotalLength();
     975        blocks.add(block);
     976        int padlen = 1 + _context.random().nextInt(PADDING_MAX);
     977        // all zeros is fine here
     978        //block = new NTCP2Payload.PaddingBlock(_context, padlen);
     979        block = new NTCP2Payload.PaddingBlock(padlen);
     980        plen += block.getTotalLength();
     981        blocks.add(block);
     982        byte[] tmp = new byte[plen];
     983        sendNTCP2(tmp, blocks);
     984    }
     985
     986    /**
     987     *  This constructs the payload from the blocks, using the
     988     *  tmp byte array, then encrypts the payload and
     989     *  passes it to the pumper for writing.
     990     *
     991     *  @param tmp to be used for output of NTCP2Payload.writePayload(),
     992     *         must have room for 2 byte length and block output
     993     *  @since 0.9.36
     994     */
     995    private synchronized void sendNTCP2(byte[] tmp, List<Block> blocks) {
     996        int payloadlen = NTCP2Payload.writePayload(tmp, 0, blocks);
     997        int framelen = payloadlen + OutboundNTCP2State.MAC_SIZE;
     998        // TODO use a buffer
     999        byte[] enc = new byte[2 + framelen];
     1000        try {
     1001            _sender.encryptWithAd(null, tmp, 0, enc, 2, payloadlen);
     1002        } catch (GeneralSecurityException gse) {
     1003            // TODO anything else?
     1004            _log.error("data enc", gse);
     1005            return;
     1006        }
     1007
     1008        // siphash ^ len
     1009        long sipIV = SipHashInline.hash24(_sendSipk1, _sendSipk2, _sendSipIV);
     1010        enc[0] = (byte) ((framelen >> 8) ^ (sipIV >> 8));
     1011        enc[1] = (byte) (framelen ^ sipIV);
     1012        if (_log.shouldWarn()) {
     1013            StringBuilder buf = new StringBuilder(256);
     1014            buf.append("Sending ").append(blocks.size())
     1015               .append(" blocks in ").append(framelen)
     1016               .append(" byte NTCP2 frame:");
     1017            for (int i = 0; i < blocks.size(); i++) {
     1018                buf.append("\n    ").append(i).append(": ").append(blocks.get(i).toString());
     1019            }
     1020            _log.warn(buf.toString());
     1021        }
     1022        _transport.getPumper().wantsWrite(this, enc);
     1023        toLong8LE(_sendSipIV, 0, sipIV);
    6801024    }
    6811025   
     
    6841028     * as it occurs in the selector thread)
    6851029     */
    686     public void outboundConnected() {
     1030    void outboundConnected() {
    6871031        _conKey.interestOps(_conKey.interestOps() | SelectionKey.OP_READ);
    6881032        // schedule up the beginning of our handshaking by calling prepareNextWrite on the
     
    7491093     * contents have been fully allocated
    7501094     */
    751     public void queuedRecv(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
     1095    void queuedRecv(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
    7521096        req.attach(buf);
    7531097        req.setCompleteListener(_inboundListener);
     
    7561100
    7571101    /** ditto for writes */
    758     public void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
     1102    void queuedWrite(ByteBuffer buf, FIFOBandwidthLimiter.Request req) {
    7591103        req.attach(buf);
    7601104        req.setCompleteListener(_outboundListener);
     
    7681112     * and call EventPumper.releaseBuf().
    7691113     */
    770     public void recv(ByteBuffer buf) {
    771         _bytesReceived += buf.remaining();
     1114    void recv(ByteBuffer buf) {
     1115        if (isClosed()) {
     1116            if (_log.shouldWarn())
     1117                _log.warn("recv() on closed con");
     1118            return;
     1119        }
     1120        synchronized(this) {
     1121            _bytesReceived += buf.remaining();
     1122            updateStats();
     1123        }
    7721124        _readBufs.offer(buf);
    7731125        _transport.getReader().wantsRead(this);
    774         updateStats();
    7751126    }
    7761127
     
    7791130     * been fully allocated for the bandwidth limiter.
    7801131     */
    781     public void write(ByteBuffer buf) {
     1132    void write(ByteBuffer buf) {
    7821133        _writeBufs.offer(buf);
    7831134        _transport.getPumper().wantsWrite(this);
     
    7851136   
    7861137    /** @return null if none available */
    787     public ByteBuffer getNextReadBuf() {
     1138    ByteBuffer getNextReadBuf() {
    7881139        return _readBufs.poll();
    7891140    }
     
    7931144     * @since 0.8.12
    7941145     */
    795     public boolean isWriteBufEmpty() {
     1146    boolean isWriteBufEmpty() {
    7961147        return _writeBufs.isEmpty();
    7971148    }
    7981149
    7991150    /** @return null if none available */
    800     public ByteBuffer getNextWriteBuf() {
     1151    ByteBuffer getNextWriteBuf() {
    8011152        return _writeBufs.peek(); // not remove!  we removeWriteBuf afterwards
    8021153    }
     
    8051156     *  Remove the buffer, which _should_ be the one at the head of _writeBufs
    8061157     */
    807     public void removeWriteBuf(ByteBuffer buf) {
    808         _bytesSent += buf.capacity();
    809         OutNetMessage msg = null;
    810         boolean clearMessage = false;
    811         if (_sendingMeta && (buf.capacity() == _meta.length)) {
    812             _sendingMeta = false;
    813         } else {
    814             clearMessage = true;
     1158    void removeWriteBuf(ByteBuffer buf) {
     1159        // never clear OutNetMessages during establish phase
     1160        boolean clearMessage = isEstablished();
     1161        synchronized(this) {
     1162            _bytesSent += buf.capacity();
     1163            if (_sendingMeta && (buf.capacity() == META_SIZE)) {
     1164                _sendingMeta = false;
     1165                clearMessage = false;
     1166            }
     1167            updateStats();
    8151168        }
    8161169        _writeBufs.remove(buf);
    8171170        if (clearMessage) {
     1171            List<OutNetMessage> msgs = null;
    8181172            // see synchronization comments in prepareNextWriteFast()
    819             synchronized (_outbound) {
    820                 if (_currentOutbound != null) {
    821                     msg = _currentOutbound;
    822                     _currentOutbound = null;
     1173            synchronized (_currentOutbound) {
     1174                if (!_currentOutbound.isEmpty()) {
     1175                    msgs = new ArrayList<OutNetMessage>(_currentOutbound);
     1176                    _currentOutbound.clear();
    8231177                }
    8241178            }
    825             if (msg != null) {
     1179            // push through the bw limiter to reach _writeBufs
     1180            if (!_outbound.isEmpty())
     1181                _transport.getWriter().wantsWrite(this, "write completed");
     1182            if (msgs != null) {
    8261183                _lastSendTime = _context.clock().now();
    827                 _context.statManager().addRateData("ntcp.sendTime", msg.getSendTime());
    828                 if (_log.shouldLog(Log.DEBUG)) {
    829                     _log.debug("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
    830                               + msg.getSendTime() + "/"
    831                               + msg.getLifetime()
    832                               + " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
     1184                // stats once is fine for all of them
     1185                _context.statManager().addRateData("ntcp.sendTime", msgs.get(0).getSendTime());
     1186                for (OutNetMessage msg : msgs) {
     1187                    if (_log.shouldLog(Log.DEBUG)) {
     1188                        _log.debug("I2NP message " + _messagesWritten + "/" + msg.getMessageId() + " sent after "
     1189                                  + msg.getSendTime() + "/"
     1190                                  + msg.getLifetime()
     1191                                  + " with " + buf.capacity() + " bytes (uid=" + System.identityHashCode(msg)+" on " + toString() + ")");
     1192                    }
     1193                    _transport.sendComplete(msg);
    8331194                }
    834                 _messagesWritten.incrementAndGet();
    835                 _transport.sendComplete(msg);
     1195                _messagesWritten.addAndGet(msgs.size());
    8361196            }
    8371197        } else {
     1198            // push through the bw limiter to reach _writeBufs
     1199            if (!_outbound.isEmpty())
     1200                _transport.getWriter().wantsWrite(this, "write completed");
    8381201            if (_log.shouldLog(Log.INFO))
    8391202                _log.info("I2NP meta message sent completely");
    840         }
    841        
    842         if (getOutboundQueueSize() > 0) // push through the bw limiter to reach _writeBufs
    843             _transport.getWriter().wantsWrite(this, "write completed");
    844 
    845         updateStats();
     1203            // need to increment as EventPumper will close conn if not completed
     1204            _messagesWritten.incrementAndGet();
     1205        }
    8461206    }
    8471207       
     
    8551215    private float _recvBps;
    8561216   
    857     public float getSendRate() { return _sendBps; }
    858     public float getRecvRate() { return _recvBps; }
     1217    public synchronized float getSendRate() { return _sendBps; }
     1218    public synchronized float getRecvRate() { return _recvBps; }
    8591219   
    8601220    /**
    8611221     *  Stats only for console
    8621222     */
    863     private void updateStats() {
     1223    private synchronized void updateStats() {
    8641224        long now = _context.clock().now();
    8651225        long time = now - _lastRateUpdated;
     
    8931253     * BUT it must copy out the data
    8941254     * as reader will call EventPumper.releaseBuf().
     1255     *
     1256     * This is the entry point as called from Reader.processRead()
    8951257     */
    8961258    synchronized void recvEncryptedI2NP(ByteBuffer buf) {
    897         // hasArray() is false for direct buffers, at least on my system...
    898         if (_curReadBlockIndex == 0 && buf.hasArray()) {
    899             // fast way
    900             int tot = buf.remaining();
    901             if (tot >= 32 && tot % 16 == 0) {
    902                 recvEncryptedFast(buf);
    903                 return;
    904             }
    905         }
    906 
    907         while (buf.hasRemaining() && !_closed.get()) {
    908             int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex);
    909             if (want > 0) {
    910                 buf.get(_curReadBlock, _curReadBlockIndex, want);
    911                 _curReadBlockIndex += want;
    912             }
    913             if (_curReadBlockIndex >= BLOCK_SIZE) {
    914                 // cbc
    915                 _context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0);
    916                 for (int i = 0; i < BLOCK_SIZE; i++) {
    917                     _decryptBlockBuf[i] ^= _prevReadBlock[i];
    918                 }
    919                 boolean ok = recvUnencryptedI2NP();
    920                 if (!ok) {
    921                     if (_log.shouldLog(Log.INFO))
    922                         _log.info("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
    923                     _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
    924                     return;
    925                 }
    926                 byte swap[] = _prevReadBlock;
    927                 _prevReadBlock = _curReadBlock;
    928                 _curReadBlock = swap;
    929                 _curReadBlockIndex = 0;
    930             }
    931         }
    932     }
    933 
    934     /**
    935      *  Decrypt directly out of the ByteBuffer instead of copying the bytes
    936      *  16 at a time to the _curReadBlock / _prevReadBlock flip buffers.
    937      *
    938      *  More efficient but can only be used if buf.hasArray == true AND
    939      *  _curReadBlockIndex must be 0 and buf.getRemaining() % 16 must be 0
    940      *  and buf.getRemaining() must be >= 16.
    941      *  All this is true for most buffers.
    942      *  In theory this could be fixed up to handle the other cases too but that's hard.
    943      *  Caller must synchronize!
    944      *  @since 0.8.12
    945      */
    946     private void recvEncryptedFast(ByteBuffer buf) {
    947         byte[] array = buf.array();
    948         int pos = buf.arrayOffset();
    949         int end = pos + buf.remaining();
    950         boolean first = true;
    951 
    952         for ( ; pos < end && !_closed.get(); pos += BLOCK_SIZE) {
    953             _context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0);
    954             if (first) {
    955                 for (int i = 0; i < BLOCK_SIZE; i++) {
    956                     _decryptBlockBuf[i] ^= _prevReadBlock[i];
    957                 }
    958                 first = false;
    959             } else {
    960                 int start = pos - BLOCK_SIZE;
    961                 for (int i = 0; i < BLOCK_SIZE; i++) {
    962                     _decryptBlockBuf[i] ^= array[start + i];
    963                 }
    964             }
    965             boolean ok = recvUnencryptedI2NP();
    966             if (!ok) {
    967                 if (_log.shouldLog(Log.INFO))
    968                     _log.info("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
    969                 _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
    970                 return;
    971             }
    972         }
    973         // ...and copy to _prevReadBlock the last time
    974         System.arraycopy(array, end - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
    975     }
    976    
    977     /**
    978      *  Append the next 16 bytes of cleartext to the read state.
    979      *  _decryptBlockBuf contains another cleartext block of I2NP to parse.
    980      *  Caller must synchronize!
    981      *  @return success
    982      */
    983     private boolean recvUnencryptedI2NP() {
    984         _curReadState.receiveBlock(_decryptBlockBuf);
    985         // FIXME move check to ReadState; must we close? possible attack vector?
    986         if (_curReadState.getSize() > BUFFER_SIZE) {
    987             if (_log.shouldLog(Log.WARN))
    988                 _log.warn("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
    989             _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _curReadState.getSize());
    990             close();
    991             return false;
    992         } else {
    993             return true;
    994         }
     1259        if (_curReadState == null)
     1260            throw new IllegalStateException("not established");
     1261        _curReadState.receive(buf);
    9951262    }
    9961263   
     
    9981265    * One special case is a metadata message where the sizeof(data) is 0.  In
    9991266    * that case, the unencrypted message is encoded as:
     1267    *
     1268    * <pre>
    10001269    *  +-------+-------+-------+-------+-------+-------+-------+-------+
    10011270    *  |       0       |      timestamp in seconds     | uninterpreted             
     
    10031272    *          uninterpreted           | adler checksum of sz+data+pad |
    10041273    *  +-------+-------+-------+-------+-------+-------+-------+-------+
     1274    * </pre>
    10051275    *
     1276    *  Caller must synch
     1277    *
     1278    *  @param unencrypted 16 bytes starting at off
     1279    *  @param off the offset
    10061280    */
    1007     private void readMeta(byte unencrypted[]) {
    1008         long ourTs = (_context.clock().now() + 500) / 1000;
    1009         long ts = DataHelper.fromLong(unencrypted, 2, 4);
     1281    private void readMeta(byte unencrypted[], int off) {
    10101282        Adler32 crc = new Adler32();
    1011         crc.update(unencrypted, 0, unencrypted.length-4);
     1283        crc.update(unencrypted, off, META_SIZE - 4);
    10121284        long expected = crc.getValue();
    1013         long read = DataHelper.fromLong(unencrypted, unencrypted.length-4, 4);
     1285        long read = DataHelper.fromLong(unencrypted, off + META_SIZE - 4, 4);
    10141286        if (read != expected) {
    10151287            if (_log.shouldLog(Log.WARN))
     
    10181290            close();
    10191291            return;
    1020         } else {
    1021             long newSkew = (ourTs - ts);
    1022             if (Math.abs(newSkew*1000) > Router.CLOCK_FUDGE_FACTOR) {
    1023                 if (_log.shouldLog(Log.WARN))
    1024                     _log.warn("Peer's skew jumped too far (from " + _clockSkew + " s to " + newSkew + " s): " + toString());
    1025                 _context.statManager().addRateData("ntcp.corruptSkew", newSkew);
    1026                 close();
    1027                 return;
    1028             }
    1029             _context.statManager().addRateData("ntcp.receiveMeta", newSkew);
    1030             if (_log.shouldLog(Log.DEBUG))
    1031                 _log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s.");
    1032             // FIXME does not account for RTT
    1033             _clockSkew = newSkew;
    1034         }
     1292        }
     1293        long ts = DataHelper.fromLong(unencrypted, off + 2, 4);
     1294        receiveTimestamp(ts);
     1295    }
     1296
     1297    /**
     1298     *  Handle a received timestamp, NTCP 1 or 2.
     1299     *  Caller must synch
     1300     *
     1301     *  @param ts his timestamp in seconds, NOT ms
     1302     *  @since 0.9.36 pulled out of readMeta() above
     1303     */
     1304    private void receiveTimestamp(long ts) {
     1305        long ourTs = (_context.clock().now() + 500) / 1000;
     1306        long newSkew = (ourTs - ts);
     1307        if (Math.abs(newSkew*1000) > Router.CLOCK_FUDGE_FACTOR) {
     1308            if (_log.shouldLog(Log.WARN))
     1309                _log.warn("Peer's skew jumped too far (from " + _clockSkew + " s to " + newSkew + " s): " + toString());
     1310            _context.statManager().addRateData("ntcp.corruptSkew", newSkew);
     1311            close();
     1312            return;
     1313        }
     1314        _context.statManager().addRateData("ntcp.receiveMeta", newSkew);
     1315        if (_log.shouldLog(Log.DEBUG))
     1316            _log.debug("Received NTCP metadata, old skew of " + _clockSkew + " s, new skew of " + newSkew + "s.");
     1317        // FIXME does not account for RTT
     1318        _clockSkew = newSkew;
    10351319    }
    10361320
     
    10381322     * One special case is a metadata message where the sizeof(data) is 0.  In
    10391323     * that case, the unencrypted message is encoded as:
     1324     *
    10401325     *<pre>
    10411326     *  +-------+-------+-------+-------+-------+-------+-------+-------+
     
    10451330     *  +-------+-------+-------+-------+-------+-------+-------+-------+
    10461331     *</pre>
     1332     *
     1333     * Caller must synchronize.
    10471334     */
    10481335    private void sendMeta() {
    1049         byte encrypted[] = new byte[_meta.length];
    1050         synchronized (_meta) {
    1051             DataHelper.toLong(_meta, 0, 2, 0);
    1052             DataHelper.toLong(_meta, 2, 4, (_context.clock().now() + 500) / 1000);
    1053             _context.random().nextBytes(_meta, 6, 6);
    1054             Adler32 crc = new Adler32();
    1055             crc.update(_meta, 0, _meta.length-4);
    1056             DataHelper.toLong(_meta, _meta.length-4, 4, crc.getValue());
    1057             _context.aes().encrypt(_meta, 0, encrypted, 0, _sessionKey, _prevWriteEnd, 0, _meta.length);
    1058         }
    1059         System.arraycopy(encrypted, encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
     1336        byte[] data = new byte[META_SIZE];
     1337        DataHelper.toLong(data, 0, 2, 0);
     1338        DataHelper.toLong(data, 2, 4, (_context.clock().now() + 500) / 1000);
     1339        _context.random().nextBytes(data, 6, 6);
     1340        Adler32 crc = new Adler32();
     1341        crc.update(data, 0, META_SIZE - 4);
     1342        DataHelper.toLong(data, META_SIZE - 4, 4, crc.getValue());
     1343        _context.aes().encrypt(data, 0, data, 0, _sessionKey, _prevWriteEnd, 0, META_SIZE);
     1344        System.arraycopy(data, META_SIZE - 16, _prevWriteEnd, 0, _prevWriteEnd.length);
    10601345        // perhaps this should skip the bw limiter to reduce clock skew issues?
    10611346        if (_log.shouldLog(Log.DEBUG))
    10621347            _log.debug("Sending NTCP metadata");
    10631348        _sendingMeta = true;
    1064         _transport.getPumper().wantsWrite(this, encrypted);
     1349        _transport.getPumper().wantsWrite(this, data);
    10651350    }
    10661351   
     
    10991384    }
    11001385
     1386    private interface ReadState {
     1387        public void receive(ByteBuffer buf);
     1388        public void destroy();
     1389    }
     1390
    11011391    /**
    11021392     * Read the unencrypted message (16 bytes at a time).
     
    11161406     * J 16KB buffers for the cons actually transmitting, instead of one per
    11171407     * con (including idle ones)
    1118      */
    1119     private class ReadState {
     1408     *
     1409     * Call all methods from synchronized parent method.
     1410     *
     1411     */
     1412    private class NTCP1ReadState implements ReadState {
    11201413        private int _size;
    11211414        private ByteArray _dataBuf;
    11221415        private int _nextWrite;
    1123         private long _expectedCrc;
    11241416        private final Adler32 _crc;
    11251417        private long _stateBegin;
    11261418        private int _blocks;
    1127 
    1128         public ReadState() {
     1419        /** encrypted block of the current I2NP message being read */
     1420        private byte _curReadBlock[];
     1421        /** next byte to which data should be placed in the _curReadBlock */
     1422        private int _curReadBlockIndex;
     1423        private final byte _decryptBlockBuf[];
     1424        /** last AES block of the encrypted I2NP message (to serve as the next block's IV) */
     1425        private byte _prevReadBlock[];
     1426
     1427        /**
     1428         *  @param prevReadBlock 16 bytes AES IV
     1429         */
     1430        public NTCP1ReadState(byte[] prevReadBlock) {
    11291431            _crc = new Adler32();
     1432            _prevReadBlock = prevReadBlock;
     1433            _curReadBlock = new byte[BLOCK_SIZE];
     1434            _decryptBlockBuf = new byte[BLOCK_SIZE];
    11301435            init();
    11311436        }
     
    11341439            _size = -1;
    11351440            _nextWrite = 0;
    1136             _expectedCrc = -1;
    11371441            _stateBegin = -1;
    11381442            _blocks = -1;
     
    11411445                releaseReadBuf(_dataBuf);
    11421446            _dataBuf = null;
    1143         }
    1144 
    1145         public int getSize() { return _size; }
     1447            _curReadBlockIndex = 0;
     1448        }
     1449
     1450        /** @since 0.9.36 */
     1451        public void destroy() {
     1452            if (_dataBuf != null) {
     1453                releaseReadBuf(_dataBuf);
     1454                _dataBuf = null;
     1455            }
     1456            // TODO zero things out
     1457        }
     1458       
     1459        /**
     1460         * Connection must be established!
     1461         *
     1462         * The contents of the buffer include some fraction of one or more
     1463         * encrypted and encoded I2NP messages.  individual i2np messages are
     1464         * encoded as "sizeof(data)+data+pad+crc", and those are encrypted
     1465         * with the session key and the last 16 bytes of the previous encrypted
     1466         * i2np message.
     1467         *
     1468         * The NTCP connection now owns the buffer
     1469         * BUT it must copy out the data
     1470         * as reader will call EventPumper.releaseBuf().
     1471         *
     1472         * @since 0.9.36 moved from parent class
     1473         */
     1474        public void receive(ByteBuffer buf) {
     1475            // hasArray() is false for direct buffers, at least on my system...
     1476            if (_curReadBlockIndex == 0 && buf.hasArray()) {
     1477                // fast way
     1478                int tot = buf.remaining();
     1479                if (tot >= 32 && tot % 16 == 0) {
     1480                    recvEncryptedFast(buf);
     1481                    return;
     1482                }
     1483            }
     1484
     1485            while (buf.hasRemaining() && !_closed.get()) {
     1486                int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex);
     1487                if (want > 0) {
     1488                    buf.get(_curReadBlock, _curReadBlockIndex, want);
     1489                    _curReadBlockIndex += want;
     1490                }
     1491                if (_curReadBlockIndex >= BLOCK_SIZE) {
     1492                    // cbc
     1493                    _context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0);
     1494                    xor16(_prevReadBlock, _decryptBlockBuf);
     1495                    boolean ok = recvUnencryptedI2NP();
     1496                    if (!ok) {
     1497                        if (_log.shouldLog(Log.INFO))
     1498                            _log.info("Read buffer " + System.identityHashCode(buf) + " contained corrupt data, IV was: " + Base64.encode(_decryptBlockBuf));
     1499                        _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
     1500                        return;
     1501                    }
     1502                    byte swap[] = _prevReadBlock;
     1503                    _prevReadBlock = _curReadBlock;
     1504                    _curReadBlock = swap;
     1505                    _curReadBlockIndex = 0;
     1506                }
     1507            }
     1508        }
     1509
     1510        /**
     1511         *  Decrypt directly out of the ByteBuffer instead of copying the bytes
     1512         *  16 at a time to the _curReadBlock / _prevReadBlock flip buffers.
     1513         *
     1514         *  More efficient but can only be used if buf.hasArray == true AND
     1515         *  _curReadBlockIndex must be 0 and buf.getRemaining() % 16 must be 0
     1516         *  and buf.getRemaining() must be >= 16.
     1517         *  All this is true for most incoming buffers.
     1518         *  In theory this could be fixed up to handle the other cases too but that's hard.
     1519         *  Caller must synchronize!
     1520         *
     1521         *  @since 0.8.12, moved from parent class in 0.9.36
     1522         */
     1523        private void recvEncryptedFast(ByteBuffer buf) {
     1524            byte[] array = buf.array();
     1525            int pos = buf.arrayOffset() + buf.position();
     1526            int end = pos + buf.remaining();
     1527
     1528            // Copy to _curReadBlock for next IV...
     1529            System.arraycopy(array, end - BLOCK_SIZE, _curReadBlock, 0, BLOCK_SIZE);
     1530            // call aes().decrypt() to decrypt all at once, in place
     1531            // decrypt() will offload to the JVM/OS for larger sizes
     1532            _context.aes().decrypt(array, pos, array, pos, _sessionKey, _prevReadBlock, buf.remaining());
     1533
     1534            for ( ; pos < end; pos += BLOCK_SIZE) {
     1535                boolean ok = receiveBlock(array, pos);
     1536                if (!ok) {
     1537                    if (_log.shouldLog(Log.INFO))
     1538                        _log.info("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
     1539                    _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
     1540                    return;
     1541                }
     1542            }
     1543            // ...and flip to _prevReadBlock for next time
     1544            byte swap[] = _prevReadBlock;
     1545            _prevReadBlock = _curReadBlock;
     1546            _curReadBlock = swap;
     1547        }
     1548   
     1549        /**
     1550         *  Append the next 16 bytes of cleartext to the read state.
     1551         *  _decryptBlockBuf contains another cleartext block of I2NP to parse.
     1552         *  Caller must synchronize!
     1553         *
     1554         *  @return success
     1555         *  @since 0.9.36 moved from parent class
     1556         */
     1557        private boolean recvUnencryptedI2NP() {
     1558            return receiveBlock(_decryptBlockBuf, 0);
     1559        }
    11461560
    11471561        /**
    11481562         *  Caller must synchronize
    1149          *  @param buf 16 bytes
     1563         *  @param buf 16 bytes starting at off
     1564         *  @param off offset
     1565         *  @return success, only false on initial block with invalid size
    11501566         */
    1151         public void receiveBlock(byte buf[]) {
     1567        private boolean receiveBlock(byte buf[], int off) {
    11521568            if (_size == -1) {
    1153                 receiveInitial(buf);
     1569                return receiveInitial(buf, off);
    11541570            } else {
    1155                 receiveSubsequent(buf);
    1156             }
    1157         }
    1158 
    1159         /** @param buf 16 bytes */
    1160         private void receiveInitial(byte buf[]) {
    1161             _size = (int)DataHelper.fromLong(buf, 0, 2);
     1571                receiveSubsequent(buf, off);
     1572                return true;
     1573            }
     1574        }
     1575
     1576        /**
     1577         *  Caller must synchronize
     1578         *
     1579         *  @param buf 16 bytes starting at off
     1580         *  @param off offset
     1581         *  @return success
     1582         */
     1583        private boolean receiveInitial(byte buf[], int off) {
     1584            _size = (int)DataHelper.fromLong(buf, off, 2);
     1585            if (_size > BUFFER_SIZE) {
     1586                // this is typically an AES decryption error, not actually a large I2NP message
     1587                if (_log.shouldLog(Log.WARN))
     1588                    _log.warn("I2NP message too big - size: " + _size + " Closing " + NTCPConnection.this.toString(), new Exception());
     1589                _context.statManager().addRateData("ntcp.corruptTooLargeI2NP", _size);
     1590                close();
     1591                return false;
     1592            }
    11621593            if (_size == 0) {
    1163                 readMeta(buf);
     1594                readMeta(buf, off);
    11641595                init();
    11651596            } else {
    11661597                _stateBegin = _context.clock().now();
    11671598                _dataBuf = acquireReadBuf();
    1168                 System.arraycopy(buf, 2, _dataBuf.getData(), 0, buf.length-2);
    1169                 _nextWrite += buf.length-2;
    1170                 _crc.update(buf);
     1599                System.arraycopy(buf, off + 2, _dataBuf.getData(), 0, BLOCK_SIZE - 2);
     1600                _nextWrite += BLOCK_SIZE - 2;
     1601                _crc.update(buf, off, BLOCK_SIZE);
    11711602                _blocks++;
    11721603                if (_log.shouldLog(Log.DEBUG))
    11731604                    _log.debug("new I2NP message with size: " + _size + " for message " + _messagesRead);
    11741605            }
    1175         }
    1176 
    1177         /** @param buf 16 bytes */
    1178         private void receiveSubsequent(byte buf[]) {
     1606            return true;
     1607        }
     1608
     1609        /**
     1610         *  Caller must synchronize
     1611         *
     1612         *  @param buf 16 bytes starting at off
     1613         *  @param off offset
     1614         */
     1615        private void receiveSubsequent(byte buf[], int off) {
    11791616            _blocks++;
    11801617            int remaining = _size - _nextWrite;
    1181             int blockUsed = Math.min(buf.length, remaining);
     1618            int blockUsed = Math.min(BLOCK_SIZE, remaining);
    11821619            if (remaining > 0) {
    1183                 System.arraycopy(buf, 0, _dataBuf.getData(), _nextWrite, blockUsed);
     1620                System.arraycopy(buf, off, _dataBuf.getData(), _nextWrite, blockUsed);
    11841621                _nextWrite += blockUsed;
    11851622                remaining -= blockUsed;
    11861623            }
    1187             if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) {
     1624            if ( (remaining <= 0) && (BLOCK_SIZE - blockUsed < 4) ) {
    11881625                // we've received all the data but not the 4-byte checksum
    11891626                if (_log.shouldLog(Log.DEBUG))
    11901627                    _log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead);
    1191                 _crc.update(buf);
     1628                _crc.update(buf, off, BLOCK_SIZE);
    11921629                return;
    11931630            } else if (remaining <= 0) {
    1194                 receiveLastBlock(buf);
     1631                receiveLastBlock(buf, off);
    11951632            } else {
    1196                 _crc.update(buf);
    1197             }
    1198         }
    1199 
    1200         /** @param buf 16 bytes */
    1201         private void receiveLastBlock(byte buf[]) {
     1633                _crc.update(buf, off, BLOCK_SIZE);
     1634            }
     1635        }
     1636
     1637        /**
     1638         *  This checks the checksum in buf only.
     1639         *  All previous data, including that in buf, must have been copied to _dataBuf.
     1640         *  Note that the checksum does not cover the padding.
     1641         *  Caller must synchronize.
     1642         *
     1643         *  @param buf 16 bytes starting at off
     1644         *  @param off offset of the 16-byte block (NOT of the checksum only)
     1645         */
     1646        private void receiveLastBlock(byte buf[], int off) {
    12021647            // on the last block
    1203             _expectedCrc = DataHelper.fromLong(buf, buf.length-4, 4);
    1204             _crc.update(buf, 0, buf.length-4);
     1648            long expectedCrc = DataHelper.fromLong(buf, off + BLOCK_SIZE - 4, 4);
     1649            _crc.update(buf, off, BLOCK_SIZE - 4);
    12051650            long val = _crc.getValue();
    1206             if (val == _expectedCrc) {
     1651            if (val == expectedCrc) {
    12071652                try {
    12081653                    I2NPMessageHandler h = acquireHandler(_context);
     
    12331678                } catch (I2NPMessageException ime) {
    12341679                    if (_log.shouldLog(Log.WARN)) {
    1235                         _log.warn("Error parsing I2NP message" +
    1236                                   "\nDUMP:\n" + HexDump.dump(_dataBuf.getData(), 0, _size) +
    1237                                   "\nRAW:\n" + Base64.encode(_dataBuf.getData(), 0, _size) +
     1680                        _log.warn("Error parsing I2NP message on " + NTCPConnection.this +
     1681                                  "\nDUMP:\n" + HexDump.dump(_dataBuf.getData(), 0, _size),
    12381682                                  ime);
    12391683                    }
     
    12421686            } else {
    12431687                if (_log.shouldLog(Log.WARN))
    1244                     _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
     1688                    _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + expectedCrc +
     1689                              ") size=" + _size + " blocks=" + _blocks + " on: " + NTCPConnection.this);
    12451690                    _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1);
    12461691            }
     
    12501695    }
    12511696
     1697    //// NTCP2 below here
     1698
     1699    /**
     1700     * We are Alice. NTCP2 only.
     1701     *
     1702     * Caller MUST call recvEncryptedI2NP() after, for any remaining bytes in receive buffer
     1703     *
     1704     * @param clockSkew OUR clock minus BOB's clock in seconds (may be negative, obviously, but |val| should
     1705     *                  be under 1 minute)
     1706     * @param sender use to send to Bob
     1707     * @param receiver use to receive from Bob
     1708     * @param sip_ab 24 bytes to init SipHash to Bob
     1709     * @param sip_ba 24 bytes to init SipHash from Bob
     1710     * @since 0.9.36
     1711     */
     1712    synchronized void finishOutboundEstablishment(CipherState sender, CipherState receiver,
     1713                                                  byte[] sip_ab, byte[] sip_ba, long clockSkew) {
     1714        finishEstablishment(sender, receiver, sip_ab, sip_ba, clockSkew);
     1715        _paddingConfig = OUR_PADDING;
     1716        _transport.markReachable(getRemotePeer().calculateHash(), false);
     1717        if (!_outbound.isEmpty())
     1718            _transport.getWriter().wantsWrite(this, "outbound established");
     1719        // NTCP2 outbound cannot have extra data
     1720    }
     1721
     1722    /**
     1723     * We are Bob. NTCP2 only.
     1724     *
     1725     * Caller MUST call recvEncryptedI2NP() after, for any remaining bytes in receive buffer
     1726     *
     1727     * @param clockSkew OUR clock minus ALICE's clock in seconds (may be negative, obviously, but |val| should
     1728     *                  be under 1 minute)
     1729     * @param sender use to send to Alice
     1730     * @param receiver use to receive from Alice
     1731     * @param sip_ba 24 bytes to init SipHash to Alice
     1732     * @param sip_ab 24 bytes to init SipHash from Alice
     1733     * @param hisPadding may be null
     1734     * @since 0.9.36
     1735     */
     1736    synchronized void finishInboundEstablishment(CipherState sender, CipherState receiver,
     1737                                                 byte[] sip_ba, byte[] sip_ab, long clockSkew,
     1738                                                 NTCP2Options hisPadding) {
     1739        finishEstablishment(sender, receiver, sip_ba, sip_ab, clockSkew);
     1740        if (hisPadding != null) {
     1741            _paddingConfig = OUR_PADDING.merge(hisPadding);
     1742            if (_log.shouldWarn())
     1743                _log.warn("Got padding options:" +
     1744                          "\nhis padding options: " + hisPadding +
     1745                          "\nour padding options: " + OUR_PADDING +
     1746                          "\nmerged config is:    " + _paddingConfig);
     1747        } else {
     1748            _paddingConfig = OUR_PADDING;
     1749        }
     1750        NTCPConnection toClose = _transport.inboundEstablished(this);
     1751        if (toClose != null) {
     1752            if (_log.shouldLog(Log.DEBUG))
     1753                _log.debug("Old connection closed: " + toClose + " replaced by " + this);
     1754            _context.statManager().addRateData("ntcp.inboundEstablishedDuplicate", toClose.getUptime());
     1755            toClose.close();
     1756        }
     1757        enqueueInfoMessage();
     1758    }
     1759
     1760    /**
     1761     * We are Bob. NTCP2 only.
     1762     * This is only for invalid payload received in message 3. We send a termination and close.
     1763     * There will be no receiving.
     1764     *
     1765     * @param sender use to send to Alice
     1766     * @param sip_ba 24 bytes to init SipHash to Alice
     1767     * @since 0.9.36
     1768     */
     1769    synchronized void failInboundEstablishment(CipherState sender, byte[] sip_ba, int reason) {
     1770        _sender = sender;
     1771        _sendSipk1 = fromLong8LE(sip_ba, 0);
     1772        _sendSipk2 = fromLong8LE(sip_ba, 8);
     1773        _sendSipIV = new byte[SIP_IV_LENGTH];
     1774        System.arraycopy(sip_ba, 16, _sendSipIV, 0, SIP_IV_LENGTH);
     1775        if (_log.shouldWarn())
     1776            _log.warn("Send SipHash keys: " + _sendSipk1 + ' ' + _sendSipk2 + ' ' + Base64.encode(_sendSipIV));
     1777        _establishState = EstablishBase.VERIFIED;
     1778        _establishedOn = _context.clock().now();
     1779        _nextMetaTime = Long.MAX_VALUE;
     1780        _nextInfoTime = Long.MAX_VALUE;
     1781        sendTermination(reason, 0);
     1782        try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
     1783        close();
     1784    }
     1785
     1786    /**
     1787     * We are Alice or Bob. NTCP2 only.
     1788     *
     1789     * @param clockSkew see above
     1790     * @param sender use to send
     1791     * @param receiver use to receive
     1792     * @param sip_send 24 bytes to init SipHash out
     1793     * @param sip_recv 24 bytes to init SipHash in
     1794     * @since 0.9.36
     1795     */
     1796    private synchronized void finishEstablishment(CipherState sender, CipherState receiver,
     1797                                                  byte[] sip_send, byte[] sip_recv, long clockSkew) {
     1798        if (_establishState == EstablishBase.VERIFIED) {
     1799            IllegalStateException ise = new IllegalStateException("Already finished on " + this);
     1800            _log.error("Already finished", ise);
     1801            throw ise;
     1802        }
     1803        _sender = sender;
     1804        _sendSipk1 = fromLong8LE(sip_send, 0);
     1805        _sendSipk2 = fromLong8LE(sip_send, 8);
     1806        _sendSipIV = new byte[SIP_IV_LENGTH];
     1807        System.arraycopy(sip_send, 16, _sendSipIV, 0, SIP_IV_LENGTH);
     1808        if (_log.shouldWarn())
     1809            _log.warn("Send SipHash keys: " + _sendSipk1 + ' ' + _sendSipk2 + ' ' + Base64.encode(_sendSipIV));
     1810        _clockSkew = clockSkew;
     1811        _establishState = EstablishBase.VERIFIED;
     1812        _establishedOn = _context.clock().now();
     1813        _nextMetaTime = _establishedOn + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY);
     1814        _nextInfoTime = _establishedOn + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY);
     1815        _curReadState = new NTCP2ReadState(receiver, sip_recv);
     1816    }
     1817
     1818    /**
     1819     * Read the encrypted message
     1820     *
     1821     * Call all methods from synchronized parent method.
     1822     *
     1823     * @since 0.9.36
     1824     */
     1825    private class NTCP2ReadState implements ReadState, NTCP2Payload.PayloadCallback {
     1826        // temp to read the encrypted lengh into
     1827        private final byte[] _recvLen = new byte[2];
     1828        private final long _sipk1, _sipk2;
     1829        // the siphash ratchet, as a byte array
     1830        private final byte[] _sipIV = new byte[SIP_IV_LENGTH];
     1831        private final CipherState _rcvr;
     1832        // the size of the next frame, only valid if _received >= 0
     1833        private int _framelen;
     1834        // bytes received, -2 to _framelen
     1835        private int _received = -2;
     1836        private ByteArray _dataBuf;
     1837        // Valid frames received in data phase
     1838        private int _frameCount;
     1839        // for logging only
     1840        private int _blockCount;
     1841        private boolean _terminated;
     1842
     1843        /**
     1844         *  @param keyData using first 24 bytes
     1845         */
     1846        public NTCP2ReadState(CipherState rcvr, byte[] keyData) {
     1847            _rcvr = rcvr;
     1848            _sipk1 = fromLong8LE(keyData, 0);
     1849            _sipk2 = fromLong8LE(keyData, 8);
     1850            System.arraycopy(keyData, 16, _sipIV, 0, SIP_IV_LENGTH);
     1851            if (_log.shouldWarn())
     1852                _log.warn("Recv SipHash keys: " + _sipk1 + ' ' + _sipk2 + ' ' + Base64.encode(_sipIV));
     1853        }
     1854
     1855        public void receive(ByteBuffer buf) {
     1856            if (_terminated)
     1857                return;
     1858            while (buf.hasRemaining()) {
     1859                if (_received == -2) {
     1860                    _recvLen[0] = buf.get();
     1861                    _received++;
     1862                }
     1863                if (_received == -1 && buf.hasRemaining()) {
     1864                    _recvLen[1] = buf.get();
     1865                    _received++;
     1866                    long sipIV = SipHashInline.hash24(_sipk1, _sipk2, _sipIV);
     1867                    //if (_log.shouldDebug())
     1868                    //    _log.debug("Got Encrypted frame length: " + DataHelper.fromLong(_recvLen, 0, 2) +
     1869                    //               " byte 1: " + (_recvLen[0] & 0xff) + " byte 2: " + (_recvLen[1] & 0xff) +
     1870                    //               " decrypting with keys " + _sipk1 + ' ' + _sipk2 + ' ' + Base64.encode(_sipIV) + ' ' + sipIV);
     1871                    _recvLen[0] ^= (byte) (sipIV >> 8);
     1872                    _recvLen[1] ^= (byte) sipIV;
     1873                    toLong8LE(_sipIV, 0, sipIV);
     1874                    _framelen = (int) DataHelper.fromLong(_recvLen, 0, 2);
     1875                    if (_framelen < OutboundNTCP2State.MAC_SIZE) {
     1876                        if (_log.shouldWarn())
     1877                            _log.warn("Short frame length: " + _framelen);
     1878                        // set a random length, then close
     1879                        delayedClose(buf, _frameCount);
     1880                        return;
     1881                    }
     1882                    //if (_log.shouldDebug())
     1883                    //    _log.debug("Next frame length: " + _framelen);
     1884                }
     1885                int remaining = buf.remaining();
     1886                if (remaining <= 0)
     1887                    return;
     1888                if (_received == 0 && remaining >= _framelen) {
     1889                    // shortcut, zero copy, decrypt directly to the ByteBuffer,
     1890                    // overwriting the encrypted data
     1891                    byte[] data = buf.array();
     1892                    int pos = buf.position();
     1893                    boolean ok = decryptAndProcess(data, pos);
     1894                    buf.position(pos + _framelen);
     1895                    if (!ok) {
     1896                        delayedClose(buf, _frameCount);
     1897                        return;
     1898                    }
     1899                    continue;
     1900                }
     1901
     1902                // allocate ByteArray,
     1903                // unless we have one already and it's big enough
     1904                if (_received == 0 && (_dataBuf == null || _dataBuf.getData().length < _framelen)) {
     1905                    if (_dataBuf != null && _dataBuf.getData().length == BUFFER_SIZE)
     1906                        releaseReadBuf(_dataBuf);
     1907                    if (_framelen > BUFFER_SIZE) {
     1908                        if (_log.shouldWarn())
     1909                            _log.warn("Allocating big ByteArray: " + _framelen);
     1910                        byte[] data = new byte[_framelen];
     1911                        _dataBuf = new ByteArray(data);
     1912                    } else {
     1913                        _dataBuf = acquireReadBuf();
     1914                    }
     1915                }
     1916
     1917                // We now have a ByteArray in _dataBuf,
     1918                // copy from ByteBuffer to ByteArray
     1919                int toGet = Math.min(buf.remaining(), _framelen - _received);
     1920                byte[] data = _dataBuf.getData();
     1921                buf.get(data, _received, toGet);
     1922                _received += toGet;
     1923                if (_received < _framelen)
     1924                    return;
     1925                // decrypt to the ByteArray, overwriting the encrypted data
     1926                boolean ok = decryptAndProcess(data, 0);
     1927                // release buf only if we're not going around again
     1928                if (!ok || buf.remaining() < 2) {
     1929                    if (!ok)
     1930                        delayedClose(buf, _frameCount);
     1931                    // delayedClose() may have zeroed out _databuf
     1932                    if (_dataBuf != null) {
     1933                        if (_dataBuf.getData().length == BUFFER_SIZE)
     1934                            releaseReadBuf(_dataBuf);
     1935                        _dataBuf = null;
     1936                    }
     1937                    if (!ok)
     1938                        return;
     1939                }
     1940                // go around again
     1941            }
     1942        }
     1943
     1944        /**
     1945         *  Decrypts in place.
     1946         *  Length is _framelen
     1947         *  Side effects: Sets _received = -2, increments _frameCount and _blockCount if valid
     1948         *
     1949         *  Does not call close() on failure. Caller MUST call delayedClose() if this returns false.
     1950         *
     1951         *  @return success, false for fatal error (AEAD) only
     1952         */
     1953        private boolean decryptAndProcess(byte[] data, int off) {
     1954            if (_log.shouldWarn())
     1955                _log.warn("Decrypting frame " + _frameCount + " with " + _framelen + " bytes");
     1956            try {
     1957                _rcvr.decryptWithAd(null, data, off, data, off, _framelen);
     1958            } catch (GeneralSecurityException gse) {
     1959                // TODO set a random length, then close
     1960                if (_log.shouldWarn())
     1961                    _log.warn("Bad AEAD data phase frame " + _frameCount + " on " + NTCPConnection.this, gse);
     1962                return false;
     1963            }
     1964            try {
     1965                int blocks = NTCP2Payload.processPayload(_context, this, data, off,
     1966                                                         _framelen - OutboundNTCP2State.MAC_SIZE, false);
     1967                if (_log.shouldWarn())
     1968                    _log.warn("Processed " + blocks + " blocks in frame");
     1969                _blockCount += blocks;
     1970            } catch (IOException ioe) {
     1971                if (_log.shouldWarn())
     1972                    _log.warn("Fail payload " + NTCPConnection.this, ioe);
     1973            } catch (DataFormatException dfe) {
     1974                if (_log.shouldWarn())
     1975                    _log.warn("Fail payload " + NTCPConnection.this, dfe);
     1976            } catch (I2NPMessageException ime) {
     1977                if (_log.shouldWarn())
     1978                    _log.warn("Error parsing I2NP message on " + NTCPConnection.this, ime);
     1979                _context.statManager().addRateData("ntcp.corruptI2NPIME", 1);
     1980            }
     1981            _received = -2;
     1982            _frameCount++;
     1983            return !_terminated;
     1984        }
     1985
     1986        public void destroy() {
     1987            if (_dataBuf != null && _dataBuf.getData().length == BUFFER_SIZE)
     1988                releaseReadBuf(_dataBuf);
     1989            _dataBuf = null;
     1990            _rcvr.destroy();
     1991            _terminated = true;
     1992        }
     1993
     1994        //// PayloadCallbacks
     1995
     1996        public void gotRI(RouterInfo ri, boolean isHandshake, boolean flood) throws DataFormatException {
     1997            if (_log.shouldWarn())
     1998                _log.warn("Got updated RI");
     1999            _messagesRead.incrementAndGet();
     2000            try {
     2001                Hash h = ri.getHash();
     2002                RouterInfo old = _context.netDb().store(h, ri);
     2003                if (flood && !ri.equals(old)) {
     2004                    FloodfillNetworkDatabaseFacade fndf = (FloodfillNetworkDatabaseFacade) _context.netDb();
     2005                    if (fndf.floodConditional(ri)) {
     2006                        if (_log.shouldLog(Log.WARN))
     2007                            _log.warn("Flooded the RI: " + h);
     2008                    } else {
     2009                        if (_log.shouldLog(Log.WARN))
     2010                            _log.warn("Flood request but we didn't: " + h);
     2011                    }
     2012                }
     2013            } catch (IllegalArgumentException iae) {
     2014                throw new DataFormatException("RI store fail", iae);
     2015            }
     2016        }
     2017
     2018        public void gotDateTime(long time) {
     2019            if (_log.shouldWarn())
     2020                _log.warn("Got updated datetime block");
     2021            receiveTimestamp((time + 500) / 1000);
     2022            // update skew
     2023        }
     2024
     2025        public void gotI2NP(I2NPMessage msg) {
     2026            if (_log.shouldWarn())
     2027                _log.warn("Got I2NP msg: " + msg);
     2028            long timeToRecv = 0; // _context.clock().now() - _stateBegin;
     2029            int size = 100; // FIXME
     2030            _transport.messageReceived(msg, _remotePeer, null, timeToRecv, size);
     2031            _lastReceiveTime = _context.clock().now();
     2032            _messagesRead.incrementAndGet();
     2033            // TEST send back. null RI for target, not necesary
     2034            //if (_context.getBooleanProperty("i2np.ntcp2.loopback"))
     2035            //  send(new OutNetMessage(_context, msg, _context.clock().now() + 10*1000, OutNetMessage.PRIORITY_MY_DATA, null));
     2036        }
     2037
     2038        public void gotOptions(byte[] options, boolean isHandshake) {
     2039            if (options.length < 12) {
     2040                if (_log.shouldWarn())
     2041                    _log.warn("Got options length " + options.length + " on: " + this);
     2042                return;
     2043            }
     2044            float tmin = (options[0] & 0xff) / 16.0f;
     2045            float tmax = (options[1] & 0xff) / 16.0f;
     2046            float rmin = (options[2] & 0xff) / 16.0f;
     2047            float rmax = (options[3] & 0xff) / 16.0f;
     2048            int tdummy = (int) DataHelper.fromLong(options, 4, 2);
     2049            int rdummy = (int) DataHelper.fromLong(options, 6, 2);
     2050            int tdelay = (int) DataHelper.fromLong(options, 8, 2);
     2051            int rdelay = (int) DataHelper.fromLong(options, 10, 2);
     2052            NTCP2Options hisPadding = new NTCP2Options(tmin, tmax, rmin, rmax,
     2053                                                       tdummy, rdummy, tdelay, rdelay);
     2054            _paddingConfig = OUR_PADDING.merge(hisPadding);
     2055            if (_log.shouldWarn())
     2056                _log.warn("Got padding options:" +
     2057                          "\nhis padding options: " + hisPadding +
     2058                          "\nour padding options: " + OUR_PADDING +
     2059                          "\nmerged config is:    " + _paddingConfig);
     2060        }
     2061
     2062        public void gotTermination(int reason, long lastReceived) {
     2063            if (_log.shouldWarn())
     2064                _log.warn("Got Termination: " + reason + " total rcvd: " + lastReceived);
     2065            _terminated = true;
     2066            close();
     2067        }
     2068
     2069        public void gotUnknown(int type, int len) {
     2070            if (_log.shouldWarn())
     2071                _log.warn("Got unknown block type " + type + " length " + len);
     2072        }
     2073
     2074        public void gotPadding(int paddingLength, int frameLength) {
     2075            if (_log.shouldWarn())
     2076                _log.warn("Got " + paddingLength +
     2077                          " bytes padding in " + frameLength +
     2078                          " byte frame; ratio: " + (((float) paddingLength) / ((float) frameLength)) +
     2079                          " configured min: " + _paddingConfig.getRecvMin() +
     2080                          " configured max: " + _paddingConfig.getRecvMax());
     2081        }
     2082    }
     2083
     2084    /**
     2085     * After an AEAD failure, read a random number of bytes,
     2086     * with a brief timeout, and then fail.
     2087     * This replaces _curReadState, so no more messages will be received.
     2088     *
     2089     * @param buf possibly with data remaining
     2090     * @param validFramesRcvd to be sent in termination message
     2091     * @since 0.9.36
     2092     */
     2093    private void delayedClose(ByteBuffer buf, int validFramesRcvd) {
     2094        int toRead = 18 + _context.random().nextInt(NTCP2_FAIL_READ);
     2095        int remaining = toRead - buf.remaining();
     2096        if (remaining > 0) {
     2097            if (_log.shouldWarn())
     2098                _log.warn("delayed close after AEAD failure, to read: " + toRead);
     2099            _curReadState = new NTCP2FailState(toRead, validFramesRcvd);
     2100            _curReadState.receive(buf);
     2101        } else {
     2102            if (_log.shouldWarn())
     2103                _log.warn("immediate close after AEAD failure and reading " + toRead);
     2104            sendTermination(REASON_AEAD, validFramesRcvd);
     2105            try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
     2106            close();
     2107        }
     2108    }
     2109
     2110    /**
     2111     * After an AEAD failure, read a random number of bytes,
     2112     * with a brief timeout, and then fail.
     2113     *
     2114     * Call all methods from synchronized parent method.
     2115     *
     2116     * @since 0.9.36
     2117     */
     2118    private class NTCP2FailState extends SimpleTimer2.TimedEvent implements ReadState {
     2119        private final int _toRead;
     2120        private final int _validFramesRcvd;
     2121        private int _read;
     2122
     2123        /**
     2124         *  @param toRead how much left to read
     2125         *  @param validFramesRcvd to be sent in termination message
     2126         */
     2127        public NTCP2FailState(int toRead, int validFramesRcvd) {
     2128            super(_context.simpleTimer2());
     2129            _toRead = toRead;
     2130            _validFramesRcvd = validFramesRcvd;
     2131            schedule(NTCP2_FAIL_TIMEOUT);
     2132        }
     2133
     2134        public void receive(ByteBuffer buf) {
     2135            _read += buf.remaining();
     2136            if (_read >= _toRead) {
     2137                cancel();
     2138                if (_log.shouldWarn())
     2139                    _log.warn("close after AEAD failure and reading " + _toRead);
     2140                sendTermination(REASON_AEAD, _validFramesRcvd);
     2141                try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
     2142                close();
     2143            }
     2144        }
     2145
     2146        public void destroy() {
     2147            cancel();
     2148        }
     2149
     2150        public void timeReached() {
     2151            if (_log.shouldWarn())
     2152                _log.warn("timeout after AEAD failure waiting for more data");
     2153            sendTermination(REASON_AEAD, _validFramesRcvd);
     2154            try { Thread.sleep(NTCP2_TERMINATION_CLOSE_DELAY); } catch (InterruptedException ie) {}
     2155            close();
     2156        }
     2157    }
     2158
     2159    //// Utils
     2160
     2161    /**
     2162     *  XOR a into b. Modifies b. a is unmodified.
     2163     *  @param a 16 bytes
     2164     *  @param b 16 bytes
     2165     *  @since 0.9.36
     2166     */
     2167    private static void xor16(byte[] a, byte[] b) {
     2168        for (int i = 0; i < BLOCK_SIZE; i++) {
     2169            b[i] ^= a[i];
     2170        }
     2171    }
     2172
     2173    /**
     2174     * Little endian.
     2175     * Same as DataHelper.fromlongLE(src, offset, 8) but allows negative result
     2176     *
     2177     * @throws ArrayIndexOutOfBoundsException
     2178     * @since 0.9.36
     2179     */
     2180    private static long fromLong8LE(byte src[], int offset) {
     2181        long rv = 0;
     2182        for (int i = offset + 7; i >= offset; i--) {
     2183            rv <<= 8;
     2184            rv |= src[i] & 0xFF;
     2185        }
     2186        return rv;
     2187    }
     2188   
     2189    /**
     2190     * Little endian.
     2191     * Same as DataHelper.fromlongLE(target, offset, 8, value) but allows negative value
     2192     *
     2193     */
     2194    private static void toLong8LE(byte target[], int offset, long value) {
     2195        int limit = offset + 8;
     2196        for (int i = offset; i < limit; i++) {
     2197            target[i] = (byte) value;
     2198            value >>= 8;
     2199        }
     2200    }
     2201
    12522202    @Override
    12532203    public String toString() {
    1254         return "NTCP conn " +
     2204        return "NTCP" + _version + " conn " +
    12552205               _connID +
    1256                (_isInbound ? " from " : " to ") +
     2206               (_isInbound ? (" from " + _chan.socket().getInetAddress() + " port " + _chan.socket().getPort() + ' ')
     2207                           : (" to " + _remAddr.getHost() + " port " + _remAddr.getPort() + ' ')) +
    12572208               (_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) +
    12582209               (isEstablished() ? "" : " not established") +
     
    12602211               " last send " + DataHelper.formatDuration(getTimeSinceSend()) + " ago," +
    12612212               " last recv " + DataHelper.formatDuration(getTimeSinceReceive()) + " ago," +
    1262                " sent " + _messagesWritten + "," +
     2213               " sent " + _messagesWritten + ',' +
    12632214               " rcvd " + _messagesRead;
    12642215    }
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    r49221ad rae8779e  
    99import java.nio.channels.ServerSocketChannel;
    1010import java.nio.channels.SocketChannel;
     11import java.security.KeyPair;
    1112import java.text.DecimalFormat;
    1213import java.text.NumberFormat;
     
    4647import static net.i2p.router.transport.TransportUtil.IPv6Config.*;
    4748import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
     49import net.i2p.router.transport.crypto.X25519KeyFactory;
     50import net.i2p.router.transport.crypto.X25519PublicKey;
     51import net.i2p.router.transport.crypto.X25519PrivateKey;
    4852import net.i2p.router.util.DecayingHashSet;
    4953import net.i2p.router.util.DecayingBloomFilter;
     
    98102    public final static String PROP_I2NP_NTCP_AUTO_IP = "i2np.ntcp.autoip";
    99103    private static final String PROP_ADVANCED = "routerconsole.advanced";
    100     public static final int DEFAULT_COST = 10;
     104    private static final int DEFAULT_COST = 10;
     105    private static final int NTCP2_OUTBOUND_COST = 14;
    101106   
    102107    /** this is rarely if ever used, default is to bind to wildcard address */
     
    105110    private final NTCPSendFinisher _finisher;
    106111    private final DHSessionKeyBuilder.Factory _dhFactory;
     112    private final X25519KeyFactory _xdhFactory;
    107113    private long _lastBadSkew;
    108114    private static final long[] RATES = { 10*60*1000 };
     
    115121    // NTCP2 stuff
    116122    public static final String STYLE = "NTCP";
    117     private static final String STYLE2 = "NTCP2";
    118     private static final String PROP_NTCP2_ENABLE = "i2np.ntcp2.enable";
    119     private static final boolean DEFAULT_NTCP2_ENABLE = false;
    120     private boolean _enableNTCP2;
    121     private static final String NTCP2_PROTO_SHORT = "NXK2CS";
    122     private static final String OPT_NTCP2_SK = 'N' + NTCP2_PROTO_SHORT + "2s";
     123    public static final String STYLE2 = "NTCP2";
    123124    static final int NTCP2_INT_VERSION = 2;
    124     private static final String NTCP2_VERSION = Integer.toString(NTCP2_INT_VERSION);
     125    /** "2" */
     126    static final String NTCP2_VERSION = Integer.toString(NTCP2_INT_VERSION);
     127    /** "2," */
     128    static final String NTCP2_VERSION_ALT = NTCP2_VERSION + ',';
    125129    /** b64 static private key */
    126     private static final String PROP_NTCP2_SP = "i2np.ntcp2.sp";
     130    public static final String PROP_NTCP2_SP = "i2np.ntcp2.sp";
    127131    /** b64 static IV */
    128     private static final String PROP_NTCP2_IV = "i2np.ntcp2.iv";
    129     private static final int NTCP2_IV_LEN = 16;
    130     private static final int NTCP2_KEY_LEN = 32;
     132    public static final String PROP_NTCP2_IV = "i2np.ntcp2.iv";
     133    private static final int NTCP2_IV_LEN = OutboundNTCP2State.IV_SIZE;
     134    private static final int NTCP2_KEY_LEN = OutboundNTCP2State.KEY_SIZE;
     135    private final boolean _enableNTCP2;
     136    private final byte[] _ntcp2StaticPubkey;
    131137    private final byte[] _ntcp2StaticPrivkey;
    132138    private final byte[] _ntcp2StaticIV;
     
    134140    private final String _b64Ntcp2StaticIV;
    135141
    136     public NTCPTransport(RouterContext ctx, DHSessionKeyBuilder.Factory dh) {
     142    /**
     143     *  @param xdh null to disable NTCP2
     144     */
     145    public NTCPTransport(RouterContext ctx, DHSessionKeyBuilder.Factory dh, X25519KeyFactory xdh) {
    137146        super(ctx);
    138147        _dhFactory = dh;
     148        _xdhFactory = xdh;
    139149        _log = ctx.logManager().getLog(getClass());
    140150
     
    223233        _transientFail = new SharedBid(TransportBid.TRANSIENT_FAIL);
    224234
    225         //_enableNTCP2 = ctx.getProperty(PROP_NTCP2_ENABLE, DEFAULT_NTCP2_ENABLE);
    226         _enableNTCP2 = false;
     235        _enableNTCP2 = xdh != null;
    227236        if (_enableNTCP2) {
    228237            boolean shouldSave = false;
    229238            byte[] priv = null;
    230239            byte[] iv = null;
    231             String b64Pub = null;
    232240            String b64IV = null;
    233241            String s = ctx.getProperty(PROP_NTCP2_SP);
     
    236244            }
    237245            if (priv == null || priv.length != NTCP2_KEY_LEN) {
    238                 priv = new byte[NTCP2_KEY_LEN];
    239                 ctx.random().nextBytes(priv);
     246                KeyPair keys = xdh.getKeys();
     247                _ntcp2StaticPrivkey = keys.getPrivate().getEncoded();
     248                _ntcp2StaticPubkey = keys.getPublic().getEncoded();
    240249                shouldSave = true;
    241             }
    242             s = ctx.getProperty(PROP_NTCP2_IV);
    243             if (s != null) {
    244                 iv = Base64.decode(s);
    245                 b64IV = s;
     250            } else {
     251                _ntcp2StaticPrivkey = priv;
     252                _ntcp2StaticPubkey = (new X25519PrivateKey(priv)).toPublic().getEncoded();
     253            }
     254            if (!shouldSave) {
     255                s = ctx.getProperty(PROP_NTCP2_IV);
     256                if (s != null) {
     257                    iv = Base64.decode(s);
     258                    b64IV = s;
     259                }
    246260            }
    247261            if (iv == null || iv.length != NTCP2_IV_LEN) {
     
    252266            if (shouldSave) {
    253267                Map<String, String> changes = new HashMap<String, String>(2);
    254                 String b64Priv = Base64.encode(priv);
     268                String b64Priv = Base64.encode(_ntcp2StaticPrivkey);
    255269                b64IV = Base64.encode(iv);
    256270                changes.put(PROP_NTCP2_SP, b64Priv);
     
    258272                ctx.router().saveConfig(changes, null);
    259273            }
    260             _ntcp2StaticPrivkey = priv;
    261274            _ntcp2StaticIV = iv;
    262             _b64Ntcp2StaticPubkey = "TODO"; // priv->pub
     275            _b64Ntcp2StaticPubkey = Base64.encode(_ntcp2StaticPubkey);
    263276            _b64Ntcp2StaticIV = b64IV;
    264277        } else {
     278            _ntcp2StaticPubkey = null;
    265279            _ntcp2StaticPrivkey = null;
    266280            _ntcp2StaticIV = null;
     
    300314            Hash ih = ident.calculateHash();
    301315            NTCPConnection con = null;
    302             boolean isNew = false;
     316            int newVersion = 0;
    303317            boolean fail = false;
    304318            synchronized (_conLock) {
    305319                con = _conByIdent.get(ih);
    306320                if (con == null) {
    307                     isNew = true;
    308321                    RouterAddress addr = getTargetAddress(target);
    309322                    if (addr != null) {
    310                         int ver = getNTCPVersion(addr);
    311                         if (ver != 0) {
    312                             con = new NTCPConnection(_context, this, ident, addr, ver);
     323                        newVersion = getNTCPVersion(addr);
     324                        if (newVersion != 0) {
     325                            con = new NTCPConnection(_context, this, ident, addr, newVersion);
     326                            establishing(con);
    313327                            //if (_log.shouldLog(Log.DEBUG))
    314328                            //    _log.debug("Send on a new con: " + con + " at " + addr + " for " + ih);
     
    332346                return;
    333347            }
    334             if (isNew) {
    335                 // doesn't do anything yet, just enqueues it
    336                 con.send(msg);
     348            if (newVersion != 0) {
    337349                // As of 0.9.12, don't send our info if the first message is
    338350                // doing the same (common when connecting to a floodfill).
     
    342354                // but that's fixed in 0.9.12.
    343355                boolean shouldSkipInfo = false;
     356                boolean shouldFlood = false;
    344357                I2NPMessage m = msg.getMessage();
    345358                if (m.getType() == DatabaseStoreMessage.MESSAGE_TYPE) {
     
    347360                    if (dsm.getKey().equals(_context.routerHash())) {
    348361                        shouldSkipInfo = true;
     362                        shouldFlood = dsm.getReplyToken() != 0;
     363                        // TODO tell the NTCP2 con to flood in the handshake and mark success when sent
    349364                    }
    350365                }
    351366                if (!shouldSkipInfo) {
     367                    // Queue the message, and our RI
     368                    // doesn't do anything yet, just enqueues it
     369                    con.send(msg);
    352370                    con.enqueueInfoMessage();
     371                } else if (shouldFlood || newVersion == 1) {
     372                    // Queue the message, which is a DSM of our RI
     373                    con.send(msg);
    353374                } else if (_log.shouldLog(Log.INFO)) {
     375                    // Send nothing, the handshake has the RI
     376                    // version == 2 && shouldSkipInfo && !shouldFlood
    354377                    _log.info("SKIPPING INFO message: " + con);
    355378                }
     
    366389                    _context.statManager().addRateData("ntcp.outboundFailedIOEImmediate", 1);
    367390                    con.close();
     391                    afterSend(msg, false);
     392                } catch (IllegalStateException ise) {
     393                    _log.error("Failed opening a channel", ise);
     394                    afterSend(msg, false);
    368395                }
    369396            } else {
     
    678705
    679706        for (NTCPConnection con : _conByIdent.values()) {
     707            // TODO skip isEstablished() check?
    680708            if (con.isEstablished() && con.getCreated() > tooOld)
    681709                skews.addElement(Long.valueOf(con.getClockSkew()));
     
    697725     *  something long-delayed. To be fixed in next version of NTCP.
    698726     *
    699      *  @param hxhi 32 bytes
     727     *  @param hxhi using first 8 bytes only
    700728     *  @return valid
    701729     *  @since 0.9.12
     
    741769        } else if (port > 0) {
    742770            // all detected interfaces
    743             for (InetAddress ia : getSavedLocalAddresses()) {
    744                 OrderedProperties props = new OrderedProperties();
    745                 props.setProperty(RouterAddress.PROP_HOST, ia.getHostAddress());
    746                 props.setProperty(RouterAddress.PROP_PORT, Integer.toString(port));
    747                 addNTCP2Options(props);
    748                 int cost = getDefaultCost(ia instanceof Inet6Address);
    749                 myAddress = new RouterAddress(STYLE, props, cost);
    750                 replaceAddress(myAddress);
    751             }
     771            Collection<InetAddress> addrs = getSavedLocalAddresses();
     772            if (!addrs.isEmpty()) {
     773                for (InetAddress ia : addrs) {
     774                    OrderedProperties props = new OrderedProperties();
     775                    props.setProperty(RouterAddress.PROP_HOST, ia.getHostAddress());
     776                    props.setProperty(RouterAddress.PROP_PORT, Integer.toString(port));
     777                    addNTCP2Options(props);
     778                    int cost = getDefaultCost(ia instanceof Inet6Address);
     779                    myAddress = new RouterAddress(STYLE, props, cost);
     780                    replaceAddress(myAddress);
     781                }
     782            } else if (_enableNTCP2) {
     783                setOutboundNTCP2Address();
     784            }
     785        } else if (_enableNTCP2) {
     786            setOutboundNTCP2Address();
    752787        }
    753788        // TransportManager.startListening() calls router.rebuildRouterInfo()
     789    }
     790
     791    /**
     792     *  Outbound only, NTCP2 with "s" and "v" only
     793     *  @since 0.9.36
     794     */
     795    private void setOutboundNTCP2Address() {
     796        OrderedProperties props = new OrderedProperties();
     797        addNTCP2Options(props);
     798        RouterAddress myAddress = new RouterAddress(STYLE2, props, NTCP2_OUTBOUND_COST);
     799        replaceAddress(myAddress);
    754800    }
    755801
     
    9681014
    9691015    /**
     1016     *  @return null if not configured for NTCP2
     1017     *  @since 0.9.36
     1018     */
     1019    X25519KeyFactory getXDHFactory() {
     1020        return _xdhFactory;
     1021    }
     1022
     1023    /**
    9701024     * Return an unused DH key builder
    9711025     * to be put back onto the queue for reuse.
     
    10721126
    10731127    /**
    1074      * Add the required options to the properties for a NTCP2 address
     1128     * Add the required options to the properties for a NTCP2 address.
     1129     * Host/port must already be set in props if they are going to be.
    10751130     *
    10761131     * @since 0.9.35
     
    10791134        if (!_enableNTCP2)
    10801135            return;
    1081         props.setProperty("i", _b64Ntcp2StaticIV);
    1082         props.setProperty("n", NTCP2_PROTO_SHORT);
     1136        // only set i if we are not firewalled
     1137        if (props.containsKey("host"))
     1138            props.setProperty("i", _b64Ntcp2StaticIV);
    10831139        props.setProperty("s", _b64Ntcp2StaticPubkey);
    10841140        props.setProperty("v", NTCP2_VERSION);
     
    10951151     * The static priv key
    10961152     *
     1153     * @since 0.9.36
     1154     */
     1155    byte[] getNTCP2StaticPubkey() {
     1156        return _ntcp2StaticPubkey;
     1157    }
     1158
     1159    /**
     1160     * The static priv key
     1161     *
    10971162     * @since 0.9.35
    10981163     */
     
    11021167
    11031168    /**
    1104      * Get the valid NTCP version of this NTCP address.
     1169     * The static IV
     1170     *
     1171     * @since 0.9.36
     1172     */
     1173    byte[] getNTCP2StaticIV() {
     1174        return _ntcp2StaticIV;
     1175    }
     1176
     1177    /**
     1178     * Get the valid NTCP version of Bob's NTCP address
     1179     * for our outbound connections as Alice.
    11051180     *
    11061181     * @return the valid version 1 or 2, or 0 if unusable
     
    11171192            if (!_enableNTCP2)
    11181193                return 0;
    1119             rv = 2;
     1194            rv = NTCP2_INT_VERSION;
    11201195        } else {
    11211196            return 0;
    11221197        }
    1123         if (addr.getOption("s") == null ||
     1198        // check version == "2" || version starts with "2,"
     1199        // and static key, and iv
     1200        String v = addr.getOption("v");
     1201        if (v == null ||
    11241202            addr.getOption("i") == null ||
    1125             !NTCP2_VERSION.equals(addr.getOption("v")) ||
    1126             !NTCP2_PROTO_SHORT.equals(addr.getOption("n"))) {
     1203            addr.getOption("s") == null ||
     1204            (!v.equals(NTCP2_VERSION) && !v.startsWith(NTCP2_VERSION_ALT))) {
    11271205            return (rv == 1) ? 1 : 0;
    11281206        }
    11291207        // todo validate s/i b64, or just catch it later?
    1130         return rv;
     1208        return NTCP2_INT_VERSION;
    11311209    }
    11321210
     
    13151393        if (oldAddr == null) {
    13161394            cost = getDefaultCost(isIPv6);
    1317             addNTCP2Options(newProps);
    13181395        } else {
    13191396            cost = oldAddr.getCost();
     
    14371514            }
    14381515        }
     1516        addNTCP2Options(newProps);
    14391517
    14401518        // stopListening stops the pumper, readers, and writers, so required even if
  • router/java/src/net/i2p/router/transport/ntcp/OutboundEstablishState.java

    r49221ad rae8779e  
    44import java.net.InetAddress;
    55import java.nio.ByteBuffer;
     6import java.util.Arrays;
    67
    78import net.i2p.crypto.SigType;
     
    3435
    3536    /**
    36      * parse the contents of the buffer as part of the handshake.  if the
    37      * handshake is completed and there is more data remaining, the data are
    38      * copieed out so that the next read will be the (still encrypted) remaining
    39      * data (available from getExtraBytes)
     37     * Parse the contents of the buffer as part of the handshake.
    4038     *
    4139     * All data must be copied out of the buffer as Reader.processRead()
    4240     * will return it to the pool.
     41     *
     42     * If there are additional data in the buffer after the handshake is complete,
     43     * the EstablishState is responsible for passing it to NTCPConnection.
    4344     */
    4445    @Override
     
    6667     *  Caller must synch.
    6768     *
    68      *  FIXME none of the _state comparisons use _stateLock, but whole thing
    69      *  is synchronized, should be OK. See isComplete()
    7069     */
    7170    private void receiveOutbound(ByteBuffer src) {
     
    7372        // recv Y+E(H(X+Y)+tsB, sk, Y[239:255])
    7473        // Read in Y, which is the first part of message #2
    75         while (_state == State.OB_SENT_X && src.hasRemaining()) {
    76             byte c = src.get();
    77             _Y[_received++] = c;
    78             if (_received >= XY_SIZE) {
    79                 try {
    80                     _dh.setPeerPublicValue(_Y);
    81                     _dh.getSessionKey(); // force the calc
    82                     if (_log.shouldLog(Log.DEBUG))
    83                         _log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
    84                     changeState(State.OB_GOT_Y);
    85                 } catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
    86                     _context.statManager().addRateData("ntcp.invalidDH", 1);
    87                     fail("Invalid X", e);
    88                     return;
    89                 }
    90             }
    91         }
    92 
    93         // Read in Y, which is the first part of message #2
     74        if (_state == State.OB_SENT_X && src.hasRemaining()) {
     75            int toGet = Math.min(src.remaining(), XY_SIZE - _received);
     76            src.get(_Y, _received, toGet);
     77            _received += toGet;
     78            if (_received < XY_SIZE)
     79                return;
     80
     81            try {
     82                _dh.setPeerPublicValue(_Y);
     83                _dh.getSessionKey(); // force the calc
     84                if (_log.shouldLog(Log.DEBUG))
     85                    _log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
     86                changeState(State.OB_GOT_Y);
     87                _received = 0;
     88            } catch (DHSessionKeyBuilder.InvalidPublicParameterException e) {
     89                _context.statManager().addRateData("ntcp.invalidDH", 1);
     90                fail("Invalid X", e);
     91                return;
     92            } catch (IllegalStateException ise) {
     93                // setPeerPublicValue()
     94                fail("reused keys?", ise);
     95                return;
     96            }
     97        }
     98
    9499        // Read in the rest of message #2
    95         while (_state == State.OB_GOT_Y && src.hasRemaining()) {
    96             int i = _received-XY_SIZE;
    97             _received++;
    98             byte c = src.get();
    99             _e_hXY_tsB[i] = c;
    100             if (i+1 >= HXY_TSB_PAD_SIZE) {
    101                 if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully");
    102                 byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE];
    103                 _context.aes().decrypt(_e_hXY_tsB, 0, hXY_tsB, 0, _dh.getSessionKey(), _Y, XY_SIZE-AES_SIZE, HXY_TSB_PAD_SIZE);
    104                 byte XY[] = new byte[XY_SIZE + XY_SIZE];
    105                 System.arraycopy(_X, 0, XY, 0, XY_SIZE);
    106                 System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE);
    107                 byte[] h = SimpleByteCache.acquire(HXY_SIZE);
    108                 _context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0);
    109                 if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) {
    110                     SimpleByteCache.release(h);
    111                     _context.statManager().addRateData("ntcp.invalidHXY", 1);
    112                     fail("Invalid H(X+Y) - mitm attack attempted?");
    113                     return;
    114                 }
     100        if (_state == State.OB_GOT_Y && src.hasRemaining()) {
     101            int toGet = Math.min(src.remaining(), HXY_TSB_PAD_SIZE - _received);
     102            src.get(_e_hXY_tsB, _received, toGet);
     103            _received += toGet;
     104            if (_received < HXY_TSB_PAD_SIZE)
     105                return;
     106
     107            if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix() + "received _e_hXY_tsB fully");
     108            byte hXY_tsB[] = new byte[HXY_TSB_PAD_SIZE];
     109            _context.aes().decrypt(_e_hXY_tsB, 0, hXY_tsB, 0, _dh.getSessionKey(), _Y, XY_SIZE-AES_SIZE, HXY_TSB_PAD_SIZE);
     110            byte XY[] = new byte[XY_SIZE + XY_SIZE];
     111            System.arraycopy(_X, 0, XY, 0, XY_SIZE);
     112            System.arraycopy(_Y, 0, XY, XY_SIZE, XY_SIZE);
     113            byte[] h = SimpleByteCache.acquire(HXY_SIZE);
     114            _context.sha().calculateHash(XY, 0, XY_SIZE + XY_SIZE, h, 0);
     115            if (!DataHelper.eq(h, 0, hXY_tsB, 0, HXY_SIZE)) {
    115116                SimpleByteCache.release(h);
    116                 changeState(State.OB_GOT_HXY);
    117                 // their (Bob's) timestamp in seconds
    118                 _tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4);
    119                 long now = _context.clock().now();
    120                 // rtt from sending #1 to receiving #2
    121                 long rtt = now - _con.getCreated();
    122                 // our (Alice's) timestamp in seconds
    123                 _tsA = (now + 500) / 1000;
    124                 _peerSkew = (now - (_tsB * 1000) - (rtt / 2) + 500) / 1000;
    125                 if (_log.shouldLog(Log.DEBUG))
    126                     _log.debug(prefix()+"h(X+Y) is correct, skew = " + _peerSkew);
    127 
    128                 // the skew is not authenticated yet, but it is certainly fatal to
    129                 // the establishment, so fail hard if appropriate
    130                 long diff = 1000*Math.abs(_peerSkew);
    131                 if (!_context.clock().getUpdatedSuccessfully()) {
    132                     // Adjust the clock one time in desperation
    133                     // We are Alice, he is Bob, adjust to match Bob
    134                     _context.clock().setOffset(1000 * (0 - _peerSkew), true);
    135                     _peerSkew = 0;
    136                     if (diff != 0)
    137                         _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
    138                 } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
    139                     _context.statManager().addRateData("ntcp.invalidOutboundSkew", diff);
    140                     _transport.markReachable(_con.getRemotePeer().calculateHash(), false);
    141                     // Only banlist if we know what time it is
    142                     _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
    143                                                        _con.getRemotePeer().calculateHash(),
    144                                                        _x("Excessive clock skew: {0}"));
    145                     _transport.setLastBadSkew(_peerSkew);
    146                     fail("Clocks too skewed (" + diff + " ms)", null, true);
    147                     return;
    148                 } else if (_log.shouldLog(Log.DEBUG)) {
    149                     _log.debug(prefix()+"Clock skew: " + diff + " ms");
    150                 }
    151 
    152                 // now prepare and send our response
    153                 // send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
    154                 int sigSize = XY_SIZE + XY_SIZE + HXY_SIZE + 4+4;//+12;
    155                 byte preSign[] = new byte[sigSize];
    156                 System.arraycopy(_X, 0, preSign, 0, XY_SIZE);
    157                 System.arraycopy(_Y, 0, preSign, XY_SIZE, XY_SIZE);
    158                 System.arraycopy(_con.getRemotePeer().calculateHash().getData(), 0, preSign, XY_SIZE + XY_SIZE, HXY_SIZE);
    159                 DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA);
    160                 DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB);
    161                 // hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32)
    162                 Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
    163 
    164                 byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
    165                 // handle variable signature size
    166                 int min = 2 + ident.length + 4 + sig.length();
    167                 int rem = min % AES_SIZE;
    168                 int padding = 0;
    169                 if (rem > 0)
    170                     padding = AES_SIZE - rem;
    171                 byte preEncrypt[] = new byte[min+padding];
    172                 DataHelper.toLong(preEncrypt, 0, 2, ident.length);
    173                 System.arraycopy(ident, 0, preEncrypt, 2, ident.length);
    174                 DataHelper.toLong(preEncrypt, 2+ident.length, 4, _tsA);
    175                 if (padding > 0)
    176                     _context.random().nextBytes(preEncrypt, 2 + ident.length + 4, padding);
    177                 System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, sig.length());
    178 
    179                 _prevEncrypted = new byte[preEncrypt.length];
    180                 _context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(),
    181                                        _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length);
    182 
    183                 changeState(State.OB_SENT_RI);
    184                 _transport.getPumper().wantsWrite(_con, _prevEncrypted);
    185             }
     117                _context.statManager().addRateData("ntcp.invalidHXY", 1);
     118                fail("Invalid H(X+Y) - mitm attack attempted?");
     119                return;
     120            }
     121            SimpleByteCache.release(h);
     122            changeState(State.OB_GOT_HXY);
     123            _received = 0;
     124            // their (Bob's) timestamp in seconds
     125            _tsB = DataHelper.fromLong(hXY_tsB, HXY_SIZE, 4);
     126            long now = _context.clock().now();
     127            // rtt from sending #1 to receiving #2
     128            long rtt = now - _con.getCreated();
     129            // our (Alice's) timestamp in seconds
     130            _tsA = (now + 500) / 1000;
     131            _peerSkew = (now - (_tsB * 1000) - (rtt / 2) + 500) / 1000;
     132            if (_log.shouldLog(Log.DEBUG))
     133                _log.debug(prefix()+"h(X+Y) is correct, skew = " + _peerSkew);
     134
     135            // the skew is not authenticated yet, but it is certainly fatal to
     136            // the establishment, so fail hard if appropriate
     137            long diff = 1000*Math.abs(_peerSkew);
     138            if (!_context.clock().getUpdatedSuccessfully()) {
     139                // Adjust the clock one time in desperation
     140                // We are Alice, he is Bob, adjust to match Bob
     141                _context.clock().setOffset(1000 * (0 - _peerSkew), true);
     142                _peerSkew = 0;
     143                if (diff != 0)
     144                    _log.logAlways(Log.WARN, "NTP failure, NTCP adjusting clock by " + DataHelper.formatDuration(diff));
     145            } else if (diff >= Router.CLOCK_FUDGE_FACTOR) {
     146                _context.statManager().addRateData("ntcp.invalidOutboundSkew", diff);
     147                _transport.markReachable(_con.getRemotePeer().calculateHash(), false);
     148                // Only banlist if we know what time it is
     149                _context.banlist().banlistRouter(DataHelper.formatDuration(diff),
     150                                                   _con.getRemotePeer().calculateHash(),
     151                                                   _x("Excessive clock skew: {0}"));
     152                _transport.setLastBadSkew(_peerSkew);
     153                fail("Clocks too skewed (" + diff + " ms)", null, true);
     154                return;
     155            } else if (_log.shouldLog(Log.DEBUG)) {
     156                _log.debug(prefix()+"Clock skew: " + diff + " ms");
     157            }
     158
     159            // now prepare and send our response
     160            // send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
     161            int sigSize = XY_SIZE + XY_SIZE + HXY_SIZE + 4+4;//+12;
     162            byte preSign[] = new byte[sigSize];
     163            System.arraycopy(_X, 0, preSign, 0, XY_SIZE);
     164            System.arraycopy(_Y, 0, preSign, XY_SIZE, XY_SIZE);
     165            System.arraycopy(_con.getRemotePeer().calculateHash().getData(), 0, preSign, XY_SIZE + XY_SIZE, HXY_SIZE);
     166            DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE, 4, _tsA);
     167            DataHelper.toLong(preSign, XY_SIZE + XY_SIZE + HXY_SIZE + 4, 4, _tsB);
     168            // hXY_tsB has 12 bytes of padding (size=48, tsB=4 + hXY=32)
     169            Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
     170
     171            byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
     172            // handle variable signature size
     173            int min = 2 + ident.length + 4 + sig.length();
     174            int rem = min % AES_SIZE;
     175            int padding = 0;
     176            if (rem > 0)
     177                padding = AES_SIZE - rem;
     178            byte preEncrypt[] = new byte[min+padding];
     179            DataHelper.toLong(preEncrypt, 0, 2, ident.length);
     180            System.arraycopy(ident, 0, preEncrypt, 2, ident.length);
     181            DataHelper.toLong(preEncrypt, 2+ident.length, 4, _tsA);
     182            if (padding > 0)
     183                _context.random().nextBytes(preEncrypt, 2 + ident.length + 4, padding);
     184            System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, sig.length());
     185
     186            _prevEncrypted = new byte[preEncrypt.length];
     187            _context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(),
     188                                   _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-AES_SIZE, preEncrypt.length);
     189
     190            changeState(State.OB_SENT_RI);
     191            _transport.getPumper().wantsWrite(_con, _prevEncrypted);
    186192        }
    187193
     
    206212                               src.hasRemaining() + ")");
    207213            } else {
    208                 off = _received - XY_SIZE - HXY_TSB_PAD_SIZE;
     214                off = _received;
    209215                if (_log.shouldLog(Log.DEBUG))
    210216                    _log.debug(prefix() + "continuing to receive E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev) (remaining? " +
     
    224230                    SigType type = _con.getRemotePeer().getSigningPublicKey().getType();
    225231                    int siglen = type.getSigLen();
     232                    // we don't need to do this if no padding!
    226233                    byte bobSigData[] = new byte[siglen];
    227234                    System.arraycopy(bobSig, 0, bobSigData, 0, siglen);
     
    243250                        if (_log.shouldLog(Log.DEBUG))
    244251                            _log.debug(prefix() + "signature verified from Bob.  done!");
    245                         prepareExtra(src);
    246252                        byte nextWriteIV[] = SimpleByteCache.acquire(AES_SIZE);
    247253                        System.arraycopy(_prevEncrypted, _prevEncrypted.length-AES_SIZE, nextWriteIV, 0, AES_SIZE);
    248254                        // this does not copy the nextWriteIV, do not release to cache
    249255                        // We are Alice, he is Bob, clock skew is Bob - Alice
    250                         _con.finishOutboundEstablishment(_dh.getSessionKey(), _peerSkew, nextWriteIV, _e_bobSig); // skew in seconds
     256                        // skew in seconds
     257                        _con.finishOutboundEstablishment(_dh.getSessionKey(), _peerSkew, nextWriteIV, _e_bobSig);
     258                        changeState(State.VERIFIED);
     259                        if (src.hasRemaining()) {
     260                            // process "extra" data
     261                            // This is fairly common for outbound, where Bob may send his updated RI
     262                            if (_log.shouldInfo())
     263                                _log.info("extra data " + src.remaining() + " on " + this);
     264                            _con.recvEncryptedI2NP(src);
     265                        }
    251266                        releaseBufs(true);
    252267                        // if socket gets closed this will be null - prevent NPE
     
    254269                        if (ia != null)
    255270                            _transport.setIP(_con.getRemotePeer().calculateHash(), ia.getAddress());
    256                         changeState(State.VERIFIED);
    257271                    }
    258272                    return;
    259273                }
    260274            }
     275        }
     276
     277        // check for remaining data
     278        if ((_state == State.VERIFIED || _state == State.CORRUPT) && src.hasRemaining()) {
     279            if (_log.shouldWarn())
     280                _log.warn("Received unexpected " + src.remaining() + " on " + this, new Exception());
    261281        }
    262282    }
     
    267287     * queueing up the write of the first part of the handshake
    268288     * This method sends message #1 to Bob.
    269      */
     289     *
     290     * @throws IllegalStateException
     291     */
     292    @Override
    270293    public synchronized void prepareOutbound() {
    271         boolean shouldSend;
    272         synchronized(_stateLock) {   
    273             shouldSend = _state == State.OB_INIT;
    274         }
    275         if (shouldSend) {
     294        if (_state == State.OB_INIT) {
    276295            if (_log.shouldLog(Log.DEBUG))
    277296                _log.debug(prefix() + "send X");
     
    282301            _transport.getPumper().wantsWrite(_con, toWrite);
    283302        } else {
    284             if (_log.shouldLog(Log.WARN))
    285                 _log.warn(prefix() + "unexpected prepareOutbound()");
     303            throw new IllegalStateException(prefix() + "unexpected prepareOutbound()");
    286304        }
    287305    }
     
    294312    protected void releaseBufs(boolean isVerified) {
    295313        super.releaseBufs(isVerified);
     314        Arrays.fill(_Y, (byte) 0);
    296315        SimpleByteCache.release(_Y);
    297316    }
  • router/java/src/net/i2p/router/transport/ntcp/Reader.java

    r49221ad rae8779e  
    150150                return;
    151151            EstablishState est = con.getEstablishState();
    152             if (_log.shouldLog(Log.DEBUG))
    153                 _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]");
    154152           
    155153            if (est.isComplete()) {
     
    164162            EventPumper.releaseBuf(buf);
    165163            if (est.isCorrupt()) {
    166                 if (_log.shouldLog(Log.WARN))
    167                     _log.warn("closing connection on establishment because: " +est.getError(), est.getException());
    168                 if (!est.getFailedBySkew())
    169                     _context.statManager().addRateData("ntcp.receiveCorruptEstablishment", 1);
    170164                con.close();
    171165                return;
    172166            }
    173             if (est.isComplete() && est.getExtraBytes() != null)
    174                 con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes()));
     167            // EstablishState is responsible for passing "extra" data to the con
    175168        }
    176169        while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) {
    177170            // decrypt the data and push it into an i2np message
    178             if (_log.shouldLog(Log.DEBUG))
    179                 _log.debug("Processing read buffer as part of an i2np message (" + buf.remaining() + " bytes)");
    180171            con.recvEncryptedI2NP(buf);
    181172            EventPumper.releaseBuf(buf);
Note: See TracChangeset for help on using the changeset viewer.