Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.splunk.logging;

import java.util.Arrays;
import java.util.HashSet;

/**
* @copyright
*
Expand All @@ -19,6 +22,7 @@
*/

import java.util.List;
import java.util.Set;

/**
* Splunk http event collector resend middleware.
Expand All @@ -30,6 +34,20 @@
*/
public class HttpEventCollectorResendMiddleware
extends HttpEventCollectorMiddleware.HttpSenderMiddleware {

/**
* List of HTTP event collector server application error statuses. These statuses
* indicate non-transient problems that cannot be fixed by resending the
* data.
*/
private static final Set<Integer> HttpEventCollectorApplicationErrors = new HashSet<>(Arrays.asList(
// Forbidden
403,
// Method Not Allowed
405,
// Bad Request
400
));
private long retriesOnError = 0;

/**
Expand All @@ -47,6 +65,10 @@ public void postEvents(
callNext(events, sender, new Callback(events, sender, callback));
}

private boolean shouldRetry(int statusCode) {
return statusCode != 200 && !HttpEventCollectorApplicationErrors.contains(statusCode);
}

private class Callback implements HttpEventCollectorMiddleware.IHttpSenderCallback {
private long retries = 0;
private final List<HttpEventCollectorEventInfo> events;
Expand All @@ -66,25 +88,33 @@ public Callback(

@Override
public void completed(int statusCode, final String reply) {
// if non-200, resend wouldn't help, delegate to previous callback
prevCallback.completed(statusCode, reply);
if (shouldRetry(statusCode) && retries < retriesOnError) {
retry();
} else {
// if non-retryable, resend wouldn't help, delegate to previous callback
prevCallback.completed(statusCode, reply);
}
}

@Override
public void failed(final Exception ex) {
if (retries < retriesOnError) {
retries++;
try {
Thread.sleep(retryDelay);
callNext(events, sender, this);
} catch (InterruptedException ie) {
prevCallback.failed(ie);
}
// increase delay exponentially
retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2);
retry();
} else {
prevCallback.failed(ex);
}
}

private void retry() {
retries++;
try {
Thread.sleep(retryDelay);
callNext(events, sender, this);
} catch (InterruptedException ie) {
prevCallback.failed(ie);
}
// increase delay exponentially
retryDelay = Math.min(RetryDelayCeiling, retryDelay * 2);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.splunk.logging;

import junit.framework.TestCase;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class HttpEventCollectorResendMiddlewareTest extends TestCase {

public void testPostEvents_whenSuccesShouldNotRetry() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 0);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);


// Assert
assertEquals(1, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(200, recordedStatusCodes.get(0).intValue());
assertEquals("Success", recordedReplies.get(0));
}

public void testPostEvents_whenUnavailableThenSuccessShouldRetry() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 2);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);


// Assert
assertEquals(3, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(200, recordedStatusCodes.get(0).intValue());
}

public void testPostEvents_whenUnavailableShouldRetryThenStop() {
// Arrange
HttpEventCollectorResendMiddleware middleware = new HttpEventCollectorResendMiddleware(3);
final AtomicInteger callCount = new AtomicInteger(0);
HttpEventCollectorMiddleware.IHttpSender sender = getSender(callCount, 10);
final List<Integer> recordedStatusCodes = new ArrayList<>();
final List<String> recordedReplies = new ArrayList<>();
final List<Exception> recordedExceptions = new ArrayList<>();
HttpEventCollectorMiddleware.IHttpSenderCallback callback = getCallback(recordedStatusCodes, recordedReplies, recordedExceptions);

// Act
middleware.postEvents(null, sender, callback);


// Assert
assertEquals(4, callCount.get());
assertEquals(1, recordedStatusCodes.size());
assertEquals(1, recordedReplies.size());
assertEquals(0, recordedExceptions.size());
assertEquals(503, recordedStatusCodes.get(0).intValue());
}

private static HttpEventCollectorMiddleware.IHttpSender getSender(AtomicInteger callCount, int errorCount) {
return new HttpEventCollectorMiddleware.IHttpSender() {
@Override
public void postEvents(List<HttpEventCollectorEventInfo> events, HttpEventCollectorMiddleware.IHttpSenderCallback callback) {
callCount.incrementAndGet();
if (callCount.get() > errorCount) {
callback.completed(200, "Success");
} else {
callback.completed(503, "Service Unavailable");
}
}
};
}

private static HttpEventCollectorMiddleware.IHttpSenderCallback getCallback(List<Integer> recordedStatusCodes, List<String> recordedReplies, List<Exception> recordedExceptions) {
return new HttpEventCollectorMiddleware.IHttpSenderCallback() {

@Override
public void completed(int statusCode, String reply) {
recordedStatusCodes.add(statusCode);
recordedReplies.add(reply);
}

@Override
public void failed(Exception ex) {
recordedExceptions.add(ex);
}
};
}
}