Changeset 2c866e2


Ignore:
Timestamp:
Sep 8, 2012 12:40:27 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
4cf1047
Parents:
ca91ad3
Message:
  • NTCP: Move NTCPConnection outbound queue to CoDelPriority?
  • SSU:
    • Separate PeerState? outbound message list into a queue for unsent messages and a list for sent messages awaiting ack
    • Implement PeerState? outbound queue as CoDelPriority?
    • Implement backlogged indication like in NTCP
Location:
router/java/src/net/i2p/router
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/OutNetMessage.java

    rca91ad3 r2c866e2  
    2121import net.i2p.data.RouterInfo;
    2222import net.i2p.data.i2np.I2NPMessage;
     23import net.i2p.router.util.CDPQEntry;
    2324import net.i2p.util.Log;
    2425
     
    2829 *
    2930 */
    30 public class OutNetMessage {
     31public class OutNetMessage implements CDPQEntry {
    3132    private final Log _log;
    3233    private final RouterContext _context;
     
    5051    //private Exception _createdBy;
    5152    private final long _created;
     53    private long _enqueueTime;
     54    private long _seqNum;
    5255    /** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
    5356    private HashMap<String, Long> _timestamps;
     
    283286    /** time the transport tries to send the message (including any queueing) */
    284287    public long getSendTime() { return _context.clock().now() - _sendBegin; }
     288
     289    /**
     290     *  For CDQ
     291     *  @since 0.9.3
     292     */
     293    public void setEnqueueTime(long now) {
     294        _enqueueTime = now;
     295    }
     296
     297    /**
     298     *  For CDQ
     299     *  @since 0.9.3
     300     */
     301    public long getEnqueueTime() {
     302        return _enqueueTime;
     303    }
     304
     305    /**
     306     *  For CDQ
     307     *  @since 0.9.3
     308     */
     309    public void drop() {
     310    }
     311
     312    /**
     313     *  For CDPQ
     314     *  @since 0.9.3
     315     */
     316    public void setSeqNum(long num) {
     317        _seqNum = num;
     318    }
     319
     320    /**
     321     *  For CDPQ
     322     *  @since 0.9.3
     323     */
     324    public long getSeqNum() {
     325        return _seqNum;
     326    }
    285327
    286328    /**
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    rca91ad3 r2c866e2  
    55import java.nio.channels.SelectionKey;
    66import java.nio.channels.SocketChannel;
     7import java.util.ArrayList;
    78import java.util.Iterator;
     9import java.util.List;
    810import java.util.Queue;
    911import java.util.Set;
     
    2527import net.i2p.router.RouterContext;
    2628import net.i2p.router.transport.FIFOBandwidthLimiter;
     29import net.i2p.router.util.CoDelPriorityBlockingQueue;
    2730import net.i2p.util.ConcurrentHashSet;
    2831import net.i2p.util.HexDump;
     
    8487     * pending unprepared OutNetMessage instances
    8588     */
    86     private final Queue<OutNetMessage> _outbound;
     89    private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
    8790    /**
    8891     *  current prepared OutNetMessage, or null - synchronize on _outbound to modify
     
    137140    /** 2 bytes for length and 4 for CRC */
    138141    public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
    139    
     142
    140143    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
    141 
     144   
    142145    /**
    143146     * Create an inbound connected (though not established) NTCP connection
     
    153156        _writeBufs = new ConcurrentLinkedQueue();
    154157        _bwRequests = new ConcurrentHashSet(2);
    155         // TODO possible switch to CLQ but beware non-constant size() - see below
    156         _outbound = new LinkedBlockingQueue();
     158        _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    157159        _isInbound = true;
    158160        _decryptBlockBuf = new byte[BLOCK_SIZE];
     
    178180        _writeBufs = new ConcurrentLinkedQueue();
    179181        _bwRequests = new ConcurrentHashSet(8);
    180         // TODO possible switch to CLQ but beware non-constant size() - see below
    181         _outbound = new LinkedBlockingQueue();
     182        _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    182183        _isInbound = false;
    183184        _decryptBlockBuf = new byte[BLOCK_SIZE];
     
    298299        }
    299300
    300         OutNetMessage msg;
    301         while ((msg = _outbound.poll()) != null) {
     301        List<OutNetMessage> pending = new ArrayList();
     302        _outbound.drainAllTo(pending);
     303        for (OutNetMessage msg : pending) {
    302304            Object buf = msg.releasePreparationBuffer();
    303305            if (buf != null)
     
    306308        }
    307309
    308         msg = _currentOutbound;
     310        OutNetMessage msg = _currentOutbound;
    309311        if (msg != null) {
    310312            Object buf = msg.releasePreparationBuffer();
     
    319321     */
    320322    public void send(OutNetMessage msg) {
     323     /****
     324       always enqueue, let the queue do the dropping
     325
    321326        if (tooBacklogged()) {
    322327            boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
     
    338343        }
    339344        _consecutiveBacklog = 0;
    340         int enqueued = 0;
     345     ****/
    341346        //if (FAST_LARGE)
    342347            bufferedPrepare(msg);
    343         boolean noOutbound = false;
    344348        _outbound.offer(msg);
    345         enqueued = _outbound.size();
     349        //int enqueued = _outbound.size();
    346350        // although stat description says ahead of this one, not including this one...
    347         _context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
    348         noOutbound = (_currentOutbound == null);
    349         if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
     351        //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
     352        boolean noOutbound = (_currentOutbound == null);
     353        //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
    350354        if (_established && noOutbound)
    351355            _transport.getWriter().wantsWrite(this, "enqueued");
    352356    }
    353357
     358/****
    354359    private long queueTime() {   
    355360        OutNetMessage msg = _currentOutbound;
     
    361366        return msg.getSendTime(); // does not include any of the pre-send(...) preparation
    362367    }
     368****/
    363369
    364370    public boolean tooBacklogged() {
    365         long queueTime = queueTime();
    366         if (queueTime <= 0) return false;
    367         boolean currentOutboundSet = _currentOutbound != null;
     371        //long queueTime = queueTime();
     372        //if (queueTime <= 0) return false;
    368373       
    369374        // perhaps we could take into account the size of the queued messages too, our
     
    372377        if (getUptime() < 10*1000) // allow some slack just after establishment
    373378            return false;
    374         if (queueTime > 5*1000) { // bloody arbitrary.  well, its half the average message lifetime...
     379        //if (queueTime > 5*1000) { // bloody arbitrary.  well, its half the average message lifetime...
     380        if (_outbound.isBacklogged()) { // bloody arbitrary.  well, its half the average message lifetime...
    375381            int size = _outbound.size();
    376382            if (_log.shouldLog(Log.WARN)) {
    377383                int writeBufs = _writeBufs.size();
     384                boolean currentOutboundSet = _currentOutbound != null;
    378385                try {
    379                     _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
     386                    _log.warn("Too backlogged: size is " + size
    380387                          + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
    381388                          + ", currentOut set? " + currentOutboundSet
     
    383390                } catch (Exception e) {}  // java.nio.channels.CancelledKeyException
    384391            }
    385             _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
     392            //_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
    386393            return true;
    387394        //} else if (size > 32) { // another arbitrary limit.
     
    652659                return;
    653660            }
     661/****
    654662                //throw new RuntimeException("We should not be preparing a write while we still have one pending");
    655663            if (queueTime() > 3*1000) {  // don't stall low-priority messages
     664****/
    656665                msg = _outbound.poll();
    657666                if (msg == null)
    658667                    return;
     668/****
    659669            } else {
    660670                // FIXME
     
    682692                    _log.warn("Already removed??? " + msg.getMessage().getType());
    683693            }
     694****/
    684695            _currentOutbound = msg;
    685696        }
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    rca91ad3 r2c866e2  
    166166                return;
    167167            }
    168             int active = peer.add(state);
     168            peer.add(state);
    169169            add(peer);
    170             _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     170            //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    171171        } else {
    172172            if (_log.shouldLog(Log.WARN))
     
    183183        if (peer == null)
    184184            throw new RuntimeException("wtf, null peer for " + state);
    185         int active = peer.add(state);
     185        peer.add(state);
    186186        add(peer);
    187         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     187        //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    188188    }
    189189
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    rca91ad3 r2c866e2  
    88import net.i2p.data.i2np.I2NPMessage;
    99import net.i2p.router.OutNetMessage;
     10import net.i2p.router.util.CDPQEntry;
    1011import net.i2p.util.ByteCache;
    1112import net.i2p.util.Log;
     
    1516 *
    1617 */
    17 class OutboundMessageState {
     18class OutboundMessageState implements CDPQEntry {
    1819    private final I2PAppContext _context;
    1920    private final Log _log;
     
    3738    private boolean _released;
    3839    private Exception _releasedBy;
     40    // we can't use the ones in _message since it is null for injections
     41    private long _enqueueTime;
     42    private long _seqNum;
    3943   
    4044    public static final int MAX_MSG_SIZE = 32 * 1024;
     
    105109    /**
    106110     *  Called from OutboundMessageFragments
     111     *  @param m null if msg is "injected"
    107112     *  @return success
    108113     */
     
    129134            //_expiration = msg.getExpiration();
    130135
    131             if (_log.shouldLog(Log.DEBUG))
    132                 _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
     136            //if (_log.shouldLog(Log.DEBUG))
     137            //    _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
    133138            return true;
    134139        } catch (IllegalStateException ise) {
     
    369374    }
    370375   
     376    /**
     377     *  For CDQ
     378     *  @since 0.9.3
     379     */
     380    public void setEnqueueTime(long now) {
     381        _enqueueTime = now;
     382    }
     383
     384    /**
     385     *  For CDQ
     386     *  @since 0.9.3
     387     */
     388    public long getEnqueueTime() {
     389        return _enqueueTime;
     390    }
     391
     392    /**
     393     *  For CDQ
     394     *  @since 0.9.3
     395     */
     396    public void drop() {
     397        _peer.getTransport().failed(this, false);
     398        releaseResources();
     399    }
     400
     401    /**
     402     *  For CDPQ
     403     *  @since 0.9.3
     404     */
     405    public void setSeqNum(long num) {
     406        _seqNum = num;
     407    }
     408
     409    /**
     410     *  For CDPQ
     411     *  @since 0.9.3
     412     */
     413    public long getSeqNum() {
     414        return _seqNum;
     415    }
     416
     417    /**
     418     *  For CDPQ
     419     *  @return OutNetMessage priority or 1000 for injected
     420     *  @since 0.9.3
     421     */
     422    public int getPriority() {
     423        return _message != null ? _message.getPriority() : 1000;
     424    }
     425
    371426    @Override
    372427    public String toString() {
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    rca91ad3 r2c866e2  
    1717import net.i2p.router.OutNetMessage;
    1818import net.i2p.router.RouterContext;
     19import net.i2p.router.util.CoDelPriorityBlockingQueue;
    1920import net.i2p.util.Log;
    2021import net.i2p.util.ConcurrentHashSet;
     
    189190    /** list of InboundMessageState for active message */
    190191    private final Map<Long, InboundMessageState> _inboundMessages;
    191     /** list of OutboundMessageState */
     192
     193    /**
     194     *  Mostly messages that have been transmitted and are awaiting acknowledgement,
     195     *  although there could be some that have not been sent yet.
     196     */
    192197    private final List<OutboundMessageState> _outboundMessages;
     198
     199    /**
     200     *  Priority queue of messages that have not yet been sent.
     201     *  They are taken from here and put in _outboundMessages.
     202     */
     203    private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
     204
    193205    /** which outbound message is currently being retransmitted */
    194206    private OutboundMessageState _retransmitter;
     
    299311        _inboundMessages = new HashMap(8);
    300312        _outboundMessages = new ArrayList(32);
     313        _outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
    301314        // all createRateStat() moved to EstablishmentManager
    302315        _remoteIP = remoteIP;
     
    727740            // no such element exception seen here
    728741            List<Long> rv = new ArrayList(_currentACKs);
    729             if (_log.shouldLog(Log.DEBUG))
    730                 _log.debug("Returning " + _currentACKs.size() + " current acks");
     742            //if (_log.shouldLog(Log.DEBUG))
     743            //    _log.debug("Returning " + _currentACKs.size() + " current acks");
    731744            return rv;
    732745    }
     
    749762            List<Long> randomResends = new ArrayList(_currentACKsResend);
    750763            Collections.shuffle(randomResends, _context.random());
    751             if (_log.shouldLog(Log.DEBUG))
    752                 _log.debug("Returning " + randomResends.size() + " resend acks");
     764            //if (_log.shouldLog(Log.DEBUG))
     765            //    _log.debug("Returning " + randomResends.size() + " resend acks");
    753766            return randomResends;
    754767    }
     
    11951208     *  TODO backlog / pushback / block instead of dropping? Can't really block here.
    11961209     *  TODO SSU does not support isBacklogged() now
    1197      *  @return total pending messages
    1198      */
    1199     public int add(OutboundMessageState state) {
     1210     */
     1211    public void add(OutboundMessageState state) {
    12001212        if (_dead) {
    12011213            _transport.failed(state, false);
    1202             return 0;
     1214            return;
    12031215        }
    12041216        state.setPeer(this);
     
    12061218            _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
    12071219        int rv = 0;
    1208         boolean fail = false;
     1220        // will never fail for CDPQ
     1221        boolean fail = !_outboundQueue.offer(state);
     1222/****
    12091223        synchronized (_outboundMessages) {
    12101224            rv = _outboundMessages.size() + 1;
     
    12131227                fail = true;
    12141228                rv--;
     1229****/
    12151230
    12161231         /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
     
    12511266
    12521267             *******/
    1253 
     1268/****
    12541269            } else {
    12551270                _outboundMessages.add(state);
    12561271            }
    12571272        }
     1273****/
    12581274        if (fail) {
    12591275            if (_log.shouldLog(Log.WARN))
     
    12611277            _transport.failed(state, false);
    12621278        }
    1263         return rv;
    12641279    }
    12651280
     
    12691284        _dead = true;
    12701285        //_outboundMessages = null;
    1271         _retransmitter = null;
    1272 
    1273             int sz = 0;
    1274             List<OutboundMessageState> tempList = null;
     1286
     1287            List<OutboundMessageState> tempList;
    12751288            synchronized (_outboundMessages) {
    1276                 sz = _outboundMessages.size();
    1277                 if (sz > 0) {
     1289                    _retransmitter = null;
    12781290                    tempList = new ArrayList(_outboundMessages);
    12791291                    _outboundMessages.clear();
    1280                 }
    1281             }
    1282             for (int i = 0; i < sz; i++)
    1283                 _transport.failed(tempList.get(i), false);
     1292            }
     1293            _outboundQueue.drainAllTo(tempList);
     1294            for (OutboundMessageState oms : tempList) {
     1295                _transport.failed(oms, false);
     1296            }
    12841297
    12851298        // so the ACKSender will drop this peer from its queue
     
    12921305    public int getOutboundMessageCount() {
    12931306        if (_dead) return 0;
    1294         return _outboundMessages.size();
     1307        return _outboundMessages.size() + _outboundQueue.size();
    12951308    }
    12961309   
     
    13061319        // short circuit, unsynchronized
    13071320        if (_outboundMessages.isEmpty())
    1308             return 0;
     1321            return _outboundQueue.size();
    13091322
    13101323        if (_dead) {
     
    13681381        }
    13691382       
    1370         return rv;
     1383        return rv + _outboundQueue.size();
    13711384    }
    13721385   
     
    13881401                if (should == ShouldSend.YES) {
    13891402                    if (_log.shouldLog(Log.DEBUG))
    1390                         _log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
     1403                        _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
    13911404                    /*
    13921405                    while (iter.hasNext()) {
     
    14031416                    // By not looking further, we keep strict sending order, and that allows
    14041417                    // some efficiency in acked() below.
    1405                     break;
     1418                    if (_log.shouldLog(Log.DEBUG))
     1419                        _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
     1420                                   " / " + _outboundQueue.size() + " remaining");
     1421                    return null;
    14061422                } /* else {
    14071423                    OutNetMessage msg = state.getMessage();
     
    14101426                } */
    14111427            }
     1428            // Peek at head of _outboundQueue and see if we can send it.
     1429            // If so, pull it off, put it in _outbundMessages, test
     1430            // again for bandwidth if necessary, and return it.
     1431            OutboundMessageState state = _outboundQueue.peek();
     1432            if (state != null && ShouldSend.YES == locked_shouldSend(state)) {
     1433                // we could get a different state, or null, when we poll,
     1434                // due to AQM drops, so we test again if necessary
     1435                OutboundMessageState dequeuedState = _outboundQueue.poll();
     1436                if (dequeuedState != null) {
     1437                    _outboundMessages.add(dequeuedState);
     1438                    if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) {
     1439                        if (_log.shouldLog(Log.DEBUG))
     1440                            _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
     1441                        return dequeuedState;
     1442                    }
     1443                }
     1444            }
    14121445        }
    14131446        if (_log.shouldLog(Log.DEBUG))
    1414             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining");
     1447            _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
     1448                       " / " + _outboundQueue.size() + " remaining");
    14151449        return null;
    14161450    }
     
    14421476            }
    14431477        }
     1478        // failsafe... is this OK?
     1479        if (rv > 100 && !_outboundQueue.isEmpty())
     1480            rv = 100;
    14441481        return rv;
     1482    }
     1483
     1484    /**
     1485     *  @since 0.9.3
     1486     */
     1487    public boolean isBacklogged() {
     1488        return _dead || _outboundQueue.isBacklogged();
    14451489    }
    14461490
     
    15221566            int size = state.getUnackedSize();
    15231567            if (allocateSendingBytes(size, state.getPushCount())) {
    1524                 if (_log.shouldLog(Log.INFO))
    1525                     _log.info("Allocation of " + size + " allowed with "
     1568                if (_log.shouldLog(Log.DEBUG))
     1569                    _log.debug("Allocation of " + size + " allowed with "
    15261570                              + getSendWindowBytesRemaining()
    15271571                              + "/" + getSendWindowBytes()
     
    15671611    /**
    15681612     *  A full ACK was received.
    1569      *  TODO if messages awaiting ack were a HashSet this would be faster.
     1613     *  TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster.
    15701614     *
    15711615     *  @return true if the message was acked for the first time
     
    16211665        } else {
    16221666            // dupack, likely
    1623             if (_log.shouldLog(Log.DEBUG))
    1624                 _log.debug("Received an ACK for a message not pending: " + messageId);
     1667            //if (_log.shouldLog(Log.DEBUG))
     1668            //    _log.debug("Received an ACK for a message not pending: " + messageId);
    16251669        }
    16261670        return state != null;
     
    17681812    }
    17691813
     1814    /**
     1815     *  Convenience for OutboundMessageState so it can fail itself
     1816     *  @since 0.9.3
     1817     */
     1818    public UDPTransport getTransport() {
     1819        return _transport;
     1820    }
     1821
    17701822    // why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
    17711823
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    rca91ad3 r2c866e2  
    16791679    }
    16801680
     1681    /**
     1682     *  @since 0.9.3
     1683     */
     1684    @Override
     1685    public boolean isBacklogged(Hash dest) {
     1686        PeerState peer =  _peersByIdent.get(dest);
     1687        return peer != null && peer.isBacklogged();
     1688    }
     1689
    16811690    public boolean allowConnection() {
     1691
    16821692            return _peersByIdent.size() < getMaxConnections();
    16831693    }
     
    21882198            buf.append(THINSP).append(peer.getConcurrentSendWindow());
    21892199            buf.append(THINSP).append(peer.getConsecutiveSendRejections());
     2200            if (peer.isBacklogged())
     2201                buf.append(' ').append(_("backlogged"));
    21902202            buf.append("</td>");
    21912203
Note: See TracChangeset for help on using the changeset viewer.