Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import java.util.logging.Filter;
import java.util.logging.Formatter;

import io.quarkiverse.logging.splunk.middleware.HttpEventCollectorResendMiddleware;
import org.jboss.logmanager.ExtHandler;
import org.jboss.logmanager.ExtLogRecord;
import org.jboss.logmanager.filters.AllFilter;

import com.splunk.logging.HttpEventCollectorResendMiddleware;

import com.splunk.logging.HttpEventCollectorSender;

public class SplunkLogHandler extends ExtHandler {
Expand Down Expand Up @@ -45,7 +46,7 @@ public SplunkLogHandler(HttpEventCollectorSender sender, boolean includeExceptio
@Override
public void doPublish(ExtLogRecord record) {
String formatted = formatMessage(record);
if (formatted.length() == 0) {
if (formatted.isEmpty()) {
// nothing to write; don't bother
return;
}
Expand All @@ -54,7 +55,7 @@ public void doPublish(ExtLogRecord record) {
record.getLevel().toString(),
formatted,
includeLoggerName ? record.getLoggerName() : null,
includeThreadName ? String.format(Locale.US, "%d", record.getThreadID()) : null,
includeThreadName ? String.format(Locale.US, "%d", record.getLongThreadID()) : null,
record.getMdcCopy(),
(!includeException || record.getThrown() == null) ? null : record.getThrown().getMessage(),
null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.quarkiverse.logging.splunk.middleware;

import com.splunk.logging.HttpEventCollectorEventInfo;
import com.splunk.logging.HttpEventCollectorMiddleware;

import java.util.Arrays;
import java.util.HashSet;

/**
* @copyright
*
* Copyright 2013-2015 Splunk, Inc.
* Derived from https://github.com/splunk/splunk-library-javalogging/pull/287
* by Simon Hege.
*
* Licensed under the Apache License, Version 2.0 (the "License"): you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

import java.util.List;
import java.util.Set;

/**
* Splunk http event collector resend middleware.
*
*
* HTTP event collector middleware plug in that implements a simple resend policy.
* When HTTP post reply isn't an application error it tries to resend the data.
* An exponentially growing delay is used to prevent server overflow.
*/
public class HttpEventCollectorResendMiddleware
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make this class the default of config quarkus.log.handler.splunk.middleware ? Since retriesOnError = 0, it would do nothing by default ?
If so, we could potentially make retriesOnError (same as existing quarkus.log.handler.splunk.max-retries ?) and retryDelay configurable by doing a microprofile config lookup in the constructor via something like:

org.eclipse.microprofile.config.ConfigProvider.getConfig().getOptionalValue("quarkus.log.handler.splunk.max-retries", Integer.class).orElse(0);
org.eclipse.microprofile.config.ConfigProvider.getConfig().getOptionalValue("quarkus.log.handler.splunk.retry-initial-delay-ms", Integer.class).orElse(1000);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we set this as the default middleware, perhaps it may be confusing to users as then setting their own middleware will remove this middleware I think? Especially confusing as they can still specify retries via the config. Injecting it 'transparently' when retries > 0 as in the current implementation makes sense to me.

Making it more configurable is a good suggestion of course! I would specify the retry-delay as a Duration which we see more often in Quarkus (with default 1 (second)). https://quarkus.io/guides/all-config#duration-note-anchor-all-config

extends HttpEventCollectorMiddleware.HttpSenderMiddleware {

/**
* List of HTTP event collector server application error statuses. These statuses
* indicate non-transient problems that cannot be fixed by resending the
* data.
*/
private static final Set<Integer> HttpEventCollectorApplicationErrors = new HashSet<>(Arrays.asList(
// Forbidden
403,
// Method Not Allowed
405,
// Bad Request
400
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be other 4xx where we should not retry.
I think it may be safer to retry only on 502 Bad Gateway, 503 Service Unavailable and 504 Gateway Timeout (we can expan the list in the future if needed).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would actually prefer the 'blacklist' as @simonhege implemented: we want logging to happen.
Retrying anything except documented Splunk 'there is no point in retrying' responses would in my eyes be the 'safe' default.

If we retry only on a whitelist, we risk not retrying on a hypothetical future error 567 that the Splunk API might return without an adjustment to our code. Or we won't retry on a 404 which might be caused by a faulty DNS configuration while Splunk is fine and back online 5 minutes later.

This PR was a 1:1 copy of @simonhege's work, but triggered by your response I'm thinking about it a bit more and would actually even want to narrow down on the check of what to retry: we should retry anything that isn't a known Splunk 4XX-response. For example, if the response is a 400, but without the expected json body such as {"text":"Incorrect data format","code":5,"invalid-event-number":0}. We should then assume the 400-code comes from some middleware between us and the Splunk HEC and we retry. I've seen this enough in practice, where some 'transparant' proxy failed and we got responses from the proxy instead of the end-system we tried to reach.

image

Optionally, we could add another user config parameter for 'codes not to retry'?

));
private long retriesOnError = 0;

/**
* Create a resend middleware component.
* @param retriesOnError is the max retry count.
*/
public HttpEventCollectorResendMiddleware(long retriesOnError) {
this.retriesOnError = retriesOnError;
}

public void postEvents(
final List<HttpEventCollectorEventInfo> events,
HttpEventCollectorMiddleware.IHttpSender sender,
HttpEventCollectorMiddleware.IHttpSenderCallback callback) {
callNext(events, sender, new Callback(events, sender, callback));
}

private boolean shouldRetry(int statusCode) {
return statusCode != 200 && !HttpEventCollectorApplicationErrors.contains(statusCode);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linked to the comment about reversing the logic to a whitelist:

Suggested change
return statusCode != 200 && !HttpEventCollectorApplicationErrors.contains(statusCode);
return RetryableHttpStatusCodes.contains(statusCode);

}

private class Callback implements HttpEventCollectorMiddleware.IHttpSenderCallback {
private long retries = 0;
private final List<HttpEventCollectorEventInfo> events;
private HttpEventCollectorMiddleware.IHttpSenderCallback prevCallback;
private HttpEventCollectorMiddleware.IHttpSender sender;
private final long RetryDelayCeiling = 60 * 1000; // 1 minute
private long retryDelay = 1000; // start with 1 second

public Callback(
final List<HttpEventCollectorEventInfo> events,
HttpEventCollectorMiddleware.IHttpSender sender,
HttpEventCollectorMiddleware.IHttpSenderCallback prevCallback) {
this.events = events;
this.prevCallback = prevCallback;
this.sender = sender;
}

@Override
public void completed(int statusCode, final String reply) {
if (shouldRetry(statusCode) && retries < retriesOnError) {
retry();
} else {
// if non-retryable, resend wouldn't help, delegate to previous callback
prevCallback.completed(statusCode, reply);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call prevCallback.failed(new RetryLimitExceededException()) with our own exception ?
There's some logic in io.quarkiverse.logging.splunk.SplunkErrorCallback to log to stdout/stderr in case of failure, but not sure if the ErrorCallback will get called.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR as is should work in principle the same as the current implementation - it just changes what to retry, so if it works now it will in theory still work. But no reason not to change it of course. Do you have a concrete code suggestion?

}
}

@Override
public void failed(final Exception ex) {
if (retries < retriesOnError) {
retry();
} else {
prevCallback.failed(ex);
}
}

private void retry() {
retries++;
try {
Thread.sleep(retryDelay);
callNext(events, sender, this);
} catch (InterruptedException ie) {
prevCallback.failed(ie);
}
// increase delay exponentially
retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;

import com.splunk.logging.HttpEventCollectorResendMiddleware;
import com.splunk.logging.HttpEventCollectorSender;

import io.quarkiverse.logging.splunk.middleware.HttpEventCollectorResendMiddleware;

@ExtendWith(MockitoExtension.class)
class SplunkLogHandlerTest {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package io.quarkiverse.logging.splunk.middleware;
/**
* @copyright
*
* Copyright 2013-2015 Splunk, Inc.
* Derived from https://github.com/splunk/splunk-library-javalogging/pull/287
* by Simon Hege. Modifications include updating the unit testing framework to JUnit 5.
*
* Licensed under the Apache License, Version 2.0 (the "License"): you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.splunk.logging.HttpEventCollectorMiddleware;
import org.junit.jupiter.api.Test;


public class HttpEventCollectorResendMiddlewareTest {

@Test
public void testPostEvents_whenSuccesShouldNotRetry() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 0);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies,
recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);

// Assert
assertEquals(1, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(200, recordedStatusCodes.get(0).intValue());
assertEquals("Success", recordedReplies.get(0));
}

@Test
public void testPostEvents_whenUnavailableThenSuccessShouldRetry() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 2);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies,
recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);

// Assert
assertEquals(3, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(200, recordedStatusCodes.get(0).intValue());
}

@Test
public void testPostEvents_whenUnavailableShouldRetryThenStop() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 10);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies,
recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);

// Assert
assertEquals(4, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(503, recordedStatusCodes.get(0).intValue());
}

private static HttpEventCollectorMiddleware.IHttpSender getSender(AtomicInteger callCount, int errorCount) {
return (events, callback) -> {
callCount.incrementAndGet();
if (callCount.get() > errorCount) {
callback.completed(200, "Success");
} else {
callback.completed(503, "Service Unavailable");
}
};
}

private static HttpEventCollectorMiddleware.IHttpSenderCallback getCallback(List<Integer> recordedStatusCodes,
List<String> recordedReplies, List<Exception> recordedExceptions) {
return new HttpEventCollectorMiddleware.IHttpSenderCallback() {

@Override
public void completed(int statusCode, String reply) {
recordedStatusCodes.add(statusCode);
recordedReplies.add(reply);
}

@Override
public void failed(Exception ex) {
recordedExceptions.add(ex);
}
};
}
}