5151import com .google .common .util .concurrent .UncheckedExecutionException ;
5252import com .redis .lettucemod .api .StatefulRedisModulesConnection ;
5353import com .redis .lettucemod .search .AggregateOperation ;
54+ import com .redis .lettucemod .search .AggregateOptions ;
5455import com .redis .lettucemod .search .AggregateWithCursorResults ;
5556import com .redis .lettucemod .search .CreateOptions ;
57+ import com .redis .lettucemod .search .CursorOptions ;
5658import com .redis .lettucemod .search .Document ;
5759import com .redis .lettucemod .search .Field ;
5860import com .redis .lettucemod .search .Group ;
@@ -309,10 +311,13 @@ public SearchResults<String, String> search(RediSearchTableHandle tableHandle, S
309311 public AggregateWithCursorResults <String > aggregate (RediSearchTableHandle table ) {
310312 Aggregation aggregation = translator .aggregate (table );
311313 log .info ("Running %s" , aggregation );
312- AggregateWithCursorResults <String > results = connection .sync ().ftAggregate (aggregation .getIndex (),
313- aggregation .getQuery (), aggregation .getCursorOptions (), aggregation .getOptions ());
314- List <AggregateOperation <?, ?>> groupBys = aggregation .getOptions ().getOperations ().stream ()
315- .filter (o -> o .getType () == AggregateOperation .Type .GROUP ).collect (Collectors .toList ());
314+ String index = aggregation .getIndex ();
315+ String query = aggregation .getQuery ();
316+ CursorOptions cursor = aggregation .getCursorOptions ();
317+ AggregateOptions <String , String > options = aggregation .getOptions ();
318+ AggregateWithCursorResults <String > results = connection .sync ().ftAggregate (index , query , cursor , options );
319+ List <AggregateOperation <String , String >> groupBys = aggregation .getOptions ().getOperations ().stream ()
320+ .filter (this ::isGroupOperation ).collect (Collectors .toList ());
316321 if (results .isEmpty () && !groupBys .isEmpty ()) {
317322 Group groupBy = (Group ) groupBys .get (0 );
318323 Optional <String > as = groupBy .getReducers ()[0 ].getAs ();
@@ -325,6 +330,10 @@ public AggregateWithCursorResults<String> aggregate(RediSearchTableHandle table)
325330 return results ;
326331 }
327332
333+ private boolean isGroupOperation (AggregateOperation <String , String > operation ) {
334+ return operation .getType () == AggregateOperation .Type .GROUP ;
335+ }
336+
328337 public AggregateWithCursorResults <String > cursorRead (RediSearchTableHandle tableHandle , long cursor ) {
329338 String index = index (tableHandle .getSchemaTableName ());
330339 if (config .getCursorCount () > 0 ) {
0 commit comments