Fork HttpEventCollectorResendMiddleware to properly implement retrying #322
base: main
Changes from all commits
67f6aa8
6e01b22
e9313c8
0929af7
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,125 @@ | ||||||
| package io.quarkiverse.logging.splunk.middleware; | ||||||
|
|
||||||
| import com.splunk.logging.HttpEventCollectorEventInfo; | ||||||
| import com.splunk.logging.HttpEventCollectorMiddleware; | ||||||
|
|
||||||
| import java.util.Arrays; | ||||||
| import java.util.HashSet; | ||||||
|
|
||||||
| /** | ||||||
| * @copyright | ||||||
| * | ||||||
| * Copyright 2013-2015 Splunk, Inc. | ||||||
| * Derived from https://github.com/splunk/splunk-library-javalogging/pull/287 | ||||||
| * by Simon Hege. | ||||||
| * | ||||||
| * Licensed under the Apache License, Version 2.0 (the "License"): you may | ||||||
| * not use this file except in compliance with the License. You may obtain | ||||||
| * a copy of the License at | ||||||
| * | ||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| * | ||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||||||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||||||
| * License for the specific language governing permissions and limitations | ||||||
| * under the License. | ||||||
| */ | ||||||
|
|
||||||
| import java.util.List; | ||||||
| import java.util.Set; | ||||||
|
|
||||||
| /** | ||||||
| * Splunk http event collector resend middleware. | ||||||
| * | ||||||
| * | ||||||
| * HTTP event collector middleware plug in that implements a simple resend policy. | ||||||
| * When HTTP post reply isn't an application error it tries to resend the data. | ||||||
| * An exponentially growing delay is used to prevent server overflow. | ||||||
| */ | ||||||
| public class HttpEventCollectorResendMiddleware | ||||||
|
Member
Would it make sense to make this class the default of config
Author
If we set this as the default middleware, it may confuse users, since setting their own middleware would then remove this one, I think. That would be especially confusing because they can still specify retries via the config. Injecting it 'transparently' when retries > 0, as in the current implementation, makes sense to me. Making it more configurable is a good suggestion, of course! I would specify the retry-delay as a Duration, which we see more often in Quarkus (with default
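To make the Duration suggestion concrete, here is a minimal sketch of a settings holder that carries the retry count and delays. The class name `RetrySettings`, its fields, and its methods are hypothetical and are not part of this PR or of the extension's existing configuration. No default values are assumed.

```java
import java.time.Duration;

/**
 * Hypothetical retry settings holder; the class and member names are
 * illustrative and not part of the extension's actual configuration.
 */
final class RetrySettings {
    private final long maxRetries;
    private final Duration initialDelay;
    private final Duration maxDelay;

    RetrySettings(long maxRetries, Duration initialDelay, Duration maxDelay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        if (initialDelay.isNegative() || maxDelay.compareTo(initialDelay) < 0) {
            throw new IllegalArgumentException("delays must satisfy 0 <= initialDelay <= maxDelay");
        }
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
    }

    /** The resend middleware only needs to be injected when retries are requested. */
    boolean retriesEnabled() {
        return maxRetries > 0;
    }

    /** Initial delay in milliseconds, the unit Thread.sleep expects. */
    long initialDelayMillis() {
        return initialDelay.toMillis();
    }

    /** Upper bound for the delay, in milliseconds. */
    long maxDelayMillis() {
        return maxDelay.toMillis();
    }
}
```

With something like this, the middleware constructor could take the two millisecond values instead of hard-coding 1 second and 1 minute, while the "inject only when retries > 0" behaviour stays unchanged.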
||||||
| extends HttpEventCollectorMiddleware.HttpSenderMiddleware { | ||||||
|
|
||||||
| /** | ||||||
| * List of HTTP event collector server application error statuses. These statuses | ||||||
| * indicate non-transient problems that cannot be fixed by resending the | ||||||
| * data. | ||||||
| */ | ||||||
| private static final Set<Integer> HttpEventCollectorApplicationErrors = new HashSet<>(Arrays.asList( | ||||||
| // Forbidden | ||||||
| 403, | ||||||
| // Method Not Allowed | ||||||
| 405, | ||||||
| // Bad Request | ||||||
| 400 | ||||||
|
Member
There may be other 4xx where we should not retry.
Author
I would actually prefer the 'blacklist' as @simonhege implemented: we want logging to happen. If we retry only on a whitelist, we risk not retrying on a hypothetical future error 567 that the Splunk API might return without an adjustment to our code. Or we won't retry on a 404 which might be caused by a faulty DNS configuration while Splunk is fine and back online 5 minutes later.

This PR was a 1:1 copy of @simonhege's work, but triggered by your response I'm thinking about it a bit more and would actually even want to narrow down the check of what to retry: we should retry anything that isn't a known Splunk 4XX response. For example, if the response is a 400, but without the expected JSON body such as

Optionally, we could add another user config parameter for 'codes not to retry'?
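The two policies discussed here can be sketched side by side. This is only an illustration: the class `RetryDecision` and its method names are hypothetical, and the body check rests on the assumption that a genuine Splunk error reply is a JSON object containing a `"code"` field. A real implementation would parse the body properly.

```java
import java.util.Set;

/** Hypothetical helper comparing the PR's status-only policy with the narrower one. */
final class RetryDecision {
    // Same three statuses as the application-error set in the diff.
    private static final Set<Integer> APPLICATION_ERRORS = Set.of(400, 403, 405);

    /** Policy as in the PR: retry everything except 200 and the listed statuses. */
    static boolean shouldRetryByStatus(int statusCode) {
        return statusCode != 200 && !APPLICATION_ERRORS.contains(statusCode);
    }

    /**
     * Narrower variant: a listed status is final only when the reply also
     * looks like a Splunk error body; anything else is retried.
     */
    static boolean shouldRetry(int statusCode, String reply) {
        if (statusCode == 200) {
            return false;
        }
        boolean knownSplunkError = APPLICATION_ERRORS.contains(statusCode) && looksLikeSplunkError(reply);
        return !knownSplunkError;
    }

    /** ASSUMPTION: Splunk error bodies are JSON objects that contain a "code" field. */
    static boolean looksLikeSplunkError(String reply) {
        if (reply == null) {
            return false;
        }
        String trimmed = reply.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}") && trimmed.contains("\"code\"");
    }
}
```

Under this sketch a 400 returned by an intermediate proxy with an HTML body would be retried, while a 400 carrying a Splunk-style JSON error would not.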
||||||
| )); | ||||||
| private long retriesOnError = 0; | ||||||
|
|
||||||
| /** | ||||||
| * Create a resend middleware component. | ||||||
| * @param retriesOnError is the max retry count. | ||||||
| */ | ||||||
| public HttpEventCollectorResendMiddleware(long retriesOnError) { | ||||||
| this.retriesOnError = retriesOnError; | ||||||
| } | ||||||
|
|
||||||
| public void postEvents( | ||||||
| final List<HttpEventCollectorEventInfo> events, | ||||||
| HttpEventCollectorMiddleware.IHttpSender sender, | ||||||
| HttpEventCollectorMiddleware.IHttpSenderCallback callback) { | ||||||
| callNext(events, sender, new Callback(events, sender, callback)); | ||||||
| } | ||||||
|
|
||||||
| private boolean shouldRetry(int statusCode) { | ||||||
| return statusCode != 200 && !HttpEventCollectorApplicationErrors.contains(statusCode); | ||||||
|
Member
Linked to the comment about reversing the logic to a whitelist:
Suggested change
|
||||||
| } | ||||||
|
|
||||||
| private class Callback implements HttpEventCollectorMiddleware.IHttpSenderCallback { | ||||||
| private long retries = 0; | ||||||
| private final List<HttpEventCollectorEventInfo> events; | ||||||
| private HttpEventCollectorMiddleware.IHttpSenderCallback prevCallback; | ||||||
| private HttpEventCollectorMiddleware.IHttpSender sender; | ||||||
| private final long RetryDelayCeiling = 60 * 1000; // 1 minute | ||||||
| private long retryDelay = 1000; // start with 1 second | ||||||
|
|
||||||
| public Callback( | ||||||
| final List<HttpEventCollectorEventInfo> events, | ||||||
| HttpEventCollectorMiddleware.IHttpSender sender, | ||||||
| HttpEventCollectorMiddleware.IHttpSenderCallback prevCallback) { | ||||||
| this.events = events; | ||||||
| this.prevCallback = prevCallback; | ||||||
| this.sender = sender; | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void completed(int statusCode, final String reply) { | ||||||
| if (shouldRetry(statusCode) && retries < retriesOnError) { | ||||||
| retry(); | ||||||
| } else { | ||||||
| // if non-retryable, resend wouldn't help, delegate to previous callback | ||||||
| prevCallback.completed(statusCode, reply); | ||||||
|
Member
Should we call
Author
As is, this PR should work the same in principle as the current implementation: it only changes what gets retried, so if it works now it should still work. There is no reason not to change it, of course. Do you have a concrete code suggestion?
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void failed(final Exception ex) { | ||||||
| if (retries < retriesOnError) { | ||||||
| retry(); | ||||||
| } else { | ||||||
| prevCallback.failed(ex); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
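| private void retry() { | ||||||
| retries++; | ||||||
| // Grow the delay before re-sending: a synchronous sender re-enters retry() from inside | ||||||
| // callNext, so doubling afterwards would leave every attempt waiting the initial delay. | ||||||
| final long delay = retryDelay; | ||||||
| retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2); | ||||||
| try { | ||||||
| Thread.sleep(delay); | ||||||
| callNext(events, sender, this); | ||||||
| } catch (InterruptedException ie) { | ||||||
| // restore the interrupt flag so callers can still observe the interruption | ||||||
| Thread.currentThread().interrupt(); | ||||||
| prevCallback.failed(ie); | ||||||
| } | ||||||
| } | ||||||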
| } | ||||||
| } | ||||||
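For reference, the delay schedule that the doubling rule in `retry()` is meant to produce (1 second start, doubling, 60-second ceiling) can be worked out in isolation. The helper below is a hypothetical sketch, not code from the PR; only the constants come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the wait before each retry, in milliseconds. */
final class BackoffSchedule {
    static List<Long> delaysMillis(int retries, long initialDelayMillis, long ceilingMillis) {
        List<Long> delays = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int i = 0; i < retries; i++) {
            delays.add(delay);
            // double the delay for the next retry, never exceeding the ceiling
            delay = Math.min(ceilingMillis, delay * 2);
        }
        return delays;
    }
}
```

For example, `BackoffSchedule.delaysMillis(8, 1000, 60000)` yields 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000: the ceiling is reached on the seventh retry, so a large retry count adds roughly one minute per extra attempt.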
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| package io.quarkiverse.logging.splunk.middleware; | ||
| /** | ||
| * @copyright | ||
| * | ||
| * Copyright 2013-2015 Splunk, Inc. | ||
| * Derived from https://github.com/splunk/splunk-library-javalogging/pull/287 | ||
| * by Simon Hege. Modifications include updating the unit testing framework to JUnit 5. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"): you may | ||
| * not use this file except in compliance with the License. You may obtain | ||
| * a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
| * License for the specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import com.splunk.logging.HttpEventCollectorMiddleware; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
|
|
||
| public class HttpEventCollectorResendMiddlewareTest { | ||
|
|
||
| @Test | ||
| public void testPostEvents_whenSuccesShouldNotRetry() { | ||
| // Arrange | ||
| HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); | ||
| final AtomicInteger callCount = new AtomicInteger(0); | ||
| HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 0); | ||
| final List<Integer> recordedStatusCodes = new ArrayList<>(); | ||
| final List<String> recordedReplies = new ArrayList<>(); | ||
| final List<Exception> recordedExceptions = new ArrayList<>(); | ||
| HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, | ||
| recordedExceptions); | ||
|
|
||
| // Act | ||
| middleware.postEvents(null, sender, callback); | ||
|
|
||
| // Assert | ||
| assertEquals(1, callCount.get()); | ||
| assertEquals(1, recordedStatusCodes.size()); | ||
| assertEquals(1, recordedReplies.size()); | ||
| assertEquals(0, recordedExceptions.size()); | ||
| assertEquals(200, recordedStatusCodes.get(0).intValue()); | ||
| assertEquals("Success", recordedReplies.get(0)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testPostEvents_whenUnavailableThenSuccessShouldRetry() { | ||
| // Arrange | ||
| HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); | ||
| final AtomicInteger callCount = new AtomicInteger(0); | ||
| HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 2); | ||
| final List<Integer> recordedStatusCodes = new ArrayList<>(); | ||
| final List<String> recordedReplies = new ArrayList<>(); | ||
| final List<Exception> recordedExceptions = new ArrayList<>(); | ||
| HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, | ||
| recordedExceptions); | ||
|
|
||
| // Act | ||
| middleware.postEvents(null, sender, callback); | ||
|
|
||
| // Assert | ||
| assertEquals(3, callCount.get()); | ||
| assertEquals(1, recordedStatusCodes.size()); | ||
| assertEquals(1, recordedReplies.size()); | ||
| assertEquals(0, recordedExceptions.size()); | ||
| assertEquals(200, recordedStatusCodes.get(0).intValue()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testPostEvents_whenUnavailableShouldRetryThenStop() { | ||
| // Arrange | ||
| HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3); | ||
| final AtomicInteger callCount = new AtomicInteger(0); | ||
| HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 10); | ||
| final List<Integer> recordedStatusCodes = new ArrayList<>(); | ||
| final List<String> recordedReplies = new ArrayList<>(); | ||
| final List<Exception> recordedExceptions = new ArrayList<>(); | ||
| HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, | ||
| recordedExceptions); | ||
|
|
||
| // Act | ||
| middleware.postEvents(null, sender, callback); | ||
|
|
||
| // Assert | ||
| assertEquals(4, callCount.get()); | ||
| assertEquals(1, recordedStatusCodes.size()); | ||
| assertEquals(1, recordedReplies.size()); | ||
| assertEquals(0, recordedExceptions.size()); | ||
| assertEquals(503, recordedStatusCodes.get(0).intValue()); | ||
| } | ||
|
|
||
| private static HttpEventCollectorMiddleware.IHttpSender getSender(AtomicInteger callCount, int errorCount) { | ||
| return (events, callback) -> { | ||
| callCount.incrementAndGet(); | ||
| if (callCount.get() > errorCount) { | ||
| callback.completed(200, "Success"); | ||
| } else { | ||
| callback.completed(503, "Service Unavailable"); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| private static HttpEventCollectorMiddleware.IHttpSenderCallback getCallback(List<Integer> recordedStatusCodes, | ||
| List<String> recordedReplies, List<Exception> recordedExceptions) { | ||
| return new HttpEventCollectorMiddleware.IHttpSenderCallback() { | ||
|
|
||
| @Override | ||
| public void completed(int statusCode, String reply) { | ||
| recordedStatusCodes.add(statusCode); | ||
| recordedReplies.add(reply); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(Exception ex) { | ||
| recordedExceptions.add(ex); | ||
| } | ||
| }; | ||
| } | ||
| } |
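The `callCount` values asserted in the three tests follow one rule: the first attempt always happens, and each leading 503 triggers one more attempt until the retry budget runs out. A hypothetical helper (not part of the PR) that captures this arithmetic:

```java
/** Hypothetical helper: expected number of sender calls for the test sender above. */
final class AttemptCount {
    static int expectedCalls(int maxRetries, int leadingFailures) {
        // one initial attempt, plus one retry per failure, capped by the retry budget
        return Math.min(leadingFailures, maxRetries) + 1;
    }
}
```

With `maxRetries = 3` this gives 1 call for 0 failures, 3 calls for 2 failures, and 4 calls for 10 failures, matching the assertions in the tests.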
