@@ -133,10 +133,7 @@ void put() {
133133 client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
134134 }
135135
136- @ Test
137- void putTolerateAllPostProcessingError () {
138- properties .put (MongoSinkTopicConfig .ERRORS_TOLERANCE_CONFIG , ErrorTolerance .ALL .value ());
139- MongoSinkConfig config = new MongoSinkConfig (properties );
136+ void testToleratePostProcessingError (final MongoSinkConfig config ) {
140137 client .configureCapturing (DEFAULT_NAMESPACE );
141138 task = new StartedMongoSinkTask (config , client .mongoClient (), errorReporter );
142139 RecordsAndExpectations recordsAndExpectations =
@@ -153,6 +150,21 @@ void putTolerateAllPostProcessingError() {
153150 client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
154151 }
155152
153+ @ Test
154+ void putTolerateAllPostProcessingError () {
155+ properties .put (MongoSinkTopicConfig .ERRORS_TOLERANCE_CONFIG , ErrorTolerance .ALL .value ());
156+ MongoSinkConfig config = new MongoSinkConfig (properties );
157+ testToleratePostProcessingError (config );
158+ }
159+
160+ @ Test
161+ void putTolerateDataPostProcessingError () {
162+ properties .put (
163+ MongoSinkTopicConfig .OVERRIDE_ERRORS_TOLERANCE_CONFIG , ErrorTolerance .DATA .value ());
164+ MongoSinkConfig config = new MongoSinkConfig (properties );
165+ testToleratePostProcessingError (config );
166+ }
167+
156168 /**
157169 * {@link StartedMongoSinkTask#put(Collection)} must report not only {@linkplain
158170 * #putTolerateAllPostProcessingError() post-processing exceptions} and {@link
@@ -229,6 +241,34 @@ void putTolerateNoneWriteError() {
229241 client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
230242 }
231243
244+ @ Test
245+ void putTolerateDataWriteError () {
246+ properties .put (
247+ MongoSinkTopicConfig .OVERRIDE_ERRORS_TOLERANCE_CONFIG , ErrorTolerance .DATA .value ());
248+ MongoSinkConfig config = new MongoSinkConfig (properties );
249+ client .configureCapturing (
250+ DEFAULT_NAMESPACE ,
251+ collection ->
252+ when (collection .bulkWrite (anyList (), any (BulkWriteOptions .class )))
253+ // batch1
254+ .thenThrow (new MongoCommandException (new BsonDocument (), new ServerAddress ()))
255+ // batch2
256+ .thenReturn (BulkWriteResult .unacknowledged ()));
257+ task = new StartedMongoSinkTask (config , client .mongoClient (), errorReporter );
258+ RecordsAndExpectations recordsAndExpectations =
259+ new RecordsAndExpectations (
260+ asList (
261+ // batch1
262+ Records .simpleValid (TEST_TOPIC , 0 ),
263+ // batch2
264+ Records .simpleValid (TEST_TOPIC2 , 1 )),
265+ singletonList (0 ),
266+ emptyList ());
267+ assertThrows (DataException .class , () -> task .put (recordsAndExpectations .records ()));
268+ recordsAndExpectations .assertExpectations (
269+ client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
270+ }
271+
232272 @ Test
233273 void putTolerateAllOrderedWriteError () {
234274 properties .put (MongoSinkTopicConfig .ERRORS_TOLERANCE_CONFIG , ErrorTolerance .ALL .value ());
@@ -281,13 +321,7 @@ void putTolerateAllOrderedWriteError() {
281321 client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
282322 }
283323
284- @ Test
285- void putTolerateAllUnorderedWriteError () {
286- properties .put (MongoSinkTopicConfig .ERRORS_TOLERANCE_CONFIG , ErrorTolerance .ALL .value ());
287- boolean bulkWriteOrdered = false ;
288- properties .put (
289- MongoSinkTopicConfig .BULK_WRITE_ORDERED_CONFIG , String .valueOf (bulkWriteOrdered ));
290- MongoSinkConfig config = new MongoSinkConfig (properties );
324+ void testTolerateOrderedWriteError (final MongoSinkConfig config , final boolean bulkWriteOrdered ) {
291325 client .configureCapturing (
292326 DEFAULT_NAMESPACE ,
293327 collection ->
@@ -336,6 +370,27 @@ void putTolerateAllUnorderedWriteError() {
336370 client .capturedBulkWrites ().get (DEFAULT_NAMESPACE ), errorReporter .reported ());
337371 }
338372
373+ @ Test
374+ void putTolerateAllUnorderedWriteError () {
375+ properties .put (MongoSinkTopicConfig .ERRORS_TOLERANCE_CONFIG , ErrorTolerance .ALL .value ());
376+ boolean bulkWriteOrdered = false ;
377+ properties .put (
378+ MongoSinkTopicConfig .BULK_WRITE_ORDERED_CONFIG , String .valueOf (bulkWriteOrdered ));
379+ MongoSinkConfig config = new MongoSinkConfig (properties );
380+ testTolerateOrderedWriteError (config , bulkWriteOrdered );
381+ }
382+
383+ @ Test
384+ void putTolerateDataUnorderedWriteError () {
385+ properties .put (
386+ MongoSinkTopicConfig .OVERRIDE_ERRORS_TOLERANCE_CONFIG , ErrorTolerance .DATA .value ());
387+ boolean bulkWriteOrdered = false ;
388+ properties .put (
389+ MongoSinkTopicConfig .BULK_WRITE_ORDERED_CONFIG , String .valueOf (bulkWriteOrdered ));
390+ MongoSinkConfig config = new MongoSinkConfig (properties );
391+ testTolerateOrderedWriteError (config , bulkWriteOrdered );
392+ }
393+
339394 @ SuppressWarnings ("unchecked" )
340395 private static <T > T cast (final Object o ) {
341396 return (T ) o ;
0 commit comments