Changeset b53ed94


Ignore:
Timestamp:
Nov 12, 2013 7:28:23 PM (6 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
79b5d97
Parents:
df84a2f
Message:

Findbugs:

  • Fix several 'increment of volatile is not atomic' all over Remaining: UDP PeerState?.java, to be checked in separately
  • Comment out all of unused MessageStateMonitor?
Files:
11 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    rdf84a2f rb53ed94  
    77import java.util.StringTokenizer;
    88import java.util.concurrent.ConcurrentHashMap;
     9import java.util.concurrent.atomic.AtomicInteger;
    910
    1011import net.i2p.I2PAppContext;
     
    4142    private volatile boolean _throttlersInitialized;
    4243    private final ConnectionOptions _defaultOptions;
    43     private volatile int _numWaiting;
     44    private final AtomicInteger _numWaiting = new AtomicInteger();
    4445    private long _soTimeout;
    4546    private volatile ConnThrottler _minuteThrottler;
     
    300301        if (opts.getConnectTimeout() <= 0)
    301302            expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
    302         _numWaiting++;
     303        _numWaiting.incrementAndGet();
    303304        while (true) {
    304305            long remaining = expiration - _context.clock().now();
     
    306307                _log.logAlways(Log.WARN, "Refusing to connect since we have exceeded our max of "
    307308                          + _defaultOptions.getMaxConns() + " connections");
    308                 _numWaiting--;
     309                _numWaiting.decrementAndGet();
    309310                return null;
    310311            }
     
    313314                    int max = _defaultOptions.getMaxConns();
    314315                    // allow a full buffer of pending/waiting streams
    315                     if (_numWaiting > max) {
     316                    if (_numWaiting.get() > max) {
    316317                        _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
    317318                                      + max + " and there are " + _numWaiting
    318319                                      + " waiting already");
    319                         _numWaiting--;
     320                        _numWaiting.decrementAndGet();
    320321                        return null;
    321322                    }
     
    347348            con.waitForConnect();
    348349        }
    349         if (_numWaiting > 0)
    350             _numWaiting--;
     350        // safe decrement
     351        for (;;) {
     352            int n = _numWaiting.get();
     353            if (n <= 0)
     354                break;
     355            if (_numWaiting.compareAndSet(n, n - 1))
     356                break;
     357        }
    351358       
    352359        _context.statManager().addRateData("stream.connectionCreated", 1, 0);
  • apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java

    rdf84a2f rb53ed94  
    3737    private int _readyDataBlockIndex;
    3838    /** highest message ID used in the readyDataBlocks */
    39     private volatile long _highestReadyBlockId;
     39    private long _highestReadyBlockId;
    4040    /** highest overall message ID */
    41     private volatile long _highestBlockId;
     41    private long _highestBlockId;
    4242    /**
    4343     * Message ID (Long) to ByteArray for blocks received
     
    7777     */
    7878    public long getHighestReadyBockId() {
    79         // not synchronized as it doesnt hurt to read a too-low value
    80         return _highestReadyBlockId;
     79        synchronized (_dataLock) {
     80            return _highestReadyBlockId;
     81        }
    8182    }
    8283   
     
    8586     */
    8687    public long getHighestBlockId() {
    87         // not synchronized as it doesnt hurt to read a too-low value
    88         return _highestBlockId;
     88        synchronized (_dataLock) {
     89            return _highestBlockId;
     90        }
    8991    }
    9092   
  • apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java

    rdf84a2f rb53ed94  
    2525    private SessionKey _keyUsed;
    2626    private final long _createdOn;
    27     private volatile int _numSends;
     27    private final AtomicInteger _numSends = new AtomicInteger();
    2828    private volatile long _lastSend;
    2929    private long _acceptedOn;
     
    100100        if (_connection != null)
    101101            _connection.getInputStream().updateAcks(this);
    102         if (_numSends > 0) {
     102        int numSends = _numSends.get();
     103        if (numSends > 0) {
    103104            // so we can debug to differentiate resends
    104             setOptionalDelay(_numSends * 1000);
     105            setOptionalDelay(numSends * 1000);
    105106            setFlag(FLAG_DELAY_REQUESTED);
    106107        }
     
    110111    public long getLifetime() { return _context.clock().now() - _createdOn; }
    111112    public void incrementSends() {
    112         _numSends++;
     113        _numSends.incrementAndGet();
    113114        _lastSend = _context.clock().now();
    114115    }
     
    153154            return (int)(_ackOn - _createdOn);
    154155    }
    155     public int getNumSends() { return _numSends; }
     156    public int getNumSends() { return _numSends.get(); }
    156157    public long getLastSend() { return _lastSend; }
    157158
     
    167168        SimpleTimer2.TimedEvent evt = _resendEvent;
    168169        if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
    169             (_numSends == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
     170            (_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) {  // Don't fast retx if we recently resent it
    170171            _retransmitted = true;
    171172            evt.reschedule(0);
     
    175176                final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
    176177                    " numSends=%d, lastSend=%d, now=%d",
    177                     toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
     178                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
    178179                    _log.debug(log);
    179180            }
     
    181182            final String log = String.format("%s nack but no retransmit.  Criteria: nacks=%d, retransmitted=%b,"+
    182183                    " numSends=%d, lastSend=%d, now=%d",
    183                     toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
     184                    toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
    184185                    _log.debug(log);
    185186        }
     
    204205        }
    205206       
    206         if (_numSends > 1)
    207             buf.append(" sent ").append(_numSends).append(" times");
     207        int numSends = _numSends.get();
     208        if (numSends > 1)
     209            buf.append(" sent ").append(numSends).append(" times");
    208210       
    209211        if (isFlagSet(FLAG_SYNCHRONIZE |
  • router/java/src/net/i2p/router/JobStats.java

    rdf84a2f rb53ed94  
    11package net.i2p.router;
     2
     3import java.util.concurrent.atomic.AtomicLong;
    24
    35import net.i2p.data.DataHelper;
     
    1012public class JobStats {
    1113    private final String _job;
    12     private volatile long _numRuns;
     14    private final AtomicLong _numRuns = new AtomicLong();
    1315    private volatile long _totalTime;
    1416    private volatile long _maxTime;
     
    2729   
    2830    public void jobRan(long runTime, long lag) {
    29         _numRuns++;
     31        _numRuns.incrementAndGet();
    3032        _totalTime += runTime;
    3133        if ( (_maxTime < 0) || (runTime > _maxTime) )
     
    4143   
    4244    public String getName() { return _job; }
    43     public long getRuns() { return _numRuns; }
     45    public long getRuns() { return _numRuns.get(); }
    4446    public long getTotalTime() { return _totalTime; }
    4547    public long getMaxTime() { return _maxTime; }
    4648    public long getMinTime() { return _minTime; }
    4749    public long getAvgTime() {
    48         if (_numRuns > 0)
    49             return _totalTime / _numRuns;
     50        long numRuns = _numRuns.get();
     51        if (numRuns > 0)
     52            return _totalTime / numRuns;
    5053        else
    5154            return 0;
     
    5558    public long getMinPendingTime() { return _minPendingTime; }
    5659    public long getAvgPendingTime() {
    57         if (_numRuns > 0)
    58             return _totalPendingTime / _numRuns;
     60        long numRuns = _numRuns.get();
     61        if (numRuns > 0)
     62            return _totalPendingTime / numRuns;
    5963        else
    6064            return 0;
  • router/java/src/net/i2p/router/MessageStateMonitor.java

    rdf84a2f rb53ed94  
    99 */
    1010public class MessageStateMonitor {
     11/****
    1112    private Log _log;
    1213    private volatile int _inboundLiveCount;
     
    6667    public int getOutboundLiveCount() { return _outboundLiveCount; }
    6768    public int getOutboundDiscardedCount() { return _outboundDiscardedCount; }
     69****/
    6870}
  • router/java/src/net/i2p/router/RouterDoSThrottle.java

    rdf84a2f rb53ed94  
    11package net.i2p.router;
     2
     3import java.util.concurrent.atomic.AtomicInteger;
    24
    35import net.i2p.data.Hash;
     
    1618   
    1719    private volatile long _currentLookupPeriod;
    18     private volatile int _currentLookupCount;
     20    private final AtomicInteger _currentLookupCount = new AtomicInteger();
    1921    // if we receive over 20 netDb lookups in 10 seconds, someone is acting up
    2022    private static final long LOOKUP_THROTTLE_PERIOD = 10*1000;
     
    3133        if (_currentLookupPeriod + LOOKUP_THROTTLE_PERIOD > now) {
    3234            // same period, check for DoS
    33             _currentLookupCount++;
    34             if (_currentLookupCount >= LOOKUP_THROTTLE_MAX) {
    35                 _context.statManager().addRateData("router.throttleNetDbDoS", _currentLookupCount, 0);
    36                 int rand = _context.random().nextInt(_currentLookupCount);
     35            int cnt = _currentLookupCount.incrementAndGet();
     36            if (cnt >= LOOKUP_THROTTLE_MAX) {
     37                _context.statManager().addRateData("router.throttleNetDbDoS", cnt, 0);
     38                int rand = _context.random().nextInt(cnt);
    3739                if (rand > LOOKUP_THROTTLE_MAX) {
    3840                    return false;
     
    4850            // (no, I'm not worried about concurrency here)
    4951            _currentLookupPeriod = now;
    50             _currentLookupCount = 1;
     52            _currentLookupCount.set(1);
    5153            return true;
    5254        }
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodOnlySearchJob.java

    rdf84a2f rb53ed94  
    144144
    145145        int count = 0; // keep a separate count since _lookupsRemaining could be decremented elsewhere
    146         for (int i = 0; _lookupsRemaining < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
     146        for (int i = 0; _lookupsRemaining.get() < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
    147147            Hash peer = floodfillPeers.get(i);
    148148            if (peer.equals(getContext().routerHash()))
     
    178178            getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
    179179            count++;
    180             _lookupsRemaining++;
     180            _lookupsRemaining.incrementAndGet();
    181181        }
    182182       
  • router/java/src/net/i2p/router/networkdb/kademlia/FloodSearchJob.java

    rdf84a2f rb53ed94  
    33import java.util.List;
    44import java.util.concurrent.CopyOnWriteArrayList;
     5import java.util.concurrent.atomic.AtomicInteger;
    56
    67import net.i2p.data.Hash;
     
    3031    protected int _timeoutMs;
    3132    protected final boolean _isLease;
    32     protected volatile int _lookupsRemaining;
     33    protected final AtomicInteger _lookupsRemaining = new AtomicInteger();
    3334    protected volatile boolean _dead;
    3435    protected final long _created;
     
    149150
    150151    /**
    151      *  TODO AtomicInteger?
    152152     *  @return number remaining after decrementing
    153153     */
    154154    protected int decrementRemaining() {
    155         if (_lookupsRemaining > 0)
    156             return (--_lookupsRemaining);
    157         return 0;
    158     }
    159 
    160     protected int getLookupsRemaining() { return _lookupsRemaining; }
     155        // safe decrement
     156        for (;;) {
     157            int n = _lookupsRemaining.get();
     158            if (n <= 0)
     159                return 0;
     160            if (_lookupsRemaining.compareAndSet(n, n - 1))
     161                return n - 1;
     162        }
     163    }
     164
     165    protected int getLookupsRemaining() { return _lookupsRemaining.get(); }
    161166   
    162167    /**
  • router/java/src/net/i2p/router/networkdb/kademlia/KBucketSet.java

    rdf84a2f rb53ed94  
    1313import java.util.HashSet;
    1414import java.util.Set;
     15import java.util.concurrent.atomic.AtomicInteger;
    1516
    1617import net.i2p.I2PAppContext;
     
    3031    private final LocalHash _us;
    3132    private final KBucket _buckets[];
    32     private volatile int _size;
     33    private final AtomicInteger _size = new AtomicInteger();
    3334   
    3435    public final static int BASE = 8; // must go into KEYSIZE_BITS evenly
     
    5657            int numInBucket = _buckets[bucket].add(peer);
    5758            if (numInBucket != oldSize)
    58                 _size++;
     59                _size.incrementAndGet();
    5960            if (numInBucket > BUCKET_SIZE) {
    6061                // perhaps queue up coalesce job?  naaahh.. lets let 'er grow for now
     
    7374     */
    7475    public int size() {
    75         return _size;
     76        return _size.get();
    7677        /*
    7778        int size = 0;
     
    8788        boolean removed = kbucket.remove(entry);
    8889        if (removed)
    89             _size--;
     90            _size.decrementAndGet();
    9091        return removed;
    9192    }
     
    9697            _buckets[i].setEntries(Collections.EMPTY_SET);
    9798        }
    98         _size = 0;
     99        _size.set(0);
    99100        _us.clearXorCache();
    100101    }
  • router/java/src/net/i2p/router/networkdb/kademlia/SingleSearchJob.java

    rdf84a2f rb53ed94  
    5151            _log.info(getJobId() + ": Single search for " + _key + " to " + _to);
    5252        getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
    53         _lookupsRemaining = 1;
     53        _lookupsRemaining.set(1);
    5454    }
    5555
  • router/java/src/net/i2p/router/peermanager/TunnelHistory.java

    rdf84a2f rb53ed94  
    55import java.util.Date;
    66import java.util.Properties;
     7import java.util.concurrent.atomic.AtomicLong;
    78
    89import net.i2p.router.RouterContext;
     
    1718    private final RouterContext _context;
    1819    private final Log _log;
    19     private volatile long _lifetimeAgreedTo;
    20     private volatile long _lifetimeRejected;
     20    private final AtomicLong _lifetimeAgreedTo = new AtomicLong();
     21    private final AtomicLong _lifetimeRejected = new AtomicLong();
    2122    private volatile long _lastAgreedTo;
    2223    private volatile long _lastRejectedCritical;
     
    2425    private volatile long _lastRejectedTransient;
    2526    private volatile long _lastRejectedProbabalistic;
    26     private volatile long _lifetimeFailed;
     27    private final AtomicLong _lifetimeFailed = new AtomicLong();
    2728    private volatile long _lastFailed;
    2829    private RateStat _rejectRate;
     
    5455   
    5556    /** total tunnels the peer has agreed to participate in */
    56     public long getLifetimeAgreedTo() { return _lifetimeAgreedTo; }
     57    public long getLifetimeAgreedTo() { return _lifetimeAgreedTo.get(); }
    5758    /** total tunnels the peer has refused to participate in */
    58     public long getLifetimeRejected() { return _lifetimeRejected; }
     59    public long getLifetimeRejected() { return _lifetimeRejected.get(); }
    5960    /** total tunnels the peer has agreed to participate in that were later marked as failed prematurely */
    60     public long getLifetimeFailed() { return _lifetimeFailed; }
     61    public long getLifetimeFailed() { return _lifetimeFailed.get(); }
    6162    /** when the peer last agreed to participate in a tunnel */
    6263    public long getLastAgreedTo() { return _lastAgreedTo; }
     
    7778   
    7879    public void incrementAgreedTo() {
    79         _lifetimeAgreedTo++;
     80        _lifetimeAgreedTo.incrementAndGet();
    8081        _lastAgreedTo = _context.clock().now();
    8182    }
     
    8687     */
    8788    public void incrementRejected(int severity) {
    88         _lifetimeRejected++;
     89        _lifetimeRejected.incrementAndGet();
    8990        if (severity >= TUNNEL_REJECT_CRIT) {
    9091            _lastRejectedCritical = _context.clock().now();
     
    107108     */
    108109    public void incrementFailed(int pct) {
    109         _lifetimeFailed++;
     110        _lifetimeFailed.incrementAndGet();
    110111        _failRate.addData(pct, 1);
    111112        _lastFailed = _context.clock().now();
     
    148149        addDate(buf, "lastRejectedTransient", _lastRejectedTransient, "When was the last time the peer refused to participate in a tunnel (Transient load response code)?");
    149150        addDate(buf, "lastRejectedProbabalistic", _lastRejectedProbabalistic, "When was the last time the peer refused to participate in a tunnel (Probabalistic response code)?");
    150         add(buf, "lifetimeAgreedTo", _lifetimeAgreedTo, "How many tunnels has the peer ever agreed to participate in?");
    151         add(buf, "lifetimeFailed", _lifetimeFailed, "How many tunnels has the peer ever agreed to participate in that failed prematurely?");
    152         add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?");
     151        add(buf, "lifetimeAgreedTo", _lifetimeAgreedTo.get(), "How many tunnels has the peer ever agreed to participate in?");
     152        add(buf, "lifetimeFailed", _lifetimeFailed.get(), "How many tunnels has the peer ever agreed to participate in that failed prematurely?");
     153        add(buf, "lifetimeRejected", _lifetimeRejected.get(), "How many tunnels has the peer ever refused to participate in?");
    153154        out.write(buf.toString().getBytes());
    154155        _rejectRate.store(out, "tunnelHistory.rejectRate");
     
    173174        _lastRejectedTransient = getLong(props, "tunnels.lastRejectedTransient");
    174175        _lastRejectedProbabalistic = getLong(props, "tunnels.lastRejectedProbabalistic");
    175         _lifetimeAgreedTo = getLong(props, "tunnels.lifetimeAgreedTo");
    176         _lifetimeFailed = getLong(props, "tunnels.lifetimeFailed");
    177         _lifetimeRejected = getLong(props, "tunnels.lifetimeRejected");
     176        _lifetimeAgreedTo.set(getLong(props, "tunnels.lifetimeAgreedTo"));
     177        _lifetimeFailed.set(getLong(props, "tunnels.lifetimeFailed"));
     178        _lifetimeRejected.set(getLong(props, "tunnels.lifetimeRejected"));
    178179        try {
    179180            _rejectRate.load(props, "tunnelHistory.rejectRate", true);
Note: See TracChangeset for help on using the changeset viewer.