diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java index 8afb25d4..55f69fca 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLog4jAppender.java @@ -245,6 +245,10 @@ public void append(final LogEvent event) ); } + public void flush() { + sender.flush(); + } + @Override public boolean stop(long timeout, TimeUnit timeUnit) { this.sender.close(); diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java index af2a2c4a..d0bd7163 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorLogbackAppender.java @@ -119,6 +119,12 @@ public void start() { super.start(); } + public void flush() { + if (started) { + sender.flush(); + } + } + @Override public void stop() { if (!started) diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java index c5ddfda8..891d02f5 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorSender.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorSender.java @@ -196,9 +196,18 @@ public synchronized void send(final String message) { } /** - * Flush all pending events + * Flush all pending events to the underlying HTTP client + * and then flush the HTTP client itself (keeping the client + * open to accept further events) */ public synchronized void flush() { + flush(false); + } + + /** + * Flush all pending events to the underlying HTTP client + */ + private synchronized void flushEvents() { if (eventsBatch.size() > 0) { postEventsAsync(eventsBatch); } @@ -210,9 +219,11 @@ public synchronized void flush() { } public synchronized void flush(boolean close) { - flush(); + flushEvents(); if (close) { stopHttpClient(); + } else { + flushHttpClient(); } } @@ -222,8 +233,7 @@ public synchronized void flush(boolean close) { void close() { if (timer != null) timer.cancel(); - flush(); - stopHttpClient(); + flush(true); super.cancel(); } @@ -232,7 +242,7 @@ void close() { */ @Override // TimerTask public void run() { - flush(); + flushEvents(); } /** @@ -261,6 +271,28 @@ public static void putIfPresent(JsonObject collection, String tag, Object value) } } + private void flushHttpClient() { + flushHttpClient(timeoutSettings.terminationTimeout); + } + + private void flushHttpClient(long timeout) { + if (httpClient != null && timeout > 0) { + Dispatcher dispatcher = httpClient.dispatcher(); + + long start = System.currentTimeMillis(); + + while (dispatcher.queuedCallsCount() > 0 && + dispatcher.runningCallsCount() > 0 && + start + timeout > System.currentTimeMillis()) { + try { + TimeUnit.MILLISECONDS.sleep(30); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + } private void stopHttpClient() { if (httpClient != null) {