Changeset aef021d


Ignore:
Timestamp:
Sep 7, 2012 10:49:24 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
871f046
Parents:
489f4352
Message:
  • I2CP: Limit router/client queue sizes and queue wait times
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • core/java/src/net/i2p/client/ClientWriterRunner.java

    r489f4352 raef021d  
    55import java.util.concurrent.BlockingQueue;
    66import java.util.concurrent.LinkedBlockingQueue;
     7import java.util.concurrent.TimeUnit;
    78
    89import net.i2p.data.i2cp.I2CPMessage;
     
    2324    private BlockingQueue<I2CPMessage> _messagesToWrite;
    2425    private static volatile long __Id = 0;
     26
     27    private static final int MAX_QUEUE_SIZE = 32;
     28    private static final long MAX_SEND_WAIT = 10*1000;
    2529   
    2630    /** starts the thread too */
     
    2832        _out = out;
    2933        _session = session;
    30         _messagesToWrite = new LinkedBlockingQueue();
     34        _messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
    3135        Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true);
    3236        t.start();
     
    3438
    3539    /**
    36      * Add this message to the writer's queue
    37      *
     40     * Add this message to the writer's queue.
     41     * Blocking if queue is full.
     42     * @throws I2PSessionException if we wait too long or are interrupted
    3843     */
    39     public void addMessage(I2CPMessage msg) {
     44    public void addMessage(I2CPMessage msg) throws I2PSessionException {
    4045        try {
    41             _messagesToWrite.put(msg);
    42         } catch (InterruptedException ie) {}
     46            if (!_messagesToWrite.offer(msg, MAX_SEND_WAIT, TimeUnit.MILLISECONDS))
     47                throw new I2PSessionException("Timed out waiting while write queue was full");
     48        } catch (InterruptedException ie) {
     49            throw new I2PSessionException("Interrupted while write queue was full", ie);
     50        }
    4351    }
    4452
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    r489f4352 raef021d  
    147147
    148148    private static final long VERIFY_USAGE_TIME = 60*1000;
     149
     150    private static final long MAX_SEND_WAIT = 10*1000;
    149151
    150152    void dateUpdated() {
     
    644646    /**
    645647     * Deliver an I2CP message to the router
     648     * As of 0.9.3, may block for several seconds if the write queue to the router is full
    646649     *
    647650     * @throws I2PSessionException if the message is malformed or there is an error writing it out
    648651     */
    649652    void sendMessage(I2CPMessage message) throws I2PSessionException {
    650         if (isClosed())
     653        if (isClosed()) {
    651654            throw new I2PSessionException("Already closed");
    652         else if (_queue != null)
    653             _queue.offer(message);  // internal
    654         else if (_writer == null)
     655        } else if (_queue != null) {
     656            // internal
     657            try {
     658                if (!_queue.offer(message, MAX_SEND_WAIT))
     659                    throw new I2PSessionException("Timed out waiting while write queue was full");
     660            } catch (InterruptedException ie) {
     661                throw new I2PSessionException("Interrupted while write queue was full", ie);
     662            }
     663        } else if (_writer == null) {
    655664            throw new I2PSessionException("Already closed");
    656         else
     665        } else {
    657666            _writer.addMessage(message);
     667        }
    658668    }
    659669
  • core/java/src/net/i2p/internal/I2CPMessageQueue.java

    r489f4352 raef021d  
    2323     */
    2424    public abstract boolean offer(I2CPMessage msg);
     25
     26    /**
     27     *  Send a message, blocking.
     28     *  @param timeout how long to wait for space (ms)
     29     *  @return success (false if no space available or if timed out)
     30     *  @since 0.9.3
     31     */
     32    public abstract boolean offer(I2CPMessage msg, long timeout) throws InterruptedException;
    2533
    2634    /**
  • router/java/src/net/i2p/router/client/ClientManager.java

    r489f4352 raef021d  
    5353    private static final String PROP_ENABLE_SSL = "i2cp.SSL";
    5454
     55    private static final int INTERNAL_QUEUE_SIZE = 256;
     56
    5557    public ClientManager(RouterContext context, int port) {
    5658        _ctx = context;
     
    126128        if (!_isStarted)
    127129            throw new I2PSessionException("Router client manager is shut down");
    128         // for now we make these unlimited size
    129         LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
    130         LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
     130        LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
     131        LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
    131132        I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
    132133        I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
  • router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java

    r489f4352 raef021d  
    22
    33import java.util.concurrent.BlockingQueue;
     4import java.util.concurrent.TimeUnit;
    45
    56import net.i2p.data.i2cp.I2CPMessage;
     
    3435
    3536    /**
     37     *  Send a message, blocking.
     38     *  @param timeout how long to wait for space (ms)
     39     *  @return success (false if no space available or if timed out)
     40     *  @since 0.9.3
     41     */
     42    public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException {
     43        return _out.offer(msg, timeout, TimeUnit.MILLISECONDS);
     44    }
     45
     46    /**
    3647     *  Receive a message, nonblocking
    3748     *  @return message or null if none available
Note: See TracChangeset for help on using the changeset viewer.