Changeset 617d1cd


Ignore:
Timestamp:
Mar 18, 2010 3:49:03 PM (10 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
5fb01a0
Parents:
05597ae (diff), f672193 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p.zzz.test' (head c295ab421dd719cfe0e273268b5b4e48505e4f61)

to branch 'i2p.i2p' (head 995914d8e049d9bb695fd25e4cf5be860cd4e487)

Files:
68 edited
1 moved

Legend:

Unmodified
Added
Removed
  • apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java

    r05597ae r617d1cd  
    164164
    165165                // XXX - Should also register overhead...
    166                 if (m.type == Message.PIECE)
    167                   state.uploaded(m.len);
     166                // Don't let other clients requesting big chunks get an advantage
     167                // when we are seeding;
     168                // only count the rest of the upload after sendMessage().
     169                int remainder = 0;
     170                if (m.type == Message.PIECE) {
     171                  if (m.len <= PeerState.PARTSIZE) {
     172                     state.uploaded(m.len);
     173                  } else {
     174                     state.uploaded(PeerState.PARTSIZE);
     175                     remainder = m.len - PeerState.PARTSIZE;
     176                  }
     177                }
    168178
    169179                m.sendMessage(dout);
     180                if (remainder > 0)
     181                  state.uploaded(remainder);
    170182                m = null;
    171183              }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java

    r05597ae r617d1cd  
    204204                if (_log.shouldLog(Log.INFO))
    205205                    _log.info("Proxy list is empty - no outproxy available");
    206                 l.log("Proxy list is emtpy - no outproxy available");
     206                l.log("Proxy list is empty - no outproxy available");
    207207                return null;
    208208            }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPUnwrapper.java

    r05597ae r617d1cd  
    5151        int headerlen = h.getBytes().length;
    5252        byte unwrapped[] = new byte[data.length - headerlen];
    53         System.arraycopy(unwrapped, 0, data, headerlen, unwrapped.length);
     53        System.arraycopy(data, headerlen, unwrapped, 0, unwrapped.length);
    5454        this.sink.send(dest, unwrapped);
    5555    }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/SOCKSUDPWrapper.java

    r05597ae r617d1cd  
    4040        byte[] header = h.getBytes();
    4141        byte wrapped[] = new byte[header.length + data.length];
    42         System.arraycopy(wrapped, 0, header, 0, header.length);
    43         System.arraycopy(wrapped, header.length, data, 0, data.length);
     42        System.arraycopy(header, 0, wrapped, 0, header.length);
     43        System.arraycopy(data, 0, wrapped, header.length, data.length);
    4444        this.sink.send(from, wrapped);
    4545    }
  • apps/routerconsole/java/src/net/i2p/router/web/ConfigNetHandler.java

    r05597ae r617d1cd  
    299299    }
    300300   
     301    private static final int DEF_BURST_PCT = 10;
     302    private static final int DEF_BURST_TIME = 20;
     303
    301304    private void updateRates() {
    302305        boolean updated = false;
     
    311314        }
    312315
     316        // Since burst is now hidden in the gui, set burst to +10% for 20 seconds
    313317        if ( (_inboundRate != null) && (_inboundRate.length() > 0) &&
    314318            !_inboundRate.equals(_context.getProperty(FIFOBandwidthRefiller.PROP_INBOUND_BANDWIDTH, "" + FIFOBandwidthRefiller.DEFAULT_INBOUND_BANDWIDTH))) {
    315319            _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_INBOUND_BANDWIDTH, _inboundRate);
     320            try {
     321                int rate = Integer.parseInt(_inboundRate) * (100 + DEF_BURST_PCT) / 100;
     322                int kb = DEF_BURST_TIME * rate;
     323                _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_INBOUND_BURST_BANDWIDTH, "" + rate);
     324                _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_INBOUND_BANDWIDTH_PEAK, "" + kb);
     325            } catch (NumberFormatException nfe) {}
    316326            updated = true;
    317327        }
     
    319329            !_outboundRate.equals(_context.getProperty(FIFOBandwidthRefiller.PROP_OUTBOUND_BANDWIDTH, "" + FIFOBandwidthRefiller.DEFAULT_OUTBOUND_BANDWIDTH))) {
    320330            _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_OUTBOUND_BANDWIDTH, _outboundRate);
     331            try {
     332                int rate = Integer.parseInt(_outboundRate) * (100 + DEF_BURST_PCT) / 100;
     333                int kb = DEF_BURST_TIME * rate;
     334                _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_OUTBOUND_BURST_BANDWIDTH, "" + rate);
     335                _context.router().setConfigSetting(FIFOBandwidthRefiller.PROP_OUTBOUND_BANDWIDTH_PEAK, "" + kb);
     336            } catch (NumberFormatException nfe) {}
    321337            updated = true;
    322338        }
  • apps/routerconsole/java/src/net/i2p/router/web/NavHelper.java

    r05597ae r617d1cd  
    88
    99public class NavHelper {
    10     private static Map<String, String> _apps = new ConcurrentHashMap();
     10    private static Map<String, String> _apps = new ConcurrentHashMap(4);
    1111   
    1212    /**
     
    2929     */
    3030    public static String getClientAppLinks(I2PAppContext ctx) {
    31         StringBuilder buf = new StringBuilder(1024);
     31        if (_apps.isEmpty())
     32            return "";
     33        StringBuilder buf = new StringBuilder(256);
    3234        for (Iterator<String> iter = _apps.keySet().iterator(); iter.hasNext(); ) {
    3335            String name = iter.next();
  • apps/routerconsole/java/src/net/i2p/router/web/NetDbRenderer.java

    r05597ae r617d1cd  
    283283        for (Iterator iter = info.getAddresses().iterator(); iter.hasNext(); ) {
    284284            RouterAddress addr = (RouterAddress)iter.next();
    285             buf.append("<b>").append(DataHelper.stripHTML(addr.getTransportStyle())).append(":</b> ");
     285            String style = addr.getTransportStyle();
     286            buf.append("<b>").append(DataHelper.stripHTML(style)).append(":</b> ");
     287            int cost = addr.getCost();
     288            if (!((style.equals("SSU") && cost == 5) || (style.equals("NTCP") && cost == 10)))
     289                buf.append('[').append("cost").append('=').append("" + cost).append("] ");
    286290            for (Iterator optIter = addr.getOptions().keySet().iterator(); optIter.hasNext(); ) {
    287291                String name = (String)optIter.next();
  • apps/routerconsole/java/src/net/i2p/router/web/PluginStarter.java

    r05597ae r617d1cd  
    181181     *  @throws just about anything, caller would be wise to catch Throwable
    182182     */
    183     static boolean stopPlugin(RouterContext ctx, String appName) throws IOException {
     183    static boolean stopPlugin(RouterContext ctx, String appName) throws Exception {
    184184        Log log = ctx.logManager().getLog(PluginStarter.class);
    185185        File pluginDir = new File(ctx.getAppDir(), PluginUpdateHandler.PLUGIN_DIR + '/' + appName);
     
    229229
    230230    /** @return true on success - caller should call stopPlugin() first */
    231     static boolean deletePlugin(RouterContext ctx, String appName) throws IOException {
     231    static boolean deletePlugin(RouterContext ctx, String appName) throws Exception {
    232232        Log log = ctx.logManager().getLog(PluginStarter.class);
    233233        File pluginDir = new File(ctx.getAppDir(), PluginUpdateHandler.PLUGIN_DIR + '/' + appName);
     
    349349    }
    350350
    351     /** @param action "start" or "stop" or "uninstall" */
    352     private static void runClientApps(RouterContext ctx, File pluginDir, List<ClientAppConfig> apps, String action) {
     351    /**
     352     *  @param action "start" or "stop" or "uninstall"
     353     *  @throws just about anything if an app has a delay less than zero, caller would be wise to catch Throwable
     354     *  If no apps have a delay less than zero, it shouldn't throw anything
     355     */
     356    private static void runClientApps(RouterContext ctx, File pluginDir, List<ClientAppConfig> apps, String action) throws Exception {
    353357        Log log = ctx.logManager().getLog(PluginStarter.class);
    354358        for(ClientAppConfig app : apps) {
     
    389393                addToClasspath(cp, app.clientName, log);
    390394            }
    391             if (app.delay == 0 || !action.equals("start")) {
     395
     396            if (app.delay < 0 && action.equals("start")) {
     397                // this will throw exceptions
     398                LoadClientAppsJob.runClientInline(app.className, app.clientName, argVal, log);
     399            } else if (app.delay == 0 || !action.equals("start")) {
     400                // quick check, will throw ClassNotFoundException on error
     401                LoadClientAppsJob.testClient(app.className);
    392402                // run this guy now
    393403                LoadClientAppsJob.runClient(app.className, app.clientName, argVal, log);
    394404            } else {
     405                // quick check, will throw ClassNotFoundException on error
     406                LoadClientAppsJob.testClient(app.className);
    395407                // wait before firing it up
    396408                ctx.jobQueue().addJob(new LoadClientAppsJob.DelayedRunClient(ctx, app.className, app.clientName, argVal, app.delay));
  • apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java

    r05597ae r617d1cd  
    206206                    _log.log(Log.CRIT, "Update was VERIFIED, will be installed at next restart");
    207207                    StringBuilder buf = new StringBuilder(64);
    208                     buf.append("<b>").append(_("Update downloaded")).append("</b><br>");
     208                    buf.append("<b>").append(_("Update downloaded")).append("<br>");
    209209                    if (System.getProperty("wrapper.version") != null)
    210210                        buf.append(_("Click Restart to install"));
     
    213213                    if (up.newVersion() != null)
    214214                        buf.append(' ').append(_("Version {0}", up.newVersion()));
     215                    buf.append("</b>");
    215216                    updateStatus(buf.toString());
    216217                }
  • apps/routerconsole/jsp/logs.jsp

    r05597ae r617d1cd  
    1111<div class="main" id="main">
    1212 <div class="joblog"><h3><%=intl._("I2P Version & Running Environment")%></h3><a name="version"> </a>
    13  <i><%=intl._("Please include this information in bug reports")%>:</i>
     13<p><%=intl._("Please report bugs on <a href=\"http://trac.i2p2.i2p/newticket\">trac.i2p2.i2p</a>.")%>
     14<p><i><%=intl._("Please include this information in bug reports")%>:</i>
    1415 <p>
    1516<b>I2P version:</b> <jsp:getProperty name="helper" property="version" /><br>
  • apps/streaming/java/src/net/i2p/client/streaming/Connection.java

    r05597ae r617d1cd  
    77import java.util.Map;
    88import java.util.TreeMap;
     9import java.util.concurrent.atomic.AtomicLong;
    910
    1011import net.i2p.I2PAppContext;
     
    3031    private long _receiveStreamId;
    3132    private long _lastSendTime;
    32     private long _lastSendId;
     33    private AtomicLong _lastSendId;
    3334    private boolean _resetReceived;
    3435    private boolean _resetSent;
     
    5051    private boolean _updatedShareOpts;
    5152    /** Packet ID (Long) to PacketLocal for sent but unacked packets */
    52     private final Map _outboundPackets;
     53    private final Map<Long, PacketLocal> _outboundPackets;
    5354    private PacketQueue _outboundQueue;
    5455    private ConnectionPacketHandler _handler;
     
    103104        _outputStream.setWriteTimeout((int)_options.getWriteTimeout());
    104105        _inputStream.setReadTimeout((int)_options.getReadTimeout());
    105         _lastSendId = -1;
     106        _lastSendId = new AtomicLong(-1);
    106107        _nextSendTime = -1;
    107108        _ackedPackets = 0;
     
    138139   
    139140    public long getNextOutboundPacketNum() {
    140         synchronized (this) {
    141             return ++_lastSendId;
    142         }
     141        return _lastSendId.incrementAndGet();
    143142    }
    144143   
     
    176175                started = true;
    177176                if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
    178                      (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
     177                     (_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) {
    179178                    if (timeoutMs > 0) {
    180179                        if (timeLeft <= 0) {
     
    212211        PacketLocal packet = null;
    213212        synchronized (_outboundPackets) {
    214             if (_outboundPackets.size() > 0) {
     213            if (!_outboundPackets.isEmpty()) {
    215214                // ordered, so pick the lowest to retransmit
    216                 Iterator iter = _outboundPackets.values().iterator();
    217                 packet = (PacketLocal)iter.next();
     215                Iterator<PacketLocal> iter = _outboundPackets.values().iterator();
     216                packet = iter.next();
    218217                //iter.remove();
    219218            }
     
    404403        }
    405404       
    406         List acked = null;
     405        List<PacketLocal> acked = null;
    407406        synchronized (_outboundPackets) {
    408             for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
    409                 Long id = (Long)iter.next();
     407            for (Iterator<Long> iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
     408                Long id = iter.next();
    410409                if (id.longValue() <= ackThrough) {
    411410                    boolean nacked = false;
     
    415414                            if (nacks[i] == id.longValue()) {
    416415                                nacked = true;
    417                                 PacketLocal nackedPacket = (PacketLocal)_outboundPackets.get(id);
     416                                PacketLocal nackedPacket = _outboundPackets.get(id);
    418417                                nackedPacket.incrementNACKs();
    419418                                break; // NACKed
     
    424423                        if (acked == null)
    425424                            acked = new ArrayList(1);
    426                         PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id);
     425                        PacketLocal ackedPacket = _outboundPackets.get(id);
    427426                        ackedPacket.ackReceived();
    428427                        acked.add(ackedPacket);
     
    434433            if (acked != null) {
    435434                for (int i = 0; i < acked.size(); i++) {
    436                     PacketLocal p = (PacketLocal)acked.get(i);
     435                    PacketLocal p = acked.get(i);
    437436                    _outboundPackets.remove(new Long(p.getSequenceNum()));
    438437                    _ackedPackets++;
     
    444443                }
    445444            }
    446             if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) {
     445            if ( (_outboundPackets.isEmpty()) && (_activeResends != 0) ) {
    447446                if (_log.shouldLog(Log.INFO))
    448447                    _log.info("All outbound packets acked, clearing " + _activeResends);
     
    571570        //boolean tagsCancelled = false;
    572571        synchronized (_outboundPackets) {
    573             for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
    574                 PacketLocal pl = (PacketLocal)iter.next();
     572            for (Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
     573                PacketLocal pl = iter.next();
    575574                //if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
    576575                //    tagsCancelled = true;
     
    653652     * @return The last sent packet ID
    654653     */
    655     public long getLastSendId() { return _lastSendId; }
     654    public long getLastSendId() { return _lastSendId.get(); }
    656655    /** Set the packet Id that was sent to a peer.
    657656     * @param id The packet ID
    658657     */
    659     public void setLastSendId(long id) { _lastSendId = id; }
     658    public void setLastSendId(long id) { _lastSendId.set(id); }
    660659   
    661660    /**
     
    784783            _lastCongestionSeenAt = _options.getWindowSize();
    785784            _lastCongestionTime = _context.clock().now();
    786             _lastCongestionHighestUnacked = _lastSendId;
     785            _lastCongestionHighestUnacked = _lastSendId.get();
    787786            _ackSinceCongestion = false;
    788787        }
     
    10231022        if (getCloseReceivedOn() > 0)
    10241023            buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
    1025         buf.append(" sent: ").append(1 + _lastSendId);
     1024        buf.append(" sent: ").append(1 + _lastSendId.get());
    10261025        if (_inputStream != null)
    10271026            buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    r05597ae r617d1cd  
    11package net.i2p.client.streaming;
    22
    3 import java.util.HashMap;
    43import java.util.HashSet;
    54import java.util.Iterator;
    65import java.util.Map;
    76import java.util.Set;
     7import java.util.concurrent.ConcurrentHashMap;
    88
    99import net.i2p.I2PAppContext;
     
    3333    private TCBShare _tcbShare;
    3434    /** Inbound stream ID (Long) to Connection map */
    35     private Map _connectionByInboundId;
     35    private ConcurrentHashMap<Long, Connection> _connectionByInboundId;
    3636    /** Ping ID (Long) to PingRequest */
    37     private final Map _pendingPings;
     37    private final Map<Long, PingRequest> _pendingPings;
    3838    private boolean _allowIncoming;
    3939    private int _maxConcurrentStreams;
    4040    private ConnectionOptions _defaultOptions;
    4141    private volatile int _numWaiting;
    42     private final Object _connectionLock;
    4342    private long SoTimeout;
    4443   
     
    4948        _defaultOptions = defaultOptions;
    5049        _log = _context.logManager().getLog(ConnectionManager.class);
    51         _connectionByInboundId = new HashMap(32);
    52         _pendingPings = new HashMap(4);
    53         _connectionLock = new Object();
     50        _connectionByInboundId = new ConcurrentHashMap(32);
     51        _pendingPings = new ConcurrentHashMap(4);
    5452        _messageHandler = new MessageHandler(_context, this);
    5553        _packetHandler = new PacketHandler(_context, this);
     
    7876   
    7977    Connection getConnectionByInboundId(long id) {
    80         synchronized (_connectionLock) {
    81             return (Connection)_connectionByInboundId.get(new Long(id));
    82         }
     78        return _connectionByInboundId.get(Long.valueOf(id));
    8379    }
    8480    /**
     
    8783     */
    8884    Connection getConnectionByOutboundId(long id) {
    89         synchronized (_connectionLock) {
    90             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    91                 Connection con = (Connection)iter.next();
     85            for (Connection con : _connectionByInboundId.values()) {
    9286                if (DataHelper.eq(con.getSendStreamId(), id))
    9387                    return con;
    9488            }
    95         }
    9689        return null;
    9790    }
     
    136129        int active = 0;
    137130        int total = 0;
    138         synchronized (_connectionLock) {
    139             total = _connectionByInboundId.size();
    140             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    141                 if ( ((Connection)iter.next()).getIsConnected() )
    142                     active++;
    143             }
     131
     132            // just for the stat
     133            //total = _connectionByInboundId.size();
     134            //for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     135            //    if ( ((Connection)iter.next()).getIsConnected() )
     136            //        active++;
     137            //}
    144138            if (locked_tooManyStreams()) {
    145139                reject = true;
    146140            } else {
    147141                while (true) {
    148                     Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
     142                    Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con);
    149143                    if (oldCon == null) {
    150144                        break;
    151145                    } else {
    152                         _connectionByInboundId.put(new Long(receiveId), oldCon);
    153146                        // receiveId already taken, try another
    154147                        receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
     
    156149                }
    157150            }
    158         }
    159151       
    160152        _context.statManager().addRateData("stream.receiveActive", active, total);
     
    180172            con.getPacketHandler().receivePacket(synPacket, con);
    181173        } catch (I2PException ie) {
    182             synchronized (_connectionLock) {
    183                 _connectionByInboundId.remove(new Long(receiveId));
    184             }
     174            _connectionByInboundId.remove(Long.valueOf(receiveId));
    185175            return null;
    186176        }
     
    216206                return null;
    217207            }
    218             boolean reject = false;
    219             synchronized (_connectionLock) {
     208
    220209                if (locked_tooManyStreams()) {
    221210                    // allow a full buffer of pending/waiting streams
     
    228217                        return null;
    229218                    }
    230                    
     219
    231220                    // no remaining streams, lets wait a bit
    232                     try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
     221                    // got rid of the lock, so just sleep (fixme?)
     222                    // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
     223                    try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
    233224                } else {
    234225                    con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
    235226                    con.setRemotePeer(peer);
    236227           
    237                     while (_connectionByInboundId.containsKey(new Long(receiveId))) {
     228                    while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {
    238229                        receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
    239230                    }
    240                     _connectionByInboundId.put(new Long(receiveId), con);
     231                    _connectionByInboundId.put(Long.valueOf(receiveId), con);
    241232                    break; // stop looping as a psuedo-wait
    242233                }
    243             }
     234
    244235        }
    245236
     
    248239        con.eventOccurred();
    249240       
    250         _log.debug("Connect() conDelay = " + opts.getConnectDelay());
     241        if (_log.shouldLog(Log.DEBUG))
     242            _log.debug("Connect() conDelay = " + opts.getConnectDelay());
    251243        if (opts.getConnectDelay() <= 0) {
    252244            con.waitForConnect();
     
    259251    }
    260252
     253    /**
     254     *  Doesn't need to be locked any more
     255     *  @return too many
     256     */
    261257    private boolean locked_tooManyStreams() {
    262258        if (_maxConcurrentStreams <= 0) return false;
    263259        if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
    264260        int active = 0;
    265         for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    266             Connection con = (Connection)iter.next();
     261        for (Connection con : _connectionByInboundId.values()) {
    267262            if (con.getIsConnected())
    268263                active++;
     
    294289     */
    295290    public void disconnectAllHard() {
    296         synchronized (_connectionLock) {
    297             for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
    298                 Connection con = (Connection)iter.next();
    299                 con.disconnect(false, false);
    300             }
    301             _connectionByInboundId.clear();
    302             _connectionLock.notifyAll();
     291        for (Iterator<Connection> iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
     292            Connection con = iter.next();
     293            con.disconnect(false, false);
     294            iter.remove();
    303295        }
    304296        _tcbShare.stop();
     
    311303     */
    312304    public void removeConnection(Connection con) {
    313         boolean removed = false;
    314         synchronized (_connectionLock) {
    315             Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
    316             removed = (o == con);
     305
     306            Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId()));
     307            boolean removed = (o == con);
    317308            if (_log.shouldLog(Log.DEBUG))
    318309                _log.debug("Connection removed? " + removed + " remaining: "
     
    320311            if (!removed && _log.shouldLog(Log.DEBUG))
    321312                _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
    322             _connectionLock.notifyAll();
    323         }
     313
    324314        if (removed) {
    325315            _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime());
     
    345335     * @return set of Connection objects
    346336     */
    347     public Set listConnections() {
    348         synchronized (_connectionLock) {
     337    public Set<Connection> listConnections() {
    349338            return new HashSet(_connectionByInboundId.values());
    350         }
    351339    }
    352340
     
    369357
    370358    public boolean ping(Destination peer, long timeoutMs, boolean blocking, PingNotifier notifier) {
    371         Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
     359        Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
    372360        PacketLocal packet = new PacketLocal(_context, peer);
    373361        packet.setSendStreamId(id.longValue());
     
    382370        PingRequest req = new PingRequest(peer, packet, notifier);
    383371       
    384         synchronized (_pendingPings) {
    385             _pendingPings.put(id, req);
    386         }
     372        _pendingPings.put(id, req);
    387373       
    388374        _outboundQueue.enqueue(packet);
     
    394380                    try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
    395381            }
    396            
    397             synchronized (_pendingPings) {
    398                 _pendingPings.remove(id);
    399             }
     382            _pendingPings.remove(id);
    400383        } else {
    401384            SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
     
    419402       
    420403        public void timeReached() {
    421             boolean removed = false;
    422             synchronized (_pendingPings) {
    423                 Object o = _pendingPings.remove(_id);
    424                 if (o != null)
    425                     removed = true;
    426             }
    427             if (removed) {
     404            PingRequest pr = _pendingPings.remove(_id);
     405            if (pr != null) {
    428406                if (_notifier != null)
    429407                    _notifier.pingComplete(false);
     
    434412    }
    435413   
    436     private class PingRequest {
     414    private static class PingRequest {
    437415        private boolean _ponged;
    438416        private Destination _peer;
     
    446424        }
    447425        public void pong() {
    448             _log.debug("Ping successful");
     426            // static, no log
     427            //_log.debug("Ping successful");
    449428            //_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
    450429            synchronized (ConnectionManager.PingRequest.this) {
     
    459438   
    460439    void receivePong(long pingId) {
    461         PingRequest req = null;
    462         synchronized (_pendingPings) {
    463             req = (PingRequest)_pendingPings.remove(new Long(pingId));
    464         }
     440        PingRequest req = _pendingPings.remove(Long.valueOf(pingId));
    465441        if (req != null)
    466442            req.pong();
  • apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java

    r05597ae r617d1cd  
    11package net.i2p.client.streaming;
    22
    3 import java.util.ArrayList;
    4 import java.util.List;
     3import java.util.Iterator;
     4import java.util.Set;
    55
    66import net.i2p.I2PAppContext;
     
    99import net.i2p.client.I2PSessionListener;
    1010import net.i2p.util.Log;
     11import net.i2p.util.ConcurrentHashSet;
    1112
    1213/**
     
    1920    private I2PAppContext _context;
    2021    private Log _log;
    21     private final List _listeners;
     22    private final Set<I2PSocketManager.DisconnectListener> _listeners;
    2223   
    2324    public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
    2425        _manager = mgr;
    2526        _context = ctx;
    26         _listeners = new ArrayList(1);
     27        _listeners = new ConcurrentHashSet(1);
    2728        _log = ctx.logManager().getLog(MessageHandler.class);
    2829        _context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     
    7879        _manager.disconnectAllHard();
    7980       
    80         List listeners = null;
    81         synchronized (_listeners) {
    82             listeners = new ArrayList(_listeners);
    83             _listeners.clear();
    84         }
    85         for (int i = 0; i < listeners.size(); i++) {
    86             I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i);
     81        for (Iterator<I2PSocketManager.DisconnectListener> iter = _listeners.iterator(); iter.hasNext(); ) {
     82            I2PSocketManager.DisconnectListener lsnr = iter.next();
    8783            lsnr.sessionDisconnected();
     84            iter.remove();
    8885        }
    8986    }
     
    105102   
    106103    public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    107         synchronized (_listeners) {
    108104            _listeners.add(lsnr);
    109         }
    110105    }
    111106    public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
    112         synchronized (_listeners) {
    113107            _listeners.remove(lsnr);
    114         }
    115108    }
    116109}
  • apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java

    r05597ae r617d1cd  
    223223                // So perhaps it IS wise to be "overly worried" ...
    224224                forceReschedule(_passiveFlushDelay);
    225                 if (_log.shouldLog(Log.INFO))
    226                     _log.info("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
     225                if (_log.shouldLog(Log.DEBUG))
     226                    _log.debug("Enqueueing the flusher for " + _passiveFlushDelay + "ms out");
    227227            } else {
    228228                if (_log.shouldLog(Log.DEBUG))
  • apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java

    r05597ae r617d1cd  
    5656       
    5757        public void eventOccurred(Connection con) {
    58             _log.log(Log.ERROR, "Yell at jrandom: Event occurred on " + con, new Exception("source"));
     58            if (_log.shouldLog(Log.WARN))
     59                _log.warn("Yell at jrandom: Event occurred on " + con, new Exception("source"));
    5960        }
    6061        public boolean accept(Connection con) { return true; }
  • core/java/src/net/i2p/util/ByteCache.java

    r05597ae r617d1cd  
    11package net.i2p.util;
    22
    3 import java.util.ArrayList;
    43import java.util.Arrays;
    54import java.util.HashMap;
    6 import java.util.List;
    75import java.util.Map;
     6import java.util.Queue;
     7import java.util.concurrent.LinkedBlockingQueue;
    88
    99import net.i2p.I2PAppContext;
     
    3737    private Log _log;
    3838    /** list of available and available entries */
    39     private final List _available;
     39    private Queue<ByteArray> _available;
    4040    private int _maxCached;
    4141    private int _entrySize;
    4242    private long _lastOverflow;
    4343   
    44     /** do we actually want to cache? */
     44    /** do we actually want to cache? Warning - setting to false may NPE, this should be fixed or removed */
    4545    private static final boolean _cache = true;
    4646   
     
    5252    private ByteCache(int maxCachedEntries, int entrySize) {
    5353        if (_cache)
    54             _available = new ArrayList(maxCachedEntries);
     54            _available = new LinkedBlockingQueue(maxCachedEntries);
    5555        _maxCached = maxCachedEntries;
    5656        _entrySize = entrySize;
     
    6363        if (_maxCached >= maxCachedEntries) return;
    6464        _maxCached = maxCachedEntries;
     65        // make a bigger one, move the cached items over
     66        Queue newLBQ = new LinkedBlockingQueue(maxCachedEntries);
     67        ByteArray ba;
     68        while ((ba = _available.poll()) != null)
     69            newLBQ.offer(ba);
     70        _available = newLBQ;
    6571    }
    6672   
     
    7177    public final ByteArray acquire() {
    7278        if (_cache) {
    73             synchronized (_available) {
    74                 if (_available.size() > 0)
    75                     return (ByteArray)_available.remove(0);
    76             }
     79            ByteArray rv = _available.poll();
     80            if (rv != null)
     81                return rv;
    7782        }
    7883        _lastOverflow = System.currentTimeMillis();
     
    101106            if (shouldZero)
    102107                Arrays.fill(entry.getData(), (byte)0x0);
    103             synchronized (_available) {
    104                 if (_available.size() < _maxCached)
    105                     _available.add(entry);
    106             }
     108            _available.offer(entry);
    107109        }
    108110    }
     
    113115                // we haven't exceeded the cache size in a few minutes, so lets
    114116                // shrink the cache
    115                 synchronized (_available) {
    116117                    int toRemove = _available.size() / 2;
    117118                    for (int i = 0; i < toRemove; i++)
    118                         _available.remove(0);
     119                        _available.poll();
    119120                    if ( (toRemove > 0) && (_log.shouldLog(Log.DEBUG)) )
    120121                        _log.debug("Removing " + toRemove + " cached entries of size " + _entrySize);
    121                 }
    122122            }
    123123        }
  • installer/install.xml

    r05597ae r617d1cd  
    1313        <!-- use pack200 compression, saves about 33%
    1414             see http://java.sun.com/j2se/1.5.0/docs/guide/deployment/deployment-guide/pack200.html
    15              However it makes the unpacked jars much larger...
    16              For further testing...
     15         -->
    1716             <pack200 />
    18          -->
    1917
    2018        <!-- adding this element will make the installer attempt to launch itself with administrator permissions,
  • router/java/src/net/i2p/router/Blocklist.java

    r05597ae r617d1cd  
    1212import java.net.InetAddress;
    1313import java.net.UnknownHostException;
    14 import java.util.*;
     14import java.util.ArrayList;
     15import java.util.Arrays;
     16import java.util.HashMap;
     17import java.util.HashSet;
     18import java.util.Iterator;
     19import java.util.List;
     20import java.util.Map;
     21import java.util.Properties;
     22import java.util.Set;
     23import java.util.TreeSet;
    1524
    1625import net.i2p.data.Base64;
     
    5766    private final Object _lock = new Object();
    5867    private Entry _wrapSave;
    59     private final Set<Hash> _inProcess = new HashSet(0);
    60     private Map<Hash, String> _peerBlocklist = new HashMap(0);
    61     private final Set<Integer> _singleIPBlocklist = new ConcurrentHashSet(0);
     68    private final Set<Hash> _inProcess = new HashSet(4);
     69    private Map<Hash, String> _peerBlocklist = new HashMap(4);
     70    private final Set<Integer> _singleIPBlocklist = new ConcurrentHashSet(4);
    6271   
    6372    public Blocklist(RouterContext context) {
     
    110119                }
    111120            }
    112             for (Iterator iter = _peerBlocklist.keySet().iterator(); iter.hasNext(); ) {
    113                 Hash peer = (Hash) iter.next();
     121            for (Iterator<Hash> iter = _peerBlocklist.keySet().iterator(); iter.hasNext(); ) {
     122                Hash peer = iter.next();
    114123                String reason;
    115124                String comment = (String) _peerBlocklist.get(peer);
     
    126135            FloodfillNetworkDatabaseFacade fndf = (FloodfillNetworkDatabaseFacade) _context.netDb();
    127136            int count = 0;
    128             for (Iterator iter = fndf.getKnownRouterData().iterator(); iter.hasNext(); ) {
    129                 RouterInfo ri = (RouterInfo) iter.next();
     137            for (Iterator<RouterInfo> iter = fndf.getKnownRouterData().iterator(); iter.hasNext(); ) {
     138                RouterInfo ri = iter.next();
    130139                Hash peer = ri.getIdentity().getHash();
    131140                if (isBlocklisted(peer))
     
    459468     * but I suppose it could.
    460469     */
    461     public List getAddresses(Hash peer) {
    462         List rv = new ArrayList(1);
     470    public List<byte[]> getAddresses(Hash peer) {
     471        List<byte[]> rv = new ArrayList(1);
    463472        RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
    464473        if (pinfo == null) return rv;
    465         Set paddr = pinfo.getAddresses();
     474        Set<RouterAddress> paddr = pinfo.getAddresses();
    466475        if (paddr == null || paddr.size() == 0)
    467476            return rv;
    468477        String oldphost = null;
    469         List pladdr = new ArrayList(paddr);
     478        List<RouterAddress> pladdr = new ArrayList(paddr);
    470479        // for each peer address
    471480        for (int j = 0; j < paddr.size(); j++) {
     
    496505     */
    497506    public boolean isBlocklisted(Hash peer) {
    498         List ips = getAddresses(peer);
    499         for (Iterator iter = ips.iterator(); iter.hasNext(); ) {
    500             byte ip[] = (byte[]) iter.next();
     507        List<byte[]> ips = getAddresses(peer);
     508        for (Iterator<byte[]> iter = ips.iterator(); iter.hasNext(); ) {
     509            byte ip[] = iter.next();
    501510            if (isBlocklisted(ip)) {
    502511                if (! _context.shitlist().isShitlisted(peer))
     
    716725        // look through the file for each address to find which one was the cause
    717726        List ips = getAddresses(peer);
    718         for (Iterator iter = ips.iterator(); iter.hasNext(); ) {
    719             byte ip[] = (byte[]) iter.next();
     727        for (Iterator<byte[]> iter = ips.iterator(); iter.hasNext(); ) {
     728            byte ip[] = iter.next();
    720729            int ipint = toInt(ip);
    721730            FileInputStream in = null;
     
    763772        // move to the jsp
    764773        //out.write("<h2>Banned IPs</h2>");
    765         Set singles = new TreeSet();
     774        Set<Integer> singles = new TreeSet();
    766775        singles.addAll(_singleIPBlocklist);
    767776        if (singles.size() > 0) {
    768777            out.write("<table><tr><td><b>Transient IPs</b></td></tr>");
    769             for (Iterator iter = singles.iterator(); iter.hasNext(); ) {
    770                  int ip = ((Integer) iter.next()).intValue();
     778            for (Iterator<Integer> iter = singles.iterator(); iter.hasNext(); ) {
     779                 int ip = iter.next().intValue();
    771780                 out.write("<tr><td align=right>"); out.write(toStr(ip)); out.write("</td></tr>\n");
    772781            }
  • router/java/src/net/i2p/router/CommSystemFacade.java

    r05597ae r617d1cd  
    1212import java.io.Writer;
    1313import java.util.Collections;
    14 import java.util.HashSet;
    1514import java.util.List;
    1615import java.util.Set;
     
    3130   
    3231    /** Create the set of RouterAddress structures based on the router's config */
    33     public Set createAddresses() { return new HashSet(); }
     32    public Set<RouterAddress> createAddresses() { return Collections.EMPTY_SET; }
    3433   
    3534    public int countActivePeers() { return 0; }
  • router/java/src/net/i2p/router/DummyPeerManagerFacade.java

    r05597ae r617d1cd  
    2525    public void restart() {}
    2626    public void renderStatusHTML(Writer out) { }   
    27     public List selectPeers(PeerSelectionCriteria criteria) { return null; }
    28     public List getPeersByCapability(char capability) { return null; }
     27    public List<Hash> selectPeers(PeerSelectionCriteria criteria) { return null; }
     28    public List<Hash> getPeersByCapability(char capability) { return null; }
    2929    public void setCapabilities(Hash peer, String caps) {}
    3030    public void removeCapabilities(Hash peer) {}
  • router/java/src/net/i2p/router/InNetMessagePool.java

    r05597ae r617d1cd  
    241241   
    242242    public int handleReplies(I2NPMessage messageBody) {
    243         List origMessages = _context.messageRegistry().getOriginalMessages(messageBody);
     243        List<OutNetMessage> origMessages = _context.messageRegistry().getOriginalMessages(messageBody);
    244244        if (_log.shouldLog(Log.DEBUG))
    245245            _log.debug("Original messages for inbound message: " + origMessages.size());
     
    251251
    252252        for (int i = 0; i < origMessages.size(); i++) {
    253             OutNetMessage omsg = (OutNetMessage)origMessages.get(i);
     253            OutNetMessage omsg = origMessages.get(i);
    254254            ReplyJob job = omsg.getOnReplyJob();
    255255            if (_log.shouldLog(Log.DEBUG))
  • router/java/src/net/i2p/router/Job.java

    r05597ae r617d1cd  
    2020    public String getName();
    2121    /** unique id */
    22     public int getJobId();
     22    public long getJobId();
    2323    /**
    2424     * Timing criteria for the task
  • router/java/src/net/i2p/router/JobImpl.java

    r05597ae r617d1cd  
    1616    private RouterContext _context;
    1717    private JobTiming _timing;
    18     private static int _idSrc = 0;
    19     private int _id;
     18    private static long _idSrc = 0;
     19    private long _id;
    2020    private Exception _addedBy;
    2121    private long _madeReadyOn;
     
    2929    }
    3030   
    31     public int getJobId() { return _id; }
     31    public long getJobId() { return _id; }
    3232    public JobTiming getTiming() { return _timing; }
    3333   
  • router/java/src/net/i2p/router/JobQueue.java

    r05597ae r617d1cd  
    1313import java.util.ArrayList;
    1414import java.util.Collections;
    15 import java.util.HashMap;
    1615import java.util.Iterator;
     16import java.util.List;
     17import java.util.Map;
    1718import java.util.SortedMap;
    1819import java.util.TreeMap;
     20import java.util.concurrent.ConcurrentHashMap;
     21import java.util.concurrent.BlockingQueue;
     22import java.util.concurrent.LinkedBlockingQueue;
    1923
    2024import net.i2p.data.DataHelper;
     
    3438   
    3539    /** Integer (runnerId) to JobQueueRunner for created runners */
    36     private final HashMap _queueRunners;
     40    private final Map<Integer, JobQueueRunner> _queueRunners;
    3741    /** a counter to identify a job runner */
    3842    private volatile static int _runnerId = 0;
    3943    /** list of jobs that are ready to run ASAP */
    40     private ArrayList _readyJobs;
     44    private BlockingQueue<Job> _readyJobs;
    4145    /** list of jobs that are scheduled for running in the future */
    42     private ArrayList _timedJobs;
     46    private List<Job> _timedJobs;
    4347    /** job name to JobStat for that job */
    44     private final SortedMap _jobStats;
     48    private final Map<String, JobStats> _jobStats;
    4549    /** how many job queue runners can go concurrently */
    4650    private int _maxRunners = 1;
     
    5357    private final Object _jobLock;
    5458   
     59    /** how many when we go parallel */
     60    private static final int RUNNERS = 4;
     61
    5562    /** default max # job queue runners operating */
    5663    private final static int DEFAULT_MAX_RUNNERS = 1;
    57     /** router.config parameter to override the max runners */
     64    /** router.config parameter to override the max runners @deprecated unimplemented */
    5865    private final static String PROP_MAX_RUNNERS = "router.maxJobRunners";
    5966   
     
    6471    private long _lagWarning = DEFAULT_LAG_WARNING;
    6572    private final static long DEFAULT_LAG_WARNING = 5*1000;
     73    /** @deprecated unimplemented */
    6674    private final static String PROP_LAG_WARNING = "router.jobLagWarning";
    6775   
    68     /** if a job is this lagged, the router is hosed, so shut it down */
     76    /** if a job is this lagged, the router is hosed, so spit out a warning (dont shut it down) */
    6977    private long _lagFatal = DEFAULT_LAG_FATAL;
    7078    private final static long DEFAULT_LAG_FATAL = 30*1000;
     79    /** @deprecated unimplemented */
    7180    private final static String PROP_LAG_FATAL = "router.jobLagFatal";
    7281   
     
    7483    private long _runWarning = DEFAULT_RUN_WARNING;
    7584    private final static long DEFAULT_RUN_WARNING = 5*1000;
     85    /** @deprecated unimplemented */
    7686    private final static String PROP_RUN_WARNING = "router.jobRunWarning";
    7787   
    78     /** if a job takes this long to run, the router is hosed, so shut it down */
     88    /** if a job takes this long to run, the router is hosed, so spit out a warning (dont shut it down) */
    7989    private long _runFatal = DEFAULT_RUN_FATAL;
    8090    private final static long DEFAULT_RUN_FATAL = 30*1000;
     91    /** @deprecated unimplemented */
    8192    private final static String PROP_RUN_FATAL = "router.jobRunFatal";
    8293   
     
    8495    private long _warmupTime = DEFAULT_WARMUP_TIME;
    8596    private final static long DEFAULT_WARMUP_TIME = 10*60*1000;
    86     private final static String PROP_WARMUM_TIME = "router.jobWarmupTime";
     97    /** @deprecated unimplemented */
     98    private final static String PROP_WARMUP_TIME = "router.jobWarmupTime";
    8799   
    88100    /** max ready and waiting jobs before we start dropping 'em */
    89101    private int _maxWaitingJobs = DEFAULT_MAX_WAITING_JOBS;
    90102    private final static int DEFAULT_MAX_WAITING_JOBS = 100;
     103    /** @deprecated unimplemented */
    91104    private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs";
    92    
     105
    93106    /**
    94107     * queue runners wait on this whenever they're not doing anything, and
     
    110123
    111124        _alive = true;
    112         _readyJobs = new ArrayList(16);
     125        _readyJobs = new LinkedBlockingQueue();
    113126        _timedJobs = new ArrayList(64);
    114127        _jobLock = new Object();
    115         _queueRunners = new HashMap();
    116         _jobStats = Collections.synchronizedSortedMap(new TreeMap());
     128        _queueRunners = new ConcurrentHashMap(RUNNERS);
     129        _jobStats = new ConcurrentHashMap();
    117130        _allowParallelOperation = false;
    118131        _pumper = new QueuePumper();
    119         I2PThread pumperThread = new I2PThread(_pumper);
    120         pumperThread.setDaemon(true);
    121         pumperThread.setName("QueuePumper");
     132        I2PThread pumperThread = new I2PThread(_pumper, "Job Queue Pumper", true);
    122133        //pumperThread.setPriority(I2PThread.NORM_PRIORITY+1);
    123134        pumperThread.start();
     
    129140     */
    130141    public void addJob(Job job) {
    131         if (job == null) return;
     142        if (job == null || !_alive) return;
    132143
    133144        if (job instanceof JobImpl)
     
    137148        boolean alreadyExists = false;
    138149        boolean dropped = false;
     150        // getNext() is now outside the jobLock, is that ok?
    139151        synchronized (_jobLock) {
    140152            if (_readyJobs.contains(job))
     
    156168                        if (job instanceof JobImpl)
    157169                            ((JobImpl)job).madeReady();
    158                         _readyJobs.add(job);
     170                        _readyJobs.offer(job);
    159171                    } else {
    160172                        _timedJobs.add(job);
     
    168180        if (dropped) {
    169181            _context.statManager().addRateData("jobQueue.droppedJobs", 1, 1);
    170            if (_log.shouldLog(Log.WARN))
    171                 _log.warn("Dropping job due to overload!  # ready jobs: "
     182            if (_log.shouldLog(Log.ERROR))
     183                _log.error("Dropping job due to overload!  # ready jobs: "
    172184                          + numReady + ": job = " + job);
    173185        }
    174 
    175         return;
    176186    }
    177187   
     
    190200   
    191201    public int getReadyCount() {
    192         synchronized (_jobLock) {
    193202            return _readyJobs.size();
    194         }
    195     }
     203    }
     204
    196205    public long getMaxLag() {
    197         synchronized (_jobLock) {
    198             if (_readyJobs.size() <= 0) return 0;
     206            Job j = _readyJobs.peek();
     207            if (j == null) return 0;
    199208            // first job is the one that has been waiting the longest
    200             long startAfter = ((Job)_readyJobs.get(0)).getTiming().getStartAfter();
     209            long startAfter = j.getTiming().getStartAfter();
    201210            return _context.clock().now() - startAfter;
    202         }
    203211    }
    204212   
     
    229237    public void allowParallelOperation() {
    230238        _allowParallelOperation = true;
    231         runQueue(4);
    232     }
    233    
     239        runQueue(RUNNERS);
     240    }
     241   
     242    /** @deprecated do you really want to do this? */
    234243    public void restart() {
    235244        synchronized (_jobLock) {
     
    242251    void shutdown() {
    243252        _alive = false;
    244         synchronized (_jobLock) {
    245             _jobLock.notifyAll();
    246         }
     253        _timedJobs.clear();
     254        _readyJobs.clear();
     255        // The JobQueueRunners are NOT daemons,
     256        // so they must be stopped.
     257        Job poison = new PoisonJob();
     258        for (int i = 0; i < _queueRunners.size(); i++)
     259            _readyJobs.offer(poison);
     260
     261
     262      /********
    247263        if (_log.shouldLog(Log.WARN)) {
    248264            StringBuilder buf = new StringBuilder(1024);
    249265            buf.append("current jobs: \n");
    250266            for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    251                 JobQueueRunner runner = (JobQueueRunner)iter.next();
     267                JobQueueRunner runner = iter.next();
    252268                Job j = runner.getCurrentJob();
    253269
     
    280296            _log.log(Log.WARN, buf.toString());
    281297        }
    282     }
     298      ********/
     299    }
     300
    283301    boolean isAlive() { return _alive; }
    284302   
     
    288306    public long getLastJobBegin() {
    289307        long when = -1;
    290         // not synchronized, so might b0rk if the runners are changed
    291         for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    292             long cur = ((JobQueueRunner)iter.next()).getLastBegin();
     308        for (JobQueueRunner runner : _queueRunners.values()) {
     309            long cur = runner.getLastBegin();
    293310            if (cur > when)
    294311                cur = when;
     
    301318    public long getLastJobEnd() {
    302319        long when = -1;
    303         // not synchronized, so might b0rk if the runners are changed
    304         for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    305             long cur = ((JobQueueRunner)iter.next()).getLastEnd();
     320        for (JobQueueRunner runner : _queueRunners.values()) {
     321            long cur = runner.getLastEnd();
    306322            if (cur > when)
    307323                cur = when;
     
    316332        Job j = null;
    317333        long when = -1;
    318         // not synchronized, so might b0rk if the runners are changed
    319         for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    320             JobQueueRunner cur = (JobQueueRunner)iter.next();
     334        for (JobQueueRunner cur : _queueRunners.values()) {
    321335            if (cur.getLastBegin() > when) {
    322336                j = cur.getCurrentJob();
     
    334348        while (_alive) {
    335349            try {
    336                 synchronized (_jobLock) {
    337                     if (_readyJobs.size() > 0) {
    338                         return (Job)_readyJobs.remove(0);
    339                     } else {
    340                         _jobLock.wait();
    341                     }
    342                 }
     350                Job j = _readyJobs.take();
     351                if (j.getJobId() == POISON_ID)
     352                    break;
     353                return j;
    343354            } catch (InterruptedException ie) {}
    344355        }
     
    356367     *
    357368     */
    358     public void runQueue(int numThreads) {
    359         synchronized (_queueRunners) {
     369    public synchronized void runQueue(int numThreads) {
    360370            // we're still starting up [serially] and we've got at least one runner,
    361371            // so dont do anything
     
    378388                }
    379389            } else if (_queueRunners.size() == numThreads) {
    380                 for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    381                     JobQueueRunner runner = (JobQueueRunner)iter.next();
     390                for (JobQueueRunner runner : _queueRunners.values()) {
    382391                    runner.startRunning();
    383392                }
     
    388397                //}
    389398            }
    390         }
    391399    }
    392400       
     
    408416                    long now = _context.clock().now();
    409417                    long timeToWait = -1;
    410                     ArrayList toAdd = null;
     418                    List<Job> toAdd = null;
    411419                    try {
    412420                        synchronized (_jobLock) {
    413421                            for (int i = 0; i < _timedJobs.size(); i++) {
    414                                 Job j = (Job)_timedJobs.get(i);
     422                                Job j = _timedJobs.get(i);
    415423                                // find jobs due to start before now
    416424                                long timeLeft = j.getTiming().getStartAfter() - now;
     
    438446                                // on some profiling data ;)
    439447                                for (int i = 0; i < toAdd.size(); i++)
    440                                     _readyJobs.add(toAdd.get(i));
     448                                    _readyJobs.offer(toAdd.get(i));
    441449                                _jobLock.notifyAll();
    442450                            } else {
     
    477485        synchronized (_jobLock) {
    478486            for (int i = 0; i < _timedJobs.size(); i++) {
    479                 Job j = (Job)_timedJobs.get(i);
     487                Job j = _timedJobs.get(i);
    480488                j.getTiming().offsetChanged(delta);
    481489            }
    482             for (int i = 0; i < _readyJobs.size(); i++) {
    483                 Job j = (Job)_readyJobs.get(i);
     490            for (Job j : _readyJobs) {
    484491                j.getTiming().offsetChanged(delta);
    485492            }
    486493        }
    487494        synchronized (_runnerLock) {
    488             for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) {
    489                 JobQueueRunner runner = (JobQueueRunner)iter.next();
     495            for (JobQueueRunner runner : _queueRunners.values()) {
    490496                Job job = runner.getCurrentJob();
    491497                if (job != null)
     
    510516        if (duration < 0) duration = 0;
    511517       
    512         JobStats stats = null;
    513         if (!_jobStats.containsKey(key)) {
    514             _jobStats.put(key, new JobStats(key));
     518        JobStats stats = _jobStats.get(key);
     519        if (stats == null) {
     520            stats = new JobStats(key);
     521            _jobStats.put(key, stats);
    515522            // yes, if two runners finish the same job at the same time, this could
    516523            // create an extra object.  but, who cares, its pushed out of the map
    517524            // immediately anyway.
    518525        }
    519         stats = (JobStats)_jobStats.get(key);
    520526        stats.jobRan(duration, lag);
    521527
     
    556562   
    557563       
     564    /** job ID counter changed from int to long so it won't wrap negative */
     565    private static final int POISON_ID = -99999;
     566
     567    private static class PoisonJob implements Job {
     568        public String getName() { return null; }
     569        public long getJobId() { return POISON_ID; }
     570        public JobTiming getTiming() { return null; }
     571        public void runJob() {}
     572        public Exception getAddedBy() { return null; }
     573        public void dropped() {}
     574    }
     575
    558576    ////
    559577    // the remainder are utility methods for dumping status info
     
    561579   
    562580    public void renderStatusHTML(Writer out) throws IOException {
    563         ArrayList readyJobs = null;
    564         ArrayList timedJobs = null;
    565         ArrayList activeJobs = new ArrayList(1);
    566         ArrayList justFinishedJobs = new ArrayList(4);
     581        List<Job> readyJobs = null;
     582        List<Job> timedJobs = null;
     583        List<Job> activeJobs = new ArrayList(RUNNERS);
     584        List<Job> justFinishedJobs = new ArrayList(RUNNERS);
    567585        //out.write("<!-- jobQueue rendering -->\n");
    568586        out.flush();
    569587       
    570         int states[] = null;
     588        //int states[] = null;
    571589        int numRunners = 0;
    572         synchronized (_queueRunners) {
    573             states = new int[_queueRunners.size()];
     590
     591        {
     592            //states = new int[_queueRunners.size()];
    574593            int i = 0;
    575             for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); i++) {
    576                 JobQueueRunner runner = (JobQueueRunner)iter.next();
    577                 states[i] = runner.getState();
     594            for (Iterator<JobQueueRunner> iter = _queueRunners.values().iterator(); iter.hasNext(); i++) {
     595                JobQueueRunner runner = iter.next();
     596                //states[i] = runner.getState();
    578597                Job job = runner.getCurrentJob();
    579598                if (job != null) {
     
    622641        buf.append("<hr><b>Active jobs: ").append(activeJobs.size()).append("</b><ol>\n");
    623642        for (int i = 0; i < activeJobs.size(); i++) {
    624             Job j = (Job)activeJobs.get(i);
     643            Job j = activeJobs.get(i);
    625644            buf.append("<li>[started ").append(DataHelper.formatDuration(now-j.getTiming().getStartAfter())).append(" ago]: ");
    626645            buf.append(j.toString()).append("</li>\n");
     
    629648        buf.append("<hr><b>Just finished jobs: ").append(justFinishedJobs.size()).append("</b><ol>\n");
    630649        for (int i = 0; i < justFinishedJobs.size(); i++) {
    631             Job j = (Job)justFinishedJobs.get(i);
     650            Job j = justFinishedJobs.get(i);
    632651            buf.append("<li>[finished ").append(DataHelper.formatDuration(now-j.getTiming().getActualEnd())).append(" ago]: ");
    633652            buf.append(j.toString()).append("</li>\n");
     
    636655        buf.append("<hr><b>Ready/waiting jobs: ").append(readyJobs.size()).append("</b><ol>\n");
    637656        for (int i = 0; i < readyJobs.size(); i++) {
    638             Job j = (Job)readyJobs.get(i);
     657            Job j = readyJobs.get(i);
    639658            buf.append("<li>[waiting ");
    640659            buf.append(DataHelper.formatDuration(now-j.getTiming().getStartAfter()));
     
    646665
    647666        buf.append("<hr><b>Scheduled jobs: ").append(timedJobs.size()).append("</b><ol>\n");
    648         TreeMap ordered = new TreeMap();
     667        TreeMap<Long, Job> ordered = new TreeMap();
    649668        for (int i = 0; i < timedJobs.size(); i++) {
    650             Job j = (Job)timedJobs.get(i);
     669            Job j = timedJobs.get(i);
    651670            ordered.put(new Long(j.getTiming().getStartAfter()), j);
    652671        }
    653         for (Iterator iter = ordered.values().iterator(); iter.hasNext(); ) {
    654             Job j = (Job)iter.next();
     672        for (Iterator<Job> iter = ordered.values().iterator(); iter.hasNext(); ) {
     673            Job j = iter.next();
    655674            long time = j.getTiming().getStartAfter() - now;
    656675            buf.append("<li>").append(j.getName()).append(" in ");
     
    686705        long minPendingTime = -1;
    687706
    688         TreeMap tstats = null;
    689         synchronized (_jobStats) {
    690             tstats = new TreeMap(_jobStats);
    691         }
    692        
    693         for (Iterator iter = tstats.values().iterator(); iter.hasNext(); ) {
    694             JobStats stats = (JobStats)iter.next();
     707        TreeMap<String, JobStats> tstats = new TreeMap(_jobStats);
     708       
     709        for (Iterator<JobStats> iter = tstats.values().iterator(); iter.hasNext(); ) {
     710            JobStats stats = iter.next();
    695711            buf.append("<tr>");
    696712            buf.append("<td><b>").append(stats.getName()).append("</b></td>");
  • router/java/src/net/i2p/router/JobQueueRunner.java

    r05597ae r617d1cd  
    2424        _lastJob = null;
    2525        _log = _context.logManager().getLog(JobQueueRunner.class);
    26         _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
    27         _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l });
     26        _context.statManager().createRateStat("jobQueue.jobRun", "How long jobs take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
     27        _context.statManager().createRateStat("jobQueue.jobRunSlow", "How long jobs that take over a second take", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
    2828        _context.statManager().createRateStat("jobQueue.jobLag", "How long jobs have to wait before running", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
    29         _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
    30         _context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
    31         _state = 1;
     29        _context.statManager().createRateStat("jobQueue.jobWait", "How long does a job sit on the job queue?", "JobQueue", new long[] { 60*60*1000l, 24*60*60*1000l });
     30        //_context.statManager().createRateStat("jobQueue.jobRunnerInactive", "How long are runners inactive?", "JobQueue", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l });
     31        //_state = 1;
    3232    }
    3333   
     
    4242    public long getLastEnd() { return _lastEnd; }
    4343    public void run() {
    44         _state = 2;
     44        //_state = 2;
    4545        long lastActive = _context.clock().now();
    4646        long jobNum = 0;
    4747        while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) {
    48             _state = 3;
     48            //_state = 3;
    4949            try {
    5050                Job job = _context.jobQueue().getNext();
    51                 _state = 4;
     51                //_state = 4;
    5252                if (job == null) {
    53                     _state = 5;
     53                    //_state = 5;
    5454                    if (_context.router().isAlive())
    5555                        if (_log.shouldLog(Log.ERROR))
     
    6161                long enqueuedTime = 0;
    6262                if (job instanceof JobImpl) {
    63                     _state = 6;
     63                    //_state = 6;
    6464                    long when = ((JobImpl)job).getMadeReadyOn();
    6565                    if (when <= 0) {
    66                         _state = 7;
     66                        //_state = 7;
    6767                        _log.error("Job was not made ready?! " + job,
    6868                                   new Exception("Not made ready?!"));
    6969                    } else {
    70                         _state = 8;
     70                        //_state = 8;
    7171                        enqueuedTime = now - when;
    7272                    }
     
    7676                _currentJob = job;
    7777                _lastJob = null;
    78                 _state = 9;
     78                //_state = 9;
    7979                if (_log.shouldLog(Log.DEBUG))
    8080                    _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
    8181                long origStartAfter = job.getTiming().getStartAfter();
    8282                long doStart = _context.clock().now();
    83                 _state = 10;
     83                //_state = 10;
    8484                job.getTiming().start();
    8585                runCurrentJob();
    8686                job.getTiming().end();
    87                 _state = 11;
     87                //_state = 11;
    8888                long duration = job.getTiming().getActualEnd() - job.getTiming().getActualStart();
    8989                long beforeUpdate = _context.clock().now();
    90                 _state = 12;
     90                //_state = 12;
    9191                _context.jobQueue().updateStats(job, doStart, origStartAfter, duration);
    92                 _state = 13;
     92                //_state = 13;
    9393                long diff = _context.clock().now() - beforeUpdate;
    9494
     
    9696                if (lag < 0) lag = 0;
    9797               
    98                 _context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
     98                //_context.statManager().addRateData("jobQueue.jobRunnerInactive", betweenJobs, betweenJobs);
    9999                _context.statManager().addRateData("jobQueue.jobRun", duration, duration);
    100100                _context.statManager().addRateData("jobQueue.jobLag", lag, 0);
     
    108108                }
    109109               
    110                 _state = 14;
     110                //_state = 14;
    111111               
    112112                if (diff > 100) {
     
    122122                _lastEnd = lastActive;
    123123                jobNum++;
    124                 _state = 15;
     124                //_state = 15;
    125125               
    126126                //if ( (jobNum % 10) == 0)
     
    131131            }
    132132        }
    133         _state = 16;
     133        //_state = 16;
    134134        if (_context.router().isAlive())
    135135            if (_log.shouldLog(Log.CRIT))
    136136                _log.log(Log.CRIT, "Queue runner " + _id + " exiting");
    137137        _context.jobQueue().removeRunner(_id);
    138         _state = 17;
     138        //_state = 17;
    139139    }
    140140   
    141141    private void runCurrentJob() {
    142142        try {
    143             _state = 18;
     143            //_state = 18;
    144144            _lastBegin = _context.clock().now();
    145145            _currentJob.runJob();
    146             _state = 19;
     146            //_state = 19;
    147147        } catch (OutOfMemoryError oom) {
    148             _state = 20;
     148            //_state = 20;
    149149            try {
    150150                if (_log.shouldLog(Log.CRIT))
     
    158158            System.exit(-1);
    159159        } catch (Throwable t) {
    160             _state = 21;
     160            //_state = 21;
    161161            if (_log.shouldLog(Log.CRIT))
    162162                _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName()
  • router/java/src/net/i2p/router/KeyManager.java

    r05597ae r617d1cd  
    1313import java.io.FileOutputStream;
    1414import java.io.IOException;
    15 import java.util.HashMap;
    1615import java.util.HashSet;
    1716import java.util.Map;
    1817import java.util.Set;
     18import java.util.concurrent.ConcurrentHashMap;
    1919
    2020import net.i2p.data.DataFormatException;
     
    4242    private SigningPrivateKey _signingPrivateKey;
    4343    private SigningPublicKey _signingPublicKey;
    44     private final Map _leaseSetKeys; // Destination --> LeaseSetKeys
     44    private final Map<Hash, LeaseSetKeys> _leaseSetKeys; // Destination --> LeaseSetKeys
    4545    private SynchronizeKeysJob _synchronizeJob;
    4646   
     
    6464        setSigningPrivateKey(null);
    6565        setSigningPublicKey(null);
    66         _leaseSetKeys = new HashMap();
     66        _leaseSetKeys = new ConcurrentHashMap();
    6767    }
    6868   
     
    103103        _log.info("Registering keys for destination " + dest.calculateHash().toBase64());
    104104        LeaseSetKeys keys = new LeaseSetKeys(dest, leaseRevocationPrivateKey, endpointDecryptionKey);
    105         synchronized (_leaseSetKeys) {
    106             _leaseSetKeys.put(dest.calculateHash(), keys);
    107         }
     105        _leaseSetKeys.put(dest.calculateHash(), keys);
    108106    }
    109107   
     
    119117        if (_log.shouldLog(Log.INFO))
    120118            _log.info("Unregistering keys for destination " + dest.calculateHash().toBase64());
    121         LeaseSetKeys rv = null;
    122         synchronized (_leaseSetKeys) {
    123             rv = (LeaseSetKeys)_leaseSetKeys.remove(dest.calculateHash());
    124         }
    125         return rv;
     119        return _leaseSetKeys.remove(dest.calculateHash());
    126120    }
    127121   
     
    130124    }
    131125    public LeaseSetKeys getKeys(Hash dest) {
    132         synchronized (_leaseSetKeys) {
    133             return (LeaseSetKeys)_leaseSetKeys.get(dest);
    134         }
    135     }
    136    
    137     public Set getAllKeys() {
     126            return _leaseSetKeys.get(dest);
     127    }
     128   
     129    public Set<LeaseSetKeys> getAllKeys() {
    138130        HashSet keys = new HashSet();
    139         synchronized (_leaseSetKeys) {
    140             keys.addAll(_leaseSetKeys.values());
    141         }
     131        keys.addAll(_leaseSetKeys.values());
    142132        return keys;
    143133    }
  • router/java/src/net/i2p/router/OutNetMessage.java

    r05597ae r617d1cd  
    4747    private Job _onFailedReply;
    4848    private MessageSelector _replySelector;
    49     private Set _failedTransports;
     49    private Set<String> _failedTransports;
    5050    private long _sendBegin;
    5151    private long _transmitBegin;
     
    5353    private long _created;
    5454    /** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
    55     private HashMap _timestamps;
     55    private HashMap<String, Long> _timestamps;
    5656    /**
    5757     * contains a list of timestamp event names in the order they were fired
    5858     * (some JVMs have less than 10ms resolution, so the Long above doesn't guarantee order)
    5959     */
    60     private List _timestampOrder;
     60    private List<String> _timestampOrder;
    6161    private int _queueSize;
    6262    private long _prepareBegin;
     
    109109        return now - _created;
    110110    }
    111     public Map getTimestamps() {
     111    public Map<String, Long> getTimestamps() {
    112112        if (_log.shouldLog(Log.INFO)) {
    113113            synchronized (this) {
    114114                locked_initTimestamps();
    115                 return (Map)_timestamps.clone();
     115                return (Map<String, Long>)_timestamps.clone();
    116116            }
    117117        }
     
    122122            synchronized (this) {
    123123                locked_initTimestamps();
    124                 return (Long)_timestamps.get(eventName);
     124                return _timestamps.get(eventName);
    125125            }
    126126        }
     
    340340                long lastWhen = -1;
    341341                for (int i = 0; i < _timestampOrder.size(); i++) {
    342                     String name = (String)_timestampOrder.get(i);
    343                     Long when = (Long)_timestamps.get(name);
     342                    String name = _timestampOrder.get(i);
     343                    Long when = _timestamps.get(name);
    344344                    buf.append("\t[");
    345345                    long diff = when.longValue() - lastWhen;
  • router/java/src/net/i2p/router/PeerManagerFacade.java

    r05597ae r617d1cd  
    2626     * @return List of Hash objects of the RouterIdentity for matching peers
    2727     */
    28     public List selectPeers(PeerSelectionCriteria criteria);
    29     public List getPeersByCapability(char capability);
     28    public List<Hash> selectPeers(PeerSelectionCriteria criteria);
     29    public List<Hash> getPeersByCapability(char capability);
    3030    public void setCapabilities(Hash peer, String caps);
    3131    public void removeCapabilities(Hash peer);
  • router/java/src/net/i2p/router/RouterThrottleImpl.java

    r05597ae r617d1cd  
    4545        setTunnelStatus();
    4646        _context.statManager().createRateStat("router.throttleNetworkCause", "How lagged the jobQueue was when an I2NP was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
    47         _context.statManager().createRateStat("router.throttleNetDbCause", "How lagged the jobQueue was when a networkDb request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
    48         _context.statManager().createRateStat("router.throttleTunnelCause", "How lagged the jobQueue was when a tunnel request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     47        //_context.statManager().createRateStat("router.throttleNetDbCause", "How lagged the jobQueue was when a networkDb request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     48        //_context.statManager().createRateStat("router.throttleTunnelCause", "How lagged the jobQueue was when a tunnel request was throttled", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
    4949        _context.statManager().createRateStat("tunnel.bytesAllocatedAtAccept", "How many bytes had been 'allocated' for participating tunnels when we accepted a request?", "Tunnels", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5050        _context.statManager().createRateStat("router.throttleTunnelProcessingTime1m", "How long it takes to process a message (1 minute average) when we throttle a tunnel?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
     
    5252        _context.statManager().createRateStat("router.throttleTunnelMaxExceeded", "How many tunnels we are participating in when we refuse one due to excees?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5353        _context.statManager().createRateStat("router.throttleTunnelProbTooFast", "How many tunnels beyond the previous 1h average are we participating in when we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    54         _context.statManager().createRateStat("router.throttleTunnelProbTestSlow", "How slow are our tunnel tests when our average exceeds the old average and we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
     54        //_context.statManager().createRateStat("router.throttleTunnelProbTestSlow", "How slow are our tunnel tests when our average exceeds the old average and we throttle?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5555        _context.statManager().createRateStat("router.throttleTunnelBandwidthExceeded", "How much bandwidth is allocated when we refuse due to bandwidth allocation?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5656        _context.statManager().createRateStat("router.throttleTunnelBytesAllowed", "How many bytes are allowed to be sent when we get a tunnel request (period is how many are currently allocated)?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5757        _context.statManager().createRateStat("router.throttleTunnelBytesUsed", "Used Bps at request (period = max KBps)?", "Throttle", new long[] { 10*60*1000, 60*60*1000, 24*60*60*1000 });
    5858        _context.statManager().createRateStat("router.throttleTunnelFailCount1m", "How many messages failed to be sent in the last 2 minutes when we throttle based on a spike in failures (period = 10 minute average failure count)?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000});
    59         _context.statManager().createRateStat("router.throttleTunnelQueueOverload", "How many pending tunnel request messages have we received when we reject them due to overload (period = time to process each)?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000});
     59        //_context.statManager().createRateStat("router.throttleTunnelQueueOverload", "How many pending tunnel request messages have we received when we reject them due to overload (period = time to process each)?", "Throttle", new long[] { 60*1000, 10*60*1000, 60*60*1000});
    6060    }
    6161   
     
    6464        long lag = _context.jobQueue().getMaxLag();
    6565        if ( (lag > JOB_LAG_LIMIT) && (_context.router().getUptime() > 60*1000) ) {
    66             if (_log.shouldLog(Log.DEBUG))
    67                 _log.debug("Throttling network reader, as the job lag is " + lag);
     66            if (_log.shouldLog(Log.WARN))
     67                _log.warn("Throttling network reader, as the job lag is " + lag);
    6868            _context.statManager().addRateData("router.throttleNetworkCause", lag, lag);
    6969            return false;
  • router/java/src/net/i2p/router/peermanager/PeerManager.java

    r05597ae r617d1cd  
    1212import java.io.Writer;
    1313import java.util.ArrayList;
    14 import java.util.HashMap;
    1514import java.util.HashSet;
    1615import java.util.Iterator;
     
    1817import java.util.Map;
    1918import java.util.Set;
     19import java.util.concurrent.ConcurrentHashMap;
    2020
    2121import net.i2p.data.Hash;
     
    2727import net.i2p.util.SimpleScheduler;
    2828import net.i2p.util.SimpleTimer;
     29import net.i2p.util.ConcurrentHashSet;
    2930
    3031/**
     
    4748    private ProfileOrganizer _organizer;
    4849    private ProfilePersistenceHelper _persistenceHelper;
    49     private List _peersByCapability[];
    50     private final Map _capabilitiesByPeer;
     50    private Set<Hash> _peersByCapability[];
     51    private final Map<Hash, String> _capabilitiesByPeer;
    5152   
    5253    public PeerManager(RouterContext context) {
     
    5657        _organizer = context.profileOrganizer();
    5758        _organizer.setUs(context.routerHash());
    58         _capabilitiesByPeer = new HashMap(128);
    59         _peersByCapability = new List[26];
     59        _capabilitiesByPeer = new ConcurrentHashMap(128);
     60        _peersByCapability = new Set[26];
    6061        for (int i = 0; i < _peersByCapability.length; i++)
    61             _peersByCapability[i] = new ArrayList(64);
     62            _peersByCapability[i] = new ConcurrentHashSet();
    6263        loadProfiles();
    6364        ////_context.jobQueue().addJob(new EvaluateProfilesJob(_context));
     
    7879    void storeProfiles() {
    7980        Set peers = selectPeers();
    80         for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
    81             Hash peer = (Hash)iter.next();
     81        for (Iterator<Hash> iter = peers.iterator(); iter.hasNext(); ) {
     82            Hash peer = iter.next();
    8283            storeProfile(peer);
    8384        }
    8485    }
     86
    8587    Set selectPeers() {
    8688        return _organizer.selectAllPeers();
    8789    }
     90
    8891    void storeProfile(Hash peer) {
    8992        if (peer == null) return;
     
    9396            _persistenceHelper.writeProfile(prof);
    9497    }
     98
    9599    void loadProfiles() {
    96         Set profiles = _persistenceHelper.readProfiles();
    97         for (Iterator iter = profiles.iterator(); iter.hasNext();) {
    98             PeerProfile prof = (PeerProfile)iter.next();
     100        Set<PeerProfile> profiles = _persistenceHelper.readProfiles();
     101        for (Iterator<PeerProfile> iter = profiles.iterator(); iter.hasNext();) {
     102            PeerProfile prof = iter.next();
    99103            if (prof != null) {
    100104                _organizer.addProfile(prof);
     
    108112     * Find some peers that meet the criteria and we have the netDb info for locally
    109113     *
     114     * Only used by PeerTestJob (PURPOSE_TEST)
    110115     */
    111     List selectPeers(PeerSelectionCriteria criteria) {
    112         Set peers = new HashSet(criteria.getMinimumRequired());
    113         Set exclude = new HashSet(1);
     116    List<Hash> selectPeers(PeerSelectionCriteria criteria) {
     117        Set<Hash> peers = new HashSet(criteria.getMinimumRequired());
     118        Set<Hash> exclude = new HashSet(1);
    114119        exclude.add(_context.routerHash());
    115120        switch (criteria.getPurpose()) {
     
    144149                break;
    145150        }
    146         if (peers.size() <= 0) {
     151        if (peers.isEmpty()) {
    147152            if (_log.shouldLog(Log.WARN))
    148153                _log.warn("We ran out of peers when looking for reachable ones after finding "
    149                           + peers.size() + " with "
     154                          + "0 with "
    150155                          + _organizer.countWellIntegratedPeers() + "/"
    151156                          + _organizer.countHighCapacityPeers() + "/"
     
    161166            _log.debug("Setting capabilities for " + peer.toBase64() + " to " + caps);
    162167        if (caps != null) caps = caps.toLowerCase();
    163         synchronized (_capabilitiesByPeer) {
     168
    164169            String oldCaps = null;
    165170            if (caps != null)
    166                 oldCaps = (String)_capabilitiesByPeer.put(peer, caps);
     171                oldCaps = _capabilitiesByPeer.put(peer, caps);
    167172            else
    168                 oldCaps = (String)_capabilitiesByPeer.remove(peer);
     173                oldCaps = _capabilitiesByPeer.remove(peer);
    169174           
    170175            if (oldCaps != null) {
     
    172177                    char c = oldCaps.charAt(i);
    173178                    if ( (caps == null) || (caps.indexOf(c) < 0) ) {
    174                         List peers = locked_getPeers(c);
     179                        Set<Hash> peers = locked_getPeers(c);
    175180                        if (peers != null)
    176181                            peers.remove(peer);
     
    183188                    if ( (oldCaps != null) && (oldCaps.indexOf(c) >= 0) )
    184189                        continue;
    185                     List peers = locked_getPeers(c);
    186                     if ( (peers != null) && (!peers.contains(peer)) )
     190                    Set<Hash> peers = locked_getPeers(c);
     191                    if (peers != null)
    187192                        peers.add(peer);
    188193                }
    189194            }
    190         }
    191     }
    192    
    193     private List locked_getPeers(char c) {
     195    }
     196   
     197    /** locking no longer req'd */
     198    private Set<Hash> locked_getPeers(char c) {
    194199        c = Character.toLowerCase(c);
    195200        int i = c - 'a';
     
    205210        if (_log.shouldLog(Log.DEBUG))
    206211            _log.debug("Removing capabilities from " + peer.toBase64());
    207         synchronized (_capabilitiesByPeer) {
     212
    208213            String oldCaps = (String)_capabilitiesByPeer.remove(peer);
    209214            if (oldCaps != null) {
    210215                for (int i = 0; i < oldCaps.length(); i++) {
    211216                    char c = oldCaps.charAt(i);
    212                     List peers = locked_getPeers(c);
     217                    Set<Hash> peers = locked_getPeers(c);
    213218                    if (peers != null)
    214219                        peers.remove(peer);
    215220                }
    216221            }
    217         }
    218     }
     222    }
     223
     224/*******
    219225    public Hash selectRandomByCapability(char capability) {
    220226        int index = _context.random().nextInt(Integer.MAX_VALUE);
     
    228234        return null;
    229235    }
    230     public List getPeersByCapability(char capability) {
    231         if (false) {
    232             synchronized (_capabilitiesByPeer) {
    233                 List peers = locked_getPeers(capability);
    234                 if (peers != null)
    235                     return new ArrayList(peers);
    236             }
     236********/
     237
     238    /**
     239     *  The only user of this is TunnelPeerSelector for unreachables?
     240     */
     241    public List<Hash> getPeersByCapability(char capability) {
     242        if (true) {
     243            Set<Hash> peers = locked_getPeers(capability);
     244            if (peers != null)
     245                return new ArrayList(peers);
    237246            return null;
    238247        } else {
     248            // Wow this looks really slow...
     249            // What is the point of keeping all the data structures above
     250            // if we are going to go through the whole netdb anyway?
     251            // Not sure why jrandom switched to do it this way,
     252            // the checkin comments aren't clear...
     253            // Since the locking is gone, switch back to the above.
    239254            FloodfillNetworkDatabaseFacade f = (FloodfillNetworkDatabaseFacade)_context.netDb();
    240             List routerInfos = f.getKnownRouterData();
    241             List rv = new ArrayList();
    242             for (Iterator iter = routerInfos.iterator(); iter.hasNext(); ) {
    243                 RouterInfo ri = (RouterInfo)iter.next();
     255            List<RouterInfo> routerInfos = f.getKnownRouterData();
     256            List<Hash> rv = new ArrayList();
     257            for (Iterator<RouterInfo> iter = routerInfos.iterator(); iter.hasNext(); ) {
     258                RouterInfo ri = iter.next();
    244259                String caps = ri.getCapabilities();
    245260                if (caps.indexOf(capability) >= 0)
  • router/java/src/net/i2p/router/peermanager/PeerManagerFacadeImpl.java

    r05597ae r617d1cd  
    5858    }
    5959   
    60     public List selectPeers(PeerSelectionCriteria criteria) {
     60    public List<Hash> selectPeers(PeerSelectionCriteria criteria) {
    6161        return _manager.selectPeers(criteria);
    6262    }
     
    7070        _manager.removeCapabilities(peer);
    7171    }
     72
     73    /** @deprecated unused */
    7274    public Hash selectRandomByCapability(char capability) {
    73         if (_manager == null) return null;
    74         return _manager.selectRandomByCapability(capability);
     75        //if (_manager == null) return null;
     76        //return _manager.selectRandomByCapability(capability);
     77        return null;
    7578    }
    76     public List getPeersByCapability(char capability) {
     79
     80    public List<Hash> getPeersByCapability(char capability) {
    7781        if (_manager == null) return new ArrayList(0);
    7882        return _manager.getPeersByCapability(capability);
  • router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java

    r05597ae r617d1cd  
    218218    }
    219219   
    220     private boolean isX(Map m, Hash peer) {
     220    private boolean isX(Map<Hash, PeerProfile> m, Hash peer) {
    221221        getReadLock();
    222222        try {
     
    273273     *
    274274     */
    275     public void selectFastPeers(int howMany, Set exclude, Set matches) {
     275    public void selectFastPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    276276        selectFastPeers(howMany, exclude, matches, 0);
    277277    }
    278     public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) {
     278    public void selectFastPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
    279279        getReadLock();
    280280        try {
     
    296296     *
    297297     */
    298     public void selectHighCapacityPeers(int howMany, Set exclude, Set matches) {
     298    public void selectHighCapacityPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    299299        selectHighCapacityPeers(howMany, exclude, matches, 0);
    300300    }
    301     public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) {
     301    public void selectHighCapacityPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
    302302        getReadLock();
    303303        try {
     
    327327     *
    328328     */
    329     public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches) {
     329    public void selectWellIntegratedPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    330330        selectWellIntegratedPeers(howMany, exclude, matches, 0);
    331331    }
    332     public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) {
     332    public void selectWellIntegratedPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
    333333        getReadLock();
    334334        try {
     
    351351     *
    352352     */
    353     public void selectNotFailingPeers(int howMany, Set exclude, Set matches) {
     353    public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    354354        selectNotFailingPeers(howMany, exclude, matches, false, 0);
    355355    }
    356     public void selectNotFailingPeers(int howMany, Set exclude, Set matches, int mask) {
     356    public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
    357357        selectNotFailingPeers(howMany, exclude, matches, false, mask);
    358358    }
    359     public void selectNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing) {
     359    public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing) {
    360360        selectNotFailingPeers(howMany, exclude, matches, onlyNotFailing, 0);
    361361    }
     
    369369     * @param onlyNotFailing if true, don't include any high capacity peers
    370370     */
    371     public void selectNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing, int mask) {
     371    public void selectNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing, int mask) {
    372372        if (matches.size() < howMany)
    373373            selectAllNotFailingPeers(howMany, exclude, matches, onlyNotFailing, mask);
     
    389389     * No mask parameter, to be fixed
    390390     */
    391     public void selectActiveNotFailingPeers(int howMany, Set exclude, Set matches) {
     391    public void selectActiveNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    392392        if (matches.size() < howMany) {
    393393            getReadLock();
     
    413413     * This DOES cascade further to non-connected peers.
    414414     */
    415     private void selectActiveNotFailingPeers2(int howMany, Set exclude, Set matches, int mask) {
     415    private void selectActiveNotFailingPeers2(int howMany, Set<Hash> exclude, Set<Hash> matches, int mask) {
    416416        if (matches.size() < howMany) {
    417417            Map<Hash, PeerProfile> activePeers = new HashMap();
     
    440440     *
    441441     */
    442     public void selectAllNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing) {
     442    public void selectAllNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing) {
    443443        selectAllNotFailingPeers(howMany, exclude, matches, onlyNotFailing, 0);
    444444    }
     
    447447     *
    448448     */
    449     private void selectAllNotFailingPeers(int howMany, Set exclude, Set matches, boolean onlyNotFailing, int mask) {
     449    private void selectAllNotFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches, boolean onlyNotFailing, int mask) {
    450450        if (matches.size() < howMany) {
    451451            int orig = matches.size();
     
    496496     *
    497497     */
    498     public void selectFailingPeers(int howMany, Set exclude, Set matches) {
     498    public void selectFailingPeers(int howMany, Set<Hash> exclude, Set<Hash> matches) {
    499499        getReadLock();
    500500        try {
     
    565565     *
    566566     */
    567     public List selectPeersRecentlyRejecting() {
     567    public List<Hash> selectPeersRecentlyRejecting() {
    568568        getReadLock();
    569569        try {
    570570            long cutoff = _context.clock().now() - (20*1000);
    571571            int count = _notFailingPeers.size();
    572             List l = new ArrayList(count / 128);
     572            List<Hash> l = new ArrayList(count / 128);
    573573            for (Iterator<PeerProfile> iter = _notFailingPeers.values().iterator(); iter.hasNext(); ) {
    574574                PeerProfile prof = iter.next();
     
    584584     *
    585585     */
    586     public Set selectAllPeers() {
     586    public Set<Hash> selectAllPeers() {
    587587        getReadLock();
    588588        try {
    589             Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
     589            Set<Hash> allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
    590590            allPeers.addAll(_failingPeers.keySet());
    591591            allPeers.addAll(_notFailingPeers.keySet());
     
    854854     *
    855855     */
    856     private void locked_calculateThresholds(Set allPeers) {
     856    private void locked_calculateThresholds(Set<PeerProfile> allPeers) {
    857857        double totalCapacity = 0;
    858858        double totalIntegration = 0;
    859         Set reordered = new TreeSet(_comp);
     859        Set<PeerProfile> reordered = new TreeSet(_comp);
    860860        for (Iterator<PeerProfile> iter = allPeers.iterator(); iter.hasNext(); ) {
    861861            PeerProfile profile = iter.next();
     
    896896     *                  capacity is greater than the growth factor
    897897     */
    898     private void locked_calculateCapacityThreshold(double totalCapacity, Set reordered) {
     898    private void locked_calculateCapacityThreshold(double totalCapacity, Set<PeerProfile> reordered) {
    899899        int numNotFailing = reordered.size();
    900900       
     
    965965     *                  (highest first) for active nonfailing peers
    966966     */
    967     private void locked_calculateSpeedThreshold(Set reordered) {
     967    private void locked_calculateSpeedThreshold(Set<PeerProfile> reordered) {
    968968        if (true) {
    969969            locked_calculateSpeedThresholdMean(reordered);
     
    997997    }
    998998   
    999     private void locked_calculateSpeedThresholdMean(Set reordered) {
     999    private void locked_calculateSpeedThresholdMean(Set<PeerProfile> reordered) {
    10001000        double total = 0;
    10011001        int count = 0;
     
    10411041     *
    10421042     */
    1043     private void locked_selectPeers(Map peers, int howMany, Set toExclude, Set matches) {
     1043    private void locked_selectPeers(Map<Hash, PeerProfile> peers, int howMany, Set<Hash> toExclude, Set<Hash> matches) {
    10441044        locked_selectPeers(peers, howMany, toExclude, matches, 0);
    10451045    }
    1046     private void locked_selectPeers(Map peers, int howMany, Set toExclude, Set matches, int mask) {
     1046    private void locked_selectPeers(Map<Hash, PeerProfile> peers, int howMany, Set<Hash> toExclude, Set<Hash> matches, int mask) {
    10471047        List all = new ArrayList(peers.keySet());
    10481048        if (toExclude != null)
     
    10521052        all.remove(_us);
    10531053        Collections.shuffle(all, _random);
    1054         Set IPSet = new HashSet(8);
     1054        Set<Integer> IPSet = new HashSet(8);
    10551055        for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) {
    10561056            Hash peer = (Hash)all.get(i);
     
    10741074     * @param IPMatches all IPs so far, modified by this routine
    10751075     */
    1076     private boolean notRestricted(Hash peer, Set IPSet, int mask) {
    1077         Set peerIPs = maskedIPSet(peer, mask);
     1076    private boolean notRestricted(Hash peer, Set<Integer> IPSet, int mask) {
     1077        Set<Integer> peerIPs = maskedIPSet(peer, mask);
    10781078        if (containsAny(IPSet, peerIPs))
    10791079            return false;
     
    10881088      * @return an opaque set of masked IPs for this peer
    10891089      */
    1090     private Set maskedIPSet(Hash peer, int mask) {
    1091         Set rv = new HashSet(2);
     1090    private Set<Integer> maskedIPSet(Hash peer, int mask) {
     1091        Set<Integer> rv = new HashSet(2);
    10921092        byte[] commIP = _context.commSystem().getIP(peer);
    10931093        if (commIP != null)
  • router/java/src/net/i2p/router/startup/ClientAppConfig.java

    r05597ae r617d1cd  
    1919 * page in the router console.
    2020 *
     21 * <pre>
     22 *
     23 * clients.config format:
     24 *
     25 * Lines are of the form clientApp.x.prop=val, where x is the app number.
     26 * App numbers MUST start with 0 and be consecutive.
     27 *
     28 * Properties are as follows:
     29 *      main: Full class name. Required. The main() method in this
     30 *            class will be run.
     31 *      name: Name to be displayed on console.
     32 *      args: Arguments to the main class, separated by spaces or tabs.
     33 *            Arguments containing spaces or tabs may be quoted with ' or "
     34 *      delay: Seconds before starting, default 120
     35 *      onBoot: {true|false}, default false, forces a delay of 0,
     36 *              overrides delay setting
     37 *      startOnLoad: {true|false} Is the client to be run at all?
     38 *                    Default true
     39 *
     40 * The following additional properties are used only by plugins:
     41 *      stopargs: Arguments to stop the client.
     42 *      uninstallargs: Arguments to stop the client.
     43 *      classpath: Additional classpath elements for the client,
     44 *                 separated by commas.
     45 *
     46 * The following substitutions are made in the args, stopargs,
     47 * uninstallargs, and classpath lines, for plugins only:
     48 *      $I2P: The base I2P install directory
     49 *      $CONFIG: The user's configuration directory (e.g. ~/.i2p)
     50 *      $PLUGIN: This plugin's directory (e.g. ~/.i2p/plugins/foo)
     51 *
     52 * All properties except "main" are optional.
     53 * Lines starting with "#" are comments.
     54 *
     55 * If the delay is less than zero, the client is run immediately,
     56 * in the same thread, so that exceptions may be propagated to the console.
     57 * In this case, the client should either throw an exception, return quickly,
     58 * or spawn its own thread.
     59 * If the delay is greater than or equal to zero, it will be run
     60 * in a new thread, and exceptions will be logged but not propagated
     61 * to the console.
     62 *
     63 * </pre>
    2164 */
    2265public class ClientAppConfig {
  • router/java/src/net/i2p/router/startup/LoadClientAppsJob.java

    r05597ae r617d1cd  
    1212
    1313/**
    14  * Run any client applications specified in the router.config.  If any clientApp
     14 * Run any client applications specified in clients.config.  If any clientApp
    1515 * contains the config property ".onBoot=true" it'll be launched immediately, otherwise
    1616 * it'll get queued up for starting 2 minutes later.
     
    4141                continue;
    4242            String argVal[] = parseArgs(app.args);
    43             if (app.delay == 0) {
     43            if (app.delay <= 0) {
    4444                // run this guy now
    4545                runClient(app.className, app.clientName, argVal, _log);
     
    119119    }
    120120
     121    /**
     122     *  Use to test if the class is present,
     123     *  to propagate an error back to the user,
     124     *  since runClient() runs in a separate thread.
     125     *
     126     *  @since 0.7.13
     127     */
     128    public static void testClient(String className) throws ClassNotFoundException {
     129        Class.forName(className);
     130    }
     131
     132    /**
     133     *  Run client in this thread.
     134     *
     135     *  @throws just about anything, caller would be wise to catch Throwable
     136     *  @since 0.7.13
     137     */
     138    public static void runClientInline(String className, String clientName, String args[], Log log) throws Exception {
     139        if (log.shouldLog(Log.INFO))
     140            log.info("Loading up the client application " + clientName + ": " + className + " " + Arrays.toString(args));
     141        if (args == null)
     142            args = new String[0];
     143        Class cls = Class.forName(className);
     144        Method method = cls.getMethod("main", new Class[] { String[].class });
     145        method.invoke(cls, new Object[] { args });
     146    }
     147
     148    /**
     149     *  Run client in a new thread.
     150     */
    121151    public static void runClient(String className, String clientName, String args[], Log log) {
    122152        if (log.shouldLog(Log.INFO))
  • router/java/src/net/i2p/router/transport/CommSystemFacadeImpl.java

    r05597ae r617d1cd  
    117117    }
    118118   
    119     public List getBids(OutNetMessage msg) {
     119    public List<TransportBid> getBids(OutNetMessage msg) {
    120120        return _manager.getBids(msg);
    121121    }
     
    175175   
    176176    @Override
    177     public Set createAddresses() {
    178         Map addresses = null;
     177    public Set<RouterAddress> createAddresses() {
     178        Map<String, RouterAddress> addresses = null;
    179179        boolean newCreated = false;
    180180       
  • router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java

    r05597ae r617d1cd  
    55import java.util.ArrayList;
    66import java.util.List;
     7import java.util.concurrent.atomic.AtomicInteger;
     8import java.util.concurrent.atomic.AtomicLong;
    79
    810import net.i2p.I2PAppContext;
     
    1012import net.i2p.util.Log;
    1113
     14/**
     15 *  Concurrent plan:
     16 *
     17 *  It's difficult to get rid of the locks on _pendingInboundRequests
     18 *  since locked_satisyInboundAvailable() leaves Requests on the head
     19 *  of the queue.
     20 *
     21 *  When we go to Java 6, we can convert from a locked ArrayList to
     22 *  a LinkedBlockingDeque, where locked_sIA will poll() from the
     23 *  head of the queue, and if the request is not fully satisfied,
     24 *  offerFirst() (i.e. push) it back on the head.
     25 *
     26 *  Ditto outbound of course.
     27 *
     28 *  In the meantime, for Java 5, we have lockless 'shortcut'
     29 *  methods for the common case where we are under the bandwidth limits.
     30 *  And the volatile counters are now AtomicIntegers / AtomicLongs.
     31 *
     32 */
    1233public class FIFOBandwidthLimiter {
    1334    private Log _log;
    1435    private I2PAppContext _context;
    15     private final List _pendingInboundRequests;
    16     private final List _pendingOutboundRequests;
     36    private final List<Request> _pendingInboundRequests;
     37    private final List<Request> _pendingOutboundRequests;
    1738    /** how many bytes we can consume for inbound transmission immediately */
    18     private volatile int _availableInbound;
     39    private AtomicInteger _availableInbound = new AtomicInteger();
    1940    /** how many bytes we can consume for outbound transmission immediately */
    20     private volatile int _availableOutbound;
     41    private AtomicInteger _availableOutbound = new AtomicInteger();
    2142    /** how many bytes we can queue up for bursting */
    22     private volatile int _unavailableInboundBurst;
     43    private AtomicInteger _unavailableInboundBurst = new AtomicInteger();
    2344    /** how many bytes we can queue up for bursting */
    24     private volatile int _unavailableOutboundBurst;
     45    private AtomicInteger _unavailableOutboundBurst = new AtomicInteger();
    2546    /** how large _unavailableInbound can get */
    2647    private int _maxInboundBurst;
     
    3657    private boolean _inboundUnlimited;
    3758    /** lifetime counter of bytes received */
    38     private volatile long _totalAllocatedInboundBytes;
     59    private AtomicLong _totalAllocatedInboundBytes = new AtomicLong();
    3960    /** lifetime counter of bytes sent */
    40     private volatile long _totalAllocatedOutboundBytes;
     61    private AtomicLong _totalAllocatedOutboundBytes = new AtomicLong();
    4162    /** lifetime counter of tokens available for use but exceeded our maxInboundBurst size */
    42     private volatile long _totalWastedInboundBytes;
     63    private AtomicLong _totalWastedInboundBytes = new AtomicLong();
    4364    /** lifetime counter of tokens available for use but exceeded our maxOutboundBurst size */
    44     private volatile long _totalWastedOutboundBytes;
     65    private AtomicLong _totalWastedOutboundBytes = new AtomicLong();
    4566    private FIFOBandwidthRefiller _refiller;
    4667   
     
    7697        _pendingInboundRequests = new ArrayList(16);
    7798        _pendingOutboundRequests = new ArrayList(16);
    78         _lastTotalSent = _totalAllocatedOutboundBytes;
    79         _lastTotalReceived = _totalAllocatedInboundBytes;
     99        _lastTotalSent = _totalAllocatedOutboundBytes.get();
     100        _lastTotalReceived = _totalAllocatedInboundBytes.get();
    80101        _sendBps = 0;
    81102        _recvBps = 0;
     
    91112    //public long getAvailableInboundBytes() { return _availableInboundBytes; }
    92113    //public long getAvailableOutboundBytes() { return _availableOutboundBytes; }
    93     public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes; }
    94     public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes; }
    95     public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes; }
    96     public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes; }
     114    public long getTotalAllocatedInboundBytes() { return _totalAllocatedInboundBytes.get(); }
     115    public long getTotalAllocatedOutboundBytes() { return _totalAllocatedOutboundBytes.get(); }
     116    public long getTotalWastedInboundBytes() { return _totalWastedInboundBytes.get(); }
     117    public long getTotalWastedOutboundBytes() { return _totalWastedOutboundBytes.get(); }
    97118    //public long getMaxInboundBytes() { return _maxInboundBytes; }
    98119    //public void setMaxInboundBytes(int numBytes) { _maxInboundBytes = numBytes; }
     
    117138        _pendingInboundRequests.clear();
    118139        _pendingOutboundRequests.clear();
    119         _availableInbound = 0;
    120         _availableOutbound = 0;
     140        _availableInbound.set(0);
     141        _availableOutbound.set(0);
    121142        _maxInbound = 0;
    122143        _maxOutbound = 0;
    123144        _maxInboundBurst = 0;
    124145        _maxOutboundBurst = 0;
    125         _unavailableInboundBurst = 0;
    126         _unavailableOutboundBurst = 0;
     146        _unavailableInboundBurst.set(0);
     147        _unavailableOutboundBurst.set(0);
    127148        _inboundUnlimited = false;
    128149        _outboundUnlimited = false;
     
    133154
    134155    /**
    135      * Request some bytes, blocking until they become available
    136      *
    137      */
    138     public Request requestInbound(int bytesIn, String purpose) { return requestInbound(bytesIn, purpose, null, null); }
     156     * Request some bytes. Does not block.
     157     */
     158    public Request requestInbound(int bytesIn, String purpose) {
     159        // try to satisfy without grabbing the global lock
     160        if (shortcutSatisfyInboundRequest(bytesIn))
     161            return _noop;
     162        return requestInbound(bytesIn, purpose, null, null);
     163    }
     164
    139165    public Request requestInbound(int bytesIn, String purpose, CompleteListener lsnr, Object attachment) {
    140         if (_inboundUnlimited) {
    141             _totalAllocatedInboundBytes += bytesIn;
    142             return _noop;
    143         }
    144        
    145166        SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose, lsnr, attachment);
    146167        requestInbound(req, bytesIn, purpose);
    147168        return req;
    148169    }
    149     public void requestInbound(Request req, int bytesIn, String purpose) {
    150         req.init(bytesIn, 0, purpose);
    151         if (false) { ((SimpleRequest)req).allocateAll(); return; }
    152         int pending = 0;
     170
     171    /**
     172     * The transports don't use this any more, so make it private
     173     * and a SimpleRequest instead of a Request
     174     * So there's no more casting
     175     */
     176    private void requestInbound(SimpleRequest req, int bytesIn, String purpose) {
     177        // don't init twice - uncomment if we make public again?
     178        //req.init(bytesIn, 0, purpose);
     179        int pending;
    153180        synchronized (_pendingInboundRequests) {
    154181            pending = _pendingInboundRequests.size();
    155182            _pendingInboundRequests.add(req);
    156183        }
    157         satisfyInboundRequests(((SimpleRequest)req).satisfiedBuffer);
    158         ((SimpleRequest)req).satisfiedBuffer.clear();
     184        satisfyInboundRequests(req.satisfiedBuffer);
     185        req.satisfiedBuffer.clear();
    159186        if (pending > 0)
    160187            _context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending);
    161188    }
    162     /**
    163      * Request some bytes, blocking until they become available
    164      *
    165      */
    166     public Request requestOutbound(int bytesOut, String purpose) { return requestOutbound(bytesOut, purpose, null, null); }
     189
     190    /**
     191     * Request some bytes. Does not block.
     192     */
     193    public Request requestOutbound(int bytesOut, String purpose) {
     194        // try to satisfy without grabbing the global lock
     195        if (shortcutSatisfyOutboundRequest(bytesOut))
     196            return _noop;
     197        return requestOutbound(bytesOut, purpose, null, null);
     198    }
     199
    167200    public Request requestOutbound(int bytesOut, String purpose, CompleteListener lsnr, Object attachment) {
    168         if (_outboundUnlimited) {
    169             _totalAllocatedOutboundBytes += bytesOut;
    170             return _noop;
    171         }
    172 
    173201        SimpleRequest req = new SimpleRequest(0, bytesOut, purpose, lsnr, attachment);
    174202        requestOutbound(req, bytesOut, purpose);
    175203        return req;
    176204    }
    177     public void requestOutbound(Request req, int bytesOut, String purpose) {
    178         req.init(0, bytesOut, purpose);
    179         if (false) { ((SimpleRequest)req).allocateAll(); return; }
    180         int pending = 0;
     205
     206    private void requestOutbound(SimpleRequest req, int bytesOut, String purpose) {
     207        // don't init twice - uncomment if we make public again?
     208        //req.init(0, bytesOut, purpose);
     209        int pending;
    181210        synchronized (_pendingOutboundRequests) {
    182211            pending = _pendingOutboundRequests.size();
    183212            _pendingOutboundRequests.add(req);
    184213        }
    185         satisfyOutboundRequests(((SimpleRequest)req).satisfiedBuffer);
    186         ((SimpleRequest)req).satisfiedBuffer.clear();
     214        satisfyOutboundRequests(req.satisfiedBuffer);
     215        req.satisfiedBuffer.clear();
    187216        if (pending > 0)
    188217            _context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending);
     
    201230   
    202231    StringBuilder getStatus() {
    203         StringBuilder rv = new StringBuilder(64);
     232        StringBuilder rv = new StringBuilder(128);
    204233        rv.append("Available: ").append(_availableInbound).append('/').append(_availableOutbound).append(' ');
    205234        rv.append("Max: ").append(_maxInbound).append('/').append(_maxOutbound).append(' ');
     
    216245     * @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative)
    217246     */
    218     final void refillBandwidthQueues(List buf, long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) {
     247    final void refillBandwidthQueues(List<Request> buf, long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) {
    219248        if (_log.shouldLog(Log.DEBUG))
    220249            _log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString());
    221         _availableInbound += bytesInbound;
    222         _availableOutbound += bytesOutbound;
    223        
    224         if (_availableInbound > _maxInbound) {
     250
     251        // Take some care throughout to minimize accesses to the atomics,
     252        // both for efficiency and to not let strange things happen if
     253        // it changes out from under us
     254        // This never had locks before concurrent, anyway
     255
     256        int avi = _availableInbound.addAndGet((int) bytesInbound);
     257        if (avi > _maxInbound) {
    225258            if (_log.shouldLog(Log.DEBUG))
    226                 _log.debug("available inbound (" + _availableInbound + ") exceeds our inbound burst (" + _maxInbound + "), so no supplement");
    227             _unavailableInboundBurst += _availableInbound - _maxInbound;
    228             _availableInbound = _maxInbound;
    229             if (_unavailableInboundBurst > _maxInboundBurst) {
    230                 _totalWastedInboundBytes += _unavailableInboundBurst - _maxInboundBurst;
    231                 _unavailableInboundBurst = _maxInboundBurst;
     259                _log.debug("available inbound (" + avi + ") exceeds our inbound burst (" + _maxInbound + "), so no supplement");
     260            int uib = _unavailableInboundBurst.addAndGet(avi - _maxInbound);
     261            _availableInbound.set(_maxInbound);
     262            if (uib > _maxInboundBurst) {
     263                _totalWastedInboundBytes.addAndGet(uib - _maxInboundBurst);
     264                _unavailableInboundBurst.set(_maxInboundBurst);
    232265            }
    233266        } else {
    234267            // try to pull in up to 1/10th of the burst rate, since we refill every 100ms
    235268            int want = (int)maxBurstIn;
    236             if (want > (_maxInbound - _availableInbound))
    237                 want = _maxInbound - _availableInbound;
     269            if (want > (_maxInbound - avi))
     270                want = _maxInbound - avi;
    238271            if (_log.shouldLog(Log.DEBUG))
    239                 _log.debug("want to pull " + want + " from the inbound burst (" + _unavailableInboundBurst + ") to supplement " + _availableInbound + " (max: " + _maxInbound + ")");
     272                _log.debug("want to pull " + want + " from the inbound burst (" + _unavailableInboundBurst + ") to supplement " + avi + " (max: " + _maxInbound + ")");
    240273           
    241274            if (want > 0) {
    242                 if (want <= _unavailableInboundBurst) {
    243                     _availableInbound += want;
    244                     _unavailableInboundBurst -= want;
     275                int uib = _unavailableInboundBurst.get();
     276                if (want <= uib) {
     277                    _availableInbound.addAndGet(want);
     278                    _unavailableInboundBurst.addAndGet(0 - want);
    245279                } else {
    246                     _availableInbound += _unavailableInboundBurst;
    247                     _unavailableInboundBurst = 0;
     280                    _availableInbound.addAndGet(uib);
     281                    _unavailableInboundBurst.set(0);
    248282                }
    249283            }
    250284        }
    251285       
    252         if (_availableOutbound > _maxOutbound) {
     286        int avo = _availableOutbound.addAndGet((int) bytesOutbound);
     287        if (avo > _maxOutbound) {
    253288            if (_log.shouldLog(Log.DEBUG))
    254                 _log.debug("available outbound (" + _availableOutbound + ") exceeds our outbound burst (" + _maxOutbound + "), so no supplement");
    255             _unavailableOutboundBurst += _availableOutbound - _maxOutbound;
    256             _availableOutbound = _maxOutbound;
    257             if (_unavailableOutboundBurst > _maxOutboundBurst) {
    258                 _totalWastedOutboundBytes += _unavailableOutboundBurst - _maxOutboundBurst;
    259                 _unavailableOutboundBurst = _maxOutboundBurst;
     289                _log.debug("available outbound (" + avo + ") exceeds our outbound burst (" + _maxOutbound + "), so no supplement");
     290            int uob = _unavailableOutboundBurst.getAndAdd(avo - _maxOutbound);
     291            _availableOutbound.set(_maxOutbound);
     292
     293            if (uob > _maxOutboundBurst) {
     294                _totalWastedOutboundBytes.getAndAdd(uob - _maxOutboundBurst);
     295                _unavailableOutboundBurst.set(_maxOutboundBurst);
    260296            }
    261297        } else {
    262298            // try to pull in up to 1/10th of the burst rate, since we refill every 100ms
    263299            int want = (int)maxBurstOut;
    264             if (want > (_maxOutbound - _availableOutbound))
    265                 want = _maxOutbound - _availableOutbound;
     300            if (want > (_maxOutbound - avo))
     301                want = _maxOutbound - avo;
    266302            if (_log.shouldLog(Log.DEBUG))
    267                 _log.debug("want to pull " + want + " from the outbound burst (" + _unavailableOutboundBurst + ") to supplement " + _availableOutbound + " (max: " + _maxOutbound + ")");
     303                _log.debug("want to pull " + want + " from the outbound burst (" + _unavailableOutboundBurst + ") to supplement " + avo + " (max: " + _maxOutbound + ")");
    268304           
    269305            if (want > 0) {
    270                 if (want <= _unavailableOutboundBurst) {
    271                     _availableOutbound += want;
    272                     _unavailableOutboundBurst -= want;
     306                int uob = _unavailableOutboundBurst.get();
     307                if (want <= uob) {
     308                    _availableOutbound.addAndGet(want);
     309                    _unavailableOutboundBurst.addAndGet(0 - want);
    273310                } else {
    274                     _availableOutbound += _unavailableOutboundBurst;
    275                     _unavailableOutboundBurst = 0;
     311                    _availableOutbound.addAndGet(uob);
     312                    _unavailableOutboundBurst.set(0);
    276313                }
    277314            }
     
    287324        // If at least one second has passed
    288325        if (time >= 1000) {
    289             long totS = _totalAllocatedOutboundBytes;
    290             long totR = _totalAllocatedInboundBytes;
     326            long totS = _totalAllocatedOutboundBytes.get();
     327            long totR = _totalAllocatedInboundBytes.get();
    291328            long sent = totS - _lastTotalSent; // How much we sent meanwhile
    292329            long recv = totR - _lastTotalReceived; // How much we received meanwhile
     
    338375     * Go through the queue, satisfying as many requests as possible (notifying
    339376     * each one satisfied that the request has been granted). 
    340      */
    341     private final void satisfyRequests(List buffer) {
     377     *
     378     * @param buffer returned with the satisfied outbound requests only
     379     */
     380    private final void satisfyRequests(List<Request> buffer) {
    342381        buffer.clear();
    343382        satisfyInboundRequests(buffer);
     
    346385    }
    347386   
    348     private final void satisfyInboundRequests(List satisfied) {
     387    private final void satisfyInboundRequests(List<Request> satisfied) {
    349388        synchronized (_pendingInboundRequests) {
    350389            if (_inboundUnlimited) {
    351390                locked_satisfyInboundUnlimited(satisfied);
    352391            } else {
    353                 if (_availableInbound > 0) {
     392                if (_availableInbound.get() > 0) {
    354393                    locked_satisfyInboundAvailable(satisfied);
    355394                } else {
     
    371410    }
    372411   
     412    /** called from debug logging only */
    373413    private long locked_getLongestInboundWait() {
    374414        long start = -1;
     
    383423            return now() - start;
    384424    }
     425
     426    /** called from debug logging only */
    385427    private long locked_getLongestOutboundWait() {
    386428        long start = -1;
     
    401443     *
    402444     */
    403     private final void locked_satisfyInboundUnlimited(List satisfied) {
     445    private final void locked_satisfyInboundUnlimited(List<Request> satisfied) {
    404446        while (_pendingInboundRequests.size() > 0) {
    405447            SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0);
    406448            int allocated = req.getPendingInboundRequested();
    407             _totalAllocatedInboundBytes += allocated;
     449            _totalAllocatedInboundBytes.addAndGet(allocated);
    408450            req.allocateBytes(allocated, 0);
    409451            satisfied.add(req);
     
    426468     * @return list of requests that were completely satisfied
    427469     */
    428     private final void locked_satisfyInboundAvailable(List satisfied) {
     470    private final void locked_satisfyInboundAvailable(List<Request> satisfied) {
    429471        for (int i = 0; i < _pendingInboundRequests.size(); i++) {
    430             if (_availableInbound <= 0) break;
     472            if (_availableInbound.get() <= 0) break;
    431473            SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i);
    432474            long waited = now() - req.getRequestTime();
     
    453495            // ok, they are really waiting for us to give them stuff
    454496            int requested = req.getPendingInboundRequested();
    455             int allocated = 0;
    456             if (_availableInbound > requested)
     497            int avi = _availableInbound.get();
     498            int allocated;
     499            if (avi >= requested)
    457500                allocated = requested;
    458501            else
    459                 allocated = _availableInbound;
    460             _availableInbound -= allocated;
    461             _totalAllocatedInboundBytes += allocated;
     502                allocated = avi;
     503            _availableInbound.addAndGet(0 - allocated);
     504            _totalAllocatedInboundBytes.addAndGet(allocated);
    462505            req.allocateBytes(allocated, 0);
    463506            satisfied.add(req);
     
    486529    }
    487530   
    488     private final void satisfyOutboundRequests(List satisfied) {
     531    private final void satisfyOutboundRequests(List<Request> satisfied) {
    489532        synchronized (_pendingOutboundRequests) {
    490533            if (_outboundUnlimited) {
    491534                locked_satisfyOutboundUnlimited(satisfied);
    492535            } else {
    493                 if (_availableOutbound > 0) {
     536                if (_availableOutbound.get() > 0) {
    494537                    locked_satisfyOutboundAvailable(satisfied);
    495538                } else {
     
    515558     *
    516559     */
    517     private final void locked_satisfyOutboundUnlimited(List satisfied) {
     560    private final void locked_satisfyOutboundUnlimited(List<Request> satisfied) {
    518561        while (_pendingOutboundRequests.size() > 0) {
    519562            SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0);
    520563            int allocated = req.getPendingOutboundRequested();
    521             _totalAllocatedOutboundBytes += allocated;
     564            _totalAllocatedOutboundBytes.addAndGet(allocated);
    522565            req.allocateBytes(0, allocated);
    523566            satisfied.add(req);
     
    541584     * @return list of requests that were completely satisfied
    542585     */
    543     private final void locked_satisfyOutboundAvailable(List satisfied) {
     586    private final void locked_satisfyOutboundAvailable(List<Request> satisfied) {
    544587        for (int i = 0; i < _pendingOutboundRequests.size(); i++) {
    545             if (_availableOutbound <= 0) break;
     588            if (_availableOutbound.get() <= 0) break;
    546589            SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i);
    547590            long waited = now() - req.getRequestTime();
     
    568611            // ok, they are really waiting for us to give them stuff
    569612            int requested = req.getPendingOutboundRequested();
    570             int allocated = 0;
    571             if (_availableOutbound > requested)
     613            int avo = _availableOutbound.get();
     614            int allocated;
     615            if (avo >= requested)
    572616                allocated = requested;
    573617            else
    574                 allocated = _availableOutbound;
    575             _availableOutbound -= allocated;
    576             _totalAllocatedOutboundBytes += allocated;
     618                allocated = avo;
     619            _availableOutbound.addAndGet(0 - allocated);
     620            _totalAllocatedOutboundBytes.addAndGet(allocated);
    577621            req.allocateBytes(0, allocated);
    578622            satisfied.add(req);
     
    619663    }
    620664   
     665    /**
     666     *  Lockless total satisfaction,
     667     *  at some minor risk of exceeding the limits
     668     *  and driving the available counter below zero
     669     *
     670     *  @param requested number of bytes
     671     *  @return satisfaction
     672     *  @since 0.7.13
     673     */
     674    private boolean shortcutSatisfyInboundRequest(int requested) {
     675        boolean rv = _inboundUnlimited ||
     676                     (_pendingInboundRequests.isEmpty() &&
     677                      _availableInbound.get() >= requested);
     678        if (rv) {
     679            _availableInbound.addAndGet(0 - requested);
     680            _totalAllocatedInboundBytes.addAndGet(requested);
     681        }
     682        if (_log.shouldLog(Log.INFO))
     683            _log.info("IB shortcut for " + requested + "B? " + rv);
     684        return rv;
     685    }
     686   
     687    /**
     688     *  Lockless total satisfaction,
     689     *  at some minor risk of exceeding the limits
     690     *  and driving the available counter below zero
     691     *
     692     *  @param requested number of bytes
     693     *  @return satisfaction
     694     *  @since 0.7.13
     695     */
     696    private boolean shortcutSatisfyOutboundRequest(int requested) {
     697        boolean rv = _outboundUnlimited ||
     698                     (_pendingOutboundRequests.isEmpty() &&
     699                      _availableOutbound.get() >= requested);
     700        if (rv) {
     701            _availableOutbound.addAndGet(0 - requested);
     702            _totalAllocatedOutboundBytes.addAndGet(requested);
     703        }
     704        if (_log.shouldLog(Log.INFO))
     705            _log.info("OB shortcut for " + requested + "B? " + rv);
     706        return rv;
     707    }
     708
    621709    /** @deprecated not worth translating */
    622710    public void renderStatusHTML(Writer out) throws IOException {
     
    666754        private boolean _aborted;
    667755        private boolean _waited;
    668         List satisfiedBuffer;
     756        List<Request> satisfiedBuffer;
    669757        private CompleteListener _lsnr;
    670758        private Object _attachment;
     
    742830            _inAllocated = _inTotal;
    743831            _outAllocated = _outTotal;
    744             _outAllocated = _outTotal;
    745832            if (_lsnr == null)
    746833                _allocationsSinceWait++;
     
    779866    }
    780867
     868    /**
     869     *  This is somewhat complicated by having both
     870     *  inbound and outbound in a single request.
     871     *  Making a request unidirectional would
     872     *  be a good simplification.
     873     *  But NTCP would have to be changed as it puts them on one queue.
     874     */
    781875    public interface Request {
    782876        /** describe this particular request */
  • router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java

    r05597ae r617d1cd  
    7272        // bootstrap 'em with nothing
    7373        _lastRefillTime = _limiter.now();
    74         List buffer = new ArrayList(2);
     74        List<FIFOBandwidthLimiter.Request> buffer = new ArrayList(2);
    7575        while (true) {
    7676            long now = _limiter.now();
     
    9696    }
    9797   
    98     private boolean updateQueues(List buffer, long now) {
     98    private boolean updateQueues(List<FIFOBandwidthLimiter.Request> buffer, long now) {
    9999        long numMs = (now - _lastRefillTime);
    100100        if (_log.shouldLog(Log.INFO))
  • router/java/src/net/i2p/router/transport/TransportImpl.java

    r05597ae r617d1cd  
    422422    }
    423423
    424     /** To protect dev anonymity. Set to true after 0.7.12 is out */
    425     public static final boolean ADJUST_COST = !RouterVersion.VERSION.equals("0.7.11");
     424    /** Do we increase the advertised cost when approaching conn limits? */
     425    public static final boolean ADJUST_COST = true;
    426426
    427427    /** What addresses are we currently listening to? */
  • router/java/src/net/i2p/router/transport/TransportManager.java

    r05597ae r617d1cd  
    141141        _log.debug("Starting up the transport manager");
    142142        for (int i = 0; i < _transports.size(); i++) {
    143             Transport t = (Transport)_transports.get(i);
     143            Transport t = _transports.get(i);
    144144            RouterAddress addr = t.startListening();
    145145            if (_log.shouldLog(Log.DEBUG))
     
    162162            _upnpManager.stop();
    163163        for (int i = 0; i < _transports.size(); i++) {
    164             ((Transport)_transports.get(i)).stopListening();
     164            _transports.get(i).stopListening();
    165165        }
    166166        _transports.clear();
     
    169169    public Transport getTransport(String style) {
    170170        for (int i = 0; i < _transports.size(); i++) {
    171             Transport t = (Transport)_transports.get(i);
     171            Transport t = _transports.get(i);
    172172            if(style.equals(t.getStyle()))
    173173                return t;
     
    190190        int peers = 0;
    191191        for (int i = 0; i < _transports.size(); i++) {
    192             peers += ((Transport)_transports.get(i)).countActivePeers();
     192            peers += _transports.get(i).countActivePeers();
    193193        }
    194194        return peers;
     
    198198        int peers = 0;
    199199        for (int i = 0; i < _transports.size(); i++) {
    200             peers += ((Transport)_transports.get(i)).countActiveSendPeers();
     200            peers += _transports.get(i).countActiveSendPeers();
    201201        }
    202202        return peers;
     
    211211    public boolean haveOutboundCapacity(int pct) {
    212212        for (int i = 0; i < _transports.size(); i++) {
    213             if (((Transport)_transports.get(i)).haveCapacity(pct))
     213            if (_transports.get(i).haveCapacity(pct))
    214214                return true;
    215215        }
     
    226226            return false;
    227227        for (int i = 0; i < _transports.size(); i++) {
    228             if (!((Transport)_transports.get(i)).haveCapacity(HIGH_CAPACITY_PCT))
     228            if (!_transports.get(i).haveCapacity(HIGH_CAPACITY_PCT))
    229229                return false;
    230230        }
     
    254254        Vector skews = new Vector();
    255255        for (int i = 0; i < _transports.size(); i++) {
    256             Vector tempSkews = ((Transport)_transports.get(i)).getClockSkews();
     256            Vector tempSkews = _transports.get(i).getClockSkews();
    257257            if ((tempSkews == null) || (tempSkews.size() <= 0)) continue;
    258258            skews.addAll(tempSkews);
     
    276276    public void recheckReachability() {
    277277        for (int i = 0; i < _transports.size(); i++)
    278             ((Transport)_transports.get(i)).recheckReachability();
     278            _transports.get(i).recheckReachability();
    279279    }
    280280
    281281    public boolean isBacklogged(Hash dest) {
    282282        for (int i = 0; i < _transports.size(); i++) {
    283             Transport t = (Transport)_transports.get(i);
     283            Transport t = _transports.get(i);
    284284            if (t.isBacklogged(dest))
    285285                return true;
     
    290290    public boolean isEstablished(Hash dest) {
    291291        for (int i = 0; i < _transports.size(); i++) {
    292             Transport t = (Transport)_transports.get(i);
     292            Transport t = _transports.get(i);
    293293            if (t.isEstablished(dest))
    294294                return true;
     
    304304    public boolean wasUnreachable(Hash dest) {
    305305        for (int i = 0; i < _transports.size(); i++) {
    306             Transport t = (Transport)_transports.get(i);
     306            Transport t = _transports.get(i);
    307307            if (!t.wasUnreachable(dest))
    308308                return false;
     
    372372   
    373373    public TransportBid getBid(OutNetMessage msg) {
    374         List bids = getBids(msg);
     374        List<TransportBid> bids = getBids(msg);
    375375        if ( (bids == null) || (bids.size() <= 0) )
    376376            return null;
    377377        else
    378             return (TransportBid)bids.get(0);
    379     }
    380     public List getBids(OutNetMessage msg) {
     378            return bids.get(0);
     379    }
     380    public List<TransportBid> getBids(OutNetMessage msg) {
    381381        if (msg == null)
    382382            throw new IllegalArgumentException("Null message?  no bidding on a null outNetMessage!");
     
    384384            throw new IllegalArgumentException("WTF, bids for a message bound to ourselves?");
    385385
    386         List rv = new ArrayList(_transports.size());
     386        List<TransportBid> rv = new ArrayList(_transports.size());
    387387        Set failedTransports = msg.getFailedTransports();
    388388        for (int i = 0; i < _transports.size(); i++) {
    389             Transport t = (Transport)_transports.get(i);
     389            Transport t = _transports.get(i);
    390390            if (failedTransports.contains(t.getStyle())) {
    391391                if (_log.shouldLog(Log.DEBUG))
     
    416416        TransportBid rv = null;
    417417        for (int i = 0; i < _transports.size(); i++) {
    418             Transport t = (Transport)_transports.get(i);
     418            Transport t = _transports.get(i);
    419419            if (t.isUnreachable(peer)) {
    420420                unreachableTransports++;
     
    483483        List rv = new ArrayList(16);
    484484        for (int i = 0; i < _transports.size(); i++) {
    485             Transport t = (Transport)_transports.get(i);
     485            Transport t = _transports.get(i);
    486486            rv.addAll(t.getMostRecentErrorMessages());
    487487        }
     
    492492        TreeMap transports = new TreeMap();
    493493        for (int i = 0; i < _transports.size(); i++) {
    494             Transport t = (Transport)_transports.get(i);
     494            Transport t = _transports.get(i);
    495495            transports.put(t.getStyle(), t);
    496496        }
     
    502502        buf.append("<h3>Router Transport Addresses</h3><pre>\n");
    503503        for (int i = 0; i < _transports.size(); i++) {
    504             Transport t = (Transport)_transports.get(i);
     504            Transport t = _transports.get(i);
    505505            if (t.getCurrentAddress() != null)
    506506                buf.append(t.getCurrentAddress()).append("\n\n");
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r05597ae r617d1cd  
    303303    public void wantsWrite(NTCPConnection con, byte data[]) {
    304304        ByteBuffer buf = ByteBuffer.wrap(data);
    305         FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write", null, null);//con, buf);
     305        FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(data.length, "NTCP write");//con, buf);
    306306        if (req.getPendingOutboundRequested() > 0) {
    307307            if (_log.shouldLog(Log.INFO))
     
    472472                releaseBuf(buf);
    473473                ByteBuffer rbuf = ByteBuffer.wrap(data);
    474                 FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read", null, null); //con, buf);
     474                FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(read, "NTCP read"); //con, buf);
    475475                if (req.getPendingInboundRequested() > 0) {
    476476                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
  • router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java

    r05597ae r617d1cd  
    6363     */
    6464    private final LinkedBlockingQueue<ByteBuffer> _writeBufs;
    65     /** Todo: This is only so we can abort() them when we close() ??? */
     65    /** Requests that were not granted immediately */
    6666    private final Set<FIFOBandwidthLimiter.Request> _bwRequests;
    6767    private boolean _established;
  • router/java/src/net/i2p/router/transport/udp/ACKSender.java

    r05597ae r617d1cd  
    11package net.i2p.router.transport.udp;
    22
    3 import java.util.ArrayList;
     3import java.util.HashSet;
    44import java.util.List;
     5import java.util.Set;
     6import java.util.concurrent.BlockingQueue;
     7import java.util.concurrent.LinkedBlockingQueue;
    58
    69import net.i2p.router.RouterContext;
     
    1922    private PacketBuilder _builder;
    2023    /** list of peers (PeerState) who we have received data from but not yet ACKed to */
    21     private final List _peersToACK;
     24    private final BlockingQueue<PeerState> _peersToACK;
    2225    private boolean _alive;
     26    private static final long POISON_PS = -9999999999l;
    2327   
    2428    /** how frequently do we want to send ACKs to a peer? */
     
    2933        _log = ctx.logManager().getLog(ACKSender.class);
    3034        _transport = transport;
    31         _peersToACK = new ArrayList(4);
     35        _peersToACK = new LinkedBlockingQueue();
    3236        _builder = new PacketBuilder(_context, transport);
    3337        _alive = true;
     
    3842    }
    3943   
     44    /**
     45     *  Add to the queue.
     46     *  For speed, don't check for duplicates here.
     47     *  The runner will remove them in its own thread.
     48     */
    4049    public void ackPeer(PeerState peer) {
    41         synchronized (_peersToACK) {
    42             if (!_peersToACK.contains(peer))
    43                 _peersToACK.add(peer);
    44             _peersToACK.notifyAll();
    45         }
     50        if (_alive)
     51            _peersToACK.offer(peer);
    4652    }
    4753   
    4854    public void startup() {
    4955        _alive = true;
    50         I2PThread t = new I2PThread(this, "UDP ACK sender");
    51         t.setDaemon(true);
     56        _peersToACK.clear();
     57        I2PThread t = new I2PThread(this, "UDP ACK sender", true);
    5258        t.start();
    5359    }
     
    5561    public void shutdown() {
    5662        _alive = false;
    57         synchronized (_peersToACK) {
    58             _peersToACK.clear();
    59             _peersToACK.notifyAll();
     63        PeerState poison = new PeerState(_context, _transport);
     64        poison.setTheyRelayToUsAs(POISON_PS);
     65        _peersToACK.offer(poison);
     66        for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) {
     67            try {
     68                Thread.sleep(i * 50);
     69            } catch (InterruptedException ie) {}
    6070        }
     71        _peersToACK.clear();
    6172    }
    6273   
     
    7283   
    7384    public void run() {
     85
     86        // we use a Set to strip out dups that come in on the Queue
     87        Set<PeerState> notYet = new HashSet();
    7488        while (_alive) {
    7589            PeerState peer = null;
    76             long now = _context.clock().now();
     90            long now = 0;
    7791            long remaining = -1;
    78             try {
    79                 synchronized (_peersToACK) {
    80                     for (int i = 0; i < _peersToACK.size(); i++) {
    81                         PeerState cur = (PeerState)_peersToACK.get(i);
    82                         long wanted = cur.getWantedACKSendSince();
     92            long wanted = 0;
     93
     94                while (_alive) {
     95                    // Pull from the queue until we find one ready to ack
     96                    // Any that are not ready we will put back on the queue
     97                    PeerState cur = null;
     98                    try {
     99                        if (notYet.isEmpty())
     100                            // wait forever
     101                            cur = _peersToACK.take();
     102                        else
     103                            // Don't wait if nothing there, just put everybody back and sleep below
     104                            cur = _peersToACK.poll();
     105                    } catch (InterruptedException ie) {}
     106
     107                    if (cur != null) {
     108                        if (cur.getTheyRelayToUsAs() == POISON_PS)
     109                            return;
     110                        wanted = cur.getWantedACKSendSince();
     111                        now = _context.clock().now();
    83112                        long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now;
    84                         if ( ( (wanted > 0) && (delta < 0) ) || (cur.unsentACKThresholdReached()) ) {
    85                             _peersToACK.remove(i);
     113                        if (wanted <= 0) {
     114                            // it got acked by somebody - discard, remove any dups, and go around again
     115                            notYet.remove(cur);
     116                        } else if ( (delta <= 0) || (cur.unsentACKThresholdReached()) ) {
     117                            // found one to ack
    86118                            peer = cur;
     119                            notYet.remove(cur); // in case a dup
     120                            try {
     121                                // bulk operations may throw an exception
     122                                _peersToACK.addAll(notYet);
     123                            } catch (Exception e) {}
     124                            notYet.clear();
    87125                            break;
     126                        } else {
     127                            // not yet, go around again
     128                            // moving from the Queue to the Set and then back removes duplicates
     129                            boolean added = notYet.add(cur);
     130                            if (added && _log.shouldLog(Log.DEBUG))
     131                                _log.debug("Pending ACK (delta = " + delta + ") for " + cur);
    88132                        }
    89                     }
     133                    } else if (!notYet.isEmpty()) {
     134                        // put them all back and wait a while
     135                        try {
     136                            // bulk operations may throw an exception
     137                            _peersToACK.addAll(notYet);
     138                        } catch (Exception e) {}
     139                        if (_log.shouldLog(Log.INFO))
     140                            _log.info("sleeping, pending size = " + notYet.size());
     141                        notYet.clear();
     142                        try {
     143                            // sleep a little longer than the divided frequency,
     144                            // so it will be ready after we circle around a few times
     145                            Thread.sleep(5 + (ACK_FREQUENCY / 3));
     146                        } catch (InterruptedException ie) {}
     147                    } // else go around again where we will wait at take()
     148                } // inner while()
    90149                   
    91                     if (peer == null) {
    92                         if (_peersToACK.size() <= 0)
    93                             _peersToACK.wait();
    94                         else
    95                             _peersToACK.wait(50);
    96                     } else {
    97                         remaining = _peersToACK.size();
    98                     }
    99                 }
    100             } catch (InterruptedException ie) {}
    101                
    102150            if (peer != null) {
    103151                long lastSend = peer.getLastACKSend();
    104                 long wanted = peer.getWantedACKSendSince();
    105                 List ackBitfields = peer.retrieveACKBitfields(false);
     152                // set above before the break
     153                //long wanted = peer.getWantedACKSendSince();
     154                List<ACKBitfield> ackBitfields = peer.retrieveACKBitfields(false);
    106155               
    107                 if (wanted < 0)
    108                     _log.error("wtf, why are we acking something they dont want?  remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
     156                if (wanted < 0) {
     157                    if (_log.shouldLog(Log.WARN))
     158                        _log.warn("wtf, why are we acking something they dont want?  remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
     159                    continue;
     160                }
    109161               
    110                 if ( (ackBitfields != null) && (ackBitfields.size() > 0) ) {
     162                if ( (ackBitfields != null) && (!ackBitfields.isEmpty()) ) {
    111163                    _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0);
    112164                    if (remaining > 0)
    113165                        _context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
    114                     now = _context.clock().now();
     166                    // set above before the break
     167                    //now = _context.clock().now();
    115168                    if (lastSend < 0)
    116169                        lastSend = now - 1;
     
    120173                    ack.markType(1);
    121174                    ack.setFragmentCount(-1);
    122                     ack.setMessageType(42);
     175                    ack.setMessageType(PacketBuilder.TYPE_ACK);
    123176                   
    124177                    if (_log.shouldLog(Log.INFO))
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    r05597ae r617d1cd  
    88import java.util.List;
    99import java.util.Map;
     10import java.util.concurrent.ConcurrentHashMap;
    1011
    1112import net.i2p.crypto.DHSessionKeyBuilder;
     
    3839    private PacketBuilder _builder;
    3940    /** map of RemoteHostId to InboundEstablishState */
    40     private final Map _inboundStates;
     41    private final ConcurrentHashMap<RemoteHostId, InboundEstablishState> _inboundStates;
    4142    /** map of RemoteHostId to OutboundEstablishState */
    42     private final Map _outboundStates;
     43    private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundStates;
    4344    /** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
    44     private final Map _queuedOutbound;
     45    private final ConcurrentHashMap<RemoteHostId, List<OutNetMessage>> _queuedOutbound;
    4546    /** map of nonce (Long) to OutboundEstablishState */
    46     private final Map _liveIntroductions;
     47    private final ConcurrentHashMap<Long, OutboundEstablishState> _liveIntroductions;
    4748    private boolean _alive;
    4849    private final Object _activityLock;
     
    5758        _transport = transport;
    5859        _builder = new PacketBuilder(ctx, transport);
    59         _inboundStates = new HashMap(32);
    60         _outboundStates = new HashMap(32);
    61         _queuedOutbound = new HashMap(32);
    62         _liveIntroductions = new HashMap(32);
     60        _inboundStates = new ConcurrentHashMap();
     61        _outboundStates = new ConcurrentHashMap();
     62        _queuedOutbound = new ConcurrentHashMap();
     63        _liveIntroductions = new ConcurrentHashMap();
    6364        _activityLock = new Object();
    6465        _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
     
    7576    public void startup() {
    7677        _alive = true;
    77         I2PThread t = new I2PThread(new Establisher(), "UDP Establisher");
    78         t.setDaemon(true);
     78        I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true);
    7979        t.start();
    8080    }
     
    8888     */
    8989    InboundEstablishState getInboundState(RemoteHostId from) {
    90         synchronized (_inboundStates) {
    91             InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from);
     90            InboundEstablishState state = _inboundStates.get(from);
    9291            // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
    9392            //     _log.debug("No inbound states for " + from + ", with remaining: " + _inboundStates);
    9493            return state;
    95         }
    9694    }
    9795   
    9896    OutboundEstablishState getOutboundState(RemoteHostId from) {
    99         synchronized (_outboundStates) {
    100             OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from);
     97            OutboundEstablishState state = _outboundStates.get(from);
    10198            // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
    10299            //     _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates);
    103100            return state;
    104         }
    105101    }
    106102   
     
    164160        boolean rejected = false;
    165161        int queueCount = 0;
    166         synchronized (_outboundStates) {
    167             state = (OutboundEstablishState)_outboundStates.get(to);
     162
     163            state = _outboundStates.get(to);
    168164            if (state == null) {
    169165                if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
    170                     List queued = (List)_queuedOutbound.get(to);
    171                     if (queued == null) {
    172                         queued = new ArrayList(1);
    173                         if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
    174                             rejected = true;
    175                         } else {
    176                             _queuedOutbound.put(to, queued);
    177                         }
     166                    if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
     167                        rejected = true;
     168                    } else {
     169                        List<OutNetMessage> newQueued = new ArrayList(1);
     170                        List<OutNetMessage> queued = _queuedOutbound.putIfAbsent(to, newQueued);
     171                        if (queued == null)
     172                            queued = newQueued;
     173                        queueCount = queued.size();
     174                        if (queueCount < MAX_QUEUED_PER_PEER)
     175                            queued.add(msg);
    178176                    }
    179                     queueCount = queued.size();
    180                     if ( (queueCount < MAX_QUEUED_PER_PEER) && (!rejected) )
    181                         queued.add(msg);
    182177                    deferred = _queuedOutbound.size();
    183178                } else {
     
    185180                                                       msg.getTarget().getIdentity(),
    186181                                                       new SessionKey(addr.getIntroKey()), addr);
    187                     _outboundStates.put(to, state);
    188                     SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000);
     182                    OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
     183                    boolean isNew = oldState == null;
     184                    if (!isNew)
     185                        // whoops, somebody beat us to it, throw out the state we just created
     186                        state = oldState;
     187                    else
     188                        SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000);
    189189                }
    190190            }
    191191            if (state != null) {
    192192                state.addMessage(msg);
    193                 List queued = (List)_queuedOutbound.remove(to);
     193                List<OutNetMessage> queued = _queuedOutbound.remove(to);
    194194                if (queued != null)
    195195                    for (int i = 0; i < queued.size(); i++)
    196                         state.addMessage((OutNetMessage)queued.get(i));
    197             }
    198         }
     196                        state.addMessage(queued.get(i));
     197            }
    199198       
    200199        if (rejected) {
     
    224223        }
    225224        public void timeReached() {
    226             Object removed = null;
    227             synchronized (_outboundStates) {
    228                 removed = _outboundStates.remove(_to);
    229                 if ( (removed != null) && (removed != _state) ) { // oops, we must have failed, then retried
    230                     _outboundStates.put(_to, removed);
    231                     removed = null;
    232                 }/* else {
    233                     locked_admitQueued();
    234                 }*/
    235             }
    236             if (removed != null) {
     225            // remove only if value == state
     226            boolean removed = _outboundStates.remove(_to, _state);
     227            if (removed) {
    237228                _context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime());
    238229                if (_log.shouldLog(Log.WARN))
     
    261252       
    262253        boolean isNew = false;
    263         InboundEstablishState state = null;
    264         synchronized (_inboundStates) {
     254
    265255            if (_inboundStates.size() >= maxInbound)
    266256                return; // drop the packet
    267257           
    268             state = (InboundEstablishState)_inboundStates.get(from);
     258            InboundEstablishState state = _inboundStates.get(from);
    269259            if (state == null) {
    270260                if (_context.blocklist().isBlocklisted(from.getIP())) {
     
    277267                state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort());
    278268                state.receiveSessionRequest(reader.getSessionRequestReader());
    279                 isNew = true;
    280                 _inboundStates.put(from, state);
    281             }
    282         }
     269                InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state);
     270                isNew = oldState == null;
     271                if (!isNew)
     272                    // whoops, somebody beat us to it, throw out the state we just created
     273                    state = oldState;
     274            }
     275
    283276        if (isNew) {
    284277            // we don't expect inbound connections when hidden, but it could happen
     
    308301     */
    309302    void receiveSessionConfirmed(RemoteHostId from, UDPPacketReader reader) {
    310         InboundEstablishState state = null;
    311         synchronized (_inboundStates) {
    312             state = (InboundEstablishState)_inboundStates.get(from);
    313         }
     303        InboundEstablishState state = _inboundStates.get(from);
    314304        if (state != null) {
    315305            state.receiveSessionConfirmed(reader.getSessionConfirmedReader());
     
    325315     */
    326316    void receiveSessionCreated(RemoteHostId from, UDPPacketReader reader) {
    327         OutboundEstablishState state = null;
    328         synchronized (_outboundStates) {
    329             state = (OutboundEstablishState)_outboundStates.get(from);
    330         }
     317        OutboundEstablishState state = _outboundStates.get(from);
    331318        if (state != null) {
    332319            state.receiveSessionCreated(reader.getSessionCreatedReader());
     
    347334        //int admitted = 0;
    348335        //int remaining = 0;
    349         synchronized (_outboundStates) {
     336
    350337            //active = _outboundStates.size();
    351338            _outboundStates.remove(state.getRemoteHostId());
    352             if (_queuedOutbound.size() > 0) {
    353339                // there shouldn't have been queued messages for this active state, but just in case...
    354                 List queued = (List)_queuedOutbound.remove(state.getRemoteHostId());
     340                List<OutNetMessage> queued = _queuedOutbound.remove(state.getRemoteHostId());
    355341                if (queued != null) {
    356342                    for (int i = 0; i < queued.size(); i++)
    357                         state.addMessage((OutNetMessage)queued.get(i));
     343                        state.addMessage(queued.get(i));
    358344                }
    359345               
    360346                //admitted = locked_admitQueued();
    361             }
    362347            //remaining = _queuedOutbound.size();
    363         }
     348
    364349        //if (admitted > 0)
    365350        //    _log.log(Log.CRIT, "Admitted " + admitted + " with " + remaining + " remaining queued and " + active + " active");
     
    372357    }
    373358
     359/********
    374360    private int locked_admitQueued() {
    375361        int admitted = 0;
     
    410396        return admitted;
    411397    }
     398*******/
    412399   
    413400    private void notifyActivity() {
     
    597584            if (_log.shouldLog(Log.ERROR))
    598585                _log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe);
    599             synchronized (_inboundStates) {
    600                 _inboundStates.remove(state.getRemoteHostId());
    601             }
     586            _inboundStates.remove(state.getRemoteHostId());
    602587            return;
    603588        }
     
    628613        long nonce = _context.random().nextLong(MAX_NONCE);
    629614        while (true) {
    630             synchronized (_liveIntroductions) {
    631                 OutboundEstablishState old = (OutboundEstablishState)_liveIntroductions.put(new Long(nonce), state);
     615                OutboundEstablishState old = _liveIntroductions.putIfAbsent(new Long(nonce), state);
    632616                if (old != null) {
    633617                    nonce = _context.random().nextLong(MAX_NONCE);
     
    635619                    break;
    636620                }
    637             }
    638621        }
    639622        SimpleScheduler.getInstance().addEvent(new FailIntroduction(state, nonce), INTRO_ATTEMPT_TIMEOUT);
     
    657640        }
    658641        public void timeReached() {
    659             OutboundEstablishState removed = null;
    660             synchronized (_liveIntroductions) {
    661                 removed = (OutboundEstablishState)_liveIntroductions.remove(new Long(_nonce));
    662                 if (removed != _state) {
    663                     // another one with the same nonce in a very brief time...
    664                     _liveIntroductions.put(new Long(_nonce), removed);
    665                     removed = null;
    666                 }
    667             }
    668             if (removed != null) {
     642            // remove only if value equal to state
     643            boolean removed = _liveIntroductions.remove(new Long(_nonce), _state);
     644            if (removed) {
    669645                if (_log.shouldLog(Log.DEBUG))
    670646                    _log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out");
     
    678654    public void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) {
    679655        long nonce = reader.getRelayResponseReader().readNonce();
    680         OutboundEstablishState state = null;
    681         synchronized (_liveIntroductions) {
    682             state = (OutboundEstablishState)_liveIntroductions.remove(new Long(nonce));
    683         }
     656        OutboundEstablishState state = _liveIntroductions.remove(new Long(nonce));
    684657        if (state == null)
    685658            return; // already established
     
    706679        RemoteHostId oldId = state.getRemoteHostId();
    707680        state.introduced(addr, ip, port);
    708         synchronized (_outboundStates) {
    709             _outboundStates.remove(oldId);
    710             _outboundStates.put(state.getRemoteHostId(), state);
    711         }
     681        _outboundStates.remove(oldId);
     682        _outboundStates.put(state.getRemoteHostId(), state);
    712683        notifyActivity();
    713684    }
     
    749720        long nextSendTime = -1;
    750721        InboundEstablishState inboundState = null;
    751         synchronized (_inboundStates) {
     722
    752723            //if (_log.shouldLog(Log.DEBUG))
    753724            //    _log.debug("# inbound states: " + _inboundStates.size());
    754             for (Iterator iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
    755                 InboundEstablishState cur = (InboundEstablishState)iter.next();
     725            for (Iterator<InboundEstablishState> iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
     726                InboundEstablishState cur = iter.next();
    756727                if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
    757728                    // completely received (though the signature may be invalid)
     
    792763                }
    793764            }
    794         }
    795765
    796766        if (inboundState != null) {
     
    854824        //int remaining = 0;
    855825        //int active = 0;
    856         synchronized (_outboundStates) {
     826
    857827            //active = _outboundStates.size();
    858828            //if (_log.shouldLog(Log.DEBUG))
    859829            //    _log.debug("# outbound states: " + _outboundStates.size());
    860             for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
    861                 OutboundEstablishState cur = (OutboundEstablishState)iter.next();
     830            for (Iterator<OutboundEstablishState> iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
     831                OutboundEstablishState cur = iter.next();
    862832                if (cur == null) continue;
    863833                if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
     
    903873            //admitted = locked_admitQueued();   
    904874            //remaining = _queuedOutbound.size();
    905         }
    906875
    907876        //if (admitted > 0)
  • router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java

    r05597ae r617d1cd  
    2222 */
    2323public class InboundEstablishState {
    24     private RouterContext _context;
    25     private Log _log;
     24    private final RouterContext _context;
     25    private final Log _log;
    2626    // SessionRequest message
    2727    private byte _receivedX[];
    2828    private byte _bobIP[];
    29     private int _bobPort;
     29    private final int _bobPort;
    3030    private DHSessionKeyBuilder _keyBuilder;
    3131    // SessionCreated message
    3232    private byte _sentY[];
    33     private byte _aliceIP[];
    34     private int _alicePort;
     33    private final byte _aliceIP[];
     34    private final int _alicePort;
    3535    private long _sentRelayTag;
    3636    private long _sentSignedOnTime;
     
    4545    private RouterIdentity _receivedConfirmedIdentity;
    4646    // general status
    47     private long _establishBegin;
    48     private long _lastReceive;
     47    private final long _establishBegin;
     48    //private long _lastReceive;
    4949    // private long _lastSend;
    5050    private long _nextSend;
    51     private RemoteHostId _remoteHostId;
     51    private final RemoteHostId _remoteHostId;
    5252    private int _currentState;
    5353    private boolean _complete;
     
    122122
    123123    /** what IP do they appear to be on? */
    124     public synchronized byte[] getSentIP() { return _aliceIP; }
     124    public byte[] getSentIP() { return _aliceIP; }
     125
    125126    /** what port number do they appear to be coming from? */
    126     public synchronized int getSentPort() { return _alicePort; }
     127    public int getSentPort() { return _alicePort; }
    127128   
    128129    public synchronized byte[] getBobIP() { return _bobIP; }
     
    206207   
    207208    /** how long have we been trying to establish this session? */
    208     public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; }
    209     public synchronized long getEstablishBeginTime() { return _establishBegin; }
     209    public long getLifetime() { return _context.clock().now() - _establishBegin; }
     210    public long getEstablishBeginTime() { return _establishBegin; }
    210211    public synchronized long getNextSendTime() { return _nextSend; }
    211212    public synchronized void setNextSendTime(long when) { _nextSend = when; }
     
    329330   
    330331    private void packetReceived() {
    331         _lastReceive = _context.clock().now();
    332         _nextSend = _lastReceive;
     332        _nextSend = _context.clock().now();
    333333    }
    334334   
  • router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java

    r05597ae r617d1cd  
    9797        Hash fromPeer = from.getRemotePeer();
    9898           
    99         Map messages = from.getInboundMessages();
    100            
     99        Map<Long, InboundMessageState> messages = from.getInboundMessages();
     100
    101101        for (int i = 0; i < fragments; i++) {
    102102            long mid = data.readMessageId(i);
     
    123123         
    124124            synchronized (messages) {
    125                 state = (InboundMessageState)messages.get(messageId);
     125                state = messages.get(messageId);
    126126                if (state == null) {
    127127                    state = new InboundMessageState(_context, mid, fromPeer);
  • router/java/src/net/i2p/router/transport/udp/InboundMessageState.java

    r05597ae r617d1cd  
    173173    @Override
    174174    public String toString() {
    175         StringBuilder buf = new StringBuilder(32);
    176         buf.append("Message: ").append(_messageId);
     175        StringBuilder buf = new StringBuilder(256);
     176        buf.append("IB Message: ").append(_messageId);
    177177        if (isComplete()) {
    178178            buf.append(" completely received with ");
  • router/java/src/net/i2p/router/transport/udp/IntroductionManager.java

    r05597ae r617d1cd  
    77import java.util.Map;
    88import java.util.Properties;
     9import java.util.Set;
     10import java.util.concurrent.ConcurrentHashMap;
    911
    1012import net.i2p.data.Base64;
     
    1315import net.i2p.data.SessionKey;
    1416import net.i2p.router.RouterContext;
     17import net.i2p.util.ConcurrentHashSet;
    1518import net.i2p.util.Log;
    1619
     
    2427    private PacketBuilder _builder;
    2528    /** map of relay tag to PeerState that should receive the introduction */
    26     private Map<Long, PeerState> _outbound;
     29    private final Map<Long, PeerState> _outbound;
    2730    /** list of peers (PeerState) who have given us introduction tags */
    28     private final List<PeerState> _inbound;
     31    private final Set<PeerState> _inbound;
    2932
    3033    public IntroductionManager(RouterContext ctx, UDPTransport transport) {
     
    3336        _transport = transport;
    3437        _builder = new PacketBuilder(ctx, transport);
    35         _outbound = Collections.synchronizedMap(new HashMap(128));
    36         _inbound = new ArrayList(128);
     38        _outbound = new ConcurrentHashMap(128);
     39        _inbound = new ConcurrentHashSet(128);
    3740        ctx.statManager().createRateStat("udp.receiveRelayIntro", "How often we get a relayed request for us to talk to someone?", "udp", UDPTransport.RATES);
    3841        ctx.statManager().createRateStat("udp.receiveRelayRequest", "How often we receive a good request to relay to someone else?", "udp", UDPTransport.RATES);
     
    5356            _outbound.put(new Long(peer.getWeRelayToThemAs()), peer);
    5457        if (peer.getTheyRelayToUsAs() > 0) {
    55             synchronized (_inbound) {
    56                 if (!_inbound.contains(peer))
    5758                    _inbound.add(peer);
    58             }
    5959        }
    6060    }
     
    6868            _outbound.remove(new Long(peer.getWeRelayToThemAs()));
    6969        if (peer.getTheyRelayToUsAs() > 0) {
    70             synchronized (_inbound) {
    71                 _inbound.remove(peer);
    72             }
     70            _inbound.remove(peer);
    7371        }
    7472    }
     
    9189     */
    9290    public int pickInbound(Properties ssuOptions, int howMany) {
    93         List<PeerState> peers = null;
    9491        int start = _context.random().nextInt(Integer.MAX_VALUE);
    95         synchronized (_inbound) {
    96             if (_log.shouldLog(Log.DEBUG))
    97                 _log.debug("Picking inbound out of " + _inbound.size());
    98             if (_inbound.size() <= 0) return 0;
    99             peers = new ArrayList(_inbound);
    100         }
     92        if (_log.shouldLog(Log.DEBUG))
     93            _log.debug("Picking inbound out of " + _inbound.size());
     94        if (_inbound.isEmpty()) return 0;
     95        List<PeerState> peers = new ArrayList(_inbound);
    10196        int sz = peers.size();
    10297        start = start % sz;
     
    165160     */
    166161    int introducerCount() {
    167         synchronized(_inbound) {
    168162            return _inbound.size();
    169         }
    170163    }
    171164
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    r05597ae r617d1cd  
    11package net.i2p.router.transport.udp;
    22
    3 import java.util.ArrayList;
    4 import java.util.List;
     3import java.util.concurrent.BlockingQueue;
     4import java.util.concurrent.LinkedBlockingQueue;
    55
    66import net.i2p.data.Base64;
     
    2525    private UDPTransport _transport;
    2626    /** list of messages (InboundMessageState) fully received but not interpreted yet */
    27     private final List _completeMessages;
     27    private final BlockingQueue<InboundMessageState> _completeMessages;
    2828    private boolean _alive;
    2929    private ByteCache _cache;
     30    private static final int THREADS = 5;
     31    private static final long POISON_IMS = -99999999999l;
    3032   
    3133    public MessageReceiver(RouterContext ctx, UDPTransport transport) {
     
    3335        _log = ctx.logManager().getLog(MessageReceiver.class);
    3436        _transport = transport;
    35         _completeMessages = new ArrayList(16);
     37        _completeMessages = new LinkedBlockingQueue();
    3638        _cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
    3739        _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
     
    4749    public void startup() {
    4850        _alive = true;
    49         for (int i = 0; i < 5; i++) {
    50             I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i);
    51             t.setDaemon(true);
     51        for (int i = 0; i < THREADS; i++) {
     52            I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true);
    5253            t.start();
    5354        }
     
    6263    public void shutdown() {
    6364        _alive = false;
    64         synchronized (_completeMessages) {
    65             _completeMessages.clear();
    66             _completeMessages.notifyAll();
     65        _completeMessages.clear();
     66        for (int i = 0; i < THREADS; i++) {
     67            InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null);
     68            _completeMessages.offer(ims);
    6769        }
     70        for (int i = 1; i <= 5 && !_completeMessages.isEmpty(); i++) {
     71            try {
     72                Thread.sleep(i * 50);
     73            } catch (InterruptedException ie) {}
     74        }
     75        _completeMessages.clear();
    6876    }
    6977   
    7078    public void receiveMessage(InboundMessageState state) {
    71         int total = 0;
    72         long lag = -1;
    73         synchronized (_completeMessages) {
    74             _completeMessages.add(state);
    75             total = _completeMessages.size();
    76             if (total > 1)
    77                 lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
    78             _completeMessages.notifyAll();
    79         }
    80         if (total > 1)
    81             _context.statManager().addRateData("udp.inboundReady", total, 0);
    82         if (lag > 1000)
    83             _context.statManager().addRateData("udp.inboundLag", lag, total);
     79        //int total = 0;
     80        //long lag = -1;
     81        if (_alive)
     82            _completeMessages.offer(state);
     83        //total = _completeMessages.size();
     84        //if (total > 1)
     85        //    lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
     86        //if (total > 1)
     87        //    _context.statManager().addRateData("udp.inboundReady", total, 0);
     88        //if (lag > 1000)
     89        //    _context.statManager().addRateData("udp.inboundLag", lag, total);
    8490    }
    8591   
     
    9298            int remaining = 0;
    9399            try {
    94                 synchronized (_completeMessages) {
    95100                    while (message == null) {
    96                         if (_completeMessages.size() > 0) // grab the tail for lowest latency
    97                             message = (InboundMessageState)_completeMessages.remove(_completeMessages.size()-1);
    98                         else
    99                             _completeMessages.wait(5000);
     101                        message = _completeMessages.take();
     102                        if ( (message != null) && (message.getMessageId() == POISON_IMS) ) {
     103                            message = null;
     104                            break;
     105                        }
    100106                        if ( (message != null) && (message.isExpired()) ) {
    101107                            expiredLifetime += message.getLifetime();
     
    103109                            expired++;
    104110                        }
    105                         remaining = _completeMessages.size();
    106                     }
     111                        //remaining = _completeMessages.size();
    107112                }
    108113            } catch (InterruptedException ie) {}
  • router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java

    r05597ae r617d1cd  
    22
    33import java.net.InetAddress;
    4 import java.util.ArrayList;
    5 import java.util.List;
     4import java.util.Queue;
     5import java.util.concurrent.LinkedBlockingQueue;
    66
    77import net.i2p.crypto.DHSessionKeyBuilder;
     
    2323 */
    2424public class OutboundEstablishState {
    25     private RouterContext _context;
    26     private Log _log;
     25    private final RouterContext _context;
     26    private final Log _log;
    2727    // SessionRequest message
    2828    private byte _sentX[];
     
    4545    private Signature _sentSignature;
    4646    // general status
    47     private long _establishBegin;
    48     private long _lastReceive;
     47    private final long _establishBegin;
     48    //private long _lastReceive;
    4949    private long _lastSend;
    5050    private long _nextSend;
    5151    private RemoteHostId _remoteHostId;
    52     private RouterIdentity _remotePeer;
     52    private final RouterIdentity _remotePeer;
    5353    private SessionKey _introKey;
    54     private final List _queuedMessages;
     54    private final Queue<OutNetMessage> _queuedMessages;
    5555    private int _currentState;
    5656    private long _introductionNonce;
    5757    // intro
    58     private UDPAddress _remoteAddress;
     58    private final UDPAddress _remoteAddress;
    5959    private boolean _complete;
    6060   
     
    8888        _introKey = introKey;
    8989        _keyBuilder = null;
    90         _queuedMessages = new ArrayList(4);
     90        _queuedMessages = new LinkedBlockingQueue();
    9191        _currentState = STATE_UNKNOWN;
    9292        _establishBegin = ctx.clock().now();
     
    114114   
    115115    public void addMessage(OutNetMessage msg) {
    116         synchronized (_queuedMessages) {
    117             if (!_queuedMessages.contains(msg))
    118                 _queuedMessages.add(msg);
    119         }
    120     }
     116        // chance of a duplicate here in a race, that's ok
     117        if (!_queuedMessages.contains(msg))
     118            _queuedMessages.offer(msg);
     119        else if (_log.shouldLog(Log.WARN))
     120             _log.warn("attempt to add duplicate msg to queue: " + msg);
     121    }
     122
    121123    public OutNetMessage getNextQueuedMessage() {
    122         synchronized (_queuedMessages) {
    123             if (_queuedMessages.size() > 0)
    124                 return (OutNetMessage)_queuedMessages.remove(0);
    125         }
    126         return null;
     124        return _queuedMessages.poll();
    127125    }
    128126   
     
    130128    public SessionKey getIntroKey() { return _introKey; }
    131129   
     130    /** called from constructor, no need to synch */
    132131    private void prepareSessionRequest() {
    133132        _keyBuilder = new DHSessionKeyBuilder();
     
    143142    }
    144143
    145     public synchronized byte[] getSentX() { return _sentX; }
     144    public byte[] getSentX() { return _sentX; }
    146145    public synchronized byte[] getSentIP() { return _bobIP; }
    147146    public synchronized int getSentPort() { return _bobPort; }
     
    404403   
    405404    /** how long have we been trying to establish this session? */
    406     public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; }
    407     public synchronized long getEstablishBeginTime() { return _establishBegin; }
     405    public long getLifetime() { return _context.clock().now() - _establishBegin; }
     406    public long getEstablishBeginTime() { return _establishBegin; }
    408407    public synchronized long getNextSendTime() { return _nextSend; }
    409408    public synchronized void setNextSendTime(long when) {
     
    423422   
    424423    private void packetReceived() {
    425         _lastReceive = _context.clock().now();
    426         _nextSend = _lastReceive;
     424        _nextSend = _context.clock().now();
    427425        if (_log.shouldLog(Log.DEBUG))
    428426            _log.debug("Got a packet, nextSend == now");
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java

    r05597ae r617d1cd  
    2929    // private ActiveThrottle _throttle; // LINT not used ??
    3030    /** peers we are actively sending messages to */
    31     private final List _activePeers;
     31    private final List<PeerState> _activePeers;
    3232    private boolean _alive;
    3333    /** which peer should we build the next packet out of? */
     
    208208            peers = new ArrayList(_activePeers.size());
    209209            for (int i = 0; i < _activePeers.size(); i++) {
    210                 PeerState state = (PeerState)_activePeers.get(i);
     210                PeerState state = _activePeers.get(i);
    211211                if (state.getOutboundMessageCount() <= 0) {
    212212                    _activePeers.remove(i);
     
    256256                                _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
    257257                        }
    258                         peer = (PeerState)_activePeers.get(i);
     258                        peer = _activePeers.get(i);
    259259                        state = peer.allocateSend();
    260260                        if (state != null) {
     
    319319
    320320            // ok, simplest possible thing is to always tack on the bitfields if
    321             List msgIds = peer.getCurrentFullACKs();
     321            List<Long> msgIds = peer.getCurrentFullACKs();
    322322            if (msgIds == null) msgIds = new ArrayList();
    323             List partialACKBitfields = new ArrayList();
     323            List<ACKBitfield> partialACKBitfields = new ArrayList();
    324324            peer.fetchPartialACKs(partialACKBitfields);
    325325            int piggybackedPartialACK = partialACKBitfields.size();
    326             List remaining = new ArrayList(msgIds);
     326            List<Long> remaining = new ArrayList(msgIds);
    327327            int sparseCount = 0;
    328328            UDPPacket rv[] = new UDPPacket[fragments]; //sparse
     
    357357            if (msgIds.size() != remaining.size()) {
    358358                for (int i = 0; i < msgIds.size(); i++) {
    359                     Long id = (Long)msgIds.get(i);
     359                    Long id = msgIds.get(i);
    360360                    if (!remaining.contains(id)) {
    361361                        peer.removeACKMessage(id);
  • router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java

    r05597ae r617d1cd  
    343343        short sends[] = _fragmentSends;
    344344        ByteArray messageBuf = _messageBuf;
    345         StringBuilder buf = new StringBuilder(64);
    346         buf.append("Message ").append(_messageId);
     345        StringBuilder buf = new StringBuilder(256);
     346        buf.append("OB Message ").append(_messageId);
    347347        if (sends != null)
    348348            buf.append(" with ").append(sends.length).append(" fragments");
  • router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java

    r05597ae r617d1cd  
    3232    public void startup() {
    3333        _alive = true;
    34         I2PThread t = new I2PThread(this, "UDP outbound refiller");
    35         t.setDaemon(true);
     34        I2PThread t = new I2PThread(this, "UDP outbound refiller", true);
    3635        t.start();
    3736    }
  • router/java/src/net/i2p/router/transport/udp/PacketBuilder.java

    r05597ae r617d1cd  
    33import java.net.InetAddress;
    44import java.net.UnknownHostException;
    5 import java.util.ArrayList;
    65import java.util.Arrays;
     6import java.util.Collections;
    77import java.util.Date;
    88import java.util.List;
     
    3333    private static final ByteCache _blockCache = ByteCache.getInstance(64, 16);
    3434
     35    /**
     36     *  For debugging and stats only - does not go out on the wire.
     37     *  These are chosen to be higher than the highest I2NP message type,
     38     *  as a data packet is set to the underlying I2NP message type.
     39     */
     40    static final int TYPE_FIRST = 42;
     41    static final int TYPE_ACK = TYPE_FIRST;
     42    static final int TYPE_PUNCH = 43;
     43    static final int TYPE_RESP = 44;
     44    static final int TYPE_INTRO = 45;
     45    static final int TYPE_RREQ = 46;
     46    static final int TYPE_TCB = 47;
     47    static final int TYPE_TBC = 48;
     48    static final int TYPE_TTA = 49;
     49    static final int TYPE_TFA = 50;
     50    static final int TYPE_CONF = 51;
     51    static final int TYPE_SREQ = 52;
     52    static final int TYPE_CREAT = 53;
     53
    3554    /** we only talk to people of the right version */
    3655    static final int PROTOCOL_VERSION = 0;
     
    5978     *                        included, it should be removed from the list.
    6079     */
    61     public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) {
     80    public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List<Long> ackIdsRemaining, List<ACKBitfield> partialACKsRemaining) {
    6281        UDPPacket packet = UDPPacket.acquire(_context, false);
    6382
     
    93112        // just one or two), and since the packets are so small anyway, an additional five
    94113        // or ten bytes doesn't hurt.
    95         if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) )
     114        if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) )
    96115            data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;
    97         if ( (partialACKsRemaining != null) && (partialACKsRemaining.size() > 0) )
     116        if ( (partialACKsRemaining != null) && (!partialACKsRemaining.isEmpty()) )
    98117            data[off] |= UDPPacket.DATA_FLAG_ACK_BITFIELDS;
    99118        off++;
    100119
    101         if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) {
     120        if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) ) {
    102121            DataHelper.toLong(data, off, 1, ackIdsRemaining.size());
    103122            off++;
    104123            for (int i = 0; i < ackIdsRemaining.size(); i++) {
    105124            //while (ackIdsRemaining.size() > 0) {
    106                 Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0);
     125                Long ackId = ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0);
    107126                DataHelper.toLong(data, off, 4, ackId.longValue());
    108127                off += 4;       
     
    119138            off++;
    120139            for (int i = 0; i < partialACKsRemaining.size(); i++) {
    121                 ACKBitfield bitfield = (ACKBitfield)partialACKsRemaining.get(i);
     140                ACKBitfield bitfield = partialACKsRemaining.get(i);
    122141                if (bitfield.receivedComplete()) continue;
    123142                DataHelper.toLong(data, off, 4, bitfield.getMessageId());
     
    215234    // It doesn't generate a reply, but that's ok.
    216235    public UDPPacket buildPing(PeerState peer) {
    217         return buildACK(peer, new ArrayList(0));
     236        return buildACK(peer, Collections.EMPTY_LIST);
    218237    }
    219238
     
    221240   
    222241    /**
     242     *  Build the ack packet. The list need not be sorted into full and partial;
     243     *  this method will put all fulls before the partials in the outgoing packet.
     244     *
    223245     * @param ackBitfields list of ACKBitfield instances to either fully or partially ACK
    224246     */
    225     public UDPPacket buildACK(PeerState peer, List ackBitfields) {
     247    public UDPPacket buildACK(PeerState peer, List<ACKBitfield> ackBitfields) {
    226248        UDPPacket packet = UDPPacket.acquire(_context, false);
    227249       
     
    264286            off++;
    265287            for (int i = 0; i < ackBitfields.size(); i++) {
    266                 ACKBitfield bf = (ACKBitfield)ackBitfields.get(i);
     288                ACKBitfield bf = ackBitfields.get(i);
    267289                if (bf.receivedComplete()) {
    268290                    DataHelper.toLong(data, off, 4, bf.getMessageId());
     
    416438        setTo(packet, to, state.getSentPort());
    417439        _ivCache.release(iv);
    418         packet.setMessageType(53);
     440        packet.setMessageType(TYPE_CREAT);
    419441        return packet;
    420442    }
     
    480502        authenticate(packet, state.getIntroKey(), state.getIntroKey());
    481503        setTo(packet, to, state.getSentPort());
    482         packet.setMessageType(52);
     504        packet.setMessageType(TYPE_SREQ);
    483505        return packet;
    484506    }
     
    587609       
    588610        setTo(packet, to, state.getSentPort());
    589         packet.setMessageType(51);
     611        packet.setMessageType(TYPE_CONF);
    590612        return packet;
    591613    }
     
    640662        authenticate(packet, toCipherKey, toMACKey);
    641663        setTo(packet, toIP, toPort);
    642         packet.setMessageType(50);
     664        packet.setMessageType(TYPE_TFA);
    643665        return packet;
    644666    }
     
    685707        authenticate(packet, aliceIntroKey, aliceIntroKey);
    686708        setTo(packet, aliceIP, alicePort);
    687         packet.setMessageType(49);
     709        packet.setMessageType(TYPE_TTA);
    688710        return packet;
    689711    }
     
    732754        authenticate(packet, charlieCipherKey, charlieMACKey);
    733755        setTo(packet, charlieIP, charliePort);
    734         packet.setMessageType(48);
     756        packet.setMessageType(TYPE_TBC);
    735757        return packet;
    736758    }
     
    777799        authenticate(packet, bobCipherKey, bobMACKey);
    778800        setTo(packet, bobIP, bobPort);
    779         packet.setMessageType(47);
     801        packet.setMessageType(TYPE_TCB);
    780802        return packet;
    781803    }
     
    876898            authenticate(packet, new SessionKey(introKey), new SessionKey(introKey));
    877899        setTo(packet, introHost, introPort);
    878         packet.setMessageType(46);
     900        packet.setMessageType(TYPE_RREQ);
    879901        return packet;
    880902    }
     
    926948        authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
    927949        setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
    928         packet.setMessageType(45);
     950        packet.setMessageType(TYPE_INTRO);
    929951        return packet;
    930952    }
     
    9871009        authenticate(packet, aliceIntroKey, aliceIntroKey);
    9881010        setTo(packet, aliceAddr, alice.getPort());
    989         packet.setMessageType(44);
     1011        packet.setMessageType(TYPE_RESP);
    9901012        return packet;
    9911013    }
     
    10201042        setTo(packet, to, port);
    10211043       
    1022         packet.setMessageType(43);
     1044        packet.setMessageType(TYPE_PUNCH);
    10231045        return packet;
    10241046    }
  • router/java/src/net/i2p/router/transport/udp/PacketHandler.java

    r05597ae r617d1cd  
    11package net.i2p.router.transport.udp;
    22
    3 import java.util.ArrayList;
    43import java.util.Date;
    5 import java.util.List;
    64
    75import net.i2p.router.Router;
     
    3230    private IntroductionManager _introManager;
    3331    private boolean _keepReading;
    34     private List _handlers;
     32    private final Handler[] _handlers;
    3533   
    3634    private static final int NUM_HANDLERS = 5;
     
    4745        _testManager = testManager;
    4846        _introManager = introManager;
    49         _handlers = new ArrayList(NUM_HANDLERS);
     47        _handlers = new Handler[NUM_HANDLERS];
    5048        for (int i = 0; i < NUM_HANDLERS; i++) {
    51             _handlers.add(new Handler());
     49            _handlers[i] = new Handler();
    5250        }
    5351        _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES);
     
    8280    public void startup() {
    8381        _keepReading = true;
    84         for (int i = 0; i < _handlers.size(); i++) {
    85             I2PThread t = new I2PThread((Handler)_handlers.get(i), "UDP Packet handler " + i + "/" + _handlers.size());
    86             t.setDaemon(true);
     82        for (int i = 0; i < NUM_HANDLERS; i++) {
     83            I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true);
    8784            t.start();
    8885        }
     
    9592    String getHandlerStatus() {
    9693        StringBuilder rv = new StringBuilder();
    97         int size = _handlers.size();
    98         rv.append("Handlers: ").append(size);
    99         for (int i = 0; i < size; i++) {
    100             Handler handler = (Handler)_handlers.get(i);
     94        rv.append("Handlers: ").append(NUM_HANDLERS);
     95        for (int i = 0; i < NUM_HANDLERS; i++) {
     96            Handler handler = _handlers[i];
    10197            rv.append(" handler ").append(i).append(" state: ").append(handler._state);
    10298        }
  • router/java/src/net/i2p/router/transport/udp/PacketPusher.java

    r05597ae r617d1cd  
    2626    public void startup() {
    2727        _alive = true;
    28         I2PThread t = new I2PThread(this, "UDP packet pusher");
    29         t.setDaemon(true);
     28        I2PThread t = new I2PThread(this, "UDP packet pusher", true);
    3029        t.start();
    3130    }
     
    4039                    for (int i = 0; i < packets.length; i++) {
    4140                        if (packets[i] != null) // null for ACKed fragments
    42                             _sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
     41                            //_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
     42                            _sender.add(packets[i]);
    4343                    }
    4444                }
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r05597ae r617d1cd  
    99import java.util.List;
    1010import java.util.Map;
     11import java.util.Set;
     12import java.util.Queue;
     13import java.util.concurrent.LinkedBlockingQueue;
    1114
    1215import net.i2p.data.Hash;
     
    1518import net.i2p.router.RouterContext;
    1619import net.i2p.util.Log;
     20import net.i2p.util.ConcurrentHashSet;
    1721
    1822/**
     
    7478    /** when did we last have a failed send (beginning of period) */
    7579    // private long _lastFailedSendPeriod;
    76     /** list of messageIds (Long) that we have received but not yet sent */
    77     private final List _currentACKs;
     80
     81    /**
     82     *  Set of messageIds (Long) that we have received but not yet sent
     83     *  Since even with the smallest MTU we can fit 131 acks in a message,
     84     *  we are unlikely to get backed up on acks, so we don't keep
     85     *  them in any particular order.
     86     */
     87    private final Set<Long> _currentACKs;
     88
    7889    /**
    7990     * list of the most recent messageIds (Long) that we have received and sent
     
    8192     * hopefully saving some spurious retransmissions
    8293     */
    83     private final List _currentACKsResend;
     94    private final Queue<Long> _currentACKsResend;
     95
    8496    /** when did we last send ACKs to the peer? */
    8597    private volatile long _lastACKSend;
     
    170182   
    171183    /** list of InboundMessageState for active message */
    172     private final Map _inboundMessages;
     184    private final Map<Long, InboundMessageState> _inboundMessages;
    173185    /** list of OutboundMessageState */
    174     private final List _outboundMessages;
     186    private final List<OutboundMessageState> _outboundMessages;
    175187    /** which outbound message is currently being retransmitted */
    176188    private OutboundMessageState _retransmitter;
     
    181193    private volatile boolean _dead;
    182194
     195    /** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */
     196    private static final int MIN_CONCURRENT_MSGS = 8;
    183197    /** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
    184     private volatile int _concurrentMessagesAllowed = 8;
     198    private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
    185199    /**
    186200     * how many outbound messages are currently being transmitted.  Not thread safe, as we're not strict
     
    204218     * for UDP, and 8 for IP, giving us 596.  round up to mod 16, giving a total
    205219     * of 608
     220     *
     221     * Well, we really need to count the acks as well, especially
     222     * 4 * MAX_RESEND_ACKS which can take up a significant amount of space.
     223     * We reduce the max acks when using the small MTU but it may not be enough...
     224     *
    206225     */
    207226    private static final int MIN_MTU = 608;//600; //1500;
     
    235254        _lastSendTime = -1;
    236255        _lastReceiveTime = -1;
    237         _currentACKs = new ArrayList(8);
    238         _currentACKsResend = new ArrayList(8);
     256        _currentACKs = new ConcurrentHashSet();
     257        _currentACKsResend = new LinkedBlockingQueue();
    239258        _currentSecondECNReceived = false;
    240259        _remoteWantsPreviousACKs = false;
     
    583602        }
    584603       
    585         synchronized (_currentACKs) {
    586             if (_wantACKSendSince <= 0)
    587                 _wantACKSendSince = now;
    588         &nbs