Changeset f44eeaf


Ignore:
Timestamp:
Sep 1, 2012 9:39:14 PM (7 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
4360284
Parents:
a0418be
Message:

TunnelGateway?: Refactor TunnelGateway?.Pending to its own file PendingGatewayMesasge?

Location:
router/java/src/net/i2p/router/tunnel
Files:
1 added
5 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/BatchedPreprocessor.java

    ra0418be rf44eeaf  
    105105    /* See TunnelGateway.QueuePreprocessor for Javadoc */
    106106    @Override
    107     public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     107    public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    108108        if (_log.shouldLog(Log.INFO))
    109109            display(0, pending, "Starting");
     
    132132            for (int i = 0; i < pending.size(); i++) {
    133133                long pendingStart = System.currentTimeMillis();
    134                 TunnelGateway.Pending msg = pending.get(i);
     134                PendingGatewayMessage msg = pending.get(i);
    135135                int instructionsSize = getInstructionsSize(msg);
    136136                instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
     
    170170                    // Remove what we sent from the pending queue
    171171                    for (int j = 0; j < i; j++) {
    172                         TunnelGateway.Pending cur = pending.remove(0);
     172                        PendingGatewayMessage cur = pending.remove(0);
    173173                        if (cur.getOffset() < cur.getData().length)
    174174                            throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
     
    182182                    if (msg.getOffset() >= msg.getData().length) {
    183183                        // ok, this last message fit perfectly, remove it too
    184                         TunnelGateway.Pending cur = pending.remove(0);
     184                        PendingGatewayMessage cur = pending.remove(0);
    185185                        if (timingBuf != null)
    186186                            timingBuf.append(" sent perfect fit " + cur).append(".");
     
    231231                    int beforeSize = pending.size();
    232232                    for (int i = 0; i < beforeSize; i++) {
    233                         TunnelGateway.Pending cur = pending.get(0);
     233                        PendingGatewayMessage cur = pending.get(0);
    234234                        if (cur.getOffset() < cur.getData().length)
    235235                            break;
     
    317317     * title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
    318318     */
    319     private void display(long allocated, List<TunnelGateway.Pending> pending, String title) {
     319    private void display(long allocated, List<PendingGatewayMessage> pending, String title) {
    320320        if (_log.shouldLog(Log.INFO)) {
    321321            long highestDelay = 0;
     
    328328                buf.append(" delay: ").append(getDelayAmount(false));
    329329            for (int i = 0; i < pending.size(); i++) {
    330                 TunnelGateway.Pending curPending = pending.get(i);
     330                PendingGatewayMessage curPending = pending.get(i);
    331331                buf.append(" [").append(i).append("]:");
    332332                buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
     
    348348     * @param sendThrough last index in pending to send (inclusive)
    349349     */
    350     protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     350    protected void send(List<PendingGatewayMessage> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    351351        if (_log.shouldLog(Log.DEBUG))
    352352            _log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
     
    385385        long msgId = sender.sendPreprocessed(preprocessed, rec);
    386386        for (int i = 0; i < pending.size(); i++) {
    387             TunnelGateway.Pending cur = pending.get(i);
     387            PendingGatewayMessage cur = pending.get(i);
    388388            cur.addMessageId(msgId);
    389389        }
     
    398398     * @return new offset into the target for further bytes to be written
    399399     */
    400     private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) {
     400    private int writeFragments(List<PendingGatewayMessage> pending, int startAt, int sendThrough, byte target[], int offset) {
    401401        for (int i = startAt; i <= sendThrough; i++) {
    402             TunnelGateway.Pending msg = pending.get(i);
     402            PendingGatewayMessage msg = pending.get(i);
    403403            int prevOffset = offset;
    404404            if (msg.getOffset() == 0) {
  • router/java/src/net/i2p/router/tunnel/PumpedTunnelGateway.java

    ra0418be rf44eeaf  
    3636 */
    3737class PumpedTunnelGateway extends TunnelGateway {
    38     private final BlockingQueue<Pending> _prequeue;
     38    private final BlockingQueue<PendingGatewayMessage> _prequeue;
    3939    private final TunnelGatewayPumper _pumper;
    4040   
     
    7272    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
    7373        _messagesSent++;
    74         Pending cur = new PendingImpl(msg, toRouter, toTunnel);
     74        PendingGatewayMessage cur = new PendingGatewayMessage(msg, toRouter, toTunnel);
    7575        if (_prequeue.offer(cur))
    7676            _pumper.wantsPumping(this);
     
    8989     *                 Must be empty when called; will always be emptied before return.
    9090     */
    91     void pump(List<Pending> queueBuf) {
     91    void pump(List<PendingGatewayMessage> queueBuf) {
    9292        _prequeue.drainTo(queueBuf, MAX_MSGS_PER_PUMP);
    9393        if (queueBuf.isEmpty())
     
    115115            // expire any as necessary, even if its framented
    116116            for (int i = 0; i < _queue.size(); i++) {
    117                 Pending m = _queue.get(i);
     117                PendingGatewayMessage m = _queue.get(i);
    118118                if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
    119119                    if (_log.shouldLog(Log.DEBUG))
  • router/java/src/net/i2p/router/tunnel/TrivialPreprocessor.java

    ra0418be rf44eeaf  
    5151     * NOTE: Unused here, see BatchedPreprocessor override, super is not called.
    5252     */
    53     public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
     53    public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
    5454        throw new IllegalArgumentException("unused, right?");
    5555    }
     
    156156    private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
    157157
    158     protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) {
     158    protected int writeFirstFragment(PendingGatewayMessage msg, byte target[], int offset) {
    159159        boolean fragmented = false;
    160160        int instructionsLength = getInstructionsSize(msg);
     
    222222    }
    223223   
    224     protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) {
     224    protected int writeSubsequentFragment(PendingGatewayMessage msg, byte target[], int offset) {
    225225        boolean isLast = true;
    226226       
     
    270270     *  call getInstructionAugmentationSize() for that.
    271271     */
    272     protected int getInstructionsSize(TunnelGateway.Pending msg) {
     272    protected int getInstructionsSize(PendingGatewayMessage msg) {
    273273        if (msg.getFragmentNumber() > 0)
    274274            return 7;
     
    288288   
    289289    /** @return 0 or 4 */
    290     protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
     290    protected int getInstructionAugmentationSize(PendingGatewayMessage msg, int offset, int instructionsSize) {
    291291        int payloadLength = msg.getData().length - msg.getOffset();
    292292        if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
  • router/java/src/net/i2p/router/tunnel/TunnelGateway.java

    ra0418be rf44eeaf  
    3838    protected final RouterContext _context;
    3939    protected final Log _log;
    40     protected final List<Pending> _queue;
     40    protected final List<PendingGatewayMessage> _queue;
    4141    protected final QueuePreprocessor _preprocessor;
    4242    protected final Sender _sender;
     
    172172         * @return true if we should delay before preprocessing again
    173173         */
    174         public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver);
     174        public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver);
    175175       
    176176        /** how long do we want to wait before flushing */
     
    185185        public long receiveEncrypted(byte encrypted[]);
    186186    }
    187    
    188     /**
    189      *  Stores all the state for an unsent or partially-sent message
    190      */
    191     public static class Pending {
    192         protected final Hash _toRouter;
    193         protected final TunnelId _toTunnel;
    194         protected final long _messageId;
    195         protected final long _expiration;
    196         protected final byte _remaining[];
    197         protected int _offset;
    198         protected int _fragmentNumber;
    199         protected final long _created;
    200         private List<Long> _messageIds;
    201        
    202         public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
    203             this(message, toRouter, toTunnel, System.currentTimeMillis());
    204         }
    205         public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
    206             _toRouter = toRouter;
    207             _toTunnel = toTunnel;
    208             _messageId = message.getUniqueId();
    209             _expiration = message.getMessageExpiration();
    210             _remaining = message.toByteArray();
    211             _created = now;
    212         }
    213         /** may be null */
    214         public Hash getToRouter() { return _toRouter; }
    215         /** may be null */
    216         public TunnelId getToTunnel() { return _toTunnel; }
    217         public long getMessageId() { return _messageId; }
    218         public long getExpiration() { return _expiration; }
    219         /** raw unfragmented message to send */
    220         public byte[] getData() { return _remaining; }
    221         /** index into the data to be sent */
    222         public int getOffset() { return _offset; }
    223         /** move the offset */
    224         public void setOffset(int offset) { _offset = offset; }
    225         public long getLifetime() { return System.currentTimeMillis()-_created; }
    226         /** which fragment are we working on (0 for the first fragment) */
    227         public int getFragmentNumber() { return _fragmentNumber; }
    228         /** ok, fragment sent, increment what the next will be */
    229         public void incrementFragmentNumber() { _fragmentNumber++; }
    230         /**
    231          *  Add an ID to the list of the TunnelDataMssages this message was fragmented into.
    232          *  Unused except in notePreprocessing() calls for debugging
    233          */
    234         public void addMessageId(long id) {
    235             synchronized (Pending.this) {
    236                 if (_messageIds == null)
    237                     _messageIds = new ArrayList();
    238                 _messageIds.add(Long.valueOf(id));
    239             }
    240         }
    241         /**
    242          *  The IDs of the TunnelDataMssages this message was fragmented into.
    243          *  Unused except in notePreprocessing() calls for debugging
    244          */
    245         public List<Long> getMessageIds() {
    246             synchronized (Pending.this) {
    247                 if (_messageIds != null)
    248                     return new ArrayList(_messageIds);
    249                 else
    250                     return new ArrayList();
    251             }
    252         }
    253     }
    254 
    255     /** Extend for debugging */
    256     class PendingImpl extends Pending {
    257         public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
    258             super(message, toRouter, toTunnel, _context.clock().now());
    259         }       
    260        
    261         @Override
    262         public String toString() {
    263             StringBuilder buf = new StringBuilder(64);
    264             buf.append("Message ").append(_messageId).append(" on ");
    265             buf.append(TunnelGateway.this.toString());
    266             if (_toRouter != null) {
    267                 buf.append(" targetting ");
    268                 buf.append(_toRouter.toBase64()).append(" ");
    269                 if (_toTunnel != null)
    270                     buf.append(_toTunnel.getTunnelId());
    271             }
    272             long now = _context.clock().now();
    273             buf.append(" actual lifetime ");
    274             buf.append(now - _created).append("ms");
    275             buf.append(" potential lifetime ");
    276             buf.append(_expiration - _created).append("ms");
    277             buf.append(" size ").append(_remaining.length);
    278             buf.append(" offset ").append(_offset);
    279             buf.append(" frag ").append(_fragmentNumber);
    280             return buf.toString();
    281         }
    282 
    283         @Override
    284         public long getLifetime() { return _context.clock().now()-_created; }
    285     }
    286    
     187
    287188    protected class DelayedFlush extends SimpleTimer2.TimedEvent {
    288189        DelayedFlush() {
  • router/java/src/net/i2p/router/tunnel/TunnelGatewayPumper.java

    ra0418be rf44eeaf  
    6666    public void run() {
    6767        PumpedTunnelGateway gw = null;
    68         List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
     68        List<PendingGatewayMessage> queueBuf = new ArrayList(32);
    6969        while (!_stop) {
    7070            try {
Note: See TracChangeset for help on using the changeset viewer.