Changeset 2d8f0c2
- Timestamp:
- Feb 9, 2017 5:24:03 PM (4 years ago)
- Branches:
- master
- Children:
- 50450ec, e01c443
- Parents:
- f0241d4
- Files:
-
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java
rf0241d4 r2d8f0c2 70 70 private long _lastCongestionTime; 71 71 private volatile long _lastCongestionHighestUnacked; 72 /** has the other side choked us? */ 73 private volatile boolean _isChoked; 74 /** are we choking the other side? */ 75 private volatile boolean _isChoking; 76 private final AtomicInteger _unchokesToSend = new AtomicInteger(); 72 77 private final AtomicBoolean _ackSinceCongestion; 73 78 /** Notify this on connection (or connection failure) */ … … 103 108 104 109 public static final int MAX_WINDOW_SIZE = 128; 110 private static final int UNCHOKES_TO_SEND = 8; 105 111 106 112 /**** … … 188 194 synchronized (_outboundPackets) { 189 195 if (!started) 190 _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size() , timeoutMs);196 _context.statManager().addRateData("stream.chokeSizeBegin", _outboundPackets.size()); 191 197 if (start + 5*60*1000 < _context.clock().now()) // ok, 5 minutes blocking? I dont think so 192 198 return false; … … 206 212 int unacked = _outboundPackets.size(); 207 213 int wsz = _options.getWindowSize(); 208 if ( unacked >= wsz ||214 if (_isChoked || unacked >= wsz || 209 215 _activeResends.get() >= (wsz + 1) / 2 || 210 216 _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) { … … 212 218 if (timeLeft <= 0) { 213 219 if (_log.shouldLog(Log.INFO)) 214 _log.info("Outbound window is full "+ unacked220 _log.info("Outbound window is full (choked? " + _isChoked + ' ' + unacked 215 221 + " unacked with " + _activeResends + " active resends" 216 222 + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " … … 219 225 } 220 226 if (_log.shouldLog(Log.DEBUG)) 221 _log.debug("Outbound window is full ( " + unacked + "/" + wsz + "/"227 _log.debug("Outbound window is full (choked? " + _isChoked + ' ' + unacked + '/' + wsz + '/' 222 228 + _activeResends + "), waiting " + timeLeft); 223 229 try { … … 241 247 } 242 248 } else { 243 _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size() , _context.clock().now() - start);249 _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size()); 244 250 return true; 245 251 } … … 258 264 259 265 void ackImmediately() { 260 PacketLocal packet = null;266 PacketLocal packet; 261 267 /*** why would we do this? 262 268 was it to force a congestion indication at the other end? … … 344 350 } 345 351 352 /** 353 * This sends all 'normal' packets (acks and data) for the first time. 354 * Retransmits are done in ResendPacketEvent below. 355 * Resets, pings, and pongs are done elsewhere in this class, 356 * or in ConnectionManager or ConnectionHandler. 357 */ 346 358 void sendPacket(PacketLocal packet) { 347 359 if (packet == null) return; … … 354 366 355 367 if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { 356 //if (_log.shouldLog(Log.DEBUG)) 357 // _log.debug("No resend for " + packet); 368 // ACK-only 369 if (_isChoking) { 370 packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); 371 packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 372 } else if (_unchokesToSend.decrementAndGet() > 0) { 373 // don't worry about wrapping around 374 packet.setOptionalDelay(0); 375 packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 376 } 358 377 } else { 359 378 int windowSize; … … 365 384 _outboundPackets.notifyAll(); 366 385 } 367 // the other end has no idea what our window size is, so 368 // help him out by requesting acks below the 1/3 point, 369 // if remaining < 3, and every 8 minimum. 370 if (packet.isFlagSet(Packet.FLAG_CLOSE) || 371 (remaining < (windowSize + 2) / 3) || 386 387 if (_isChoking) { 388 packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); 389 packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 390 } else if (packet.isFlagSet(Packet.FLAG_CLOSE) || 391 _unchokesToSend.decrementAndGet() > 0 || 392 // the other end has no idea what our window size is, so 393 // help him out by requesting acks below the 1/3 point, 394 // if remaining < 3, and every 8 minimum. 372 395 (remaining < 3) || 373 (packet.getSequenceNum() % 8 == 0)) { 396 (remaining < (windowSize + 2) / 3) /* || 397 (packet.getSequenceNum() % 8 == 0) */ ) { 374 398 packet.setOptionalDelay(0); 375 399 packet.setFlag(Packet.FLAG_DELAY_REQUESTED); … … 381 405 // which is always 2000, but it's good for diagnostics to see what the other end thinks 382 406 // the RTT is. 407 /** 383 408 int delay = _options.getRTT() / 2; 384 409 packet.setOptionalDelay(delay); … … 387 412 if (_log.shouldLog(Log.DEBUG)) 388 413 _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); 389 } 390 // WHY always set? 391 //packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 414 **/ 415 } 392 416 393 417 long timeout = _options.getRTO(); … … 985 1009 } 986 1010 1011 /** 1012 * Set or clear if we are choking the other side. 1013 * If on is true or the value has changed, this will call ackImmediately(). 1014 * @param on true for choking 1015 * @since 0.9.29 1016 */ 1017 public void setChoking(boolean on) { 1018 if (on != _isChoking) { 1019 _isChoking = on; 1020 if (!on) 1021 _unchokesToSend.set(UNCHOKES_TO_SEND); 1022 ackImmediately(); 1023 } else if (on) { 1024 ackImmediately(); 1025 } 1026 } 1027 1028 /** 1029 * Set or clear if we are being choked by the other side. 1030 * @param on true for choked 1031 * @since 0.9.29 1032 */ 1033 public void setChoked(boolean on) { 1034 _isChoked = on; 1035 if (on) { 1036 congestionOccurred(); 1037 // https://en.wikipedia.org/wiki/Transmission_Control_Protocol 1038 // When a receiver advertises a window size of 0, the sender stops sending data and starts the persist timer. 1039 // The persist timer is used to protect TCP from a deadlock situation that could arise 1040 // if a subsequent window size update from the receiver is lost, 1041 // and the sender cannot send more data until receiving a new window size update from the receiver. 1042 // When the persist timer expires, the TCP sender attempts recovery by sending a small packet 1043 // so that the receiver responds by sending another acknowledgement containing the new window size. 1044 // ... 1045 // We don't do any of that, but we set the window size to 1, and let the retransmission 1046 // of packets do the "attempted recovery". 1047 getOptions().setWindowSize(1); 1048 } 1049 } 1050 1051 /** 1052 * Is the other side choking us? 1053 * @return if choked 1054 * @since 0.9.29 1055 */ 1056 public boolean isChoked() { 1057 return _isChoked; 1058 } 1059 987 1060 /** how many packets have we sent and the other side has ACKed? 988 1061 * @return Count of how many packets ACKed. … … 1382 1455 // updateAcks done in enqueue() 1383 1456 //_inputStream.updateAcks(_packet); 1384 int choke = getOptions().getChoke(); 1385 _packet.setOptionalDelay(choke); 1386 if (choke > 0) 1457 if (_isChoking) { 1458 _packet.setOptionalDelay(Packet.SEND_DELAY_CHOKE); 1387 1459 _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 1460 } else if (_unchokesToSend.decrementAndGet() > 0) { 1461 // don't worry about wrapping around 1462 _packet.setOptionalDelay(0); 1463 _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 1464 } else { 1465 // clear flag 1466 _packet.setFlag(Packet.FLAG_DELAY_REQUESTED, false); 1467 } 1468 1388 1469 // this seems unnecessary to send the MSS again: 1389 1470 //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); … … 1397 1478 int newWindowSize = getOptions().getWindowSize(); 1398 1479 1399 if (_ackSinceCongestion.get()) { 1480 if (_isChoked) { 1481 congestionOccurred(); 1482 getOptions().setWindowSize(1); 1483 } else if (_ackSinceCongestion.get()) { 1400 1484 // only shrink the window once per window 1401 1485 if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { -
apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionDataReceiver.java
rf0241d4 r2d8f0c2 157 157 158 158 /** 159 * Compose a packet. 160 * Most flags are set here; however, some are set in Connection.sendPacket() 161 * and Connection.ResendPacketEvent.retransmit(). 162 * Take care not to set the same options both here and in Connection. 163 * 159 164 * @param buf data to be sent - may be null 160 165 * @param off offset into the buffer to start writing from … … 165 170 */ 166 171 private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { 167 Connection con = _connection;168 172 if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); 169 boolean ackOnly = isAckOnly( con, size);170 boolean isFirst = ( con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0);171 172 PacketLocal packet = new PacketLocal(_context, con.getRemotePeer(), con);173 boolean ackOnly = isAckOnly(_connection, size); 174 boolean isFirst = (_connection.getAckedPackets() <= 0) && (_connection.getUnackedPacketsSent() <= 0); 175 176 PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection); 173 177 //ByteArray data = packet.acquirePayload(); 174 178 ByteArray data = new ByteArray(new byte[size]); … … 181 185 packet.setSequenceNum(0); 182 186 else 183 packet.setSequenceNum( con.getNextOutboundPacketNum());184 packet.setSendStreamId( con.getSendStreamId());185 packet.setReceiveStreamId( con.getReceiveStreamId());187 packet.setSequenceNum(_connection.getNextOutboundPacketNum()); 188 packet.setSendStreamId(_connection.getSendStreamId()); 189 packet.setReceiveStreamId(_connection.getReceiveStreamId()); 186 190 187 191 // not needed here, handled in PacketQueue.enqueue() 188 192 //con.getInputStream().updateAcks(packet); 189 // note that the optional delay is usually rewritten in Connection.sendPacket() 190 int choke = con.getOptions().getChoke(); 191 packet.setOptionalDelay(choke); 192 if (choke > 0) 193 packet.setFlag(Packet.FLAG_DELAY_REQUESTED); 193 194 // Do not set optional delay here, set in Connection.sendPacket() 195 194 196 // bugfix release 0.7.8, we weren't dividing by 1000 195 packet.setResendDelay( con.getOptions().getResendDelay() / 1000);196 197 if ( con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE)197 packet.setResendDelay(_connection.getOptions().getResendDelay() / 1000); 198 199 if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) 198 200 packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true); 199 201 else 200 202 packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false); 201 202 packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, con.getOptions().getRequireFullySigned());203 203 204 204 //if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { … … 206 206 packet.setFlag(Packet.FLAG_SYNCHRONIZE); 207 207 packet.setOptionalFrom(); 208 packet.setOptionalMaxSize( con.getOptions().getMaxMessageSize());209 } 210 packet.setLocalPort( con.getLocalPort());211 packet.setRemotePort( con.getPort());212 if ( con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {208 packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize()); 209 } 210 packet.setLocalPort(_connection.getLocalPort()); 211 packet.setRemotePort(_connection.getPort()); 212 if (_connection.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { 213 213 packet.setFlag(Packet.FLAG_NO_ACK); 214 214 } … … 222 222 // throughout network? 223 223 // 224 if ( con.getOutputStream().getClosed() &&225 ( (size > 0) || ( con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) {224 if (_connection.getOutputStream().getClosed() && 225 ( (size > 0) || (_connection.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { 226 226 packet.setFlag(Packet.FLAG_CLOSE); 227 con.notifyCloseSent();227 _connection.notifyCloseSent(); 228 228 } 229 229 if (_log.shouldLog(Log.DEBUG)) -
apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java
rf0241d4 r2d8f0c2 106 106 _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); 107 107 // Stats for Connection 108 _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60* 1000, 10*60*1000, 60*60*1000 });109 _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60* 1000, 10*60*1000, 60*60*1000 });110 _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60* 1000, 10*60*1000, 60*60*1000 });111 _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000,10*60*1000 });108 _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*60*1000 }); 109 _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*60*1000 }); 110 _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*60*1000 }); 111 _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 10*60*1000 }); 112 112 // Stats for PacketQueue 113 _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000,10*60*1000, 60*60*1000 });114 _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000,10*60*1000, 60*60*1000 });113 _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 }); 114 _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 10*60*1000, 60*60*1000 }); 115 115 } 116 116 -
apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionOptions.java
rf0241d4 r2d8f0c2 34 34 private int _sendAckDelay; 35 35 private int _maxMessageSize; 36 private int _choke;37 36 private int _maxResends; 38 37 private int _inactivityTimeout; … … 328 327 setResendDelay(opts.getResendDelay()); 329 328 setMaxMessageSize(opts.getMaxMessageSize()); 330 setChoke(opts.getChoke());331 329 setMaxResends(opts.getMaxResends()); 332 330 setInactivityTimeout(opts.getInactivityTimeout()); … … 678 676 public void setMaxMessageSize(int bytes) { _maxMessageSize = Math.max(bytes, MIN_MESSAGE_SIZE); } 679 677 680 /**681 * how long we want to wait before any data is transferred on the682 * connection in either direction683 *684 * @return how long to wait before any data is transferred in either direction in ms685 */686 public int getChoke() { return _choke; }687 public void setChoke(int ms) { _choke = ms; }688 689 678 /** 690 679 * What profile do we want to use for this connection? -
apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java
rf0241d4 r2d8f0c2 97 97 boolean choke = false; 98 98 if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) { 99 if (packet.getOptionalDelay() > 60000) {99 if (packet.getOptionalDelay() >= Packet.MIN_DELAY_CHOKE) { 100 100 // requested choke 101 101 choke = true; 102 if (_log.shouldWarn()) 103 _log.warn("Got a choke on connection " + con + ": " + packet); 102 104 //con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); 103 105 } 106 // Only call this if the flag is set 107 con.setChoked(choke); 104 108 } 105 109 … … 107 111 if (_log.shouldWarn()) 108 112 _log.warn("Inbound buffer exceeded on connection " + con + 109 ", dropping " + packet); 110 con.getOptions().setChoke(61*1000); 113 ", choking and dropping " + packet); 114 // this will call ackImmediately() 115 con.setChoking(true); 116 // TODO we could still process the acks for this packet before discarding 111 117 packet.releasePayload(); 112 con.ackImmediately();113 118 return; 114 } 115 con.getOptions().setChoke(0); 119 } // else we will call setChoking(false) below 116 120 117 121 _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize()); … … 133 137 // But not ack-only packets! 134 138 boolean isNew; 135 if (seqNum > 0 || isSYN) 136 isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()); 137 else 139 if (seqNum > 0 || isSYN) { 140 isNew = con.getInputStream().messageReceived(seqNum, packet.getPayload()) && 141 !allowAck; 142 } else { 138 143 isNew = false; 139 if (!allowAck) 140 isNew = false; 144 } 145 146 if (isNew && packet.getPayloadSize() > 1500) { 147 // don't clear choking unless it was new, and a big packet 148 // this will call ackImmediately() if changed 149 // TODO if this filled in a hole, we shouldn't unchoke 150 // TODO a bunch of small packets should unchoke also 151 con.setChoking(false); 152 } 141 153 142 154 //if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { … … 159 171 } 160 172 161 boolean fastAck = false;162 173 boolean ackOnly = false; 163 174 … … 171 182 //con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); 172 183 // honor request "almost" immediately 184 // TODO the 250 below _may_ be a big limiter in how fast local "loopback" connections 185 // can go, however if it goes too fast then we start choking which causes 186 // frequent stalls anyway. 173 187 con.setNextSendTime(_context.clock().now() + 250); 174 188 } else { … … 223 237 } 224 238 239 boolean fastAck; 225 240 if (isSYN && (packet.getSendStreamId() <= 0) ) { 226 241 // don't honor the ACK 0 in SYN packets received when the other side 227 242 // has obviously not seen our messages 243 fastAck = false; 228 244 } else { 229 245 fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke); 230 246 } 231 247 con.eventOccurred(); 232 if (fastAck ) {248 if (fastAck && !choke) { 233 249 if (!isNew) { 234 250 // if we're congested (fastAck) but this is also a new packet, … … 267 283 /** 268 284 * Process the acks in a received packet, and adjust our window and RTT 285 * @param isNew was it a new packet? false for ack-only 286 * @param choke did we get a choke in the packet? 269 287 * @return are we congested? 270 288 */ … … 355 373 } 356 374 357 /** @return are we congested? */ 375 /** 376 * This either does nothing or increases the window, it never decreases it. 377 * Decreasing is done in Connection.ResendPacketEvent.retransmit() 378 * 379 * @param isNew was it a new packet? false for ack-only 380 * @param sequenceNum 0 for ack-only 381 * @param choke did we get a choke in the packet? 382 * @return are we congested? 383 */ 358 384 private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) { 359 boolean congested = false;360 if ( (!isNew) && (sequenceNum > 0)) {385 boolean congested; 386 if (choke || (!isNew && sequenceNum > 0) || con.isChoked()) { 361 387 if (_log.shouldLog(Log.DEBUG)) 362 388 _log.debug("Congestion occurred on the sending side. Not adjusting window "+con); 363 364 389 congested = true; 365 } 366 390 } else { 391 congested = false; 392 } 393 367 394 long lowest = con.getHighestAckedThrough(); 368 395 // RFC 2581 -
apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java
rf0241d4 r2d8f0c2 15 15 * Receive raw information from the I2PSession and turn it into 16 16 * Packets, if we can. 17 * <p>17 *<p> 18 18 * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream 19 19 */ -
apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java
rf0241d4 r2d8f0c2 17 17 * Stream that can be given messages out of order 18 18 * yet present them in order. 19 * <p>19 *<p> 20 20 * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream 21 * <p>21 *<p> 22 22 * This buffers unlimited data via messageReceived() - 23 23 * limiting / blocking is done in ConnectionPacketHandler.receivePacket(). … … 103 103 /** 104 104 * Determine if this packet will fit in our buffering limits. 105 * Always returns true for zero payloadSize. 105 106 * 106 107 * @return true if we have room. If false, do not call messageReceived() -
apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java
rf0241d4 r2d8f0c2 17 17 * on flush or when the buffer is full. It also blocks according 18 18 * to the data receiver's needs. 19 * <p>19 *<p> 20 20 * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession 21 21 */ -
apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java
rf0241d4 r2d8f0c2 165 165 public static final int DEFAULT_MAX_SIZE = 32*1024; 166 166 protected static final int MAX_DELAY_REQUEST = 65535; 167 public static final int MIN_DELAY_CHOKE = 60001; 168 public static final int SEND_DELAY_CHOKE = 61000; 167 169 168 170 /** -
apps/streaming/java/src/net/i2p/client/streaming/impl/PacketHandler.java
rf0241d4 r2d8f0c2 12 12 * receive a packet and dispatch it correctly to the connection specified, 13 13 * the server socket, or queue a reply RST packet. 14 * <p>14 *<p> 15 15 * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream 16 16 */ -
apps/streaming/java/src/net/i2p/client/streaming/impl/PacketLocal.java
rf0241d4 r2d8f0c2 110 110 FLAG_CLOSE | 111 111 FLAG_ECHO); 112 }113 114 /** last minute update of ack fields, just before write/sign */115 public void prepare() {116 if (_connection != null)117 _connection.getInputStream().updateAcks(this);118 int numSends = _numSends.get();119 if (numSends > 0) {120 // so we can debug to differentiate resends121 setOptionalDelay(numSends * 1000);122 setFlag(FLAG_DELAY_REQUESTED);123 }124 112 } 125 113 -
apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java
rf0241d4 r2d8f0c2 24 24 * send them immediately with no blocking, since the 25 25 * mode=bestEffort doesnt block in the SDK. 26 * <p>26 *<p> 27 27 * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession 28 28 */ … … 65 65 66 66 /** 67 * Add a new packet to be sent out ASAP 67 * Add a new packet to be sent out ASAP. 68 * This updates the acks. 68 69 * 69 70 * keys and tags disabled since dropped in I2PSession … … 73 74 if (_dead) 74 75 return false; 75 // this updates the ack/nack field76 packet.prepare();77 76 78 77 //SessionKey keyUsed = packet.getKeyUsed(); … … 88 87 return false; 89 88 } 89 90 Connection con = packet.getConnection(); 91 if (con != null) { 92 // this updates the ack/nack fields 93 con.getInputStream().updateAcks(packet); 94 } 90 95 91 96 ByteArray ba = _cache.acquire(); … … 97 102 try { 98 103 int size = 0; 99 long beforeWrite = System.currentTimeMillis();104 //long beforeWrite = System.currentTimeMillis(); 100 105 if (packet.shouldSign()) 101 106 size = packet.writeSignedPacket(buf, 0); 102 107 else 103 108 size = packet.writePacket(buf, 0); 104 long writeTime = System.currentTimeMillis() - beforeWrite;105 if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) )106 _log.warn("took " + writeTime + "ms to write the packet: " + packet);109 //long writeTime = System.currentTimeMillis() - beforeWrite; 110 //if ( (writeTime > 1000) && (_log.shouldLog(Log.WARN)) ) 111 // _log.warn("took " + writeTime + "ms to write the packet: " + packet); 107 112 108 113 // last chance to short circuit... … … 122 127 boolean listenForStatus = false; 123 128 if (packet.isFlagSet(FLAGS_INITIAL_TAGS)) { 124 Connection con = packet.getConnection();125 129 if (con != null) { 126 130 if (con.isInbound()) … … 142 146 options.setTagThreshold(FINAL_TAG_THRESHOLD); 143 147 } else { 144 Connection con = packet.getConnection();145 148 if (con != null) { 146 149 if (con.isInbound() && con.getLifetime() < 2*60*1000) … … 158 161 I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort(), 159 162 options, this); 160 _messageStatusMap.put(Long.valueOf(id), packet.getConnection());163 _messageStatusMap.put(Long.valueOf(id), con); 161 164 sent = true; 162 165 } else { … … 174 177 _context.statManager().addRateData("stream.con.sendDuplicateSize", size, packet.getLifetime()); 175 178 176 Connection con = packet.getConnection();177 179 if (con != null) { 178 180 con.incrementBytesSent(size); … … 190 192 if (_log.shouldLog(Log.WARN)) 191 193 _log.warn("Send failed for " + packet); 192 Connection c = packet.getConnection(); 193 if (c != null) // handle race on b0rk 194 c.disconnect(false); 194 if (con != null) // handle race on b0rk 195 con.disconnect(false); 195 196 } else { 196 197 //packet.setKeyUsed(keyUsed); 197 198 //packet.setTagsSent(tagsSent); 198 199 packet.incrementSends(); 199 Connection c = packet.getConnection(); 200 if (c != null && _log.shouldDebug()) { 201 String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO(); 202 c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); 200 if (con != null && _log.shouldDebug()) { 201 String suffix = "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO(); 202 con.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix); 203 203 } 204 204 if (I2PSocketManagerFull.pcapWriter != null && -
history.txt
rf0241d4 r2d8f0c2 1 2017-02-09 zzz 2 * Streaming: Fix optional delay and choking (tickets #1046, 1939) 3 4 2017-02-08 zzz 5 * I2CP: Return local delivery failure on queue overflow (ticket #1939) 6 7 2017-02-05 zzz 8 * Console: Consolidate timer threads (ticket #1068) 9 * NTCP: Don't write to an inbound connection before 10 fully established, causing NPE (ticket #996) 11 * Streaming: 12 - Don't always send optional delay (ticket #1046) 13 - Don't hard fail on expired message error (ticket #1748) 14 1 15 2017-02-04 zzz 16 * HTTP proxies: 17 - Pass through relative referer URIs, convert same-origin 18 absolute referer URIs to relative (ticket #1862) 2 19 * NTP: Enable IPv6 support (ticket #1896) 3 20 -
router/java/src/net/i2p/router/RouterVersion.java
rf0241d4 r2d8f0c2 19 19 public final static String ID = "Monotone"; 20 20 public final static String VERSION = CoreVersion.VERSION; 21 public final static long BUILD = 5;21 public final static long BUILD = 6; 22 22 23 23 /** for example "-test" */
Note: See TracChangeset
for help on using the changeset viewer.