@@ -104,7 +104,7 @@ class AggregateIterableSpecification extends Specification {
104104 )
105105 }
106106
107- def ' should build the expected AggregateToCollectionOperation' () {
107+ def ' should build the expected AggregateToCollectionOperation for $out ' () {
108108 given :
109109 def cursor = Stub (AsyncBatchCursor ) {
110110 next(_) >> {
@@ -207,6 +207,155 @@ class AggregateIterableSpecification extends Specification {
207207 .comment(' this is a comment' ))
208208 }
209209
// Verifies that a pipeline ending stage containing $merge is split into an
// AggregateToCollectionOperation (the write) followed by a FindOperation (the
// read of the merged-into collection), and that builder overrides (maxTime,
// allowDiskUse, collation, hint, comment) are propagated to the write operation
// while cursor settings (batchSize) apply only to the subsequent read.
def 'should build the expected AggregateToCollectionOperation for $merge'() {
    given:
    def cursor = Stub(AsyncBatchCursor) {
        next(_) >> {
            // Immediately signal an exhausted cursor so into()/toCollection() complete.
            it[0].onResult(null, null)
        }
    }
    // One stub cursor per operation the spec will execute against the executor.
    def executor = new TestOperationExecutor([cursor, cursor, cursor, cursor, cursor, cursor, cursor])
    def collectionName = 'collectionName'
    def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
    // $merge with a simple string target: merges into a collection in the same database.
    def pipeline = [new Document('$match', 1), new Document('$merge', new Document('into', collectionName))]
    // $merge with a document target: merges into a collection in a different database ('db2').
    def pipelineWithIntoDocument = [new Document('$match', 1), new Document('$merge',
            new Document('into', new Document('db', 'db2').append('coll', collectionName)))]

    when: 'aggregation includes $merge'
    new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
            pipeline, AggregationLevel.COLLECTION, true)
            .batchSize(99)
            .maxAwaitTime(99, MILLISECONDS)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .useCursor(true)
            .collation(collation)
            .hint(new Document('a', 1))
            .comment('this is a comment')
            .into([]) { result, t -> }

    def operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation

    then: 'should use the overrides'
    expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
            [new BsonDocument('$match', new BsonInt32(1)),
             new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
            writeConcern)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .collation(collation)
            .hint(new BsonDocument('a', new BsonInt32(1)))
            .comment('this is a comment'))

    when: 'the subsequent read should have the batchSize set'
    operation = operation.getReadOperation() as FindOperation

    then: 'should use the correct settings'
    operation.getNamespace() == collectionNamespace
    operation.getCollation() == collation
    operation.getBatchSize() == 99
    // maxAwaitTime/maxTime belong to the aggregate write, not the follow-up find.
    operation.getMaxAwaitTime(MILLISECONDS) == 0
    operation.getMaxTime(MILLISECONDS) == 0

    when: 'aggregation includes $merge into a different database'
    new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
            pipelineWithIntoDocument, AggregationLevel.COLLECTION, false)
            .batchSize(99)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .useCursor(true)
            .collation(collation)
            .hint(new Document('a', 1))
            .comment('this is a comment')
            .into([]) { result, t -> }

    operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation

    then: 'should use the overrides'
    expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
            [new BsonDocument('$match', new BsonInt32(1)),
             new BsonDocument('$merge', new BsonDocument('into',
                     new BsonDocument('db', new BsonString('db2')).append('coll', new BsonString(collectionName))))],
            writeConcern,
            AggregationLevel.COLLECTION)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .collation(collation)
            .hint(new BsonDocument('a', new BsonInt32(1)))
            .comment('this is a comment')
    )

    when: 'the subsequent read should have the batchSize set'
    operation = operation.getReadOperation() as FindOperation

    then: 'should use the correct settings'
    // The read targets the cross-database namespace named by the 'into' document.
    operation.getNamespace() == new MongoNamespace('db2', collectionName)
    operation.getBatchSize() == 99
    operation.getCollation() == collation
    operation.getMaxAwaitTime(MILLISECONDS) == 0
    operation.getMaxTime(MILLISECONDS) == 0

    // Fixed label: this branch exercises $merge (the pipeline contains no $out);
    // the previous text 'aggregation includes $out ...' was copied from the $out spec.
    when: 'aggregation includes $merge and is at the database level'
    new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
            pipeline, AggregationLevel.DATABASE, true)
            .batchSize(99)
            .maxAwaitTime(99, MILLISECONDS)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .useCursor(true)
            .collation(collation)
            .hint(new Document('a', 1))
            .comment('this is a comment')
            .into([]) { result, t -> }

    operation = executor.getReadOperation() as WriteOperationThenCursorReadOperation

    then: 'should use the overrides'
    expect operation.getAggregateToCollectionOperation(), isTheSameAs(new AggregateToCollectionOperation(namespace,
            [new BsonDocument('$match', new BsonInt32(1)),
             new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
            writeConcern,
            AggregationLevel.DATABASE)
            .maxTime(999, MILLISECONDS)
            .allowDiskUse(true)
            .collation(collation)
            .hint(new BsonDocument('a', new BsonInt32(1)))
            .comment('this is a comment'))

    when: 'the subsequent read should have the batchSize set'
    operation = operation.getReadOperation() as FindOperation

    then: 'should use the correct settings'
    operation.getNamespace() == collectionNamespace
    operation.getCollation() == collation
    operation.getBatchSize() == 99
    operation.getMaxAwaitTime(MILLISECONDS) == 0
    operation.getMaxTime(MILLISECONDS) == 0

    when: 'toCollection should work as expected'
    def futureResultCallback = new FutureResultCallback()
    new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
            pipeline, AggregationLevel.COLLECTION, true)
            .allowDiskUse(true)
            .collation(collation)
            .hint(new Document('a', 1))
            .comment('this is a comment')
            .toCollection(futureResultCallback)
    futureResultCallback.get()

    operation = executor.getWriteOperation() as AggregateToCollectionOperation

    then:
    expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
            [new BsonDocument('$match', new BsonInt32(1)),
             new BsonDocument('$merge', new BsonDocument('into', new BsonString(collectionName)))],
            writeConcern)
            .allowDiskUse(true)
            .collation(collation)
            .hint(new BsonDocument('a', new BsonInt32(1)))
            .comment('this is a comment'))
}
358+
210359 def ' should handle exceptions correctly' () {
211360 given :
212361 def codecRegistry = fromProviders([new ValueCodecProvider (), new BsonValueCodecProvider ()])
0 commit comments