1616
1717package com .mongodb .async .client ;
1818
19- import com .mongodb .Block ;
20- import com .mongodb .Function ;
2119import com .mongodb .MongoNamespace ;
2220import com .mongodb .ReadConcern ;
2321import com .mongodb .ReadPreference ;
2422import com .mongodb .WriteConcern ;
2523import com .mongodb .async .AsyncBatchCursor ;
2624import com .mongodb .async .SingleResultCallback ;
2725import com .mongodb .client .model .Collation ;
28- import com .mongodb .client .model .FindOptions ;
2926import com .mongodb .operation .AggregateOperation ;
3027import com .mongodb .operation .AggregateToCollectionOperation ;
3128import com .mongodb .operation .AsyncOperationExecutor ;
29+ import com .mongodb .operation .AsyncReadOperation ;
30+ import com .mongodb .operation .FindOperation ;
31+ import com .mongodb .session .ClientSession ;
3232import org .bson .BsonDocument ;
3333import org .bson .BsonValue ;
3434import org .bson .codecs .configuration .CodecRegistry ;
3535import org .bson .conversions .Bson ;
3636
3737import java .util .ArrayList ;
38- import java .util .Collection ;
3938import java .util .List ;
4039import java .util .concurrent .TimeUnit ;
4140
42- import static com .mongodb .ReadPreference .primary ;
4341import static com .mongodb .assertions .Assertions .notNull ;
4442import static java .util .concurrent .TimeUnit .MILLISECONDS ;
4543
4644
47- class AggregateIterableImpl <TDocument , TResult > implements AggregateIterable <TResult > {
45+ class AggregateIterableImpl <TDocument , TResult > extends MongoIterableImpl < TResult > implements AggregateIterable <TResult > {
4846 private final MongoNamespace namespace ;
4947 private final Class <TDocument > documentClass ;
5048 private final Class <TResult > resultClass ;
51- private final ReadPreference readPreference ;
52- private final ReadConcern readConcern ;
5349 private final WriteConcern writeConcern ;
5450 private final CodecRegistry codecRegistry ;
55- private final AsyncOperationExecutor executor ;
5651 private final List <? extends Bson > pipeline ;
5752
5853 private Boolean allowDiskUse ;
59- private Integer batchSize ;
60- private long maxAwaitTimeMS ;
6154 private long maxTimeMS ;
55+ private long maxAwaitTimeMS ;
6256 private Boolean useCursor ;
6357 private Boolean bypassDocumentValidation ;
6458 private Collation collation ;
6559 private String comment ;
6660 private Bson hint ;
6761
68- AggregateIterableImpl (final MongoNamespace namespace , final Class <TDocument > documentClass , final Class <TResult > resultClass ,
69- final CodecRegistry codecRegistry , final ReadPreference readPreference , final ReadConcern readConcern ,
70- final WriteConcern writeConcern , final AsyncOperationExecutor executor , final List <? extends Bson > pipeline ) {
62+ AggregateIterableImpl (final ClientSession clientSession , final MongoNamespace namespace , final Class <TDocument > documentClass ,
63+ final Class <TResult > resultClass , final CodecRegistry codecRegistry , final ReadPreference readPreference ,
64+ final ReadConcern readConcern , final WriteConcern writeConcern , final AsyncOperationExecutor executor ,
65+ final List <? extends Bson > pipeline ) {
66+ super (clientSession , executor , readConcern , readPreference );
7167 this .namespace = notNull ("namespace" , namespace );
7268 this .documentClass = notNull ("documentClass" , documentClass );
7369 this .resultClass = notNull ("resultClass" , resultClass );
7470 this .codecRegistry = notNull ("codecRegistry" , codecRegistry );
75- this .readPreference = notNull ("readPreference" , readPreference );
76- this .readConcern = notNull ("readConcern" , readConcern );
7771 this .writeConcern = notNull ("writeConcern" , writeConcern );
78- this .executor = notNull ("executor" , executor );
7972 this .pipeline = notNull ("pipeline" , pipeline );
8073 }
8174
8275 @ Override
83- public AggregateIterable <TResult > allowDiskUse (final Boolean allowDiskUse ) {
84- this .allowDiskUse = allowDiskUse ;
85- return this ;
76+ public void toCollection (final SingleResultCallback <Void > callback ) {
77+ List <BsonDocument > aggregateList = createBsonDocumentList (pipeline );
78+
79+ if (getOutCollection (aggregateList ) == null ) {
80+ throw new IllegalStateException ("The last stage of the aggregation pipeline must be $out" );
81+ }
82+
83+ getExecutor ().execute (createAggregateToCollectionOperation (aggregateList ), getClientSession (), callback );
8684 }
8785
8886 @ Override
89- public AggregateIterable <TResult > batchSize (final int batchSize ) {
90- this .batchSize = batchSize ;
87+ public AggregateIterable <TResult > allowDiskUse (final Boolean allowDiskUse ) {
88+ this .allowDiskUse = allowDiskUse ;
9189 return this ;
9290 }
9391
9492 @ Override
95- public AggregateIterable <TResult > maxAwaitTime (final long maxAwaitTime , final TimeUnit timeUnit ) {
96- notNull ("timeUnit" , timeUnit );
97- this .maxAwaitTimeMS = TimeUnit .MILLISECONDS .convert (maxAwaitTime , timeUnit );
93+ public AggregateIterable <TResult > batchSize (final int batchSize ) {
94+ super .batchSize (batchSize );
9895 return this ;
9996 }
10097
@@ -105,7 +102,6 @@ public AggregateIterable<TResult> maxTime(final long maxTime, final TimeUnit tim
105102 return this ;
106103 }
107104
108-
109105 @ Override
110106 @ Deprecated
111107 public AggregateIterable <TResult > useCursor (final Boolean useCursor ) {
@@ -114,51 +110,10 @@ public AggregateIterable<TResult> useCursor(final Boolean useCursor) {
114110 }
115111
116112 @ Override
117- public void toCollection (final SingleResultCallback <Void > callback ) {
118- List <BsonDocument > aggregateList = createBsonDocumentList ();
119- BsonValue outCollection = getAggregateOutCollection (aggregateList );
120-
121- if (outCollection == null ) {
122- throw new IllegalStateException ("The last stage of the aggregation pipeline must be $out" );
123- }
124-
125- executor .execute (new AggregateToCollectionOperation (namespace , aggregateList , writeConcern )
126- .maxTime (maxTimeMS , MILLISECONDS )
127- .allowDiskUse (allowDiskUse )
128- .collation (collation )
129- .hint (hint == null ? null : hint .toBsonDocument (documentClass , codecRegistry ))
130- .comment (comment ), callback );
131- }
132-
133- @ Override
134- public void first (final SingleResultCallback <TResult > callback ) {
135- notNull ("callback" , callback );
136- execute ().first (callback );
137- }
138-
139- @ Override
140- public void forEach (final Block <? super TResult > block , final SingleResultCallback <Void > callback ) {
141- notNull ("block" , block );
142- notNull ("callback" , callback );
143- execute ().forEach (block , callback );
144- }
145-
146- @ Override
147- public <A extends Collection <? super TResult >> void into (final A target , final SingleResultCallback <A > callback ) {
148- notNull ("target" , target );
149- notNull ("callback" , callback );
150- execute ().into (target , callback );
151- }
152-
153- @ Override
154- public <U > MongoIterable <U > map (final Function <TResult , U > mapper ) {
155- return new MappingIterable <TResult , U >(this , mapper );
156- }
157-
158- @ Override
159- public void batchCursor (final SingleResultCallback <AsyncBatchCursor <TResult >> callback ) {
160- notNull ("callback" , callback );
161- execute ().batchCursor (callback );
113+ public AggregateIterable <TResult > maxAwaitTime (final long maxAwaitTime , final TimeUnit timeUnit ) {
114+ notNull ("timeUnit" , timeUnit );
115+ this .maxAwaitTimeMS = TimeUnit .MILLISECONDS .convert (maxAwaitTime , timeUnit );
116+ return this ;
162117 }
163118
164119 @ Override
@@ -185,53 +140,59 @@ public AggregateIterable<TResult> hint(final Bson hint) {
185140 return this ;
186141 }
187142
143+ @ Override
188144 @ SuppressWarnings ("deprecation" )
189- private MongoIterable < TResult > execute () {
190- List <BsonDocument > aggregateList = createBsonDocumentList ();
191- BsonValue outCollection = getAggregateOutCollection (aggregateList );
145+ AsyncReadOperation < AsyncBatchCursor < TResult >> asAsyncReadOperation () {
146+ List <BsonDocument > aggregateList = createBsonDocumentList (pipeline );
147+ BsonValue outCollection = getOutCollection (aggregateList );
192148
193149 if (outCollection != null ) {
194- AggregateToCollectionOperation operation = new AggregateToCollectionOperation (namespace , aggregateList , writeConcern )
195- .maxTime (maxTimeMS , MILLISECONDS )
196- .allowDiskUse (allowDiskUse )
197- .bypassDocumentValidation (bypassDocumentValidation )
198- .collation (collation )
199- .hint (hint == null ? null : hint .toBsonDocument (documentClass , codecRegistry ))
200- .comment (comment );
201- MongoIterable <TResult > delegated = new FindIterableImpl <TDocument , TResult >(new MongoNamespace (namespace .getDatabaseName (),
202- outCollection .asString ().getValue ()), documentClass , resultClass , codecRegistry , primary (), readConcern ,
203- executor , new BsonDocument (), new FindOptions ().collation (collation ).maxAwaitTime (maxAwaitTimeMS , MILLISECONDS ));
204- if (batchSize != null ) {
205- delegated .batchSize (batchSize );
150+ AggregateToCollectionOperation aggregateToCollectionOperation = createAggregateToCollectionOperation (aggregateList );
151+ FindOperation <TResult > findOperation =
152+ new FindOperation <TResult >(new MongoNamespace (namespace .getDatabaseName (), outCollection .asString ().getValue ()),
153+ codecRegistry .get (resultClass ))
154+ .readConcern (getReadConcern ())
155+ .collation (collation )
156+ .maxAwaitTime (maxAwaitTimeMS , MILLISECONDS );
157+ if (getBatchSize () != null ) {
158+ findOperation .batchSize (getBatchSize ());
206159 }
207- return new AwaitingWriteOperationIterable <TResult , Void >( operation , executor , delegated );
160+ return new AggregateToCollectionThenFindOperation <TResult >( aggregateToCollectionOperation , findOperation );
208161 } else {
209- return new OperationIterable <TResult >(new AggregateOperation <TResult >(namespace , aggregateList , codecRegistry .get (resultClass ))
210- .maxAwaitTime (maxAwaitTimeMS , MILLISECONDS )
162+ return new AggregateOperation <TResult >(namespace , aggregateList , codecRegistry .get (resultClass ))
211163 .maxTime (maxTimeMS , MILLISECONDS )
164+ .maxAwaitTime (maxAwaitTimeMS , MILLISECONDS )
212165 .allowDiskUse (allowDiskUse )
213- .batchSize (batchSize )
166+ .batchSize (getBatchSize () )
214167 .useCursor (useCursor )
215- .readConcern (readConcern )
168+ .readConcern (getReadConcern () )
216169 .collation (collation )
217170 .hint (hint == null ? null : hint .toBsonDocument (documentClass , codecRegistry ))
218- .comment (comment ),
219- readPreference ,
220- executor );
171+ .comment (comment );
221172 }
222173 }
223174
224- private BsonValue getAggregateOutCollection (final List <BsonDocument > aggregateList ) {
175+ private BsonValue getOutCollection (final List <BsonDocument > aggregateList ) {
225176 return aggregateList .size () == 0 ? null : aggregateList .get (aggregateList .size () - 1 ).get ("$out" );
226177 }
227178
228- private List <BsonDocument > createBsonDocumentList () {
179+ private AggregateToCollectionOperation createAggregateToCollectionOperation (final List <BsonDocument > aggregateList ) {
180+ return new AggregateToCollectionOperation (namespace , aggregateList , writeConcern )
181+ .maxTime (maxTimeMS , MILLISECONDS )
182+ .allowDiskUse (allowDiskUse )
183+ .bypassDocumentValidation (bypassDocumentValidation )
184+ .collation (collation )
185+ .hint (hint == null ? null : hint .toBsonDocument (documentClass , codecRegistry ))
186+ .comment (comment );
187+ }
188+
189+ private List <BsonDocument > createBsonDocumentList (final List <? extends Bson > pipeline ) {
229190 List <BsonDocument > aggregateList = new ArrayList <BsonDocument >(pipeline .size ());
230- for (Bson document : pipeline ) {
231- if (document == null ) {
191+ for (Bson obj : pipeline ) {
192+ if (obj == null ) {
232193 throw new IllegalArgumentException ("pipeline can not contain a null value" );
233194 }
234- aggregateList .add (document .toBsonDocument (documentClass , codecRegistry ));
195+ aggregateList .add (obj .toBsonDocument (documentClass , codecRegistry ));
235196 }
236197 return aggregateList ;
237198 }
0 commit comments