Changeset 7ec29b0


Ignore:
Timestamp:
Feb 2, 2009 6:03:16 PM (11 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
d236b9b4
Parents:
8d734050
Message:

use concurrent

Location:
router/java/src/net/i2p/router/client
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/client/ClientConnectionRunner.java

    r8d734050 r7ec29b0  
    1212import java.io.OutputStream;
    1313import java.net.Socket;
     14import java.util.concurrent.ConcurrentHashMap;
    1415import java.util.ArrayList;
    1516import java.util.HashMap;
     
    6061    private SessionConfig _config;
    6162    /** static mapping of MessageId to Payload, storing messages for retrieval */
    62     private Map _messages;
     63    private Map<MessageId, Payload> _messages;
    6364    /** lease set request state, or null if there is no request pending on at the moment */
    6465    private LeaseRequestState _leaseRequest;
     
    8990        _socket = socket;
    9091        _config = null;
    91         _messages = new HashMap();
     92        _messages = new ConcurrentHashMap();
    9293        _alreadyProcessed = new ArrayList();
    9394        _acceptedPending = new HashSet();
     
    107108            _writer = new ClientWriterRunner(_context, this);
    108109            I2PThread t = new I2PThread(_writer);
    109             t.setName("Writer " + ++__id);
     110            t.setName("I2CP Writer " + ++__id);
    110111            t.setDaemon(true);
    111112            t.setPriority(I2PThread.MAX_PRIORITY);
     
    129130        if (_writer != null) _writer.stopWriting();
    130131        if (_socket != null) try { _socket.close(); } catch (IOException ioe) { }
    131         synchronized (_messages) {
    132             _messages.clear();
    133         }
     132        _messages.clear();
    134133        if (_manager != null)
    135134            _manager.unregisterConnection(this);
     
    165164    /** already closed? */
    166165    boolean isDead() { return _dead; }
     166
    167167    /** message body */
    168168    Payload getPayload(MessageId id) {
    169         Payload rv = null;
    170         long beforeLock = _context.clock().now();
    171         long inLock = 0;
    172         synchronized (_messages) {
    173             inLock = _context.clock().now();
    174             rv = (Payload)_messages.get(id);
    175         }
    176         long afterLock = _context.clock().now();
    177        
    178         if (afterLock - beforeLock > 50) {
    179             _log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
    180                       + " overall, synchronized took " + (inLock - beforeLock));
    181         }
    182         return rv;
    183     }
     169        return _messages.get(id);
     170    }
     171
    184172    void setPayload(MessageId id, Payload payload) {
    185         long beforeLock = _context.clock().now();
    186         long inLock = 0;
    187         synchronized (_messages) {
    188             inLock = _context.clock().now();
    189             _messages.put(id, payload);
    190         }
    191         long afterLock = _context.clock().now();
    192        
    193         if (afterLock - beforeLock > 50) {
    194             _log.warn("setPayload.locking took too long: " + (afterLock-beforeLock)
    195                       + " overall, synchronized took " + (inLock - beforeLock));
    196         }
    197     }
     173        _messages.put(id, payload);
     174    }
     175
    198176    void removePayload(MessageId id) {
    199         long beforeLock = _context.clock().now();
    200         long inLock = 0;
    201         synchronized (_messages) {
    202             inLock = _context.clock().now();
    203             _messages.remove(id);
    204         }
    205         long afterLock = _context.clock().now();
    206        
    207         if (afterLock - beforeLock > 50) {
    208             _log.warn("removePayload.locking took too long: " + (afterLock-beforeLock)
    209                       + " overall, synchronized took " + (inLock - beforeLock));
    210         }
     177        _messages.remove(id);
    211178    }
    212179   
  • router/java/src/net/i2p/router/client/ClientWriterRunner.java

    r8d734050 r7ec29b0  
    11package net.i2p.router.client;
    22
    3 import java.util.ArrayList;
    4 import java.util.List;
     3import java.io.IOException;
     4import java.io.InputStream;
     5import java.util.concurrent.BlockingQueue;
     6import java.util.concurrent.LinkedBlockingQueue;
    57
    68import net.i2p.data.i2cp.I2CPMessage;
     9import net.i2p.data.i2cp.I2CPMessageImpl;
     10import net.i2p.data.i2cp.I2CPMessageException;
    711import net.i2p.router.RouterContext;
    812import net.i2p.util.Log;
     
    1418 * happen)
    1519 *
     20 * @author zzz modded to use concurrent
    1621 */
    1722class ClientWriterRunner implements Runnable {
    18     private List _messagesToWrite;
    19     private List _messagesToWriteTimes;
     23    private BlockingQueue<I2CPMessage> _messagesToWrite;
    2024    private ClientConnectionRunner _runner;
    21     private RouterContext _context;
    2225    private Log _log;
    2326    private long _id;
    2427    private static long __id = 0;
    2528   
    26     private static final long MAX_WAIT = 5*1000;
    27    
    28     /** lock on this when updating the class level data structs */
    29     private Object _dataLock = new Object();
    30    
    3129    public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {
    32         _context = context;
    3330        _log = context.logManager().getLog(ClientWriterRunner.class);
    34         _messagesToWrite = new ArrayList(4);
    35         _messagesToWriteTimes = new ArrayList(4);
     31        _messagesToWrite = new LinkedBlockingQueue();
    3632        _runner = runner;
    3733        _id = ++__id;
     
    4339     */
    4440    public void addMessage(I2CPMessage msg) {
    45         synchronized (_dataLock) {
    46             _messagesToWrite.add(msg);
    47             _messagesToWriteTimes.add(new Long(_context.clock().now()));
    48             _dataLock.notifyAll();
    49         }
     41        try {
     42            _messagesToWrite.put(msg);
     43        } catch (InterruptedException ie) {}
    5044        if (_log.shouldLog(Log.DEBUG))
    5145            _log.debug("["+_id+"] addMessage completed for " + msg.getClass().getName());
     
    5751     */
    5852    public void stopWriting() {
    59         synchronized (_dataLock) {
    60             _dataLock.notifyAll();
     53        _messagesToWrite.clear();
     54        try {
     55            _messagesToWrite.put(new PoisonMessage());
     56        } catch (InterruptedException ie) {}
     57    }
     58
     59    public void run() {
     60        I2CPMessage msg;
     61        while (!_runner.getIsDead()) {
     62            try {
     63                msg = _messagesToWrite.take();
     64            } catch (InterruptedException ie) {
     65                continue;
     66            }
     67            if (msg.getType() == PoisonMessage.MESSAGE_TYPE)
     68                break;
     69            _runner.writeMessage(msg);
    6170        }
    6271    }
    63     public void run() {
    64         List messages = new ArrayList(64);
    65         List messageTimes = new ArrayList(64);
    66         List switchList = null;
    67        
    68         while (!_runner.getIsDead()) {
    69             synchronized (_dataLock) {
    70                 if (_messagesToWrite.size() <= 0)
    71                     try { _dataLock.wait(); } catch (InterruptedException ie) {}
    72                
    73                 if (_messagesToWrite.size() > 0) {
    74                     switchList = _messagesToWrite;
    75                     _messagesToWrite = messages;
    76                     messages = switchList;
    77                    
    78                     switchList = _messagesToWriteTimes;
    79                     _messagesToWriteTimes = messageTimes;
    80                     messageTimes = switchList;
    81                 }
    82             }
    83            
    84             if (messages.size() > 0) {
    85                 for (int i = 0; i < messages.size(); i++) {
    86                     I2CPMessage msg = (I2CPMessage)messages.get(i);
    87                     Long when = (Long)messageTimes.get(i);
    88                     if (_log.shouldLog(Log.DEBUG))
    89                         _log.debug("["+_id+"] writeMessage before writing "
    90                                    + msg.getClass().getName());
    91                     _runner.writeMessage(msg);
    92                     if (_log.shouldLog(Log.DEBUG))
    93                         _log.debug("["+_id+"] writeMessage time since addMessage(): "
    94                                    + (_context.clock().now()-when.longValue()) + " for "
    95                                    + msg.getClass().getName());
    96                 }
    97             }
    98             messages.clear();
    99             messageTimes.clear();
     72
     73    /**
     74     * End-of-stream msg used to stop the concurrent queue
     75     * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
     76     *
     77     */
     78    private static class PoisonMessage extends I2CPMessageImpl {
     79        public static final int MESSAGE_TYPE = 999999;
     80        public int getType() {
     81            return MESSAGE_TYPE;
    10082        }
     83        public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
     84        public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
    10185    }
    10286}
Note: See TracChangeset for help on using the changeset viewer.