Changeset 94faf74


Ignore:
Timestamp:
Nov 24, 2009 8:12:27 PM (11 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
234c084
Parents:
e78dd1f
Message:
  • Streaming:
    • When an "immediate" ack is requested, do it within 250 ms (was 2000)
    • Request immediate acks when < 1/3 of window remains, or when < 3 packets remain in window, and every 8 packets (was when < 2 packets in window remain)
    • Change requested delay to RTT/2 (was RTO/2)
    • Log cleanup and javadoc
Location:
apps/streaming/java/src/net/i2p/client/streaming
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/Connection.java

    re78dd1f r94faf74  
    190190                        try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;}
    191191                    } else {
    192                         if (_log.shouldLog(Log.DEBUG))
    193                             _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
    194                                        + "), waiting indefinitely");
     192                        //if (_log.shouldLog(Log.DEBUG))
     193                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
     194                        //               + "), waiting indefinitely");
    195195                        try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000
    196196                    }
     
    298298        if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
    299299            ackOnly = true;
    300             if (_log.shouldLog(Log.DEBUG))
    301                 _log.debug("No resend for " + packet);
     300            //if (_log.shouldLog(Log.DEBUG))
     301            //    _log.debug("No resend for " + packet);
    302302        } else {
    303             int remaining = 0;
     303            int windowSize;
     304            int remaining;
    304305            synchronized (_outboundPackets) {
    305306                _outboundPackets.put(new Long(packet.getSequenceNum()), packet);
    306                 remaining = _options.getWindowSize() - _outboundPackets.size() ;
     307                windowSize = _options.getWindowSize();
     308                remaining = windowSize - _outboundPackets.size() ;
    307309                _outboundPackets.notifyAll();
    308310            }
    309             if (remaining < 0)
    310                 remaining = 0;
    311             if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
     311            // the other end has no idea what our window size is, so
     312            // help him out by requesting acks below the 1/3 point,
     313            // if remaining < 3, and every 8 minimum.
     314            if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
     315                (remaining < (windowSize + 2) / 3) ||
     316                (remaining < 3) ||
     317                (packet.getSequenceNum() % 8 == 0)) {
    312318                packet.setOptionalDelay(0);
    313319                packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     
    315321                    _log.debug("Requesting no ack delay for packet " + packet);
    316322            } else {
    317                 int delay = _options.getRTO() / 2;
     323                // This is somewhat of a waste of time, unless the RTT < 4000,
     324                // since the other end limits it to getSendAckDelay()
     325                // which is always 2000, but it's good for diagnostics to see what the other end thinks
     326                // the RTT is.
     327                int delay = _options.getRTT() / 2;
    318328                packet.setOptionalDelay(delay);
    319329                if (delay > 0)
     
    322332                    _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
    323333            }
     334            // WHY always set?
    324335            packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
    325336           
     
    328339                timeout = MAX_RESEND_DELAY;
    329340            if (_log.shouldLog(Log.DEBUG))
    330                 _log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by"));
     341                _log.debug("Resend in " + timeout + " for " + packet);
    331342
    332343            // schedules itself
     
    371382*********/
    372383   
     384    /**
     385     *  Process the acks and nacks received in a packet
     386     *  @return List of packets acked or null
     387     */
    373388    List ackPackets(long ackThrough, long nacks[]) {
    374389        if (ackThrough < _highestAckedThrough) {
     
    686701     */
    687702    public long getNextSendTime() { return _nextSendTime; }
     703
     704    /**
     705     *  If the next send time is currently >= 0 (i.e. not "never"),
     706     *  this may make the next time sooner but will not make it later.
     707     *  If the next send time is currently < 0 (i.e. "never"),
     708     *  this will set it to the time specified, but not later than
     709     *  options.getSendAckDelay() from now (2000 ms)
     710     */
    688711    public void setNextSendTime(long when) {
    689712        if (_nextSendTime >= 0) {
     
    700723        }
    701724       
    702         if (_log.shouldLog(Log.DEBUG) && false) {
    703             if (_nextSendTime <= 0)
    704                 _log.debug("set next send time to an unknown time", new Exception(toString()));
    705             else
    706                 _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
    707         }
     725        //if (_log.shouldLog(Log.DEBUG) && false) {
     726        //    if (_nextSendTime <= 0)
     727        //        _log.debug("set next send time to an unknown time", new Exception(toString()));
     728        //    else
     729        //        _log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
     730        //}
    708731    }
    709732   
     
    743766    public long getCongestionWindowEnd() { return _congestionWindowEnd; }
    744767    public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; }
     768    /** @return the highest outbound packet we have recieved an ack for */
    745769    public long getHighestAckedThrough() { return _highestAckedThrough; }
     770    /** @deprecated unused */
    746771    public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; }
    747772   
     
    836861        long howLong = _options.getInactivityTimeout();
    837862        howLong += _randomWait; // randomize it a bit, so both sides don't do it at once
    838         if (_log.shouldLog(Log.DEBUG))
    839             _log.debug("Resetting the inactivity timer to " + howLong, new Exception(toString()));
     863        //if (_log.shouldLog(Log.DEBUG))
     864        //    _log.debug("Resetting the inactivity timer to " + howLong);
    840865        // this will get rescheduled, and rescheduled, and rescheduled...
    841866        _activityTimer.reschedule(howLong, false); // use the later of current and previous timeout
     
    10881113                    // resends in the air and we dont want to make a bad situation
    10891114                    // worse.  wait another second
     1115                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
     1116                    // also seen with seq# > 0. Is the _activeResends count reliable?
    10901117                    if (_log.shouldLog(Log.INFO))
    10911118                        _log.info("Delaying resend of " + _packet + " as there are "
     
    11051132                if (choke > 0)
    11061133                    _packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     1134                // this seems unnecessary to send the MSS again:
    11071135                _packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
    11081136                // bugfix release 0.7.8, we weren't dividing by 1000
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java

    re78dd1f r94faf74  
    167167       
    168168        con.getInputStream().updateAcks(packet);
     169        // note that the optional delay is usually rewritten in Connection.sendPacket()
    169170        int choke = con.getOptions().getChoke();
    170171        packet.setOptionalDelay(choke);
     
    198199            packet.setFlag(Packet.FLAG_CLOSE);
    199200            con.setCloseSentOn(_context.clock().now());
    200             if (_log.shouldLog(Log.DEBUG))
    201                 _log.debug("Closed is set for a new packet on " + con + ": " + packet);
    202         } else {
    203             if (_log.shouldLog(Log.DEBUG))
    204                 _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
    205         }
     201        }
     202        if (_log.shouldLog(Log.DEBUG))
     203            _log.debug("New outbound packet on " + _connection + ": " + packet);
    206204        return packet;
    207205    }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java

    re78dd1f r94faf74  
    4747    public static final String PROP_INITIAL_ACK_DELAY = "i2p.streaming.initialAckDelay";
    4848    public static final String PROP_INITIAL_WINDOW_SIZE = "i2p.streaming.initialWindowSize";
     49    /** unused */
    4950    public static final String PROP_INITIAL_RECEIVE_WINDOW = "i2p.streaming.initialReceiveWindow";
    5051    public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout";
     
    5960    static final int DEFAULT_MAX_SENDS = 8;
    6061    public static final int DEFAULT_INITIAL_RTT = 8*1000;   
     62    public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000;   
    6163    static final int MIN_WINDOW_SIZE = 1;
    6264    private static final boolean DEFAULT_ANSWER_PINGS = true;
     
    218220        setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1));
    219221        setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
    220         setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
     222        setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
    221223        setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
    222224        setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
     
    250252            setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
    251253        if (opts.containsKey(PROP_INITIAL_ACK_DELAY))
    252             setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 2000));
     254            setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY));
    253255        if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
    254256            setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
     
    296298     */
    297299    public boolean getRequireFullySigned() { return _fullySigned; }
     300    /** unused, see above */
    298301    public void setRequireFullySigned(boolean sign) { _fullySigned = sign; }
    299302   
     
    326329   
    327330    /** after how many consecutive messages should we ack?
    328      * This doesn't appear to be used.
     331     * @deprecated This doesn't appear to be used.
    329332     * @return receive window size.
    330333     */
     
    406409     */
    407410    public int getSendAckDelay() { return _sendAckDelay; }
     411    /**
     412     *  Unused except here, so expect the default initial delay of 2000 ms unless set by the user
     413     *  to remain constant.
     414     */
    408415    public void setSendAckDelay(int delayMs) { _sendAckDelay = delayMs; }
    409416   
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java

    re78dd1f r94faf74  
    132132        }
    133133       
    134         if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
    135             if (_log.shouldLog(Log.DEBUG))
    136                 _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
    137                            + " packet: " + packet + " con: " + con);
    138         }
     134        //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
     135        //    if (_log.shouldLog(Log.DEBUG))
     136        //        _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
     137        //                   + " packet: " + packet + " con: " + con);
     138        //}
     139
     140        if (_log.shouldLog(Log.DEBUG))
     141            _log.debug((isNew ? "New" : "Dup or ack-only") + " inbound packet on " + con + ": " + packet);
    139142
    140143        // close *after* receiving the data, as well as after verifying the signatures / etc
     
    152155                if (_log.shouldLog(Log.DEBUG))
    153156                    _log.debug("Scheduling immediate ack for " + packet);
    154                 con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
     157                //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
     158                // honor request "almost" immediately
     159                con.setNextSendTime(_context.clock().now() + 250);
    155160            } else {
    156161                int delay = con.getOptions().getSendAckDelay();
     
    223228    }
    224229   
     230    /**
     231     * Process the acks in a received packet, and adjust our window and RTT
     232     * @return are we congested?
     233     */
    225234    private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) {
    226235        if (ackThrough < 0) return false;
     
    288297    }
    289298   
    290    
     299    /** @return are we congested? */
    291300    private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) {
    292301        boolean congested = false;
  • apps/streaming/java/src/net/i2p/client/streaming/Packet.java

    re78dd1f r94faf74  
    4343 * <li>{@link #FLAG_PROFILE_INTERACTIVE}: no option data</li>
    4444 * <li>{@link #FLAG_ECHO}: no option data</li>
    45  * <li>{@link #FLAG_NO_ACK}: no option data</li>
     45 * <li>{@link #FLAG_NO_ACK}: no option data - this appears to be unused, we always ack, even for the first packet</li>
    4646 * </ol>
    4747 *
Note: See TracChangeset for help on using the changeset viewer.