diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96d151..566faa06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] +- Added HTTP Sink request retries with delivery guarantee support (`sink.delivery-guarantee`). - Amend to not log HTTP request response and header values by default. - Added http 2 support. diff --git a/README.md b/README.md index e4bfe9ef..a259a11e 100644 --- a/README.md +++ b/README.md @@ -451,9 +451,14 @@ is provided. ## HTTP status code handler ### Sink table -You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table. +You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees. By default all 400 and 500 response codes will be interpreted as error code. +#### Retries and delivery guarantee +HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes. +- When `sink.delivery-guarantee` is `at-least-once`: Failed requests are retried automatically using AIMD (Additive Increase Multiplicative Decrease) rate limiting strategy. +- When `sink.delivery-guarantee` is `none` (default): Failed requests are logged but not retried. + This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are: - `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404. Many status codes can be defined in one value, where each code should be separated with comma, for example: @@ -612,6 +617,7 @@ be requested if the current time is later than the cached token expiry time minu | url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | insert-method | optional | Specify which HTTP method to use in the request. 
The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | +| sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted enumerations are 'at-least-once' and 'none' ('none' is equivalent to 'at-most-once'). The 'exactly-once' semantic is not supported. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | | sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. | | sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. | @@ -736,9 +742,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json ### HTTP TableLookup Source - Check other `//TODO`'s. -### HTTP Sink -- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented. - ### [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
diff --git a/src/main/java/com/getindata/connectors/http/HttpSink.java b/src/main/java/com/getindata/connectors/http/HttpSink.java index 23faf100..8dc42b5a 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSink.java +++ b/src/main/java/com/getindata/connectors/http/HttpSink.java @@ -3,6 +3,7 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.ElementConverter; import com.getindata.connectors.http.internal.HeaderPreprocessor; @@ -41,6 +42,7 @@ public class HttpSink extends HttpSinkInternal { long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, HttpPostRequestCallback httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -54,6 +56,7 @@ public class HttpSink extends HttpSinkInternal { maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, + deliveryGuarantee, endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java index 208bcf01..53b5f18c 100644 --- a/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java +++ b/src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java @@ -4,8 +4,10 @@ import java.util.Properties; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.util.Preconditions; import com.getindata.connectors.http.internal.HeaderPreprocessor; import com.getindata.connectors.http.internal.SinkHttpClient; @@ -71,6 +73,8 @@ public class HttpSinkBuilder extends private final Properties properties = new Properties(); + private 
DeliveryGuarantee deliveryGuarantee; + // Mandatory field private String endpointUrl; @@ -92,6 +96,17 @@ public class HttpSinkBuilder extends this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR; } + /** + * @param deliveryGuarantee HTTP Sink delivery guarantee + * @return {@link HttpSinkBuilder} itself + */ + public HttpSinkBuilder setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE, + "Only at-least-once and none delivery guarantees are supported."); + this.deliveryGuarantee = deliveryGuarantee; + return this; + } + /** * @param endpointUrl the URL of the endpoint * @return {@link HttpSinkBuilder} itself @@ -181,6 +196,7 @@ public HttpSink build() { Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), + Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE), endpointUrl, httpPostRequestCallback, headerPreprocessor, diff --git a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java index b5637377..f75307a8 100644 --- a/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java +++ b/src/main/java/com/getindata/connectors/http/internal/SinkHttpClientResponse.java @@ -1,6 +1,7 @@ package com.getindata.connectors.http.internal; import java.util.List; +import java.util.stream.Collectors; import lombok.Data; import lombok.NonNull; @@ -11,21 +12,36 @@ /** * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted - * to write, divided into two lists — successful and failed ones. + * to write. */ @Data @ToString public class SinkHttpClientResponse { /** - * A list of successfully written requests. 
+ * A list of requests along with write status. */ @NonNull - private final List successfulRequests; + private final List requests; - /** - * A list of requests that {@link SinkHttpClient} failed to write. - */ - @NonNull - private final List failedRequests; + public List getSuccessfulRequests() { + return requests.stream() + .filter(ResponseItem::isSuccessful) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + public List getFailedRequests() { + return requests.stream() + .filter(r -> !r.isSuccessful()) + .map(ResponseItem::getRequest) + .collect(Collectors.toList()); + } + + @Data + @ToString + public static class ResponseItem { + private final HttpRequest request; + private final boolean successful; + } } diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java index de37faac..abe243e4 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkInternal.java @@ -5,6 +5,7 @@ import java.util.Collections; import java.util.Properties; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -61,6 +62,8 @@ public class HttpSinkInternal extends AsyncSinkBase httpPostRequestCallback, HeaderPreprocessor headerPreprocessor, @@ -97,6 +101,7 @@ protected HttpSinkInternal( Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl), "The endpoint URL must be set when initializing HTTP Sink."); + this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.httpPostRequestCallback = Preconditions.checkNotNull( @@ -132,6 +137,7 @@ public StatefulSinkWriter> cr getMaxBatchSizeInBytes(), 
getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, @@ -159,6 +165,7 @@ public StatefulSinkWriter> re getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), + deliveryGuarantee, endpointUrl, sinkHttpClientBuilder.build( properties, diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java index d17e9213..23eb5418 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java @@ -1,5 +1,6 @@ package com.getindata.connectors.http.internal.sink; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -10,6 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -17,6 +19,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory; import com.getindata.connectors.http.internal.SinkHttpClient; +import com.getindata.connectors.http.internal.SinkHttpClientResponse; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.ThreadUtils; @@ -45,6 +48,8 @@ public class HttpSinkWriter extends AsyncSinkWriter elementConverter, Sink.InitContext context, @@ -54,6 +59,7 @@ public HttpSinkWriter( long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + DeliveryGuarantee deliveryGuarantee, String endpointUrl, SinkHttpClient sinkHttpClient, Collection> bufferedRequestStates, @@ -61,6 +67,7 @@ public 
HttpSinkWriter( super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates); + this.deliveryGuarantee = deliveryGuarantee; this.endpointUrl = endpointUrl; this.sinkHttpClient = sinkHttpClient; @@ -87,35 +94,63 @@ protected void submitRequestEntries( var future = sinkHttpClient.putRequests(requestEntries, endpointUrl); future.whenCompleteAsync((response, err) -> { if (err != null) { - int failedRequestsNumber = requestEntries.size(); - log.error( - "Http Sink fatally failed to write all {} requests", - failedRequestsNumber); - numRecordsSendErrorsCounter.inc(failedRequestsNumber); - - // TODO: Make `HttpSinkInternal` retry the failed requests. - // Currently, it does not retry those at all, only adds their count - // to the `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. - //requestResult.accept(requestEntries); + handleFullyFailedRequest(err, requestEntries, requestResult); } else if (response.getFailedRequests().size() > 0) { int failedRequestsNumber = response.getFailedRequests().size(); - log.error("Http Sink failed to write and will retry {} requests", - failedRequestsNumber); numRecordsSendErrorsCounter.inc(failedRequestsNumber); + log.error("Http Sink failed to write and will retry {} requests", + failedRequestsNumber); + handlePartiallyFailedRequest(response, requestEntries, requestResult); + } else { + requestResult.accept(Collections.emptyList()); + } + }, sinkWriterThreadPool); + } - // TODO: Make `HttpSinkInternal` retry the failed requests. Currently, - // it does not retry those at all, only adds their count to the - // `numRecordsSendErrors` metric. It is due to the fact we do not have - // a clear image how we want to do it, so it would be both efficient and correct. 
+ private void handleFullyFailedRequest(Throwable err, + List requestEntries, + Consumer> requestResult) { + int failedRequestsNumber = requestEntries.size(); + numRecordsSendErrorsCounter.inc(failedRequestsNumber); + + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + // Retry all requests. + log.error("Http Sink fatally failed to write and will retry {} requests", failedRequestsNumber, err); + requestResult.accept(requestEntries); + } else if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Do not retry failed requests. + log.error( + "Http Sink fatally failed to write {} requests but will continue due to {} DeliveryGuarantee", + failedRequestsNumber, + deliveryGuarantee, + err + ); + requestResult.accept(Collections.emptyList()); + } else { + throw new UnsupportedOperationException( + "Unsupported delivery guarantee: " + deliveryGuarantee); + } + } - //requestResult.accept(response.getFailedRequests()); - //} else { - //requestResult.accept(Collections.emptyList()); - //} + private void handlePartiallyFailedRequest(SinkHttpClientResponse response, + List requestEntries, + Consumer> requestResult) { + if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) { + // Assumption: the order of response.requests is the same as requestEntries. + // See com.getindata.connectors.http.internal.sink.httpclient. + // JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and + // then their futures are joined sequentially too. + List failedRequestEntries = new ArrayList<>(); + for (int i = 0; i < response.getRequests().size(); ++i) { + if (!response.getRequests().get(i).isSuccessful()) { + failedRequestEntries.add(requestEntries.get(i)); + } } + requestResult.accept(failedRequestEntries); + } else if (deliveryGuarantee == DeliveryGuarantee.NONE) { + // Do not retry failed requests. 
requestResult.accept(Collections.emptyList()); - }, sinkWriterThreadPool); + } } @Override diff --git a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java index 7e4c19ff..15d93e5d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java @@ -92,8 +92,7 @@ private CompletableFuture> submitRequests( private SinkHttpClientResponse prepareSinkHttpClientResponse( List responses, String endpointUrl) { - var successfulResponses = new ArrayList(); - var failedResponses = new ArrayList(); + var responseItems = new ArrayList(); for (var response : responses) { var sinkRequestEntry = response.getHttpRequest(); @@ -102,16 +101,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse( httpPostRequestCallback.call( optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap); - // TODO Add response processor here and orchestrate it with statusCodeChecker. 
- if (optResponse.isEmpty() || - statusCodeChecker.isErrorCode(optResponse.get().statusCode())) { - failedResponses.add(sinkRequestEntry); - } else { - successfulResponses.add(sinkRequestEntry); - } + boolean isFailed = optResponse.isEmpty() || + statusCodeChecker.isErrorCode(optResponse.get().statusCode()); + responseItems.add(new SinkHttpClientResponse.ResponseItem(sinkRequestEntry, !isFailed)); } - return new SinkHttpClientResponse(successfulResponses, failedResponses); + return new SinkHttpClientResponse(responseItems); } @VisibleForTesting diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java index f634e1d6..acae15e1 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSink.java @@ -25,6 +25,7 @@ import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient; import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.DELIVERY_GUARANTEE; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD; import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL; @@ -125,6 +126,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { HttpSinkBuilder builder = HttpSink .builder() + .setDeliveryGuarantee(tableOptions.get(DELIVERY_GUARANTEE)) .setEndpointUrl(tableOptions.get(URL)) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setHttpPostRequestCallback(httpPostRequestCallback) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java 
b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java index b87b6eb7..2953bf02 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicSinkConnectorOptions.java @@ -2,6 +2,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SINK_REQUEST_CALLBACK_IDENTIFIER; @@ -24,4 +25,12 @@ public class HttpDynamicSinkConnectorOptions { ConfigOptions.key(SINK_REQUEST_CALLBACK_IDENTIFIER) .stringType() .defaultValue(Slf4jHttpPostRequestCallbackFactory.IDENTIFIER); + + public static final ConfigOption DELIVERY_GUARANTEE = + ConfigOptions.key("sink.delivery-guarantee") + .enumType(DeliveryGuarantee.class) + .defaultValue(DeliveryGuarantee.NONE) + .withDescription("Defines the delivery semantic for the HTTP sink. " + + "Accepted enumerations are 'at-least-once', and 'none'. 
" + + "'exactly-once' semantic is not supported."); } diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index cbf33c2f..9ba4d7cd 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -15,6 +15,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; @@ -160,7 +161,7 @@ public List testConnection( .setEndpointUrl("http://localhost:" + SERVER_PORT + endpoint) .setElementConverter( (s, _context) -> - new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) .setProperty( HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type", @@ -189,18 +190,43 @@ public List testConnection( } @Test - public void testServerErrorConnection() throws Exception { + public void testNoRetryWithNoneConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs(STARTED) .willReturn(serverError()) .willSetStateTo("Cause Success")); - wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") + + var source = env.fromCollection(List.of(messages.get(0))); + var httpSink = HttpSink.builder() + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + 
.setElementConverter( + (s, _context) -> + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.NONE) + .build(); + source.sinkTo(httpSink); + + env.execute("Http Sink none delivery guarantee retry"); + + assertEquals(1, SendErrorsTestReporterFactory.getCount()); + } + + @Test + public void testRetryableServerErrorConnection() throws Exception { + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(serviceUnavailable()) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") .whenScenarioStateIs("Cause Success") - .willReturn(aResponse().withStatus(200)) + .willReturn(aResponse().withStatus(200).withBody("msg1")) .willSetStateTo("Cause Success")); var source = env.fromCollection(List.of(messages.get(0))); @@ -210,30 +236,77 @@ public void testServerErrorConnection() throws Exception { (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); assertEquals(1, SendErrorsTestReporterFactory.getCount()); - // TODO: reintroduce along with the retries - // var postedRequests = wireMockServer - // .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); - // assertEquals(2, postedRequests.size()); - // assertEquals(postedRequests.get(0).getBodyAsString(), - // postedRequests.get(1).getBodyAsString()); + + var postedRequests = wireMockServer + .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); + assertEquals(2, postedRequests.size()); + 
assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString()); + } + + @Test + public void testMixedRetryable() throws Exception { + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(serviceUnavailable()) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":0}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200).withBody("msg1")) + .willSetStateTo("Cause Success")); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":1}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(aResponse().withStatus(200).withBody("msg2")) + .willSetStateTo(STARTED)); + + wireMockServer.stubFor(post(urlPathEqualTo("/myendpoint")) + .withRequestBody(equalTo("[{\"http-sink-id\":1}]")) + .inScenario("Retry Success Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200).withBody("msg2")) + .willSetStateTo("Cause Success")); + + var source = env.fromCollection(List.of(messages.get(0), messages.get(1))); + var httpSink = HttpSink.builder() + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> + new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .build(); + source.sinkTo(httpSink); + env.execute("Http Sink test failed connection"); + + assertEquals(1, SendErrorsTestReporterFactory.getCount()); + + var postedRequests = wireMockServer + .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); + assertEquals(3, postedRequests.size()); 
} @Test public void testFailedConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs(STARTED) .willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE)) .willSetStateTo("Cause Success")); wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) .inScenario("Retry Scenario") .whenScenarioStateIs("Cause Success") .willReturn(aResponse().withStatus(200)) @@ -241,6 +314,7 @@ public void testFailedConnection() throws Exception { var source = env.fromCollection(List.of(messages.get(0))); var httpSink = HttpSink.builder() + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") .setElementConverter( (s, _context) -> @@ -252,11 +326,10 @@ public void testFailedConnection() throws Exception { env.execute("Http Sink test failed connection"); assertEquals(1, SendErrorsTestReporterFactory.getCount()); - // var postedRequests = wireMockServer - // .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); - // assertEquals(2, postedRequests.size()); - // assertEquals(postedRequests.get(0).getBodyAsString(), - // postedRequests.get(1).getBodyAsString()); + var postedRequests = wireMockServer + .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); + assertEquals(2, postedRequests.size()); + assertEquals(postedRequests.get(0).getBodyAsString(), postedRequests.get(1).getBodyAsString()); } @Test diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java index db1975ed..7a5f1dee 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkWriterTest.java @@ -1,8 +1,8 @@ package 
com.getindata.connectors.http.internal.sink; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -10,6 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.metrics.Counter; @@ -20,6 +21,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; @@ -58,7 +60,9 @@ public void setUp() { when(metricGroup.getNumRecordsSendErrorsCounter()).thenReturn(errorCounter); when(metricGroup.getIOMetricGroup()).thenReturn(operatorIOMetricGroup); when(context.metricGroup()).thenReturn(metricGroup); + } + private void createHttpSinkWriter(DeliveryGuarantee deliveryGuarantee) { Collection> stateBuffer = new ArrayList<>(); this.httpSinkWriter = new HttpSinkWriter<>( @@ -70,6 +74,7 @@ public void setUp() { 10, 10, 10, + deliveryGuarantee, "http://localhost/client", httpClient, stateBuffer, @@ -77,22 +82,208 @@ public void setUp() { } @Test - public void testErrorMetric() throws InterruptedException { + public void testErrorMetricWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Exception("Test Exception")); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new 
HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(2); + } + + @Test + public void testErrorMetricWhenFailureRequestsOccur() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new SinkHttpClientResponse.ResponseItem(null, true), + new SinkHttpClientResponse.ResponseItem(null, false)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + HttpSinkRequestEntry request3 = new HttpSinkRequestEntry("PUT", "lorem".getBytes()); + HttpSinkRequestEntry request4 = new HttpSinkRequestEntry("PUT", "ipsum".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2, request3, request4); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(1); + } + + @Test + public void testRetryWhenAllRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new Exception("Test Exception")); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... + Thread.sleep(2000); + verify(errorCounter).inc(2); + assertEquals(2, entriesToRetry.size()); + } + + + @Test + public void testRetryWhenAPartOfRequestsFailed() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new SinkHttpClientResponse.ResponseItem(null, false), + new SinkHttpClientResponse.ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(1); + assertEquals(1, entriesToRetry.size()); + } + + @Test + public void testTemporalRequestsWithNoneGuaranteeDoNotRetry() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new SinkHttpClientResponse.ResponseItem(null, false), + new SinkHttpClientResponse.ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(1); + assertEquals(0, entriesToRetry.size()); + } + + @Test + public void testAllSuccessfulRequests() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.NONE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new SinkHttpClientResponse.ResponseItem(null, true), + new SinkHttpClientResponse.ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + final List entriesToRetry = new ArrayList<>(); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = entriesToRetry::addAll; + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + assertEquals(0, entriesToRetry.size()); + } + + @Test + public void testFailureRequestsWithAtLeastOnceGuarantee() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.AT_LEAST_ONCE); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(new SinkHttpClientResponse( + Arrays.asList( + new SinkHttpClientResponse.ResponseItem(null, false), + new SinkHttpClientResponse.ResponseItem(null, true)) + )); + + when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); + + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); + Consumer> requestResult = + httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); + + List requestEntries = Arrays.asList(request1, request2); + this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); + + // would be good to use Countdown Latch instead sleep... 
+ Thread.sleep(2000); + verify(errorCounter).inc(1); + } + + @Test + public void testUnsupportedDeliveryGuaranteeThrowsException() throws InterruptedException { + createHttpSinkWriter(DeliveryGuarantee.EXACTLY_ONCE); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Test Exception")); when(httpClient.putRequests(anyList(), anyString())).thenReturn(future); - HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request1 = new HttpSinkRequestEntry("PUT", "hello".getBytes()); + HttpSinkRequestEntry request2 = new HttpSinkRequestEntry("PUT", "world".getBytes()); Consumer> requestResult = httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries)); - List requestEntries = Collections.singletonList(request); + List requestEntries = Arrays.asList(request1, request2); this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult); // would be good to use Countdown Latch instead sleep... 
Thread.sleep(2000); - verify(errorCounter).inc(requestEntries.size()); + verify(errorCounter).inc(2); } } diff --git a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java index 269ee87b..6f6d0c4d 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/sink/HttpDynamicTableSinkFactoryTest.java @@ -91,4 +91,44 @@ public void nonexistentOptionsTest() { assertThrows(ValidationException.class, () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); } + + @Test + public void invalidSinkDeliveryGuaranteeOptionTests() { + final String invalidOptionCreateSql = + String.format( + "CREATE TABLE http (\n" + + " id bigint\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = '%s',\n" + + " 'format' = 'json',\n" + + " 'sink.delivery-guarantee' = 'invalid'\n" + + ")", + HttpDynamicTableSinkFactory.IDENTIFIER, + "http://localhost/" + ); + tEnv.executeSql(invalidOptionCreateSql); + assertThrows(ValidationException.class, + () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); + } + + @Test + public void exactlyOnceDeliveryGuaranteeNotSupported() { + final String exactlyOnceCreateSql = + String.format( + "CREATE TABLE http (\n" + + " id bigint\n" + + ") with (\n" + + " 'connector' = '%s',\n" + + " 'url' = '%s',\n" + + " 'format' = 'json',\n" + + " 'sink.delivery-guarantee' = 'exactly-once'\n" + + ")", + HttpDynamicTableSinkFactory.IDENTIFIER, + "http://localhost/" + ); + tEnv.executeSql(exactlyOnceCreateSql); + assertThrows(ValidationException.class, + () -> tEnv.executeSql("INSERT INTO http VALUES (1)").await()); + } }