Skip to content

Commit 8068a45

Browse files
authored
Improve error logging and handling (#150)
When looping a cursor fails log the error at an a error log level. If errors aren't tolerated then throw an exception instead of continuing to loop. KAFKA-396
1 parent b2e00f4 commit 8068a45

File tree

2 files changed

+14
-7
lines changed

2 files changed

+14
-7
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static java.util.stream.IntStream.range;
3232
import static java.util.stream.IntStream.rangeClosed;
3333
import static org.junit.jupiter.api.Assertions.assertAll;
34+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3435
import static org.junit.jupiter.api.Assertions.assertEquals;
3536
import static org.junit.jupiter.api.Assertions.assertFalse;
3637
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
@@ -53,7 +54,6 @@
5354
import org.apache.kafka.connect.data.Schema;
5455
import org.apache.kafka.connect.data.SchemaBuilder;
5556
import org.apache.kafka.connect.data.Struct;
56-
import org.apache.kafka.connect.errors.ConnectException;
5757
import org.apache.kafka.connect.errors.DataException;
5858
import org.apache.kafka.connect.source.SourceRecord;
5959
import org.apache.kafka.connect.source.SourceTask;
@@ -483,6 +483,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
483483
{
484484
put(MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
485485
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
486+
put(MongoSourceConfig.OVERRIDE_ERRORS_TOLERANCE_CONFIG, "all");
486487
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "50");
487488
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "5000");
488489
put(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG, "oldPartitionName");
@@ -494,8 +495,7 @@ void testSourceCanUseCustomOffsetPartitionNames() {
494495
.thenReturn(INVALID_OFFSET);
495496
task.initialize(context);
496497

497-
assertThrows(
498-
ConnectException.class,
498+
assertDoesNotThrow(
499499
() -> {
500500
task.start(cfg);
501501
getNextBatch(task);

src/main/java/com/mongodb/kafka/connect/source/StartedMongoSourceTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,11 +599,18 @@ private List<BsonDocument> getNextBatch() {
599599
} catch (MongoException e) {
600600
closeCursor();
601601
if (isRunning) {
602-
if (sourceConfig.tolerateErrors() && changeStreamNotValid(e)) {
603-
cursor = tryRecreateCursor(e);
602+
if (sourceConfig.tolerateErrors()) {
603+
if (changeStreamNotValid(e)) {
604+
cursor = tryRecreateCursor(e);
605+
} else {
606+
LOGGER.error(
607+
"An exception occurred when trying to get the next item from the Change Stream", e);
608+
}
604609
} else {
605-
LOGGER.info(
606-
"An exception occurred when trying to get the next item from the Change Stream", e);
610+
throw new ConnectException(
611+
"An exception occurred when trying to get the next item from the Change Stream: "
612+
+ e.getMessage(),
613+
e);
607614
}
608615
}
609616
} catch (Exception e) {

0 commit comments

Comments
 (0)