Changeset a916c1a2 for router


Ignore:
Timestamp:
Jul 6, 2018 1:31:46 PM (2 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
381f3909, 998931f
Parents:
1460bec
Message:

NTCP: Read all available data when able (ticket #2243)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r1460bec ra916c1a2  
    614614    private void processRead(SelectionKey key) {
    615615        NTCPConnection con = (NTCPConnection)key.attachment();
    616         ByteBuffer buf = acquireBuf();
     616        ByteBuffer buf = null;
    617617        try {
    618             int read = con.getChannel().read(buf);
    619             if (read < 0) {
    620                 if (con.isInbound() && con.getMessagesReceived() <= 0) {
    621                     InetAddress addr = con.getChannel().socket().getInetAddress();
    622                     int count;
    623                     if (addr != null) {
    624                         byte[] ip = addr.getAddress();
    625                         ByteArray ba = new ByteArray(ip);
    626                         count = _blockedIPs.increment(ba);
     618            while (true) {
     619                buf = acquireBuf();
     620                int read = 0;
     621                int readThisTime;
     622                int readCount = 0;
     623                while ((readThisTime = con.getChannel().read(buf)) > 0)  {
     624                    read += readThisTime;
     625                    readCount++;
     626                }
     627                if (readThisTime < 0 && read == 0)
     628                    read = readThisTime;
     629                if (_log.shouldDebug())
     630                    _log.debug("Read " + read + " bytes total in " + readCount + " times from " + con);
     631                if (read < 0) {
     632                    if (con.isInbound() && con.getMessagesReceived() <= 0) {
     633                        InetAddress addr = con.getChannel().socket().getInetAddress();
     634                        int count;
     635                        if (addr != null) {
     636                            byte[] ip = addr.getAddress();
     637                            ByteArray ba = new ByteArray(ip);
     638                            count = _blockedIPs.increment(ba);
     639                            if (_log.shouldLog(Log.WARN))
     640                                _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
     641                        } else {
     642                            count = 1;
     643                            if (_log.shouldLog(Log.WARN))
     644                                _log.warn("EOF on inbound before receiving any: " + con);
     645                        }
     646                        _context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
     647                    } else {
     648                        if (_log.shouldLog(Log.DEBUG))
     649                            _log.debug("EOF on " + con);
     650                    }
     651                    con.close();
     652                    releaseBuf(buf);
     653                    break;
     654                }
     655                if (read == 0) {
     656                    // stay interested
     657                    //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
     658                    releaseBuf(buf);
     659                    // workaround for channel stuck returning 0 all the time, causing 100% CPU
     660                    int consec = con.gotZeroRead();
     661                    if (consec >= 5) {
     662                        _context.statManager().addRateData("ntcp.zeroReadDrop", 1);
    627663                        if (_log.shouldLog(Log.WARN))
    628                             _log.warn("EOF on inbound before receiving any, blocking IP " + Addresses.toString(ip) + " with count " + count + ": " + con);
     664                            _log.warn("Fail safe zero read close " + con);
     665                        con.close();
    629666                    } else {
    630                         count = 1;
    631                         if (_log.shouldLog(Log.WARN))
    632                             _log.warn("EOF on inbound before receiving any: " + con);
     667                        _context.statManager().addRateData("ntcp.zeroRead", consec);
     668                        if (_log.shouldLog(Log.INFO))
     669                            _log.info("nothing to read for " + con + ", but stay interested");
    633670                    }
    634                     _context.statManager().addRateData("ntcp.dropInboundNoMessage", count);
    635                 } else {
    636                     if (_log.shouldLog(Log.DEBUG))
    637                         _log.debug("EOF on " + con);
    638                 }
    639                 con.close();
    640                 releaseBuf(buf);
    641             } else if (read == 0) {
    642                 // stay interested
    643                 //key.interestOps(key.interestOps() | SelectionKey.OP_READ);
    644                 releaseBuf(buf);
    645                 // workaround for channel stuck returning 0 all the time, causing 100% CPU
    646                 int consec = con.gotZeroRead();
    647                 if (consec >= 5) {
    648                     _context.statManager().addRateData("ntcp.zeroReadDrop", 1);
    649                     if (_log.shouldLog(Log.WARN))
    650                         _log.warn("Fail safe zero read close " + con);
    651                     con.close();
    652                 } else {
    653                     _context.statManager().addRateData("ntcp.zeroRead", consec);
    654                     if (_log.shouldLog(Log.INFO))
    655                         _log.info("nothing to read for " + con + ", but stay interested");
    656                 }
    657             } else {
     671                    break;
     672                }
     673                // Process the data received
    658674                // clear counter for workaround above
    659675                con.clearZeroRead();
     676                // go around again if we filled the buffer (so we can read more)
     677                boolean keepReading = !buf.hasRemaining();
    660678                // ZERO COPY. The buffer will be returned in Reader.processRead()
    661679                buf.flip();
     
    666684                    _context.statManager().addRateData("ntcp.queuedRecv", read);
    667685                    con.queuedRecv(buf, req);
     686                    break;
    668687                } else {
    669688                    // stay interested
     
    671690                    con.recv(buf);
    672691                    _context.statManager().addRateData("ntcp.read", read);
    673                 }
    674             }
     692                    if (readThisTime < 0) {
     693                        // EOF, we're done
     694                        con.close();
     695                        break;
     696                    }
     697                    if (!keepReading)
     698                        break;
     699                }
     700            }  // while true
    675701        } catch (CancelledKeyException cke) {
    676             releaseBuf(buf);
     702            if (buf != null)
     703                releaseBuf(buf);
    677704            if (_log.shouldLog(Log.WARN)) _log.warn("error reading on " + con, cke);
    678705            con.close();
     
    680707        } catch (IOException ioe) {
    681708            // common, esp. at outbound connect time
    682             releaseBuf(buf);
     709            if (buf != null)
     710                releaseBuf(buf);
    683711            if (con.isInbound() && con.getMessagesReceived() <= 0) {
    684712                InetAddress addr = con.getChannel().socket().getInetAddress();
     
    713741            con.close();
    714742        } catch (NotYetConnectedException nyce) {
    715             releaseBuf(buf);
     743            if (buf != null)
     744                releaseBuf(buf);
    716745            // ???
    717746            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
Note: See TracChangeset for help on using the changeset viewer.