Changeset 6be5494


Ignore:
Timestamp:
Jan 20, 2009 5:22:56 PM (12 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
c620420
Parents:
ab92206
Message:
  • Streaming, I2CP, Client Message sending: Pass message timeout through new I2CP message SendMessageExpiresMessage?, so that the router uses the same expiration as the streaming lib. Should help reliability.
  • I2CP: Implement new I2CP message ReconfigureSessionMessage?. Will be used for tunnel reduction.
Files:
2 added
12 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java

    rab92206 r6be5494  
    8383            // this should not block!
    8484            begin = _context.clock().now();
    85             sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);
     85            long expires = 0;
     86            Connection.ResendPacketEvent rpe = (Connection.ResendPacketEvent) packet.getResendEvent();
     87            if (rpe != null)
     88                // we want the router to expire it a little before we do,
     89                // so if we retransmit it will use a new tunnel/lease combo
     90                expires = rpe.getNextSendTime() - 500;
     91            if (expires > 0)
     92                sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, expires);
     93            else
     94                sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent);
    8695            end = _context.clock().now();
    8796           
  • core/java/src/net/i2p/client/I2CPMessageProducer.java

    rab92206 r6be5494  
    1010 */
    1111
     12import java.util.Date;
    1213import java.util.Set;
    1314
     
    2930import net.i2p.data.i2cp.ReportAbuseMessage;
    3031import net.i2p.data.i2cp.SendMessageMessage;
     32import net.i2p.data.i2cp.SendMessageExpiresMessage;
    3133import net.i2p.data.i2cp.SessionConfig;
    3234import net.i2p.util.Log;
     
    9294     */
    9395    public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag,
    94                             SessionKey key, Set tags, SessionKey newKey) throws I2PSessionException {
    95         SendMessageMessage msg = new SendMessageMessage();
     96                            SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException {
     97        SendMessageMessage msg;
     98        if (expires > 0) {
     99            msg = new SendMessageExpiresMessage();
     100            ((SendMessageExpiresMessage)msg).setExpiration(new Date(expires));
     101        } else
     102            msg = new SendMessageMessage();
    96103        msg.setDestination(dest);
    97104        msg.setSessionId(session.getSessionId());
  • core/java/src/net/i2p/client/I2PSession.java

    rab92206 r6be5494  
    7171    public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
    7272    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
     73    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
    7374
    7475    /** Receive a message that the router has notified the client about, returning
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    rab92206 r6be5494  
    551551     */
    552552    void propogateError(String msg, Throwable error) {
    553         if (_log.shouldLog(Log.WARN))
    554             _log.warn(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage());
    555         if (_log.shouldLog(Log.WARN))
    556             _log.warn(getPrefix() + " cause", error);
     553        if (_log.shouldLog(Log.ERROR))
     554            _log.error(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage());
     555        if (_log.shouldLog(Log.ERROR))
     556            _log.error(getPrefix() + " cause", error);
    557557       
    558558        if (_sessionListener != null) _sessionListener.errorOccurred(this, msg, error);
  • core/java/src/net/i2p/client/I2PSessionImpl2.java

    rab92206 r6be5494  
    108108    }
    109109    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException {
    110         return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64));
     110        return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0);
    111111    }
    112112   
    113113    @Override
    114114    public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException {
    115         return sendMessage(dest, payload, 0, payload.length, keyUsed, tagsSent);
     115        return sendMessage(dest, payload, 0, payload.length, keyUsed, tagsSent, 0);
    116116    }
    117117    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent)
     118                   throws I2PSessionException {
     119        return sendMessage(dest, payload, offset, size, keyUsed, tagsSent, 0);
     120    }
     121    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires)
    118122                   throws I2PSessionException {
    119123        if (_log.shouldLog(Log.DEBUG)) _log.debug("sending message");
     
    143147        _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
    144148        _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
    145         return sendBestEffort(dest, payload, keyUsed, tagsSent);
     149        return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
    146150    }
    147151
     
    169173    private static final int NUM_TAGS = 50;
    170174
    171     private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
     175    private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent, long expires)
    172176                    throws I2PSessionException {
    173177        SessionKey key = null;
     
    177181        int oldTags = 0;
    178182        long begin = _context.clock().now();
     183        /***********
    179184        if (I2CPMessageProducer.END_TO_END_CRYPTO) {
    180185            if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort");
     
    221226            // not using end to end crypto, so don't ever bundle any tags
    222227        }
     228        **********/
    223229       
    224230        if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
     
    234240
    235241        if (keyUsed != null) {
    236             if (I2CPMessageProducer.END_TO_END_CRYPTO) {
    237                 if (newKey != null)
    238                     keyUsed.setData(newKey.getData());
    239                 else
    240                     keyUsed.setData(key.getData());
    241             } else {
     242            //if (I2CPMessageProducer.END_TO_END_CRYPTO) {
     243            //    if (newKey != null)
     244            //        keyUsed.setData(newKey.getData());
     245            //    else
     246            //        keyUsed.setData(key.getData());
     247            //} else {
    242248                keyUsed.setData(SessionKey.INVALID_KEY.getData());
    243             }
     249            //}
    244250        }
    245251        if (tagsSent != null) {
     
    262268                       + " sync took " + (inSendingSync-beforeSendingSync)
    263269                       + " add took " + (afterSendingSync-inSendingSync));
    264         _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
     270        _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires);
    265271       
    266272        // since this is 'best effort', all we're waiting for is a status update
  • core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java

    rab92206 r6be5494  
    2222import net.i2p.data.PrivateKey;
    2323import net.i2p.data.PublicKey;
     24import net.i2p.data.SessionKey;
    2425import net.i2p.data.SigningPrivateKey;
    2526import net.i2p.data.SigningPublicKey;
     
    7980        leaseSet.setEncryptionKey(li.getPublicKey());
    8081        leaseSet.setSigningKey(li.getSigningPublicKey());
     82        String sk = session.getOptions().getProperty("i2cp.sessionKey");
     83        if (sk != null) {
     84            SessionKey key = new SessionKey();
     85            try {
     86                key.fromBase64(sk);
     87                leaseSet.encrypt(key);
     88                _context.keyRing().put(session.getMyDestination().calculateHash(), key);
     89            } catch (DataFormatException dfe) {
     90                _log.error("Bad session key: " + sk);
     91            }
     92        }
    8193        try {
    8294            leaseSet.sign(session.getPrivateKey());
  • core/java/src/net/i2p/data/i2cp/I2CPMessageHandler.java

    rab92206 r6be5494  
    1919
    2020/**
    21  * Handle messages from the server for the client
     21 * Handle messages from the server for the client or vice versa
    2222 *
    2323 */
     
    7676        case SendMessageMessage.MESSAGE_TYPE:
    7777            return new SendMessageMessage();
     78        case SendMessageExpiresMessage.MESSAGE_TYPE:
     79            return new SendMessageExpiresMessage();
    7880        case SessionStatusMessage.MESSAGE_TYPE:
    7981            return new SessionStatusMessage();
  • router/java/src/net/i2p/router/ClientMessage.java

    rab92206 r6be5494  
    2828    private Hash _destinationHash;
    2929    private MessageId _messageId;
     30    private long _expiration;
    3031   
    3132    public ClientMessage() {
     
    3738        setDestinationHash(null);
    3839        setMessageId(null);
     40        setExpiration(0);
    3941    }
    4042   
     
    9294    public SessionConfig getSenderConfig() { return _senderConfig; }
    9395    public void setSenderConfig(SessionConfig config) { _senderConfig = config; }
     96
     97    /**
     98     * Expiration requested by the client that sent the message.  This will only be available
     99     * for locally originated messages.
     100     *
     101     */
     102    public long getExpiration() { return _expiration; }
     103    public void setExpiration(long e) { _expiration = e; }
    94104}
  • router/java/src/net/i2p/router/client/ClientConnectionRunner.java

    rab92206 r6be5494  
    3030import net.i2p.data.i2cp.MessageStatusMessage;
    3131import net.i2p.data.i2cp.SendMessageMessage;
     32import net.i2p.data.i2cp.SendMessageExpiresMessage;
    3233import net.i2p.data.i2cp.SessionConfig;
    3334import net.i2p.data.i2cp.SessionId;
     
    271272        MessageId id = new MessageId();
    272273        id.setMessageId(getNextMessageId());
     274        long expiration = 0;
     275        if (message instanceof SendMessageExpiresMessage)
     276            expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime();
    273277        long beforeLock = _context.clock().now();
    274278        long inLock = 0;
     
    292296        SessionConfig cfg = _config;
    293297        if (cfg != null)
    294             _manager.distributeMessage(cfg.getDestination(), dest, payload, id);
     298            _manager.distributeMessage(cfg.getDestination(), dest, payload, id, expiration);
    295299        long timeToDistribute = _context.clock().now() - beforeDistribute;
    296300        if (_log.shouldLog(Log.DEBUG))
  • router/java/src/net/i2p/router/client/ClientManager.java

    rab92206 r6be5494  
    141141    }
    142142   
    143     void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId) {
     143    void distributeMessage(Destination fromDest, Destination toDest, Payload payload, MessageId msgId, long expiration) {
    144144        // check if there is a runner for it
    145145        ClientConnectionRunner runner = getRunner(toDest);
     
    169169            msg.setFromDestination(runner.getConfig().getDestination());
    170170            msg.setMessageId(msgId);
     171            msg.setExpiration(expiration);
    171172            _ctx.clientMessagePool().add(msg, true);
    172173        }
  • router/java/src/net/i2p/router/client/ClientMessageEventListener.java

    rab92206 r6be5494  
    2222import net.i2p.data.i2cp.ReceiveMessageBeginMessage;
    2323import net.i2p.data.i2cp.ReceiveMessageEndMessage;
     24import net.i2p.data.i2cp.ReconfigureSessionMessage;
    2425import net.i2p.data.i2cp.SendMessageMessage;
     26import net.i2p.data.i2cp.SendMessageExpiresMessage;
    2527import net.i2p.data.i2cp.SessionId;
    2628import net.i2p.data.i2cp.SessionStatusMessage;
     
    6769            case SendMessageMessage.MESSAGE_TYPE:
    6870                handleSendMessage(reader, (SendMessageMessage)message);
     71                break;
     72            case SendMessageExpiresMessage.MESSAGE_TYPE:
     73                handleSendMessage(reader, (SendMessageExpiresMessage)message);
    6974                break;
    7075            case ReceiveMessageBeginMessage.MESSAGE_TYPE:
     
    238243    }
    239244
     245    /**
     246     * Message's Session ID ignored. This doesn't support removing previously set options.
     247     * Nor do we bother with message.getSessionConfig().verifySignature() ... should we?
     248     *
     249     */
     250    private void handleReconfigureSession(I2CPMessageReader reader, ReconfigureSessionMessage message) {
     251        if (_log.shouldLog(Log.INFO))
     252            _log.info("Updating options - session " + _runner.getSessionId());
     253        _runner.getConfig().getOptions().putAll(message.getSessionConfig().getOptions());
     254    }
     255   
    240256    // this *should* be mod 65536, but UnsignedInteger is still b0rked.  FIXME
    241257    private final static int MAX_SESSION_ID = 32767;
  • router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java

    rab92206 r6be5494  
    7070    public final static String OVERALL_TIMEOUT_MS_PARAM = "clientMessageTimeout";
    7171    private final static long OVERALL_TIMEOUT_MS_DEFAULT = 60*1000;
     72    private final static long OVERALL_TIMEOUT_MS_MIN = 5*1000;
    7273   
    7374    /** priority of messages, that might get honored some day... */
     
    126127        _toString = _to.calculateHash().toBase64().substring(0,4);
    127128        _leaseSetLookupBegin = -1;
    128        
    129         String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
    130         if (param == null)
    131             param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM);
    132         if (param != null) {
    133             try {
    134                 timeoutMs = Long.parseLong(param);
    135             } catch (NumberFormatException nfe) {
    136                 if (_log.shouldLog(Log.WARN))
    137                     _log.warn("Invalid client message timeout specified [" + param
    138                               + "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe);
    139                 timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
    140             }
    141         }
    142        
    143129        _start = getContext().clock().now();
    144         _overallExpiration = timeoutMs + _start;
     130       
     131        // use expiration requested by client if available, otherwise session config,
     132        // otherwise router config, otherwise default
     133        _overallExpiration = msg.getExpiration();
     134        if (_overallExpiration > 0) {
     135           _overallExpiration = Math.max(_overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN);
     136           _overallExpiration = Math.min(_overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT);
     137           if (_log.shouldLog(Log.WARN))
     138               _log.warn("Message Expiration (ms): " + (_overallExpiration - _start));
     139        } else {
     140            String param = msg.getSenderConfig().getOptions().getProperty(OVERALL_TIMEOUT_MS_PARAM);
     141            if (param == null)
     142                param = ctx.router().getConfigSetting(OVERALL_TIMEOUT_MS_PARAM);
     143            if (param != null) {
     144                try {
     145                    timeoutMs = Long.parseLong(param);
     146                } catch (NumberFormatException nfe) {
     147                    if (_log.shouldLog(Log.WARN))
     148                        _log.warn("Invalid client message timeout specified [" + param
     149                                  + "], defaulting to " + OVERALL_TIMEOUT_MS_DEFAULT, nfe);
     150                    timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT;
     151                }
     152            }
     153            _overallExpiration = timeoutMs + _start;
     154           if (_log.shouldLog(Log.WARN))
     155               _log.warn("Default Expiration (ms): " + timeoutMs);
     156        }
    145157        _finished = false;
    146158    }
     
    446458        boolean wantACK = true;
    447459        int existingTags = GarlicMessageBuilder.estimateAvailableTags(getContext(), _leaseSet.getEncryptionKey());
     460        // what's the point of 5% random? possible improvements or replacements:
     461        // - wantACK if we changed their inbound lease
     462        // - wantACK if we changed our outbound tunnel (requires moving selectOutboundTunnel() before this)
     463        // - wantACK if we haven't in last 1m (requires a new static cache probably)
    448464        if ( (existingTags > 30) && (getContext().random().nextInt(100) >= 5) )
    449465            wantACK = false;
Note: See TracChangeset for help on using the changeset viewer.