Changeset 6d06427


Ignore:
Timestamp:
Aug 20, 2011 8:23:27 PM (9 years ago)
Author:
zzz <zzz@…>
Branches:
master
Children:
3849a96
Parents:
67d608a (diff), 3834403 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

propagate from branch 'i2p.i2p' (head 793ca7c46f5d8b51c5880fc538dea7874e62f63b)

to branch 'i2p.i2p.zzz.test' (head d39f17fe601b6ae514111b07092de820668015d7)

Files:
8 added
41 edited

Legend:

Unmodified
Added
Removed
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java

    r67d608a r6d06427  
    5050import java.util.Set;
    5151import java.util.StringTokenizer;
     52import java.util.concurrent.CopyOnWriteArrayList;
    5253import java.util.concurrent.CopyOnWriteArraySet;
    5354
     
    7172
    7273/**
     74 *  An I2PTunnel tracks one or more I2PTunnelTasks and one or more I2PSessions.
     75 *  Usually one of each.
     76 *
    7377 *  Todo: Most events are not listened to elsewhere, so error propagation is poor
    7478 */
    75 public class I2PTunnel implements Logging, EventDispatcher {
     79public class I2PTunnel extends EventDispatcherImpl implements Logging {
    7680    private final Log _log;
    77     private final EventDispatcherImpl _event;
    7881    private final I2PAppContext _context;
    7982    private static long __tunnelId = 0;
    8083    private final long _tunnelId;
    8184    private final Properties _clientOptions;
    82     private final List<I2PSession> _sessions;
     85    private final Set<I2PSession> _sessions;
    8386
    8487    public static final int PACKET_DELAY = 100;
     
    97100    private static final String nocli_args[] = { "-nocli", "-die"};
    98101
    99     private final List tasks = new ArrayList();
     102    private final List<I2PTunnelTask> tasks = new ArrayList();
    100103    private int next_task_id = 1;
    101104
     
    115118
    116119    public I2PTunnel(String[] args, ConnectionEventListener lsnr) {
     120        super();
    117121        _context = I2PAppContext.getGlobalContext(); // new I2PAppContext();
    118122        _tunnelId = ++__tunnelId;
    119123        _log = _context.logManager().getLog(I2PTunnel.class);
    120         _event = new EventDispatcherImpl();
    121124        // as of 0.8.4, include context properties
    122125        Properties p = _context.getProperties();
    123126        _clientOptions = p;
    124         _sessions = new ArrayList(1);
     127        _sessions = new CopyOnWriteArraySet();
    125128       
    126129        addConnectionEventListener(lsnr);
     
    194197    /** @return non-null */
    195198    List<I2PSession> getSessions() {
    196         synchronized (_sessions) {
    197199            return new ArrayList(_sessions);
    198         }
    199     }
     200    }
     201
    200202    void addSession(I2PSession session) {
    201203        if (session == null) return;
    202         synchronized (_sessions) {
    203             if (!_sessions.contains(session))
    204                 _sessions.add(session);
    205         }
    206     }
     204        _sessions.add(session);
     205    }
     206
    207207    void removeSession(I2PSession session) {
    208208        if (session == null) return;
    209         synchronized (_sessions) {
    210             _sessions.remove(session);
    211         }
     209        _sessions.remove(session);
    212210    }
    213211   
     
    219217            tsk.setId(next_task_id);
    220218            next_task_id++;
    221             synchronized (tasks) {
    222                 tasks.add(tsk);
    223             }
     219            tasks.add(tsk);
    224220        }
    225221    }
     
    12621258    public void runQuit(Logging l) {
    12631259        purgetasks(l);
    1264         synchronized (tasks) {
    1265             if (tasks.isEmpty()) {
    1266                 System.exit(0);
    1267             }
     1260        if (tasks.isEmpty()) {
     1261            System.exit(0);
    12681262        }
    12691263        l.log("There are running tasks. Try 'list'.");
     
    12811275    public void runList(Logging l) {
    12821276        purgetasks(l);
    1283         synchronized (tasks) {
    1284             for (int i = 0; i < tasks.size(); i++) {
    1285                 I2PTunnelTask t = (I2PTunnelTask) tasks.get(i);
    1286                 l.log("[" + t.getId() + "] " + t.toString());
    1287             }
     1277        for (I2PTunnelTask t : tasks) {
     1278            l.log("[" + t.getId() + "] " + t.toString());
    12881279        }
    12891280        notifyEvent("listDone", "done");
     
    13141305            }
    13151306            if (args[argindex].equalsIgnoreCase("all")) {
    1316                 List curTasks = null;
    1317                 synchronized (tasks) {
    1318                     curTasks = new LinkedList(tasks);
    1319                 }
    1320 
    13211307                boolean error = false;
    1322                 for (int i = 0; i < curTasks.size(); i++) {
    1323                     I2PTunnelTask t = (I2PTunnelTask) curTasks.get(i);
     1308                for (I2PTunnelTask t : tasks) {
    13241309                    if (!closetask(t, forced, l)) {
    13251310                        notifyEvent("closeResult", "error");
     
    14431428
    14441429        _log.debug(getPrefix() + "closetask(): looking for task " + num);
    1445         synchronized (tasks) {
    1446             for (Iterator it = tasks.iterator(); it.hasNext();) {
    1447                 I2PTunnelTask t = (I2PTunnelTask) it.next();
     1430            for (I2PTunnelTask t : tasks) {
    14481431                int id = t.getId();
    14491432                if (_log.shouldLog(Log.DEBUG))
     
    14551438                    break;
    14561439                }
    1457             }
    14581440        }
    14591441        return closed;
     
    14831465     */
    14841466    private void purgetasks(Logging l) {
    1485         synchronized (tasks) {
    1486             for (Iterator it = tasks.iterator(); it.hasNext();) {
    1487                 I2PTunnelTask t = (I2PTunnelTask) it.next();
     1467            List<I2PTunnelTask> removed = new ArrayList();
     1468            for (I2PTunnelTask t : tasks) {
    14881469                if (!t.isOpen()) {
    14891470                    _log.debug(getPrefix() + "Purging inactive tunnel: [" + t.getId() + "] " + t.toString());
    1490                     it.remove();
     1471                    removed.add(t);
    14911472                }
    14921473            }
    1493         }
     1474            tasks.removeAll(removed);
    14941475    }
    14951476
     
    16581639        public void routerDisconnected();
    16591640    }
    1660 
    1661     /* Required by the EventDispatcher interface */
    1662     public EventDispatcher getEventDispatcher() {
    1663         return _event;
    1664     }
    1665 
    1666     public void attachEventDispatcher(EventDispatcher e) {
    1667         _event.attachEventDispatcher(e.getEventDispatcher());
    1668     }
    1669 
    1670     public void detachEventDispatcher(EventDispatcher e) {
    1671         _event.detachEventDispatcher(e.getEventDispatcher());
    1672     }
    1673 
    1674     public void notifyEvent(String e, Object a) {
    1675         _event.notifyEvent(e, a);
    1676     }
    1677 
    1678     public Object getEventValue(String n) {
    1679         return _event.getEventValue(n);
    1680     }
    1681 
    1682     public Set getEvents() {
    1683         return _event.getEvents();
    1684     }
    1685 
    1686     public void ignoreEvents() {
    1687         _event.ignoreEvents();
    1688     }
    1689 
    1690     public void unIgnoreEvents() {
    1691         _event.unIgnoreEvents();
    1692     }
    1693 
    1694     public Object waitEventValue(String n) {
    1695         return _event.waitEventValue(n);
    1696     }
    16971641}
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java

    r67d608a r6d06427  
    5252    protected final Object sockLock = new Object(); // Guards sockMgr and mySockets
    5353    protected I2PSocketManager sockMgr; // should be final and use a factory. LINT
    54     protected List mySockets = new ArrayList();
     54    protected final List<I2PSocket> mySockets = new ArrayList();
    5555    protected boolean _ownDest;
    5656
     
    6060    private boolean listenerReady = false;
    6161
    62     private ServerSocket ss;
     62    protected ServerSocket ss;
    6363
    6464    private final Object startLock = new Object();
     
    197197        // no need to load the netDb with leaseSets for destinations that will never
    198198        // be looked up
    199         tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true");
     199        boolean dccEnabled = (this instanceof I2PTunnelIRCClient) &&
     200                      Boolean.valueOf(tunnel.getClientOptions().getProperty(I2PTunnelIRCClient.PROP_DCC)).booleanValue();
     201        if (!dccEnabled)
     202            tunnel.getClientOptions().setProperty("i2cp.dontPublishLeaseSet", "true");
    200203       
    201204        boolean openNow = !Boolean.valueOf(tunnel.getClientOptions().getProperty("i2cp.delayOpen")).booleanValue();
     
    684687            if (sockMgr != null) {
    685688                mySockets.retainAll(sockMgr.listSockets());
    686                 if (!forced && mySockets.size() != 0) {
    687                     l.log("There are still active connections!");
     689                if ((!forced) && (!mySockets.isEmpty())) {
     690                    l.log("Not closing, there are still active connections!");
    688691                    _log.debug("can't close: there are still active connections!");
    689                     for (Iterator it = mySockets.iterator(); it.hasNext();) {
    690                         l.log("->" + it.next());
     692                    for (I2PSocket s : mySockets) {
     693                        l.log("  -> " + s.toString());
    691694                    }
    692695                    return false;
     
    704707                if (ss != null) ss.close();
    705708            } catch (IOException ex) {
    706                 ex.printStackTrace();
     709                if (_log.shouldLog(Log.WARN))
     710                    _log.warn("error closing", ex);
    707711                return false;
    708712            }
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java

    r67d608a r6d06427  
    2626public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
    2727    private Log _log;
    28     public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) {
     28    public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
     29                                     List<I2PSocket> sockList, Runnable onTimeout) {
    2930        super(s, i2ps, slock, initialI2PData, sockList, onTimeout);
    3031        _log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelHTTPClientRunner.class);
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelIRCClient.java

    r67d608a r6d06427  
    1111
    1212import net.i2p.client.streaming.I2PSocket;
     13import net.i2p.data.Base32;
    1314import net.i2p.data.Destination;
     15import net.i2p.i2ptunnel.irc.DCCClientManager;
     16import net.i2p.i2ptunnel.irc.DCCHelper;
     17import net.i2p.i2ptunnel.irc.I2PTunnelDCCServer;
     18import net.i2p.i2ptunnel.irc.IrcInboundFilter;
     19import net.i2p.i2ptunnel.irc.IrcOutboundFilter;
    1420import net.i2p.util.EventDispatcher;
    1521import net.i2p.util.I2PAppThread;
     
    1925 * Todo: Can we extend I2PTunnelClient instead and remove some duplicated code?
    2026 */
    21 public class I2PTunnelIRCClient extends I2PTunnelClientBase implements Runnable {
     27public class I2PTunnelIRCClient extends I2PTunnelClientBase {
    2228
    2329    /** used to assign unique IDs to the threads / clients.  no logic or functionality */
     
    2834    private static final long DEFAULT_READ_TIMEOUT = 5*60*1000; // -1
    2935    protected long readTimeout = DEFAULT_READ_TIMEOUT;
     36    private final boolean _dccEnabled;
     37    private I2PTunnelDCCServer _DCCServer;
     38    private DCCClientManager _DCCClientManager;
     39
     40    /**
     41     *  @since 0.8.9
     42     */
     43    public static final String PROP_DCC = "i2ptunnel.ircclient.enableDCC";
    3044
    3145    /**
     
    7690        setName("IRC Client on " + tunnel.listenHost + ':' + localPort);
    7791
     92        _dccEnabled = Boolean.valueOf(tunnel.getClientOptions().getProperty(PROP_DCC)).booleanValue();
     93        // TODO add some prudent tunnel options (or is it too late?)
     94
    7895        startRunning();
    7996
     
    8299   
    83100    protected void clientConnectionRun(Socket s) {
    84         if (_log.shouldLog(Log.DEBUG))
    85             _log.debug("got a connection.");
     101        if (_log.shouldLog(Log.INFO))
     102            _log.info("New connection local addr is: " + s.getLocalAddress() +
     103                      " from: " + s.getInetAddress());
    86104        Destination clientDest = pickDestination();
    87105        I2PSocket i2ps = null;
     
    90108            i2ps.setReadTimeout(readTimeout);
    91109            StringBuffer expectedPong = new StringBuffer();
    92             Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " in", true);
     110            DCCHelper dcc = _dccEnabled ? new DCC(s.getLocalAddress().getAddress()) : null;
     111            Thread in = new I2PAppThread(new IrcInboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + __clientId + " in", true);
    93112            in.start();
    94             Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log), "IRC Client " + __clientId + " out", true);
     113            Thread out = new I2PAppThread(new IrcOutboundFilter(s,i2ps, expectedPong, _log, dcc), "IRC Client " + __clientId + " out", true);
    95114            out.start();
    96115        } catch (Exception ex) {
     
    121140    }
    122141
    123     /*************************************************************************
    124      *
     142    @Override
     143    public boolean close(boolean forced) {
     144        synchronized(this) {
     145            if (_DCCServer != null) {
     146                _DCCServer.close(forced);
     147                _DCCServer = null;
     148            }
     149            if (_DCCClientManager != null) {
     150                _DCCClientManager.close(forced);
     151                _DCCClientManager = null;
     152            }
     153        }
     154        return super.close(forced);
     155    }
     156
     157    //
     158    //  Start of the DCCHelper interface
     159    //
     160
     161  private class DCC implements DCCHelper {
     162
     163    private final byte[] _localAddr;
     164
     165    /**
     166     *  @param local Our IP address, from the IRC client's perspective
    125167     */
    126     public static class IrcInboundFilter implements Runnable {
    127        
    128         private final Socket local;
    129         private final I2PSocket remote;
    130         private final StringBuffer expectedPong;
    131         private final Log _log;
    132                
    133         public IrcInboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) {
    134             local=_local;
    135             remote=_remote;
    136             expectedPong=pong;
    137             _log = log;
    138         }
    139 
    140         public void run() {
    141             // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but...
    142             BufferedReader in;
    143             OutputStream output;
    144             try {
    145                 in = new BufferedReader(new InputStreamReader(remote.getInputStream(), "ISO-8859-1"));
    146                 output=local.getOutputStream();
    147             } catch (IOException e) {
    148                 if (_log.shouldLog(Log.ERROR))
    149                     _log.error("IrcInboundFilter: no streams",e);
    150                 return;
    151             }
    152             if (_log.shouldLog(Log.DEBUG))
    153                 _log.debug("IrcInboundFilter: Running.");
    154             try {
    155                 while(true)
    156                 {
    157                     try {
    158                         String inmsg = in.readLine();
    159                         if(inmsg==null)
    160                             break;
    161                         if(inmsg.endsWith("\r"))
    162                             inmsg=inmsg.substring(0,inmsg.length()-1);
    163                         if (_log.shouldLog(Log.DEBUG))
    164                             _log.debug("in: [" + inmsg + "]");
    165                         String outmsg = inboundFilter(inmsg, expectedPong);
    166                         if(outmsg!=null)
    167                         {
    168                             if(!inmsg.equals(outmsg)) {
    169                                 if (_log.shouldLog(Log.WARN)) {
    170                                     _log.warn("inbound FILTERED: "+outmsg);
    171                                     _log.warn(" - inbound was: "+inmsg);
    172                                 }
    173                             } else {
    174                                 if (_log.shouldLog(Log.INFO))
    175                                     _log.info("inbound: "+outmsg);
    176                             }
    177                             outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
    178                             output.write(outmsg.getBytes("ISO-8859-1"));
    179                             // probably doesn't do much but can't hurt
    180                             output.flush();
    181                         } else {
    182                             if (_log.shouldLog(Log.WARN))
    183                                 _log.warn("inbound BLOCKED: "+inmsg);
    184                         }
    185                     } catch (IOException e1) {
    186                         if (_log.shouldLog(Log.WARN))
    187                             _log.warn("IrcInboundFilter: disconnected",e1);
    188                         break;
    189                     }
    190                 }
    191             } catch (RuntimeException re) {
    192                 _log.error("Error filtering inbound data", re);
    193             } finally {
    194                 try { local.close(); } catch (IOException e) {}
    195             }
    196             if(_log.shouldLog(Log.DEBUG))
    197                 _log.debug("IrcInboundFilter: Done.");
    198             }
    199            
    200         }
    201                
    202         /*************************************************************************
    203          *
    204          */
    205         public static class IrcOutboundFilter implements Runnable {
    206                    
    207             private final Socket local;
    208             private final I2PSocket remote;
    209             private final StringBuffer expectedPong;
    210             private final Log _log;
    211                
    212             public IrcOutboundFilter(Socket _local, I2PSocket _remote, StringBuffer pong, Log log) {
    213                 local=_local;
    214                 remote=_remote;
    215                 expectedPong=pong;
    216                 _log = log;
    217             }
    218                
    219             public void run() {
    220                 // Todo: Don't use BufferedReader - IRC spec limits line length to 512 but...
    221                 BufferedReader in;
    222                 OutputStream output;
    223                 try {
    224                     in = new BufferedReader(new InputStreamReader(local.getInputStream(), "ISO-8859-1"));
    225                     output=remote.getOutputStream();
    226                 } catch (IOException e) {
    227                     if (_log.shouldLog(Log.ERROR))
    228                         _log.error("IrcOutboundFilter: no streams",e);
    229                     return;
    230                 }
    231                 if (_log.shouldLog(Log.DEBUG))
    232                     _log.debug("IrcOutboundFilter: Running.");
    233                 try {
    234                     while(true)
    235                     {
    236                         try {
    237                             String inmsg = in.readLine();
    238                             if(inmsg==null)
    239                                 break;
    240                             if(inmsg.endsWith("\r"))
    241                                 inmsg=inmsg.substring(0,inmsg.length()-1);
    242                             if (_log.shouldLog(Log.DEBUG))
    243                                 _log.debug("out: [" + inmsg + "]");
    244                             String outmsg = outboundFilter(inmsg, expectedPong);
    245                             if(outmsg!=null)
    246                             {
    247                                 if(!inmsg.equals(outmsg)) {
    248                                     if (_log.shouldLog(Log.WARN)) {
    249                                         _log.warn("outbound FILTERED: "+outmsg);
    250                                         _log.warn(" - outbound was: "+inmsg);
    251                                     }
    252                                 } else {
    253                                     if (_log.shouldLog(Log.INFO))
    254                                         _log.info("outbound: "+outmsg);
    255                                 }
    256                                 outmsg=outmsg+"\r\n";   // rfc1459 sec. 2.3
    257                                 output.write(outmsg.getBytes("ISO-8859-1"));
    258                                 // save 250 ms in streaming
    259                                 output.flush();
    260                             } else {
    261                                 if (_log.shouldLog(Log.WARN))
    262                                     _log.warn("outbound BLOCKED: "+"\""+inmsg+"\"");
    263                             }
    264                         } catch (IOException e1) {
    265                             if (_log.shouldLog(Log.WARN))
    266                                 _log.warn("IrcOutboundFilter: disconnected",e1);
    267                             break;
    268                         }
    269                     }
    270                 } catch (RuntimeException re) {
    271                     _log.error("Error filtering outbound data", re);
    272                 } finally {
    273                     try { remote.close(); } catch (IOException e) {}
    274                 }
    275                 if (_log.shouldLog(Log.DEBUG))
    276                     _log.debug("IrcOutboundFilter: Done.");
    277             }
    278         }
    279 
    280    
    281     /*************************************************************************
    282      *
    283      */
    284    
    285     public static String inboundFilter(String s, StringBuffer expectedPong) {
    286        
    287         String field[]=s.split(" ",4);
    288         String command;
    289         int idx=0;
    290         final String[] allowedCommands =
    291         {
    292                 // "NOTICE", // can contain CTCP
    293                 //"PING",
    294                 //"PONG",
    295                 "MODE",
    296                 "JOIN",
    297                 "NICK",
    298                 "QUIT",
    299                 "PART",
    300                 "WALLOPS",
    301                 "ERROR",
    302                 "KICK",
    303                 "H", // "hide operator status" (after kicking an op)
    304                 "TOPIC"
    305         };
    306        
    307         if(field[0].charAt(0)==':')
    308             idx++;
    309 
    310         try { command = field[idx++]; }
    311          catch (IndexOutOfBoundsException ioobe) // wtf, server sent borked command?
    312         {
    313            //_log.warn("Dropping defective message: index out of bounds while extracting command.");
    314            return null;
    315         }
    316 
    317         idx++; //skip victim
    318 
    319         // Allow numerical responses
    320         try {
    321             new Integer(command);
    322             return s;
    323         } catch(NumberFormatException nfe){}
    324 
    325        
    326         if ("PING".equalsIgnoreCase(command))
    327             return "PING 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works
    328         if ("PONG".equalsIgnoreCase(command)) {
    329             // Turn the received ":irc.freshcoffee.i2p PONG irc.freshcoffee.i2p :127.0.0.1"
    330             // into ":127.0.0.1 PONG 127.0.0.1 " so that the caller can append the client's extra parameter
    331             // though, does 127.0.0.1 work for irc clients connecting remotely?  and for all of them?  sure would
    332             // be great if irc clients actually followed the RFCs here, but i guess thats too much to ask.
    333             // If we haven't PINGed them, or the PING we sent isn't something we know how to filter, this
    334             // is blank.
    335             //
    336             // String pong = expectedPong.length() > 0 ? expectedPong.toString() : null;
    337             // If we aren't going to rewrite it, pass it through
    338             String pong = expectedPong.length() > 0 ? expectedPong.toString() : s;
    339             expectedPong.setLength(0);
    340             return pong;
    341         }
    342        
    343         // Allow all allowedCommands
    344         for(int i=0;i<allowedCommands.length;i++) {
    345             if(allowedCommands[i].equalsIgnoreCase(command))
    346                 return s;
    347         }
    348        
    349         // Allow PRIVMSG, but block CTCP.
    350         if("PRIVMSG".equalsIgnoreCase(command) || "NOTICE".equalsIgnoreCase(command))
    351         {
    352             String msg;
    353             msg = field[idx++];
    354        
    355             if(msg.indexOf(0x01) >= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':'
    356             {
    357                 // CTCP
    358                 msg=msg.substring(2);
    359                 if(msg.startsWith("ACTION ")) {
    360                     // /me says hello
    361                     return s;
    362                 }
    363                 return null; // Block all other ctcp
    364             }
    365             return s;
    366         }
    367        
    368         // Block the rest
    369         return null;
    370     }
    371    
    372     public static String outboundFilter(String s, StringBuffer expectedPong) {
    373        
    374         String field[]=s.split(" ",3);
    375         String command;
    376         final String[] allowedCommands =
    377         {
    378                 // "NOTICE", // can contain CTCP
    379                 "MODE",
    380                 "JOIN",
    381                 "NICK",
    382                 "WHO",
    383                 "WHOIS",
    384                 "LIST",
    385                 "NAMES",
    386                 "NICK",
    387                 // "QUIT", // replace with a filtered QUIT to hide client quit messages
    388                 "SILENCE",
    389                 "MAP", // seems safe enough, the ircd should protect themselves though
    390                 // "PART", // replace with filtered PART to hide client part messages
    391                 "OPER",
    392                 // "PONG", // replaced with a filtered PING/PONG since some clients send the server IP (thanks aardvax!)
    393                 // "PING",
    394                 "KICK",
    395                 "HELPME",
    396                 "RULES",
    397                 "TOPIC",
    398                 "ISON",    // jIRCii uses this for a ping (response is 303)
    399                 "INVITE"
    400         };
    401 
    402         if(field[0].length()==0)
    403             return null; // W T F?
    404        
    405        
    406         if(field[0].charAt(0)==':')
    407             return null; // wtf
    408        
    409         command = field[0].toUpperCase();
    410 
    411         if ("PING".equals(command)) {
    412             // Most clients just send a PING and are happy with any old PONG.  Others,
    413             // like BitchX, actually expect certain behavior.  It sends two different pings:
    414             // "PING :irc.freshcoffee.i2p" and "PING 1234567890 127.0.0.1" (where the IP is the proxy)
    415             // the PONG to the former seems to be "PONG 127.0.0.1", while the PONG to the later is
    416             // ":irc.freshcoffee.i2p PONG irc.freshcoffe.i2p :1234567890".
    417             // We don't want to send them our proxy's IP address, so we need to rewrite the PING
    418             // sent to the server, but when we get a PONG back, use what we expected, rather than
    419             // what they sent.
    420             //
    421             // Yuck.
    422 
    423             String rv = null;
    424             expectedPong.setLength(0);
    425             if (field.length == 1) { // PING
    426                 rv = "PING";
    427                 // If we aren't rewriting the PING don't rewrite the PONG
    428                 // expectedPong.append("PONG 127.0.0.1");
    429             } else if (field.length == 2) { // PING nonce
    430                 rv = "PING " + field[1];
    431                 // If we aren't rewriting the PING don't rewrite the PONG
    432                 // expectedPong.append("PONG ").append(field[1]);
    433             } else if (field.length == 3) { // PING nonce serverLocation
    434                 rv = "PING " + field[1];
    435                 expectedPong.append("PONG ").append(field[2]).append(" :").append(field[1]); // PONG serverLocation nonce
    436             } else {
    437                 //if (_log.shouldLog(Log.ERROR))
    438                 //    _log.error("IRC client sent a PING we don't understand, filtering it (\"" + s + "\")");
    439                 rv = null;
    440             }
    441            
    442             //if (_log.shouldLog(Log.WARN))
    443             //    _log.warn("sending ping [" + rv + "], waiting for [" + expectedPong + "] orig was [" + s  + "]");
    444            
    445             return rv;
    446         }
    447         if ("PONG".equals(command))
    448             return "PONG 127.0.0.1"; // no way to know what the ircd to i2ptunnel server con is, so localhost works
    449 
    450         // Allow all allowedCommands
    451         for(int i=0;i<allowedCommands.length;i++)
    452         {
    453             if(allowedCommands[i].equals(command))
    454                 return s;
    455         }
    456        
    457         // mIRC sends "NOTICE user :DCC Send file (IP)"
    458         // in addition to the CTCP version
    459         if("NOTICE".equals(command))
    460         {
    461             String msg = field[2];
    462             if(msg.startsWith(":DCC "))
    463                 return null;
    464             // fall through
    465         }
    466        
    467         // Allow PRIVMSG, but block CTCP (except ACTION).
    468         if("PRIVMSG".equals(command) || "NOTICE".equals(command))
    469         {
    470             String msg;
    471             msg = field[2];
    472        
    473             if(msg.indexOf(0x01) >= 0) // CTCP marker ^A can be anywhere, not just immediately after the ':'
    474             {
    475                     // CTCP
    476                 msg=msg.substring(2);
    477                 if(msg.startsWith("ACTION ")) {
    478                     // /me says hello
    479                     return s;
    480                 }
    481                 return null; // Block all other ctcp
    482             }
    483             return s;
    484         }
    485        
    486         if("USER".equals(command)) {
    487             int idx = field[2].lastIndexOf(":");
    488             if(idx<0)
    489                 return "USER user hostname localhost :realname";
    490             String realname = field[2].substring(idx+1);
    491             String ret = "USER "+field[1]+" hostname localhost :"+realname;
    492             return ret;
    493         }
    494 
    495         if ("PART".equals(command)) {
    496             // hide client message
    497             return "PART " + field[1] + " :leaving";
    498         }
    499        
    500         if ("QUIT".equals(command)) {
    501             return "QUIT :leaving";
    502         }
    503        
    504         // Block the rest
    505         return null;
    506     }
     168    public DCC(byte[] local) {
     169        if (local.length == 4)
     170            _localAddr = local;
     171        else
     172            _localAddr = new byte[] {127, 0, 0, 1};
     173    }
     174
     175    public boolean isEnabled() {
     176        return _dccEnabled;
     177    }
     178
     179    public String getB32Hostname() {
     180        return Base32.encode(sockMgr.getSession().getMyDestination().calculateHash().getData()) + ".b32.i2p";
     181    }
     182
     183    public byte[] getLocalAddress() {
     184        return _localAddr;
     185    }
     186
     187    public int newOutgoing(byte[] ip, int port, String type) {
     188        I2PTunnelDCCServer server;
     189        synchronized(this) {
     190            if (_DCCServer == null) {
     191                if (_log.shouldLog(Log.INFO))
     192                    _log.info("Starting DCC Server");
     193                _DCCServer = new I2PTunnelDCCServer(sockMgr, l, I2PTunnelIRCClient.this, getTunnel());
     194                // TODO add some prudent tunnel options (or is it too late?)
     195                _DCCServer.startRunning();
     196            }
     197            server = _DCCServer;
     198        }
     199        int rv = server.newOutgoing(ip, port, type);
     200        if (_log.shouldLog(Log.INFO))
     201            _log.info("New outgoing " + type + ' ' + port + " returns " + rv);
     202        return rv;
     203    }
     204
     205    public int newIncoming(String b32, int port, String type) {
     206        DCCClientManager tracker;
     207        synchronized(this) {
     208            if (_DCCClientManager == null) {
     209                if (_log.shouldLog(Log.INFO))
     210                    _log.info("Starting DCC Client");
     211                _DCCClientManager = new DCCClientManager(sockMgr, l, I2PTunnelIRCClient.this, getTunnel());
     212            }
     213            tracker = _DCCClientManager;
     214        }
     215        // The tracker starts our client
     216        int rv = tracker.newIncoming(b32, port, type);
     217        if (_log.shouldLog(Log.INFO))
     218            _log.info("New incoming " + type + ' ' + b32 + ' ' + port + " returns " + rv);
     219        return rv;
     220    }
     221
     222    public int resumeOutgoing(int port) {
     223        DCCClientManager tracker = _DCCClientManager;
     224        if (tracker != null)
     225            return tracker.resumeOutgoing(port);
     226        return -1;
     227    }
     228
     229    public int resumeIncoming(int port) {
     230        I2PTunnelDCCServer server = _DCCServer;
     231        if (server != null)
     232            return server.resumeIncoming(port);
     233        return -1;
     234    }
     235
     236    public int acceptOutgoing(int port) {
     237        I2PTunnelDCCServer server = _DCCServer;
     238        if (server != null)
     239            return server.acceptOutgoing(port);
     240        return -1;
     241    }
     242
     243    public int acceptIncoming(int port) {
     244        DCCClientManager tracker = _DCCClientManager;
     245        if (tracker != null)
     246            return tracker.acceptIncoming(port);
     247        return -1;
     248    }
     249  }
    507250}
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java

    r67d608a r6d06427  
    2121
    2222public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
    23     private final static Log _log = new Log(I2PTunnelRunner.class);
     23    private final Log _log = new Log(I2PTunnelRunner.class);
    2424
    2525    private static volatile long __runnerId;
    26     private long _runnerId;
     26    private final long _runnerId;
    2727    /**
    2828     * max bytes streamed in a packet - smaller ones might be filled
     
    3535    static final int NETWORK_BUFFER_SIZE = MAX_PACKET_SIZE;
    3636
    37     private Socket s;
    38     private I2PSocket i2ps;
    39     final Object slock, finishLock = new Object();
     37    private final Socket s;
     38    private final I2PSocket i2ps;
     39    private final Object slock, finishLock = new Object();
    4040    boolean finished = false;
    41     HashMap ostreams, sockets;
    42     byte[] initialI2PData;
    43     byte[] initialSocketData;
     41    private final byte[] initialI2PData;
     42    private final byte[] initialSocketData;
    4443    /** when the last data was sent/received (or -1 if never) */
    4544    private long lastActivityOn;
    4645    /** when the runner started up */
    47     private long startedOn;
    48     private List sockList;
     46    private final long startedOn;
     47    private final List<I2PSocket> sockList;
    4948    /** if we die before receiving any data, run this job */
    50     private Runnable onTimeout;
     49    private final Runnable onTimeout;
    5150    private long totalSent;
    5251    private long totalReceived;
     
    5453    private volatile long __forwarderId;
    5554   
    56     public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList) {
     55    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
     56                           List<I2PSocket> sockList) {
    5757        this(s, i2ps, slock, initialI2PData, null, sockList, null);
    5858    }
    59     public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList) {
     59
     60    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
     61                           byte[] initialSocketData, List<I2PSocket> sockList) {
    6062        this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null);
    6163    }
    62     public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) {
     64
     65    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
     66                           List<I2PSocket> sockList, Runnable onTimeout) {
    6367        this(s, i2ps, slock, initialI2PData, null, sockList, onTimeout);
    6468    }
    65     public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, byte[] initialSocketData, List sockList, Runnable onTimeout) {
     69
     70    /**
     71     *  Starts itself
     72     *
     73     *  @param slock the socket lock, non-null
     74     *  @param initialI2PData may be null
     75     *  @param initialSocketData may be null
     76     *  @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
     77     *                               Will synchronize on slock when removing.
     78     *  @param onTImeout may be null
     79     */
     80    public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
     81                           byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout) {
    6682        this.sockList = sockList;
    6783        this.s = s;
     
    85101     * [aka we're done running the streams]?
    86102     *
     103     * @deprecated unused
    87104     */
    88105    public boolean isFinished() {
     
    94111     *
    95112     * @return date (ms since the epoch), or -1 if no data has been transferred yet
    96      *
     113     * @deprecated unused
    97114     */
    98115    public long getLastActivityOn() {
     
    238255    private class StreamForwarder extends I2PAppThread {
    239256
    240         InputStream in;
    241         OutputStream out;
    242         String direction;
    243         private boolean _toI2P;
    244         private ByteCache _cache;
     257        private final InputStream in;
     258        private final OutputStream out;
     259        private final String direction;
     260        private final boolean _toI2P;
     261        private final ByteCache _cache;
    245262
    246263        private StreamForwarder(InputStream in, OutputStream out, boolean toI2P) {
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java

    r67d608a r6d06427  
    4848    private boolean _usePool;
    4949
    50     private Logging l;
     50    protected Logging l;
    5151
    5252    private static final long DEFAULT_READ_TIMEOUT = -1; // 3*60*1000;
     
    7070    private ThreadPoolExecutor _executor;
    7171
     72    /** unused? port should always be specified */
    7273    private int DEFAULT_LOCALPORT = 4488;
    7374    protected int localPort = DEFAULT_LOCALPORT;
    7475
    7576    /**
     77     * @param privData Base64-encoded private key data,
     78     *                 format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    7679     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    7780     *                                  badly that we cant create a socketManager
     
    8588
    8689    /**
     90     * @param privkey file containing the private key data,
     91     *                format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     92     * @param privkeyname the name of the privKey file, not clear why we need this too
    8793     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    8894     *                                  badly that we cant create a socketManager
     
    106112
    107113    /**
     114     * @param privData stream containing the private key data,
     115     *                 format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     116     * @param privkeyname the name of the privKey file, not clear why we need this too
    108117     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    109118     *                                  badly that we cant create a socketManager
     
    115124    }
    116125
     126    /**
     127     *  @param sktMgr the existing socket manager
     128     *  @since 0.8.9
     129     */
     130    public I2PTunnelServer(InetAddress host, int port, I2PSocketManager sktMgr,
     131                           Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) {
     132        super("Server at " + host + ':' + port, notifyThis, tunnel);
     133        this.l = l;
     134        this.remoteHost = host;
     135        this.remotePort = port;
     136        _log = tunnel.getContext().logManager().getLog(getClass());
     137        sockMgr = sktMgr;
     138        open = true;
     139    }
     140
    117141    private static final int RETRY_DELAY = 20*1000;
    118142    private static final int MAX_RETRIES = 4;
    119143
    120144    /**
     145     * @param privData stream containing the private key data,
     146     *                 format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
     147     * @param privkeyname the name of the privKey file, not clear why we need this too
    121148     * @throws IllegalArgumentException if the I2CP configuration is b0rked so
    122149     *                                  badly that we cant create a socketManager
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelTask.java

    r67d608a r6d06427  
    1414 */
    1515
    16 public abstract class I2PTunnelTask implements EventDispatcher {
    17 
    18     private final EventDispatcherImpl _event = new EventDispatcherImpl();
     16public abstract class I2PTunnelTask extends EventDispatcherImpl {
    1917
    2018    private int id;
     
    7876        return name;
    7977    }
    80 
    81     /* Required by the EventDispatcher interface */
    82     public EventDispatcher getEventDispatcher() {
    83         return _event;
    84     }
    85 
    86     public void attachEventDispatcher(EventDispatcher e) {
    87         _event.attachEventDispatcher(e.getEventDispatcher());
    88     }
    89 
    90     public void detachEventDispatcher(EventDispatcher e) {
    91         _event.detachEventDispatcher(e.getEventDispatcher());
    92     }
    93 
    94     public void notifyEvent(String e, Object a) {
    95         _event.notifyEvent(e, a);
    96     }
    97 
    98     public Object getEventValue(String n) {
    99         return _event.getEventValue(n);
    100     }
    101 
    102     public Set getEvents() {
    103         return _event.getEvents();
    104     }
    105 
    106     public void ignoreEvents() {
    107         _event.ignoreEvents();
    108     }
    109 
    110     public void unIgnoreEvents() {
    111         _event.unIgnoreEvents();
    112     }
    113 
    114     public Object waitEventValue(String n) {
    115         return _event.waitEventValue(n);
    116     }
    11778}
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/TunnelController.java

    r67d608a r6d06427  
    2323
    2424/**
    25  * Coordinate the runtime operation and configuration of a tunnel. 
     25 * Coordinate the runtime operation and configuration of a single I2PTunnel.
     26 * An I2PTunnel tracks one or more I2PTunnelTasks and one or more I2PSessions.
     27 * Usually one of each.
     28 *
    2629 * These objects are bundled together under a TunnelControllerGroup where the
    2730 * entire group is stored / loaded from a single config file.
     
    2932 */
    3033public class TunnelController implements Logging {
    31     private Log _log;
     34    private final Log _log;
    3235    private Properties _config;
    33     private I2PTunnel _tunnel;
    34     private List<String> _messages;
     36    private final I2PTunnel _tunnel;
     37    private final List<String> _messages;
    3538    private List<I2PSession> _sessions;
    3639    private boolean _running;
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/socks/I2PSOCKSIRCTunnel.java

    r67d608a r6d06427  
    1212import net.i2p.client.streaming.I2PSocket;
    1313import net.i2p.i2ptunnel.I2PTunnel;
    14 import net.i2p.i2ptunnel.I2PTunnelIRCClient;
     14import net.i2p.i2ptunnel.irc.IrcInboundFilter;
     15import net.i2p.i2ptunnel.irc.IrcOutboundFilter;
    1516import net.i2p.i2ptunnel.Logging;
    1617import net.i2p.util.EventDispatcher;
     
    5152            I2PSocket destSock = serv.getDestinationI2PSocket(this);
    5253            StringBuffer expectedPong = new StringBuffer();
    53             Thread in = new I2PAppThread(new I2PTunnelIRCClient.IrcInboundFilter(clientSock, destSock, expectedPong, _log),
     54            Thread in = new I2PAppThread(new IrcInboundFilter(clientSock, destSock, expectedPong, _log),
    5455                                         "SOCKS IRC Client " + (++__clientId) + " in", true);
    5556            in.start();
    56             Thread out = new I2PAppThread(new I2PTunnelIRCClient.IrcOutboundFilter(clientSock, destSock, expectedPong, _log),
     57            Thread out = new I2PAppThread(new IrcOutboundFilter(clientSock, destSock, expectedPong, _log),
    5758                                          "SOCKS IRC Client " + __clientId + " out", true);
    5859            out.start();
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/EditBean.java

    r67d608a r6d06427  
    2222import net.i2p.i2ptunnel.I2PTunnelHTTPClient;
    2323import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase;
     24import net.i2p.i2ptunnel.I2PTunnelIRCClient;
    2425import net.i2p.i2ptunnel.TunnelController;
    2526import net.i2p.i2ptunnel.TunnelControllerGroup;
     
    171172    }
    172173   
     174    /** @since 0.8.9 */
     175    public boolean getDCC(int tunnel) {
     176        return getBooleanProperty(tunnel, I2PTunnelIRCClient.PROP_DCC);
     177    }
     178
    173179    public String getEncryptKey(int tunnel) {
    174180        return getProperty(tunnel, "i2cp.leaseSetKey", "");
  • apps/i2ptunnel/java/src/net/i2p/i2ptunnel/web/IndexBean.java

    r67d608a r6d06427  
    2929import net.i2p.i2ptunnel.I2PTunnelHTTPClient;
    3030import net.i2p.i2ptunnel.I2PTunnelHTTPClientBase;
     31import net.i2p.i2ptunnel.I2PTunnelIRCClient;
    3132import net.i2p.i2ptunnel.TunnelController;
    3233import net.i2p.i2ptunnel.TunnelControllerGroup;
     
    712713    }
    713714
     715    /** @since 0.8.9 */
     716    public void setDCC(String moo) {
     717        _booleanOptions.add(I2PTunnelIRCClient.PROP_DCC);
     718    }
     719
    714720    protected static final String PROP_ENABLE_ACCESS_LIST = "i2cp.enableAccessList";
    715721    protected static final String PROP_ENABLE_BLACKLIST = "i2cp.enableBlackList";
     
    10211027                config.setProperty("interface", "");
    10221028        }
     1029
     1030        if ("ircclient".equals(_type)) {
     1031            boolean dcc = _booleanOptions.contains(I2PTunnelIRCClient.PROP_DCC);
     1032            config.setProperty("option." + I2PTunnelIRCClient.PROP_DCC,
     1033                               "" + dcc);
     1034            // add some sane server options since they aren't in the GUI (yet)
     1035            if (dcc) {
     1036                config.setProperty("options." + PROP_MAX_CONNS_MIN, "3");
     1037                config.setProperty("options." + PROP_MAX_CONNS_HOUR, "10");
     1038                config.setProperty("options." + PROP_MAX_TOTAL_CONNS_MIN, "5");
     1039                config.setProperty("options." + PROP_MAX_TOTAL_CONNS_HOUR, "25");
     1040            }
     1041        }
     1042
    10231043        return config;
    10241044    }
     
    10271047        "inbound.length", "outbound.length", "inbound.lengthVariance", "outbound.lengthVariance",
    10281048        "inbound.backupQuantity", "outbound.backupQuantity", "inbound.quantity", "outbound.quantity",
    1029         "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize"
     1049        "inbound.nickname", "outbound.nickname", "i2p.streaming.connectDelay", "i2p.streaming.maxWindowSize",
     1050        I2PTunnelIRCClient.PROP_DCC
    10301051        };
    10311052    private static final String _booleanClientOpts[] = {
     
    10491070         PROP_MAX_STREAMS
    10501071        };
     1072
    10511073    protected static final Set _noShowSet = new HashSet(64);
    10521074    static {
  • apps/i2ptunnel/jsp/editClient.jsp

    r67d608a r6d06427  
    174174                <span class="comment"><%=intl._("(Check the Box for 'YES')")%></span>
    175175            </div>
     176         <% if ("ircclient".equals(tunnelType)) { %>
     177            <div id="startupField" class="rowItem">
     178                <label for="dcc" accesskey="d">
     179                    <%=intl._("Enable DCC")%>:
     180                </label>
     181                <input value="1" type="checkbox" id="startOnLoad" name="DCC" title="Enable DCC"<%=(editBean.getDCC(curTunnel) ? " checked=\"checked\"" : "")%> class="tickbox" />               
     182                <span class="comment"><%=intl._("(Check the Box for 'YES')")%></span>
     183            </div>
     184         <% } // ircclient %>
    176185           
    177186            <div class="footer">
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java

    r67d608a r6d06427  
    7171
    7272    public void setSocketErrorListener(SocketErrorListener lsnr);
     73
     74    /**
     75     *  The remote port.
     76     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     77     *  @since 0.8.9
     78     */
     79    public int getPort();
     80
     81    /**
     82     *  The local port.
     83     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     84     *  @since 0.8.9
     85     */
     86    public int getLocalPort();
     87
    7388    /**
    7489     * Allow notification of underlying errors communicating across I2P without
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

    r67d608a r6d06427  
    88import net.i2p.I2PAppContext;
    99import net.i2p.I2PException;
     10import net.i2p.client.I2PSession;
    1011import net.i2p.client.I2PSessionException;
    1112import net.i2p.data.Destination;
     
    301302    public long getCreatedOn() { return _createdOn; }
    302303    public long getClosedOn() { return _closedOn; }
     304   
     305    /**
     306     * The remote port.
     307     * @return 0 always
     308     * @since 0.8.9
     309     */
     310    public int getPort() {
     311        return I2PSession.PORT_UNSPECIFIED;
     312    }
     313
     314    /**
     315     * The local port.
     316     * @return 0 always
     317     * @since 0.8.9
     318     */
     319    public int getLocalPort() {
     320        return I2PSession.PORT_UNSPECIFIED;
     321    }
    303322   
    304323   
     
    672691        }
    673692    }
    674    
     693
    675694    @Override
    676695    public String toString() { return "" + hashCode(); }
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java

    r67d608a r6d06427  
    9191     * stream and connected to the default I2CP host and port.
    9292     *
    93      * @param myPrivateKeyStream private key stream
     93     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    9494     * @return the newly created socket manager, or null if there were errors
    9595     */
     
    102102     * stream and connected to the default I2CP host and port.
    103103     *
    104      * @param myPrivateKeyStream private key stream
     104     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    105105     * @param opts I2CP options
    106106     * @return the newly created socket manager, or null if there were errors
     
    115115     * port
    116116     *
    117      * @param myPrivateKeyStream private key stream
     117     * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    118118     * @param i2cpHost I2CP host
    119119     * @param i2cpPort I2CP port
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java

    r67d608a r6d06427  
    33/**
    44 * Define the configuration for streaming and verifying data on the socket.
    5  *
     5 * Use I2PSocketManager.buildOptions() to get one of these.
    66 */
    77public interface I2PSocketOptions {
     
    8282     */
    8383    public void setWriteTimeout(long ms);
     84
     85    /**
     86     *  The remote port.
     87     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     88     *  @since 0.8.9
     89     */
     90    public int getPort();
     91
     92    /**
     93     *  The remote port.
     94     *  @param port 0 - 65535
     95     *  @since 0.8.9
     96     */
     97    public void setPort(int port);
     98
     99    /**
     100     *  The local port.
     101     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     102     *  @since 0.8.9
     103     */
     104    public int getLocalPort();
     105
     106    /**
     107     *  The local port.
     108     *  @param port 0 - 65535
     109     *  @since 0.8.9
     110     */
     111    public void setLocalPort(int port);
    84112}
  • apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java

    r67d608a r6d06427  
    55/**
    66 * Define the configuration for streaming and verifying data on the socket.
    7  *
     7 * Use I2PSocketManager.buildOptions() to get one of these.
    88 */
    99class I2PSocketOptionsImpl implements I2PSocketOptions {
     
    1212    private long _writeTimeout;
    1313    private int _maxBufferSize;
     14    private int _localPort;
     15    private int _remotePort;
    1416   
    1517    public static final int DEFAULT_BUFFER_SIZE = 1024*64;
     
    1719    public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000;
    1820   
     21    /**
     22     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     23     *  from System properties. Does not set local port or remote port.
     24     */
    1925    public I2PSocketOptionsImpl() {
    2026        this(System.getProperties());
    2127    }
    2228   
     29    /**
     30     *  Initializes from System properties then copies over all options.
     31     *  @param opts may be null
     32     */
    2333    public I2PSocketOptionsImpl(I2PSocketOptions opts) {
    2434        this(System.getProperties());
     
    2838            _writeTimeout = opts.getWriteTimeout();
    2939            _maxBufferSize = opts.getMaxBufferSize();
     40            _localPort = opts.getLocalPort();
     41            _remotePort = opts.getPort();
    3042        }
    3143    }
    3244
     45    /**
     46     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     47     *  from properties. Does not set local port or remote port.
     48     *  @param opts may be null
     49     */
    3350    public I2PSocketOptionsImpl(Properties opts) {
    3451        init(opts);
    3552    }
    3653   
     54    /**
     55     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     56     *  from properties. Does not set local port or remote port.
     57     *  @param opts may be null
     58     */
    3759    public void setProperties(Properties opts) {
    3860        if (opts == null) return;
     
    4769    }
    4870   
     71    /**
     72     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     73     *  from properties. Does not set local port or remote port.
     74     */
    4975    protected void init(Properties opts) {
    5076        _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
     
    145171        _writeTimeout = ms;
    146172    }
     173
     174    /**
     175     *  The remote port.
     176     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     177     *  @since 0.8.9
     178     */
     179    public int getPort() {
     180        return _remotePort;
     181    }
     182
     183    /**
     184     *  The remote port.
     185     *  @param port 0 - 65535
     186     *  @since 0.8.9
     187     */
     188    public void setPort(int port) {
     189        _remotePort = port;
     190    }
     191
     192    /**
     193     *  The local port.
     194     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     195     *  @since 0.8.9
     196     */
     197    public int getLocalPort() {
     198        return _localPort;
     199    }
     200
     201    /**
     202     *  The local port.
     203     *  @param port 0 - 65535
     204     *  @since 0.8.9
     205     */
     206    public void setLocalPort(int port) {
     207        _localPort = port;
     208    }
    147209}
  • apps/streaming/java/src/net/i2p/client/streaming/Connection.java

    r67d608a r6d06427  
    2424 */
    2525class Connection {
    26     private I2PAppContext _context;
    27     private Log _log;
    28     private ConnectionManager _connectionManager;
     26    private final I2PAppContext _context;
     27    private final Log _log;
     28    private final ConnectionManager _connectionManager;
    2929    private Destination _remotePeer;
    3030    private long _sendStreamId;
    3131    private long _receiveStreamId;
    3232    private long _lastSendTime;
    33     private AtomicLong _lastSendId;
     33    private final AtomicLong _lastSendId;
    3434    private boolean _resetReceived;
    3535    private boolean _resetSent;
     
    3737    private boolean _connected;
    3838    private boolean _hardDisconnected;
    39     private MessageInputStream _inputStream;
    40     private MessageOutputStream _outputStream;
    41     private SchedulerChooser _chooser;
     39    private final MessageInputStream _inputStream;
     40    private final MessageOutputStream _outputStream;
     41    private final SchedulerChooser _chooser;
    4242    private long _nextSendTime;
    4343    private long _ackedPackets;
    44     private long _createdOn;
     44    private final long _createdOn;
    4545    private long _closeSentOn;
    4646    private long _closeReceivedOn;
     
    5252    /** Packet ID (Long) to PacketLocal for sent but unacked packets */
    5353    private final Map<Long, PacketLocal> _outboundPackets;
    54     private PacketQueue _outboundQueue;
    55     private ConnectionPacketHandler _handler;
     54    private final PacketQueue _outboundQueue;
     55    private final ConnectionPacketHandler _handler;
    5656    private ConnectionOptions _options;
    57     private ConnectionDataReceiver _receiver;
     57    private final ConnectionDataReceiver _receiver;
    5858    private I2PSocketFull _socket;
    5959    /** set to an error cause if the connection could not be established */
     
    7171    /** how many messages have been resent and not yet ACKed? */
    7272    private int _activeResends;
    73     private ConEvent _connectionEvent;
    74     private int _randomWait;
     73    private final ConEvent _connectionEvent;
     74    private final int _randomWait;
     75    private int _localPort;
     76    private int _remotePort;
    7577   
    7678    private long _lifetimeBytesSent;
     
    8789    public static final int MAX_WINDOW_SIZE = 128;
    8890   
    89     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
     91    public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
     92                      PacketQueue queue, ConnectionPacketHandler handler) {
    9093        this(ctx, manager, chooser, queue, handler, null);
    9194    }
    92     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
     95
     96    public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser,
     97                      PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) {
    9398        _context = ctx;
    9499        _connectionManager = manager;
     
    102107        _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize()));
    103108        _outboundPackets = new TreeMap();
     109        if (opts != null) {
     110            _localPort = opts.getLocalPort();
     111            _remotePort = opts.getPort();
     112        }
    104113        _options = (opts != null ? opts : new ConnectionOptions());
    105114        _outputStream.setWriteTimeout((int)_options.getWriteTimeout());
     
    107116        _lastSendId = new AtomicLong(-1);
    108117        _nextSendTime = -1;
    109         _ackedPackets = 0;
    110118        _createdOn = _context.clock().now();
    111119        _closeSentOn = -1;
    112120        _closeReceivedOn = -1;
    113         _unackedPacketsReceived = 0;
    114121        _congestionWindowEnd = _options.getWindowSize()-1;
    115122        _highestAckedThrough = -1;
     
    117124        _lastCongestionTime = -1;
    118125        _lastCongestionHighestUnacked = -1;
    119         _resetReceived = false;
    120126        _connected = true;
    121127        _disconnectScheduledOn = -1;
     
    124130        _ackSinceCongestion = true;
    125131        _connectLock = new Object();
    126         _activeResends = 0;
    127132        _resetSentOn = -1;
    128         _isInbound = false;
    129         _updatedShareOpts = false;
    130133        _connectionEvent = new ConEvent();
    131         _hardDisconnected = false;
    132134        _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage
    133135        _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 });
     
    679681    public void setSocket(I2PSocketFull socket) { _socket = socket; }
    680682   
     683    /**
     684     * The remote port.
     685     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     686     *  @since 0.8.9
     687     */
     688    public int getPort() {
     689        return _remotePort;
     690    }
     691
     692    /**
     693     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     694     *  @since 0.8.9
     695     */
     696    public int getLocalPort() {
     697        return _localPort;
     698    }
     699
    681700    public String getConnectionError() { return _connectionError; }
    682701    public void setConnectionError(String err) { _connectionError = err; }
     
    782801   
    783802    public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; }
    784    
     803
    785804    void congestionOccurred() {
    786805        // if we hit congestion and e.g. 5 packets are resent,
     
    963982     */
    964983    public MessageInputStream getInputStream() { return _inputStream; }
     984
    965985    /** stream that the local peer sends data to the remote peer on
    966986     * @return the outbound message stream
    967987     */
    968988    public MessageOutputStream getOutputStream() { return _outputStream; }
    969    
    970         @Override
     989
     990    @Override
    971991    public String toString() {
    972992        StringBuilder buf = new StringBuilder(128);
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java

    r67d608a r6d06427  
    1414 */
    1515class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
    16     private I2PAppContext _context;
    17     private Log _log;
    18     private Connection _connection;
     16    private final I2PAppContext _context;
     17    private final Log _log;
     18    private final Connection _connection;
    1919    private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus();
    2020   
     21    /**
     22     *  @param con non-null
     23     */
    2124    public ConnectionDataReceiver(I2PAppContext ctx, Connection con) {
    2225        _context = ctx;
     
    4245     */
    4346    public boolean writeInProcess() {
    44         Connection con = _connection;
    45         if (con != null)
    46             return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize();
    47         return false;
     47        return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize();
    4848    }
    4949   
     
    6161    public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) {
    6262        Connection con = _connection;
    63         if (con == null) return _dummyStatus;
     63        //if (con == null) return _dummyStatus;
    6464        boolean doSend = true;
    6565        if ( (size <= 0) && (con.getLastSendId() >= 0) ) {
     
    122122    public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) {
    123123        Connection con = _connection;
    124         if (con == null) return null;
     124        //if (con == null) return null;
    125125        long before = System.currentTimeMillis();
    126126        PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement);
     
    186186            packet.setOptionalFrom(con.getSession().getMyDestination());
    187187            packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
     188            packet.setLocalPort(con.getLocalPort());
     189            packet.setRemotePort(con.getPort());
    188190        }
    189191        if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java

    r67d608a r6d06427  
    1919 */
    2020class ConnectionHandler {
    21     private I2PAppContext _context;
    22     private Log _log;
    23     private ConnectionManager _manager;
    24     private LinkedBlockingQueue<Packet> _synQueue;
     21    private final I2PAppContext _context;
     22    private final Log _log;
     23    private final ConnectionManager _manager;
     24    private final LinkedBlockingQueue<Packet> _synQueue;
    2525    private boolean _active;
    2626    private int _acceptTimeout;
     
    4242        _manager = mgr;
    4343        _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE);
    44         _active = false;
    4544        _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT;
    4645    }
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java

    r67d608a r6d06427  
    2222 */
    2323class ConnectionManager {
    24     private I2PAppContext _context;
    25     private Log _log;
    26     private I2PSession _session;
    27     private MessageHandler _messageHandler;
    28     private PacketHandler _packetHandler;
    29     private ConnectionHandler _connectionHandler;
    30     private PacketQueue _outboundQueue;
    31     private SchedulerChooser _schedulerChooser;
    32     private ConnectionPacketHandler _conPacketHandler;
    33     private TCBShare _tcbShare;
     24    private final I2PAppContext _context;
     25    private final Log _log;
     26    private final I2PSession _session;
     27    private final MessageHandler _messageHandler;
     28    private final PacketHandler _packetHandler;
     29    private final ConnectionHandler _connectionHandler;
     30    private final PacketQueue _outboundQueue;
     31    private final SchedulerChooser _schedulerChooser;
     32    private final ConnectionPacketHandler _conPacketHandler;
     33    private final TCBShare _tcbShare;
    3434    /** Inbound stream ID (Long) to Connection map */
    35     private ConcurrentHashMap<Long, Connection> _connectionByInboundId;
     35    private final ConcurrentHashMap<Long, Connection> _connectionByInboundId;
    3636    /** Ping ID (Long) to PingRequest */
    3737    private final Map<Long, PingRequest> _pendingPings;
     
    3939    private boolean _throttlersInitialized;
    4040    private int _maxConcurrentStreams;
    41     private ConnectionOptions _defaultOptions;
     41    private final ConnectionOptions _defaultOptions;
    4242    private volatile int _numWaiting;
    4343    private long _soTimeout;
     
    6060        _conPacketHandler = new ConnectionPacketHandler(_context);
    6161        _tcbShare = new TCBShare(_context);
    62         _session.setSessionListener(_messageHandler);
     62        // PROTO_ANY is for backward compatibility (pre-0.7.1)
     63        // TODO change proto to PROTO_STREAMING someday.
     64        // Right now we get everything, and rely on Datagram to specify PROTO_UDP.
     65        // PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1)
     66        _session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
    6367        _outboundQueue = new PacketQueue(_context, _session, this);
    64         _allowIncoming = false;
    65         _numWaiting = 0;
    6668        /** Socket timeout for accept() */
    6769        _soTimeout = -1;
     
    142144     */
    143145    public Connection receiveConnection(Packet synPacket) {
    144         Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
     146        ConnectionOptions opts = new ConnectionOptions(_defaultOptions);
     147        opts.setPort(synPacket.getRemotePort());
     148        opts.setLocalPort(synPacket.getLocalPort());
     149        Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
    145150        _tcbShare.updateOptsFromShare(con);
    146151        con.setInbound();
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java

    r67d608a r6d06427  
    107107     *  some parts that could use more research.
    108108     *
     109     *<pre>
    109110     *  1024 Tunnel Message
    110111     *  - 21 Header (see router/tunnel/BatchedPreprocessor.java)
     
    170171     *   3 msgs: 2722
    171172     *   4 msgs: 3714
    172      *
     173     *</pre>
    173174     *
    174175     * Before release 0.6.1.14 this was 4096.
     
    206207    public static final int MIN_MESSAGE_SIZE = 512;
    207208
     209    /**
     210     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     211     *  from System properties. Does not set local port or remote port.
     212     */
    208213    public ConnectionOptions() {
    209214        super();
    210215    }
    211216   
     217    /**
     218     *  Sets max buffer size, connect timeout, read timeout, and write timeout
     219     *  from properties. Does not set local port or remote port.
     220     *  @param opts may be null
     221     */
    212222    public ConnectionOptions(Properties opts) {
    213223        super(opts);
    214224    }
    215225   
     226    /**
     227     *  Initializes from System properties then copies over all options.
     228     *  @param opts may be null
     229     */
    216230    public ConnectionOptions(I2PSocketOptions opts) {
    217231        super(opts);
    218232    }
    219233   
     234    /**
     235     *  Initializes from System properties then copies over all options.
     236     *  @param opts may be null
     237     */
    220238    public ConnectionOptions(ConnectionOptions opts) {
    221239        super(opts);
     
    236254            setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor());
    237255            setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor());
    238             setWriteTimeout(opts.getWriteTimeout());
    239             setReadTimeout(opts.getReadTimeout());
     256            // handled in super()
     257            // not clear why added by jr 12/22/2005
     258            //setWriteTimeout(opts.getWriteTimeout());
     259            //setReadTimeout(opts.getReadTimeout());
    240260            setAnswerPings(opts.getAnswerPings());
    241261            initLists(opts);
     
    249269    }
    250270   
    251         @Override
     271    @Override
    252272    protected void init(Properties opts) {
    253273        super.init(opts);
     
    263283        setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE));
    264284        setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
    265         setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
     285        // handled in super()
     286        //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
    266287        setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000));
    267288        setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND));
     
    269290        setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
    270291        setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
     292        // overrides default in super()
    271293        setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
    272294        setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
     
    280302    }
    281303   
    282         @Override
     304    @Override
    283305    public void setProperties(Properties opts) {
    284306        super.setProperties(opts);
     
    304326        if (opts.containsKey(PROP_MAX_RESENDS))
    305327            setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS));
    306         if (opts.containsKey(PROP_WRITE_TIMEOUT))
    307             setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
     328        // handled in super()
     329        //if (opts.containsKey(PROP_WRITE_TIMEOUT))
     330        //    setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
    308331        if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
    309332            setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000));
     
    317340        if (opts.containsKey(PROP_CONNECT_TIMEOUT))
    318341            // wow 5 minutes!!! FIXME!!
     342            // overrides default in super()
    319343            setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
    320344        if (opts.containsKey(PROP_ANSWER_PINGS))
  • apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java

    r67d608a r6d06427  
    1616 */
    1717class ConnectionPacketHandler {
    18     private I2PAppContext _context;
    19     private Log _log;
     18    private final I2PAppContext _context;
     19    private final Log _log;
    2020   
    2121    public ConnectionPacketHandler(I2PAppContext context) {
  • apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java

    r67d608a r6d06427  
    99 */
    1010class I2PServerSocketFull implements I2PServerSocket {
    11     private I2PSocketManagerFull _socketManager;
     11    private final I2PSocketManagerFull _socketManager;
    1212   
    1313    public I2PServerSocketFull(I2PSocketManagerFull mgr) {
  • apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java

    r67d608a r6d06427  
    55import java.io.OutputStream;
    66
     7import net.i2p.client.I2PSession;
    78import net.i2p.data.Destination;
    89
     
    128129            c.disconnectComplete();
    129130    }
    130         @Override
     131   
     132    /**
     133     * The remote port.
     134     * @return the port or 0 if unknown
     135     * @since 0.8.9
     136     */
     137    public int getPort() {
     138        Connection c = _connection;
     139        return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort();
     140    }
     141
     142    /**
     143     * The local port.
     144     * @return the port or 0 if unknown
     145     * @since 0.8.9
     146     */
     147    public int getLocalPort() {
     148        Connection c = _connection;
     149        return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort();
     150    }
     151
     152    @Override
    131153    public String toString() {
    132154        Connection c = _connection;
  • apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java

    r67d608a r6d06427  
    88import net.i2p.client.I2PSession;
    99import net.i2p.client.I2PSessionException;
    10 import net.i2p.client.I2PSessionListener;
     10import net.i2p.client.I2PSessionMuxedListener;
    1111import net.i2p.util.Log;
    1212
     
    1616 *
    1717 */
    18 class MessageHandler implements I2PSessionListener {
     18class MessageHandler implements I2PSessionMuxedListener {
    1919    private final ConnectionManager _manager;
    2020    private final I2PAppContext _context;
     
    3232    /** Instruct the client that the given session has received a message with
    3333     * size # of bytes.
     34     * This shouldn't be called anymore since we are registering as a muxed listener.
    3435     * @param session session to notify
    3536     * @param msgId message number available
     
    3738     */
    3839    public void messageAvailable(I2PSession session, int msgId, long size) {
     40        messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED,
     41                         I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     42    }
     43
     44    /** Instruct the client that the given session has received a message with
     45     * size # of bytes.
     46     * @param session session to notify
     47     * @param msgId message number available
     48     * @param size size of the message
     49     */
     50    public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) {
    3951        byte data[] = null;
    4052        try {
     
    5062        try {
    5163            packet.readPacket(data, 0, data.length);
     64            packet.setRemotePort(fromPort);
     65            packet.setLocalPort(toPort);
    5266            _manager.getPacketHandler().receivePacket(packet);
    5367        } catch (IllegalArgumentException iae) {
  • apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java

    r67d608a r6d06427  
    6464        _dataReceiver = receiver;
    6565        _dataLock = new Object();
    66         _written = 0;
    67         _closed = false;
    6866        _writeTimeout = -1;
    6967        _passiveFlushDelay = passiveFlushDelay;
    7068        _nextBufferSize = -1;
    7169        _sendPeriodBeginTime = ctx.clock().now();
    72         _sendPeriodBytes = 0;
    73         _sendBps = 0;
    7470        _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 });
    7571        _flusher = new Flusher();
  • apps/streaming/java/src/net/i2p/client/streaming/Packet.java

    r67d608a r6d06427  
    1414
    1515/**
     16 * This contains solely the data that goes out on the wire,
     17 * including the local and remote port which is embedded in
     18 * the I2CP overhead, not in the packet itself.
     19 * For local state saved for outbound packets, see PacketLocal.
     20 *
     21 * <p>
     22 *
    1623 * Contain a single packet transferred as part of a streaming connection. 
    1724 * The data format is as follows:<ul>
     
    6875    private int _optionDelay;
    6976    private int _optionMaxSize;
     77    private int _localPort;
     78    private int _remotePort;
    7079   
    7180    /**
     
    149158    protected static final int MAX_DELAY_REQUEST = 65535;
    150159
     160    /**
     161     *  Does no initialization.
     162     *  See readPacket() for inbound packets, and the setters for outbound packets.
     163     */
    151164    public Packet() { }
    152165   
     
    323336    }
    324337   
     338    /**
     339     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     340     *  @since 0.8.9
     341     */
     342    public int getLocalPort() {
     343        return _localPort;
     344    }
     345
     346    /**
     347     *  Must be called to change the port, not set by readPacket()
     348     *  as the port is out-of-band in the I2CP header.
     349     *  @since 0.8.9
     350     */
     351    public void setLocalPort(int port) {
     352        _localPort = port;
     353    }
     354   
     355    /**
     356     *  @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0)
     357     *  @since 0.8.9
     358     */
     359    public int getRemotePort() {
     360        return _remotePort;
     361    }
     362
     363    /**
     364     *  Must be called to change the port, not set by readPacket()
     365     *  as the port is out-of-band in the I2CP header.
     366     *  @since 0.8.9
     367     */
     368    public void setRemotePort(int port) {
     369        _remotePort = port;
     370    }
     371
    325372    /**
    326373     * Write the packet to the buffer (starting at the offset) and return
  • apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java

    r67d608a r6d06427  
    1717 */
    1818class PacketHandler {
    19     private ConnectionManager _manager;
    20     private I2PAppContext _context;
    21     private Log _log;
     19    private final ConnectionManager _manager;
     20    private final I2PAppContext _context;
     21    private final Log _log;
    2222    //private int _lastDelay;
    2323    //private int _dropped;
  • apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java

    r67d608a r6d06427  
    1414 */
    1515class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
    16     private I2PAppContext _context;
    17     private Log _log;
    18     private Connection _connection;
     16    private final I2PAppContext _context;
     17    private final Log _log;
     18    private final Connection _connection;
    1919    private Destination _to;
    2020    private SessionKey _keyUsed;
    2121    private Set _tagsSent;
    22     private long _createdOn;
     22    private final long _createdOn;
    2323    private int _numSends;
    2424    private long _lastSend;
     
    3030    private SimpleTimer2.TimedEvent _resendEvent;
    3131   
     32    /** not bound to a connection */
    3233    public PacketLocal(I2PAppContext ctx, Destination to) {
    3334        this(ctx, to, null);
    3435    }
     36
    3537    public PacketLocal(I2PAppContext ctx, Destination to, Connection con) {
    3638        _context = ctx;
     
    4143        _lastSend = -1;
    4244        _cancelledOn = -1;
    43         _nackCount = 0;
    44         _retransmitted = false;
    4545    }
    4646   
     
    139139    public int getNumSends() { return _numSends; }
    140140    public long getLastSend() { return _lastSend; }
     141
     142    /** @return null if not bound */
    141143    public Connection getConnection() { return _connection; }
    142144
  • apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java

    r67d608a r6d06427  
    2020 */
    2121class PacketQueue {
    22     private I2PAppContext _context;
    23     private Log _log;
    24     private I2PSession _session;
    25     private ConnectionManager _connectionManager;
    26     private ByteCache _cache = ByteCache.getInstance(64, 36*1024);
     22    private final I2PAppContext _context;
     23    private final Log _log;
     24    private final I2PSession _session;
     25    private final ConnectionManager _connectionManager;
     26    private final ByteCache _cache = ByteCache.getInstance(64, 36*1024);
    2727   
    2828    public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
     
    9999                // I2PSessionMuxedImpl no tags
    100100                sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires,
    101                                  I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     101                                 I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
    102102            else
    103103                // I2PSessionImpl2
     
    108108                // I2PSessionMuxedImpl no tags
    109109                sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null,
    110                                  I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
     110                                 I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort());
    111111            end = _context.clock().now();
    112112           
  • apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java

    r67d608a r6d06427  
    108108
    109109    /**
    110      *  @return -1 always
     110     *  @return the port or 0 if unknown
    111111     */
    112112    @Override
    113113    public int getLocalPort() {
    114         return -1;
     114        return _socket.getLocalPort();
    115115    }
    116116
     
    140140
    141141    /**
    142      *  @return 0 always
     142     *  @return the port or 0 if unknown
    143143     */
    144144    @Override
    145145    public int getPort() {
    146         return 0;
     146        return _socket.getPort();
    147147    }
    148148
  • apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java

    r67d608a r6d06427  
    2222 */
    2323class TCBShare {
    24     private I2PAppContext _context;
    25     private Log _log;
    26     private Map<Destination, Entry> _cache;
    27     private CleanEvent _cleaner;
     24    private final I2PAppContext _context;
     25    private final Log _log;
     26    private final Map<Destination, Entry> _cache;
     27    private final CleanEvent _cleaner;
    2828
    2929    private static final long EXPIRE_TIME = 30*60*1000;
  • core/java/src/net/i2p/client/I2PClient.java

    r67d608a r6d06427  
    4545     * the router how to handle the new session, and to configure the end to end
    4646     * encryption.
    47      * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from
     47     *
     48     * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from,
     49     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    4850     * @param options set of options to configure the router with, if null will use System properties
    4951     * @return new session allowing a Destination to recieve all of its messages and send messages to any other Destination.
     
    5355    /** Create a new destination with the default certificate creation properties and store
    5456     * it, along with the private encryption and signing keys at the specified location
     57     *
    5558     * @param destKeyStream create a new destination and write out the object to the given stream,
    5659     *                      formatted as Destination, PrivateKey, and SigningPrivateKey
     60     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    5761     * @return new destination
    5862     */
     
    6266     * encryption and signing keys at the specified location
    6367     *
    64      * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey
     68     * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey,
     69     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    6570     * @param cert certificate to tie to the destination
    6671     * @return newly created destination
  • core/java/src/net/i2p/client/I2PClientImpl.java

    r67d608a r6d06427  
    3131 */
    3232class I2PClientImpl implements I2PClient {
     33
    3334    /**
    3435     * Create the destination with a null payload
     36     *
     37     * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey,
     38     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    3539     */
    3640    public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException {
     
    4549     * the PrivateKey and SigningPrivateKey to the destKeyStream
    4650     *
     51     * @param destKeyStream location to write out the destination, PrivateKey, and SigningPrivateKey,
     52     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    4753     */
    4854    public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException {
     
    6874    /**
    6975     * Create a new session (though do not connect it yet)
     76     *
     77     * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from,
     78     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    7079     * @param options set of options to configure the router with, if null will use System properties
    7180     */
     
    7382        return createSession(I2PAppContext.getGlobalContext(), destKeyStream, options);
    7483    }
     84
    7585    /**
    7686     * Create a new session (though do not connect it yet)
     87     *
     88     * @param destKeyStream location from which to read the Destination, PrivateKey, and SigningPrivateKey from,
     89     *                      format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    7790     * @param options set of options to configure the router with, if null will use System properties
    7891     */
  • core/java/src/net/i2p/client/I2PSession.java

    r67d608a r6d06427  
    218218    public int[] bandwidthLimits() throws I2PSessionException;
    219219
    220     /** See I2PSessionMuxedImpl for details */
     220    /**
     221     *  Listen on specified protocol and port.
     222     *
     223     *  An existing listener with the same proto and port is replaced.
     224     *  Only the listener with the best match is called back for each message.
     225     *
     226     *  @param proto 1-254 or PROTO_ANY (0) for all; recommended:
     227     *         I2PSession.PROTO_STREAMING
     228     *         I2PSession.PROTO_DATAGRAM
     229     *         255 disallowed
     230     *  @param port 1-65535 or PORT_ANY (0) for all
     231     *  @since 0.7.1
     232     */
    221233    public void addSessionListener(I2PSessionListener lsnr, int proto, int port);
    222     /** See I2PSessionMuxedImpl for details */
     234
     235    /**
     236     *  Listen on specified protocol and port, and receive notification
     237     *  of proto, fromPort, and toPort for every message.
     238     *  @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed
     239     *  @param port 1-65535 or PORT_ANY (0) for all
     240     *  @since 0.7.1
     241     */
    223242    public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port);
    224     /** See I2PSessionMuxedImpl for details */
     243
     244    /**
     245     *  removes the specified listener (only)
     246     *  @since 0.7.1
     247     */
    225248    public void removeListener(int proto, int port);
    226249
  • core/java/src/net/i2p/client/I2PSessionDemultiplexer.java

    r67d608a r6d06427  
    1717 * protocol, from port, and to port for every received message.
    1818 *
    19  * This only calls one listener, not all that apply.
     19 * messageAvailable() only calls one listener, not all that apply.
     20 * The others call all listeners.
    2021 *
    2122 * @author zzz
    2223 */
    2324public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
    24     private Log _log;
    25     private Map<Integer, I2PSessionMuxedListener> _listeners;
     25    private final Log _log;
     26    private final Map<Integer, I2PSessionMuxedListener> _listeners;
    2627
    2728    public I2PSessionDemultiplexer(I2PAppContext ctx) {
  • core/java/src/net/i2p/client/I2PSessionImpl.java

    r67d608a r6d06427  
    160160     * from the destKeyStream, and using the specified options to connect to the router
    161161     *
     162     * @param destKeyStream stream containing the private key data,
     163     *                             format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    162164     * @param options set of options to configure the router with, if null will use System properties
    163165     * @throws I2PSessionException if there is a problem loading the private keys or
  • core/java/src/net/i2p/client/I2PSessionImpl2.java

    r67d608a r6d06427  
    5050     * from the destKeyStream, and using the specified options to connect to the router
    5151     *
     52     * @param destKeyStream stream containing the private key data,
     53     *                             format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    5254     * @param options set of options to configure the router with, if null will use System properties
    5355     * @throws I2PSessionException if there is a problem loading the private keys or
  • core/java/src/net/i2p/client/I2PSessionMuxedImpl.java

    r67d608a r6d06427  
    6666 */
    6767class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
    68     private I2PSessionDemultiplexer _demultiplexer;
     68
     69    private final I2PSessionDemultiplexer _demultiplexer;
    6970
    7071    /*
     72     * @param destKeyStream stream containing the private key data,
     73     *                             format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile}
    7174     * @param options set of options to configure the router with, if null will use System properties
    7275     */
     
    9396     *  Only the listener with the best match is called back for each message.
    9497     *
    95      *  @param proto 1-254 or PROTO_ANY for all; recommended:
     98     *  @param proto 1-254 or PROTO_ANY (0) for all; recommended:
    9699     *         I2PSession.PROTO_STREAMING
    97100     *         I2PSession.PROTO_DATAGRAM
    98101     *         255 disallowed
    99      *  @param port 1-65535 or PORT_ANY for all
     102     *  @param port 1-65535 or PORT_ANY (0) for all
    100103     */
    101104    @Override
     
    107110     *  Listen on specified protocol and port, and receive notification
    108111     *  of proto, fromPort, and toPort for every message.
    109      *  @param proto 1-254 or 0 for all; 255 disallowed
    110      *  @param port 1-65535 or 0 for all
     112     *  @param proto 1-254 or PROTO_ANY (0) for all; 255 disallowed
     113     *  @param port 1-65535 or PORT_ANY (0) for all
    111114     */
    112115    @Override
  • core/java/src/net/i2p/util/EventDispatcherImpl.java

    r67d608a r6d06427  
    1010 */
    1111
    12 import java.util.ArrayList;
    1312import java.util.Collections;
    14 import java.util.HashMap;
    1513import java.util.HashSet;
    16 import java.util.Iterator;
    17 import java.util.ListIterator;
     14import java.util.List;
     15import java.util.Map;
    1816import java.util.Set;
     17import java.util.concurrent.ConcurrentHashMap;
     18import java.util.concurrent.CopyOnWriteArrayList;
    1919
    2020/**
     
    3535public class EventDispatcherImpl implements EventDispatcher {
    3636
    37     //private final static Log _log = new Log(EventDispatcherImpl.class);
    38 
    3937    private boolean _ignore = false;
    40     private final HashMap _events = new HashMap(4);
    41     private final ArrayList _attached = new ArrayList();
     38    private final Map<String, Object> _events = new ConcurrentHashMap(4);
     39    private final List<EventDispatcher> _attached = new CopyOnWriteArrayList();
    4240   
    4341    public EventDispatcher getEventDispatcher() {
     
    4745    public void attachEventDispatcher(EventDispatcher ev) {
    4846        if (ev == null) return;
    49         synchronized (_attached) {
    50             //_log.debug(this.hashCode() + ": attaching EventDispatcher " + ev.hashCode());
    51             _attached.add(ev);
    52         }
     47        _attached.add(ev);
    5348    }
    5449   
    5550    public void detachEventDispatcher(EventDispatcher ev) {
    5651        if (ev == null) return;
    57         synchronized (_attached) {
    58             ListIterator it = _attached.listIterator();
    59             while (it.hasNext()) {
    60                 if (((EventDispatcher) it.next()) == ev) {
    61                     it.remove();
    62                     break;
    63                 }
    64             }
    65         }
     52        _attached.remove(ev);
    6653    }
    6754   
     
    7158            args = "[null value]";
    7259        }
    73         //_log.debug(this.hashCode() + ": got notification [" + eventName + "] = [" + args + "]");
     60        _events.put(eventName, args);
    7461        synchronized (_events) {
    75             _events.put(eventName, args);
    7662            _events.notifyAll();
    77             synchronized (_attached) {
    78                 Iterator it = _attached.iterator();
    79                 EventDispatcher e;
    80                 while (it.hasNext()) {
    81                     e = (EventDispatcher) it.next();
    82                     //_log.debug(this.hashCode() + ": notifying attached EventDispatcher " + e.hashCode() + ": ["
    83                     //           + eventName + "] = [" + args + "]");
    84                     e.notifyEvent(eventName, args);
    85                 }
    86             }
     63        }
     64        for (EventDispatcher e : _attached) {
     65            e.notifyEvent(eventName, args);
    8766        }
    8867    }
     
    9069    public Object getEventValue(String name) {
    9170        if (_ignore) return null;
    92         Object val;
    93 
    94         synchronized (_events) {
    95             val = _events.get(name);
    96         }
    97 
    98         return val;
     71        return _events.get(name);
    9972    }
    10073
    101     public Set getEvents() {
     74    public Set<String> getEvents() {
    10275        if (_ignore) return Collections.EMPTY_SET;
    103         Set set;
    104 
    105         synchronized (_events) {
    106             set = new HashSet(_events.keySet());
    107         }
    108 
    109         return set;
     76        return new HashSet(_events.keySet());
    11077    }
    11178   
    11279    public void ignoreEvents() {
    11380        _ignore = true;
    114         synchronized (_events) {
    115             _events.clear();
    116         }
     81        _events.clear();
    11782    }
    11883   
     
    12388    public Object waitEventValue(String name) {
    12489        if (_ignore) return null;
    125         Object val;
    126 
    127         //_log.debug(this.hashCode() + ": waiting for [" + name + "]");
    12890        do {
    12991            synchronized (_events) {
    130                 if (_events.containsKey(name)) {
    131                     val = _events.get(name);
    132                     break;
    133                 }
     92                Object val = _events.get(name);
     93                if (val != null)
     94                    return val;
    13495                try {
    135                     _events.wait(1 * 1000);
    136                 } catch (InterruptedException e) { // nop
    137                 }
     96                    _events.wait(5 * 1000);
     97                } catch (InterruptedException e) {}
    13898            }
    13999        } while (true);
    140 
    141         return val;
    142100    }
    143101}
Note: See TracChangeset for help on using the changeset viewer.