Skip to content

Commit 0e01bd8

Browse files
committed
Add wildcard support for starts with field name matching
KAFKA-391
1 parent f77e882 commit 0e01bd8

File tree

4 files changed

+55
-11
lines changed

4 files changed

+55
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
## 1.11.1
66

77
### Bug Fixes
8-
- [KAFKA-390](https://jira.mongodb.org/browse/KAFKA-390) Logging incompatible properties no longer NPEs on null values
8+
- [KAFKA-391](https://jira.mongodb.org/browse/KAFKA-391) Add wildcard support for starts with field name matching
9+
- [KAFKA-390](https://jira.mongodb.org/browse/KAFKA-390) Logging incompatible properties no longer NPEs on null values
910

1011
## 1.11.0
1112

src/main/java/com/mongodb/kafka/connect/sink/processor/field/projection/AllowListProjector.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ private void doProjection(final String field, final BsonDocument doc) {
5656
field.isEmpty()
5757
? entry.getKey()
5858
: field + FieldProjector.SUB_FIELD_DOT_SEPARATOR + entry.getKey();
59+
5960
BsonValue value = entry.getValue();
6061

6162
if (!getFields().contains(key) && !checkForWildcardMatch(key)) {
@@ -85,6 +86,10 @@ private void doProjection(final String field, final BsonDocument doc) {
8586
}
8687

8788
private boolean checkForWildcardMatch(final String key) {
89+
if (checkForStartsWithWildcardMatch(key)) {
90+
return true;
91+
}
92+
8893
String[] keyParts = key.split("\\" + FieldProjector.SUB_FIELD_DOT_SEPARATOR);
8994
String[] pattern = new String[keyParts.length];
9095
Arrays.fill(pattern, FieldProjector.SINGLE_WILDCARD);
@@ -105,4 +110,21 @@ private boolean checkForWildcardMatch(final String key) {
105110

106111
return false;
107112
}
113+
114+
/**
115+
* Processes any non nested field names to see if they should be kept
116+
*
117+
* @param key the document key to check against
118+
* @return true if any of the fields match against the document key
119+
*/
120+
private boolean checkForStartsWithWildcardMatch(final String key) {
121+
return getFields().stream()
122+
.filter(
123+
f ->
124+
!f.contains(FieldProjector.SUB_FIELD_DOT_SEPARATOR)
125+
&& !f.equals(FieldProjector.SINGLE_WILDCARD)
126+
&& f.endsWith(FieldProjector.SINGLE_WILDCARD))
127+
.map(f -> f.substring(0, f.length() - 1))
128+
.anyMatch(key::startsWith);
129+
}
108130
}

src/main/java/com/mongodb/kafka/connect/sink/processor/field/projection/BlockListProjector.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,7 @@ protected void projectDocument(final BsonDocument doc) {
4141

4242
private void doProjection(final String field, final BsonDocument doc) {
4343
if (!field.contains(FieldProjector.SUB_FIELD_DOT_SEPARATOR)) {
44-
if (field.equals(FieldProjector.SINGLE_WILDCARD)
45-
|| field.equals(FieldProjector.DOUBLE_WILDCARD)) {
46-
handleWildcard(field, "", doc);
47-
return;
48-
}
49-
50-
doc.remove(field);
44+
processNonNestedMatches(field, doc);
5145
return;
5246
}
5347

@@ -77,6 +71,24 @@ private void doProjection(final String field, final BsonDocument doc) {
7771
}
7872
}
7973

74+
/**
75+
* Processes any non nested field names and removes any matches from the document.
76+
*
77+
* @param field the field name or field name with wildcard
78+
* @param doc the document to block fields from
79+
*/
80+
private void processNonNestedMatches(final String field, final BsonDocument doc) {
81+
if (field.equals(FieldProjector.SINGLE_WILDCARD)
82+
|| field.equals(FieldProjector.DOUBLE_WILDCARD)) {
83+
handleWildcard(field, "", doc);
84+
} else if (field.endsWith(FieldProjector.SINGLE_WILDCARD)) {
85+
String fieldStartsWith = field.substring(0, field.length() - 1);
86+
doc.entrySet().removeIf(entry -> entry.getKey().startsWith(fieldStartsWith));
87+
} else {
88+
doc.remove(field);
89+
}
90+
}
91+
8092
private void handleWildcard(
8193
final String firstPart, final String otherParts, final BsonDocument doc) {
8294
Iterator<Map.Entry<String, BsonValue>> iter = doc.entrySet().iterator();

src/test/java/com/mongodb/kafka/connect/sink/processor/field/projection/FieldProjectorTest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,17 @@ static void setupFlatDocMaps() {
8989
BsonDocument.parse(
9090
"{myBoolean: true, myInt: 42, "
9191
+ "myBytes: {$binary: 'QUJD', $type: '00'}, myArray: []}");
92+
BsonDocument keyDocument6 = BsonDocument.parse("{_id: 'ABC-123', myInt: 42, myArray: []}");
93+
BsonDocument keyDocument7 =
94+
BsonDocument.parse("{myBoolean: true, myBytes: {$binary: 'QUJD', $type: '00'}}");
9295

9396
flatKeyFieldsMapBlockList =
9497
new HashMap<String, BsonDocument>() {
9598
{
9699
put("", keyDocument1);
97100
put("*", keyDocument2);
98101
put("**", keyDocument2);
102+
put("myB*", keyDocument6);
99103
put("_id", keyDocument5);
100104
put("myBoolean, myInt", keyDocument3);
101105
put("missing1, unknown2", keyDocument1);
@@ -108,6 +112,7 @@ static void setupFlatDocMaps() {
108112
put("", keyDocument2);
109113
put("*", keyDocument1);
110114
put("**", keyDocument1);
115+
put("myB*", keyDocument7);
111116
put("missing1, unknown2", keyDocument2);
112117
put("myBoolean, myBytes, myArray", keyDocument4);
113118
}
@@ -131,13 +136,15 @@ static void setupFlatDocMaps() {
131136
BsonDocument.parse(
132137
"{ myLong: {$numberLong: '42'}, myDouble: 23.23, myString: 'BSON', "
133138
+ "myBytes: {$binary: 'eHl6', $type: '00'}, myArray: []}");
139+
BsonDocument valueDocument6 = BsonDocument.parse("{_id: 'XYZ-789'}");
134140

135141
flatValueFieldsMapBlockList =
136142
new HashMap<String, BsonDocument>() {
137143
{
138144
put("", valueDocument1);
139145
put("*", valueDocument2);
140146
put("**", valueDocument2);
147+
put("my*", valueDocument6);
141148
put("_id", valueDocument5);
142149
put("myLong, myDouble", valueDocument3);
143150
put("missing1,unknown2", valueDocument1);
@@ -150,6 +157,8 @@ static void setupFlatDocMaps() {
150157
put("", valueDocument2);
151158
put("*", valueDocument1);
152159
put("**", valueDocument1);
160+
put("my*", valueDocument5);
161+
put("_id", valueDocument6);
153162
put("missing1,unknown2", valueDocument2);
154163
put("myDouble, myBytes,myArray", valueDocument4);
155164
}
@@ -364,7 +373,7 @@ List<DynamicTest> testProjectorSettingsOnNestedStructure() {
364373
entry.getKey()));
365374
tests.add(
366375
buildDynamicTestFor(
367-
entry, buildSinkDocumentNestedStruct(), new WhitelistKeyProjector(cfg)));
376+
entry, buildSinkDocumentNestedStruct(), new AllowListKeyProjector(cfg)));
368377
}
369378

370379
for (Map.Entry<String, BsonDocument> entry : nestedValueFieldsMapBlockList.entrySet()) {
@@ -378,7 +387,7 @@ List<DynamicTest> testProjectorSettingsOnNestedStructure() {
378387
entry.getKey()));
379388
tests.add(
380389
buildDynamicTestFor(
381-
entry, buildSinkDocumentNestedStruct(), new BlacklistValueProjector(cfg)));
390+
entry, buildSinkDocumentNestedStruct(), new BlockListValueProjector(cfg)));
382391
}
383392

384393
for (Map.Entry<String, BsonDocument> entry : nestedValueFieldsMapAllowList.entrySet()) {
@@ -392,7 +401,7 @@ List<DynamicTest> testProjectorSettingsOnNestedStructure() {
392401
entry.getKey()));
393402
tests.add(
394403
buildDynamicTestFor(
395-
entry, buildSinkDocumentNestedStruct(), new WhitelistValueProjector(cfg)));
404+
entry, buildSinkDocumentNestedStruct(), new AllowListValueProjector(cfg)));
396405
}
397406

398407
return tests;

0 commit comments

Comments
 (0)