Changeset 26c4f98


Ignore:
Timestamp:
May 29, 2009 9:14:08 PM (11 years ago)
Author:
sponge <sponge@…>
Branches:
master
Children:
f511c9b
Parents:
fd598de
Message:
  • added big fat start/stop lock into BOB
  • added zap command to shut down BOB… now we need a way to start it after it stops. :-)
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • apps/BOB/src/net/i2p/BOB/BOB.java

    rfd598de r26c4f98  
    2828import java.io.FileOutputStream;
    2929import java.io.IOException;
     30import java.net.ConnectException;
    3031import java.net.InetAddress;
    3132import java.net.ServerSocket;
    3233import java.net.Socket;
     34import java.net.SocketTimeoutException;
    3335import java.util.Properties;
     36import java.util.concurrent.atomic.AtomicBoolean;
    3437import net.i2p.client.I2PClient;
    3538import net.i2p.client.streaming.RetransmissionTimer;
    3639import net.i2p.util.Log;
    3740import net.i2p.util.SimpleScheduler;
     41import net.i2p.util.SimpleStore;
    3842import net.i2p.util.SimpleTimer2;
    3943
     
    115119        public final static String PROP_BOB_PORT = "BOB.port";
    116120        public final static String PROP_BOB_HOST = "BOB.host";
    117         private static int maxConnections = 0;
    118121        private static NamedDB database;
    119122        private static Properties props = new Properties();
    120 
     123        private static AtomicBoolean spin = new AtomicBoolean(true);
     124        private static final String P_RUNNING = "RUNNING";
     125        private static final String P_STARTING = "STARTING";
     126        private static final String P_STOPPING = "STOPPING";
     127        private static AtomicBoolean lock = new AtomicBoolean(false);
     128        // no longer used.
     129        // private static int maxConnections = 0;
    121130
    122131        /**
     
    151160
    152161        /**
     162         * Stop BOB gracefully
     163         */
     164        public static void stop() {
     165                spin.set(false);
     166        }
     167        /**
    153168         * Listen for incoming connections and handle them
    154169         *
     
    157172        public static void main(String[] args) {
    158173                database = new NamedDB();
     174                ServerSocket listener = null;
    159175                int i = 0;
    160176                boolean save = false;
     
    169185                i = Y1.hashCode();
    170186                i = Y2.hashCode();
    171 
    172                 {
     187                try {
     188                        {
     189                                try {
     190                                        FileInputStream fi = new FileInputStream(configLocation);
     191                                        props.load(fi);
     192                                        fi.close();
     193                                } catch (FileNotFoundException fnfe) {
     194                                        warn("Unable to load up the BOB config file " + configLocation + ", Using defaults.");
     195                                        warn(fnfe.toString());
     196                                        save = true;
     197                                } catch (IOException ioe) {
     198                                        warn("IOException on BOB config file " + configLocation + ", using defaults.");
     199                                        warn(ioe.toString());
     200                                }
     201                        }
     202                        // Global router and client API configurations that are missing are set to defaults here.
     203                        if (!props.containsKey(I2PClient.PROP_TCP_HOST)) {
     204                                props.setProperty(I2PClient.PROP_TCP_HOST, "localhost");
     205                        }
     206                        if (!props.containsKey(I2PClient.PROP_TCP_PORT)) {
     207                                props.setProperty(I2PClient.PROP_TCP_PORT, "7654");
     208                        }
     209                        if (!props.containsKey(I2PClient.PROP_RELIABILITY)) {
     210                                props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
     211                        }
     212                        if (!props.containsKey(PROP_BOB_PORT)) {
     213                                props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B
     214                        }
     215                        if (!props.containsKey("inbound.length")) {
     216                                props.setProperty("inbound.length", "1");
     217                        }
     218                        if (!props.containsKey("outbound.length")) {
     219                                props.setProperty("outbound.length", "1");
     220                        }
     221                        if (!props.containsKey("inbound.lengthVariance")) {
     222                                props.setProperty("inbound.lengthVariance", "0");
     223                        }
     224                        if (!props.containsKey("outbound.lengthVariance")) {
     225                                props.setProperty("outbound.lengthVariance", "0");
     226                        }
     227                        if (!props.containsKey(PROP_BOB_HOST)) {
     228                                props.setProperty(PROP_BOB_HOST, "localhost");
     229                        }
     230                        if (save) {
     231                                try {
     232                                        warn("Writing new defaults file " + configLocation);
     233                                        FileOutputStream fo = new FileOutputStream(configLocation);
     234                                        props.store(fo, configLocation);
     235                                        fo.close();
     236                                } catch (IOException ioe) {
     237                                        error("IOException on BOB config file " + configLocation + ", " + ioe);
     238                                }
     239                        }
     240
     241                        i = 0;
     242                        boolean g = false;
    173243                        try {
    174                                 FileInputStream fi = new FileInputStream(configLocation);
    175                                 props.load(fi);
    176                                 fi.close();
    177                         } catch(FileNotFoundException fnfe) {
    178                                 warn("Unable to load up the BOB config file " + configLocation + ", Using defaults.");
    179                                 warn(fnfe.toString());
    180                                 save = true;
    181                         } catch(IOException ioe) {
    182                                 warn("IOException on BOB config file " + configLocation + ", using defaults.");
    183                                 warn(ioe.toString());
    184                         }
     244                                info("BOB is now running.");
     245                                listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST)));
     246                                Socket server = null;
     247                                listener.setSoTimeout(500); // .5 sec
     248                                while (spin.get()) {
     249                                        //DoCMDS connection;
     250
     251                                        try {
     252                                                server = listener.accept();
     253                                                g = true;
     254                                        } catch (ConnectException ce) {
     255                                                g = false;
     256                                        } catch (SocketTimeoutException ste) {
     257                                                g = false;
     258                                        }
     259
     260                                        if (g) {
     261                                                DoCMDS conn_c = new DoCMDS(spin, lock, server, props, database, _log);
     262                                                Thread t = new Thread(conn_c);
     263                                                t.setName("BOB.DoCMDS " + i);
     264                                                t.start();
     265                                                i++;
     266                                        }
     267                                }
     268                        } catch (IOException ioe) {
     269                                error("IOException on socket listen: " + ioe);
     270                                ioe.printStackTrace();
     271                        }
     272                } finally {
     273                        info("BOB is now shutting down...");
     274                        // Clean up everything.
     275                        try {
     276                                listener.close();
     277                        } catch (Exception ex) {
     278                                // nop
     279                        }
     280                        // Find all our "BOB.DoCMDS" threads, wait for them to be finished.
     281                        // We could order them to stop, but that could cause nasty issues in the locks.
     282                        visitAllThreads();
     283                        database.getReadLock();
     284                        int all = database.getcount();
     285                        database.releaseReadLock();
     286                        NamedDB nickinfo;
     287                        for (i = 0; i < all; i++) {
     288                                database.getReadLock();
     289                                nickinfo = (NamedDB) database.getnext(i);
     290                                nickinfo.getReadLock();
     291                                if (nickinfo.get(P_RUNNING).equals(Boolean.TRUE) && nickinfo.get(P_STOPPING).equals(Boolean.FALSE) && nickinfo.get(P_STARTING).equals(Boolean.FALSE)) {
     292                                        nickinfo.releaseReadLock();
     293                                        database.releaseReadLock();
     294                                        database.getWriteLock();
     295                                        nickinfo.getWriteLock();
     296                                        nickinfo.add(P_STOPPING, new Boolean(true));
     297                                        nickinfo.releaseWriteLock();
     298                                        database.releaseWriteLock();
     299                                } else {
     300                                        nickinfo.releaseReadLock();
     301                                        database.releaseReadLock();
     302                                }
     303                        }
     304                        info("BOB is now stopped.");
     305
    185306                }
    186                 // Global router and client API configurations that are missing are set to defaults here.
    187                 if(!props.containsKey(I2PClient.PROP_TCP_HOST)) {
    188                         props.setProperty(I2PClient.PROP_TCP_HOST, "localhost");
     307        }
     308
     309        /**
     310         *      Find the root thread group,
     311         *      then find all theads with certain names and wait for them all to be dead.
     312         *
     313         */
     314        private static void visitAllThreads() {
     315                ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
     316                while (root.getParent() != null) {
     317                        root = root.getParent();
    189318                }
    190                 if(!props.containsKey(I2PClient.PROP_TCP_PORT)) {
    191                         props.setProperty(I2PClient.PROP_TCP_PORT, "7654");
     319
     320                // Visit each thread group
     321                waitjoin(root, 0, root.getName());
     322        }
     323
     324        private static void waitjoin(ThreadGroup group, int level, String tn) {
     325                // Get threads in `group'
     326                int numThreads = group.activeCount();
     327                Thread[] threads = new Thread[numThreads * 2];
     328                numThreads = group.enumerate(threads, false);
     329                // Enumerate each thread in `group' and wait for it to stop if it is one of ours.
     330                for (int i = 0; i < numThreads; i++) {
     331                        // Get thread
     332                        Thread thread = threads[i];
     333                        if (thread.getName().startsWith("BOB.DoCMDS ")) {
     334                                try {
     335                                        if (thread.isAlive()) {
     336                                                try {
     337                                                        thread.join();
     338                                                } catch (InterruptedException ex) {
     339                                                }
     340                                        }
     341                                } catch (SecurityException se) {
     342                                        //nop
     343                                }
     344                        }
    192345                }
    193                 if(!props.containsKey(I2PClient.PROP_RELIABILITY)) {
    194                         props.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
     346
     347                // Get thread subgroups of `group'
     348                int numGroups = group.activeGroupCount();
     349                ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
     350                numGroups = group.enumerate(groups, false);
     351
     352                // Recursively visit each subgroup
     353                for (int i = 0; i < numGroups; i++) {
     354                        waitjoin(groups[i], level + 1, groups[i].getName());
    195355                }
    196                 if(!props.containsKey(PROP_BOB_PORT)) {
    197                         props.setProperty(PROP_BOB_PORT, "2827"); // 0xB0B
    198                 }
    199                 if(!props.containsKey("inbound.length")) {
    200                         props.setProperty("inbound.length", "1");
    201                 }
    202                 if(!props.containsKey("outbound.length")) {
    203                         props.setProperty("outbound.length", "1");
    204                 }
    205                 if(!props.containsKey("inbound.lengthVariance")) {
    206                         props.setProperty("inbound.lengthVariance", "0");
    207                 }
    208                 if(!props.containsKey("outbound.lengthVariance")) {
    209                         props.setProperty("outbound.lengthVariance", "0");
    210                 }
    211                 if(!props.containsKey(PROP_BOB_HOST)) {
    212                         props.setProperty(PROP_BOB_HOST, "localhost");
    213                 }
    214                 if(save) {
    215                         try {
    216                                 warn("Writing new defaults file " + configLocation);
    217                                 FileOutputStream fo = new FileOutputStream(configLocation);
    218                                 props.store(fo, configLocation);
    219                                 fo.close();
    220                         } catch(IOException ioe) {
    221                                 error("IOException on BOB config file " + configLocation + ", " + ioe);
    222                         }
    223                 }
    224 
    225                 i = 0;
    226                 try {
    227                         info("BOB is now running.");
    228                         ServerSocket listener = new ServerSocket(Integer.parseInt(props.getProperty(PROP_BOB_PORT)), 10, InetAddress.getByName(props.getProperty(PROP_BOB_HOST)));
    229                         Socket server;
    230 
    231                         while((i++ < maxConnections) || (maxConnections == 0)) {
    232                                 //DoCMDS connection;
    233 
    234                                 server = listener.accept();
    235                                 DoCMDS conn_c = new DoCMDS(server, props, database, _log);
    236                                 Thread t = new Thread(conn_c);
    237                                 t.start();
    238                         }
    239                 } catch(IOException ioe) {
    240                         error("IOException on socket listen: " + ioe);
    241                         ioe.printStackTrace();
    242                 }
    243356        }
    244357}
  • apps/BOB/src/net/i2p/BOB/DoCMDS.java

    rfd598de r26c4f98  
    3232import java.util.Properties;
    3333import java.util.StringTokenizer;
     34import java.util.concurrent.atomic.AtomicBoolean;
    3435import net.i2p.I2PException;
    3536import net.i2p.client.I2PClientFactory;
    3637import net.i2p.data.Destination;
    3738import net.i2p.util.Log;
     39import net.i2p.util.SimpleStore;
    3840
    3941/**
     
    5860        private NamedDB nickinfo;
    5961        private Log _log;
     62        private AtomicBoolean LIVE;
     63        private AtomicBoolean lock;
    6064        /* database strings */
    6165        private static final String P_DEST = "DESTINATION";
     
    9599        private static final String C_stop = "stop";
    96100        private static final String C_verify = "verify";
     101        private static final String C_zap = "zap";
    97102
    98103        /* all the coomands available, plus description */
     
    120125                {C_stop, C_stop + " * Stops the current nicknamed tunnel."},
    121126                {C_verify, C_verify + " BASE64_key * Verifies BASE64 destination."},
     127                {C_zap, C_zap + " * Shuts down BOB."},
    122128                {"", "COMMANDS: " + // this is ugly, but...
    123129                        C_help + " " +
     
    142148                        C_status + " " +
    143149                        C_stop + " " +
    144                         C_verify
     150                        C_verify + " " +
     151                        C_zap
    145152                },
    146153                {" ", " "} // end of list
     
    148155
    149156        /**
    150          *
     157         * @parm LIVE
    151158         * @param server
    152159         * @param props
     
    154161         * @param _log
    155162         */
    156         DoCMDS(Socket server, Properties props, NamedDB database, Log _log) {
     163        DoCMDS(AtomicBoolean LIVE, AtomicBoolean lock, Socket server, Properties props, NamedDB database, Log _log) {
     164                this.lock = lock;
     165                this.LIVE = LIVE;
    157166                this.server = server;
    158167                this.props = new Properties();
     
    510519                                                                // End the command session
    511520                                                                break quit;
     521                                                        } else if (Command.equals(C_zap)) {
     522                                                                // Kill BOB!! (let's hope this works!)
     523                                                                LIVE.set(false);
     524                                                                // End the command session
     525                                                                break quit;
    512526                                                        } else if (Command.equals(C_newkeys)) {
    513527                                                                if (ns) {
     
    12611275                                                                                        MUXlisten tunnel;
    12621276                                                                                        try {
    1263                                                                                                 tunnel = new MUXlisten(database, nickinfo, _log);
     1277                                                                                                while(!lock.compareAndSet(false, true)) {
     1278                                                                                                        // wait
     1279                                                                                                }
     1280                                                                                                tunnel = new MUXlisten(lock, database, nickinfo, _log);
    12641281                                                                                                Thread t = new Thread(tunnel);
    12651282                                                                                                t.start();
     
    12711288                                                                                                out.println("OK tunnel starting");
    12721289                                                                                        } catch (I2PException e) {
     1290                                                                                                lock.set(false);
    12731291                                                                                                out.println("ERROR starting tunnel: " + e);
    12741292                                                                                        } catch (IOException e) {
     1293                                                                                                lock.set(false);
    12751294                                                                                                out.println("ERROR starting tunnel: " + e);
    12761295                                                                                        }
  • apps/BOB/src/net/i2p/BOB/I2Plistener.java

    rfd598de r26c4f98  
    2626import java.net.ConnectException;
    2727import java.net.SocketTimeoutException;
    28 import java.util.logging.Level;
    29 import java.util.logging.Logger;
    3028import net.i2p.I2PException;
    3129import net.i2p.client.streaming.I2PServerSocket;
  • apps/BOB/src/net/i2p/BOB/MUXlisten.java

    rfd598de r26c4f98  
    2929import java.net.ServerSocket;
    3030import java.util.Properties;
     31import java.util.concurrent.atomic.AtomicBoolean;
    3132import net.i2p.I2PException;
    3233import net.i2p.client.streaming.I2PServerSocket;
     
    5354        boolean go_out;
    5455        boolean come_in;
     56        private AtomicBoolean lock;
    5557
    5658        /**
     
    6365         * @throws java.io.IOException
    6466         */
    65         MUXlisten(NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
     67        MUXlisten(AtomicBoolean lock, NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
    6668                int port = 0;
    6769                InetAddress host = null;
     70                this.lock = lock;
    6871                this.tg = null;
    6972                this.database = database;
     
    152155                        }
    153156//              socketManager.addDisconnectListener(new DisconnectListener());
    154 
     157                        lock.set(false);
    155158                        quit:
    156159                        {
     
    217220                                                        }
    218221                                                }
    219 
     222                                                /* cleared in the finally...
    220223                                                try {
    221224                                                        wlock();
     
    234237                                                        break die;
    235238                                                }
     239                                                */
    236240                                        } // die
    237241
     
    242246                        } // quit
    243247                } finally {
    244                         // Start cleanup. Allow threads above this one to catch the stop signal.
    245                         try {
    246                                 Thread.sleep(250);
    247                         } catch (InterruptedException ex) {
    248                         }
     248                        // Start cleanup.
     249                        while (!lock.compareAndSet(false, true)) {
     250                                // wait
     251                        }
     252
    249253                        // zero out everything.
    250254                        try {
     
    262266                        }
    263267
    264 
    265268                        if (SS != null) {
    266269                                try {
     
    280283                        } catch (Exception e) {
    281284                                // nop
    282                                 }
     285                        }
     286                        // Some grace time.
     287                        try {
     288                                Thread.sleep(250);
     289                        } catch (InterruptedException ex) {
     290                        }
     291
     292                        lock.set(false); // Should we force waiting for all threads??
    283293                        // Wait around till all threads are collected.
    284294                        if (tg != null) {
     
    363373        }
    364374
     375        /*
    365376        private static void nuke(ThreadGroup group, int level) {
    366                 // Get threads in `group'
    367                 int numThreads = group.activeCount();
    368                 Thread[] threads = new Thread[numThreads * 2];
    369                 numThreads = group.enumerate(threads, false);
    370                 // Enumerate each thread in `group' and stop it.
    371                 for (int i = 0; i < numThreads; i++) {
    372                         // Get thread
    373                         Thread thread = threads[i];
    374                         try {
    375                                 if (thread.isAlive()) {
    376                                         thread.stop();
    377                                 }
    378                         } catch (SecurityException se) {
    379                                 //nop
    380                         }
    381                 }
    382 
    383                 // Get thread subgroups of `group'
    384                 int numGroups = group.activeGroupCount();
    385                 ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
    386                 numGroups = group.enumerate(groups, false);
    387 
    388                 // Recursively visit each subgroup
    389                 for (int i = 0; i < numGroups; i++) {
    390                         nuke(groups[i], level + 1);
    391                 }
    392                 try {
    393                         group.destroy();
    394                 } catch (IllegalThreadStateException IE) {
    395                         //nop
    396                 } catch (SecurityException se) {
    397                         //nop
    398                 }
    399         }
     377        // Get threads in `group'
     378        int numThreads = group.activeCount();
     379        Thread[] threads = new Thread[numThreads * 2];
     380        numThreads = group.enumerate(threads, false);
     381        // Enumerate each thread in `group' and stop it.
     382        for (int i = 0; i < numThreads; i++) {
     383        // Get thread
     384        Thread thread = threads[i];
     385        try {
     386        if (thread.isAlive()) {
     387        thread.stop();
     388        }
     389        } catch (SecurityException se) {
     390        //nop
     391        }
     392        }
     393
     394        // Get thread subgroups of `group'
     395        int numGroups = group.activeGroupCount();
     396        ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
     397        numGroups = group.enumerate(groups, false);
     398
     399        // Recursively visit each subgroup
     400        for (int i = 0; i < numGroups; i++) {
     401        nuke(groups[i], level + 1);
     402        }
     403        try {
     404        group.destroy();
     405        } catch (IllegalThreadStateException IE) {
     406        //nop
     407        } catch (SecurityException se) {
     408        //nop
     409        }
     410        }
     411         */
    400412}
  • apps/BOB/src/net/i2p/BOB/TCPio.java

    rfd598de r26c4f98  
    5050                this.Ain = Ain;
    5151                this.Aout = Aout;
    52                 // this.info = info;
    53                 // this.database = database;
     52        // this.info = info;
     53        // this.database = database;
    5454        }
    5555
     
    8787                boolean spin = true;
    8888                try {
    89                         while(spin) {
    90                                 b = Ain.read(a, 0, 1);
    91                                 if(b > 0) {
    92                                         Aout.write(a, 0, b);
    93                                 } else if(b == 0) {
    94                                         Thread.yield(); // this should act like a mini sleep.
    95                                         if(Ain.available() == 0) {
     89                        try {
     90                                while (spin) {
     91                                        b = Ain.read(a, 0, 1);
     92                                        if (b > 0) {
     93                                                Aout.write(a, 0, b);
     94                                        } else if (b == 0) {
     95                                                Thread.yield(); // this should act like a mini sleep.
     96                                                if (Ain.available() == 0) {
    9697                                                        Thread.sleep(10);
     98                                                }
     99                                        } else {
     100                                                /* according to the specs:
     101                                                 *
     102                                                 * The total number of bytes read into the buffer,
     103                                                 * or -1 if there is no more data because the end of
     104                                                 * the stream has been reached.
     105                                                 *
     106                                                 */
     107                                                // System.out.println("TCPio: End Of Stream");
     108                                                // Ain.close();
     109                                                // Aout.close();
     110                                                //return;
     111                                                break;
    97112                                        }
    98                                 } else {
    99                                         /* according to the specs:
    100                                          *
    101                                          * The total number of bytes read into the buffer,
    102                                          * or -1 if there is no more data because the end of
    103                                          * the stream has been reached.
    104                                          *
    105                                          */
    106                                         // System.out.println("TCPio: End Of Stream");
    107                                         Ain.close();
    108                                         Aout.close();
    109                                         return;
    110113                                }
     114                        } catch (Exception e) {
    111115                        }
    112                 } catch(Exception e) {
     116                // System.out.println("TCPio: Leaving.");
     117                } finally {
    113118                        // Eject!!! Eject!!!
    114119                        //System.out.println("TCPio: Caught an exception " + e);
     
    123128                        return;
    124129                }
    125                 // System.out.println("TCPio: Leaving.");
    126130        }
    127131}
  • apps/BOB/src/net/i2p/BOB/TCPlistener.java

    rfd598de r26c4f98  
    3030// import net.i2p.client.I2PSession;
    3131// import net.i2p.client.I2PSessionException;
    32 import java.util.logging.Level;
    33 import java.util.logging.Logger;
    3432import net.i2p.client.streaming.I2PServerSocket;
    3533import net.i2p.client.streaming.I2PSocketManager;
     
    143141                                                }
    144142                                        }
    145                                         listener.close();
    146143                                } catch (IOException ioe) {
    147                                         try {
    148                                                 listener.close();
    149                                         } catch (IOException e) {
    150                                         }
    151144                                }
    152145                        }
  • history.txt

    rfd598de r26c4f98  
     12009-05-29 sponge
     2    * added big fat start/stop lock into BOB
     3    * added zap command to shut down BOB... now we need a way to start it
     4      after it stops. :-)
     5
    162009-05-27 Mathiasdm
    27    * Increase sendProcessingTime some more, add a property to configure.
  • router/java/src/net/i2p/router/RouterVersion.java

    rfd598de r26c4f98  
    1919    public final static String ID = "Monotone";
    2020    public final static String VERSION = CoreVersion.VERSION;
    21     public final static long BUILD = 8;
     21    public final static long BUILD = 9;
    2222    /** for example "-test" */
    2323    public final static String EXTRA = "";
Note: See TracChangeset for help on using the changeset viewer.