Skip to content

Commit 427bca6

Browse files
committed
Add timeout to wait until queued messages are processed
1 parent 8a125ef commit 427bca6

File tree

6 files changed

+42
-21
lines changed

6 files changed

+42
-21
lines changed

src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public static HttpEventCollectorLog4jAppender createAppender(
157157
@PluginAttribute(value = "call_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT) final long callTimeout,
158158
@PluginAttribute(value = "read_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT) final long readTimeout,
159159
@PluginAttribute(value = "write_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT) final long writeTimeout,
160-
@PluginAttribute(value = "await_termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT) final long awaitTerminationTimeout,
160+
@PluginAttribute(value = "termination_timeout", defaultLong = HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT) final long terminationTimeout,
161161
@PluginElement("Layout") Layout<? extends Serializable> layout,
162162
@PluginElement("Filter") final Filter filter
163163
)
@@ -220,7 +220,7 @@ public static HttpEventCollectorLog4jAppender createAppender(
220220
disableCertificateValidation,
221221
eventBodySerializer,
222222
eventHeaderSerializer,
223-
new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, awaitTerminationTimeout)
223+
new HttpEventCollectorSender.TimeoutSettings(connectTimeout, callTimeout, readTimeout, writeTimeout, terminationTimeout)
224224
);
225225
}
226226

src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,12 +383,12 @@ public long getWriteTimeout(long milliseconds) {
383383
return this.timeoutSettings.writeTimeout = milliseconds;
384384
}
385385

386-
public void setAwaitTerminationTimeout(long milliseconds) {
387-
this.timeoutSettings.awaitTerminationTimeout = milliseconds;
386+
public void setTerminationTimeout(long milliseconds) {
387+
this.timeoutSettings.terminationTimeout = milliseconds;
388388
}
389389

390-
public long getAwaitTerminationTimeout(long milliseconds) {
391-
return this.timeoutSettings.awaitTerminationTimeout = milliseconds;
390+
public long getTerminationTimeout(long milliseconds) {
391+
return this.timeoutSettings.terminationTimeout = milliseconds;
392392
}
393393

394394
private static long parseLong(String string, int defaultValue) {

src/main/java/com/splunk/logging/HttpEventCollectorLoggingHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public final class HttpEventCollectorLoggingHandler extends Handler {
114114
private final String CallTimeoutConfTag = "call_timeout";
115115
private final String ReadTimeoutConfTag = "read_timeout";
116116
private final String WriteTimeoutConfTag = "write_timeout";
117-
private final String AwaitTerminationTimeoutConfTag = "await_termination_timeout";
117+
private final String TerminationTimeoutConfTag = "termination_timeout";
118118

119119
/** HttpEventCollectorLoggingHandler c-or */
120120
public HttpEventCollectorLoggingHandler() {
@@ -167,7 +167,7 @@ public HttpEventCollectorLoggingHandler() {
167167
getConfigurationNumericProperty(CallTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_CALL_TIMEOUT),
168168
getConfigurationNumericProperty(ReadTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_READ_TIMEOUT),
169169
getConfigurationNumericProperty(WriteTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_WRITE_TIMEOUT),
170-
getConfigurationNumericProperty(AwaitTerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_AWAIT_TERMINATION_TIMEOUT)
170+
getConfigurationNumericProperty(TerminationTimeoutConfTag, HttpEventCollectorSender.TimeoutSettings.DEFAULT_TERMINATION_TIMEOUT)
171171
);
172172

173173
if ("raw".equalsIgnoreCase(type)) {

src/main/java/com/splunk/logging/HttpEventCollectorSender.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,36 @@ public static void putIfPresent(JsonObject collection, String tag, Object value)
263263

264264
private void stopHttpClient() {
265265
if (httpClient != null) {
266-
httpClient.dispatcher().executorService().shutdown();
266+
Dispatcher dispatcher = httpClient.dispatcher();
267+
httpClient = null;
267268

268-
if (timeoutSettings.awaitTerminationTimeout > 0) {
269-
try {
270-
httpClient.dispatcher().executorService().awaitTermination(timeoutSettings.awaitTerminationTimeout, TimeUnit.MILLISECONDS);
271-
} catch (InterruptedException ignored) { /* nop */ }
272-
}
269+
if (timeoutSettings.terminationTimeout > 0) {
270+
// wait for queued messages in the dispatcher to be promoted to the executor service
271+
long start = System.currentTimeMillis();
272+
while (dispatcher.queuedCallsCount() > 0 && start + timeoutSettings.terminationTimeout > System.currentTimeMillis()) {
273+
try {
274+
TimeUnit.MILLISECONDS.sleep(10);
275+
} catch (InterruptedException e) {
276+
Thread.currentThread().interrupt();
277+
break;
278+
}
279+
}
273280

274-
httpClient = null;
281+
// initialize the shutdown of the executor service
282+
dispatcher.executorService().shutdown();
283+
284+
// wait for the messages in the dispatcher's executor service to be sent out
285+
long awaitTerminationTimeout = timeoutSettings.terminationTimeout - (System.currentTimeMillis() - start);
286+
if (awaitTerminationTimeout > 0) {
287+
try {
288+
dispatcher.executorService().awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS);
289+
} catch (InterruptedException e) {
290+
Thread.currentThread().interrupt();
291+
}
292+
}
293+
} else {
294+
dispatcher.executorService().shutdown();
295+
}
275296
}
276297
}
277298

@@ -400,22 +421,22 @@ public static class TimeoutSettings {
400421
public static final long DEFAULT_WRITE_TIMEOUT = 0; // 0 means no timeout
401422
public static final long DEFAULT_CALL_TIMEOUT = 0;
402423
public static final long DEFAULT_READ_TIMEOUT = 0;
403-
public static final long DEFAULT_AWAIT_TERMINATION_TIMEOUT = 0;
424+
public static final long DEFAULT_TERMINATION_TIMEOUT = 0;
404425

405426
public long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
406427
public long callTimeout = DEFAULT_CALL_TIMEOUT;
407428
public long readTimeout = DEFAULT_READ_TIMEOUT;
408429
public long writeTimeout = DEFAULT_WRITE_TIMEOUT;
409-
public long awaitTerminationTimeout = DEFAULT_AWAIT_TERMINATION_TIMEOUT;
430+
public long terminationTimeout = DEFAULT_TERMINATION_TIMEOUT;
410431

411432
public TimeoutSettings() {}
412433

413-
public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long awaitTerminationTimeout) {
434+
public TimeoutSettings(long connectTimeout, long callTimeout, long readTimeout, long writeTimeout, long terminationTimeout) {
414435
this.connectTimeout = connectTimeout;
415436
this.callTimeout = callTimeout;
416437
this.readTimeout = readTimeout;
417438
this.writeTimeout = writeTimeout;
418-
this.awaitTerminationTimeout = awaitTerminationTimeout;
439+
this.terminationTimeout = terminationTimeout;
419440
}
420441
}
421442
}

src/test/resources/log4j2.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ under the License.
4545
batch_size_count="0"
4646
batch_interval="0"
4747
connect_timeout="5000"
48-
await_termination_timeout="1000"
48+
termination_timeout="1000"
4949
disableCertificateValidation="true">
5050
<PatternLayout pattern="%m"/>
5151
</SplunkHttp>

src/test/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ under the License.
6060
<messageFormat>text</messageFormat>
6161
<middleware>HttpEventCollectorUnitTestMiddleware</middleware>
6262
<connectTimeout>5000</connectTimeout>
63-
<awaitTerminationTieout>2000</awaitTerminationTieout>
63+
<terminationTieout>2000</terminationTieout>
6464
<layout class="ch.qos.logback.classic.PatternLayout">
6565
<pattern>%msg</pattern>
6666
</layout>

0 commit comments

Comments
 (0)