Changeset e02d8298


Ignore:
Timestamp:
Sep 8, 2012 3:10:27 PM (8 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
d9e6c06
Parents:
98da06c
Message:
  • Run HandleJob? inline for speed
  • Remove payload from message map if availability announce fails
  • Cleanups
Location:
router/java/src/net/i2p/router/client
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/client/ClientConnectionRunner.java

    r98da06c re02d8298  
    120120   
    121121    private static volatile int __id = 0;
     122
    122123    /**
    123124     * Actually run the connection - listen for I2CP messages and respond.  This
     
    126127     *
    127128     */
    128     public void startRunning() {
     129    public synchronized void startRunning() {
    129130        try {
    130131            _reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE),
     
    138139            _out = _socket.getOutputStream(); // FIXME OWCH! needs a better way so it can be final. FIXME
    139140            _reader.startReading();
     141            // TODO need a cleaner for unclaimed items in _messages, but we have no timestamps...
    140142        } catch (IOException ioe) {
    141143            _log.error("Error starting up the runner", ioe);
     
    144146   
    145147    /** die a horrible death */
    146     void stopRunning() {
     148    public synchronized void stopRunning() {
    147149        if (_dead) return;
    148150        if (_context.router().isAlive() && _log.shouldLog(Log.WARN))
     
    172174    /** current client's sessionkeymanager */
    173175    public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; }
     176
    174177    /** currently allocated leaseSet */
    175178    public LeaseSet getLeaseSet() { return _currentLeaseSet; }
    176179    void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; }
     180
    177181    public Hash getDestHash() { return _destHashCache; }
    178182   
     
    180184    SessionId getSessionId() { return _sessionId; }
    181185    void setSessionId(SessionId id) { if (id != null) _sessionId = id; }
     186
    182187    /** data for the current leaseRequest, or null if there is no active leaseSet request */
    183188    LeaseRequestState getLeaseRequest() { return _leaseRequest; }
     189
    184190    void setLeaseRequest(LeaseRequestState req) {
    185191        synchronized (this) {
     
    189195        }
    190196    }
     197
    191198    /** already closed? */
    192199    boolean isDead() { return _dead; }
     
    470477        private final Job _onCreate;
    471478        private final Job _onFailed;
     479
    472480        public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) {
    473481            _ls = ls;
     
    476484            _onFailed = onFailed;
    477485        }
     486
    478487        public void timeReached() {
    479488            requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed);
     
    580589   
    581590    private class MessageDeliveryStatusUpdate extends JobImpl {
    582         private MessageId _messageId;
    583         private boolean _success;
     591        private final MessageId _messageId;
     592        private final boolean _success;
    584593        private long _lastTried;
     594
    585595        public MessageDeliveryStatusUpdate(MessageId id, boolean success) {
    586596            super(ClientConnectionRunner.this._context);
    587597            _messageId = id;
    588598            _success = success;
    589             _lastTried = 0;
    590599        }
    591600
  • router/java/src/net/i2p/router/client/ClientManager.java

    r98da06c re02d8298  
    239239            _msgId = id;
    240240        }
     241
    241242        public String getName() { return "Distribute local message"; }
     243
    242244        public void runJob() {
    243245            _to.receiveMessage(_toDest, _fromDest, _payload);
     
    275277
    276278    private static final int REQUEST_LEASESET_TIMEOUT = 120*1000;
     279
    277280    public void requestLeaseSet(Hash dest, LeaseSet ls) {
    278281        ClientConnectionRunner runner = getRunner(dest);
     
    299302        return rv;
    300303    }
     304
    301305    public boolean isLocal(Hash destHash) {
    302306        if (destHash == null) return false;
     
    481485   
    482486    public void messageReceived(ClientMessage msg) {
    483         _ctx.jobQueue().addJob(new HandleJob(msg));
     487        // This is fast and non-blocking, run in-line
     488        //_ctx.jobQueue().addJob(new HandleJob(msg));
     489        (new HandleJob(msg)).runJob();
    484490    }
    485491
    486492    private class HandleJob extends JobImpl {
    487         private ClientMessage _msg;
     493        private final ClientMessage _msg;
     494
    488495        public HandleJob(ClientMessage msg) {
    489496            super(_ctx);
    490497            _msg = msg;
    491498        }
     499
    492500        public String getName() { return "Handle Inbound Client Messages"; }
     501
    493502        public void runJob() {
    494             ClientConnectionRunner runner = null;
     503            ClientConnectionRunner runner;
    495504            if (_msg.getDestination() != null)
    496505                runner = getRunner(_msg.getDestination());
  • router/java/src/net/i2p/router/client/MessageReceivedJob.java

    r98da06c re02d8298  
    4141        id.setMessageId(_runner.getNextMessageId());
    4242        _runner.setPayload(id, _payload);
    43         messageAvailable(id, _payload.getSize());
     43        try {
     44            messageAvailable(id, _payload.getSize());
     45        } catch (I2CPMessageException ime) {
     46            if (_log.shouldLog(Log.ERROR))
     47                _log.error("Error writing out the message status message", ime);
     48            _runner.removePayload(id);
     49        }
    4450    }
    4551   
     
    4753     * Deliver notification to the client that the given message is available.
    4854     */
    49     private void messageAvailable(MessageId id, long size) {
    50         if (_log.shouldLog(Log.DEBUG))
    51             _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId()
    52                        + " (with nonce=1)", new Exception("available"));
     55    private void messageAvailable(MessageId id, long size) throws I2CPMessageException {
     56        //if (_log.shouldLog(Log.DEBUG))
     57        //    _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId()
     58        //               + " (with nonce=1)", new Exception("available"));
    5359        MessageStatusMessage msg = new MessageStatusMessage();
    5460        msg.setMessageId(id.getMessageId());
     
    5864        msg.setNonce(1);
    5965        msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE);
    60         try {
    61             _runner.doSend(msg);
    62         } catch (I2CPMessageException ime) {
    63             if (_log.shouldLog(Log.ERROR))
    64                 _log.error("Error writing out the message status message", ime);
    65         }
     66        _runner.doSend(msg);
    6667    }
    6768}
  • router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java

    r98da06c re02d8298  
    3232     */
    3333    @Override
    34     public void startRunning() {
     34    public synchronized void startRunning() {
    3535        _reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this, false));
    3636        _reader.startReading();
     
    4141     */
    4242    @Override
    43     void stopRunning() {
     43    public synchronized void stopRunning() {
    4444        super.stopRunning();
    4545        queue.close();
Note: See TracChangeset for help on using the changeset viewer.