File tree Expand file tree Collapse file tree 3 files changed +49
-2
lines changed
src/main/java/com/getindata/connectors/http/internal
table/lookup/querycreators Expand file tree Collapse file tree 3 files changed +49
-2
lines changed Original file line number Diff line number Diff line change 66
77- Added support for OIDC Bearer tokens.
88
9+ ### Fixed
10+
11+ - Ensured SerializationSchema is used in thread-safe way.
12+
913## [ 0.15.0] - 2024-07-30
1014
1115### Added
Original file line number Diff line number Diff line change 1515import com .getindata .connectors .http .LookupQueryCreator ;
1616import com .getindata .connectors .http .LookupQueryCreatorFactory ;
1717import com .getindata .connectors .http .internal .table .lookup .LookupRow ;
18+ import com .getindata .connectors .http .internal .utils .SynchronizedSerializationSchema ;
19+ import static com .getindata .connectors .http .internal .table .lookup .HttpLookupConnectorOptions .ASYNC_POLLING ;
1820import static com .getindata .connectors .http .internal .table .lookup .HttpLookupConnectorOptions .LOOKUP_REQUEST_FORMAT ;
1921
2022/**
@@ -47,8 +49,14 @@ public LookupQueryCreator createLookupQueryCreator(
4749 queryFormatAwareConfiguration
4850 );
4951
50- SerializationSchema <RowData > serializationSchema =
51- encoder .createRuntimeEncoder (null , lookupRow .getLookupPhysicalRowDataType ());
52+ final SerializationSchema <RowData > serializationSchema ;
53+ if (readableConfig .get (ASYNC_POLLING )) {
54+ serializationSchema = new SynchronizedSerializationSchema <>(
55+ encoder .createRuntimeEncoder (null , lookupRow .getLookupPhysicalRowDataType ()));
56+ } else {
57+ serializationSchema =
58+ encoder .createRuntimeEncoder (null , lookupRow .getLookupPhysicalRowDataType ());
59+ }
5260
5361 return new GenericJsonQueryCreator (serializationSchema );
5462 }
Original file line number Diff line number Diff line change 1+ package com .getindata .connectors .http .internal .utils ;
2+
3+ import org .apache .flink .api .common .serialization .SerializationSchema ;
4+
5+ /**
6+ * Decorator which ensures that underlying SerializationSchema is called in thread-safe way.
7+ *
8+ * @param <T> type
9+ */
10+ public class SynchronizedSerializationSchema <T > implements SerializationSchema <T > {
11+
12+ private final SerializationSchema <T > delegate ;
13+
14+ public SynchronizedSerializationSchema (SerializationSchema <T > delegate ) {
15+ this .delegate = delegate ;
16+ }
17+
18+ @ Override
19+ public void open (InitializationContext context ) throws Exception {
20+ doOpen (context );
21+ }
22+
23+ private synchronized void doOpen (InitializationContext context ) throws Exception {
24+ this .delegate .open (context );
25+ }
26+
27+ @ Override
28+ public byte [] serialize (T element ) {
29+ return syncSerialize (element );
30+ }
31+
32+ private synchronized byte [] syncSerialize (T element ) {
33+ return delegate .serialize (element );
34+ }
35+ }
You can’t perform that action at this time.
0 commit comments