diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96d151..21755591 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] +- Added new HTTP Sink configuration options: `gid.connector.http.sink.success-codes`, `gid.connector.http.sink.retry-codes`, and `gid.connector.http.sink.ignored-response-codes`. - Amend to not log HTTP request response and header values by default. - Added http 2 support. diff --git a/README.md b/README.md index e4bfe9ef..a601e160 100644 --- a/README.md +++ b/README.md @@ -452,16 +452,21 @@ is provided. ## HTTP status code handler ### Sink table You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table. -By default all 400 and 500 response codes will be interpreted as error code. +By default all 400 and 500 response codes will be interpreted as error code. 500, 503, and 504 response codes will be interpreted as retry. + +The sink categorizes HTTP responses into groups: +- Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses. `1XX, 2XX, 3XX` are defaults +- Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee. `500, 503, 504` are defaults +- Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful. +- Error codes: Any response code not classified in the above groups. + +Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203. + +#### Legacy error code configuration +For backward compatibility, you can use the legacy properties: +- `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`). +- `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list. -This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are: -- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404. -Many status codes can be defined in one value, where each code should be separated with comma, for example: -`401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors. -An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors. -- `gid.connector.http.sink.error.code.exclude` used to exclude a HTTP code from error list. - Many status codes can be defined in one value, where each code should be separated with comma, for example: - `401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes. ### Source table The source table categorizes HTTP responses into three groups based on status codes: @@ -617,8 +622,11 @@ be requested if the current time is later than the cached token expiry time minu | sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. | | sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. | | gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. | -| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. | -| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. | +| gid.connector.http.sink.error.code `DEPRECATED` | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. | +| gid.connector.http.sink.error.code.exclude `DEPRECATED` | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. | +| gid.connector.http.sink.success-codes | optional | Comma separated http codes considered as success response. Use [1-5]XX for groups and '!' character for excluding. Defaults are `1XX,2XX,3XX` | +| gid.connector.http.sink.retry-codes | optional | Comma separated http codes considered as transient errors that will trigger retries. Use [1-5]XX for groups and '!' character for excluding. Only used when `sink.delivery-guarantee` is set to `at-least-once`. Defaults are `500,503,504` | +| gid.connector.http.sink.ignored-response-codes | optional | Comma separated http codes. Content for these responses will be ignored. Use [1-5]XX for groups and '!' character for excluding. | | gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. | | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | diff --git a/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java b/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java new file mode 100644 index 00000000..ed20141a --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/BatchHttpStatusCodeValidationFailedException.java @@ -0,0 +1,17 @@ +package com.getindata.connectors.http; + +import java.util.List; + +import lombok.Getter; + +import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; + +@Getter +public class BatchHttpStatusCodeValidationFailedException extends Exception { + private final List failedRequests; + + public BatchHttpStatusCodeValidationFailedException(String message, List failedRequests) { + super(message); + this.failedRequests = failedRequests; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index b5637377..bca876db 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,31 +1,57 @@ package com.getindata.connectors.http.internal; import java.util.List; +import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; import lombok.ToString; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted - * to write, divided into two lists — successful and failed ones. + * to write. */ @Data @ToString public class SinkHttpClientResponse { /** - * A list of successfully written requests. + * A list of requests along with write status. */ @NonNull - private final List successfulRequests; + private final List requests; - /** - * A list of requests that {@link SinkHttpClient} failed to write. - */ - @NonNull - private final List failedRequests; + private List getRequestByStatus(final ResponseItemStatus status) { + return requests.stream() + .filter(r -> r.getStatus().equals(status)) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getSuccessfulRequests() { + return getRequestByStatus(ResponseItemStatus.SUCCESS); + } + + public List getFailedRequests() { + return getRequestByStatus(ResponseItemStatus.FAILURE); + } + + public List getTemporalRequests() { + return getRequestByStatus(ResponseItemStatus.TEMPORAL); + } + + public List getIgnoredRequests() { + return getRequestByStatus(ResponseItemStatus.IGNORE); + } + + @Data + @ToString + public static class ResponseItem { + private final HttpRequest request; + private final ResponseItemStatus status; + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index c6e436ad..1afe30fa 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -19,6 +19,7 @@ public final class HttpConnectorConfigConstants { */ public static final String GID_CONNECTOR_HTTP = "gid.connector.http."; private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup."; + private static final String SINK_PREFIX = GID_CONNECTOR_HTTP + "sink."; /** * A property prefix for http connector header properties @@ -45,9 +46,13 @@ public final class HttpConnectorConfigConstants { public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type"; // --------- Error code handling configuration --------- - public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude"; + public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = SINK_PREFIX + "error.code.exclude"; + public static final String HTTP_ERROR_SINK_CODES_LIST = SINK_PREFIX + "error.code"; + + public static final String SINK_SUCCESS_CODES = SINK_PREFIX + "success-codes"; + public static final String SINK_RETRY_CODES = SINK_PREFIX + "retry-codes"; + public static final String SINK_IGNORE_RESPONSE_CODES = SINK_PREFIX + "ignored-response-codes"; - public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code"; // ----------------------------------------------------- public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER = diff --git a/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java b/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java new file mode 100644 index 00000000..b84db3dc --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/config/ResponseItemStatus.java @@ -0,0 +1,18 @@ +package com.getindata.connectors.http.internal.config; + +public enum ResponseItemStatus { + SUCCESS("success"), + TEMPORAL("temporal"), + IGNORE("ignore"), + FAILURE("failure"); + + private final String status; + + ResponseItemStatus(String status) { + this.status = status; + } + + public String getStatus() { + return status; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index d17e9213..ebbbb7dc 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -16,8 +16,10 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import com.getindata.connectors.http.BatchHttpStatusCodeValidationFailedException; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; import com.getindata.connectors.http.internal.utils.ThreadUtils; /** @@ -89,8 +91,10 @@ protected void submitRequestEntries( if (err != null) { int failedRequestsNumber = requestEntries.size(); log.error( - "Http Sink fatally failed to write all {} requests", - failedRequestsNumber); + "Http Sink fatally failed to write {} requests", + failedRequestsNumber, + err + ); numRecordsSendErrorsCounter.inc(failedRequestsNumber); // TODO: Make `HttpSinkInternal` retry the failed requests. @@ -98,11 +102,33 @@ protected void submitRequestEntries( // to the `numRecordsSendErrors` metric. It is due to the fact we do not have // a clear image how we want to do it, so it would be both efficient and correct. //requestResult.accept(requestEntries); - } else if (response.getFailedRequests().size() > 0) { - int failedRequestsNumber = response.getFailedRequests().size(); - log.error("Http Sink failed to write and will retry {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); + } else { + List failedRequests = response.getFailedRequests(); + List ignoredRequests = response.getIgnoredRequests(); + List temporalRequests = response.getTemporalRequests(); + + if (!failedRequests.isEmpty()) { + numRecordsSendErrorsCounter.inc(failedRequests.size()); + log.error( + "failed requests: {}, throwing BatchHttpStatusCodeValidationFailedException from sink", + failedRequests + ); + getFatalExceptionCons().accept(new BatchHttpStatusCodeValidationFailedException( + String.format("Received %d fatal response codes", failedRequests.size()), failedRequests) + ); + } + + if (!ignoredRequests.isEmpty()) { + log.info("Ignoring {} requests", ignoredRequests.size()); + } + + if (!temporalRequests.isEmpty()) { + numRecordsSendErrorsCounter.inc(temporalRequests.size()); + log.error( + "Http Sink failed to write {} temporal requests", + temporalRequests.size() + ); + } // TODO: Make `HttpSinkInternal` retry the failed requests. Currently, // it does not retry those at all, only adds their count to the diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..807e7223 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -3,6 +3,7 @@ import java.net.http.HttpClient; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -11,16 +12,17 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ConfigurationException; import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; -import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker; +import com.getindata.connectors.http.internal.status.HttpCodesParser; +import com.getindata.connectors.http.internal.status.HttpResponseChecker; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; /** @@ -34,7 +36,7 @@ public class JavaNetSinkHttpClient implements SinkHttpClient { private final Map headerMap; - private final HttpStatusCodeChecker statusCodeChecker; + private final HttpResponseChecker responseChecker; private final HttpPostRequestCallback httpPostRequestCallback; @@ -53,16 +55,7 @@ public JavaNetSinkHttpClient( headerPreprocessor ); - // TODO Inject this via constructor when implementing a response processor. - // Processor will be injected and it will wrap statusChecker implementation. - ComposeHttpStatusCodeCheckerConfig checkerConfig = - ComposeHttpStatusCodeCheckerConfig.builder() - .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) - .build(); - - this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + this.responseChecker = createHttpResponseChecker(properties); this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(this.headerMap); this.requestSubmitter = requestSubmitterFactory.createSubmitter( @@ -71,6 +64,68 @@ public JavaNetSinkHttpClient( ); } + public static HttpResponseChecker createHttpResponseChecker(Properties properties) { + try { + String deprecatedIgnoreExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, + "" + ); + String deprecatedErrorExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, + "" + ); + + if (deprecatedIgnoreExpr.replace(',',' ').trim().isEmpty() + && deprecatedErrorExpr.replace(',',' ').trim().isEmpty()) { + return createHttpResponseCheckerWithDefaults(properties); + } else { + return createBackwardsCompatibleResponseChecker(properties); + } + } catch (ConfigurationException e) { + throw new IllegalStateException(e); + } + } + + private static HttpResponseChecker createHttpResponseCheckerWithDefaults(Properties properties) + throws ConfigurationException { + String ignoreCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_IGNORE_RESPONSE_CODES, + "" + ); + String retryCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_RETRY_CODES, + "500,503,504" + ); + String successCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.SINK_SUCCESS_CODES, + "1XX,2XX,3XX" + ); + + return new HttpResponseChecker(successCodeExpr, retryCodeExpr, ignoreCodeExpr); + } + + private static HttpResponseChecker createBackwardsCompatibleResponseChecker(Properties properties) + throws ConfigurationException { + String ignoreCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, + "" + ); + String errorCodeExpr = properties.getProperty( + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, + "4XX,5XX" + ); + + //backwards compatibility + var ignoreErrorCodes = HttpCodesParser.parse(ignoreCodeExpr); + var errorCodes = HttpCodesParser.parse(errorCodeExpr); + var retryCodes = HttpCodesParser.parse("500,503,504"); + + var successCodes = new HashSet<>(HttpCodesParser.parse("1XX,2XX,3XX,4XX,5XX")); + successCodes.removeAll(retryCodes); + successCodes.removeAll(errorCodes); + return new HttpResponseChecker(successCodes, retryCodes, ignoreErrorCodes); + } + @Override public CompletableFuture putRequests( List requestEntries, @@ -92,26 +147,29 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var successfulResponses = new ArrayList(); - var failedResponses = new ArrayList(); + var responseItems = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); var optResponse = response.getResponse(); - httpPostRequestCallback.call( - optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); + httpPostRequestCallback.call(optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // TODO Add response processor here and orchestrate it with statusCodeChecker. - if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { - failedResponses.add(sinkRequestEntry); + final ResponseItemStatus status; + if (optResponse.isEmpty() || responseChecker.isTemporalError(optResponse.get())) { + status = ResponseItemStatus.TEMPORAL; + } else if (responseChecker.isIgnoreCode(optResponse.get())) { + status = ResponseItemStatus.IGNORE; + } else if (responseChecker.isSuccessful(optResponse.get())) { + status = ResponseItemStatus.SUCCESS; } else { - successfulResponses.add(sinkRequestEntry); + status = ResponseItemStatus.FAILURE; } + + responseItems.add(new SinkHttpClientResponse.ResponseItem(sinkRequestEntry, status)); } - return new SinkHttpClientResponse(successfulResponses, failedResponses); + return new SinkHttpClientResponse(responseItems); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java deleted file mode 100644 index 015c068c..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/ComposeHttpStatusCodeChecker.java +++ /dev/null @@ -1,161 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; -import java.util.stream.Collectors; - -import lombok.AccessLevel; -import lombok.Builder; -import lombok.Data; -import lombok.RequiredArgsConstructor; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; - -import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; - -/** - * An implementation of {@link HttpStatusCodeChecker} that checks Http Status code against - * white list, concrete value or {@link HttpResponseCodeType} - */ -public class ComposeHttpStatusCodeChecker implements HttpStatusCodeChecker { - - private static final Set DEFAULT_ERROR_CODES = - Set.of( - new TypeStatusCodeChecker(HttpResponseCodeType.CLIENT_ERROR), - new TypeStatusCodeChecker(HttpResponseCodeType.SERVER_ERROR) - ); - - private static final int MIN_HTTP_STATUS_CODE = 100; - - /** - * Set of {@link HttpStatusCodeChecker} for white listed status codes. - */ - private final Set excludedCodes; - - /** - * Set of {@link HttpStatusCodeChecker} that check status code againts value match or {@link - * HttpResponseCodeType} match. - */ - private final Set errorCodes; - - public ComposeHttpStatusCodeChecker(ComposeHttpStatusCodeCheckerConfig config) { - excludedCodes = prepareWhiteList(config); - errorCodes = prepareErrorCodes(config); - } - - /** - * Checks whether given status code is considered as a error code. - * This implementation checks if status code matches any single value mask like "404" - * or http type mask such as "4XX". Code that matches one of those masks and is not on a - * white list will be considered as error code. - * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - public boolean isErrorCode(int statusCode) { - - Preconditions.checkArgument( - statusCode >= MIN_HTTP_STATUS_CODE, - String.format( - "Provided invalid Http status code %s," - + " status code should be equal or bigger than %d.", - statusCode, - MIN_HTTP_STATUS_CODE) - ); - - boolean isWhiteListed = excludedCodes.stream() - .anyMatch(check -> check.isWhiteListed(statusCode)); - - return !isWhiteListed - && errorCodes.stream() - .anyMatch(httpStatusCodeChecker -> httpStatusCodeChecker.isErrorCode(statusCode)); - } - - private Set prepareErrorCodes( - ComposeHttpStatusCodeCheckerConfig config) { - - Properties properties = config.getProperties(); - String errorCodePrefix = config.getErrorCodePrefix(); - - String errorCodes = - properties.getProperty(errorCodePrefix, ""); - - if (StringUtils.isNullOrWhitespaceOnly(errorCodes)) { - return DEFAULT_ERROR_CODES; - } else { - String[] splitCodes = errorCodes.split(HttpConnectorConfigConstants.PROP_DELIM); - return prepareErrorCodes(splitCodes); - } - } - - /** - * Process given array of status codes and assign them to - * {@link SingleValueHttpStatusCodeChecker} for full codes such as 100, 404 etc. or to - * {@link TypeStatusCodeChecker} for codes that were constructed with "XX" mask - */ - private Set prepareErrorCodes(String[] statusCodes) { - - Set errorCodes = new HashSet<>(); - for (String sCode : statusCodes) { - if (!StringUtils.isNullOrWhitespaceOnly(sCode)) { - String trimCode = sCode.toUpperCase().trim(); - Preconditions.checkArgument( - trimCode.length() == 3, - "Status code should contain three characters. Provided [%s]", - trimCode); - - // at this point we have trim, upper case 3 character status code. - if (isTypeCode(trimCode)) { - int code = Integer.parseInt(trimCode.replace("X", "")); - errorCodes.add(new TypeStatusCodeChecker(HttpResponseCodeType.getByCode(code))); - } else { - errorCodes.add( - new SingleValueHttpStatusCodeChecker(Integer.parseInt(trimCode)) - ); - } - } - } - return (errorCodes.isEmpty()) ? DEFAULT_ERROR_CODES : errorCodes; - } - - private Set prepareWhiteList( - ComposeHttpStatusCodeCheckerConfig config) { - - Properties properties = config.getProperties(); - String whiteListPrefix = config.getWhiteListPrefix(); - - return Arrays.stream( - properties.getProperty(whiteListPrefix, "") - .split(HttpConnectorConfigConstants.PROP_DELIM)) - .filter(sCode -> !StringUtils.isNullOrWhitespaceOnly(sCode)) - .map(String::trim) - .mapToInt(Integer::parseInt) - .mapToObj(WhiteListHttpStatusCodeChecker::new) - .collect(Collectors.toSet()); - } - - /** - * This method checks if "code" param matches "digit + XX" mask. This method expects that - * provided string will be 3 elements long, trim and upper case. - * - * @param code to check if it contains XX on second ant third position. Parameter is expected to - * be 3 characters long, trim and uppercase. - * @return true if string matches "anything + XX" and false if not. - */ - private boolean isTypeCode(final String code) { - return code.charAt(1) == 'X' && code.charAt(2) == 'X'; - } - - @Data - @Builder - @RequiredArgsConstructor(access = AccessLevel.PRIVATE) - public static class ComposeHttpStatusCodeCheckerConfig { - - private final String whiteListPrefix; - - private final String errorCodePrefix; - - private final Properties properties; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java index 6b59c4bf..90cf8658 100644 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java +++ b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseChecker.java @@ -13,16 +13,28 @@ public class HttpResponseChecker { private final Set successCodes; private final Set temporalErrorCodes; + private final Set ignoreCodes; - HttpResponseChecker(@NonNull String successCodeExpr, @NonNull String temporalErrorCodeExpr) - throws ConfigurationException { - this(HttpCodesParser.parse(successCodeExpr), HttpCodesParser.parse(temporalErrorCodeExpr)); + public HttpResponseChecker( + @NonNull String successCodeExpr, + @NonNull String temporalErrorCodeExpr, + @NonNull String ignoreCodeExpr + ) throws ConfigurationException { + this( + HttpCodesParser.parse(successCodeExpr), + HttpCodesParser.parse(temporalErrorCodeExpr), + HttpCodesParser.parse(ignoreCodeExpr) + ); } - public HttpResponseChecker(@NonNull Set successCodes, @NonNull Set temporalErrorCodes) - throws ConfigurationException { + public HttpResponseChecker( + @NonNull Set successCodes, + @NonNull Set temporalErrorCodes, + @NonNull Set ignoreCodes + ) throws ConfigurationException { this.successCodes = successCodes; this.temporalErrorCodes = temporalErrorCodes; + this.ignoreCodes = ignoreCodes; validate(); } @@ -31,7 +43,7 @@ public boolean isSuccessful(HttpResponse response) { } public boolean isSuccessful(int httpStatusCode) { - return successCodes.contains(httpStatusCode); + return successCodes.contains(httpStatusCode) || ignoreCodes.contains(httpStatusCode); } public boolean isTemporalError(HttpResponse response) { @@ -42,12 +54,33 @@ public boolean isTemporalError(int httpStatusCode) { return temporalErrorCodes.contains(httpStatusCode); } + public boolean isIgnoreCode(HttpResponse response) { + return isIgnoreCode(response.statusCode()); + } + + public boolean isIgnoreCode(int httpStatusCode) { + return ignoreCodes.contains(httpStatusCode); + } + + public boolean isErrorCode(HttpResponse response) { + return isErrorCode(response.statusCode()); + } + + public boolean isErrorCode(int httpStatusCode) { + return !isTemporalError(httpStatusCode) && !isSuccessful(httpStatusCode); + } + private void validate() throws ConfigurationException { - if (successCodes.isEmpty()) { - throw new ConfigurationException("Success code list can not be empty"); + if (successCodes.isEmpty() && ignoreCodes.isEmpty()) { + throw new ConfigurationException("Success and ignore code lists can not be empty"); } - var intersection = new HashSet<>(successCodes); - intersection.retainAll(temporalErrorCodes); + HashSet intersection = new HashSet<>(temporalErrorCodes); + + HashSet combinedSuccessIgnoreCodes = new HashSet<>(); + combinedSuccessIgnoreCodes.addAll(successCodes); + combinedSuccessIgnoreCodes.addAll(ignoreCodes); + + intersection.retainAll(combinedSuccessIgnoreCodes); if (!intersection.isEmpty()) { throw new ConfigurationException("Http codes " + intersection + " can not be used as both success and retry codes"); diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java deleted file mode 100644 index 71f174eb..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpResponseCodeType.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import java.util.HashMap; -import java.util.Map; - -/** - * This enum represents HTTP response code types, grouped by "hundreds" digit. - */ -public enum HttpResponseCodeType { - - INFO(1), - SUCCESS(2), - REDIRECTION(3), - CLIENT_ERROR(4), - SERVER_ERROR(5); - - private static final Map map; - - static { - map = new HashMap<>(); - for (HttpResponseCodeType httpResponseCodeType : HttpResponseCodeType.values()) { - map.put(httpResponseCodeType.httpTypeCode, httpResponseCodeType); - } - } - - private final int httpTypeCode; - - HttpResponseCodeType(int httpTypeCode) { - this.httpTypeCode = httpTypeCode; - } - - /** - * @param statusCode Http status code to get the {@link HttpResponseCodeType} instance for. - * @return a {@link HttpResponseCodeType} instance based on http type code, for example {@code - * HttpResponseCodeType.getByCode(1)} will return {@link HttpResponseCodeType#INFO} type. - */ - public static HttpResponseCodeType getByCode(int statusCode) { - return map.get(statusCode); - } - - /** - * @return a "hundreds" digit that represents given {@link HttpResponseCodeType} instance. - * For example {@code HttpResponseCodeType.INFO.getHttpTypeCode()} will return 1 since HTTP - * information repossess have status codes in range 100 - 199. - */ - public int getHttpTypeCode() { - return this.httpTypeCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java deleted file mode 100644 index 6af0344c..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/HttpStatusCodeChecker.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -/** - * Base interface for all classes that would validate HTTP status - * code whether it is an error or not. - */ -public interface HttpStatusCodeChecker { - - /** - * Validates http status code wheter it is considered as error code. The logic for - * what status codes are considered as "errors" depends on the concreted implementation - * @param statusCode http status code to assess. - * @return true if statusCode is considered as Error and false if not. - */ - boolean isErrorCode(int statusCode); -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java deleted file mode 100644 index b52951ed..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/SingleValueHttpStatusCodeChecker.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; - -/** - * An implementation of {@link HttpStatusCodeChecker} that validates status code against - * constant value. - */ -@RequiredArgsConstructor -@EqualsAndHashCode -public class SingleValueHttpStatusCodeChecker implements HttpStatusCodeChecker { - - /** - * A reference http status code to compare with. - */ - private final int errorCode; - - /** - * Validates given statusCode against constant value. - * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - @Override - public boolean isErrorCode(int statusCode) { - return errorCode == statusCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java deleted file mode 100644 index df942879..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/TypeStatusCodeChecker.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; - -/** - * Implementation of {@link HttpStatusCodeChecker} that verifies if given Http status code - * belongs to specific HTTP code type family. For example if it any of 100's 200's or 500's code. - */ -@EqualsAndHashCode -public class TypeStatusCodeChecker implements HttpStatusCodeChecker { - - /** - * First digit from HTTP status code that describes a type of code, - * for example 1 for all 100's, 5 for all 500's. - */ - private final int httpTypeCode; - - /** - * Creates TypeStatusCodeChecker for given {@link HttpResponseCodeType} - * - * @param httpResponseCodeType {@link HttpResponseCodeType} for this {@link - * TypeStatusCodeChecker} instance. - */ - public TypeStatusCodeChecker(HttpResponseCodeType httpResponseCodeType) { - this.httpTypeCode = httpResponseCodeType.getHttpTypeCode(); - } - - /** - * Checks whether given status code belongs to Http code status type. - * For example: - *
{@code
-     *    TypeStatusCodeChecker checker =  new TypeStatusCodeChecker(5);
-     *    checker.isErrorCode(505); <- will return true.
-     *    }
-     * 
- * @param statusCode http status code to assess. - * @return true if status code is considered as error or false if not. - */ - @Override - public boolean isErrorCode(int statusCode) { - return statusCode / 100 == httpTypeCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java b/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java deleted file mode 100644 index 2aa65c65..00000000 --- a/src/main/java/com/getindata/connectors/http/internal/status/WhiteListHttpStatusCodeChecker.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.getindata.connectors.http.internal.status; - -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; - -/** - * Class that implements logic of a "white list" against single constant value. - */ -@RequiredArgsConstructor -@EqualsAndHashCode -public class WhiteListHttpStatusCodeChecker { - - private final int whiteListCode; - - /** - * Checks if given statusCode is considered as "white listed" - * @param statusCode status code to check. - * @return true if given statusCode is white listed and false if not. - */ - public boolean isWhiteListed(int statusCode) { - return whiteListCode == statusCode; - } -} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 81a75674..2afd2097 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -9,11 +9,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; @@ -61,7 +59,7 @@ public class JavaNetHttpPollingClient implements PollingClient { private final ObjectMapper objectMapper; private final HttpPostRequestCallback httpPostRequestCallback; private final HttpLookupConfig options; - private final Set ignoredErrorCodes; + private final HttpResponseChecker responseChecker; private final boolean continueOnError; public JavaNetHttpPollingClient( @@ -77,17 +75,16 @@ public JavaNetHttpPollingClient( this.options = options; var config = options.getReadableConfig(); - this.ignoredErrorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES)); + var ignoreCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_IGNORED_RESPONSE_CODES)); var errorCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_RETRY_CODES)); - var successCodes = new HashSet(); - successCodes.addAll(HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES))); - successCodes.addAll(ignoredErrorCodes); + var successCodes = HttpCodesParser.parse(config.get(SOURCE_LOOKUP_HTTP_SUCCESS_CODES)); this.continueOnError = config.get(SOURCE_LOOKUP_CONTINUE_ON_ERROR); + this.responseChecker = new HttpResponseChecker(successCodes, errorCodes, ignoreCodes); this.httpClient = HttpClientWithRetry.builder() .httpClient(httpClient) .retryConfig(RetryConfigProvider.create(config)) - .responseChecker(new HttpResponseChecker(successCodes, errorCodes)) + .responseChecker(responseChecker) .build(); } @@ -213,7 +210,7 @@ private HttpRowDataWrapper processHttpResponse( var responseBody = response.body(); log.debug("Received status code [{}] for RestTableSource request", response.statusCode()); - if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || ignoreResponse(response))) { + if (!isError && (StringUtils.isNullOrWhitespaceOnly(responseBody) || responseChecker.isIgnoreCode(response))) { return HttpRowDataWrapper.builder() .data(Collections.emptyList()) .httpCompletionState(HttpCompletionState.SUCCESS) @@ -291,8 +288,4 @@ private List deserializeArray(byte[] rawBytes) throws IOException { } return result; } - - private boolean ignoreResponse(HttpResponse response) { - return ignoredErrorCodes.contains(response.statusCode()); - } } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index cbf33c2f..8631c570 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -21,6 +21,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension; import org.junit.jupiter.api.AfterEach; @@ -30,6 +31,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.*; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.getindata.connectors.http.HttpSink; @@ -191,13 +193,11 @@ public List testConnection( @Test public void testServerErrorConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs(STARTED) .willReturn(serverError()) .willSetStateTo("Cause Success")); wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs("Cause Success") .willReturn(aResponse().withStatus(200)) @@ -249,7 +249,7 @@ public void testFailedConnection() throws Exception { .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .build(); source.sinkTo(httpSink); - env.execute("Http Sink test failed connection"); + assertThrows(JobExecutionException.class, () -> env.execute("Http Sink test failed connection")); assertEquals(1, SendErrorsTestReporterFactory.getCount()); // var postedRequests = wireMockServer diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitterTest.java index c6581255..42785fe3 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitterTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitterTest.java @@ -1,6 +1,7 @@ package com.getindata.connectors.http.internal.sink.httpclient; import java.net.http.HttpClient; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -8,6 +9,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -15,6 +17,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -93,4 +96,27 @@ public void shouldSplitBatchPerHttpMethod( verify(mockHttpClient, times(expectedNumberOfBatchRequests)).sendAsync(any(), any()); } + + @Test + public void shouldHandleEmptyBatch() { + + Properties properties = new Properties(); + properties.setProperty( + HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE, + String.valueOf(50) + ); + + BatchRequestSubmitter submitter = new BatchRequestSubmitter( + properties, + new String[0], + mockHttpClient + ); + + List> futures = submitter.submit( + "http://hello.pl", + Collections.emptyList() + ); + + assertEquals(futures, Collections.emptyList()); + } } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java index c12b2699..bdd1e99c 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClientTest.java @@ -1,12 +1,16 @@ package com.getindata.connectors.http.internal.sink.httpclient; import java.net.http.HttpClient; +import java.util.Collections; +import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -19,7 +23,10 @@ import com.getindata.connectors.http.HttpPostRequestCallback; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; +import com.getindata.connectors.http.internal.config.ResponseItemStatus; +import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; import static com.getindata.connectors.http.TestHelper.assertPropertyArray; @@ -52,9 +59,9 @@ public void setUp() { postRequestCallback = new Slf4jHttpPostRequestCallback(); headerPreprocessor = HttpHeaderUtils.createBasicAuthorizationHeaderPreprocessor(); httpClientStaticMock.when(HttpClient::newBuilder).thenReturn(httpClientBuilder); - when(httpClientBuilder.followRedirects(any())).thenReturn(httpClientBuilder); - when(httpClientBuilder.sslContext(any())).thenReturn(httpClientBuilder); - when(httpClientBuilder.executor(any())).thenReturn(httpClientBuilder); + lenient().when(httpClientBuilder.followRedirects(any())).thenReturn(httpClientBuilder); + lenient().when(httpClientBuilder.sslContext(any())).thenReturn(httpClientBuilder); + lenient().when(httpClientBuilder.executor(any())).thenReturn(httpClientBuilder); } private static Stream provideSubmitterFactory() { @@ -119,4 +126,55 @@ public void shouldBuildClientWithHeaders(RequestSubmitterFactory requestSubmitte assertPropertyArray(headersAndValues, "Access-Control-Allow-Origin", "*"); } + @Test + public void shouldHandleEmptyResponse() throws Exception { + // GIVEN + HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("POST", "test data".getBytes()); + List requestEntries = Collections.singletonList(requestEntry); + + // Create an HttpRequest (which wraps java.net.http.HttpRequest) + java.net.http.HttpRequest javaHttpRequest = java.net.http.HttpRequest.newBuilder() + .uri(java.net.URI.create("http://test.com/endpoint")) + .POST(java.net.http.HttpRequest.BodyPublishers.ofByteArray(requestEntry.element)) + .build(); + HttpRequest httpRequest = new HttpRequest( + javaHttpRequest, + List.of(requestEntry.element), + requestEntry.method + ); + + // Create a response wrapper with an empty response (null) + JavaNetHttpResponseWrapper responseWrapper = new JavaNetHttpResponseWrapper(httpRequest, null); + + // Mock RequestSubmitterFactory to return a mock RequestSubmitter + RequestSubmitterFactory mockFactory = mock(RequestSubmitterFactory.class); + RequestSubmitter mockSubmitter = mock(RequestSubmitter.class); + when(mockFactory.createSubmitter(any(Properties.class), any(String[].class))) + .thenReturn(mockSubmitter); + + // Mock the submit method to return a completed future with the empty response + CompletableFuture responseFuture = + CompletableFuture.completedFuture(responseWrapper); + when(mockSubmitter.submit(anyString(), eq(requestEntries))) + .thenReturn(Collections.singletonList(responseFuture)); + + // WHEN + JavaNetSinkHttpClient client = new JavaNetSinkHttpClient( + new Properties(), + postRequestCallback, + headerPreprocessor, + mockFactory + ); + + SinkHttpClientResponse response = client.putRequests(requestEntries, "http://test.com/endpoint").get(); + + // THEN + assertThat(response.getRequests()).hasSize(1); + assertThat(response.getRequests().get(0).getStatus()).isEqualTo(ResponseItemStatus.TEMPORAL); + assertThat(response.getTemporalRequests()).hasSize(1); + assertThat(response.getSuccessfulRequests()).isEmpty(); + assertThat(response.getFailedRequests()).isEmpty(); + assertThat(response.getIgnoredRequests()).isEmpty(); + } + } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java similarity index 67% rename from src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java rename to src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java index 23baaead..edf54409 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/httpclient/status/ComposeHttpStatusCodeCheckerDeprecationTest.java @@ -18,10 +18,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker; -import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig; +import com.getindata.connectors.http.internal.status.HttpResponseChecker; +import static com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient.createHttpResponseChecker; -class ComposeHttpStatusCodeCheckerTest { +class ComposeHttpStatusCodeCheckerDeprecationTest { private static final String STRING_CODES = "403, 100,200, 300, , 303 ,200"; @@ -33,7 +33,7 @@ class ComposeHttpStatusCodeCheckerTest { .boxed() .collect(Collectors.toList()); - private ComposeHttpStatusCodeChecker codeChecker; + private HttpResponseChecker codeChecker; @BeforeAll public static void beforeAll() { @@ -53,14 +53,11 @@ private static Stream propertiesArguments() { @MethodSource("propertiesArguments") public void shouldPassOnDefault(Properties properties) { - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); - + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { assertThat(codeChecker.isErrorCode(100)).isFalse(); assertThat(codeChecker.isErrorCode(200)).isFalse(); - assertThat(codeChecker.isErrorCode(500)).isTrue(); + assertThat(codeChecker.isTemporalError(500)).isTrue(); assertThat(codeChecker.isErrorCode(501)).isTrue(); assertThat(codeChecker.isErrorCode(400)).isTrue(); assertThat(codeChecker.isErrorCode(404)).isTrue(); @@ -72,24 +69,22 @@ public void shouldParseWhiteList() { Properties properties = new Properties(); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, - STRING_CODES); + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST, + STRING_CODES); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, - "1XX, 2XX, 3XX, 4XX, 5XX" + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, + "1XX, 2XX, 3XX, 4XX, 5XX" ); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isFalse()); assertThat(codeChecker.isErrorCode(301)) .withFailMessage( - "Not on a white list but matches 3XX range. " - + "Should be considered as error code.") + "Not on a white list but matches 3XX range. " + + "Should be considered as error code.") .isTrue(); }); } @@ -100,11 +95,10 @@ public void shouldParseErrorCodeList() { Properties properties = new Properties(); properties.setProperty( HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, - STRING_CODES); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + STRING_CODES + ); - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> CODES.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue())); } @@ -114,20 +108,19 @@ public void shouldParseErrorCodeRange() { Properties properties = new Properties(); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, "1xx, 2XX "); + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, "1xx, 2XX " + ); List codes = List.of(100, 110, 200, 220); - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); - - codeChecker = new ComposeHttpStatusCodeChecker(checkerConfig); + codeChecker = prepareCheckerConfig(properties); assertAll(() -> { codes.forEach(code -> assertThat(codeChecker.isErrorCode(code)).isTrue()); assertThat(codeChecker.isErrorCode(303)) .withFailMessage( - "Out ot Error code type range therefore should be not marked as error code.") + "Out ot Error code type range therefore should be not marked as error code.") .isFalse(); }); } @@ -138,13 +131,12 @@ public void shouldThrowOnInvalidCodeRange(String listCode) { Properties properties = new Properties(); properties.setProperty( - HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, listCode); - - ComposeHttpStatusCodeCheckerConfig checkerConfig = prepareCheckerConfig(properties); + HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST, listCode + ); assertThrows( Exception.class, - () -> new ComposeHttpStatusCodeChecker(checkerConfig) + () -> prepareCheckerConfig(properties) ); } @@ -161,11 +153,7 @@ private static Properties prepareErrorCodeProperties(String errorCodeList, Strin return properties; } - private ComposeHttpStatusCodeCheckerConfig prepareCheckerConfig(Properties properties) { - return ComposeHttpStatusCodeCheckerConfig.builder() - .properties(properties) - .whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST) - .errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST) - .build(); + private HttpResponseChecker prepareCheckerConfig(Properties properties) { + return createHttpResponseChecker(properties); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java index 4cc27052..96054df1 100644 --- a/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/status/HttpResponseCheckerTest.java @@ -23,13 +23,25 @@ class HttpResponseCheckerTest { @Test void failWhenTheSameCodeIsMarkedSuccessAndError() { assertThrows(ConfigurationException.class, - () -> new HttpResponseChecker(Set.of(404), Set.of(404))); + () -> new HttpResponseChecker(Set.of(404), Set.of(404), emptySet())); + assertThrows(ConfigurationException.class, + () -> new HttpResponseChecker("404", "404", "")); + } + + @Test + void failWhenTheSameCodeIsMarkedIgnoreAndError() { + assertThrows(ConfigurationException.class, + () -> new HttpResponseChecker(Set.of(200), Set.of(404), Set.of(404))); + assertThrows(ConfigurationException.class, + () -> new HttpResponseChecker("200", "404", "404")); } @Test void failWhenSuccessListIsEmpty() { assertThrows(ConfigurationException.class, - () -> new HttpResponseChecker(emptySet(), Set.of(500))); + () -> new HttpResponseChecker(emptySet(), Set.of(500), emptySet())); + assertThrows(ConfigurationException.class, + () -> new HttpResponseChecker("", "500", "")); } private static Stream testData() { @@ -50,7 +62,7 @@ private static Stream testData() { @ParameterizedTest @MethodSource("testData") void verifyCodes(InputArgs inputArgs) throws ConfigurationException { - var checker = new HttpResponseChecker("2XX,404,!202", "4XX,!404,500,501,502,!409"); + var checker = new HttpResponseChecker("2XX,!202", "4XX,!404,500,501,502,!409", "404"); var response = inputArgs.getResponse(); switch (inputArgs.getCodeType()) { @@ -70,17 +82,68 @@ void verifyCodes(InputArgs inputArgs) throws ConfigurationException { private void assertSuccessful(HttpResponseChecker checker, HttpResponse response) { assertTrue(checker.isSuccessful(response)); + assertFalse(checker.isErrorCode(response)); assertFalse(checker.isTemporalError(response)); } private void assertTemporalError(HttpResponseChecker checker, HttpResponse response) { assertFalse(checker.isSuccessful(response)); + assertFalse(checker.isErrorCode(response)); assertTrue(checker.isTemporalError(response)); } private void assertError(HttpResponseChecker checker, HttpResponse response) { assertFalse(checker.isSuccessful(response)); assertFalse(checker.isTemporalError(response)); + assertTrue(checker.isErrorCode(response)); + } + + @Test + void testIsIgnoreCode() throws ConfigurationException { + var checker = new HttpResponseChecker("2XX", "5XX", "404"); + var response404 = mock(HttpResponse.class); + when(response404.statusCode()).thenReturn(404); + + assertTrue(checker.isIgnoreCode(response404)); + assertTrue(checker.isSuccessful(response404)); // Ignore codes are considered successful + assertFalse(checker.isTemporalError(response404)); + assertFalse(checker.isErrorCode(response404)); + } + + @Test + void testIsErrorCode() throws ConfigurationException { + var checker = new HttpResponseChecker("2XX", "5XX", "404"); + var response400 = mock(HttpResponse.class); + when(response400.statusCode()).thenReturn(400); + + assertTrue(checker.isErrorCode(response400)); + assertFalse(checker.isSuccessful(response400)); + assertFalse(checker.isTemporalError(response400)); + assertFalse(checker.isIgnoreCode(response400)); + } + + @Test + void testNullSuccessCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(null, "5XX", "404")); + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(null, Set.of(500), Set.of(404))); + } + + @Test + void testNullTemporalErrorCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker("2XX", null, "404")); + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(Set.of(200), null, Set.of(404))); + } + + @Test + void testNullIgnoreCodeExprThrowsNullPointerException() { + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker("2XX", "5XX", null)); + assertThrows(NullPointerException.class, + () -> new HttpResponseChecker(Set.of(200), Set.of(500), null)); } @RequiredArgsConstructor