Skip to content

Commit aff4cc5

Browse files
authored
Add LoopResources Configuration Support (#232)
Motivation: Enhances flexibility by allowing custom `LoopResources` configuration for optimized event loop management. Modifications: Introduced `LoopResources` configuration option in `MysqlConnectionConfiguration` and updated relevant documentation. Result: Enables performance tuning and flexibility, improving adaptability for various deployment scenarios. Resolves #229
1 parent fcf1f18 commit aff4cc5

File tree

5 files changed

+54
-5
lines changed

5 files changed

+54
-5
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
145145
.option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
146146
.option(Option.valueOf("tcpNoDelay"), true) // optional, default false
147147
.option(Option.valueOf("compressionAlgorithms"), "zstd") // optional, default UNCOMPRESSED
148+
.option(Option.valueOf("loopResources"), LoopResources.create("r2dbc")) // optional, default null, null means uses global tcp resources as loopResources (since 1.1.2)
148149
.option(Option.valueOf("autodetectExtensions"), false) // optional, default false
149150
.option(Option.valueOf("passwordPublisher"), Mono.just("password")) // optional, default null, null means has no passwordPublisher (since 1.0.5 / 0.9.6)
150151
.build();
@@ -194,6 +195,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde
194195
.tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
195196
.tcpNoDelay(true) // optional, controls TCP No Delay, default is false
196197
.compressionAlgorithms(CompressionAlgorithm.ZSTD, CompressionAlgotihm.ZLIB) // optional, default is UNCOMPRESSED
198+
.loopResources(LoopResources.create("r2dbc")) // optional, default null, null means uses global tcp resources as loopResources (since 1.1.2)
197199
.autodetectExtensions(false) // optional, controls extension auto-detect, default is true
198200
.extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
199201
.passwordPublisher(Mono.just("password")) // optional, default null, null means has no password publisher (since 1.0.5 / 0.9.6)
@@ -246,6 +248,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
246248
| useServerPrepareStatement | `true`, `false` or `Predicate<String>` | Optional, default is `false` | See following notice |
247249
| allowLoadLocalInfileInPath | A path | Optional, default is `null` | The path that allows `LOAD DATA LOCAL INFILE` to load file data |
248250
| compressionAlgorithms | A list of `CompressionAlgorithm` | Optional, default is `UNCOMPRESSED` | The compression algorithms for MySQL connection |
251+
| loopResources | A `LoopResources` | Optional, default is `null` | The loop resources for MySQL connection |
249252
| passwordPublisher | A `Publisher<String>` | Optional, default is `null` | The password publisher, see following notice |
250253

251254
- `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available**
@@ -278,6 +281,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
278281
- `ZLIB` Use Zlib compression protocol, it is available on almost all MySQL versions (`5.x` and above)
279282
- `ZSTD` Use Z-standard compression protocol, it is available since MySQL `8.0.18` or above, requires an extern dependency `com.github.luben:zstd-jni`
280283
- For scenarios where the network environment is poor or the amount of data is always large, using a compression protocol may be useful
284+
- `loopResources` Considers loop resources for MySQL connection.
281285

282286
Should use `enum` in [Programmatic](#programmatic-configuration) configuration that not like discovery configurations, except `TlsVersions` (All elements of `TlsVersions` will be always `String` which is case-sensitive).
283287

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.netty.handler.ssl.SslContextBuilder;
2424
import org.jetbrains.annotations.Nullable;
2525
import org.reactivestreams.Publisher;
26+
import reactor.netty.resources.LoopResources;
27+
import reactor.netty.tcp.TcpResources;
2628

2729
import javax.net.ssl.HostnameVerifier;
2830
import java.net.Socket;
@@ -105,6 +107,8 @@ public final class MySqlConnectionConfiguration {
105107

106108
private final int zstdCompressionLevel;
107109

110+
private final LoopResources loopResources;
111+
108112
private final Extensions extensions;
109113

110114
@Nullable
@@ -119,6 +123,7 @@ private MySqlConnectionConfiguration(
119123
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
120124
int queryCacheSize, int prepareCacheSize,
121125
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
126+
@Nullable LoopResources loopResources,
122127
Extensions extensions, @Nullable Publisher<String> passwordPublisher
123128
) {
124129
this.isHost = isHost;
@@ -141,6 +146,7 @@ private MySqlConnectionConfiguration(
141146
this.prepareCacheSize = prepareCacheSize;
142147
this.compressionAlgorithms = compressionAlgorithms;
143148
this.zstdCompressionLevel = zstdCompressionLevel;
149+
this.loopResources = loopResources == null? TcpResources.get() : loopResources;
144150
this.extensions = extensions;
145151
this.passwordPublisher = passwordPublisher;
146152
}
@@ -239,6 +245,10 @@ int getZstdCompressionLevel() {
239245
return zstdCompressionLevel;
240246
}
241247

248+
LoopResources getLoopResources() {
249+
return loopResources;
250+
}
251+
242252
Extensions getExtensions() {
243253
return extensions;
244254
}
@@ -277,6 +287,7 @@ public boolean equals(Object o) {
277287
prepareCacheSize == that.prepareCacheSize &&
278288
compressionAlgorithms.equals(that.compressionAlgorithms) &&
279289
zstdCompressionLevel == that.zstdCompressionLevel &&
290+
Objects.equals(loopResources, that.loopResources) &&
280291
extensions.equals(that.extensions) &&
281292
Objects.equals(passwordPublisher, that.passwordPublisher);
282293
}
@@ -286,7 +297,8 @@ public int hashCode() {
286297
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
287298
serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist,
288299
preferPrepareStatement, loadLocalInfilePath, localInfileBufferSize, queryCacheSize,
289-
prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, extensions, passwordPublisher);
300+
prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, loopResources,
301+
extensions, passwordPublisher);
290302
}
291303

292304
@Override
@@ -303,6 +315,7 @@ public String toString() {
303315
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
304316
", compressionAlgorithms=" + compressionAlgorithms +
305317
", zstdCompressionLevel=" + zstdCompressionLevel +
318+
", loopResources=" + loopResources +
306319
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
307320
}
308321

@@ -317,6 +330,7 @@ public String toString() {
317330
", prepareCacheSize=" + prepareCacheSize +
318331
", compressionAlgorithms=" + compressionAlgorithms +
319332
", zstdCompressionLevel=" + zstdCompressionLevel +
333+
", loopResources=" + loopResources +
320334
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
321335
}
322336

@@ -393,6 +407,9 @@ public static final class Builder {
393407

394408
private int zstdCompressionLevel = 3;
395409

410+
@Nullable
411+
private LoopResources loopResources;
412+
396413
private boolean autodetectExtensions = true;
397414

398415
private final List<Extension> extensions = new ArrayList<>();
@@ -425,7 +442,7 @@ public MySqlConnectionConfiguration build() {
425442
connectTimeout, zeroDateOption, serverZoneId, user, password, database,
426443
createDatabaseIfNotExist, preferPrepareStatement, loadLocalInfilePath,
427444
localInfileBufferSize, queryCacheSize, prepareCacheSize,
428-
compressionAlgorithms, zstdCompressionLevel,
445+
compressionAlgorithms, zstdCompressionLevel, loopResources,
429446
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
430447
}
431448

@@ -911,6 +928,19 @@ public Builder zstdCompressionLevel(int level) {
911928
return this;
912929
}
913930

931+
/**
932+
* Configures the {@link LoopResources} for the driver.
933+
* Default to {@link TcpResources#get() global tcp resources}.
934+
* @param loopResources the {@link LoopResources}.
935+
* @return this {@link Builder}.
936+
* @throws IllegalArgumentException if {@code loopResources} is {@code null}.
937+
* @since 1.1.2
938+
*/
939+
public Builder loopResources(LoopResources loopResources) {
940+
this.loopResources = requireNonNull(loopResources, "loopResources must not be null");
941+
return this;
942+
}
943+
914944
/**
915945
* Configures whether to use {@link ServiceLoader} to discover and register extensions. Defaults to
916946
* {@code true}.

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private static Mono<MySqlConnection> getMySqlConnection(
146146
final int prepareCacheSize,
147147
@Nullable final CharSequence password) {
148148
return Client.connect(ssl, address, configuration.isTcpKeepAlive(), configuration.isTcpNoDelay(),
149-
context, configuration.getConnectTimeout())
149+
context, configuration.getConnectTimeout(), configuration.getLoopResources())
150150
.flatMap(client -> {
151151
// Lazy init database after handshake/login
152152
String db = createDbIfNotExist ? "" : database;

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.r2dbc.spi.ConnectionFactoryProvider;
2626
import io.r2dbc.spi.Option;
2727
import org.reactivestreams.Publisher;
28+
import reactor.netty.resources.LoopResources;
2829

2930
import javax.net.ssl.HostnameVerifier;
3031
import java.time.Duration;
@@ -217,6 +218,14 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
217218
public static final Option<Integer> ZSTD_COMPRESSION_LEVEL =
218219
Option.valueOf("zstdCompressionLevel");
219220

221+
/**
222+
* Option to set the {@link LoopResources} for the connection.
223+
* Default to {@link reactor.netty.tcp.TcpResources#get() global tcp Resources}
224+
*
225+
* @since 1.1.2
226+
*/
227+
public static final Option<LoopResources> LOOP_RESOURCES = Option.valueOf("loopResources");
228+
220229
/**
221230
* Option to set the maximum size of the {@link Query} parsing cache. Default to {@code 256}.
222231
*
@@ -312,6 +321,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
312321
).to(builder::compressionAlgorithms);
313322
mapper.optional(ZSTD_COMPRESSION_LEVEL).asInt()
314323
.to(builder::zstdCompressionLevel);
324+
mapper.optional(LOOP_RESOURCES).as(LoopResources.class)
325+
.to(builder::loopResources);
315326
mapper.optional(PASSWORD_PUBLISHER).as(Publisher.class)
316327
.to(builder::passwordPublisher);
317328

src/main/java/io/asyncer/r2dbc/mysql/client/Client.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
3030
import reactor.core.publisher.SynchronousSink;
31+
import reactor.netty.resources.LoopResources;
3132
import reactor.netty.tcp.TcpClient;
3233

3334
import java.net.InetSocketAddress;
@@ -116,17 +117,20 @@ public interface Client {
116117
* @param tcpNoDelay if enable the {@link ChannelOption#TCP_NODELAY}
117118
* @param context the connection context
118119
* @param connectTimeout connect timeout, or {@code null} if it has no timeout
120+
* @param loopResources the loop resources to use
119121
* @return A {@link Mono} that will emit a connected {@link Client}.
120122
* @throws IllegalArgumentException if {@code ssl}, {@code address} or {@code context} is {@code null}.
121123
* @throws ArithmeticException if {@code connectTimeout} milliseconds overflow as an int
122124
*/
123125
static Mono<Client> connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
124-
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout) {
126+
boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
127+
LoopResources loopResources) {
125128
requireNonNull(ssl, "ssl must not be null");
126129
requireNonNull(address, "address must not be null");
127130
requireNonNull(context, "context must not be null");
128131

129-
TcpClient tcpClient = TcpClient.newConnection();
132+
TcpClient tcpClient = TcpClient.newConnection()
133+
.runOn(loopResources);
130134

131135
if (connectTimeout != null) {
132136
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,

0 commit comments

Comments
 (0)