Skip to content

Commit b2e00f4

Browse files
tlsdnwn55rozza
andcommitted
Fix custom Delete write model strategy support
KAFKA-395 Co-authored-by: Ross Lawley <ross@mongodb.com>
1 parent aac52ac commit b2e00f4

File tree

4 files changed

+94
-16
lines changed

4 files changed

+94
-16
lines changed

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -525,22 +525,22 @@ public Optional<WriteModelStrategy> getDeleteWriteModelStrategy() {
525525
return Optional.empty();
526526
}
527527
if (deleteOneWriteModelStrategy == null) {
528-
if (!getString(DELETE_WRITEMODEL_STRATEGY_CONFIG).isEmpty()) {
529-
deleteOneWriteModelStrategy =
530-
getWriteModelStrategyFromConfig(DELETE_WRITEMODEL_STRATEGY_CONFIG);
531-
} else {
532-
/*
533-
NOTE: DeleteOneModel requires the key document which means that the only reasonable ID generation strategies are those
534-
which refer to/operate on the key document. Thus currently this means the IdStrategy must be either:
535-
536-
FullKeyStrategy
537-
PartialKeyStrategy
538-
ProvidedInKeyStrategy
539-
*/
528+
if (DELETE_WRITEMODEL_STRATEGY_DEFAULT.equals(getString(DELETE_WRITEMODEL_STRATEGY_CONFIG))) {
529+
540530
IdStrategy idStrategy = getIdStrategy();
541-
if (!(idStrategy instanceof FullKeyStrategy)
542-
&& !(idStrategy instanceof PartialKeyStrategy)
543-
&& !(idStrategy instanceof ProvidedInKeyStrategy)) {
531+
if (idStrategy instanceof FullKeyStrategy
532+
|| idStrategy instanceof PartialKeyStrategy
533+
|| idStrategy instanceof ProvidedInKeyStrategy) {
534+
deleteOneWriteModelStrategy = new DeleteOneDefaultStrategy(idStrategy);
535+
} else {
536+
/*
537+
NOTE: DeleteOneModel requires the key document which means that the only reasonable ID generation strategies are those
538+
which refer to/operate on the key document. Thus currently this means the IdStrategy must be either:
539+
540+
FullKeyStrategy
541+
PartialKeyStrategy
542+
ProvidedInKeyStrategy
543+
*/
544544
throw new ConnectConfigException(
545545
DELETE_ON_NULL_VALUES_CONFIG,
546546
getBoolean(DELETE_ON_NULL_VALUES_CONFIG),
@@ -551,7 +551,9 @@ public Optional<WriteModelStrategy> getDeleteWriteModelStrategy() {
551551
PartialKeyStrategy.class.getSimpleName(),
552552
ProvidedInKeyStrategy.class.getSimpleName()));
553553
}
554-
deleteOneWriteModelStrategy = new DeleteOneDefaultStrategy(idStrategy);
554+
} else {
555+
deleteOneWriteModelStrategy =
556+
getWriteModelStrategyFromConfig(DELETE_WRITEMODEL_STRATEGY_CONFIG);
555557
}
556558
}
557559

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/DeleteOneDefaultStrategy.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
3333
import com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy;
34+
import com.mongodb.kafka.connect.util.VisibleForTesting;
3435

3536
public class DeleteOneDefaultStrategy implements WriteModelStrategy {
3637
private IdStrategy idStrategy;
@@ -43,6 +44,11 @@ public DeleteOneDefaultStrategy(final IdStrategy idStrategy) {
4344
this.idStrategy = idStrategy;
4445
}
4546

47+
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
48+
public IdStrategy getIdStrategy() {
49+
return this.idStrategy;
50+
}
51+
4652
@Override
4753
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
4854

src/test/java/com/mongodb/kafka/connect/sink/MongoSinkConfigTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.HashSet;
7373
import java.util.List;
7474
import java.util.Map;
75+
import java.util.Optional;
7576
import java.util.Set;
7677
import java.util.regex.Pattern;
7778
import java.util.stream.Collectors;
@@ -103,6 +104,7 @@
103104
import com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy;
104105
import com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy;
105106
import com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy;
107+
import com.mongodb.kafka.connect.sink.writemodel.strategy.CustomDeleteWriteModelStrategy;
106108
import com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy;
107109
import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy;
108110
import com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy;
@@ -833,6 +835,44 @@ Collection<DynamicTest> testGetSingleValidDeleteWriteModelStrategy() {
833835
return tests;
834836
}
835837

838+
@Test
839+
@DisplayName("test Default DELETE_WRITEMODEL_STRATEGY_CONFIG")
840+
void testDefaultDeleteWriteModelStrategyConfig() {
841+
Map<String, String> map = createConfigMap();
842+
843+
map.put(DELETE_ON_NULL_VALUES_CONFIG, "true");
844+
map.put(DOCUMENT_ID_STRATEGY_CONFIG, FullKeyStrategy.class.getName());
845+
MongoSinkConfig cfg = new MongoSinkConfig(map);
846+
847+
Optional<WriteModelStrategy> optionalDeleteWriteModelStrategy =
848+
cfg.getMongoSinkTopicConfig(TEST_TOPIC).getDeleteWriteModelStrategy();
849+
assertTrue(optionalDeleteWriteModelStrategy.isPresent());
850+
851+
WriteModelStrategy writeModelStrategy = optionalDeleteWriteModelStrategy.get();
852+
assertTrue(writeModelStrategy instanceof DeleteOneDefaultStrategy);
853+
854+
assertTrue(
855+
((DeleteOneDefaultStrategy) writeModelStrategy).getIdStrategy() instanceof FullKeyStrategy,
856+
"IdStrategy is FullKeyStrategy");
857+
}
858+
859+
@Test
860+
@DisplayName("test Custom DELETE_WRITEMODEL_STRATEGY_CONFIG")
861+
void testCustomDeleteWriteModelStrategyConfig() {
862+
Map<String, String> map = createConfigMap();
863+
864+
map.put(DELETE_ON_NULL_VALUES_CONFIG, "true");
865+
map.put(DELETE_WRITEMODEL_STRATEGY_CONFIG, CustomDeleteWriteModelStrategy.class.getName());
866+
MongoSinkConfig cfg = new MongoSinkConfig(map);
867+
868+
Optional<WriteModelStrategy> optionalDeleteWriteModelStrategy =
869+
cfg.getMongoSinkTopicConfig(TEST_TOPIC).getDeleteWriteModelStrategy();
870+
assertTrue(optionalDeleteWriteModelStrategy.isPresent());
871+
872+
WriteModelStrategy writeModelStrategy = optionalDeleteWriteModelStrategy.get();
873+
assertTrue(writeModelStrategy instanceof CustomDeleteWriteModelStrategy);
874+
}
875+
836876
@Test
837877
@DisplayName("test timeseries validation")
838878
void testTimeseries() {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.kafka.connect.sink.writemodel.strategy;
18+
19+
import org.bson.BsonDocument;
20+
21+
import com.mongodb.client.model.WriteModel;
22+
23+
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
24+
25+
public final class CustomDeleteWriteModelStrategy implements WriteModelStrategy {
26+
@Override
27+
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
28+
return null;
29+
}
30+
}

0 commit comments

Comments
 (0)