Changeset bec62c1b


Ignore:
Timestamp:
Jul 5, 2013 7:48:31 PM (7 years ago)
Author:
zab2 <zab2@…>
Branches:
master
Children:
614b8b4
Parents:
7f8efca
Message:

Remove unused field

Use atomics to manage closed state and IOExceptions

File:
1 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java

    r7f8efca rbec62c1b  
    44import java.io.InterruptedIOException;
    55import java.io.OutputStream;
     6import java.util.concurrent.atomic.AtomicBoolean;
     7import java.util.concurrent.atomic.AtomicReference;
    68
    79import net.i2p.I2PAppContext;
     
    2527    private final Object _dataLock;
    2628    private final DataReceiver _dataReceiver;
    27     private IOException _streamError;
    28     private volatile boolean _closed;
     29    private final AtomicReference<IOException>_streamError = new AtomicReference<IOException>(null);
     30    private final AtomicBoolean _closed = new AtomicBoolean(false);
    2931    private long _written;
    3032    private int _writeTimeout;
    3133    private ByteCache _dataCache;
    3234    private final Flusher _flusher;
    33     private long _lastFlushed;
    3435    private volatile long _lastBuffered;
    3536    /** if we enqueue data but don't flush it in this period, flush it passively */
     
    99100        @Override
    100101    public void write(byte b[], int off, int len) throws IOException {
    101         if (_closed) throw new IOException("Already closed");
     102        if (_closed.get()) throw new IOException("Already closed");
    102103        if (_log.shouldLog(Log.DEBUG))
    103104            _log.debug("write(b[], " + off + ", " + len + ") ");
     
    107108        while (remaining > 0) {
    108109            WriteStatus ws = null;
    109             if (_closed) throw new IOException("closed underneath us");
     110            if (_closed.get()) throw new IOException("closed underneath us");
    110111            // we do any waiting outside the synchronized() block because we
    111112            // want to allow other threads to flushAvailable() whenever they want. 
     
    140141                    _valid = 0;                       
    141142                    throwAnyError();
    142                     _lastFlushed = _context.clock().now();
    143143                   
    144144                    locked_updateBufferSize();
     
    240240        }
    241241        public void timeReached() {
    242             if (_closed)
     242            if (_closed.get())
    243243                return;
    244244            _enqueued = false;
     
    266266                        _written += _valid;
    267267                        _valid = 0;
    268                         _lastFlushed = _context.clock().now();
    269268                        locked_updateBufferSize();
    270269                        _dataLock.notifyAll();
     
    331330                _valid = 0;
    332331                locked_updateBufferSize();
    333                 _lastFlushed = _context.clock().now();
    334332                _dataLock.notifyAll();
    335333            }
     
    346344        if (_log.shouldLog(Log.DEBUG))
    347345            _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
    348         if (_closed &&
     346        if (_closed.get() &&
    349347            ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
    350348              (_writeTimeout <= 0) ) )
     
    373371    @Override
    374372    public void close() throws IOException {
    375         if (_closed) {
     373        if (!_closed.compareAndSet(false,true)) {
    376374            synchronized (_dataLock) { _dataLock.notifyAll(); }
     375            LogUtil.logCloseLoop(_log, "MOS");
    377376            return;
    378377        }
    379378        // setting _closed before flush() will force flush() to send a CLOSE packet
    380         _closed = true;
    381379        _flusher.cancel();
    382380
     
    412410     */
    413411    public void closeInternal() {
    414         _closed = true;
     412        if (!_closed.compareAndSet(false,true)) {
     413            LogUtil.logCloseLoop(_log, "close internal");
     414            return;
     415        }
    415416        _flusher.cancel();
    416         if (_streamError == null)
    417             _streamError = new IOException("Closed internally");
     417        _streamError.compareAndSet(null,new IOException("Closed internally"));
    418418        clearData(true);
    419419    }
     
    436436                _valid = 0;
    437437            }
    438             _lastFlushed = _context.clock().now();
    439438            _dataLock.notifyAll();
    440439        }
     
    444443    }
    445444   
    446     public boolean getClosed() { return _closed; }
     445    public boolean getClosed() { return _closed.get(); }
    447446   
    448447    private void throwAnyError() throws IOException {
    449         IOException ioe = _streamError;
     448        IOException ioe = _streamError.getAndSet(null);
    450449        if (ioe != null) {
    451             _streamError = null;
    452450            // constructor with cause not until Java 6
    453451            IOException ioe2 = new IOException("Output stream error");
     
    458456   
    459457    void streamErrorOccurred(IOException ioe) {
    460         if (_streamError == null)
    461             _streamError = ioe;
     458        _streamError.compareAndSet(null,ioe);
    462459        clearData(false);
    463460    }
     
    485482            locked_updateBufferSize();
    486483            _dataLock.notifyAll();
    487             _lastFlushed = _context.clock().now();
    488484        }
    489485        long afterBuild = System.currentTimeMillis();
     
    505501   
    506502    void destroy() {
    507         _closed = true;
     503        if (!_closed.compareAndSet(false,true)) {
     504            LogUtil.logCloseLoop(_log, "destroy()");
     505            return;
     506        }
    508507        _flusher.cancel();
    509508        synchronized (_dataLock) {
Note: See TracChangeset for help on using the changeset viewer.