Opened 12 days ago

Last modified 3 days ago

#2590 new enhancement

Reduce # of job queue threads

Reported by: jogger
Owned by:
Priority: minor
Milestone: undecided
Component: router/general
Version: 0.9.41
Keywords:
Cc:
Parent Tickets:
Sensitive: no

Description

I analyzed why there are so many of these threads using so little CPU. When running, they always end up at the front of the run queue, ahead of our heavy worker threads. I have run i2p with 1 job queue runner for years without a hitch.

Turns out BootCommSystem turns parallel threads on. Reason given: multiple lengthy jobs are loaded onto the queue during router startup only.

Consequently, runJob() should revert this at the end using:

getContext().jobQueue().runQueue(1);

Turns out runQueue() is an incomplete mess, incapable of doing what is documented in the comments. While resolving this, it turned out that JobQueueRunner maintains an unnecessary _id conflicting with thread.getName(), logs regular exit as critical, contains unused and unnecessary methods, and unnecessarily accesses the runner set directly.

This makes for quite a few easy patches that I have already tested. If you agree this should be done, I will post them.

Change History (9)

comment:1 Changed 9 days ago by zzz

Component: unspecified → router/general

Some disagreement and some questions:

JobQueue and SimpleTimer2, by design, both run at higher priority to ensure timed events get run. Especially on Windows, this fixes a problem where periodic things don't run and everything gets hung up. Both of these execution mechanisms require multiple threads to ensure that things don't stall if something takes a long time or is blocked. The multiple threads doing nothing are relatively harmless, don't use many resources, and are not a reason for concern. It's just a thread pool.
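
The stall argument above can be illustrated with a minimal, self-contained sketch. It is not router code: RunnerPoolDemo and quickJobRunsWhileBlocked are hypothetical names, and the only details borrowed from the ticket are the shared ready queue and the NORM_PRIORITY + 1 priority. It submits one job that blocks, then a quick job, and reports whether the quick job got to run while the first was still blocked.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; names do not correspond to router classes. */
public class RunnerPoolDemo {
    /**
     * Starts the given number of daemon runner threads draining one FIFO queue,
     * submits a job that blocks until released, then a quick job. Returns true
     * if the quick job finished within waitMillis while the first job was
     * still blocked.
     */
    public static boolean quickJobRunsWhileBlocked(int runners, long waitMillis)
            throws InterruptedException {
        BlockingQueue<Runnable> ready = new LinkedBlockingQueue<>();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch quickDone = new CountDownLatch(1);

        for (int i = 0; i < runners; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) ready.take().run(); // run jobs in arrival order
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "DemoRunner " + (i + 1) + '/' + runners);
            t.setDaemon(true);
            t.setPriority(Thread.NORM_PRIORITY + 1); // slightly elevated, as in the ticket
            t.start();
        }

        // First job: blocks its runner until the latch is released.
        ready.put(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Second job: trivial, but it needs a free runner.
        ready.put(quickDone::countDown);

        boolean ran = quickDone.await(waitMillis, TimeUnit.MILLISECONDS);
        release.countDown(); // unblock the first job so the runner is freed
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 runner:  " + quickJobRunsWhileBlocked(1, 300));
        System.out.println("2 runners: " + quickJobRunsWhileBlocked(2, 5000));
    }
}
```

With a single runner the quick job waits behind the blocked one; with two runners it is picked up by the idle thread. Whether real router jobs ever block long enough for this to matter is exactly what the ticket disputes, so the sketch shows the mechanism, not its practical impact.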

While this is 15-year-old code, it has been maintained, and it doesn't look like a "mess" to me. Please explain what's "incomplete" and "incapable" about it.

Much of the Job subsystem supports stats gathering for debugging that we rarely use any more. Perhaps that could be looked at.

comment:2 Changed 6 days ago by jogger

Please review the attached diffs implementing the remarks above for removal of redundant fields, added and corrected functionality and corrected logging.

--- i2p/i2p-0.9.41/router/java/src/net/i2p/router/startup/BootCommSystemJob.java
+++ i2p/41p/router/java/src/net/i2p/router/startup/BootCommSystemJob.java
@@ -54,6 +54,14 @@
         }
 
         ((RouterClock) getContext().clock()).addShiftListener(getContext().router());
+    // @jogger
+        if ( !SystemVersion.isWindows() )
+            // zzz says this can´t be done on Windoze
+            // latest JVM version and Windoze version problems reported
+            // to verify we still support them ??
+            // scale back to 1 runner as implied above
+            getContext().jobQueue().runQueue(1);
+    // @jogger \
     }
         
     private void startupDb() {
--- i2p-0.9.41/router/java/src/net/i2p/router/JobQueue.java
+++ 41p/router/java/src/net/i2p/router/JobQueue.java
@@ -22,7 +22,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import net.i2p.data.DataHelper;
 import net.i2p.router.message.HandleGarlicMessageJob;
@@ -46,8 +45,6 @@
     
     /** Integer (runnerId) to JobQueueRunner for created runners */
     private final Map<Integer, JobQueueRunner> _queueRunners;
-    /** a counter to identify a job runner */
-    private final static AtomicInteger _runnerId = new AtomicInteger(0);
     /** list of jobs that are ready to run ASAP */
     private final BlockingQueue<Job> _readyJobs;
     /** SortedSet of jobs that are scheduled for running in the future, earliest first */
@@ -384,7 +381,6 @@
         }
         _queueRunners.clear();
         _jobStats.clear();
-        _runnerId.set(0);
 
       /********
         if (_log.shouldLog(Log.WARN)) {
@@ -494,36 +490,38 @@
      *
      */
     public synchronized void runQueue(int numThreads) {
-            // we're still starting up [serially] and we've got at least one runner,
+            // we're still starting up [serially] and we've got at least one runner
+            // or no change specified,
             // so dont do anything
-            if ( (!_queueRunners.isEmpty()) && (!_allowParallelOperation) ) return;
-
-            // we've already enabled parallel operation, so grow to however many are
+        // @jogger
+            if ( (!_queueRunners.isEmpty()) && (!_allowParallelOperation) ||
+                numThreads == _queueRunners.size() || numThreads < 1) return;
+
+            // we've already enabled parallel operation, so adjust to however many are
             // specified
-            if (_queueRunners.size() < numThreads) {
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Increasing the number of queue runners from " 
+            if (_log.shouldLog(Log.INFO))
+                _log.info("Changing the number of queue runners from "
                               + _queueRunners.size() + " to " + numThreads);
-                for (int i = _queueRunners.size(); i < numThreads; i++) {
-                    JobQueueRunner runner = new JobQueueRunner(_context, i);
-                    _queueRunners.put(Integer.valueOf(i), runner);
-                    runner.setName("JobQueue " + _runnerId.incrementAndGet() + '/' + numThreads);
-                    runner.start();
-                }
-            } else if (_queueRunners.size() == numThreads) {
-                for (JobQueueRunner runner : _queueRunners.values()) {
-                    runner.startRunning();
-                }
-            } else { // numThreads < # runners, so shrink
-                //for (int i = _queueRunners.size(); i > numThreads; i++) {
-                //     QueueRunner runner = (QueueRunner)_queueRunners.get(new Integer(i));
-                //     runner.stopRunning();
-                //}
-            }
-    }
-        
-    void removeRunner(int id) { _queueRunners.remove(Integer.valueOf(id)); }
-    
+            // if going up
+            for (int i = _queueRunners.size() + 1; i <= numThreads; i++) {
+                JobQueueRunner runner = new JobQueueRunner(_context);
+                _queueRunners.put(i, runner);
+                runner.start();
+            }
+            // if going down
+            for (int i = _queueRunners.size(); i > numThreads; i--) {
+                 JobQueueRunner runner = (JobQueueRunner)_queueRunners.get(i);
+                 runner.stopRunning();
+                 _queueRunners.remove(i);
+            }
+            // adjust names properly
+            for (int i = 1; i <= numThreads; i++) {
+                 JobQueueRunner runner = (JobQueueRunner)_queueRunners.get(i);
+                 runner.setName("JobQueue " + i + '/' + numThreads);
+            }
+        // @jogger \
+    }
+
     /**
      * Responsible for moving jobs from the timed queue to the ready queue, 
      * adjusting the number of queue runners, as well as periodically updating the 
--- i2p-0.9.41/router/java/src/net/i2p/router/JobQueueRunner.java
+++ 41p/router/java/src/net/i2p/router/JobQueueRunner.java
@@ -9,16 +9,14 @@
     private final Log _log;
     private final RouterContext _context;
     private volatile boolean _keepRunning;
-    private final int _id;
     private volatile Job _currentJob;
     private volatile Job _lastJob;
     private volatile long _lastBegin;
     private volatile long _lastEnd;
     //private volatile int _state;
-    
-    public JobQueueRunner(RouterContext context, int id) {
+
+    public JobQueueRunner(RouterContext context) {
         _context = context;
-        _id = id;
         _keepRunning = true;
         _log = _context.logManager().getLog(JobQueueRunner.class);
         setPriority(NORM_PRIORITY + 1);
@@ -30,9 +28,7 @@
     
     public Job getCurrentJob() { return _currentJob; }
     public Job getLastJob() { return _lastJob; }
-    public int getRunnerId() { return _id; }
     public void stopRunning() { _keepRunning = false; }
-    public void startRunning() { _keepRunning = true; }
     public long getLastBegin() { return _lastBegin; }
     public long getLastEnd() { return _lastEnd; }
     public void run() {
@@ -70,7 +66,7 @@
                 _lastJob = null;
                 //_state = 9;
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Runner " + _id + " running job " + job.getJobId() + ": " + job.getName());
+                    _log.debug(this.getName() + " running job " + job.getJobId() + ": " + job.getName());
                 long origStartAfter = job.getTiming().getStartAfter();
                 long doStart = _context.clock().now();
                 //_state = 10;
@@ -122,9 +118,10 @@
             }
         }
         //_state = 16;
-        if (_context.router().isAlive())
-            _log.log(Log.CRIT, "Queue runner " + _id + " exiting");
-        _context.jobQueue().removeRunner(_id);
+    // @jogger
+        if (_context.router().isAlive() && _log.shouldLog(Log.INFO))
+            _log.log(Log.INFO, "Queue runner " + this.getName() + " exiting");
+    // @jogger \
         //_state = 17;
     }
     
@@ -144,7 +141,7 @@
         } catch (Throwable t) {
             //_state = 21;
             _log.log(Log.CRIT, "Error processing job [" + _currentJob.getName() 
-                                   + "] on thread " + _id + ": " + t, t);
+                                   + "] on thread " + this.getName() + ": " + t, t);
         }
     }
 }

comment:3 Changed 5 days ago by zzz

NAK. Thread pools are a good thing. Going down to 1 could create rare and hard-to-reproduce bugs. There's nothing wrong with thread pools. There's nothing wrong with having an ID to put in the thread name for stack traces. Other than stating that you've run with 1 thread for years, you haven't said why we should reduce it to 1. All the other changes look like very minor tweaks.

comment:4 Changed 5 days ago by jogger

If an error can easily be seen in jconsole and top and my patch is turned down, I do not feel good. For thread pool sizing, see here: http://zzz.i2p/topics/2762?page=1#p14751

comment:5 Changed 4 days ago by zzz

Thanks for the long explanatory post. While I don't agree with your conclusions, the analysis is interesting. What's the "error seen in jconsole"?

comment:6 Changed 4 days ago by jogger

Just go to the threads view, filter on "job" and see why I call the current code a mess. top -H also works.

comment:7 Changed 4 days ago by zzz

If you're trying to make a point it would be much more effective if you just told us what you consider an "error", as top doesn't usually output error messages for an application, and I don't have time for a treasure hunt or to try to guess what you're thinking. Thanks.

comment:8 Changed 3 days ago by jogger

top - 21:42:30 up 29 days,  6:39,  2 users,  load average: 4,74, 4,40, 4,37
Threads: 142 total,   2 running, 140 sleeping,   0 stopped,   0 zombie
%Cpu(s): 31,5 us,  6,4 sy,  0,0 ni, 60,0 id,  0,6 wa,  0,0 hi,  1,5 si,  0,0 st
MiB Mem :   1996,9 total,     56,3 free,   1375,3 used,    565,3 buff/cache
MiB Swap:   3078,4 total,   2689,4 free,    389,0 used.    541,7 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND               
12904 a         20   0  510644 366236  10952 S   0,0  17,9  31:19.27 JobQueue 5/5          
12903 a         20   0  510644 366236  10952 S   1,6  17,9  32:52.38 JobQueue 4/5          
12902 a         20   0  510644 366236  10952 S   1,6  17,9  31:23.34 JobQueue 3/5          
12901 a         20   0  510644 366236  10952 S   3,2  17,9  31:14.43 JobQueue 2/5          
12899 a         20   0  510644 366236  10952 S   4,8  17,9  31:21.67 JobQueue 1/1          

The numbering works neither upward nor downward, and there is no intent to sync _id.
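
The thread names in the top output can be reproduced with a small stand-alone model of the pre-patch naming logic from the diff in comment:2, where a runner is named once, at creation, as "JobQueue " + counter + '/' + numThreads. RunnerNamingDemo is a hypothetical class, and the call sequence runQueue(1) followed by runQueue(5) is an assumption about how the router reaches five runners; it is not confirmed anywhere in this ticket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the pre-patch naming logic; not the router's JobQueue. */
public class RunnerNamingDemo {
    private final AtomicInteger runnerId = new AtomicInteger(0);
    private final List<String> names = new ArrayList<>();

    /**
     * Grow-only, like the pre-patch runQueue(): each new runner is named when
     * it is created, and existing runners are never renamed.
     */
    public void runQueue(int numThreads) {
        for (int i = names.size(); i < numThreads; i++) {
            names.add("JobQueue " + runnerId.incrementAndGet() + '/' + numThreads);
        }
    }

    public List<String> names() {
        return names;
    }

    public static void main(String[] args) {
        RunnerNamingDemo q = new RunnerNamingDemo();
        q.runQueue(1); // assumed: one runner during serial startup
        q.runQueue(5); // assumed: grown to five once parallel operation is allowed
        q.names().forEach(System.out::println);
    }
}
```

Under that assumed call sequence the model yields "JobQueue 1/1" followed by "JobQueue 2/5" through "JobQueue 5/5", matching the listing above. The first runner keeps its stale total because names are assigned only when a runner is created.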

comment:9 Changed 3 days ago by zzz

So the complaint is that thread number one should be named 1/5, not 1/1?
