Changeset f630d2dd


Ignore:
Timestamp:
Nov 23, 2011 11:36:37 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
17773a2
Parents:
f69f06b
Message:
  • NTCP:
    • More optimizations in recvEncrypted()
    • More efficient XOR
    • Reduce bandwidth stat update frequency
    • Check for repeated zero-length reads
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    rf69f06b rf630d2dd  
     12011-11-23 zzz
     2  * CryptixAESEngine: Fix bogus bounds checks
     3  * NTCP:
     4    - More optimizations in recvEncrypted()
     5    - More efficient XOR
     6    - Reduce bandwidth stat update frequency
     7    - Check for repeated zero-length reads
     8  * RandomSource: Add new method getBytes(buf, offset, length)
     9  * Tunnel encryption: More efficient XOR
     10
    1112011-11-21 zzz
    212  * NTCP Pumper:
  • router/java/src/net/i2p/router/RouterVersion.java

    rf69f06b rf630d2dd  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 6;
     21    public final static long BUILD = 7;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    rf69f06b rf630d2dd  
    102102        _context.statManager().createRateStat("ntcp.pumperKeysPerLoop", "", "ntcp", new long[] {10*60*1000} );
    103103        _context.statManager().createRateStat("ntcp.pumperLoopsPerSecond", "", "ntcp", new long[] {10*60*1000} );
     104        _context.statManager().createRateStat("ntcp.zeroRead", "", "ntcp", new long[] {10*60*1000} );
     105        _context.statManager().createRateStat("ntcp.zeroReadDrop", "", "ntcp", new long[] {10*60*1000} );
    104106    }
    105107   
     
    562564                //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    563565                releaseBuf(buf);
     566                // workaround for channel stuck returning 0 all the time, causing 100% CPU
     567                int consec = con.gotZeroRead();
     568                if (consec >= 5) {
     569                    _context.statManager().addRateData("ntcp.zeroReadDrop", 1);
     570                    if (_log.shouldLog(Log.WARN))
     571                        _log.warn("Fail safe zero read close " + con);
     572                    con.close();
     573                } else {
     574                    _context.statManager().addRateData("ntcp.zeroRead", consec);
     575                    if (_log.shouldLog(Log.INFO))
     576                        _log.info("nothing to read for " + con + ", but stay interested");
     577                }
    564578            } else if (read > 0) {
     579                // clear counter for workaround above
     580                con.clearZeroRead();
    565581                // ZERO COPY. The buffer will be returned in Reader.processRead()
    566582                buf.flip();
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    rf69f06b rf630d2dd  
    107107    private long _lastSendTime;
    108108    private long _lastReceiveTime;
     109    private long _lastRateUpdated;
    109110    private final long _created;
    110111    private long _nextMetaTime;
     112    private int _consecutiveZeroReads;
     113
     114    private static final int BLOCK_SIZE = 16;
     115    private static final int META_SIZE = BLOCK_SIZE;
     116
    111117    /** unencrypted outbound metadata buffer */
    112     private final byte _meta[] = new byte[16];
     118    private final byte _meta[] = new byte[META_SIZE];
    113119    private boolean _sendingMeta;
    114120    /** how many consecutive sends were failed due to (estimated) send queue time */
     
    116122    private long _nextInfoTime;
    117123   
     124    /*
     125     *  Update frequency for send/recv rates in console peers page
     126     */
     127    private static final long STAT_UPDATE_TIME_MS = 30*1000;
     128
    118129    private static final int META_FREQUENCY = 10*60*1000;
    119130    /** how often we send our routerinfo unsolicited */
     
    145156        _outbound = new LinkedBlockingQueue();
    146157        _isInbound = true;
    147         _decryptBlockBuf = new byte[16];
     158        _decryptBlockBuf = new byte[BLOCK_SIZE];
    148159        _curReadState = new ReadState();
    149160        _establishState = new EstablishState(ctx, transport, this);
     
    170181        _outbound = new LinkedBlockingQueue();
    171182        _isInbound = false;
    172         _decryptBlockBuf = new byte[16];
     183        _decryptBlockBuf = new byte[BLOCK_SIZE];
    173184        _curReadState = new ReadState();
    174185        initialize();
     
    178189        _lastSendTime = _created;
    179190        _lastReceiveTime = _created;
    180         _curReadBlock = new byte[16];
    181         _prevReadBlock = new byte[16];
     191        _lastRateUpdated = _created;
     192        _curReadBlock = new byte[BLOCK_SIZE];
     193        _prevReadBlock = new byte[BLOCK_SIZE];
    182194        _transport.establishing(this);
    183195    }
     
    201213        _clockSkew = clockSkew;
    202214        _prevWriteEnd = prevWriteEnd;
    203         System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length);
    204         if (_log.shouldLog(Log.DEBUG))
    205             _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
     215        System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
     216        //if (_log.shouldLog(Log.DEBUG))
     217        //    _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
    206218        _established = true;
    207219        _establishedOn = System.currentTimeMillis();
     
    242254
    243255    public int getConsecutiveBacklog() { return _consecutiveBacklog; }
    244    
     256     
     257    /**
     258     *  workaround for EventPumper
     259     *  @since 0.8.12
     260     */
     261    public void clearZeroRead() {
     262        _consecutiveZeroReads = 0;
     263    }
     264
     265    /**
     266     *  workaround for EventPumper
     267     *  @return value after incrementing
     268     *  @since 0.8.12
     269     */
     270    public int gotZeroRead() {
     271        return ++_consecutiveZeroReads;
     272    }
     273
    245274    public boolean isClosed() { return _closed; }
    246275    public void close() { close(false); }
     
    442471        _clockSkew = clockSkew;
    443472        _prevWriteEnd = prevWriteEnd;
    444         System.arraycopy(prevReadEnd, prevReadEnd.length-16, _prevReadBlock, 0, _prevReadBlock.length);
     473        System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
    445474        if (_log.shouldLog(Log.DEBUG))
    446475            _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd));
     
    596625     */
    597626    synchronized void prepareNextWriteFast() {
    598         if (_log.shouldLog(Log.DEBUG))
    599             _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
     627        //if (_log.shouldLog(Log.DEBUG))
     628        //    _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established);
    600629        if (!_isInbound && !_established) {
    601630            if (_establishState == null) {
     
    716745            padding = 16 - rem;
    717746       
    718         buf.padLength = padding;
    719747        buf.unencryptedLength = min+padding;
    720748        DataHelper.toLong(buf.unencrypted, 0, 2, sz);
    721749        System.arraycopy(buf.base, 0, buf.unencrypted, 2, buf.baseLength);
    722         if (padding > 0)
    723             _context.random().nextBytes(buf.pad); // maybe more than necessary, but its only the prng
    724         System.arraycopy(buf.pad, 0, buf.unencrypted, 2+sz, buf.padLength);
     750        if (padding > 0) {
     751            _context.random().nextBytes(buf.unencrypted, 2+sz, padding);
     752        }
    725753       
    726754        //long serialized = System.currentTimeMillis();
     
    766794        final byte base[];
    767795        int baseLength;
    768         final byte pad[];
    769         int padLength;
    770796        final Adler32 crc;
    771797        byte encrypted[];
     
    774800            unencrypted = new byte[BUFFER_SIZE];
    775801            base = new byte[BUFFER_SIZE];
    776             pad = new byte[16];
    777802            crc = new Adler32();
    778803        }
     
    781806            unencryptedLength = 0;
    782807            baseLength = 0;
    783             padLength = 0;
    784808            encrypted = null;
    785809            crc.reset();
     
    949973    /** _bytesSent when we last updated the rate */
    950974    private long _lastBytesSent;
    951     private long _lastRateUpdated;
    952975    private float _sendBps;
    953976    private float _recvBps;
    954     private float _sendBps15s;
    955     private float _recvBps15s;
    956    
    957     public float getSendRate() { return _sendBps15s; }
    958     public float getRecvRate() { return _recvBps15s; }
    959    
     977    //private float _sendBps15s;
     978    //private float _recvBps15s;
     979   
     980    public float getSendRate() { return _sendBps; }
     981    public float getRecvRate() { return _recvBps; }
     982   
     983    /**
     984     *  Stats only for console
     985     */
    960986    private void updateStats() {
    961987        long now = System.currentTimeMillis();
    962988        long time = now - _lastRateUpdated;
    963         // If at least one second has passed
    964         if (time >= 1000) {
     989        // If enough time has passed...
     990        // Perhaps should synchronize, but if so do the time check before synching...
     991        // only for console so don't bother....
     992        if (time >= STAT_UPDATE_TIME_MS) {
    965993            long totS = _bytesSent;
    966994            long totR = _bytesReceived;
     
    9771005            // Weights (0.955 and 0.045) are tuned so that transition between two values (e.g. 0..10)
    9781006            // would reach their midpoint (e.g. 5) in 15s
    979             _sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time;
    980             _recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time;
     1007            //_sendBps15s = (0.955f)*_sendBps15s + (0.045f)*((float)sent*1000f)/(float)time;
     1008            //_recvBps15s = (0.955f)*_recvBps15s + (0.045f)*((float)recv*1000)/(float)time;
    9811009
    9821010            if (_log.shouldLog(Log.DEBUG))
    9831011                _log.debug("Rates updated to "
    984                            + _sendBps + "/" + _recvBps + "Bps in/out ("
    985                            + _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s) after "
    986                            + sent + "/" + recv + " in " + time);
     1012                           + _sendBps + '/' + _recvBps + "Bps in/out "
     1013                           //+ _sendBps15s + "/" + _recvBps15s + "Bps in/out 15s after "
     1014                           + sent + '/' + recv + " in " + DataHelper.formatDuration(time));
    9871015        }
    9881016    }
     
    10041032        //if (_log.shouldLog(Log.DEBUG))
    10051033        //    _log.debug("receive encrypted i2np: " + buf.remaining());
     1034        // hasArray() is false for direct buffers, at least on my system...
     1035        if (_curReadBlockIndex == 0 && buf.hasArray()) {
     1036            // fast way
     1037            int tot = buf.remaining();
     1038            if (tot >= 32 && tot % 16 == 0) {
     1039                recvEncryptedFast(buf);
     1040                return;
     1041            }
     1042        }
     1043
    10061044        while (buf.hasRemaining() && !_closed) {
    1007             int want = Math.min(buf.remaining(), _curReadBlock.length-_curReadBlockIndex);
     1045            int want = Math.min(buf.remaining(), BLOCK_SIZE - _curReadBlockIndex);
    10081046            if (want > 0) {
    10091047                buf.get(_curReadBlock, _curReadBlockIndex, want);
     
    10111049            }
    10121050            //_curReadBlock[_curReadBlockIndex++] = buf.get();
    1013             if (_curReadBlockIndex >= _curReadBlock.length) {
     1051            if (_curReadBlockIndex >= BLOCK_SIZE) {
    10141052                // cbc
    10151053                _context.aes().decryptBlock(_curReadBlock, 0, _sessionKey, _decryptBlockBuf, 0);
    1016                 DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, _decryptBlockBuf.length);
     1054                //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
     1055                for (int i = 0; i < BLOCK_SIZE; i++) {
     1056                    _decryptBlockBuf[i] ^= _prevReadBlock[i];
     1057                }
    10171058                //if (_log.shouldLog(Log.DEBUG))
    10181059                //    _log.debug("parse decrypted i2np block (remaining: " + buf.remaining() + ")");
     
    10301071        }
    10311072    }
     1073
     1074    /**
     1075     *  Decrypt directly out of the ByteBuffer instead of copying the bytes
     1076     *  16 at a time to the _curReadBlock / _prevReadBlock flip buffers.
     1077     *
     1078     *  More efficient but can only be used if buf.hasArray == true AND
     1079     *  _curReadBlockIndex must be 0 and buf.getRemaining() % 16 must be 0
     1080     *  and buf.getRemaining() must be >= 16.
     1081     *  All this is true for most buffers.
     1082     *  In theory this could be fixed up to handle the other cases too but that's hard.
     1083     *  Caller must synchronize!
     1084     *  @since 0.8.12
     1085     */
     1086    private void recvEncryptedFast(ByteBuffer buf) {
     1087        byte[] array = buf.array();
     1088        int pos = buf.arrayOffset();
     1089        int end = pos + buf.remaining();
     1090        boolean first = true;
     1091
     1092        for ( ; pos < end && !_closed; pos += BLOCK_SIZE) {
     1093            _context.aes().decryptBlock(array, pos, _sessionKey, _decryptBlockBuf, 0);
     1094            if (first) {
     1095                // XOR with _prevReadBlock the first time...
     1096                //DataHelper.xor(_decryptBlockBuf, 0, _prevReadBlock, 0, _decryptBlockBuf, 0, BLOCK_SIZE);
     1097                for (int i = 0; i < BLOCK_SIZE; i++) {
     1098                    _decryptBlockBuf[i] ^= _prevReadBlock[i];
     1099                }
     1100                first = false;
     1101            } else {
     1102                //DataHelper.xor(_decryptBlockBuf, 0, array, pos - BLOCK_SIZE, _decryptBlockBuf, 0, BLOCK_SIZE);
     1103                int start = pos - BLOCK_SIZE;
     1104                for (int i = 0; i < BLOCK_SIZE; i++) {
     1105                    _decryptBlockBuf[i] ^= array[start + i];
     1106                }
     1107            }
     1108            boolean ok = recvUnencryptedI2NP();
     1109            if (!ok) {
     1110                _log.error("Read buffer " + System.identityHashCode(buf) + " contained corrupt data");
     1111                _context.statManager().addRateData("ntcp.corruptDecryptedI2NP", 1);
     1112                return;
     1113            }
     1114        }
     1115        // ...and copy to _prevReadBlock the last time
     1116        System.arraycopy(array, end - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE);
     1117    }
    10321118   
    10331119    /**
     
    10391125    private boolean recvUnencryptedI2NP() {
    10401126        _curReadState.receiveBlock(_decryptBlockBuf);
     1127        // FIXME move check to ReadState; must we close? possible attack vector?
    10411128        if (_curReadState.getSize() > BUFFER_SIZE) {
    10421129            _log.error("I2NP message too big - size: " + _curReadState.getSize() + " Dropping " + toString());
     
    10881175    }
    10891176
     1177    /**
     1178     * One special case is a metadata message where the sizeof(data) is 0.  In
     1179     * that case, the unencrypted message is encoded as:
     1180     *<pre>
     1181     *  +-------+-------+-------+-------+-------+-------+-------+-------+
     1182     *  |       0       |      timestamp in seconds     | uninterpreted             
     1183     *  +-------+-------+-------+-------+-------+-------+-------+-------+
     1184     *          uninterpreted           | adler checksum of sz+data+pad |
     1185     *  +-------+-------+-------+-------+-------+-------+-------+-------+
     1186     *</pre>
     1187     */
    10901188    private void sendMeta() {
    10911189        byte encrypted[] = new byte[_meta.length];
    10921190        synchronized (_meta) {
    1093             _context.random().nextBytes(_meta); // randomize the uninterpreted, then overwrite w/ data
    10941191            DataHelper.toLong(_meta, 0, 2, 0);
    10951192            DataHelper.toLong(_meta, 2, 4, (_context.clock().now() + 500) / 1000);
     1193            _context.random().nextBytes(_meta, 6, 6);
    10961194            Adler32 crc = new Adler32();
    10971195            crc.update(_meta, 0, _meta.length-4);
     
    12351333        /** @param buf 16 bytes */
    12361334        private void receiveInitial(byte buf[]) {
    1237             _stateBegin = System.currentTimeMillis();
    12381335            _size = (int)DataHelper.fromLong(buf, 0, 2);
    12391336            if (_size == 0) {
    12401337                readMeta(buf);
    12411338                init();
    1242                 return;
    12431339            } else {
     1340                _stateBegin = System.currentTimeMillis();
    12441341                _dataBuf = acquireReadBuf();
    12451342                System.arraycopy(buf, 2, _dataBuf.data, 0, buf.length-2);
     
    12631360            }
    12641361            if ( (remaining <= 0) && (buf.length-blockUsed < 4) ) {
     1362                // we've received all the data but not the 4-byte checksum
    12651363                if (_log.shouldLog(Log.DEBUG))
    12661364                    _log.debug("crc wraparound required on block " + _blocks + " in message " + _messagesRead);
     
    12851383            _crc.update(buf, 0, buf.length-4);
    12861384            long val = _crc.getValue();
    1287             if (_log.shouldLog(Log.DEBUG))
    1288                 _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size);
     1385            //if (_log.shouldLog(Log.DEBUG))
     1386            //    _log.debug("CRC value computed: " + val + " expected: " + _expectedCrc + " size: " + _size);
    12891387            if (val == _expectedCrc) {
    12901388                try {
     
    13301428                        _log.warn("Error parsing I2NP message", ime);
    13311429                    _context.statManager().addRateData("ntcp.corruptI2NPIME", 1);
     1430                    // FIXME don't close the con, possible attack vector?
    13321431                    close();
    13331432                    // handler and databuf are lost
     
    13381437                    _log.warn("CRC incorrect for message " + _messagesRead + " (calc=" + val + " expected=" + _expectedCrc + ") size=" + _size + " blocks " + _blocks);
    13391438                    _context.statManager().addRateData("ntcp.corruptI2NPCRC", 1);
    1340                 // FIXME should we try to read in the message and keep going?
     1439                // FIXME don't close the con, possible attack vector?
    13411440                close();
    13421441                // databuf is lost
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    rf69f06b rf630d2dd  
    757757            buf.append(THINSP).append(DataHelper.formatDuration2(con.getTimeSinceSend()));
    758758            buf.append("</td><td class=\"cells\" align=\"right\">");
    759             if (con.getTimeSinceReceive() < 10*1000) {
    760                 buf.append(formatRate(con.getRecvRate()/1024));
    761                 bpsRecv += con.getRecvRate();
     759            if (con.getTimeSinceReceive() < 2*60*1000) {
     760                float r = con.getRecvRate();
     761                buf.append(formatRate(r / 1024));
     762                bpsRecv += r;
    762763            } else {
    763764                buf.append(formatRate(0));
    764765            }
    765766            buf.append(THINSP);
    766             if (con.getTimeSinceSend() < 10*1000) {
    767                 buf.append(formatRate(con.getSendRate()/1024));
    768                 bpsSend += con.getSendRate();
     767            if (con.getTimeSinceSend() < 2*60*1000) {
     768                float r = con.getSendRate();
     769                buf.append(formatRate(r / 1024));
     770                bpsSend += r;
    769771            } else {
    770772                buf.append(formatRate(0));
Note: See TracChangeset for help on using the changeset viewer.