Changeset 099515a for apps/streaming


Ignore:
Timestamp:
Jun 8, 2015 9:50:42 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
cbc2f89
Parents:
e8f4e19 (diff), 657f13a (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p' (head 1de143fff53bb56e6eac926d6293d62200f0c392)

to branch 'i2p.i2p.zzz.multisess' (head 70fc07857232668b93ca6ba02c433dffc7639132)

Location:
apps/streaming/java/src/net/i2p/client/streaming/impl
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Connection.java

    re8f4e19 r099515a  
    875875    public void setOptions(ConnectionOptions opts) { _options = opts; }
    876876       
     877    /** @since 0.9.21 */
     878    public ConnectionManager getConnectionManager() { return _connectionManager; }
     879
    877880    public I2PSession getSession() { return _connectionManager.getSession(); }
    878881    public I2PSocketFull getSocket() { return _socket; }
  • apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionManager.java

    re8f4e19 r099515a  
    8989        int protocol = defaultOptions.getEnforceProtocol() ? I2PSession.PROTO_STREAMING : I2PSession.PROTO_ANY;
    9090        _session.addMuxedSessionListener(_messageHandler, protocol, defaultOptions.getLocalPort());
    91         _outboundQueue = new PacketQueue(_context, _session, this);
     91        _outboundQueue = new PacketQueue(_context, _session);
    9292        _recentlyClosed = new LHMCache<Long, Object>(32);
    9393        /** Socket timeout for accept() */
     
    430430                    try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
    431431                } else {
    432                     con = new Connection(_context, this, _schedulerChooser, _timer, _outboundQueue, _conPacketHandler, opts, false);
     432                    con = new Connection(_context, this, _schedulerChooser, _timer,
     433                                         _outboundQueue, _conPacketHandler, opts, false);
    433434                    con.setRemotePeer(peer);
    434435                    assignReceiveStreamId(con);
     
    891892            req.pong(payload);
    892893    }
     894
     895    /**
     896     *  @since 0.9.20
     897     */
     898    @Override
     899    public String toString() {
     900        return "ConnectionManager for " + _session;
     901    }
    893902}
  • apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketManagerFull.java

    re8f4e19 r099515a  
    11package net.i2p.client.streaming.impl;
    22
     3import java.io.ByteArrayInputStream;
     4import java.io.ByteArrayOutputStream;
    35import java.io.IOException;
     6import java.io.InputStream;
    47import java.net.ConnectException;
    58import java.net.NoRouteToHostException;
     
    811import java.net.SocketTimeoutException;
    912import java.util.HashSet;
     13import java.util.List;
     14import java.util.Map;
    1015import java.util.Properties;
    1116import java.util.Set;
     17import java.util.concurrent.ConcurrentHashMap;
    1218import java.util.concurrent.atomic.AtomicBoolean;
    1319import java.util.concurrent.atomic.AtomicInteger;
     
    1521import net.i2p.I2PAppContext;
    1622import net.i2p.I2PException;
     23import net.i2p.client.I2PClient;
    1724import net.i2p.client.I2PSession;
    1825import net.i2p.client.I2PSessionException;
     
    2128import net.i2p.client.streaming.I2PSocketManager;
    2229import net.i2p.client.streaming.I2PSocketOptions;
     30import net.i2p.crypto.SigType;
     31import net.i2p.data.Certificate;
    2332import net.i2p.data.Destination;
     33import net.i2p.data.Hash;
     34import net.i2p.data.PrivateKey;
     35import net.i2p.data.PublicKey;
     36import net.i2p.data.SimpleDataStructure;
     37import net.i2p.util.ConvertToHash;
    2438import net.i2p.util.Log;
    2539
     
    3852    private final Log _log;
    3953    private final I2PSession _session;
     54    private final ConcurrentHashMap<I2PSession, ConnectionManager> _subsessions;
    4055    private final I2PServerSocketFull _serverSocket;
    4156    private StandardServerSocket _realServerSocket;
     
    4661    private final ConnectionManager _connectionManager;
    4762    private final AtomicBoolean _isDestroyed = new AtomicBoolean();
    48    
     63
     64    /** @since 0.9.20 */
     65    private static final Set<Hash> _dsaOnly = new HashSet<Hash>(16);
     66    private static final String[] DSA_ONLY_HASHES = {
     67        // list from http://zzz.i2p/topics/1682?page=1#p8414
     68        // bzr.welterde.i2p
     69        "Cvs1gCZTTkgD2Z2byh2J9atPmh5~I8~L7BNQnQl0hUE=",
     70        // docs.i2p2.i2p
     71        "WCXV87RdrF6j-mnn6qt7kVSBifHTlPL0PmVMFWwaolo=",
     72        // flibusta.i2p
     73        "yy2hYtqqfl84N9skwdRkeM7baFMXHKyDWU3XRShlEo8=",
     74        // forum.i2p
     75        "3t5Ar2NCTIOId70uzX2bZyJljR0aBogxMEzNyHirB7A=",
     76        // i2jump.i2p
     77        "9vaoGZbOaeqdRK2qEunlwRM9mUSW-I9R4OON35TDKK4=",
     78        // irc.welterde.i2p
     79        "5rjezx4McFk3bNhoJV-NTLlQW1AR~jiUcN6DOWMCCVc=",
     80        // lists.i2p2.i2p
     81        "qwtgoFoMSK0TOtbT4ovBX1jHUzCoZCPzrJVxjKD7RCg=",
     82        // mtn.i2p2.i2p
     83        "X5VDzYaoX9-P6bAWnrVSR5seGLkOeORP2l3Mh4drXPo=",
     84        // nntp.welterde.i2p
     85        "VXwmNIwMy1BcUVmut0oZ72jbWoqFzvxJukmS-G8kAAE=",
     86        // paste.i2p2.i2p
     87        "DoyMyUUgOSTddvRpqYfKHFPPjkkX~iQmResyfjjBYWs=",
     88        // syndie.wetlerde.i2p
     89        "xMxC54BFgyp-~zzrQI3F8m2CK--9XMcNmSAep6RH4Kk=",
     90        // ugha.i2p
     91        "zsu3WF~QLBxZXH-gHq9MuZE6y8ROZmMF7dA2MbMMKkY=",
     92        // tracker.welterde.i2p
     93        "EVkFgKkrDKyGfI7TIuDmlHoAmvHC~FbnY946DfujR0A=",
     94        // www.i2p2.i2p
     95        "im9gytzKT15mT1sB5LC9bHXCcwytQ4EPcrGQhoam-4w="
     96    };
     97   
     98    static {
     99        for (int i = 0; i < DSA_ONLY_HASHES.length; i++) {
     100            String s = DSA_ONLY_HASHES[i];
     101            Hash h = ConvertToHash.getHash(s);
     102            if (h != null)
     103                _dsaOnly.add(h);
     104            else
     105                System.out.println("Bad hash " + s);
     106        }
     107    }
     108
    49109    /**
    50110     * How long to wait for the client app to accept() before sending back CLOSE?
     
    81141        _context = context;
    82142        _session = session;
     143        _subsessions = new ConcurrentHashMap<I2PSession, ConnectionManager>(4);
    83144        _log = _context.logManager().getLog(I2PSocketManagerFull.class);
    84145       
     
    121182    }
    122183   
     184    /**
     185     *  @return a new subsession, non-null
     186     *  @param privateKeyStream null for transient, if non-null must have same encryption keys as primary session
     187     *                          and different signing keys
     188     *  @param opts subsession options if any, may be null
     189     *  @since 0.9.19
     190     */
     191    public I2PSession addSubsession(InputStream privateKeyStream, Properties opts) throws I2PSessionException {
     192        if (privateKeyStream == null) {
     193            // We don't actually need the same pubkey in the dest, just in the LS.
     194            // The dest one is unused. But this is how we find the LS keys
     195            // to reuse in RequestLeaseSetMessageHandler.
     196            ByteArrayOutputStream keyStream = new ByteArrayOutputStream(1024);
     197            try {
     198                SigType type = getSigType(opts);
     199                if (type != SigType.DSA_SHA1) {
     200                    // hassle, have to set up the padding and cert, see I2PClientImpl
     201                    throw new I2PSessionException("type " + type + " unsupported");
     202                }
     203                PublicKey pub = _session.getMyDestination().getPublicKey();
     204                PrivateKey priv = _session.getDecryptionKey();
     205                SimpleDataStructure[] keys = _context.keyGenerator().generateSigningKeys(type);
     206                pub.writeBytes(keyStream);
     207                keys[0].writeBytes(keyStream); // signing pub
     208                Certificate.NULL_CERT.writeBytes(keyStream);
     209                priv.writeBytes(keyStream);
     210                keys[1].writeBytes(keyStream); // signing priv
     211            } catch (Exception e) {
     212                throw new I2PSessionException("Error creating keys", e);
     213            }
     214            privateKeyStream = new ByteArrayInputStream(keyStream.toByteArray());
     215        }
     216        I2PSession rv = _session.addSubsession(privateKeyStream, opts);
     217        ConnectionOptions defaultOptions = new ConnectionOptions(opts);
     218        ConnectionManager connectionManager = new ConnectionManager(_context, rv, defaultOptions);
     219        ConnectionManager old = _subsessions.putIfAbsent(rv, connectionManager);
     220        if (old != null) {
     221            // shouldn't happen
     222            _session.removeSubsession(rv);
     223            connectionManager.shutdown();
     224            throw new I2PSessionException("dup");
     225        }
     226        if (_log.shouldLog(Log.WARN))
     227            _log.warn("Added subsession " + rv);
     228        return rv;
     229    }
     230
     231    /**
     232     *  @param opts may be null
     233     *  @since 0.9.20 copied from I2PSocketManagerFactory
     234     */
     235    private SigType getSigType(Properties opts) {
     236        if (opts != null) {
     237            String st = opts.getProperty(I2PClient.PROP_SIGTYPE);
     238            if (st != null) {
     239                SigType rv = SigType.parseSigType(st);
     240                if (rv != null && rv.isAvailable())
     241                    return rv;
     242                if (rv != null)
     243                    st = rv.toString();
     244                _log.logAlways(Log.WARN, "Unsupported sig type " + st +
     245                                         ", reverting to " + I2PClient.DEFAULT_SIGTYPE);
     246                // TODO throw instead?
     247            }
     248        }
     249        return I2PClient.DEFAULT_SIGTYPE;
     250    }
     251   
     252    /**
     253     *  Remove the subsession
     254     *
     255     *  @since 0.9.19
     256     */
     257    public void removeSubsession(I2PSession session) {
     258        _session.removeSubsession(session);
     259        ConnectionManager cm = _subsessions.remove(session);
     260        if (cm != null) {
     261            cm.shutdown();
     262            if (_log.shouldLog(Log.WARN))
     263                _log.warn("Removeed subsession " + session);
     264        } else {
     265            if (_log.shouldLog(Log.WARN))
     266                _log.warn("Subsession not found to remove " + session);
     267        }
     268    }
     269   
     270    /**
     271     *  @return a list of subsessions, non-null, does not include the primary session
     272     *  @since 0.9.19
     273     */
     274    public List<I2PSession> getSubsessions() {
     275        return _session.getSubsessions();
     276    }
     277   
    123278    public ConnectionManager getConnectionManager() {
    124279        return _connectionManager;
     
    263418
    264419    private void verifySession() throws I2PException {
     420        verifySession(_connectionManager);
     421    }
     422
     423    /** @since 0.9.20 */
     424    private void verifySession(ConnectionManager cm) throws I2PException {
    265425        if (_isDestroyed.get())
    266426            throw new I2PException("Session was closed");
    267         if (!_connectionManager.getSession().isClosed())
     427        if (!cm.getSession().isClosed())
    268428            return;
    269         _connectionManager.getSession().connect();
     429        cm.getSession().connect();
    270430    }
    271431   
     
    286446    public I2PSocket connect(Destination peer, I2PSocketOptions options)
    287447                             throws I2PException, NoRouteToHostException {
    288         verifySession();
    289448        if (options == null)
    290449            options = _defaultOptions;
     
    298457            _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6)
    299458                      + " with options: " + opts);
     459        // pick the subsession here
     460        ConnectionManager cm = _connectionManager;
     461        if (!_subsessions.isEmpty()) {
     462            Hash h = peer.calculateHash();
     463            if (_dsaOnly.contains(h)) {
     464                // FIXME just taking the first one for now
     465                for (Map.Entry<I2PSession, ConnectionManager> e : _subsessions.entrySet()) {
     466                    if (e.getKey().getMyDestination().getSigType() == SigType.DSA_SHA1) {
     467                        cm = e.getValue();
     468                        break;
     469                    }
     470                }
     471            }
     472        }
     473        verifySession(cm);
    300474        // the following blocks unless connect delay > 0
    301         Connection con = _connectionManager.connect(peer, opts);
     475        Connection con = cm.connect(peer, opts);
    302476        if (con == null)
    303477            throw new TooManyStreamsException("Too many streams, max " + _defaultOptions.getMaxConns());
     
    382556        _connectionManager.setAllowIncomingConnections(false);
    383557        _connectionManager.shutdown();
     558        if (!_subsessions.isEmpty()) {
     559            for (I2PSession sess : _subsessions.keySet()) {
     560                 removeSubsession(sess);
     561            }
     562        }
     563
    384564        // should we destroy the _session too?
    385565        // yes, since the old lib did (and SAM wants it to, and i dont know why not)
  • apps/streaming/java/src/net/i2p/client/streaming/impl/MessageHandler.java

    re8f4e19 r099515a  
    5151     */
    5252    public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
    53         byte data[] = null;
     53        byte data[];
    5454        try {
    5555            data = session.receiveMessage(msgId);
     
    6060            return;
    6161        }
    62         if (data == null) return;
     62        if (data == null) {
     63            if (_log.shouldLog(Log.WARN))
     64                _log.warn("Received null data on " + session + " proto: " + proto +
     65                          " fromPort: " + fromPort + " toPort: " + toPort);
     66            return;
     67        }
     68        if (_log.shouldLog(Log.DEBUG))
     69            _log.debug("Received " + data.length + " bytes on " + session +
     70                       " (" + _manager + ')' +
     71                       " proto: " + proto +
     72                       " fromPort: " + fromPort + " toPort: " + toPort);
    6373        Packet packet = new Packet();
    6474        try {
  • apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java

    re8f4e19 r099515a  
    569569    public void close() {
    570570        synchronized (_dataLock) {
     571            if (_log.shouldLog(Log.DEBUG)) {
     572                StringBuilder buf = new StringBuilder(128);
     573                buf.append("close(), ready bytes: ");
     574                long available = 0;
     575                for (int i = 0; i < _readyDataBlocks.size(); i++)
     576                    available += _readyDataBlocks.get(i).getValid();
     577                available -= _readyDataBlockIndex;
     578                buf.append(available);
     579                buf.append(" blocks: ").append(_readyDataBlocks.size());
     580                buf.append(" not ready blocks: ");
     581                long notAvailable = 0;
     582                for (Long id : _notYetReadyBlocks.keySet()) {
     583                    ByteArray ba = _notYetReadyBlocks.get(id);
     584                    buf.append(id).append(" ");
     585                    if (ba != null)
     586                        notAvailable += ba.getValid();
     587                }
     588                buf.append("not ready bytes: ").append(notAvailable);
     589                buf.append(" highest ready block: ").append(_highestReadyBlockId);
     590                _log.debug(buf.toString());
     591            }
    571592            //while (_readyDataBlocks.size() > 0)
    572593            //    _cache.release((ByteArray)_readyDataBlocks.remove(0));
  • apps/streaming/java/src/net/i2p/client/streaming/impl/Packet.java

    re8f4e19 r099515a  
    767767        if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE");
    768768        if (isFlagSet(FLAG_RESET)) buf.append(" RESET");
    769         if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG");
     769        if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG ").append(_optionSignature.length());
    770770        if (isFlagSet(FLAG_SIGNATURE_REQUESTED)) buf.append(" SIGREQ");
    771771        if (isFlagSet(FLAG_SYNCHRONIZE)) buf.append(" SYN");
  • apps/streaming/java/src/net/i2p/client/streaming/impl/PacketQueue.java

    re8f4e19 r099515a  
    3030    private final Log _log;
    3131    private final I2PSession _session;
    32     private final ConnectionManager _connectionManager;
    3332    private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
    3433    private final Map<Long, Connection> _messageStatusMap;
     
    4746    private static final boolean ENABLE_STATUS_LISTEN = true;
    4847
    49     public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
     48    public PacketQueue(I2PAppContext context, I2PSession session) {
    5049        _context = context;
    5150        _session = session;
    52         _connectionManager = mgr;
    5351        _log = context.logManager().getLog(PacketQueue.class);
    5452        _messageStatusMap = new ConcurrentHashMap<Long, Connection>(16);
     
    200198            packet.incrementSends();
    201199            Connection c = packet.getConnection();
    202             String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null);
    203             if (_log.shouldDebug())
    204                 _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix);
     200            if (c != null) {
     201                String suffix = "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO();
     202                c.getConnectionManager().getPacketHandler().displayPacket(packet, "SEND", suffix);
     203            }
    205204            if (I2PSocketManagerFull.pcapWriter != null &&
    206205                _context.getBooleanProperty(I2PSocketManagerFull.PROP_PCAP))
Note: See TracChangeset for help on using the changeset viewer.