4646import java .util .Map ;
4747import java .util .Set ;
4848import java .util .concurrent .LinkedBlockingQueue ;
49+ import java .util .concurrent .RejectedExecutionHandler ;
4950import java .util .concurrent .TimeUnit ;
5051import java .util .concurrent .ThreadPoolExecutor ;
5152import java .util .concurrent .atomic .AtomicBoolean ;
@@ -75,6 +76,7 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
7576 private Map <Forest ,AtomicBoolean > forestIsDone = new HashMap <>();
7677 private final AtomicBoolean stopped = new AtomicBoolean (false );
7778 private final AtomicBoolean started = new AtomicBoolean (false );
79+ private final Object lock = new Object ();
7880 private final Map <Forest ,List <QueryTask >> blackListedTasks = new HashMap <>();
7981 private JobTicket jobTicket ;
8082 private Thread runJobCompletionListeners ;
@@ -350,9 +352,7 @@ private synchronized void initialize() {
350352 }
351353 logger .info ("Starting job batchSize={}, threadCount={}, onUrisReady listeners={}, failure listeners={}" ,
352354 getBatchSize (), getThreadCount (), urisReadyListeners .size (), failureListeners .size ());
353- threadPool = new QueryThreadPoolExecutor (1 , this );
354- threadPool .setCorePoolSize (getThreadCount ());
355- threadPool .setMaximumPoolSize (getThreadCount ());
355+ threadPool = new QueryThreadPoolExecutor (getThreadCount (), this );
356356 runJobCompletionListeners = new Thread (new Runnable () {
357357 @ Override
358358 public void run () {
@@ -872,12 +872,40 @@ protected void finalize() {
872872 }
873873 }
874874
875+ /**
876+ * A handler for rejected tasks that waits for the work queue to
877+ * become empty and then submits the rejected task
878+ */
879+ private class BlockingRunsPolicy implements RejectedExecutionHandler {
880+ /**
881+ * Waits for the work queue to become empty and then submits the rejected task,
882+ * unless the executor has been shut down, in which case the task is discarded.
883+ *
884+ * @param runnable the runnable task requested to be executed
885+ * @param executor the executor attempting to execute this task
886+ */
887+ public void rejectedExecution (Runnable runnable , ThreadPoolExecutor executor ) {
888+ if ( !executor .isShutdown () ) {
889+ try {
890+ synchronized ( lock ) {
891+ if (executor .getQueue ().remainingCapacity () == 0 ) {
892+ lock .wait ();
893+ }
894+ }
895+ } catch ( InterruptedException e ) {
896+ logger .warn ("Thread interrupted while waiting for the work queue to become empty" + e );
897+ }
898+ if ( !executor .isShutdown () ) executor .execute (runnable );
899+ }
900+ }
901+ }
902+
875903 private class QueryThreadPoolExecutor extends ThreadPoolExecutor {
876904 private Object objectToNotifyFrom ;
877905
878906 QueryThreadPoolExecutor (int threadCount , Object objectToNotifyFrom ) {
879907 super (threadCount , threadCount , 0 , TimeUnit .MILLISECONDS ,
880- new LinkedBlockingQueue <Runnable >(threadCount * 50 ), new ThreadPoolExecutor . CallerRunsPolicy ());
908+ new LinkedBlockingQueue <Runnable >(threadCount * 25 ), new BlockingRunsPolicy ());
881909 this .objectToNotifyFrom = objectToNotifyFrom ;
882910 }
883911
@@ -889,12 +917,23 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
889917 return returnValue ;
890918 }
891919
920+ @ Override
921+ protected void afterExecute (Runnable r , Throwable t ) {
922+ super .afterExecute (r , t );
923+ synchronized ( lock ) {
924+ lock .notify ();
925+ }
926+ }
927+
892928 @ Override
893929 protected void terminated () {
894930 super .terminated ();
895931 synchronized (objectToNotifyFrom ) {
896932 objectToNotifyFrom .notifyAll ();
897933 }
934+ synchronized ( lock ) {
935+ lock .notify ();
936+ }
898937 }
899938 }
900939
0 commit comments