11package io .javaoperatorsdk .operator .api .config ;
22
3- import java .util .concurrent .BlockingQueue ;
43import java .util .concurrent .ExecutorService ;
54import java .util .concurrent .LinkedBlockingQueue ;
6- import java .util .concurrent .RejectedExecutionHandler ;
7- import java .util .concurrent .ThreadFactory ;
85import java .util .concurrent .ThreadPoolExecutor ;
96import java .util .concurrent .TimeUnit ;
107import java .util .concurrent .atomic .AtomicReference ;
@@ -16,8 +13,7 @@ class ExecutorServiceProducer {
1613
1714 static ExecutorService getExecutor (int threadPoolSize ) {
1815 final var gotSet =
19- executor .compareAndSet (null , new DebugThreadPoolExecutor (threadPoolSize , threadPoolSize , 0L ,
20- TimeUnit .MILLISECONDS , new LinkedBlockingQueue <>()));
16+ executor .compareAndSet (null , new InstrumentedExecutorService (threadPoolSize ));
2117 final var result = executor .get ();
2218 if (!gotSet ) {
2319 // check that we didn't try to change the pool size
@@ -30,35 +26,19 @@ static ExecutorService getExecutor(int threadPoolSize) {
3026 return result ;
3127 }
3228
33- private static class DebugThreadPoolExecutor extends ThreadPoolExecutor {
29+ private static class InstrumentedExecutorService extends ThreadPoolExecutor {
30+ private final boolean debug ;
3431
35- public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
36- TimeUnit unit ,
37- BlockingQueue <Runnable > workQueue ) {
38- super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue );
39- }
40-
41- public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
42- TimeUnit unit , BlockingQueue <Runnable > workQueue ,
43- ThreadFactory threadFactory ) {
44- super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory );
45- }
46-
47- public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
48- TimeUnit unit , BlockingQueue <Runnable > workQueue ,
49- RejectedExecutionHandler handler ) {
50- super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , handler );
51- }
52-
53- public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
54- TimeUnit unit , BlockingQueue <Runnable > workQueue ,
55- ThreadFactory threadFactory , RejectedExecutionHandler handler ) {
56- super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory , handler );
32+ public InstrumentedExecutorService (int corePoolSize ) {
33+ super (corePoolSize , corePoolSize , 0L , TimeUnit .MILLISECONDS , new LinkedBlockingQueue <>());
34+ debug = Utils .debugThreadPool ();
5735 }
5836
5937 @ Override
6038 public void shutdown () {
61- Thread .dumpStack ();
39+ if (debug ) {
40+ Thread .dumpStack ();
41+ }
6242 super .shutdown ();
6343 }
6444 }
0 commit comments