|
5 | 5 | import java.util.Map; |
6 | 6 | import java.util.Optional; |
7 | 7 | import java.util.Set; |
| 8 | +import java.util.concurrent.ExecutorService; |
8 | 9 | import java.util.concurrent.ScheduledThreadPoolExecutor; |
9 | 10 | import java.util.concurrent.TimeUnit; |
10 | 11 | import java.util.concurrent.locks.ReentrantLock; |
@@ -44,61 +45,37 @@ public void failedEvent(String uid, Event event) {} |
44 | 45 |
|
45 | 46 | private final EventBuffer eventBuffer; |
46 | 47 | private final Set<String> underProcessing = new HashSet<>(); |
47 | | - private final ScheduledThreadPoolExecutor executor; |
48 | 48 | private final EventDispatcher<R> eventDispatcher; |
49 | 49 | private final Retry retry; |
50 | 50 | private final Map<String, RetryExecution> retryState = new HashMap<>(); |
| 51 | + private final ExecutorService executor; |
51 | 52 | private final String controllerName; |
52 | | - private final int terminationTimeout; |
53 | 53 | private final ReentrantLock lock = new ReentrantLock(); |
54 | 54 | private DefaultEventSourceManager<R> eventSourceManager; |
55 | 55 |
|
56 | 56 | public DefaultEventHandler(ConfiguredController<R> controller) { |
57 | | - this( |
58 | | - new EventDispatcher<>(controller), |
| 57 | + this(controller.getConfiguration().getConfigurationService().getExecutorService(), |
59 | 58 | controller.getConfiguration().getName(), |
60 | | - GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), |
61 | | - controller.getConfiguration().getConfigurationService().concurrentReconciliationThreads(), |
62 | | - controller.getConfiguration().getConfigurationService().getTerminationTimeoutSeconds()); |
| 59 | + new EventDispatcher<>(controller), |
| 60 | + GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration())); |
63 | 61 | } |
64 | 62 |
|
65 | | - DefaultEventHandler(EventDispatcher<R> dispatcher, String relatedControllerName, Retry retry) { |
66 | | - this( |
67 | | - dispatcher, |
68 | | - relatedControllerName, |
69 | | - retry, |
70 | | - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER, |
71 | | - ConfigurationService.DEFAULT_TERMINATION_TIMEOUT_SECONDS); |
| 63 | + DefaultEventHandler(EventDispatcher<R> eventDispatcher, String relatedControllerName, |
| 64 | + Retry retry) { |
| 65 | + this(null, relatedControllerName, eventDispatcher, retry); |
72 | 66 | } |
73 | 67 |
|
74 | | - private DefaultEventHandler( |
75 | | - EventDispatcher<R> eventDispatcher, |
76 | | - String relatedControllerName, |
77 | | - Retry retry, |
78 | | - int concurrentReconciliationThreads, |
79 | | - int terminationTimeout) { |
| 68 | + private DefaultEventHandler(ExecutorService executor, String relatedControllerName, |
| 69 | + EventDispatcher<R> eventDispatcher, Retry retry) { |
| 70 | + this.executor = |
| 71 | + executor == null |
| 72 | + ? new ScheduledThreadPoolExecutor( |
| 73 | + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) |
| 74 | + : executor; |
| 75 | + this.controllerName = relatedControllerName; |
80 | 76 | this.eventDispatcher = eventDispatcher; |
81 | 77 | this.retry = retry; |
82 | | - this.controllerName = relatedControllerName; |
83 | 78 | eventBuffer = new EventBuffer(); |
84 | | - this.terminationTimeout = terminationTimeout; |
85 | | - executor = |
86 | | - new ScheduledThreadPoolExecutor( |
87 | | - concurrentReconciliationThreads, |
88 | | - runnable -> new Thread(runnable, "EventHandler-" + relatedControllerName)); |
89 | | - } |
90 | | - |
91 | | - @Override |
92 | | - public void close() { |
93 | | - try { |
94 | | - log.debug("Closing handler for {}", controllerName); |
95 | | - executor.shutdown(); |
96 | | - if (!executor.awaitTermination(terminationTimeout, TimeUnit.SECONDS)) { |
97 | | - executor.shutdownNow(); // if we timed out, waiting, cancel everything |
98 | | - } |
99 | | - } catch (InterruptedException e) { |
100 | | - log.debug("Exception closing handler for {}: {}", controllerName, e.getLocalizedMessage()); |
101 | | - } |
102 | 79 | } |
103 | 80 |
|
104 | 81 | public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) { |
@@ -146,7 +123,13 @@ private void executeBufferedEvents(String customResourceUid) { |
146 | 123 | latestCustomResource.get(), |
147 | 124 | retryInfo(customResourceUid)); |
148 | 125 | log.debug("Executing events for custom resource. Scope: {}", executionScope); |
149 | | - executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this)); |
| 126 | + executor.execute(() -> { |
| 127 | + // change thread name for easier debugging |
| 128 | + Thread.currentThread().setName("EventHandler-" + controllerName); |
| 129 | + PostExecutionControl<R> postExecutionControl = |
| 130 | + eventDispatcher.handleExecution(executionScope); |
| 131 | + eventProcessingFinished(executionScope, postExecutionControl); |
| 132 | + }); |
150 | 133 | } else { |
151 | 134 | log.debug( |
152 | 135 | "Skipping executing controller for resource id: {}. Events in queue: {}." |
|
0 commit comments