Changeset 798bdf3


Ignore:
Timestamp:
Oct 11, 2010 3:17:35 PM (10 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
647b8f7
Parents:
fbc20da
Message:
  • Streaming:
    • Make flush() block less, by waiting only for "accept" into the streaming queue rather than "completion" (i.e. ACK from the far end). This prevents complete window stalls when flushing, and should help performance of apps that use flush(), like i2psnark (and SAM?). close() still does a flush that waits for completion, as i2ptunnel doesn't like a fast return from close().
    • flush/close javadocs and comments
  • i2ptunnel:
    • Now that streaming flush() is fixed, use it in IRCClient, and for initial data in I2PTunnelRunner, to avoid the 250 ms passive flush delay
Location:
apps
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java

    rfbc20da r798bdf3  
    1010import java.util.StringTokenizer;
    1111
    12 import net.i2p.I2PAppContext;
    1312import net.i2p.client.streaming.I2PSocket;
    1413import net.i2p.data.DataFormatException;
     
    125124        if (size == 1) // skip the rand in the most common case
    126125            return dests.get(0);
    127         int index = I2PAppContext.getGlobalContext().random().nextInt(size);
     126        int index = _context.random().nextInt(size);
    128127        return dests.get(index);
    129128    }
     
    183182                            outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
    184183                            output.write(outmsg.getBytes("ISO-8859-1"));
     184                            // probably doesn't do much but can't hurt
     185                            output.flush();
    185186                        } else {
    186187                            if (_log.shouldLog(Log.WARN))
     
    258259                                outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
    259260                                output.write(outmsg.getBytes("ISO-8859-1"));
     261                                // save 250 ms in streaming
     262                                output.flush();
    260263                            } else {
    261264                                if (_log.shouldLog(Log.WARN))
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java

    rfbc20da r798bdf3  
    130130                    // But if we don't flush, then we have to wait for the connectDelay timer to fire
    131131                    // in i2p socket? To be researched and/or fixed.
    132                     //i2pout.flush();
     132                    //
     133                    // AS OF 0.8.1, MessageOutputStream.flush() is fixed to only wait for accept,
     134                    // not for "completion" (i.e. an ACK from the far end).
     135                    // So we now get a fast return from flush(), and can do it here to save 250 ms.
     136                    // To make sure we are under the initial window size and don't hang waiting for accept,
     137                    // only flush if it fits in one message.
     138                    if (initialI2PData.length <= 1730)   // ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE
     139                        i2pout.flush();
    133140                }
    134141            }
  • apps/streaming/java/src/net/i2p/client/streaming/Connection.java

    rfbc20da r798bdf3  
    149149   
    150150    /**
     151     * This doesn't "send a choke". Rather, it blocks if the outbound window is full,
     152     * thus choking the sender that calls this.
     153     *
    151154     * Block until there is an open outbound packet slot or the write timeout
    152155     * expires. 
     156     * PacketLocal is the only caller, generally with -1.
    153157     *
    154      * @param timeoutMs PacketLocal is the only caller, often with -1??????
    155      * @return true if the packet should be sent
     158     * @param timeoutMs 0 or negative means wait forever, 5 minutes max
     159     * @return true if the packet should be sent, false for a fatal error
     160     *         will return false after 5 minutes even if timeoutMs is <= 0.
    156161     */
    157162    boolean packetSendChoke(long timeoutMs) {
    158163        // if (false) return true; // <--- what the fuck??
    159164        long start = _context.clock().now();
    160         long writeExpire = start + timeoutMs;
     165        long writeExpire = start + timeoutMs;  // only used if timeoutMs > 0
    161166        boolean started = false;
    162167        while (true) {
  • apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java

    rfbc20da r798bdf3  
    4444    private int _sendBps;
    4545   
     46    /**
     47     *  Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000,
     48     *  we only wait 250 at the start. Guess that's ok, 1000 is too long anyway.
     49     */
    4650    private static final int DEFAULT_PASSIVE_FLUSH_DELAY = 250;
    4751
     
    274278   
    275279    /**
    276      * Flush the data already queued up, blocking until it has been
    277      * delivered.
     280     * Flush the data already queued up, blocking only if the outbound
     281     * window is full.
     282     *
     283     * Prior to 0.8.1, this blocked until "delivered".
     284     * "Delivered" meant "received an ACK from the far end",
     285     * which is not the commom implementation of flush(), and really hurt the
     286     * performance of i2psnark, which flush()ed frequently.
     287     * Calling flush() would cause a complete window stall.
     288     *
     289     * As of 0.8.1, only wait for accept into the streaming output queue.
     290     * This will speed up snark significantly, and allow us to flush()
     291     * the initial data in I2PTunnelRunner, saving 250 ms.
    278292     *
    279293     * @throws IOException if the write fails
     
    284298      * Documented here, but doesn't belong in the javadoc.
    285299      */
     300        flush(true);
     301    }
     302
     303    /**
     304     *  @param wait_for_accept_only see discussion in close() code
     305     *  @@since 0.8.1
     306     */
     307    private void flush(boolean wait_for_accept_only) throws IOException {
    286308        long begin = _context.clock().now();
    287309        WriteStatus ws = null;
     
    298320                return;
    299321            }
    300             ws = _dataReceiver.writeData(_buf, 0, _valid);
    301             _written += _valid;
    302             _valid = 0;
    303             locked_updateBufferSize();
    304             _lastFlushed = _context.clock().now();
    305             _dataLock.notifyAll();
     322            // if valid == 0 return ??? - no, this could flush a CLOSE packet too.
     323
     324            // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below
     325            // (disabled)
     326            if (!wait_for_accept_only) {
     327                ws = _dataReceiver.writeData(_buf, 0, _valid);
     328                _written += _valid;
     329                _valid = 0;
     330                locked_updateBufferSize();
     331                _lastFlushed = _context.clock().now();
     332                _dataLock.notifyAll();
     333            }
    306334        }
    307335       
     336        // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1
     337        // must do this outside the data lock
     338        if (wait_for_accept_only) {
     339            flushAvailable(_dataReceiver, true);
     340            return;
     341        }
     342
     343        // Wait a loooooong time, until we have the ACK
    308344        if (_log.shouldLog(Log.DEBUG))
    309345            _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
     
    329365    }
    330366   
     367    /**
     368     *  This does a flush, and BLOCKS until
     369     *  the CLOSE packet is acked.
     370     */
    331371    @Override
    332372    public void close() throws IOException {
     
    335375            return;
    336376        }
     377        // setting _closed before flush() will force flush() to send a CLOSE packet
    337378        _closed = true;
    338         flush();
     379
     380        // In 0.8.1 we rewrote flush() to only wait for accept into the window,
     381        // not "completion" (i.e. ack from the far end).
     382        // Unfortunately, that broke close(), at least in i2ptunnel HTTPClient.
     383        // Symptom was premature close, i.e. incomplete pages and images.
     384        // Possible cause - I2PTunnelRunner code? or the code here that follows flush()?
     385        // It seems like we shouldn't have to wait for the far-end ACK for a close packet,
     386        // should we? To be researched further.
     387        // false -> wait for completion, not just accept.
     388        flush(false);
    339389        _log.debug("Output stream closed after writing " + _written);
    340390        ByteArray ba = null;
     
    352402        }
    353403    }
    354     /** nonblocking close */
     404
     405    /**
     406     *  nonblocking close -
     407     *  Use outside of this package is deprecated, should be made package local
     408     */
    355409    public void closeInternal() {
    356410        _closed = true;
     
    413467            _log.info("flushAvailable() valid = " + _valid);
    414468        synchronized (_dataLock) {
     469            // if valid == 0 return ??? - no, this could flush a CLOSE packet too.
     470
    415471            // _buf may be null, but the data receiver can handle that just fine,
    416472            // deciding whether or not to send a packet
     
    458514    /** Define a way to detect the status of a write */
    459515    public interface WriteStatus {
    460         /** wait until the data written either fails or succeeds */
     516        /**
     517         * Wait until the data written either fails or succeeds.
     518         * Success means an ACK FROM THE FAR END.
     519         * @param maxWaitMs -1 = forever
     520         */
    461521        public void waitForCompletion(int maxWaitMs);
     522
    462523        /**
    463          * wait until the data written is accepted into the outbound pool,
     524         * Wait until the data written is accepted into the outbound pool,
     525         * (i.e. the outbound window is not full)
    464526         * which we throttle rather than accept arbitrary data and queue
    465          * @param maxWaitMs -1 = forever ?
     527         * @param maxWaitMs -1 = forever
    466528         */
    467529        public void waitForAccept(int maxWaitMs);
     530
    468531        /** the write was accepted.  aka did the socket not close? */
    469532        public boolean writeAccepted();
  • apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java

    rfbc20da r798bdf3  
    195195   
    196196    /**
    197      * @param maxWaitMs MessageOutputStream is the only caller, often with -1 ??????
     197     * Blocks until outbound window is not full. See Connection.packetSendChoke().
     198     * @param maxWaitMs MessageOutputStream is the only caller, generally with -1
    198199     */
    199200    public void waitForAccept(int maxWaitMs) {
     
    221222    }
    222223   
     224    /** block until the packet is acked from the far end */
    223225    public void waitForCompletion(int maxWaitMs) {
    224226        long expiration = _context.clock().now()+maxWaitMs;
Note: See TracChangeset for help on using the changeset viewer.