Changeset 49bba10


Ignore:
Timestamp:
Jul 23, 2011 11:16:28 PM (10 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
702da0a
Parents:
ddec4f8
Message:
  • UDP:
    • Complete rewrite of OutboundMessageFragments? for concurrent and for efficiency to avoid O(n2) behavior
    • Queue a new send immediately after a packet is acked
    • Cleanups, log tweaks, javadocs, final
Location:
router/java/src/net/i2p/router/transport/udp
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/udp/ACKSender.java

    rddec4f8 r49bba10  
    1515 * Blocking thread that is given peers by the inboundFragment pool, sending out
    1616 * any outstanding ACKs. 
    17  *
     17 * The ACKs are sent directly to UDPSender,
     18 * bypassing OutboundMessageFragments and PacketPusher.
    1819 */
    1920class ACKSender implements Runnable {
    20     private RouterContext _context;
    21     private Log _log;
    22     private UDPTransport _transport;
    23     private PacketBuilder _builder;
     21    private final RouterContext _context;
     22    private final Log _log;
     23    private final UDPTransport _transport;
     24    private final PacketBuilder _builder;
    2425    /** list of peers (PeerState) who we have received data from but not yet ACKed to */
    2526    private final BlockingQueue<PeerState> _peersToACK;
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    rddec4f8 r49bba10  
    7373        _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", UDPTransport.RATES);
    7474        _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", UDPTransport.RATES);
     75        // following are for PeerState
     76        _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
     77        _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
     78        _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
     79        _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
     80        _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
     81        _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
     82        _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
     83        _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
     84        _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
     85        _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
     86        //_context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
     87        //_context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
    7588    }
    7689   
  • router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java

    rddec4f8 r49bba10  
    1919 */
    2020class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
    21     private RouterContext _context;
    22     private Log _log;
     21    private final RouterContext _context;
     22    private final Log _log;
    2323    /** list of message IDs recently received, so we can ignore in flight dups */
    2424    private DecayingBloomFilter _recentlyCompletedMessages;
    25     private OutboundMessageFragments _outbound;
    26     private UDPTransport _transport;
    27     private ACKSender _ackSender;
    28     private MessageReceiver _messageReceiver;
     25    private final OutboundMessageFragments _outbound;
     26    private final UDPTransport _transport;
     27    private final ACKSender _ackSender;
     28    private final MessageReceiver _messageReceiver;
    2929    private boolean _alive;
    3030   
     
    149149                _ackSender.ackPeer(from);
    150150
    151                 if (_log.shouldLog(Log.INFO))
    152                     _log.info("Message received completely!  " + state);
     151                if (_log.shouldLog(Log.DEBUG))
     152                    _log.debug("Message received completely!  " + state);
    153153
    154154                _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
     
    175175    }
    176176   
     177    /**
     178     *  @return the number of bitfields in the ack? why?
     179     */
    177180    private int receiveACKs(PeerState from, UDPPacketReader.DataReader data) {
    178181        int rv = 0;
     182        boolean newAck = false;
    179183        if (data.readACKsIncluded()) {
    180             int fragments = 0;
    181184            int ackCount = data.readACKCount();
    182185            if (ackCount > 0) {
     
    187190                for (int i = 0; i < ackCount; i++) {
    188191                    long id = data.readACK(i);
    189                     if (_log.shouldLog(Log.INFO))
    190                         _log.info("Full ACK of message " + id + " received!");
    191                     fragments += _outbound.acked(id, from.getRemotePeer());
     192                    if (from.acked(id)) {
     193                        if (_log.shouldLog(Log.DEBUG))
     194                            _log.debug("First full ACK of message " + id + " received from " + from.getRemotePeer());
     195                        newAck = true;
     196                    //} else if (_log.shouldLog(Log.DEBUG)) {
     197                    //    _log.debug("Dup full ACK of message " + id + " received from " + from.getRemotePeer());
     198                    }
    192199                }
    193200            } else {
     
    202209
    203210                for (int i = 0; i < bitfields.length; i++) {
    204                     if (_log.shouldLog(Log.INFO))
    205                         _log.info("Partial ACK received: " + bitfields[i]);
    206                     _outbound.acked(bitfields[i], from.getRemotePeer());
     211                    if (from.acked(bitfields[i])) {
     212                        if (_log.shouldLog(Log.DEBUG))
     213                            _log.debug("Final partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
     214                        newAck = true;
     215                    } else if (_log.shouldLog(Log.DEBUG)) {
     216                        _log.debug("Partial ACK received: " + bitfields[i] + " from " + from.getRemotePeer());
     217                    }
    207218                }
    208219            }
     
    212223        else
    213224            from.dataReceived();
     225
     226        // Wake up the packet pusher if it is sleeping.
     227        // By calling add(), this also is a failsafe against possible
     228        // races in OutboundMessageFragments.
     229        if (newAck && from.getOutboundMessageCount() > 0)
     230            _outbound.add(from);
     231
    214232        return rv;
    215233    }
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    rddec4f8 r49bba10  
    22
    33import java.util.ArrayList;
     4import java.util.Iterator;
    45import java.util.List;
     6import java.util.Set;
    57
    68import net.i2p.data.Hash;
     
    911import net.i2p.router.OutNetMessage;
    1012import net.i2p.router.RouterContext;
     13import net.i2p.util.ConcurrentHashSet;
    1114import net.i2p.util.Log;
    1215
     
    2427 */
    2528class OutboundMessageFragments {
    26     private RouterContext _context;
    27     private Log _log;
    28     private UDPTransport _transport;
     29    private final RouterContext _context;
     30    private final Log _log;
     31    private final UDPTransport _transport;
    2932    // private ActiveThrottle _throttle; // LINT not used ??
    30     /** peers we are actively sending messages to */
    31     private final List<PeerState> _activePeers;
     33
     34    /**
     35     *  Peers we are actively sending messages to.
     36     *  We use the iterator so we treat it like a list,
     37     *  but we use a HashSet so remove() is fast and
     38     *  we don't need to do contains().
     39     *  Even though most (but NOT all) accesses are synchronized,
     40     *  we use a ConcurrentHashSet as the iterator is long-lived.
     41     */
     42    private final Set<PeerState> _activePeers;
     43
     44    /**
     45     *  The long-lived iterator over _activePeers.
     46     */
     47    private Iterator<PeerState> _iterator;
     48
     49    /**
     50     *  Avoid sync in add() if possible (not 100% reliable)
     51     */
     52    private boolean _isWaiting;
     53
    3254    private boolean _alive;
    33     /** which peer should we build the next packet out of? */
    34     private int _nextPeer;
    35     private PacketBuilder _builder;
     55    private final PacketBuilder _builder;
    3656    private long _lastCycleTime = System.currentTimeMillis();
    3757
     
    4363    // don't send a packet more than 10 times
    4464    static final int MAX_VOLLEYS = 10;
     65    private static final int MAX_WAIT = 1000;
    4566
    4667    public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) {
     
    4970        _transport = transport;
    5071        // _throttle = throttle;
    51         _activePeers = new ArrayList(256);
     72        _activePeers = new ConcurrentHashSet(256);
    5273        _builder = new PacketBuilder(ctx, transport);
    5374        _alive = true;
     
    6081        _context.statManager().createRateStat("udp.sendAggressiveFailed", "How many volleys was a packet sent before we gave up", "udp", UDPTransport.RATES);
    6182        _context.statManager().createRateStat("udp.outboundActiveCount", "How many messages are in the peer's active pool", "udp", UDPTransport.RATES);
     83        _context.statManager().createRateStat("udp.outboundActivePeers", "How many peers we are actively sending to", "udp", UDPTransport.RATES);
    6284        _context.statManager().createRateStat("udp.sendRejected", "What volley are we on when the peer was throttled (time == message lifetime)", "udp", UDPTransport.RATES);
    6385        _context.statManager().createRateStat("udp.partialACKReceived", "How many fragments were partially ACKed (time == message lifetime)", "udp", UDPTransport.RATES);
     
    7395
    7496    public void startup() { _alive = true; }
     97
    7598    public void shutdown() {
    7699        _alive = false;
     100        _activePeers.clear();
    77101        synchronized (_activePeers) {
    78102            _activePeers.notifyAll();
    79103        }
    80104    }
     105
    81106    void dropPeer(PeerState peer) {
    82107        if (_log.shouldLog(Log.INFO))
    83108            _log.info("Dropping peer " + peer.getRemotePeer().toBase64());
    84109        peer.dropOutbound();
    85         synchronized (_activePeers) {
    86             _activePeers.remove(peer);
    87             _activePeers.notifyAll();
    88         }
     110        _activePeers.remove(peer);
    89111    }
    90112
     
    146168            }
    147169            int active = peer.add(state);
    148             synchronized (_activePeers) {
    149                 if (!_activePeers.contains(peer)) {
    150                     if (_log.shouldLog(Log.DEBUG))
    151                         _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
    152                     _activePeers.add(peer);
    153                 } else {
    154                     if (_log.shouldLog(Log.DEBUG))
    155                         _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
    156                 }
    157                 _activePeers.notifyAll();
    158             }
    159             //msg.timestamp("made active along with: " + active);
     170            add(peer);
    160171            _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    161172        } else {
     
    163174                _log.warn("Error initializing " + msg);
    164175        }
    165         //finishMessages();
    166176    }
    167177
     
    175185            throw new RuntimeException("wtf, null peer for " + state);
    176186        int active = peer.add(state);
    177         synchronized (_activePeers) {
    178             if (!_activePeers.contains(peer)) {
    179                 if (_log.shouldLog(Log.DEBUG))
    180                     _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
    181                 if (_activePeers.isEmpty())
    182                     _lastCycleTime = System.currentTimeMillis();
    183                 _activePeers.add(peer);
    184             } else {
    185                 if (_log.shouldLog(Log.DEBUG))
    186                     _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
     187        add(peer);
     188        _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     189    }
     190
     191    /**
     192     * Add the peer to the list of peers wanting to transmit something.
     193     * This wakes up the packet pusher if it is sleeping.
     194     *
     195     * Avoid synchronization where possible.
     196     * There are small chances of races.
     197     * There are larger chances of adding the PeerState "behind" where
     198     * the iterator is now... but these issues are the same as before concurrentification.
     199     *
     200     * @since 0.8.9
     201     */
     202    public void add(PeerState peer) {
     203        boolean wasEmpty = _activePeers.isEmpty();
     204        boolean added = _activePeers.add(peer);
     205        if (added) {
     206            if (_log.shouldLog(Log.DEBUG))
     207                _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64());
     208            if (wasEmpty)
     209                _lastCycleTime = System.currentTimeMillis();
     210        } else {
     211            if (_log.shouldLog(Log.DEBUG))
     212                _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64());
     213        }
     214        _context.statManager().addRateData("udp.outboundActivePeers", _activePeers.size(), 0);
     215
     216        // Avoid sync if possible
     217        // no, this doesn't always work.
     218        // Also note that the iterator in getNextVolley may have alreay passed us,
     219        // or not reflect the addition.
     220        if (_isWaiting || wasEmpty) {
     221            synchronized (_activePeers) {
     222                _activePeers.notifyAll();
    187223            }
    188             _activePeers.notifyAll();
    189         }
    190         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    191         // should we finish messages here too?
    192         /*
    193         synchronized (_activeMessages) {
    194             _activeMessages.add(state);
    195             if (_activeMessages.size() == 1)
    196                 _lastCycleTime = System.currentTimeMillis();
    197             _activeMessages.notifyAll();
    198         }
    199          */
     224        }
    200225    }
    201226
     
    203228     * Remove any expired or complete messages
    204229     */
     230/****
    205231    private void finishMessages() {
    206         int rv = 0;
    207         List peers = null;
    208         synchronized (_activePeers) {
    209             peers = new ArrayList(_activePeers.size());
    210             for (int i = 0; i < _activePeers.size(); i++) {
    211                 PeerState state = _activePeers.get(i);
    212                 if (state.getOutboundMessageCount() <= 0) {
    213                     _activePeers.remove(i);
    214                     i--;
    215                 } else {
    216                     peers.add(state);
    217                 }
    218             }
    219             _activePeers.notifyAll();
    220         }
    221         for (int i = 0; i < peers.size(); i++) {
    222             PeerState state = (PeerState)peers.get(i);
    223             int remaining = state.finishMessages();
    224             if (remaining <= 0) {
    225                 if (_log.shouldLog(Log.DEBUG))
    226                     _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
    227             }
    228             rv += remaining;
    229         }
    230     }
    231 
     232        for (Iterator<PeerState> iter = _activePeers.iterator(); iter.hasNext(); ) {
     233             PeerState state = iter.next();
     234             if (state.getOutboundMessageCount() <= 0) {
     235                 iter.remove();
     236             } else {
     237                 int remaining = state.finishMessages();
     238                 if (remaining <= 0) {
     239                     if (_log.shouldLog(Log.DEBUG))
     240                         _log.debug("No more pending messages for " + state.getRemotePeer().toBase64());
     241                     iter.remove();
     242                 }
     243             }
     244         }
     245     }
     246****/
     247 
    232248    /**
    233249     * Fetch all the packets for a message volley, blocking until there is a
     
    236252     * already ACKed fragments.
    237253     *
     254     * NOT thread-safe. Called by the PacketPusher thread only.
     255     *
     256     * @return null only on shutdown
    238257     */
    239258    public UDPPacket[] getNextVolley() {
    240259        PeerState peer = null;
    241260        OutboundMessageState state = null;
     261        // Keep track of how many we've looked at, since we don't start the iterator at the beginning.
     262        int peersProcessed = 0;
    242263        while (_alive && (state == null) ) {
    243             long now = _context.clock().now();
    244264            int nextSendDelay = -1;
    245             finishMessages();
    246             try {
    247                 synchronized (_activePeers) {
    248                     for (int i = 0; i < _activePeers.size(); i++) {
    249                         int cur = (i + _nextPeer) % _activePeers.size();
    250                         if (cur == 0) {
    251                             // FIXME or delete, these stats aren't much help since they include the sleep time
    252                             long ts = System.currentTimeMillis();
    253                             long cycleTime = ts - _lastCycleTime;
    254                             _lastCycleTime = ts;
    255                             _context.statManager().addRateData("udp.sendCycleTime", cycleTime, _activePeers.size());
    256                             // make longer than the default sleep time below
    257                             if (cycleTime > 1100)
    258                                 _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
     265            // no, not every time - O(n**2) - do just before waiting below
     266            //finishMessages();
     267
     268                    // do we need a new long-lived iterator?
     269                    if (_iterator == null ||
     270                        ((!_activePeers.isEmpty()) && (!_iterator.hasNext()))) {
     271                        _iterator = _activePeers.iterator();
     272                    }
     273
     274                    // Go through all the peers that we are actively sending messages to.
     275                    // Call finishMessages() for each one, and remove them from the iterator
     276                    // if there is nothing left to send.
     277                    // Otherwise, return the volley to be sent.
     278                    // Otherwise, wait()
     279                    while (_iterator.hasNext()) {
     280                        peer = _iterator.next();
     281                        int remaining = peer.finishMessages();
     282                        if (remaining <= 0) {
     283                            // race with add()
     284                            _iterator.remove();
     285                            if (_log.shouldLog(Log.DEBUG))
     286                                _log.debug("No more pending messages for " + peer.getRemotePeer().toBase64());
     287                            continue;
    259288                        }
    260                         peer = _activePeers.get(i);
     289                        peersProcessed++;
    261290                        state = peer.allocateSend();
    262291                        if (state != null) {
    263292                            // we have something to send and we will be returning it
    264                             _nextPeer = i + 1;
     293                            break;
     294                        } else if (peersProcessed >= _activePeers.size()) {
     295                            // we've gone all the way around, time to sleep
    265296                            break;
    266297                        } else {
     
    271302                                nextSendDelay = delay;
    272303                            peer = null;
    273                             state = null;
    274304                        }
    275305                    }
    276                     if (_log.shouldLog(Log.DEBUG))
     306
     307                    if (peer != null && _log.shouldLog(Log.DEBUG))
    277308                        _log.debug("Done looping, next peer we are sending for: " +
    278                                    (peer != null ? peer.getRemotePeer().toBase64() : "none"));
    279                     if (state == null) {
     309                                   peer.getRemotePeer().toBase64());
     310
     311                    // if we've gone all the way through the loop, wait
     312                    if (state == null && peersProcessed >= _activePeers.size()) {
     313                        peersProcessed = 0;
     314                        // why? we do this in the loop one at a time
     315                        //finishMessages();
    280316                        if (_log.shouldLog(Log.DEBUG))
    281317                            _log.debug("wait for " + nextSendDelay);
    282318                        // wait.. or somethin'
    283                         // wait a min of 10 and a max of 3000 ms no matter what peer.getNextDelay() says
    284                         if (nextSendDelay > 0)
    285                             _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), 3000));
    286                         else
    287                             _activePeers.wait(1000);
    288                     } else {
    289                         if (_log.shouldLog(Log.DEBUG))
    290                             _log.debug("dont wait: alive=" + _alive + " state = " + state);
    291                     }
    292                 }
    293             } catch (InterruptedException ie) {
    294                 // noop
    295                 if (_log.shouldLog(Log.DEBUG))
    296                     _log.debug("Woken up while waiting");
    297             }
    298         }
     319                        // wait a min of 10 and a max of MAX_WAIT ms no matter what peer.getNextDelay() says
     320                        _isWaiting = true;
     321                        synchronized (_activePeers) {
     322                            try {
     323                                // use max of 1 second so finishMessages() and/or PeerState.finishMessages()
     324                                // gets called regularly
     325                                if (nextSendDelay > 0)
     326                                    _activePeers.wait(Math.min(Math.max(nextSendDelay, 10), MAX_WAIT));
     327                                else
     328                                    _activePeers.wait(MAX_WAIT);
     329                            } catch (InterruptedException ie) {
     330                                // noop
     331                                if (_log.shouldLog(Log.DEBUG))
     332                                     _log.debug("Woken up while waiting");
     333                            }
     334                        }
     335                        _isWaiting = false;
     336                    //} else {
     337                    //    if (_log.shouldLog(Log.DEBUG))
     338                    //        _log.debug("dont wait: alive=" + _alive + " state = " + state);
     339                    }
     340
     341        } // while alive && state == null
    299342
    300343        if (_log.shouldLog(Log.DEBUG))
     
    302345
    303346        UDPPacket packets[] = preparePackets(state, peer);
     347
     348      /****
    304349        if ( (state != null) && (state.getMessage() != null) ) {
    305350            int valid = 0;
     
    307352                if (packets[i] != null)
    308353                    valid++;
    309             /*
    310354            state.getMessage().timestamp("sending a volley of " + valid
    311355                                         + " lastReceived: "
     
    313357                                         + " lastSentFully: "
    314358                                         + (_context.clock().now() - peer.getLastSendFullyTime()));
    315             */
    316         }
     359        }
     360       ****/
     361
    317362        return packets;
    318363    }
    319364
     365    /**
     366     *  @return null if state or peer is null
     367     */
    320368    private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) {
    321369        if ( (state != null) && (peer != null) ) {
     
    398446    }
    399447
    400     /**
    401      * We received an ACK of the given messageId from the given peer, so if it
    402      * is still unacked, mark it as complete.
    403      *
    404      * @return fragments acked
    405      */
    406     public int acked(long messageId, Hash ackedBy) {
    407         PeerState peer = _transport.getPeerState(ackedBy);
    408         if (peer != null) {
    409             if (_log.shouldLog(Log.DEBUG))
    410                 _log.debug("acked [" + messageId + "] by " + ackedBy.toBase64());
    411             return peer.acked(messageId);
    412         } else {
    413             if (_log.shouldLog(Log.DEBUG))
    414                 _log.debug("acked [" + messageId + "] by an unknown remote peer?  " + ackedBy.toBase64());
    415             return 0;
    416         }
    417     }
    418 
    419     public void acked(ACKBitfield bitfield, Hash ackedBy) {
    420         PeerState peer = _transport.getPeerState(ackedBy);
    421         if (peer != null) {
    422             if (_log.shouldLog(Log.DEBUG))
    423                 _log.debug("partial acked [" + bitfield + "] by " + ackedBy.toBase64());
    424             peer.acked(bitfield);
    425         } else {
    426             if (_log.shouldLog(Log.DEBUG))
    427                 _log.debug("partial acked [" + bitfield + "] by an unknown remote peer?  " + ackedBy.toBase64());
    428         }
    429     }
    430 
    431448    public interface ActiveThrottle {
    432449        public void choke(Hash peer);
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    rddec4f8 r49bba10  
    1313
    1414/**
    15  * Maintain the outbound fragmentation for resending
     15 * Maintain the outbound fragmentation for resending, for a single message.
    1616 *
    1717 */
  • router/java/src/net/i2p/router/transport/udp/PacketPusher.java

    rddec4f8 r49bba10  
    1212class PacketPusher implements Runnable {
    1313    // private RouterContext _context;
    14     private Log _log;
    15     private OutboundMessageFragments _fragments;
    16     private UDPSender _sender;
    17     private boolean _alive;
     14    private final Log _log;
     15    private final OutboundMessageFragments _fragments;
     16    private final UDPSender _sender;
     17    private volatile boolean _alive;
    1818   
    1919    public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) {
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    rddec4f8 r49bba10  
    2525 */
    2626class PeerState {
    27     private RouterContext _context;
    28     private Log _log;
     27    private final RouterContext _context;
     28    private final Log _log;
    2929    /**
    3030     * The peer are we talking to.  This should be set as soon as this
     
    193193    private OutboundMessageState _retransmitter;
    194194   
    195     private UDPTransport _transport;
     195    private final UDPTransport _transport;
    196196   
    197197    /** have we migrated away from this peer to another newer one? */
     
    269269        _inboundMessages = new HashMap(8);
    270270        _outboundMessages = new ArrayList(32);
    271         _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", UDPTransport.RATES);
    272         _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", UDPTransport.RATES);
    273         _context.statManager().createRateStat("udp.sendACKPartial", "Number of partial ACKs sent (duration == number of full ACKs in that ack packet)", "udp", UDPTransport.RATES);
    274         _context.statManager().createRateStat("udp.sendBps", "How fast we are transmitting when a packet is acked", "udp", UDPTransport.RATES);
    275         _context.statManager().createRateStat("udp.receiveBps", "How fast we are receiving when a packet is fully received (at most one per second)", "udp", UDPTransport.RATES);
    276         _context.statManager().createRateStat("udp.mtuIncrease", "How many retransmissions have there been to the peer when the MTU was increased (period is total packets transmitted)", "udp", UDPTransport.RATES);
    277         _context.statManager().createRateStat("udp.mtuDecrease", "How many retransmissions have there been to the peer when the MTU was decreased (period is total packets transmitted)", "udp", UDPTransport.RATES);
    278         _context.statManager().createRateStat("udp.rejectConcurrentActive", "How many messages are currently being sent to the peer when we reject it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
    279         _context.statManager().createRateStat("udp.allowConcurrentActive", "How many messages are currently being sent to the peer when we accept it (period is how many concurrent packets we allow)", "udp", UDPTransport.RATES);
    280         _context.statManager().createRateStat("udp.rejectConcurrentSequence", "How many consecutive concurrency rejections have we had when we stop rejecting (period is how many concurrent packets we are on)", "udp", UDPTransport.RATES);
    281         _context.statManager().createRateStat("udp.queueDropSize", "How many messages were queued up when it was considered full, causing a tail drop?", "udp", UDPTransport.RATES);
    282         _context.statManager().createRateStat("udp.queueAllowTotalLifetime", "When a peer is retransmitting and we probabalistically allow a new message, what is the sum of the pending message lifetimes? (period is the new message's lifetime)?", "udp", UDPTransport.RATES);
     271        // all createRateStat() moved to EstablishmentManager
    283272    }
    284273   
     
    10621051            _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
    10631052        List<OutboundMessageState> msgs = _outboundMessages;
    1064         if (msgs == null) return 0;
    10651053        int rv = 0;
    10661054        boolean fail = false;
     
    10711059                fail = true;
    10721060                rv--;
     1061
     1062         /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
     1063
    10731064            } else if (_retransmitter != null) {
    10741065                long lifetime = _retransmitter.getLifetime();
    10751066                long totalLifetime = lifetime;
    10761067                for (int i = 1; i < msgs.size(); i++) { // skip the first, as thats the retransmitter
    1077                     OutboundMessageState cur = (OutboundMessageState)msgs.get(i);
     1068                    OutboundMessageState cur = msgs.get(i);
    10781069                    totalLifetime += cur.getLifetime();
    10791070                }
     
    11041095                    msgs.add(state);
    11051096                }
     1097
     1098             *******/
     1099
    11061100            } else {
    11071101                msgs.add(state);
     
    11121106        return rv;
    11131107    }
     1108
    11141109    /** drop all outbound messages */
    11151110    public void dropOutbound() {
     
    11191114        //_outboundMessages = null;
    11201115        _retransmitter = null;
    1121         if (msgs != null) {
     1116
    11221117            int sz = 0;
    11231118            List<OutboundMessageState> tempList = null;
     
    11311126            for (int i = 0; i < sz; i++)
    11321127                _transport.failed(tempList.get(i), false);
    1133         }
     1128
    11341129        // so the ACKSender will drop this peer from its queue
    11351130        _wantACKSendSince = -1;
    11361131    }
    11371132   
     1133    /**
     1134     * @return number of active outbound messages remaining (unsynchronized)
     1135     */
    11381136    public int getOutboundMessageCount() {
    1139         List<OutboundMessageState> msgs = _outboundMessages;
    11401137        if (_dead) return 0;
    1141         if (msgs != null) {
    1142             synchronized (msgs) {
    1143                 return msgs.size();
    1144             }
    1145         } else {
    1146             return 0;
    1147         }
     1138        return _outboundMessages.size();
    11481139    }
    11491140   
     
    11531144     */
    11541145    public int finishMessages() {
    1155         int rv = 0;
    11561146        List<OutboundMessageState> msgs = _outboundMessages;
     1147        // short circuit, unsynchronized
     1148        if (msgs.isEmpty())
     1149            return 0;
     1150
    11571151        if (_dead) {
    11581152            dropOutbound();
    11591153            return 0;
    11601154        }
     1155
     1156        int rv = 0;
    11611157        List<OutboundMessageState> succeeded = null;
    11621158        List<OutboundMessageState> failed = null;
    11631159        synchronized (msgs) {
    1164             int size = msgs.size();
    1165             for (int i = 0; i < size; i++) {
    1166                 OutboundMessageState state = msgs.get(i);
     1160            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1161                OutboundMessageState state = iter.next();
    11671162                if (state.isComplete()) {
    1168                     msgs.remove(i);
    1169                     i--;
    1170                     size--;
     1163                    iter.remove();
    11711164                    if (_retransmitter == state)
    11721165                        _retransmitter = null;
     
    11741167                    succeeded.add(state);
    11751168                } else if (state.isExpired()) {
    1176                     msgs.remove(i);
    1177                     i--;
    1178                     size--;
     1169                    iter.remove();
    11791170                    if (_retransmitter == state)
    11801171                        _retransmitter = null;
     
    11831174                    failed.add(state);
    11841175                } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) {
    1185                     msgs.remove(i);
    1186                     i--;
    1187                     size--;
     1176                    iter.remove();
    11881177                    if (state == _retransmitter)
    11891178                        _retransmitter = null;
     
    12331222        if (_dead) return null;
    12341223        synchronized (msgs) {
    1235             int size = msgs.size();
    1236             for (int i = 0; i < size; i++) {
    1237                 OutboundMessageState state = msgs.get(i);
     1224            for (OutboundMessageState state : msgs) {
    12381225                if (locked_shouldSend(state)) {
    12391226                    if (_log.shouldLog(Log.DEBUG))
     
    12771264                    return rv;
    12781265            }
    1279             int size = msgs.size();
    1280             for (int i = 0; i < size; i++) {
    1281                 OutboundMessageState state = msgs.get(i);
     1266            for (OutboundMessageState state : msgs) {
    12821267                int delay = (int)(state.getNextSendTime() - now);
    12831268                if (delay <= 0)
     
    13941379    }
    13951380   
    1396     public int acked(long messageId) {
     1381    /**
     1382     *  A full ACK was received.
     1383     *
     1384     *  @return true if the message was acked for the first time
     1385     */
     1386    public boolean acked(long messageId) {
     1387        if (_dead) return false;
    13971388        OutboundMessageState state = null;
    13981389        List<OutboundMessageState> msgs = _outboundMessages;
    1399         if (_dead) return 0;
    14001390        synchronized (msgs) {
    1401             int sz = msgs.size();
    1402             for (int i = 0; i < sz; i++) {
    1403                 state = msgs.get(i);
     1391            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1392                state = iter.next();
    14041393                if (state.getMessageId() == messageId) {
    1405                     msgs.remove(i);
     1394                    iter.remove();
    14061395                    break;
    14071396                } else {
     
    14391428           
    14401429            state.releaseResources();
    1441             return numFragments;
    14421430        } else {
    14431431            // dupack, likely
    14441432            if (_log.shouldLog(Log.DEBUG))
    14451433                _log.debug("Received an ACK for a message not pending: " + messageId);
    1446             return 0;
    1447         }
    1448     }
    1449    
    1450     public void acked(ACKBitfield bitfield) {
     1434        }
     1435        return state != null;
     1436    }
     1437   
     1438    /**
     1439     *  A partial ACK was received. This is much less common than full ACKs.
     1440     *
     1441     *  @return true if the message was completely acked for the first time
     1442     */
     1443    public boolean acked(ACKBitfield bitfield) {
    14511444        if (_dead)
    1452             return;
     1445            return false;
    14531446       
    14541447        if (bitfield.receivedComplete()) {
    1455             acked(bitfield.getMessageId());
    1456             return;
     1448            return acked(bitfield.getMessageId());
    14571449        }
    14581450   
     
    14621454        boolean isComplete = false;
    14631455        synchronized (msgs) {
    1464             for (int i = 0; i < msgs.size(); i++) {
    1465                 state = msgs.get(i);
     1456            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1457                state = iter.next();
    14661458                if (state.getMessageId() == bitfield.getMessageId()) {
    14671459                    boolean complete = state.acked(bitfield);
    14681460                    if (complete) {
    14691461                        isComplete = true;
    1470                         msgs.remove(i);
     1462                        iter.remove();
    14711463                        if (state == _retransmitter)
    14721464                            _retransmitter = null;
     
    15151507                //    state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
    15161508            }
    1517             return;
     1509            return isComplete;
    15181510        } else {
    15191511            // dupack
    15201512            if (_log.shouldLog(Log.DEBUG))
    15211513                _log.debug("Received an ACK for a message not pending: " + bitfield);
    1522             return;
     1514            return false;
    15231515        }
    15241516    }
  • router/java/src/net/i2p/router/transport/udp/UDPSender.java

    rddec4f8 r49bba10  
    1717 */
    1818class UDPSender {
    19     private RouterContext _context;
    20     private Log _log;
     19    private final RouterContext _context;
     20    private final Log _log;
    2121    private DatagramSocket _socket;
    2222    private String _name;
    2323    private final BlockingQueue<UDPPacket> _outboundQueue;
    2424    private boolean _keepRunning;
    25     private Runner _runner;
     25    private final Runner _runner;
    2626    private static final int TYPE_POISON = 99999;
    2727   
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    rddec4f8 r49bba10  
    346346        if (_handler != null)
    347347            _handler.shutdown();
    348         _fragments.shutdown();
    349348        if (_pusher != null)
    350349            _pusher.shutdown();
     350        _fragments.shutdown();
    351351        if (_establisher != null)
    352352            _establisher.shutdown();
Note: See TracChangeset for help on using the changeset viewer.