From cb005213fae83f2dc42546b8edad9e5052203ac8 Mon Sep 17 00:00:00 2001 From: Simon HEGE Date: Mon, 8 Jul 2024 06:56:18 +0200 Subject: [PATCH] Resend middleware to retry also for some HTTP error codes --- .../HttpEventCollectorResendMiddleware.java | 52 +++++++-- ...ttpEventCollectorResendMiddlewareTest.java | 108 ++++++++++++++++++ 2 files changed, 149 insertions(+), 11 deletions(-) create mode 100644 src/test/java/com/splunk/logging/HttpEventCollectorResendMiddlewareTest.java diff --git a/src/main/java/com/splunk/logging/HttpEventCollectorResendMiddleware.java b/src/main/java/com/splunk/logging/HttpEventCollectorResendMiddleware.java index b4e7b201..96fb47b7 100644 --- a/src/main/java/com/splunk/logging/HttpEventCollectorResendMiddleware.java +++ b/src/main/java/com/splunk/logging/HttpEventCollectorResendMiddleware.java @@ -1,5 +1,8 @@ package com.splunk.logging; +import java.util.Arrays; +import java.util.HashSet; + /** * @copyright * @@ -19,6 +22,7 @@ */ import java.util.List; +import java.util.Set; /** * Splunk http event collector resend middleware. @@ -30,6 +34,20 @@ */ public class HttpEventCollectorResendMiddleware extends HttpEventCollectorMiddleware.HttpSenderMiddleware { + + /** + * List of HTTP event collector server application error statuses. These statuses + * indicate non-transient problems that cannot be fixed by resending the + * data. + */ + private static final Set HttpEventCollectorApplicationErrors = new HashSet<>(Arrays.asList( + // Forbidden + 403, + // Method Not Allowed + 405, + // Bad Request + 400 + )); private long retriesOnError = 0; /** @@ -47,6 +65,10 @@ public void postEvents( callNext(events, sender, new Callback(events, sender, callback)); } + private boolean shouldRetry(int statusCode) { + return statusCode != 200 && !HttpEventCollectorApplicationErrors.contains(statusCode); + } + private class Callback implements HttpEventCollectorMiddleware.IHttpSenderCallback { private long retries = 0; private final List events; @@ -66,25 +88,33 @@ public Callback( @Override public void completed(int statusCode, final String reply) { - // if non-200, resend wouldn't help, delegate to previous callback - prevCallback.completed(statusCode, reply); + if (shouldRetry(statusCode) && retries < retriesOnError) { + retry(); + } else { + // if non-retryable, resend wouldn't help, delegate to previous callback + prevCallback.completed(statusCode, reply); + } } @Override public void failed(final Exception ex) { if (retries < retriesOnError) { - retries++; - try { - Thread.sleep(retryDelay); - callNext(events, sender, this); - } catch (InterruptedException ie) { - prevCallback.failed(ie); - } - // increase delay exponentially - retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2); + retry(); } else { prevCallback.failed(ex); } } + + private void retry() { + retries++; + try { + Thread.sleep(retryDelay); + callNext(events, sender, this); + } catch (InterruptedException ie) { + prevCallback.failed(ie); + } + // increase delay exponentially + retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2); + } } } diff --git a/src/test/java/com/splunk/logging/HttpEventCollectorResendMiddlewareTest.java b/src/test/java/com/splunk/logging/HttpEventCollectorResendMiddlewareTest.java new file mode 100644 index 00000000..0ff85106 --- /dev/null +++ b/src/test/java/com/splunk/logging/HttpEventCollectorResendMiddlewareTest.java @@ -0,0 +1,108 @@ +package com.splunk.logging; + +import junit.framework.TestCase; +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class HttpEventCollectorResendMiddlewareTest extends TestCase { + + public void testPostEvents_whenSuccesShouldNotRetry() { + // Arrange + HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); + final AtomicInteger callCount = new AtomicInteger(0); + HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 0); + final List recordedStatusCodes = new ArrayList<>(); + final List recordedReplies = new ArrayList<>(); + final List recordedExceptions = new ArrayList<>(); + HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions); + + // Act + middleware.postEvents(null, sender, callback); + + + // Assert + assertEquals(1, callCount.get()); + assertEquals(1, recordedStatusCodes.size()); + assertEquals(1, recordedReplies.size()); + assertEquals(0, recordedExceptions.size()); + assertEquals(200, recordedStatusCodes.get(0).intValue()); + assertEquals("Success", recordedReplies.get(0)); + } + + public void testPostEvents_whenUnavailableThenSuccessShouldRetry() { + // Arrange + HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); + final AtomicInteger callCount = new AtomicInteger(0); + HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 2); + final List recordedStatusCodes = new ArrayList<>(); + final List recordedReplies = new ArrayList<>(); + final List recordedExceptions = new ArrayList<>(); + HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions); + + // Act + middleware.postEvents(null, sender, callback); + + + // Assert + assertEquals(3, callCount.get()); + assertEquals(1, recordedStatusCodes.size()); + assertEquals(1, recordedReplies.size()); + assertEquals(0, recordedExceptions.size()); + assertEquals(200, recordedStatusCodes.get(0).intValue()); + } + + public void testPostEvents_whenUnavailableShouldRetryThenStop() { + // Arrange + HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); + final AtomicInteger callCount = new AtomicInteger(0); + HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 10); + final List recordedStatusCodes = new ArrayList<>(); + final List recordedReplies = new ArrayList<>(); + final List recordedExceptions = new ArrayList<>(); + HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions); + + // Act + middleware.postEvents(null, sender, callback); + + + // Assert + assertEquals(4, callCount.get()); + assertEquals(1, recordedStatusCodes.size()); + assertEquals(1, recordedReplies.size()); + assertEquals(0, recordedExceptions.size()); + assertEquals(503, recordedStatusCodes.get(0).intValue()); + } + + private static HttpEventCollectorMiddleware.IHttpSender getSender(AtomicInteger callCount, int errorCount) { + return new HttpEventCollectorMiddleware.IHttpSender() { + @Override + public void postEvents(List events, HttpEventCollectorMiddleware.IHttpSenderCallback callback) { + callCount.incrementAndGet(); + if (callCount.get() > errorCount) { + callback.completed(200, "Success"); + } else { + callback.completed(503, "Service Unavailable"); + } + } + }; + } + + private static HttpEventCollectorMiddleware.IHttpSenderCallback getCallback(List recordedStatusCodes, List recordedReplies, List recordedExceptions) { + return new HttpEventCollectorMiddleware.IHttpSenderCallback() { + + @Override + public void completed(int statusCode, String reply) { + recordedStatusCodes.add(statusCode); + recordedReplies.add(reply); + } + + @Override + public void failed(Exception ex) { + recordedExceptions.add(ex); + } + }; + } +} \ No newline at end of file