Changeset 937ae8ad


Ignore:
Timestamp:
Dec 9, 2011 4:43:54 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
25b0603f
Parents:
6d4a9ab
Message:
  • UDP:
    • Round expiration times when converting to seconds
    • Zero-copy of single-fragment messages in MessageReceiver?
    • Optimizations, log tweaks, comments
Location:
router/java/src/net/i2p/router/transport/udp
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/transport/udp/ACKSender.java

    r6d4a9ab r937ae8ad  
    139139                            _peersToACK.addAll(notYet);
    140140                        } catch (Exception e) {}
    141                         if (_log.shouldLog(Log.INFO))
    142                             _log.info("sleeping, pending size = " + notYet.size());
     141                        if (_log.shouldLog(Log.DEBUG))
     142                            _log.debug("sleeping, pending size = " + notYet.size());
    143143                        notYet.clear();
    144144                        try {
  • router/java/src/net/i2p/router/transport/udp/InboundMessageState.java

    r6d4a9ab r937ae8ad  
    7070                data.readMessageFragment(dataFragment, message.getData(), 0);
    7171                int size = data.readMessageFragmentSize(dataFragment);
     72                if (size <= 0) {
     73                    // Bug in routers prior to 0.8.12
     74                    // If the msg size was an exact multiple of the fragment size,
     75                    // it would send a zero-length last fragment.
     76                    // This message is almost certainly doomed.
     77                    // We might as well ack it, keep going, and pass it along to I2NP where it
     78                    // will get dropped as corrupted.
     79                    // If we don't ack the fragment he will just send a zero-length fragment again.
     80                    if (_log.shouldLog(Log.WARN))
     81                        _log.warn("Zero-length fragment " + fragmentNum + " for message " + _messageId + " from " + _from);
     82                }
    7283                message.setValid(size);
    7384                _fragments[fragmentNum] = message;
     
    92103                               + ", size=" + size
    93104                               + ", isLast=" + isLast
    94                                + ", data=" + Base64.encode(message.getData(), 0, size));
     105                          /*   + ", data=" + Base64.encode(message.getData(), 0, size)   */  );
    95106            } catch (ArrayIndexOutOfBoundsException aioobe) {
    96107                _log.warn("Corrupt SSU fragment " + fragmentNum, aioobe);
     
    107118   
    108119    /**
    109      *  May not be valid after released
     120     *  May not be valid after released.
     121     *  Probably doesn't need to be synced by caller, given the order of
     122     *  events in receiveFragment() above, but you might want to anyway
     123     *  to be safe.
    110124     */
    111125    public boolean isComplete() {
    112         if (_lastFragment < 0) return false;
    113         for (int i = 0; i <= _lastFragment; i++)
     126        int last = _lastFragment;
     127        if (last < 0) return false;
     128        for (int i = 0; i <= last; i++)
    114129            if (_fragments[i] == null)
    115130                return false;
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    r6d4a9ab r937ae8ad  
    1212import net.i2p.router.RouterContext;
    1313//import net.i2p.util.ByteCache;
     14import net.i2p.util.HexDump;
    1415import net.i2p.util.I2PThread;
    1516import net.i2p.util.Log;
     
    5455        //_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
    5556        _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
    56         _context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
    57         _context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
     57        //_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
     58        //_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
    5859        _context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
    5960       
     
    137138           
    138139            if (message != null) {
    139                 long before = System.currentTimeMillis();
     140                //long before = System.currentTimeMillis();
    140141                //if (remaining > 0)
    141142                //    _context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
    142143                int size = message.getCompleteSize();
    143                 if (_log.shouldLog(Log.INFO))
    144                     _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
    145                 long afterRead = -1;
     144                //if (_log.shouldLog(Log.DEBUG))
     145                //    _log.debug("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
     146                //long afterRead = -1;
    146147                try {
    147148                    I2NPMessage msg = readMessage(buf, message, handler);
    148                     afterRead = System.currentTimeMillis();
     149                    //afterRead = System.currentTimeMillis();
    149150                    if (msg != null)
    150151                        _transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
     
    154155                }
    155156                message = null;
    156                 long after = System.currentTimeMillis();
    157                 if (afterRead - before > 100)
    158                     _context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
    159                 if (after - afterRead > 100)
    160                     _context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
     157                //long after = System.currentTimeMillis();
     158                //if (afterRead - before > 100)
     159                //    _context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
     160                //if (after - afterRead > 100)
     161                //    _context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
    161162            }
    162163        }
     
    169170     *  Assemble all the fragments into an I2NP message.
    170171     *  This calls state.releaseResources(), do not access state after calling this.
     172     *
     173     *  @param buf temp buffer for convenience
    171174     *  @return null on error
    172175     */
     
    174177        try {
    175178            //byte buf[] = new byte[state.getCompleteSize()];
    176             ByteArray fragments[] = state.getFragments();
     179            I2NPMessage m;
    177180            int numFragments = state.getFragmentCount();
    178             int off = 0;
    179             for (int i = 0; i < numFragments; i++) {
    180                 System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
    181                 if (_log.shouldLog(Log.DEBUG))
    182                     _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
    183                                + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
    184                                + " (valid: " + fragments[i].getValid()
    185                                + " raw: " + Base64.encode(fragments[i].getData()) + ")");
    186                 off += fragments[i].getValid();
    187             }
    188             if (off != state.getCompleteSize()) {
    189                 if (_log.shouldLog(Log.WARN))
    190                     _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
    191                 return null;
    192             }
    193             if (_log.shouldLog(Log.DEBUG))
    194                 _log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
    195             I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
     181            if (numFragments > 1) {
     182                ByteArray fragments[] = state.getFragments();
     183                int off = 0;
     184                for (int i = 0; i < numFragments; i++) {
     185                    System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
     186                    //if (_log.shouldLog(Log.DEBUG))
     187                    //    _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": "
     188                    //               + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
     189                    //               + " (valid: " + fragments[i].getValid()
     190                    //               + " raw: " + Base64.encode(fragments[i].getData()) + ")");
     191                    off += fragments[i].getValid();
     192                }
     193                if (off != state.getCompleteSize()) {
     194                    if (_log.shouldLog(Log.WARN))
     195                        _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
     196                    return null;
     197                }
     198                //if (_log.shouldLog(Log.DEBUG))
     199                //    _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
     200                m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
     201            } else {
     202                // zero copy for single fragment
     203                m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler);
     204            }
     205            if (state.getCompleteSize() == 534 && _log.shouldLog(Log.INFO)) {
     206                _log.info(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
     207            }
    196208            m.setUniqueId(state.getMessageId());
    197209            return m;
    198210        } catch (I2NPMessageException ime) {
    199             if (_log.shouldLog(Log.WARN))
     211            if (_log.shouldLog(Log.WARN)) {
    200212                _log.warn("Message invalid: " + state, ime);
     213                _log.warn(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
     214                _log.warn("RAW: " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
     215            }
    201216            _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "error: " + ime.toString() + ": " + state.toString());
    202217            return null;
  • router/java/src/net/i2p/router/transport/udp/PacketBuilder.java

    r6d4a9ab r937ae8ad  
    239239            return null;
    240240        }
     241        if (dataSize == 0) {
     242            // OK according to the protocol but if we send it, it's a bug
     243            _log.error("Sending zero-size fragment " + fragment + " of " + state + " for " + peer);
     244        }
    241245        int currentMTU = peer.getMTU();
    242246        int availableForAcks = currentMTU - MIN_DATA_PACKET_OVERHEAD - dataSize;
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r6d4a9ab r937ae8ad  
    11701170        if (_log.shouldLog(Log.DEBUG))
    11711171            _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
    1172         List<OutboundMessageState> msgs = _outboundMessages;
    11731172        int rv = 0;
    11741173        boolean fail = false;
    1175         synchronized (msgs) {
    1176             rv = msgs.size() + 1;
     1174        synchronized (_outboundMessages) {
     1175            rv = _outboundMessages.size() + 1;
    11771176            if (rv > 32) {
    11781177                // 32 queued messages?  to *one* peer?  nuh uh.
     
    12191218
    12201219            } else {
    1221                 msgs.add(state);
     1220                _outboundMessages.add(state);
    12221221            }
    12231222        }
     
    12311230        //if (_dead) return;
    12321231        _dead = true;
    1233         List<OutboundMessageState> msgs = _outboundMessages;
    12341232        //_outboundMessages = null;
    12351233        _retransmitter = null;
     
    12371235            int sz = 0;
    12381236            List<OutboundMessageState> tempList = null;
    1239             synchronized (msgs) {
    1240                 sz = msgs.size();
     1237            synchronized (_outboundMessages) {
     1238                sz = _outboundMessages.size();
    12411239                if (sz > 0) {
    1242                     tempList = new ArrayList(msgs);
    1243                     msgs.clear();
     1240                    tempList = new ArrayList(_outboundMessages);
     1241                    _outboundMessages.clear();
    12441242                }
    12451243            }
     
    12641262     */
    12651263    public int finishMessages() {
    1266         List<OutboundMessageState> msgs = _outboundMessages;
    12671264        // short circuit, unsynchronized
    1268         if (msgs.isEmpty())
     1265        if (_outboundMessages.isEmpty())
    12691266            return 0;
    12701267
     
    12771274        List<OutboundMessageState> succeeded = null;
    12781275        List<OutboundMessageState> failed = null;
    1279         synchronized (msgs) {
    1280             for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1276        synchronized (_outboundMessages) {
     1277            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
    12811278                OutboundMessageState state = iter.next();
    12821279                if (state.isComplete()) {
     
    13021299                } // end (pushCount > maxVolleys)
    13031300            } // end iterating over outbound messages
    1304             rv = msgs.size();
     1301            rv = _outboundMessages.size();
    13051302        }
    13061303       
     
    13381335     */
    13391336    public OutboundMessageState allocateSend() {
    1340         int total = 0;
    1341         List<OutboundMessageState> msgs = _outboundMessages;
    13421337        if (_dead) return null;
    1343         synchronized (msgs) {
    1344             for (OutboundMessageState state : msgs) {
     1338        synchronized (_outboundMessages) {
     1339            for (OutboundMessageState state : _outboundMessages) {
    13451340                if (locked_shouldSend(state)) {
    13461341                    if (_log.shouldLog(Log.DEBUG))
     
    13611356                } */
    13621357            }
    1363             total = msgs.size();
    13641358        }
    13651359        if (_log.shouldLog(Log.DEBUG))
    1366             _log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + total + " remaining");
     1360            _log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + _outboundMessages.size() + " remaining");
    13671361        return null;
    13681362    }
     
    13761370        if (_dead) return rv;
    13771371        long now = _context.clock().now();
    1378         List<OutboundMessageState> msgs = _outboundMessages;
    1379         synchronized (msgs) {
     1372        synchronized (_outboundMessages) {
    13801373            if (_retransmitter != null) {
    13811374                rv = (int)(_retransmitter.getNextSendTime() - now);
    13821375                return rv;
    13831376            }
    1384             for (OutboundMessageState state : msgs) {
     1377            for (OutboundMessageState state : _outboundMessages) {
    13851378                int delay = (int)(state.getNextSendTime() - now);
    13861379                if (delay < rv)
     
    15131506        if (_dead) return false;
    15141507        OutboundMessageState state = null;
    1515         List<OutboundMessageState> msgs = _outboundMessages;
    1516         synchronized (msgs) {
    1517             for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1508        synchronized (_outboundMessages) {
     1509            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
    15181510                state = iter.next();
    15191511                if (state.getMessageId() == messageId) {
     
    15751567        }
    15761568   
    1577         List<OutboundMessageState> msgs = _outboundMessages;
    1578        
    15791569        OutboundMessageState state = null;
    15801570        boolean isComplete = false;
    1581         synchronized (msgs) {
    1582             for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
     1571        synchronized (_outboundMessages) {
     1572            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
    15831573                state = iter.next();
    15841574                if (state.getMessageId() == bitfield.getMessageId()) {
Note: See TracChangeset for help on using the changeset viewer.