|
39 | 39 | import java.util.stream.Collectors; |
40 | 40 |
|
41 | 41 | import org.assertj.core.api.Assertions; |
| 42 | +import org.bson.BsonDocument; |
| 43 | +import org.bson.BsonString; |
42 | 44 | import org.bson.Document; |
43 | 45 | import org.bson.conversions.Bson; |
44 | 46 | import org.bson.types.ObjectId; |
|
111 | 113 | import com.mongodb.client.result.InsertOneResult; |
112 | 114 | import com.mongodb.client.result.UpdateResult; |
113 | 115 | import com.mongodb.reactivestreams.client.AggregatePublisher; |
| 116 | +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; |
114 | 117 | import com.mongodb.reactivestreams.client.DistinctPublisher; |
115 | 118 | import com.mongodb.reactivestreams.client.FindPublisher; |
116 | 119 | import com.mongodb.reactivestreams.client.MapReducePublisher; |
@@ -146,6 +149,7 @@ public class ReactiveMongoTemplateUnitTests { |
146 | 149 | @Mock DistinctPublisher distinctPublisher; |
147 | 150 | @Mock Publisher deletePublisher; |
148 | 151 | @Mock MapReducePublisher mapReducePublisher; |
| 152 | + @Mock ChangeStreamPublisher changeStreamPublisher; |
149 | 153 |
|
150 | 154 | private MongoExceptionTranslator exceptionTranslator = new MongoExceptionTranslator(); |
151 | 155 | private MappingMongoConverter converter; |
@@ -1485,6 +1489,22 @@ void createCollectionShouldSetUpTimeSeries() { |
1485 | 1489 | .granularity(TimeSeriesGranularity.HOURS).toString()); |
1486 | 1490 | } |
1487 | 1491 |
|
| 1492 | + @Test // GH-4167 |
| 1493 | + void changeStreamOptionStartAftershouldApplied() { |
| 1494 | + |
| 1495 | + when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db)); |
| 1496 | + |
| 1497 | + when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher); |
| 1498 | + when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher); |
| 1499 | + when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher); |
| 1500 | + when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher); |
| 1501 | + |
| 1502 | + BsonDocument token = new BsonDocument("token", new BsonString("id")); |
| 1503 | + template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe(); |
| 1504 | + |
| 1505 | + verify(changeStreamPublisher).startAfter(eq(token)); |
| 1506 | + } |
| 1507 | + |
1488 | 1508 | private void stubFindSubscribe(Document document) { |
1489 | 1509 |
|
1490 | 1510 | Publisher<Document> realPublisher = Flux.just(document); |
|
0 commit comments