Changeset 7b823933 for apps/streaming


Ignore:
Timestamp:
Apr 17, 2015 5:15:22 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
65993e1, 6a644dd
Parents:
22993e1
Message:

atomics and finals

File:
1 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java

    r22993e1 r7b823933  
    3030    private final ConnectionManager _connectionManager;
    3131    private Destination _remotePeer;
    32     private long _sendStreamId;
    33     private long _receiveStreamId;
     32    private final AtomicLong _sendStreamId = new AtomicLong();
     33    private final AtomicLong _receiveStreamId = new AtomicLong();
    3434    private volatile long _lastSendTime;
    3535    private final AtomicLong _lastSendId;
     
    4444    /** Locking: _nextSendLock */
    4545    private long _nextSendTime;
    46     private long _ackedPackets;
     46    private final AtomicLong _ackedPackets = new AtomicLong();
    4747    private final long _createdOn;
    4848    private final AtomicLong _closeSentOn = new AtomicLong();
    4949    private final AtomicLong _closeReceivedOn = new AtomicLong();
    50     private int _unackedPacketsReceived;
     50    private final AtomicInteger _unackedPacketsReceived = new AtomicInteger();
    5151    private long _congestionWindowEnd;
    5252    private volatile long _highestAckedThrough;
     
    7878    private final ConEvent _connectionEvent;
    7979    private final int _randomWait;
    80     private int _localPort;
    81     private int _remotePort;
     80    private final int _localPort;
     81    private final int _remotePort;
    8282    private final SimpleTimer2 _timer;
    8383   
    84     private long _lifetimeBytesSent;
     84    private final AtomicLong _lifetimeBytesSent = new AtomicLong();
    8585    /** TBD for tcpdump-compatible ack output */
    8686    private long _lowestBytesAckedThrough;
    87     private long _lifetimeBytesReceived;
    88     private long _lifetimeDupMessageSent;
    89     private long _lifetimeDupMessageReceived;
     87    private final AtomicLong _lifetimeBytesReceived = new AtomicLong();
     88    private final AtomicLong _lifetimeDupMessageSent = new AtomicLong();
     89    private final AtomicLong _lifetimeDupMessageReceived = new AtomicLong();
    9090   
    9191    public static final long MAX_RESEND_DELAY = 45*1000;
     
    133133            _localPort = opts.getLocalPort();
    134134            _remotePort = opts.getPort();
     135        } else {
     136            _localPort = 0;
     137            _remotePort = 0;
    135138        }
    136139        _options = (opts != null ? opts : new ConnectionOptions());
     
    306309        // Unconditionally set
    307310        _resetSentOn.set(now);
    308         if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
     311        if ( (_remotePeer == null) || (_sendStreamId.get() <= 0) ) return;
    309312        PacketLocal reply = new PacketLocal(_context, _remotePeer);
    310313        reply.setFlag(Packet.FLAG_RESET);
    311314        reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
    312         reply.setSendStreamId(_sendStreamId);
    313         reply.setReceiveStreamId(_receiveStreamId);
     315        reply.setSendStreamId(_sendStreamId.get());
     316        reply.setReceiveStreamId(_receiveStreamId.get());
    314317        // TODO remove this someday, as of 0.9.20 we do not require it
    315318        reply.setOptionalFrom(_connectionManager.getSession().getMyDestination());
     
    318321        // this just sends the packet - no retries or whatnot
    319322        if (_outboundQueue.enqueue(reply)) {
    320             _unackedPacketsReceived = 0;
     323            _unackedPacketsReceived.set(0);
    321324            _lastSendTime = _context.clock().now();
    322325            resetActivityTimer();
     
    398401       
    399402        if (_outboundQueue.enqueue(packet)) {       
    400             _unackedPacketsReceived = 0;
     403            _unackedPacketsReceived.set(0);
    401404            _lastSendTime = _context.clock().now();
    402405            resetActivityTimer();
     
    505508            }   // !isEmpty()
    506509            if (acked != null) {
     510                _ackedPackets.addAndGet(acked.size());
    507511                for (int i = 0; i < acked.size(); i++) {
    508512                    PacketLocal p = acked.get(i);
    509513                    // removed from _outboundPackets above in iterator
    510                     _ackedPackets++;
    511514                    if (p.getNumSends() > 1) {
    512515                        _activeResends.decrementAndGet();
     
    815818    }
    816819   
    817     private boolean _sendStreamIdSet = false;
    818     /** what stream do we send data to the peer on?
    819      * @return non-global stream sending ID
    820      */
    821     public long getSendStreamId() { return _sendStreamId; }
     820    /**
     821     *  What stream do we send data to the peer on?
     822     *  @return non-global stream sending ID, or 0 if unknown
     823     */
     824    public long getSendStreamId() { return _sendStreamId.get(); }
     825
     826    /**
     827     *  @param id 0 to 0xffffffff
     828     *  @throws RuntimeException if already set to nonzero
     829     */
    822830    public void setSendStreamId(long id) {
    823         if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
    824         _sendStreamIdSet = true;
    825         _sendStreamId = id;
    826     }
    827    
    828     private boolean _receiveStreamIdSet = false;
    829     /** The stream ID of a peer connection that sends data to us. (may be null)
    830      * @return receive stream ID, or null if there isn't one
    831      */
    832     public long getReceiveStreamId() { return _receiveStreamId; }
     831        if (!_sendStreamId.compareAndSet(0, id))
     832            throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]");
     833    }
     834   
     835    /**
     836     *  The stream ID of a peer connection that sends data to us, or zero if unknown.
     837     *  @return receive stream ID, or 0 if unknown
     838     */
     839    public long getReceiveStreamId() { return _receiveStreamId.get(); }
     840
     841    /**
     842     *  @param id 0 to 0xffffffff
     843     *  @throws RuntimeException if already set to nonzero
     844     */
    833845    public void setReceiveStreamId(long id) {
    834         if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
    835         _receiveStreamIdSet = true;
    836         _receiveStreamId = id;
     846        if (!_receiveStreamId.compareAndSet(0, id))
     847            throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]");
    837848        synchronized (_connectLock) { _connectLock.notifyAll(); }
    838849    }
     
    897908    public ConnectionPacketHandler getPacketHandler() { return _handler; }
    898909   
    899     public long getLifetimeBytesSent() { return _lifetimeBytesSent; }
    900     public long getLifetimeBytesReceived() { return _lifetimeBytesReceived; }
    901     public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent; }
    902     public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived; }
    903     public void incrementBytesSent(int bytes) { _lifetimeBytesSent += bytes; }
    904     public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent += msgs; }
    905     public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived += bytes; }
    906     public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived += msgs; }
     910    public long getLifetimeBytesSent() { return _lifetimeBytesSent.get(); }
     911    public long getLifetimeBytesReceived() { return _lifetimeBytesReceived.get(); }
     912    public long getLifetimeDupMessagesSent() { return _lifetimeDupMessageSent.get(); }
     913    public long getLifetimeDupMessagesReceived() { return _lifetimeDupMessageReceived.get(); }
     914    public void incrementBytesSent(int bytes) { _lifetimeBytesSent.addAndGet(bytes); }
     915    public void incrementDupMessagesSent(int msgs) { _lifetimeDupMessageSent.addAndGet(msgs); }
     916    public void incrementBytesReceived(int bytes) { _lifetimeBytesReceived.addAndGet(bytes); }
     917    public void incrementDupMessagesReceived(int msgs) { _lifetimeDupMessageReceived.addAndGet(msgs); }
    907918   
    908919    /**
     
    945956     * @return Count of how many packets ACKed.
    946957     */
    947     public long getAckedPackets() { return _ackedPackets; }
     958    public long getAckedPackets() { return _ackedPackets.get(); }
    948959    public long getCreatedOn() { return _createdOn; }
    949960
     
    960971        }
    961972    }
    962     public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; }
    963     public int getUnackedPacketsReceived() { return _unackedPacketsReceived; }
     973
     974    public void incrementUnackedPacketsReceived() { _unackedPacketsReceived.incrementAndGet(); }
     975    public int getUnackedPacketsReceived() { return _unackedPacketsReceived.get(); }
    964976
    965977    /** how many packets have we sent but not yet received an ACK for?
     
    10071019        long expiration = _context.clock().now() + _options.getConnectTimeout();
    10081020        while (true) {
    1009             if (_connected.get() && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
     1021            if (_connected.get() && (_receiveStreamId.get() > 0) && (_sendStreamId.get() > 0) ) {
    10101022                // w00t
    10111023                if (_log.shouldLog(Log.DEBUG))
     
    11691181        StringBuilder buf = new StringBuilder(256);
    11701182        buf.append("[Connection ");
    1171         if (_receiveStreamId > 0)
    1172             buf.append(Packet.toId(_receiveStreamId));
     1183        long id = _receiveStreamId.get();
     1184        if (id > 0)
     1185            buf.append(Packet.toId(id));
    11731186        else
    11741187            buf.append("unknown");
    11751188        buf.append('/');
    1176         if (_sendStreamId > 0)
    1177             buf.append(Packet.toId(_sendStreamId));
     1189        id = _sendStreamId.get();
     1190        if (id > 0)
     1191            buf.append(Packet.toId(id));
    11781192        else
    11791193            buf.append("unknown");
     
    13451359                _packet.setResendDelay(getOptions().getResendDelay() / 1000);
    13461360                if (_packet.getReceiveStreamId() <= 0)
    1347                     _packet.setReceiveStreamId(_receiveStreamId);
     1361                    _packet.setReceiveStreamId(_receiveStreamId.get());
    13481362                if (_packet.getSendStreamId() <= 0)
    1349                     _packet.setSendStreamId(_sendStreamId);
     1363                    _packet.setSendStreamId(_sendStreamId.get());
    13501364               
    13511365                int newWindowSize = getOptions().getWindowSize();
     
    14261440                                  + newWindowSize + " lifetime "
    14271441                                  + (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
    1428                         _unackedPacketsReceived = 0;
     1442                        _unackedPacketsReceived.set(0);
    14291443                        _lastSendTime = _context.clock().now();
    14301444                        // timer reset added 0.9.1
Note: See TracChangeset for help on using the changeset viewer.