|
3 | 3 | import java.time.Duration; |
4 | 4 | import java.util.Optional; |
5 | 5 | import java.util.Set; |
6 | | -import java.util.concurrent.ExecutorService; |
7 | | -import java.util.concurrent.Executors; |
| 6 | +import java.util.concurrent.*; |
8 | 7 |
|
9 | 8 | import org.slf4j.Logger; |
10 | 9 | import org.slf4j.LoggerFactory; |
@@ -75,24 +74,49 @@ default boolean checkCRDAndValidateLocalModel() { |
75 | 74 | return false; |
76 | 75 | } |
77 | 76 |
|
78 | | - int DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; |
| 77 | + int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200; |
| 78 | + int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; |
79 | 79 |
|
80 | 80 | /** |
81 | | - * Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation |
82 | | - * requests to reconcilers |
| 81 | + * The maximum number of threads the operator can spin out to dispatch reconciliation requests to |
| 82 | + * reconcilers |
83 | 83 | * |
84 | 84 | * @return the maximum number of concurrent reconciliation threads |
85 | 85 | */ |
86 | 86 | default int concurrentReconciliationThreads() { |
87 | 87 | return DEFAULT_RECONCILIATION_THREADS_NUMBER; |
88 | 88 | } |
89 | 89 |
|
| 90 | + /** |
| 91 | + * The minimum number of threads the operator starts in the thread pool for reconciliations. |
| 92 | + * |
| 93 | + * @return the minimum number of concurrent reconciliation threads |
| 94 | + */ |
| 95 | + default int minConcurrentReconciliationThreads() { |
| 96 | + return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; |
| 97 | + } |
| 98 | + |
90 | 99 | int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; |
| 100 | + int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; |
91 | 101 |
|
| 102 | + /** |
| 103 | + * Retrieves the maximum number of threads the operator can spin out to be used in the workflows. |
| 104 | + * |
| 105 | + * @return the maximum number of concurrent workflow threads |
| 106 | + */ |
92 | 107 | default int concurrentWorkflowExecutorThreads() { |
93 | 108 | return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; |
94 | 109 | } |
95 | 110 |
|
| 111 | + /** |
| 112 | + * The minimum number of threads the operator starts in the thread pool for workflows. |
| 113 | + * |
| 114 | + * @return the minimum number of concurrent workflow threads |
| 115 | + */ |
| 116 | + default int minConcurrentWorkflowExecutorThreads() { |
| 117 | + return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; |
| 118 | + } |
| 119 | + |
96 | 120 | /** |
97 | 121 | * Used to clone custom resources. It is strongly suggested that implementors override this method |
98 | 122 | * since the default implementation creates a new {@link Cloner} instance each time this method is |
@@ -136,11 +160,15 @@ default Metrics getMetrics() { |
136 | 160 | } |
137 | 161 |
|
138 | 162 | default ExecutorService getExecutorService() { |
139 | | - return Executors.newFixedThreadPool(concurrentReconciliationThreads()); |
| 163 | + return new ThreadPoolExecutor(minConcurrentReconciliationThreads(), |
| 164 | + concurrentReconciliationThreads(), |
| 165 | + 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); |
140 | 166 | } |
141 | 167 |
|
142 | 168 | default ExecutorService getWorkflowExecutorService() { |
143 | | - return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); |
| 169 | + return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), |
| 170 | + concurrentWorkflowExecutorThreads(), |
| 171 | + 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); |
144 | 172 | } |
145 | 173 |
|
146 | 174 | default boolean closeClientOnStop() { |
|
0 commit comments