diff --git a/gradle.properties b/gradle.properties index 7a33aa914..2d873a973 100644 --- a/gradle.properties +++ b/gradle.properties @@ -40,7 +40,7 @@ org.gradle.java.installations.auto-download=false #db = MSSQL # Enable the maven Central Snapshot repository, when set to any value (the value is ignored) -#enableCentralSonatypeSnapshotsRep = true +enableCentralSonatypeSnapshotsRep = true # Enable the maven local repository (for local development when needed) when present (value ignored) #enableMavenLocalRepo = true @@ -48,14 +48,14 @@ org.gradle.java.installations.auto-download=false ### Settings the following properties will override the version defined in gradle/libs.versions.toml # The default Hibernate ORM version (override using `-PhibernateOrmVersion=the.version.you.want`) -#hibernateOrmVersion = 7.1.1.Final +hibernateOrmVersion = 7.2.+ # Override default Hibernate ORM Gradle plugin version -#hibernateOrmGradlePluginVersion = 7.1.1.Final +hibernateOrmGradlePluginVersion = 7.2.+ # If set to true, skip Hibernate ORM version parsing (default is true, if set to null) # this is required when using intervals or weird versions or the build will fail -#skipOrmVersionParsing = true +skipOrmVersionParsing = true # Override default Vert.x Sql client version #vertxSqlClientVersion = 5.0.2-SNAPSHOT diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/NoJdbcEnvironmentInitiator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/NoJdbcEnvironmentInitiator.java index 2d5209127..adf7b403c 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/NoJdbcEnvironmentInitiator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/NoJdbcEnvironmentInitiator.java @@ -10,6 +10,7 @@ import org.hibernate.engine.jdbc.connections.spi.DatabaseConnectionInfo; import org.hibernate.engine.jdbc.dialect.spi.DialectFactory; import org.hibernate.engine.jdbc.dialect.spi.DialectResolutionInfo; +import org.hibernate.engine.jdbc.env.JdbcMetadataOnBoot; import org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentImpl; import org.hibernate.engine.jdbc.env.internal.JdbcEnvironmentInitiator; import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment; @@ -79,6 +80,7 @@ protected JdbcEnvironmentImpl getJdbcEnvironmentWithDefaults( @Override protected JdbcEnvironmentImpl getJdbcEnvironmentUsingJdbcMetadata( + JdbcMetadataOnBoot jdbcMetadataAccess, Map configurationValues, ServiceRegistryImplementor registry, DialectFactory dialectFactory, diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java index f8029e297..19d34d410 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java @@ -13,8 +13,10 @@ import org.hibernate.reactive.boot.spi.ReactiveMetadataImplementor; import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl; +import org.hibernate.reactive.sql.exec.internal.ReactiveJdbcSelectWithActions; import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.stage.impl.StageSessionFactoryImpl; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; /** * A Hibernate {@link org.hibernate.SessionFactory} that can be @@ -42,4 +44,9 @@ public T unwrap(Class type) { } return super.unwrap( type ); } + + public JdbcSelectWithActionsBuilder getJdbcSelectWithActionsBuilder(){ + return new ReactiveJdbcSelectWithActions.Builder(); + } + } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java new file mode 100644 index 000000000..197474287 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java @@ -0,0 +1,291 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal; + +import org.hibernate.LockOptions; +import org.hibernate.dialect.lock.spi.LockTimeoutType; +import org.hibernate.dialect.lock.spi.LockingSupport; +import org.hibernate.internal.util.collections.CollectionHelper; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler; +import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.reactive.sql.exec.spi.ReactivePreAction; +import org.hibernate.sql.ast.spi.LockingClauseStrategy; +import org.hibernate.sql.ast.tree.select.QuerySpec; +import org.hibernate.sql.exec.internal.JdbcOperationQuerySelect; +import org.hibernate.sql.exec.internal.JdbcSelectWithActions; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelect; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; +import org.hibernate.sql.exec.spi.LoadedValuesCollector; +import org.hibernate.sql.exec.spi.PostAction; +import org.hibernate.sql.exec.spi.PreAction; +import org.hibernate.sql.exec.spi.SecondaryAction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.reactive.util.impl.CompletionStages.loop; +import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; + +/** + * Reactive version of {@link JdbcSelectWithActions} + */ +public class ReactiveJdbcSelectWithActions extends JdbcSelectWithActions implements ReactiveJdbcSelect { + + public ReactiveJdbcSelectWithActions( + JdbcOperationQuerySelect primaryOperation, + LoadedValuesCollector loadedValuesCollector, + PreAction[] preActions, + PostAction[] postActions) { + super( primaryOperation, loadedValuesCollector, preActions, postActions ); + } + + public ReactiveJdbcSelectWithActions( + JdbcOperationQuerySelect primaryAction, + LoadedValuesCollector loadedValuesCollector) { + super( primaryAction, loadedValuesCollector ); + } + + @Override + public CompletionStage reactivePerformPreActions( + ReactiveConnection connection, + ExecutionContext executionContext) { + if ( preActions == null ) { + return nullFuture(); + } + + return loop( preActions, preAction -> + ( (ReactivePreAction) preAction ).reactivePerformPreAction( connection, executionContext ) + ); + } + + @Override + public CompletionStage reactivePerformPostActions( + boolean succeeded, + ReactiveConnection connection, + ExecutionContext executionContext) { + if ( postActions != null ) { + return loop( + postActions, postAction -> { + if ( succeeded || postAction.shouldRunAfterFail() ) { + return ( (ReactivePostAction) postAction ).reactivePerformReactivePostAction( + connection, + executionContext + ); + } + return nullFuture(); + } + ).thenAccept( unused -> { + if ( loadedValuesCollector != null ) { + loadedValuesCollector.clear(); + } + } ); + } + else { + if ( loadedValuesCollector != null ) { + loadedValuesCollector.clear(); + } + return nullFuture(); + } + } + + public static class Builder implements JdbcSelectWithActionsBuilder { + private JdbcOperationQuerySelect primaryAction; + private LoadedValuesCollector loadedValuesCollector; + protected List preActions; + protected List postActions; + protected LockTimeoutType lockTimeoutType; + protected LockingSupport lockingSupport; + protected LockOptions lockOptions; + protected QuerySpec lockingTarget; + protected LockingClauseStrategy lockingClauseStrategy; + boolean isFollonOnLockStrategy; + + @Override + public Builder setPrimaryAction(JdbcSelect primaryAction) { + assert primaryAction instanceof JdbcOperationQuerySelect; + this.primaryAction = (JdbcOperationQuerySelect) primaryAction; + return this; + } + + @SuppressWarnings("UnusedReturnValue") + @Override + public Builder setLoadedValuesCollector(LoadedValuesCollector loadedValuesCollector) { + this.loadedValuesCollector = loadedValuesCollector; + return this; + } + + @Override + public Builder setLockTimeoutType(LockTimeoutType lockTimeoutType) { + this.lockTimeoutType = lockTimeoutType; + return this; + } + + @Override + public Builder setLockingSupport(LockingSupport lockingSupport) { + this.lockingSupport = lockingSupport; + return this; + } + + @Override + public Builder setLockOptions(LockOptions lockOptions) { + this.lockOptions = lockOptions; + return this; + } + + @Override + public Builder setLockingTarget(QuerySpec lockingTarget) { + this.lockingTarget = lockingTarget; + return this; + } + + @Override + public Builder setLockingClauseStrategy(LockingClauseStrategy lockingClauseStrategy) { + this.lockingClauseStrategy = lockingClauseStrategy; + return this; + } + + @Override + public Builder setIsFollowOnLockStrategy(boolean isFollonOnLockStrategy) { + this.isFollonOnLockStrategy = isFollonOnLockStrategy; + return this; + } + + @Override + public JdbcSelect build() { + if ( lockTimeoutType == LockTimeoutType.CONNECTION ) { + addSecondaryActionPair( + new ReactiveLockTimeoutHandler( + lockOptions.getTimeout(), + ReactiveConnectionLockTimeoutStrategyBuilder.build( lockingSupport.getConnectionLockTimeoutStrategy() ) + ) + ); + } + if ( isFollonOnLockStrategy ) { + ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this ); + } + + if ( preActions == null && postActions == null ) { + assert loadedValuesCollector == null; + return primaryAction; + } + final PreAction[] preActions = toPreActionArray( this.preActions ); + final PostAction[] postActions = toPostActionArray( this.postActions ); + return new ReactiveJdbcSelectWithActions( primaryAction, loadedValuesCollector, preActions, postActions ); + } + + /** + * Appends the {@code actions} to the growing list of pre-actions, + * executed (in order) after all currently registered actions. + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder appendPreAction(PreAction... actions) { + if ( preActions == null ) { + preActions = new ArrayList<>(); + } + Collections.addAll( preActions, actions ); + return this; + } + + /** + * Prepends the {@code actions} to the growing list of pre-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder prependPreAction(PreAction... actions) { + if ( preActions == null ) { + preActions = new ArrayList<>(); + } + // todo (DatabaseOperation) : should we invert the order of the incoming actions? + Collections.addAll( preActions, actions ); + return this; + } + + /** + * Appends the {@code actions} to the growing list of post-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder appendPostAction(PostAction... actions) { + if ( postActions == null ) { + postActions = new ArrayList<>(); + } + Collections.addAll( postActions, actions ); + return this; + } + + /** + * Prepends the {@code actions} to the growing list of post-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder prependPostAction(PostAction... actions) { + if ( postActions == null ) { + postActions = new ArrayList<>(); + } + // todo (DatabaseOperation) : should we invert the order of the incoming actions? + Collections.addAll( postActions, actions ); + return this; + } + + /** + * Adds a secondary action pair. + * Assumes the {@code action} implements both {@linkplain PreAction} and {@linkplain PostAction}. + * + * @return {@code this}, for method chaining. + * + * @apiNote Prefer {@linkplain #addSecondaryActionPair(PreAction, PostAction)} to avoid + * the casts needed here. + * @see #prependPreAction + * @see #appendPostAction + */ + @Override + public Builder addSecondaryActionPair(SecondaryAction action) { + return addSecondaryActionPair( (PreAction) action, (PostAction) action ); + } + + /** + * Adds a PreAction/PostAction pair. + * + * @return {@code this}, for method chaining. + * + * @see #prependPreAction + * @see #appendPostAction + */ + @Override + public Builder addSecondaryActionPair(PreAction preAction, PostAction postAction) { + prependPreAction( preAction ); + appendPostAction( postAction ); + return this; + } + + static PreAction[] toPreActionArray(List actions) { + if ( CollectionHelper.isEmpty( actions ) ) { + return null; + } + return actions.toArray( new PreAction[0] ); + } + + static PostAction[] toPostActionArray(List actions) { + if ( CollectionHelper.isEmpty( actions ) ) { + return null; + } + return actions.toArray( new PostAction[0] ); + } + + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java index edadd83e9..338dd21df 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java @@ -22,6 +22,9 @@ import org.hibernate.query.TupleTransformer; import org.hibernate.reactive.engine.impl.ReactivePersistenceContextAdapter; import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect; import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState; import org.hibernate.reactive.sql.exec.spi.ReactiveSelectExecutor; import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet; @@ -220,10 +223,15 @@ public boolean shouldReturnProxies() { }; final JdbcValuesSourceProcessingStateStandardImpl valuesProcessingState = - new JdbcValuesSourceProcessingStateStandardImpl( executionContext, processingOptions ); + new JdbcValuesSourceProcessingStateStandardImpl( + jdbcSelect.getLoadedValuesCollector(), + processingOptions, + executionContext + ); + final SharedSessionContractImplementor session = executionContext.getSession(); final ReactiveRowReader rowReader = ReactiveResultsHelper.createRowReader( - executionContext.getSession().getSessionFactory(), + session.getSessionFactory(), rowTransformer, domainResultType, jdbcValues @@ -237,21 +245,34 @@ public boolean shouldReturnProxies() { ); rowReader.startLoading( rowProcessingState ); - - return resultsConsumer - .consume( - jdbcValues, - executionContext.getSession(), - processingOptions, - valuesProcessingState, - rowProcessingState, - rowReader - ) - .thenApply( result -> { - statistics.end( jdbcSelect, result ); - return result; - } ); - } ); + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + if ( jdbcSelect instanceof ReactiveJdbcSelect reactiveJdbcSelect ) { + return reactiveJdbcSelect.reactivePerformPreActions( reactiveConnection, executionContext ) + .thenCompose( unused -> resultsConsumer + .consume( + jdbcValues, + session, + processingOptions, + valuesProcessingState, + rowProcessingState, + rowReader + ) ) + .thenCompose( result -> reactiveJdbcSelect + .reactivePerformPostActions( true, reactiveConnection, executionContext ) + .thenApply( v -> { + statistics.end( jdbcSelect, result ); + return result; + } ) + ); + } + else { + return resultsConsumer.consume( jdbcValues, session, processingOptions, valuesProcessingState, rowProcessingState, rowReader ) + .thenApply( result -> { + statistics.end( jdbcSelect, result ); + return result; + } ); + } + }); } private static RowTransformer rowTransformer( diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java new file mode 100644 index 000000000..a4db6a44c --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java @@ -0,0 +1,80 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.Helper} + */ +public class LockHelper { + + public static CompletionStage getLockTimeout( + String sql, + TimeoutExtractor extractor, + ReactiveConnection connection) { + return connection.select( sql ) + .thenCompose( resultset -> { + if ( !resultset.hasNext() ) { + throw new HibernateException( + "Unable to query JDBC Connection for current lock-timeout setting (no result)" ); + + } + return extractor.extractFrom( resultset ); + } ); + + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout using the given {@code sql} command. + */ + public static CompletionStage setLockTimeout( + String sql, + ReactiveConnection connection) { + return connection.execute( sql ); + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout using + * the given {@code sqlFormat} (with a single format placeholder + * for the {@code milliseconds} value). + * + * @see #setLockTimeout(String, ReactiveConnection) + */ + public static CompletionStage setLockTimeout( + Integer milliseconds, + String sqlFormat, + ReactiveConnection connection) { + final String sql = String.format( sqlFormat, milliseconds ); + return setLockTimeout( sql, connection ); + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout. The passed + * {@code valueStrategy} is used to interpret the {@code timeout} + * which is then used with {@code sqlFormat} to execute the command. + * + * @see #setLockTimeout(Integer, String, ReactiveConnection) + */ + public static CompletionStage setLockTimeout( + Timeout timeout, + Function valueStrategy, + String sqlFormat, + ReactiveConnection connection) { + final int milliseconds = valueStrategy.apply( timeout ); + return setLockTimeout( milliseconds, sqlFormat, connection ); + } + + @FunctionalInterface + public interface TimeoutExtractor { + CompletionStage extractFrom(ReactiveConnection.Result resultSet); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..35d30fbe3 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,74 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT_MILLI; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER; +import static org.hibernate.Timeouts.WAIT_FOREVER_MILLI; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +public class ReactiveCockroachConnectionLockTimeoutStrategyImpl + implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveCockroachConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveCockroachConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.SUPPORTED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "show lock_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + int millis = Integer.parseInt( (String) resultSet.next()[0] ); + return switch ( millis ) { + case 0 -> completedFuture( WAIT_FOREVER ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == NO_WAIT_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept no-wait" ); + } + return milliseconds == WAIT_FOREVER_MILLI + ? 0 + : milliseconds; + }, + "set lock_timeout = %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java new file mode 100644 index 000000000..8e8194a14 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java @@ -0,0 +1,45 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link ConnectionLockTimeoutStrategy} + */ +public interface ReactiveConnectionLockTimeoutStrategy extends ConnectionLockTimeoutStrategy { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default Timeout getLockTimeout(Connection connection, SessionFactoryImplementor factory) { + throw LOG.nonReactiveMethodCall( "getReactiveLockTimeout()" ); + } + + @Override + default void setLockTimeout(Timeout timeout, Connection connection, SessionFactoryImplementor factory) { + throw LOG.nonReactiveMethodCall( "setReactiveLockTimeout()" ); + } + + default CompletionStage getReactiveLockTimeout(ReactiveConnection connection, SessionFactoryImplementor factory){ + throw new UnsupportedOperationException( "Lock timeout on the connection is not supported" ); + } + + default CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory){ + throw new UnsupportedOperationException( "Lock timeout on the connection is not supported" ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java new file mode 100644 index 000000000..7619047e8 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java @@ -0,0 +1,36 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.dialect.lock.internal.CockroachLockingSupport; +import org.hibernate.dialect.lock.internal.MySQLLockingSupport; +import org.hibernate.dialect.lock.internal.PostgreSQLLockingSupport; +import org.hibernate.dialect.lock.internal.TransactSQLLockingSupport; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; + +/** + * Builder to create {@link ReactiveConnectionLockTimeoutStrategy} equivalents of {@link ConnectionLockTimeoutStrategy} + */ +public class ReactiveConnectionLockTimeoutStrategyBuilder { + public static ReactiveConnectionLockTimeoutStrategy build(ConnectionLockTimeoutStrategy strategy) { + if ( strategy instanceof MySQLLockingSupport.ConnectionLockTimeoutStrategyImpl ) { + return ReactiveMySQLConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof CockroachLockingSupport ) { + return ReactiveCockroachConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof TransactSQLLockingSupport.SQLServerImpl ) { + return ReactiveSQLServerConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof PostgreSQLLockingSupport ) { + return ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else { + throw new IllegalArgumentException( "Unsupported ConnectionLockTimeoutStrategy: " + strategy.getClass() + .getName() ); + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java new file mode 100644 index 000000000..654c50924 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java @@ -0,0 +1,139 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.Locking; +import org.hibernate.engine.spi.CollectionKey; +import org.hibernate.engine.spi.EntityKey; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.metamodel.mapping.PluralAttributeMapping; +import org.hibernate.metamodel.mapping.TableDetails; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.impl.ReactiveSessionImpl; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.sql.ast.spi.LockingClauseStrategy; +import org.hibernate.sql.ast.tree.select.QuerySpec; +import org.hibernate.sql.exec.internal.lock.FollowOnLockingAction; +import org.hibernate.sql.exec.internal.lock.LockingHelper; +import org.hibernate.sql.exec.internal.lock.TableLock; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; + +import jakarta.persistence.Timeout; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import static java.util.Collections.emptyMap; +import static org.hibernate.reactive.util.impl.CompletionStages.loop; + +/** + * Reactive version of {@link FollowOnLockingAction} + */ +public class ReactiveFollowOnLockingAction extends FollowOnLockingAction implements ReactivePostAction { + protected ReactiveFollowOnLockingAction( + LoadedValuesCollectorImpl loadedValuesCollector, + LockMode lockMode, + Timeout lockTimeout, + Locking.Scope lockScope) { + super( loadedValuesCollector, lockMode, lockTimeout, lockScope ); + } + + public static void apply( + LockOptions lockOptions, + QuerySpec lockingTarget, + LockingClauseStrategy lockingClauseStrategy, + JdbcSelectWithActionsBuilder jdbcSelectBuilder) { + final var fromClause = lockingTarget.getFromClause(); + final var loadedValuesCollector = resolveLoadedValuesCollector( fromClause, lockingClauseStrategy ); + + // NOTE: we need to set this separately so that it can get incorporated into + // the JdbcValuesSourceProcessingState for proper callbacks + jdbcSelectBuilder.setLoadedValuesCollector( loadedValuesCollector ); + + // additionally, add a post-action which uses the collected values. + jdbcSelectBuilder.appendPostAction( new ReactiveFollowOnLockingAction( + loadedValuesCollector, + lockOptions.getLockMode(), + lockOptions.getTimeout(), + lockOptions.getScope() + ) ); + } + + + @Override + public CompletionStage reactivePerformReactivePostAction( + ReactiveConnection jdbcConnection, + ExecutionContext executionContext) { + LockingHelper.logLoadedValues( loadedValuesCollector ); + + final var session = executionContext.getSession(); + + // NOTE: we deal with effective graphs here to make sure embedded associations are treated as lazy + final var effectiveEntityGraph = session.getLoadQueryInfluencers().getEffectiveEntityGraph(); + final var initialGraph = effectiveEntityGraph.getGraph(); + final var initialSemantic = effectiveEntityGraph.getSemantic(); + + // collect registrations by entity type + final var entitySegments = segmentLoadedValues(); + final Map>> collectionSegments = + lockScope == Locking.Scope.INCLUDE_FETCHES + ? segmentLoadedCollections() + : emptyMap(); + + // for each entity-type, prepare a locking select statement per table. + // this is based on the attributes for "state array" ordering purposes - + // we match each attribute to the table it is mapped to and add it to + // the select-list for that table-segment. + + return loop( + entitySegments.keySet().iterator(), (entityMappingType, index) -> { + List entityKeys = entitySegments.get( entityMappingType ); + final var tableLocks = prepareTableLocks( entityMappingType, entityKeys, session ); + + // create a cross-reference of information related to an entity based on its identifier, + // we'll use this later when we adjust the state array and inject state into the entity instance. + final var entityDetailsMap = LockingHelper.resolveEntityKeys( entityKeys, executionContext ); + + // at this point, we have all the individual locking selects ready to go - execute them + final var lockingOptions = buildLockingOptions( + tableLocks, + entityDetailsMap, + entityMappingType, + effectiveEntityGraph, + entityKeys, + collectionSegments, + session, + executionContext + ); + return loop( tableLocks.values().iterator(), (tableLock, i) -> + ( (ReactiveTableLock) tableLock ).reactivePerformActions( + entityDetailsMap, + lockingOptions, + (ReactiveSessionImpl) session + ) + ); + + } + ).whenComplete( (unused, throwable) -> { + effectiveEntityGraph.clear(); + session.getLoadQueryInfluencers().applyEntityGraph( initialGraph, initialSemantic ); + } ); + + } + + @Override + protected TableLock createTableLock( + TableDetails tableDetails, + EntityMappingType entityMappingType, + List entityKeys, + SharedSessionContractImplementor session) { + return new ReactiveTableLock( tableDetails, entityMappingType, entityKeys, session ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java new file mode 100644 index 000000000..7780166bd --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java @@ -0,0 +1,66 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.reactive.sql.exec.spi.ReactivePreAction; +import org.hibernate.sql.exec.spi.ExecutionContext; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link org.hibernate.sql.exec.internal.LockTimeoutHandler} + */ +public class ReactiveLockTimeoutHandler implements ReactivePreAction, ReactivePostAction { + private final ReactiveConnectionLockTimeoutStrategy lockTimeoutStrategy; + private final Timeout timeout; + + private Timeout baseline; + private boolean setTimeout; + + public ReactiveLockTimeoutHandler(Timeout timeout, ReactiveConnectionLockTimeoutStrategy lockTimeoutStrategy) { + this.timeout = timeout; + this.lockTimeoutStrategy = lockTimeoutStrategy; + } + + + @Override + public CompletionStage reactivePerformPreAction( + ReactiveConnection connection, + ExecutionContext executionContext) { + final var factory = executionContext.getSession().getFactory(); + + // first, get the baseline (for post-action) + return lockTimeoutStrategy.getReactiveLockTimeout( connection, factory ) + .thenCompose( baseline -> { + this.baseline = baseline; + // now set the timeout + return lockTimeoutStrategy.setReactiveLockTimeout( timeout, connection, factory ); + } ) + .thenAccept( unused -> setTimeout = true ); + + } + + @Override + public CompletionStage reactivePerformReactivePostAction( + ReactiveConnection connection, + ExecutionContext executionContext) { + final var factory = executionContext.getSession().getFactory(); + + // reset the timeout + return lockTimeoutStrategy.setReactiveLockTimeout(baseline, connection,factory ); + } + + + + @Override + public boolean shouldRunAfterFail() { + // if we set the timeout in the pre-action, we should always reset it in post-action + return setTimeout; + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..4444233f7 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,71 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.Timeouts; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER_MILLI; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.MySQLLockingSupport.ConnectionLockTimeoutStrategyImpl} + */ +public class ReactiveMySQLConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveMySQLConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveMySQLConnectionLockTimeoutStrategyImpl(); + + @Override + public ConnectionLockTimeoutStrategy.Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.EXTENDED; + } + + @Override + public CompletionStage getReactiveLockTimeout( ReactiveConnection connection, SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "SELECT @@SESSION.innodb_lock_wait_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int millis = (int) resultSet.next()[0]; + return switch ( millis ) { + case 0 -> completedFuture( Timeouts.NO_WAIT ); + case 100000000 -> completedFuture( Timeouts.WAIT_FOREVER ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == WAIT_FOREVER_MILLI ) { + return 100000000; + } + return milliseconds; + }, + "SET @@SESSION.innodb_lock_wait_timeout = %s", + connection + ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..99bc84d34 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,78 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.Timeouts; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT_MILLI; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER_MILLI; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.PostgreSQLLockingSupport} + */ +public class ReactivePostgreSQLConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactivePostgreSQLConnectionLockTimeoutStrategyImpl INSTANCE = new ReactivePostgreSQLConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.SUPPORTED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "select current_setting('lock_timeout', true)", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final String value = (String) resultSet.next()[0]; + if ( "0".equals( value ) ) { + return completedFuture( Timeouts.WAIT_FOREVER ); + } + assert value.endsWith( "s" ); + final int secondsValue = Integer.parseInt( value.substring( 0, value.length() - 1 ) ); + return completedFuture( Timeout.seconds( secondsValue ) ); + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == NO_WAIT_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept no-wait" ); + } + return milliseconds == WAIT_FOREVER_MILLI + ? 0 + : milliseconds; + }, + "set lock_timeout = %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..4d8c9a156 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,71 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.TransactSQLLockingSupport.SQLServerImpl} + */ +public class ReactiveSQLServerConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveSQLServerConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveSQLServerConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.EXTENDED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "select @@lock_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int millis = (int) resultSet.next()[0]; + return switch ( millis ) { + case -1 -> completedFuture( WAIT_FOREVER ); + case 0 -> completedFuture( NO_WAIT ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + return milliseconds; + }, + "set lock_timeout %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java new file mode 100644 index 000000000..3415da415 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java @@ -0,0 +1,194 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.AssertionFailure; +import org.hibernate.engine.spi.EntityKey; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.metamodel.mapping.AttributeMapping; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.metamodel.mapping.ForeignKeyDescriptor; +import org.hibernate.metamodel.mapping.TableDetails; +import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping; +import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.session.impl.ReactiveSessionImpl; +import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor; +import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.sql.ast.tree.select.SelectStatement; +import org.hibernate.sql.exec.internal.BaseExecutionContext; +import org.hibernate.sql.exec.internal.lock.EntityDetails; +import org.hibernate.sql.exec.internal.lock.TableLock; +import org.hibernate.sql.results.graph.DomainResult; + +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Hibernate.isEmpty; +import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; + +/** + * Reactive version of {@link TableLock} + */ +public class ReactiveTableLock extends TableLock { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + public ReactiveTableLock( + TableDetails tableDetails, + EntityMappingType entityMappingType, + List entityKeys, + SharedSessionContractImplementor session) { + super( tableDetails, entityMappingType, entityKeys, session ); + } + + @Override + public void applyAttribute(int index, AttributeMapping attributeMapping) { + final var attributePath = rootPath.append( attributeMapping.getPartName() ); + final DomainResult domainResult; + final ResultHandler resultHandler; + if ( attributeMapping instanceof ToOneAttributeMapping toOne ) { + domainResult = + toOne.getForeignKeyDescriptor().getKeyPart() + .createDomainResult( + attributePath, + logicalTableGroup, + ForeignKeyDescriptor.PART_NAME, + creationStates + ); + resultHandler = new ReactiveToOneResultHandler( index, toOne ); + } + else { + domainResult = + attributeMapping.createDomainResult( + attributePath, + logicalTableGroup, + null, + creationStates + ); + resultHandler = new ReactiveNonToOneResultHandler( index ); + } + domainResults.add( domainResult ); + resultHandlers.add( resultHandler ); + } + + @Override + public void performActions( + Map entityDetailsMap, + QueryOptions lockingQueryOptions, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "reactivePerformActions()" ); + } + + public CompletionStage reactivePerformActions( + Map entityDetailsMap, + QueryOptions lockingQueryOptions, + ReactiveSessionImpl session) { + final var sessionFactory = session.getSessionFactory(); + final var jdbcServices = sessionFactory.getJdbcServices(); + final var selectStatement = new SelectStatement( querySpec, domainResults ); + + return StandardReactiveSelectExecutor.INSTANCE + .list( + jdbcServices.getDialect().getSqlAstTranslatorFactory() + .buildSelectTranslator( sessionFactory, selectStatement ) + .translate( jdbcParameterBindings, lockingQueryOptions ), + jdbcParameterBindings, + // IMPORTANT: we need a "clean" ExecutionContext to not further apply locking + new BaseExecutionContext( session ), + row -> row, + Object[].class, + ReactiveListResultsConsumer.UniqueSemantic.ALLOW + ).thenCompose( results -> { + if ( isEmpty( results ) ) { + throw new AssertionFailure( "Expecting results" ); + } + return CompletionStages.loop( results, row -> { + final var entityDetails = entityDetailsMap.get( row[0] ); + return CompletionStages.loop(resultHandlers.iterator(), (resultHandler, i) -> { + // offset 1 because of the id at position 0 + return ((ReactiveResulHandler)resultHandlers.get( i )).reactiveApplyResult( row[i + 1], entityDetails, session ); + }); + } ); + + } ); + } + + private interface ReactiveResulHandler extends ResultHandler { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void applyResult( + Object stateValue, + EntityDetails entityDetails, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "reactiveApplyResult()" ); + } + + CompletionStage reactiveApplyResult( + Object stateValue, + EntityDetails entityDetails, + ReactiveSessionImpl session); + } + + protected static class ReactiveNonToOneResultHandler extends NonToOneResultHandler implements ReactiveResulHandler { + + public ReactiveNonToOneResultHandler(Integer statePosition) { + super( statePosition ); + } + + @Override + public CompletionStage reactiveApplyResult( + Object stateValue, + EntityDetails entityDetails, + ReactiveSessionImpl session) { + super.applyResult( stateValue, entityDetails, session ); + return nullFuture(); + } + } + + protected static class ReactiveToOneResultHandler extends ToOneResultHandler implements ReactiveResulHandler { + + public ReactiveToOneResultHandler(Integer statePosition, ToOneAttributeMapping toOne) { + super( statePosition, toOne ); + } + + @Override + public CompletionStage reactiveApplyResult( + Object stateValue, + EntityDetails entityDetails, + ReactiveSessionImpl session) { + final Object reference; + if ( stateValue == null ) { + if ( !toOne.isNullable() ) { + throw new IllegalStateException( "Retrieved key was null, but to-one is not nullable : " + toOne.getNavigableRole() + .getFullPath() ); + } + else { + reference = null; + } + applyLoadedState( entityDetails, statePosition, reference ); + applyModelState( entityDetails, statePosition, reference ); + return nullFuture(); + } + else { + return session.reactiveInternalLoad( + toOne.getAssociatedEntityMappingType().getEntityName(), + stateValue, + false, + toOne.isNullable() + ).thenApply( ref -> { + applyLoadedState( entityDetails, statePosition, ref ); + applyModelState( entityDetails, statePosition, ref ); + return null; + } ); + } + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java new file mode 100644 index 000000000..876296690 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java @@ -0,0 +1,50 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelect; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link JdbcSelect} + */ +public interface ReactiveJdbcSelect extends JdbcSelect { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPreActions( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPreActions()" ); + + } + + @Override + default void performPostAction( + boolean succeeded, + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPostActions()" ); + } + + + CompletionStage reactivePerformPreActions(ReactiveConnection connection, ExecutionContext executionContext); + + CompletionStage reactivePerformPostActions( + boolean succeeded, + ReactiveConnection connection, + ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java new file mode 100644 index 000000000..7a519902a --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java @@ -0,0 +1,36 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.PostAction; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link PostAction} + */ +public interface ReactivePostAction extends PostAction { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPostAction( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPostAction()" ); + } + + CompletionStage reactivePerformReactivePostAction( + ReactiveConnection jdbcConnection, + ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java new file mode 100644 index 000000000..3aba18ca4 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java @@ -0,0 +1,34 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.PreAction; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link PreAction} + */ +public interface ReactivePreAction extends PreAction { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPreAction( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPreAction()" ); + } + + CompletionStage reactivePerformPreAction(ReactiveConnection connection, ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java index 9eb8bb8f8..17fc5cac9 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java @@ -10,13 +10,7 @@ import org.hibernate.cfg.Configuration; import org.hibernate.reactive.testing.SqlStatementTracker; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.vertx.junit5.Timeout; @@ -27,6 +21,10 @@ import jakarta.persistence.LockModeType; import jakarta.persistence.ManyToOne; import jakarta.persistence.OneToMany; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.hibernate.reactive.containers.DatabaseConfiguration.dbType; @@ -84,7 +82,6 @@ context, getMutinySessionFactory() ); } - @Disabled @Test public void testFindUpgradeNoWait(VertxTestContext context) { Child child = new Child( CHILD_ID, "And" ); @@ -97,30 +94,51 @@ context, getMutinySessionFactory() .invoke( c -> { assertThat( c ).isNotNull(); assertThat( c.getId() ).isEqualTo( CHILD_ID ); - String selectQuery = sqlTracker.getLoggedQueries().get( 0 ); - assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); - assertThat( selectQuery ) - .matches( this::noWaitLockingPredicate, "SQL query with nowait lock for " + dbType().name() ); + + assertThatExecutedQueriesContainLock(); } ) ) ) ); } - /** - * @return true if the query contains the expected nowait keyword for the selected database - */ - private boolean noWaitLockingPredicate(String selectQuery) { - return switch ( dbType() ) { - case POSTGRESQL -> selectQuery.endsWith( "for no key update of c1_0 nowait" ); - case COCKROACHDB -> selectQuery.endsWith( "for update of c1_0 nowait" ); - case SQLSERVER -> selectQuery.contains( "with (updlock,holdlock,rowlock,nowait)" ); - case ORACLE -> selectQuery.contains( "for update of c1_0.id nowait" ); + private void assertThatExecutedQueriesContainLock() { + switch ( dbType() ) { + case SQLSERVER -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + assertThat( sqlTracker.getLoggedQueries() + .get( 0 ) ).contains( "with (updlock,holdlock,rowlock,nowait)" ); + } // DB2 does not support nowait - case DB2 -> selectQuery.contains( "for read only with rs use and keep update locks" ); - case MARIA -> selectQuery.contains( "for update nowait" ); - case MYSQL -> selectQuery.contains( "for update of c1_0 nowait" ); + case DB2 -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).contains( + "for read only with rs use and keep update locks" ); + } + case ORACLE -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).contains( "for update of c1_0.id nowait" ); + } + case MARIA -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).contains( "for update nowait" ); + } + case MYSQL -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ).contains( "for update of c1_0 nowait" ); + } + // For PostgresSql and CockroachDb LockStrategy.FOLLOW_ON is applied + // (dialects do not support outer join for update, see org.hibernate.sql.ast.spiAbstractSqlAst#determineLockingStrategy(QuerySpec,Locking.FollowOn)) + // so 2 queries are executed, the first one select the entity and contains the join the second one does not contain the join but contains the lock clause. + case POSTGRESQL -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 2 ); + assertThat( sqlTracker.getLoggedQueries().get( 1 ) ).endsWith( "for no key update of tbl nowait" ); + } + case COCKROACHDB -> { + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 2 ); + assertThat( sqlTracker.getLoggedQueries().get( 1 ) ).endsWith( "for update of tbl nowait" ); + } default -> throw new IllegalArgumentException( "Database not recognized: " + dbType().name() ); - }; + } } @Entity(name = "Parent") diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/LockTimeoutTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/LockTimeoutTest.java new file mode 100644 index 000000000..174f46ed3 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/LockTimeoutTest.java @@ -0,0 +1,205 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.AvailableSettings; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.testing.SqlStatementTracker; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.junit5.VertxTestContext; +import jakarta.persistence.Entity; +import jakarta.persistence.FetchType; +import jakarta.persistence.Id; +import jakarta.persistence.LockModeType; +import jakarta.persistence.ManyToOne; +import jakarta.persistence.OneToMany; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hibernate.reactive.containers.DatabaseConfiguration.dbType; + +public class LockTimeoutTest extends BaseReactiveTest { + private static final Long CHILD_ID = 1L; + + private static SqlStatementTracker sqlTracker; + + @Override + protected Configuration constructConfiguration() { + Configuration configuration = super.constructConfiguration(); + configuration.setProperty( AvailableSettings.JAKARTA_LOCK_TIMEOUT, 1000 ); + // Construct a tracker that collects query statements via the SqlStatementLogger framework. + // Pass in configuration properties to hand off any actual logging properties + sqlTracker = new SqlStatementTracker( LockTimeoutTest::selectQueryFilter, configuration.getProperties() ); + return configuration; + } + + @BeforeEach + public void clearTracker() { + sqlTracker.clear(); + } + + @Override + protected void addServices(StandardServiceRegistryBuilder builder) { + sqlTracker.registerService( builder ); + } + + private static boolean selectQueryFilter(String s) { + return s.toLowerCase().startsWith( "select " ) + || s.toLowerCase( Locale.ROOT ).startsWith( "set" ) + || s.toLowerCase( Locale.ROOT ).startsWith( "show" ); + } + + @Override + protected Collection> annotatedEntities() { + return List.of( Parent.class, Child.class ); + } + + + @Test + public void testLockTimeOut(VertxTestContext context) { + Parent parent = new Parent( 1L, "Lio" ); + Child child = new Child( CHILD_ID, "And" ); + test( + context, getMutinySessionFactory() + .withTransaction( session -> session.persistAll( parent, child ) ) + .chain( () -> getMutinySessionFactory() + .withTransaction( + session -> + session.createQuery( "from Child c", Child.class ) + .setLockMode( LockModeType.PESSIMISTIC_WRITE ) + .getSingleResult().invoke( c -> { + assertThat( c ).isNotNull(); + assertThat( c.getId() ).isEqualTo( CHILD_ID ); + assertTimeoutApplied(); + } ) + ) + ) + ); + } + + /** + * @return true if the query contains the expected the expected timeout statements + */ + private void assertTimeoutApplied() { + List loggedQueries = sqlTracker.getLoggedQueries(); + switch ( dbType() ) { + case POSTGRESQL -> { + assertThat( loggedQueries ).hasSize( 4 ); + assertThat( loggedQueries.get( 0 ).toLowerCase( Locale.ROOT ) ).isEqualTo( + "select current_setting('lock_timeout', true)" ); + assertThat( loggedQueries.get( 1 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout = 1000" ); + assertThat( loggedQueries.get( 3 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout = 0" ); + } + case COCKROACHDB -> { + assertThat( loggedQueries ).hasSize( 4 ); + assertThat( loggedQueries.get( 0 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "show lock_timeout" ); + assertThat( loggedQueries.get( 1 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout = 1000" ); + assertThat( loggedQueries.get( 3 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout = 0" ); + } + case SQLSERVER -> { + assertThat( loggedQueries ).hasSize( 4 ); + assertThat( loggedQueries.get( 0 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "select @@lock_timeout" ); + assertThat( loggedQueries.get( 1 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout 1000" ); + assertThat( loggedQueries.get( 3 ).toLowerCase( Locale.ROOT ) ).isEqualTo( "set lock_timeout -1" ); + } + // it seems ORM has not yet enabled connection lock timeout support for MariaDB/MySQL, see MySQLLockingSupport#getLockTimeout(TimeOut) + case MARIA, MYSQL -> assertThat( loggedQueries ).hasSize( 1 ); +// { +// assertThat( loggedQueries ).hasSize( 4 ); +// assertThat( loggedQueries.get( 0 ).toLowerCase( Locale.ROOT ) ).isEqualTo( +// "SELECT @@SESSION.innodb_lock_wait_timeout" ); +// assertThat( loggedQueries.get( 1 ).toLowerCase( Locale.ROOT ) ).isEqualTo( +// "SET @@SESSION.innodb_lock_wait_timeout = 1000" ); +// assertThat( loggedQueries.get( 3 ).toLowerCase( Locale.ROOT ) ).isEqualTo( +// "SET @@SESSION.innodb_lock_wait_timeout = 0" ); + // Oracle does not support connection lock timeout but only per-query timeouts +// } + case ORACLE -> { + assertThat( loggedQueries ).hasSize( 1 ); + assertThat( loggedQueries.get( 0 ).toLowerCase( Locale.ROOT ) ).contains( "for update of c1_0.id wait 1" ); + } + // DB2 does not support wait timeouts on locks + case DB2 -> assertThat( loggedQueries ).hasSize( 1 ); + default -> throw new IllegalArgumentException( "Database not recognized: " + dbType().name() ); + } + ; + } + + @Entity(name = "Parent") + public static class Parent { + + @Id + private Long id; + + private String name; + + @OneToMany(fetch = FetchType.EAGER) + public List children = new ArrayList<>(); + + public Parent() { + } + + public Parent(Long id, String name) { + this.id = id; + this.name = name; + } + + public void add(Child child) { + children.add( child ); + } + + public Long getId() { + return id; + } + + public String getName() { + return name; + } + + public List getChildren() { + return children; + } + } + + @Entity(name = "Child") + public static class Child { + + @Id + private Long id; + + public String name; + + @ManyToOne(fetch = FetchType.LAZY) + public Parent parent; + + public Child() { + } + + public Child(Long id, String name) { + this.id = id; + this.name = name; + } + + public Long getId() { + return id; + } + + public String getName() { + return name; + } + + public Parent getParent() { + return parent; + } + } +} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/testing/TestingRegistryExtension.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/testing/TestingRegistryExtension.java index d3a0f3528..0a4e82468 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/testing/TestingRegistryExtension.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/testing/TestingRegistryExtension.java @@ -135,6 +135,11 @@ public ServiceBinding locateServiceBinding(Class servi public void destroy() { } + @Override + public boolean isActive() { + return true; + } + @Override public void registerChild(ServiceRegistryImplementor child) { }