@@ -42,10 +42,12 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
4242
4343 public static final String CONFIG_BATCH_SIZE = "event.processor.batch.size" ;
4444 public static final String CONFIG_BATCH_INTERVAL = "event.processor.batch.interval" ;
45+ public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout" ;
4546
4647 public static final int DEFAULT_QUEUE_CAPACITY = 1000 ;
4748 public static final int DEFAULT_BATCH_SIZE = 50 ;
4849 public static final long DEFAULT_BATCH_INTERVAL = TimeUnit .MINUTES .toMillis (1 );
50+ public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit .SECONDS .toMillis (5 );
4951
5052 private static final Object SHUTDOWN_SIGNAL = new Object ();
5153
@@ -54,17 +56,19 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
5456
5557 private final int batchSize ;
5658 private final long flushInterval ;
59+ private final long timeoutMillis ;
5760 private final ExecutorService executor ;
5861 private final NotificationCenter notificationCenter ;
5962
6063 private Future <?> future ;
6164 private boolean isStarted = false ;
6265
63- private BatchEventProcessor (BlockingQueue <Object > eventQueue , EventHandler eventHandler , Integer batchSize , Long flushInterval , ExecutorService executor , NotificationCenter notificationCenter ) {
66+ private BatchEventProcessor (BlockingQueue <Object > eventQueue , EventHandler eventHandler , Integer batchSize , Long flushInterval , Long timeoutMillis , ExecutorService executor , NotificationCenter notificationCenter ) {
6467 this .eventHandler = eventHandler ;
6568 this .eventQueue = eventQueue ;
6669 this .batchSize = batchSize == null ? PropertyUtils .getInteger (CONFIG_BATCH_SIZE , DEFAULT_BATCH_SIZE ) : batchSize ;
6770 this .flushInterval = flushInterval == null ? PropertyUtils .getLong (CONFIG_BATCH_INTERVAL , DEFAULT_BATCH_INTERVAL ) : flushInterval ;
71+ this .timeoutMillis = timeoutMillis == null ? PropertyUtils .getLong (CONFIG_CLOSE_TIMEOUT , DEFAULT_TIMEOUT_INTERVAL ) : timeoutMillis ;
6872 this .notificationCenter = notificationCenter ;
6973
7074 if (executor == null ) {
@@ -97,10 +101,12 @@ public void close() throws Exception {
97101 logger .info ("Start close" );
98102 eventQueue .put (SHUTDOWN_SIGNAL );
99103 try {
100- future .get (5 , TimeUnit .SECONDS );
104+ future .get (timeoutMillis , TimeUnit .MILLISECONDS );
101105 } catch (InterruptedException e ) {
102106 logger .warn ("Interrupted while awaiting termination." );
103107 Thread .currentThread ().interrupt ();
108+ } catch (TimeoutException e ) {
109+ logger .error ("Timeout exceeded attempting to close for {} ms" , timeoutMillis );
104110 }
105111 }
106112
@@ -179,12 +185,12 @@ private boolean shouldSplit(UserEvent userEvent) {
179185 ProjectConfig currentConfig = currentBatch .peekLast ().getUserContext ().getProjectConfig ();
180186 ProjectConfig newConfig = userEvent .getUserContext ().getProjectConfig ();
181187
182- // Revisions should match
188+ // Projects should match
183189 if (!currentConfig .getProjectId ().equals (newConfig .getProjectId ())) {
184190 return true ;
185191 }
186192
187- // Projects should match
193+ // Revisions should match
188194 if (!currentConfig .getRevision ().equals (newConfig .getRevision ())) {
189195 return true ;
190196 }
@@ -223,6 +229,7 @@ public static class Builder {
223229 private Long flushInterval = null ;
224230 private ExecutorService executor = null ;
225231 private NotificationCenter notificationCenter = null ;
232+ private Long timeoutMillis = null ;
226233
227234 public Builder withEventHandler (EventHandler eventHandler ) {
228235 this .eventHandler = eventHandler ;
@@ -249,13 +256,18 @@ public Builder withExecutor(ExecutorService executor) {
249256 return this ;
250257 }
251258
259+ public Builder withTimeout (long duration , TimeUnit timeUnit ) {
260+ this .timeoutMillis = timeUnit .toMillis (duration );
261+ return this ;
262+ }
263+
252264 public Builder withNotificationCenter (NotificationCenter notificationCenter ) {
253265 this .notificationCenter = notificationCenter ;
254266 return this ;
255267 }
256268
257269 public BatchEventProcessor build () {
258- return new BatchEventProcessor (eventQueue , eventHandler , batchSize , flushInterval , executor , notificationCenter );
270+ return new BatchEventProcessor (eventQueue , eventHandler , batchSize , flushInterval , timeoutMillis , executor , notificationCenter );
259271 }
260272 }
261273
0 commit comments