Changeset 8371b8f


Ignore:
Timestamp:
May 15, 2014 8:11:21 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
f69b757
Parents:
a93666c
Message:
  • I2CP: Client-side prep for asynch status for sent messages (ticket #788)
Location:
core/java/src/net/i2p/client
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • core/java/src/net/i2p/client/I2CPMessageProducer.java

    ra93666c r8371b8f  
    302302     * @param newKey unused - no end-to-end crypto
    303303     */
    304     private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set tags,
     304    private Payload createPayload(Destination dest, byte[] payload, SessionTag tag, SessionKey key, Set<SessionTag> tags,
    305305                                  SessionKey newKey) throws I2PSessionException {
    306306        if (dest == null) throw new I2PSessionException("No destination specified");
     
    347347     *
    348348     */
    349     public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv, PrivateKey priv)
    350                                                                                                                          throws I2PSessionException {
     349    public void createLeaseSet(I2PSessionImpl session, LeaseSet leaseSet, SigningPrivateKey signingPriv,
     350                               PrivateKey priv) throws I2PSessionException {
    351351        CreateLeaseSetMessage msg = new CreateLeaseSetMessage();
    352352        msg.setLeaseSet(leaseSet);
  • core/java/src/net/i2p/client/I2PSession.java

    ra93666c r8371b8f  
    4848    public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException;
    4949
     50    /** Send a new message to the given destination, containing the specified
     51     * payload, returning true if the router feels confident that the message
     52     * was delivered.
     53     *
     54     * WARNING: It is recommended that you use a method that specifies the protocol and ports.
     55     *
     56     * @param dest location to send the message
     57     * @param payload body of the message to be sent (unencrypted)
     58     * @return success
     59     */
    5060    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException;
    5161
    5262    /**
    5363     * See I2PSessionMuxedImpl for proto/port details.
     64     * @return success
    5465     * @since 0.7.1
    5566     */
     
    8495     *                 the contents of the set is ignored during the call, but afterwards it contains a set of SessionTag
    8596     *                 objects that were sent along side the given keyUsed.
     97     * @return success
    8698     */
    8799    public boolean sendMessage(Destination dest, byte[] payload, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
     
    91103     * @param keyUsed UNUSED, IGNORED.
    92104     * @param tagsSent UNUSED, IGNORED.
     105     * @return success
    93106     */
    94107    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent) throws I2PSessionException;
     
    99112     * @param tagsSent UNUSED, IGNORED.
    100113     * @param expire absolute expiration timestamp, NOT interval from now
     114     * @return success
    101115     * @since 0.7.1
    102116     */
     
    108122     * @param keyUsed UNUSED, IGNORED.
    109123     * @param tagsSent UNUSED, IGNORED.
     124     * @param proto 1-254 or 0 for unset; recommended:
     125     *         I2PSession.PROTO_UNSPECIFIED
     126     *         I2PSession.PROTO_STREAMING
     127     *         I2PSession.PROTO_DATAGRAM
     128     *         255 disallowed
     129     * @param fromPort 1-65535 or 0 for unset
     130     * @param toPort 1-65535 or 0 for unset
     131     * @return success
    110132     * @since 0.7.1
    111133     */
     
    119141     * @param tagsSent UNUSED, IGNORED.
    120142     * @param expire absolute expiration timestamp, NOT interval from now
     143     * @param proto 1-254 or 0 for unset; recommended:
     144     *         I2PSession.PROTO_UNSPECIFIED
     145     *         I2PSession.PROTO_STREAMING
     146     *         I2PSession.PROTO_DATAGRAM
     147     *         255 disallowed
     148     * @param fromPort 1-65535 or 0 for unset
     149     * @param toPort 1-65535 or 0 for unset
     150     * @return success
    121151     * @since 0.7.1
    122152     */
     
    130160     * @param tagsSent UNUSED, IGNORED.
    131161     * @param expire absolute expiration timestamp, NOT interval from now
     162     * @param proto 1-254 or 0 for unset; recommended:
     163     *         I2PSession.PROTO_UNSPECIFIED
     164     *         I2PSession.PROTO_STREAMING
     165     *         I2PSession.PROTO_DATAGRAM
     166     *         255 disallowed
     167     * @param fromPort 1-65535 or 0 for unset
     168     * @param toPort 1-65535 or 0 for unset
     169     * @return success
    132170     * @since 0.8.4
    133171     */
     
    138176     * See I2PSessionMuxedImpl for proto/port details.
    139177     * See SendMessageOptions for option details.
     178     *
     179     * @param proto 1-254 or 0 for unset; recommended:
     180     *         I2PSession.PROTO_UNSPECIFIED
     181     *         I2PSession.PROTO_STREAMING
     182     *         I2PSession.PROTO_DATAGRAM
     183     *         255 disallowed
     184     * @param fromPort 1-65535 or 0 for unset
     185     * @param toPort 1-65535 or 0 for unset
     186     * @param options to be passed to the router
     187     * @return success
    140188     * @since 0.9.2
    141189     */
    142190    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
    143191                               int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException;
     192
     193    /**
     194     * Send a message and request an asynchronous notification of delivery status.
     195     * Notifications will be delivered at least up to the expiration specified in the options,
     196     * or 60 seconds if not specified.
     197     *
     198     * See I2PSessionMuxedImpl for proto/port details.
     199     * See SendMessageOptions for option details.
     200     *
     201     * @param proto 1-254 or 0 for unset; recommended:
     202     *         I2PSession.PROTO_UNSPECIFIED
     203     *         I2PSession.PROTO_STREAMING
     204     *         I2PSession.PROTO_DATAGRAM
     205     *         255 disallowed
     206     * @param fromPort 1-65535 or 0 for unset
     207     * @param toPort 1-65535 or 0 for unset
     208     * @param options to be passed to the router
     209     * @return the message ID to be used for later notification to the listener
     210     * @throws I2PSessionException on all errors
     211     * @since 0.9.14
     212     */
     213    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
     214                               int proto, int fromport, int toport,
     215                               SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException;
    144216
    145217    /** Receive a message that the router has notified the client about, returning
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    ra93666c r8371b8f  
    5252import net.i2p.util.Log;
    5353import net.i2p.util.OrderedProperties;
    54 import net.i2p.util.SimpleTimer;
     54import net.i2p.util.SimpleTimer2;
    5555import net.i2p.util.VersionComparator;
    5656
     
    619619
    620620    /**
    621      *  Fire up a periodic task to check for unclamed messages
     621     *  Fire up a periodic task to check for unclaimed messages
    622622     *  @since 0.9.1
    623623     */
    624     private void startVerifyUsage() {
    625         _context.simpleScheduler().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME);
     624    protected void startVerifyUsage() {
     625        new VerifyUsage();
    626626    }
    627627
    628628    /**
    629629     *  Check for unclaimed messages, without wastefully setting a timer for each
    630      *  message. Just copy all unclaimed ones and check 30 seconds later.
    631      */
    632     private class VerifyUsage implements SimpleTimer.TimedEvent {
     630     *  message. Just copy all unclaimed ones and check some time later.
     631     */
     632    private class VerifyUsage extends SimpleTimer2.TimedEvent {
    633633        private final List<Long> toCheck = new ArrayList<Long>();
    634634       
     635        public VerifyUsage() {
     636             super(_context.simpleTimer2(), VERIFY_USAGE_TIME);
     637        }
     638
    635639        public void timeReached() {
    636640            if (isClosed())
     
    642646                    MessagePayloadMessage removed = _availableMessages.remove(msgId);
    643647                    if (removed != null)
    644                         _log.error("Message NOT removed! id=" + msgId + ": " + removed);
     648                        _log.error(getPrefix() + " Client not responding? Message not processed! id=" + msgId + ": " + removed);
    645649                }
    646650                toCheck.clear();
    647651            }
    648652            toCheck.addAll(_availableMessages.keySet());
    649             _context.simpleScheduler().addEvent(this, VERIFY_USAGE_TIME);
     653            schedule(VERIFY_USAGE_TIME);
    650654        }
    651655    }
  • core/java/src/net/i2p/client/I2PSessionImpl2.java

    ra93666c r8371b8f  
    1212import java.io.IOException;
    1313import java.io.InputStream;
    14 import java.util.HashSet;
    1514import java.util.Iterator;
    1615import java.util.Locale;
     16import java.util.Map;
    1717import java.util.Properties;
    1818import java.util.Set;
     19import java.util.concurrent.ConcurrentHashMap;
     20import java.util.concurrent.atomic.AtomicLong;
    1921
    2022import net.i2p.I2PAppContext;
     
    2527import net.i2p.data.i2cp.MessageStatusMessage;
    2628import net.i2p.util.Log;
     29import net.i2p.util.SimpleTimer2;
    2730
    2831/**
     
    3639
    3740    /** set of MessageState objects, representing all of the messages in the process of being sent */
    38     private /* FIXME final FIXME */ Set<MessageState> _sendingStates;
     41    protected final Map<Long, MessageState> _sendingStates;
     42    protected final AtomicLong _sendMessageNonce;
    3943    /** max # seconds to wait for confirmation of the message send */
    4044    private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send
     
    4549    protected boolean _noEffort;
    4650
     51    private static final long REMOVE_EXPIRED_TIME = 63*1000;
     52
    4753     /**
    4854      * for extension by SimpleSession (no dest)
     
    5157                              I2PClientMessageHandlerMap handlerMap) {
    5258        super(context, options, handlerMap);
     59        _sendingStates = null;
     60        _sendMessageNonce = null;
    5361    }
    5462
     
    6472    public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
    6573        super(ctx, destKeyStream, options);
    66         _sendingStates = new HashSet<MessageState>(32);
     74        _sendingStates = new ConcurrentHashMap<Long, MessageState>(32);
     75        _sendMessageNonce = new AtomicLong();
    6776        // default is BestEffort
    6877        _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
    6978
    70         ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
     79        //ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
    7180        //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
    7281        //ctx.statManager().createRateStat("i2cp.sendBestEffortStage1", "second part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
     
    8190        _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
    8291        _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
    83         _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
     92        //_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
    8493        _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
    8594        _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
    8695    }
     96
     97    /**
     98     *  Fire up a periodic task to check for unclaimed messages
     99     *  @since 0.9.14
     100     */
     101    @Override
     102    protected void startVerifyUsage() {
     103        super.startVerifyUsage();
     104        new RemoveExpired();
     105    }
     106
     107    /**
     108     *  Check for expired message states, without wastefully setting a timer for each
     109     *  message.
     110     *  @since 0.9.14
     111     */
     112    private class RemoveExpired extends SimpleTimer2.TimedEvent {
     113       
     114        public RemoveExpired() {
     115             super(_context.simpleTimer2(), REMOVE_EXPIRED_TIME);
     116        }
     117
     118        public void timeReached() {
     119            if (isClosed())
     120                return;
     121            if (!_sendingStates.isEmpty()) {
     122                long now = _context.clock().now();
     123                for (Iterator<MessageState> iter = _sendingStates.values().iterator(); iter.hasNext(); ) {
     124                    MessageState state = iter.next();
     125                    if (state.getExpires() < now)
     126                        iter.remove();
     127                }
     128            }
     129            schedule(REMOVE_EXPIRED_TIME);
     130        }
     131    }
     132
    87133
    88134    protected long getTimeout() {
     
    110156     */
    111157    private static final int DONT_COMPRESS_SIZE = 66;
     158
    112159    protected boolean shouldCompress(int size) {
    113160         if (size <= DONT_COMPRESS_SIZE)
     
    119166    }
    120167   
     168    /** @throws UnsupportedOperationException always, use MuxedImpl */
    121169    public void addSessionListener(I2PSessionListener lsnr, int proto, int port) {
    122         throw new IllegalArgumentException("Use MuxedImpl");
    123     }
     170        throw new UnsupportedOperationException("Use MuxedImpl");
     171    }
     172    /** @throws UnsupportedOperationException always, use MuxedImpl */
    124173    public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) {
    125         throw new IllegalArgumentException("Use MuxedImpl");
    126     }
     174        throw new UnsupportedOperationException("Use MuxedImpl");
     175    }
     176    /** @throws UnsupportedOperationException always, use MuxedImpl */
    127177    public void removeListener(int proto, int port) {
    128         throw new IllegalArgumentException("Use MuxedImpl");
    129     }
     178        throw new UnsupportedOperationException("Use MuxedImpl");
     179    }
     180    /** @throws UnsupportedOperationException always, use MuxedImpl */
    130181    public boolean sendMessage(Destination dest, byte[] payload, int proto, int fromport, int toport) throws I2PSessionException {
    131         throw new IllegalArgumentException("Use MuxedImpl");
    132     }
     182        throw new UnsupportedOperationException("Use MuxedImpl");
     183    }
     184    /** @throws UnsupportedOperationException always, use MuxedImpl */
    133185    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent,
    134186                               int proto, int fromport, int toport) throws I2PSessionException {
    135         throw new IllegalArgumentException("Use MuxedImpl");
    136     }
     187        throw new UnsupportedOperationException("Use MuxedImpl");
     188    }
     189    /** @throws UnsupportedOperationException always, use MuxedImpl */
    137190    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
    138191                               int proto, int fromport, int toport) throws I2PSessionException {
    139         throw new IllegalArgumentException("Use MuxedImpl");
    140     }
     192        throw new UnsupportedOperationException("Use MuxedImpl");
     193    }
     194    /** @throws UnsupportedOperationException always, use MuxedImpl */
    141195    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
    142196                               int proto, int fromport, int toport, int flags) throws I2PSessionException {
    143         throw new IllegalArgumentException("Use MuxedImpl");
    144     }
     197        throw new UnsupportedOperationException("Use MuxedImpl");
     198    }
     199    /** @throws UnsupportedOperationException always, use MuxedImpl */
    145200    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
    146201                               int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException {
    147         throw new IllegalArgumentException("Use MuxedImpl");
     202        throw new UnsupportedOperationException("Use MuxedImpl");
     203    }
     204    /** @throws UnsupportedOperationException always, use MuxedImpl */
     205    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
     206                               int proto, int fromport, int toport,
     207                               SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
     208        throw new UnsupportedOperationException("Use MuxedImpl");
    148209    }
    149210
     
    211272            _log.info("sending message to: " + d + " compress? " + sc + " sizeIn=" + size + " sizeOut=" + compressed);
    212273        }
    213         _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
    214         _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
     274        _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed);
     275        _context.statManager().addRateData("i2cp.tx.msgExpanded", size);
    215276        if (_noEffort)
    216277            return sendNoEffort(dest, payload, expires, 0);
     
    258319    protected boolean sendBestEffort(Destination dest, byte payload[], long expires, int flags)
    259320                    throws I2PSessionException {
    260         //SessionKey key = null;
    261         //SessionKey newKey = null;
    262         //SessionTag tag = null;
    263         //Set sentTags = null;
    264         //int oldTags = 0;
    265         long begin = _context.clock().now();
    266         /***********
    267         if (I2CPMessageProducer.END_TO_END_CRYPTO) {
    268             if (_log.shouldLog(Log.DEBUG)) _log.debug("begin sendBestEffort");
    269             key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
    270             if (_log.shouldLog(Log.DEBUG)) _log.debug("key fetched");
    271             if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
    272             tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
    273             if (_log.shouldLog(Log.DEBUG)) _log.debug("tag consumed");
    274             sentTags = null;
    275             oldTags = _context.sessionKeyManager().getAvailableTags(dest.getPublicKey(), key);
    276             long availTimeLeft = _context.sessionKeyManager().getAvailableTimeLeft(dest.getPublicKey(), key);
    277321       
    278             if ( (tagsSent == null) || (tagsSent.isEmpty()) ) {
    279                 if (oldTags < NUM_TAGS) {
    280                     sentTags = createNewTags(NUM_TAGS);
    281                     if (_log.shouldLog(Log.DEBUG))
    282                         _log.debug("** sendBestEffort only had " + oldTags + " with " + availTimeLeft + ", adding " + NUM_TAGS + ": " + sentTags);
    283                 } else if (availTimeLeft < 2 * 60 * 1000) {
    284                     // if we have > 50 tags, but they expire in under 2 minutes, we want more
    285                     sentTags = createNewTags(NUM_TAGS);
    286                     if (_log.shouldLog(Log.DEBUG))
    287                         _log.debug(getPrefix() + "Tags expiring in " + availTimeLeft + ", adding " + NUM_TAGS + " new ones: " + sentTags);
    288                     //_log.error("** sendBestEffort available time left " + availTimeLeft);
    289                 } else {
    290                     if (_log.shouldLog(Log.DEBUG))
    291                         _log.debug("sendBestEffort old tags: " + oldTags + " available time left: " + availTimeLeft);
    292                 }
    293             } else {
    294                 if (_log.shouldLog(Log.DEBUG))
    295                     _log.debug("sendBestEffort is sending " + tagsSent.size() + " with " + availTimeLeft
    296                                + "ms left, " + oldTags + " tags known and "
    297                                + (tag == null ? "no tag" : " a valid tag"));
    298             }
    299 
    300             if (false) // rekey
    301                 newKey = _context.keyGenerator().generateSessionKey();
    302        
    303             if ( (tagsSent != null) && (!tagsSent.isEmpty()) ) {
    304                 if (sentTags == null)
    305                     sentTags = new HashSet();
    306                 sentTags.addAll(tagsSent);
    307             }
    308         } else {
    309             // not using end to end crypto, so don't ever bundle any tags
    310         }
    311         **********/
    312        
    313         //if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
    314        
    315         long nonce = _context.random().nextInt(Integer.MAX_VALUE - 1) + 1;
    316         //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
     322        long nonce = _sendMessageNonce.incrementAndGet();
    317323        MessageState state = new MessageState(_context, nonce, getPrefix());
    318         //state.setKey(key);
    319         //state.setTags(sentTags);
    320         //state.setNewKey(newKey);
    321         state.setTo(dest);
    322         //if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Setting key = " + key);
    323 
    324         //if (keyUsed != null) {
    325             //if (I2CPMessageProducer.END_TO_END_CRYPTO) {
    326             //    if (newKey != null)
    327             //        keyUsed.setData(newKey.getData());
    328             //    else
    329             //        keyUsed.setData(key.getData());
    330             //} else {
    331             //    keyUsed.setData(SessionKey.INVALID_KEY.getData());
    332             //}
    333         //}
    334         //if (tagsSent != null) {
    335         //    if (sentTags != null) {
    336         //        tagsSent.addAll(sentTags);
    337         //    }
    338         //}
    339 
    340         //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
    341         long beforeSendingSync = _context.clock().now();
    342         long inSendingSync = 0;
    343         synchronized (_sendingStates) {
    344             inSendingSync = _context.clock().now();
    345             _sendingStates.add(state);
    346         }
    347         long afterSendingSync = _context.clock().now();
    348         if (_log.shouldLog(Log.DEBUG))
    349             _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / "
    350                        + state.getNonce() + " for best effort "
    351                        + " sync took " + (inSendingSync-beforeSendingSync)
    352                        + " add took " + (afterSendingSync-inSendingSync));
    353         //_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey, expires);
    354         _producer.sendMessage(this, dest, nonce, payload, expires, flags);
    355        
     324
    356325        // since this is 'best effort', all we're waiting for is a status update
    357326        // saying that the router received it - in theory, that should come back
     
    359328        // much quicker).  setting this to false will short-circuit that delay
    360329        boolean actuallyWait = false; // true;
     330        if (actuallyWait)
     331            _sendingStates.put(Long.valueOf(nonce), state);
     332        _producer.sendMessage(this, dest, nonce, payload, expires, flags);
    361333       
    362         long beforeWaitFor = _context.clock().now();
    363         if (actuallyWait)
    364             state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
    365                           _context.clock().now() + getTimeout());
    366         //long afterWaitFor = _context.clock().now();
    367         //long inRemovingSync = 0;
    368         synchronized (_sendingStates) {
    369             //inRemovingSync = _context.clock().now();
    370             _sendingStates.remove(state);
    371         }
    372         long afterRemovingSync = _context.clock().now();
    373         boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
    374         if (_log.shouldLog(Log.DEBUG))
    375             _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
    376                        + " / " + state.getNonce() + " found = " + found);
    377        
    378         long timeToSend = afterRemovingSync - beforeSendingSync;
    379         if ( (timeToSend > 10*1000) && (_log.shouldLog(Log.WARN)) ) {
    380             _log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
    381         }
    382        
    383         if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) {
    384             _log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, "
    385                       + (afterSendingSync-begin) + "ms to prepare, "
    386                       + (beforeWaitFor-afterSendingSync) + "ms to send, "
    387                       + (afterRemovingSync-beforeWaitFor) + "ms waiting for reply");
    388         }
    389        
    390         _context.statManager().addRateData("i2cp.sendBestEffortTotalTime", afterRemovingSync - begin, 0);
    391         //_context.statManager().addRateData("i2cp.sendBestEffortStage0", beforeSendingSync- begin, 0);
    392         //_context.statManager().addRateData("i2cp.sendBestEffortStage1", afterSendingSync- beforeSendingSync, 0);
    393         //_context.statManager().addRateData("i2cp.sendBestEffortStage2", beforeWaitFor- afterSendingSync, 0);
    394         //_context.statManager().addRateData("i2cp.sendBestEffortStage3", afterWaitFor- beforeWaitFor, 0);
    395         //_context.statManager().addRateData("i2cp.sendBestEffortStage4", afterRemovingSync- afterWaitFor, 0);
     334        if (actuallyWait) {
     335            try {
     336                state.waitForAccept(_context.clock().now() + getTimeout());
     337            } catch (InterruptedException ie) {
     338                throw new I2PSessionException("interrupted");
     339            } finally {
     340                _sendingStates.remove(Long.valueOf(nonce));
     341            }
     342        }
     343        boolean found = !actuallyWait || state.wasAccepted();
    396344       
    397345        if (found) {
     
    403351                _log.info(getPrefix() + "Message send failed after " + state.getElapsed() + "ms with "
    404352                          + payload.length + " bytes");
    405             if (_log.shouldLog(Log.ERROR))
    406                 _log.error(getPrefix() + "Never received *accepted* from the router!  dropping and reconnecting");
    407             disconnect();
     353            //if (_log.shouldLog(Log.ERROR))
     354            //    _log.error(getPrefix() + "Never received *accepted* from the router!  dropping and reconnecting");
     355            //disconnect();
    408356            return false;
    409357        }
     
    433381     *  MessageState is removed from _sendingStates immediately and
    434382     *  so the lookup here fails.
    435      *  And iterating through the HashSet instead of having a map
    436      *  is bad too.
    437383     *
    438384     *  This is now pretty much avoided since streaming now sets
     
    444390    @Override
    445391    public void receiveStatus(int msgId, long nonce, int status) {
    446         if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
     392        if (_log.shouldLog(Log.DEBUG))
     393            _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
     394
    447395        MessageState state = null;
    448         long beforeSync = _context.clock().now();
    449         long inSync = 0;
    450         synchronized (_sendingStates) {
    451             inSync = _context.clock().now();
    452             for (Iterator<MessageState> iter = _sendingStates.iterator(); iter.hasNext();) {
    453                 state = iter.next();
    454                 if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce());
    455                 if (state.getNonce() == nonce) {
    456                     if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state");
     396        if ((state = _sendingStates.get(Long.valueOf(nonce))) != null) {
     397            if (_log.shouldLog(Log.DEBUG))
     398                _log.debug(getPrefix() + "Found a matching state");
     399        } else if (!_sendingStates.isEmpty()) {
     400            // O(n**2)
     401            // shouldn't happen, router sends good nonce for all statuses as of 0.9.14
     402            for (MessageState s : _sendingStates.values()) {
     403                if (s.getMessageId() != null && s.getMessageId().getMessageId() == msgId) {
     404                    if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state by msgId");
     405                    state = s;
    457406                    break;
    458                 } else if ((state.getMessageId() != null) && (state.getMessageId().getMessageId() == msgId)) {
    459                     if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Found a matching state by msgId");
    460                     break;
    461                 } else {
    462                     if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State does not match");
    463                     state = null;
    464407                }
    465408            }
    466409        }
    467         long afterSync = _context.clock().now();
    468 
    469         if (_log.shouldLog(Log.DEBUG))
    470             _log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: "
    471                        + (inSync-beforeSync) + "ms, check: " + (afterSync-inSync));
    472410       
    473411        if (state != null) {
     
    478416            }
    479417            state.receive(status);
     418            if (state.wasSuccessful())
     419                _sendingStates.remove(Long.valueOf(nonce));
    480420           
    481421            long lifetime = state.getElapsed();
    482422            switch (status) {
    483423                case 1:
    484                     _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0);
     424                    _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime);
    485425                    break;
    486426                // best effort codes unused
     
    492432                //    break;
    493433                case 4:
    494                     _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0);
     434                    _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime);
    495435                    break;
    496436                case 5:
    497                     _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime, 0);
     437                    _context.statManager().addRateData("i2cp.receiveStatusTime.5", lifetime);
    498438                    break;
    499439            }
     
    504444                          + " w/ status = " + status);
    505445        }
    506         _context.statManager().addRateData("i2cp.receiveStatusTime", _context.clock().now() - beforeSync, 0);
    507446    }
    508447
     
    523462        if (_sendingStates == null)    // only null if overridden by I2PSimpleSession
    524463            return;
    525         synchronized (_sendingStates) {
    526             for (MessageState state : _sendingStates)
    527                 state.cancel();
    528             if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
    529             _sendingStates.clear();
    530         }
     464        for (MessageState state : _sendingStates.values()) {
     465            state.cancel();
     466        }
     467        if (_log.shouldLog(Log.INFO))
     468            _log.info(getPrefix() + "Disconnecting " + _sendingStates.size() + " states");
     469        _sendingStates.clear();
    531470    }
    532471}
  • core/java/src/net/i2p/client/I2PSessionMuxedImpl.java

    ra93666c r8371b8f  
    194194                               int proto, int fromPort, int toPort, int flags)
    195195                   throws I2PSessionException {
    196         if (isClosed()) throw new I2PSessionException("Already closed");
    197         updateActivity();
    198 
    199         boolean sc = shouldCompress(size);
    200         if (sc)
    201             payload = DataHelper.compress(payload, offset, size);
    202         else
    203             payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
    204 
    205         setProto(payload, proto);
    206         setFromPort(payload, fromPort);
    207         setToPort(payload, toPort);
    208 
    209         _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
    210         _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
     196        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
    211197        if (_noEffort)
    212198            return sendNoEffort(dest, payload, expires, flags);
     
    233219    public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
    234220                               int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException {
    235         if (isClosed()) throw new I2PSessionException("Already closed");
    236         updateActivity();
    237 
    238         boolean sc = shouldCompress(size);
    239         if (sc)
    240             payload = DataHelper.compress(payload, offset, size);
    241         else
    242             payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
    243 
    244         setProto(payload, proto);
    245         setFromPort(payload, fromPort);
    246         setToPort(payload, toPort);
    247 
    248         _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
    249         _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
     221        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
    250222        //if (_noEffort) {
    251223            sendNoEffort(dest, payload, options);
     
    255227            //return sendBestEffort(dest, payload, options);
    256228        //}
     229    }
     230
     231    /**
     232     * Send a message and request an asynchronous notification of delivery status.
     233     *
     234     * See I2PSessionMuxedImpl for proto/port details.
     235     * See SendMessageOptions for option details.
     236     *
     237     * @return the message ID to be used for later notification to the listener
     238     * @throws I2PSessionException on all errors
     239     * @since 0.9.14
     240     */
     241    @Override
     242    public long sendMessage(Destination dest, byte[] payload, int offset, int size,
     243                            int proto, int fromPort, int toPort,
     244                            SendMessageOptions options, SendMessageStatusListener listener) throws I2PSessionException {
     245        payload = prepPayload(payload, offset, size, proto, fromPort, toPort);
     246        long nonce = _sendMessageNonce.incrementAndGet();
     247        long expires = Math.max(_context.clock().now() + 60*1000L, options.getTime());
     248        MessageState state = new MessageState(_context, nonce, this, expires, listener);
     249        _sendingStates.put(Long.valueOf(nonce), state);
     250        _producer.sendMessage(this, dest, nonce, payload, options);
     251        return nonce;
     252    }
     253
     254    /**
     255     * @return gzip compressed payload, ready to send
     256     * @since 0.9.14
     257     */
     258    private byte[] prepPayload(byte[] payload, int offset, int size, int proto, int fromPort, int toPort) throws I2PSessionException {
     259        if (isClosed()) throw new I2PSessionException("Already closed");
     260        updateActivity();
     261
     262        if (shouldCompress(size))
     263            payload = DataHelper.compress(payload, offset, size);
     264        else
     265            payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
     266
     267        setProto(payload, proto);
     268        setFromPort(payload, fromPort);
     269        setToPort(payload, toPort);
     270
     271        _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length);
     272        _context.statManager().addRateData("i2cp.tx.msgExpanded", size);
     273        return payload;
    257274    }
    258275
  • core/java/src/net/i2p/client/MessageState.java

    ra93666c r8371b8f  
    11package net.i2p.client;
    22
    3 import java.util.HashSet;
    4 import java.util.Set;
    53import java.util.concurrent.atomic.AtomicLong;
    64
    75import net.i2p.I2PAppContext;
    8 import net.i2p.data.Destination;
    9 import net.i2p.data.SessionKey;
    106import net.i2p.data.i2cp.MessageId;
    117import net.i2p.data.i2cp.MessageStatusMessage;
     
    1511 * Contains the state of a payload message being sent to a peer.
    1612 *
    17  * This is mostly unused. See sendNoEffort vs. sendBestEffort in I2PSessionImpl2.
    18  * TODO delete altogether? This is really bad.
     13 * Originally was a general-purpose waiter.
     14 * Then we got rid of guaranteed delivery.
     15 * Then we stopped waiting for accept in best-effort delivery.
     16 * Brought back to life for asynchronous status delivery to the client.
    1917 */
    2018class MessageState {
     
    2422    private final String _prefix;
    2523    private MessageId _id;
    26     private final Set<Integer> _receivedStatus;
    27     private SessionKey _key;
    28     private SessionKey _newKey;
    29     private Set _tags;
    30     private Destination _to;
    31     private boolean _cancelled;
    3224    private final long _created;
     25    private final long _expires;
     26    private final SendMessageStatusListener _listener;
     27    private final I2PSession _session;
    3328
    34     private static final AtomicLong __stateId = new AtomicLong();
    35     private final long _stateId;
    36    
     29    private enum State { INIT, ACCEPTED, PROBABLE_FAIL, FAIL, SUCCESS };
     30    private State _state = State.INIT;
     31
     32    /**
     33     *  For synchronous waiting for accept with waitForAccept().
     34     *  UNUSED.
     35     */
    3736    public MessageState(I2PAppContext ctx, long nonce, String prefix) {
    38         _stateId = __stateId.incrementAndGet();
    3937        _context = ctx;
    4038        _log = ctx.logManager().getLog(MessageState.class);
    4139        _nonce = nonce;
    42         _prefix = prefix + "[" + _stateId + "]: ";
    43         _receivedStatus = new HashSet<Integer>();
     40        _prefix = prefix + '[' + _nonce + "]: ";
    4441        _created = ctx.clock().now();
    45         //ctx.statManager().createRateStat("i2cp.checkStatusTime", "how long it takes to go through the states", "i2cp", new long[] { 60*1000 });
     42        _expires = _created + 60*1000L;
     43        _listener = null;
     44        _session = null;
     45    }
     46
     47    /**
     48     *  For asynchronous notification
     49     *  @param expires absolute time (not interval)
     50     *  @since 0.9.14
     51     */
     52    public MessageState(I2PAppContext ctx, long nonce, I2PSession session,
     53                        long expires, SendMessageStatusListener listener) {
     54        _context = ctx;
     55        _log = ctx.logManager().getLog(MessageState.class);
     56        _nonce = nonce;
     57        _prefix = session.toString() + " [" + _nonce + "]: ";
     58        _created = ctx.clock().now();
     59        _expires = expires;
     60        _listener = listener;
     61        _session = session;
    4662    }
    4763
    4864    public void receive(int status) {
    49         synchronized (_receivedStatus) {
    50             _receivedStatus.add(Integer.valueOf(status));
    51             _receivedStatus.notifyAll();
     65        State oldState;
     66        State newState;
     67        synchronized (this) {
     68            oldState = _state;
     69            locked_update(status);
     70            newState = _state;
     71            this.notifyAll();
     72        }
     73        if (_listener != null) {
     74            // only notify on changing state, and only if we haven't expired
     75            if (oldState != newState && _expires > _context.clock().now())
     76                _listener.messageStatus(_session, _nonce, status);
    5277        }
    5378    }
     
    6186    }
    6287
    63     public long getNonce() {
    64         return _nonce;
    65     }
    66 
    67     /** @deprecated unused */
    68     public void setKey(SessionKey key) {
    69         if (_log.shouldLog(Log.DEBUG))
    70             _log.debug(_prefix + "Setting key [" + _key + "] to [" + key + "]");
    71         _key = key;
    72     }
    73 
    74     /** @deprecated unused */
    75     public SessionKey getKey() {
    76         return _key;
    77     }
    78 
    79     /** @deprecated unused */
    80     public void setNewKey(SessionKey key) {
    81         _newKey = key;
    82     }
    83 
    84     /** @deprecated unused */
    85     public SessionKey getNewKey() {
    86         return _newKey;
    87     }
    88 
    89     /** @deprecated unused */
    90     public void setTags(Set tags) {
    91         _tags = tags;
    92     }
    93 
    94     /** @deprecated unused */
    95     public Set getTags() {
    96         return _tags;
    97     }
    98 
    99     public void setTo(Destination dest) {
    100         _to = dest;
    101     }
    102 
    103     /** @deprecated unused */
    104     public Destination getTo() {
    105         return _to;
    106     }
    107 
    10888    public long getElapsed() {
    10989        return _context.clock().now() - _created;
    11090    }
    11191
    112     public void waitFor(int status, long expiration) {
    113         //long checkTime = -1;
    114         boolean found = false;
    115         while (!found) {
    116             if (_cancelled) return;
     92    /**
     93     *  @since 0.9.14
     94     */
     95    public long getExpires() {
     96        return _expires;
     97    }
     98
     99    /**
     100     *  For guaranteed/best effort only. Not really used.
     101     */
     102    public void waitForAccept(long expiration) throws InterruptedException {
     103        while (true) {
    117104            long timeToWait = expiration - _context.clock().now();
    118105            if (timeToWait <= 0) {
    119106                if (_log.shouldLog(Log.WARN))
    120                     _log.warn(_prefix + "Expired waiting for the status [" + status + "]");
     107                    _log.warn(_prefix + "Expired waiting for the status");
    121108                return;
    122109            }
    123             found = false;
    124             synchronized (_receivedStatus) {
    125                 //long beforeCheck = _context.clock().now();
    126                 if (locked_isSuccess(status) || locked_isFailure(status)) {
     110            synchronized (this) {
     111                if (_state != State.INIT) {
    127112                    if (_log.shouldLog(Log.DEBUG))
    128113                        _log.debug(_prefix + "Received a confirm (one way or the other)");
    129                     found = true;
     114                    return;
    130115                }
    131                 //checkTime = _context.clock().now() - beforeCheck;
    132                 if (!found) {
    133                     if (timeToWait > 5000) {
    134                         timeToWait = 5000;
    135                     }
    136                     try {
    137                         _receivedStatus.wait(timeToWait);
    138                     } catch (InterruptedException ie) { // nop
    139                     }
    140                 }
     116                if (timeToWait > 5000)
     117                    timeToWait = 5000;
     118                this.wait(timeToWait);
    141119            }
    142             //if (found)
    143             //    _context.statManager().addRateData("i2cp.checkStatusTime", checkTime, 0);
    144120        }
    145121    }
    146122
    147     private boolean locked_isSuccess(int wantedStatus) {
    148         boolean rv = false;
     123    /**
     124     *  Update our flags
     125     *  @since 0.9.14
     126     */
     127    private void locked_update(int status) {
     128        switch (status) {
     129            case MessageStatusMessage.STATUS_SEND_ACCEPTED:
     130                // only trumps init
     131                if (_state == State.INIT)
     132                    _state = State.ACCEPTED;
     133                break;
    149134
    150         if (_log.shouldLog(Log.DEBUG))
    151             _log.debug(_prefix + "isSuccess(" + wantedStatus + "): " + _receivedStatus);
    152         for (Integer val : _receivedStatus) {
    153             int recv = val.intValue();
    154             switch (recv) {
    155                 case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
    156                     if (_log.shouldLog(Log.WARN))
    157                          _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
    158                                    + toString());
    159                     rv = false;
    160                     break;
    161                 case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
    162                     if (_log.shouldLog(Log.WARN))
    163                          _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
    164                                    + toString());
    165                     rv = false;
    166                     break;
    167                 case MessageStatusMessage.STATUS_SEND_ACCEPTED:
    168                     if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
    169                         return true; // if we're only looking for accepted, take it directly (don't let any GUARANTEED_* override it)
    170                     }
    171                     // ignore accepted, as we want something better
    172                     if (_log.shouldLog(Log.DEBUG))
    173                         _log.debug(_prefix + "Got accepted, but we're waiting for more from " + toString());
    174                     continue;
    175                 case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
    176                     if (_log.shouldLog(Log.DEBUG))
    177                         _log.debug(_prefix + "Received best effort success after " + getElapsed()
    178                                    + " from " + toString());
    179                     if (wantedStatus == recv) {
    180                         rv = true;
    181                     } else {
    182                         if (_log.shouldLog(Log.DEBUG))
    183                             _log.debug(_prefix + "Not guaranteed success, but best effort after "
    184                                        + getElapsed() + " will do... from " + toString());
    185                         rv = true;
    186                     }
    187                     break;
    188                 case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
    189                     if (_log.shouldLog(Log.DEBUG))
    190                         _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
    191                                    + toString());
    192                     // even if we're waiting for best effort success, guaranteed is good enough
    193                     rv = true;
    194                     break;
    195                 case -1:
    196                     continue;
    197                 default:
    198                     if (_log.shouldLog(Log.DEBUG))
    199                         _log.debug(_prefix + "Received something else [" + recv + "]...");
    200             }
     135            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
     136            case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
     137                // does not trump failure or success
     138                if (_state != State.FAIL && _state != State.SUCCESS)
     139                    _state = State.PROBABLE_FAIL;
     140                break;
     141
     142            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL:
     143            case MessageStatusMessage.STATUS_SEND_FAILURE_ROUTER:
     144            case MessageStatusMessage.STATUS_SEND_FAILURE_NETWORK:
     145            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_SESSION:
     146            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_MESSAGE:
     147            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_OPTIONS:
     148            case MessageStatusMessage.STATUS_SEND_FAILURE_OVERFLOW:
     149            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED:
     150            case MessageStatusMessage.STATUS_SEND_FAILURE_LOCAL_LEASESET:
     151            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_TUNNELS:
     152            case MessageStatusMessage.STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION:
     153            case MessageStatusMessage.STATUS_SEND_FAILURE_DESTINATION:
     154            case MessageStatusMessage.STATUS_SEND_FAILURE_BAD_LEASESET:
     155            case MessageStatusMessage.STATUS_SEND_FAILURE_EXPIRED_LEASESET:
     156            case MessageStatusMessage.STATUS_SEND_FAILURE_NO_LEASESET:
     157            case SendMessageStatusListener.STATUS_CANCELLED:
     158                // does not trump success
     159                if (_state != State.SUCCESS)
     160                    _state = State.FAIL;
     161                break;
     162
     163            case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
     164            case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
     165            case MessageStatusMessage.STATUS_SEND_SUCCESS_LOCAL:
     166                // trumps all
     167                _state = State.SUCCESS;
     168
     169            default:
     170                break;
    201171        }
    202         return rv;
    203172    }
    204173
    205     private boolean locked_isFailure(int wantedStatus) {
    206         boolean rv = false;
    207 
    208         if (_log.shouldLog(Log.DEBUG))
    209             _log.debug(_prefix + "isFailure(" + wantedStatus + "): " + _receivedStatus);
    210        
    211         for (Integer val : _receivedStatus) {
    212             int recv = val.intValue();
    213             switch (recv) {
    214                 case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE:
    215                     if (_log.shouldLog(Log.DEBUG))
    216                         _log.warn(_prefix + "Received best effort failure after " + getElapsed() + " from "
    217                                   + toString());
    218                     rv = true;
    219                     break;
    220                 case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE:
    221                     if (_log.shouldLog(Log.DEBUG))
    222                         _log.warn(_prefix + "Received guaranteed failure after " + getElapsed() + " from "
    223                                   + toString());
    224                     rv = true;
    225                     break;
    226                 case MessageStatusMessage.STATUS_SEND_ACCEPTED:
    227                     if (wantedStatus == MessageStatusMessage.STATUS_SEND_ACCEPTED) {
    228                         rv = false;
    229                     } else {
    230                         if (_log.shouldLog(Log.DEBUG))
    231                             _log.debug(_prefix + "Got accepted, but we're waiting for more from "
    232                                        + toString());
    233                         continue;
    234                         // ignore accepted, as we want something better
    235                     }
    236                     break;
    237                 case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS:
    238                     if (_log.shouldLog(Log.DEBUG))
    239                         _log.debug(_prefix + "Received best effort success after " + getElapsed()
    240                                    + " from " + toString());
    241                     if (wantedStatus == recv) {
    242                         rv = false;
    243                     } else {
    244                         if (_log.shouldLog(Log.DEBUG))
    245                             _log.debug(_prefix + "Not guaranteed success, but best effort after "
    246                                        + getElapsed() + " will do... from " + toString());
    247                         rv = false;
    248                     }
    249                     break;
    250                 case MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS:
    251                     if (_log.shouldLog(Log.DEBUG))
    252                         _log.debug(_prefix + "Received guaranteed success after " + getElapsed() + " from "
    253                                    + toString());
    254                     // even if we're waiting for best effort success, guaranteed is good enough
    255                     rv = false;
    256                     break;
    257                 case -1:
    258                     continue;
    259                 default:
    260                     if (_log.shouldLog(Log.DEBUG))
    261                         _log.debug(_prefix + "Received something else [" + recv + "]...");
    262             }
     174    /**
     175     *  @return true if accepted (fixme and not failed)
     176     *  @since 0.9.14
     177     */
     178    public boolean wasAccepted() {
     179        synchronized (this) {
     180            return _state != State.INIT && _state != State.FAIL;
    263181        }
    264         return rv;
    265182    }
    266183
    267     /** #return true if the given status (or an equivalent) was received */
    268     public boolean received(int status) {
    269         synchronized (_receivedStatus) {
    270             return locked_isSuccess(status);
     184    /**
     185     *  @return true if successful
     186     *  @since 0.9.14
     187     */
     188    public boolean wasSuccessful() {
     189        synchronized (this) {
     190            return _state == State.SUCCESS;
    271191        }
    272192    }
    273193
    274194    public void cancel() {
    275         _cancelled = true;
    276         synchronized (_receivedStatus) {
    277             _receivedStatus.notifyAll();
    278         }
     195        // Inject a fake status
     196        receive(SendMessageStatusListener.STATUS_CANCELLED);
    279197    }
    280198}
Note: See TracChangeset for help on using the changeset viewer.