Skip to content

Commit 5d02dd4

Browse files
committed
Refactor in preparation for #88 support of response-level offset
1 parent a53dba2 commit 5d02dd4

File tree

14 files changed

+520
-162
lines changed

14 files changed

+520
-162
lines changed
Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@
2222

2323
import lombok.experimental.UtilityClass;
2424

25+
import java.util.HashMap;
2526
import java.util.LinkedHashMap;
27+
import java.util.Map;
2628
import java.util.function.Function;
2729
import java.util.stream.Collector;
2830

2931
import static java.util.stream.Collectors.toMap;
3032

3133
@UtilityClass
32-
public class CollectorsUtils {
34+
public class CollectionUtils {
3335

3436
public static <T, K, U> Collector<T, ?, LinkedHashMap<K, U>> toLinkedHashMap(
3537
Function<? super T, ? extends K> keyMapper,
@@ -44,4 +46,10 @@ public class CollectorsUtils {
4446
LinkedHashMap::new
4547
);
4648
}
49+
50+
public static <S, T> Map<S, T> merge(Map<S, T> mapA, Map<S, T> mapB) {
51+
Map<S, T> merged = new HashMap<>(mapA);
52+
mapB.forEach((key, value) -> merged.merge(key, value, (k, v) -> v));
53+
return merged;
54+
}
4755
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/ack/ConfirmationWindow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Map;
2828
import java.util.Optional;
2929

30-
import static com.github.castorm.kafka.connect.common.CollectorsUtils.toLinkedHashMap;
30+
import static com.github.castorm.kafka.connect.common.CollectionUtils.toLinkedHashMap;
3131
import static java.util.function.Function.identity;
3232

3333
@Slf4j

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
* #L%
2121
*/
2222

23-
import com.fasterxml.jackson.databind.JsonNode;
2423
import com.github.castorm.kafka.connect.http.model.HttpResponse;
2524
import com.github.castorm.kafka.connect.http.model.Offset;
2625
import com.github.castorm.kafka.connect.http.record.model.KvRecord;
26+
import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord;
2727
import com.github.castorm.kafka.connect.http.response.spi.KvRecordHttpResponseParser;
2828
import com.github.castorm.kafka.connect.http.response.timestamp.spi.TimestampParser;
2929
import lombok.RequiredArgsConstructor;
@@ -43,7 +43,7 @@ public class JacksonKvRecordHttpResponseParser implements KvRecordHttpResponsePa
4343

4444
private final Function<Map<String, ?>, JacksonKvRecordHttpResponseParserConfig> configFactory;
4545

46-
private JacksonRecordParser recordParser;
46+
private JacksonResponseRecordParser responseParser;
4747

4848
private TimestampParser timestampParser;
4949

@@ -54,27 +54,27 @@ public JacksonKvRecordHttpResponseParser() {
5454
@Override
5555
public void configure(Map<String, ?> configs) {
5656
JacksonKvRecordHttpResponseParserConfig config = configFactory.apply(configs);
57-
recordParser = config.getRecordParser();
57+
responseParser = config.getResponseParser();
5858
timestampParser = config.getTimestampParser();
5959
}
6060

6161
@Override
6262
public List<KvRecord> parse(HttpResponse response) {
63-
return recordParser.getRecords(response.getBody())
63+
return responseParser.getRecords(response.getBody())
6464
.map(this::map)
6565
.collect(toList());
6666
}
6767

68-
private KvRecord map(JsonNode node) {
68+
private KvRecord map(JacksonRecord record) {
6969

70-
Map<String, Object> offsets = recordParser.getOffsets(node);
70+
Map<String, Object> offsets = record.getOffset();
7171

72-
String key = recordParser.getKey(node)
72+
String key = ofNullable(record.getKey())
7373
.map(Optional::of)
7474
.orElseGet(() -> ofNullable(offsets.get("key")).map(String.class::cast))
75-
.orElseGet(() -> generateConsistentKey(node));
75+
.orElseGet(() -> generateConsistentKey(record.getBody()));
7676

77-
Optional<Instant> timestamp = recordParser.getTimestamp(node)
77+
Optional<Instant> timestamp = ofNullable(record.getTimestamp())
7878
.map(Optional::of)
7979
.orElseGet(() -> ofNullable(offsets.get("timestamp")).map(String.class::cast))
8080
.map(timestampParser::parse);
@@ -85,12 +85,12 @@ private KvRecord map(JsonNode node) {
8585

8686
return KvRecord.builder()
8787
.key(key)
88-
.value(recordParser.getValue(node))
88+
.value(record.getBody())
8989
.offset(offset)
9090
.build();
9191
}
9292

93-
private String generateConsistentKey(JsonNode node) {
94-
return nameUUIDFromBytes(node.toString().getBytes()).toString();
93+
private static String generateConsistentKey(String body) {
94+
return nameUUIDFromBytes(body.getBytes()).toString();
9595
}
9696
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParserConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,16 @@ public class JacksonKvRecordHttpResponseParserConfig extends AbstractConfig {
3636

3737
private static final String RECORD_TIMESTAMP_PARSER_CLASS = "http.response.record.timestamp.parser";
3838

39-
private final JacksonRecordParser recordParser;
39+
private final JacksonResponseRecordParser responseParser;
4040
private final TimestampParser timestampParser;
4141

4242
JacksonKvRecordHttpResponseParserConfig(Map<String, ?> originals) {
4343
super(config(), originals);
44-
recordParser = new JacksonRecordParser();
44+
JacksonSerializer serializer = new JacksonSerializer();
45+
JacksonRecordParser recordParser = new JacksonRecordParser(serializer);
4546
recordParser.configure(originals);
47+
responseParser = new JacksonResponseRecordParser(recordParser, serializer);
48+
responseParser.configure(originals);
4649
timestampParser = getConfiguredInstance(RECORD_TIMESTAMP_PARSER_CLASS, TimestampParser.class);
4750
}
4851

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,13 @@
2424
import com.fasterxml.jackson.databind.JsonNode;
2525
import com.fasterxml.jackson.databind.ObjectMapper;
2626
import lombok.RequiredArgsConstructor;
27-
import lombok.SneakyThrows;
2827
import org.apache.kafka.common.Configurable;
2928

30-
import java.io.IOException;
3129
import java.util.List;
3230
import java.util.Map;
3331
import java.util.Map.Entry;
3432
import java.util.Optional;
3533
import java.util.function.Function;
36-
import java.util.stream.Stream;
3734

3835
import static java.util.stream.Collectors.joining;
3936
import static java.util.stream.Collectors.toMap;
@@ -43,73 +40,59 @@ public class JacksonRecordParser implements Configurable {
4340

4441
private final Function<Map<String, ?>, JacksonRecordParserConfig> configFactory;
4542

46-
private final ObjectMapper objectMapper;
43+
private final JacksonSerializer serializer;
4744

48-
private final JacksonPropertyResolver propertyResolver;
49-
50-
private JsonPointer recordsPointer;
5145
private List<JsonPointer> keyPointer;
5246
private Optional<JsonPointer> timestampPointer;
5347
private Map<String, JsonPointer> offsetPointers;
5448
private JsonPointer valuePointer;
5549

5650
public JacksonRecordParser() {
57-
this(JacksonRecordParserConfig::new, new ObjectMapper(), new JacksonPropertyResolver());
51+
this(new JacksonSerializer(new ObjectMapper()));
52+
}
53+
54+
public JacksonRecordParser(JacksonSerializer serializer) {
55+
this(JacksonRecordParserConfig::new, serializer);
5856
}
5957

6058
@Override
6159
public void configure(Map<String, ?> settings) {
6260
JacksonRecordParserConfig config = configFactory.apply(settings);
63-
recordsPointer = config.getRecordsPointer();
6461
keyPointer = config.getKeyPointer();
6562
valuePointer = config.getValuePointer();
6663
offsetPointers = config.getOffsetPointers();
6764
timestampPointer = config.getTimestampPointer();
6865
}
6966

70-
Stream<JsonNode> getRecords(byte[] body) {
71-
return propertyResolver.getArrayAt(deserialize(body), recordsPointer);
72-
}
73-
74-
@Deprecated
75-
/*
76-
Replaced by Offset
67+
/**
68+
* @deprecated Replaced by Offset
7769
*/
70+
@Deprecated
7871
Optional<String> getKey(JsonNode node) {
7972
String key = keyPointer.stream()
80-
.map(pointer -> propertyResolver.getObjectAt(node, pointer).asText())
73+
.map(pointer -> serializer.getObjectAt(node, pointer).asText())
8174
.filter(it -> !it.isEmpty())
8275
.collect(joining("+"));
8376
return key.isEmpty() ? Optional.empty() : Optional.of(key);
8477
}
8578

86-
@Deprecated
87-
/*
88-
Replaced by Offset
79+
/**
80+
* @deprecated Replaced by Offset
8981
*/
82+
@Deprecated
9083
Optional<String> getTimestamp(JsonNode node) {
91-
return timestampPointer.map(pointer -> propertyResolver.getObjectAt(node, pointer).asText());
84+
return timestampPointer.map(pointer -> serializer.getObjectAt(node, pointer).asText());
9285
}
9386

94-
Map<String, Object> getOffsets(JsonNode node) {
87+
Map<String, Object> getOffset(JsonNode node) {
9588
return offsetPointers.entrySet().stream()
96-
.collect(toMap(Entry::getKey, entry -> propertyResolver.getObjectAt(node, entry.getValue()).asText()));
89+
.collect(toMap(Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
9790
}
9891

9992
String getValue(JsonNode node) {
10093

101-
JsonNode value = propertyResolver.getObjectAt(node, valuePointer);
102-
103-
return value.isObject() ? serialize(value) : value.asText();
104-
}
105-
106-
@SneakyThrows(IOException.class)
107-
private JsonNode deserialize(byte[] body) {
108-
return objectMapper.readTree(body);
109-
}
94+
JsonNode value = serializer.getObjectAt(node, valuePointer);
11095

111-
@SneakyThrows(IOException.class)
112-
private String serialize(JsonNode node) {
113-
return objectMapper.writeValueAsString(node);
96+
return value.isObject() ? serializer.serialize(value) : value.asText();
11497
}
11598
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.github.castorm.kafka.connect.http.response.jackson;
2+
3+
/*-
4+
* #%L
5+
* kafka-connect-http
6+
* %%
7+
* Copyright (C) 2020 CastorM
8+
* %%
9+
* Licensed under the Apache License, Version 2.0 (the "License");
10+
* you may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import com.fasterxml.jackson.core.JsonPointer;
24+
import com.fasterxml.jackson.databind.JsonNode;
25+
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import com.github.castorm.kafka.connect.http.response.jackson.model.JacksonRecord;
27+
import lombok.RequiredArgsConstructor;
28+
import org.apache.kafka.common.Configurable;
29+
30+
import java.util.Map;
31+
import java.util.function.Function;
32+
import java.util.stream.Stream;
33+
34+
import static com.github.castorm.kafka.connect.common.CollectionUtils.merge;
35+
import static java.util.Collections.emptyMap;
36+
37+
@RequiredArgsConstructor
38+
public class JacksonResponseRecordParser implements Configurable {
39+
40+
private final Function<Map<String, ?>, JacksonRecordParserConfig> configFactory;
41+
42+
private final JacksonRecordParser recordParser;
43+
44+
private final JacksonSerializer serializer;
45+
46+
private JsonPointer recordsPointer;
47+
48+
public JacksonResponseRecordParser() {
49+
this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
50+
}
51+
52+
public JacksonResponseRecordParser(JacksonRecordParser recordParser, JacksonSerializer serializer) {
53+
this(JacksonRecordParserConfig::new, recordParser, serializer);
54+
}
55+
56+
@Override
57+
public void configure(Map<String, ?> settings) {
58+
JacksonRecordParserConfig config = configFactory.apply(settings);
59+
recordsPointer = config.getRecordsPointer();
60+
}
61+
62+
Stream<JacksonRecord> getRecords(byte[] body) {
63+
64+
JsonNode jsonBody = serializer.deserialize(body);
65+
66+
Map<String, Object> responseOffset = getResponseOffset(jsonBody);
67+
68+
return serializer.getArrayAt(jsonBody, recordsPointer)
69+
.map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset));
70+
}
71+
72+
private Map<String, Object> getResponseOffset(JsonNode node) {
73+
return emptyMap();
74+
}
75+
76+
private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, Object> responseOffset) {
77+
return JacksonRecord.builder()
78+
.key(recordParser.getKey(jsonRecord).orElse(null))
79+
.timestamp(recordParser.getTimestamp(jsonRecord).orElse(null))
80+
.offset(merge(responseOffset, recordParser.getOffset(jsonRecord)))
81+
.body(recordParser.getValue(jsonRecord))
82+
.build();
83+
}
84+
}
Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
* Licensed under the Apache License, Version 2.0 (the "License");
1010
* you may not use this file except in compliance with the License.
1111
* You may obtain a copy of the License at
12-
*
12+
*
1313
* http://www.apache.org/licenses/LICENSE-2.0
14-
*
14+
*
1515
* Unless required by applicable law or agreed to in writing, software
1616
* distributed under the License is distributed on an "AS IS" BASIS,
1717
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,16 +22,37 @@
2222

2323
import com.fasterxml.jackson.core.JsonPointer;
2424
import com.fasterxml.jackson.databind.JsonNode;
25+
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import lombok.RequiredArgsConstructor;
27+
import lombok.SneakyThrows;
2528

29+
import java.io.IOException;
2630
import java.util.stream.Stream;
2731

2832
import static com.fasterxml.jackson.core.JsonPointer.compile;
2933
import static java.util.stream.StreamSupport.stream;
3034

31-
class JacksonPropertyResolver {
35+
@RequiredArgsConstructor
36+
class JacksonSerializer {
3237

3338
private static final JsonPointer JSON_ROOT = compile("/");
3439

40+
private final ObjectMapper objectMapper;
41+
42+
public JacksonSerializer() {
43+
this(new ObjectMapper());
44+
}
45+
46+
@SneakyThrows(IOException.class)
47+
JsonNode deserialize(byte[] body) {
48+
return objectMapper.readTree(body);
49+
}
50+
51+
@SneakyThrows(IOException.class)
52+
String serialize(JsonNode node) {
53+
return objectMapper.writeValueAsString(node);
54+
}
55+
3556
JsonNode getObjectAt(JsonNode node, JsonPointer pointer) {
3657
return getRequiredAt(node, pointer);
3758
}

0 commit comments

Comments
 (0)