Changeset 92ffea2


Ignore:
Timestamp:
Aug 29, 2011 5:48:08 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
20855c9
Parents:
81240a5
Message:
  • NetDB:
    • Replace the old parallel lookup method with a true Kademlia lookup that iteratively queries additional floodfill peers returned in DatabaseSearchReplyMessages?. This is a more efficient and reliable lookup that will work much better when not all floodfill peers are known, and it removes a serious limitation to network growth.
    • Limit max number of DSRM entries to look up
    • Cleanups, javadocs, log tweaks
Location:
router/java/src/net/i2p/router/networkdb/kademlia
Files:
4 added
11 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupMatchJob.java

    r81240a5 r92ffea2  
    1414class FloodOnlyLookupMatchJob extends JobImpl implements ReplyJob {
    1515    private final Log _log;
    16     private final FloodOnlySearchJob _search;
     16    private final FloodSearchJob _search;
    1717
    18     public FloodOnlyLookupMatchJob(RouterContext ctx, FloodOnlySearchJob job) {
     18    public FloodOnlyLookupMatchJob(RouterContext ctx, FloodSearchJob job) {
    1919        super(ctx);
    2020        _log = ctx.logManager().getLog(getClass());
     
    3434    }
    3535
    36     public String getName() { return "NetDb flood search (phase 1) match"; }
     36    public String getName() { return "NetDb flood search match"; }
    3737
    3838    public void setMessage(I2NPMessage message) {
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupSelector.java

    r81240a5 r92ffea2  
    88import net.i2p.util.Log;
    99
     10/**
     11 * Mostly replaced by IterativeLookupSelector
     12 */
    1013class FloodOnlyLookupSelector implements MessageSelector {
    1114    private final RouterContext _context;
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlyLookupTimeoutJob.java

    r81240a5 r92ffea2  
    55import net.i2p.util.Log;
    66
     7/**
     8 *  This is the timeout for the whole search.
     9 */
    710class FloodOnlyLookupTimeoutJob extends JobImpl {
    811    private final FloodSearchJob _search;
    912
    10     public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodOnlySearchJob job) {
     13    public FloodOnlyLookupTimeoutJob(RouterContext ctx, FloodSearchJob job) {
    1114        super(ctx);
    1215        _search = job;
     
    2023    }
    2124
    22     public String getName() { return "NetDb flood search (phase 1) timeout"; }
     25    public String getName() { return "NetDb flood search timeout"; }
    2326}
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java

    r81240a5 r92ffea2  
    1515import net.i2p.router.RouterContext;
    1616import net.i2p.router.TunnelInfo;
    17 import net.i2p.router.peermanager.PeerProfile;
    1817import net.i2p.util.Log;
    1918
    2019/**
     20 * Uunused directly, replaced by IterativeSearchJob, but still extended by
     21 * SingleSearchJob.
     22 *
    2123 * Try sending a search to some floodfill peers, failing completely if we don't get
    2224 * a match from one of those peers, with no fallback to the kademlia search
     
    3537 * after it loses (or never had) floodfill references, as long as it
    3638 * knows one peer that is up.
    37  *
    3839 */
    3940class FloodOnlySearchJob extends FloodSearchJob {
    40     private volatile boolean _dead;
    41     protected final long _created;
    4241    private boolean _shouldProcessDSRM;
    4342    private final HashSet<Hash> _unheardFrom;
     
    5049
    5150    private static final int MIN_FOR_NO_DSRM = 4;
     51    private static final long SINGLE_SEARCH_MSG_TIME = 10*1000;
    5252
    5353    public FloodOnlySearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
     
    5656        _timeoutMs = Math.min(timeoutMs, SearchJob.PER_FLOODFILL_PEER_TIMEOUT);
    5757        _expiration = _timeoutMs + ctx.clock().now();
    58         _origExpiration = _timeoutMs + ctx.clock().now();
    5958        _unheardFrom = new HashSet(CONCURRENT_SEARCHES);
    6059        _replySelector = new FloodOnlyLookupSelector(getContext(), this);
    6160        _onReply = new FloodOnlyLookupMatchJob(getContext(), this);
    6261        _onTimeout = new FloodOnlyLookupTimeoutJob(getContext(), this);
    63         _created = System.currentTimeMillis();
    64     }
    65 
    66     /** System time, NOT context time */
    67     public long getCreated() { return _created; }
     62    }
    6863
    6964    public boolean shouldProcessDSRM() { return _shouldProcessDSRM; }
     
    175170            }
    176171            dlm.setFrom(replyTunnel.getPeer(0));
    177             dlm.setMessageExpiration(getContext().clock().now()+10*1000);
     172            dlm.setMessageExpiration(getContext().clock().now() + SINGLE_SEARCH_MSG_TIME);
    178173            dlm.setReplyTunnel(replyTunnel.getReceiveTunnelId(0));
    179174            dlm.setSearchKey(_key);
    180175           
    181176            if (_log.shouldLog(Log.INFO))
    182                 _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " to " + peer.toBase64());
     177                _log.info(getJobId() + ": Floodfill search for " + _key + " to " + peer);
    183178            getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
    184179            count++;
     
    188183        if (count <= 0) {
    189184            if (_log.shouldLog(Log.INFO))
    190                 _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " had no peers to send to");
     185                _log.info(getJobId() + ": Floodfill search for " + _key + " had no peers to send to");
    191186            // no floodfill peers, fail
    192187            failed();
     
    195190
    196191    @Override
    197     public String getName() { return "NetDb flood search (phase 1)"; }
     192    public String getName() { return "NetDb flood search"; }
    198193   
    199194    /**
     
    216211        }
    217212        getContext().messageRegistry().unregisterPending(_out);
    218         int timeRemaining = (int)(_origExpiration - getContext().clock().now());
    219         if (_log.shouldLog(Log.INFO))
    220             _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining + " remaining after " + (System.currentTimeMillis()-_created));
     213        long time = System.currentTimeMillis() - _created;
     214        if (_log.shouldLog(Log.INFO)) {
     215             int timeRemaining = (int)(_expiration - getContext().clock().now());
     216            _log.info(getJobId() + ": Floodfill search for " + _key + " failed with " + timeRemaining + " remaining after " + time);
     217        }
    221218        synchronized(_unheardFrom) {
    222219            for (Iterator<Hash> iter = _unheardFrom.iterator(); iter.hasNext(); )
     
    224221        }
    225222        _facade.complete(_key);
    226         getContext().statManager().addRateData("netDb.failedTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
     223        getContext().statManager().addRateData("netDb.failedTime", time, 0);
    227224        synchronized (_onFailed) {
    228225            for (int i = 0; i < _onFailed.size(); i++) {
     
    240237        }
    241238        if (_log.shouldLog(Log.INFO))
    242             _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " successful");
     239            _log.info(getJobId() + ": Floodfill search for " + _key + " successful");
    243240        // Sadly, we don't know which of the two replied, unless the first one sent a DSRM
    244241        // before the second one sent the answer, which isn't that likely.
     
    249246        // and FloodfillVerifyStoreJob) to record successful searches for now.
    250247        // StoreJob also calls dbStoreSent() which updates the lastHeardFrom timer - this also helps.
     248        long time = System.currentTimeMillis() - _created;
    251249        synchronized(_unheardFrom) {
    252250            if (_unheardFrom.size() == 1) {
    253251                Hash peer = _unheardFrom.iterator().next();
    254                 getContext().profileManager().dbLookupSuccessful(peer, System.currentTimeMillis()-_created);
     252                getContext().profileManager().dbLookupSuccessful(peer, time);
    255253            }
    256254        }
    257255        _facade.complete(_key);
    258         getContext().statManager().addRateData("netDb.successTime", System.currentTimeMillis()-_created, System.currentTimeMillis()-_created);
     256        getContext().statManager().addRateData("netDb.successTime", time, 0);
    259257        synchronized (_onFind) {
    260258            while (!_onFind.isEmpty())
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java

    r81240a5 r92ffea2  
    3737    protected long _expiration;
    3838    protected int _timeoutMs;
    39     protected long _origExpiration;
    4039    protected final boolean _isLease;
    4140    protected volatile int _lookupsRemaining;
    4241    protected volatile boolean _dead;
     42    protected final long _created;
    4343
    4444    public FloodSearchJob(RouterContext ctx, FloodfillNetworkDatabaseFacade facade, Hash key, Job onFind, Job onFailed, int timeoutMs, boolean isLease) {
     
    5656        _timeoutMs = timeout;
    5757        _expiration = timeout + ctx.clock().now();
    58         _origExpiration = timeoutMs + ctx.clock().now();
    5958        _isLease = isLease;
    60     }
    61 
     59        _created = System.currentTimeMillis();
     60    }
     61
     62    /** System time, NOT context time */
     63    public long getCreated() { return _created; }
     64
     65    /**
     66     *  Add jobs to an existing search
     67     */
    6268    void addDeferred(Job onFind, Job onFailed, long timeoutMs, boolean isLease) {
    6369        if (_dead) {
     
    7682    private static final int FLOOD_SEARCH_TIME_MIN = 30*1000;
    7783
     84    /**
     85     *  Deprecated, unused, see FOSJ override
     86     */
    7887    public void runJob() {
     88        throw new UnsupportedOperationException("use override");
     89/****
    7990        // pick some floodfill peers and send out the searches
    8091        List floodfillPeers = _facade.getFloodfillPeers();
     
    122133            _facade.searchFull(_key, _onFind, _onFailed, _timeoutMs*FLOOD_SEARCH_TIME_FACTOR, _isLease);
    123134        }
    124     }
    125 
     135****/
     136    }
     137
     138    /**
     139     *  Deprecated, unused, see FOSJ override
     140     */
    126141    public String getName() { return "NetDb search (phase 1)"; }
    127142   
     
    140155    protected int getLookupsRemaining() { return _lookupsRemaining; }
    141156   
     157    /**
     158     *  Deprecated, unused, see FOSJ override
     159     */
    142160    void failed() {
     161        throw new UnsupportedOperationException("use override");
     162/****
    143163        if (_dead) return;
    144164        _dead = true;
    145         int timeRemaining = (int)(_origExpiration - getContext().clock().now());
     165        int timeRemaining = (int)(_expiration - getContext().clock().now());
    146166        if (_log.shouldLog(Log.INFO))
    147167            _log.info(getJobId() + ": Floodfill search for " + _key.toBase64() + " failed with " + timeRemaining);
     
    157177                getContext().jobQueue().addJob(removed.remove(0));
    158178        }
    159     }
    160 
     179****/
     180    }
     181
     182    /**
     183     *  Deprecated, unused, see FOSJ override
     184     */
    161185    void success() {
     186        throw new UnsupportedOperationException("use override");
     187/****
    162188        if (_dead) return;
    163189        if (_log.shouldLog(Log.INFO))
     
    172198        while (!removed.isEmpty())
    173199            getContext().jobQueue().addJob(removed.remove(0));
    174     }
    175 
     200****/
     201    }
     202
     203    /**
     204     *  Deprecated, unused, see FOSJ override
     205     */
     206/****
    176207    private static class FloodLookupTimeoutJob extends JobImpl {
    177208        private FloodSearchJob _search;
     
    187218        public String getName() { return "NetDb search (phase 1) timeout"; }
    188219    }
    189 
     220****/
     221
     222    /**
     223     *  Deprecated, unused, see FOSJ override
     224     */
     225/****
    190226    private static class FloodLookupMatchJob extends JobImpl implements ReplyJob {
    191227        private Log _log;
     
    212248        public void setMessage(I2NPMessage message) {}
    213249    }
    214 
     250****/
     251
     252    /**
     253     *  Deprecated, unused, see FOSJ override
     254     */
     255/****
    215256    private static class FloodLookupSelector implements MessageSelector {
    216257        private RouterContext _context;
     
    241282        }   
    242283    }
     284****/
    243285}
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java

    r81240a5 r92ffea2  
    5050        _context.statManager().createRequiredRateStat("netDb.successTime", "Time for successful lookup (ms)", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
    5151        _context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
    52         _context.statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 60*1000l, 10*60*1000l });
     52        _context.statManager().createRateStat("netDb.retries", "How many additional queries for an iterative search", "NetworkDatabase", new long[] { 60*60*1000l });
     53        _context.statManager().createRateStat("netDb.failedAttemptedPeers", "How many peers we sent a search to when the search fails", "NetworkDatabase", new long[] { 10*60*1000l });
    5354        _context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
    5455        _context.statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "NetworkDatabase", new long[] { 60*60*1000l, 24*60*60*1000l });
     
    212213    }
    213214   
     215    /**
     216     *  @param may be null, returns false if null
     217     */
    214218    public static boolean isFloodfill(RouterInfo peer) {
    215219        if (peer == null) return false;
     
    248252     * without any match)
    249253     *
     254     * @return null always
    250255     */
    251256    @Override
     
    259264            if (searchJob == null) {
    260265                //if (SearchJob.onlyQueryFloodfillPeers(_context)) {
    261                     searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
     266                    //searchJob = new FloodOnlySearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
     267                    searchJob = new IterativeSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
    262268                //} else {
    263269                //    searchJob = new FloodSearchJob(_context, this, key, onFindJob, onFailedLookupJob, (int)timeoutMs, isLease);
     
    287293     * Unused - called only by FloodSearchJob which is overridden - don't use this.
    288294     */
     295/*****
    289296    void searchFull(Hash key, List<Job> onFind, List<Job> onFailed, long timeoutMs, boolean isLease) {
    290297        synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
     
    331338        }
    332339    }
    333 
     340*****/
     341
     342    /**
     343     *  Must be called by the search job queued by search() on success or failure
     344     */
    334345    void complete(Hash key) {
    335346        synchronized (_activeFloodQueries) { _activeFloodQueries.remove(key); }
  • router/java/src/net/i2p/router/networkdb/kademlia/HandleFloodfillDatabaseStoreMessageJob.java

    r81240a5 r92ffea2  
    4848   
    4949    public void runJob() {
    50         if (_log.shouldLog(Log.DEBUG))
    51             _log.debug("Handling database store message");
     50        //if (_log.shouldLog(Log.DEBUG))
     51        //    _log.debug("Handling database store message");
    5252
    5353        long recvBegin = System.currentTimeMillis();
  • router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java

    r81240a5 r92ffea2  
    644644            return "Expired leaseSet for " + leaseSet.getDestination().calculateHash().toBase64()
    645645                   + " expired " + DataHelper.formatDuration(age) + " ago";
    646         } else if (leaseSet.getEarliestLeaseDate() > _context.clock().now() + Router.CLOCK_FUDGE_FACTOR + MAX_LEASE_FUTURE) {
     646        } else if (leaseSet.getEarliestLeaseDate() > _context.clock().now() + (Router.CLOCK_FUDGE_FACTOR + MAX_LEASE_FUTURE)) {
    647647            long age = leaseSet.getEarliestLeaseDate() - _context.clock().now();
    648             if (_log.shouldLog(Log.ERROR))
    649                 _log.error("LeaseSet to expire too far in the future: "
     648            // let's not make this an error, it happens when peers have bad clocks
     649            if (_log.shouldLog(Log.WARN))
     650                _log.warn("LeaseSet expires too far in the future: "
    650651                          + leaseSet.getDestination().calculateHash().toBase64()
    651                           + " expires on " + new Date(leaseSet.getEarliestLeaseDate()), new Exception("Rejecting store"));
    652             return "Future expiring leaseSet for " + leaseSet.getDestination().calculateHash().toBase64()
     652                          + " expires " + DataHelper.formatDuration(age) + " from now");
     653            return "Future expiring leaseSet for " + leaseSet.getDestination().calculateHash()
    653654                   + " expiring in " + DataHelper.formatDuration(age);
    654655        }
  • router/java/src/net/i2p/router/networkdb/kademlia/SingleLookupJob.java

    r81240a5 r92ffea2  
    99
    1010/**
    11  * Ask the peer who sent us the DSRM for the RouterInfos.
     11 * Ask the peer who sent us the DSRM for the RouterInfos...
    1212 *
    13  * If we have the routerInfo already, try to refetch it from that router itself,
     13 * ... but If we have the routerInfo already, try to refetch it from that router itself,
    1414 * (if the info is old or we don't think it is floodfill)
    1515 * which will help us establish that router as a good floodfill and speed our
     
    2424    private final DatabaseSearchReplyMessage _dsrm;
    2525
     26    /**
     27     *  I2NP spec allows 255, max actually sent (in ../HDLMJ) is 3,
     28     *  so just to prevent trouble, we don't want to queue 255 jobs at once
     29     */
     30    public static final int MAX_TO_FOLLOW = 8;
     31
    2632    public SingleLookupJob(RouterContext ctx, DatabaseSearchReplyMessage dsrm) {
    2733        super(ctx);
     
    3238    public void runJob() {
    3339        Hash from = _dsrm.getFromHash();
    34         for (int i = 0; i < _dsrm.getNumReplies(); i++) {
     40        int limit = Math.min(_dsrm.getNumReplies(), MAX_TO_FOLLOW);
     41        for (int i = 0; i < limit; i++) {
    3542            Hash peer = _dsrm.getReply(i);
    3643            if (peer.equals(getContext().routerHash())) // us
  • router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java

    r81240a5 r92ffea2  
    4949       
    5050        if (_log.shouldLog(Log.INFO))
    51             _log.info(getJobId() + ": Single search for " + _key.toBase64() + " to " + _to.toBase64());
     51            _log.info(getJobId() + ": Single search for " + _key + " to " + _to);
    5252        getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
    5353        _lookupsRemaining = 1;
  • router/java/src/net/i2p/router/networkdb/kademlia/XORComparator.java

    r81240a5 r92ffea2  
    1111 */
    1212class XORComparator implements Comparator<Hash> {
    13     private Hash _base;
     13    private final Hash _base;
     14
    1415    /**
    1516     * @param target key to compare distances with
     
    1819        _base = target;
    1920    }
     21
    2022    public int compare(Hash lhs, Hash rhs) {
    2123        if (lhs == null) throw new NullPointerException("LHS is null");
Note: See TracChangeset for help on using the changeset viewer.