Changeset 20e463e


Ignore:
Timestamp:
Sep 26, 2012 8:02:36 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
7c8ba61f
Parents:
5d3984e
Message:
  • Streaming:
    • Implement changing connection limits on a running session
    • Implement global blacklist
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/ConnThrottler.java

    r5d3984e r20e463e  
    1616class ConnThrottler {
    1717    private final ObjectCounter<Hash> counter;
    18     private final int _max;
    19     private final int _totalMax;
     18    private volatile int _max;
     19    private volatile int _totalMax;
    2020    private final AtomicInteger _currentTotal;
    2121
     
    2828        _max = max;
    2929        _totalMax = totalMax;
    30         if (max > 0)
    31             this.counter = new ObjectCounter<Hash>();
    32         else
    33             this.counter = null;
    34         if (totalMax > 0)
    35             _currentTotal = new AtomicInteger();
    36         else
    37             _currentTotal = null;
     30        this.counter = new ObjectCounter<Hash>();
     31        _currentTotal = new AtomicInteger();
    3832        SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), period);
     33    }
     34
     35    /*
     36     * @param max per-peer, 0 for unlimited
     37     * @param totalMax for all peers, 0 for unlimited
     38     * @since 0.9.3
     39     */
     40    public void updateLimits(int max, int totalMax) {
     41        _max = max;
     42        _totalMax = totalMax;
    3943    }
    4044
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    r5d3984e r20e463e  
    55import java.util.Map;
    66import java.util.Set;
     7import java.util.StringTokenizer;
    78import java.util.concurrent.ConcurrentHashMap;
    89
     
    1314import net.i2p.data.Hash;
    1415import net.i2p.data.SessionKey;
     16import net.i2p.util.ConcurrentHashSet;
     17import net.i2p.util.ConvertToHash;
    1518import net.i2p.util.Log;
    1619import net.i2p.util.SimpleTimer2;
     
    3639    /** Ping ID (Long) to PingRequest */
    3740    private final Map<Long, PingRequest> _pendingPings;
    38     private boolean _throttlersInitialized;
    39     private int _maxConcurrentStreams;
     41    private volatile boolean _throttlersInitialized;
    4042    private final ConnectionOptions _defaultOptions;
    4143    private volatile int _numWaiting;
    4244    private long _soTimeout;
    43     private ConnThrottler _minuteThrottler;
    44     private ConnThrottler _hourThrottler;
    45     private ConnThrottler _dayThrottler;
     45    private volatile ConnThrottler _minuteThrottler;
     46    private volatile ConnThrottler _hourThrottler;
     47    private volatile ConnThrottler _dayThrottler;
    4648    /** since 0.9, each manager instantiates its own timer */
    4749    private final SimpleTimer2 _timer;
    48    
    49     public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
     50    /** cache of the property to detect changes */
     51    private static volatile String _currentBlacklist = "";
     52    private static final Set<Hash> _globalBlacklist = new ConcurrentHashSet();
     53   
     54    /** @since 0.9.3 */
     55    public static final String PROP_BLACKLIST = "i2p.streaming.blacklist";
     56
     57    /**
     58     *  Manage all conns for this session
     59     */
     60    public ConnectionManager(I2PAppContext context, I2PSession session, ConnectionOptions defaultOptions) {
    5061        _context = context;
    5162        _session = session;
    52         _maxConcurrentStreams = maxConcurrent;
    5363        _defaultOptions = defaultOptions;
    5464        _log = _context.logManager().getLog(ConnectionManager.class);
     
    129139    public void setAllowIncomingConnections(boolean allow) {
    130140        _connectionHandler.setActive(allow);
    131         if (allow && !_throttlersInitialized) {
    132             _throttlersInitialized = true;
    133             if (_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) {
     141        if (allow) {
     142            synchronized(this) {
     143                if (!_throttlersInitialized) {
     144                    updateOptions();
     145                    _throttlersInitialized = true;
     146                }
     147            }
     148        }
     149    }
     150
     151    /*
     152     * Update the throttler options
     153     * @since 0.9.3
     154     */
     155    public synchronized void updateOptions() {
     156            if ((_defaultOptions.getMaxConnsPerMinute() > 0 || _defaultOptions.getMaxTotalConnsPerMinute() > 0) &&
     157                _minuteThrottler == null) {
    134158               _context.statManager().createRateStat("stream.con.throttledMinute", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
    135159               _minuteThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute(), 60*1000);
    136             }
    137             if (_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) {
     160            } else if (_minuteThrottler != null) {
     161               _minuteThrottler.updateLimits(_defaultOptions.getMaxConnsPerMinute(), _defaultOptions.getMaxTotalConnsPerMinute());
     162            }
     163            if ((_defaultOptions.getMaxConnsPerHour() > 0 || _defaultOptions.getMaxTotalConnsPerHour() > 0) &&
     164                _hourThrottler == null) {
    138165               _context.statManager().createRateStat("stream.con.throttledHour", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
    139166               _hourThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour(), 60*60*1000);
    140             }
    141             if (_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) {
     167            } else if (_hourThrottler != null) {
     168               _hourThrottler.updateLimits(_defaultOptions.getMaxConnsPerHour(), _defaultOptions.getMaxTotalConnsPerHour());
     169            }
     170            if ((_defaultOptions.getMaxConnsPerDay() > 0 || _defaultOptions.getMaxTotalConnsPerDay() > 0) &&
     171                _dayThrottler == null) {
    142172               _context.statManager().createRateStat("stream.con.throttledDay", "Dropped for conn limit", "Stream", new long[] { 5*60*1000 });
    143173               _dayThrottler = new ConnThrottler(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay(), 24*60*60*1000);
    144             }
    145         }
     174            } else if (_dayThrottler != null) {
     175               _dayThrottler.updateLimits(_defaultOptions.getMaxConnsPerDay(), _defaultOptions.getMaxTotalConnsPerDay());
     176            }
    146177    }
    147178
     
    178209            if (locked_tooManyStreams()) {
    179210                _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
    180                               + _maxConcurrentStreams + " connections");
     211                              + _defaultOptions.getMaxConns() + " connections");
    181212                reject = true;
    182213            } else {
     
    268299            if (remaining <= 0) {
    269300                _log.logAlways(Log.WARN, "Refusing to connect since we have exceeded our max of "
    270                           + _maxConcurrentStreams + " connections");
     301                          + _defaultOptions.getMaxConns() + " connections");
    271302                _numWaiting--;
    272303                return null;
     
    274305
    275306                if (locked_tooManyStreams()) {
     307                    int max = _defaultOptions.getMaxConns();
    276308                    // allow a full buffer of pending/waiting streams
    277                     if (_numWaiting > _maxConcurrentStreams) {
     309                    if (_numWaiting > max) {
    278310                        _log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
    279                                       + _maxConcurrentStreams + " and there are " + _numWaiting
     311                                      + max + " and there are " + _numWaiting
    280312                                      + " waiting already");
    281313                        _numWaiting--;
     
    321353     */
    322354    private boolean locked_tooManyStreams() {
    323         if (_maxConcurrentStreams <= 0) return false;
    324         if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
     355        int max = _defaultOptions.getMaxConns();
     356        if (max <= 0) return false;
     357        if (_connectionByInboundId.size() < max) return false;
    325358        int active = 0;
    326359        for (Connection con : _connectionByInboundId.values()) {
     
    333366                      + " total: " + _connectionByInboundId.size());
    334367
    335         return (active >= _maxConcurrentStreams);
     368        return (active >= max);
    336369    }
    337370   
     
    399432            _defaultOptions.getBlacklist().contains(h))
    400433            return "blacklisted";
     434        String hashes = _context.getProperty(PROP_BLACKLIST, "");
     435        if (!_currentBlacklist.equals(hashes)) {
     436            // rebuild _globalBlacklist when property changes
     437            synchronized(_globalBlacklist) {
     438                if (hashes != null) {
     439                    Set<Hash> newSet = new HashSet();
     440                    StringTokenizer tok = new StringTokenizer(hashes, ",; ");
     441                    while (tok.hasMoreTokens()) {
     442                        String hashstr = tok.nextToken();
     443                        Hash hh = ConvertToHash.getHash(hashstr);
     444                        if (hh != null)
     445                            newSet.add(hh);
     446                        else
     447                            _log.error("Bad blacklist entry: " + hashstr);
     448                    }
     449                    _globalBlacklist.addAll(newSet);
     450                    _globalBlacklist.retainAll(newSet);
     451                    _currentBlacklist = hashes;
     452                } else {
     453                    _globalBlacklist.clear();
     454                    _currentBlacklist = "";
     455                }
     456            }
     457        }
     458        if (hashes.length() > 0 && _globalBlacklist.contains(h))
     459            return "blacklisted globally";
    401460        return null;
    402461    }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java

    r5d3984e r20e463e  
    1515 * Define the current options for the con (and allow custom tweaking midstream)
    1616 *
     17 * TODO many of these are not per-connection options, and should be migrated
     18 * somewhere so they aren't copied for every connection
    1719 */
    1820class ConnectionOptions extends I2PSocketOptionsImpl {
     
    4850    private int _maxTotalConnsPerHour;
    4951    private int _maxTotalConnsPerDay;
     52    private int _maxConns;
    5053
    5154    // NOTE - almost all the options are below, but see
     
    9194    /** @since 0.9.1 */
    9295    public static final String PROP_ENFORCE_PROTO = "i2p.streaming.enforceProtocol";
     96    /**
     97     *  how many streams will we allow at once?
     98     *  @since 0.9.3 moved from I2PSocketManagerFull
     99     */
     100    public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
     101   
    93102   
    94103    private static final int TREND_COUNT = 3;
     
    309318            _maxTotalConnsPerHour = opts.getMaxTotalConnsPerHour();
    310319            _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay();
     320            _maxConns = opts.getMaxConns();
    311321    }
    312322   
     
    345355        _maxTotalConnsPerHour = getInt(opts, PROP_MAX_TOTAL_CONNS_HOUR, 0);
    346356        _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
     357        _maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
    347358    }
    348359   
     
    409420        if (opts.containsKey(PROP_MAX_TOTAL_CONNS_DAY))
    410421            _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0);
     422        if (opts.containsKey(PROP_MAX_STREAMS))
     423            _maxConns = getInt(opts, PROP_MAX_STREAMS, 0);
    411424    }
    412425   
     
    661674    public int getMaxTotalConnsPerHour() { return _maxTotalConnsPerHour; }
    662675    public int getMaxTotalConnsPerDay() { return _maxTotalConnsPerDay; }
     676    /** @since 0.9.3; no public setter */
     677    public int getMaxConns() { return _maxConns; }
    663678
    664679    public boolean isAccessListEnabled() { return _accessListEnabled; }
     
    692707            if (hashes == null)
    693708                return;
    694             StringTokenizer tok = new StringTokenizer(hashes, ", ");
     709            StringTokenizer tok = new StringTokenizer(hashes, ",; ");
    695710            while (tok.hasMoreTokens()) {
    696711                String hashstr = tok.nextToken();
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java

    r5d3984e r20e463e  
    3737    private long _acceptTimeout;
    3838    private String _name;
    39     private int _maxStreams;
    4039    private static int __managerId = 0;
    4140    private final ConnectionManager _connectionManager;
     
    5453        throw new UnsupportedOperationException();
    5554    }
    56    
    57     /** how many streams will we allow at once?  */
    58     public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams";
    5955   
    6056    /**
     
    8076        _log = _context.logManager().getLog(I2PSocketManagerFull.class);
    8177       
    82         _maxStreams = -1;
    83         try {
    84             String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1");
    85             _maxStreams = Integer.parseInt(num);
    86         } catch (NumberFormatException nfe) {
    87             if (_log.shouldLog(Log.WARN))
    88                 _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe);
    89             _maxStreams = -1;
    90         }
    9178        _name = name + " " + (++__managerId);
    9279        _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT;
    9380        _defaultOptions = new ConnectionOptions(opts);
    94         _connectionManager = new ConnectionManager(_context, _session, _maxStreams, _defaultOptions);
     81        _connectionManager = new ConnectionManager(_context, _session, _defaultOptions);
    9582        _serverSocket = new I2PServerSocketFull(this);
    9683       
     
    183170            _log.warn("Changing options from:\n " + _defaultOptions + "\nto:\n " + options);
    184171        _defaultOptions.updateAll((ConnectionOptions) options);
     172        _connectionManager.updateOptions();
    185173    }
    186174
     
    245233        Connection con = _connectionManager.connect(peer, opts);
    246234        if (con == null)
    247             throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")");
     235            throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
    248236        I2PSocketFull socket = new I2PSocketFull(con);
    249237        con.setSocket(socket);
  • history.txt

    r5d3984e r20e463e  
     12012-09-xx zzz
     2 * Addresses: Reject numeric IPs of the form n, n.n, and n.n.n
     3 * Console, i2ptunnel: More validation of address and port in forms
     4 * ConvertToHash:
     5   - Add support for b64hash.i2p
     6   - Cleanup and use cache
     7 * i2psnark: Enable DHT by default
     8 * RFC822Date: Synchronization fix
     9 * Streaming:
     10   - Implement changing connection limits on a running session
     11   - Implement global blacklist
     12
    1132012-09-25 zzz
    214 * Context: Make files final
     
    618 * SimpleByteCache: Concurrent fix
    719 * UPnP: Cleanup & final
    8  * URLLauncher: Add xdg-open (ticket #617)
     20 * URLLauncher: Add xdg-open (ticket #717)
    921
    10222012-09-21 zzz
  • router/java/src/net/i2p/router/RouterVersion.java

    r5d3984e r20e463e  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 2;
     21    public final static long BUILD = 3;
    2222
    2323    /** for example "-test" */
Note: See TracChangeset for help on using the changeset viewer.