File tree Expand file tree Collapse file tree 2 files changed +40
-1
lines changed
driver-core/src/main/com/mongodb/operation
driver-sync/src/test/functional/com/mongodb/client Expand file tree Collapse file tree 2 files changed +40
-1
lines changed Original file line number Diff line number Diff line change 1919import com .mongodb .MongoChangeStreamException ;
2020import com .mongodb .MongoCursorNotFoundException ;
2121import com .mongodb .MongoException ;
22+ import com .mongodb .MongoInterruptedException ;
2223import com .mongodb .MongoNotPrimaryException ;
2324import com .mongodb .MongoSocketException ;
2425
@@ -32,7 +33,7 @@ final class ChangeStreamBatchCursorHelper {
3233 private static final List <String > NONRESUMABLE_CHANGE_STREAM_ERROR_LABELS = asList ("NonResumableChangeStreamError" );
3334
3435 static boolean isRetryableError (final Throwable t ) {
35- if (!(t instanceof MongoException ) || t instanceof MongoChangeStreamException ) {
36+ if (!(t instanceof MongoException ) || t instanceof MongoChangeStreamException || t instanceof MongoInterruptedException ) {
3637 return false ;
3738 } else if (t instanceof MongoNotPrimaryException || t instanceof MongoCursorNotFoundException
3839 || t instanceof MongoSocketException ) {
Original file line number Diff line number Diff line change 1919import com .mongodb .MongoChangeStreamException ;
2020import com .mongodb .MongoCommandException ;
2121import com .mongodb .MongoException ;
22+ import com .mongodb .MongoInterruptedException ;
2223import com .mongodb .MongoQueryException ;
2324import com .mongodb .client .internal .MongoChangeStreamCursorImpl ;
2425import com .mongodb .client .model .Aggregates ;
@@ -59,6 +60,43 @@ public void setUp() {
5960 collection .insertOne (Document .parse ("{ _id : 0 }" ));
6061 }
6162
63+ class ChangeStreamWatcher implements Runnable {
64+ private volatile boolean interruptedExceptionOccurred = false ;
65+ private final MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor ;
66+
67+ ChangeStreamWatcher (final MongoChangeStreamCursor <ChangeStreamDocument <Document >> cursor ) {
68+ this .cursor = cursor ;
69+ }
70+
71+ @ Override
72+ public void run () {
73+ try {
74+ cursor .next ();
75+ } catch (final MongoInterruptedException e ) {
76+ interruptedExceptionOccurred = true ;
77+ } finally {
78+ cursor .close ();
79+ }
80+ }
81+
82+ public boolean getInterruptedExceptionOccurred () {
83+ return interruptedExceptionOccurred ;
84+ }
85+ }
86+
87+ //
88+ // Test that MongoInterruptedException is not retryable so that a thread can be interrupted.
89+ //
90+ @ Test
91+ public void testThreadInterrupted () throws InterruptedException {
92+ final ChangeStreamWatcher watcher = new ChangeStreamWatcher (collection .watch ().cursor ());
93+ final Thread t = new Thread (watcher );
94+ t .start ();
95+ t .interrupt ();
96+ t .join ();
97+ assertTrue (watcher .getInterruptedExceptionOccurred ());
98+ }
99+
62100 //
63101 // Test that the ChangeStream continuously tracks the last seen resumeToken.
64102 //
You can’t perform that action at this time.
0 commit comments