Changeset d0376f8


Ignore:
Timestamp:
Apr 17, 2009 1:11:16 PM (11 years ago)
Author:
sponge <sponge@…>
Branches:
master
Children:
c3f9e20
Parents:
834fdfe9
Message:

2009-04-17 sponge

  • Catch NPE in NTCP. This possibly augments fix 2009-04-11 welterde below.
  • Various LINT on NTCP sources, and removal of space-wasting spaces at end of lines in sources touched.
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r834fdfe9 rd0376f8  
     12009-04-17 sponge
     2    * Catch NPE in NTCP.
     3      This possibly augments fix 2009-04-11 welterde below.
     4    * Various LINT on NTCP sources, and removal of space-wasting
     5      spaces at end of lines in sources touched.
     6
    172009-04-13 Mathiasdm
    28    * Bugfix on tray icon updating
  • router/java/src/net/i2p/router/RouterVersion.java

    r834fdfe9 rd0376f8  
    1818    public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $";
    1919    public final static String VERSION = CoreVersion.VERSION;
    20     public final static long BUILD = 18;
     20    public final static long BUILD = 19;
    2121    public static void main(String args[]) {
    2222        System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
  • router/java/src/net/i2p/router/transport/TransportImpl.java

    r834fdfe9 rd0376f8  
    22/*
    33 * free (adj.): unencumbered; not under the control of others
    4  * Written by jrandom in 2003 and released into the public domain 
    5  * with no warranty of any kind, either expressed or implied. 
    6  * It probably won't make your computer catch on fire, or eat 
     4 * Written by jrandom in 2003 and released into the public domain
     5 * with no warranty of any kind, either expressed or implied.
     6 * It probably won't make your computer catch on fire, or eat
    77 * your children, but it might.  Use at your own risk.
    88 *
     
    1515import java.util.Date;
    1616import java.util.HashMap;
    17 import java.util.HashSet;
    1817import java.util.Iterator;
    1918import java.util.List;
     
    4746    private TransportEventListener _listener;
    4847    private RouterAddress _currentAddress;
    49     private List _sendPool;
     48    private final List _sendPool;
    5049    protected RouterContext _context;
    5150    /** map from routerIdentHash to timestamp (Long) that the peer was last unreachable */
    52     private Map<Hash, Long>  _unreachableEntries;
     51    private final Map<Hash, Long>  _unreachableEntries;
    5352    private Set<Hash> _wasUnreachableEntries;
    5453    /** global router ident -> IP */
     
    6261        _context = context;
    6362        _log = _context.logManager().getLog(TransportImpl.class);
    64        
     63
    6564        _context.statManager().createRateStat("transport.sendMessageFailureLifetime", "How long the lifetime of messages that fail are?", "Transport", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
    6665        _context.statManager().createRateStat("transport.sendMessageSize", "How large are the messages sent?", "Transport", new long[] { 60*1000l, 5*60*1000l, 60*60*1000l, 24*60*60*1000l });
     
    7574        _currentAddress = null;
    7675    }
    77    
     76
    7877    /**
    7978     * How many peers can we talk to right now?
     
    112111     */
    113112    public boolean haveCapacity() { return true; }
    114    
     113
    115114    /**
    116115     * Return our peer clock skews on a transport.
     
    119118     */
    120119    public Vector getClockSkews() { return new Vector(); }
    121    
     120
    122121    public List getMostRecentErrorMessages() { return Collections.EMPTY_LIST; }
    123122    /**
    124123     * Nonblocking call to pull the next outbound message
    125      * off the queue. 
     124     * off the queue.
    126125     *
    127126     * @return the next message or null if none are available
     
    136135        return msg;
    137136    }
    138    
     137
    139138    /**
    140139     * The transport is done sending this message
     
    168167    /**
    169168     * The transport is done sending this message.  This is the method that actually
    170      * does all of the cleanup - firing off jobs, requeueing, updating stats, etc. 
     169     * does all of the cleanup - firing off jobs, requeueing, updating stats, etc.
    171170     *
    172171     * @param msg message in question
     
    181180        else
    182181            msg.timestamp("afterSend(failed)");
    183        
     182
    184183        if (!sendSuccessful)
    185184            msg.transportFailed(getStyle());
     
    187186        if (msToSend > 1000) {
    188187            if (_log.shouldLog(Log.WARN))
    189                 _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " 
    190                           + msg.getMessageType() + " " + msg.getMessageId() + " to " 
    191                           + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend 
     188                _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte "
     189                          + msg.getMessageType() + " " + msg.getMessageId() + " to "
     190                          + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + " took " + msToSend
    192191                          + "/" + msg.getTransmissionTime());
    193192        }
    194         //if (true) 
     193        //if (true)
    195194        //    _log.error("(not error) I2NP message sent? " + sendSuccessful + " " + msg.getMessageId() + " after " + msToSend + "/" + msg.getTransmissionTime());
    196        
     195
    197196        long lifetime = msg.getLifetime();
    198197        if (lifetime > 3000) {
     
    201200                level = Log.INFO;
    202201            if (_log.shouldLog(level))
    203                 _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " 
    204                           + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) 
     202                _log.log(level, "afterSend slow (" + lifetime + "/" + msToSend + "/" + msg.getTransmissionTime() + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
     203                          + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
    205204                          + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString());
    206205        } else {
    207206            if (_log.shouldLog(Log.INFO))
    208                 _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " 
    209                           + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) 
     207                _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte "
     208                          + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6)
    210209                          + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString());
    211210        }
     
    213212        if (sendSuccessful) {
    214213            if (_log.shouldLog(Log.DEBUG))
    215                 _log.debug("Send message " + msg.getMessageType() + " to " 
    216                            + msg.getTarget().getIdentity().getHash().toBase64() + " with transport " 
     214                _log.debug("Send message " + msg.getMessageType() + " to "
     215                           + msg.getTarget().getIdentity().getHash().toBase64() + " with transport "
    217216                           + getStyle() + " successfully");
    218217            Job j = msg.getOnSendJob();
    219             if (j != null) 
     218            if (j != null)
    220219                _context.jobQueue().addJob(j);
    221220            log = true;
     
    223222        } else {
    224223            if (_log.shouldLog(Log.INFO))
    225                 _log.info("Failed to send message " + msg.getMessageType() 
    226                           + " to " + msg.getTarget().getIdentity().getHash().toBase64() 
     224                _log.info("Failed to send message " + msg.getMessageType()
     225                          + " to " + msg.getTarget().getIdentity().getHash().toBase64()
    227226                          + " with transport " + getStyle() + " (details: " + msg + ")");
    228227            if (msg.getExpiration() < _context.clock().now())
    229228                _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime);
    230            
     229
    231230            if (allowRequeue) {
    232                 if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) 
     231                if ( ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) )
    233232                     && (msg.getMessage() != null) ) {
    234233                    // this may not be the last transport available - keep going
     
    237236                } else {
    238237                    if (_log.shouldLog(Log.INFO))
    239                         _log.info("No more time left (" + new Date(msg.getExpiration()) 
    240                                   + ", expiring without sending successfully the " 
     238                        _log.info("No more time left (" + new Date(msg.getExpiration())
     239                                  + ", expiring without sending successfully the "
    241240                                  + msg.getMessageType());
    242241                    if (msg.getOnFailedSendJob() != null)
     
    252251                MessageSelector selector = msg.getReplySelector();
    253252                if (_log.shouldLog(Log.INFO))
    254                     _log.info("Failed and no requeue allowed for a " 
    255                               + msg.getMessageSize() + " byte " 
     253                    _log.info("Failed and no requeue allowed for a "
     254                              + msg.getMessageSize() + " byte "
    256255                              + msg.getMessageType() + " message with selector " + selector, new Exception("fail cause"));
    257256                if (msg.getOnFailedSendJob() != null)
     
    270269            // the udp transport logs some further details
    271270            /*
    272             _context.messageHistory().sendMessage(type, msg.getMessageId(), 
     271            _context.messageHistory().sendMessage(type, msg.getMessageId(),
    273272                                                  msg.getExpiration(),
    274                                                   msg.getTarget().getIdentity().getHash(), 
     273                                                  msg.getTarget().getIdentity().getHash(),
    275274                                                  sendSuccessful);
    276275             */
     
    282281        if (allTime > 5*1000) {
    283282            if (_log.shouldLog(Log.INFO))
    284                 _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful 
    285                           + "): " + allTime + "ms/" + sendTime + "ms after failing on: " 
     283                _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful
     284                          + "): " + allTime + "ms/" + sendTime + "ms after failing on: "
    286285                          + msg.getFailedTransports() + " and succeeding on " + getStyle());
    287286            if ( (allTime > 60*1000) && (sendSuccessful) ) {
    288287                // WTF!!@#
    289288                if (_log.shouldLog(Log.WARN))
    290                     _log.warn("WTF, more than a minute slow? " + msg.getMessageType() 
    291                               + " of id " + msg.getMessageId() + " (send begin on " 
    292                               + new Date(msg.getSendBegin()) + " / created on " 
     289                    _log.warn("WTF, more than a minute slow? " + msg.getMessageType()
     290                              + " of id " + msg.getMessageId() + " (send begin on "
     291                              + new Date(msg.getSendBegin()) + " / created on "
    293292                              + new Date(msg.getCreated()) + "): " + msg, msg.getCreatedBy());
    294                 _context.messageHistory().messageProcessingError(msg.getMessageId(), 
    295                                                                  msg.getMessageType(), 
     293                _context.messageHistory().messageProcessingError(msg.getMessageId(),
     294                                                                 msg.getMessageType(),
    296295                                                                 "Took too long to send [" + allTime + "ms]");
    297296            }
    298297        }
    299298
    300        
     299
    301300        if (sendSuccessful) {
    302301            _context.statManager().addRateData("transport.sendProcessingTime", lifetime, lifetime);
     
    308307        }
    309308    }
    310    
     309
    311310    /**
    312311     * Asynchronously send the message as requested in the message and, if the
     
    324323        boolean duplicate = false;
    325324        synchronized (_sendPool) {
    326             if (_sendPool.contains(msg)) 
     325            if (_sendPool.contains(msg))
    327326                duplicate = true;
    328327            else
     
    331330        if (duplicate) {
    332331            if (_log.shouldLog(Log.ERROR))
    333                 _log.error("Message already is in the queue?  wtf.  msg = " + msg, 
     332                _log.error("Message already is in the queue?  wtf.  msg = " + msg,
    334333                           new Exception("wtf, requeued?"));
    335334        }
     
    347346     */
    348347    protected abstract void outboundMessageReady();
    349    
     348
    350349    /**
    351350     * Message received from the I2NPMessageReader - send it to the listener
     
    353352     */
    354353    public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) {
    355         //if (true) 
     354        //if (true)
    356355        //    _log.error("(not error) I2NP message received: " + inMsg.getUniqueId() + " after " + msToReceive);
    357        
     356
    358357        int level = Log.INFO;
    359358        if (msToReceive > 5000)
     
    386385            _context.statManager().addRateData("transport.receiveMessageSize", bytesReceived, msToReceive);
    387386        }
    388        
     387
    389388        _context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive);
    390389        if (msToReceive > 1000) {
     
    395394        //String type = inMsg.getClass().getName();
    396395        //MessageHistory.getInstance().receiveMessage(type, inMsg.getUniqueId(), inMsg.getMessageExpiration(), remoteIdentHash, true);
    397            
     396
    398397        if (_listener != null) {
    399398            _listener.messageReceived(inMsg, remoteIdent, remoteIdentHash);
     
    403402        }
    404403    }
    405        
     404
    406405    /** What addresses are we currently listening to? */
    407     public RouterAddress getCurrentAddress() { 
     406    public RouterAddress getCurrentAddress() {
    408407        return _currentAddress;
    409408    }
     
    418417            _context.commSystem().notifyReplaceAddress(address);
    419418    }
    420    
     419
    421420    /** Who to notify on message availability */
    422421    public void setListener(TransportEventListener listener) { _listener = listener; }
     
    424423    public void renderStatusHTML(Writer out) throws IOException {}
    425424    public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException { renderStatusHTML(out); }
    426    
     425
    427426    public RouterContext getContext() { return _context; }
    428427    public short getReachabilityStatus() { return CommSystemFacade.STATUS_UNKNOWN; }
     
    430429    public boolean isBacklogged(Hash dest) { return false; }
    431430    public boolean isEstablished(Hash dest) { return false; }
    432    
     431
    433432    private static final long UNREACHABLE_PERIOD = 5*60*1000;
    434433    public boolean isUnreachable(Hash peer) {
     
    507506    }
    508507
    509     public static void setIP(Hash peer, byte[] ip) {
     508    public /* static */ void setIP(Hash peer, byte[] ip) {
    510509        _IPMap.put(peer, ip);
    511510    }
     
    518517        if (addr.length == 4) {
    519518            if ((addr[0]&0xFF) == 127) return false;
    520             if ((addr[0]&0xFF) == 10) return false; 
     519            if ((addr[0]&0xFF) == 10) return false;
    521520            if ( ((addr[0]&0xFF) == 172) && ((addr[1]&0xFF) >= 16) && ((addr[1]&0xFF) <= 31) ) return false;
    522521            if ( ((addr[0]&0xFF) == 192) && ((addr[1]&0xFF) == 168) ) return false;
  • router/java/src/net/i2p/router/transport/ntcp/EstablishState.java

    r834fdfe9 rd0376f8  
    4646    private RouterContext _context;
    4747    private Log _log;
    48    
     48
    4949    // bob receives (and alice sends)
    5050    private byte _X[];
     
    6161    private transient long _tsA;
    6262    private transient byte _e_bobSig[];
    63    
     63
    6464    /** previously received encrypted block (or the IV) */
    6565    private byte _prevEncrypted[];
     
    6767    private byte _curEncrypted[];
    6868    /**
    69      * next index in _curEncrypted to write to (equals _curEncrypted length if the block is 
     69     * next index in _curEncrypted to write to (equals _curEncrypted length if the block is
    7070     * ready to decrypt)
    7171     */
     
    7373    /** decryption buffer */
    7474    private byte _curDecrypted[];
    75    
     75
    7676    /** bytes received so far */
    7777    private int _received;
    7878    /** bytes sent so far */
    7979    private int _sent;
    80    
     80
    8181    private byte _extra[];
    82    
     82
    8383    private DHSessionKeyBuilder _dh;
    84    
     84
    8585    private NTCPTransport _transport;
    8686    private NTCPConnection _con;
     
    9393    private boolean _confirmWritten;
    9494    private boolean _failedBySkew;
    95    
     95
    9696    public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) {
    9797        _context = ctx;
     
    114114            DataHelper.xor(hx, 0, con.getRemotePeer().calculateHash().getData(), 0, _hX_xor_bobIdentHash, 0, hx.length);
    115115        }
    116        
     116
    117117        _prevEncrypted = new byte[16];
    118118        _curEncrypted = new byte[16];
    119119        _curEncryptedOffset = 0;
    120120        _curDecrypted = new byte[16];
    121        
     121
    122122        _received = 0;
    123123    }
    124    
     124
    125125    /**
    126126     * parse the contents of the buffer as part of the handshake.  if the
     
    134134        if (!src.hasRemaining())
    135135            return; // nothing to receive
    136        
     136
    137137        if (_log.shouldLog(Log.DEBUG))
    138138            _log.debug(prefix()+"receive " + src);
     
    142142            receiveOutbound(src);
    143143    }
    144    
     144
    145145    /**
    146146     * we have written all of the data required to confirm the connection
     
    148148     */
    149149    public boolean confirmWritten() { return _confirmWritten; }
    150    
     150
    151151    public boolean getFailedBySkew() { return _failedBySkew; }
    152    
     152
    153153    /** we are Bob, so receive these bytes as part of an inbound connection */
    154154    private void receiveInbound(ByteBuffer src) {
     
    179179                if (_log.shouldLog(Log.DEBUG))
    180180                    _log.debug(prefix()+"Enough data for a DH received");
    181                
     181
    182182                // first verify that Alice knows who she is trying to talk with and that the X
    183183                // isn't corrupt
     
    202202                    if (_log.shouldLog(Log.DEBUG))
    203203                        _log.debug(prefix()+"DH session key calculated (" + _dh.getSessionKey().toBase64() + ")");
    204                    
     204
    205205                    // now prepare our response: Y+E(H(X+Y)+tsB+padding, sk, Y[239:255])
    206206                    _Y = _dh.getMyPublicValueBytes();
     
    234234                    System.arraycopy(_Y, 0, write, 0, _Y.length);
    235235                    System.arraycopy(_e_hXY_tsB, 0, write, _Y.length, _e_hXY_tsB.length);
    236                    
     236
    237237                    // ok, now that is prepared, we want to actually send it, so make sure we are up for writing
    238238                    _transport.getPumper().wantsWrite(_con, write);
     
    244244                }
    245245            }
    246            
     246
    247247            // ok, we are onto the encrypted area
    248248            while (src.hasRemaining() && !_corrupt) {
     
    257257                    //if (_log.shouldLog(Log.DEBUG))
    258258                    //    _log.debug(prefix()+"full block read and decrypted: " + Base64.encode(_curDecrypted));
    259                    
     259
    260260                    byte swap[] = new byte[16];
    261261                    _prevEncrypted = _curEncrypted;
    262262                    _curEncrypted = swap;
    263263                    _curEncryptedOffset = 0;
    264                    
     264
    265265                    if (_aliceIdentSize <= 0) { // we are on the first decrypted block
    266266                        _aliceIdentSize = (int)DataHelper.fromLong(_curDecrypted, 0, 2);
     
    293293                                prepareExtra(src);
    294294                            if (_log.shouldLog(Log.DEBUG))
    295                                 _log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size() 
    296                                            + " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize 
     295                                _log.debug(prefix()+"verifying size (sz=" + _sz_aliceIdent_tsA_padding_aliceSig.size()
     296                                           + " expected=" + _sz_aliceIdent_tsA_padding_aliceSigSize
    297297                                           + " corrupt=" + _corrupt
    298298                                           + " verified=" + _verified + " extra=" + (_extra != null ? _extra.length : 0) + ")");
     
    311311        }
    312312    }
    313    
     313
    314314    /** we are Alice, so receive these bytes as part of an outbound connection */
    315315    private void receiveOutbound(ByteBuffer src) {
    316316        if (_log.shouldLog(Log.DEBUG)) _log.debug(prefix()+"Receive outbound " + src + " received=" + _received);
    317        
     317
    318318        // recv Y+E(H(X+Y)+tsB, sk, Y[239:255])
    319319        while (_received < _Y.length && src.hasRemaining()) {
     
    362362                if (_log.shouldLog(Log.DEBUG))
    363363                    _log.debug(prefix()+"h(X+Y) is correct, tsA-tsB=" + (_tsA-_tsB));
    364                
     364
    365365                // the skew is not authenticated yet, but it is certainly fatal to
    366366                // the establishment, so fail hard if appropriate
     
    375375                    _log.debug(prefix()+"Clock skew: " + diff + " ms");
    376376                }
    377                
     377
    378378                // now prepare and send our response
    379379                // send E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB), sk, hX_xor_Bob.identHash[16:31])
     
    391391                //System.arraycopy(sigPad, 0, preSign, _X.length+_Y.length+Hash.HASH_LENGTH+4+4, padSig);
    392392                Signature sig = _context.dsa().sign(preSign, _context.keyManager().getSigningPrivateKey());
    393  
     393
    394394                //if (_log.shouldLog(Log.DEBUG)) {
    395395                //    _log.debug(prefix()+"signing " + Base64.encode(preSign));
    396396                //}
    397                
     397
    398398                byte ident[] = _context.router().getRouterInfo().getIdentity().toByteArray();
    399399                int min = 2+ident.length+4+Signature.SIGNATURE_BYTES;
     
    410410                System.arraycopy(pad, 0, preEncrypt, 2+ident.length+4, padding);
    411411                System.arraycopy(sig.getData(), 0, preEncrypt, 2+ident.length+4+padding, Signature.SIGNATURE_BYTES);
    412                
     412
    413413                _prevEncrypted = new byte[preEncrypt.length];
    414414                _context.aes().encrypt(preEncrypt, 0, _prevEncrypted, 0, _dh.getSessionKey(), _hX_xor_bobIdentHash, _hX_xor_bobIdentHash.length-16, preEncrypt.length);
    415                
     415
    416416                if (_log.shouldLog(Log.DEBUG)) {
    417417                    //_log.debug(prefix() + "unencrypted response to Bob: " + Base64.encode(preEncrypt));
     
    424424        if (_received >= _Y.length + _e_hXY_tsB.length && src.hasRemaining()) {
    425425            // we are receiving their confirmation
    426            
     426
    427427            // recv E(S(X+Y+Alice.identHash+tsA+tsB)+padding, sk, prev)
    428428            int off = 0;
     
    440440                _e_bobSig[off++] = src.get();
    441441                _received++;
    442                
     442
    443443                if (off >= _e_bobSig.length) {
    444444                    //if (_log.shouldLog(Log.DEBUG))
     
    450450                    System.arraycopy(bobSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES);
    451451                    Signature sig = new Signature(bobSigData);
    452                    
     452
    453453                    byte toVerify[] = new byte[_X.length+_Y.length+Hash.HASH_LENGTH+4+4];
    454454                    int voff = 0;
     
    458458                    DataHelper.toLong(toVerify, voff, 4, _tsA); voff += 4;
    459459                    DataHelper.toLong(toVerify, voff, 4, _tsB); voff += 4;
    460                    
     460
    461461                    _verified = _context.dsa().verifySignature(sig, toVerify, _con.getRemotePeer().getSigningPublicKey());
    462462                    if (!_verified) {
     
    482482        }
    483483    }
    484    
     484
    485485    /** did the handshake fail for some reason? */
    486486    public boolean isCorrupt() { return _err != null; }
    487487    /** @return is the handshake complete and valid? */
    488488    public boolean isComplete() { return _verified; }
    489    
     489
    490490    /**
    491491     * we are establishing an outbound connection, so prepare ourselves by
     
    505505        }
    506506    }
    507    
     507
    508508    /**
    509509     * make sure the signatures are correct, and if they are, update the
    510      * NIOConnection with the session key / peer ident / clock skew / iv. 
     510     * NIOConnection with the session key / peer ident / clock skew / iv.
    511511     * The NIOConnection itself is responsible for registering with the
    512512     * transport
     
    517517        //if (_log.shouldLog(Log.DEBUG))
    518518        //    _log.debug(prefix()+"decrypted sz(etc) data: " + Base64.encode(b));
    519        
     519
    520520        try {
    521521            RouterIdentity alice = new RouterIdentity();
    522             int sz = (int)DataHelper.fromLong(b, 0, 2);
     522            int sz = (int)DataHelper.fromLong(b, 0, 2); // TO-DO: Hey zzz... Throws an NPE for me... see below, for my "quick fix", need to find out the real reason
    523523            if ( (sz <= 0) || (sz > b.length-2-4-Signature.SIGNATURE_BYTES) ) {
    524524                _context.statManager().addRateData("ntcp.invalidInboundSize", sz, 0);
     
    530530            alice.fromByteArray(aliceData);
    531531            long tsA = DataHelper.fromLong(b, 2+sz, 4);
    532            
     532
    533533            ByteArrayOutputStream baos = new ByteArrayOutputStream(768);
    534534            baos.write(_X);
     
    538538            baos.write(DataHelper.toLong(4, _tsB));
    539539            //baos.write(b, 2+sz+4, b.length-2-sz-4-Signature.SIGNATURE_BYTES);
    540            
     540
    541541            byte toVerify[] = baos.toByteArray();
    542542            if (_log.shouldLog(Log.DEBUG)) {
     
    567567                if (_log.shouldLog(Log.DEBUG))
    568568                    _log.debug(prefix() + "verification successful for " + _con);
    569                                
     569
    570570                long diff = 1000*Math.abs(tsA-_tsB);
    571571                if (diff >= Router.CLOCK_FUDGE_FACTOR) {
     
    598598            _context.statManager().addRateData("ntcp.invalidInboundDFE", 1, 0);
    599599            fail("Error verifying peer", dfe);
    600         }
    601     }
    602    
     600        } catch(NullPointerException npe) {
     601            fail("Error verifying peer", npe); // TO-DO: zzz This is that quick-fix. -- Sponge
     602        }
     603    }
     604
    603605    private void sendInboundConfirm(RouterIdentity alice, long tsA) {
    604606        // send Alice E(S(X+Y+Alice.identHash+tsA+tsB), sk, prev)
     
    611613        DataHelper.toLong(toSign, off, 4, tsA); off += 4;
    612614        DataHelper.toLong(toSign, off, 4, _tsB); off += 4;
    613        
     615
    614616        Signature sig = _context.dsa().sign(toSign, _context.keyManager().getSigningPrivateKey());
    615617        byte preSig[] = new byte[Signature.SIGNATURE_BYTES+8];
     
    620622        _e_bobSig = new byte[preSig.length];
    621623        _context.aes().encrypt(preSig, 0, _e_bobSig, 0, _dh.getSessionKey(), _e_hXY_tsB, _e_hXY_tsB.length-16, _e_bobSig.length);
    622    
     624
    623625        if (_log.shouldLog(Log.DEBUG))
    624626            _log.debug(prefix() + "Sending encrypted inbound confirmation");
    625627        _transport.getPumper().wantsWrite(_con, _e_bobSig);
    626628    }
    627    
     629
    628630    /** anything left over in the byte buffer after verification is extra */
    629631    private void prepareExtra(ByteBuffer buf) {
     
    637639            _log.debug(prefix() + "prepare extra " + remaining + " (total received: " + _received + ")");
    638640    }
    639    
     641
    640642    /**
    641643     * if complete, this will contain any bytes received as part of the
     
    643645     */
    644646    public byte[] getExtraBytes() { return _extra; }
    645    
     647
    646648    private void fail(String reason) { fail(reason, null); }
    647649    private void fail(String reason, Exception e) { fail(reason, e, false); }
     
    654656            _log.warn(prefix()+"Failed to establish: " + _err, e);
    655657    }
    656    
     658
    657659    public String getError() { return _err; }
    658660    public Exception getException() { return _e; }
    659    
     661
    660662    private String prefix() { return toString(); }
     663    @Override
    661664    public String toString() {
    662665        StringBuffer buf = new StringBuffer(64);
     
    670673        return buf.toString();
    671674    }
    672    
     675
    673676    /**
    674677     * a check info connection will receive 256 bytes containing:
     
    704707                long skewSeconds = (ctx.clock().now()/1000)-now;
    705708                if (log.shouldLog(Log.INFO))
    706                     log.info("Check info received: our IP: " + ourIP + " our port: " + port 
     709                    log.info("Check info received: our IP: " + ourIP + " our port: " + port
    707710                             + " skew: " + skewSeconds + " s");
    708711            } catch (UnknownHostException uhe) {
     
    718721        }
    719722    }
    720    
     723
    721724    public static void checkHost(String args[]) {
    722725        if (args.length != 3) {
     
    747750            DataHelper.xor(peer, 0, h.getData(), 0, toSend, toSend.length-32, peer.length);
    748751            System.out.println("check hash: " + h.toBase64());
    749            
     752
    750753            out.write(toSend);
    751754            out.flush();
     
    756759        }
    757760    }
    758    
     761
    759762    public static void main(String args[]) {
    760763        if (args.length == 3) {
     
    781784            out.flush();
    782785            //  DONE SENDING X+(H(X) xor Bob.identHash)----------------------------->
    783  
     786
    784787            //  NOW READ Y+E(H(X+Y)+tsB+padding, sk, Y[239:255])
    785788            InputStream in = s.getInputStream();
     
    809812            System.out.println("unencrypted H(X+Y)+tsB+padding: " + Base64.encode(decrypted));
    810813            long tsB = DataHelper.fromLong(decrypted, 32, 4);
    811            
     814
    812815            //try { Thread.sleep(40*1000); } catch (InterruptedException ie) {}
    813            
     816
    814817            RouterIdentity alice = new RouterIdentity();
    815818            Object k[] = ctx.keyGenerator().generatePKIKeypair();
     
    822825            alice.setPublicKey(pub);
    823826            alice.setSigningPublicKey(spub);
    824            
     827
    825828            //  SEND E(#+Alice.identity+tsA+padding+S(X+Y+Bob.identHash+tsA+tsB+padding), sk, hX_xor_Bob.identHash[16:31])--->
    826  
     829
    827830            ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
    828831            byte aliceb[] = alice.toByteArray();
     
    831834            baos.write(aliceb);
    832835            baos.write(DataHelper.toLong(4, tsA));
    833            
     836
    834837            int base = baos.size() + Signature.SIGNATURE_BYTES;
    835838            int rem = base % 16;
     
    841844            baos.write(pad);
    842845            base += padding;
    843            
     846
    844847            ByteArrayOutputStream sbaos = new ByteArrayOutputStream(512);
    845848            sbaos.write(X);
     
    851854            Signature sig = ctx.dsa().sign(sbaos.toByteArray(), spriv);
    852855            baos.write(sig.toByteArray());
    853            
     856
    854857            byte unencrypted[] = baos.toByteArray();
    855858            byte toWrite[] = new byte[unencrypted.length];
    856859            System.out.println("unencrypted.length = " + unencrypted.length + " alice.size = " + aliceb.length + " padding = " + padding + " base = " + base);
    857860            ctx.aes().encrypt(unencrypted, 0, toWrite, 0, dh.getSessionKey(), hx_xor_bih, 16, unencrypted.length);
    858  
     861
    859862            out.write(toWrite);
    860863            out.flush();
    861            
     864
    862865            System.out.println("unencrypted: " + Base64.encode(unencrypted));
    863866            System.out.println("encrypted: " + Base64.encode(toWrite));
     
    865868
    866869            // now check bob's signature
    867            
     870
    868871            SigningPublicKey bobPubKey = null;
    869872            try {
     
    875878                return;
    876879            }
    877            
     880
    878881            System.out.println("Reading in bob's sig");
    879            
     882
    880883            byte bobRead[] = new byte[48];
    881884            read = 0;
     
    893896            System.arraycopy(preSig, 0, bobSigData, 0, Signature.SIGNATURE_BYTES); // ignore the padding
    894897            System.out.println("Bob's sig: " + Base64.encode(bobSigData));
    895            
     898
    896899            byte signed[] = new byte[256+256+32+4+4];
    897900            int off = 0;
     
    905908            Signature bobSig = new Signature(bobSigData);
    906909            boolean ok = ctx.dsa().verifySignature(bobSig, signed, bobPubKey);
    907            
     910
    908911            System.out.println("bob's sig matches? " + ok);
    909            
     912
    910913            try { Thread.sleep(5*1000); } catch (InterruptedException ie) {}
    911914            byte fakeI2NPbuf[] = new byte[128];
     
    913916            out.write(fakeI2NPbuf);
    914917            out.flush();
    915            
     918
    916919            try { Thread.sleep(30*1000); } catch (InterruptedException ie) {}
    917920            s.close();
    918         } catch (Exception e) { 
     921        } catch (Exception e) {
    919922            e.printStackTrace();
    920923        }
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    r834fdfe9 rd0376f8  
    3737    private SharedBid _slowBid;
    3838    private SharedBid _transientFail;
    39     private Object _conLock;
     39    private final Object _conLock;
    4040    private Map _conByIdent;
    4141    private NTCPAddress _myAddress;
     
    4747     * want to remove on establishment or close on timeout
    4848     */
    49     private List _establishing;
     49    private final List _establishing;
    5050
    5151    private List _sent;
    5252    private NTCPSendFinisher _finisher;
    53    
     53
    5454    public NTCPTransport(RouterContext ctx) {
    5555        super(ctx);
    56        
     56
    5757        _log = ctx.logManager().getLog(getClass());
    5858
     
    123123        _conLock = new Object();
    124124        _conByIdent = new HashMap(64);
    125        
     125
    126126        _sent = new ArrayList(4);
    127127        _finisher = new NTCPSendFinisher(ctx, this);
    128        
     128
    129129        _pumper = new EventPumper(ctx, this);
    130130        _reader = new Reader(ctx);
    131131        _writer = new net.i2p.router.transport.ntcp.Writer(ctx);
    132        
     132
    133133        _fastBid = new SharedBid(25); // best
    134134        _slowBid = new SharedBid(70); // better than ssu unestablished, but not better than ssu established
    135135        _transientFail = new SharedBid(TransportBid.TRANSIENT_FAIL);
    136136    }
    137    
     137
    138138    void inboundEstablished(NTCPConnection con) {
    139139        _context.statManager().addRateData("ntcp.inboundEstablished", 1, 0);
     
    151151        }
    152152    }
    153    
     153
    154154    protected void outboundMessageReady() {
    155155        OutNetMessage msg = getNextMessage();
     
    220220                    con.enqueueInfoMessage(); // enqueues a netDb store of our own info
    221221                    con.send(msg); // doesn't do anything yet, just enqueues it
    222      
     222
    223223                    try {
    224224                        SocketChannel channel = SocketChannel.open();
     
    238238        }
    239239    }
    240    
     240
     241    @Override
    241242    public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) {
    242243        super.afterSend(msg, sendSuccessful, allowRequeue, msToSend);
     
    260261            return null;
    261262        }
    262        
     263
    263264        boolean established = isEstablished(toAddress.getIdentity());
    264265        if (established) { // should we check the queue size?  nah, if its valid, use it
     
    268269        }
    269270        RouterAddress addr = toAddress.getTargetAddress(STYLE);
    270        
     271
    271272        if (addr == null) {
    272273            markUnreachable(peer);
     
    295296            }
    296297        }
    297        
     298
    298299        if (!allowConnection()) {
    299300            if (_log.shouldLog(Log.WARN))
     
    302303        }
    303304
    304         //if ( (_myAddress != null) && (_myAddress.equals(addr)) ) 
     305        //if ( (_myAddress != null) && (_myAddress.equals(addr)) )
    305306        //    return null; // dont talk to yourself
    306    
     307
    307308        if (_log.shouldLog(Log.DEBUG))
    308309            _log.debug("slow bid when trying to send to " + toAddress.getIdentity().calculateHash().toBase64());
    309310        return _slowBid;
    310311    }
    311    
     312
    312313    public boolean allowConnection() {
    313314        return countActivePeers() < getMaxConnections();
    314315    }
    315316
     317    @Override
    316318    public boolean haveCapacity() {
    317319        return countActivePeers() < getMaxConnections() * 4 / 5;
     
    324326        return isEstablished(peer.calculateHash());
    325327    }
    326    
     328
     329    @Override
    327330    public boolean isEstablished(Hash dest) {
    328331        synchronized (_conLock) {
     
    331334        }
    332335    }
    333    
     336
     337    @Override
    334338    public boolean isBacklogged(Hash dest) {
    335339        synchronized (_conLock) {
     
    338342        }
    339343    }
    340    
     344
    341345    void removeCon(NTCPConnection con) {
    342346        NTCPConnection removed = null;
     
    353357        }
    354358    }
    355    
     359
    356360    /**
    357361     * How many peers can we talk to right now?
    358362     *
    359363     */
     364    @Override
    360365    public int countActivePeers() { synchronized (_conLock) { return _conByIdent.size(); } }
    361366    /**
    362367     * How many peers are we actively sending messages to (this minute)
    363368     */
     369    @Override
    364370    public int countActiveSendPeers() {
    365371        int active = 0;
     
    373379        return active;
    374380    }
    375    
     381
    376382    /**
    377383     * Return our peer clock skews on this transport.
    378384     * Vector composed of Long, each element representing a peer skew in seconds.
    379385     */
     386    @Override
    380387    public Vector getClockSkews() {
    381388
     
    395402        return skews;
    396403    }
    397    
     404
    398405    private static final int NUM_CONCURRENT_READERS = 3;
    399406    private static final int NUM_CONCURRENT_WRITERS = 3;
    400    
     407
    401408    public RouterAddress startListening() {
    402409        if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting ntcp transport listening");
    403410        _finisher.start();
    404411        _pumper.startPumping();
    405        
     412
    406413        _reader.startReading(NUM_CONCURRENT_READERS);
    407414        _writer.startWriting(NUM_CONCURRENT_WRITERS);
    408        
     415
    409416        configureLocalAddress();
    410417        return bindAddress();
     
    415422        _finisher.start();
    416423        _pumper.startPumping();
    417        
     424
    418425        _reader.startReading(NUM_CONCURRENT_READERS);
    419426        _writer.startWriting(NUM_CONCURRENT_WRITERS);
    420        
     427
    421428        _myAddress = new NTCPAddress(addr);
    422429        return bindAddress();
     
    449456                _log.info("Outbound NTCP connections only - no listener configured");
    450457        }
    451        
     458
    452459        if (_myAddress != null) {
    453460            RouterAddress rv = _myAddress.toRouterAddress();
     
    459466        }
    460467    }
    461    
     468
    462469    Reader getReader() { return _reader; }
    463470    net.i2p.router.transport.ntcp.Writer getWriter() { return _writer; }
    464471    public String getStyle() { return STYLE; }
    465472    EventPumper getPumper() { return _pumper; }
    466    
     473
    467474    /**
    468475     * how long from initial connection attempt (accept() or connect()) until
     
    505512            _context.statManager().addRateData("ntcp.outboundEstablishFailed", expired.size(), 0);
    506513    }
    507    
     514
    508515    //private boolean bindAllInterfaces() { return true; }
    509    
     516
    510517    private void configureLocalAddress() {
    511518        RouterContext ctx = getContext();
     
    532539        }
    533540    }
    534    
     541
    535542    /**
    536543     *  This doesn't (completely) block, caller should check isAlive()
     
    554561    }
    555562    public static final String STYLE = "NTCP";
    556    
     563
    557564    public void renderStatusHTML(java.io.Writer out, int sortFlags) throws IOException {}
     565    @Override
    558566    public void renderStatusHTML(java.io.Writer out, String urlBase, int sortFlags) throws IOException {
    559567        TreeSet peers = new TreeSet(getComparator(sortFlags));
     
    576584        long totalSend = 0;
    577585        long totalRecv = 0;
    578        
     586
    579587        StringBuffer buf = new StringBuffer(512);
    580588        buf.append("<b id=\"ntcpcon\">NTCP connections: ").append(peers.size());
     
    667675            buf.append("</td></tr>\n");
    668676        }
    669                
     677
    670678        buf.append("</table>\n");
    671679        buf.append("Peers currently reading I2NP messages: ").append(readingPeers).append("<br />\n");
     
    674682        buf.setLength(0);
    675683    }
    676    
    677     private static NumberFormat _rateFmt = new DecimalFormat("#,#0.00");
     684
     685    private static final NumberFormat _rateFmt = new DecimalFormat("#,#0.00");
    678686    private static String formatRate(float rate) {
    679687        synchronized (_rateFmt) { return _rateFmt.format(rate); }
    680688    }
    681    
     689
    682690    private Comparator getComparator(int sortFlags) {
    683691        Comparator rv = null;
     
    703711    private static class PeerComparator implements Comparator {
    704712        public int compare(Object lhs, Object rhs) {
    705             if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection)) 
     713            if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection))
    706714                throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs);
    707715            return compare((NTCPConnection)lhs, (NTCPConnection)rhs);
     
    712720        }
    713721    }
    714    
     722
    715723    /**
    716724     * Cache the bid to reduce object churn
     
    718726    private class SharedBid extends TransportBid {
    719727        public SharedBid(int ms) { super(); setLatencyMs(ms); }
     728        @Override
    720729        public Transport getTransport() { return NTCPTransport.this; }
     730        @Override
    721731        public String toString() { return "NTCP bid @ " + getLatencyMs(); }
    722732    }
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    r834fdfe9 rd0376f8  
    1313/**
    1414 * Coordinate the outbound fragments and select the next one to be built.
    15  * This pool contains messages we are actively trying to send, essentially 
     15 * This pool contains messages we are actively trying to send, essentially
    1616 * doing a round robin across each message to send one fragment, as implemented
    17  * in {@link #getNextVolley()}.  This also honors per-peer throttling, taking 
     17 * in {@link #getNextVolley()}.  This also honors per-peer throttling, taking
    1818 * note of each peer's allocations.  If a message has each of its fragments
    19  * sent more than a certain number of times, it is failed out.  In addition, 
    20  * this instance also receives notification of message ACKs from the 
    21  * {@link InboundMessageFragments}, signaling that we can stop sending a 
     19 * sent more than a certain number of times, it is failed out.  In addition,
     20 * this instance also receives notification of message ACKs from the
     21 * {@link InboundMessageFragments}, signaling that we can stop sending a
    2222 * message.
    23  * 
     23 *
    2424 */
    2525public class OutboundMessageFragments {
     
    2727    private Log _log;
    2828    private UDPTransport _transport;
    29     private ActiveThrottle _throttle;
     29    private ActiveThrottle _throttle; // LINT not used ??
    3030    /** peers we are actively sending messages to */
    31     private List _activePeers;
     31    private final List _activePeers;
    3232    private boolean _alive;
    3333    /** which peer should we build the next packet out of? */
     
    3535    private PacketBuilder _builder;
    3636    /** if we can handle more messages explicitly, set this to true */
    37     private boolean _allowExcess;
    38     private volatile long _packetsRetransmitted;
    39    
    40     private static final int MAX_ACTIVE = 64;
     37    private boolean _allowExcess; // LINT not used??
     38    private volatile long _packetsRetransmitted; // LINT not used??
     39
     40    // private static final int MAX_ACTIVE = 64; // not used.
    4141    // don't send a packet more than 10 times
    4242    static final int MAX_VOLLEYS = 10;
    43    
     43
    4444    public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
    4545        _context = ctx;
     
    7171        _context.statManager().createRateStat("udp.sendCycleTimeSlow", "How long it takes to cycle through all of the active messages, when its going slowly?", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
    7272    }
    73    
     73
    7474    public void startup() { _alive = true; }
    7575    public void shutdown() {
     
    8888        }
    8989    }
    90    
     90
    9191    /**
    9292     * Block until we allow more messages to be admitted to the active
     
    9696     */
    9797    public boolean waitForMoreAllowed() {
    98         // test without choking. 
     98        // test without choking.
    9999        // perhaps this should check the lifetime of the first activeMessage?
    100100        if (true) return true;
    101101        /*
    102        
     102
    103103        long start = _context.clock().now();
    104104        int numActive = 0;
     
    124124        return false;
    125125    }
    126    
     126
    127127    /**
    128128     * Add a new message to the active pool
     
    134134        if ( (msgBody == null) || (target == null) )
    135135            return;
    136        
     136
    137137        // todo: make sure the outNetMessage is initialzed once and only once
    138138        OutboundMessageState state = new OutboundMessageState(_context);
     
    165165        //finishMessages();
    166166    }
    167    
    168     /** 
    169      * short circuit the OutNetMessage, letting us send the establish 
     167
     168    /**
     169     * short circuit the OutNetMessage, letting us send the establish
    170170     * complete message reliably
    171171     */
     
    229229        }
    230230    }
    231    
     231
    232232    private long _lastCycleTime = System.currentTimeMillis();
    233    
    234     /**
    235      * Fetch all the packets for a message volley, blocking until there is a 
     233
     234    /**
     235     * Fetch all the packets for a message volley, blocking until there is a
    236236     * message which can be fully transmitted (or the transport is shut down).
    237237     * The returned array may be sparse, with null packets taking the place of
     
    271271                    }
    272272                    if (_log.shouldLog(Log.DEBUG))
    273                         _log.debug("Done looping, next peer we are sending for: " + 
     273                        _log.debug("Done looping, next peer we are sending for: " +
    274274                                   (peer != null ? peer.getRemotePeer().toBase64() : "none"));
    275275                    if (state == null) {
     
    292292            }
    293293        }
    294        
     294
    295295        if (_log.shouldLog(Log.DEBUG))
    296296            _log.debug("Sending " + state);
    297    
     297
    298298        UDPPacket packets[] = preparePackets(state, peer);
    299299        if ( (state != null) && (state.getMessage() != null) ) {
     
    304304            /*
    305305            state.getMessage().timestamp("sending a volley of " + valid
    306                                          + " lastReceived: " 
     306                                         + " lastReceived: "
    307307                                         + (_context.clock().now() - peer.getLastReceiveTime())
    308                                          + " lastSentFully: " 
     308                                         + " lastSentFully: "
    309309                                         + (_context.clock().now() - peer.getLastSendFullyTime()));
    310310            */
     
    312312        return packets;
    313313    }
    314    
     314
    315315    private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
    316316        if ( (state != null) && (peer != null) ) {
     
    318318            if (fragments < 0)
    319319                return null;
    320            
     320
    321321            // ok, simplest possible thing is to always tack on the bitfields if
    322322            List msgIds = peer.getCurrentFullACKs();
     
    354354            if (sparseCount > 0)
    355355                remaining.clear();
    356            
     356
    357357            int piggybackedAck = 0;
    358358            if (msgIds.size() != remaining.size()) {
     
    365365                }
    366366            }
    367            
     367
    368368            if (sparseCount > 0)
    369369                _context.statManager().addRateData("udp.sendSparse", sparseCount, state.getLifetime());
     
    391391        }
    392392    }
    393    
     393
    394394    /**
    395395     * We received an ACK of the given messageId from the given peer, so if it
    396      * is still unacked, mark it as complete. 
     396     * is still unacked, mark it as complete.
    397397     *
    398398     * @return fragments acked
     
    410410        }
    411411    }
    412    
     412
    413413    public void acked(ACKBitfield bitfield, Hash ackedBy) {
    414414        PeerState peer = _transport.getPeerState(ackedBy);
     
    422422        }
    423423    }
    424    
     424
    425425    public interface ActiveThrottle {
    426426        public void choke(Hash peer);
Note: See TracChangeset for help on using the changeset viewer.