Changeset 3a2fe5e


Ignore:
Timestamp:
Mar 16, 2012 12:20:29 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e2a39a3
Parents:
fb8244e
Message:
  • OCMOSJ: Refactor cache to its own class, make non-static
Files:
1 added
4 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    rfb8244e r3a2fe5e  
     12012-03-16 zzz
     2  * FragmentHandler: Zero-copy read of unfragmented messages
     3                     for speed and to reduce object churn
     4  * Home page: Tag tooltip; CSS tweaks; news tweak
     5  * HTTP Proxy: Jump and addresshelper page tweaks
     6  * Jetty: Add I2P mime types to default eepsite config
     7  * OCMOSJ: Refactor cache to its own class, make non-static
     8  * TransportManager: Fix fatal exception on soft restart caused by DHSKB refactoring
     9  * TrustedUpdate: Preserve default key names even when keys are set
     10                   in advanced config
     11
    1122012-03-15 sponge
    213  * Plugins:
  • router/java/src/net/i2p/router/ClientMessagePool.java

    rfb8244e r3a2fe5e  
    1212
    1313import net.i2p.client.I2PClient;
     14import net.i2p.router.message.OutboundCache;
    1415import net.i2p.router.message.OutboundClientMessageOneShotJob;
    1516import net.i2p.util.Log;
     
    2627    private final Log _log;
    2728    private final RouterContext _context;
     29    private final OutboundCache _cache;
    2830   
    2931    public ClientMessagePool(RouterContext context) {
    3032        _context = context;
    3133        _log = _context.logManager().getLog(ClientMessagePool.class);
     34        _cache = new OutboundCache(_context);
    3235        OutboundClientMessageOneShotJob.init(_context);
    3336    }
     
    3740     */
    3841    public void shutdown() {
    39         OutboundClientMessageOneShotJob.clearAllCaches();
     42        _cache.clearAllCaches();
    4043    }
    4144
     
    7376            if (_log.shouldLog(Log.DEBUG))
    7477                _log.debug("Adding message for remote delivery");
    75             OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, msg);
     78            OutboundClientMessageOneShotJob j = new OutboundClientMessageOneShotJob(_context, _cache, msg);
    7679            if (true) // blocks the I2CP reader for a nontrivial period of time
    7780                j.runJob();
  • router/java/src/net/i2p/router/RouterVersion.java

    rfb8244e r3a2fe5e  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 15;
     21    public final static long BUILD = 16;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java

    rfb8244e r3a2fe5e  
    33import java.util.ArrayList;
    44import java.util.Collections;
    5 import java.util.HashMap;
    65import java.util.HashSet;
    7 import java.util.Iterator;
    86import java.util.List;
    97import java.util.Map;
    108import java.util.Properties;
    119import java.util.Set;
    12 import java.util.concurrent.ConcurrentHashMap;
    1310
    1411import net.i2p.crypto.SessionKeyManager;
    1512import net.i2p.crypto.TagSetHandle;
    16 import net.i2p.data.Base64;
    1713import net.i2p.data.Certificate;
    1814import net.i2p.data.Destination;
     
    3935import net.i2p.router.TunnelInfo;
    4036import net.i2p.util.Log;
    41 import net.i2p.util.SimpleScheduler;
    42 import net.i2p.util.SimpleTimer;
    4337
    4438/**
     
    5044public class OutboundClientMessageOneShotJob extends JobImpl {
    5145    private final Log _log;
     46    private final OutboundCache _cache;
    5247    private final long _overallExpiration;
    5348    private final ClientMessage _clientMessage;
     
    7166   
    7267    /**
    73      * This is the place where we make I2P go fast.
    74      *
    75      * We have five static caches.
    76      * - The LeaseSet cache is used to decide whether to bundle our own leaseset,
    77      *   which minimizes overhead.
    78      * - The Lease cache is used to persistently send to the same lease for the destination,
    79      *   which keeps the streaming lib happy by minimizing out-of-order delivery.
    80      * - The Tunnel and BackloggedTunnel caches are used to persistently use the same outbound tunnel
    81      *   for the same destination,
    82      *   which keeps the streaming lib happy by minimizing out-of-order delivery.
    83      * - The last reply requested cache ensures that a reply is requested every so often,
    84      *   so that failed tunnels are recognized.
    85      *
    86      */
    87 
    88     /**
    8968     * Key used to cache things with, based on source + dest
    9069     */
    91     private final HashPair _hashPair;
    92 
    93     /**
    94      * Use the same outbound tunnel as we did for the same destination previously,
    95      * if possible, to keep the streaming lib happy
    96      * Use two caches - although a cache of a list of tunnels per dest might be
    97      * more elegant.
    98      * Key the caches on the source+dest pair.
    99      *
    100      */
    101     private static final Map<HashPair, TunnelInfo> _tunnelCache = new HashMap(64);
    102 
    103     private static final Map<HashPair, TunnelInfo> _backloggedTunnelCache = new HashMap(64);
    104 
    105     /**
    106       * Returns the reply lease set if forced to do so,
    107       * or if configured to do so,
    108       * or if a certain percentage of the time if configured to do so,
    109       * or if our lease set has changed since we last talked to them,
    110       * or 10% of the time anyway so they don't forget us (disabled for now),
    111       * or null otherwise.
    112       *
    113       * Note that wantACK randomly forces us another 5% of the time.
    114       *
    115       * We don't want to do this too often as a typical 2-lease leaseset
    116       * in a DatabaseStoreMessage is 861+37=898 bytes -
    117       * when added to garlic it's a 1056-byte addition total, which is huge.
    118       *
    119       * Key the cache on the source+dest pair.
    120       */
    121     private static final Map<HashPair, LeaseSet> _leaseSetCache = new ConcurrentHashMap(64);
    122 
    123     /**
    124      * Use the same inbound tunnel (i.e. lease) as we did for the same destination previously,
    125      * if possible, to keep the streaming lib happy
    126      * Key the caches on the source+dest pair.
    127      *
    128      * We're going to use the lease until it expires, as long as it remains in the current leaseSet.
    129      *
    130      * If not found,
    131      * fetch the next lease that we should try sending through, randomly chosen
    132      * from within the sorted leaseSet (NOT sorted by # of failures through each
    133      * lease).
    134      *
    135      */
    136     private static final ConcurrentHashMap<HashPair, Lease> _leaseCache = new ConcurrentHashMap(64);
    137 
    138     /**
    139      * This cache is used to ensure that we request a reply every so often.
    140      * Hopefully this allows the router to recognize a failed tunnel and switch,
    141      * before upper layers like streaming lib fail, even for low-bandwidth
    142      * connections like IRC.
    143      */
    144     private static final Map<HashPair, Long> _lastReplyRequestCache = new ConcurrentHashMap(64);
     70    private final OutboundCache.HashPair _hashPair;
    14571
    14672    /**
     
    179105    private static final int BUNDLE_PROBABILITY_DEFAULT = 100;
    180106   
    181     private static final int CLEAN_INTERVAL = 5*60*1000;
    182107    private static final int REPLY_REQUEST_INTERVAL = 60*1000;
    183108
     
    185110     * Send the sucker
    186111     */
    187     public OutboundClientMessageOneShotJob(RouterContext ctx, ClientMessage msg) {
     112    public OutboundClientMessageOneShotJob(RouterContext ctx, OutboundCache cache, ClientMessage msg) {
    188113        super(ctx);
     114        _cache = cache;
    189115        _log = ctx.logManager().getLog(OutboundClientMessageOneShotJob.class);
    190116       
     
    195121        _from = msg.getFromDestination();
    196122        _to = msg.getDestination();
    197         _hashPair = new HashPair(_from.calculateHash(), _to.calculateHash());
     123        _hashPair = new OutboundCache.HashPair(_from.calculateHash(), _to.calculateHash());
    198124        _toString = _to.calculateHash().toBase64().substring(0,4);
    199125        _start = getContext().clock().now();
     
    237163    /** call once only */
    238164    public static void init(RouterContext ctx) {
    239         SimpleScheduler.getInstance().addPeriodicEvent(new OCMOSJCacheCleaner(ctx), CLEAN_INTERVAL, CLEAN_INTERVAL);
    240165        ctx.statManager().createFrequencyStat("client.sendMessageFailFrequency", "How often does a client fail to send a message?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
    241166        ctx.statManager().createRateStat("client.sendMessageSize", "How large are messages sent by the client?", "ClientMessages", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
     
    318243
    319244        // If the last leaseSet we sent him is still good, don't bother sending again
    320             LeaseSet ls = _leaseSetCache.put(_hashPair, newLS);
     245            LeaseSet ls = _cache.leaseSetCache.put(_hashPair, newLS);
    321246            if (!force) {
    322247                if (ls != null) {
     
    370295   
    371296    /**
     297     *  Choose a lease from his leaseset to send the message to. Sets _lease.
     298     *  Sets _wantACK if it's new or changed.
    372299     *  @return success
    373300     */
     
    386313        // Use the same lease if it's still good
    387314        // Even if _leaseSet changed, _leaseSet.getEncryptionKey() didn't...
    388             _lease = _leaseCache.get(_hashPair);
     315            _lease = _cache.leaseCache.get(_hashPair);
    389316            if (_lease != null) {
    390317                // if outbound tunnel length == 0 && lease.firsthop.isBacklogged() don't use it ??
     
    402329                }
    403330                // remove only if still equal to _lease (concurrent)
    404                 _leaseCache.remove(_hashPair, _lease);
     331                _cache.leaseCache.remove(_hashPair, _lease);
    405332                if (_log.shouldLog(Log.INFO))
    406333                    _log.info(getJobId() + ": Expired from cache - lease for " + _toString);
     
    472399                _log.warn(getJobId() + ": All leases are unreachable for " + _toString);
    473400        }
    474         _leaseCache.put(_hashPair, _lease);
     401        _cache.leaseCache.put(_hashPair, _lease);
    475402        if (_log.shouldLog(Log.INFO))
    476403            _log.info(getJobId() + ": Added to cache - lease for " + _toString);
     
    529456        boolean wantACK;
    530457
    531             Long lastSent = _lastReplyRequestCache.get(_hashPair);
     458            Long lastSent = _cache.lastReplyRequestCache.get(_hashPair);
    532459            wantACK = _wantACK || existingTags <= 30 ||
    533460                      lastSent == null || lastSent.longValue() < now - REPLY_REQUEST_INTERVAL;
    534461            if (wantACK)
    535                 _lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
     462                _cache.lastReplyRequestCache.put(_hashPair, Long.valueOf(now));
    536463       
    537464        PublicKey key = _leaseSet.getEncryptionKey();
     
    658585
    659586    /**
    660      * Key used to cache things with based on source + dest
    661      * @since 0.8.3
    662      */
    663     private static class HashPair {
    664         private final Hash sh, dh;
    665 
    666         public HashPair(Hash s, Hash d) {
    667             sh = s;
    668             dh = d;
    669         }
    670 
    671         public int hashCode() {
    672             return sh.hashCode() ^ dh.hashCode();
    673         }
    674 
    675         public boolean equals(Object o) {
    676             if (o == null || !(o instanceof HashPair))
    677                 return false;
    678             HashPair hp = (HashPair) o;
    679             return sh.equals(hp.sh) && dh.equals(hp.dh);
    680         }
    681     }
    682 
    683     /**
    684587     * Called on failure to give us a better chance of success next time.
    685588     * Of course this is probably 60s too late.
     
    689592     */
    690593    private void clearCaches() {
    691         if (_inTunnel != null) {   // if we wanted an ack, we sent our lease too
    692                 _leaseSetCache.remove(_hashPair);
    693         }
    694         if (_lease != null) {
    695             // remove only if still equal to _lease (concurrent)
    696             _leaseCache.remove(_hashPair, _lease);
    697         }
    698         if (_outTunnel != null) {
    699             synchronized(_tunnelCache) {
    700                 TunnelInfo t = _backloggedTunnelCache.get(_hashPair);
    701                 if (t != null && t.equals(_outTunnel))
    702                     _backloggedTunnelCache.remove(_hashPair);
    703                 t = _tunnelCache.get(_hashPair);
    704                 if (t != null && t.equals(_outTunnel))
    705                     _tunnelCache.remove(_hashPair);
    706             }
    707         }
    708     }
    709 
    710     /**
    711      *  @since 0.8.8
    712      */
    713     public static void clearAllCaches() {
    714         _leaseSetCache.clear();
    715         _leaseCache.clear();
    716         synchronized(_tunnelCache) {
    717             _backloggedTunnelCache.clear();
    718             _tunnelCache.clear();
    719         }
    720         _lastReplyRequestCache.clear();
    721     }
    722 
    723     /**
    724      * Clean out old leaseSets
    725      */
    726     private static void cleanLeaseSetCache(RouterContext ctx, Map<HashPair, LeaseSet> tc) {
    727         long now = ctx.clock().now();
    728         for (Iterator<LeaseSet> iter = tc.values().iterator(); iter.hasNext(); ) {
    729             LeaseSet l = iter.next();
    730             if (l.getEarliestLeaseDate() < now)
    731                 iter.remove();
    732         }
    733     }
    734 
    735     /**
    736      * Clean out old leases
    737      */
    738     private static void cleanLeaseCache(Map<HashPair, Lease> tc) {
    739         for (Iterator<Lease> iter = tc.values().iterator(); iter.hasNext(); ) {
    740             Lease l = iter.next();
    741             if (l.isExpired(Router.CLOCK_FUDGE_FACTOR))
    742                 iter.remove();
    743         }
    744     }
    745 
    746     /**
    747      * Clean out old tunnels
    748      * Caller must synchronize on tc.
    749      */
    750     private static void cleanTunnelCache(RouterContext ctx, Map<HashPair, TunnelInfo> tc) {
    751         for (Iterator<Map.Entry<HashPair, TunnelInfo>> iter = tc.entrySet().iterator(); iter.hasNext(); ) {
    752             Map.Entry<HashPair, TunnelInfo> entry = iter.next();
    753             HashPair k = entry.getKey();
    754             TunnelInfo tunnel = entry.getValue();
    755             // This is a little sneaky, but get the _from back out of the "opaque" hash key
    756             if (!ctx.tunnelManager().isValidTunnel(k.sh, tunnel))
    757                 iter.remove();
    758         }
    759     }
    760 
    761     /**
    762      * Clean out old reply times
    763      */
    764     private static void cleanReplyCache(RouterContext ctx, Map<HashPair, Long> tc) {
    765         long now = ctx.clock().now();
    766         for (Iterator<Long> iter = tc.values().iterator(); iter.hasNext(); ) {
    767             Long l = iter.next();
    768             if (l.longValue() < now - CLEAN_INTERVAL)
    769                 iter.remove();
    770         }
    771     }
    772 
    773     private static class OCMOSJCacheCleaner implements SimpleTimer.TimedEvent {
    774         private RouterContext _ctx;
    775         private OCMOSJCacheCleaner(RouterContext ctx) {
    776             _ctx = ctx;
    777         }
    778         public void timeReached() {
    779             cleanLeaseSetCache(_ctx, _leaseSetCache);
    780             cleanLeaseCache(_leaseCache);
    781             synchronized(_tunnelCache) {
    782                 cleanTunnelCache(_ctx, _tunnelCache);
    783                 cleanTunnelCache(_ctx, _backloggedTunnelCache);
    784             }
    785             cleanReplyCache(_ctx, _lastReplyRequestCache);
    786         }
    787     }
    788 
     594        _cache.clearCaches(_hashPair, _lease, _inTunnel, _outTunnel);
     595    }
     596
     597    /**
     598     *  Choose our outbound tunnel to send the message through.
     599     *  Sets _wantACK if it's new or changed.
     600     *  @return the tunnel or null on failure
     601     */
    789602    private TunnelInfo selectOutboundTunnel(Destination to) {
    790603        TunnelInfo tunnel;
    791         synchronized (_tunnelCache) {
     604        synchronized (_cache.tunnelCache) {
    792605            /**
    793606             * If old tunnel is valid and no longer backlogged, use it.
     
    796609             * backlog and seeing if traffic came back or not.
    797610             */
    798             tunnel = _backloggedTunnelCache.get(_hashPair);
     611            tunnel = _cache.backloggedTunnelCache.get(_hashPair);
    799612            if (tunnel != null) {
    800613                if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
     
    802615                        if (_log.shouldLog(Log.WARN))
    803616                            _log.warn("Switching back to tunnel " + tunnel + " for " + _toString);
    804                         _backloggedTunnelCache.remove(_hashPair);
    805                         _tunnelCache.put(_hashPair, tunnel);
     617                        _cache.backloggedTunnelCache.remove(_hashPair);
     618                        _cache.tunnelCache.put(_hashPair, tunnel);
    806619                        _wantACK = true;
    807620                        return tunnel;
    808621                    }  // else still backlogged
    809622                } else // no longer valid
    810                     _backloggedTunnelCache.remove(_hashPair);
     623                    _cache.backloggedTunnelCache.remove(_hashPair);
    811624            }
    812625            // Use the same tunnel unless backlogged
    813             tunnel = _tunnelCache.get(_hashPair);
     626            tunnel = _cache.tunnelCache.get(_hashPair);
    814627            if (tunnel != null) {
    815628                if (getContext().tunnelManager().isValidTunnel(_from.calculateHash(), tunnel)) {
     
    819632                    if (_log.shouldLog(Log.WARN))
    820633                        _log.warn("Switching from backlogged " + tunnel + " for " + _toString);
    821                     _backloggedTunnelCache.put(_hashPair, tunnel);
     634                    _cache.backloggedTunnelCache.put(_hashPair, tunnel);
    822635                } // else no longer valid
    823                 _tunnelCache.remove(_hashPair);
     636                _cache.tunnelCache.remove(_hashPair);
    824637            }
    825638            // Pick a new tunnel
    826639            tunnel = selectOutboundTunnel();
    827640            if (tunnel != null)
    828                 _tunnelCache.put(_hashPair, tunnel);
     641                _cache.tunnelCache.put(_hashPair, tunnel);
    829642            _wantACK = true;
    830643        }
Note: See TracChangeset for help on using the changeset viewer.