Skip to content

Commit e54b69b

Browse files
committed
Reduce retained references to ConnectionContext
- Reduce publicity - Prepare to support redirect and reconnect
1 parent d801e8f commit e54b69b

29 files changed

+226
-209
lines changed

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/Capability.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ public final class Capability {
177177
TRANSACTIONS | SECURE_SALT | MULTI_STATEMENTS | MULTI_RESULTS | PS_MULTI_RESULTS |
178178
PLUGIN_AUTH | CONNECT_ATTRS | VAR_INT_SIZED_AUTH | SESSION_TRACK | DEPRECATE_EOF | ZSTD_COMPRESS;
179179

180+
/**
181+
* The default capabilities for a MySQL connection. It contains all client supported capabilities.
182+
*/
183+
public static final Capability DEFAULT = new Capability(ALL_SUPPORTED);
184+
180185
private final long bitmap;
181186

182187
/**
@@ -373,7 +378,8 @@ private Capability(long bitmap) {
373378
* @return the {@link Capability} without unknown flags.
374379
*/
375380
public static Capability of(long capabilities) {
376-
return new Capability(capabilities & ALL_SUPPORTED);
381+
long c = capabilities & ALL_SUPPORTED;
382+
return c == ALL_SUPPORTED ? DEFAULT : new Capability(c);
377383
}
378384

379385
static final class Builder {

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,13 @@
3030
/**
3131
* The MySQL connection context considers the behavior of server or client.
3232
* <p>
33-
* WARNING: Do NOT change any data outside of this project, try to configure {@code ConnectionFactoryOptions}
34-
* or {@code MySqlConnectionConfiguration} to control connection context and client behavior.
33+
* WARNING: Do NOT change any data outside of this project, try to configure {@code ConnectionFactoryOptions} or
34+
* {@code MySqlConnectionConfiguration} to control connection context and client behavior.
3535
*/
3636
public final class ConnectionContext implements CodecContext {
3737

3838
private static final ServerVersion NONE_VERSION = ServerVersion.create(0, 0, 0);
3939

40-
private volatile int connectionId = -1;
41-
42-
private volatile ServerVersion serverVersion = NONE_VERSION;
43-
4440
private final ZeroDateOption zeroDateOption;
4541

4642
@Nullable
@@ -50,20 +46,25 @@ public final class ConnectionContext implements CodecContext {
5046

5147
private final boolean preserveInstants;
5248

49+
private int connectionId = -1;
50+
51+
private ServerVersion serverVersion = NONE_VERSION;
52+
53+
private Capability capability = Capability.DEFAULT;
54+
5355
@Nullable
5456
private ZoneId timeZone;
5557

5658
private boolean lockWaitTimeoutSupported = false;
5759

5860
/**
59-
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or
60-
* OK message which means handshake V9 completed.
61+
* Assume that the auto commit is always turned on, it will be set after handshake V10 request message, or OK
62+
* message which means handshake V9 completed.
63+
* <p>
64+
* It would be updated multiple times, so {@code volatile} is required.
6165
*/
6266
private volatile short serverStatuses = ServerStatuses.AUTO_COMMIT;
6367

64-
@Nullable
65-
private volatile Capability capability = null;
66-
6768
ConnectionContext(
6869
ZeroDateOption zeroDateOption,
6970
@Nullable Path localInfilePath,
@@ -78,33 +79,37 @@ public final class ConnectionContext implements CodecContext {
7879
this.timeZone = timeZone;
7980
}
8081

81-
/**
82-
* Get the connection identifier that is specified by server.
83-
*
84-
* @return the connection identifier.
85-
*/
86-
public int getConnectionId() {
87-
return connectionId;
88-
}
89-
9082
/**
9183
* Initializes this context.
9284
*
9385
* @param connectionId the connection identifier that is specified by server.
9486
* @param version the server version.
9587
* @param capability the connection capabilities.
9688
*/
97-
public void init(int connectionId, ServerVersion version, Capability capability) {
89+
void init(int connectionId, ServerVersion version, Capability capability) {
9890
this.connectionId = connectionId;
9991
this.serverVersion = version;
10092
this.capability = capability;
10193
}
10294

95+
/**
96+
* Get the connection identifier that is specified by server.
97+
*
98+
* @return the connection identifier.
99+
*/
100+
public int getConnectionId() {
101+
return connectionId;
102+
}
103+
103104
@Override
104105
public ServerVersion getServerVersion() {
105106
return serverVersion;
106107
}
107108

109+
public Capability getCapability() {
110+
return capability;
111+
}
112+
108113
@Override
109114
public CharCollation getClientCollation() {
110115
return CharCollation.clientCharCollation();
@@ -123,7 +128,7 @@ public ZoneId getTimeZone() {
123128
return timeZone;
124129
}
125130

126-
public boolean isTimeZoneInitialized() {
131+
boolean isTimeZoneInitialized() {
127132
return timeZone != null;
128133
}
129134

@@ -133,9 +138,9 @@ public boolean isMariaDb() {
133138
return (capability != null && capability.isMariaDb()) || serverVersion.isMariaDb();
134139
}
135140

136-
void setTimeZone(ZoneId timeZone) {
141+
void initTimeZone(ZoneId timeZone) {
137142
if (isTimeZoneInitialized()) {
138-
throw new IllegalStateException("Server timezone have been initialized");
143+
throw new IllegalStateException("Connection timezone have been initialized");
139144
}
140145
this.timeZone = timeZone;
141146
}
@@ -176,7 +181,7 @@ public boolean isLockWaitTimeoutSupported() {
176181
/**
177182
* Enables lock wait timeout supported when loading session variables.
178183
*/
179-
public void enableLockWaitTimeoutSupported() {
184+
void enableLockWaitTimeoutSupported() {
180185
this.lockWaitTimeoutSupported = true;
181186
}
182187

@@ -197,13 +202,4 @@ public short getServerStatuses() {
197202
public void setServerStatuses(short serverStatuses) {
198203
this.serverStatuses = serverStatuses;
199204
}
200-
201-
/**
202-
* Get the connection capability. Should use it after this context initialized.
203-
*
204-
* @return the connection capability.
205-
*/
206-
public Capability getCapability() {
207-
return capability;
208-
}
209205
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlBatchingBatch.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,11 @@ final class MySqlBatchingBatch implements MySqlBatch {
3636

3737
private final Codecs codecs;
3838

39-
private final ConnectionContext context;
40-
4139
private final StringJoiner queries = new StringJoiner(";");
4240

43-
MySqlBatchingBatch(Client client, Codecs codecs, ConnectionContext context) {
41+
MySqlBatchingBatch(Client client, Codecs codecs) {
4442
this.client = requireNonNull(client, "client must not be null");
4543
this.codecs = requireNonNull(codecs, "codecs must not be null");
46-
this.context = requireNonNull(context, "context must not be null");
4744
}
4845

4946
@Override
@@ -65,7 +62,7 @@ public MySqlBatch add(String sql) {
6562
@Override
6663
public Flux<MySqlResult> execute() {
6764
return QueryFlow.execute(client, getSql())
68-
.map(messages -> MySqlSegmentResult.toResult(false, codecs, context, null, messages));
65+
.map(messages -> MySqlSegmentResult.toResult(false, client, codecs, null, messages));
6966
}
7067

7168
@Override

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private static Mono<MySqlConnection> getMySqlConnection(
151151
final String user,
152152
final SslMode sslMode,
153153
final Set<CompressionAlgorithm> compressionAlgorithms,
154-
final int zstdCompressionLevel,
154+
final int zstdLevel,
155155
final ConnectionContext context,
156156
final Extensions extensions,
157157
final List<String> sessionVariables,
@@ -163,8 +163,7 @@ private static Mono<MySqlConnection> getMySqlConnection(
163163
.flatMap(client -> {
164164
// Lazy init database after handshake/login
165165
String db = createDbIfNotExist ? "" : database;
166-
return QueryFlow.login(client, sslMode, db, user, password, compressionAlgorithms,
167-
zstdCompressionLevel, context);
166+
return QueryFlow.login(client, sslMode, db, user, password, compressionAlgorithms, zstdLevel);
168167
})
169168
.flatMap(client -> {
170169
ByteBufAllocator allocator = client.getByteBufAllocator();
@@ -175,7 +174,7 @@ private static Mono<MySqlConnection> getMySqlConnection(
175174
extensions.forEach(CodecRegistrar.class, registrar ->
176175
registrar.register(allocator, builder));
177176

178-
return MySqlSimpleConnection.init(client, builder.build(), context, db, queryCache.get(),
177+
return MySqlSimpleConnection.init(client, builder.build(), db, queryCache.get(),
179178
prepareCache, sessionVariables, prepare);
180179
});
181180
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlDataRow.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.asyncer.r2dbc.mysql.api.MySqlRow;
2020
import io.asyncer.r2dbc.mysql.api.MySqlRowMetadata;
21+
import io.asyncer.r2dbc.mysql.codec.CodecContext;
2122
import io.asyncer.r2dbc.mysql.codec.Codecs;
2223
import io.asyncer.r2dbc.mysql.message.FieldValue;
2324
import io.r2dbc.spi.Row;
@@ -42,10 +43,13 @@ final class MySqlDataRow implements MySqlRow {
4243
*/
4344
private final boolean binary;
4445

45-
private final ConnectionContext context;
46+
/**
47+
* It can be retained because it is provided by the executed connection instead of the current connection.
48+
*/
49+
private final CodecContext context;
4650

4751
MySqlDataRow(FieldValue[] fields, MySqlRowDescriptor rowMetadata, Codecs codecs, boolean binary,
48-
ConnectionContext context) {
52+
CodecContext context) {
4953
this.fields = requireNonNull(fields, "fields must not be null");
5054
this.rowMetadata = requireNonNull(rowMetadata, "rowMetadata must not be null");
5155
this.codecs = requireNonNull(codecs, "codecs must not be null");

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSegmentResult.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.asyncer.r2dbc.mysql.api.MySqlResult;
2020
import io.asyncer.r2dbc.mysql.api.MySqlRow;
21+
import io.asyncer.r2dbc.mysql.client.Client;
2122
import io.asyncer.r2dbc.mysql.codec.Codecs;
2223
import io.asyncer.r2dbc.mysql.internal.util.NettyBufferUtils;
2324
import io.asyncer.r2dbc.mysql.internal.util.OperatorUtils;
@@ -53,8 +54,8 @@
5354
/**
5455
* An implementation of {@link MySqlResult} representing the results of a query against the MySQL database.
5556
* <p>
56-
* A {@link Segment} provided by this implementation may be both {@link UpdateCount} and {@link RowSegment},
57-
* see also {@link MySqlOkSegment}.
57+
* A {@link Segment} provided by this implementation may be both {@link UpdateCount} and {@link RowSegment}, see also
58+
* {@link MySqlOkSegment}.
5859
*/
5960
final class MySqlSegmentResult implements MySqlResult {
6061

@@ -156,15 +157,15 @@ public <T> Flux<T> flatMap(Function<Result.Segment, ? extends Publisher<? extend
156157
});
157158
}
158159

159-
static MySqlResult toResult(boolean binary, Codecs codecs, ConnectionContext context,
160-
@Nullable String syntheticKeyName, Flux<ServerMessage> messages) {
160+
static MySqlResult toResult(boolean binary, Client client, Codecs codecs,
161+
@Nullable String syntheticKeyName, Flux<ServerMessage> messages) {
162+
requireNonNull(client, "client must not be null");
161163
requireNonNull(codecs, "codecs must not be null");
162-
requireNonNull(context, "context must not be null");
163164
requireNonNull(messages, "messages must not be null");
164165

165166
return new MySqlSegmentResult(OperatorUtils.discardOnCancel(messages)
166167
.doOnDiscard(ReferenceCounted.class, ReferenceCounted::release)
167-
.handle(new MySqlSegments(binary, codecs, context, syntheticKeyName)));
168+
.handle(new MySqlSegments(binary, client, codecs, syntheticKeyName)));
168169
}
169170

170171
private static final class MySqlMessage implements Message {
@@ -269,9 +270,9 @@ private static final class MySqlSegments implements BiConsumer<ServerMessage, Sy
269270

270271
private final boolean binary;
271272

272-
private final Codecs codecs;
273+
private final Client client;
273274

274-
private final ConnectionContext context;
275+
private final Codecs codecs;
275276

276277
@Nullable
277278
private final String syntheticKeyName;
@@ -280,11 +281,10 @@ private static final class MySqlSegments implements BiConsumer<ServerMessage, Sy
280281

281282
private MySqlRowDescriptor rowMetadata;
282283

283-
private MySqlSegments(boolean binary, Codecs codecs, ConnectionContext context,
284-
@Nullable String syntheticKeyName) {
284+
private MySqlSegments(boolean binary, Client client, Codecs codecs, @Nullable String syntheticKeyName) {
285285
this.binary = binary;
286+
this.client = client;
286287
this.codecs = codecs;
287-
this.context = context;
288288
this.syntheticKeyName = syntheticKeyName;
289289
}
290290

@@ -310,7 +310,7 @@ public void accept(ServerMessage message, SynchronousSink<Segment> sink) {
310310
ReferenceCountUtil.safeRelease(message);
311311
}
312312

313-
sink.next(new MySqlRowSegment(fields, metadata, codecs, binary, context));
313+
sink.next(new MySqlRowSegment(fields, metadata, codecs, binary, client.getContext()));
314314
} else if (message instanceof SyntheticMetadataMessage) {
315315
DefinitionMetadataMessage[] metadataMessages = ((SyntheticMetadataMessage) message).unwrap();
316316

@@ -322,7 +322,7 @@ public void accept(ServerMessage message, SynchronousSink<Segment> sink) {
322322
} else if (message instanceof OkMessage) {
323323
OkMessage msg = (OkMessage) message;
324324

325-
if (MySqlStatementSupport.supportReturning(context) && msg.isEndOfRows()) {
325+
if (MySqlStatementSupport.supportReturning(client.getContext()) && msg.isEndOfRows()) {
326326
sink.next(new MySqlUpdateCount(rowCount.getAndSet(0)));
327327
} else {
328328
long rows = msg.getAffectedRows();

0 commit comments

Comments
 (0)