Skip to content

Commit f77e882

Browse files
rozzastIncMale
andauthored
Logging incompatible properties no longer NPEs on null values (#147)
KAFKA-390 Co-authored-by: Valentin Kovalenko <valentin.kovalenko@mongodb.com>
1 parent 62fedf0 commit f77e882

File tree

3 files changed

+31
-16
lines changed

3 files changed

+31
-16
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## Changelog
44

5+
## 1.11.1
6+
7+
### Bug Fixes
8+
- [KAFKA-390](https://jira.mongodb.org/browse/KAFKA-390) Logging incompatible properties no longer NPEs on null values
9+
510
## 1.11.0
611

712
### Improvements

src/main/java/com/mongodb/kafka/connect/util/config/ConfigSoftValidator.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,19 @@ public static void logIncompatibleProperties(
7474
final Consumer<String> logger) {
7575
// global props are considered as belonging to a topic named "" for simplicity
7676
String global = "";
77-
Map<String, Map<String, String>> topicNameToItsStrippedProps =
77+
Map<String, Map<String, Optional<String>>> topicNameToItsStrippedProps =
7878
props.entrySet().stream()
7979
.collect(
8080
Collectors.groupingBy(
8181
propNameAndValue -> topicNameFromPropertyName(propNameAndValue.getKey()),
8282
Collectors.toMap(
8383
propNameAndValue -> strippedPropertyName(propNameAndValue.getKey()),
84-
Entry::getValue)));
85-
Map<String, String> globalProps =
84+
e -> Optional.ofNullable(e.getValue()))));
85+
86+
Map<String, Optional<String>> globalProps =
8687
topicNameToItsStrippedProps.getOrDefault(global, Collections.emptyMap());
8788
topicNameToItsStrippedProps.remove(global);
88-
Map<String, Map<String, Entry<String, Boolean>>> topicNameToCombinedStrippedProps =
89+
Map<String, Map<String, Entry<Optional<String>, Boolean>>> topicNameToCombinedStrippedProps =
8990
topicNameToItsStrippedProps.entrySet().stream()
9091
.collect(
9192
Collectors.toMap(
@@ -95,7 +96,8 @@ public static void logIncompatibleProperties(
9596
globalProps,
9697
topicNameAndItsStrippedProps.getKey(),
9798
topicNameAndItsStrippedProps.getValue())));
98-
Map<String, Entry<String, Boolean>> globalPropsWithFalseFlags =
99+
100+
Map<String, Entry<Optional<String>, Boolean>> globalPropsWithFalseFlags =
99101
combineProperties(globalProps, null, null);
100102
incompatibleConfigs.forEach(
101103
incompatiblePair -> {
@@ -117,12 +119,12 @@ public static void logIncompatibleProperties(
117119
* property value and a flag telling whether the property value came from {@code
118120
* topicStrippedProps}, i.e., was overridden, or from {@code globalProps}.
119121
*/
120-
private static Map<String, Entry<String, Boolean>> combineProperties(
121-
final Map<String, String> globalProps,
122+
private static Map<String, Entry<Optional<String>, Boolean>> combineProperties(
123+
final Map<String, Optional<String>> globalProps,
122124
@Nullable final String topicName,
123-
@Nullable final Map<String, String> topicStrippedProps) {
125+
@Nullable final Map<String, Optional<String>> topicStrippedProps) {
124126
assertTrue((topicName == null) ^ (topicStrippedProps != null));
125-
Map<String, Entry<String, Boolean>> combinedStrippedProps = new HashMap<>();
127+
Map<String, Entry<Optional<String>, Boolean>> combinedStrippedProps = new HashMap<>();
126128
globalProps.forEach(
127129
(propertyName, propertyValue) ->
128130
combinedStrippedProps.put(
@@ -273,16 +275,22 @@ public static IncompatiblePropertiesPair latterIgnored(
273275
*/
274276
private void logIfPresent(
275277
@Nullable final String topicName,
276-
final Map<String, Entry<String, Boolean>> combinedStrippedProps,
278+
final Map<String, Entry<Optional<String>, Boolean>> combinedStrippedProps,
277279
final Consumer<String> logger) {
278-
Entry<String, Boolean> property1ValueAndOverridden = combinedStrippedProps.get(propertyName1);
280+
Entry<Optional<String>, Boolean> property1ValueAndOverridden =
281+
combinedStrippedProps.get(propertyName1);
279282
if (property1ValueAndOverridden == null
280-
|| property1ValueAndOverridden.getKey().equals(defaultPropertyValue1)) {
283+
|| property1ValueAndOverridden
284+
.getKey()
285+
.equals(Optional.ofNullable(defaultPropertyValue1))) {
281286
return;
282287
}
283-
Entry<String, Boolean> property2ValueAndOverridden = combinedStrippedProps.get(propertyName2);
288+
Entry<Optional<String>, Boolean> property2ValueAndOverridden =
289+
combinedStrippedProps.get(propertyName2);
284290
if (property2ValueAndOverridden == null
285-
|| property2ValueAndOverridden.getKey().equals(defaultPropertyValue2)) {
291+
|| property2ValueAndOverridden
292+
.getKey()
293+
.equals(Optional.ofNullable(defaultPropertyValue2))) {
286294
return;
287295
}
288296
logger.accept(

src/test/java/com/mongodb/kafka/connect/util/config/ConfigSoftValidatorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,12 @@ void logIncompatibleProperties() {
8787
.collect(Collectors.toSet());
8888
Map<String, String> props = new HashMap<>();
8989
props.put("a", "valueA");
90-
props.put("b", "valueB");
90+
props.put("b", null);
9191
props.put("c", "");
9292
props.put(topicOverridePropertyName("t", "c"), "valueC");
93-
props.put("d", "valueD");
93+
props.put("d", null);
9494
props.put("e", "valueE");
95+
props.put(topicOverridePropertyName("t", "e"), null);
9596
props.put("f", "valueF");
9697
props.put("g", "valueG");
9798
Set<String> expectedMessages =
@@ -101,6 +102,7 @@ void logIncompatibleProperties() {
101102
latterIgnoredMsg("c", "b"),
102103
latterIgnoredMsg(topicOverridePropertyName("t", "c"), "b"),
103104
latterIgnoredMsg("d", "e"),
105+
latterIgnoredMsg("d", topicOverridePropertyName("t", "e")),
104106
latterIgnoredMsg("d", "f"))
105107
.collect(toSet());
106108
Set<String> actualMessages = new HashSet<>();

0 commit comments

Comments
 (0)