Changeset 69f051d


Ignore:
Timestamp:
Feb 4, 2009 2:17:10 PM (11 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
a6dc27a
Parents:
5946c35
Message:

concurrentify TunnelDispatcher?

File:
1 edited

Legend:

Unmodified
Added
Removed
  • router/java/src/net/i2p/router/tunnel/TunnelDispatcher.java

    r5946c35 r69f051d  
    33import java.io.IOException;
    44import java.io.Writer;
     5import java.util.concurrent.ConcurrentHashMap;
    56import java.util.ArrayList;
    6 import java.util.HashMap;
    77import java.util.List;
    88import java.util.Map;
     
    3131    private RouterContext _context;
    3232    private Log _log;
    33     private Map _outboundGateways;
    34     private Map _outboundEndpoints;
    35     private Map _participants;
    36     private Map _inboundGateways;
    37     /** id to HopConfig */
    38     private Map _participatingConfig;
     33    private Map<TunnelId, TunnelGateway> _outboundGateways;
     34    private Map<TunnelId, OutboundTunnelEndpoint> _outboundEndpoints;
     35    private Map<TunnelId, TunnelParticipant> _participants;
     36    private Map<TunnelId, TunnelGateway> _inboundGateways;
     37    private Map<TunnelId, HopConfig> _participatingConfig;
    3938    /** what is the date/time on which the last non-locally-created tunnel expires? */
    4039    private long _lastParticipatingExpiration;
     
    4948        _context = ctx;
    5049        _log = ctx.logManager().getLog(TunnelDispatcher.class);
    51         _outboundGateways = new HashMap();
    52         _outboundEndpoints = new HashMap();
    53         _participants = new HashMap();
    54         _inboundGateways = new HashMap();
    55         _participatingConfig = new HashMap();
     50        _outboundGateways = new ConcurrentHashMap();
     51        _outboundEndpoints = new ConcurrentHashMap();
     52        _participants = new ConcurrentHashMap();
     53        _inboundGateways = new ConcurrentHashMap();
     54        _participatingConfig = new ConcurrentHashMap();
    5655        _lastParticipatingExpiration = 0;
    5756        _lastDropTime = 0;
     
    159158            TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
    160159            TunnelId outId = cfg.getConfig(0).getSendTunnel();
    161             synchronized (_outboundGateways) {
    162                 _outboundGateways.put(outId, gw);
    163             }
     160            _outboundGateways.put(outId, gw);
    164161            _context.statManager().addRateData("tunnel.joinOutboundGateway", 1, 0);
    165162            _context.messageHistory().tunnelJoined("outbound", cfg);
     
    167164            TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
    168165            TunnelId outId = cfg.getConfig(0).getSendTunnel();
    169             synchronized (_outboundGateways) {
    170                 _outboundGateways.put(outId, gw);
    171             }
     166            _outboundGateways.put(outId, gw);
    172167            _context.statManager().addRateData("tunnel.joinOutboundGatewayZeroHop", 1, 0);
    173168            _context.messageHistory().tunnelJoined("outboundZeroHop", cfg);
     
    184179            TunnelParticipant participant = new TunnelParticipant(_context, new InboundEndpointProcessor(_context, cfg, _validator));
    185180            TunnelId recvId = cfg.getConfig(cfg.getLength()-1).getReceiveTunnel();
    186             synchronized (_participants) {
    187                 _participants.put(recvId, participant);
    188             }
     181            _participants.put(recvId, participant);
    189182            _context.statManager().addRateData("tunnel.joinInboundEndpoint", 1, 0);
    190183            _context.messageHistory().tunnelJoined("inboundEndpoint", cfg);
     
    192185            TunnelGatewayZeroHop gw = new TunnelGatewayZeroHop(_context, cfg);
    193186            TunnelId recvId = cfg.getConfig(0).getReceiveTunnel();
    194             synchronized (_inboundGateways) {
    195                 _inboundGateways.put(recvId, gw);
    196             }
     187            _inboundGateways.put(recvId, gw);
    197188            _context.statManager().addRateData("tunnel.joinInboundEndpointZeroHop", 1, 0);
    198189            _context.messageHistory().tunnelJoined("inboundEndpointZeroHop", cfg);
     
    209200        TunnelId recvId = cfg.getReceiveTunnel();
    210201        TunnelParticipant participant = new TunnelParticipant(_context, cfg, new HopProcessor(_context, cfg, _validator));
    211         synchronized (_participants) {
    212             _participants.put(recvId, participant);
    213         }
    214         synchronized (_participatingConfig) {
    215             _participatingConfig.put(recvId, cfg);
    216         }
     202        _participants.put(recvId, participant);
     203        _participatingConfig.put(recvId, cfg);
    217204        _context.messageHistory().tunnelJoined("participant", cfg);
    218205        _context.statManager().addRateData("tunnel.joinParticipant", 1, 0);
     
    230217        TunnelId recvId = cfg.getReceiveTunnel();
    231218        OutboundTunnelEndpoint endpoint = new OutboundTunnelEndpoint(_context, cfg, new HopProcessor(_context, cfg, _validator));
    232         synchronized (_outboundEndpoints) {
    233             _outboundEndpoints.put(recvId, endpoint);
    234         }
    235         synchronized (_participatingConfig) {
    236             _participatingConfig.put(recvId, cfg);
    237         }
     219        _outboundEndpoints.put(recvId, endpoint);
     220        _participatingConfig.put(recvId, cfg);
    238221        _context.messageHistory().tunnelJoined("outboundEndpoint", cfg);
    239222        _context.statManager().addRateData("tunnel.joinOutboundEndpoint", 1, 0);
     
    257240        TunnelGateway gw = new PumpedTunnelGateway(_context, preproc, sender, receiver, _pumper);
    258241        TunnelId recvId = cfg.getReceiveTunnel();
    259         synchronized (_inboundGateways) {
    260             _inboundGateways.put(recvId, gw);
    261         }
    262         synchronized (_participatingConfig) {
    263             _participatingConfig.put(recvId, cfg);
    264         }
     242        _inboundGateways.put(recvId, gw);
     243        _participatingConfig.put(recvId, cfg);
    265244        _context.messageHistory().tunnelJoined("inboundGateway", cfg);
    266245        _context.statManager().addRateData("tunnel.joinInboundGateway", 1, 0);
     
    272251
    273252    public int getParticipatingCount() {
    274         synchronized (_participatingConfig) {
    275             return _participatingConfig.size();
    276         }
     253        return _participatingConfig.size();
    277254    }
    278255   
     
    288265            if (_log.shouldLog(Log.DEBUG))
    289266                _log.debug("removing our own inbound " + cfg);
    290             TunnelParticipant participant = null;
    291             synchronized (_participants) {
    292                 participant = (TunnelParticipant)_participants.remove(recvId);
    293             }
     267            TunnelParticipant participant = _participants.remove(recvId);
    294268            if (participant == null) {
    295                 synchronized (_inboundGateways) {
    296                     _inboundGateways.remove(recvId);
    297                 }
     269                _inboundGateways.remove(recvId);
    298270            } else {
    299271                // update stats based off getCompleteCount() + getFailedCount()
     
    312284                _log.debug("removing our own outbound " + cfg);
    313285            TunnelId outId = cfg.getConfig(0).getSendTunnel();
    314             TunnelGateway gw = null;
    315             synchronized (_outboundGateways) {
    316                 gw = (TunnelGateway)_outboundGateways.remove(outId);
    317             }   
     286            TunnelGateway gw = _outboundGateways.remove(outId);
    318287            if (gw != null) {
    319288                // update stats based on gw.getMessagesSent()
     
    340309            _log.debug("removing " + cfg);
    341310       
    342         boolean removed = false;
    343         synchronized (_participatingConfig) {
    344             removed = (null != _participatingConfig.remove(recvId));
    345         }
     311        boolean removed = (null != _participatingConfig.remove(recvId));
    346312        if (!removed) {
    347313            if (_log.shouldLog(Log.WARN))
     
    349315        }
    350316       
    351         synchronized (_participants) {
    352             removed = (null != _participants.remove(recvId));
    353         }
     317        removed = (null != _participants.remove(recvId));
    354318        if (removed) return;
    355         synchronized (_inboundGateways) {
    356             removed = (null != _inboundGateways.remove(recvId));
    357         }
     319        removed = (null != _inboundGateways.remove(recvId));
    358320        if (removed) return;
    359         synchronized (_outboundEndpoints) {
    360             removed = (null != _outboundEndpoints.remove(recvId));
    361         }
     321        _outboundEndpoints.remove(recvId);
    362322    }
    363323   
     
    373333    public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
    374334        long before = System.currentTimeMillis();
    375         TunnelParticipant participant = null;
    376         synchronized (_participants) {
    377             participant = (TunnelParticipant)_participants.get(msg.getTunnelIdObj());
    378         }
     335        TunnelParticipant participant = _participants.get(msg.getTunnelIdObj());
    379336        if (participant != null) {
    380337            // we are either just a random participant or the inbound endpoint
     
    386343            _context.statManager().addRateData("tunnel.dispatchParticipant", 1, 0);
    387344        } else {
    388             OutboundTunnelEndpoint endpoint = null;
    389             synchronized (_outboundEndpoints) {
    390                 endpoint = (OutboundTunnelEndpoint)_outboundEndpoints.get(msg.getTunnelIdObj());
    391             }
     345            OutboundTunnelEndpoint endpoint = _outboundEndpoints.get(msg.getTunnelIdObj());
    392346            if (endpoint != null) {
    393347                // we are the outobund endpoint
     
    422376    public void dispatch(TunnelGatewayMessage msg) {
    423377        long before = System.currentTimeMillis();
    424         TunnelGateway gw = null;
    425         synchronized (_inboundGateways) {
    426             gw = (TunnelGateway)_inboundGateways.get(msg.getTunnelId());
    427         }
     378        TunnelGateway gw = _inboundGateways.get(msg.getTunnelId());
    428379        if (gw != null) {
    429380            if (_log.shouldLog(Log.DEBUG))
     
    490441        if (outboundTunnel == null) throw new IllegalArgumentException("wtf, null outbound tunnel?");
    491442        long before = _context.clock().now();
    492         TunnelGateway gw = null;
    493         synchronized (_outboundGateways) {
    494             gw = (TunnelGateway)_outboundGateways.get(outboundTunnel);
    495         }
     443        TunnelGateway gw = _outboundGateways.get(outboundTunnel);
    496444        if (gw != null) {
    497445            if (_log.shouldLog(Log.DEBUG))
     
    539487    }
    540488   
    541     public List listParticipatingTunnels() {
    542         synchronized (_participatingConfig) {
    543             return new ArrayList(_participatingConfig.values());
    544         }
     489    public List<HopConfig> listParticipatingTunnels() {
     490        return new ArrayList(_participatingConfig.values());
    545491    }
    546492
     
    555501     */
    556502    public void updateParticipatingStats() {
    557         List participating = listParticipatingTunnels();
     503        List<HopConfig> participating = listParticipatingTunnels();
    558504        int size = participating.size();
    559505        long count = 0;
     
    564510        long tooOld = tooYoung - 9*60*1000;
    565511        for (int i = 0; i < size; i++) {
    566             HopConfig cfg = (HopConfig)participating.get(i);
     512            HopConfig cfg = participating.get(i);
    567513            long c = cfg.getRecentMessagesCount();
    568514            bw += c;
     
    646592    public void dropBiggestParticipating() {
    647593
    648        List partTunnels = listParticipatingTunnels();
     594       List<HopConfig> partTunnels = listParticipatingTunnels();
    649595       if ((partTunnels == null) || (partTunnels.size() == 0)) {
    650596           if (_log.shouldLog(Log.ERROR))
     
    669615       for (int i=0; i<partTunnels.size(); i++) {
    670616
    671            current = (HopConfig)partTunnels.get(i);
     617           current = partTunnels.get(i);
    672618
    673619           long currentMessages = current.getProcessedMessagesCount();
     
    717663   
    718664    private class LeaveTunnel extends JobImpl {
    719         private List _configs;
    720         private List _times;
     665        private List<HopConfig> _configs;
     666        private List<Long> _times;
    721667       
    722668        public LeaveTunnel(RouterContext ctx) {
     
    766712                    if (_configs.size() <= 0)
    767713                        return;
    768                     nextTime = (Long)_times.get(0);
     714                    nextTime = _times.get(0);
    769715                    if (nextTime.longValue() <= now) {
    770                         cur = (HopConfig)_configs.remove(0);
     716                        cur = _configs.remove(0);
    771717                        _times.remove(0);
    772718                        if (_times.size() > 0)
    773                             nextTime = (Long)_times.get(0);
     719                            nextTime = _times.get(0);
    774720                        else
    775721                            nextTime = null;
Note: See TracChangeset for help on using the changeset viewer.