Changeset 612fab1


Ignore:
Timestamp:
Aug 21, 2012 7:53:08 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
85fbbf8
Parents:
fbd8c69
Message:
  • SSU:
    • Use external, not internal port to sign SessionCreated? message. Together with previous fix to allow external port change, this should fix session establish fails when NAT changes our port
    • Track outbound establishments by both Hash and IP/port, to improve lookups of establishments in progress
    • Fix expiration of outbound establishments
    • Validate address/port in RelayResponse? messages
    • Change RemoteHostID to store Hash instead of byte[] for the peer hash
    • Log tweaks
Files:
10 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    rfbd8c69 r612fab1  
     12012-08-21 zzz
     2 * NetDB: Decrease stat publish probability
     3 * SSU:
     4   - Use external, not internal port to sign SessionCreated message.
     5     Together with previous fix to allow external port change, this
     6     should fix session establish fails when NAT changes our port
     7   - Track outbound establishments by both Hash and IP/port,
     8     to improve lookups of establishments in progress
     9   - Fix expiration of outbound establishments
     10   - Validate address/port in RelayResponse messages
     11   - Change RemoteHostID to store Hash instead of byte[] for the peer hash
     12   - Log tweaks
     13
    1142012-08-20 zzz
    215 * I2CP: MessageStatus cleanup
  • router/java/src/net/i2p/router/RouterVersion.java

    rfbd8c69 r612fab1  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 13;
     21    public final static long BUILD = 14;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    rfbd8c69 r612fab1  
    4040    private final UDPTransport _transport;
    4141    private final PacketBuilder _builder;
     42
    4243    /** map of RemoteHostId to InboundEstablishState */
    4344    private final ConcurrentHashMap<RemoteHostId, InboundEstablishState> _inboundStates;
    44     /** map of RemoteHostId to OutboundEstablishState */
     45
     46    /**
     47     * Map of RemoteHostId to OutboundEstablishState.
     48     * The key could be either an IP/Port (for direct) or
     49     * a Hash (for indirect, before the RelayResponse is received).
     50     * Once the RelayResponse is received we change the key.
     51     */
    4552    private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundStates;
     53
    4654    /** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
    4755    private final ConcurrentHashMap<RemoteHostId, List<OutNetMessage>> _queuedOutbound;
    48     /** map of nonce (Long) to OutboundEstablishState */
     56
     57    /**
     58     *  Map of nonce (Long) to OutboundEstablishState.
     59     *  Only for indirect, before we receive the RelayResponse.
     60     *  This is so we can lookup state for the RelayResponse.
     61     *  After we receive the relay response, _outboundStates is keyed by actual IP.
     62     */
    4963    private final ConcurrentHashMap<Long, OutboundEstablishState> _liveIntroductions;
     64
     65    /**
     66     *  Map of claimed IP/port to OutboundEstablishState.
     67     *  Only for indirect, before we receive the RelayResponse.
     68     *  This is so we can lookup a pending introduction by IP
     69     *  even before we know the "real" IP, so we can match an inbound packet.
     70     *  After we receive the relay response, _outboundStates is keyed by actual IP.
     71     */
     72    private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundByClaimedAddress;
     73
     74    /**
     75     *  Map of router hash to OutboundEstablishState.
     76     *  Only for indirect, after we receive the RelayResponse.
     77     *  This is so we can lookup a pending connection by Hash
     78     *  even after we've got the IP/port, so we can match a subsequent outbound packet.
     79     *  Before we receive the relay response, _outboundStates is keyed by hash.
     80     */
     81    private final ConcurrentHashMap<Hash, OutboundEstablishState> _outboundByHash;
     82
    5083    private volatile boolean _alive;
    5184    private final Object _activityLock;
     
    82115
    83116    /** max before receiving a response to a single message during outbound establishment */
    84     private static final int OB_MESSAGE_TIMEOUT = 15*1000;
     117    public static final int OB_MESSAGE_TIMEOUT = 15*1000;
    85118
    86119    /** for the DSM and or netdb store */
     
    96129        _queuedOutbound = new ConcurrentHashMap();
    97130        _liveIntroductions = new ConcurrentHashMap();
     131        _outboundByClaimedAddress = new ConcurrentHashMap();
     132        _outboundByHash = new ConcurrentHashMap();
    98133        _activityLock = new Object();
    99134        _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
     
    150185    OutboundEstablishState getOutboundState(RemoteHostId from) {
    151186            OutboundEstablishState state = _outboundStates.get(from);
     187            if (state == null) {
     188                state = _outboundByClaimedAddress.get(from);
     189                if (state != null && _log.shouldLog(Log.INFO))
     190                    _log.info("Found by claimed address: " + state);
     191            }
    152192            // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
    153193            //     _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates);
     
    167207     * the message is failed.
    168208     *
     209     * Note - if we go back to multiple PacketHandler threads, this may need more locking.
    169210     */
    170211    public void establish(OutNetMessage msg) {
    171         RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
     212        establish(msg, true);
     213    }
     214
     215    /**
     216     *  @param queueIfMaxExceeded true normally, false if called from locked_admit so we don't loop
     217     *  @since 0.9.2
     218     */
     219    private void establish(OutNetMessage msg, boolean queueIfMaxExceeded) {
     220        RouterInfo toRouterInfo = msg.getTarget();
     221        RouterAddress ra = toRouterInfo.getTargetAddress(_transport.getStyle());
    172222        if (ra == null) {
    173223            _transport.failed(msg, "Remote peer has no address, cannot establish");
    174224            return;
    175225        }
    176         if (msg.getTarget().getNetworkId() != Router.NETWORK_ID) {
    177             _context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash());
    178             _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
     226        RouterIdentity toIdentity = toRouterInfo.getIdentity();
     227        Hash toHash = toIdentity.calculateHash();
     228        if (toRouterInfo.getNetworkId() != Router.NETWORK_ID) {
     229            _context.shitlist().shitlistRouter(toHash);
     230            _transport.markUnreachable(toHash);
    179231            _transport.failed(msg, "Remote peer is on the wrong network, cannot establish");
    180232            return;
    181233        }
    182234        UDPAddress addr = new UDPAddress(ra);
    183         RemoteHostId to = null;
     235        RemoteHostId maybeTo = null;
    184236        InetAddress remAddr = addr.getHostAddress();
    185237        int port = addr.getPort();
     238
     239        // check for validity and existing inbound state, using the
     240        // claimed address (which we won't be using if indirect)
    186241        if (remAddr != null && port > 0 && port <= 65535) {
    187             to = new RemoteHostId(remAddr.getAddress(), port);
    188 
    189             if (!_transport.isValid(to.getIP())) {
     242            maybeTo = new RemoteHostId(remAddr.getAddress(), port);
     243
     244            if (!_transport.isValid(maybeTo.getIP())) {
    190245                _transport.failed(msg, "Remote peer's IP isn't valid");
    191                 _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
     246                _transport.markUnreachable(toHash);
    192247                //_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE);
    193248                return;
    194249            }
    195250
    196             InboundEstablishState inState = _inboundStates.get(to);
     251            InboundEstablishState inState = _inboundStates.get(maybeTo);
    197252            if (inState != null) {
    198253                // we have an inbound establishment in progress, queue it there instead
     
    223278                return;
    224279            }
    225 
    226 
    227 
     280        }
     281
     282        RemoteHostId to;
     283        boolean isIndirect = addr.getIntroducerCount() > 0 || maybeTo == null;
     284        if (isIndirect) {
     285            to = new RemoteHostId(toHash);
    228286        } else {
    229             if (_log.shouldLog(Log.DEBUG))
    230                 _log.debug("Add indirect outbound establish state to: " + addr);
    231             to = new RemoteHostId(msg.getTarget().getIdentity().calculateHash().getData());
     287            to = maybeTo;
    232288        }
    233289       
     
    239295            state = _outboundStates.get(to);
    240296            if (state == null) {
    241                 if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
     297                state = _outboundByHash.get(toHash);
     298                if (state != null && _log.shouldLog(Log.INFO))
     299                    _log.info("Found by hash: " + state);
     300            }
     301            if (state == null) {
     302                if (queueIfMaxExceeded && _outboundStates.size() >= getMaxConcurrentEstablish()) {
    242303                    if (_queuedOutbound.size() >= MAX_QUEUED_OUTBOUND) {
    243304                        rejected = true;
     
    266327                    byte[] keyBytes = addr.getIntroKey();
    267328                    if (keyBytes == null) {
    268                         _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
     329                        _transport.markUnreachable(toHash);
    269330                        _transport.failed(msg, "Peer has no key, cannot establish");
    270331                        return;
     
    274335                        sessionKey = new SessionKey(keyBytes);
    275336                    } catch (IllegalArgumentException iae) {
    276                         _transport.markUnreachable(msg.getTarget().getIdentity().calculateHash());
     337                        _transport.markUnreachable(toHash);
    277338                        _transport.failed(msg, "Peer has bad key, cannot establish");
    278339                        return;
    279340                    }
    280                     state = new OutboundEstablishState(_context, remAddr, port,
    281                                                        msg.getTarget().getIdentity(),
     341                    state = new OutboundEstablishState(_context, maybeTo, to,
     342                                                       toIdentity,
    282343                                                       sessionKey, addr, _transport.getDHBuilder());
    283344                    OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
    284345                    boolean isNew = oldState == null;
    285346                    if (isNew) {
     347                        if (isIndirect && maybeTo != null)
     348                            _outboundByClaimedAddress.put(maybeTo, state);
    286349                        if (_log.shouldLog(Log.DEBUG))
    287350                            _log.debug("Adding new " + state);
     
    365428                if (!_transport.allowConnection())
    366429                    return; // drop the packet
    367                 state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort(),
     430                state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getExternalPort(),
    368431                                                  _transport.getDHBuilder());
    369432                state.receiveSessionRequest(reader.getSessionRequestReader());
     
    388451                    _log.info("Received NEW session request from " + from + ", sending relay tag " + tag);
    389452            } else {
     453                // we got an IB even though we were firewalled, hidden, not high cap, etc.
    390454                if (_log.shouldLog(Log.INFO))
    391455                    _log.info("Received session request, but our status is " + _transport.getReachabilityStatus());
     
    393457        } else {
    394458            if (_log.shouldLog(Log.DEBUG))
    395                 _log.debug("Receive DUP session request from: " + state.getRemoteHostId());
     459                _log.debug("Receive DUP session request from: " + state);
    396460        }
    397461       
     
    409473            notifyActivity();
    410474            if (_log.shouldLog(Log.DEBUG))
    411                 _log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString());
     475                _log.debug("Receive session confirmed from: " + state);
    412476        } else {
    413477            if (_log.shouldLog(Log.WARN))
     
    426490            notifyActivity();
    427491            if (_log.shouldLog(Log.DEBUG))
    428                 _log.debug("Receive session created from: " + state.getRemoteHostId().toString());
     492                _log.debug("Receive session created from: " + state);
    429493        } else {
    430494            if (_log.shouldLog(Log.WARN))
     
    477541     * means its complete (yay!).  This is a blocking call, more than I'd like...
    478542     *
     543     * @return the new PeerState
    479544     */
    480545    PeerState receiveData(OutboundEstablishState state) {
     
    542607                continue;
    543608           
    544             OutNetMessage msg = queued.get(0);
    545             RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
    546             if (ra == null) {
    547                 for (int i = 0; i < queued.size(); i++)
    548                     _transport.failed(queued.get(i), "Cannot admit to the queue, as it has no address");
    549                 continue;
    550             }
    551             UDPAddress addr = new UDPAddress(ra);
    552             InetAddress remAddr = addr.getHostAddress();
    553             int port = addr.getPort();
    554 
    555             OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port,
    556                                                msg.getTarget().getIdentity(),
    557                                                new SessionKey(addr.getIntroKey()), addr,
    558                                                _transport.getDHBuilder());
    559             OutboundEstablishState old = _outboundStates.putIfAbsent(to, qstate);
    560             if (old != null)
    561                 qstate = old;
    562 
    563             for (int i = 0; i < queued.size(); i++) {
    564                 OutNetMessage m = queued.get(i);
     609            for (OutNetMessage m : queued) {
    565610                m.timestamp("no longer deferred... establishing");
    566                 qstate.addMessage(m);
     611                establish(m, false);
    567612            }
    568613            admitted++;
     
    612657       
    613658        if (_log.shouldLog(Log.DEBUG))
    614             _log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString()
     659            _log.debug("Handle completely established (inbound): " + state
    615660                       + " - " + peer.getRemotePeer());
    616661       
     
    682727     * queued messages
    683728     *
     729     * @return the new PeerState
    684730     */
    685731    private PeerState handleCompletelyEstablished(OutboundEstablishState state) {
     
    692738        long now = _context.clock().now();
    693739        RouterIdentity remote = state.getRemoteIdentity();
     740        // only if == state
     741        RemoteHostId claimed = state.getClaimedAddress();
     742        if (claimed != null)
     743            _outboundByClaimedAddress.remove(claimed, state);
     744        _outboundByHash.remove(remote.calculateHash(), state);
    694745        PeerState peer = new PeerState(_context, _transport,
    695746                                       state.getSentIP(), state.getSentPort(), remote.calculateHash(), false);
     
    704755       
    705756        if (_log.shouldLog(Log.DEBUG))
    706             _log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString()
     757            _log.debug("Handle completely established (outbound): " + state
    707758                       + " - " + peer.getRemotePeer());
    708759       
     
    765816       
    766817        if (_log.shouldLog(Log.DEBUG))
    767             _log.debug("Send created to: " + state.getRemoteHostId().toString());
     818            _log.debug("Send created to: " + state);
    768819       
    769820        try {
     
    771822        } catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) {
    772823            if (_log.shouldLog(Log.ERROR))
    773                 _log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe);
     824                _log.error("Peer " + state + " sent us an invalid DH parameter (or were spoofed)", ippe);
    774825            _inboundStates.remove(state.getRemoteHostId());
    775826            return;
     
    784835    private void sendRequest(OutboundEstablishState state) {
    785836        if (_log.shouldLog(Log.DEBUG))
    786             _log.debug("Send SessionRequest to: " + state.getRemoteHostId());
     837            _log.debug("Send SessionRequest to: " + state);
    787838        UDPPacket packet = _builder.buildSessionRequestPacket(state);
    788839        if (packet != null) {
     
    790841        } else {
    791842            if (_log.shouldLog(Log.WARN))
    792                 _log.warn("Unable to build a session request packet for " + state.getRemoteHostId());
     843                _log.warn("Unable to build a session request packet for " + state);
    793844        }
    794845        state.requestSent();
     
    817868        }
    818869        if (_log.shouldLog(Log.DEBUG))
    819             _log.debug("Send intro for " + state.getRemoteHostId() + " with our intro key as " + _transport.getIntroKey());
     870            _log.debug("Send intro for " + state + " with our intro key as " + _transport.getIntroKey());
    820871        state.introSent();
    821872    }
     
    824875        long nonce = reader.getRelayResponseReader().readNonce();
    825876        OutboundEstablishState state = _liveIntroductions.remove(Long.valueOf(nonce));
    826         if (state == null)
     877        if (state == null) {
     878            if (_log.shouldLog(Log.INFO))
     879                _log.info("Dup or unknown RelayResponse: " + nonce);
    827880            return; // already established
     881        }
    828882       
    829883        int sz = reader.getRelayResponseReader().readCharlieIPSize();
    830884        byte ip[] = new byte[sz];
    831885        reader.getRelayResponseReader().readCharlieIP(ip, 0);
     886        int port = reader.getRelayResponseReader().readCharliePort();
    832887        InetAddress addr = null;
    833888        try {
     889            if (!_transport.isValid(ip))
     890                throw new UnknownHostException("non-public IP");
     891            if (port <= 0 || port > 65535)
     892                throw new UnknownHostException("bad port " + port);
    834893            addr = InetAddress.getByAddress(ip);
    835894        } catch (UnknownHostException uhe) {
    836895            if (_log.shouldLog(Log.WARN))
    837                 _log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid IP for our target: " + Addresses.toString(ip), uhe);
     896                _log.warn("Introducer for " + state + " (" + bob + ") sent us an invalid address for our target: " + Addresses.toString(ip, port), uhe);
    838897            // these two cause this peer to requeue for a new intro peer
    839898            state.introductionFailed();
     
    842901        }
    843902        _context.statManager().addRateData("udp.receiveIntroRelayResponse", state.getLifetime(), 0);
    844         int port = reader.getRelayResponseReader().readCharliePort();
    845903        if (_log.shouldLog(Log.INFO))
    846904            _log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on "
    847                       + addr.toString() + ":" + port + " (according to " + bob + ")");
     905                      + addr.toString() + ":" + port + " (according to " + bob + ") nonce=" + nonce);
    848906        synchronized (state) {
    849907            RemoteHostId oldId = state.getRemoteHostId();
    850             state.introduced(addr, ip, port);
     908            state.introduced(ip, port);
    851909            RemoteHostId newId = state.getRemoteHostId();
    852             // Swap out the RemoteHostId the state is indexed under
    853             // TODO only if !oldId.equals(newId) ? synch?
    854             // FIXME if the RemoteHostIDs aren't the same we have problems
    855             // FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail
    856             // Common occurrence - port changes
     910            // Swap out the RemoteHostId the state is indexed under.
     911            // It was a Hash, change it to a IP/port.
     912            // Remove the entry in the byClaimedAddress map as it's now in main map.
     913            // Add an entry in the byHash map so additional OB pkts can find it.
     914            _outboundByHash.put(state.getRemoteIdentity().calculateHash(), state);
     915            RemoteHostId claimed = state.getClaimedAddress();
    857916            if (!oldId.equals(newId)) {
    858917                _outboundStates.remove(oldId);
    859918                _outboundStates.put(newId, state);
    860919                if (_log.shouldLog(Log.WARN))
    861                     _log.warn("RR replaced " + oldId + " with " + newId + " -> " + state);
    862             }
     920                    _log.warn("RR replaced " + oldId + " with " + newId + ", claimed address was " + claimed);
     921            }
     922            //
     923            if (claimed != null)
     924                _outboundByClaimedAddress.remove(oldId, state);  // only if == state
    863925        }
    864926        notifyActivity();
     
    873935    private void sendConfirmation(OutboundEstablishState state) {
    874936        boolean valid = state.validateSessionCreated();
    875         if (!valid) // validate clears fields on failure
     937        if (!valid) {
     938            // validate clears fields on failure
     939            // TODO - send destroy? shitlist?
     940            if (_log.shouldLog(Log.WARN))
     941                _log.warn("SessionCreated validate failed: " + state);
    876942            return;
     943        }
    877944       
    878945        if (!_transport.isValid(state.getReceivedIP()) || !_transport.isValid(state.getRemoteHostId().getIP())) {
     
    9371004    /**
    9381005     * Drive through the inbound establishment states, adjusting one of them
    939      * as necessary
     1006     * as necessary. Called from Establisher thread only.
    9401007     * @return next requested time or -1
    9411008     */
     
    10541121    /**
    10551122     * Drive through the outbound establishment states, adjusting one of them
    1056      * as necessary
     1123     * as necessary. Called from Establisher thread only.
    10571124     * @return next requested time or -1
    10581125     */
     
    10741141                        _log.debug("Removing confirmed outbound: " + cur);
    10751142                    break;
    1076                 } else if (cur.getLifetime() > MAX_OB_ESTABLISH_TIME) {
     1143                } else if (cur.getLifetime() >= MAX_OB_ESTABLISH_TIME) {
    10771144                    // took too long
    10781145                    iter.remove();
     
    11171184            //    _log.debug("Processing for outbound: " + outboundState);
    11181185            synchronized (outboundState) {
    1119                 boolean expired = outboundState.getLifetime() > MAX_OB_ESTABLISH_TIME;
     1186                boolean expired = outboundState.getLifetime() >= MAX_OB_ESTABLISH_TIME;
    11201187                switch (outboundState.getState()) {
    11211188                    case OB_STATE_UNKNOWN:  // fall thru
     
    11301197                        // no response yet (or it was invalid), lets retry
    11311198                        long rtime = outboundState.getRequestSentTime();
    1132                         if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT < now))
     1199                        if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT <= now))
    11331200                            processExpired(outboundState);
    11341201                        else if (outboundState.getNextSendTime() <= now)
     
    11451212                    case OB_STATE_CONFIRMED_PARTIALLY:
    11461213                        long ctime = outboundState.getConfirmedSentTime();
    1147                         if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT < now)) {
     1214                        if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT <= now)) {
    11481215                            sendDestroy(outboundState);
    11491216                            processExpired(outboundState);
     
    11621229                    case OB_STATE_PENDING_INTRO:
    11631230                        long itime = outboundState.getIntroSentTime();
    1164                         if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT < now))
     1231                        if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT <= now))
    11651232                            processExpired(outboundState);
    11661233                        else if (outboundState.getNextSendTime() <= now)
     
    11871254            if (removed) {
    11881255                if (_log.shouldLog(Log.DEBUG))
    1189                     _log.debug("Send intro for " + outboundState.getRemoteHostId() + " timed out");
     1256                    _log.debug("Send intro for " + outboundState + " timed out");
    11901257                _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
    11911258            }
    11921259        }
     1260        // only if == state
     1261        RemoteHostId claimed = outboundState.getClaimedAddress();
     1262        if (claimed != null)
     1263            _outboundByClaimedAddress.remove(claimed, outboundState);
     1264        _outboundByHash.remove(outboundState.getRemoteIdentity().calculateHash(), outboundState);
    11931265        // should have already been removed in handleOutbound() above
    11941266        // remove only if value == state
     
    11961268        if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) {
    11971269            if (_log.shouldLog(Log.INFO))
    1198                 _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
     1270                _log.info("Expired: " + outboundState + " Lifetime: " + outboundState.getLifetime());
    11991271            OutNetMessage msg;
    12001272            while ((msg = outboundState.getNextQueuedMessage()) != null) {
     
    12451317            _outboundStates.clear();
    12461318            _queuedOutbound.clear();
    1247             _liveIntroductions.clear();
    1248         }
    1249     }
    1250 
    1251     // Debugging
    1252     private long _lastPrinted;
    1253     private static final long PRINT_INTERVAL = 5*1000;
    1254    
    1255     private void doPass() {
    1256         if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) {
    1257             _lastPrinted = _context.clock().now();
    1258             int iactive = _inboundStates.size();
    1259             int oactive = _outboundStates.size();
    1260             if (iactive > 0 || oactive > 0) {
    1261                 int queued = _queuedOutbound.size();
    1262                 int live = _liveIntroductions.size();
    1263                 _log.debug("OB states: " + oactive + " IB states: " + iactive +
    1264                            " OB queued: " + queued + " intros: " + live);
    1265             }
    1266         }
    1267         _activity = 0;
    1268         long now = _context.clock().now();
    1269         long nextSendTime = -1;
    1270         long nextSendInbound = handleInbound();
    1271         long nextSendOutbound = handleOutbound();
    1272         if (nextSendInbound > 0)
    1273             nextSendTime = nextSendInbound;
    1274         if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) )
    1275             nextSendTime = nextSendOutbound;
    1276 
    1277         long delay = nextSendTime - now;
    1278         if ( (nextSendTime == -1) || (delay > 0) ) {
    1279             if (delay > 1000)
    1280                 delay = 1000;
    1281             try {
    1282                 synchronized (_activityLock) {
    1283                     if (_activity > 0)
    1284                         return;
    1285                     if (nextSendTime == -1)
    1286                         _activityLock.wait(1000);
    1287                     else
    1288                         _activityLock.wait(delay);
    1289                 }
    1290             } catch (InterruptedException ie) {
    1291             }
    1292             // if (_log.shouldLog(Log.DEBUG))
    1293             //     _log.debug("After waiting w/ nextSend=" + nextSendTime
    1294             //                + " and delay=" + delay + " and interrupted=" + interrupted);
     1319            _outboundByClaimedAddress.clear();
     1320            _outboundByHash.clear();
     1321        }
     1322
     1323        private long _lastFailsafe;
     1324        private static final long FAILSAFE_INTERVAL = 3*60*1000;
     1325        // Debugging
     1326        private long _lastPrinted;
     1327        private static final long PRINT_INTERVAL = 5*1000;
     1328   
     1329        private void doPass() {
     1330            if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) {
     1331                _lastPrinted = _context.clock().now();
     1332                int iactive = _inboundStates.size();
     1333                int oactive = _outboundStates.size();
     1334                if (iactive > 0 || oactive > 0) {
     1335                    int queued = _queuedOutbound.size();
     1336                    int live = _liveIntroductions.size();
     1337                    int claimed = _outboundByClaimedAddress.size();
     1338                    int hash = _outboundByHash.size();
     1339                    _log.debug("OB states: " + oactive + " IB states: " + iactive +
     1340                               " OB queued: " + queued + " intros: " + live +
     1341                               " OB claimed: " + claimed + " hash: " + hash);
     1342                }
     1343            }
     1344            _activity = 0;
     1345            long now = _context.clock().now();
     1346            if (_lastFailsafe + FAILSAFE_INTERVAL < _context.clock().now()) {
     1347                _lastFailsafe = _context.clock().now();
     1348                doFailsafe();
     1349            }
     1350            long nextSendTime = -1;
     1351            long nextSendInbound = handleInbound();
     1352            long nextSendOutbound = handleOutbound();
     1353            if (nextSendInbound > 0)
     1354                nextSendTime = nextSendInbound;
     1355            if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) )
     1356                nextSendTime = nextSendOutbound;
     1357
     1358            long delay = nextSendTime - now;
     1359            if ( (nextSendTime == -1) || (delay > 0) ) {
     1360                if (delay > 1000)
     1361                    delay = 1000;
     1362                try {
     1363                    synchronized (_activityLock) {
     1364                        if (_activity > 0)
     1365                            return;
     1366                        if (nextSendTime == -1)
     1367                           _activityLock.wait(1000);
     1368                        else
     1369                            _activityLock.wait(delay);
     1370                    }
     1371                } catch (InterruptedException ie) {
     1372                }
     1373                // if (_log.shouldLog(Log.DEBUG))
     1374                //     _log.debug("After waiting w/ nextSend=" + nextSendTime
     1375                //                + " and delay=" + delay + " and interrupted=" + interrupted);
     1376            }
     1377        }
     1378
     1379        /** @since 0.9.2 */
     1380        private void doFailsafe() {
     1381            for (Iterator<OutboundEstablishState> iter = _liveIntroductions.values().iterator(); iter.hasNext(); ) {
     1382                OutboundEstablishState state = iter.next();
     1383                if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
     1384                    iter.remove();
     1385                    if (_log.shouldLog(Log.WARN))
     1386                        _log.warn("Failsafe remove LI " + state);
     1387                }
     1388            }
     1389            for (Iterator<OutboundEstablishState> iter = _outboundByClaimedAddress.values().iterator(); iter.hasNext(); ) {
     1390                OutboundEstablishState state = iter.next();
     1391                if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
     1392                    iter.remove();
     1393                    if (_log.shouldLog(Log.WARN))
     1394                        _log.warn("Failsafe remove OBBCA " + state);
     1395                }
     1396            }
     1397            for (Iterator<OutboundEstablishState> iter = _outboundByHash.values().iterator(); iter.hasNext(); ) {
     1398                OutboundEstablishState state = iter.next();
     1399                if (state.getLifetime() > 3*MAX_OB_ESTABLISH_TIME) {
     1400                    iter.remove();
     1401                    if (_log.shouldLog(Log.WARN))
     1402                        _log.warn("Failsafe remove OBBH " + state);
     1403                }
     1404            }
    12951405        }
    12961406    }
  • router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java

    rfbd8c69 r612fab1  
    8484    private static final long MAX_DELAY = 15*1000;
    8585
     86    /**
     87     *  @param localPort Must be our external port, otherwise the signature of the
     88     &                   SessionCreated message will be bad if the external port != the internal port.
     89     */
    8690    public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort,
    8791                                 DHSessionKeyBuilder dh) {
  • router/java/src/net/i2p/router/transport/udp/IntroductionManager.java

    rfbd8c69 r612fab1  
    1313import net.i2p.data.SessionKey;
    1414import net.i2p.router.RouterContext;
     15import net.i2p.router.transport.TransportImpl;
     16import net.i2p.util.Addresses;
    1517import net.i2p.util.ConcurrentHashSet;
    1618import net.i2p.util.Log;
     
    142144                continue;
    143145            }
     146            byte[] ip = cur.getRemoteIP();
     147            int port = cur.getRemotePort();
     148            if (ip == null || !TransportImpl.isPubliclyRoutable(ip) || port <= 0 || port > 65535)
     149                continue;
    144150            if (_log.shouldLog(Log.INFO))
    145151                _log.info("Picking introducer: " + cur);
    146152            cur.setIntroducerTime();
    147153            UDPAddress ura = new UDPAddress(ra);
    148             ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, cur.getRemoteHostId().toHostString());
    149             ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(cur.getRemotePort()));
     154            ssuOptions.setProperty(UDPAddress.PROP_INTRO_HOST_PREFIX + found, Addresses.toString(ip));
     155            ssuOptions.setProperty(UDPAddress.PROP_INTRO_PORT_PREFIX + found, String.valueOf(port));
    150156            ssuOptions.setProperty(UDPAddress.PROP_INTRO_KEY_PREFIX + found, Base64.encode(ura.getIntroKey()));
    151157            ssuOptions.setProperty(UDPAddress.PROP_INTRO_TAG_PREFIX + found, String.valueOf(cur.getTheyRelayToUsAs()));
  • router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java

    rfbd8c69 r612fab1  
    5151    private long _nextSend;
    5252    private RemoteHostId _remoteHostId;
     53    private final RemoteHostId _claimedAddress;
    5354    private final RouterIdentity _remotePeer;
    5455    private final SessionKey _introKey;
     
    9293
    9394    /**
     95     *  @param claimedAddress an IP/port based RemoteHostId, or null if unknown
     96     *  @param remoteHostId non-null, == claimedAddress if direct, or a hash-based one if indirect
    9497     *  @param addr non-null
    9598     */
    96     public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort,
     99    public OutboundEstablishState(RouterContext ctx, RemoteHostId claimedAddress,
     100                                  RemoteHostId remoteHostId,
    97101                                  RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr,
    98102                                  DHSessionKeyBuilder dh) {
    99103        _context = ctx;
    100104        _log = ctx.logManager().getLog(OutboundEstablishState.class);
    101         if ( (remoteHost != null) && (remotePort > 0) ) {
    102             _bobIP = remoteHost.getAddress();
    103             _bobPort = remotePort;
    104             _remoteHostId = new RemoteHostId(_bobIP, _bobPort);
    105         } else {
    106             _bobIP = null;
     105        if (claimedAddress != null) {
     106            _bobIP = claimedAddress.getIP();
     107            _bobPort = claimedAddress.getPort();
     108        } else {
     109            //_bobIP = null;
    107110            _bobPort = -1;
    108             _remoteHostId = new RemoteHostId(remotePeer.calculateHash().getData());
    109         }
     111        }
     112        _claimedAddress = claimedAddress;
     113        _remoteHostId = remoteHostId;
    110114        _remotePeer = remotePeer;
    111115        _introKey = introKey;
     
    174178
    175179    public byte[] getSentX() { return _sentX; }
    176     /** the remote side (Bob) */
     180
     181    /**
     182     * The remote side (Bob) - note that in some places he's called Charlie.
     183     * Warning - may change after introduction. May be null before introduction.
     184     */
    177185    public synchronized byte[] getSentIP() { return _bobIP; }
    178     /** the remote side (Bob) */
     186
     187    /**
     188     * The remote side (Bob) - note that in some places he's called Charlie.
     189     * Warning - may change after introduction. May be -1 before introduction.
     190     */
    179191    public synchronized int getSentPort() { return _bobPort; }
    180192
     
    410422            _confirmedSentTime = _lastSend;
    411423        } else {
    412             delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, MAX_DELAY);
     424            delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount,
     425                             _confirmedSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
    413426        }
    414427        _confirmedSentCount++;
     
    438451            _requestSentTime = _lastSend;
    439452        } else {
    440             delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, MAX_DELAY);
     453            delay = Math.min(RETRANSMIT_DELAY << _requestSentCount,
     454                             _requestSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
    441455        }
    442456        _requestSentCount++;
     
    464478            _introSentTime = _lastSend;
    465479        } else {
    466             delay = Math.min(RETRANSMIT_DELAY << _introSentCount, MAX_DELAY);
     480            delay = Math.min(RETRANSMIT_DELAY << _introSentCount,
     481                             _introSentTime + EstablishmentManager.OB_MESSAGE_TIMEOUT - _lastSend);
    467482        }
    468483        _introSentCount++;
     
    485500   
    486501    /**
    487      *  This changes the remoteHostId from a hash-based one to a IP/Port one,
    488      *  OR the IP or port could change.
    489      */
    490     public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) {
     502     *  This changes the remoteHostId from a hash-based one or possibly
     503     *  incorrect IP/port to what the introducer told us.
     504     *  All params are for the remote end (NOT the introducer) and must have been validated already.
     505     */
     506    public synchronized void introduced(byte bobIP[], int bobPort) {
    491507        if (_currentState != OutboundState.OB_STATE_PENDING_INTRO)
    492508            return; // we've already successfully been introduced, so don't overwrite old settings
    493509        _nextSend = _context.clock().now() + 500; // wait briefly for the hole punching
    494510        _currentState = OutboundState.OB_STATE_INTRODUCED;
    495         _bobIP = bobIP;
    496         _bobPort = bobPort;
    497         _remoteHostId = new RemoteHostId(bobIP, bobPort);
     511        if (_claimedAddress != null && bobPort == _bobPort && DataHelper.eq(bobIP, _bobIP)) {
     512            // he's who he said he was
     513            _remoteHostId = _claimedAddress;
     514        } else {
     515            // no IP/port or wrong IP/port in RI
     516            _bobIP = bobIP;
     517            _bobPort = bobPort;
     518            _remoteHostId = new RemoteHostId(bobIP, bobPort);
     519        }
    498520        if (_log.shouldLog(Log.INFO))
    499521            _log.info("Introduced to " + _remoteHostId + ", now lets get on with establishing");
     
    505527    public synchronized long getNextSendTime() { return _nextSend; }
    506528
    507     /** uniquely identifies an attempt */
     529    /**
     530     *  This should be what the state is currently indexed by in the _outboundStates table.
     531     *  Beware -
     532     *  During introduction, this is a router hash.
     533     *  After introduced() is called, this is set to the IP/port the introducer told us.
     534     *  @return non-null
     535     */
    508536    RemoteHostId getRemoteHostId() { return _remoteHostId; }
     537
     538    /**
     539     *  This will never be a hash-based address.
     540     *  This is the 'claimed' (unverified) address from the netdb, or null.
     541     *  It is not changed after introduction. Use getRemoteHostId() for the verified address.
     542     *  @return may be null
     543     */
     544    RemoteHostId getClaimedAddress() { return _claimedAddress; }
    509545
    510546    /** we have received a real data packet, so we're done establishing */
  • router/java/src/net/i2p/router/transport/udp/RemoteHostId.java

    rfbd8c69 r612fab1  
    33import net.i2p.data.Base64;
    44import net.i2p.data.DataHelper;
     5import net.i2p.data.Hash;
    56import net.i2p.util.Addresses;
    67
     
    1415    private final byte _ip[];
    1516    private final int _port;
    16     private final byte _peerHash[];
     17    private final Hash _peerHash;
    1718    private final int _hashCode;
    1819   
     
    2324
    2425    /** indirect */
    25     public RemoteHostId(byte peerHash[]) {
     26    public RemoteHostId(Hash peerHash) {
    2627        this(null, 0, peerHash);
    2728    }
    2829   
    29     private RemoteHostId(byte ip[], int port, byte peerHash[]) {
     30    private RemoteHostId(byte ip[], int port, Hash peerHash) {
    3031        _ip = ip;
    3132        _port = port;
     
    4142
    4243    /** @return null if direct */
    43     public byte[] getPeerHash() { return _peerHash; }
     44    public Hash getPeerHash() { return _peerHash; }
    4445   
    4546    @Override
     
    5960   
    6061    @Override
    61     public String toString() { return toString(true); }
    62 
    63     private String toString(boolean includePort) {
     62    public String toString() {
    6463        if (_ip != null) {
    65             if (includePort)
    66                 return Addresses.toString(_ip, _port);
    67             else
    68                 return Addresses.toString(_ip);
     64            return Addresses.toString(_ip, _port);
    6965        } else {
    70             return Base64.encode(_peerHash);
     66            return _peerHash.toString();
    7167        }
    7268    }
    73 
    74     public String toHostString() { return toString(false); }
    7569}
  • router/java/src/net/i2p/router/transport/udp/UDPAddress.java

    rfbd8c69 r612fab1  
    4040
    4141    public UDPAddress(RouterAddress addr) {
    42         parse(addr);
    43     }
    44    
    45     @Override
    46     public String toString() {
    47         StringBuilder rv = new StringBuilder(64);
    48         if (_introHosts != null) {
    49             for (int i = 0; i < _introHosts.length; i++) {
    50                 rv.append("ssu://");
    51                 rv.append(_introTags[i]).append('@');
    52                 rv.append(_introHosts[i]).append(':').append(_introPorts[i]);
    53                 //rv.append('/').append(Base64.encode(_introKeys[i]));
    54                 if (i + 1 < _introKeys.length)
    55                     rv.append(", ");
    56             }
    57         } else {
    58             if ( (_host != null) && (_port > 0) )
    59                 rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey));
    60             else
    61                 rv.append("ssu://autodetect.not.yet.complete:").append(_port);
    62         }
    63         return rv.toString();
    64     }
    65    
    66     private void parse(RouterAddress addr) {
     42        // TODO make everything final
    6743        if (addr == null) return;
    6844        _host = addr.getOption(PROP_HOST);
     
    199175        return _mtu;
    200176    }
     177   
     178    @Override
     179    public String toString() {
     180        StringBuilder rv = new StringBuilder(64);
     181        if (_introHosts != null) {
     182            for (int i = 0; i < _introHosts.length; i++) {
     183                rv.append("ssu://");
     184                rv.append(_introTags[i]).append('@');
     185                rv.append(_introHosts[i]).append(':').append(_introPorts[i]);
     186                //rv.append('/').append(Base64.encode(_introKeys[i]));
     187                if (i + 1 < _introKeys.length)
     188                    rv.append(", ");
     189            }
     190        } else {
     191            if ( (_host != null) && (_port > 0) )
     192                rv.append("ssu://").append(_host).append(':').append(_port);//.append('/').append(Base64.encode(_introKey));
     193            else
     194                rv.append("ssu://autodetect.not.yet.complete:").append(_port);
     195        }
     196        return rv.toString();
     197    }
    201198}
  • router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java

    rfbd8c69 r612fab1  
    491491     */   
    492492    private class PacketACKBitfield implements ACKBitfield {
    493         private int _start;
    494         private int _bitfieldStart;
    495         private int _bitfieldSize;
     493        private final int _start;
     494        private final int _bitfieldStart;
     495        private final int _bitfieldSize;
     496
    496497        public PacketACKBitfield(int start) {
    497498            _start = start;
    498499            _bitfieldStart = start + 4;
    499             _bitfieldSize = 1;
     500            int bfsz = 1;
    500501            // bitfield is an array of bytes where the high bit is 1 if
    501502            // further bytes in the bitfield follow
    502             while ((_message[_bitfieldStart + _bitfieldSize - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
    503                 _bitfieldSize++;
    504         }
     503            while ((_message[_bitfieldStart + bfsz - 1] & UDPPacket.BITFIELD_CONTINUATION) != 0x0)
     504                bfsz++;
     505            _bitfieldSize = bfsz;
     506        }
     507
    505508        public long getMessageId() { return DataHelper.fromLong(_message, _start, 4); }
    506509        public int getByteLength() { return 4 + _bitfieldSize; }
    507510        public int fragmentCount() { return _bitfieldSize * 7; }
    508511        public boolean receivedComplete() { return false; }
     512
    509513        public boolean received(int fragmentNum) {
    510514            if ( (fragmentNum < 0) || (fragmentNum >= _bitfieldSize*7) )
     
    515519            return (_message[byteNum] & (1 << flagNum)) != 0x0;
    516520        }
     521
    517522        @Override
    518523        public String toString() {
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    rfbd8c69 r612fab1  
    690690     *  Now false if we need introducers (as perhaps that's why we need them,
    691691     *  our firewall is changing our port), unless overridden by the property.
     692     *  We must have an accurate external port when firewalled, or else
     693     *  our signature of the SessionCreated packet will be invalid.
    692694     */
    693695    private boolean getIsPortFixed() {
     
    798800        RemoteHostId remoteId = peer.getRemoteHostId();
    799801        if (remoteId == null) return false;
     802        // Should always be direct... except maybe for hidden mode?
     803        // or do we always know the IP by now?
    800804        if (remoteId.getIP() == null && _log.shouldLog(Log.WARN))
    801805            _log.warn("Add indirect: " + peer);
     
    11151119            return;
    11161120        UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer);
    1117         if (_log.shouldLog(Log.WARN))
    1118             _log.warn("Sending destroy to : " + peer);
     1121        if (_log.shouldLog(Log.DEBUG))
     1122            _log.debug("Sending destroy to : " + peer);
    11191123        send(pkt);
    11201124    }
Note: See TracChangeset for help on using the changeset viewer.