Skip to content

Commit 2f3cc22

Browse files
committed
Add InitFlow and move session states to context
1 parent aa90297 commit 2f3cc22

15 files changed

+1378
-1092
lines changed

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java

Lines changed: 143 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package io.asyncer.r2dbc.mysql;
1818

19+
import io.asyncer.r2dbc.mysql.cache.PrepareCache;
1920
import io.asyncer.r2dbc.mysql.codec.CodecContext;
2021
import io.asyncer.r2dbc.mysql.collation.CharCollation;
2122
import io.asyncer.r2dbc.mysql.constant.ServerStatuses;
2223
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
24+
import io.r2dbc.spi.IsolationLevel;
2325
import org.jetbrains.annotations.Nullable;
2426

2527
import java.nio.file.Path;
28+
import java.time.Duration;
2629
import java.time.ZoneId;
2730

2831
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
@@ -37,6 +40,10 @@ public final class ConnectionContext implements CodecContext {
3740

3841
private static final ServerVersion NONE_VERSION = ServerVersion.create(0, 0, 0);
3942

43+
private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);
44+
45+
private static final ServerVersion MARIA_10_1_1 = ServerVersion.create(10, 1, 1, true);
46+
4047
private final ZeroDateOption zeroDateOption;
4148

4249
@Nullable
@@ -52,16 +59,47 @@ public final class ConnectionContext implements CodecContext {
5259

5360
private Capability capability = Capability.DEFAULT;
5461

62+
private PrepareCache prepareCache;
63+
5564
@Nullable
5665
private ZoneId timeZone;
5766

67+
private String product = "Unknown";
68+
69+
/**
70+
* Current isolation level inferred by past statements.
71+
* <p>
72+
* Inference rules:
73+
* <ol><li>In the beginning, it is also {@link #sessionIsolationLevel}.</li>
74+
* <li>A transaction has began with a {@link IsolationLevel}, it will be changed to the value</li>
75+
* <li>The transaction end (commit or rollback), it will recover to {@link #sessionIsolationLevel}.</li></ol>
76+
*/
77+
private volatile IsolationLevel currentIsolationLevel;
78+
79+
/**
80+
* Session isolation level.
81+
*
82+
* <ol><li>It is applied to all subsequent transactions performed within the current session.</li>
83+
* <li>Calls {@link io.r2dbc.spi.Connection#setTransactionIsolationLevel}, it will change to the value.</li>
84+
* <li>It can be changed within transactions, but does not affect the current ongoing transaction.</li></ol>
85+
*/
86+
private volatile IsolationLevel sessionIsolationLevel;
87+
5888
private boolean lockWaitTimeoutSupported = false;
5989

90+
/**
91+
* Current lock wait timeout in seconds.
92+
*/
93+
private volatile Duration currentLockWaitTimeout;
94+
95+
/**
96+
* Session lock wait timeout in seconds.
97+
*/
98+
private volatile Duration sessionLockWaitTimeout;
99+
60100
/**
61101
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or OK
62102
* message which means handshake V9 completed.
63-
* <p>
64-
* It would be updated multiple times, so {@code volatile} is required.
65103
*/
66104
private volatile short serverStatuses = ServerStatuses.AUTO_COMMIT;
67105

@@ -80,18 +118,50 @@ public final class ConnectionContext implements CodecContext {
80118
}
81119

82120
/**
83-
* Initializes this context.
121+
* Initializes handshake information after connection is established.
84122
*
85123
* @param connectionId the connection identifier that is specified by server.
86124
* @param version the server version.
87125
* @param capability the connection capabilities.
88126
*/
89-
void init(int connectionId, ServerVersion version, Capability capability) {
127+
void initHandshake(int connectionId, ServerVersion version, Capability capability) {
90128
this.connectionId = connectionId;
91129
this.serverVersion = version;
92130
this.capability = capability;
93131
}
94132

133+
/**
134+
* Initializes session information after logged-in.
135+
*
136+
* @param prepareCache the prepare cache.
137+
* @param isolationLevel the session isolation level.
138+
* @param lockWaitTimeoutSupported if the server supports lock wait timeout.
139+
* @param lockWaitTimeout the lock wait timeout.
140+
* @param product the server product name.
141+
* @param timeZone the server timezone.
142+
*/
143+
void initSession(
144+
PrepareCache prepareCache,
145+
IsolationLevel isolationLevel,
146+
boolean lockWaitTimeoutSupported,
147+
Duration lockWaitTimeout,
148+
@Nullable String product,
149+
@Nullable ZoneId timeZone
150+
) {
151+
this.prepareCache = prepareCache;
152+
this.currentIsolationLevel = this.sessionIsolationLevel = isolationLevel;
153+
this.lockWaitTimeoutSupported = lockWaitTimeoutSupported;
154+
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = lockWaitTimeout;
155+
this.product = product == null ? "Unknown" : product;
156+
157+
if (timeZone != null) {
158+
if (isTimeZoneInitialized()) {
159+
throw new IllegalStateException("Connection timezone have been initialized");
160+
}
161+
this.timeZone = timeZone;
162+
}
163+
}
164+
95165
/**
96166
* Get the connection identifier that is specified by server.
97167
*
@@ -128,6 +198,14 @@ public ZoneId getTimeZone() {
128198
return timeZone;
129199
}
130200

201+
String getProduct() {
202+
return product;
203+
}
204+
205+
PrepareCache getPrepareCache() {
206+
return prepareCache;
207+
}
208+
131209
boolean isTimeZoneInitialized() {
132210
return timeZone != null;
133211
}
@@ -138,13 +216,6 @@ public boolean isMariaDb() {
138216
return (capability != null && capability.isMariaDb()) || serverVersion.isMariaDb();
139217
}
140218

141-
void initTimeZone(ZoneId timeZone) {
142-
if (isTimeZoneInitialized()) {
143-
throw new IllegalStateException("Connection timezone have been initialized");
144-
}
145-
this.timeZone = timeZone;
146-
}
147-
148219
@Override
149220
public ZeroDateOption getZeroDateOption() {
150221
return zeroDateOption;
@@ -170,19 +241,23 @@ public int getLocalInfileBufferSize() {
170241
}
171242

172243
/**
173-
* Checks if the server supports lock wait timeout.
244+
* Checks if the server supports InnoDB lock wait timeout.
174245
*
175-
* @return if the server supports lock wait timeout.
246+
* @return if the server supports InnoDB lock wait timeout.
176247
*/
177248
public boolean isLockWaitTimeoutSupported() {
178249
return lockWaitTimeoutSupported;
179250
}
180251

181252
/**
182-
* Enables lock wait timeout supported when loading session variables.
253+
* Checks if the server supports statement timeout.
254+
*
255+
* @return if the server supports statement timeout.
183256
*/
184-
void enableLockWaitTimeoutSupported() {
185-
this.lockWaitTimeoutSupported = true;
257+
public boolean isStatementTimeoutSupported() {
258+
boolean isMariaDb = isMariaDb();
259+
return (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_1_1)) ||
260+
(!isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4));
186261
}
187262

188263
/**
@@ -202,4 +277,56 @@ public short getServerStatuses() {
202277
public void setServerStatuses(short serverStatuses) {
203278
this.serverStatuses = serverStatuses;
204279
}
280+
281+
IsolationLevel getCurrentIsolationLevel() {
282+
return currentIsolationLevel;
283+
}
284+
285+
void setCurrentIsolationLevel(IsolationLevel isolationLevel) {
286+
this.currentIsolationLevel = isolationLevel;
287+
}
288+
289+
void resetCurrentIsolationLevel() {
290+
this.currentIsolationLevel = this.sessionIsolationLevel;
291+
}
292+
293+
IsolationLevel getSessionIsolationLevel() {
294+
return sessionIsolationLevel;
295+
}
296+
297+
void setSessionIsolationLevel(IsolationLevel isolationLevel) {
298+
this.sessionIsolationLevel = isolationLevel;
299+
}
300+
301+
void setCurrentLockWaitTimeout(Duration timeoutSeconds) {
302+
this.currentLockWaitTimeout = timeoutSeconds;
303+
}
304+
305+
void resetCurrentLockWaitTimeout() {
306+
this.currentLockWaitTimeout = this.sessionLockWaitTimeout;
307+
}
308+
309+
boolean isLockWaitTimeoutChanged() {
310+
return currentLockWaitTimeout != sessionLockWaitTimeout;
311+
}
312+
313+
Duration getSessionLockWaitTimeout() {
314+
return sessionLockWaitTimeout;
315+
}
316+
317+
void setAllLockWaitTimeout(Duration timeoutSeconds) {
318+
this.currentLockWaitTimeout = this.sessionLockWaitTimeout = timeoutSeconds;
319+
}
320+
321+
boolean isInTransaction() {
322+
return (serverStatuses & ServerStatuses.IN_TRANSACTION) != 0;
323+
}
324+
325+
boolean isAutoCommit() {
326+
// Within transaction, autocommit remains disabled until end the transaction with COMMIT or ROLLBACK.
327+
// The autocommit mode then reverts to its previous state.
328+
short serverStatuses = this.serverStatuses;
329+
return (serverStatuses & ServerStatuses.IN_TRANSACTION) == 0 &&
330+
(serverStatuses & ServerStatuses.AUTO_COMMIT) != 0;
331+
}
205332
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionState.java

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)