Skip to content

Commit d73d28d

Browse files
committed
Implemented Reactive versions of PostAction, PreAction and JdbcSelectWithActions
1 parent 1adce18 commit d73d28d

18 files changed

+1430
-20
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import org.hibernate.reactive.boot.spi.ReactiveMetadataImplementor;
1414
import org.hibernate.reactive.mutiny.Mutiny;
1515
import org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl;
16+
import org.hibernate.reactive.sql.exec.internal.ReactiveJdbcSelectWithActions;
1617
import org.hibernate.reactive.stage.Stage;
1718
import org.hibernate.reactive.stage.impl.StageSessionFactoryImpl;
19+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
1820

1921
/**
2022
* A Hibernate {@link org.hibernate.SessionFactory} that can be
@@ -42,4 +44,9 @@ public <T> T unwrap(Class<T> type) {
4244
}
4345
return super.unwrap( type );
4446
}
47+
48+
public JdbcSelectWithActionsBuilder getJdbcSelectWithActionsBuilder(){
49+
return new ReactiveJdbcSelectWithActions.Builder();
50+
}
51+
4552
}
Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.sql.exec.internal;
7+
8+
import org.hibernate.LockOptions;
9+
import org.hibernate.Locking;
10+
import org.hibernate.dialect.lock.spi.LockTimeoutType;
11+
import org.hibernate.dialect.lock.spi.LockingSupport;
12+
import org.hibernate.internal.util.collections.CollectionHelper;
13+
import org.hibernate.reactive.pool.ReactiveConnection;
14+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveCollectionLockingAction;
15+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder;
16+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction;
17+
import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler;
18+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
19+
import org.hibernate.reactive.sql.exec.spi.ReactivePostAction;
20+
import org.hibernate.reactive.sql.exec.spi.ReactivePreAction;
21+
import org.hibernate.sql.ast.spi.LockingClauseStrategy;
22+
import org.hibernate.sql.ast.tree.select.QuerySpec;
23+
import org.hibernate.sql.exec.internal.JdbcOperationQuerySelect;
24+
import org.hibernate.sql.exec.internal.JdbcSelectWithActions;
25+
import org.hibernate.sql.exec.spi.ExecutionContext;
26+
import org.hibernate.sql.exec.spi.JdbcSelect;
27+
import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder;
28+
import org.hibernate.sql.exec.spi.LoadedValuesCollector;
29+
import org.hibernate.sql.exec.spi.PostAction;
30+
import org.hibernate.sql.exec.spi.PreAction;
31+
import org.hibernate.sql.exec.spi.SecondaryAction;
32+
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.concurrent.CompletionStage;
37+
38+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
39+
import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture;
40+
41+
public class ReactiveJdbcSelectWithActions extends JdbcSelectWithActions implements ReactiveJdbcSelect {
42+
43+
public ReactiveJdbcSelectWithActions(
44+
JdbcOperationQuerySelect primaryOperation,
45+
LoadedValuesCollector loadedValuesCollector,
46+
PreAction[] preActions,
47+
PostAction[] postActions) {
48+
super( primaryOperation, loadedValuesCollector, preActions, postActions );
49+
}
50+
51+
public ReactiveJdbcSelectWithActions(
52+
JdbcOperationQuerySelect primaryAction,
53+
LoadedValuesCollector loadedValuesCollector) {
54+
super( primaryAction, loadedValuesCollector );
55+
}
56+
57+
@Override
58+
public CompletionStage<Void> reactivePerformPreActions(
59+
ReactiveConnection connection,
60+
ExecutionContext executionContext) {
61+
if ( preActions == null ) {
62+
return nullFuture();
63+
}
64+
65+
return loop( preActions, preAction ->
66+
( (ReactivePreAction) preAction ).reactivePerformPreAction( connection, executionContext )
67+
);
68+
}
69+
70+
@Override
71+
public CompletionStage<Void> reactivePerformPostActions(
72+
boolean succeeded,
73+
ReactiveConnection connection,
74+
ExecutionContext executionContext) {
75+
if ( postActions != null ) {
76+
return loop(
77+
postActions, postAction -> {
78+
if ( succeeded || postAction.shouldRunAfterFail() ) {
79+
return ( (ReactivePostAction) postAction ).reactivePerformReactivePostAction(
80+
connection,
81+
executionContext
82+
);
83+
}
84+
return nullFuture();
85+
}
86+
).thenAccept( unused -> {
87+
if ( loadedValuesCollector != null ) {
88+
loadedValuesCollector.clear();
89+
}
90+
} );
91+
}
92+
else {
93+
if ( loadedValuesCollector != null ) {
94+
loadedValuesCollector.clear();
95+
}
96+
return nullFuture();
97+
}
98+
}
99+
100+
public static class Builder implements JdbcSelectWithActionsBuilder {
101+
private JdbcOperationQuerySelect primaryAction;
102+
private LoadedValuesCollector loadedValuesCollector;
103+
protected List<PreAction> preActions;
104+
protected List<PostAction> postActions;
105+
protected LockTimeoutType lockTimeoutType;
106+
protected LockingSupport lockingSupport;
107+
protected LockOptions lockOptions;
108+
protected QuerySpec lockingTarget;
109+
protected LockingClauseStrategy lockingClauseStrategy;
110+
boolean isFollonOnLockStrategy;
111+
112+
@Override
113+
public Builder setPrimaryAction(JdbcSelect primaryAction) {
114+
assert primaryAction instanceof JdbcOperationQuerySelect;
115+
this.primaryAction = (JdbcOperationQuerySelect) primaryAction;
116+
return this;
117+
}
118+
119+
@SuppressWarnings("UnusedReturnValue")
120+
@Override
121+
public Builder setLoadedValuesCollector(LoadedValuesCollector loadedValuesCollector) {
122+
this.loadedValuesCollector = loadedValuesCollector;
123+
return this;
124+
}
125+
126+
@Override
127+
public Builder setLockTymeOutType(LockTimeoutType lockTimeoutType) {
128+
this.lockTimeoutType = lockTimeoutType;
129+
return this;
130+
}
131+
132+
@Override
133+
public Builder setLockingSupport(LockingSupport lockingSupport) {
134+
this.lockingSupport = lockingSupport;
135+
return this;
136+
}
137+
138+
@Override
139+
public Builder setLockOptions(LockOptions lockOptions) {
140+
this.lockOptions = lockOptions;
141+
return this;
142+
}
143+
144+
@Override
145+
public Builder setLockingTarget(QuerySpec lockingTarget) {
146+
this.lockingTarget = lockingTarget;
147+
return this;
148+
}
149+
150+
@Override
151+
public Builder setLockingClauseStrategy(LockingClauseStrategy lockingClauseStrategy) {
152+
this.lockingClauseStrategy = lockingClauseStrategy;
153+
return this;
154+
}
155+
156+
@Override
157+
public Builder setIsFollowOnLockStrategy(boolean isFollonOnLockStrategy) {
158+
this.isFollonOnLockStrategy = isFollonOnLockStrategy;
159+
return this;
160+
}
161+
162+
@Override
163+
public JdbcSelect build() {
164+
if ( lockTimeoutType == LockTimeoutType.CONNECTION ) {
165+
addSecondaryActionPair(
166+
new ReactiveLockTimeoutHandler(
167+
lockOptions.getTimeout(),
168+
ReactiveConnectionLockTimeoutStrategyBuilder.build( lockingSupport.getConnectionLockTimeoutStrategy() )
169+
)
170+
);
171+
}
172+
if ( isFollonOnLockStrategy ) {
173+
ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this );
174+
}
175+
else if ( lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS ) {
176+
ReactiveCollectionLockingAction.apply( lockOptions, lockingTarget, this );
177+
}
178+
if ( preActions == null && postActions == null ) {
179+
assert loadedValuesCollector == null;
180+
return primaryAction;
181+
}
182+
final PreAction[] preActions = toPreActionArray( this.preActions );
183+
final PostAction[] postActions = toPostActionArray( this.postActions );
184+
return new ReactiveJdbcSelectWithActions( primaryAction, loadedValuesCollector, preActions, postActions );
185+
}
186+
187+
/**
188+
* Appends the {@code actions} to the growing list of pre-actions,
189+
* executed (in order) after all currently registered actions.
190+
*
191+
* @return {@code this}, for method chaining.
192+
*/
193+
@Override
194+
public Builder appendPreAction(PreAction... actions) {
195+
if ( preActions == null ) {
196+
preActions = new ArrayList<>();
197+
}
198+
Collections.addAll( preActions, actions );
199+
return this;
200+
}
201+
202+
/**
203+
* Prepends the {@code actions} to the growing list of pre-actions
204+
*
205+
* @return {@code this}, for method chaining.
206+
*/
207+
@Override
208+
public Builder prependPreAction(PreAction... actions) {
209+
if ( preActions == null ) {
210+
preActions = new ArrayList<>();
211+
}
212+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
213+
Collections.addAll( preActions, actions );
214+
return this;
215+
}
216+
217+
/**
218+
* Appends the {@code actions} to the growing list of post-actions
219+
*
220+
* @return {@code this}, for method chaining.
221+
*/
222+
@Override
223+
public Builder appendPostAction(PostAction... actions) {
224+
if ( postActions == null ) {
225+
postActions = new ArrayList<>();
226+
}
227+
Collections.addAll( postActions, actions );
228+
return this;
229+
}
230+
231+
/**
232+
* Prepends the {@code actions} to the growing list of post-actions
233+
*
234+
* @return {@code this}, for method chaining.
235+
*/
236+
@Override
237+
public Builder prependPostAction(PostAction... actions) {
238+
if ( postActions == null ) {
239+
postActions = new ArrayList<>();
240+
}
241+
// todo (DatabaseOperation) : should we invert the order of the incoming actions?
242+
Collections.addAll( postActions, actions );
243+
return this;
244+
}
245+
246+
/**
247+
* Adds a secondary action pair.
248+
* Assumes the {@code action} implements both {@linkplain PreAction} and {@linkplain PostAction}.
249+
*
250+
* @return {@code this}, for method chaining.
251+
*
252+
* @apiNote Prefer {@linkplain #addSecondaryActionPair(PreAction, PostAction)} to avoid
253+
* the casts needed here.
254+
* @see #prependPreAction
255+
* @see #appendPostAction
256+
*/
257+
@Override
258+
public Builder addSecondaryActionPair(SecondaryAction action) {
259+
return addSecondaryActionPair( (PreAction) action, (PostAction) action );
260+
}
261+
262+
/**
263+
* Adds a PreAction/PostAction pair.
264+
*
265+
* @return {@code this}, for method chaining.
266+
*
267+
* @see #prependPreAction
268+
* @see #appendPostAction
269+
*/
270+
@Override
271+
public Builder addSecondaryActionPair(PreAction preAction, PostAction postAction) {
272+
prependPreAction( preAction );
273+
appendPostAction( postAction );
274+
return this;
275+
}
276+
277+
static PreAction[] toPreActionArray(List<PreAction> actions) {
278+
if ( CollectionHelper.isEmpty( actions ) ) {
279+
return null;
280+
}
281+
return actions.toArray( new PreAction[0] );
282+
}
283+
284+
static PostAction[] toPostActionArray(List<PostAction> actions) {
285+
if ( CollectionHelper.isEmpty( actions ) ) {
286+
return null;
287+
}
288+
return actions.toArray( new PostAction[0] );
289+
}
290+
291+
}
292+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.hibernate.query.TupleTransformer;
2323
import org.hibernate.reactive.engine.impl.ReactivePersistenceContextAdapter;
2424
import org.hibernate.query.spi.QueryOptions;
25+
import org.hibernate.reactive.pool.ReactiveConnection;
26+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
27+
import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect;
2528
import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState;
2629
import org.hibernate.reactive.sql.exec.spi.ReactiveSelectExecutor;
2730
import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet;
@@ -220,10 +223,15 @@ public boolean shouldReturnProxies() {
220223
};
221224

222225
final JdbcValuesSourceProcessingStateStandardImpl valuesProcessingState =
223-
new JdbcValuesSourceProcessingStateStandardImpl( executionContext, processingOptions );
226+
new JdbcValuesSourceProcessingStateStandardImpl(
227+
jdbcSelect.getLoadedValuesCollector(),
228+
processingOptions,
229+
executionContext
230+
);
224231

232+
final SharedSessionContractImplementor session = executionContext.getSession();
225233
final ReactiveRowReader<R> rowReader = ReactiveResultsHelper.createRowReader(
226-
executionContext.getSession().getSessionFactory(),
234+
session.getSessionFactory(),
227235
rowTransformer,
228236
domainResultType,
229237
jdbcValues
@@ -237,21 +245,34 @@ public boolean shouldReturnProxies() {
237245
);
238246

239247
rowReader.startLoading( rowProcessingState );
240-
241-
return resultsConsumer
242-
.consume(
243-
jdbcValues,
244-
executionContext.getSession(),
245-
processingOptions,
246-
valuesProcessingState,
247-
rowProcessingState,
248-
rowReader
249-
)
250-
.thenApply( result -> {
251-
statistics.end( jdbcSelect, result );
252-
return result;
253-
} );
254-
} );
248+
ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection();
249+
if ( jdbcSelect instanceof ReactiveJdbcSelect reactiveJdbcSelect ) {
250+
return reactiveJdbcSelect.reactivePerformPreActions( reactiveConnection, executionContext )
251+
.thenCompose( unused -> resultsConsumer
252+
.consume(
253+
jdbcValues,
254+
session,
255+
processingOptions,
256+
valuesProcessingState,
257+
rowProcessingState,
258+
rowReader
259+
) )
260+
.thenCompose( result -> reactiveJdbcSelect
261+
.reactivePerformPostActions( true, reactiveConnection, executionContext )
262+
.thenApply( v -> {
263+
statistics.end( jdbcSelect, result );
264+
return result;
265+
} )
266+
);
267+
}
268+
else {
269+
return resultsConsumer.consume( jdbcValues, session, processingOptions, valuesProcessingState, rowProcessingState, rowReader )
270+
.thenApply( result -> {
271+
statistics.end( jdbcSelect, result );
272+
return result;
273+
} );
274+
}
275+
});
255276
}
256277

257278
private static <R> RowTransformer<R> rowTransformer(

0 commit comments

Comments
 (0)