Skip to content

Commit 31c1b88

Browse files
committed
[#2651] Implemented Reactive versions of PostAction, PreAction and JdbcSelectWithActions
1 parent 62dfc6d commit 31c1b88

16 files changed

+1310
-17
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import org.hibernate.reactive.boot.spi.ReactiveMetadataImplementor;
1414
import org.hibernate.reactive.mutiny.Mutiny;
1515
import org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl;
16+
import org.hibernate.reactive.sql.exec.internal.ReactiveJdbcSelectWithActions;
1617
import org.hibernate.reactive.stage.Stage;
1718
import org.hibernate.reactive.stage.impl.StageSessionFactoryImpl;
19+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
1820

1921
/**
2022
* A Hibernate {@link org.hibernate.SessionFactory} that can be
@@ -42,4 +44,9 @@ public <T> T unwrap(Class<T> type) {
4244
}
4345
return super.unwrap( type );
4446
}
47+
48+
public JdbcSelectWithActionsBuilder getJdbcSelectWithActionsBuilder(){
49+
return new ReactiveJdbcSelectWithActions.Builder();
50+
}
51+
4552
}
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.sql.exec.internal;
7+
8+
import org.hibernate.LockOptions;
9+
import org.hibernate.dialect.lock.spi.LockTimeoutType;
10+
import org.hibernate.dialect.lock.spi.LockingSupport;
11+
import org.hibernate.internal.util.collections.CollectionHelper;
12+
import org.hibernate.reactive.pool.ReactiveConnection;
13+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder;
14+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction;
15+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler;
16+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
17+
import org.hibernate.reactive.sql.exec.spi.ReactivePostAction;
18+
import org.hibernate.reactive.sql.exec.spi.ReactivePreAction;
19+
import org.hibernate.sql.ast.spi.LockingClauseStrategy;
20+
import org.hibernate.sql.ast.tree.select.QuerySpec;
21+
import org.hibernate.sql.exec.internal.JdbcOperationQuerySelect;
22+
import org.hibernate.sql.exec.internal.JdbcSelectWithActions;
23+
import org.hibernate.sql.exec.spi.ExecutionContext;
24+
import org.hibernate.sql.exec.spi.JdbcSelect;
25+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
26+
import org.hibernate.sql.exec.spi.LoadedValuesCollector;
27+
import org.hibernate.sql.exec.spi.PostAction;
28+
import org.hibernate.sql.exec.spi.PreAction;
29+
import org.hibernate.sql.exec.spi.SecondaryAction;
30+
31+
import java.util.ArrayList;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.concurrent.CompletionStage;
35+
36+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
37+
import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture;
38+
39+
/**
40+
* Reactive version of {@link JdbcSelectWithActions}
41+
*/
42+
public class ReactiveJdbcSelectWithActions extends JdbcSelectWithActions implements ReactiveJdbcSelect {
43+
44+
public ReactiveJdbcSelectWithActions(
45+
JdbcOperationQuerySelect primaryOperation,
46+
LoadedValuesCollector loadedValuesCollector,
47+
PreAction[] preActions,
48+
PostAction[] postActions) {
49+
super( primaryOperation, loadedValuesCollector, preActions, postActions );
50+
}
51+
52+
public ReactiveJdbcSelectWithActions(
53+
JdbcOperationQuerySelect primaryAction,
54+
LoadedValuesCollector loadedValuesCollector) {
55+
super( primaryAction, loadedValuesCollector );
56+
}
57+
58+
@Override
59+
public CompletionStage<Void> reactivePerformPreActions(
60+
ReactiveConnection connection,
61+
ExecutionContext executionContext) {
62+
if ( preActions == null ) {
63+
return nullFuture();
64+
}
65+
66+
return loop( preActions, preAction ->
67+
( (ReactivePreAction) preAction ).reactivePerformPreAction( connection, executionContext )
68+
);
69+
}
70+
71+
@Override
72+
public CompletionStage<Void> reactivePerformPostActions(
73+
boolean succeeded,
74+
ReactiveConnection connection,
75+
ExecutionContext executionContext) {
76+
if ( postActions != null ) {
77+
return loop(
78+
postActions, postAction -> {
79+
if ( succeeded || postAction.shouldRunAfterFail() ) {
80+
return ( (ReactivePostAction) postAction ).reactivePerformReactivePostAction(
81+
connection,
82+
executionContext
83+
);
84+
}
85+
return nullFuture();
86+
}
87+
).thenAccept( unused -> {
88+
if ( loadedValuesCollector != null ) {
89+
loadedValuesCollector.clear();
90+
}
91+
} );
92+
}
93+
else {
94+
if ( loadedValuesCollector != null ) {
95+
loadedValuesCollector.clear();
96+
}
97+
return nullFuture();
98+
}
99+
}
100+
101+
public static class Builder implements JdbcSelectWithActionsBuilder {
102+
private JdbcOperationQuerySelect primaryAction;
103+
private LoadedValuesCollector loadedValuesCollector;
104+
protected List<PreAction> preActions;
105+
protected List<PostAction> postActions;
106+
protected LockTimeoutType lockTimeoutType;
107+
protected LockingSupport lockingSupport;
108+
protected LockOptions lockOptions;
109+
protected QuerySpec lockingTarget;
110+
protected LockingClauseStrategy lockingClauseStrategy;
111+
boolean isFollonOnLockStrategy;
112+
113+
@Override
114+
public Builder setPrimaryAction(JdbcSelect primaryAction) {
115+
assert primaryAction instanceof JdbcOperationQuerySelect;
116+
this.primaryAction = (JdbcOperationQuerySelect) primaryAction;
117+
return this;
118+
}
119+
120+
@SuppressWarnings("UnusedReturnValue")
121+
@Override
122+
public Builder setLoadedValuesCollector(LoadedValuesCollector loadedValuesCollector) {
123+
this.loadedValuesCollector = loadedValuesCollector;
124+
return this;
125+
}
126+
127+
@Override
128+
public Builder setLockTimeoutType(LockTimeoutType lockTimeoutType) {
129+
this.lockTimeoutType = lockTimeoutType;
130+
return this;
131+
}
132+
133+
@Override
134+
public Builder setLockingSupport(LockingSupport lockingSupport) {
135+
this.lockingSupport = lockingSupport;
136+
return this;
137+
}
138+
139+
@Override
140+
public Builder setLockOptions(LockOptions lockOptions) {
141+
this.lockOptions = lockOptions;
142+
return this;
143+
}
144+
145+
@Override
146+
public Builder setLockingTarget(QuerySpec lockingTarget) {
147+
this.lockingTarget = lockingTarget;
148+
return this;
149+
}
150+
151+
@Override
152+
public Builder setLockingClauseStrategy(LockingClauseStrategy lockingClauseStrategy) {
153+
this.lockingClauseStrategy = lockingClauseStrategy;
154+
return this;
155+
}
156+
157+
@Override
158+
public Builder setIsFollowOnLockStrategy(boolean isFollonOnLockStrategy) {
159+
this.isFollonOnLockStrategy = isFollonOnLockStrategy;
160+
return this;
161+
}
162+
163+
@Override
164+
public JdbcSelect build() {
165+
if ( lockTimeoutType == LockTimeoutType.CONNECTION ) {
166+
addSecondaryActionPair(
167+
new ReactiveLockTimeoutHandler(
168+
lockOptions.getTimeout(),
169+
ReactiveConnectionLockTimeoutStrategyBuilder.build( lockingSupport.getConnectionLockTimeoutStrategy() )
170+
)
171+
);
172+
}
173+
if ( isFollonOnLockStrategy ) {
174+
ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this );
175+
}
176+
177+
if ( preActions == null && postActions == null ) {
178+
assert loadedValuesCollector == null;
179+
return primaryAction;
180+
}
181+
final PreAction[] preActions = toPreActionArray( this.preActions );
182+
final PostAction[] postActions = toPostActionArray( this.postActions );
183+
return new ReactiveJdbcSelectWithActions( primaryAction, loadedValuesCollector, preActions, postActions );
184+
}
185+
186+
/**
187+
* Appends the {@code actions} to the growing list of pre-actions,
188+
* executed (in order) after all currently registered actions.
189+
*
190+
* @return {@code this}, for method chaining.
191+
*/
192+
@Override
193+
public Builder appendPreAction(PreAction... actions) {
194+
if ( preActions == null ) {
195+
preActions = new ArrayList<>();
196+
}
197+
Collections.addAll( preActions, actions );
198+
return this;
199+
}
200+
201+
/**
202+
* Prepends the {@code actions} to the growing list of pre-actions
203+
*
204+
* @return {@code this}, for method chaining.
205+
*/
206+
@Override
207+
public Builder prependPreAction(PreAction... actions) {
208+
if ( preActions == null ) {
209+
preActions = new ArrayList<>();
210+
}
211+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
212+
Collections.addAll( preActions, actions );
213+
return this;
214+
}
215+
216+
/**
217+
* Appends the {@code actions} to the growing list of post-actions
218+
*
219+
* @return {@code this}, for method chaining.
220+
*/
221+
@Override
222+
public Builder appendPostAction(PostAction... actions) {
223+
if ( postActions == null ) {
224+
postActions = new ArrayList<>();
225+
}
226+
Collections.addAll( postActions, actions );
227+
return this;
228+
}
229+
230+
/**
231+
* Prepends the {@code actions} to the growing list of post-actions
232+
*
233+
* @return {@code this}, for method chaining.
234+
*/
235+
@Override
236+
public Builder prependPostAction(PostAction... actions) {
237+
if ( postActions == null ) {
238+
postActions = new ArrayList<>();
239+
}
240+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
241+
Collections.addAll( postActions, actions );
242+
return this;
243+
}
244+
245+
/**
246+
* Adds a secondary action pair.
247+
* Assumes the {@code action} implements both {@linkplain PreAction} and {@linkplain PostAction}.
248+
*
249+
* @return {@code this}, for method chaining.
250+
*
251+
* @apiNote Prefer {@linkplain #addSecondaryActionPair(PreAction, PostAction)} to avoid
252+
* the casts needed here.
253+
* @see #prependPreAction
254+
* @see #appendPostAction
255+
*/
256+
@Override
257+
public Builder addSecondaryActionPair(SecondaryAction action) {
258+
return addSecondaryActionPair( (PreAction) action, (PostAction) action );
259+
}
260+
261+
/**
262+
* Adds a PreAction/PostAction pair.
263+
*
264+
* @return {@code this}, for method chaining.
265+
*
266+
* @see #prependPreAction
267+
* @see #appendPostAction
268+
*/
269+
@Override
270+
public Builder addSecondaryActionPair(PreAction preAction, PostAction postAction) {
271+
prependPreAction( preAction );
272+
appendPostAction( postAction );
273+
return this;
274+
}
275+
276+
static PreAction[] toPreActionArray(List<PreAction> actions) {
277+
if ( CollectionHelper.isEmpty( actions ) ) {
278+
return null;
279+
}
280+
return actions.toArray( new PreAction[0] );
281+
}
282+
283+
static PostAction[] toPostActionArray(List<PostAction> actions) {
284+
if ( CollectionHelper.isEmpty( actions ) ) {
285+
return null;
286+
}
287+
return actions.toArray( new PostAction[0] );
288+
}
289+
290+
}
291+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.hibernate.query.TupleTransformer;
2323
import org.hibernate.reactive.engine.impl.ReactivePersistenceContextAdapter;
2424
import org.hibernate.query.spi.QueryOptions;
25+
import org.hibernate.reactive.pool.ReactiveConnection;
26+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
27+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
2528
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
2629
import org.hibernate.reactive.sql.exec.spi.ReactiveSelectExecutor;
2730
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
@@ -220,10 +223,15 @@ public boolean shouldReturnProxies() {
220223
};
221224

222225
final JdbcValuesSourceProcessingStateStandardImpl valuesProcessingState =
223-
new JdbcValuesSourceProcessingStateStandardImpl( executionContext, processingOptions );
226+
new JdbcValuesSourceProcessingStateStandardImpl(
227+
jdbcSelect.getLoadedValuesCollector(),
228+
processingOptions,
229+
executionContext
230+
);
224231

232+
final SharedSessionContractImplementor session = executionContext.getSession();
225233
final ReactiveRowReader<R> rowReader = ReactiveResultsHelper.createRowReader(
226-
executionContext.getSession().getSessionFactory(),
234+
session.getSessionFactory(),
227235
rowTransformer,
228236
domainResultType,
229237
jdbcValues
@@ -237,21 +245,34 @@ public boolean shouldReturnProxies() {
237245
);
238246

239247
rowReader.startLoading( rowProcessingState );
240-
241-
return resultsConsumer
242-
.consume(
243-
jdbcValues,
244-
executionContext.getSession(),
245-
processingOptions,
246-
valuesProcessingState,
247-
rowProcessingState,
248-
rowReader
249-
)
250-
.thenApply( result -> {
251-
statistics.end( jdbcSelect, result );
252-
return result;
253-
} );
254-
} );
248+
ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
249+
if ( jdbcSelect instanceof ReactiveJdbcSelect reactiveJdbcSelect ) {
250+
return reactiveJdbcSelect.reactivePerformPreActions( reactiveConnection, executionContext )
251+
.thenCompose( unused -> resultsConsumer
252+
.consume(
253+
jdbcValues,
254+
session,
255+
processingOptions,
256+
valuesProcessingState,
257+
rowProcessingState,
258+
rowReader
259+
) )
260+
.thenCompose( result -> reactiveJdbcSelect
261+
.reactivePerformPostActions( true, reactiveConnection, executionContext )
262+
.thenApply( v -> {
263+
statistics.end( jdbcSelect, result );
264+
return result;
265+
} )
266+
);
267+
}
268+
else {
269+
return resultsConsumer.consume( jdbcValues, session, processingOptions, valuesProcessingState, rowProcessingState, rowReader )
270+
.thenApply( result -> {
271+
statistics.end( jdbcSelect, result );
272+
return result;
273+
} );
274+
}
275+
});
255276
}
256277

257278
private static <R> RowTransformer<R> rowTransformer(

0 commit comments

Comments
 (0)