Changeset d305eb6


Ignore:
Timestamp:
Aug 27, 2012 8:39:00 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
fa504ae8
Parents:
f8bc6f8
Message:
  • SSU:
    • Limit UDPSender queue size
    • Increase UDPSender max packet lifetime
    • Clear UDPSender queue before sending destroys to all
    • Increase PeerState? queue size so large streaming windows don't get dropped right away, especially at slow start
    • Various improvements on iterating over pending outbound messages in PeerState?
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    rf8bc6f8 rd305eb6  
     12012-08-27 zzz
     2 * i2psnark: Notify threads awaiting DHT replies at shutdown
     3 * Reseed: Remove forum.i2p2.de
     4 * Streaming: Limit amount of slow-start exponential growth
     5 * SSU:
     6   - Limit UDPSender queue size
     7   - Increase UDPSender max packet lifetime
     8   - Clear UDPSender queue before sending destroys to all
     9   - Increase PeerState queue size so large streaming windows
     10     don't get dropped right away, especially at slow start
     11   - Various improvements on iterating over pending outbound
     12     messages in PeerState
     13 * Wrapper: Update armv7 to 3.5.15
     14
    1152012-08-27 kytv
    216 * Update Java Service Wrapper to v3.5.15.
  • router/java/src/net/i2p/router/RouterVersion.java

    rf8bc6f8 rd305eb6  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 19;
     21    public final static long BUILD = 20;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    rf8bc6f8 rd305eb6  
    4343    /** would two caches, one for small and one for large messages, be better? */
    4444    private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE);
     45
     46    private static final long EXPIRATION = 10*1000;
    4547   
    4648    public OutboundMessageState(I2PAppContext context) {
     
    6567    /**
    6668     *  Called from UDPTransport
     69     *  TODO make two constructors, remove this, and make more things final
    6770     *  @return success
    6871     */
     
    8386    /**
    8487     *  Called from OutboundMessageFragments
     88     *  TODO make two constructors, remove this, and make more things final
    8589     *  @return success
    8690     */
     
    122126            _startedOn = _context.clock().now();
    123127            _nextSendTime = _startedOn;
    124             _expiration = _startedOn + 10*1000;
     128            _expiration = _startedOn + EXPIRATION;
    125129            //_expiration = msg.getExpiration();
    126130
  • router/java/src/net/i2p/router/transport/udp/PacketPusher.java

    rf8bc6f8 rd305eb6  
    3939                    for (int i = 0; i < packets.length; i++) {
    4040                        if (packets[i] != null) // null for ACKed fragments
    41                             //_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
     41                            // BLOCKING if queue is full
    4242                            _sender.add(packets[i]);
    4343                    }
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    rf8bc6f8 rd305eb6  
    217217    private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
    218218    private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
     219
     220    /**
     221     *  Was 32 before 0.9.2, but since the streaming lib goes up to 128,
     222     *  we would just drop our own msgs right away during slow start.
     223     *  May need to adjust based on memory.
     224     */
     225    private static final int MAX_SEND_MSGS_PENDING = 128;
    219226
    220227    /*
     
    11821189    RemoteHostId getRemoteHostId() { return _remoteHostId; }
    11831190   
     1191    /**
     1192     *  TODO should this use a queue, separate from the list of msgs pending an ack?
     1193     *  TODO bring back tail drop?
     1194     *  TODO priority queue? (we don't implement priorities in SSU now)
     1195     *  TODO backlog / pushback / block instead of dropping? Can't really block here.
     1196     *  TODO SSU does not support isBacklogged() now
     1197     *  @return total pending messages
     1198     */
    11841199    public int add(OutboundMessageState state) {
    11851200        if (_dead) {
     
    11941209        synchronized (_outboundMessages) {
    11951210            rv = _outboundMessages.size() + 1;
    1196             if (rv > 32) {
    1197                 // 32 queued messages?  to *one* peer?  nuh uh.
     1211            if (rv > MAX_SEND_MSGS_PENDING) {
     1212                // too many queued messages to one peer?  nuh uh.
    11981213                fail = true;
    11991214                rv--;
     
    12411256            }
    12421257        }
    1243         if (fail)
     1258        if (fail) {
     1259            if (_log.shouldLog(Log.WARN))
     1260                _log.warn("Dropping msg, OB queue full for " + toString());
    12441261            _transport.failed(state, false);
     1262        }
    12451263        return rv;
    12461264    }
     
    12791297    /**
    12801298     * Expire / complete any outbound messages
     1299     * High usage -
     1300     * OutboundMessageFragments.getNextVolley() calls this 1st.
     1301     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
     1302     *
    12811303     * @return number of active outbound messages remaining
    12821304     */
     
    13511373    /**
    13521374     * Pick a message we want to send and allocate it out of our window
     1375     * High usage -
     1376     * OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0.
     1377     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
     1378     *
    13531379     * @return allocated message to send, or null if no messages or no resources
    1354      *
    13551380     */
    13561381    public OutboundMessageState allocateSend() {
     
    13581383        synchronized (_outboundMessages) {
    13591384            for (OutboundMessageState state : _outboundMessages) {
    1360                 if (locked_shouldSend(state)) {
     1385                // We have 3 return values, because if allocateSendingBytes() returns false,
     1386                // then we can stop iterating.
     1387                ShouldSend should = locked_shouldSend(state);
     1388                if (should == ShouldSend.YES) {
    13611389                    if (_log.shouldLog(Log.DEBUG))
    13621390                        _log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
     
    13701398                     */
    13711399                    return state;
     1400                } else if (should == ShouldSend.NO_BW) {
     1401                    // no more bandwidth available
     1402                    // we don't bother looking for a smaller msg that would fit.
     1403                    // By not looking further, we keep strict sending order, and that allows
     1404                    // some efficiency in acked() below.
     1405                    break;
    13721406                } /* else {
    13731407                    OutNetMessage msg = state.getMessage();
     
    13831417   
    13841418    /**
     1419     * High usage -
     1420     * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
     1421     * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
     1422     *
    13851423     * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
    13861424     *         If ready now, will return 0 or a negative value.
     
    13971435            for (OutboundMessageState state : _outboundMessages) {
    13981436                int delay = (int)(state.getNextSendTime() - now);
     1437                // short circuit once we hit something ready to go
     1438                if (delay <= 0)
     1439                    return delay;
    13991440                if (delay < rv)
    14001441                    rv = delay;
     
    14361477    }
    14371478   
    1438     private boolean locked_shouldSend(OutboundMessageState state) {
     1479    private enum ShouldSend { YES, NO, NO_BW };
     1480
     1481    /**
     1482     *  Have 3 return values, because if allocateSendingBytes() returns false,
     1483     *  then allocateSend() can stop iterating
     1484     */
     1485    private ShouldSend locked_shouldSend(OutboundMessageState state) {
    14391486        long now = _context.clock().now();
    14401487        if (state.getNextSendTime() <= now) {
     
    14661513                    //if (state.getMessage() != null)
    14671514                    //    state.getMessage().timestamp("choked, with another message retransmitting");
    1468                     return false;
     1515                    return ShouldSend.NO;
    14691516                } else {
    14701517                    //if (state.getMessage() != null)
     
    14921539                //if (peer.getSendWindowBytesRemaining() > 0)
    14931540                //    _throttle.unchoke(peer.getRemotePeer());
    1494                 return true;
     1541                return ShouldSend.YES;
    14951542            } else {
    14961543                _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
     
    15111558                //                                 + getSendWindowBytes() + " available="
    15121559                //                                 + getSendWindowBytesRemaining());
    1513                 return false;
     1560                return ShouldSend.NO_BW;
    15141561            }
    15151562        } // nextTime <= now
    15161563
    1517         return false;
     1564        return ShouldSend.NO;
    15181565    }
    15191566   
    15201567    /**
    15211568     *  A full ACK was received.
     1569     *  TODO if messages awaiting ack were a HashSet this would be faster.
    15221570     *
    15231571     *  @return true if the message was acked for the first time
     
    15311579                if (state.getMessageId() == messageId) {
    15321580                    iter.remove();
     1581                    break;
     1582                } else if (state.getPushCount() <= 0) {
     1583                    // _outboundMessages is ordered, so once we get to a msg that
     1584                    // hasn't been transmitted yet, we can stop
     1585                    state = null;
    15331586                    break;
    15341587                } else {
     
    16001653                            _retransmitter = null;
    16011654                    }
     1655                    break;
     1656                } else if (state.getPushCount() <= 0) {
     1657                    // _outboundMessages is ordered, so once we get to a msg that
     1658                    // hasn't been transmitted yet, we can stop
     1659                    state = null;
    16021660                    break;
    16031661                } else {
  • router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java

    rf8bc6f8 rd305eb6  
    137137     * Add the packet to the outobund queue to be sent ASAP (as allowed by
    138138     * the bandwidth limiter)
    139      *
    140      * @return ZERO (used to be number of packets in the queue)
     139     * BLOCKING if queue is full.
    141140     */
    142     public int send(UDPPacket packet) {
    143         if (_sender == null)
    144             return 0;
    145         return _sender.add(packet);
     141    public void send(UDPPacket packet) {
     142        _sender.add(packet);
    146143    }
    147144   
     
    155152        return _receiver.receiveNext();
    156153    }
     154   
     155    /**
     156     *  Clear outbound queue, probably in preparation for sending destroy() to everybody.
     157     *  @since 0.9.2
     158     */
     159    public void clearOutbound() {
     160        _sender.clear();
     161    }
    157162}
  • router/java/src/net/i2p/router/transport/udp/UDPSender.java

    rf8bc6f8 rd305eb6  
    2626    private static final int TYPE_POISON = 99999;
    2727   
    28     //private static final int MAX_QUEUED = 4;
     28    private static final int MIN_QUEUE_SIZE = 64;
     29    private static final int MAX_QUEUE_SIZE = 384;
    2930   
    3031    public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
    3132        _context = ctx;
    3233        _log = ctx.logManager().getLog(UDPSender.class);
    33         _outboundQueue = new LinkedBlockingQueue();
     34        long maxMemory = Runtime.getRuntime().maxMemory();
     35        if (maxMemory == Long.MAX_VALUE)
     36            maxMemory = 96*1024*1024l;
     37        int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
     38        _outboundQueue = new LinkedBlockingQueue(qsize);
    3439        _socket = socket;
    3540        _runner = new Runner();
     
    8287    }
    8388   
     89    /**
     90     *  Clear outbound queue, probably in preparation for sending destroy() to everybody.
     91     *  @since 0.9.2
     92     */
     93    public void clear() {
     94        _outboundQueue.clear();
     95    }
     96   
    8497/*********
    8598    public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
     
    94107     *
    95108     * @param blockTime how long to block IGNORED
    96      * @return ZERO (used to be number of packets in the queue)
    97109     * @deprecated use add(packet)
    98110     */
    99     public int add(UDPPacket packet, int blockTime) {
     111    public void add(UDPPacket packet, int blockTime) {
    100112     /********
    101113        //long expiration = _context.clock().now() + blockTime;
     
    149161        return remaining;
    150162     ********/
    151         return add(packet);
    152     }
    153    
    154     private static final int MAX_HEAD_LIFETIME = 1000;
     163        add(packet);
     164    }
     165   
     166    private static final int MAX_HEAD_LIFETIME = 3*1000;
    155167   
    156168    /**
    157      * Put it on the queue
    158      * @return ZERO (used to be number of packets in the queue)
     169     * Put it on the queue.
     170     * BLOCKING if queue is full (backs up PacketPusher thread)
    159171     */
    160     public int add(UDPPacket packet) {
    161         if (packet == null || !_keepRunning) return 0;
    162         int size = 0;
     172    public void add(UDPPacket packet) {
     173        if (packet == null || !_keepRunning) return;
    163174        int psz = packet.getPacket().getLength();
    164175        if (psz > PeerState.LARGE_MTU) {
    165176            _log.error("Dropping large UDP packet " + psz + " bytes: " + packet);
    166             return 0;
    167         }
    168         _outboundQueue.offer(packet);
     177            return;
     178        }
     179        try {
     180            _outboundQueue.put(packet);
     181        } catch (InterruptedException ie) {
     182            return;
     183        }
    169184        //size = _outboundQueue.size();
    170185        //_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
    171186        if (_log.shouldLog(Log.DEBUG)) {
    172             size = _outboundQueue.size();
    173             _log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime());
    174         }
    175         return size;
     187            _log.debug("Added the packet onto the queue with a lifetime of " + packet.getLifetime());
     188        }
    176189    }
    177190   
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    rf8bc6f8 rd305eb6  
    11201120     *  This sends it directly out, bypassing OutboundMessageFragments
    11211121     *  and the PacketPusher. The only queueing is for the bandwidth limiter.
    1122      *
    1123      *  @return ZERO (used to be number of packets in the queue)
    1124      */
    1125     int send(UDPPacket packet) {
     1122     *  BLOCKING if OB queue is full.
     1123     */
     1124    void send(UDPPacket packet) {
    11261125        if (_log.shouldLog(Log.DEBUG))
    11271126            _log.debug("Sending packet " + packet);
    1128         return _endpoint.send(packet);
     1127        _endpoint.send(packet);
    11291128    }
    11301129   
    11311130    /**
    11321131     *  Send a session destroy message, bypassing OMF and PacketPusher.
     1132     *  BLOCKING if OB queue is full.
    11331133     *
    11341134     *  @since 0.8.9
     
    11461146    /**
    11471147     *  Send a session destroy message to everybody
     1148     *  BLOCKING if OB queue is full.
    11481149     *
    11491150     *  @since 0.8.9
    11501151     */
    11511152    private void destroyAll() {
     1153        _endpoint.clearOutbound();
    11521154        int howMany = _peersByIdent.size();
    11531155        if (_log.shouldLog(Log.WARN))
Note: See TracChangeset for help on using the changeset viewer.