Changeset 0942a7f


Ignore:
Timestamp:
May 19, 2004 3:14:30 PM (17 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
097a4647
Parents:
2df4370
git-author:
jrandom <jrandom> (05/19/04 15:14:30)
git-committer:
zzz <zzz@…> (05/19/04 15:14:30)
Message:

truckloads of logging
new async interface for error notification (e.g. you can get notified of an error prior to it throwing the IOException).
This async is useful since the IOException can be delayed for up to a minute while waiting for the close packet to be delivered.
The alternative is to fire off a new thread to do the closing, and we may want to go there later, but i'm not sure.

Location:
apps/ministreaming/java/src/net/i2p/client/streaming
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java

    r2df4370 r0942a7f  
    9090    public boolean addWaitForAccept(I2PSocket s, long timeoutMs) {
    9191        if (_log.shouldLog(Log.DEBUG))
    92             _log.debug("addWaitForAccept [new socket arrived, pending: " + pendingSockets.size());
     92            _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size());
    9393       
    9494        if (closing) {
     
    112112            if (now >= end) {
    113113                if (_log.shouldLog(Log.INFO))
    114                     _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms");
     114                    _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString());
    115115                pendingSockets.remove(s);
    116116                return false;
     
    131131        long now = clock.now();
    132132        if (_log.shouldLog(Log.DEBUG))
    133             _log.info("Socket accepted after " + (now-start) + "ms");
     133            _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString());
    134134        return true;
    135135    }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    r2df4370 r0942a7f  
    3030    private I2PInputStream in;
    3131    private I2POutputStream out;
     32    private SocketErrorListener _socketErrorListener;
    3233    private boolean outgoing;
     34    private long _socketId;
     35    private static long __socketId = 0;
     36    private long _bytesRead = 0;
     37    private long _bytesWritten = 0;
    3338    private Object flagLock = new Object();
    3439
     
    6267        manager = mgr;
    6368        remote = peer;
     69        _socketId = ++__socketId;
    6470        local = mgr.getSession().getMyDestination();
    6571        in = new I2PInputStream();
     
    154160     */
    155161    public void queueData(byte[] data) {
     162        _bytesRead += data.length;
    156163        in.queueData(data);
    157164    }
     
    233240    }
    234241
     242    public void setSocketErrorListener(SocketErrorListener lsnr) {
     243        _socketErrorListener = lsnr;
     244    }
     245   
     246    void errorOccurred() {
     247        if (_socketErrorListener != null)
     248            _socketErrorListener.errorOccurred();
     249    }
     250   
     251    private String getPrefix() { return "[" + _socketId + "]: "; }
     252   
    235253    //--------------------------------------------------
    236254    private class I2PInputStream extends InputStream {
     
    257275
    258276        public synchronized int read(byte[] b, int off, int len) throws IOException {
    259             _log.debug("Read called: " + this.hashCode());
     277            if (_log.shouldLog(Log.DEBUG))
     278                _log.debug(getPrefix() + "Read called: " + this.hashCode());
    260279            if (len == 0) return 0;
    261280            long dieAfter = System.currentTimeMillis() + readTimeout;
     
    266285                synchronized (flagLock) {
    267286                    if (closed) {
    268                         _log.debug("Closed is set, so closing stream: " + hashCode());
     287                        if (_log.shouldLog(Log.DEBUG))
     288                            _log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode());
    269289                        return -1;
    270290                    }
     
    280300                if ((readTimeout >= 0)
    281301                    && (System.currentTimeMillis() >= dieAfter)) {
    282                     throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)");
     302                    throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)");
    283303                }
    284304
     
    289309
    290310            if (_log.shouldLog(Log.DEBUG)) {
    291                 _log.debug("Read from I2PInputStream " + hashCode() + " returned "
     311                _log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned "
    292312                           + read.length + " bytes");
    293313            }
     
    310330        public synchronized void queueData(byte[] data, int off, int len) {
    311331            if (_log.shouldLog(Log.DEBUG))
    312                 _log.debug("Insert " + len + " bytes into queue: " + hashCode());
     332                _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
    313333            bc.append(data, off, len);
    314334            notifyAll();
     
    339359
    340360        public void write(byte[] b, int off, int len) throws IOException {
     361            _bytesWritten += len;
    341362            sendTo.queueData(b, off, len);
    342363        }
     
    354375        public I2PSocketRunner(InputStream in) {
    355376            if (_log.shouldLog(Log.DEBUG))
    356                 _log.debug("Runner's input stream is: " + in.hashCode());
     377                _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode());
    357378            this.in = in;
    358379            String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
    359             setName("SocketRunner " + (++__runnerId) + " " + peer.substring(0, 4));
     380            setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4));
    360381            start();
    361382        }
     
    379400            if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
    380401                if (_log.shouldLog(Log.DEBUG))
    381                     _log.debug("Runner Point d: " + hashCode());
     402                    _log.debug(getPrefix() + "Runner Point d: " + hashCode());
    382403
    383404                try {
     
    391412                if (data.length > 0) {
    392413                    if (_log.shouldLog(Log.DEBUG))
    393                         _log.debug("Message size is: " + data.length);
     414                        _log.debug(getPrefix() + "Message size is: " + data.length);
    394415                    boolean sent = sendBlock(data);
    395416                    if (!sent) {
    396                         _log.error("Error sending message to peer.  Killing socket runner");
     417                        _log.error(getPrefix() + "Error sending message to peer.  Killing socket runner");
     418                        errorOccurred();
    397419                        return false;
    398420                    }
     
    414436                }
    415437                if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
    416                     _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: "
     438                    _log.error(getPrefix() + "A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: "
    417439                               + in.hashCode() + "; "
    418440                               + "queue size: " + bc.getCurrentSize() + ")");
     
    427449                if (sc) {
    428450                    if (_log.shouldLog(Log.INFO))
    429                         _log.info("Sending close packet: " + outgoing);
     451                        _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
    430452                    byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
    431453                    boolean sent = manager.getSession().sendMessage(remote, packet);
    432454                    if (!sent) {
    433                         _log.error("Error sending close packet to peer");
     455                        _log.error(getPrefix() + "Error sending close packet to peer");
     456                        errorOccurred();
    434457                    }
    435458                }
    436459                manager.removeSocket(I2PSocketImpl.this);
    437460            } catch (InterruptedIOException ex) {
    438                 _log.error("BUG! read() operations should not timeout!", ex);
     461                _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
    439462            } catch (IOException ex) {
    440463                // WHOEVER removes this event on inconsistent
     
    442465                // reference on the socket in the socket manager
    443466                // etc.) will get hanged by me personally -- mihi
    444                 _log.error("Error running - **INCONSISTENT STATE!!!**", ex);
     467                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
    445468            } catch (I2PException ex) {
    446                 _log.error("Error running - **INCONSISTENT STATE!!!**", ex);
     469                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
    447470            }
    448471        }
     
    450473        private boolean sendBlock(byte data[]) throws I2PSessionException {
    451474            if (_log.shouldLog(Log.DEBUG))
    452                 _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
     475                _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
    453476            if (remoteID == null) {
    454                 _log.error("NULL REMOTEID");
     477                _log.error(getPrefix() + "NULL REMOTEID");
    455478                return false;
    456479            }
     
    464487        }
    465488    }
     489   
     490    public String toString() { return "" + hashCode(); }
    466491}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java

    r2df4370 r0942a7f  
    190190        }
    191191       
    192         _log.debug("*Disconnect outgoing!");
     192        _log.debug("*Disconnect outgoing for socket " + s);
    193193        try {
    194194            if (s != null) {
     
    208208            return;
    209209        } catch (Exception t) {
    210             _log.error("Ignoring error on disconnect", t);
     210            _log.error("Ignoring error on disconnect for socket " + s, t);
    211211        }
    212212    }
     
    226226        // packet send outgoing
    227227        if (_log.shouldLog(Log.DEBUG))
    228             _log.debug("*Packet send outgoing [" + payload.length + "]");
     228            _log.debug("*Packet send outgoing [" + payload.length + "] for socket " + s);
    229229        if (s != null) {
    230230            s.queueData(payload);
     
    246246    private void synIncomingAvailable(String id, byte payload[], I2PSession session)
    247247                                      throws DataFormatException, I2PSessionException {
    248         _log.debug("*Syn!");
    249248        Destination d = new Destination();
    250249        d.fromByteArray(payload);
     
    260259            }
    261260        }   
     261        _log.debug("*Syn! for socket " + s);
    262262       
    263263        if (!acceptConnections) {
     
    284284                if (_log.shouldLog(Log.WARN))
    285285                    _log.warn("Error sending reply to " + d.calculateHash().toBase64()
    286                                + " in response to a new con message",
     286                               + " in response to a new con message for socket " + s,
    287287                               new Exception("Failed creation"));
    288288                s.internalClose();
     
    294294            boolean nackSent = session.sendMessage(d, packet);
    295295            if (!nackSent) {
    296                 _log.warn("Error sending NACK for session creation");
     296                _log.warn("Error sending NACK for session creation for socket " + s);
    297297            }
    298298            s.internalClose();
     
    307307     */
    308308    private void disconnectIncoming(String id, byte payload[]) {
    309         _log.debug("*Disconnect incoming!");
    310309        I2PSocketImpl s = null;
    311310        synchronized (lock) {
     
    315314            }
    316315        }
     316       
     317        _log.debug("*Disconnect incoming for socket " + s);
    317318       
    318319        try {
     
    340341     */
    341342    private void sendIncoming(String id, byte payload[]) {
     343        I2PSocketImpl s = null;
     344        synchronized (lock) {
     345            s = (I2PSocketImpl) _inSockets.get(id);
     346        }
     347
    342348        if (_log.shouldLog(Log.DEBUG))
    343             _log.debug("*Packet send incoming [" + payload.length + "]");
    344         I2PSocketImpl s = null;
    345         synchronized (lock) {
    346             s = (I2PSocketImpl) _inSockets.get(id);
    347         }
     349            _log.debug("*Packet send incoming [" + payload.length + "] for socket " + s);
    348350       
    349351        if (s != null) {
     
    423425            sent = _session.sendMessage(peer, packet);
    424426            if (!sent) {
    425                 _log.info("Unable to send & receive ack for SYN packet");
     427                _log.info("Unable to send & receive ack for SYN packet for socket " + s);
    426428                synchronized (lock) {
    427429                    _outSockets.remove(s.getLocalID());
     
    432434           
    433435            if (remoteID == null)
    434                 throw new ConnectException("Connection refused by peer");
     436                throw new ConnectException("Connection refused by peer for socket " + s);
    435437            if ("".equals(remoteID))
    436                 throw new NoRouteToHostException("Unable to reach peer");
     438                throw new NoRouteToHostException("Unable to reach peer for socket " + s);
    437439            if (_log.shouldLog(Log.DEBUG))
    438440                _log.debug("TIMING: s given out for remoteID "
    439                            + getReadableForm(remoteID));
     441                           + getReadableForm(remoteID) + " for socket " + s);
    440442           
    441443            return s;
     
    443445            if (_log.shouldLog(Log.ERROR))
    444446                _log.error("Timeout waiting for ack from syn for id "
    445                            + getReadableForm(lcID), ioe);
     447                           + getReadableForm(lcID) + " for socket " + s, ioe);
    446448            synchronized (lock) {
    447449                _outSockets.remove(s.getLocalID());
     
    457459        } catch (IOException ex) {
    458460            if (_log.shouldLog(Log.ERROR))
    459                 _log.error("Error sending syn on id " + getReadableForm(lcID), ex);
     461                _log.error("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex);
    460462            synchronized (lock) {
    461463                _outSockets.remove(s.getLocalID());
     
    465467        } catch (I2PException ex) {
    466468            if (_log.shouldLog(Log.INFO))
    467                 _log.info("Error sending syn on id " + getReadableForm(lcID), ex);
     469                _log.info("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex);
    468470            synchronized (lock) {
    469471                _outSockets.remove(s.getLocalID());
     
    578580        synchronized (lock) {
    579581            if (_log.shouldLog(Log.DEBUG))
    580                 _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\"");
     582                _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\" [" + sock + "]");
    581583            _inSockets.remove(sock.getLocalID());
    582584            _outSockets.remove(sock.getLocalID());
Note: See TracChangeset for help on using the changeset viewer.