Changeset 3427464


Ignore:
Timestamp:
Jan 25, 2014 12:46:30 AM (6 years ago)
Author:
zab2 <zab2@…>
Branches:
master
Children:
747d8333
Parents:
9e87fd9
Message:

Move OutNetMessage? buffer preparation to the Writer threads

(Ticket #1184)
Up version to -1

Files:
5 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r9e87fd9 r3427464  
     12014-01-25 zab
     2 * Move OutNetMessage buffer preparation to the Writer threads
     3   (Ticket #1184)
     4 * Up version to -1
     5
    16* 2014-01-22 0.9.10 released
    27
  • router/java/src/net/i2p/router/OutNetMessage.java

    r9e87fd9 r3427464  
    5858     */
    5959    private List<String> _timestampOrder;
    60     private Object _preparationBuf;
    6160   
    6261    /**
     
    293292    public void beginSend() { _sendBegin = _context.clock().now(); }
    294293
    295     public void prepared(Object buf) {
    296         _preparationBuf = buf;
    297     }
    298 
    299     public Object releasePreparationBuffer() {
    300         Object rv = _preparationBuf;
    301         _preparationBuf = null;
    302         return rv;
    303     }
    304    
    305294    public long getCreated() { return _created; }
    306295
  • router/java/src/net/i2p/router/RouterVersion.java

    r9e87fd9 r3427464  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 0;
     21    public final static long BUILD = 1;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    r9e87fd9 r3427464  
    393393        //_outbound.drainAllTo(pending);
    394394        _outbound.drainTo(pending);
    395         for (OutNetMessage msg : pending) {
    396             Object buf = msg.releasePreparationBuffer();
    397             if (buf != null)
    398                 releaseBuf((PrepBuffer)buf);
     395        for (OutNetMessage msg : pending)
    399396            _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
    400         }
    401397
    402398        OutNetMessage msg = getCurrentOutbound();
    403         if (msg != null) {
    404             Object buf = msg.releasePreparationBuffer();
    405             if (buf != null)
    406                 releaseBuf((PrepBuffer)buf);
     399        if (msg != null)
    407400            _transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
    408         }
    409401       
    410402        return old;
     
    439431     ****/
    440432        //if (FAST_LARGE)
    441             bufferedPrepare(msg);
    442433        _outbound.offer(msg);
    443434        //int enqueued = _outbound.size();
     
    605596     * prepare the next i2np message for transmission.  this should be run from
    606597     * the Writer thread pool.
     598     *
     599     * @param prep an instance of PrepBuffer to use as scratch space
    607600     *
    608601     */
    609     synchronized void prepareNextWrite() {
     602    synchronized void prepareNextWrite(PrepBuffer prep) {
    610603        //if (FAST_LARGE)
    611             prepareNextWriteFast();
     604            prepareNextWriteFast(prep);
    612605        //else
    613606        //    prepareNextWriteSmall();
     
    718711     *
    719712     * Caller must synchronize.
     713     * @param buf a PrepBuffer to use as scratch space
    720714     *
    721715     */
    722     private void prepareNextWriteFast() {
     716    private void prepareNextWriteFast(PrepBuffer buf) {
    723717        if (_closed.get())
    724718            return;
     
    781775       
    782776        //long begin = System.currentTimeMillis();
    783         PrepBuffer buf = (PrepBuffer)msg.releasePreparationBuffer();
    784         if (buf == null) {
    785             // race, see ticket #392
    786             //throw new RuntimeException("buf is null for " + msg);
    787             if (_log.shouldLog(Log.WARN))
    788                 _log.warn("Null prep buf for " + msg);
    789             return;
    790         }
     777        bufferedPrepare(msg,buf);
    791778        _context.aes().encrypt(buf.unencrypted, 0, buf.encrypted, 0, _sessionKey, _prevWriteEnd, 0, buf.unencryptedLength);
    792779        System.arraycopy(buf.encrypted, buf.encrypted.length-16, _prevWriteEnd, 0, _prevWriteEnd.length);
     
    798785        _transport.getPumper().wantsWrite(this, buf.encrypted);
    799786        //long wantsTime = System.currentTimeMillis();
    800         releaseBuf(buf);
    801787        //long releaseTime = System.currentTimeMillis();
    802788        //if (_log.shouldLog(Log.DEBUG))
     
    818804    /**
    819805     * Serialize the message/checksum/padding/etc for transmission, but leave off
    820      * the encryption for the actual write process (when we will always have the
    821      * end of the previous encrypted transmission to serve as our IV).  with care,
    822      * the encryption could be handled here too, as long as messages aren't expired
    823      * in the queue and the establishment process takes that into account.
    824      */
    825     private void bufferedPrepare(OutNetMessage msg) {
     806     * the encryption.  This should be called from a Writer thread
     807     *
     808     * @param msg message to send
     809     * @param buf PrepBuffer to use as scratch space
     810     */
     811    private void bufferedPrepare(OutNetMessage msg, PrepBuffer buf) {
    826812        //if (!_isInbound && !_established)
    827813        //    return;
    828814        //long begin = System.currentTimeMillis();
    829         PrepBuffer buf = acquireBuf();
    830815        //long alloc = System.currentTimeMillis();
    831816       
     
    864849       
    865850        //long crced = System.currentTimeMillis();
    866         msg.prepared(buf);
    867851        //if (_log.shouldLog(Log.DEBUG))
    868852        //    _log.debug("Buffered prepare took " + (crced-begin) + ", alloc=" + (alloc-begin)
    869853        //               + " serialize=" + (serialized-alloc) + " crc=" + (crced-serialized));
    870854    }
    871    
    872     private static final int MIN_BUFS = 4;
    873     private static final int MAX_BUFS = 16;
    874     private static int NUM_PREP_BUFS;
    875     static {
    876         long maxMemory = SystemVersion.getMaxMemory();
    877         NUM_PREP_BUFS = (int) Math.max(MIN_BUFS, Math.min(MAX_BUFS, 1 + (maxMemory / (16*1024*1024))));
    878     }
    879 
    880     private final static LinkedBlockingQueue<PrepBuffer> _bufs = new LinkedBlockingQueue<PrepBuffer>(NUM_PREP_BUFS);
    881 
    882     /**
    883      *  32KB each
    884      *  @return initialized buffer
    885      */
    886     private static PrepBuffer acquireBuf() {
    887         PrepBuffer b = _bufs.poll();
    888         if (b == null)
    889             b = new PrepBuffer();
    890         return b;
    891     }
    892 
    893     private static void releaseBuf(PrepBuffer buf) {
    894         buf.init();
    895         _bufs.offer(buf);
    896     }
    897 
    898     private static class PrepBuffer {
     855
     856    public static class PrepBuffer {
    899857        final byte unencrypted[];
    900858        int unencryptedLength;
     
    904862        byte encrypted[];
    905863       
    906         PrepBuffer() {
     864        public PrepBuffer() {
    907865            unencrypted = new byte[BUFFER_SIZE];
    908866            base = new byte[BUFFER_SIZE];
     
    910868        }
    911869
    912         private void init() {
     870        public void init() {
    913871            unencryptedLength = 0;
    914872            baseLength = 0;
     
    13681326    static void releaseResources() {
    13691327        _i2npHandlers.clear();
    1370         _bufs.clear();
    13711328    }
    13721329
  • router/java/src/net/i2p/router/transport/ntcp/Writer.java

    r9e87fd9 r3427464  
    2525    private final List<Runner> _runners;
    2626   
     27    /** a scratch space to serialize and encrypt messages */
     28    private final NTCPConnection.PrepBuffer _prepBuffer;
     29   
    2730    public Writer(RouterContext ctx) {
    2831        _log = ctx.logManager().getLog(getClass());
     
    3134        _liveWrites = new HashSet<NTCPConnection>(5);
    3235        _writeAfterLive = new HashSet<NTCPConnection>(5);
     36        _prepBuffer = new NTCPConnection.PrepBuffer();
    3337    }
    3438   
     
    120124                        if (_log.shouldLog(Log.DEBUG))
    121125                            _log.debug("Prepare next write on: " + con);
    122                         con.prepareNextWrite();
     126                        _prepBuffer.init();
     127                        con.prepareNextWrite(_prepBuffer);
    123128                    } catch (RuntimeException re) {
    124129                        _log.log(Log.CRIT, "Error in the ntcp writer", re);
Note: See TracChangeset for help on using the changeset viewer.