Opened 10 months ago
Closed 9 months ago
#2719 closed enhancement (fixed)
Streaming: TCP-Westwood+
Reported by: | Zlatin Balevsky | Owned by: | |
---|---|---|---|
Priority: | minor | Milestone: | 0.9.46 |
Component: | streaming | Version: | 0.9.45 |
Keywords: | Cc: | ||
Parent Tickets: | Sensitive: | no |
Description
This builds on top of ticket #2718 and implements an "anti-aliased" sampling window. According to this paper https://c3lab.poliba.it/images/c/cd/QoSIP03.pdf it should provide a better estimate than regular Westwood.
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index a06a48809..d4730d12c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -53,6 +53,7 @@ class Connection { private final AtomicInteger _unackedPacketsReceived = new AtomicInteger(); private long _congestionWindowEnd; private volatile long _highestAckedThrough; + private volatile int _ssthresh; private final boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ @@ -68,9 +69,11 @@ class Connection { private long _lastReceivedOn; private final ActivityTimer _activityTimer; /** window size when we last saw congestion */ - private int _lastCongestionSeenAt; + private volatile int _lastCongestionSeenAt; private long _lastCongestionTime; private volatile long _lastCongestionHighestUnacked; + private final AtomicLong _nextRetransmitTime = new AtomicLong(); + private final RetransmitEvent _retransmitEvent; /** has the other side choked us? */ private volatile boolean _isChoked; /** are we choking the other side? */ @@ -99,6 +102,17 @@ class Connection { public static final long MAX_RESEND_DELAY = 45*1000; public static final long MIN_RESEND_DELAY = 100; + public static final int MAX_WINDOW_SIZE = 128; + public static final int MAX_SLOW_START_WINDOW = 24; + + /** Westwood parameters and state */ + private static final int WESTWOOD_TAU = 5000; + private static final int WESTWOOD_M = 2; + private final VirtualAckEvent _virtualAckEvent; + private final AtomicInteger _acksThisRtt = new AtomicInteger(); + private long _tAck; + private volatile double _bK, _bKFiltered; + /** * Wait up to 5 minutes after disconnection so we can ack/close packets. 
* Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL) @@ -108,7 +122,6 @@ class Connection { public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; private static final long MAX_CONNECT_TIMEOUT = 2*60*1000; - public static final int MAX_WINDOW_SIZE = 128; private static final int UNCHOKES_TO_SEND = 8; /**** @@ -156,6 +169,7 @@ class Connection { _createdOn = _context.clock().now(); _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; + _ssthresh = MAX_SLOW_START_WINDOW; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; @@ -165,11 +179,17 @@ class Connection { _connectLock = new Object(); _nextSendLock = new Object(); _connectionEvent = new ConEvent(); + _retransmitEvent = new RetransmitEvent(); + _virtualAckEvent = new VirtualAckEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) _log.info("New connection created with options: " + _options); } + + int getSSThresh() { + return _ssthresh; + } public long getNextOutboundPacketNum() { return _lastSendId.incrementAndGet(); @@ -239,9 +259,9 @@ class Connection { throw ie; } } else { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends - // + "), waiting indefinitely"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + + "), waiting indefinitely"); try { _outboundPackets.wait(250); } catch (InterruptedException ie) { @@ -423,10 +443,15 @@ class Connection { long timeout = _options.getRTO(); if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resend in " + timeout + " for " + packet); - // schedules itself + // RFC 6298 section 5.1 + if 
(_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " Resend in " + timeout + " for " + packet); + _retransmitEvent.forceReschedule(timeout); + } else if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " timer was already running"); + new ResendPacketEvent(packet, timeout); } @@ -452,7 +477,7 @@ class Connection { } */ } - + /********* private class PingNotifier implements ConnectionManager.PingNotifier { private long _startedPingOn; @@ -492,6 +517,7 @@ class Connection { } List<PacketLocal> acked = null; + boolean anyLeft = false; synchronized (_outboundPackets) { if (!_outboundPackets.isEmpty()) { // short circuit iterator for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) { @@ -552,18 +578,57 @@ class Connection { } } } - if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) { - if (_log.shouldLog(Log.INFO)) - _log.info("All outbound packets acked, clearing " + _activeResends); - _activeResends.set(0); - } + anyLeft = !_outboundPackets.isEmpty(); _outboundPackets.notifyAll(); } - if ((acked != null) && (!acked.isEmpty()) ) + if ((acked != null) && (!acked.isEmpty()) ) { + _acksThisRtt.addAndGet(acked.size()); + _virtualAckEvent.scheduleFirst(); _ackSinceCongestion.set(true); + if (anyLeft) { + // RFC 6298 section 5.3 + int rto = getOptions().getRTO(); + _nextRetransmitTime.set(_context.clock().now() + rto); + _retransmitEvent.forceReschedule(rto); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto); + } else { + // RFC 6298 section 5.2 + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " all outstanding packets acked, cancelling timer"); + _nextRetransmitTime.set(0); + _retransmitEvent.cancel(); + } + } return acked; } + /** + * Updates the bandwidth estimate + * @param now what time is it now + * @param nAcked number 
of packets acked, can be zero for virtual acks + */ + private synchronized void updateBK(long now, int nAcked) { + if (_tAck > 0) { + + double bKFiltered = _bKFiltered; // for debug logging only + + long deltaT = now - _tAck; + double bk = nAcked * 1.0 / deltaT; + double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT); + + _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2; + _bK = bk; + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered + + " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); + } + } + _tAck = now; + } + //private long _occurredTime; //private long _occurredEventCount; @@ -788,6 +853,8 @@ class Connection { _outputStream.destroy(); _receiver.destroy(); _activityTimer.cancel(); + _retransmitEvent.cancel(); + _virtualAckEvent.cancel(); _inputStream.streamErrorOccurred(new IOException("Socket closed")); if (_log.shouldLog(Log.INFO)) @@ -1385,7 +1452,9 @@ class Connection { buf.append(" sent: ").append(1 + _lastSendId.get()); buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" ackThru ").append(_highestAckedThrough); - + buf.append(" ssThresh ").append(_ssthresh); + buf.append(" bkFiltered ").append(_bKFiltered); + buf.append(" minRTT ").append(getOptions().getMinRTT()); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1393,6 +1462,143 @@ class Connection { return buf.toString(); } + + class RetransmitEvent extends SimpleTimer2.TimedEvent { + RetransmitEvent() { + super(_timer); + } + + public void timeReached() { + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " rtx timer timeReached()"); + + congestionOccurred(); + + // 1. 
Double RTO and backoff (RFC 6298 section 5.5 & 5.6) + final long now = _context.clock().now(); + ConnectionOptions opts = getOptions(); + synchronized(opts) { + opts.doubleRTO(); + reschedule(opts.getRTO()); + _nextRetransmitTime.set(now + opts.getRTO()); + } + + // 2. cut ssthresh in accordance to what Westwood paper section 3.2 + List<PacketLocal> toResend = null; + synchronized(_outboundPackets) { + if (_outboundPackets.isEmpty()) { + if (_log.shouldLog(Log.WARN)) + _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??"); + return; + } + + PacketLocal oldest = _outboundPackets.values().iterator().next(); + if (oldest.getNumSends() == 1) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " cutting ssthresh and window"); + _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 )); + getOptions().setWindowSize(1); + } else if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " not cutting ssthresh and window"); + + toResend = new ArrayList<>(_outboundPackets.values()); + toResend = toResend.subList(0, (toResend.size() + 1) / 2); + } + + + // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3) + boolean sentAny = false; + for (PacketLocal packet : toResend) { + final int nResends = packet.getNumSends(); + if (packet.getNumSends() > getOptions().getMaxResends()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " packet " + packet + " resent too many times, closing"); + packet.cancelled(); + disconnect(false); + return; + } else if (packet.getNumSends() >= 3 && + packet.isFlagSet(Packet.FLAG_CLOSE) && + packet.getPayloadSize() <= 0 && + getCloseReceivedOn() > 0) { + // Bug workaround to prevent 5 minutes of retransmission + // Routers before 0.9.9 have bugs, they won't ack anything after + // they sent a close. Only send 3 CLOSE packets total, then + // shut down normally. 
+ if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " too many close resends, closing"); + packet.cancelled(); + disconnect(false); + return; + } else if (_outboundQueue.enqueue(packet)) { + if (_log.shouldLog(Log.INFO)) + _log.info(Connection.this + " resent packet " + packet); + if (nResends == 1) + _activeResends.incrementAndGet(); + sentAny = true; + } else if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " could not resend packet " + packet); + } + + if (sentAny) { + _lastSendTime = now; + resetActivityTimer(); + } + } + } + + + /** + * In order for the low-pass filter to work it is important to + * receive bw measurements at least every so often. + */ + class VirtualAckEvent extends SimpleTimer2.TimedEvent { + + private boolean _firstScheduled; + private boolean _virtual; + private long _nextRttSampleTime; + + VirtualAckEvent() { + super(_timer); + } + + public synchronized void timeReached() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual); + + final long now = _context.clock().now(); + + if (_virtual) { + updateBK(now, 0); + } else { + updateBK(now, _acksThisRtt.getAndSet(0)); + _nextRttSampleTime = now + getOptions().getRTT(); + } + + scheduleNext(now); + } + + synchronized void scheduleFirst() { + if (_firstScheduled) + return; + _firstScheduled = true; + final long now = _context.clock().now(); + _nextRttSampleTime = now + getOptions().getRTT(); + scheduleNext(now); + } + + private void scheduleNext(long now) { + int timeToRtt = (int)(_nextRttSampleTime - now); + if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) { + _virtual = false; + forceReschedule(timeToRtt); + } else { + _virtual = true; + forceReschedule( WESTWOOD_TAU / WESTWOOD_M ); + } + } + } + /** * fired to reschedule event notification */ @@ -1428,13 +1634,16 @@ class Connection { _packet = packet; _nextSend = delay + _context.clock().now(); packet.setResendPacketEvent(ResendPacketEvent.this); - schedule(delay); 
} public long getNextSendTime() { return _nextSend; } public void timeReached() { retransmit(); } + void fastRetransmit() { + reschedule(0); + } + /** * Retransmit the packet if we need to. * @@ -1454,43 +1663,11 @@ class Connection { _packet.cancelled(); return false; } - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Resend period reached for " + _packet); - boolean resend = false; - boolean isLowest = false; - synchronized (_outboundPackets) { - // allow appx. half the window to be "lowest" and be active resends, minimum of 3 - // Note: we should really pick the N lowest, not the lowest one + N more who - // happen to get here next, as the timers get out-of-order esp. after fast retx - if (_packet.getSequenceNum() == _highestAckedThrough + 1 || - _packet.getNumSends() > 1 || - _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2)) - isLowest = true; - if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum()))) - resend = true; - } - if ( (resend) && (_packet.getAckTime() <= 0) ) { - boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1)); - if ( (!isLowest) && (!fastRetransmit) ) { - // we want to resend this packet, but there are already active - // resends in the air and we dont want to make a bad situation - // worse. wait another second - // BUG? seq# = 0, activeResends = 0, loop forever - why? - // also seen with seq# > 0. Is the _activeResends count reliable? - if (_log.shouldLog(Log.INFO)) - _log.info("Delaying resend of " + _packet + " with " - + _activeResends + " active resend, " - + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize()); - forceReschedule(1333); - _nextSend = 1333 + _context.clock().now(); - return false; - } + // It's the lowest, or it's fast retransmit time. Resend the packet. 
- if (fastRetransmit) - _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); + _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); // revamp various fields, in case we need to ack more, etc // updateAcks done in enqueue() @@ -1526,9 +1703,6 @@ class Connection { if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { congestionOccurred(); _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime()); - newWindowSize /= 2; - if (newWindowSize <= 0) - newWindowSize = 1; // The timeout for _this_ packet will be doubled below, but we also // need to double the RTO for the _next_ packets. @@ -1536,7 +1710,12 @@ class Connection { // This prevents being stuck at a window size of 1, retransmitting every packet, // never updating the RTT or RTO. getOptions().doubleRTO(); - getOptions().setWindowSize(newWindowSize); + + if (_packet.getNumSends() == 1) { + _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 )); + int wSize = getOptions().getWindowSize(); + getOptions().setWindowSize(Math.min(_ssthresh, wSize)); + } if (_log.shouldLog(Log.INFO)) _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize @@ -1581,19 +1760,24 @@ class Connection { long rto = _options.getRTO(); if (rto < MIN_RESEND_DELAY) rto = MIN_RESEND_DELAY; - long timeout = rto << (numSends-1); + long timeout = rto; if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; // set this before enqueue() as it passes it on to the router _nextSend = timeout + _context.clock().now(); if (_outboundQueue.enqueue(_packet)) { + if (_nextRetransmitTime.compareAndSet(0, _nextSend)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " fast retransmit and schedule timer"); + _retransmitEvent.forceReschedule(timeout); + } 
// first resend for this packet ? if (numSends == 2) _activeResends.incrementAndGet(); if (_log.shouldLog(Log.INFO)) - _log.info("Resent packet " + - (fastRetransmit ? "(fast) " : "(timeout) ") + + _log.info(Connection.this + " Resent packet " + + "(fast) " + _packet + " next resend in " + timeout + "ms" + " activeResends: " + _activeResends + @@ -1605,8 +1789,6 @@ class Connection { // timer reset added 0.9.1 resetActivityTimer(); } - - forceReschedule(timeout); } // acked during resending (... or somethin') ???????????? @@ -1618,12 +1800,6 @@ class Connection { } return true; - } else { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Packet acked before resend (resend="+ resend + "): " - // + _packet + " on " + Connection.this); - return false; - } } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index 1d48a33b3..6d0dd9c5c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int _rtt; + private int _minRtt = Integer.MAX_VALUE; private int _rttDev; private int _rto = INITIAL_RTO; private int _resendDelay; @@ -577,6 +578,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public synchronized int getRTT() { return _rtt; } + /** + * @return minimum RTT observed over the life of the connection + */ + public synchronized int getMinRTT() {return _minRtt; } + /** * not public, use updateRTT() */ @@ -671,6 +677,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * @param measuredValue must be positive */ public synchronized void updateRTT(int measuredValue) { + _minRtt = Math.min(_minRtt, measuredValue); switch(_initState) { case INIT: _initState = AckInit.FIRST; diff --git 
a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 3466ee6f0..8065dd77c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -28,8 +28,6 @@ class ConnectionPacketHandler { private final Log _log; private final ByteCache _cache = ByteCache.getInstance(32, 4*1024); - public static final int MAX_SLOW_START_WINDOW = 24; - // see tickets 1939 and 2584 private static final int IMMEDIATE_ACK_DELAY = 150; @@ -432,8 +430,8 @@ class ConnectionPacketHandler { _context.statManager().addRateData("stream.trend", trend, newWindowSize); if ( (!congested) && (acked > 0) && (numResends <= 0) ) { - if (newWindowSize < con.getLastCongestionSeenAt() / 2) { - // Don't make this <= LastCongestion/2 or we'll jump right back to where we were + int ssthresh = con.getSSThresh(); + if (newWindowSize < ssthresh) { // slow start - exponential growth // grow acked/N times (where N = the slow start factor) // always grow at least 1 @@ -443,10 +441,10 @@ class ConnectionPacketHandler { // as it often leads to a big packet loss (30-50) all at once that // takes quite a while (a minute or more) to recover from, // especially if crypto tags are lost - if (newWindowSize >= MAX_SLOW_START_WINDOW) + if (newWindowSize >= ssthresh) newWindowSize++; else - newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked); + newWindowSize = Math.min(ssthresh, newWindowSize + acked); } else if (acked < factor) newWindowSize++; else diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java index 4a2ada514..ff351232d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java +++ 
b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java @@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private long _ackOn; private long _cancelledOn; private final AtomicInteger _nackCount = new AtomicInteger(); - private volatile boolean _retransmitted; private volatile SimpleTimer2.TimedEvent _resendEvent; /** not bound to a connection */ @@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { */ public void incrementNACKs() { final int cnt = _nackCount.incrementAndGet(); - SimpleTimer2.TimedEvent evt = _resendEvent; - if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) && - (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it - _retransmitted = true; - evt.reschedule(0); + Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent; + if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && + (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) { // Don't fast retx if we recently resent it + evt.fastRetransmit(); // the predicate used to be '+', changing to '-' --zab if (_log.shouldLog(Log.DEBUG)) { - final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+ + final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+ " numSends=%d, lastSend=%d, now=%d", - toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now()); + toString(), cnt, _numSends.get(), _lastSend, _context.clock().now()); _log.debug(log); } } else if (_log.shouldLog(Log.DEBUG)) { - final String log = String.format("%s nack but no retransmit. Criteria: nacks=%d, retransmitted=%b,"+ + final String log = String.format("%s nack but no retransmit. 
Criteria: nacks=%d, "+ " numSends=%d, lastSend=%d, now=%d", - toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now()); + toString(), cnt, _numSends.get(), _lastSend, _context.clock().now()); _log.debug(log); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java index ae9e06196..08aa1639b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java @@ -42,7 +42,7 @@ class TCBShare { ///// private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2; private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5); - private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW; + private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE; public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) { _context = ctx;
Subtickets
Change History (5)
comment:1 Changed 9 months ago by
comment:2 Changed 9 months ago by
Parent Tickets: | 2718 |
---|
Removing #2718 as parent ticket as it will not be implemented.
comment:3 Changed 9 months ago by
Rebasing against 0.9.45-11:
No changes from patch in previous comment.
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java index 558f31c15..f0a63811f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -114,6 +114,14 @@ class Connection { /** Maximum number of packets to retransmit when the timer hits */ private static final int MAX_RTX = 16; + + /** Westwood parameters and state */ + private static final int WESTWOOD_TAU = 1000; + private static final int WESTWOOD_M = 2; + private final VirtualAckEvent _virtualAckEvent; + private final AtomicInteger _acksThisRtt = new AtomicInteger(); + private long _tAck; + private volatile double _bK, _bKFiltered; /**** public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, @@ -170,6 +178,7 @@ class Connection { _nextSendLock = new Object(); _connectionEvent = new ConEvent(); _retransmitEvent = new RetransmitEvent(); + _virtualAckEvent = new VirtualAckEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) @@ -579,6 +588,8 @@ class Connection { } if ((acked != null) && (!acked.isEmpty()) ) { _ackSinceCongestion.set(true); + _acksThisRtt.addAndGet(acked.size()); + _virtualAckEvent.scheduleFirst(); if (anyLeft) { // RFC 6298 section 5.3 int rto = getOptions().getRTO(); @@ -597,6 +608,32 @@ class Connection { return acked; } + /** + * Updates the bandwidth estimate + * @param now what time is it now + * @param nAcked number of packets acked, can be zero for virtual acks + */ + private synchronized void updateBK(long now, int nAcked) { + if (_tAck > 0) { + + double bKFiltered = _bKFiltered; // for debug logging only + + long deltaT = now - _tAck; + double bk = nAcked * 1.0 / deltaT; + double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * 
WESTWOOD_TAU + deltaT); + + _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2; + _bK = bk; + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered + + " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); + } + } + _tAck = now; + } + + //private long _occurredTime; //private long _occurredEventCount; @@ -822,6 +859,7 @@ class Connection { _receiver.destroy(); _activityTimer.cancel(); _retransmitEvent.cancel(); + _virtualAckEvent.cancel(); _inputStream.streamErrorOccurred(new IOException("Socket closed")); if (_log.shouldLog(Log.INFO)) @@ -1416,7 +1454,9 @@ class Connection { buf.append(" sent: ").append(1 + _lastSendId.get()); buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" ackThru ").append(_highestAckedThrough); - buf.append(" ssThresh ").append(_ssthresh); + buf.append(" ssThresh ").append(_ssthresh); + buf.append(" bKFiltered ").append(_bKFiltered); + buf.append(" minRTT ").append(getOptions().getMinRTT()); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1477,7 +1517,7 @@ class Connection { final long now = _context.clock().now(); pushBackRTO(getOptions().doubleRTO()); - // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4) + // 2. 
cut ssthresh to BWE * minRTT and window to 1 ( Westwood paper, section 3.2) List<PacketLocal> toResend = null; synchronized(_outboundPackets) { if (_outboundPackets.isEmpty()) { @@ -1490,8 +1530,8 @@ class Connection { if (oldest.getNumSends() == 1) { if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " cutting ssthresh and window"); - int flightSize = _outboundPackets.size(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh ); getOptions().setWindowSize(1); } else if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " not cutting ssthresh and window"); @@ -1575,7 +1615,64 @@ class Connection { } } + /** + * In order for the low-pass filter to work it is important to + * receive bw measurements at least every so often. + */ + class VirtualAckEvent extends SimpleTimer2.TimedEvent { + + private boolean _firstScheduled; + private boolean _virtual; + private long _nextRttSampleTime; + + VirtualAckEvent() { + super(_timer); + } + + public synchronized void timeReached() { + if (_resetSentOn.get() > 0 || _resetReceived.get() || _finalDisconnect.get()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " virtual ack event after close or reset"); + return; + } + + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual); + + final long now = _context.clock().now(); + + if (_virtual) { + updateBK(now, 0); + } else { + updateBK(now, _acksThisRtt.getAndSet(0)); + _nextRttSampleTime = now + getOptions().getRTT(); + } + + scheduleNext(now); + } + + synchronized void scheduleFirst() { + if (_firstScheduled) + return; + _firstScheduled = true; + final long now = _context.clock().now(); + _nextRttSampleTime = now + getOptions().getRTT(); + scheduleNext(now); + } + + private void 
scheduleNext(long now) { + int timeToRtt = (int)(_nextRttSampleTime - now); + if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) { + _virtual = false; + forceReschedule(timeToRtt); + } else { + _virtual = true; + forceReschedule( WESTWOOD_TAU / WESTWOOD_M ); + } + } + } /** * fired to reschedule event notification @@ -1693,11 +1790,13 @@ class Connection { // This prevents being stuck at a window size of 1, retransmitting every packet, // never updating the RTT or RTO. getOptions().doubleRTO(); - getOptions().setWindowSize(1); + // cut ssthresh to BWE * minRTT, wsize to ssthresh (Westwood paper section 3.1) if (_packet.getNumSends() == 1) { - int flightSize = getUnackedPacketsSent(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh); + int wsize = getOptions().getWindowSize(); + getOptions().setWindowSize(Math.min(_ssthresh,wsize)); } if (_log.shouldLog(Log.INFO)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index 4d9ae6ef5..d0774e108 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int _rtt; + private int _minRtt = Integer.MAX_VALUE; private int _rttDev; private int _rto = INITIAL_RTO; private int _resendDelay; @@ -578,6 +579,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public synchronized int getRTT() { return _rtt; } + /** + * @return minimum RTT observed over the life of the connection + */ + public synchronized int getMinRTT() {return _minRtt; } + + /** * not public, use 
updateRTT() */ @@ -677,6 +684,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * @param measuredValue must be positive */ public synchronized void updateRTT(int measuredValue) { + _minRtt = Math.min(_minRtt, measuredValue); switch(_initState) { case INIT: _initState = AckInit.FIRST;
comment:4 Changed 9 months ago by
A different approach to implementing Westwood[+] is to not use a timer, but instead keep a history of incoming acks and compute the Bandwidth Estimate (BWE) on demand. The following patch and two new files implement one such approach.
- Patch to the apps/streaming package:
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/ streaming/impl/Connection.java index 558f31c15..615da2b47 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java @@ -114,6 +114,9 @@ class Connection { /** Maximum number of packets to retransmit when the timer hits */ private static final int MAX_RTX = 16; + + + private final BandwidthEstimator _bwEstimator; /**** public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, @@ -170,6 +173,7 @@ class Connection { _nextSendLock = new Object(); _connectionEvent = new ConEvent(); _retransmitEvent = new RetransmitEvent(); + _bwEstimator = new WestwoodBandwidthEstimator(ctx, 8 * MAX_WINDOW_SIZE, 1000, 2, 10); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) @@ -579,6 +583,7 @@ class Connection { } if ((acked != null) && (!acked.isEmpty()) ) { _ackSinceCongestion.set(true); + _bwEstimator.addSample(acked.size()); if (anyLeft) { // RFC 6298 section 5.3 int rto = getOptions().getRTO(); @@ -1477,7 +1482,7 @@ class Connection { final long now = _context.clock().now(); pushBackRTO(getOptions().doubleRTO()); - // 2. cut ssthresh in half the outstanding size (RFC 5681, equation 4) + // 2. 
cut ssthresh to bandwidth estimate, window to 1 List<PacketLocal> toResend = null; synchronized(_outboundPackets) { if (_outboundPackets.isEmpty()) { @@ -1490,8 +1495,8 @@ class Connection { if (oldest.getNumSends() == 1) { if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " cutting ssthresh and window"); - int flightSize = _outboundPackets.size(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh); getOptions().setWindowSize(1); } else if (_log.shouldLog(Log.DEBUG)) _log.debug(Connection.this + " not cutting ssthresh and window"); @@ -1693,11 +1698,12 @@ class Connection { // This prevents being stuck at a window size of 1, retransmitting every packet, // never updating the RTT or RTO. getOptions().doubleRTO(); - getOptions().setWindowSize(1); if (_packet.getNumSends() == 1) { - int flightSize = getUnackedPacketsSent(); - _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, Math.max( flightSize / 2, 2 )); + _ssthresh = Math.max( (int)(_bwEstimator.getBandwidthEstimate() * getOptions().getMinRTT()), 2 ); + _ssthresh = Math.min(ConnectionPacketHandler.MAX_SLOW_START_WINDOW, _ssthresh); + int wsize = getOptions().getWindowSize(); + getOptions().setWindowSize(Math.min(_ssthresh, wsize)); } if (_log.shouldLog(Log.INFO)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java index 4d9ae6ef5..d0774e108 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java @@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { private int _receiveWindow; private int _profile; private int 
_rtt; + private int _minRtt = Integer.MAX_VALUE; private int _rttDev; private int _rto = INITIAL_RTO; private int _resendDelay; @@ -578,6 +579,12 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public synchronized int getRTT() { return _rtt; } + /** + * @return minimum RTT observed over the life of the connection + */ + public synchronized int getMinRTT() {return _minRtt; } + + /** * not public, use updateRTT() */ @@ -677,6 +684,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * @param measuredValue must be positive */ public synchronized void updateRTT(int measuredValue) { + _minRtt = Math.min(_minRtt, measuredValue); switch(_initState) { case INIT: _initState = AckInit.FIRST;
- The BandwidthEstimator base class
package net.i2p.client.streaming.impl;

import java.util.Queue;
import java.util.ArrayDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

import net.i2p.I2PAppContext;
import net.i2p.util.Log;

/**
 * Base class for bandwidth estimators driven by arriving acks.
 *
 * Acks are recorded as timestamped {@link AckSample}s in a bounded queue.
 * When an estimate is requested, all queued samples are drained and handed
 * to the subclass via {@link #computeBWE(long, Queue)}; each sample is
 * therefore passed to the subclass at most once.
 */
abstract class BandwidthEstimator {

    private final I2PAppContext _context;
    private final Log _log;
    /** bounded buffer of samples not yet consumed by an estimate */
    private final BlockingQueue<AckSample> _samples;
    private final int _maxSamples;

    /**
     * @param ctx context providing the clock and the log manager
     * @param maxSamples capacity of the sample buffer; when it is full the
     *                   earliest sample is discarded to make room
     */
    BandwidthEstimator(I2PAppContext ctx, int maxSamples) {
        _context = ctx;
        _log = ctx.logManager().getLog(BandwidthEstimator.class);
        _samples = new ArrayBlockingQueue<>(maxSamples);
        _maxSamples = maxSamples;
    }

    /**
     * Records an arriving ack.
     * If the buffer is full, the earliest samples are evicted (and reported
     * through {@link #discard(AckSample)}) until the new one fits.
     *
     * @param acked how many packets were acked with this ack
     */
    public final void addSample(int acked) {
        long now = _context.clock().now();
        AckSample sample = new AckSample(now, acked);
        while (!_samples.offer(sample)) {
            if (_log.shouldLog(Log.INFO))
                _log.info("Samples overflow, discarding earliest");
            AckSample toDiscard = _samples.poll();
            // poll() returns null if getBandwidthEstimate() drained the queue
            // between the failed offer() and here. Nothing was evicted in that
            // case, so do not notify the subclass; just retry the offer.
            if (toDiscard != null)
                discard(toDiscard);
        }
    }

    /**
     * Notification that a sample was discarded because the buffer overflowed.
     * Can be overridden by subclasses; the default does nothing.
     *
     * @param discarded the evicted sample, never null
     */
    protected void discard(AckSample discarded) {}

    /**
     * Drains every buffered sample and asks the subclass for an estimate.
     *
     * @return the current bandwidth estimate in packets/ms.
     */
    public final double getBandwidthEstimate() {
        final long now = _context.clock().now();
        Queue<AckSample> copy = new ArrayDeque<>(_maxSamples);
        _samples.drainTo(copy);
        // remember the count now, computeBWE() is free to consume the queue
        int nSamples = copy.size();
        double rv = computeBWE(now, copy);
        if (_log.shouldLog(Log.DEBUG))
            _log.debug("computed " + rv + " from samples " + nSamples);
        return rv;
    }

    /**
     * Performs the actual computation. To be implemented by subclasses
     *
     * @param now what time is it now
     * @param samples list of AckSamples to base the computation on
     * @return the current bandwidth estimate in packets/ms
     */
    protected abstract double computeBWE(long now, Queue<AckSample> samples);

    /**
     * Immutable record of one ack: when it arrived and how many packets it acked.
     */
    protected static class AckSample {
        /** clock time at which the ack was recorded */
        final long timestamp;
        /** number of packets acked */
        final int acked;

        AckSample(long timestamp, int acked) {
            this.timestamp = timestamp;
            this.acked = acked;
        }

        @Override
        public String toString() {
            return "AckSample[" + timestamp + ":" + acked + "]";
        }
    }
}
- The WestwoodBandwidthEstimator implementation of regular Westwood with fixed aliasing interval
package net.i2p.client.streaming.impl;

import java.util.Queue;
import java.util.ArrayDeque;

import net.i2p.I2PAppContext;
import net.i2p.util.Log;

/**
 * Regular Westwood bandwidth estimator with a fixed aliasing interval.
 *
 * Samples closer together than the tolerance are merged, then each sample is
 * fed through a low-pass filter. Gaps longer than tau / m are bridged with
 * virtual zero-packet samples.
 */
class WestwoodBandwidthEstimator extends BandwidthEstimator {

    private final Log _log;
    private final int _tau;
    private final int _m;
    private final int _tolerance;
    /** longest gap allowed between two consecutive filter updates, tau / m */
    private final int _maxInterval;

    /** time of the last (real or virtual) sample fed to the filter, 0 if none */
    private long _tAck;
    private double _bKFiltered, _bK;

    /**
     * @param ctx context providing the clock and the log manager
     * @param maxSamples capacity of the sample buffer in the base class
     * @param tau - westwood TAU constant
     * @param m - westwood M constant
     * @param tolerance - if samples are received within this many milliseconds, alias them
     * @throws IllegalArgumentException if m is not positive or tau / m is less than 1;
     *         a zero interval would make the virtual-sample loops spin forever
     */
    WestwoodBandwidthEstimator(I2PAppContext ctx, int maxSamples, int tau, int m, int tolerance) {
        super(ctx, maxSamples);
        if (m <= 0 || tau / m <= 0)
            throw new IllegalArgumentException("tau / m must be at least 1, tau=" + tau + " m=" + m);
        this._log = ctx.logManager().getLog(WestwoodBandwidthEstimator.class);
        this._tau = tau;
        this._m = m;
        this._tolerance = tolerance;
        this._maxInterval = tau / m;
    }

    /**
     * A sample was lost to overflow, so the history has a hole in it.
     * Forget the filter state and start over with the next sample.
     */
    @Override
    protected synchronized void discard(AckSample discarded) {
        // start fresh
        _tAck = 0;
        _bKFiltered = 0;
        _bK = 0;
    }

    /**
     * Feeds the given samples through the filter and pads with virtual
     * samples up to the current time.
     *
     * @param now what time is it now
     * @param samples samples in arrival order; consumed by this call
     * @return the filtered bandwidth estimate in packets/ms, 0 if nothing was ever sampled
     */
    @Override
    protected synchronized double computeBWE(final long now, Queue<AckSample> samples) {
        if (_log.shouldLog(Log.DEBUG))
            _log.debug(this + " computeBWE now:" + now + " samples:" + samples);

        if (samples.isEmpty() && _tAck == 0)
            return 0.0; // nothing ever sampled

        // 1. pre-process queue by aliasing any samples within the tolerance
        int samplesSize = samples.size();
        Queue<AckSample> aliased = new ArrayDeque<>(samplesSize);
        while (!samples.isEmpty()) {
            AckSample pivot = samples.poll();
            int packets = pivot.acked;
            final long time = pivot.timestamp;
            AckSample sample;
            while ((sample = samples.peek()) != null) {
                if (sample.timestamp <= time + _tolerance) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(this + " aliasing " + sample + " with " + pivot);
                    packets += sample.acked;
                    samples.poll();
                } else {
                    break;
                }
            }
            AckSample processed = new AckSample(time, packets);
            aliased.offer(processed);
        }

        if (_log.shouldLog(Log.DEBUG)) {
            _log.debug(this + " Aliasing: " + samplesSize + " -> " + aliased.size());
            _log.debug(this + " Aliased samples:" + aliased);
        }

        // init with first sample if necessary
        // (aliased cannot be empty here: the early return covers empty input with _tAck == 0)
        if (_tAck == 0) {
            AckSample first = aliased.poll();
            _tAck = first.timestamp;
            _bKFiltered = first.acked;
            _bK = first.acked;
        }

        // go through the samples and compute _bKFiltered
        while (!aliased.isEmpty()) {
            AckSample first = aliased.peek();
            if (first.timestamp - _tAck <= _maxInterval) {
                // consume the sample
                aliased.poll();
                updateBK(first.timestamp, first.acked);
            } else {
                // virtual sample
                updateBK(_tAck + _maxInterval, 0);
            }
        }

        // fill up with virtual samples past the last real sample
        int virtualSamples = 0;
        while (_tAck < now - _maxInterval) {
            virtualSamples++;
            updateBK(_tAck + _maxInterval, 0);
        }

        if (_log.shouldLog(Log.DEBUG))
            _log.debug(this + " padded virtual samples at the end:" + virtualSamples);

        return _bKFiltered;
    }

    /**
     * Feeds one sample through the filter and advances the filter time.
     *
     * @param time the time of the measurement
     * @param packets number of packets acked, 0 for virtual samples
     */
    private void updateBK(long time, int packets) {
        long deltaT = time - _tAck;
        // A sample in the same millisecond as the previous one (possible across
        // two computeBWE() calls) or a clock that stepped backwards would give
        // deltaT <= 0. Dividing by zero yields Infinity, and alphaK == 1 then
        // makes (1 - alphaK) * Infinity == NaN, which poisons _bKFiltered for good.
        // Timestamps have millisecond resolution, so treat it as 1 ms.
        if (deltaT <= 0)
            deltaT = 1;
        double bk = packets * 1.0 / deltaT;
        double alphaK = (2.0 * _tau - deltaT) / (2.0 * _tau + deltaT);
        _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * (bk + _bK) / 2;
        _bK = bk;
        _tAck = time;
    }

    @Override
    public synchronized String toString() {
        return "WBE[" + " _bKFiltered " + _bKFiltered + " _tAck " + _tAck + "]";
    }
}
comment:5 Changed 9 months ago by
Milestone: | undecided → 0.9.46 |
---|---|
Resolution: | → fixed |
Status: | new → closed |
A reworked and simplified version of the patch in comment 4 above, based on Linux kernel's implementation, in 090d47ac035674bcc624dbde67d094699592f626 0.9.45-14. Still calculated on-demand, but the 'history' is now just a counter. BandwidthEstimator is now a simple interface, to support testing and comparison of different implementations. Testing will continue. Early results show up to 40% improvement on lossy connections, in line with the Westwood+ paper.
Rebasing against -10. Other notes: