Skip to content

Commit 39e55e2

Browse files
committed
Add lockWaitTimeout and statementTimeout options
1 parent e54b69b commit 39e55e2

File tree

6 files changed

+146
-10
lines changed

6 files changed

+146
-10
lines changed

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ public final class MySqlConnectionConfiguration {
101101

102102
private final List<String> sessionVariables;
103103

104+
@Nullable
105+
private final Duration lockWaitTimeout;
106+
107+
@Nullable
108+
private final Duration statementTimeout;
109+
104110
@Nullable
105111
private final Path loadLocalInfilePath;
106112

@@ -130,7 +136,7 @@ private MySqlConnectionConfiguration(
130136
boolean forceConnectionTimeZoneToSession,
131137
String user, @Nullable CharSequence password, @Nullable String database,
132138
boolean createDatabaseIfNotExist, @Nullable Predicate<String> preferPrepareStatement,
133-
List<String> sessionVariables,
139+
List<String> sessionVariables, @Nullable Duration lockWaitTimeout, @Nullable Duration statementTimeout,
134140
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
135141
int queryCacheSize, int prepareCacheSize,
136142
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
@@ -154,6 +160,8 @@ private MySqlConnectionConfiguration(
154160
this.createDatabaseIfNotExist = createDatabaseIfNotExist;
155161
this.preferPrepareStatement = preferPrepareStatement;
156162
this.sessionVariables = sessionVariables;
163+
this.lockWaitTimeout = lockWaitTimeout;
164+
this.statementTimeout = statementTimeout;
157165
this.loadLocalInfilePath = loadLocalInfilePath;
158166
this.localInfileBufferSize = localInfileBufferSize;
159167
this.queryCacheSize = queryCacheSize;
@@ -245,6 +253,16 @@ List<String> getSessionVariables() {
245253
return sessionVariables;
246254
}
247255

256+
@Nullable
257+
Duration getLockWaitTimeout() {
258+
return lockWaitTimeout;
259+
}
260+
261+
@Nullable
262+
Duration getStatementTimeout() {
263+
return statementTimeout;
264+
}
265+
248266
@Nullable
249267
Path getLoadLocalInfilePath() {
250268
return loadLocalInfilePath;
@@ -309,6 +327,8 @@ public boolean equals(Object o) {
309327
createDatabaseIfNotExist == that.createDatabaseIfNotExist &&
310328
Objects.equals(preferPrepareStatement, that.preferPrepareStatement) &&
311329
sessionVariables.equals(that.sessionVariables) &&
330+
Objects.equals(lockWaitTimeout, that.lockWaitTimeout) &&
331+
Objects.equals(statementTimeout, that.statementTimeout) &&
312332
Objects.equals(loadLocalInfilePath, that.loadLocalInfilePath) &&
313333
localInfileBufferSize == that.localInfileBufferSize &&
314334
queryCacheSize == that.queryCacheSize &&
@@ -325,9 +345,14 @@ public int hashCode() {
325345
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
326346
preserveInstants, connectionTimeZone, forceConnectionTimeZoneToSession,
327347
zeroDateOption, user, password, database, createDatabaseIfNotExist,
328-
preferPrepareStatement, sessionVariables, loadLocalInfilePath,
329-
localInfileBufferSize, queryCacheSize, prepareCacheSize, compressionAlgorithms,
330-
zstdCompressionLevel, loopResources, extensions, passwordPublisher);
348+
preferPrepareStatement,
349+
sessionVariables,
350+
lockWaitTimeout,
351+
statementTimeout,
352+
loadLocalInfilePath, localInfileBufferSize,
353+
queryCacheSize, prepareCacheSize,
354+
compressionAlgorithms, zstdCompressionLevel,
355+
loopResources, extensions, passwordPublisher);
331356
}
332357

333358
@Override
@@ -343,6 +368,8 @@ public String toString() {
343368
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
344369
", preferPrepareStatement=" + preferPrepareStatement +
345370
", sessionVariables=" + sessionVariables +
371+
", lockWaitTimeout=" + lockWaitTimeout +
372+
", statementTimeout=" + statementTimeout +
346373
", loadLocalInfilePath=" + loadLocalInfilePath +
347374
", localInfileBufferSize=" + localInfileBufferSize +
348375
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
@@ -361,6 +388,8 @@ public String toString() {
361388
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
362389
", preferPrepareStatement=" + preferPrepareStatement +
363390
", sessionVariables=" + sessionVariables +
391+
", lockWaitTimeout=" + lockWaitTimeout +
392+
", statementTimeout=" + statementTimeout +
364393
", loadLocalInfilePath=" + loadLocalInfilePath +
365394
", localInfileBufferSize=" + localInfileBufferSize +
366395
", queryCacheSize=" + queryCacheSize +
@@ -433,6 +462,12 @@ public static final class Builder {
433462
@Nullable
434463
private Predicate<String> preferPrepareStatement;
435464

465+
@Nullable
466+
private Duration lockWaitTimeout;
467+
468+
@Nullable
469+
private Duration statementTimeout;
470+
436471
private List<String> sessionVariables = Collections.emptyList();
437472

438473
@Nullable
@@ -486,7 +521,11 @@ public MySqlConnectionConfiguration build() {
486521
connectionTimeZone,
487522
forceConnectionTimeZoneToSession,
488523
user, password, database,
489-
createDatabaseIfNotExist, preferPrepareStatement, sessionVariables, loadLocalInfilePath,
524+
createDatabaseIfNotExist, preferPrepareStatement,
525+
sessionVariables,
526+
lockWaitTimeout,
527+
statementTimeout,
528+
loadLocalInfilePath,
490529
localInfileBufferSize, queryCacheSize, prepareCacheSize,
491530
compressionAlgorithms, zstdCompressionLevel, loopResources,
492531
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
@@ -911,6 +950,30 @@ public Builder sessionVariables(String... sessionVariables) {
911950
return this;
912951
}
913952

953+
/**
954+
* Configures the lock wait timeout. Default to use the server-side default value.
955+
*
956+
* @param lockWaitTimeout the lock wait timeout, or {@code null} to use the server-side default value.
957+
* @return {@link Builder this}
958+
* @since 1.1.3
959+
*/
960+
public Builder lockWaitTimeout(@Nullable Duration lockWaitTimeout) {
961+
this.lockWaitTimeout = lockWaitTimeout;
962+
return this;
963+
}
964+
965+
/**
966+
* Configures the statement timeout. Default to use the server-side default value.
967+
*
968+
* @param statementTimeout the statement timeout, or {@code null} to use the server-side default value.
969+
* @return {@link Builder this}
970+
* @since 1.1.3
971+
*/
972+
public Builder statementTimeout(@Nullable Duration statementTimeout) {
973+
this.statementTimeout = statementTimeout;
974+
return this;
975+
}
976+
914977
/**
915978
* Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or
916979
* disallow the statement. Default to {@code null} which means not allow the statement.

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,20 @@ private static Mono<MySqlConnection> getMySqlConnection(
174174
extensions.forEach(CodecRegistrar.class, registrar ->
175175
registrar.register(allocator, builder));
176176

177-
return MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(),
177+
Mono<MySqlConnection> c = MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(),
178178
prepareCache, sessionVariables, prepare);
179+
180+
if (configuration.getLockWaitTimeout() != null) {
181+
c = c.flatMap(connection -> connection.setLockWaitTimeout(configuration.getLockWaitTimeout())
182+
.thenReturn(connection));
183+
}
184+
185+
if (configuration.getStatementTimeout() != null) {
186+
c = c.flatMap(connection -> connection.setStatementTimeout(configuration.getStatementTimeout())
187+
.thenReturn(connection));
188+
}
189+
190+
return c;
179191
});
180192
}
181193

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@
4242
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
4343
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
4444
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
45+
import static io.r2dbc.spi.ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT;
4546
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
4647
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
4748
import static io.r2dbc.spi.ConnectionFactoryOptions.SSL;
49+
import static io.r2dbc.spi.ConnectionFactoryOptions.STATEMENT_TIMEOUT;
4850
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
4951

5052
/**
@@ -393,6 +395,10 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
393395
MySqlConnectionFactoryProvider::splitVariables,
394396
String[]::new
395397
).to(builder::sessionVariables);
398+
mapper.optional(LOCK_WAIT_TIMEOUT).as(Duration.class, Duration::parse)
399+
.to(builder::lockWaitTimeout);
400+
mapper.optional(STATEMENT_TIMEOUT).as(Duration.class, Duration::parse)
401+
.to(builder::statementTimeout);
396402

397403
return builder.build();
398404
}

r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfigurationTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ private static MySqlConnectionConfiguration filledUp() {
250250
.sslHostnameVerifier((host, s) -> true)
251251
.queryCacheSize(128)
252252
.prepareCacheSize(0)
253+
.sessionVariables("sql_mode=ANSI_QUOTES")
254+
.lockWaitTimeout(Duration.ofSeconds(5))
255+
.statementTimeout(Duration.ofSeconds(10))
253256
.autodetectExtensions(false)
254257
.build();
255258
}

r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProviderTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,17 +455,15 @@ void validPasswordSupplier() {
455455

456456
@Test
457457
void allConfigurationOptions() {
458-
List<String> exceptConfigs = Arrays.asList(
458+
List<String> exceptConfigs = Arrays.asList(
459459
"extendWith",
460460
"username",
461461
"zeroDateOption");
462462
List<String> exceptOptions = Arrays.asList(
463463
"driver",
464464
"ssl",
465465
"protocol",
466-
"zeroDate",
467-
"lockWaitTimeout",
468-
"statementTimeout");
466+
"zeroDate");
469467
Set<String> allOptions = Stream.concat(
470468
Arrays.stream(ConnectionFactoryOptions.class.getFields()),
471469
Arrays.stream(MySqlConnectionFactoryProvider.class.getFields())

r2dbc-mysql/src/test/java/io/asyncer/r2dbc/mysql/SessionStateIntegrationTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@
1717
package io.asyncer.r2dbc.mysql;
1818

1919
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
20+
import io.r2dbc.spi.R2dbcTimeoutException;
2021
import org.junit.jupiter.api.Test;
22+
import org.junit.jupiter.api.condition.EnabledIf;
2123
import org.junit.jupiter.params.ParameterizedTest;
2224
import org.junit.jupiter.params.provider.Arguments;
2325
import org.junit.jupiter.params.provider.MethodSource;
2426
import org.junit.jupiter.params.provider.ValueSource;
2527
import reactor.core.publisher.Mono;
2628
import reactor.test.StepVerifier;
2729

30+
import java.time.Duration;
2831
import java.time.ZoneId;
2932
import java.util.LinkedHashMap;
3033
import java.util.Map;
@@ -122,6 +125,57 @@ void sessionVariables(Map<String, String> variables) {
122125
.verifyComplete();
123126
}
124127

128+
@ParameterizedTest
129+
@ValueSource(strings = { "PT1S", "PT10S", "PT1M" })
130+
void initLockWaitTimeout(String timeout) {
131+
Duration lockWaitTimeout = Duration.parse(timeout);
132+
133+
connectionFactory(builder -> builder.lockWaitTimeout(lockWaitTimeout))
134+
.create()
135+
.flatMapMany(connection -> connection.createStatement("SELECT @@innodb_lock_wait_timeout").execute()
136+
.flatMap(result -> result.map(r -> r.get(0, Long.class)))
137+
.onErrorResume(e -> connection.close().then(Mono.error(e)))
138+
.concatWith(connection.close().then(Mono.empty()))
139+
)
140+
.as(StepVerifier::create)
141+
.expectNext(lockWaitTimeout.getSeconds())
142+
.verifyComplete();
143+
}
144+
145+
@EnabledIf("isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4")
146+
@ParameterizedTest
147+
@ValueSource(strings = { "PT0.1S", "PT0.5S" })
148+
void initStatementTimeout(String timeout) {
149+
Duration statementTimeout = Duration.parse(timeout);
150+
151+
connectionFactory(builder -> builder.statementTimeout(statementTimeout))
152+
.create()
153+
.flatMapMany(connection -> connection.createStatement("SELECT 1 WHERE SLEEP(1) > 1").execute()
154+
.flatMap(result -> result.map(r -> r.get(0)))
155+
.onErrorResume(e -> connection.close().then(Mono.error(e)))
156+
.concatWith(connection.close().then(Mono.empty()))
157+
)
158+
.as(StepVerifier::create)
159+
.verifyError(R2dbcTimeoutException.class);
160+
}
161+
162+
static boolean isGreaterThanOrEqualToMariaDB10_1_1MySql5_7_4() {
163+
String version = System.getProperty("test.mysql.version");
164+
165+
if (version == null || version.isEmpty()) {
166+
return false;
167+
}
168+
169+
ServerVersion ver = ServerVersion.parse(version);
170+
String type = System.getProperty("test.db.type");
171+
172+
if ("mariadb".equalsIgnoreCase(type)) {
173+
return ver.isGreaterThanOrEqualTo(ServerVersion.create(10, 1, 1));
174+
}
175+
176+
return ver.isGreaterThanOrEqualTo(ServerVersion.create(5, 7, 4));
177+
}
178+
125179
static Stream<Arguments> sessionVariables() {
126180
return Stream.of(
127181
Arguments.of(mapOf("sql_mode", "ANSI_QUOTES")),

0 commit comments

Comments
 (0)