Custom Query (1940 matches)

Filters
 
Or
 
  
 
Columns

Show under each result:


Results (37 - 39 of 1940)

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
Ticket Resolution Summary Owner Reporter
#2723 not our bug The information you requested. Steve Smith
Description

I am running Arch Linux and have installed i2pd from my distro. I then installed i2pbrowser from a download and have been using that as my web browser. As far as Java installs go, I have tried versions 8, 10, 11, and 13 (13 is the default), and they all gave me the same Java errors. Both the JRE and the JDK are installed. After doing some research, I noticed that there are some classes that I can download in jar form, which I did, but I could not get them to work. I also noticed that /var/lib/i2pd/addressbook/addresses.csv does not exist.

This is the pertinent output from the only log I could find.

/var/log/i2pd/i2pd.log

17:16:07@884/info - Log: min messages level set to info 17:16:07@884/info - Log: will send messages to /var/log/i2pd/i2pd.log 17:16:07@884/info - i2pd v2.30.0 starting 17:16:07@884/info - Daemon: bandwidth set to 'low' 17:16:07@884/info - Daemon: using system limit in 4096 max open files 17:16:07@884/info - Daemon: starting NetDB 17:16:07@884/info - Family: 5 certificates loaded 17:16:07@884/info - NetDb: 0 routers loaded (0 floodfils) 17:16:07@884/info - Reseed: 8 certificates loaded 17:16:07@884/error - RouterInfo: Can't open file 17:16:07@884/info - Reseed: Downloading SU3 from https://i2p.novg.net/i2pseeds.su3 ……… 6:09@884/info - Daemon: starting Transports 17:16:09@884/info - Daemon: ntcp disabled 17:16:09@884/info - NTCP2: Start listening TCP port 10345 17:16:09@884/info - Transports: Start listening UDP port 10345 17:16:09@884/info - Daemon: Transports started 17:16:09@884/info - Daemon: starting HTTP Server at 127.0.0.1:7070 17:16:09@884/info - Daemon: starting Tunnels 17:16:09@884/info - Daemon: starting Client 17:16:09@77/warn - SSU: Can't connect to unreachable router and no ipv4 non-expired introducers presented 17:16:09@77/warn - SSU: Can't connect to unreachable router and no ipv4 non-expired introducers presented 17:16:09@884/warn - Addressbook: Can't open /var/lib/i2pd/addressbook/addresses.csv 17:16:09@884/error - Addressbook: resetting eTags

#2719 fixed Streaming: TCP-Westwood+ Zlatin Balevsky
Description

This builds on top of ticket #2718 and implements an "anti-aliased" sampling window. According to this paper, https://c3lab.poliba.it/images/c/cd/QoSIP03.pdf, it should provide a better estimate than regular Westwood.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..d4730d12c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -99,6 +102,17 @@ class Connection {
     public static final long MAX_RESEND_DELAY = 45*1000;
     public static final long MIN_RESEND_DELAY = 100;
 
+    public static final int MAX_WINDOW_SIZE = 128;
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 5000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private final AtomicInteger _acksThisRtt = new AtomicInteger();
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
+
     /**
      *  Wait up to 5 minutes after disconnection so we can ack/close packets.
      *  Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
@@ -108,7 +122,6 @@ class Connection {
     public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000;
     private static final long MAX_CONNECT_TIMEOUT = 2*60*1000;
 
-    public static final int MAX_WINDOW_SIZE = 128;
     private static final int UNCHOKES_TO_SEND = 8;
     
 /****
@@ -156,6 +169,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +179,17 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +259,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +443,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +477,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +517,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,18 +578,57 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
+            _acksThisRtt.addAndGet(acked.size());
+            _virtualAckEvent.scheduleFirst();
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(_context.clock().now() + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = now - _tAck;
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -788,6 +853,8 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1452,9 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" bkFiltered ").append(_bKFiltered);
+        buf.append(" minRTT ").append(getOptions().getMinRTT());
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1462,143 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in accordance to what Westwood paper section 3.2
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+
+        private boolean _firstScheduled;
+        private boolean _virtual;
+        private long _nextRttSampleTime;
+
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public synchronized void timeReached() {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " Westwood+ timer hit, virtual " + _virtual);
+
+            final long now = _context.clock().now();
+
+            if (_virtual) {
+                updateBK(now, 0);
+            } else {
+                updateBK(now, _acksThisRtt.getAndSet(0));
+                _nextRttSampleTime = now + getOptions().getRTT();
+            }
+
+            scheduleNext(now);
+        }
+
+        synchronized void scheduleFirst() {
+            if (_firstScheduled)
+                return;
+            _firstScheduled = true;
+            final long now = _context.clock().now();
+            _nextRttSampleTime = now + getOptions().getRTT();
+            scheduleNext(now);
+        }
+
+        private void scheduleNext(long now) {
+            int timeToRtt = (int)(_nextRttSampleTime - now);
+            if (timeToRtt < WESTWOOD_TAU / WESTWOOD_M ) {
+                _virtual = false;
+                forceReschedule(timeToRtt);
+            } else {
+                _virtual = true;
+                forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+            }
+        }
+    }    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1634,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1663,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1703,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1710,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+
+                        if (_packet.getNumSends() == 1) {
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                            int wSize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh, wSize));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1760,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1789,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1800,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 1d48a33b3..6d0dd9c5c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -577,6 +578,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
     /**
      *  not public, use updateRTT()
      */
@@ -671,6 +677,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;
#2718 wontfix Streaming: TCP-Westwood Zlatin Balevsky
Description

This ticket builds upon #2715; more specifically, if the patch in that ticket is merged, then the effort to implement Westwood is minimal.

I've chosen the constants for the low-pass filter arbitrarily. More tuning and testnet results to follow.

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
index a06a48809..fc56ee402 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
@@ -53,6 +53,7 @@ class Connection {
     private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
     private long _congestionWindowEnd;
     private volatile long _highestAckedThrough;
+    private volatile int _ssthresh;
     private final boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
@@ -68,9 +69,11 @@ class Connection {
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
-    private int _lastCongestionSeenAt;
+    private volatile int _lastCongestionSeenAt;
     private long _lastCongestionTime;
     private volatile long _lastCongestionHighestUnacked;
+    private final AtomicLong _nextRetransmitTime = new AtomicLong();
+    private final RetransmitEvent _retransmitEvent;
     /** has the other side choked us? */
     private volatile boolean _isChoked;
     /** are we choking the other side? */
@@ -99,6 +102,15 @@ class Connection {
     public static final long MAX_RESEND_DELAY = 45*1000;
     public static final long MIN_RESEND_DELAY = 100;
 
+    public static final int MAX_SLOW_START_WINDOW = 24;
+
+    /** Westwood parameters and state */
+    private static final int WESTWOOD_TAU = 5000;
+    private static final int WESTWOOD_M = 2;
+    private final VirtualAckEvent _virtualAckEvent;
+    private long _tAck;
+    private volatile double _bK, _bKFiltered;
+
     /**
      *  Wait up to 5 minutes after disconnection so we can ack/close packets.
      *  Roughly equal to the TIME-WAIT time in RFC 793, where the recommendation is 4 minutes (2 * MSL)
@@ -156,6 +168,7 @@ class Connection {
         _createdOn = _context.clock().now();
         _congestionWindowEnd = _options.getWindowSize()-1;
         _highestAckedThrough = -1;
+        _ssthresh = MAX_SLOW_START_WINDOW;
         _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
@@ -165,11 +178,17 @@ class Connection {
         _connectLock = new Object();
         _nextSendLock = new Object();
         _connectionEvent = new ConEvent();
+        _retransmitEvent = new RetransmitEvent();
+        _virtualAckEvent = new VirtualAckEvent();
         _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
         // all createRateStats in ConnectionManager
         if (_log.shouldLog(Log.INFO))
             _log.info("New connection created with options: " + _options);
     }
+
+    int getSSThresh() {
+        return _ssthresh;
+    }
     
     public long getNextOutboundPacketNum() { 
         return _lastSendId.incrementAndGet();
@@ -239,9 +258,9 @@ class Connection {
                             throw ie;
                         }
                     } else {
-                        //if (_log.shouldLog(Log.DEBUG))
-                        //    _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
-                        //               + "), waiting indefinitely");
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends 
+                                       + "), waiting indefinitely");
                         try {
                             _outboundPackets.wait(250);
                         } catch (InterruptedException ie) {
@@ -423,10 +442,15 @@ class Connection {
             long timeout = _options.getRTO();
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Resend in " + timeout + " for " + packet);
 
-            // schedules itself
+            // RFC 6298 section 5.1
+            if (_nextRetransmitTime.compareAndSet(0, _context.clock().now() + timeout)) {
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " Resend in " + timeout + " for " + packet);
+                _retransmitEvent.forceReschedule(timeout);
+            } else if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " timer was already running");
+
             new ResendPacketEvent(packet, timeout);
         }
 
@@ -452,7 +476,7 @@ class Connection {
         }
          */
     }
-    
+   
 /*********
     private class PingNotifier implements ConnectionManager.PingNotifier {
         private long _startedPingOn;
@@ -492,6 +516,7 @@ class Connection {
         }
         
         List<PacketLocal> acked = null;
+        boolean anyLeft = false;
         synchronized (_outboundPackets) {
             if (!_outboundPackets.isEmpty()) {  // short circuit iterator
               for (Iterator<Map.Entry<Long, PacketLocal>> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) {
@@ -552,18 +577,58 @@ class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.isEmpty()) && (_activeResends.get() != 0) ) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("All outbound packets acked, clearing " + _activeResends);
-                _activeResends.set(0);
-            }
+            anyLeft = !_outboundPackets.isEmpty();
             _outboundPackets.notifyAll();
         }
-        if ((acked != null) && (!acked.isEmpty()) )
+        if ((acked != null) && (!acked.isEmpty()) ) {
+            final long now = _context.clock().now();
+            updateBK(now, acked.size());
+            _virtualAckEvent.pushOut();
             _ackSinceCongestion.set(true);
+            if (anyLeft) {
+                // RFC 6298 section 5.3
+                int rto = getOptions().getRTO();
+                _nextRetransmitTime.set(now + rto);
+                _retransmitEvent.forceReschedule(rto);
+
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not all packets acked, pushing timer out " + rto);
+            } else {
+                // RFC 6298 section 5.2
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " all outstanding packets acked, cancelling timer");
+                _nextRetransmitTime.set(0);
+                _retransmitEvent.cancel();
+            }
+        }
         return acked;
     }
 
+    /**
+     * Updates the bandwidth estimate
+     * @param now what time is it now
+     * @param nAcked number of packets acked, can be zero for virtual acks
+     */
+    private synchronized void updateBK(long now, int nAcked) {
+        if (_tAck > 0) {
+
+            double bKFiltered = _bKFiltered; // for debug logging only
+
+            long deltaT = Math.max(1,now - _tAck);
+            double bk = nAcked * 1.0 / deltaT;
+            double alphaK = (2.0 * WESTWOOD_TAU - deltaT) / (2.0 * WESTWOOD_TAU + deltaT);
+
+            _bKFiltered = alphaK * _bKFiltered + (1 - alphaK) * ( bk + _bK ) / 2;
+            _bK = bk;
+
+            if (_log.shouldLog(Log.DEBUG)) { 
+                _log.debug(Connection.this + " bKFiltered: " + bKFiltered + " -> " + _bKFiltered +
+                    " deltaT " + deltaT + " bK " + _bK + " alphaK " + alphaK + " nAcked " + nAcked); 
+            }
+        }
+        _tAck = now;
+    }
+
     //private long _occurredTime;
     //private long _occurredEventCount;
 
@@ -788,6 +853,8 @@ class Connection {
         _outputStream.destroy();
         _receiver.destroy();
         _activityTimer.cancel();
+        _retransmitEvent.cancel();
+        _virtualAckEvent.cancel();
         _inputStream.streamErrorOccurred(new IOException("Socket closed"));
         
         if (_log.shouldLog(Log.INFO))
@@ -1385,7 +1452,9 @@ class Connection {
         buf.append(" sent: ").append(1 + _lastSendId.get());
         buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         buf.append(" ackThru ").append(_highestAckedThrough);
-        
+        buf.append(" ssThresh ").append(_ssthresh); 
+        buf.append(" bkFiltered ").append(_bKFiltered);
+        buf.append(" minRTT ").append(getOptions().getMinRTT());
         buf.append(" maxWin ").append(getOptions().getMaxWindowSize());
         buf.append(" MTU ").append(getOptions().getMaxMessageSize());
         
@@ -1393,6 +1462,115 @@ class Connection {
         return buf.toString();
     }
 
+
+    class RetransmitEvent extends SimpleTimer2.TimedEvent {
+        RetransmitEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " rtx timer timeReached()");
+
+            congestionOccurred();
+
+            // 1. Double RTO and backoff (RFC 6298 section 5.5 & 5.6)
+            final long now = _context.clock().now();
+            ConnectionOptions opts = getOptions();
+            synchronized(opts) {
+                opts.doubleRTO();
+                reschedule(opts.getRTO());
+                _nextRetransmitTime.set(now + opts.getRTO());
+            }
+
+            // 2. cut ssthresh in accordance with Westwood paper section 3.2
+            List<PacketLocal> toResend = null;
+            synchronized(_outboundPackets) {
+                if (_outboundPackets.isEmpty()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn(Connection.this + " Retransmission timer hit but nothing transmitted??");
+                    return;
+                }
+
+                PacketLocal oldest = _outboundPackets.values().iterator().next();
+                if (oldest.getNumSends() == 1) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " cutting ssthresh and window");
+                    _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                    getOptions().setWindowSize(1);
+                } else if (_log.shouldLog(Log.DEBUG))
+                    _log.debug(Connection.this + " not cutting ssthresh and window");
+
+                toResend = new ArrayList<>(_outboundPackets.values());
+                toResend = toResend.subList(0, (toResend.size() + 1) / 2);
+            }
+
+
+            // 3. Retransmit half of the packets in flight (RFC 6298 section 5.4 and RFC 5681 section 4.3)
+            boolean sentAny = false;
+            for (PacketLocal packet : toResend) {
+                final int nResends = packet.getNumSends();
+                if (packet.getNumSends() > getOptions().getMaxResends()) {
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " packet " + packet + " resent too many times, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (packet.getNumSends() >= 3 &&
+                           packet.isFlagSet(Packet.FLAG_CLOSE) &&
+                           packet.getPayloadSize() <= 0 &&
+                           getCloseReceivedOn() > 0) {
+                    // Bug workaround to prevent 5 minutes of retransmission
+                    // Routers before 0.9.9 have bugs, they won't ack anything after
+                    // they sent a close. Only send 3 CLOSE packets total, then
+                    // shut down normally.
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(Connection.this + " too many close resends, closing");
+                    packet.cancelled();
+                    disconnect(false);
+                    return;
+                } else if (_outboundQueue.enqueue(packet)) {
+                    if (_log.shouldLog(Log.INFO)) 
+                        _log.info(Connection.this + " resent packet " + packet);
+                    if (nResends == 1)
+                        _activeResends.incrementAndGet();
+                    sentAny = true;
+                } else if (_log.shouldLog(Log.DEBUG)) 
+                    _log.debug(Connection.this + " could not resend packet " + packet);
+            }
+
+            if (sentAny) {
+                _lastSendTime = now;
+                resetActivityTimer();
+            }
+        }
+    }
+
+
+    /**
+     * In order for the low-pass filter to work it is important to
+     * receive bw measurements at least every so often.
+     */
+    class VirtualAckEvent extends SimpleTimer2.TimedEvent {
+        VirtualAckEvent() {
+            super(_timer);
+        }
+
+        public void timeReached() {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(Connection.this + " injecting virtual ack");
+
+            final long now = _context.clock().now();
+            updateBK(now, 0);
+            pushOut();
+        }
+
+        void pushOut() {
+            forceReschedule( WESTWOOD_TAU / WESTWOOD_M );
+        }
+    }    
+
     /**
      * fired to reschedule event notification
      */
@@ -1428,13 +1606,16 @@ class Connection {
             _packet = packet;
             _nextSend = delay + _context.clock().now();
             packet.setResendPacketEvent(ResendPacketEvent.this);
-            schedule(delay);
         }
         
         public long getNextSendTime() { return _nextSend; }
 
         public void timeReached() { retransmit(); }
 
+        void fastRetransmit() {
+            reschedule(0);
+        }
+
         /**
          * Retransmit the packet if we need to.  
          *
@@ -1454,43 +1635,11 @@ class Connection {
                 _packet.cancelled();
                 return false;
             }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("Resend period reached for " + _packet);
-            boolean resend = false;
-            boolean isLowest = false;
-            synchronized (_outboundPackets) {
-                // allow appx. half the window to be "lowest" and be active resends, minimum of 3
-                // Note: we should really pick the N lowest, not the lowest one + N more who
-                // happen to get here next, as the timers get out-of-order esp. after fast retx
-                if (_packet.getSequenceNum() == _highestAckedThrough + 1 ||
-                    _packet.getNumSends() > 1 ||
-                    _activeResends.get() < Math.max(3, (_options.getWindowSize() + 1) / 2))
-                    isLowest = true;
-                if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum())))
-                    resend = true;
-            }
-            if ( (resend) && (_packet.getAckTime() <= 0) ) {
-                boolean fastRetransmit = ( (_packet.getNACKs() >= FAST_RETRANSMIT_THRESHOLD) && (_packet.getNumSends() == 1));
-                if ( (!isLowest) && (!fastRetransmit) ) {
-                    // we want to resend this packet, but there are already active
-                    // resends in the air and we dont want to make a bad situation 
-                    // worse.  wait another second
-                    // BUG? seq# = 0, activeResends = 0, loop forever - why?
-                    // also seen with seq# > 0. Is the _activeResends count reliable?
-                    if (_log.shouldLog(Log.INFO))
-                        _log.info("Delaying resend of " + _packet + " with " 
-                                  + _activeResends + " active resend, "
-                                  + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize());
-                    forceReschedule(1333);
-                    _nextSend = 1333 + _context.clock().now();
-                    return false;
-                }
+
                 
                 // It's the lowest, or it's fast retransmit time. Resend the packet.
 
-                if (fastRetransmit)
-                    _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
+                _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime());
                 
                 // revamp various fields, in case we need to ack more, etc
                 // updateAcks done in enqueue()
@@ -1526,9 +1675,6 @@ class Connection {
                     if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) {
                         congestionOccurred();
                         _context.statManager().addRateData("stream.con.windowSizeAtCongestion", newWindowSize, _packet.getLifetime());
-                        newWindowSize /= 2;
-                        if (newWindowSize <= 0)
-                            newWindowSize = 1;
                         
                         // The timeout for _this_ packet will be doubled below, but we also
                         // need to double the RTO for the _next_ packets.
@@ -1536,7 +1682,12 @@ class Connection {
                         // This prevents being stuck at a window size of 1, retransmitting every packet,
                         // never updating the RTT or RTO.
                         getOptions().doubleRTO();
-                        getOptions().setWindowSize(newWindowSize);
+
+                        if (_packet.getNumSends() == 1) {
+                            _ssthresh = Math.min(MAX_SLOW_START_WINDOW, Math.max( (int)(_bKFiltered * getOptions().getMinRTT()), 2 ));
+                            int wSize = getOptions().getWindowSize();
+                            getOptions().setWindowSize(Math.min(_ssthresh, wSize));
+                        }
 
                         if (_log.shouldLog(Log.INFO))
                             _log.info("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize 
@@ -1581,19 +1732,24 @@ class Connection {
                     long rto = _options.getRTO();
                     if (rto < MIN_RESEND_DELAY)
                         rto = MIN_RESEND_DELAY;
-                    long timeout = rto << (numSends-1);
+                    long timeout = rto;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     // set this before enqueue() as it passes it on to the router
                     _nextSend = timeout + _context.clock().now();
 
                     if (_outboundQueue.enqueue(_packet)) {
+                        if (_nextRetransmitTime.compareAndSet(0, _nextSend)) {
+                            if (_log.shouldLog(Log.DEBUG))
+                                _log.debug(Connection.this + " fast retransmit and schedule timer");
+                            _retransmitEvent.forceReschedule(timeout);
+                        }
                         // first resend for this packet ?
                         if (numSends == 2)
                             _activeResends.incrementAndGet();
                         if (_log.shouldLog(Log.INFO))
-                            _log.info("Resent packet " +
-                                  (fastRetransmit ? "(fast) " : "(timeout) ") +
+                            _log.info(Connection.this + " Resent packet " +
+                                  "(fast) " +
                                   _packet +
                                   " next resend in " + timeout + "ms" +
                                   " activeResends: " + _activeResends + 
@@ -1605,8 +1761,6 @@ class Connection {
                         // timer reset added 0.9.1
                         resetActivityTimer();
                     }
-
-                    forceReschedule(timeout);
                 }
                 
                 // acked during resending (... or somethin') ????????????
@@ -1618,12 +1772,6 @@ class Connection {
                 }
 
                 return true;
-            } else {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("Packet acked before resend (resend="+ resend + "): " 
-                //               + _packet + " on " + Connection.this);
-                return false;
-            }
         }
     }
 }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
index 1d48a33b3..6d0dd9c5c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
@@ -28,6 +28,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
     private int _receiveWindow;
     private int _profile;
     private int _rtt;
+    private int _minRtt = Integer.MAX_VALUE;
     private int _rttDev;
     private int _rto = INITIAL_RTO;
     private int _resendDelay;
@@ -577,6 +578,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      */
     public synchronized int getRTT() { return _rtt; }
 
+    /**
+     * @return minimum RTT observed over the life of the connection
+     */
+    public synchronized int getMinRTT() {return _minRtt; }
+
     /**
      *  not public, use updateRTT()
      */
@@ -671,6 +677,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
      *  @param measuredValue must be positive
      */
     public synchronized void updateRTT(int measuredValue) {
+        _minRtt = Math.min(_minRtt, measuredValue);
         switch(_initState) {
         case INIT:
             _initState = AckInit.FIRST;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
index 3466ee6f0..8065dd77c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
@@ -28,8 +28,6 @@ class ConnectionPacketHandler {
     private final Log _log;
     private final ByteCache _cache = ByteCache.getInstance(32, 4*1024);
 
-    public static final int MAX_SLOW_START_WINDOW = 24;
-    
     // see tickets 1939 and 2584
     private static final int IMMEDIATE_ACK_DELAY = 150;
 
@@ -432,8 +430,8 @@ class ConnectionPacketHandler {
             _context.statManager().addRateData("stream.trend", trend, newWindowSize);
             
             if ( (!congested) && (acked > 0) && (numResends <= 0) ) {
-                if (newWindowSize < con.getLastCongestionSeenAt() / 2) {
-                    // Don't make this <= LastCongestion/2 or we'll jump right back to where we were
+                int ssthresh = con.getSSThresh();
+                if (newWindowSize < ssthresh) {
                     // slow start - exponential growth
                     // grow acked/N times (where N = the slow start factor)
                     // always grow at least 1
@@ -443,10 +441,10 @@ class ConnectionPacketHandler {
                         // as it often leads to a big packet loss (30-50) all at once that
                         // takes quite a while (a minute or more) to recover from,
                         // especially if crypto tags are lost
-                        if (newWindowSize >= MAX_SLOW_START_WINDOW)
+                        if (newWindowSize >= ssthresh)
                             newWindowSize++;
                         else
-                            newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
+                            newWindowSize = Math.min(ssthresh, newWindowSize + acked);
                     } else if (acked < factor)
                         newWindowSize++;
                     else
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
index 4a2ada514..ff351232d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
@@ -35,7 +35,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
     private long _ackOn; 
     private long _cancelledOn;
     private final AtomicInteger _nackCount = new AtomicInteger();
-    private volatile boolean _retransmitted;
     private volatile SimpleTimer2.TimedEvent _resendEvent;
     
     /** not bound to a connection */
@@ -189,23 +188,22 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
      */
     public void incrementNACKs() { 
         final int cnt = _nackCount.incrementAndGet();
-        SimpleTimer2.TimedEvent evt = _resendEvent;
-        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
-            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
-            _retransmitted = true;
-            evt.reschedule(0);
+        Connection.ResendPacketEvent evt = (Connection.ResendPacketEvent) _resendEvent;
+        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null  &&
+            (_numSends.get() == 1 || _lastSend < _context.clock().now() - _connection.getOptions().getRTT())) {  // Don't fast retx if we recently resent it
+            evt.fastRetransmit();
             // the predicate used to be '+', changing to '-' --zab
             
             if (_log.shouldLog(Log.DEBUG)) {
-                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
+                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
             }
         } else if (_log.shouldLog(Log.DEBUG)) {
-            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
+            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, "+
                     " numSends=%d, lastSend=%d, now=%d",
-                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
+                    toString(), cnt, _numSends.get(), _lastSend, _context.clock().now());
                     _log.debug(log);
         }
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
index ae9e06196..08aa1639b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/TCBShare.java
@@ -42,7 +42,7 @@ class TCBShare {
     /////
     private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
     private static final int MAX_RTT_DEV = (int) (MAX_RTT * 1.5);
-    private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
+    private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE;
     
     public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
         _context = ctx;

3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
Note: See TracQuery for help on using queries.