Changeset cb979fb


Ignore:
Timestamp:
Nov 27, 2015 7:39:32 PM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
2849aec
Parents:
bafec180
Message:

Allow multiple simultaneous ACCEPT sockets.
Add support for parallel accepts in sink client

Location:
apps/sam/java/src/net/i2p/sam
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java

    rbafec180 rcb979fb  
    1515import java.net.NoRouteToHostException;
    1616import java.net.SocketTimeoutException;
     17import java.nio.channels.Channels;
     18import java.nio.channels.ReadableByteChannel;
     19import java.nio.channels.WritableByteChannel;
     20import java.nio.ByteBuffer;
     21import java.nio.channels.SocketChannel;
    1722import java.security.GeneralSecurityException;
    1823import java.util.Properties;
     24import java.util.concurrent.atomic.AtomicInteger;
    1925
    2026import javax.net.ssl.SSLException;
     
    3137import net.i2p.util.I2PSSLSocketFactory;
    3238import net.i2p.util.Log;
    33 import java.nio.channels.Channels;
    34 import java.nio.channels.ReadableByteChannel;
    35 import java.nio.channels.WritableByteChannel;
    36 import java.nio.ByteBuffer;
    37 import java.nio.channels.SocketChannel;
    3839
    3940/**
     
    4950               
    5051                private final Object socketServerLock = new Object();
     52                /** this is ONLY set for FORWARD, not for ACCEPT */
    5153                private I2PServerSocket socketServer;
     54                /** this is the count of active ACCEPT sockets */
     55                private final AtomicInteger _acceptors = new AtomicInteger();
     56
    5257                private static I2PSSLSocketFactory _sslSocketFactory;
    5358       
     
    155160            /**
    156161             * Accept a single incoming STREAM on the socket stolen from the handler.
     162             * As of version 3.2 (0.9.24), multiple simultaneous accepts are allowed.
     163             * Accepts and forwarding may not be done at the same time.
    157164             *
    158165             * @param handler The handler that communicates with the requesting client
     
    171178                throws I2PException, InterruptedIOException, IOException, SAMException {
    172179
    173                 synchronized( this.socketServerLock )
    174                 {
    175                         if (this.socketServer!=null) {
    176                                 if (_log.shouldLog(Log.DEBUG))
    177                                         _log.debug("a socket server is already defined for this destination");
    178                                 throw new SAMException("a socket server is already defined for this destination");
    179                         }
    180                         this.socketServer = this.socketMgr.getServerSocket();
    181                 }
    182                
    183                 I2PSocket i2ps = this.socketServer.accept();
    184 
    185                 synchronized( this.socketServerLock )
    186                 {
    187                         this.socketServer = null ;
    188                 }
    189                
     180                synchronized(this.socketServerLock) {
     181                        if (this.socketServer != null) {
     182                                if (_log.shouldWarn())
     183                                        _log.warn("a forwarding server is already defined for this destination");
     184                                throw new SAMException("a forwarding server is already defined for this destination");
     185                        }
     186                }
     187
     188                I2PSocket i2ps;
     189                _acceptors.incrementAndGet();
     190                try {
     191                        i2ps = socketMgr.getServerSocket().accept();
     192                } finally {
     193                        _acceptors.decrementAndGet();
     194                }
     195
    190196                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    191197
     
    213219           
    214220            /**
    215              *  Forward sockets from I2P to the host/port provided
     221             *  Forward sockets from I2P to the host/port provided.
     222             *  Accepts and forwarding may not be done at the same time.
    216223             */
    217224            public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
     
    237244                }
    238245                boolean isSSL = Boolean.parseBoolean(props.getProperty("SSL"));
    239                
    240                 synchronized( this.socketServerLock )
    241                 {
    242                         if (this.socketServer!=null) {
    243                                 if (_log.shouldLog(Log.DEBUG))
    244                                         _log.debug("a socket server is already defined for this destination");
    245                                 throw new SAMException("a socket server is already defined for this destination");
    246                         }
     246                if (_acceptors.get() > 0) {
     247                        if (_log.shouldWarn())
     248                                _log.warn("an accepting server is already defined for this destination");
     249                        throw new SAMException("an accepting server is already defined for this destination");
     250                }
     251                synchronized(this.socketServerLock) {
     252                        if (this.socketServer!=null) {
     253                                if (_log.shouldWarn())
     254                                        _log.warn("a forwarding server is already defined for this destination");
     255                                throw new SAMException("a forwarding server is already defined for this destination");
     256                        }
    247257                        this.socketServer = this.socketMgr.getServerSocket();
    248258                }
     
    428438            }
    429439           
    430             public I2PServerSocket getSocketServer()
     440            private I2PServerSocket getSocketServer()
    431441            {
    432442                synchronized ( this.socketServerLock ) {
  • apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java

    rbafec180 rcb979fb  
    169169            }
    170170            if (_isV3 && mode == STREAM) {
    171                 Socket sock2 = connect(isSSL);
    172                 out = sock2.getOutputStream();
    173                 eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
    174                 _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
    175                 _reader2.startReading();
    176                 if (_log.shouldLog(Log.DEBUG))
    177                     _log.debug("Reader2 created");
    178                 String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
    179                 if (ok == null)
    180                     throw new IOException("2nd handshake failed");
    181                 if (_log.shouldLog(Log.DEBUG))
    182                     _log.debug("Handshake2 complete.");
     171                // test multiple acceptors, only works in 3.2
     172                int acceptors = isV32 ? 4 : 1;
     173                for (int i = 0; i < acceptors; i++) {
     174                    Socket sock2 = connect(isSSL);
     175                    out = sock2.getOutputStream();
     176                    eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out);
     177                    _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler);
     178                    _reader2.startReading();
     179                    if (_log.shouldLog(Log.DEBUG))
     180                        _log.debug("Reader " + (2 + i) + " created");
     181                    String ok = handshake(out, version, false, eventHandler, mode, user, password, "");
     182                    if (ok == null)
     183                        throw new IOException("handshake " + (2 + i) + " failed");
     184                    if (_log.shouldLog(Log.DEBUG))
     185                        _log.debug("Handshake " + (2 + i) + " complete.");
     186                }
    183187            } else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) {
    184188                // set up a listening DatagramSocket
     
    623627           
    624628            File out = File.createTempFile("sink", ".dat", sinkDir);
     629            if (_log.shouldWarn())
     630                _log.warn("outputting to " + out);
    625631            _out = new FileOutputStream(out);
    626632            _started = _context.clock().now();
Note: See TracChangeset for help on using the changeset viewer.