Changeset f42ac71


Ignore:
Timestamp:
Nov 23, 2013 2:37:33 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
e498e21
Parents:
74f2fd0
Message:

UDP PeerState? findbugs volatile/atomic/synch

File:
1 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r74f2fd0 rf42ac71  
    1212import java.util.Queue;
    1313import java.util.concurrent.LinkedBlockingQueue;
     14import java.util.concurrent.atomic.AtomicInteger;
    1415
    1516import net.i2p.data.Hash;
     
    120121    //private boolean _remoteWantsPreviousACKs;
    121122    /** how many bytes should we send to the peer in a second */
    122     private volatile int _sendWindowBytes;
     123    private int _sendWindowBytes;
    123124    /** how many bytes can we send to the peer in the current second */
    124     private volatile int _sendWindowBytesRemaining;
     125    private int _sendWindowBytesRemaining;
    125126    private long _lastSendRefill;
    126127    private int _sendBps;
     
    226227    private static final int MIN_CONCURRENT_MSGS = 8;
    227228    /** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
    228     private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
     229    private int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
    229230    /**
    230231     * how many outbound messages are currently being transmitted.  Not thread safe, as we're not strict
    231232     */
    232     private volatile int _concurrentMessagesActive = 0;
     233    private int _concurrentMessagesActive;
    233234    /** how many concurrency rejections have we had in a row */
    234     private volatile int _consecutiveRejections = 0;
     235    private int _consecutiveRejections;
    235236    /** is it inbound? **/
    236237    private final boolean _isInbound;
     
    437438
    438439    /** how many bytes should we send to the peer in a second */
    439     public int getSendWindowBytes() { return _sendWindowBytes; }
     440    public int getSendWindowBytes() {
     441        synchronized(_outboundMessages) {
     442            return _sendWindowBytes;
     443        }
     444    }
     445
    440446    /** how many bytes can we send to the peer in the current second */
    441     public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; }
     447    public int getSendWindowBytesRemaining() {
     448        synchronized(_outboundMessages) {
     449            return _sendWindowBytesRemaining;
     450        }
     451    }
     452
    442453    /** what IP is the peer sending and receiving packets on? */
    443454    public byte[] getRemoteIP() { return _remoteIP; }
     
    581592    public int getSendBps() { return _sendBps; }
    582593    public int getReceiveBps() { return _receiveBps; }
     594
    583595    public int incrementConsecutiveFailedSends() {
    584         _concurrentMessagesActive--;
    585         if (_concurrentMessagesActive < 0)
    586             _concurrentMessagesActive = 0;
    587        
    588         //long now = _context.clock().now()/(10*1000);
    589         //if (_lastFailedSendPeriod >= now) {
    590         //    // ignore... too fast
    591         //} else {
    592         //    _lastFailedSendPeriod = now;
    593             _consecutiveFailedSends++;
    594         //}
    595         return _consecutiveFailedSends;
    596     }
     596        synchronized(_outboundMessages) {
     597            _concurrentMessagesActive--;
     598            if (_concurrentMessagesActive < 0)
     599                _concurrentMessagesActive = 0;
     600           
     601            //long now = _context.clock().now()/(10*1000);
     602            //if (_lastFailedSendPeriod >= now) {
     603            //    // ignore... too fast
     604            //} else {
     605            //    _lastFailedSendPeriod = now;
     606                _consecutiveFailedSends++;
     607            //}
     608            return _consecutiveFailedSends;
     609        }
     610    }
     611
    597612    public long getInactivityTime() {
    598613        long now = _context.clock().now();
     
    621636     * cannot.  If it is not decremented, the window size remaining is
    622637     * not adjusted at all.
    623      */
    624     public boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
    625 
    626     public boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
    627 
    628     /**
     638     *
    629639     *  Caller should synch
    630640     */
    631     public boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
     641    private boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
     642
     643    //private boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
     644
     645    /**
     646     *  Caller should synch
     647     */
     648    private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
    632649        long now = _context.clock().now();
    633650        long duration = now - _lastSendRefill;
     
    695712
    696713    public int getSlowStartThreshold() { return _slowStartThreshold; }
    697     public int getConcurrentSends() { return _concurrentMessagesActive; }
    698     public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
    699     public int getConsecutiveSendRejections() { return _consecutiveRejections; }
     714
     715    public int getConcurrentSends() {
     716        synchronized(_outboundMessages) {
     717            return _concurrentMessagesActive;
     718        }
     719    }
     720
     721    public int getConcurrentSendWindow() {
     722        synchronized(_outboundMessages) {
     723            return _concurrentMessagesAllowed;
     724        }
     725    }
     726
     727    public int getConsecutiveSendRejections() {
     728        synchronized(_outboundMessages) {
     729            return _consecutiveRejections;
     730        }
     731    }
     732
    700733    public boolean isInbound() { return _isInbound; }
    701734
     
    16751708     *  Have 3 return values, because if allocateSendingBytes() returns false,
    16761709     *  then allocateSend() can stop iterating
     1710     *
     1711     *  Caller should synch
    16771712     */
    16781713    private ShouldSend locked_shouldSend(OutboundMessageState state) {
Note: See TracChangeset for help on using the changeset viewer.