Changeset e5d66f4 for apps/ministreaming


Ignore:
Timestamp:
Aug 15, 2004 8:48:35 PM (17 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
4dc17773
Parents:
d2fc24e7
git-author:
jrandom <jrandom> (08/15/04 20:48:35)
git-committer:
zzz <zzz@…> (08/15/04 20:48:35)
Message:

deal with a race on close
more zealous bc synchronization
make sure we always close the streams explicitly
logging

File:
1 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    rd2fc24e7 re5d66f4  
    347347            boolean timedOut = false;
    348348
    349             while (read.length == 0) {
     349            while ( (read.length == 0) && (!inStreamClosed) ) {
    350350                synchronized (flagLock) {
    351351                    if (closed) {
     
    379379            }
    380380            if (read.length > len) throw new RuntimeException("BUG");
     381            if ( (inStreamClosed) && ( (read == null) || (read.length <= 0) ) )
     382                return -1;
     383           
    381384            System.arraycopy(read, 0, b, off, read.length);
    382385
     
    457460                I2PInputStream.this.notifyAll();
    458461            }
     462            if (_log.shouldLog(Log.DEBUG))
     463                _log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
    459464        }
    460465
     
    472477                bc.notifyAll();
    473478            }
     479            if (_log.shouldLog(Log.DEBUG))
     480                _log.debug(getStreamPrefix() + "After close");
    474481        }
    475482
     
    519526        private boolean handleNextPacket(ByteCollector bc, byte buffer[])
    520527                                         throws IOException, I2PSessionException {
     528            if (_log.shouldLog(Log.DEBUG))
     529                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
    521530            int len = in.read(buffer);
    522             int bcsize = bc.getCurrentSize();
     531            int bcsize = 0;
     532            synchronized (bc) {
     533                bcsize = bc.getCurrentSize();
     534            }
     535
     536            if (_log.shouldLog(Log.DEBUG))
     537                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);
     538
    523539            if (len != -1) {
    524                 bc.append(buffer, len);
     540                synchronized (bc) {
     541                    bc.append(buffer, len);
     542                }
    525543            } else if (bcsize == 0) {
    526544                // nothing left in the buffer, and read(..) got EOF (-1).
     
    530548            if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
    531549                if (_log.shouldLog(Log.DEBUG))
    532                     _log.debug(getPrefix() + "Runner Point d: " + hashCode());
     550                    _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());
    533551
    534552                try {
     
    539557            }
    540558            if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
    541                 byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
     559                byte data[] = null;
     560                synchronized (bc) {
     561                    data = bc.startToByteArray(MAX_PACKET_SIZE);
     562                }
    542563                if (data.length > 0) {
    543564                    if (_log.shouldLog(Log.DEBUG))
    544                         _log.debug(getPrefix() + "Message size is: " + data.length);
     565                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
    545566                    boolean sent = sendBlock(data);
    546567                    if (!sent) {
    547568                        if (_log.shouldLog(Log.WARN))
    548                             _log.warn(getPrefix() + "Error sending message to peer.  Killing socket runner");
     569                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer.  Killing socket runner");
    549570                        errorOccurred();
    550571                        return false;
    551572                    } else {
    552573                        if (_log.shouldLog(Log.DEBUG))
    553                             _log.debug(getPrefix() + "Message sent to peer");
     574                            _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
    554575                    }
    555576                }
     
    568589                    keepHandling = handleNextPacket(bc, buffer);
    569590                    packetsHandled++;
    570                 }
     591                    if (_log.shouldLog(Log.DEBUG))
     592                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName()
     593                                   + "Packets handled: " + packetsHandled);
     594                }
     595                if (_log.shouldLog(Log.INFO))
     596                    _log.info(getPrefix() + ":" + Thread.currentThread().getName()
     597                               + "After handling packets, we're done.  Packets handled: " + packetsHandled);
     598               
    571599                if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
    572600                    if (_log.shouldLog(Log.WARN))
     
    584612                if (sc) {
    585613                    if (_log.shouldLog(Log.INFO))
    586                         _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
     614                        _log.info(getPrefix() + ":" + Thread.currentThread().getName()
     615                                  + "Sending close packet: (we started? " + outgoing
     616                                  + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
    587617                    byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
    588618                    boolean sent = manager.getSession().sendMessage(remote, packet);
    589619                    if (!sent) {
    590620                        if (_log.shouldLog(Log.WARN))
    591                             _log.warn(getPrefix() + "Error sending close packet to peer");
     621                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName()
     622                                      + "Error sending close packet to peer");
    592623                        errorOccurred();
    593624                    }
    594625                }
    595626                manager.removeSocket(I2PSocketImpl.this);
     627                internalClose();
    596628            } catch (InterruptedIOException ex) {
    597629                _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
Note: See TracChangeset for help on using the changeset viewer.