Changeset 6162908


Ignore:
Timestamp:
Sep 8, 2012 12:57:09 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
1ae0c2e
Parents:
5056706 (diff), 4cf1047 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p' (head 86f3e7e668b7ec9f2ddf75be7586719944bbc37f)

to branch 'i2p.i2p.zzz.test' (head da9536c250bc4c0b7523ed748574de1cc97f3028)

Files:
10 added
78 edited

Legend:

Unmodified
Added
Removed
  • apps/i2psnark/java/src/net/i2p/kademlia/KBucketSet.java

    r5056706 r6162908  
    1515import java.util.Comparator;
    1616import java.util.HashSet;
    17 import java.util.LinkedHashMap;
    1817import java.util.List;
    1918import java.util.Map;
     
    2524import net.i2p.data.DataHelper;
    2625import net.i2p.data.SimpleDataStructure;
     26import net.i2p.util.LHMCache;
    2727import net.i2p.util.Log;
    2828
     
    659659            _bValue = bValue;
    660660            _bigUs = new BigInteger(1, us.getData());
    661             _distanceCache = new LHM(256);
     661            _distanceCache = new LHMCache(256);
    662662        }
    663663
     
    698698    }
    699699
    700     private static class LHM<K, V> extends LinkedHashMap<K, V> {
    701         private final int _max;
    702 
    703         public LHM(int max) {
    704             super(max, 0.75f, true);
    705             _max = max;
    706         }
    707 
    708         @Override
    709         protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    710             return size() > _max;
    711         }
    712     }
    713 
    714700    /**
    715701     *  For Collections.binarySearch.
  • apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java

    r5056706 r6162908  
    3939import net.i2p.data.Hash;
    4040import net.i2p.data.SimpleDataStructure;
     41import net.i2p.util.ConcurrentHashSet;
    4142import net.i2p.util.I2PAppThread;
    4243import net.i2p.util.Log;
     
    9899    /** index to incoming opaque tokens, received in a peers or nodes reply */
    99100    private final ConcurrentHashMap<NID, Token> _incomingTokens;
     101    /** recently unreachable, with lastSeen() as the added-to-blacklist time  */
     102    private final Set<NID> _blacklist;
    100103
    101104    /** hook to inject and receive datagrams */
     
    148151    private static final long CLEAN_TIME = 63*1000;
    149152    private static final long EXPLORE_TIME = 877*1000;
     153    private static final long BLACKLIST_CLEAN_TIME = 17*60*1000;
    150154    private static final String DHT_FILE = "i2psnark.dht.dat";
    151155
     
    162166        _outgoingTokens = new ConcurrentHashMap();
    163167        _incomingTokens = new ConcurrentHashMap();
     168        _blacklist = new ConcurrentHashSet();
    164169
    165170        // Construct my NodeInfo
     
    263268            int replyType = waiter.getReplyCode();
    264269            if (replyType == REPLY_NONE) {
    265                  if (_log.shouldLog(Log.INFO))
    266                      _log.info("Got no reply");
     270                 if (_log.shouldLog(Log.DEBUG))
     271                     _log.debug("Got no reply");
    267272            } else if (replyType == REPLY_NODES) {
    268273                 List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
    269274                 // It seems like we are just going to get back ourselves all the time
    270                  if (_log.shouldLog(Log.INFO))
    271                      _log.info("Got " + reply.size() + " nodes");
     275                 if (_log.shouldLog(Log.DEBUG))
     276                     _log.debug("Got " + reply.size() + " nodes");
    272277                 for (NodeInfo ni : reply) {
    273278                     if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni))))
     
    349354            int replyType = waiter.getReplyCode();
    350355            if (replyType == REPLY_NONE) {
    351                  if (_log.shouldLog(Log.INFO))
    352                      _log.info("Got no reply");
     356                 if (_log.shouldLog(Log.DEBUG))
     357                     _log.debug("Got no reply");
    353358            } else if (replyType == REPLY_PONG) {
    354                  if (_log.shouldLog(Log.INFO))
    355                      _log.info("Got pong");
     359                 if (_log.shouldLog(Log.DEBUG))
     360                     _log.debug("Got pong");
    356361            } else if (replyType == REPLY_PEERS) {
    357                  if (_log.shouldLog(Log.INFO))
    358                      _log.info("Got peers");
     362                 if (_log.shouldLog(Log.DEBUG))
     363                     _log.debug("Got peers");
    359364                 List<Hash> reply = (List<Hash>) waiter.getReplyObject();
    360365                 if (!reply.isEmpty()) {
     
    368373            } else if (replyType == REPLY_NODES) {
    369374                 List<NodeInfo> reply = (List<NodeInfo>) waiter.getReplyObject();
    370                  if (_log.shouldLog(Log.INFO))
    371                      _log.info("Got " + reply.size() + " nodes");
     375                 if (_log.shouldLog(Log.DEBUG))
     376                     _log.debug("Got " + reply.size() + " nodes");
    372377                 for (NodeInfo ni : reply) {
    373378                     if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni)))
     
    577582        _outgoingTokens.clear();
    578583        _incomingTokens.clear();
     584        _blacklist.clear();
    579585    }
    580586
     
    593599    public String renderStatusHTML() {
    594600        long uptime = Math.max(1000, _context.clock().now() - _started);
    595         StringBuilder buf = new StringBuilder();
     601        StringBuilder buf = new StringBuilder(256);
    596602        buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ")
    597603           .append(DataHelper.formatSize2(_txBytes.get())).append("B / ")
     
    601607           .append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" +
    602608                   "DHT Peers: ").append( _knownNodes.size()).append("<br>" +
     609                   "Blacklisted: ").append(_blacklist.size()).append("<br>" +
    603610                   "Sent tokens: ").append(_outgoingTokens.size()).append("<br>" +
    604611                   "Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" +
     
    10801087                oldInfo.setDestination(nInfo.getDestination());
    10811088        }
    1082         oldInfo.getNID().setLastSeen();
     1089        nID = oldInfo.getNID();
     1090        nID.setLastSeen();
     1091        if (_blacklist.remove(nID)) {
     1092            if (_log.shouldLog(Log.INFO))
     1093                _log.info("UN-blacklisted: " + nID);
     1094        }
    10831095        return oldInfo;
    10841096    }
     
    11091121                if (_log.shouldLog(Log.INFO))
    11101122                    _log.info("Removed after consecutive timeouts: " + nInfo);
     1123            }
     1124            if (!_blacklist.contains(nid)) {
     1125                // used as when-added time
     1126                nid.setLastSeen();
     1127                _blacklist.add(nid);
     1128                if (_log.shouldLog(Log.INFO))
     1129                    _log.info("Blacklisted: " + nid);
    11111130            }
    11121131        }
     
    12241243                Token token = new Token(_context, tok);
    12251244                _incomingTokens.put(nInfo.getNID(), token);
    1226                 if (_log.shouldLog(Log.INFO))
    1227                     _log.info("Got token: " + token + ", must be a response to get_peers");
     1245                if (_log.shouldLog(Log.DEBUG))
     1246                    _log.debug("Got token: " + token + ", must be a response to get_peers");
    12281247            } else {
    1229                 if (_log.shouldLog(Log.INFO))
    1230                     _log.info("No token and saved infohash, must be a response to find_node");
     1248                if (_log.shouldLog(Log.DEBUG))
     1249                    _log.debug("No token and saved infohash, must be a response to find_node");
    12311250            }
    12321251        }
     
    12601279        for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) {
    12611280            NodeInfo nInf = new NodeInfo(ids, off);
     1281            if (_blacklist.contains(nInf.getNID())) {
     1282                if (_log.shouldLog(Log.INFO))
     1283                    _log.info("Ignoring blacklisted " + nInf.getNID() + " from: " + nInfo);
     1284                continue;
     1285            }
    12621286            nInf = heardAbout(nInf);
    12631287            rv.add(nInf);
     
    15021526            if (_log.shouldLog(Log.DEBUG))
    15031527                _log.debug("KRPC cleaner starting with " +
     1528                          _blacklist.size() + " in blacklist, " +
    15041529                          _outgoingTokens.size() + " sent Tokens, " +
    15051530                          _incomingTokens.size() + " rcvd Tokens");
     
    15141539                    iter.remove();
    15151540            }
     1541            for (Iterator<NID> iter = _blacklist.iterator(); iter.hasNext(); ) {
     1542                NID nid = iter.next();
     1543                // lastSeen() is actually when-added
     1544                if (now > nid.lastSeen() + BLACKLIST_CLEAN_TIME)
     1545                    iter.remove();
     1546            }
    15161547            // TODO sent queries?
    15171548            if (_log.shouldLog(Log.DEBUG))
    15181549                _log.debug("KRPC cleaner done, now with " +
     1550                          _blacklist.size() + " in blacklist, " +
    15191551                          _outgoingTokens.size() + " sent Tokens, " +
    15201552                          _incomingTokens.size() + " rcvd Tokens, " +
  • apps/i2psnark/java/src/org/klomp/snark/dht/NID.java

    r5056706 r6162908  
    1919    private int fails;
    2020
    21     private static final int MAX_FAILS = 3;
     21    private static final int MAX_FAILS = 2;
    2222
    2323    public NID() {
     
    4242     */
    4343    public boolean timeout() {
    44         return fails++ > MAX_FAILS;
     44        return ++fails > MAX_FAILS;
    4545    }
    4646}
  • apps/routerconsole/java/src/net/i2p/router/web/SummaryRenderer.java

    r5056706 r6162908  
    88import java.text.SimpleDateFormat;
    99import java.util.Date;
     10import java.util.Map;
    1011
    1112import javax.imageio.ImageIO;
     
    1617import net.i2p.data.DataHelper;
    1718import net.i2p.router.RouterContext;
     19import net.i2p.router.util.EventLog;
    1820import net.i2p.util.Log;
    1921
     
    138140                descr = _(_listener.getRate().getRateStat().getDescription());
    139141            }
    140             long started = ((RouterContext)_context).router().getWhenStarted();
    141             if (started > start && started < end)
    142                 def.vrule(started / 1000, RESTART_BAR_COLOR, _("Restart"), 4.0f);
     142
     143            //long started = ((RouterContext)_context).router().getWhenStarted();
     144            //if (started > start && started < end)
     145            //    def.vrule(started / 1000, RESTART_BAR_COLOR, _("Restart"), 4.0f);
     146
    143147            def.datasource(plotName, path, plotName, SummaryListener.CF, _listener.getBackendName());
    144148            if (descr.length() > 0)
     
    152156                // '07-Jul 21:09 UTC' with month name in the system locale
    153157                SimpleDateFormat sdf = new SimpleDateFormat("dd-MMM HH:mm");
     158                Map<Long, String> events = ((RouterContext)_context).router().eventLog().getEvents(EventLog.STARTED, start);
     159                for (Map.Entry<Long, String> event : events.entrySet()) {
     160                    long started = event.getKey().longValue();
     161                    if (started > start && started < end) {
     162                        String legend = _("Restart") + ' ' + sdf.format(new Date(started)) + " UTC " + event.getValue() + "\\r";
     163                        def.vrule(started / 1000, RESTART_BAR_COLOR, legend, 4.0f);
     164                    }
     165                }
    154166                def.comment(sdf.format(new Date(start)) + " -- " + sdf.format(new Date(end)) + " UTC\\r");
    155167            }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    r5056706 r6162908  
    1414import net.i2p.data.SessionKey;
    1515import net.i2p.util.Log;
    16 import net.i2p.util.SimpleTimer;
    1716import net.i2p.util.SimpleTimer2;
    1817
     
    495494            _pendingPings.remove(id);
    496495        } else {
    497             SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
     496            PingFailed pf = new PingFailed(id, notifier);
     497            pf.schedule(timeoutMs);
    498498        }
    499499       
     
    506506    }
    507507   
    508     private class PingFailed implements SimpleTimer.TimedEvent {
     508    private class PingFailed extends SimpleTimer2.TimedEvent {
    509509        private final Long _id;
    510510        private final PingNotifier _notifier;
    511511
    512512        public PingFailed(Long id, PingNotifier notifier) {
     513            super(_context.simpleTimer2());
    513514            _id = id;
    514515            _notifier = notifier;
  • core/java/src/net/i2p/I2PAppContext.java

    r5056706 r6162908  
    488488        if (val == null)
    489489            return defaultVal;
    490         return Boolean.valueOf(val).booleanValue();
     490        return Boolean.parseBoolean(val);
    491491    }
    492492
     
    496496     */
    497497    public boolean getBooleanProperty(String propName) {
    498         return Boolean.valueOf(getProperty(propName)).booleanValue();
     498        return Boolean.parseBoolean(getProperty(propName));
    499499    }
    500500
     
    954954     * Use instead of SimpleTimer.getInstance()
    955955     * @since 0.9 to replace static instance in the class
     956     * @deprecated use SimpleTimer2
    956957     */
    957958    public SimpleTimer simpleTimer() {
     
    961962    }
    962963
     964    /**
     965     * @deprecated use SimpleTimer2
     966     */
    963967    private void initializeSimpleTimer() {
    964968        synchronized (_lock19) {
  • core/java/src/net/i2p/client/ClientWriterRunner.java

    r5056706 r6162908  
    55import java.util.concurrent.BlockingQueue;
    66import java.util.concurrent.LinkedBlockingQueue;
     7import java.util.concurrent.TimeUnit;
    78
    89import net.i2p.data.i2cp.I2CPMessage;
     
    2324    private BlockingQueue<I2CPMessage> _messagesToWrite;
    2425    private static volatile long __Id = 0;
     26
     27    private static final int MAX_QUEUE_SIZE = 32;
     28    private static final long MAX_SEND_WAIT = 10*1000;
    2529   
    2630    /** starts the thread too */
     
    2832        _out = out;
    2933        _session = session;
    30         _messagesToWrite = new LinkedBlockingQueue();
     34        _messagesToWrite = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
    3135        Thread t = new I2PAppThread(this, "I2CP Client Writer " + (++__Id), true);
    3236        t.start();
     
    3438
    3539    /**
    36      * Add this message to the writer's queue
    37      *
     40     * Add this message to the writer's queue.
     41     * Blocking if queue is full.
     42     * @throws I2PSessionException if we wait too long or are interrupted
    3843     */
    39     public void addMessage(I2CPMessage msg) {
     44    public void addMessage(I2CPMessage msg) throws I2PSessionException {
    4045        try {
    41             _messagesToWrite.put(msg);
    42         } catch (InterruptedException ie) {}
     46            if (!_messagesToWrite.offer(msg, MAX_SEND_WAIT, TimeUnit.MILLISECONDS))
     47                throw new I2PSessionException("Timed out waiting while write queue was full");
     48        } catch (InterruptedException ie) {
     49            throw new I2PSessionException("Interrupted while write queue was full", ie);
     50        }
    4351    }
    4452
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    r5056706 r6162908  
    1919import java.util.ArrayList;
    2020import java.util.Iterator;
    21 import java.util.LinkedHashMap;
    2221import java.util.List;
    2322import java.util.Map;
     
    4544import net.i2p.internal.QueuedI2CPMessageReader;
    4645import net.i2p.util.I2PAppThread;
     46import net.i2p.util.LHMCache;
    4747import net.i2p.util.Log;
    4848import net.i2p.util.SimpleScheduler;
     
    141141     *  @since 0.8.9
    142142     */
    143     private static final LookupCache _lookupCache = new LookupCache(16);
     143    private static final Map<Hash, Destination> _lookupCache = new LHMCache(16);
    144144
    145145    /** SSL interface (only) @since 0.8.3 */
     
    147147
    148148    private static final long VERIFY_USAGE_TIME = 60*1000;
     149
     150    private static final long MAX_SEND_WAIT = 10*1000;
    149151
    150152    void dateUpdated() {
     
    644646    /**
    645647     * Deliver an I2CP message to the router
     648     * As of 0.9.3, may block for several seconds if the write queue to the router is full
    646649     *
    647650     * @throws I2PSessionException if the message is malformed or there is an error writing it out
    648651     */
    649652    void sendMessage(I2CPMessage message) throws I2PSessionException {
    650         if (isClosed())
     653        if (isClosed()) {
    651654            throw new I2PSessionException("Already closed");
    652         else if (_queue != null)
    653             _queue.offer(message);  // internal
    654         else if (_writer == null)
     655        } else if (_queue != null) {
     656            // internal
     657            try {
     658                if (!_queue.offer(message, MAX_SEND_WAIT))
     659                    throw new I2PSessionException("Timed out waiting while write queue was full");
     660            } catch (InterruptedException ie) {
     661                throw new I2PSessionException("Interrupted while write queue was full", ie);
     662            }
     663        } else if (_writer == null) {
    655664            throw new I2PSessionException("Already closed");
    656         else
     665        } else {
    657666            _writer.addMessage(message);
     667        }
    658668    }
    659669
     
    986996        return buf.toString();
    987997    }
    988 
    989     /**
    990      *  @since 0.8.9
    991      */
    992     private static class LookupCache extends LinkedHashMap<Hash, Destination> {
    993         private final int _max;
    994 
    995         public LookupCache(int max) {
    996             super(max, 0.75f, true);
    997             _max = max;
    998         }
    999 
    1000         @Override
    1001         protected boolean removeEldestEntry(Map.Entry<Hash, Destination> eldest) {
    1002             return size() > _max;
    1003         }
    1004     }
    1005998}
  • core/java/src/net/i2p/client/naming/BlockfileNamingService.java

    r5056706 r6162908  
    3030import net.i2p.data.Destination;
    3131import net.i2p.data.Hash;
     32import net.i2p.util.LHMCache;
    3233import net.i2p.util.Log;
    3334import net.i2p.util.SecureFileOutputStream;
     
    135136        _lists = new ArrayList();
    136137        _invalid = new ArrayList();
    137         _negativeCache = new LHM(NEGATIVE_CACHE_SIZE);
     138        _negativeCache = new LHMCache(NEGATIVE_CACHE_SIZE);
    138139        BlockFile bf = null;
    139140        RAIFile raf = null;
  • core/java/src/net/i2p/client/naming/DummyNamingService.java

    r5056706 r6162908  
    88package net.i2p.client.naming;
    99
    10 import java.util.LinkedHashMap;
    1110import java.util.Locale;
    1211import java.util.Map;
     
    1514import net.i2p.I2PAppContext;
    1615import net.i2p.data.Destination;
     16import net.i2p.util.LHMCache;
    1717
    1818/**
     
    3131     *  are invalidated.
    3232     */
    33     private static final Map<String, Destination> _cache = new LHM(CACHE_MAX_SIZE);
     33    private static final Map<String, Destination> _cache = new LHMCache(CACHE_MAX_SIZE);
    3434
    3535    /**
     
    116116        }
    117117    }
    118 
    119     protected static class LHM<K, V> extends LinkedHashMap<K, V> {
    120         private final int _max;
    121 
    122         public LHM(int max) {
    123             super(max, 0.75f, true);
    124             _max = max;
    125         }
    126 
    127         @Override
    128         protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    129             return size() > _max;
    130         }
    131     }
    132118}
  • core/java/src/net/i2p/crypto/CryptixAESEngine.java

    r5056706 r6162908  
    2222import net.i2p.data.DataHelper;
    2323import net.i2p.data.SessionKey;
    24 import net.i2p.util.ByteCache;
    2524import net.i2p.util.Log;
     25import net.i2p.util.SimpleByteCache;
    2626
    2727/**
     
    3838    // keys are now cached in the SessionKey objects
    3939    //private CryptixAESKeyCache _cache;
    40    
    41     private static final ByteCache _prevCache = ByteCache.getInstance(16, 16);
    4240   
    4341/**** see comments for main() below
     
    167165        if (length % 16 != 0) numblock++;
    168166
    169         ByteArray prevA = _prevCache.acquire();
    170         byte prev[] = prevA.getData();
    171         ByteArray curA = _prevCache.acquire();
    172         byte cur[] = curA.getData();
     167        byte prev[] = SimpleByteCache.acquire(16);
     168        byte cur[] = SimpleByteCache.acquire(16);
    173169        System.arraycopy(iv, ivOffset, prev, 0, 16);
    174170       
     
    191187         */
    192188       
    193         _prevCache.release(prevA);
    194         _prevCache.release(curA);
     189        SimpleByteCache.release(prev);
     190        SimpleByteCache.release(cur);
    195191    }
    196192   
  • core/java/src/net/i2p/data/RouterAddress.java

    r5056706 r6162908  
    1818import java.util.Properties;
    1919
     20import net.i2p.util.Addresses;
    2021import net.i2p.util.OrderedProperties;
    2122
     
    3435public class RouterAddress extends DataStructureImpl {
    3536    private int _cost;
    36     private Date _expiration;
     37    //private Date _expiration;
    3738    private String _transportStyle;
    3839    private final Properties _options;
     40    // cached values
     41    private byte[] _ip;
     42    private int _port;
     43
     44    public static final String PROP_HOST = "host";
     45    public static final String PROP_PORT = "port";
    3946
    4047    public RouterAddress() {
     
    6976     *
    7077     * @deprecated unused for now
     78     * @return null always
    7179     */
    7280    public Date getExpiration() {
    73         return _expiration;
     81        //return _expiration;
     82        return null;
    7483    }
    7584
     
    7887     *
    7988     * Unused for now, always null
     89     * @deprecated unused for now
    8090     */
    8191    public void setExpiration(Date expiration) {
    82         _expiration = expiration;
     92        //_expiration = expiration;
    8393    }
    8494
     
    142152   
    143153    /**
     154     *  Caching version of InetAddress.getByName(getOption("host")).getAddress(), which is slow.
     155     *  Caches numeric host names only.
     156     *  Will resolve but not cache resolution of DNS host names.
     157     *
     158     *  @return IP or null
     159     *  @since 0.9.3
     160     */
     161    public byte[] getIP() {
     162        if (_ip != null)
     163            return _ip;
     164        byte[] rv = null;
     165        String host = _options.getProperty(PROP_HOST);
     166        if (host != null) {
     167            rv = Addresses.getIP(host);
     168            if (rv != null &&
     169                (host.replaceAll("[0-9\\.]", "").length() == 0 ||
     170                 host.replaceAll("[0-9a-fA-F:]", "").length() == 0)) {
     171                _ip = rv;
     172            }
     173        }
     174        return rv;
     175    }
     176   
     177    /**
     178     *  Caching version of Integer.parseInt(getOption("port"))
     179     *  Caches valid ports 1-65535 only.
     180     *
     181     *  @return 1-65535 or 0 if invalid
     182     *  @since 0.9.3
     183     */
     184    public int getPort() {
     185        if (_port != 0)
     186            return _port;
     187        String port = _options.getProperty(PROP_PORT);
     188        if (port != null) {
     189            try {
     190                int rv = Integer.parseInt(port);
     191                if (rv > 0 && rv <= 65535)
     192                    _port = rv;
     193            } catch (NumberFormatException nfe) {}
     194        }
     195        return _port;
     196    }
     197
     198    /**
    144199     *  @throws IllegalStateException if was already read in
    145200     */
     
    148203            throw new IllegalStateException();
    149204        _cost = (int) DataHelper.readLong(in, 1);
    150         _expiration = DataHelper.readDate(in);
     205        //_expiration = DataHelper.readDate(in);
     206        DataHelper.readDate(in);
    151207        _transportStyle = DataHelper.readString(in);
    152208        // reduce Object proliferation
     
    162218            throw new DataFormatException("Not enough data to write a router address");
    163219        DataHelper.writeLong(out, 1, _cost);
    164         DataHelper.writeDate(out, _expiration);
     220        //DataHelper.writeDate(out, _expiration);
     221        DataHelper.writeDate(out, null);
    165222        DataHelper.writeString(out, _transportStyle);
    166223        DataHelper.writeProperties(out, _options);
     
    199256        buf.append("\n\tTransportStyle: ").append(_transportStyle);
    200257        buf.append("\n\tCost: ").append(_cost);
    201         buf.append("\n\tExpiration: ").append(_expiration);
    202         if (_options != null) {
     258        //buf.append("\n\tExpiration: ").append(_expiration);
    203259            buf.append("\n\tOptions: #: ").append(_options.size());
    204260            for (Map.Entry e : _options.entrySet()) {
     
    207263                buf.append("\n\t\t[").append(key).append("] = [").append(val).append("]");
    208264            }
    209         }
    210265        buf.append("]");
    211266        return buf.toString();
  • core/java/src/net/i2p/data/SDSCache.java

    r5056706 r6162908  
    77import java.lang.reflect.Constructor;
    88import java.lang.reflect.InvocationTargetException;
    9 import java.util.LinkedHashMap;
    109import java.util.Map;
    1110
    1211import net.i2p.I2PAppContext;
     12import net.i2p.util.LHMCache;
    1313import net.i2p.util.SimpleByteCache;
    1414
     
    7272    public SDSCache(Class<V> rvClass, int len, int max) {
    7373        int size = (int) (max * FACTOR);
    74         _cache = new LHM(size);
     74        _cache = new LHMCache(size);
    7575        _datalen = len;
    7676        try {
     
    9999
    100100    /**
     101     *  WARNING - If the SDS is found in the cache, the passed-in
     102     *  byte array will be returned to the SimpleByteCache for reuse.
     103     *  Do NOT save a reference to the passed-in data, or use or modify it,
     104     *  after this call.
     105     *
    101106     *  @param data non-null, the byte array for the SimpleDataStructure
    102107     *  @return the cached value if available, otherwise
     
    177182        return Integer.valueOf(rv);
    178183    }
    179 
    180     private static class LHM<K, V> extends LinkedHashMap<K, V> {
    181         private final int _max;
    182 
    183         public LHM(int max) {
    184             super(max, 0.75f, true);
    185             _max = max;
    186         }
    187 
    188         @Override
    189         protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    190             return size() > _max;
    191         }
    192     }
    193184}
  • core/java/src/net/i2p/internal/I2CPMessageQueue.java

    r5056706 r6162908  
    2323     */
    2424    public abstract boolean offer(I2CPMessage msg);
     25
     26    /**
     27     *  Send a message, blocking.
     28     *  @param timeout how long to wait for space (ms)
     29     *  @return success (false if no space available or if timed out)
     30     *  @since 0.9.3
     31     */
     32    public abstract boolean offer(I2CPMessage msg, long timeout) throws InterruptedException;
    2533
    2634    /**
  • core/java/src/net/i2p/util/Addresses.java

    r5056706 r6162908  
    1111import java.net.UnknownHostException;
    1212import java.util.Enumeration;
     13import java.util.LinkedHashMap;
     14import java.util.Map;
    1315import java.util.Set;
    1416import java.util.SortedSet;
    1517import java.util.TreeSet;
    1618
     19import net.i2p.I2PAppContext;
    1720
    1821/**
     
    154157
    155158    /**
     159     *  Textual IP to bytes, because InetAddress.getByName() is slow.
     160     *
     161     *  @since 0.9.3
     162     */
     163    private static final Map<String, byte[]> _IPAddress;
     164
     165    static {
     166        int size;
     167        I2PAppContext ctx = I2PAppContext.getCurrentContext();
     168        if (ctx != null && ctx.isRouterContext()) {
     169            long maxMemory = Runtime.getRuntime().maxMemory();
     170            if (maxMemory == Long.MAX_VALUE)
     171                maxMemory = 96*1024*1024l;
     172            long min = 128;
     173            long max = 4096;
     174            // 512 nominal for 128 MB
     175            size = (int) Math.max(min, Math.min(max, 1 + (maxMemory / (256*1024))));
     176        } else {
     177            size = 32;
     178        }
     179        _IPAddress = new LHMCache(size);
     180    }
     181
     182    /**
     183     *  Caching version of InetAddress.getByName(host).getAddress(), which is slow.
     184     *  Caches numeric host names only.
     185     *  Will resolve but not cache DNS host names.
     186     *
     187     *  @param host DNS or IPv4 or IPv6 host name; if null returns null
     188     *  @return IP or null
     189     *  @since 0.9.3
     190     */
     191    public static byte[] getIP(String host) {
     192        if (host == null)
     193            return null;
     194        byte[] rv;
     195        synchronized (_IPAddress) {
     196            rv = _IPAddress.get(host);
     197        }
     198        if (rv == null) {
     199            try {
     200                rv = InetAddress.getByName(host).getAddress();
     201                if (host.replaceAll("[0-9\\.]", "").length() == 0 ||
     202                    host.replaceAll("[0-9a-fA-F:]", "").length() == 0) {
     203                    synchronized (_IPAddress) {
     204                        _IPAddress.put(host, rv);
     205                    }
     206                }
     207            } catch (UnknownHostException uhe) {}
     208        }
     209        return rv;
     210    }
     211
     212    /**
     213     *  @since 0.9.3
     214     */
     215    public static void clearCaches() {
     216        synchronized(_IPAddress) {
     217            _IPAddress.clear();
     218        }
     219    }
     220
     221    /**
    156222     *  Print out the local addresses
    157223     */
  • core/java/src/net/i2p/util/ByteCache.java

    r5056706 r6162908  
    1414 * should be held onto as long as the  data referenced in it is needed.
    1515 *
     16 * For small arrays where the management of valid bytes in ByteArray
     17 * and prezeroing isn't required, use SimpleByteArray instead.
     18 *
    1619 * Heap size control - survey of usage (April 2010) :
    1720 *
    1821 *  <pre>
    1922        Size    Max     MaxMem  From
    20 
    21         16      16      256     CryptixAESEngine
    22         16      32      512     BloomFilterIVValidator
    23         16      64      1K      UDP PacketBuilder
    24         16      128     2K      tunnel HopProcessor
    25         16      128     2K      tunnel TrivialPreprocessor
    26         16      128     2K      tunnel InboundEndpointProcessor
    27         16      128     2K      tunnel OutboundGatewayProcessor
    28 
    29         32      64      2K      UDP PacketBuilder
    30         32      128     4K      tunnel TrivialPreprocessor
    3123
    3224        1K      32      32K     tunnel TrivialPreprocessor
  • core/java/src/net/i2p/util/LogRecord.java

    r5056706 r6162908  
    11package net.i2p.util;
     2
     3import net.i2p.data.DataHelper;
    24
    35/*
     
    6062        return _throwable;
    6163    }
     64
     65    /**
     66     *  Matches source class, message string, and throwable class only.
     67     *  @since 0.9.3
     68     */
     69    @Override
     70    public boolean equals(Object o) {
     71        if (!(o instanceof LogRecord))
     72            return false;
     73        LogRecord r = (LogRecord) o;
     74        return _source == r._source &&
     75               DataHelper.eq(_message, r._message) &&
     76               ((_throwable == null && r._throwable == null) ||
     77                (_throwable != null && r._throwable != null && _throwable.getClass() == r._throwable.getClass()));
     78    }
    6279}
  • core/java/src/net/i2p/util/LogWriter.java

    r5056706 r6162908  
    2727    private final static long CONFIG_READ_INTERVAL = 50 * 1000;
    2828    private final static long FLUSH_INTERVAL = 9 * 1000;
    29     private long _lastReadConfig = 0;
    30     private long _numBytesInCurrentFile = 0;
     29    private long _lastReadConfig;
     30    private long _numBytesInCurrentFile;
    3131    // volatile as it changes on log file rotation
    3232    private volatile Writer _currentOut;
     
    6767
    6868    public void flushRecords() { flushRecords(true); }
     69
    6970    public void flushRecords(boolean shouldWait) {
    7071        try {
     
    7374            if (records == null) return;
    7475            if (!records.isEmpty()) {
     76                LogRecord last = null;
    7577                LogRecord rec;
     78                int dupCount = 0;
    7679                while ((rec = records.poll()) != null) {
    77                     writeRecord(rec);
    78                 }
     80                    if (rec.equals(last)) {
     81                        dupCount++;
     82                    } else {
     83                        if (dupCount > 0) {
     84                            if (dupCount == 1)
     85                                writeRecord("*** 1 similar message omitted\n");
     86                            else
     87                                writeRecord("*** " + dupCount + " similar messages omitted\n");
     88                            dupCount = 0;
     89                        }
     90                        last = rec;
     91                        writeRecord(rec);
     92                    }
     93                }
     94                if (dupCount == 1)
     95                    writeRecord("*** 1 similar message omitted\n");
     96                else if (dupCount > 0)
     97                    writeRecord("*** " + dupCount + " similar messages omitted\n");
    7998                try {
    8099                    if (_currentOut != null)
  • core/java/src/net/i2p/util/SimpleByteCache.java

    r5056706 r6162908  
    1919    private static final Map<Integer, SimpleByteCache> _caches = new ConcurrentHashMap(8);
    2020
    21     private static final int DEFAULT_SIZE = 16;
     21    private static final int DEFAULT_SIZE = 64;
    2222
    2323    /** up to this, use ABQ to minimize object churn and for performance; above this, use LBQ for two locks */
  • core/java/src/net/i2p/util/SimpleStore.java

    r5056706 r6162908  
    66
    77/**
     8 *  Deprecated - used only by SimpleTimer
    89 *
    910 * @author sponge
  • core/java/src/net/i2p/util/SimpleTimer.java

    r5056706 r6162908  
    2222    /**
    2323     *  If you have a context, use context.simpleTimer() instead
     24     *  @deprecated use SimpleTimer2
    2425     */
    2526    public static SimpleTimer getInstance() {
     
    4041     *  To be instantiated by the context.
    4142     *  Others should use context.simpleTimer() instead
     43     *  @deprecated use SimpleTimer2
    4244     */
    4345    public SimpleTimer(I2PAppContext context) {
     
    4850     *  To be instantiated by the context.
    4951     *  Others should use context.simpleTimer() instead
     52     *  @deprecated use SimpleTimer2
    5053     */
    5154    private SimpleTimer(I2PAppContext context, String name) {
     
    147150                }
    148151            }
     152            // FIXME if you plan to use this class again
    149153            while (_events.containsKey(time))
    150154                time = new Long(time.longValue() + 1);
  • core/java/src/net/i2p/util/SimpleTimer2.java

    r5056706 r6162908  
    206206
    207207        /**
    208          *  More efficient than reschedule().
    209          *  Only call this after calling the non-scheduling constructor,
    210          *  or from within timeReached(), or you will get duplicates on the queue.
    211          *  Otherwise use reschedule().
     208         *  Slightly more efficient than reschedule().
     209         *  Does nothing if already scheduled.
    212210         */
    213211        public synchronized void schedule(long timeoutMs) {
     
    237235        /**
    238236         * Use the earliest of the new time and the old time
    239          * Do not call from within timeReached()
     237         * May be called from within timeReached(), but schedule() is
     238         * better there.
    240239         *
    241240         * @param timeoutMs
     
    246245
    247246        /**
    248          * useEarliestTime must be false if called from within timeReached(), as
    249          * it won't be rescheduled, in favor of the currently running task
     247         * May be called from within timeReached(), but schedule() is
     248         * better there.
    250249         *
    251250         * @param timeoutMs
  • core/java/src/net/i2p/util/SocketTimeout.java

    r5056706 r6162908  
    1515 *  Use socket.setsotimeout instead?
    1616 */
    17 public class SocketTimeout implements SimpleTimer.TimedEvent {
     17public class SocketTimeout extends SimpleTimer2.TimedEvent {
    1818    private Socket _targetSocket;
    1919    private long _startTime;
     
    2525    public SocketTimeout(long delay) { this(null, delay); }
    2626    public SocketTimeout(Socket socket, long delay) {
     27        super(SimpleTimer2.getInstance());
    2728        _inactivityDelay = delay;
    2829        _targetSocket = socket;
     
    3031        _lastActivity = _startTime = System.currentTimeMillis();
    3132        _totalTimeoutTime = -1;
    32         SimpleTimer.getInstance().addEvent(SocketTimeout.this, delay);
     33        schedule(delay);
    3334    }
    3435    public void timeReached() {
     
    4546            if (_command != null) _command.run();
    4647        }  else {
    47             SimpleTimer.getInstance().addEvent(SocketTimeout.this, _inactivityDelay);
     48            schedule(_inactivityDelay);
    4849        }
    4950    }
    5051   
    51     public void cancel() {
     52    public boolean cancel() {
    5253        _cancelled = true;
    53         SimpleTimer.getInstance().removeEvent(SocketTimeout.this);
     54        return super.cancel();
    5455    }
    5556    public void setSocket(Socket s) { _targetSocket = s; }
  • router/java/src/net/i2p/router/Blocklist.java

    r5056706 r6162908  
    2525
    2626import net.i2p.data.Base64;
     27import net.i2p.data.DataHelper;
    2728import net.i2p.data.Hash;
    2829import net.i2p.data.RouterAddress;
     
    439440     */
    440441    public void add(String ip) {
    441         InetAddress pi;
    442         try {
    443             pi = InetAddress.getByName(ip);
    444         } catch (UnknownHostException uhe) {
    445             return;
    446         }
    447         if (pi == null) return;
    448         byte[] pib = pi.getAddress();
     442        byte[] pib = Addresses.getIP(ip);
     443        if (pib == null) return;
    449444        add(pib);
    450445    }
     
    479474        RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
    480475        if (pinfo == null) return rv;
    481         String oldphost = null;
     476        byte[] oldpib = null;
    482477        // for each peer address
    483478        for (RouterAddress pa : pinfo.getAddresses()) {
    484             String phost = pa.getOption("host");
    485             if (phost == null) continue;
    486             if (oldphost != null && oldphost.equals(phost)) continue;
    487             oldphost = phost;
    488             InetAddress pi;
    489             try {
    490                 pi = InetAddress.getByName(phost);
    491             } catch (UnknownHostException uhe) {
    492                 continue;
    493             }
    494             if (pi == null) continue;
    495             byte[] pib = pi.getAddress();
     479            byte[] pib = pa.getIP();
     480            if (pib == null) continue;
     481            if (DataHelper.eq(oldpib, pib)) continue;
     482            oldpib = pib;
    496483            rv.add(pib);
    497484         }
     
    521508     */
    522509    public boolean isBlocklisted(String ip) {
    523         InetAddress pi;
    524         try {
    525             pi = InetAddress.getByName(ip);
    526         } catch (UnknownHostException uhe) {
    527             return false;
    528         }
    529         if (pi == null) return false;
    530         byte[] pib = pi.getAddress();
     510        byte[] pib = Addresses.getIP(ip);
     511        if (pib == null) return false;
    531512        return isBlocklisted(pib);
    532513    }
  • router/java/src/net/i2p/router/OutNetMessage.java

    r5056706 r6162908  
    2121import net.i2p.data.RouterInfo;
    2222import net.i2p.data.i2np.I2NPMessage;
     23import net.i2p.router.util.CDPQEntry;
    2324import net.i2p.util.Log;
    2425
     
    2829 *
    2930 */
    30 public class OutNetMessage {
     31public class OutNetMessage implements CDPQEntry {
    3132    private final Log _log;
    3233    private final RouterContext _context;
     
    4849    //private Exception _createdBy;
    4950    private final long _created;
     51    private long _enqueueTime;
     52    private long _seqNum;
    5053    /** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
    5154    private HashMap<String, Long> _timestamps;
     
    5760    private Object _preparationBuf;
    5861   
     62    /**
     63     *  Priorities, higher is higher priority.
     64     *  @since 0.9.3
     65     */
     66    public static final int PRIORITY_HIGHEST = 1000;
     67    public static final int PRIORITY_MY_BUILD_REQUEST = 500;
     68    public static final int PRIORITY_MY_NETDB_LOOKUP = 500;
     69    public static final int PRIORITY_MY_NETDB_STORE = 400;
     70    public static final int PRIORITY_MY_DATA = 400;
     71    public static final int PRIORITY_MY_NETDB_STORE_LOW = 300;
     72    public static final int PRIORITY_HIS_BUILD_REQUEST = 300;
     73    public static final int PRIORITY_BUILD_REPLY = 300;
     74    public static final int PRIORITY_NETDB_REPLY = 300;
     75    public static final int PRIORITY_HIS_NETDB_STORE = 200;
     76    public static final int PRIORITY_NETDB_FLOOD = 200;
     77    public static final int PRIORITY_PARTICIPATING = 200;
     78    public static final int PRIORITY_NETDB_EXPLORE = 100;
     79    public static final int PRIORITY_NETDB_HARVEST = 100;
     80    public static final int PRIORITY_LOWEST = 100;
     81
    5982    public OutNetMessage(RouterContext context) {
    6083        _context = context;
     
    264287    /** time the transport tries to send the message (including any queueing) */
    265288    public long getSendTime() { return _context.clock().now() - _sendBegin; }
     289
     290    /**
     291     *  For CDQ
     292     *  @since 0.9.3
     293     */
     294    public void setEnqueueTime(long now) {
     295        _enqueueTime = now;
     296    }
     297
     298    /**
     299     *  For CDQ
     300     *  @since 0.9.3
     301     */
     302    public long getEnqueueTime() {
     303        return _enqueueTime;
     304    }
     305
     306    /**
     307     *  For CDQ
     308     *  @since 0.9.3
     309     */
     310    public void drop() {
     311    }
     312
     313    /**
     314     *  For CDPQ
     315     *  @since 0.9.3
     316     */
     317    public void setSeqNum(long num) {
     318        _seqNum = num;
     319    }
     320
     321    /**
     322     *  For CDPQ
     323     *  @since 0.9.3
     324     */
     325    public long getSeqNum() {
     326        return _seqNum;
     327    }
    266328
    267329    /**
  • router/java/src/net/i2p/router/Router.java

    r5056706 r6162908  
    4141import net.i2p.router.transport.FIFOBandwidthLimiter;
    4242import net.i2p.router.transport.udp.UDPTransport;
     43import net.i2p.router.util.EventLog;
    4344import net.i2p.stat.RateStat;
    4445import net.i2p.stat.StatManager;
     
    7879    private RouterWatchdog _watchdog;
    7980    private Thread _watchdogThread;
     81    private final EventLog _eventLog;
    8082   
    8183    public final static String PROP_CONFIG_FILE = "router.configLocation";
     
    101103    public final static String PROP_SHUTDOWN_IN_PROGRESS = "__shutdownInProgress";
    102104    public final static String DNS_CACHE_TIME = "" + (5*60);
     105    private static final String EVENTLOG = "eventlog.txt";
    103106       
    104107    private static final String originalTimeZoneID;
     
    220223        // i2p.dir.base defaults to user.dir == $CWD
    221224        _context = new RouterContext(this, envProps);
     225        _eventLog = new EventLog(_context, new File(_context.getRouterDir(), EVENTLOG));
    222226
    223227        // This is here so that we can get the directory location from the context
     
    226230        // a NCDFE
    227231        if (!isOnlyRouterRunning()) {
     232            _eventLog.addEvent(EventLog.ABORTED, "Another router running");
    228233            System.err.println("ERROR: There appears to be another router already running!");
    229234            System.err.println("       Please make sure to shut down old instances before starting up");
     
    411416        if (_isAlive)
    412417            throw new IllegalStateException();
     418        String last = _config.get("router.previousFullVersion");
     419        if (last != null) {
     420            _eventLog.addEvent(EventLog.UPDATED, "from " + last + " to " + RouterVersion.FULL_VERSION);
     421            saveConfig("router.previousFullVersion", null);
     422        }
     423        _eventLog.addEvent(EventLog.STARTED, RouterVersion.FULL_VERSION);
    413424        startupStuff();
    414425        _isAlive = true;
     
    633644   
    634645    /**
     646     *  @since 0.9.3
     647     */
     648    public EventLog eventLog() {
     649        return _eventLog;
     650    }
     651   
     652    /**
    635653     * Ugly list of files that we need to kill if we are building a new identity
    636654     *
     
    647665                                                               };
    648666
    649     static final String IDENTLOG = "identlog.txt";
    650667    public void killKeys() {
    651668        //new Exception("Clearing identity files").printStackTrace();
     
    672689
    673690        if (remCount > 0) {
    674             FileOutputStream log = null;
    675             try {
    676                 log = new FileOutputStream(new File(_context.getRouterDir(), IDENTLOG), true);
    677                 log.write((new Date() + ": Old router identity keys cleared\n").getBytes());
    678             } catch (IOException ioe) {
    679                 // ignore
    680             } finally {
    681                 if (log != null)
    682                     try { log.close(); } catch (IOException ioe) {}
    683             }
    684         }
    685     }
     691            _eventLog.addEvent(EventLog.REKEYED);
     692        }
     693    }
     694
    686695    /**
    687696     * Rebuild a new identity the hard way - delete all of our old identity
     
    837846        _watchdog.shutdown();
    838847        _watchdogThread.interrupt();
     848        _eventLog.addEvent(EventLog.STOPPED, Integer.toString(exitCode));
    839849        finalShutdown(exitCode);
    840850    }
     
    11401150                // Set the last version to the current version, since 0.8.13
    11411151                _config.put("router.previousVersion", RouterVersion.VERSION);
     1152                _config.put("router.previousFullVersion", RouterVersion.FULL_VERSION);
    11421153                saveConfig();
    11431154                ok = FileUtil.extractZip(updateFile, _context.getBaseDir());
  • router/java/src/net/i2p/router/client/ClientManager.java

    r5056706 r6162908  
    5353    private static final String PROP_ENABLE_SSL = "i2cp.SSL";
    5454
     55    private static final int INTERNAL_QUEUE_SIZE = 256;
     56
    5557    public ClientManager(RouterContext context, int port) {
    5658        _ctx = context;
     
    126128        if (!_isStarted)
    127129            throw new I2PSessionException("Router client manager is shut down");
    128         // for now we make these unlimited size
    129         LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
    130         LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
     130        LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
     131        LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
    131132        I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
    132133        I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
  • router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java

    r5056706 r6162908  
    22
    33import java.util.concurrent.BlockingQueue;
     4import java.util.concurrent.TimeUnit;
    45
    56import net.i2p.data.i2cp.I2CPMessage;
     
    3435
    3536    /**
     37     *  Send a message, blocking.
     38     *  @param timeout how long to wait for space (ms)
     39     *  @return success (false if no space available or if timed out)
     40     *  @since 0.9.3
     41     */
     42    public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException {
     43        return _out.offer(msg, timeout, TimeUnit.MILLISECONDS);
     44    }
     45
     46    /**
    3647     *  Receive a message, nonblocking
    3748     *  @return message or null if none available
  • router/java/src/net/i2p/router/message/GarlicMessageHandler.java

    r5056706 r6162908  
    2020 * HandlerJobBuilder to build jobs to handle GarlicMessages
    2121 *
     22 * This is essentially unused, as InNetMessagePool short circuits tunnel messages,
     23 * and the garlics are handled in InboundMessageDistributor.
     24 * Unless we get a garlic message not down a tunnel?
    2225 */
    2326public class GarlicMessageHandler implements HandlerJobBuilder {
  • router/java/src/net/i2p/router/message/GarlicMessageReceiver.java

    r5056706 r6162908  
    9898    private void handleClove(GarlicClove clove) {
    9999        if (!isValid(clove)) {
    100             if (_log.shouldLog(Log.DEBUG))
    101                 _log.warn("Invalid clove " + clove);
     100            //if (_log.shouldLog(Log.WARN))
     101            //    _log.warn("Invalid clove " + clove);
    102102            return;
    103103        }
     104        //if (_log.shouldLog(Log.DEBUG))
     105        //    _log.debug("valid clove " + clove);
    104106        _receiver.handleClove(clove.getInstructions(), clove.getData());
    105107    }
  • router/java/src/net/i2p/router/message/HandleGarlicMessageJob.java

    r5056706 r6162908  
    1616import net.i2p.data.i2np.TunnelGatewayMessage;
    1717import net.i2p.router.JobImpl;
     18import net.i2p.router.OutNetMessage;
    1819import net.i2p.router.RouterContext;
    1920import net.i2p.util.Log;
     
    2526 * need to be. soon)
    2627 *
     28 * This is essentially unused, as InNetMessagePool short circuits tunnel messages,
     29 * and the garlics are handled in InboundMessageDistributor.
     30 * Unless we get a garlic message not down a tunnel?
    2731 */
    2832class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.CloveReceiver {
     
    3539    //private GarlicMessageParser _parser;
    3640   
     41    private final static int ROUTER_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
     42    private final static int TUNNEL_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
     43
    3744    /**
    3845     *  @param from ignored
     
    4350        _log = context.logManager().getLog(HandleGarlicMessageJob.class);
    4451        getContext().statManager().createRateStat("crypto.garlic.decryptFail", "How often garlic messages are undecryptable", "Encryption", new long[] { 5*60*1000, 60*60*1000, 24*60*60*1000 });
    45         if (_log.shouldLog(Log.DEBUG))
    46             _log.debug("New handle garlicMessageJob called w/ message from [" + from + "]", new Exception("Debug"));
     52        if (_log.shouldLog(Log.WARN))
     53            _log.warn("Garlic Message not down a tunnel??? from [" + from + "]", new Exception("I did it"));
    4754        _message = msg;
    4855        //_from = from;
     
    7986                    if (_log.shouldLog(Log.DEBUG))
    8087                        _log.debug("router delivery instructions targetting "
    81                                    + instructions.getRouter().toBase64().substring(0,4));
     88                                   + instructions.getRouter().toBase64().substring(0,4) + " for " + data);
    8289                    SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data,
    8390                                                                      instructions.getRouter(),
    84                                                                       10*1000, 100);
     91                                                                      10*1000, ROUTER_PRIORITY);
    8592                    // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
    8693                    j.runJob();
     
    93100                gw.setTunnelId(instructions.getTunnelId());
    94101                gw.setMessageExpiration(data.getMessageExpiration());
     102                if (_log.shouldLog(Log.DEBUG))
     103                    _log.debug("tunnel delivery instructions targetting "
     104                               + instructions.getRouter().toBase64().substring(0,4) + " for " + data);
    95105                SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw,
    96106                                                                    instructions.getRouter(),
    97                                                                     10*1000, 100);
     107                                                                    10*1000, TUNNEL_PRIORITY);
    98108                // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
    99109                job.runJob();
  • router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java

    r5056706 r6162908  
    2525import net.i2p.router.Job;
    2626import net.i2p.router.JobImpl;
     27import net.i2p.router.OutNetMessage;
    2728import net.i2p.router.Router;
    2829import net.i2p.router.RouterContext;
     
    4142    private final static int CLOSENESS_THRESHOLD = 8; // FNDF.MAX_TO_FLOOD + 1
    4243    private final static int REPLY_TIMEOUT = 60*1000;
    43     private final static int MESSAGE_PRIORITY = 300;
     44    private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
    4445   
    4546    /**
     
    284285            m.setMessageExpiration(message.getMessageExpiration());
    285286            m.setTunnelId(replyTunnel);
    286             SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, 100);
     287            SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY);
    287288            j.runJob();
    288289            //getContext().jobQueue().addJob(j);
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java

    r5056706 r6162908  
    4141     */
    4242    private static final int MAX_TO_FLOOD = 4;
     43   
     44    private static final int FLOOD_PRIORITY = OutNetMessage.PRIORITY_NETDB_FLOOD;
     45    private static final int FLOOD_TIMEOUT = 30*1000;
    4346   
    4447    public FloodfillNetworkDatabaseFacade(RouterContext context) {
     
    225228    }
    226229
    227     private static final int FLOOD_PRIORITY = 200;
    228     private static final int FLOOD_TIMEOUT = 30*1000;
    229    
    230230    @Override
    231231    protected PeerSelector createPeerSelector() { return new FloodfillPeerSelector(_context); }
  • router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java

    r5056706 r6162908  
    1212import net.i2p.data.i2np.DatabaseLookupMessage;
    1313import net.i2p.router.JobImpl;
     14import net.i2p.router.OutNetMessage;
    1415import net.i2p.router.RouterContext;
    1516import net.i2p.router.TunnelInfo;
     
    4041    private static final int MAX_PER_RUN = 5;
    4142    /** background job, who cares */
    42     private static final int PRIORITY = 100;
     43    private static final int PRIORITY = OutNetMessage.PRIORITY_NETDB_HARVEST;
    4344   
    4445    public static final String PROP_ENABLED = "netDb.shouldHarvest";
  • router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java

    r5056706 r6162908  
    2626import net.i2p.router.Job;
    2727import net.i2p.router.JobImpl;
     28import net.i2p.router.OutNetMessage;
    2829import net.i2p.router.RouterContext;
    2930import net.i2p.router.TunnelInfo;
     
    7980     */
    8081    private static final long REQUEUE_DELAY = 1000;
     82
     83    // TODO pass to the tunnel dispatcher
     84    //private final static int LOOKUP_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_LOOKUP;
     85    //private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_HIS_NETDB_STORE;
    8186   
    8287    /**
     
    446451            _floodfillSearchesOutstanding++;
    447452        getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
     453        // TODO pass a priority to the dispatcher
    448454        getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, to);
    449455    }
     
    653659            if (_log.shouldLog(Log.DEBUG))
    654660                _log.debug("resending leaseSet out to " + to + " through " + outTunnel + ": " + msg);
     661            // TODO pass a priority to the dispatcher
    655662            getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
    656663            return true;
  • router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java

    r5056706 r6162908  
    4444    private final static int PARALLELIZATION = 4; // how many sent at a time
    4545    private final static int REDUNDANCY = 4; // we want the data sent to 6 peers
    46     private final static int STORE_PRIORITY = 100;
     46    private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE;
    4747   
    4848    /**
  • router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java

    r5056706 r6162908  
    12621262            return rv;
    12631263        for (RouterAddress pa : paddr) {
    1264             String phost = pa.getOption("host");
    1265             if (phost == null) continue;
    1266             InetAddress pi;
    1267             try {
    1268                 pi = InetAddress.getByName(phost);
    1269             } catch (UnknownHostException uhe) {
    1270                 continue;
    1271             }
    1272             if (pi == null) continue;
    1273             byte[] pib = pi.getAddress();
     1264            byte[] pib = pa.getIP();
     1265            if (pib == null) continue;
    12741266            rv.add(maskedIP(pib, mask));
    12751267        }
  • router/java/src/net/i2p/router/tasks/RouterWatchdog.java

    r5056706 r6162908  
    77import net.i2p.router.Router;
    88import net.i2p.router.RouterContext;
     9import net.i2p.router.util.EventLog;
    910import net.i2p.stat.Rate;
    1011import net.i2p.stat.RateStat;
     
    108109            if (_consecutiveErrors == 1) {
    109110                _log.log(Log.CRIT, "Router appears hung, or there is severe network congestion.  Watchdog starts barking!");
     111                 _context.router().eventLog().addEvent(EventLog.WATCHDOG);
    110112                // This works on linux...
    111113                // It won't on windows, and we can't call i2prouter.bat either, it does something
  • router/java/src/net/i2p/router/tasks/ShutdownHook.java

    r5056706 r6162908  
    1111import net.i2p.router.Router;
    1212import net.i2p.router.RouterContext;
     13import net.i2p.router.RouterVersion;
     14import net.i2p.router.util.EventLog;
    1315import net.i2p.util.Log;
    1416
     
    3638        // and thinks we haven't shut down, possibly because it
    3739        // prevents other shutdown hooks from running
     40        _context.router().eventLog().addEvent(EventLog.CRASHED, RouterVersion.FULL_VERSION);
    3841        _context.router().setKillVMOnEnd(false);
    3942        _context.router().shutdown2(Router.EXIT_HARD);
  • router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java

    r5056706 r6162908  
    6363        if (_manager != null)
    6464            _manager.shutdown();
     65        _geoIP.shutdown();
    6566    }
    6667   
     
    251252        RouterAddress addr = new RouterAddress();
    252253        addr.setCost(NTCPAddress.DEFAULT_COST);
    253         addr.setExpiration(null);
     254        //addr.setExpiration(null);
    254255        addr.setOptions(props);
    255256        addr.setTransportStyle(NTCPTransport.STYLE);
  • router/java/src/net/i2p/router/transport/GeoIP.java

    r5056706 r6162908  
    2222import net.i2p.router.Router;
    2323import net.i2p.router.RouterContext;
     24import net.i2p.util.Addresses;
    2425import net.i2p.util.ConcurrentHashSet;
    2526import net.i2p.util.Log;
     
    7475
    7576    /**
     77     *  @since 0.9.3
     78     */
     79    public void shutdown() {
     80        _codeToName.clear();
     81        _codeCache.clear();
     82        _IPToCountry.clear();
     83        _pendingSearch.clear();
     84        _notFound.clear();
     85    }
     86
     87    /**
    7688     * Fire off a thread to lookup all pending IPs.
    7789     * There is no indication of completion.
     
    298310     */
    299311    public void add(String ip) {
    300         InetAddress pi;
    301         try {
    302             pi = InetAddress.getByName(ip);
    303         } catch (UnknownHostException uhe) {
    304             return;
    305         }
    306         if (pi == null) return;
    307         byte[] pib = pi.getAddress();
     312        byte[] pib = Addresses.getIP(ip);
     313        if (pib == null) return;
    308314        add(pib);
    309315    }
     
    326332     */
    327333    public String get(String ip) {
    328         InetAddress pi;
    329         try {
    330             pi = InetAddress.getByName(ip);
    331         } catch (UnknownHostException uhe) {
    332             return null;
    333         }
    334         if (pi == null) return null;
    335         byte[] pib = pi.getAddress();
     334        byte[] pib = Addresses.getIP(ip);
     335        if (pib == null) return null;
    336336        return get(pib);
    337337    }
  • router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java

    r5056706 r6162908  
    2626import net.i2p.util.ConcurrentHashSet;
    2727import net.i2p.util.Log;
    28 import net.i2p.util.SimpleTimer;
     28import net.i2p.util.SimpleTimer2;
    2929
    3030/**
     
    255255    public void renderStatusHTML(Writer out) throws IOException {}
    256256   
    257     private class CleanupTask implements SimpleTimer.TimedEvent {
     257    private class CleanupTask extends SimpleTimer2.TimedEvent {
    258258        private long _nextExpire;
    259259
    260260        public CleanupTask() {
     261            super(_context.simpleTimer2());
    261262            _nextExpire = -1;
    262263        }
     
    313314            if (_nextExpire <= now)
    314315                _nextExpire = now + 10*1000;
    315             SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
     316            schedule(_nextExpire - now);
    316317        }
    317318
     
    320321            if ( (_nextExpire <= now) || (sel.getExpiration() < _nextExpire) ) {
    321322                _nextExpire = sel.getExpiration();
    322                 SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
     323                reschedule(_nextExpire - now);
    323324            }
    324325        }
  • router/java/src/net/i2p/router/transport/TransportImpl.java

    r5056706 r6162908  
    2323import java.util.concurrent.ConcurrentHashMap;
    2424
     25import net.i2p.data.DataHelper;
    2526import net.i2p.data.Hash;
    2627import net.i2p.data.RouterAddress;
     
    3637import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
    3738import net.i2p.util.ConcurrentHashSet;
     39import net.i2p.util.LHMCache;
    3840import net.i2p.util.Log;
    3941import net.i2p.util.SimpleScheduler;
     
    5456    private final Set<Hash> _wasUnreachableEntries;
    5557    /** global router ident -> IP */
    56     private static final Map<Hash, byte[]> _IPMap = new ConcurrentHashMap(128);
     58    private static final Map<Hash, byte[]> _IPMap;
     59
     60    static {
     61        long maxMemory = Runtime.getRuntime().maxMemory();
     62        if (maxMemory == Long.MAX_VALUE)
     63            maxMemory = 96*1024*1024l;
     64        long min = 512;
     65        long max = 4096;
     66        // 1024 nominal for 128 MB
     67        int size = (int) Math.max(min, Math.min(max, 1 + (maxMemory / (128*1024))));
     68        _IPMap = new LHMCache(size);
     69    }
    5770
    5871    /**
     
    586599
    587600    public void setIP(Hash peer, byte[] ip) {
    588         _IPMap.put(peer, ip);
    589         _context.commSystem().queueLookup(ip);
     601        byte[] old;
     602        synchronized (_IPMap) {
     603            old = _IPMap.put(peer, ip);
     604        }
     605        if (!DataHelper.eq(old, ip))
     606            _context.commSystem().queueLookup(ip);
    590607    }
    591608
    592609    public static byte[] getIP(Hash peer) {
    593         return _IPMap.get(peer);
     610        synchronized (_IPMap) {
     611            return _IPMap.get(peer);
     612        }
     613    }
     614
     615    /**
     616     *  @since 0.9.3
     617     */
     618    static void clearCaches() {
     619        synchronized(_IPMap) {
     620            _IPMap.clear();
     621        }
    594622    }
    595623
  • router/java/src/net/i2p/router/transport/TransportManager.java

    r5056706 r6162908  
    186186        stopListening();
    187187        _dhThread.shutdown();
     188        Addresses.clearCaches();
     189        TransportImpl.clearCaches();
    188190    }
    189191   
  • router/java/src/net/i2p/router/transport/ntcp/NTCPAddress.java

    r5056706 r6162908  
    99 */
    1010
    11 import java.net.InetAddress;
    1211import java.util.Properties;
    1312
     
    1615import net.i2p.data.RouterAddress;
    1716import net.i2p.router.transport.TransportImpl;
     17import net.i2p.util.Addresses;
    1818import net.i2p.util.Log;
    1919
     
    2626    //private InetAddress _addr;
    2727    /** Port number used in RouterAddress definitions */
    28     public final static String PROP_PORT = "port";
     28    public final static String PROP_PORT = RouterAddress.PROP_PORT;
    2929    /** Host name used in RouterAddress definitions */
    30     public final static String PROP_HOST = "host";
     30    public final static String PROP_HOST = RouterAddress.PROP_HOST;
    3131    public static final int DEFAULT_COST = 10;
    3232   
     
    6060            return;
    6161        }
    62         String host = addr.getOption(PROP_HOST);
    63         int iport = -1;
    64         if (host == null) {
    65             _host = null;
    66         } else {
    67             _host = host.trim();
    68             String port = addr.getOption(PROP_PORT);
    69             if ( (port != null) && (port.trim().length() > 0) && !("null".equals(port)) ) {
    70                 try {
    71                     iport = Integer.parseInt(port.trim());
    72                 } catch (NumberFormatException nfe) {
    73                     Log log = I2PAppContext.getGlobalContext().logManager().getLog(NTCPAddress.class);
    74                     log.error("Invalid port [" + port + "]", nfe);
    75                 }
    76             }
    77         }
    78         _port = iport;
     62        _host = addr.getOption(PROP_HOST);
     63        _port = addr.getPort();
    7964    }
    8065   
     
    8671       
    8772        addr.setCost(DEFAULT_COST);
    88         addr.setExpiration(null);
     73        //addr.setExpiration(null);
    8974       
    9075        Properties props = new Properties();
     
    10792        return isPubliclyRoutable(_host);
    10893    }
     94
    10995    public static boolean isPubliclyRoutable(String host) {
    11096        if (host == null) return false;
    111         try {
    112             InetAddress addr = InetAddress.getByName(host);
    113             byte quad[] = addr.getAddress();
    114             // allow ipv6 for ntcpaddress, since we've still got ssu
    115             //if (quad.length != 4) {
    116             //    if (_log.shouldLog(Log.ERROR))
    117             //        _log.error("Refusing IPv6 address (" + host + " / " + addr.getHostAddress() + ") "
    118             //                   + " since not all peers support it, and we don't support restricted routes");
    119             //    return false;
    120             //}
    121             return TransportImpl.isPubliclyRoutable(quad);
    122         } catch (Throwable t) {
    123             //if (_log.shouldLog(Log.WARN))
    124             //    _log.warn("Error checking routability", t);
    125             return false;
    126         }
     97        byte quad[] = Addresses.getIP(host);
     98        return TransportImpl.isPubliclyRoutable(quad);
    12799    }
    128100   
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    r5056706 r6162908  
    55import java.nio.channels.SelectionKey;
    66import java.nio.channels.SocketChannel;
     7import java.util.ArrayList;
    78import java.util.Iterator;
     9import java.util.List;
    810import java.util.Queue;
    911import java.util.Set;
     
    2527import net.i2p.router.RouterContext;
    2628import net.i2p.router.transport.FIFOBandwidthLimiter;
     29import net.i2p.router.util.CoDelPriorityBlockingQueue;
    2730import net.i2p.util.ConcurrentHashSet;
    2831import net.i2p.util.HexDump;
     
    8487     * pending unprepared OutNetMessage instances
    8588     */
    86     private final Queue<OutNetMessage> _outbound;
     89    private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
    8790    /**
    8891     *  current prepared OutNetMessage, or null - synchronize on _outbound to modify
     
    137140    /** 2 bytes for length and 4 for CRC */
    138141    public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
     142
     143    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
    139144   
    140145    /**
     
    151156        _writeBufs = new ConcurrentLinkedQueue();
    152157        _bwRequests = new ConcurrentHashSet(2);
    153         // TODO possible switch to CLQ but beware non-constant size() - see below
    154         _outbound = new LinkedBlockingQueue();
     158        _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    155159        _isInbound = true;
    156160        _decryptBlockBuf = new byte[BLOCK_SIZE];
     
    176180        _writeBufs = new ConcurrentLinkedQueue();
    177181        _bwRequests = new ConcurrentHashSet(8);
    178         // TODO possible switch to CLQ but beware non-constant size() - see below
    179         _outbound = new LinkedBlockingQueue();
     182        _outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
    180183        _isInbound = false;
    181184        _decryptBlockBuf = new byte[BLOCK_SIZE];
     
    296299        }
    297300
    298         OutNetMessage msg;
    299         while ((msg = _outbound.poll()) != null) {
     301        List<OutNetMessage> pending = new ArrayList();
     302        _outbound.drainAllTo(pending);
     303        for (OutNetMessage msg : pending) {
    300304            Object buf = msg.releasePreparationBuffer();
    301305            if (buf != null)
     
    304308        }
    305309
    306         msg = _currentOutbound;
     310        OutNetMessage msg = _currentOutbound;
    307311        if (msg != null) {
    308312            Object buf = msg.releasePreparationBuffer();
     
    317321     */
    318322    public void send(OutNetMessage msg) {
     323     /****
     324       always enqueue, let the queue do the dropping
     325
    319326        if (tooBacklogged()) {
    320327            boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
     
    336343        }
    337344        _consecutiveBacklog = 0;
    338         int enqueued = 0;
     345     ****/
    339346        //if (FAST_LARGE)
    340347            bufferedPrepare(msg);
    341         boolean noOutbound = false;
    342348        _outbound.offer(msg);
    343         enqueued = _outbound.size();
     349        //int enqueued = _outbound.size();
    344350        // although stat description says ahead of this one, not including this one...
    345         _context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
    346         noOutbound = (_currentOutbound == null);
    347         if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
     351        //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
     352        boolean noOutbound = (_currentOutbound == null);
     353        //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
    348354        if (_established && noOutbound)
    349355            _transport.getWriter().wantsWrite(this, "enqueued");
    350356    }
    351357
     358/****
    352359    private long queueTime() {   
    353360        OutNetMessage msg = _currentOutbound;
     
    359366        return msg.getSendTime(); // does not include any of the pre-send(...) preparation
    360367    }
     368****/
    361369
    362370    public boolean tooBacklogged() {
    363         long queueTime = queueTime();
    364         if (queueTime <= 0) return false;
    365         boolean currentOutboundSet = _currentOutbound != null;
     371        //long queueTime = queueTime();
     372        //if (queueTime <= 0) return false;
    366373       
    367374        // perhaps we could take into account the size of the queued messages too, our
     
    370377        if (getUptime() < 10*1000) // allow some slack just after establishment
    371378            return false;
    372         if (queueTime > 5*1000) { // bloody arbitrary.  well, its half the average message lifetime...
     379        //if (queueTime > 5*1000) { // bloody arbitrary.  well, its half the average message lifetime...
     380        if (_outbound.isBacklogged()) { // bloody arbitrary.  well, its half the average message lifetime...
    373381            int size = _outbound.size();
    374382            if (_log.shouldLog(Log.WARN)) {
    375383                int writeBufs = _writeBufs.size();
     384                boolean currentOutboundSet = _currentOutbound != null;
    376385                try {
    377                     _log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
     386                    _log.warn("Too backlogged: size is " + size
    378387                          + ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
    379388                          + ", currentOut set? " + currentOutboundSet
     
    381390                } catch (Exception e) {}  // java.nio.channels.CancelledKeyException
    382391            }
    383             _context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
     392            //_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
    384393            return true;
    385394        //} else if (size > 32) { // another arbitrary limit.
     
    398407        dsm.setEntry(_context.router().getRouterInfo());
    399408        infoMsg.setMessage(dsm);
    400         infoMsg.setPriority(100);
     409        infoMsg.setPriority(PRIORITY);
    401410        RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
    402411        if (target != null) {
     
    650659                return;
    651660            }
     661/****
    652662                //throw new RuntimeException("We should not be preparing a write while we still have one pending");
    653663            if (queueTime() > 3*1000) {  // don't stall low-priority messages
     664****/
    654665                msg = _outbound.poll();
    655666                if (msg == null)
    656667                    return;
     668/****
    657669            } else {
    658670                // FIXME
     
    680692                    _log.warn("Already removed??? " + msg.getMessage().getType());
    681693            }
     694****/
    682695            _currentOutbound = msg;
    683696        }
  • router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java

    r5056706 r6162908  
    299299            return null;
    300300        }
    301         NTCPAddress naddr = new NTCPAddress(addr);
    302         if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) {
     301        byte[] ip = addr.getIP();
     302        if ( (addr.getPort() <= 0) || (ip == null) ) {
    303303            _context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
    304304            markUnreachable(peer);
     
    308308            return null;
    309309        }
    310         if (!naddr.isPubliclyRoutable()) {
     310        if (!isPubliclyRoutable(ip)) {
    311311            if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) {
    312312                _context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1);
  • router/java/src/net/i2p/router/transport/udp/InboundMessageState.java

    r5056706 r6162908  
    44import net.i2p.data.Hash;
    55import net.i2p.router.RouterContext;
     6import net.i2p.router.util.CDQEntry;
    67import net.i2p.util.ByteCache;
    78import net.i2p.util.Log;
     
    1314 * InboundMessageFragments to avoid use-after-release, etc.
    1415 */
    15 class InboundMessageState {
     16class InboundMessageState implements CDQEntry {
    1617    private final RouterContext _context;
    1718    private final Log _log;
     
    3031    private int _lastFragment;
    3132    private final long _receiveBegin;
     33    private long _enqueueTime;
    3234    private int _completeSize;
    3335    private boolean _released;
     
    139141    }
    140142
     143    /**
     144     *  For CDQ
     145     *  @since 0.9.3
     146     */
     147    public void setEnqueueTime(long now) {
     148        _enqueueTime = now;
     149    }
     150
     151    /**
     152     *  For CDQ
     153     *  @since 0.9.3
     154     */
     155    public long getEnqueueTime() {
     156        return _enqueueTime;
     157    }
     158
     159    /**
     160     *  For CDQ
     161     *  @since 0.9.3
     162     */
     163    public void drop() {
     164        releaseResources();
     165    }
     166
    141167    public Hash getFrom() { return _from; }
    142168
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    r5056706 r6162908  
    22
    33import java.util.concurrent.BlockingQueue;
    4 import java.util.concurrent.LinkedBlockingQueue;
    54
    65import net.i2p.data.Base64;
     
    1110import net.i2p.data.i2np.I2NPMessageImpl;
    1211import net.i2p.router.RouterContext;
     12import net.i2p.router.util.CoDelBlockingQueue;
    1313//import net.i2p.util.ByteCache;
    1414import net.i2p.util.HexDump;
     
    5656            qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
    5757        }
    58         _completeMessages = new LinkedBlockingQueue(qsize);
     58        _completeMessages = new CoDelBlockingQueue(ctx, "UDP-MessageReceiver", qsize);
    5959
    6060        // the runners run forever, no need to have a cache
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    r5056706 r6162908  
    166166                return;
    167167            }
    168             int active = peer.add(state);
     168            peer.add(state);
    169169            add(peer);
    170             _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     170            //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    171171        } else {
    172172            if (_log.shouldLog(Log.WARN))
     
    183183        if (peer == null)
    184184            throw new RuntimeException("wtf, null peer for " + state);
    185         int active = peer.add(state);
     185        peer.add(state);
    186186        add(peer);
    187         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     187        //_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
    188188    }
    189189
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    r5056706 r6162908  
    88import net.i2p.data.i2np.I2NPMessage;
    99import net.i2p.router.OutNetMessage;
     10import net.i2p.router.util.CDPQEntry;
    1011import net.i2p.util.ByteCache;
    1112import net.i2p.util.Log;
     
    1516 *
    1617 */
    17 class OutboundMessageState {
     18class OutboundMessageState implements CDPQEntry {
    1819    private final I2PAppContext _context;
    1920    private final Log _log;
     
    3738    private boolean _released;
    3839    private Exception _releasedBy;
     40    // we can't use the ones in _message since it is null for injections
     41    private long _enqueueTime;
     42    private long _seqNum;
    3943   
    4044    public static final int MAX_MSG_SIZE = 32 * 1024;
     
    105109    /**
    106110     *  Called from OutboundMessageFragments
     111     *  @param m null if msg is "injected"
    107112     *  @return success
    108113     */
     
    129134            //_expiration = msg.getExpiration();
    130135
    131             if (_log.shouldLog(Log.DEBUG))
    132                 _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
     136            //if (_log.shouldLog(Log.DEBUG))
     137            //    _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
    133138            return true;
    134139        } catch (IllegalStateException ise) {
     
    369374    }
    370375   
     376    /**
     377     *  For CDQ
     378     *  @since 0.9.3
     379     */
     380    public void setEnqueueTime(long now) {
     381        _enqueueTime = now;
     382    }
     383
     384    /**
     385     *  For CDQ
     386     *  @since 0.9.3
     387     */
     388    public long getEnqueueTime() {
     389        return _enqueueTime;
     390    }
     391
     392    /**
     393     *  For CDQ
     394     *  @since 0.9.3
     395     */
     396    public void drop() {
     397        _peer.getTransport().failed(this, false);
     398        releaseResources();
     399    }
     400
     401    /**
     402     *  For CDPQ
     403     *  @since 0.9.3
     404     */
     405    public void setSeqNum(long num) {
     406        _seqNum = num;
     407    }
     408
     409    /**
     410     *  For CDPQ
     411     *  @since 0.9.3
     412     */
     413    public long getSeqNum() {
     414        return _seqNum;
     415    }
     416
     417    /**
     418     *  For CDPQ
     419     *  @return OutNetMessage priority or 1000 for injected
     420     *  @since 0.9.3
     421     */
     422    public int getPriority() {
     423        return _message != null ? _message.getPriority() : 1000;
     424    }
     425
    371426    @Override
    372427    public String toString() {
  • router/java/src/net/i2p/router/transport/udp/PacketBuilder.java

    r5056706 r6162908  
    1717import net.i2p.data.SessionKey;
    1818import net.i2p.data.Signature;
    19 import net.i2p.util.ByteCache;
    2019import net.i2p.util.Addresses;
    2120import net.i2p.util.Log;
     21import net.i2p.util.SimpleByteCache;
    2222
    2323/**
     
    103103    private final UDPTransport _transport;
    104104   
    105     private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
    106     private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
    107 
    108105    /**
    109106     *  For debugging and stats only - does not go out on the wire.
     
    608605        // ok, now the full data is in there, but we also need to encrypt
    609606        // the signature, which means we need the IV
    610         ByteArray iv = _ivCache.acquire();
    611         _context.random().nextBytes(iv.getData());
     607        byte[] iv = SimpleByteCache.acquire(UDPPacket.IV_SIZE);
     608        _context.random().nextBytes(iv);
    612609       
    613610        int encrWrite = Signature.SIGNATURE_BYTES + 8;
    614611        int sigBegin = off - encrWrite;
    615         _context.aes().encrypt(data, sigBegin, data, sigBegin, state.getCipherKey(), iv.getData(), encrWrite);
     612        _context.aes().encrypt(data, sigBegin, data, sigBegin, state.getCipherKey(), iv, encrWrite);
    616613       
    617614        // pad up so we're on the encryption boundary
     
    621618        authenticate(packet, ourIntroKey, ourIntroKey, iv);
    622619        setTo(packet, to, state.getSentPort());
    623         _ivCache.release(iv);
     620        SimpleByteCache.release(iv);
    624621        packet.setMessageType(TYPE_CREAT);
    625622        return packet;
     
    12911288     */
    12921289    private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey) {
    1293         ByteArray iv = _ivCache.acquire();
    1294         _context.random().nextBytes(iv.getData());
     1290        byte[] iv = SimpleByteCache.acquire(UDPPacket.IV_SIZE);
     1291        _context.random().nextBytes(iv);
    12951292        authenticate(packet, cipherKey, macKey, iv);
    1296         _ivCache.release(iv);
     1293        SimpleByteCache.release(iv);
    12971294    }
    12981295   
     
    13091306     * @param iv IV to deliver
    13101307     */
    1311     private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) {
     1308    private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, byte[] iv) {
    13121309        long before = System.currentTimeMillis();
    13131310        int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE;
    13141311        int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
    13151312        byte data[] = packet.getPacket().getData();
    1316         _context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize);
     1313        _context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv, encryptSize);
    13171314       
    13181315        // ok, now we need to prepare things for the MAC, which requires reordering
     
    13201317        System.arraycopy(data, encryptOffset, data, off, encryptSize);
    13211318        off += encryptSize;
    1322         System.arraycopy(iv.getData(), 0, data, off, UDPPacket.IV_SIZE);
     1319        System.arraycopy(iv, 0, data, off, UDPPacket.IV_SIZE);
    13231320        off += UDPPacket.IV_SIZE;
    13241321        DataHelper.toLong(data, off, 2, encryptSize ^ PROTOCOL_VERSION);
     
    13271324        int hmacLen = encryptSize + UDPPacket.IV_SIZE + 2;
    13281325        //Hash hmac = _context.hmac().calculate(macKey, data, hmacOff, hmacLen);
    1329         ByteArray ba = _hmacCache.acquire();
    1330         _context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0);
     1326        byte[] ba = SimpleByteCache.acquire(Hash.HASH_LENGTH);
     1327        _context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba, 0);
    13311328       
    13321329        if (_log.shouldLog(Log.DEBUG))
    13331330            _log.debug("Authenticating " + packet.getPacket().getLength() +
    1334                        "\nIV: " + Base64.encode(iv.getData()) +
    1335                        "\nraw mac: " + Base64.encode(ba.getData()) +
     1331                       "\nIV: " + Base64.encode(iv) +
     1332                       "\nraw mac: " + Base64.encode(ba) +
    13361333                       "\nMAC key: " + macKey);
    13371334        // ok, now lets put it back where it belongs...
    13381335        System.arraycopy(data, hmacOff, data, encryptOffset, encryptSize);
    13391336        //System.arraycopy(hmac.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
    1340         System.arraycopy(ba.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
    1341         System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
    1342         _hmacCache.release(ba);
     1337        System.arraycopy(ba, 0, data, hmacOff, UDPPacket.MAC_SIZE);
     1338        System.arraycopy(iv, 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
     1339        SimpleByteCache.release(ba);
    13431340        long timeToAuth = System.currentTimeMillis() - before;
    13441341        _context.statManager().addRateData("udp.packetAuthTime", timeToAuth, timeToAuth);
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r5056706 r6162908  
    1717import net.i2p.router.OutNetMessage;
    1818import net.i2p.router.RouterContext;
     19import net.i2p.router.util.CoDelPriorityBlockingQueue;
    1920import net.i2p.util.Log;
    2021import net.i2p.util.ConcurrentHashSet;
     
    189190    /** list of InboundMessageState for active message */
    190191    private final Map<Long, InboundMessageState> _inboundMessages;
    191     /** list of OutboundMessageState */
     192
     193    /**
     194     *  Mostly messages that have been transmitted and are awaiting acknowledgement,
     195     *  although there could be some that have not been sent yet.
     196     */
    192197    private final List<OutboundMessageState> _outboundMessages;
     198
     199    /**
     200     *  Priority queue of messages that have not yet been sent.
     201     *  They are taken from here and put in _outboundMessages.
     202     */
     203    private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
     204
    193205    /** which outbound message is currently being retransmitted */
    194206    private OutboundMessageState _retransmitter;
     
    299311        _inboundMessages = new HashMap(8);
    300312        _outboundMessages = new ArrayList(32);
     313        _outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
    301314        // all createRateStat() moved to EstablishmentManager
    302315        _remoteIP = remoteIP;
     
    727740            // no such element exception seen here
    728741            List<Long> rv = new ArrayList(_currentACKs);
    729             if (_log.shouldLog(Log.DEBUG))
    730                 _log.debug("Returning " + _currentACKs.size() + " current acks");
     742            //if (_log.shouldLog(Log.DEBUG))
     743            //    _log.debug("Returning " + _currentACKs.size() + " current acks");
    731744            return rv;
    732745    }
     
    749762            List<Long> randomResends = new ArrayList(_currentACKsResend);
    750763            Collections.shuffle(randomResends, _context.random());
    751             if (_log.shouldLog(Log.DEBUG))
    752                 _log.debug("Returning " + randomResends.size() + " resend acks");
     764            //if (_log.shouldLog(Log.DEBUG))
     765            //    _log.debug("Returning " + randomResends.size() + " resend acks");
    753766            return randomResends;
    754767    }
     
    11951208     *  TODO backlog / pushback / block instead of dropping? Can't really block here.
    11961209     *  TODO SSU does not support isBacklogged() now
    1197      *  @return total pending messages
    1198      */
    1199     public int add(OutboundMessageState state) {
     1210     */
     1211    public void add(OutboundMessageState state) {
    12001212        if (_dead) {
    12011213            _transport.failed(state, false);
    1202             return 0;
     1214            return;
    12031215        }
    12041216        state.setPeer(this);
     
    12061218            _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
    12071219        int rv = 0;
    1208         boolean fail = false;
     1220        // will never fail for CDPQ
     1221        boolean fail = !_outboundQueue.offer(state);
     1222/****
    12091223        synchronized (_outboundMessages) {
    12101224            rv = _outboundMessages.size() + 1;
     
    12131227                fail = true;
    12141228                rv--;
     1229****/
    12151230
    12161231         /******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
     
    12511266
    12521267             *******/
    1253 
     1268/****
    12541269            } else {
    12551270                _outboundMessages.add(state);
    12561271            }
    12571272        }
     1273****/
    12581274        if (fail) {
    12591275            if (_log.shouldLog(Log.WARN))
     
    12611277            _transport.failed(state, false);
    12621278        }
    1263         return rv;
    12641279    }
    12651280
     
    12691284        _dead = true;
    12701285        //_outboundMessages = null;
    1271         _retransmitter = null;
    1272 
    1273             int sz = 0;
    1274             List<OutboundMessageState> tempList = null;
     1286
     1287            List<OutboundMessageState> tempList;
    12751288            synchronized (_outboundMessages) {
    1276                 sz = _outboundMessages.size();
    1277                 if (sz > 0) {
     1289                    _retransmitter = null;
    12781290                    tempList = new ArrayList(_outboundMessages);
    12791291                    _outboundMessages.clear();
    1280                 }
    1281             }
    1282             for (int i = 0; i < sz; i++)
    1283                 _transport.failed(tempList.get(i), false);
     1292            }
     1293            _outboundQueue.drainAllTo(tempList);
     1294            for (OutboundMessageState oms : tempList) {
     1295                _transport.failed(oms, false);
     1296            }
    12841297
    12851298        // so the ACKSender will drop this peer from its queue
     
    12921305    public int getOutboundMessageCount() {
    12931306        if (_dead) return 0;
    1294         return _outboundMessages.size();
     1307        return _outboundMessages.size() + _outboundQueue.size();
    12951308    }
    12961309   
     
    13061319        // short circuit, unsynchronized
    13071320        if (_outboundMessages.isEmpty())
    1308             return 0;
     1321            return _outboundQueue.size();
    13091322
    13101323        if (_dead) {
     
    13681381        }
    13691382       
    1370         return rv;
     1383        return rv + _outboundQueue.size();
    13711384    }
    13721385   
     
    13881401                if (should == ShouldSend.YES) {
    13891402                    if (_log.shouldLog(Log.DEBUG))
    1390                         _log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
     1403                        _log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
    13911404                    /*
    13921405                    while (iter.hasNext()) {
     
    14031416                    // By not looking further, we keep strict sending order, and that allows
    14041417                    // some efficiency in acked() below.
    1405                     break;
     1418                    if (_log.shouldLog(Log.DEBUG))
     1419                        _log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
     1420                                   " / " + _outboundQueue.size() + " remaining");
     1421                    return null;
    14061422                } /* else {
    14071423                    OutNetMessage msg = state.getMessage();
     
    14101426                } */
    14111427            }
     1428            // Peek at head of _outboundQueue and see if we can send it.
     1429            // If so, pull it off, put it in _outbundMessages, test
     1430            // again for bandwidth if necessary, and return it.
     1431            OutboundMessageState state = _outboundQueue.peek();
     1432            if (state != null && ShouldSend.YES == locked_shouldSend(state)) {
     1433                // we could get a different state, or null, when we poll,
     1434                // due to AQM drops, so we test again if necessary
     1435                OutboundMessageState dequeuedState = _outboundQueue.poll();
     1436                if (dequeuedState != null) {
     1437                    _outboundMessages.add(dequeuedState);
     1438                    if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) {
     1439                        if (_log.shouldLog(Log.DEBUG))
     1440                            _log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
     1441                        return dequeuedState;
     1442                    }
     1443                }
     1444            }
    14121445        }
    14131446        if (_log.shouldLog(Log.DEBUG))
    1414             _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining");
     1447            _log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
     1448                       " / " + _outboundQueue.size() + " remaining");
    14151449        return null;
    14161450    }
     
    14421476            }
    14431477        }
     1478        // failsafe... is this OK?
     1479        if (rv > 100 && !_outboundQueue.isEmpty())
     1480            rv = 100;
    14441481        return rv;
     1482    }
     1483
     1484    /**
     1485     *  @since 0.9.3
     1486     */
     1487    public boolean isBacklogged() {
     1488        return _dead || _outboundQueue.isBacklogged();
    14451489    }
    14461490
     
    15221566            int size = state.getUnackedSize();
    15231567            if (allocateSendingBytes(size, state.getPushCount())) {
    1524                 if (_log.shouldLog(Log.INFO))
    1525                     _log.info("Allocation of " + size + " allowed with "
     1568                if (_log.shouldLog(Log.DEBUG))
     1569                    _log.debug("Allocation of " + size + " allowed with "
    15261570                              + getSendWindowBytesRemaining()
    15271571                              + "/" + getSendWindowBytes()
     
    15671611    /**
    15681612     *  A full ACK was received.
    1569      *  TODO if messages awaiting ack were a HashSet this would be faster.
     1613     *  TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster.
    15701614     *
    15711615     *  @return true if the message was acked for the first time
     
    16211665        } else {
    16221666            // dupack, likely
    1623             if (_log.shouldLog(Log.DEBUG))
    1624                 _log.debug("Received an ACK for a message not pending: " + messageId);
     1667            //if (_log.shouldLog(Log.DEBUG))
     1668            //    _log.debug("Received an ACK for a message not pending: " + messageId);
    16251669        }
    16261670        return state != null;
     
    17681812    }
    17691813
     1814    /**
     1815     *  Convenience for OutboundMessageState so it can fail itself
     1816     *  @since 0.9.3
     1817     */
     1818    public UDPTransport getTransport() {
     1819        return _transport;
     1820    }
     1821
    17701822    // why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
    17711823
  • router/java/src/net/i2p/router/transport/udp/PeerTestManager.java

    r5056706 r6162908  
    1010import net.i2p.data.Base64;
    1111import net.i2p.data.DataHelper;
     12import net.i2p.data.RouterAddress;
    1213import net.i2p.data.RouterInfo;
    1314import net.i2p.data.SessionKey;
     
    585586            testInfo.readIntroKey(aliceIntroKey.getData(), 0);
    586587
    587             UDPAddress addr = new UDPAddress(charlieInfo.getTargetAddress(UDPTransport.STYLE));
     588            RouterAddress raddr = charlieInfo.getTargetAddress(UDPTransport.STYLE);
     589            if (raddr == null) {
     590                if (_log.shouldLog(Log.WARN))
     591                    _log.warn("Unable to pick a charlie");
     592                return;
     593            }
     594            UDPAddress addr = new UDPAddress(raddr);
    588595            SessionKey charlieIntroKey = new SessionKey(addr.getIntroKey());
    589596           
  • router/java/src/net/i2p/router/transport/udp/UDPAddress.java

    r5056706 r6162908  
    1313 */
    1414public class UDPAddress {
    15     private String _host;
     15    private final String _host;
    1616    private InetAddress _hostAddress;
    17     private int _port;
     17    private final int _port;
    1818    private byte[] _introKey;
    1919    private String _introHosts[];
     
    2424    private int _mtu;
    2525   
    26     public static final String PROP_PORT = "port";
    27     public static final String PROP_HOST = "host";
     26    public static final String PROP_PORT = RouterAddress.PROP_PORT;
     27    public static final String PROP_HOST = RouterAddress.PROP_HOST;
    2828    public static final String PROP_INTRO_KEY = "key";
    2929    public static final String PROP_MTU = "mtu";
     
    4141    public UDPAddress(RouterAddress addr) {
    4242        // TODO make everything final
    43         if (addr == null) return;
     43        if (addr == null) {
     44            _host = null;
     45            _port = 0;
     46            return;
     47        }
    4448        _host = addr.getOption(PROP_HOST);
    45         if (_host != null) _host = _host.trim();
    46         try {
    47             String port = addr.getOption(PROP_PORT);
    48             if (port != null)
    49                 _port = Integer.parseInt(port);
    50         } catch (NumberFormatException nfe) {
    51             _port = -1;
    52         }
     49        _port = addr.getPort();
    5350        try {
    5451            String mtu = addr.getOption(PROP_MTU);
     
    147144
    148145    /**
    149      *  @return 0 if unset; -1 if invalid
     146     *  @return 0 if unset or invalid
    150147     */
    151148    public int getPort() { return _port; }
  • router/java/src/net/i2p/router/transport/udp/UDPPacket.java

    r5056706 r6162908  
    1010import net.i2p.data.DataHelper;
    1111import net.i2p.data.SessionKey;
     12import net.i2p.router.util.CDQEntry;
    1213import net.i2p.util.Log;
    1314
     
    1718 *
    1819 */
    19 class UDPPacket {
     20class UDPPacket implements CDQEntry {
    2021    private I2PAppContext _context;
    2122    private final DatagramPacket _packet;
     
    247248    }
    248249
    249     /** the UDPReceiver has tossed it onto the inbound queue */
    250     void enqueue() { _enqueueTime = _context.clock().now(); }
     250    /**
     251     *  For CDQ
     252     *  @since 0.9.3
     253     */
     254    public void setEnqueueTime(long now) { _enqueueTime = now; }
     255
    251256    /** a packet handler has pulled it off the inbound queue */
    252257    void received() { _receivedTime = _context.clock().now(); }
     
    257262    //void afterHandling() { _afterHandlingTime = _context.clock().now(); }
    258263     
    259     /** the UDPReceiver has tossed it onto the inbound queue */
    260     //long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); }
     264    /**
     265     *  For CDQ
     266     *  @since 0.9.3
     267     */
     268    public long getEnqueueTime() { return _enqueueTime; }
    261269
    262270    /** a packet handler has pulled it off the inbound queue */
     
    270278    // Following 5: All used only for stats in PacketHandler, commented out
    271279
    272     /** when it was added to the endpoint's receive queue */
    273     //long getEnqueueTime() { return _enqueueTime; }
    274280    /** when it was pulled off the endpoint receive queue */
    275281    //long getReceivedTime() { return _receivedTime; }
     
    327333    }
    328334
     335    /**
     336     *  For CDQ
     337     *  @since 0.9.3
     338     */
     339    public void drop() {
     340        release();
     341    }
     342
    329343    public void release() {
    330344        verifyNotReleased();
  • router/java/src/net/i2p/router/transport/udp/UDPReceiver.java

    r5056706 r6162908  
    55import java.util.Arrays;
    66import java.util.concurrent.BlockingQueue;
    7 import java.util.concurrent.LinkedBlockingQueue;
    87
    98import net.i2p.router.RouterContext;
    109import net.i2p.router.transport.FIFOBandwidthLimiter;
     10import net.i2p.router.util.CoDelBlockingQueue;
    1111import net.i2p.util.I2PThread;
    1212import net.i2p.util.Log;
     
    4848            maxMemory = 96*1024*1024l;
    4949        int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
    50         _inboundQueue = new LinkedBlockingQueue(qsize);
     50        _inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize);
    5151        _socket = socket;
    5252        _transport = transport;
     
    178178        }
    179179
     180/****
    180181        packet.enqueue();
    181182        boolean rejected = false;
     
    191192            }
    192193            if (!rejected) {
     194****/
    193195                try {
    194196                    _inboundQueue.put(packet);
     
    199201                //return queueSize + 1;
    200202                return 0;
     203/****
    201204            }
    202205       
     
    215218        }
    216219        return queueSize;
     220****/
    217221    }
    218222   
  • router/java/src/net/i2p/router/transport/udp/UDPSender.java

    r5056706 r6162908  
    55import java.net.DatagramSocket;
    66import java.util.concurrent.BlockingQueue;
    7 import java.util.concurrent.LinkedBlockingQueue;
    87
    98import net.i2p.router.RouterContext;
    109import net.i2p.router.transport.FIFOBandwidthLimiter;
     10import net.i2p.router.util.CoDelBlockingQueue;
    1111import net.i2p.util.I2PThread;
    1212import net.i2p.util.Log;
     
    3636            maxMemory = 96*1024*1024l;
    3737        int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
    38         _outboundQueue = new LinkedBlockingQueue(qsize);
     38        _outboundQueue = new CoDelBlockingQueue(ctx, "UDP-Sender", qsize);
    3939        _socket = socket;
    4040        _runner = new Runner();
  • router/java/src/net/i2p/router/transport/udp/UDPTransport.java

    r5056706 r6162908  
    4343import net.i2p.util.SimpleScheduler;
    4444import net.i2p.util.SimpleTimer;
     45import net.i2p.util.SimpleTimer2;
    4546import net.i2p.util.Translate;
    4647
     
    370371        _expireEvent.setIsAlive(true);
    371372        _testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future...
    372         SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon
     373        _testEvent.reschedule(10*1000); // lets requeue it for Real Soon
    373374    }
    374375   
     
    682683        }
    683684        _testEvent.forceRun();
    684         SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
     685        _testEvent.reschedule(5*1000);
    685686        return updated;
    686687    }
     
    860861        if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) {
    861862            _testEvent.forceRun();
    862             SimpleTimer.getInstance().addEvent(_testEvent, 0);
     863            _testEvent.reschedule(0);
    863864        }
    864865        return true;
     
    934935
    935936    private class RemoveDropList implements SimpleTimer.TimedEvent {
    936         private RemoteHostId _peer;
     937        private final RemoteHostId _peer;
    937938        public RemoveDropList(RemoteHostId peer) { _peer = peer; }
    938939        public void timeReached() {
     
    12031204            //if (ua.getIntroducerCount() <= 0) {
    12041205            if (addr.getOption("ihost0") == null) {
    1205                 String host = addr.getOption(UDPAddress.PROP_HOST);
    1206                 String port = addr.getOption(UDPAddress.PROP_PORT);
    1207                 if (host == null || port == null) {
     1206                byte[] ip = addr.getIP();
     1207                int port = addr.getPort();
     1208                if (ip == null || port <= 0 ||
     1209                    (!isValid(ip)) ||
     1210                    Arrays.equals(ip, getExternalIP())) {
    12081211                    markUnreachable(to);
    12091212                    return null;
    1210                 }
    1211                 try {
    1212                     InetAddress ia = InetAddress.getByName(host);
    1213                     int iport = Integer.parseInt(port);
    1214                     if (iport <= 0 || iport > 65535 || (!isValid(ia.getAddress())) ||
    1215                         Arrays.equals(ia.getAddress(), getExternalIP())) {
    1216                         markUnreachable(to);
    1217                         return null;
    1218                     }
    1219                 } catch (UnknownHostException uhe) {
    1220                         markUnreachable(to);
    1221                         return null;
    1222                 } catch (NumberFormatException nfe) {
    1223                         markUnreachable(to);
    1224                         return null;
    12251213                }
    12261214            }
     
    13381326    }
    13391327
     1328    /**
     1329     *  "injected" message from the EstablishmentManager
     1330     */
    13401331    void send(I2NPMessage msg, PeerState peer) {
    13411332        if (_log.shouldLog(Log.DEBUG))
     
    14471438            else
    14481439                addr.setCost(DEFAULT_COST);
    1449             addr.setExpiration(null);
     1440            //addr.setExpiration(null);
    14501441            addr.setTransportStyle(STYLE);
    14511442            addr.setOptions(options);
     
    16881679    }
    16891680
     1681    /**
     1682     *  @since 0.9.3
     1683     */
     1684    @Override
     1685    public boolean isBacklogged(Hash dest) {
     1686        PeerState peer =  _peersByIdent.get(dest);
     1687        return peer != null && peer.isBacklogged();
     1688    }
     1689
    16901690    public boolean allowConnection() {
     1691
    16911692            return _peersByIdent.size() < getMaxConnections();
    16921693    }
     
    21972198            buf.append(THINSP).append(peer.getConcurrentSendWindow());
    21982199            buf.append(THINSP).append(peer.getConsecutiveSendRejections());
     2200            if (peer.isBacklogged())
     2201                buf.append(' ').append(_("backlogged"));
    21992202            buf.append("</td>");
    22002203
     
    23622365    }
    23632366   
    2364     private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
     2367    private class ExpirePeerEvent extends SimpleTimer2.TimedEvent {
    23652368        private final Set<PeerState> _expirePeers;
    23662369        private final List<PeerState> _expireBuffer;
     
    23682371
    23692372        public ExpirePeerEvent() {
     2373            super(_context.simpleTimer2());
    23702374            _expirePeers = new ConcurrentHashSet(128);
    23712375            _expireBuffer = new ArrayList();
     
    24042408
    24052409            if (_alive)
    2406                 SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
     2410                schedule(30*1000);
    24072411        }
    24082412        public void add(PeerState peer) {
     
    24152419            _alive = isAlive;
    24162420            if (isAlive) {
    2417                 SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
     2421                reschedule(30*1000);
    24182422            } else {
    2419                 SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this);
     2423                cancel();
    24202424                _expirePeers.clear();
    24212425            }
     
    25162520    }
    25172521   
    2518     private class PeerTestEvent implements SimpleTimer.TimedEvent {
     2522    private class PeerTestEvent extends SimpleTimer2.TimedEvent {
    25192523        private volatile boolean _alive;
    25202524        /** when did we last test our reachability */
     
    25222526        private boolean _forceRun;
    25232527
     2528        PeerTestEvent() {
     2529            super(_context.simpleTimer2());
     2530        }
     2531       
    25242532        public void timeReached() {
    25252533            if (shouldTest()) {
     
    25332541                if (delay <= 0)
    25342542                    throw new RuntimeException("wtf, delay is " + delay);
    2535                 SimpleTimer.getInstance().addEvent(PeerTestEvent.this, delay);
     2543                schedule(delay);
    25362544            }
    25372545        }
     
    25592567            if (isAlive) {
    25602568                long delay = _context.random().nextInt(2*TEST_FREQUENCY);
    2561                 SimpleTimer.getInstance().addEvent(PeerTestEvent.this, delay);
     2569                reschedule(delay);
    25622570            } else {
    2563                 SimpleTimer.getInstance().removeEvent(PeerTestEvent.this);
     2571                cancel();
    25642572            }
    25652573        }
  • router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java

    r5056706 r6162908  
    105105    /* See TunnelGateway.QueuePreprocessor for Javadoc */
    106106    @Override
    107     public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     107    public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    108108        if (_log.shouldLog(Log.INFO))
    109109            display(0, pending, "Starting");
     
    132132            for (int i = 0; i < pending.size(); i++) {
    133133                long pendingStart = System.currentTimeMillis();
    134                 TunnelGateway.Pending msg = pending.get(i);
     134                PendingGatewayMessage msg = pending.get(i);
    135135                int instructionsSize = getInstructionsSize(msg);
    136136                instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
     
    170170                    // Remove what we sent from the pending queue
    171171                    for (int j = 0; j < i; j++) {
    172                         TunnelGateway.Pending cur = pending.remove(0);
     172                        PendingGatewayMessage cur = pending.remove(0);
    173173                        if (cur.getOffset() < cur.getData().length)
    174174                            throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
     
    182182                    if (msg.getOffset() >= msg.getData().length) {
    183183                        // ok, this last message fit perfectly, remove it too
    184                         TunnelGateway.Pending cur = pending.remove(0);
     184                        PendingGatewayMessage cur = pending.remove(0);
    185185                        if (timingBuf != null)
    186186                            timingBuf.append(" sent perfect fit " + cur).append(".");
     
    231231                    int beforeSize = pending.size();
    232232                    for (int i = 0; i < beforeSize; i++) {
    233                         TunnelGateway.Pending cur = pending.get(0);
     233                        PendingGatewayMessage cur = pending.get(0);
    234234                        if (cur.getOffset() < cur.getData().length)
    235235                            break;
     
    317317     * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
    318318     */
    319     private void display(long allocated, List<TunnelGateway.Pending> pending, String title) {
     319    private void display(long allocated, List<PendingGatewayMessage> pending, String title) {
    320320        if (_log.shouldLog(Log.INFO)) {
    321321            long highestDelay = 0;
     
    328328                buf.append(" delay: ").append(getDelayAmount(false));
    329329            for (int i = 0; i < pending.size(); i++) {
    330                 TunnelGateway.Pending curPending = pending.get(i);
     330                PendingGatewayMessage curPending = pending.get(i);
    331331                buf.append(" [").append(i).append("]:");
    332332                buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
     
    348348     * @param sendThrough last index in pending to send (inclusive)
    349349     */
    350     protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     350    protected void send(List<PendingGatewayMessage> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    351351        if (_log.shouldLog(Log.DEBUG))
    352352            _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
     
    385385        long msgId = sender.sendPreprocessed(preprocessed, rec);
    386386        for (int i = 0; i < pending.size(); i++) {
    387             TunnelGateway.Pending cur = pending.get(i);
     387            PendingGatewayMessage cur = pending.get(i);
    388388            cur.addMessageId(msgId);
    389389        }
     
    398398     * @return new offset into the target for further bytes to be written
    399399     */
    400     private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) {
     400    private int writeFragments(List<PendingGatewayMessage> pending, int startAt, int sendThrough, byte target[], int offset) {
    401401        for (int i = startAt; i <= sendThrough; i++) {
    402             TunnelGateway.Pending msg = pending.get(i);
     402            PendingGatewayMessage msg = pending.get(i);
    403403            int prevOffset = offset;
    404404            if (msg.getOffset() == 0) {
  • router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java

    r5056706 r6162908  
    66import net.i2p.router.util.DecayingBloomFilter;
    77import net.i2p.router.util.DecayingHashSet;
    8 import net.i2p.util.ByteCache;
     8import net.i2p.util.SimpleByteCache;
    99
    1010/**
     
    1616    private final RouterContext _context;
    1717    private final DecayingBloomFilter _filter;
    18     private final ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
    1918   
    2019    /**
     
    5857   
    5958    public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) {
    60         ByteArray buf = _ivXorCache.acquire();
    61         DataHelper.xor(ivData, ivOffset, payload, payloadOffset, buf.getData(), 0, HopProcessor.IV_LENGTH);
    62         boolean dup = _filter.add(buf.getData());
    63         _ivXorCache.release(buf);
     59        byte[] buf = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
     60        DataHelper.xor(ivData, ivOffset, payload, payloadOffset, buf, 0, HopProcessor.IV_LENGTH);
     61        boolean dup = _filter.add(buf);
     62        SimpleByteCache.release(buf);
    6463        if (dup) _context.statManager().addRateData("tunnel.duplicateIV", 1);
    6564        return !dup; // return true if it is OK, false if it isn't
  • router/java/src/net/i2p/router/tunnel/FragmentHandler.java

    r5056706 r6162908  
    1717import net.i2p.util.Log;
    1818import net.i2p.util.SimpleByteCache;
    19 import net.i2p.util.SimpleTimer;
     19import net.i2p.util.SimpleTimer2;
    2020
    2121/**
     
    370370                    }
    371371                    if (msg.getExpireEvent() != null)
    372                         SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
     372                        msg.getExpireEvent().cancel();
    373373                    receiveComplete(msg);
    374374                } else {
     
    379379                        if (_log.shouldLog(Log.DEBUG))
    380380                            _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
    381                         SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
     381                        evt.schedule(MAX_DEFRAGMENT_TIME);
    382382                    }
    383383                }
     
    438438                }
    439439                if (msg.getExpireEvent() != null)
    440                     SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
     440                    msg.getExpireEvent().cancel();
    441441                _context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
    442442                receiveComplete(msg);
     
    448448                    if (_log.shouldLog(Log.DEBUG))
    449449                        _log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
    450                     SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
     450                    evt.schedule(MAX_DEFRAGMENT_TIME);
    451451                }
    452452            }
     
    549549    }
    550550   
    551     private class RemoveFailed implements SimpleTimer.TimedEvent {
     551    private class RemoveFailed extends SimpleTimer2.TimedEvent {
    552552        private final FragmentedMessage _msg;
    553553
    554554        public RemoveFailed(FragmentedMessage msg) {
     555            super(_context.simpleTimer2());
    555556            _msg = msg;
    556557        }
  • router/java/src/net/i2p/router/tunnel/FragmentedMessage.java

    r5056706 r6162908  
    88import net.i2p.util.ByteCache;
    99import net.i2p.util.Log;
    10 import net.i2p.util.SimpleTimer;
     10import net.i2p.util.SimpleTimer2;
    1111
    1212/**
     
    2929    private boolean _completed;
    3030    private long _releasedAfter;
    31     private SimpleTimer.TimedEvent _expireEvent;
     31    private SimpleTimer2.TimedEvent _expireEvent;
    3232   
    3333    private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
     
    161161        return found;
    162162    }
     163
    163164    /** used in the fragment handler so we can cancel the expire event on success */
    164     SimpleTimer.TimedEvent getExpireEvent() { return _expireEvent; }
    165     void setExpireEvent(SimpleTimer.TimedEvent evt) { _expireEvent = evt; }
     165    public SimpleTimer2.TimedEvent getExpireEvent() { return _expireEvent; }
     166
     167    public void setExpireEvent(SimpleTimer2.TimedEvent evt) { _expireEvent = evt; }
    166168   
    167169    /** have we received all of the fragments? */
  • router/java/src/net/i2p/router/tunnel/HopProcessor.java

    r5056706 r6162908  
    33import net.i2p.I2PAppContext;
    44import net.i2p.data.Hash;
    5 import net.i2p.util.ByteCache;
    65import net.i2p.util.Log;
    76
     
    3029    static final boolean USE_DOUBLE_IV_ENCRYPTION = true;
    3130    static final int IV_LENGTH = 16;
    32     private static final ByteCache _cache = ByteCache.getInstance(128, IV_LENGTH);
    3331   
    3432    /** @deprecated unused */
  • router/java/src/net/i2p/router/tunnel/InboundEndpointProcessor.java

    r5056706 r6162908  
    44import net.i2p.data.Hash;
    55import net.i2p.router.RouterContext;
    6 import net.i2p.util.ByteCache;
    76import net.i2p.util.Log;
     7import net.i2p.util.SimpleByteCache;
    88
    99/**
     
    2222   
    2323    static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
    24     private static final ByteCache _cache = ByteCache.getInstance(128, HopProcessor.IV_LENGTH);
    2524   
    2625    /** @deprecated unused */
     
    5554        }
    5655       
    57         ByteArray ba = _cache.acquire();
    58         byte iv[] = ba.getData(); //new byte[HopProcessor.IV_LENGTH];
     56        byte iv[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
    5957        System.arraycopy(orig, offset, iv, 0, iv.length);
    6058        //if (_config.getLength() > 1)
     
    6563            if (_log.shouldLog(Log.WARN))
    6664                _log.warn("Invalid IV, dropping at IBEP " + _config);
    67             _cache.release(ba);
     65            SimpleByteCache.release(iv);
    6866            return false;
    6967        }
     
    7371            decrypt(_context, _config, iv, orig, offset, length);
    7472       
    75         _cache.release(ba);
     73        SimpleByteCache.release(iv);
    7674       
    7775        if (_config.getLength() > 0) {
     
    9290    private void decrypt(RouterContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
    9391        //Log log = ctx.logManager().getLog(OutboundGatewayProcessor.class);
    94         ByteArray ba = _cache.acquire();
    95         byte cur[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH]; // so we dont malloc
     92        byte cur[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
    9693        for (int i = cfg.getLength()-2; i >= 0; i--) { // dont include the endpoint, since that is the creator
    9794            OutboundGatewayProcessor.decrypt(ctx, iv, orig, offset, length, cur, cfg.getConfig(i));
     
    10198            //}
    10299        }
    103         _cache.release(ba);
     100        SimpleByteCache.release(cur);
    104101    }
    105102   
  • router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java

    r5056706 r6162908  
    1616   
    1717    private static final long MAX_LOOKUP_TIME = 15*1000;
     18    private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
    1819
    1920    public InboundGatewayReceiver(RouterContext ctx, HopConfig cfg) {
     
    5960        out.setTarget(_target);
    6061        out.setExpiration(msg.getMessageExpiration());
    61         out.setPriority(200);
     62        out.setPriority(PRIORITY);
    6263        _context.outNetMessagePool().add(out);
    6364        return msg.getUniqueId();
  • router/java/src/net/i2p/router/tunnel/OutboundGatewayProcessor.java

    r5056706 r6162908  
    44import net.i2p.data.Base64;
    55import net.i2p.data.ByteArray;
    6 import net.i2p.util.ByteCache;
    76import net.i2p.util.Log;
     7import net.i2p.util.SimpleByteCache;
    88
    99/**
     
    1919       
    2020    static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
    21     private static final ByteCache _cache = ByteCache.getInstance(128, HopProcessor.IV_LENGTH);
    2221
    2322    public OutboundGatewayProcessor(I2PAppContext ctx, TunnelCreatorConfig cfg) {
     
    3635     */
    3736    public void process(byte orig[], int offset, int length) {
    38         ByteArray ba = _cache.acquire();
    39         byte iv[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH];
     37        byte iv[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
    4038        //_context.random().nextBytes(iv);
    4139        //System.arraycopy(iv, 0, orig, offset, HopProcessor.IV_LENGTH);
     
    5048        if (_log.shouldLog(Log.DEBUG))
    5149            _log.debug("finished processing the preprocessed data");
    52         _cache.release(ba);
     50        SimpleByteCache.release(iv);
    5351    }
    5452   
     
    5957    private void decrypt(I2PAppContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
    6058        Log log = ctx.logManager().getLog(OutboundGatewayProcessor.class);
    61         ByteArray ba = _cache.acquire();
    62         byte cur[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH]; // so we dont malloc
     59        byte cur[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
    6360        for (int i = cfg.getLength()-1; i >= 1; i--) { // dont include hop 0, since that is the creator
    6461            decrypt(ctx, iv, orig, offset, length, cur, cfg.getConfig(i));
     
    6865            }
    6966        }
    70         _cache.release(ba);
     67        SimpleByteCache.release(cur);
    7168    }
    7269   
  • router/java/src/net/i2p/router/tunnel/OutboundReceiver.java

    r5056706 r6162908  
    2121   
    2222    private static final long MAX_LOOKUP_TIME = 15*1000;
     23    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_DATA;
    2324
    2425    public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) {
     
    6263        m.setExpiration(msg.getMessageExpiration());
    6364        m.setTarget(ri);
    64         m.setPriority(400);
     65        m.setPriority(PRIORITY);
    6566        _context.outNetMessagePool().add(m);
    6667        _config.incrementProcessedMessages();
  • router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java

    r5056706 r6162908  
    33import java.util.List;
    44import java.util.concurrent.BlockingQueue;
    5 import java.util.concurrent.LinkedBlockingQueue;
    65
    76import net.i2p.data.Hash;
     
    109import net.i2p.router.Router;
    1110import net.i2p.router.RouterContext;
     11import net.i2p.router.util.CoDelBlockingQueue;
     12import net.i2p.router.util.CoDelPriorityBlockingQueue;
    1213import net.i2p.util.Log;
    1314
     
    3637 */
    3738class PumpedTunnelGateway extends TunnelGateway {
    38     private final BlockingQueue<Pending> _prequeue;
     39    private final BlockingQueue<PendingGatewayMessage> _prequeue;
    3940    private final TunnelGatewayPumper _pumper;
     41    private final boolean _isInbound;
    4042   
     43    private static final int MAX_OB_MSGS_PER_PUMP = 16;
     44    private static final int MAX_IB_MSGS_PER_PUMP = 8;
     45    private static final int INITIAL_OB_QUEUE = 64;
     46    private static final int MAX_IB_QUEUE = 1024;
     47
    4148    /**
    4249     * @param preprocessor this pulls Pending messages off a list, builds some
     
    4956    public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
    5057        super(context, preprocessor, sender, receiver);
    51         _prequeue = new LinkedBlockingQueue();
     58        if (getClass() == PumpedTunnelGateway.class) {
     59            // Unbounded priority queue for outbound
     60            _prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
     61            _isInbound = false;
     62        } else {  // extended by ThrottledPTG for IB
     63            // Bounded non-priority queue for inbound
     64            _prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
     65            _isInbound = true;
     66        }
    5267        _pumper = pumper;
    5368    }
     
    5873     * If it is queued up past its expiration, it is silently dropped
    5974     *
     75     * This is only for OBGWs. See TPTG override for IBGWs.
     76     *
    6077     * @param msg message to be sent through the tunnel
    6178     * @param toRouter router to send to after the endpoint (or null for endpoint processing)
     
    6481    @Override
    6582    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
     83        OutboundGatewayMessage cur = new OutboundGatewayMessage(msg, toRouter, toTunnel);
     84        if (_log.shouldLog(Log.DEBUG))
     85            _log.debug("OB PTG add type " + msg.getType() + " pri " + cur.getPriority());
     86        add(cur);
     87    }
     88
     89    protected void add(PendingGatewayMessage cur) {
    6690        _messagesSent++;
    67         Pending cur = new PendingImpl(msg, toRouter, toTunnel);
    68         _prequeue.offer(cur);
    69         _pumper.wantsPumping(this);
     91        if (_prequeue.offer(cur))
     92            _pumper.wantsPumping(this);
     93        else
     94            _context.statManager().addRateData("tunnel.dropGatewayOverflow", 1);
    7095    }
    7196
     
    80105     *                 Must be empty when called; will always be emptied before return.
    81106     */
    82     void pump(List<Pending> queueBuf) {
    83         _prequeue.drainTo(queueBuf);
     107    void pump(List<PendingGatewayMessage> queueBuf) {
     108        // TODO if an IBGW, and the next hop is backlogged,
     109        // drain less or none... better to let things back up here.
     110        // Don't do this for OBGWs?
     111        int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
     112        _prequeue.drainTo(queueBuf, max);
    84113        if (queueBuf.isEmpty())
    85114            return;
     
    106135            // expire any as necessary, even if its framented
    107136            for (int i = 0; i < _queue.size(); i++) {
    108                 Pending m = _queue.get(i);
     137                PendingGatewayMessage m = _queue.get(i);
    109138                if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
    110139                    if (_log.shouldLog(Log.DEBUG))
     
    121150       
    122151        if (delayedFlush) {
    123             _context.simpleTimer().addEvent(_delayedFlush, delayAmount);
     152            _delayedFlush.reschedule(delayAmount);
    124153        }
    125         _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
    126         long complete = System.currentTimeMillis();
    127         if (_log.shouldLog(Log.DEBUG))
     154        //_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
     155        if (_log.shouldLog(Log.DEBUG)) {
     156            long complete = System.currentTimeMillis();
    128157            _log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
    129158                       + " delayed? " + delayedFlush + " remaining: " + remaining
     
    132161                       + " expire: " + (afterExpire-afterPreprocess)
    133162                       + " queue flush: " + (complete-afterExpire));
     163        }
    134164        queueBuf.clear();
     165        if (!_prequeue.isEmpty())
     166            _pumper.wantsPumping(this);
    135167    }
    136168   
  • router/java/src/net/i2p/router/tunnel/ThrottledPumpedTunnelGateway.java

    r5056706 r6162908  
    4343            return;
    4444        }
    45         super.add(msg, toRouter, toTunnel);
     45        add(new PendingGatewayMessage(msg, toRouter, toTunnel));
    4646    }
    4747}
  • router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java

    r5056706 r6162908  
    1010import net.i2p.util.ByteCache;
    1111import net.i2p.util.Log;
     12import net.i2p.util.SimpleByteCache;
    1213
    1314/**
     
    3435    protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE);
    3536
    36     private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
    37     private static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
    38    
    3937    public TrivialPreprocessor(RouterContext ctx) {
    4038        _context = ctx;
     
    5149     * NOTE: Unused here, see BatchedPreprocessor override, super is not called.
    5250     */
    53     public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     51    public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    5452        throw new IllegalArgumentException("unused, right?");
    5553    }
     
    6462     */
    6563    protected void preprocess(byte fragments[], int fragmentLength) {
    66         ByteArray ivBuf = _ivCache.acquire();
    67         byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
     64        byte iv[] = SimpleByteCache.acquire(IV_SIZE);
    6865        _context.random().nextBytes(iv);
    6966       
     
    7168        System.arraycopy(iv, 0, fragments, fragmentLength, IV_SIZE);
    7269       
    73         ByteArray hashBuf = _hashCache.acquire();
     70        byte[] hashBuf = SimpleByteCache.acquire(Hash.HASH_LENGTH);
    7471        //Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE);
    75         _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, hashBuf.getData(), 0);
     72        _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, hashBuf, 0);
    7673       
    7774        //Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
     
    9289        offset += IV_SIZE;
    9390        //System.arraycopy(h.getData(), 0, fragments, offset, 4);
    94         System.arraycopy(hashBuf.getData(), 0, fragments, offset, 4);
     91        System.arraycopy(hashBuf, 0, fragments, offset, 4);
    9592        offset += 4;
    9693        //_log.debug("before pad  : " + Base64.encode(target));
    9794       
    98         _hashCache.release(hashBuf);
    99         _ivCache.release(ivBuf);
     95        SimpleByteCache.release(hashBuf);
     96        SimpleByteCache.release(iv);
    10097       
    10198        // fits in a single message, so may be smaller than the full size
     
    156153    private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
    157154
    158     protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) {
     155    protected int writeFirstFragment(PendingGatewayMessage msg, byte target[], int offset) {
    159156        boolean fragmented = false;
    160157        int instructionsLength = getInstructionsSize(msg);
     
    222219    }
    223220   
    224     protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) {
     221    protected int writeSubsequentFragment(PendingGatewayMessage msg, byte target[], int offset) {
    225222        boolean isLast = true;
    226223       
     
    270267     *  call getInstructionAugmentationSize() for that.
    271268     */
    272     protected int getInstructionsSize(TunnelGateway.Pending msg) {
     269    protected int getInstructionsSize(PendingGatewayMessage msg) {
    273270        if (msg.getFragmentNumber() > 0)
    274271            return 7;
     
    288285   
    289286    /** @return 0 or 4 */
    290     protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
     287    protected int getInstructionAugmentationSize(PendingGatewayMessage msg, int offset, int instructionsSize) {
    291288        int payloadLength = msg.getData().length - msg.getOffset();
    292289        if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
  • router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

    r5056706 r6162908  
    3030 * various tunnels.
    3131 *
     32 *<pre>
     33 *  For each type of tunnel, it creates a chain of handlers, as follows:
     34 *
     35 *  Following tunnels are created by us:
     36 *
     37 *    Outbound Gateway > 0 hops:
     38 *       PumpedTunnelGateway
     39 *         BatchedRouterPreprocessor -> OutboundSender -> OutboundReceiver -> OutNetMessagePool
     40 *
     41 *    Outbound zero-hop Gateway+Endpoint:
     42 *       TunnelGatewayZeroHop
     43 *         OutboundMessageDistributor -> OutNetMessagePool
     44 *
     45 *    Inbound Endpoint > 0 hops:
     46 *       TunnelParticipant
     47 *        RouterFragmentHandler ->  InboundEndpointProcessor -> InboundMessageDistributor -> InNetMessagePool
     48 *
     49 *    Inbound zero-hop Gateway+Endpoint:
     50 *       TunnelGatewayZeroHop
     51 *         InboundMessageDistributor -> InNetMessagePool
     52 *
     53 *
     54 *  Following tunnels are NOT created by us:
     55 *
     56 *    Participant (not gateway or endpoint)
     57 *       TunnelParticipant
     58 *         HopProcessor -> OutNetMessagePool
     59 *
     60 *    Outbound Endpoint > 0 hops:
     61 *       OutboundTunnelEndpoint
     62 *         RouterFragmentHandler -> HopProcessor -> OutboundMessageDistributor -> OutNetMessagePool
     63 *
     64 *    Inbound Gateway > 0 hops:
     65 *       ThrottledPumpedTunnelGateway
     66 *         BatchedRouterPreprocessor -> InboundSender -> InboundGatewayReceiver -> OutNetMessagePool
     67 *
     68 *</pre>
    3269 */
    3370public class TunnelDispatcher implements Service {
     
    175212        ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 });
    176213        ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 });
     214        // following is for PumpedTunnelGateway
     215        ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 });
    177216    }
    178217
     
    771810******/
    772811
    773     public void startup() {
     812    public synchronized void startup() {
    774813        // Note that we only use the validator for participants and OBEPs, not IBGWs, so
    775814        // this BW estimate will be high by about 33% assuming 2-hop tunnels average
     
    785824    }
    786825
    787     public void shutdown() {
     826    public synchronized void shutdown() {
    788827        if (_validator != null)
    789828            _validator.destroy();
     
    795834        _inboundGateways.clear();
    796835        _participatingConfig.clear();
     836        _leaveJob.clear();
    797837    }
    798838
     
    827867        public void add(HopConfig cfg) {
    828868            _configs.offer(cfg);
     869        }
     870
     871        public void clear() {
     872            _configs.clear();
    829873        }
    830874       
  • router/java/src/net/i2p/router/tunnel/TunnelGateway.java

    r5056706 r6162908  
    1111import net.i2p.router.RouterContext;
    1212import net.i2p.util.Log;
    13 import net.i2p.util.SimpleTimer;
     13import net.i2p.util.SimpleTimer2;
    1414
    1515/**
     
    3333 * </ol>
    3434 *
     35 * Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides.
    3536 */
    3637class TunnelGateway {
    3738    protected final RouterContext _context;
    3839    protected final Log _log;
    39     protected final List<Pending> _queue;
     40    protected final List<PendingGatewayMessage> _queue;
    4041    protected final QueuePreprocessor _preprocessor;
    4142    protected final Sender _sender;
     
    5455     *                 to the first hop
    5556     */
    56     public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
     57    protected TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
    5758        _context = context;
    5859        _log = context.logManager().getLog(getClass());
     
    6465        _delayedFlush = new DelayedFlush();
    6566        _lastFlush = _context.clock().now();
    66         _context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    67         _context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     67        //_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
     68        //_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    6869    }
    6970   
     
    8283     * If it is queued up past its expiration, it is silently dropped
    8384     *
     85     * UNUSED - see overrides
     86     *
    8487     * @param msg message to be sent through the tunnel
    8588     * @param toRouter router to send to after the endpoint (or null for endpoint processing)
     
    8790     */
    8891    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
     92        throw new UnsupportedOperationException("unused, right?");
     93/****
    8994        _messagesSent++;
    9095        long startAdd = System.currentTimeMillis();
     
    125130       
    126131        if (delayedFlush) {
    127             _context.simpleTimer().addEvent(_delayedFlush, delayAmount);
     132            _delayedFlush.reschedule(delayAmount);
    128133        }
    129134        _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
     
    138143                       + " queue flush: " + (complete-afterExpire));
    139144        }
     145****/
    140146    }
    141147   
     
    166172         * @return true if we should delay before preprocessing again
    167173         */
    168         public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver);
     174        public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver);
    169175       
    170176        /** how long do we want to wait before flushing */
     
    179185        public long receiveEncrypted(byte encrypted[]);
    180186    }
    181    
    182     /**
    183      *  Stores all the state for an unsent or partially-sent message
    184      */
    185     public static class Pending {
    186         protected final Hash _toRouter;
    187         protected final TunnelId _toTunnel;
    188         protected final long _messageId;
    189         protected final long _expiration;
    190         protected final byte _remaining[];
    191         protected int _offset;
    192         protected int _fragmentNumber;
    193         protected final long _created;
    194         private List<Long> _messageIds;
    195        
    196         public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
    197             this(message, toRouter, toTunnel, System.currentTimeMillis());
    198         }
    199         public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
    200             _toRouter = toRouter;
    201             _toTunnel = toTunnel;
    202             _messageId = message.getUniqueId();
    203             _expiration = message.getMessageExpiration();
    204             _remaining = message.toByteArray();
    205             _created = now;
    206         }
    207         /** may be null */
    208         public Hash getToRouter() { return _toRouter; }
    209         /** may be null */
    210         public TunnelId getToTunnel() { return _toTunnel; }
    211         public long getMessageId() { return _messageId; }
    212         public long getExpiration() { return _expiration; }
    213         /** raw unfragmented message to send */
    214         public byte[] getData() { return _remaining; }
    215         /** index into the data to be sent */
    216         public int getOffset() { return _offset; }
    217         /** move the offset */
    218         public void setOffset(int offset) { _offset = offset; }
    219         public long getLifetime() { return System.currentTimeMillis()-_created; }
    220         /** which fragment are we working on (0 for the first fragment) */
    221         public int getFragmentNumber() { return _fragmentNumber; }
    222         /** ok, fragment sent, increment what the next will be */
    223         public void incrementFragmentNumber() { _fragmentNumber++; }
    224         /**
    225          *  Add an ID to the list of the TunnelDataMssages this message was fragmented into.
    226          *  Unused except in notePreprocessing() calls for debugging
    227          */
    228         public void addMessageId(long id) {
    229             synchronized (Pending.this) {
    230                 if (_messageIds == null)
    231                     _messageIds = new ArrayList();
    232                 _messageIds.add(Long.valueOf(id));
    233             }
    234         }
    235         /**
    236          *  The IDs of the TunnelDataMssages this message was fragmented into.
    237          *  Unused except in notePreprocessing() calls for debugging
    238          */
    239         public List<Long> getMessageIds() {
    240             synchronized (Pending.this) {
    241                 if (_messageIds != null)
    242                     return new ArrayList(_messageIds);
    243                 else
    244                     return new ArrayList();
    245             }
    246         }
    247     }
    248 
    249     /** Extend for debugging */
    250     class PendingImpl extends Pending {
    251         public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
    252             super(message, toRouter, toTunnel, _context.clock().now());
    253         }       
    254        
    255         @Override
    256         public String toString() {
    257             StringBuilder buf = new StringBuilder(64);
    258             buf.append("Message ").append(_messageId).append(" on ");
    259             buf.append(TunnelGateway.this.toString());
    260             if (_toRouter != null) {
    261                 buf.append(" targetting ");
    262                 buf.append(_toRouter.toBase64()).append(" ");
    263                 if (_toTunnel != null)
    264                     buf.append(_toTunnel.getTunnelId());
    265             }
    266             long now = _context.clock().now();
    267             buf.append(" actual lifetime ");
    268             buf.append(now - _created).append("ms");
    269             buf.append(" potential lifetime ");
    270             buf.append(_expiration - _created).append("ms");
    271             buf.append(" size ").append(_remaining.length);
    272             buf.append(" offset ").append(_offset);
    273             buf.append(" frag ").append(_fragmentNumber);
    274             return buf.toString();
    275         }
    276 
    277         @Override
    278         public long getLifetime() { return _context.clock().now()-_created; }
    279     }
    280    
    281     private class DelayedFlush implements SimpleTimer.TimedEvent {
     187
     188    protected class DelayedFlush extends SimpleTimer2.TimedEvent {
     189        DelayedFlush() {
     190            super(_context.simpleTimer2());
     191        }
     192
    282193        public void timeReached() {
    283194            boolean wantRequeue = false;
    284             int remaining = 0;
    285             long beforeLock = _context.clock().now();
    286             long afterChecked = -1;
     195            //int remaining = 0;
     196            //long beforeLock = _context.clock().now();
     197            //long afterChecked = -1;
    287198            long delayAmount = -1;
    288199            //if (_queue.size() > 10000) // stay out of the synchronized block
     
    291202                //if (_queue.size() > 10000) // stay in the synchronized block
    292203                //    System.out.println("foo!");
    293                 afterChecked = _context.clock().now();
     204                //afterChecked = _context.clock().now();
    294205                if (!_queue.isEmpty()) {
    295                     if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
    296                         _log.debug("Remaining before delayed flush preprocessing: " + _queue);
     206                    //if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
     207                    //    _log.debug("Remaining before delayed flush preprocessing: " + _queue);
    297208                    wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
    298                     if (wantRequeue)
     209                    if (wantRequeue) {
    299210                        delayAmount = _preprocessor.getDelayAmount();
    300                     if (_log.shouldLog(Log.DEBUG))
    301                         _log.debug("Remaining after delayed flush preprocessing (requeue? " + wantRequeue + "): " + _queue);
     211                        if (_log.shouldLog(Log.DEBUG))
     212                            _log.debug("Remaining after delayed flush preprocessing: " + _queue);
     213                    }
    302214                }
    303                 remaining = _queue.size();
     215                //remaining = _queue.size();
    304216            }
    305217           
    306218            if (wantRequeue)
    307                 _context.simpleTimer().addEvent(_delayedFlush, delayAmount);
     219                schedule(delayAmount);
    308220            else
    309221                _lastFlush = _context.clock().now();
    310222           
    311             _context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
     223            //_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
    312224        }
    313225    }
  • router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java

    r5056706 r6162908  
    22
    33import java.util.ArrayList;
     4import java.util.Iterator;
     5import java.util.LinkedHashSet;
    46import java.util.List;
     7import java.util.Set;
    58import java.util.concurrent.BlockingQueue;
    69import java.util.concurrent.LinkedBlockingQueue;
     
    1013
    1114/**
    12  * run through the tunnel gateways that have had messages added to them and push
    13  * those messages through the preprocessing and sending process
     15 * Run through the tunnel gateways that have had messages added to them and push
     16 * those messages through the preprocessing and sending process.
     17 *
     18 * TODO do we need this many threads?
     19 * TODO this combines IBGWs and OBGWs, do we wish to separate the two
     20 * and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
    1421 */
    1522class TunnelGatewayPumper implements Runnable {
    1623    private final RouterContext _context;
    17     private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
    18     private boolean _stop;
     24    private final Set<PumpedTunnelGateway> _wantsPumping;
     25    private volatile boolean _stop;
    1926    private static final int MIN_PUMPERS = 1;
    2027    private static final int MAX_PUMPERS = 4;
     
    2431    public TunnelGatewayPumper(RouterContext ctx) {
    2532        _context = ctx;
    26         _wantsPumping = new LinkedBlockingQueue();
     33        _wantsPumping = new LinkedHashSet(16);
    2734        long maxMemory = Runtime.getRuntime().maxMemory();
    2835        if (maxMemory == Long.MAX_VALUE)
     
    3643        _stop=true;
    3744        _wantsPumping.clear();
    38         PumpedTunnelGateway poison = new PoisonPTG(_context);
    39         for (int i = 0; i < _pumpers; i++)
    40             _wantsPumping.offer(poison);
     45        for (int i = 0; i < _pumpers; i++) {
     46            PumpedTunnelGateway poison = new PoisonPTG(_context);
     47            wantsPumping(poison);
     48        }
    4149        for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
    4250            try {
     
    4856   
    4957    public void wantsPumping(PumpedTunnelGateway gw) {
    50         if (!_stop)
    51             _wantsPumping.offer(gw);
     58        if (!_stop) {
     59            synchronized (_wantsPumping) {
     60                if (_wantsPumping.add(gw))
     61                    _wantsPumping.notify();
     62            }
     63        }
    5264    }
    5365   
    5466    public void run() {
    5567        PumpedTunnelGateway gw = null;
    56         List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
     68        List<PendingGatewayMessage> queueBuf = new ArrayList(32);
    5769        while (!_stop) {
    5870            try {
    59                 gw = _wantsPumping.take();
     71                synchronized (_wantsPumping) {
     72                    if (_wantsPumping.isEmpty()) {
     73                        _wantsPumping.wait();
     74                    } else {
     75                        Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
     76                        gw = iter.next();
     77                        iter.remove();
     78                    }
     79                }
    6080            } catch (InterruptedException ie) {}
    6181            if (gw != null) {
  • router/java/src/net/i2p/router/tunnel/TunnelParticipant.java

    r5056706 r6162908  
    3131    /** for next hop when a tunnel is first created */
    3232    private static final long LONG_MAX_LOOKUP_TIME = 30*1000;
     33    private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
    3334
    3435    /** not an inbound endpoint */
     
    197198        m.setExpiration(msg.getMessageExpiration());
    198199        m.setTarget(ri);
    199         m.setPriority(200);
     200        m.setPriority(PRIORITY);
    200201        if (_log.shouldLog(Log.DEBUG))
    201202            _log.debug("Forward on from " + _config + ": " + msg);
  • router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java

    r5056706 r6162908  
    6262
    6363    private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
     64    private static final int PRIORITY = OutNetMessage.PRIORITY_BUILD_REPLY;
    6465   
    6566    /**
     
    690691            msg.setMessage(state.msg);
    691692            msg.setExpiration(state.msg.getMessageExpiration());
    692             msg.setPriority(300);
     693            msg.setPriority(PRIORITY);
    693694            msg.setTarget(nextPeerInfo);
    694695            if (response == 0)
     
    723724                outMsg.setExpiration(m.getMessageExpiration());
    724725                outMsg.setMessage(m);
    725                 outMsg.setPriority(300);
     726                outMsg.setPriority(PRIORITY);
    726727                outMsg.setTarget(nextPeerInfo);
    727728                if (response == 0)
  • router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java

    r5056706 r6162908  
    3030            ORDER.add(Integer.valueOf(i));
    3131    }
    32     private static final int PRIORITY = 500;
     32
     33    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_BUILD_REQUEST;
     34
    3335    /**
    3436     *  At 10 seconds, we were receiving about 20% of replies after expiration
  • router/java/src/net/i2p/router/util/DecayingBloomFilter.java

    r5056706 r6162908  
    88import net.i2p.data.DataHelper;
    99import net.i2p.util.Log;
    10 import net.i2p.util.SimpleTimer;
     10import net.i2p.util.SimpleTimer2;
    1111
    1212import org.xlattice.crypto.filters.BloomSHA1;
     
    3939    protected long _currentDuplicates;
    4040    protected volatile boolean _keepDecaying;
    41     protected final SimpleTimer.TimedEvent _decayEvent;
     41    protected final SimpleTimer2.TimedEvent _decayEvent;
    4242    /** just for logging */
    4343    protected final String _name;
     
    6565        _decayEvent = new DecayEvent();
    6666        _keepDecaying = true;
    67         SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
     67        _decayEvent.schedule(_durationMs);
    6868    }
    6969
     
    119119        _decayEvent = new DecayEvent();
    120120        _keepDecaying = true;
    121         SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
     121        _decayEvent.schedule(_durationMs);
    122122        if (_log.shouldLog(Log.WARN))
    123123           _log.warn("New DBF " + name + " m = " + m + " k = " + k + " entryBytes = " + entryBytes +
     
    275275    public void stopDecaying() {
    276276        _keepDecaying = false;
    277         SimpleTimer.getInstance().removeEvent(_decayEvent);
     277        _decayEvent.cancel();
    278278    }
    279279   
     
    311311    }
    312312   
    313     private class DecayEvent implements SimpleTimer.TimedEvent {
     313    private class DecayEvent extends SimpleTimer2.TimedEvent {
     314        DecayEvent() {
     315            super(_context.simpleTimer2());
     316        }
     317       
    314318        public void timeReached() {
    315319            if (_keepDecaying) {
    316320                decay();
    317                 SimpleTimer.getInstance().addEvent(DecayEvent.this, _durationMs);
     321                schedule(_durationMs);
    318322            }
    319323        }
Note: See TracChangeset for help on using the changeset viewer.