Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Added HTTP Sink request retries with delivery guarantee support (`sink.delivery-guarantee`).
- Amend to not log HTTP request response and header values by default.
- Added http 2 support.

Expand Down
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,14 @@ is provided.

## HTTP status code handler
### Sink table
You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table.
You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees.
By default all 400 and 500 response codes will be interpreted as error code.

#### Retries and delivery guarantee
HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes.
- When `sink.delivery-guarantee` is `at-least-once`: Failed requests are retried automatically using AIMD (Additive Increase Multiplicative Decrease) rate limiting strategy.
- When `sink.delivery-guarantee` is `none` (default): Failed requests are logged but not retried.

This behavior can be changed by using below properties in table definition (DDL) or passing it via `setProperty' method from Sink's builder. The property name are:
- `gid.connector.http.sink.error.code` used to defined HTTP status code value that should be treated as error for example 404.
Many status codes can be defined in one value, where each code should be separated with comma, for example:
Expand Down Expand Up @@ -612,6 +617,7 @@ be requested if the current time is later than the cached token expiry time minu
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted enumerations are 'at-least-once', and 'none' (actually 'none' is the same as 'at-most-once'. 'exactly-once' semantic is not supported. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
Expand Down Expand Up @@ -736,9 +742,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json
### HTTP TableLookup Source
- Check other `//TODO`'s.

### HTTP Sink
- Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented.

###
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join
</br>
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
Expand Down Expand Up @@ -41,6 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
Expand All @@ -54,6 +56,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
deliveryGuarantee,
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/getindata/connectors/http/HttpSinkBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import java.util.Properties;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.Preconditions;

import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
Expand Down Expand Up @@ -71,6 +73,8 @@ public class HttpSinkBuilder<InputT> extends

private final Properties properties = new Properties();

private DeliveryGuarantee deliveryGuarantee;

// Mandatory field
private String endpointUrl;

Expand All @@ -92,6 +96,17 @@ public class HttpSinkBuilder<InputT> extends
this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR;
}

/**
* @param deliveryGuarantee HTTP Sink delivery guarantee
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) {
Preconditions.checkArgument(deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE,
"Only at-least-once and none delivery guarantees are supported.");
this.deliveryGuarantee = deliveryGuarantee;
return this;
}

/**
* @param endpointUrl the URL of the endpoint
* @return {@link HttpSinkBuilder} itself
Expand Down Expand Up @@ -181,6 +196,7 @@ public HttpSink<InputT> build() {
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
Optional.ofNullable(deliveryGuarantee).orElse(DeliveryGuarantee.NONE),
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.getindata.connectors.http.internal;

import java.util.List;
import java.util.stream.Collectors;

import lombok.Data;
import lombok.NonNull;
Expand All @@ -11,21 +12,36 @@

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
* to write, divided into two lists &mdash; successful and failed ones.
* to write.
*/
@Data
@ToString
public class SinkHttpClientResponse {

/**
* A list of successfully written requests.
* A list of requests along with write status.
*/
@NonNull
private final List<HttpRequest> successfulRequests;
private final List<ResponseItem> requests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpRequest> failedRequests;
public List<HttpRequest> getSuccessfulRequests() {
return requests.stream()
.filter(ResponseItem::isSuccessful)
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

public List<HttpRequest> getFailedRequests() {
return requests.stream()
.filter(r -> !r.isSuccessful())
.map(ResponseItem::getRequest)
.collect(Collectors.toList());
}

@Data
@ToString
public static class ResponseItem {
private final HttpRequest request;
private final boolean successful;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ

private final String endpointUrl;

private final DeliveryGuarantee deliveryGuarantee;

// having Builder instead of an instance of `SinkHttpClient`
// makes it possible to serialize `HttpSink`
private final SinkHttpClientBuilder sinkHttpClientBuilder;
Expand All @@ -79,6 +82,7 @@ protected HttpSinkInternal(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
Expand All @@ -97,6 +101,7 @@ protected HttpSinkInternal(

Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(endpointUrl),
"The endpoint URL must be set when initializing HTTP Sink.");
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.httpPostRequestCallback =
Preconditions.checkNotNull(
Expand Down Expand Up @@ -132,6 +137,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
Expand Down Expand Up @@ -159,6 +165,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
deliveryGuarantee,
endpointUrl,
sinkHttpClientBuilder.build(
properties,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.getindata.connectors.http.internal.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -10,13 +11,15 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

Expand Down Expand Up @@ -45,6 +48,8 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ

private final Counter numRecordsSendErrorsCounter;

private final DeliveryGuarantee deliveryGuarantee;

public HttpSinkWriter(
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
Sink.InitContext context,
Expand All @@ -54,13 +59,15 @@ public HttpSinkWriter(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
DeliveryGuarantee deliveryGuarantee,
String endpointUrl,
SinkHttpClient sinkHttpClient,
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
Properties properties) {

super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
this.deliveryGuarantee = deliveryGuarantee;
this.endpointUrl = endpointUrl;
this.sinkHttpClient = sinkHttpClient;

Expand All @@ -87,35 +94,63 @@ protected void submitRequestEntries(
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
future.whenCompleteAsync((response, err) -> {
if (err != null) {
int failedRequestsNumber = requestEntries.size();
log.error(
"Http Sink fatally failed to write all {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

// TODO: Make `HttpSinkInternal` retry the failed requests.
// Currently, it does not retry those at all, only adds their count
// to the `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
//requestResult.accept(requestEntries);
handleFullyFailedRequest(err, requestEntries, requestResult);
} else if (response.getFailedRequests().size() > 0) {
int failedRequestsNumber = response.getFailedRequests().size();
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
numRecordsSendErrorsCounter.inc(failedRequestsNumber);
log.error("Http Sink failed to write and will retry {} requests",
failedRequestsNumber);
handlePartiallyFailedRequest(response, requestEntries, requestResult);
} else {
requestResult.accept(Collections.emptyList());
}
}, sinkWriterThreadPool);
}

// TODO: Make `HttpSinkInternal` retry the failed requests. Currently,
// it does not retry those at all, only adds their count to the
// `numRecordsSendErrors` metric. It is due to the fact we do not have
// a clear image how we want to do it, so it would be both efficient and correct.
private void handleFullyFailedRequest(Throwable err,
List<HttpSinkRequestEntry> requestEntries,
Consumer<List<HttpSinkRequestEntry>> requestResult) {
int failedRequestsNumber = requestEntries.size();
numRecordsSendErrorsCounter.inc(failedRequestsNumber);

if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) {
// Retry all requests.
log.error("Http Sink fatally failed to write and will retry {} requests", failedRequestsNumber, err);
requestResult.accept(requestEntries);
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// Do not retry failed requests.
log.error(
"Http Sink fatally failed to write {} requests but will continue due to {} DeliveryGuarantee",
failedRequestsNumber,
deliveryGuarantee,
err
);
requestResult.accept(Collections.emptyList());
} else {
throw new UnsupportedOperationException(
"Unsupported delivery guarantee: " + deliveryGuarantee);
}
}

//requestResult.accept(response.getFailedRequests());
//} else {
//requestResult.accept(Collections.emptyList());
//}
private void handlePartiallyFailedRequest(SinkHttpClientResponse response,
List<HttpSinkRequestEntry> requestEntries,
Consumer<List<HttpSinkRequestEntry>> requestResult) {
if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE) {
// Assumption: the order of response.requests is the same as requestEntries.
// See com.getindata.connectors.http.internal.sink.httpclient.
// JavaNetSinkHttpClient#putRequests where requests are submitted sequentially and
// then their futures are joined sequentially too.
List<HttpSinkRequestEntry> failedRequestEntries = new ArrayList<>();
for (int i = 0; i < response.getRequests().size(); ++i) {
if (!response.getRequests().get(i).isSuccessful()) {
failedRequestEntries.add(requestEntries.get(i));
}
}
requestResult.accept(failedRequestEntries);
} else if (deliveryGuarantee == DeliveryGuarantee.NONE) {
// Do not retry failed requests.
requestResult.accept(Collections.emptyList());
}, sinkWriterThreadPool);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
private SinkHttpClientResponse prepareSinkHttpClientResponse(
List<JavaNetHttpResponseWrapper> responses,
String endpointUrl) {
var successfulResponses = new ArrayList<HttpRequest>();
var failedResponses = new ArrayList<HttpRequest>();
var responseItems = new ArrayList<SinkHttpClientResponse.ResponseItem>();

for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
Expand All @@ -102,16 +101,12 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);

// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
failedResponses.add(sinkRequestEntry);
} else {
successfulResponses.add(sinkRequestEntry);
}
boolean isFailed = optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode());
responseItems.add(new SinkHttpClientResponse.ResponseItem(sinkRequestEntry, !isFailed));
}

return new SinkHttpClientResponse(successfulResponses, failedResponses);
return new SinkHttpClientResponse(responseItems);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.DELIVERY_GUARANTEE;
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.URL;

Expand Down Expand Up @@ -125,6 +126,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {

HttpSinkBuilder<RowData> builder = HttpSink
.<RowData>builder()
.setDeliveryGuarantee(tableOptions.get(DELIVERY_GUARANTEE))
.setEndpointUrl(tableOptions.get(URL))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setHttpPostRequestCallback(httpPostRequestCallback)
Expand Down
Loading
Loading