Changeset 6364f14


Ignore:
Timestamp:
Sep 3, 2011 5:44:23 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
070d3ee
Parents:
7b974e7
Message:
  • UDP Inbound:
    • Hopefully fix race NPE, thx dream
    • Catch some more fragment errors
    • Exception and log tweaks
    • Cleanups and javadocs
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • history.txt

    r7b974e7 r6364f14  
     12011-09-03 zzz
     2  * i2psnark: Fix "eject" button in certain browsers (ticket #511)
     3  * UDP Inbound:
     4    - Hopefully fix race NPE, thx dream
     5    - Catch some more fragment errors
     6    - Exception and log tweaks
     7    - Cleanups and javadocs
     8
    192011-09-02 zzz
    210  * Console: Cache user-agent processing
  • router/java/src/net/i2p/router/RouterVersion.java

    r7b974e7 r6364f14  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 9;
     21    public final static long BUILD = 10;
    2222
    2323    /** for example "-test" */
  • router/java/src/net/i2p/router/transport/ntcp/EventPumper.java

    r7b974e7 r6364f14  
    2828 */
    2929class EventPumper implements Runnable {
    30     private RouterContext _context;
    31     private Log _log;
     30    private final RouterContext _context;
     31    private final Log _log;
    3232    private volatile boolean _alive;
    3333    private Selector _selector;
     
    3737    private final LinkedBlockingQueue<ServerSocketChannel> _wantsRegister = new LinkedBlockingQueue<ServerSocketChannel>();
    3838    private final LinkedBlockingQueue<NTCPConnection> _wantsConRegister = new LinkedBlockingQueue<NTCPConnection>();
    39     private NTCPTransport _transport;
     39    private final NTCPTransport _transport;
    4040    private long _expireIdleWriteTime;
    4141   
  • router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java

    r7b974e7 r6364f14  
    964964    private void processExpired(OutboundEstablishState outboundState) {
    965965        if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
    966             if (_log.shouldLog(Log.WARN))
    967                 _log.warn("Lifetime of expired outbound establish: " + outboundState.getLifetime());
     966            if (_log.shouldLog(Log.INFO))
     967                _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
    968968            while (true) {
    969969                OutNetMessage msg = outboundState.getNextQueuedMessage();
  • router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java

    r7b974e7 r6364f14  
    123123         
    124124            synchronized (messages) {
     125                boolean isNew = false;
    125126                state = messages.get(messageId);
    126127                if (state == null) {
    127128                    state = new InboundMessageState(_context, mid, fromPeer);
    128                     messages.put(messageId, state);
     129                    isNew = true;
     130                    // we will add to messages shortly if it isn't complete
    129131                }
    130132               
     
    133135                if (state.isComplete()) {
    134136                    messageComplete = true;
    135                     messages.remove(messageId);
     137                    if (!isNew)
     138                        messages.remove(messageId);
    136139                } else if (state.isExpired()) {
    137140                    messageExpired = true;
    138                     messages.remove(messageId);
     141                    if (!isNew)
     142                        messages.remove(messageId);
    139143                } else {
    140144                    partialACK = true;
     145                    if (isNew)
     146                        messages.put(messageId, state);
    141147                }
    142148            }
     
    144150            if (messageComplete) {
    145151                _recentlyCompletedMessages.add(mid);
    146                 _messageReceiver.receiveMessage(state);
    147 
    148152                from.messageFullyReceived(messageId, state.getCompleteSize());
    149153                _ackSender.ackPeer(from);
     
    155159                if (state.getFragmentCount() > 0)
    156160                    _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
     161
     162                // this calls state.releaseResources(), all state access must be before this
     163                _messageReceiver.receiveMessage(state);
    157164            } else if (messageExpired) {
    158                 state.releaseResources();
    159165                if (_log.shouldLog(Log.WARN))
    160166                    _log.warn("Message expired while only being partially read: " + state);
    161167                _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
     168                // all state access must be before this
     169                state.releaseResources();
    162170            } else if (partialACK) {
    163171                // not expired but not yet complete... lets queue up a partial ACK
     
    168176            }
    169177
     178            // TODO: Why give up on other fragments if one is bad?
    170179            if (!fragmentOK)
    171180                break;
  • router/java/src/net/i2p/router/transport/udp/InboundMessageState.java

    r7b974e7 r6364f14  
    99
    1010/**
    11  * Hold the raw data fragments of an inbound message
     11 * Hold the raw data fragments of an inbound message.
    1212 *
     13 * Warning - there is no synchronization in this class, take care in
     14 * InboundMessageFragments to avoid use-after-release, etc.
    1315 */
    1416class InboundMessageState {
     
    2224     */
    2325    private final ByteArray _fragments[];
     26
    2427    /**
    2528     * what is the last fragment in the message (or -1 if not yet known)
     29     * Fragment count is _lastFragment + 1
    2630     */
    2731    private int _lastFragment;
     
    5054    /**
    5155     * Read in the data from the fragment.
     56     * Caller should synchronize.
    5257     *
    5358     * @return true if the data was ok, false if it was corrupt
     
    5560    public boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) {
    5661        int fragmentNum = data.readMessageFragmentNum(dataFragment);
    57         if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) {
    58             _log.warn("Invalid fragment " + fragmentNum + "/" + _fragments.length);
     62        if ( (fragmentNum < 0) || (fragmentNum >= MAX_FRAGMENTS)) {
     63            _log.warn("Invalid fragment " + fragmentNum + '/' + MAX_FRAGMENTS);
    5964            return false;
    6065        }
     
    6873                _fragments[fragmentNum] = message;
    6974                boolean isLast = data.readMessageIsLast(dataFragment);
    70                 if (isLast)
     75                if (isLast) {
     76                    // don't allow _lastFragment to be set twice
     77                    if (_lastFragment >= 0) {
     78                        if (_log.shouldLog(Log.ERROR))
     79                            _log.error("Multiple last fragments for message " + _messageId + " from " + _from);
     80                        return false;
     81                    }
     82                    // TODO - check for non-last fragments after this one?
    7183                    _lastFragment = fragmentNum;
     84                } else if (_lastFragment >= 0 && fragmentNum >= _lastFragment) {
     85                    // don't allow non-last after last
     86                    if (_log.shouldLog(Log.ERROR))
     87                        _log.error("Non-last fragment " + fragmentNum + " when last is " + _lastFragment + " for message " + _messageId + " from " + _from);
     88                    return false;
     89                }
    7290                if (_log.shouldLog(Log.DEBUG))
    7391                    _log.debug("New fragment " + fragmentNum + " for message " + _messageId
     
    88106    }
    89107   
     108    /**
     109     *  May not be valid after released
     110     */
    90111    public boolean isComplete() {
    91112        if (_lastFragment < 0) return false;
     
    95116        return true;
    96117    }
     118
    97119    public boolean isExpired() {
    98120        return _context.clock().now() > _receiveBegin + MAX_RECEIVE_TIME;
    99121    }
     122
    100123    public long getLifetime() {
    101124        return _context.clock().now() - _receiveBegin;
    102125    }
     126
    103127    public Hash getFrom() { return _from; }
     128
    104129    public long getMessageId() { return _messageId; }
     130
     131    /**
     132     *  @throws IllegalStateException if released or not isComplete()
     133     */
    105134    public int getCompleteSize() {
    106135        if (_completeSize < 0) {
     136            if (_lastFragment < 0)
     137                throw new IllegalStateException("last fragment not set");
     138            if (_released)
     139                throw new IllegalStateException("SSU IMS 2 Use after free");
    107140            int size = 0;
    108             for (int i = 0; i <= _lastFragment; i++)
    109                 size += _fragments[i].getValid();
     141            for (int i = 0; i <= _lastFragment; i++) {
     142                ByteArray frag = _fragments[i];
     143                if (frag == null)
     144                    throw new IllegalStateException("null fragment " + i + '/' + _lastFragment);
     145                size += frag.getValid();
     146            }
    110147            _completeSize = size;
    111148        }
    112149        return _completeSize;
    113150    }
     151
    114152    public ACKBitfield createACKBitfield() {
    115153        return new PartialBitfield(_messageId, _fragments);
     
    155193   
    156194    public void releaseResources() {
    157         for (int i = 0; i < _fragments.length; i++) {
     195        _released = true;
     196        for (int i = 0; i < MAX_FRAGMENTS; i++) {
    158197            if (_fragments[i] != null) {
    159198                _fragmentCache.release(_fragments[i]);
     
    161200            }
    162201        }
    163         _released = true;
    164     }
    165    
     202    }
     203   
     204    /**
     205     *  @throws IllegalStateException if released
     206     */
    166207    public ByteArray[] getFragments() {
    167208        if (_released) {
    168             RuntimeException e = new RuntimeException("Use after free: " + toString());
     209            RuntimeException e = new IllegalStateException("Use after free: " + _messageId);
    169210            _log.error("SSU IMS", e);
    170211            throw e;
     
    172213        return _fragments;
    173214    }
     215
    174216    public int getFragmentCount() { return _lastFragment+1; }
    175217   
     218    /**
     219     *  May not be valid if released, or may NPE on race with release, use with care in exception text
     220     */
    176221    @Override
    177222    public String toString() {
    178223        StringBuilder buf = new StringBuilder(256);
    179224        buf.append("IB Message: ").append(_messageId);
     225        buf.append(" from ").append(_from.toString());
    180226        if (isComplete()) {
    181227            buf.append(" completely received with ");
    182             buf.append(getCompleteSize()).append(" bytes");
     228            //buf.append(getCompleteSize()).append(" bytes");
     229            // may display -1 but avoid cascaded exceptions after release
     230            buf.append(_completeSize).append(" bytes");
    183231        } else {
    184             for (int i = 0; i < _lastFragment; i++) {
     232            for (int i = 0; i <= _lastFragment; i++) {
    185233                buf.append(" fragment ").append(i);
    186234                if (_fragments[i] != null)
  • router/java/src/net/i2p/router/transport/udp/MessageReceiver.java

    r7b974e7 r6364f14  
    9090    }
    9191   
     92    /**
     93     *  This queues the message for processing.
     94     *  Processing will call state.releaseResources(), do not access state after calling this.
     95     */
    9296    public void receiveMessage(InboundMessageState state) {
    9397        //int total = 0;
     
    121125                        if ( (message != null) && (message.isExpired()) ) {
    122126                            expiredLifetime += message.getLifetime();
     127                            // message.releaseResources() ??
    123128                            message = null;
    124129                            expired++;
     
    161166    }
    162167   
     168    /**
     169     *  Assemble all the fragments into an I2NP message.
     170     *  This calls state.releaseResources(), do not access state after calling this.
     171     *  @return null on error
     172     */
    163173    private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
    164174        try {
  • router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java

    r7b974e7 r6364f14  
    426426            _log.debug("Got a packet, nextSend == now");
    427427    }
     428
     429    /** @since 0.8.9 */
     430    @Override
     431    public String toString() {
     432        return "OES " + _remoteHostId;
     433    }
    428434}
  • router/java/src/net/i2p/router/transport/udp/PeerState.java

    r7b974e7 r6364f14  
    650650       
    651651        synchronized (_inboundMessages) {
    652             int remaining = _inboundMessages.size();
    653             for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
    654                 InboundMessageState state = (InboundMessageState)iter.next();
     652            for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
     653                InboundMessageState state = iter.next();
    655654                if (state.isExpired() || _dead) {
    656655                    iter.remove();
     656                    // state.releaseResources() ??
    657657                } else {
    658658                    if (state.isComplete()) {
    659659                        _log.error("inbound message is complete, but wasn't handled inline? " + state + " with " + this);
    660660                        iter.remove();
     661                        // state.releaseResources() ??
    661662                    } else {
    662663                        rv++;
     
    842843                    //    ((RouterContext)_context).messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired partially received: " + state.toString());
    843844                    iter.remove();
     845                    // state.releaseResources() ??
    844846                } else {
    845847                    if (!state.isComplete()) {
Note: See TracChangeset for help on using the changeset viewer.