Changeset bc4ee0fc for router


Ignore:
Timestamp:
Jun 28, 2018 5:33:34 PM (2 years ago)
Author:
zab2 <zab2@…>
Branches:
master
Children:
d42a467
Parents:
9b17b52
Message:

Implement tryLock-based object cache and make ntcp and ssu code use it

Location:
router/java/src/net/i2p/router
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r9b17b52 rbc4ee0fc  
    2929import net.i2p.router.RouterContext;
    3030import net.i2p.router.transport.FIFOBandwidthLimiter;
     31import net.i2p.router.util.TryCache;
    3132import net.i2p.util.Addresses;
    3233import net.i2p.util.ConcurrentHashSet;
     
    5455    private final ObjectCounter<ByteArray> _blockedIPs;
    5556    private long _expireIdleWriteTime;
    56     private boolean _useDirect;
     57    private static boolean _useDirect;
    5758   
    5859    /**
     
    6465    private static final int MAX_CACHE_SIZE = 64;
    6566
    66     /**
    67      *  Read buffers. (write buffers use wrap())
    68      *  Shared if there are multiple routers in the JVM
    69      *  Note that if the routers have different PROP_DIRECT settings this will have a mix,
    70      *  so don't do that.
    71      */
    72     private static final LinkedBlockingQueue<ByteBuffer> _bufCache = new LinkedBlockingQueue<ByteBuffer>(MAX_CACHE_SIZE);
     67    private static class BufferFactory implements TryCache.ObjectFactory<ByteBuffer> {
     68        public ByteBuffer newInstance() {
     69            if (_useDirect)
     70                return ByteBuffer.allocateDirect(BUF_SIZE);
     71            else
     72                return ByteBuffer.allocate(BUF_SIZE);
     73        }
     74    }
     75   
     76    private static final TryCache<ByteBuffer> _bufferCache = new TryCache<>(new BufferFactory(), MAX_CACHE_SIZE);
    7377
    7478    /**
     
    320324
    321325
    322                 // Clear the cache if the user changes the setting,
    323                 // so we can test the effect.
    324                 boolean newUseDirect = _context.getBooleanProperty(PROP_DIRECT);
    325                 if (_useDirect != newUseDirect) {
    326                     _useDirect = newUseDirect;
    327                     _bufCache.clear();
    328                 }
     326                _useDirect = _context.getBooleanProperty(PROP_DIRECT);
    329327            } catch (RuntimeException re) {
    330328                _log.error("Error in the event pumper", re);
     
    364362        _wantsRegister.clear();
    365363        _wantsWrite.clear();
    366         _bufCache.clear();
    367364    }
    368365   
     
    463460
    464461    /**
    465      *  How many to keep in reserve.
    466      *  Shared if there are multiple routers in the JVM
    467      */
    468     private static int _numBufs = MIN_BUFS;
    469     private static int __consecutiveExtra;
    470 
    471     /**
    472462     *  High-frequency path in thread.
    473463     */
    474464    private ByteBuffer acquireBuf() {
    475         ByteBuffer rv = _bufCache.poll();
    476         // discard buffer if _useDirect setting changes
    477         if (rv == null || rv.isDirect() != _useDirect) {
    478             if (_useDirect)
    479                 rv = ByteBuffer.allocateDirect(BUF_SIZE);
    480             else
    481                 rv = ByteBuffer.allocate(BUF_SIZE);
    482             _numBufs++;
    483         }
    484         return rv;
     465        return _bufferCache.tryAcquire();
    485466    }
    486467   
     
    491472     */
    492473    public static void releaseBuf(ByteBuffer buf) {
    493         // double check
    494         if (buf.capacity() < BUF_SIZE) {
    495             I2PAppContext.getGlobalContext().logManager().getLog(EventPumper.class).error("Bad size " + buf.capacity(), new Exception());
    496             return;
    497         }
    498         buf.clear();
    499         int extra = _bufCache.size();
    500         boolean cached = extra < _numBufs;
    501 
    502         // TODO always offer if direct?
    503         if (cached) {
    504             _bufCache.offer(buf);
    505             if (extra > MIN_BUFS) {
    506                 __consecutiveExtra++;
    507                 if (__consecutiveExtra >= 20) {
    508                     if (_numBufs > MIN_BUFS)
    509                         _numBufs--;
    510                     __consecutiveExtra = 0;
    511                 }
    512             }
    513         }
     474        _bufferCache.tryRelease(buf);
    514475    }
    515476   
  • router/java/src/net/i2p/router/transport/udp/UDPPacket.java

    r9b17b52 rbc4ee0fc  
    1313import net.i2p.router.transport.FIFOBandwidthLimiter;
    1414import net.i2p.router.util.CDQEntry;
     15import net.i2p.router.util.TryCache;
    1516import net.i2p.util.Addresses;
    1617import net.i2p.util.Log;
     
    4647    private FIFOBandwidthLimiter.Request _bandwidthRequest;
    4748 
     49    private static class PacketFactory implements TryCache.ObjectFactory<UDPPacket> {
     50        static RouterContext context;
     51        public UDPPacket newInstance() {
     52            return new UDPPacket(context);
     53        }
     54    }
     55   
    4856    //  Warning - this mixes contexts in a multi-router JVM
    49     private static final Queue<UDPPacket> _packetCache;
     57    private static final TryCache<UDPPacket> _packetCache;
     58    private static final TryCache.ObjectFactory<UDPPacket> _packetFactory;
    5059    private static final boolean CACHE = true;
    51     private static final int MIN_CACHE_SIZE = 64;
    5260    private static final int MAX_CACHE_SIZE = 256;
    5361    static {
    5462        if (CACHE) {
    55             long maxMemory = SystemVersion.getMaxMemory();
    56             int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024)));
    57             _packetCache = new LinkedBlockingQueue<UDPPacket>(csize);
     63            _packetFactory = new PacketFactory();
     64            _packetCache = new TryCache<>(_packetFactory, MAX_CACHE_SIZE);
    5865        } else {
    5966            _packetCache = null;
     67            _packetFactory = null;
    6068        }
    6169    }
     
    399407        UDPPacket rv = null;
    400408        if (CACHE) {
    401             rv = _packetCache.poll();
    402             if (rv != null) {
    403                 synchronized(rv) {
    404                     if (!rv._released) {
    405                         Log log = rv._context.logManager().getLog(UDPPacket.class);
    406                         log.error("Unreleased cached packet", new Exception());
    407                         rv = null;
    408                     } else {
    409                         rv.init(ctx);
    410                     }
    411                 }
    412             }
     409            rv = _packetCache.tryAcquire();
     410            rv.init(ctx);
    413411        }
    414412        if (rv == null)
     
    441439        if (!CACHE)
    442440            return;
    443         _packetCache.offer(this);
     441        _packetCache.tryRelease(this);
    444442    }
    445443   
Note: See TracChangeset for help on using the changeset viewer.