Changeset d96ddd1


Ignore:
Timestamp:
Jun 3, 2015 11:42:54 AM (5 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
f57e37d
Parents:
7b711eb (diff), b5455cee (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p.zzz.sam' (head 68de14d0053dea374413f9e0419b1c0f7e9ec3af)

to branch 'i2p.i2p' (head 54f5dd288f7c0c5a50f7f63f911aec4008be27e2)

Files:
1 added
12 edited

Legend:

Unmodified
Added
Removed
  • apps/sam/java/src/net/i2p/sam/SAMBridge.java

    r7b711eb rd96ddd1  
    2020import java.nio.ByteBuffer;
    2121import java.util.Arrays;
     22import java.util.ArrayList;
    2223import java.util.HashMap;
     24import java.util.HashSet;
     25import java.util.List;
    2326import java.util.Map;
    2427import java.util.Properties;
     28import java.util.Set;
    2529
    2630import net.i2p.I2PAppContext;
     
    3135import net.i2p.util.I2PAppThread;
    3236import net.i2p.util.Log;
     37import net.i2p.util.PortMapper;
    3338
    3439/**
     
    5661     */
    5762    private final Map<String,String> nameToPrivKeys;
     63    private final Set<Handler> _handlers;
    5864
    5965    private volatile boolean acceptConnections = true;
     
    96102        persistFilename = options.keyFile;
    97103        nameToPrivKeys = new HashMap<String,String>(8);
     104        _handlers = new HashSet<Handler>(8);
    98105        this.i2cpProps = options.opts;
    99106        _state = INITIALIZED;
     
    125132        persistFilename = persistFile;
    126133        nameToPrivKeys = new HashMap<String,String>(8);
     134        _handlers = new HashSet<Handler>(8);
    127135        loadKeys();
    128136        try {
     
    210218   
    211219    /**
    212      * Load up the keys from the persistFilename
    213      *
     220     * Load up the keys from the persistFilename.
     221     * TODO use DataHelper
     222     * TODO store in config dir, not base dir
    214223     */
    215224    private void loadKeys() {
     
    219228            try {
    220229                br = new BufferedReader(new InputStreamReader(
    221                         new FileInputStream(persistFilename)));
     230                        new FileInputStream(persistFilename), "UTF-8"));
    222231                String line = null;
    223232                while ( (line = br.readLine()) != null) {
     
    227236                    nameToPrivKeys.put(name, privKeys);
    228237                }
     238                if (_log.shouldInfo())
     239                    _log.info("Loaded " + nameToPrivKeys.size() + " private keys from " + persistFilename);
    229240            } catch (FileNotFoundException fnfe) {
    230241                _log.warn("Key file does not exist at " + persistFilename);
     
    238249   
    239250    /**
    240      * Store the current keys to disk in the location specified on creation
    241      *
     251     * Store the current keys to disk in the location specified on creation.
     252     * TODO use DataHelper
     253     * TODO store in config dir, not base dir
    242254     */
    243255    private void storeKeys() {
     
    249261                    String name = entry.getKey();
    250262                    String privKeys = entry.getValue();
    251                     out.write(name.getBytes());
     263                    out.write(name.getBytes("UTF-8"));
    252264                    out.write('=');
    253                     out.write(privKeys.getBytes());
     265                    out.write(privKeys.getBytes("UTF-8"));
    254266                    out.write('\n');
    255267                }
     268                if (_log.shouldInfo())
     269                    _log.info("Saved " + nameToPrivKeys.size() + " private keys to " + persistFilename);
    256270            } catch (IOException ioe) {
    257271                _log.error("Error writing out the SAM keys to " + persistFilename, ioe);
     
    262276    }
    263277   
     278    /**
     279     * Handlers must call on startup
     280     * @since 0.9.20
     281     */
     282    public void register(Handler handler) {
     283        if (_log.shouldInfo())
     284            _log.info("Register " + handler);
     285        synchronized (_handlers) {
     286            _handlers.add(handler);
     287        }
     288    }
     289   
     290    /**
     291     * Handlers must call on stop
     292     * @since 0.9.20
     293     */
     294    public void unregister(Handler handler) {
     295        if (_log.shouldInfo())
     296            _log.info("Unregister " + handler);
     297        synchronized (_handlers) {
     298            _handlers.remove(handler);
     299        }
     300    }
     301
     302    /**
     303     * Stop all the handlers.
     304     * @since 0.9.20
     305     */
     306    private void stopHandlers() {
     307        List<Handler> handlers = null;
     308        synchronized (_handlers) {
     309            if (!_handlers.isEmpty()) {
     310                handlers = new ArrayList<Handler>(_handlers);
     311                _handlers.clear();
     312            }
     313        }
     314        if (handlers != null) {
     315            for (Handler handler : handlers) {
     316                if (_log.shouldInfo())
     317                    _log.info("Stopping " + handler);
     318                handler.stopHandling();
     319            }
     320        }
     321    }
     322
    264323    ////// begin ClientApp interface, use only if using correct construtor
    265324
     
    271330            return;
    272331        changeState(STARTING);
     332        synchronized (_handlers) {
     333            _handlers.clear();
     334        }
    273335        loadKeys();
    274336        try {
     
    286348
    287349    /**
    288      *  Does NOT stop existing sessions.
     350     *  As of 0.9.20, stops running handlers and sessions.
     351     *
    289352     *  @since 0.9.6
    290353     */
     
    294357        changeState(STOPPING);
    295358        acceptConnections = false;
     359        stopHandlers();
    296360        if (_runner != null)
    297361            _runner.interrupt();
    298362        else
    299363            changeState(STOPPED);
    300         // TODO does not stop active connections / sessions
    301364    }
    302365
     
    376439     */
    377440    private void startThread() {
    378         I2PAppThread t = new I2PAppThread(this, "SAMListener");
     441        I2PAppThread t = new I2PAppThread(this, "SAMListener " + _listenPort);
    379442        if (Boolean.parseBoolean(System.getProperty("sam.shutdownOnOOM"))) {
    380443            t.addOOMEventThreadListener(new I2PAppThread.OOMEventListener() {
     
    488551        if (_mgr != null)
    489552            _mgr.register(this);
     553        I2PAppContext.getGlobalContext().portMapper().register(PortMapper.SVC_SAM, _listenPort);
    490554        try {
    491555            while (acceptConnections) {
     
    496560                               + s.socket().getPort());
    497561
    498                 class HelloHandler implements Runnable {
    499                         private final SocketChannel s;
    500                         private final SAMBridge parent;
    501 
    502                         HelloHandler(SocketChannel s, SAMBridge parent) {
     562                class HelloHandler implements Runnable, Handler {
     563                    private final SocketChannel s;
     564                    private final SAMBridge parent;
     565
     566                    HelloHandler(SocketChannel s, SAMBridge parent) {
    503567                                this.s = s ;
    504568                                this.parent = parent ;
    505                         }
    506 
    507                         public void run() {
     569                    }
     570
     571                    public void run() {
     572                        parent.register(this);
    508573                        try {
    509                             SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
     574                            SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps, parent);
    510575                            if (handler == null) {
    511576                                if (_log.shouldLog(Log.DEBUG))
     
    516581                                return;
    517582                            }
    518                             handler.setBridge(parent);
    519583                            handler.startHandling();
    520584                        } catch (SAMException e) {
     
    527591                            try { s.close(); } catch (IOException ioe) {}
    528592                            _log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
    529                         }                               
    530                         }
     593                        } finally {
     594                            parent.unregister(this);
     595                        }
     596                    }
     597
     598                    /** @since 0.9.20 */
     599                    public void stopHandling() {
     600                        try { s.close(); } catch (IOException ioe) {}
     601                    }
    531602                }
    532                 // TODO: Handler threads are not saved or tracked and cannot be stopped
    533                 new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start();
     603                new I2PAppThread(new HelloHandler(s,this), "SAM HelloHandler").start();
    534604            }
    535605            changeState(STOPPING);
     
    547617                    serverSocket.close();
    548618            } catch (IOException e) {}
     619            I2PAppContext.getGlobalContext().portMapper().unregister(PortMapper.SVC_SAM);
     620            stopHandlers();
    549621            changeState(STOPPED);
    550622        }
  • apps/sam/java/src/net/i2p/sam/SAMHandler.java

    r7b711eb rd96ddd1  
    2626 * @author human
    2727 */
    28 abstract class SAMHandler implements Runnable {
     28abstract class SAMHandler implements Runnable, Handler {
    2929
    3030    protected final Log _log;
    3131
    32     protected I2PAppThread thread = null;
    33     protected SAMBridge bridge = null;
     32    protected I2PAppThread thread;
     33    protected final SAMBridge bridge;
    3434
    3535    private final Object socketWLock = new Object(); // Guards writings on socket
     
    4242    protected final Properties i2cpProps;
    4343
    44     private final Object stopLock = new Object();
    45     private volatile boolean stopHandler;
     44    protected final Object stopLock = new Object();
     45    protected boolean stopHandler;
    4646
    4747    /**
     
    5454     * @throws IOException
    5555     */
    56     protected SAMHandler(SocketChannel s,
    57                          int verMajor, int verMinor, Properties i2cpProps) throws IOException {
     56    protected SAMHandler(SocketChannel s, int verMajor, int verMinor,
     57                         Properties i2cpProps, SAMBridge parent) throws IOException {
    5858        _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
    5959        socket = s;
     
    6262        this.verMinor = verMinor;
    6363        this.i2cpProps = i2cpProps;
     64        bridge = parent;
    6465    }
    6566
     
    6970     */
    7071    public final void startHandling() {
    71         thread = new I2PAppThread(this, "SAMHandler");
     72        thread = new I2PAppThread(this, getClass().getSimpleName());
    7273        thread.start();
    7374    }
    74    
    75     public void setBridge(SAMBridge bridge) { this.bridge = bridge; }
    76    
     75
    7776    /**
    7877     * Actually handle the SAM protocol.
     
    8281
    8382    /**
    84      * Get the input stream of the socket connected to the SAM client
    85      *
    86      * @return input stream
    87      * @throws IOException
     83     * Get the channel of the socket connected to the SAM client
     84     *
     85     * @return channel
    8886     */
    8987    protected final SocketChannel getClientSocket() {
     
    157155
    158156    /**
    159      * Stop the SAM handler
    160      *
    161      */
    162     public final void stopHandling() {
     157     * Stop the SAM handler, close the client socket,
     158     * unregister with the bridge.
     159     */
     160    public void stopHandling() {
    163161        synchronized (stopLock) {
    164162            stopHandler = true;
    165163        }
     164        try {
     165            closeClientSocket();
     166        } catch (IOException e) {}
     167        bridge.unregister(this);
    166168    }
    167169
     
    184186    @Override
    185187    public final String toString() {
    186         return ("SAM handler (class: " + this.getClass().getName()
     188        return (this.getClass().getSimpleName()
    187189                + "; SAM version: " + verMajor + "." + verMinor
    188190                + "; client: "
     
    191193    }
    192194
     195    /**
     196     * Register with the bridge, call handle(),
     197     * unregister with the bridge.
     198     */
    193199    public final void run() {
    194         handle();
     200        bridge.register(this);
     201        try {
     202            handle();
     203        } finally {
     204            bridge.unregister(this);
     205        }
    195206    }
    196207}
  • apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java

    r7b711eb rd96ddd1  
    3939     * @return A SAM protocol handler, or null if the client closed before the handshake
    4040     */
    41     public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException {
     41    public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps,
     42                                              SAMBridge parent) throws SAMException {
    4243        StringTokenizer tok;
    4344        Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMHandlerFactory.class);
     
    4647            Socket sock = s.socket();
    4748            sock.setSoTimeout(HELLO_TIMEOUT);
     49            sock.setKeepAlive(true);
    4850            String line = DataHelper.readLine(sock.getInputStream());
    4951            sock.setSoTimeout(0);
     
    104106            switch (verMajor) {
    105107            case 1:
    106                 handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps);
     108                handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps, parent);
    107109                break;
    108110            case 2:
    109                 handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
     111                handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps, parent);
    110112                break;
    111113            case 3:
    112                 handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps);
     114                handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps, parent);
    113115                break;
    114116            default:
  • apps/sam/java/src/net/i2p/sam/SAMMessageSession.java

    r7b711eb rd96ddd1  
    7373        handler = new SAMMessageSessionHandler(destStream, props);
    7474
     75        // FIXME don't start threads in constructors
    7576        Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
    7677        t.start();
     
    126127    /**
    127128     * Close a SAM message-based session.
    128      *
    129129     */
    130130    public void close() {
  • apps/sam/java/src/net/i2p/sam/SAMUtils.java

    r7b711eb rd96ddd1  
    213213
    214214    /* Dump a Properties object in an human-readable form */
     215/****
    215216    private static String dumpProperties(Properties props) {
    216217        StringBuilder builder = new StringBuilder();
     
    232233        return builder.toString();
    233234    }
     235****/
    234236   
    235237/****
  • apps/sam/java/src/net/i2p/sam/SAMv1Handler.java

    r7b711eb rd96ddd1  
    6161     * @throws IOException
    6262     */
    63     public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
    64         this(s, verMajor, verMinor, new Properties());
     63    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor,
     64                        SAMBridge parent) throws SAMException, IOException {
     65        this(s, verMajor, verMinor, new Properties(), parent);
    6566    }
    6667    /**
     
    7677     * @throws IOException
    7778     */
    78     public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
    79         super(s, verMajor, verMinor, i2cpProps);
     79    public SAMv1Handler(SocketChannel s, int verMajor, int verMinor,
     80                        Properties i2cpProps, SAMBridge parent) throws SAMException, IOException {
     81        super(s, verMajor, verMinor, i2cpProps, parent);
    8082        _id = __id.incrementAndGet();
    8183        if (_log.shouldLog(Log.DEBUG))
    8284            _log.debug("SAM version 1 handler instantiated");
    8385
    84     if ( ! verifVersion() ) {
     86        if ( ! verifVersion() ) {
    8587            throw new SAMException("BUG! Wrong protocol version!");
    8688        }
     
    184186        } catch (IOException e) {
    185187            if (_log.shouldLog(Log.DEBUG))
    186                 _log.debug("Caught IOException ("
    187                        + e.getMessage() + ") for message [" + msg + "]", e);
     188                _log.debug("Caught IOException for message [" + msg + "]", e);
    188189        } catch (Exception e) {
    189190            _log.error("Unexpected exception for message [" + msg + "]", e);
     
    194195                closeClientSocket();
    195196            } catch (IOException e) {
    196                 _log.error("Error closing socket: " + e.getMessage());
     197                if (_log.shouldWarn())
     198                    _log.warn("Error closing socket", e);
    197199            }
    198200            if (getRawSession() != null) {
     
    798800        if (getRawSession() == null) {
    799801            _log.error("BUG! Received raw bytes, but session is null!");
    800             throw new NullPointerException("BUG! RAW session is null!");
     802            return;
    801803        }
    802804
     
    819821        if (getRawSession() == null) {
    820822            _log.error("BUG! Got raw receiving stop, but session is null!");
    821             throw new NullPointerException("BUG! RAW session is null!");
     823            return;
    822824        }
    823825
     
    834836        if (getDatagramSession() == null) {
    835837            _log.error("BUG! Received datagram bytes, but session is null!");
    836             throw new NullPointerException("BUG! DATAGRAM session is null!");
     838            return;
    837839        }
    838840
     
    856858        if (getDatagramSession() == null) {
    857859            _log.error("BUG! Got datagram receiving stop, but session is null!");
    858             throw new NullPointerException("BUG! DATAGRAM session is null!");
     860            return;
    859861        }
    860862
     
    874876        {
    875877            _log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
    876             throw new NullPointerException ( "BUG! STREAM session is null!" );
     878            return;
    877879        }
    878880   
     
    892894        {
    893895            _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
    894             throw new NullPointerException ( "BUG! STREAM session is null!" );
     896            return;
    895897        }
    896898   
     
    905907        if (getStreamSession() == null) {
    906908            _log.error("BUG! Received stream connection, but session is null!");
    907             throw new NullPointerException("BUG! STREAM session is null!");
     909            return;
    908910        }
    909911
     
    915917    }
    916918
     919    /** @param msg may be null */
    917920    public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
    918921    {
     
    920923        {
    921924            _log.error ( "BUG! Received stream connection, but session is null!" );
    922             throw new NullPointerException ( "BUG! STREAM session is null!" );
    923         }
    924 
    925         String msgString = "" ;
    926 
    927         if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\"";
    928 
     925            return;
     926        }
     927
     928        String msgString = createMessageString(msg);
    929929        if ( !writeString ( "STREAM STATUS RESULT="
    930930                        + result
     
    936936        }
    937937    }
     938
     939    /**
     940     *  Create a string to be appended to a status.
     941     *
     942     *  @param msg may be null
     943     *  @return non-null, "" if msg is null, MESSAGE=msg or MESSAGE="msg a b c"
     944     *           with leading space if msg is non-null
     945     *  @since 0.9.20
     946     */
     947    protected static String createMessageString(String msg) {
     948        String rv;
     949        if ( msg != null ) {
     950            msg = msg.replace("\n", " ");
     951            msg = msg.replace("\r", " ");
     952            if (!msg.startsWith("\"")) {
     953                msg = msg.replace("\"", "");
     954                if (msg.contains("\"") || msg.contains("\t"))
     955                    msg = '"' + msg + '"';
     956            }
     957            rv = " MESSAGE=\"" + msg + "\"";
     958        } else {
     959            rv = "";
     960        }
     961        return rv;
     962    }
    938963 
    939964    public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
    940965        if (getStreamSession() == null) {
    941966            _log.error("Received stream bytes, but session is null!");
    942             throw new NullPointerException("BUG! STREAM session is null!");
     967            return;
    943968        }
    944969
     
    957982    }
    958983
     984    /** @param msg may be null */
    959985    public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
    960986        if (getStreamSession() == null) {
    961987            _log.error("BUG! Received stream disconnection, but session is null!");
    962             throw new NullPointerException("BUG! STREAM session is null!");
    963         }
    964 
    965         // FIXME: msg should be escaped!
    966         if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result
    967                          + (msg == null ? "" : (" MESSAGE=" + msg))
    968                          + "\n")) {
     988            return;
     989        }
     990
     991        String msgString = createMessageString(msg);
     992        if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result + msgString + '\n')) {
    969993            throw new IOException("Error notifying disconnection to SAM client");
    970994        }
     
    9771001        if (getStreamSession() == null) {
    9781002            _log.error("BUG! Got stream receiving stop, but session is null!");
    979             throw new NullPointerException("BUG! STREAM session is null!");
     1003            return;
    9801004        }
    9811005
  • apps/sam/java/src/net/i2p/sam/SAMv2Handler.java

    r7b711eb rd96ddd1  
    3535                 * @param verMinor SAM minor version to manage
    3636                 */
    37                 public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
     37                public SAMv2Handler(SocketChannel s, int verMajor, int verMinor,
     38                                     SAMBridge parent) throws SAMException, IOException
    3839                {
    39                         this ( s, verMajor, verMinor, new Properties() );
     40                        this(s, verMajor, verMinor, new Properties(), parent);
    4041                }
    4142
     
    5152                 */
    5253
    53                 public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
     54                public SAMv2Handler(SocketChannel s, int verMajor, int verMinor,
     55                                    Properties i2cpProps, SAMBridge parent) throws SAMException, IOException
    5456                {
    55                         super ( s, verMajor, verMinor, i2cpProps );
     57                        super(s, verMajor, verMinor, i2cpProps, parent);
    5658                }
    5759               
  • apps/sam/java/src/net/i2p/sam/SAMv3Handler.java

    r7b711eb rd96ddd1  
    4949        private Session session;
    5050        public static final SessionsDB sSessionsHash = new SessionsDB();
    51         private boolean stolenSocket;
    52         private boolean streamForwardingSocket;
     51        private volatile boolean stolenSocket;
     52        private volatile boolean streamForwardingSocket;
    5353
    5454       
     
    6868         * @param verMinor SAM minor version to manage
    6969         */
    70         public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
    71         {
    72                 this ( s, verMajor, verMinor, new Properties() );
     70        public SAMv3Handler(SocketChannel s, int verMajor, int verMinor,
     71                            SAMBridge parent) throws SAMException, IOException
     72        {
     73                this(s, verMajor, verMinor, new Properties(), parent);
    7374        }
    7475
     
    8485         */
    8586
    86         public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
    87         {
    88                 super ( s, verMajor, verMinor, i2cpProps );
     87        public SAMv3Handler(SocketChannel s, int verMajor, int verMinor,
     88                            Properties i2cpProps, SAMBridge parent) throws SAMException, IOException
     89        {
     90                super(s, verMajor, verMinor, i2cpProps, parent);
    8991                if (_log.shouldLog(Log.DEBUG))
    9092                        _log.debug("SAM version 3 handler instantiated");
     
    215217        }
    216218       
     219        /**
     220         *  The values in the SessionsDB
     221         */
    217222        public static class SessionRecord
    218223        {
     
    267272        }
    268273
     274        /**
     275         *  basically a HashMap from String to SessionRecord
     276         */
    269277        public static class SessionsDB
    270278        {
    271279                private static final long serialVersionUID = 0x1;
    272280
    273                 static class ExistingIdException   extends Exception {
     281                static class ExistingIdException extends Exception {
    274282                        private static final long serialVersionUID = 0x1;
    275283                }
     
    285293                }
    286294
     295                /** @return success */
    287296                synchronized public boolean put( String nick, SessionRecord session )
    288297                        throws ExistingIdException, ExistingDestException
     
    306315                }
    307316
     317                /** @return true if removed */
    308318                synchronized public boolean del( String nick )
    309319                {
    310                         SessionRecord rec = map.get(nick);
    311                        
    312                         if ( rec!=null ) {
    313                                 map.remove(nick);
    314                                 return true ;
    315                         }
    316                         else
    317                                 return false ;
    318                 }
     320                        return map.remove(nick) != null;
     321                }
     322
    319323                synchronized public SessionRecord get(String nick)
    320324                {
     
    333337        }
    334338       
     339        /**
     340         *  For SAMv3StreamSession connect and accept
     341         */
    335342        public void stealSocket()
    336343        {
    337344                stolenSocket = true ;
    338345                this.stopHandling();
     346        }
     347       
     348        /**
     349         *  For SAMv3StreamSession
     350         *  @since 0.9.20
     351         */
     352        SAMBridge getBridge() {
     353                return bridge;
    339354        }
    340355       
     
    349364                this.thread.setName("SAMv3Handler " + _id);
    350365                if (_log.shouldLog(Log.DEBUG))
    351                         _log.debug("SAM handling started");
     366                        _log.debug("SAMv3 handling started");
    352367
    353368                try {
     
    423438                } catch (IOException e) {
    424439                        if (_log.shouldLog(Log.DEBUG))
    425                                 _log.debug("Caught IOException ("
    426                                         + e.getMessage() + ") for message [" + msg + "]", e);
     440                                _log.debug("Caught IOException for message [" + msg + "]", e);
    427441                } catch (Exception e) {
    428442                        _log.error("Unexpected exception for message [" + msg + "]", e);
     
    436450                                        closeClientSocket();
    437451                                } catch (IOException e) {
    438                                         _log.error("Error closing socket: " + e.getMessage());
     452                                        if (_log.shouldWarn())
     453                                                _log.warn("Error closing socket", e);
    439454                                }
    440455                        }
     
    445460                                                ((SAMv3StreamSession)streamSession).stopForwardingIncoming();
    446461                                        } catch (SAMException e) {
    447                                                 _log.error("Error while stopping forwarding connections: " + e.getMessage());
     462                                                if (_log.shouldWarn())
     463                                                        _log.warn("Error while stopping forwarding connections", e);
    448464                                        } catch (InterruptedIOException e) {
    449                                                 _log.error("Interrupted while stopping forwarding connections: " + e.getMessage());
     465                                                if (_log.shouldWarn())
     466                                                        _log.warn("Interrupted while stopping forwarding connections", e);
    450467                                        }
    451468                                }
    452469                        }
    453                
    454 
    455 
    456470                        die();
    457471                }
    458472        }
    459473
    460         protected void die() {
     474        /**
     475         * Stop the SAM handler, close the socket,
     476         * unregister with the bridge.
     477         *
     478         * Overridden to not close the client socket if stolen.
     479         *
     480         * @since 0.9.20
     481         */
     482        @Override
     483        public void stopHandling() {
     484            synchronized (stopLock) {
     485                stopHandler = true;
     486            }
     487            if (!stolenSocket) {
     488                try {
     489                    closeClientSocket();
     490                } catch (IOException e) {}
     491            }
     492            bridge.unregister(this);
     493        }
     494
     495        private void die() {
    461496                SessionRecord rec = null ;
    462497               
     
    814849       
    815850
    816         public void notifyStreamResult(boolean verbose, String result, String message) throws IOException
    817     {
     851        public void notifyStreamResult(boolean verbose, String result, String message) throws IOException {
    818852                if (!verbose) return ;
    819                
    820                 String out = "STREAM STATUS RESULT="+result;
    821                 if (message!=null)
    822                         out = out + " MESSAGE=\"" + message + "\"";
    823                 out = out + '\n';
     853                String msgString = createMessageString(message);
     854                String out = "STREAM STATUS RESULT=" + result + msgString + '\n';
    824855       
    825         if ( !writeString ( out ) )
    826         {
    827             throw new IOException ( "Error notifying connection to SAM client" );
    828         }
    829     }
     856                if (!writeString(out)) {
     857                        throw new IOException ( "Error notifying connection to SAM client" );
     858                }
     859        }
    830860
    831861        public void notifyStreamIncomingConnection(Destination d) throws IOException {
  • apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java

    r7b711eb rd96ddd1  
    7575            /**
    7676             * Connect the SAM STREAM session to the specified Destination
     77             * for a single connection, using the socket stolen from the handler.
    7778             *
    7879             * @param handler The handler that communicates with the requesting client
     
    8889             */
    8990            public void connect ( SAMv3Handler handler, String dest, Properties props )
    90             throws I2PException, ConnectException, NoRouteToHostException,
     91                throws I2PException, ConnectException, NoRouteToHostException,
    9192                        DataFormatException, InterruptedIOException, IOException {
    9293
     
    118119                WritableByteChannel toI2P      = Channels.newChannel(i2ps.getOutputStream());
    119120               
    120                 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
    121                 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();
    122                
     121                SAMBridge bridge = handler.getBridge();
     122                (new Thread(rec.getThreadGroup(),
     123                            new Pipe(fromClient, toI2P, bridge),
     124                            "ConnectV3 SAMPipeClientToI2P")).start();
     125                (new Thread(rec.getThreadGroup(),
     126                            new Pipe(fromI2P, toClient, bridge),
     127                            "ConnectV3 SAMPipeI2PToClient")).start();
    123128            }
    124129
    125130            /**
    126              * Accept an incoming STREAM
     131             * Accept a single incoming STREAM on the socket stolen from the handler.
    127132             *
    128133             * @param handler The handler that communicates with the requesting client
     
    151156                }
    152157               
    153                         I2PSocket i2ps;
    154                         i2ps = this.socketServer.accept();
     158                I2PSocket i2ps = this.socketServer.accept();
    155159
    156160                synchronized( this.socketServerLock )
     
    160164               
    161165                SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
    162                
    163                 if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
    164                
    165                         if (verbose)
    166                                 handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
     166
     167                if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
     168
     169                if (verbose)
     170                        handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
    167171
    168172                handler.stealSocket() ;
     
    172176                WritableByteChannel toI2P      = Channels.newChannel(i2ps.getOutputStream());
    173177               
    174                 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
    175                 (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();             
     178                SAMBridge bridge = handler.getBridge();
     179                (new Thread(rec.getThreadGroup(),
     180                            new Pipe(fromClient, toI2P, bridge),
     181                            "AcceptV3 SAMPipeClientToI2P")).start();
     182                (new Thread(rec.getThreadGroup(),
     183                            new Pipe(fromI2P, toClient, bridge),
     184                            "AcceptV3 SAMPipeI2PToClient")).start();
    176185            }
    177186
     
    211220               
    212221                SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose);
    213                 (new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"), "SAMStreamForwarder")).start();
     222                (new Thread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
    214223            }
    215224           
    216             private static class SocketForwarder extends Thread
     225            private static class SocketForwarder implements Runnable
    217226            {
    218227                private final String host;
     
    255264                                // build pipes between both sockets
    256265                                try {
     266                                        clientServerSock.socket().setKeepAlive(true);
    257267                                        if (this.verbose)
    258268                                                SAMv3Handler.notifyStreamIncomingConnection(
     
    262272                                        WritableByteChannel toClient   = clientServerSock ;
    263273                                        WritableByteChannel toI2P      = Channels.newChannel(i2ps.getOutputStream());
    264                                         (new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
    265                                         (new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();
     274                                        (new I2PAppThread(new Pipe(fromClient, toI2P, null),
     275                                                          "ForwardV3 SAMPipeClientToI2P")).start();
     276                                        (new I2PAppThread(new Pipe(fromI2P,toClient, null),
     277                                                          "ForwardV3 SAMPipeI2PToClient")).start();
    266278
    267279                                } catch (IOException e) {
     
    278290            }
    279291
    280             private static class Pipe extends Thread
     292            private static class Pipe implements Runnable, Handler
    281293            {
    282294                private final ReadableByteChannel in  ;
    283295                private final WritableByteChannel out ;
    284296                private final ByteBuffer buf ;
    285                
    286                 public Pipe(ReadableByteChannel in, WritableByteChannel out, String name)
    287                 {
    288                         super(name);
     297                private final SAMBridge bridge;
     298               
     299                /**
     300                 *  @param bridge may be null
     301                 */
     302                public Pipe(ReadableByteChannel in, WritableByteChannel out, SAMBridge bridge)
     303                {
    289304                        this.in  = in ;
    290305                        this.out = out ;
    291306                        this.buf = ByteBuffer.allocate(BUFFER_SIZE) ;
    292                 }
    293                
    294                 public void run()
    295                 {
    296                         try {
    297                                 while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) {
    298                                          buf.flip();
    299                                          out.write(buf);
    300                                          buf.compact();
    301                                 }
    302                         }
    303                                 catch (IOException e)
    304                                 {
    305                                         this.interrupt();
     307                        this.bridge = bridge;
     308                }
     309               
     310                public void run() {
     311                        if (bridge != null)
     312                                bridge.register(this);
     313                        try {
     314                                while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) {
     315                                         buf.flip();
     316                                         out.write(buf);
     317                                         buf.compact();
    306318                                }
    307                         try {
    308                                 in.close();
    309                         }
    310                         catch (IOException e) {}
    311                         try {
    312                                 buf.flip();
    313                                 while (buf.hasRemaining())
    314                                         out.write(buf);
    315                         }
    316                         catch (IOException e) {}
    317                         try {
    318                                 out.close();
    319                         }
    320                         catch (IOException e) {}
    321                 }
     319                        } catch (IOException ioe) {
     320                                // ignore
     321                        } finally {
     322                                try {
     323                                        in.close();
     324                                } catch (IOException e) {}
     325                                try {
     326                                        buf.flip();
     327                                        while (buf.hasRemaining()) {
     328                                                out.write(buf);
     329                                        }
     330                                } catch (IOException e) {}
     331                                try {
     332                                        out.close();
     333                                } catch (IOException e) {}
     334                                if (bridge != null)
     335                                        bridge.unregister(this);
     336                        }
     337                }
     338
     339                /**
     340                 *  Handler interface
     341                 *  @since 0.9.20
     342                 */
     343                public void stopHandling() {
     344                        try {
     345                                in.close();
     346                        } catch (IOException e) {}
     347                }
    322348            }
    323349           
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    r7b711eb rd96ddd1  
    494494                            I2PSSLSocketFactory fact = new I2PSSLSocketFactory(_context, false, "certificates/i2cp");
    495495                            _socket = fact.createSocket(_hostname, _portNum);
     496                            _socket.setKeepAlive(true);
    496497                        } catch (GeneralSecurityException gse) {
    497498                            IOException ioe = new IOException("SSL Fail");
     
    501502                    } else {
    502503                        _socket = new Socket(_hostname, _portNum);
     504                        _socket.setKeepAlive(true);
    503505                    }
    504506                    // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
  • core/java/src/net/i2p/client/I2PSimpleSession.java

    r7b711eb rd96ddd1  
    9090                        _socket = new Socket(_hostname, _portNum);
    9191                    }
     92                    _socket.setKeepAlive(true);
    9293                    OutputStream out = _socket.getOutputStream();
    9394                    out.write(I2PClient.PROTOCOL_BYTE);
  • router/java/src/net/i2p/router/client/ClientListenerRunner.java

    r7b711eb rd96ddd1  
    9494                            if (_log.shouldLog(Log.DEBUG))
    9595                                _log.debug("Connection received");
     96                            socket.setKeepAlive(true);
    9697                            runConnection(socket);
    9798                        } else {
Note: See TracChangeset for help on using the changeset viewer.