3636import org .hibernate .reactive .query .sqm .spi .ReactiveSelectQueryPlan ;
3737import org .hibernate .reactive .sql .exec .internal .StandardReactiveSelectExecutor ;
3838import org .hibernate .reactive .sql .results .spi .ReactiveListResultsConsumer ;
39+ import org .hibernate .reactive .sql .results .spi .ReactiveResultsConsumer ;
3940import org .hibernate .sql .ast .SqlAstTranslator ;
4041import org .hibernate .sql .ast .SqlAstTranslatorFactory ;
4142import org .hibernate .sql .ast .spi .FromClauseAccess ;
5960public class ConcreteSqmSelectReactiveQueryPlan <R > extends ConcreteSqmSelectQueryPlan <R >
6061 implements ReactiveSelectQueryPlan <R > {
6162
63+ private final SqmInterpreter <Object , ReactiveResultsConsumer <Object , R >> executeQueryInterpreter ;
6264 private final SqmInterpreter <List <R >, Void > listInterpreter ;
6365 private final RowTransformer <R > rowTransformer ;
6466
@@ -80,6 +82,8 @@ public ConcreteSqmSelectReactiveQueryPlan(
8082 this .rowTransformer = determineRowTransformer ( sqm , resultType , tupleMetadata , queryOptions );
8183 this .listInterpreter = (unused , executionContext , sqmInterpretation , jdbcParameterBindings ) ->
8284 listInterpreter ( hql , domainParameterXref , executionContext , sqmInterpretation , jdbcParameterBindings , rowTransformer );
85+ this .executeQueryInterpreter = (resultsConsumer , executionContext , sqmInterpretation , jdbcParameterBindings ) ->
86+ executeQueryInterpreter ( hql , domainParameterXref , executionContext , sqmInterpretation , jdbcParameterBindings , rowTransformer , resultsConsumer );
8387 }
8488
8589 private static <R > CompletionStage <List <R >> listInterpreter (
@@ -110,6 +114,40 @@ private static <R> CompletionStage<List<R>> listInterpreter(
110114 .whenComplete ( (rs , t ) -> domainParameterXref .clearExpansions () );
111115 }
112116
117+ private static <R > CompletionStage <Object > executeQueryInterpreter (
118+ String hql ,
119+ DomainParameterXref domainParameterXref ,
120+ DomainQueryExecutionContext executionContext ,
121+ CacheableSqmInterpretation sqmInterpretation ,
122+ JdbcParameterBindings jdbcParameterBindings ,
123+ RowTransformer <R > rowTransformer ,
124+ ReactiveResultsConsumer <Object , R > resultsConsumer ) {
125+ final ReactiveSharedSessionContractImplementor session = (ReactiveSharedSessionContractImplementor ) executionContext .getSession ();
126+ final JdbcOperationQuerySelect jdbcSelect = sqmInterpretation .getJdbcSelect ();
127+ // I'm using a supplier so that the whenComplete at the end will catch any errors, like a finally-block
128+ Supplier <SubselectFetch .RegistrationHandler > fetchHandlerSupplier = () -> SubselectFetch
129+ .createRegistrationHandler ( session .getPersistenceContext ().getBatchFetchQueue (), sqmInterpretation .selectStatement , JdbcParametersList .empty (), jdbcParameterBindings );
130+ return completedFuture ( fetchHandlerSupplier )
131+ .thenApply ( Supplier ::get )
132+ .thenCompose ( subSelectFetchKeyHandler -> session
133+ .reactiveAutoFlushIfRequired ( jdbcSelect .getAffectedTableNames () )
134+ .thenCompose ( required -> StandardReactiveSelectExecutor .INSTANCE
135+ .executeQuery ( jdbcSelect ,
136+ jdbcParameterBindings ,
137+ ConcreteSqmSelectQueryPlan .listInterpreterExecutionContext ( hql , executionContext , jdbcSelect , subSelectFetchKeyHandler ),
138+ rowTransformer ,
139+ null ,
140+ sql -> executionContext .getSession ()
141+ .getJdbcCoordinator ()
142+ .getStatementPreparer ()
143+ .prepareQueryStatement ( sql , false , null ),
144+ resultsConsumer
145+ )
146+ )
147+ )
148+ .whenComplete ( (rs , t ) -> domainParameterXref .clearExpansions () );
149+ }
150+
113151 @ Override
114152 public ScrollableResultsImplementor <R > performScroll (ScrollMode scrollMode , DomainQueryExecutionContext executionContext ) {
115153 throw new UnsupportedOperationException ();
@@ -119,10 +157,21 @@ public ScrollableResultsImplementor<R> performScroll(ScrollMode scrollMode, Doma
119157 public CompletionStage <List <R >> reactivePerformList (DomainQueryExecutionContext executionContext ) {
120158 return executionContext .getQueryOptions ().getEffectiveLimit ().getMaxRowsJpa () == 0
121159 ? completedFuture ( emptyList () )
122- : withCacheableSqmInterpretation ( executionContext , listInterpreter );
160+ : withCacheableSqmInterpretation ( executionContext , null , listInterpreter );
161+ }
162+
163+ @ Override
164+ public <T > CompletionStage <T > reactiveExecuteQuery (
165+ DomainQueryExecutionContext executionContext ,
166+ ReactiveResultsConsumer <T , R > resultsConsumer ) {
167+ return withCacheableSqmInterpretation (
168+ executionContext ,
169+ resultsConsumer ,
170+ (SqmInterpreter <T , ReactiveResultsConsumer <T , R >>) (SqmInterpreter ) executeQueryInterpreter
171+ );
123172 }
124173
125- private <T , X > CompletionStage <T > withCacheableSqmInterpretation (DomainQueryExecutionContext executionContext , SqmInterpreter <T , X > interpreter ) {
174+ private <T , X > CompletionStage <T > withCacheableSqmInterpretation (DomainQueryExecutionContext executionContext , X context , SqmInterpreter <T , X > interpreter ) {
126175 // NOTE : VERY IMPORTANT - intentional double-lock checking
127176 // The other option would be to leverage `java.util.concurrent.locks.ReadWriteLock`
128177 // to protect access. However, synchronized is much simpler here. We will verify
@@ -162,7 +211,7 @@ private <T, X> CompletionStage<T> withCacheableSqmInterpretation(DomainQueryExec
162211 jdbcParameterBindings = createJdbcParameterBindings ( localCopy , executionContext );
163212 }
164213
165- return interpreter .interpret ( null , executionContext , localCopy , jdbcParameterBindings );
214+ return interpreter .interpret ( context , executionContext , localCopy , jdbcParameterBindings );
166215 }
167216
168217 private JdbcParameterBindings createJdbcParameterBindings (CacheableSqmInterpretation sqmInterpretation , DomainQueryExecutionContext executionContext ) {
0 commit comments