11package io .javaoperatorsdk .operator .api .config ;
22
3+ import java .util .concurrent .BlockingQueue ;
34import java .util .concurrent .ExecutorService ;
4- import java .util .concurrent .ScheduledThreadPoolExecutor ;
5+ import java .util .concurrent .LinkedBlockingQueue ;
6+ import java .util .concurrent .RejectedExecutionHandler ;
7+ import java .util .concurrent .ThreadFactory ;
8+ import java .util .concurrent .ThreadPoolExecutor ;
9+ import java .util .concurrent .TimeUnit ;
510import java .util .concurrent .atomic .AtomicReference ;
611
712class ExecutorServiceProducer {
813
9- private final static AtomicReference <ScheduledThreadPoolExecutor > executor =
14+ private final static AtomicReference <ThreadPoolExecutor > executor =
1015 new AtomicReference <>();
1116
1217 static ExecutorService getExecutor (int threadPoolSize ) {
1318 final var gotSet =
14- executor .compareAndSet (null , new ScheduledThreadPoolExecutor (threadPoolSize ));
19+ executor .compareAndSet (null , new DebugThreadPoolExecutor (threadPoolSize , threadPoolSize , 0L ,
20+ TimeUnit .MILLISECONDS , new LinkedBlockingQueue <>()));
1521 final var result = executor .get ();
1622 if (!gotSet ) {
1723 // check that we didn't try to change the pool size
@@ -23,4 +29,37 @@ static ExecutorService getExecutor(int threadPoolSize) {
2329 }
2430 return result ;
2531 }
32+
33+ private static class DebugThreadPoolExecutor extends ThreadPoolExecutor {
34+
35+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
36+ TimeUnit unit ,
37+ BlockingQueue <Runnable > workQueue ) {
38+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue );
39+ }
40+
41+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
42+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
43+ ThreadFactory threadFactory ) {
44+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory );
45+ }
46+
47+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
48+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
49+ RejectedExecutionHandler handler ) {
50+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , handler );
51+ }
52+
53+ public DebugThreadPoolExecutor (int corePoolSize , int maximumPoolSize , long keepAliveTime ,
54+ TimeUnit unit , BlockingQueue <Runnable > workQueue ,
55+ ThreadFactory threadFactory , RejectedExecutionHandler handler ) {
56+ super (corePoolSize , maximumPoolSize , keepAliveTime , unit , workQueue , threadFactory , handler );
57+ }
58+
59+ @ Override
60+ public void shutdown () {
61+ Thread .dumpStack ();
62+ super .shutdown ();
63+ }
64+ }
2665}
0 commit comments