Changeset 3a54661


Ignore:
Timestamp:
Aug 17, 2012 2:15:01 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
264df83
Parents:
3cac01ff
Message:
  • SSU:
    • Use remote MTU when published (ticket #687)
    • Queue outbound msgs during inbound establish
    • IntroManager? cleanups
    • More synchronization
    • More log tweaks
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r3cac01ff r3a54661  
     12012-08-17 zzz
     2 * i2psnark:
     3   - Adjust DHT timeouts
     4   - Add max peers per-torrent in tracker
     5   - Remove duplicate clean task for nodes
     6   - Fix another DHT warning message
     7 * SSU:
     8   - Use remote MTU when published (ticket #687)
     9   - Queue outbound msgs during inbound establish
     10   - IntroManager cleanups
     11   - More synchronization
     12
    1132012-08-17 sponge
    214 * BOB: just some cleanup of old, dead meaningless commentedout code
    315        and a little reformatting.
     16
     172012-08-16 zzz
     18 * Utils: Drop unused BufferedRandomSource, PooledRandomSource,
     19   EepGetScheduler, EepPost and HTTPSendData, moved to i2p.scripts
    420
    5212012-08-15 zzz
  • router/java/src/net/i2p/router/RouterVersion.java

    r3cac01ff r3a54661  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 11;
     21    public final static long BUILD = 12;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    r3cac01ff r3a54661  
    1313import net.i2p.data.RouterAddress;
    1414import net.i2p.data.RouterIdentity;
     15import net.i2p.data.RouterInfo;
    1516import net.i2p.data.SessionKey;
    1617import net.i2p.data.i2np.DatabaseStoreMessage;
     
    192193                return;
    193194            }
    194            
    195             if (_log.shouldLog(Log.DEBUG))
    196                 _log.debug("Add outbound establish state to: " + to);
     195
     196            InboundEstablishState inState = _inboundStates.get(to);
     197            if (inState != null) {
     198                // we have an inbound establishment in progress, queue it there instead
     199                synchronized (inState) {
     200                    switch (inState.getState()) {
     201                      case IB_STATE_UNKNOWN:
     202                      case IB_STATE_REQUEST_RECEIVED:
     203                      case IB_STATE_CREATED_SENT:
     204                      case IB_STATE_CONFIRMED_PARTIALLY:
     205                      case IB_STATE_CONFIRMED_COMPLETELY:
     206                        // queue it
     207                        inState.addMessage(msg);
     208                        if (_log.shouldLog(Log.WARN))
     209                            _log.debug("OB msg queued to IES");
     210                        break;
     211
     212                      case IB_STATE_COMPLETE:
     213                        // race, send it out (but don't call _transport.send() again and risk a loop)
     214                        _transport.sendIfEstablished(msg);
     215                        break;
     216
     217                      case IB_STATE_FAILED:
     218                        // race, failed
     219                        _transport.failed(msg, "OB msg failed during IB establish");
     220                        break;
     221                    }
     222                }
     223                return;
     224            }
     225
     226
     227
    197228        } else {
    198229            if (_log.shouldLog(Log.DEBUG))
     
    252283                    OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
    253284                    boolean isNew = oldState == null;
    254                     if (!isNew)
     285                    if (isNew) {
     286                        if (_log.shouldLog(Log.DEBUG))
     287                            _log.debug("Adding new " + state);
     288                    } else {
    255289                        // whoops, somebody beat us to it, throw out the state we just created
    256290                        state = oldState;
     291                    }
    257292                }
    258293            }
     
    549584     */
    550585    private void handleCompletelyEstablished(InboundEstablishState state) {
    551         if (state.complete()) return;
     586        if (state.isComplete()) return;
    552587       
    553588        RouterIdentity remote = state.getConfirmedIdentity();
     
    557592        peer.setCurrentMACKey(state.getMACKey());
    558593        peer.setWeRelayToThemAs(state.getSentRelayTag());
     594        // Lookup the peer's MTU from the netdb, since it isn't included in the protocol setup (yet)
     595        // TODO if we don't have RI then we will get it shortly, but too late.
     596        // Perhaps netdb should notify transport when it gets a new RI...
     597        RouterInfo info = _context.netDb().lookupRouterInfoLocally(remote.calculateHash());
     598        if (info != null) {
     599            RouterAddress addr = info.getTargetAddress(UDPTransport.STYLE);
     600            if (addr != null) {
     601                String smtu = addr.getOption(UDPAddress.PROP_MTU);
     602                if (smtu != null) {
     603                    try {
     604                        int mtu = MTU.rectify(Integer.parseInt(smtu));
     605                        peer.setHisMTU(mtu);
     606                    } catch (NumberFormatException nfe) {}
     607                }
     608            }
     609        }
    559610        // 0 is the default
    560611        //peer.setTheyRelayToUsAs(0);
     
    574625        _context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
    575626        sendInboundComplete(peer);
     627        OutNetMessage msg;
     628        while ((msg = state.getNextQueuedMessage()) != null) {
     629            if (_context.clock().now() - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
     630                msg.timestamp("took too long but established...");
     631                _transport.failed(msg, "Took too long to establish, but it was established");
     632            } else {
     633                msg.timestamp("session fully established and sent");
     634                _transport.send(msg);
     635            }
     636        }
     637        state.complete();
    576638    }
    577639
     
    635697        peer.setCurrentMACKey(state.getMACKey());
    636698        peer.setTheyRelayToUsAs(state.getReceivedRelayTag());
     699        int mtu = state.getRemoteAddress().getMTU();
     700        if (mtu > 0)
     701            peer.setHisMTU(mtu);
    637702        // 0 is the default
    638703        //peer.setWeRelayToThemAs(0);
     
    781846            _log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on "
    782847                      + addr.toString() + ":" + port + " (according to " + bob + ")");
    783         RemoteHostId oldId = state.getRemoteHostId();
    784         state.introduced(addr, ip, port);
    785         RemoteHostId newId = state.getRemoteHostId();
    786         // Swap out the RemoteHostId the state is indexed under
    787         // TODO only if !oldId.equals(newId) ? synch?
    788         OutboundEstablishState oldState = _outboundStates.remove(oldId);
    789         _outboundStates.put(newId, state);
    790         if (_log.shouldLog(Log.DEBUG))
    791             _log.debug("RR replaced " + oldId + " -> " + oldState + " with " + newId + " -> " + state);
     848        synchronized (state) {
     849            RemoteHostId oldId = state.getRemoteHostId();
     850            state.introduced(addr, ip, port);
     851            RemoteHostId newId = state.getRemoteHostId();
     852            // Swap out the RemoteHostId the state is indexed under
     853            // TODO only if !oldId.equals(newId) ? synch?
     854            // FIXME if the RemoteHostIDs aren't the same we have problems
     855            // FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail
     856            // Common occurrence - port changes
     857            if (!oldId.equals(newId)) {
     858                _outboundStates.remove(oldId);
     859                _outboundStates.put(newId, state);
     860                if (_log.shouldLog(Log.WARN))
     861                    _log.warn("RR replaced " + oldId + " with " + newId + " -> " + state);
     862            }
     863        }
    792864        notifyActivity();
    793865    }
     
    924996                switch (inboundState.getState()) {
    925997                  case IB_STATE_REQUEST_RECEIVED:
    926                     if (!expired)
     998                    if (expired)
     999                        processExpired(inboundState);
     1000                    else
    9271001                        sendCreated(inboundState);
    9281002                    break;
     
    9321006                    if (expired) {
    9331007                        sendDestroy(inboundState);
     1008                        processExpired(inboundState);
    9341009                    } else if (inboundState.getNextSendTime() <= now) {
    9351010                        sendCreated(inboundState);
     
    9461021                            _context.blocklist().add(inboundState.getSentIP());
    9471022                            inboundState.fail();
     1023                            processExpired(inboundState);
    9481024                        } else {
    9491025                            handleCompletelyEstablished(inboundState);
     
    9531029                            _log.warn("confirmed with invalid? " + inboundState);
    9541030                        inboundState.fail();
     1031                        processExpired(inboundState);
    9551032                    }
    9561033                    break;
    9571034
     1035                  case IB_STATE_COMPLETE:  // fall through
    9581036                  case IB_STATE_FAILED:
    9591037                    break; // already removed;
     
    11191197            if (_log.shouldLog(Log.INFO))
    11201198                _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
    1121             while (true) {
    1122                 OutNetMessage msg = outboundState.getNextQueuedMessage();
    1123                 if (msg == null)
    1124                     break;
     1199            OutNetMessage msg;
     1200            while ((msg = outboundState.getNextQueuedMessage()) != null) {
    11251201                _transport.failed(msg, "Expired during failed establish");
    11261202            }
     
    11321208            //_context.profileManager().commErrorOccurred(peer);
    11331209        } else {
    1134             while (true) {
    1135                 OutNetMessage msg = outboundState.getNextQueuedMessage();
    1136                 if (msg == null)
    1137                     break;
     1210            OutNetMessage msg;
     1211            while ((msg = outboundState.getNextQueuedMessage()) != null) {
    11381212                _transport.send(msg);
    11391213            }
     1214        }
     1215    }
     1216
     1217   
     1218    /**
     1219     *  Caller should probably synch on inboundState
     1220     *  @since 0.9.2
     1221     */
     1222    private void processExpired(InboundEstablishState inboundState) {
     1223        OutNetMessage msg;
     1224        while ((msg = inboundState.getNextQueuedMessage()) != null) {
     1225            _transport.failed(msg, "Expired during failed establish");
    11401226        }
    11411227    }
  • router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java

    r3cac01ff r3a54661  
    33import java.io.ByteArrayInputStream;
    44import java.io.IOException;
     5import java.util.Queue;
     6import java.util.concurrent.LinkedBlockingQueue;
    57
    68import net.i2p.data.Base64;
     
    1113import net.i2p.data.SessionKey;
    1214import net.i2p.data.Signature;
     15import net.i2p.router.OutNetMessage;
    1316import net.i2p.router.RouterContext;
    1417import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
     
    5356    private final RemoteHostId _remoteHostId;
    5457    private InboundState _currentState;
    55     private boolean _complete;
     58    private final Queue<OutNetMessage> _queuedMessages;
    5659    // count for backoff
    5760    private int _createdSentCount;
     
    7073        IB_STATE_CONFIRMED_COMPLETELY,
    7174        /** we are explicitly failing it */
    72         IB_STATE_FAILED
     75        IB_STATE_FAILED,
     76        /** Successful completion, PeerState created and added to transport */
     77        IB_STATE_COMPLETE
    7378    }
    7479   
     
    9095        _establishBegin = ctx.clock().now();
    9196        _keyBuilder = dh;
     97        _queuedMessages = new LinkedBlockingQueue();
    9298    }
    9399   
     
    95101
    96102    /** @return if previously complete */
    97     public synchronized boolean complete() {
    98         boolean already = _complete;
    99         _complete = true;
    100         return already;
    101     }
    102    
     103    public synchronized boolean isComplete() {
     104        return _currentState == InboundState.IB_STATE_COMPLETE ||
     105               _currentState == InboundState.IB_STATE_FAILED;
     106    }
     107
     108    /** Notify successful completion */
     109    public synchronized void complete() {
     110        _currentState = InboundState.IB_STATE_COMPLETE;
     111    }
     112   
     113    /**
     114     *  Queue a message to be sent after the session is established.
     115     *  This will only happen if we decide to send something during establishment
     116     *  @since 0.9.2
     117     */
     118    public void addMessage(OutNetMessage msg) {
     119        // chance of a duplicate here in a race, that's ok
     120        if (!_queuedMessages.contains(msg))
     121            _queuedMessages.offer(msg);
     122        else if (_log.shouldLog(Log.WARN))
     123             _log.warn("attempt to add duplicate msg to queue: " + msg);
     124    }
     125
     126    /**
     127     *  Pull from the message queue
     128     *  @return null if none
     129     *  @since 0.9.2
     130     */
     131    public OutNetMessage getNextQueuedMessage() {
     132        return _queuedMessages.poll();
     133    }
     134
    103135    public synchronized void receiveSessionRequest(UDPPacketReader.SessionRequestReader req) {
    104136        if (_receivedX == null)
     
    201233            StringBuilder buf = new StringBuilder(128);
    202234            buf.append("Signing sessionCreated:");
    203             buf.append(" ReceivedX: ").append(Base64.encode(_receivedX));
    204             buf.append(" SentY: ").append(Base64.encode(_sentY));
     235            //buf.append(" ReceivedX: ").append(Base64.encode(_receivedX));
     236            //buf.append(" SentY: ").append(Base64.encode(_sentY));
    205237            buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort));
    206238            buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
     
    371403    public String toString() {           
    372404        StringBuilder buf = new StringBuilder(128);
    373         buf.append("IES ").append(super.toString());
     405        buf.append("IES ");
     406        buf.append(Addresses.toString(_aliceIP, _alicePort));
    374407        if (_receivedX != null)
    375408            buf.append(" ReceivedX: ").append(Base64.encode(_receivedX, 0, 4));
    376409        if (_sentY != null)
    377410            buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4));
    378         buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort));
    379         buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
     411        //buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort));
    380412        buf.append(" RelayTag: ").append(_sentRelayTag);
    381         buf.append(" SignedOn: ").append(_sentSignedOnTime);
     413        //buf.append(" SignedOn: ").append(_sentSignedOnTime);
    382414        buf.append(' ').append(_currentState);
    383415        return buf.toString();
  • router/java/src/net/i2p/router/transport/udp/IntroductionManager.java

    r3cac01ff r3a54661  
    8282    }
    8383   
    84     public PeerState get(long id) {
     84    private PeerState get(long id) {
    8585        return _outbound.get(Long.valueOf(id));
    8686    }
     
    188188    }
    189189
     190    /**
     191     *  We are Charlie and we got this from Bob.
     192     *  Send a HolePunch to Alice, who will soon be sending us a RelayRequest.
     193     *  We should already have a session with Bob, but probably not with Alice.
     194     *
     195     *  We do some throttling here.
     196     */
    190197    void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) {
    191198        if (_context.router().isHidden())
     
    194201            _log.info("Receive relay intro from " + bob);
    195202        _context.statManager().addRateData("udp.receiveRelayIntro", 1, 0);
     203
     204        if (!_transport.allowConnection())
     205            return;
     206
     207        // TODO throttle
     208        // TODO IB req limits
     209        // TODO check if already have a session or in progress state.
     210
    196211        _transport.send(_builder.buildHolePunch(reader));
    197212    }
    198213   
     214    /**
     215     *  We are Bob and we got this from Alice.
     216     *  Send a RelayIntro to Charlie and a RelayResponse to Alice.
     217     *  We should already have a session with Charlie, but not necessarily with Alice.
     218     */
    199219    void receiveRelayRequest(RemoteHostId alice, UDPPacketReader reader) {
    200220        if (_context.router().isHidden())
    201221            return;
    202222        long tag = reader.getRelayRequestReader().readTag();
    203         PeerState charlie = _transport.getPeerState(tag);
     223        PeerState charlie = get(tag);
     224        if (charlie == null) {
     225            if (_log.shouldLog(Log.INFO))
     226                _log.info("Receive relay request from " + alice
     227                      + " with unknown tag");
     228            _context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0);
     229            return;
     230        }
    204231        if (_log.shouldLog(Log.INFO))
    205232            _log.info("Receive relay request from " + alice
    206233                      + " for tag " + tag
    207234                      + " and relaying with " + charlie);
    208         if (charlie == null) {
    209             _context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0);
    210             return;
    211         }
     235
     236        // TODO throttle based on alice identity and/or intro tag?
     237
    212238        _context.statManager().addRateData("udp.receiveRelayRequest", 1, 0);
    213239        byte key[] = new byte[SessionKey.KEYSIZE_BYTES];
  • router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java

    r3cac01ff r3a54661  
    9191    private static final long MAX_DELAY = 15*1000;
    9292
     93    /**
     94     *  @param addr non-null
     95     */
    9396    public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort,
    9497                                  RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr,
     
    114117        _sentX = new byte[UDPPacketReader.SessionRequestReader.X_LENGTH];
    115118        prepareSessionRequest();
    116         if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
     119        if (addr.getIntroducerCount() > 0) {
    117120            if (_log.shouldLog(Log.DEBUG))
    118121                _log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr);
     
    132135    }
    133136
     137    /** @return non-null */
    134138    public UDPAddress getRemoteAddress() { return _remoteAddress; }
     139
    135140    public void setIntroNonce(long nonce) { _introductionNonce = nonce; }
    136141
     
    138143    public long getIntroNonce() { return _introductionNonce; }
    139144   
     145    /**
     146     *  Queue a message to be sent after the session is established.
     147     */
    140148    public void addMessage(OutNetMessage msg) {
    141149        // chance of a duplicate here in a race, that's ok
     
    191199       
    192200        if (_log.shouldLog(Log.DEBUG))
    193             _log.debug("Receive session created:\neSig: " + Base64.encode(_receivedEncryptedSignature)
    194                        + "\nreceivedIV: " + Base64.encode(_receivedIV)
    195                        + "\nAliceIP: " + Addresses.toString(_aliceIP)
     201            _log.debug("Receive session created:Sig: " + Base64.encode(_receivedEncryptedSignature)
     202                       + "receivedIV: " + Base64.encode(_receivedIV)
     203                       + "AliceIP: " + Addresses.toString(_aliceIP)
    196204                       + " RelayTag: " + _receivedRelayTag
    197205                       + " SignedOn: " + _receivedSignedOnTime
    198                        + "\nthis: " + this.toString());
     206                       + ' ' + this.toString());
    199207       
    200208        if (_currentState == OutboundState.OB_STATE_UNKNOWN ||
     
    213221     *
    214222     *  Generates session key and mac key.
     223     *
     224     * @return true if valid
    215225     */
    216226    public synchronized boolean validateSessionCreated() {
     
    254264
    255265        if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) ||
    256              (_currentState == OutboundState.OB_STATE_REQUEST_SENT) ||
    257266             (_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) )
    258267            _currentState = OutboundState.OB_STATE_REQUEST_SENT;
     
    293302        _receivedSignature = new Signature(signatureBytes);
    294303        if (_log.shouldLog(Log.DEBUG))
    295             _log.debug("Decrypted received signature: \n" + Base64.encode(signatureBytes));
     304            _log.debug("Decrypted received signature: " + Base64.encode(signatureBytes));
    296305    }
    297306
     
    476485   
    477486    /**
    478      *  This changes the remoteHostId from a hash-based one to a IP/Port one
     487     *  This changes the remoteHostId from a hash-based one to a IP/Port one,
     488     *  OR the IP or port could change.
    479489     */
    480490    public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) {
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r3cac01ff r3a54661  
    157157    private int _mtuReceive;
    158158    /** what is the largest packet we will ever send to the peer? */
    159     private final int _largeMTU;
     159    private int _largeMTU;
    160160    /* how many consecutive packets at or under the min MTU have been received */
    161161    private long _consecutiveSmall;
     
    988988        _messagesSent++;
    989989        if (numSends < 2) {
    990             recalculateTimeouts(lifetime);
    991             adjustMTU();
     990            synchronized (this) {
     991                recalculateTimeouts(lifetime);
     992                adjustMTU();
     993            }
    992994        }
    993995        else if (_log.shouldLog(Log.INFO))
     
    997999    }
    9981000
    999     /** adjust the tcp-esque timeouts */
     1001    /**
     1002     *  Adjust the tcp-esque timeouts.
     1003     *  Caller should synch on this
     1004     */
    10001005    private void recalculateTimeouts(long lifetime) {
    10011006        _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation));
     
    10181023    }
    10191024   
     1025    /**
     1026     *  Caller should synch on this
     1027     */
    10201028    private void adjustMTU() {
    10211029        double retransPct = 0;
     
    10381046        }
    10391047    }
     1048
     1049    /**
     1050     *  @since 0.9.2
     1051     */
     1052    public synchronized void setHisMTU(int mtu) {
     1053        if (mtu <= MIN_MTU || mtu >= _largeMTU)
     1054            return;
     1055        _largeMTU = mtu;
     1056        if (mtu < _mtu)
     1057            _mtu = mtu;
     1058    }
    10401059   
    10411060    /** we are resending a packet, so lets jack up the rto */
     
    10551074        congestionOccurred();
    10561075        _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
    1057         adjustMTU();
     1076        synchronized (this) {
     1077            adjustMTU();
     1078        }
    10581079        //_rto *= 2;
    10591080    }
  • router/java/src/net/i2p/router/transport/udp/UDPAddress.java

    r3cac01ff r3a54661  
    6363        return rv.toString();
    6464    }
    65     
     65   
    6666    private void parse(RouterAddress addr) {
    6767        if (addr == null) return;
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    r3cac01ff r3a54661  
    696696    PeerState getPeerState(Hash remotePeer) {
    697697            return _peersByIdent.get(remotePeer);
    698     }
    699    
    700     /**
    701      * get the state for the peer being introduced, or null if we aren't
    702      * offering to introduce anyone with that tag.
    703      */
    704     PeerState getPeerState(long relayTag) {
    705         return _introManager.get(relayTag);
    706698    }
    707699   
     
    12771269            _establisher.establish(msg);
    12781270        }
     1271    }
     1272
     1273    /**
     1274     *  Send only if established, otherwise fail immediately.
     1275     *  Never queue with the establisher.
     1276     *  @since 0.9.2
     1277     */
     1278    void sendIfEstablished(OutNetMessage msg) {
     1279        _fragments.add(msg);
    12791280    }
    12801281
Note: See TracChangeset for help on using the changeset viewer.