Changeset f22865a


Ignore:
Timestamp:
Aug 24, 2011 2:24:25 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
f6d2ac7
Parents:
f99f9e4 (diff), 69701ab (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p.zzz.test4' (head a1d80c1c396eaa49c7b46a69397b36fe9717ff2e)

to branch 'i2p.i2p' (head 7d00d6f11ce1172c218ce44b0a8ac28e4addf03d)

Location:
router/java/src/net/i2p/router
Files:
22 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/OutNetMessagePool.java

    rf99f9e4 rf22865a  
    88 *
    99 */
    10 
    11 import java.util.Comparator;
    1210
    1311import net.i2p.util.Log;
  • router/java/src/net/i2p/router/transport/udp/ACKSender.java

    rf99f9e4 rf22865a  
    1515 * Blocking thread that is given peers by the inboundFragment pool, sending out
    1616 * any outstanding ACKs. 
    17  *
     17 * The ACKs are sent directly to UDPSender,
     18 * bypassing OutboundMessageFragments and PacketPusher.
    1819 */
    1920class ACKSender implements Runnable {
    20     private RouterContext _context;
    21     private Log _log;
    22     private UDPTransport _transport;
    23     private PacketBuilder _builder;
     21    private final RouterContext _context;
     22    private final Log _log;
     23    private final UDPTransport _transport;
     24    private final PacketBuilder _builder;
    2425    /** list of peers (PeerState) who we have received data from but not yet ACKed to */
    2526    private final BlockingQueue<PeerState> _peersToACK;
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    rf99f9e4 rf22865a  
    7373        _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES);
    7474        _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES);
     75        // following are for PeerState
     76        _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
     77        _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
     78        _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
     79        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
     80        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
     81        _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
     82        _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
     83        _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
     84        _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
     85        _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
     86        //_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
     87        //_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
    7588    }
    7689   
     
    319332    /**
    320333     * Got a SessionDestroy on an established conn
     334     * @since 0.8.1
    321335     */
    322336    void receiveSessionDestroy(RemoteHostId from, PeerState state) {
     
    328342    /**
    329343     * Got a SessionDestroy during outbound establish
     344     * @since 0.8.1
    330345     */
    331346    void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) {
     
    339354    /**
    340355     * Got a SessionDestroy - maybe after an inbound establish
     356     * @since 0.8.1
    341357     */
    342358    void receiveSessionDestroy(RemoteHostId from) {
  • router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java

    rf99f9e4 rf22865a  
    1919 */
    2020class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
    21     private RouterContext _context;
    22     private Log _log;
     21    private final RouterContext _context;
     22    private final Log _log;
    2323    /** list of message IDs recently received, so we can ignore in flight dups */
    2424    private DecayingBloomFilter _recentlyCompletedMessages;
    25     private OutboundMessageFragments _outbound;
    26     private UDPTransport _transport;
    27     private ACKSender _ackSender;
    28     private MessageReceiver _messageReceiver;
     25    private final OutboundMessageFragments _outbound;
     26    private final UDPTransport _transport;
     27    private final ACKSender _ackSender;
     28    private final MessageReceiver _messageReceiver;
    2929    private boolean _alive;
    3030   
     
    149149                _ackSender.ackPeer(from);
    150150
    151                 if (_log.shouldLog(Log.INFO))
    152                     _log.info("Message received completely!  " + state);
     151                if (_log.shouldLog(Log.DEBUG))
     152                    _log.debug("Message received completely!  " + state);
    153153
    154154                _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
     
    159159                if (_log.shouldLog(Log.WARN))
    160160                    _log.warn("Message expired while only being partially read: " + state);
    161                 _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString());
     161                _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
    162162            } else if (partialACK) {
    163163                // not expired but not yet complete... lets queue up a partial ACK
     
    175175    }
    176176   
     177    /**
     178     *  @return the number of bitfields in the ack? why?
     179     */
    177180    private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
    178181        int rv = 0;
     182        boolean newAck = false;
    179183        if (data.readACKsIncluded()) {
    180             int fragments = 0;
    181184            int ackCount = data.readACKCount();
    182185            if (ackCount > 0) {
     
    187190                for (int i = 0; i < ackCount; i++) {
    188191                    long id = data.readACK(i);
    189                     if (_log.shouldLog(Log.INFO))
    190                         _log.info("Full ACK of message " + id + " received!");
    191                     fragments += _outbound.acked(id, from.getRemotePeer());
     192                    if (from.acked(id)) {
     193                        if (_log.shouldLog(Log.DEBUG))
     194                            _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer());
     195                        newAck = true;
     196                    //} else if (_log.shouldLog(Log.DEBUG)) {
     197                    //    _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer());
     198                    }
    192199                }
    193200            } else {
     
    202209
    203210                for (int i = 0; i < bitfields.length; i++) {
    204                     if (_log.shouldLog(Log.INFO))
    205                         _log.info("Partial ACK received: " + bitfields[i]);
    206                     _outbound.acked(bitfields[i], from.getRemotePeer());
     211                    if (from.acked(bitfields[i])) {
     212                        if (_log.shouldLog(Log.DEBUG))
     213                            _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
     214                        newAck = true;
     215                    } else if (_log.shouldLog(Log.DEBUG)) {
     216                        _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
     217                    }
    207218                }
    208219            }
     
    212223        else
    213224            from.dataReceived();
     225
     226        // Wake up the packet pusher if it is sleeping.
     227        // By calling add(), this also is a failsafe against possible
     228        // races in OutboundMessageFragments.
     229        if (newAck && from.getOutboundMessageCount() > 0)
     230            _outbound.add(from);
     231
    214232        return rv;
    215233    }
  • router/java/src/net/i2p/router/transport/udp/InboundMessageState.java

    rf99f9e4 rf22865a  
    1313 */
    1414class InboundMessageState {
    15     private RouterContext _context;
    16     private Log _log;
    17     private long _messageId;
    18     private Hash _from;
     15    private final RouterContext _context;
     16    private final Log _log;
     17    private final long _messageId;
     18    private final Hash _from;
    1919    /**
    2020     * indexed array of fragments for the message, where not yet
    2121     * received fragments are null.
    2222     */
    23     private ByteArray _fragments[];
     23    private final ByteArray _fragments[];
    2424    /**
    2525     * what is the last fragment in the message (or -1 if not yet known)
    2626     */
    2727    private int _lastFragment;
    28     private long _receiveBegin;
     28    private final long _receiveBegin;
    2929    private int _completeSize;
    3030    private boolean _released;
     
    3434    public static final int MAX_FRAGMENTS = 64;
    3535   
    36     private static final ByteCache _fragmentCache = ByteCache.getInstance(64, 2048);
     36    private static final int MAX_FRAGMENT_SIZE = UDPPacket.MAX_PACKET_SIZE;
     37    private static final ByteCache _fragmentCache = ByteCache.getInstance(64, MAX_FRAGMENT_SIZE);
    3738   
    3839    public InboundMessageState(RouterContext ctx, long messageId, Hash from) {
     
    154155   
    155156    public void releaseResources() {
    156         if (_fragments != null)
    157             for (int i = 0; i < _fragments.length; i++)
     157        for (int i = 0; i < _fragments.length; i++) {
     158            if (_fragments[i] != null) {
    158159                _fragmentCache.release(_fragments[i]);
    159         //_fragments = null;
     160                _fragments[i] = null;
     161            }
     162        }
    160163        _released = true;
    161164    }
     
    179182            buf.append(getCompleteSize()).append(" bytes");
    180183        } else {
    181             for (int i = 0; (_fragments != null) && (i < _fragments.length); i++) {
     184            for (int i = 0; i < _lastFragment; i++) {
    182185                buf.append(" fragment ").append(i);
    183186                if (_fragments[i] != null)
  • router/java/src/net/i2p/router/transport/udp/IntroductionManager.java

    rf99f9e4 rf22865a  
    2222 */
    2323class IntroductionManager {
    24     private RouterContext _context;
    25     private Log _log;
    26     private UDPTransport _transport;
    27     private PacketBuilder _builder;
     24    private final RouterContext _context;
     25    private final Log _log;
     26    private final UDPTransport _transport;
     27    private final PacketBuilder _builder;
    2828    /** map of relay tag to PeerState that should receive the introduction */
    2929    private final Map<Long, PeerState> _outbound;
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    rf99f9e4 rf22865a  
    2121 */
    2222class MessageReceiver {
    23     private RouterContext _context;
    24     private Log _log;
    25     private UDPTransport _transport;
     23    private final RouterContext _context;
     24    private final Log _log;
     25    private final UDPTransport _transport;
    2626    /** list of messages (InboundMessageState) fully received but not interpreted yet */
    2727    private final BlockingQueue<InboundMessageState> _completeMessages;
  • router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java

    rf99f9e4 rf22865a  
    5151    private RemoteHostId _remoteHostId;
    5252    private final RouterIdentity _remotePeer;
    53     private SessionKey _introKey;
     53    private final SessionKey _introKey;
    5454    private final Queue<OutNetMessage> _queuedMessages;
    5555    private int _currentState;
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    rf99f9e4 rf22865a  
    22
    33import java.util.ArrayList;
     4import java.util.Iterator;
    45import java.util.List;
     6import java.util.Set;
    57
    68import net.i2p.data.Hash;
     
    911import net.i2p.router.OutNetMessage;
    1012import net.i2p.router.RouterContext;
     13import net.i2p.util.ConcurrentHashSet;
    1114import net.i2p.util.Log;
    1215
     
    2427 */
    2528class OutboundMessageFragments {
    26     private RouterContext _context;
    27     private Log _log;
    28     private UDPTransport _transport;
     29    private final RouterContext _context;
     30    private final Log _log;
     31    private final UDPTransport _transport;
    2932    // private ActiveThrottle _throttle; // LINT not used ??
    30     /** peers we are actively sending messages to */
    31     private final List<PeerState> _activePeers;
     33
     34    /**
     35     *  Peers we are actively sending messages to.
     36     *  We use the iterator so we treat it like a list,
     37     *  but we use a HashSet so remove() is fast and
     38     *  we don't need to do contains().
     39     *  Even though most (but NOT all) accesses are synchronized,
     40     *  we use a ConcurrentHashSet as the iterator is long-lived.
     41     */
     42    private final Set<PeerState> _activePeers;
     43
     44    /**
     45     *  The long-lived iterator over _activePeers.
     46     */
     47    private Iterator<PeerState> _iterator;
     48
     49    /**
     50     *  Avoid sync in add() if possible (not 100% reliable)
     51     */
     52    private boolean _isWaiting;
     53
    3254    private boolean _alive;
    33     /** which peer should we build the next packet out of? */
    34     private int _nextPeer;
    35     private PacketBuilder _builder;
     55    private final PacketBuilder _builder;
    3656    private long _lastCycleTime = System.currentTimeMillis();
    3757
     
    4363    // don't send a packet more than 10 times
    4464    static final int MAX_VOLLEYS = 10;
     65    private static final int MAX_WAIT = 1000;
    4566
    4667    public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
     
    4970        _transport = transport;
    5071        // _throttle = throttle;
    51         _activePeers = new ArrayList(256);
     72        _activePeers = new ConcurrentHashSet(256);
    5273        _builder = new PacketBuilder(ctx, transport);
    5374        _alive = true;
     
    6081        _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
    6182        _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES);
     83        _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
    6284        _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES);
    6385        _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES);
     
    7395
    7496    public void startup() { _alive = true; }
     97
    7598    public void shutdown() {
    7699        _alive = false;
     100        _activePeers.clear();
    77101        synchronized (_activePeers) {
    78102            _activePeers.notifyAll();
    79103        }
    80104    }
     105
    81106    void dropPeer(PeerState peer) {
    82107        if (_log.shouldLog(Log.INFO))
    83108            _log.info("Dropping peer " + peer.getRemotePeer().toBase64());
    84109        peer.dropOutbound();
    85         synchronized (_activePeers) {
    86             _activePeers.remove(peer);
    87             _activePeers.notifyAll();
    88         }
     110        _activePeers.remove(peer);
    89111    }
    90112
     
    146168            }
    147169            int active = peer.add(state);
    148             synchronized (_activePeers) {
    149                 if (!_activePeers.contains(peer)) {
    150                     if (_log.shouldLog(Log.DEBUG))
    151                         _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
    152                     _activePeers.add(peer);
    153                 } else {
    154                     if (_log.shouldLog(Log.DEBUG))
    155                         _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
    156                 }
    157                 _activePeers.notifyAll();
    158             }
    159             //msg.timestamp("made active along with: " + active);
     170            add(peer);
    160171            _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    161172        } else {
     
    163174                _log.warn("Error initializing " + msg);
    164175        }
    165         //finishMessages();
    166176    }
    167177
     
    175185            throw new RuntimeException("wtf, null peer for " + state);
    176186        int active = peer.add(state);
    177         synchronized (_activePeers) {
    178             if (!_activePeers.contains(peer)) {
    179                 if (_log.shouldLog(Log.DEBUG))
    180                     _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
    181                 if (_activePeers.isEmpty())
    182                     _lastCycleTime = System.currentTimeMillis();
    183                 _activePeers.add(peer);
    184             } else {
    185                 if (_log.shouldLog(Log.DEBUG))
    186                     _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
     187        add(peer);
     188        _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     189    }
     190
     191    /**
     192     * Add the peer to the list of peers wanting to transmit something.
     193     * This wakes up the packet pusher if it is sleeping.
     194     *
     195     * Avoid synchronization where possible.
     196     * There are small chances of races.
     197     * There are larger chances of adding the PeerState "behind" where
     198     * the iterator is now... but these issues are the same as before concurrentification.
     199     *
     200     * @since 0.8.9
     201     */
     202    public void add(PeerState peer) {
     203        boolean wasEmpty = _activePeers.isEmpty();
     204        boolean added = _activePeers.add(peer);
     205        if (added) {
     206            if (_log.shouldLog(Log.DEBUG))
     207                _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
     208            if (wasEmpty)
     209                _lastCycleTime = System.currentTimeMillis();
     210        } else {
     211            if (_log.shouldLog(Log.DEBUG))
     212                _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
     213        }
     214        _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0);
     215
     216        // Avoid sync if possible
     217        // no, this doesn't always work.
     218        // Also note that the iterator in getNextVolley may have alreay passed us,
     219        // or not reflect the addition.
     220        if (_isWaiting || wasEmpty) {
     221            synchronized (_activePeers) {
     222                _activePeers.notifyAll();
    187223            }
    188             _activePeers.notifyAll();
    189         }
    190         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    191         // should we finish messages here too?
    192         /*
    193         synchronized (_activeMessages) {
    194             _activeMessages.add(state);
    195             if (_activeMessages.size() == 1)
    196                 _lastCycleTime = System.currentTimeMillis();
    197             _activeMessages.notifyAll();
    198         }
    199          */
     224        }
    200225    }
    201226
     
    203228     * Remove any expired or complete messages
    204229     */
     230/****
    205231    private void finishMessages() {
    206         int rv = 0;
    207         List peers = null;
    208         synchronized (_activePeers) {
    209             peers = new ArrayList(_activePeers.size());
    210             for (int i = 0; i < _activePeers.size(); i++) {
    211                 PeerState state = _activePeers.get(i);
    212                 if (state.getOutboundMessageCount() <= 0) {
    213                     _activePeers.remove(i);
    214                     i--;
    215                 } else {
    216                     peers.add(state);
    217                 }
    218             }
    219             _activePeers.notifyAll();
    220         }
    221         for (int i = 0; i < peers.size(); i++) {
    222             PeerState state = (PeerState)peers.get(i);
    223             int remaining = state.finishMessages();
    224             if (remaining <= 0) {
    225                 if (_log.shouldLog(Log.DEBUG))
    226                     _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
    227             }
    228             rv += remaining;
    229         }
    230     }
    231 
     232        for (Iterator<PeerState> iter = _activePeers.iterator(); iter.hasNext(); ) {
     233             PeerState state = iter.next();
     234             if (state.getOutboundMessageCount() <= 0) {
     235                 iter.remove();
     236             } else {
     237                 int remaining = state.finishMessages();
     238                 if (remaining <= 0) {
     239                     if (_log.shouldLog(Log.DEBUG))
     240                         _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
     241                     iter.remove();
     242                 }
     243             }
     244         }
     245     }
     246****/
     247 
    232248    /**
    233249     * Fetch all the packets for a message volley, blocking until there is a
     
    236252     * already ACKed fragments.
    237253     *
     254     * NOT thread-safe. Called by the PacketPusher thread only.
     255     *
     256     * @return null only on shutdown
    238257     */
    239258    public UDPPacket[] getNextVolley() {
    240259        PeerState peer = null;
    241260        OutboundMessageState state = null;
     261        // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
     262        int peersProcessed = 0;
    242263        while (_alive && (state == null) ) {
    243             long now = _context.clock().now();
    244             int nextSendDelay = -1;
    245             finishMessages();
    246             try {
    247                 synchronized (_activePeers) {
    248                     for (int i = 0; i < _activePeers.size(); i++) {
    249                         int cur = (i + _nextPeer) % _activePeers.size();
    250                         if (cur == 0) {
    251                             // FIXME or delete, these stats aren't much help since they include the sleep time
    252                             long ts = System.currentTimeMillis();
    253                             long cycleTime = ts - _lastCycleTime;
    254                             _lastCycleTime = ts;
    255                             _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
    256                             // make longer than the default sleep time below
    257                             if (cycleTime > 1100)
    258                                 _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
     264            int nextSendDelay = Integer.MAX_VALUE;
     265            // no, not every time - O(n**2) - do just before waiting below
     266            //finishMessages();
     267
     268                    // do we need a new long-lived iterator?
     269                    if (_iterator == null ||
     270                        ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
     271                        _iterator = _activePeers.iterator();
     272                    }
     273
     274                    // Go through all the peers that we are actively sending messages to.
     275                    // Call finishMessages() for each one, and remove them from the iterator
     276                    // if there is nothing left to send.
     277                    // Otherwise, return the volley to be sent.
     278                    // Otherwise, wait()
     279                    while (_iterator.hasNext()) {
     280                        peer = _iterator.next();
     281                        int remaining = peer.finishMessages();
     282                        if (remaining <= 0) {
     283                            // race with add()
     284                            _iterator.remove();
     285                            if (_log.shouldLog(Log.DEBUG))
     286                                _log.debug("No more pending messages for " + peer.getRemotePeer().toBase64());
     287                            continue;
    259288                        }
    260                         peer = _activePeers.get(i);
     289                        peersProcessed++;
    261290                        state = peer.allocateSend();
    262291                        if (state != null) {
    263292                            // we have something to send and we will be returning it
    264                             _nextPeer = i + 1;
     293                            break;
     294                        } else if (peersProcessed >= _activePeers.size()) {
     295                            // we've gone all the way around, time to sleep
    265296                            break;
    266297                        } else {
    267                             // Update the minimum delay for all peers (getNextDelay() returns 1 for "now")
     298                            // Update the minimum delay for all peers
    268299                            // which will be used if we found nothing to send across all peers
    269300                            int delay = peer.getNextDelay();
    270                             if ( (nextSendDelay <= 0) || (delay < nextSendDelay) )
     301                            if (delay < nextSendDelay)
    271302                                nextSendDelay = delay;
    272303                            peer = null;
    273                             state = null;
    274304                        }
    275305                    }
    276                     if (_log.shouldLog(Log.DEBUG))
     306
     307                    if (peer != null && _log.shouldLog(Log.DEBUG))
    277308                        _log.debug("Done looping, next peer we are sending for: " +
    278                                    (peer != null ? peer.getRemotePeer().toBase64() : "none"));
    279                     if (state == null) {
     309                                   peer.getRemotePeer().toBase64());
     310
     311                    // if we've gone all the way through the loop, wait
     312                    // ... unless nextSendDelay says we have more ready now
     313                    if (state == null && peersProcessed >= _activePeers.size() && nextSendDelay > 0) {
     314                        _isWaiting = true;
     315                        peersProcessed = 0;
     316                        // why? we do this in the loop one at a time
     317                        //finishMessages();
     318                        // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
     319                        // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
     320                        // gets called regularly
     321                        int toWait = Math.min(Math.max(nextSendDelay, 10), MAX_WAIT);
    280322                        if (_log.shouldLog(Log.DEBUG))
    281                             _log.debug("wait for " + nextSendDelay);
     323                            _log.debug("wait for " + toWait);
    282324                        // wait.. or somethin'
    283                         // wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says
    284                         if (nextSendDelay > 0)
    285                             _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000));
    286                         else
    287                             _activePeers.wait(1000);
    288                     } else {
    289                         if (_log.shouldLog(Log.DEBUG))
    290                             _log.debug("dont wait: alive=" + _alive + " state = " + state);
    291                     }
    292                 }
    293             } catch (InterruptedException ie) {
    294                 // noop
    295                 if (_log.shouldLog(Log.DEBUG))
    296                     _log.debug("Woken up while waiting");
    297             }
    298         }
     325                        synchronized (_activePeers) {
     326                            try {
     327                                _activePeers.wait(toWait);
     328                            } catch (InterruptedException ie) {
     329                                // noop
     330                                if (_log.shouldLog(Log.DEBUG))
     331                                     _log.debug("Woken up while waiting");
     332                            }
     333                        }
     334                        _isWaiting = false;
     335                    //} else {
     336                    //    if (_log.shouldLog(Log.DEBUG))
     337                    //        _log.debug("dont wait: alive=" + _alive + " state = " + state);
     338                    }
     339
     340        } // while alive && state == null
    299341
    300342        if (_log.shouldLog(Log.DEBUG))
     
    302344
    303345        UDPPacket packets[] = preparePackets(state, peer);
     346
     347      /****
    304348        if ( (state != null) && (state.getMessage() != null) ) {
    305349            int valid = 0;
     
    307351                if (packets[i] != null)
    308352                    valid++;
    309             /*
    310353            state.getMessage().timestamp("sending a volley of " + valid
    311354                                         + " lastReceived: "
     
    313356                                         + " lastSentFully: "
    314357                                         + (_context.clock().now() - peer.getLastSendFullyTime()));
    315             */
    316         }
     358        }
     359       ****/
     360
    317361        return packets;
    318362    }
    319363
     364    /**
     365     *  @return null if state or peer is null
     366     */
    320367    private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
    321368        if ( (state != null) && (peer != null) ) {
     
    398445    }
    399446
    400     /**
    401      * We received an ACK of the given messageId from the given peer, so if it
    402      * is still unacked, mark it as complete.
    403      *
    404      * @return fragments acked
    405      */
    406     public int acked(long messageId, Hash ackedBy) {
    407         PeerState peer = _transport.getPeerState(ackedBy);
    408         if (peer != null) {
    409             if (_log.shouldLog(Log.DEBUG))
    410                 _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64());
    411             return peer.acked(messageId);
    412         } else {
    413             if (_log.shouldLog(Log.DEBUG))
    414                 _log.debug("acked [" + messageId + "] by an unknown remote peer?  " + ackedBy.toBase64());
    415             return 0;
    416         }
    417     }
    418 
    419     public void acked(ACKBitfield bitfield, Hash ackedBy) {
    420         PeerState peer = _transport.getPeerState(ackedBy);
    421         if (peer != null) {
    422             if (_log.shouldLog(Log.DEBUG))
    423                 _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64());
    424             peer.acked(bitfield);
    425         } else {
    426             if (_log.shouldLog(Log.DEBUG))
    427                 _log.debug("partial acked [" + bitfield + "] by an unknown remote peer?  " + ackedBy.toBase64());
    428         }
    429     }
    430 
    431447    public interface ActiveThrottle {
    432448        public void choke(Hash peer);
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    rf99f9e4 rf22865a  
    1313
    1414/**
    15  * Maintain the outbound fragmentation for resending
     15 * Maintain the outbound fragmentation for resending, for a single message.
    1616 *
    1717 */
    1818class OutboundMessageState {
    19     private I2PAppContext _context;
    20     private Log _log;
     19    private final I2PAppContext _context;
     20    private final Log _log;
    2121    /** may be null if we are part of the establishment */
    2222    private OutNetMessage _message;
     
    5050    }
    5151   
     52/****
    5253    public boolean initialize(OutNetMessage msg) {
    5354        if (msg == null) return false;
     
    6162        }
    6263    }
    63    
     64****/
     65   
     66    /**
     67     *  Called from UDPTransport
     68     */
    6469    public boolean initialize(I2NPMessage msg, PeerState peer) {
    6570        if (msg == null)
     
    7681    }
    7782   
     83    /**
     84     *  Called from OutboundMessageFragments
     85     */
    7886    public boolean initialize(OutNetMessage m, I2NPMessage msg) {
    7987        if ( (m == null) || (msg == null) )
     
    199207       
    200208        boolean rv = isComplete();
     209      /****
    201210        if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed
    202211            long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY);
     
    211220            //_nextSendTime = now;
    212221        }
     222      ****/
    213223        return rv;
    214224    }
  • router/java/src/net/i2p/router/transport/udp/PacketBuilder.java

    rf99f9e4 rf22865a  
    9696 */
    9797class PacketBuilder {
    98     private I2PAppContext _context;
    99     private Log _log;
    100     private UDPTransport _transport;
     98    private final I2PAppContext _context;
     99    private final Log _log;
     100    private final UDPTransport _transport;
    101101   
    102102    private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
     
    657657    /**
    658658     *  Build a destroy packet, which contains a header but no body.
     659     *  Session must be established or this will NPE in authenticate().
     660     *  Unused until 0.8.9.
    659661     *
    660662     *  @since 0.8.1
  • router/java/src/net/i2p/router/transport/udp/PacketHandler.java

    rf99f9e4 rf22865a  
    373373                isValid = packet.validate(state.getMACKey());
    374374                if (isValid) {
    375                     if (_log.shouldLog(Log.WARN))
    376                         _log.warn("Valid introduction packet received for inbound con: " + packet);
     375                    if (_log.shouldLog(Log.INFO))
     376                        _log.info("Valid introduction packet received for inbound con: " + packet);
    377377
    378378                    _state = 32;
     
    419419                isValid = packet.validate(state.getMACKey());
    420420                if (isValid) {
    421                     if (_log.shouldLog(Log.WARN))
    422                         _log.warn("Valid introduction packet received for outbound established con: " + packet);
     421                    if (_log.shouldLog(Log.INFO))
     422                        _log.info("Valid introduction packet received for outbound established con: " + packet);
    423423
    424424                    _state = 37;
     
    433433            isValid = packet.validate(state.getIntroKey());
    434434            if (isValid) {
    435                 if (_log.shouldLog(Log.WARN))
    436                     _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet);
     435                if (_log.shouldLog(Log.INFO))
     436                    _log.info("Valid introduction packet received for outbound established con with old intro key: " + packet);
    437437                _state = 39;
    438438                packet.decrypt(state.getIntroKey());
  • router/java/src/net/i2p/router/transport/udp/PacketPusher.java

    rf99f9e4 rf22865a  
    1212class PacketPusher implements Runnable {
    1313    // private RouterContext _context;
    14     private Log _log;
    15     private OutboundMessageFragments _fragments;
    16     private UDPSender _sender;
    17     private boolean _alive;
     14    private final Log _log;
     15    private final OutboundMessageFragments _fragments;
     16    private final UDPSender _sender;
     17    private volatile boolean _alive;
    1818   
    1919    public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) {
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    rf99f9e4 rf22865a  
    2525 */
    2626class PeerState {
    27     private RouterContext _context;
    28     private Log _log;
     27    private final RouterContext _context;
     28    private final Log _log;
    2929    /**
    3030     * The peer are we talking to.  This should be set as soon as this
     
    158158    private long _consecutiveSmall;
    159159    /** when did we last check the MTU? */
    160     private long _mtuLastChecked;
     160    //private long _mtuLastChecked;
    161161    private long _mtuIncreases;
    162162    private long _mtuDecreases;
     
    193193    private OutboundMessageState _retransmitter;
    194194   
    195     private UDPTransport _transport;
     195    private final UDPTransport _transport;
    196196   
    197197    /** have we migrated away from this peer to another newer one? */
     
    225225     *
    226226     * Well, we really need to count the acks as well, especially
    227      * 4 * MAX_RESEND_ACKS which can take up a significant amount of space.
     227     * 1 + (4 * MAX_RESEND_ACKS_SMALL) which can take up a significant amount of space.
    228228     * We reduce the max acks when using the small MTU but it may not be enough...
    229229     *
     
    235235     * (larger I2NP messages may be up to 1900B-4500B, which isn't going to fit
    236236     * into a live network MTU anyway)
    237      */
    238     private static final int LARGE_MTU = 1350;
     237     *
     238     * TODO
     239     * VTBM is 2646, it would be nice to fit in two large
     240     * 2646 / 2 = 1323
     241     * 1323 + 74 + 46 + 1 + (4 * 9) = 1480
     242     * So why not make it 1492 (old ethernet is 1492, new is 1500)
     243     */
     244    private static final int LARGE_MTU = 1492;
    239245   
    240246    private static final int MIN_RTO = 100 + ACKSender.ACK_FREQUENCY;
     
    262268        _mtu = getDefaultMTU();
    263269        _mtuReceive = _mtu;
    264         _mtuLastChecked = -1;
     270        //_mtuLastChecked = -1;
    265271        _lastACKSend = -1;
    266272        _rto = MIN_RTO;
     
    269275        _inboundMessages = new HashMap(8);
    270276        _outboundMessages = new ArrayList(32);
    271         _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
    272         _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
    273         _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
    274         _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
    275         _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
    276         _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
    277         _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
    278         _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
    279         _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
    280         _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
    281         _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
    282         _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
     277        // all createRateStat() moved to EstablishmentManager
    283278    }
    284279   
     
    382377    /** what is the largest packet we can send to the peer? */
    383378    public int getMTU() { return _mtu; }
    384     /** estimate how large the other side is sending packets */
     379
     380    /**
     381     *  Estimate how large the other side's MTU is.
     382     *  This could be wrong.
     383     *  It is used only for the HTML status.
     384     */
    385385    public int getReceiveMTU() { return _mtuReceive; }
     386
    386387    /** when did we last check the MTU? */
     388  /****
    387389    public long getMTULastChecked() { return _mtuLastChecked; }
    388390    public long getMTUIncreases() { return _mtuIncreases; }
    389391    public long getMTUDecreases() { return _mtuDecreases; }
    390    
     392  ****/
    391393   
    392394    /**
     
    549551     */
    550552    public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; }
     553
    551554    /** what is the largest packet we can send to the peer? */
     555  /****
    552556    public void setMTU(int mtu) {
    553557        _mtu = mtu;
    554558        _mtuLastChecked = _context.clock().now();
    555559    }
     560  ****/
     561
    556562    public int getSlowStartThreshold() { return _slowStartThreshold; }
    557563    public int getConcurrentSends() { return _concurrentMessagesActive; }
     
    991997    public long getPacketsReceived() { return _packetsReceived; }
    992998    public long getPacketsReceivedDuplicate() { return _packetsReceivedDuplicate; }
     999
     1000    private static final int MTU_RCV_DISPLAY_THRESHOLD = 20;
     1001
    9931002    public void packetReceived(int size) {
    9941003        _packetsReceived++;
    995         if (size <= MIN_MTU)
     1004        if (size <= MIN_MTU) {
    9961005            _consecutiveSmall++;
    997         else
     1006        } else {
    9981007            _consecutiveSmall = 0;
    999        
    1000         if (_packetsReceived > 50) {
    1001             if (_consecutiveSmall < 50)
     1008            _mtuReceive = LARGE_MTU;
     1009            return;
     1010        }
     1011       
     1012        if (_packetsReceived > MTU_RCV_DISPLAY_THRESHOLD) {
     1013            if (_consecutiveSmall < MTU_RCV_DISPLAY_THRESHOLD)
    10021014                _mtuReceive = LARGE_MTU;
    10031015            else
     
    10621074            _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
    10631075        List<OutboundMessageState> msgs = _outboundMessages;
    1064         if (msgs == null) return 0;
    10651076        int rv = 0;
    10661077        boolean fail = false;
     
    10711082                fail = true;
    10721083                rv--;
     1084
     1085         /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
     1086
    10731087            } else if (_retransmitter != null) {
    10741088                long lifetime = _retransmitter.getLifetime();
    10751089                long totalLifetime = lifetime;
    10761090                for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter
    1077                     OutboundMessageState cur = (OutboundMessageState)msgs.get(i);
     1091                    OutboundMessageState cur = msgs.get(i);
    10781092                    totalLifetime += cur.getLifetime();
    10791093                }
     
    11041118                    msgs.add(state);
    11051119                }
     1120
     1121             *******/
     1122
    11061123            } else {
    11071124                msgs.add(state);
     
    11121129        return rv;
    11131130    }
     1131
    11141132    /** drop all outbound messages */
    11151133    public void dropOutbound() {
     
    11191137        //_outboundMessages = null;
    11201138        _retransmitter = null;
    1121         if (msgs != null) {
     1139
    11221140            int sz = 0;
    11231141            List<OutboundMessageState> tempList = null;
     
    11311149            for (int i = 0; i < sz; i++)
    11321150                _transport.failed(tempList.get(i), false);
    1133         }
     1151
    11341152        // so the ACKSender will drop this peer from its queue
    11351153        _wantACKSendSince = -1;
    11361154    }
    11371155   
     1156    /**
     1157     * @return number of active outbound messages remaining (unsynchronized)
     1158     */
    11381159    public int getOutboundMessageCount() {
    1139         List<OutboundMessageState> msgs = _outboundMessages;
    11401160        if (_dead) return 0;
    1141         if (msgs != null) {
    1142             synchronized (msgs) {
    1143                 return msgs.size();
    1144             }
    1145         } else {
    1146             return 0;
    1147         }
     1161        return _outboundMessages.size();
    11481162    }
    11491163   
     
    11531167     */
    11541168    public int finishMessages() {
    1155         int rv = 0;
    11561169        List<OutboundMessageState> msgs = _outboundMessages;
     1170        // short circuit, unsynchronized
     1171        if (msgs.isEmpty())
     1172            return 0;
     1173
    11571174        if (_dead) {
    11581175            dropOutbound();
    11591176            return 0;
    11601177        }
     1178
     1179        int rv = 0;
    11611180        List<OutboundMessageState> succeeded = null;
    11621181        List<OutboundMessageState> failed = null;
    11631182        synchronized (msgs) {
    1164             int size = msgs.size();
    1165             for (int i = 0; i < size; i++) {
    1166                 OutboundMessageState state = msgs.get(i);
     1183            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1184                OutboundMessageState state = iter.next();
    11671185                if (state.isComplete()) {
    1168                     msgs.remove(i);
    1169                     i--;
    1170                     size--;
     1186                    iter.remove();
    11711187                    if (_retransmitter == state)
    11721188                        _retransmitter = null;
     
    11741190                    succeeded.add(state);
    11751191                } else if (state.isExpired()) {
    1176                     msgs.remove(i);
    1177                     i--;
    1178                     size--;
     1192                    iter.remove();
    11791193                    if (_retransmitter == state)
    11801194                        _retransmitter = null;
     
    11831197                    failed.add(state);
    11841198                } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
    1185                     msgs.remove(i);
    1186                     i--;
    1187                     size--;
     1199                    iter.remove();
    11881200                    if (state == _retransmitter)
    11891201                        _retransmitter = null;
     
    12331245        if (_dead) return null;
    12341246        synchronized (msgs) {
    1235             int size = msgs.size();
    1236             for (int i = 0; i < size; i++) {
    1237                 OutboundMessageState state = msgs.get(i);
     1247            for (OutboundMessageState state : msgs) {
    12381248                if (locked_shouldSend(state)) {
    12391249                    if (_log.shouldLog(Log.DEBUG))
     
    12621272   
    12631273    /**
    1264      * return how long to wait before sending, or -1 if we have nothing to send
     1274     * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
     1275     *         If ready now, will return 0 or a negative value.
    12651276     */
    12661277    public int getNextDelay() {
    1267         int rv = -1;
     1278        int rv = Integer.MAX_VALUE;
     1279        if (_dead) return rv;
    12681280        long now = _context.clock().now();
    12691281        List<OutboundMessageState> msgs = _outboundMessages;
    1270         if (_dead) return -1;
    12711282        synchronized (msgs) {
    12721283            if (_retransmitter != null) {
    12731284                rv = (int)(_retransmitter.getNextSendTime() - now);
    1274                 if (rv <= 0)
    1275                     return 1;
    1276                 else
    1277                     return rv;
    1278             }
    1279             int size = msgs.size();
    1280             for (int i = 0; i < size; i++) {
    1281                 OutboundMessageState state = msgs.get(i);
     1285                return rv;
     1286            }
     1287            for (OutboundMessageState state : msgs) {
    12821288                int delay = (int)(state.getNextSendTime() - now);
    1283                 if (delay <= 0)
    1284                     delay = 1;
    1285                 if ( (rv <= 0) || (delay < rv) )
     1289                if (delay < rv)
    12861290                    rv = delay;
    12871291            }
     
    13071311    static final int UDP_HEADER_SIZE = 8;
    13081312    static final int IP_HEADER_SIZE = 20;
    1309     /** how much payload data can we shove in there? */
     1313
     1314    /**
     1315     *  how much payload data can we shove in there?
     1316     *  @return MTU - 74
     1317     */
    13101318    private static final int fragmentSize(int mtu) {
    13111319        return mtu - SSU_HEADER_SIZE - UDP_HEADER_SIZE - IP_HEADER_SIZE;
     
    13161324        if (state.getNextSendTime() <= now) {
    13171325            if (!state.isFragmented()) {
    1318                 state.fragment(fragmentSize(getMTU()));
     1326                state.fragment(fragmentSize(_mtu));
    13191327                if (state.getMessage() != null)
    13201328                    state.getMessage().timestamp("fragment into " + state.getFragmentCount());
     
    13731381                //if (state.getMessage() != null)
    13741382                //    state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining());
    1375                 if (_log.shouldLog(Log.WARN))
    1376                     _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
     1383                if (_log.shouldLog(Log.INFO))
     1384                    _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes()
    13771385                              + " available=" + getSendWindowBytesRemaining()
    13781386                              + " for message " + state.getMessageId() + ": " + state);
    13791387                state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) +
    13801388                                      _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK);
    1381                 if (_log.shouldLog(Log.WARN))
    1382                     _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
     1389                if (_log.shouldLog(Log.INFO))
     1390                    _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
    13831391                //_throttle.choke(peer.getRemotePeer());
    13841392
     
    13941402    }
    13951403   
    1396     public int acked(long messageId) {
     1404    /**
     1405     *  A full ACK was received.
     1406     *
     1407     *  @return true if the message was acked for the first time
     1408     */
     1409    public boolean acked(long messageId) {
     1410        if (_dead) return false;
    13971411        OutboundMessageState state = null;
    13981412        List<OutboundMessageState> msgs = _outboundMessages;
    1399         if (_dead) return 0;
    14001413        synchronized (msgs) {
    1401             int sz = msgs.size();
    1402             for (int i = 0; i < sz; i++) {
    1403                 state = msgs.get(i);
     1414            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1415                state = iter.next();
    14041416                if (state.getMessageId() == messageId) {
    1405                     msgs.remove(i);
     1417                    iter.remove();
    14061418                    break;
    14071419                } else {
     
    14391451           
    14401452            state.releaseResources();
    1441             return numFragments;
    14421453        } else {
    14431454            // dupack, likely
    14441455            if (_log.shouldLog(Log.DEBUG))
    14451456                _log.debug("Received an ACK for a message not pending: " + messageId);
    1446             return 0;
    1447         }
    1448     }
    1449    
    1450     public void acked(ACKBitfield bitfield) {
     1457        }
     1458        return state != null;
     1459    }
     1460   
     1461    /**
     1462     *  A partial ACK was received. This is much less common than full ACKs.
     1463     *
     1464     *  @return true if the message was completely acked for the first time
     1465     */
     1466    public boolean acked(ACKBitfield bitfield) {
    14511467        if (_dead)
    1452             return;
     1468            return false;
    14531469       
    14541470        if (bitfield.receivedComplete()) {
    1455             acked(bitfield.getMessageId());
    1456             return;
     1471            return acked(bitfield.getMessageId());
    14571472        }
    14581473   
     
    14621477        boolean isComplete = false;
    14631478        synchronized (msgs) {
    1464             for (int i = 0; i < msgs.size(); i++) {
    1465                 state = msgs.get(i);
     1479            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1480                state = iter.next();
    14661481                if (state.getMessageId() == bitfield.getMessageId()) {
    14671482                    boolean complete = state.acked(bitfield);
    14681483                    if (complete) {
    14691484                        isComplete = true;
    1470                         msgs.remove(i);
     1485                        iter.remove();
    14711486                        if (state == _retransmitter)
    14721487                            _retransmitter = null;
     
    15151530                //    state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
    15161531            }
    1517             return;
     1532            return isComplete;
    15181533        } else {
    15191534            // dupack
    15201535            if (_log.shouldLog(Log.DEBUG))
    15211536                _log.debug("Received an ACK for a message not pending: " + bitfield);
    1522             return;
     1537            return false;
    15231538        }
    15241539    }
     
    15811596        }
    15821597    }
     1598
     1599    // why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
    15831600
    15841601    /*
  • router/java/src/net/i2p/router/transport/udp/PeerTestManager.java

    rf99f9e4 rf22865a  
    9292 */
    9393class PeerTestManager {
    94     private RouterContext _context;
    95     private Log _log;
    96     private UDPTransport _transport;
    97     private PacketBuilder _packetBuilder;
     94    private final RouterContext _context;
     95    private final Log _log;
     96    private final UDPTransport _transport;
     97    private final PacketBuilder _packetBuilder;
    9898    /** map of Long(nonce) to PeerTestState for tests currently in progress (as Bob/Charlie) */
    9999    private final Map<Long, PeerTestState> _activeTests;
     
    102102    private boolean _currentTestComplete;
    103103    /** as Alice */
    104     private Queue<Long> _recentTests;
     104    private final Queue<Long> _recentTests;
    105105   
    106106    /** longest we will keep track of a Charlie nonce for */
  • router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java

    rf99f9e4 rf22865a  
    1818 *
    1919 * WARNING - UNUSED since 0.6.1.11
    20  * See comments in DQAT.java and mtn history ca. 2006-02-19
     20 * See comments in DummyThrottle.java and mtn history ca. 2006-02-19
    2121 *
    2222 */
  • router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java

    rf99f9e4 rf22865a  
    138138     * the bandwidth limiter)
    139139     *
    140      * @return number of packets in the send queue
     140     * @return ZERO (used to be number of packets in the queue)
    141141     */
    142142    public int send(UDPPacket packet) {
  • router/java/src/net/i2p/router/transport/udp/UDPPacket.java

    rf99f9e4 rf22865a  
    2020    private I2PAppContext _context;
    2121    private static Log _log;
    22     private volatile DatagramPacket _packet;
     22    private final DatagramPacket _packet;
    2323    private volatile short _priority;
    2424    private volatile long _initializeTime;
    2525    private volatile long _expiration;
    26     private byte[] _data;
    27     private byte[] _validateBuf;
    28     private byte[] _ivBuf;
     26    private final byte[] _data;
     27    private final byte[] _validateBuf;
     28    private final byte[] _ivBuf;
    2929    private volatile int _markedType;
    3030    private volatile RemoteHostId _remoteHost;
     
    5252    }
    5353   
    54     static final int MAX_PACKET_SIZE = 2048;
     54    /**
     55     *  Actually it is one less than this, we assume
     56     *  if a received packet is this big it is truncated.
     57     *  This is bigger than PeerState.LARGE_MTU, as the far-end's
     58     *  LARGE_MTU may be larger than ours.
     59     */
     60    static final int MAX_PACKET_SIZE = 1536;
    5561    public static final int IV_SIZE = 16;
    5662    public static final int MAC_SIZE = 16;
     
    8288    private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE;
    8389
    84     private UDPPacket(I2PAppContext ctx, boolean inbound) {
     90    private UDPPacket(I2PAppContext ctx) {
    8591        ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES);
    8692        // the data buffer is clobbered on init(..), but we need it to bootstrap
     
    8995        _validateBuf = new byte[MAX_VALIDATE_SIZE];
    9096        _ivBuf = new byte[IV_SIZE];
    91         init(ctx, inbound);
    92     }
    93     // FIXME optimization, remove the inbound parameter, as it is unused. FIXME
    94     private void init(I2PAppContext ctx, boolean inbound) {
     97        init(ctx);
     98    }
     99
     100    private void init(I2PAppContext ctx) {
    95101        _context = ctx;
    96102        //_dataBuf = _dataCache.acquire();
    97103        Arrays.fill(_data, (byte)0);
    98104        //_packet = new DatagramPacket(_data, MAX_PACKET_SIZE);
     105        //
     106        // WARNING -
     107        // Doesn't seem like we should have to do this every time,
     108        // from reading the DatagramPacket javadocs,
     109        // but we get massive corruption without it.
    99110        _packet.setData(_data);
    100111        // _isInbound = inbound;
     
    263274    }
    264275   
     276    /**
     277     *  @param inbound unused
     278     */
    265279    public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
    266280        UDPPacket rv = null;
     
    268282            rv = _packetCache.poll();
    269283            if (rv != null)
    270                 rv.init(ctx, inbound);
     284                rv.init(ctx);
    271285        }
    272286        if (rv == null)
    273             rv = new UDPPacket(ctx, inbound);
     287            rv = new UDPPacket(ctx);
    274288        //if (rv._acquiredBy != null) {
    275289        //    _log.log(Log.CRIT, "Already acquired!  current stack trace is:", new Exception());
  • router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java

    rf99f9e4 rf22865a  
    1717 */
    1818class UDPPacketReader {
    19     private I2PAppContext _context;
    20     private Log _log;
     19    private final I2PAppContext _context;
     20    private final Log _log;
    2121    private byte _message[];
    2222    private int _payloadBeginOffset;
    2323    private int _payloadLength;
    24     private SessionRequestReader _sessionRequestReader;
    25     private SessionCreatedReader _sessionCreatedReader;
    26     private SessionConfirmedReader _sessionConfirmedReader;
    27     private DataReader _dataReader;
    28     private PeerTestReader _peerTestReader;
    29     private RelayRequestReader _relayRequestReader;
    30     private RelayIntroReader _relayIntroReader;
    31     private RelayResponseReader _relayResponseReader;
     24    private final SessionRequestReader _sessionRequestReader;
     25    private final SessionCreatedReader _sessionCreatedReader;
     26    private final SessionConfirmedReader _sessionConfirmedReader;
     27    private final DataReader _dataReader;
     28    private final PeerTestReader _peerTestReader;
     29    private final RelayRequestReader _relayRequestReader;
     30    private final RelayIntroReader _relayIntroReader;
     31    private final RelayResponseReader _relayResponseReader;
    3232   
    3333    private static final int KEYING_MATERIAL_LENGTH = 64;
     
    355355            return ((int)DataHelper.fromLong(_message, off, 2)) & 0x3FFF;
    356356        }
    357         public void readMessageFragment(int fragmentNum, byte target[], int targetOffset) {
     357
     358        public void readMessageFragment(int fragmentNum, byte target[], int targetOffset)
     359                                                      throws ArrayIndexOutOfBoundsException {
    358360            int off = getFragmentBegin(fragmentNum);
    359361            off += 4; // messageId
  • router/java/src/net/i2p/router/transport/udp/UDPReceiver.java

    rf99f9e4 rf22865a  
    244244                    // and block after we know how much we read but before
    245245                    // we release the packet to the inbound queue
     246                    if (size >= UDPPacket.MAX_PACKET_SIZE) {
     247                        // DatagramSocket javadocs: If the message is longer than the packet's length, the message is truncated.
     248                        throw new IOException("packet too large! truncated and dropped");
     249                    }
    246250                    if (size > 0) {
    247251                        //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver");
  • router/java/src/net/i2p/router/transport/udp/UDPSender.java

    rf99f9e4 rf22865a  
    1717 */
    1818class UDPSender {
    19     private RouterContext _context;
    20     private Log _log;
     19    private final RouterContext _context;
     20    private final Log _log;
    2121    private DatagramSocket _socket;
    2222    private String _name;
    2323    private final BlockingQueue<UDPPacket> _outboundQueue;
    2424    private boolean _keepRunning;
    25     private Runner _runner;
     25    private final Runner _runner;
    2626    private static final int TYPE_POISON = 99999;
    2727   
     
    9292     *
    9393     * @param blockTime how long to block IGNORED
    94      * @return number of packets queued
     94     * @return ZERO (used to be number of packets in the queue)
    9595     * @deprecated use add(packet)
    9696     */
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    rf99f9e4 rf22865a  
    6565    private final ExpirePeerEvent _expireEvent;
    6666    private final PeerTestEvent _testEvent;
     67    private final PacketBuilder _destroyBuilder;
    6768    private short _reachabilityStatus;
    6869    private long _reachabilityStatusLastUpdated;
     
    185186        _dropList = new ConcurrentHashSet(2);
    186187       
    187         // See comments in DQAT.java
     188        // See comments in DummyThrottle.java
    188189        if (USE_PRIORITY) {
    189190            TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this);
     
    201202        }
    202203
     204        _destroyBuilder = new PacketBuilder(_context, this);
    203205        _fragments = new OutboundMessageFragments(_context, this, _activeThrottle);
    204206        _inboundFragments = new InboundMessageFragments(_context, _fragments, this);
     
    297299            _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager);
    298300       
    299         // See comments in DQAT.java
     301        // See comments in DummyThrottle.java
    300302        if (USE_PRIORITY && _refiller == null)
    301303            _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages);
     
    338340   
    339341    public void shutdown() {
     342        destroyAll();
    340343        if (_endpoint != null)
    341344            _endpoint.shutdown();
     
    346349        if (_handler != null)
    347350            _handler.shutdown();
    348         _fragments.shutdown();
    349351        if (_pusher != null)
    350352            _pusher.shutdown();
     353        _fragments.shutdown();
    351354        if (_establisher != null)
    352355            _establisher.shutdown();
     
    354357        _expireEvent.setIsAlive(false);
    355358        _testEvent.setIsAlive(false);
     359        _peersByRemoteHost.clear();
     360        _peersByIdent.clear();
     361        _dropList.clear();
     362        _introManager.reset();
    356363    }
    357364   
     
    10121019    }
    10131020   
     1021    /**
     1022     *  This sends it directly out, bypassing OutboundMessageFragments
     1023     *  and the PacketPusher. The only queueing is for the bandwidth limiter.
     1024     *
     1025     *  @return ZERO (used to be number of packets in the queue)
     1026     */
    10141027    int send(UDPPacket packet) {
    10151028        if (_log.shouldLog(Log.DEBUG))
     
    10181031    }
    10191032   
     1033    /**
     1034     *  Send a session destroy message, bypassing OMF and PacketPusher.
     1035     *
     1036     *  @since 0.8.9
     1037     */
     1038    private void sendDestroy(PeerState peer) {
     1039        // peer must be fully established
     1040        if (peer.getCurrentCipherKey() == null)
     1041            return;
     1042        UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer);
     1043        if (_log.shouldLog(Log.WARN))
     1044            _log.warn("Sending destroy to : " + peer);
     1045        send(pkt);
     1046    }
     1047
     1048    /**
     1049     *  Send a session destroy message to everybody
     1050     *
     1051     *  @since 0.8.9
     1052     */
     1053    private void destroyAll() {
     1054        int howMany = _peersByIdent.size();
     1055        if (_log.shouldLog(Log.WARN))
     1056            _log.warn("Sending destroy to : " + howMany + " peers");
     1057        for (PeerState peer : _peersByIdent.values()) {
     1058            sendDestroy(peer);
     1059        }
     1060        int toSleep = Math.min(howMany / 3, 750);
     1061        if (toSleep > 0) {
     1062            try {
     1063                Thread.sleep(toSleep);
     1064            } catch (InterruptedException ie) {}
     1065        }
     1066    }
     1067
    10201068    /** minimum active peers to maintain IP detection, etc. */
    10211069    private static final int MIN_PEERS = 3;
     
    11131161   
    11141162    public String getStyle() { return STYLE; }
     1163
    11151164    @Override
    11161165    public void send(OutNetMessage msg) {
     
    11521201                _log.debug("Add to fragments for " + to.toBase64());
    11531202
    1154             // See comments in DQAT.java
     1203            // See comments in DummyThrottle.java
    11551204            if (USE_PRIORITY)
    11561205                _outboundMessages.add(msg);
     
    11641213        }
    11651214    }
     1215
    11661216    void send(I2NPMessage msg, PeerState peer) {
    11671217        if (_log.shouldLog(Log.DEBUG))
     
    22352285                }
    22362286
    2237             for (int i = 0; i < _expireBuffer.size(); i++)
    2238                 dropPeer(_expireBuffer.get(i), false, "idle too long");
     2287            for (PeerState peer : _expireBuffer) {
     2288                sendDestroy(peer);
     2289                dropPeer(peer, false, "idle too long");
     2290            }
    22392291            _expireBuffer.clear();
    22402292
Note: See TracChangeset for help on using the changeset viewer.