1616
1717package com.mongodb.operation
1818
19-
2019import com.mongodb.MongoException
2120import com.mongodb.MongoNamespace
2221import com.mongodb.OperationFunctionalSpecification
@@ -155,7 +154,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
155154 def expected = insertDocuments(helper, [1 , 2 ])
156155
157156 then :
158- def next = nextAndClean(cursor, async)
157+ def next = nextAndClean(cursor, async, expected . size() )
159158 next == expected
160159
161160 when :
@@ -164,7 +163,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
164163
165164 then :
166165 cursor. getBatchSize() == 5
167- nextAndClean(cursor, async) == expected
166+ nextAndClean(cursor, async, expected . size() ) == expected
168167
169168 then :
170169 if (async) {
@@ -193,7 +192,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
193192 when :
194193 def cursor = execute(operation, false )
195194 helper. insertDocuments(BsonDocument . parse(' { _id : 2, x : 2 }' ))
196- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
195+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
197196
198197 then :
199198 next. getResumeToken() != null
@@ -220,7 +219,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
220219 when :
221220 def cursor = execute(operation, false )
222221 helper. updateOne(BsonDocument . parse(' { _id : 2}' ), BsonDocument . parse(' { $set : {x : 3}, $unset : {y : 1}}' ))
223- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
222+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
224223
225224 then :
226225 next. getResumeToken() != null
@@ -247,7 +246,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
247246 when :
248247 def cursor = execute(operation, false )
249248 helper. replaceOne(BsonDocument . parse(' { _id : 2}' ), BsonDocument . parse(' { _id : 2, x : 3}' ), false )
250- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
249+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
251250
252251 then :
253252 next. getResumeToken() != null
@@ -274,7 +273,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
274273 when :
275274 def cursor = execute(operation, false )
276275 helper. deleteOne(BsonDocument . parse(' { _id : 2}' ))
277- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
276+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
278277
279278 then :
280279 next. getResumeToken() != null
@@ -301,7 +300,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
301300 when :
302301 def cursor = execute(operation, false )
303302 helper. drop()
304- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
303+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
305304
306305 then :
307306 next. getResumeToken() != null
@@ -329,7 +328,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
329328 when :
330329 def cursor = execute(operation, false )
331330 helper. drop()
332- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
331+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
333332
334333 then :
335334 next. getResumeToken() != null
@@ -358,7 +357,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
358357 when :
359358 def cursor = execute(operation, false )
360359 helper. dropDatabase(' JavaDriverTest' )
361- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
360+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
362361
363362 then :
364363 next. getResumeToken() != null
@@ -387,7 +386,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
387386 when :
388387 def cursor = execute(operation, false )
389388 helper. renameCollection(newNamespace)
390- ChangeStreamDocument<BsonDocument > next = next(cursor, false ). get(0 )
389+ ChangeStreamDocument<BsonDocument > next = next(cursor, false , 1 ). get(0 )
391390
392391 then :
393392 next. getResumeToken() != null
@@ -442,7 +441,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
442441 def expected = insertDocuments(helper, [1 , 2 ])
443442
444443 then :
445- nextAndClean(cursor, async) == expected
444+ nextAndClean(cursor, async, expected . size() ) == expected
446445
447446 then :
448447 tryNextAndClean(cursor, async) == null
@@ -451,7 +450,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
451450 expected = insertDocuments(helper, [3 , 4 ])
452451
453452 then :
454- nextAndClean(cursor, async) == expected
453+ nextAndClean(cursor, async, expected . size() ) == expected
455454
456455 cleanup :
457456 cursor?. close()
@@ -473,15 +472,12 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
473472 def expected = insertDocuments(helper, [1 , 2 ])
474473
475474 then :
476- nextAndClean(cursor, async) == expected
475+ nextAndClean(cursor, async, expected . size() ) == expected
477476
478477 when :
479478 helper. killCursor(helper. getNamespace(), cursor. getWrapped(). getServerCursor())
480479 expected = insertDocuments(helper, [3 , 4 ])
481- def results = nextAndClean(cursor, async)
482- if (results. size() < expected. size()) {
483- results. addAll(nextAndClean(cursor, async))
484- }
480+ def results = nextAndClean(cursor, async, expected. size())
485481
486482 then :
487483 results == expected
@@ -493,10 +489,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
493489 expected = insertDocuments(helper, [5 , 6 ])
494490 helper. killCursor(helper. getNamespace(), cursor. getWrapped(). getServerCursor())
495491
496- results = nextAndClean(cursor, async)
497- if (results. size() < expected. size()) {
498- results. addAll(nextAndClean(cursor, async))
499- }
492+ results = nextAndClean(cursor, async, expected. size())
500493
501494 then :
502495 results == expected
@@ -521,7 +514,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
521514
522515 when :
523516 def expected = insertDocuments(helper, [1 , 2 ])
524- def result = next(cursor, async)
517+ def result = next(cursor, async, 2 )
525518
526519 then :
527520 result. size() == 2
@@ -532,7 +525,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
532525
533526 operation. startAtOperationTime(result. last(). getTimestamp(' clusterTime' ))
534527 cursor = execute(operation, async)
535- result = nextAndClean(cursor, async)
528+ result = nextAndClean(cursor, async, expected . tail . size() )
536529
537530 then :
538531 result == expected. tail()
@@ -556,7 +549,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
556549
557550 when :
558551 def expected = insertDocuments(helper, [1 , 2 ])
559- def result = next(cursor, async)
552+ def result = next(cursor, async, 2 )
560553
561554 then :
562555 result. size() == 2
@@ -567,7 +560,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
567560
568561 operation. resumeAfter(result. head(). getDocument(' _id' )). startAtOperationTime(null )
569562 cursor = execute(operation, async)
570- result = nextAndClean(cursor, async)
563+ result = nextAndClean(cursor, async, expected . tail() . size() )
571564
572565 then :
573566 result == expected. tail()
@@ -592,7 +585,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
592585
593586 when :
594587 def expected = insertDocuments(helper, [1 , 2 ])
595- def result = next(cursor, async)
588+ def result = next(cursor, async, 2 )
596589
597590 then :
598591 result. size() == 2
@@ -602,7 +595,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
602595 waitForLastRelease(async ? getAsyncCluster() : getCluster())
603596
604597 cursor = execute(operation. startAfter(result. head(). getDocument(' _id' )). startAtOperationTime(null ), async)
605- result = nextAndClean(cursor, async)
598+ result = nextAndClean(cursor, async, expected . tail() . size() )
606599
607600 then :
608601 result == expected. tail()
@@ -762,6 +755,10 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
762755 removeExtra(tryNext(cursor, async))
763756 }
764757
758+ def nextAndClean (cursor , boolean async , int minimumCount ) {
759+ removeExtra(next(cursor, async, minimumCount))
760+ }
761+
765762 def nextAndClean (cursor , boolean async ) {
766763 removeExtra(next(cursor, async))
767764 }
0 commit comments