Changeset 2d8f0c2


Ignore:
Timestamp:
Feb 9, 2017 5:24:03 PM (4 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
50450ec, e01c443
Parents:
f0241d4
Message:

Streaming: Fix optional delay and choking (tickets #1046, 1939)

  • Don't always send optional delay
  • Don't overwrite choking delay with non-choking delay
  • Don't send optional delay of 0 every 8 packets
  • Don't set options both in CDR.buildPacket() and Conn.sendPacket()
  • Set or clear optional delay in packet when retransmitting
  • Move choking state variables from ConnectionOptions? to Connection
  • Move updateAcks() call from PacketLocal? to PacketQueue?
  • Fully implement choking and un-choking
  • Reduce periods for some stats
  • Comment out some debug logging
  • Cleanups
  • Fix javadoc HTML broken in previous checkin
Files:
14 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java

    rf0241d4 r2d8f0c2  
    7070    private long _lastCongestionTime;
    7171    private volatile long _lastCongestionHighestUnacked;
     72    /** has the other side choked us? */
     73    private volatile boolean _isChoked;
     74    /** are we choking the other side? */
     75    private volatile boolean _isChoking;
     76    private final AtomicInteger _unchokesToSend = new AtomicInteger();
    7277    private final AtomicBoolean _ackSinceCongestion;
    7378    /** Notify this on connection (or connection failure) */
     
    103108
    104109    public static final int MAX_WINDOW_SIZE = 128;
     110    private static final int UNCHOKES_TO_SEND = 8;
    105111   
    106112/****
     
    188194            synchronized (_outboundPackets) {
    189195                if (!started)
    190                     _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size(), timeoutMs);
     196                    _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size());
    191197                if (start + 5*60*1000 < _context.clock().now()) // ok, 5 minutes blocking?  I dont think so
    192198                    return false;
     
    206212                int unacked = _outboundPackets.size();
    207213                int wsz = _options.getWindowSize();
    208                 if (unacked >= wsz ||
     214                if (_isChoked || unacked >= wsz ||
    209215                    _activeResends.get() >= (wsz + 1) / 2 ||
    210216                    _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) {
     
    212218                        if (timeLeft <= 0) {
    213219                            if (_log.shouldLog(Log.INFO))
    214                                 _log.info("Outbound window is full " + unacked
     220                                _log.info("Outbound window is full (choked? " + _isChoked + ' ' + unacked
    215221                                          + " unacked with " + _activeResends + " active resends"
    216222                                          + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): "
     
    219225                        }
    220226                        if (_log.shouldLog(Log.DEBUG))
    221                             _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/"
     227                            _log.debug("Outbound window is full (choked? " + _isChoked + ' ' + unacked + '/' + wsz + '/'
    222228                                       + _activeResends + "), waiting " + timeLeft);
    223229                        try {
     
    241247                    }
    242248                } else {
    243                     _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start);
     249                    _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size());
    244250                    return true;
    245251                }
     
    258264   
    259265    void ackImmediately() {
    260         PacketLocal packet = null;
     266        PacketLocal packet;
    261267/*** why would we do this?
    262268     was it to force a congestion indication at the other end?
     
    344350    }
    345351   
     352    /**
     353     *  This sends all 'normal' packets (acks and data) for the first time.
     354     *  Retransmits are done in ResendPacketEvent below.
     355     *  Resets, pings, and pongs are done elsewhere in this class,
     356     *  or in ConnectionManager or ConnectionHandler.
     357     */
    346358    void sendPacket(PacketLocal packet) {
    347359        if (packet == null) return;
     
    354366       
    355367        if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
    356             //if (_log.shouldLog(Log.DEBUG))
    357             //    _log.debug("No resend for " + packet);
     368            // ACK-only
     369            if (_isChoking) {
     370                packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
     371                packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     372            } else if (_unchokesToSend.decrementAndGet() > 0) {
     373                // don't worry about wrapping around
     374                packet.setOptionalDelay(0);
     375                packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     376            }
    358377        } else {
    359378            int windowSize;
     
    365384                _outboundPackets.notifyAll();
    366385            }
    367             // the other end has no idea what our window size is, so
    368             // help him out by requesting acks below the 1/3 point,
    369             // if remaining < 3, and every 8 minimum.
    370             if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
    371                 (remaining < (windowSize + 2) / 3) ||
     386
     387            if (_isChoking) {
     388                packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
     389                packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     390            } else if (packet.isFlagSet(Packet.FLAG_CLOSE) ||
     391                _unchokesToSend.decrementAndGet() > 0 ||
     392                // the other end has no idea what our window size is, so
     393                // help him out by requesting acks below the 1/3 point,
     394                // if remaining < 3, and every 8 minimum.
    372395                (remaining < 3) ||
    373                 (packet.getSequenceNum() % 8 == 0)) {
     396                (remaining < (windowSize + 2) / 3) /* ||
     397                (packet.getSequenceNum() % 8 == 0) */ ) {
    374398                packet.setOptionalDelay(0);
    375399                packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     
    381405                // which is always 2000, but it's good for diagnostics to see what the other end thinks
    382406                // the RTT is.
     407/**
    383408                int delay = _options.getRTT() / 2;
    384409                packet.setOptionalDelay(delay);
     
    387412                if (_log.shouldLog(Log.DEBUG))
    388413                    _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
    389             }
    390             // WHY always set?
    391             //packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     414**/
     415            }
    392416           
    393417            long timeout = _options.getRTO();
     
    9851009    }
    9861010   
     1011    /**
     1012     *  Set or clear if we are choking the other side.
     1013     *  If on is true or the value has changed, this will call ackImmediately().
     1014     *  @param on true for choking
     1015     *  @since 0.9.29
     1016     */
     1017    public void setChoking(boolean on) {
     1018        if (on != _isChoking) {
     1019            _isChoking = on;
     1020           if (!on)
     1021               _unchokesToSend.set(UNCHOKES_TO_SEND);
     1022           ackImmediately();
     1023        } else if (on) {
     1024           ackImmediately();
     1025        }
     1026    }
     1027   
     1028    /**
     1029     *  Set or clear if we are being choked by the other side.
     1030     *  @param on true for choked
     1031     *  @since 0.9.29
     1032     */
     1033    public void setChoked(boolean on) {
     1034        _isChoked = on;
     1035        if (on) {
     1036            congestionOccurred();
     1037            // https://en.wikipedia.org/wiki/Transmission_Control_Protocol
     1038            // When a receiver advertises a window size of 0, the sender stops sending data and starts the persist timer.
     1039            // The persist timer is used to protect TCP from a deadlock situation that could arise
     1040            // if a subsequent window size update from the receiver is lost,
     1041            // and the sender cannot send more data until receiving a new window size update from the receiver.
     1042            // When the persist timer expires, the TCP sender attempts recovery by sending a small packet
     1043            // so that the receiver responds by sending another acknowledgement containing the new window size.
     1044            // ...
     1045            // We don't do any of that, but we set the window size to 1, and let the retransmission
     1046            // of packets do the "attempted recovery".
     1047            getOptions().setWindowSize(1);
     1048        }
     1049    }
     1050   
     1051    /**
     1052     *  Is the other side choking us?
     1053     *  @return if choked
     1054     *  @since 0.9.29
     1055     */
     1056    public boolean isChoked() {
     1057        return _isChoked;
     1058    }
     1059
    9871060    /** how many packets have we sent and the other side has ACKed?
    9881061     * @return Count of how many packets ACKed.
     
    13821455                // updateAcks done in enqueue()
    13831456                //_inputStream.updateAcks(_packet);
    1384                 int choke = getOptions().getChoke();
    1385                 _packet.setOptionalDelay(choke);
    1386                 if (choke > 0)
     1457                if (_isChoking) {
     1458                    _packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE);
    13871459                    _packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     1460                } else if (_unchokesToSend.decrementAndGet() > 0) {
     1461                    // don't worry about wrapping around
     1462                    _packet.setOptionalDelay(0);
     1463                    _packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     1464                } else {
     1465                    // clear flag
     1466                    _packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false);
     1467                }
     1468
    13881469                // this seems unnecessary to send the MSS again:
    13891470                //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
     
    13971478                int newWindowSize = getOptions().getWindowSize();
    13981479
    1399                 if (_ackSinceCongestion.get()) {
     1480                if (_isChoked) {
     1481                    congestionOccurred();
     1482                    getOptions().setWindowSize(1);
     1483                } else if (_ackSinceCongestion.get()) {
    14001484                    // only shrink the window once per window
    14011485                    if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java

    rf0241d4 r2d8f0c2  
    157157   
    158158    /**
     159     * Compose a packet.
     160     * Most flags are set here; however, some are set in Connection.sendPacket()
     161     * and Connection.ResendPacketEvent.retransmit().
     162     * Take care not to set the same options both here and in Connection.
     163     *
    159164     * @param buf data to be sent - may be null
    160165     * @param off offset into the buffer to start writing from
     
    165170     */
    166171    private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) {
    167         Connection con = _connection;
    168172        if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")");
    169         boolean ackOnly = isAckOnly(con, size);
    170         boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0);
    171        
    172         PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con);
     173        boolean ackOnly = isAckOnly(_connection, size);
     174        boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0);
     175       
     176        PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
    173177        //ByteArray data = packet.acquirePayload();
    174178        ByteArray data = new ByteArray(new byte[size]);
     
    181185            packet.setSequenceNum(0);
    182186        else
    183             packet.setSequenceNum(con.getNextOutboundPacketNum());
    184         packet.setSendStreamId(con.getSendStreamId());
    185         packet.setReceiveStreamId(con.getReceiveStreamId());
     187            packet.setSequenceNum(_connection.getNextOutboundPacketNum());
     188        packet.setSendStreamId(_connection.getSendStreamId());
     189        packet.setReceiveStreamId(_connection.getReceiveStreamId());
    186190       
    187191        // not needed here, handled in PacketQueue.enqueue()
    188192        //con.getInputStream().updateAcks(packet);
    189         // note that the optional delay is usually rewritten in Connection.sendPacket()
    190         int choke = con.getOptions().getChoke();
    191         packet.setOptionalDelay(choke);
    192         if (choke > 0)
    193             packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
     193
     194        // Do not set optional delay here, set in Connection.sendPacket()
     195
    194196        // bugfix release 0.7.8, we weren't dividing by 1000
    195         packet.setResendDelay(con.getOptions().getResendDelay() / 1000);
    196        
    197         if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
     197        packet.setResendDelay(_connection.getOptions().getResendDelay() / 1000);
     198       
     199        if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)
    198200            packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true);
    199201        else
    200202            packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false);
    201        
    202         packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned());
    203203       
    204204        //if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) {
     
    206206            packet.setFlag(Packet.FLAG_SYNCHRONIZE);
    207207            packet.setOptionalFrom();
    208             packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
    209         }
    210         packet.setLocalPort(con.getLocalPort());
    211         packet.setRemotePort(con.getPort());
    212         if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
     208            packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize());
     209        }
     210        packet.setLocalPort(_connection.getLocalPort());
     211        packet.setRemotePort(_connection.getPort());
     212        if (_connection.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
    213213            packet.setFlag(Packet.FLAG_NO_ACK);
    214214        }
     
    222222        // throughout network?
    223223        //
    224         if (con.getOutputStream().getClosed() &&
    225             ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
     224        if (_connection.getOutputStream().getClosed() &&
     225            ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {
    226226            packet.setFlag(Packet.FLAG_CLOSE);
    227             con.notifyCloseSent();
     227            _connection.notifyCloseSent();
    228228        }
    229229        if (_log.shouldLog(Log.DEBUG))
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java

    rf0241d4 r2d8f0c2  
    106106        _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
    107107        // Stats for Connection
    108         _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
    109         _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
    110         _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
    111         _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 });
     108        _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*60*1000 });
     109        _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*60*1000 });
     110        _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*60*1000 });
     111        _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 10*60*1000 });
    112112        // Stats for PacketQueue
    113         _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
    114         _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
     113        _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 });
     114        _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 });
    115115    }
    116116   
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java

    rf0241d4 r2d8f0c2  
    3434    private int _sendAckDelay;
    3535    private int _maxMessageSize;
    36     private int _choke;
    3736    private int _maxResends;
    3837    private int _inactivityTimeout;
     
    328327            setResendDelay(opts.getResendDelay());
    329328            setMaxMessageSize(opts.getMaxMessageSize());
    330             setChoke(opts.getChoke());
    331329            setMaxResends(opts.getMaxResends());
    332330            setInactivityTimeout(opts.getInactivityTimeout());
     
    678676    public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); }
    679677   
    680     /**
    681      * how long we want to wait before any data is transferred on the
    682      * connection in either direction
    683      *
    684      * @return how long to wait before any data is transferred in either direction in ms
    685      */
    686     public int getChoke() { return _choke; }
    687     public void setChoke(int ms) { _choke = ms; }
    688 
    689678    /**
    690679     * What profile do we want to use for this connection?
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java

    rf0241d4 r2d8f0c2  
    9797        boolean choke = false;
    9898        if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) {
    99             if (packet.getOptionalDelay() > 60000) {
     99            if (packet.getOptionalDelay() >= Packet.MIN_DELAY_CHOKE) {
    100100                // requested choke
    101101                choke = true;
     102                if (_log.shouldWarn())
     103                    _log.warn("Got a choke on connection " + con + ": " + packet);
    102104                //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000);
    103105            }
     106            // Only call this if the flag is set
     107            con.setChoked(choke);
    104108        }
    105109       
     
    107111            if (_log.shouldWarn())
    108112                _log.warn("Inbound buffer exceeded on connection " + con +
    109                           ", dropping " + packet);
    110             con.getOptions().setChoke(61*1000);
     113                          ", choking and dropping " + packet);
     114            // this will call ackImmediately()
     115            con.setChoking(true);
     116            // TODO we could still process the acks for this packet before discarding
    111117            packet.releasePayload();
    112             con.ackImmediately();
    113118            return;
    114         }
    115         con.getOptions().setChoke(0);
     119        } // else we will call setChoking(false) below
    116120
    117121        _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize());
     
    133137        // But not ack-only packets!
    134138        boolean isNew;
    135         if (seqNum > 0 || isSYN)
    136             isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload());
    137         else
     139        if (seqNum > 0 || isSYN) {
     140            isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()) &&
     141                    !allowAck;
     142        } else {
    138143            isNew = false;
    139         if (!allowAck)
    140             isNew = false;
     144        }
     145
     146        if (isNew && packet.getPayloadSize() > 1500) {
     147            // don't clear choking unless it was new, and a big packet
     148            // this will call ackImmediately() if changed
     149            // TODO if this filled in a hole, we shouldn't unchoke
     150            // TODO a bunch of small packets should unchoke also
     151            con.setChoking(false);
     152        }
    141153       
    142154        //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
     
    159171        }
    160172
    161         boolean fastAck = false;
    162173        boolean ackOnly = false;
    163174       
     
    171182                //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
    172183                // honor request "almost" immediately
     184                // TODO the 250 below _may_ be a big limiter in how fast local "loopback" connections
     185                // can go, however if it goes too fast then we start choking which causes
     186                // frequent stalls anyway.
    173187                con.setNextSendTime(_context.clock().now() + 250);
    174188            } else {
     
    223237        }
    224238
     239        boolean fastAck;
    225240        if (isSYN && (packet.getSendStreamId() <= 0) ) {
    226241            // don't honor the ACK 0 in SYN packets received when the other side
    227242            // has obviously not seen our messages
     243            fastAck = false;
    228244        } else {
    229245            fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke);
    230246        }
    231247        con.eventOccurred();
    232         if (fastAck) {
     248        if (fastAck && !choke) {
    233249            if (!isNew) {
    234250                // if we're congested (fastAck) but this is also a new packet,
     
    267283    /**
    268284     * Process the acks in a received packet, and adjust our window and RTT
     285     * @param isNew was it a new packet? false for ack-only
     286     * @param choke did we get a choke in the packet?
    269287     * @return are we congested?
    270288     */
     
    355373    }
    356374   
    357     /** @return are we congested? */
     375    /**
     376     * This either does nothing or increases the window, it never decreases it.
     377     * Decreasing is done in Connection.ResendPacketEvent.retransmit()
     378     *
     379     * @param isNew was it a new packet? false for ack-only
     380     * @param sequenceNum 0 for ack-only
     381     * @param choke did we get a choke in the packet?
     382     * @return are we congested?
     383     */
    358384    private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) {
    359         boolean congested = false;
    360         if ( (!isNew) && (sequenceNum > 0) ) {
     385        boolean congested;
     386        if (choke || (!isNew && sequenceNum > 0) || con.isChoked()) {
    361387            if (_log.shouldLog(Log.DEBUG))
    362388                _log.debug("Congestion occurred on the sending side. Not adjusting window "+con);
    363 
    364389            congested = true;
    365         }
    366        
     390        } else {
     391            congested = false;
     392        }
     393
    367394        long lowest = con.getHighestAckedThrough();
    368395        // RFC 2581
  • apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java

    rf0241d4 r2d8f0c2  
    1515 * Receive raw information from the I2PSession and turn it into
    1616 * Packets, if we can.
    17  *&lt;p&gt;
     17 *<p>
    1818 * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
    1919 */
  • apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java

    rf0241d4 r2d8f0c2  
    1717 * Stream that can be given messages out of order
    1818 * yet present them in order.
    19  *&lt;p&gt;
     19 *<p>
    2020 * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
    21  *&lt;p&gt;
     21 *<p>
    2222 * This buffers unlimited data via messageReceived() -
    2323 * limiting / blocking is done in ConnectionPacketHandler.receivePacket().
     
    103103    /**
    104104     *  Determine if this packet will fit in our buffering limits.
     105     *  Always returns true for zero payloadSize.
    105106     *
    106107     *  @return true if we have room. If false, do not call messageReceived()
  • apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java

    rf0241d4 r2d8f0c2  
    1717 * on flush or when the buffer is full.  It also blocks according
    1818 * to the data receiver's needs.
    19  *&lt;p&gt;
     19 *<p>
    2020 * MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession
    2121 */
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java

    rf0241d4 r2d8f0c2  
    165165    public static final int DEFAULT_MAX_SIZE = 32*1024;
    166166    protected static final int MAX_DELAY_REQUEST = 65535;
     167    public static final int MIN_DELAY_CHOKE = 60001;
     168    public static final int SEND_DELAY_CHOKE = 61000;
    167169
    168170    /**
  • apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java

    rf0241d4 r2d8f0c2  
    1212 * receive a packet and dispatch it correctly to the connection specified,
    1313 * the server socket, or queue a reply RST packet.
    14  *&lt;p&gt;
     14 *<p>
    1515 * I2PSession -&gt; MessageHandler -&gt; PacketHandler -&gt; ConnectionPacketHandler -&gt; MessageInputStream
    1616 */
  • apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java

    rf0241d4 r2d8f0c2  
    110110                         FLAG_CLOSE |
    111111                         FLAG_ECHO);
    112     }
    113    
    114     /** last minute update of ack fields, just before write/sign  */
    115     public void prepare() {
    116         if (_connection != null)
    117             _connection.getInputStream().updateAcks(this);
    118         int numSends = _numSends.get();
    119         if (numSends > 0) {
    120             // so we can debug to differentiate resends
    121             setOptionalDelay(numSends * 1000);
    122             setFlag(FLAG_DELAY_REQUESTED);
    123         }
    124112    }
    125113   
  • apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java

    rf0241d4 r2d8f0c2  
    2424 * send them immediately with no blocking, since the
    2525 * mode=bestEffort doesnt block in the SDK.
    26  *&lt;p&gt;
     26 *<p>
    2727 * MessageOutputStream -&gt; ConnectionDataReceiver -&gt; Connection -&gt; PacketQueue -&gt; I2PSession
    2828 */
     
    6565   
    6666    /**
    67      * Add a new packet to be sent out ASAP
     67     * Add a new packet to be sent out ASAP.
     68     * This updates the acks.
    6869     *
    6970     * keys and tags disabled since dropped in I2PSession
     
    7374        if (_dead)
    7475            return false;
    75         // this updates the ack/nack field
    76         packet.prepare();
    7776       
    7877        //SessionKey keyUsed = packet.getKeyUsed();
     
    8887            return false;
    8988        }
     89
     90        Connection con = packet.getConnection();
     91        if (con != null) {
     92            // this updates the ack/nack fields
     93            con.getInputStream().updateAcks(packet);
     94        }
    9095   
    9196        ByteArray ba = _cache.acquire();
     
    97102        try {
    98103            int size = 0;
    99             long beforeWrite = System.currentTimeMillis();
     104            //long beforeWrite = System.currentTimeMillis();
    100105            if (packet.shouldSign())
    101106                size = packet.writeSignedPacket(buf, 0);
    102107            else
    103108                size = packet.writePacket(buf, 0);
    104             long writeTime = System.currentTimeMillis() - beforeWrite;
    105             if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )
    106                 _log.warn("took " + writeTime + "ms to write the packet: " + packet);
     109            //long writeTime = System.currentTimeMillis() - beforeWrite;
     110            //if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )
     111            //    _log.warn("took " + writeTime + "ms to write the packet: " + packet);
    107112
    108113            // last chance to short circuit...
     
    122127            boolean listenForStatus = false;
    123128            if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) {
    124                 Connection con = packet.getConnection();
    125129                if (con != null) {
    126130                    if (con.isInbound())
     
    142146                options.setTagThreshold(FINAL_TAG_THRESHOLD);
    143147            } else {
    144                 Connection con = packet.getConnection();
    145148                if (con != null) {
    146149                    if (con.isInbound() && con.getLifetime() < 2*60*1000)
     
    158161                                 I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(),
    159162                                 options, this);
    160                 _messageStatusMap.put(Long.valueOf(id), packet.getConnection());
     163                _messageStatusMap.put(Long.valueOf(id), con);
    161164                sent = true;
    162165            } else {
     
    174177                _context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime());
    175178           
    176             Connection con = packet.getConnection();
    177179            if (con != null) {
    178180                con.incrementBytesSent(size);
     
    190192            if (_log.shouldLog(Log.WARN))
    191193                _log.warn("Send failed for " + packet);
    192             Connection c = packet.getConnection();
    193             if (c != null) // handle race on b0rk
    194                 c.disconnect(false);
     194            if (con != null) // handle race on b0rk
     195                con.disconnect(false);
    195196        } else {
    196197            //packet.setKeyUsed(keyUsed);
    197198            //packet.setTagsSent(tagsSent);
    198199            packet.incrementSends();
    199             Connection c = packet.getConnection();
    200             if (c != null && _log.shouldDebug()) {
    201                 String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO();
    202                 c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
     200            if (con != null && _log.shouldDebug()) {
     201                String suffix = "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO();
     202                con.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
    203203            }
    204204            if (I2PSocketManagerFull.pcapWriter != null &&
  • history.txt

    rf0241d4 r2d8f0c2  
     12017-02-09 zzz
     2 * Streaming: Fix optional delay and choking (tickets #1046, 1939)
     3
     42017-02-08 zzz
     5 * I2CP: Return local delivery failure on queue overflow (ticket #1939)
     6
     72017-02-05 zzz
     8 * Console: Consolidate timer threads (ticket #1068)
     9 * NTCP: Don't write to an inbound connection before
     10   fully established, causing NPE (ticket #996)
     11 * Streaming:
     12   - Don't always send optional delay (ticket #1046)
     13   - Don't hard fail on expired message error (ticket #1748)
     14
    1152017-02-04 zzz
     16 * HTTP proxies:
     17   - Pass through relative referer URIs, convert same-origin
     18     absolute referer URIs to relative (ticket #1862)
    219 * NTP: Enable IPv6 support (ticket #1896)
    320
  • router/java/src/net/i2p/router/RouterVersion.java

    rf0241d4 r2d8f0c2  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 5;
     21    public final static long BUILD = 6;
    2222
    2323    /** for example "-test" */
Note: See TracChangeset for help on using the changeset viewer.