Skip to content

Commit ea1881b

Browse files
committed
Move ping command to QueryFlow
1 parent 2f3cc22 commit ea1881b

File tree

3 files changed

+27
-25
lines changed

3 files changed

+27
-25
lines changed

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlSimpleConnection.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.asyncer.r2dbc.mysql.client.Client;
2626
import io.asyncer.r2dbc.mysql.codec.Codecs;
2727
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
28-
import io.asyncer.r2dbc.mysql.message.client.PingMessage;
2928
import io.asyncer.r2dbc.mysql.message.server.CompleteMessage;
3029
import io.asyncer.r2dbc.mysql.message.server.ErrorMessage;
3130
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
@@ -38,12 +37,9 @@
3837
import io.r2dbc.spi.ValidationDepth;
3938
import org.jetbrains.annotations.Nullable;
4039
import org.jetbrains.annotations.TestOnly;
41-
import reactor.core.publisher.Flux;
4240
import reactor.core.publisher.Mono;
43-
import reactor.core.publisher.SynchronousSink;
4441

4542
import java.time.Duration;
46-
import java.util.function.BiConsumer;
4743
import java.util.function.Function;
4844
import java.util.function.Predicate;
4945

@@ -66,27 +62,14 @@ final class MySqlSimpleConnection implements MySqlConnection {
6662

6763
if (message instanceof ErrorMessage) {
6864
ErrorMessage msg = (ErrorMessage) message;
69-
logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(),
70-
msg.getMessage());
65+
logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(), msg.getMessage());
7166
} else {
7267
ReferenceCountUtil.safeRelease(message);
7368
}
7469

7570
return false;
7671
};
7772

78-
private static final BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> PING = (message, sink) -> {
79-
if (message instanceof ErrorMessage) {
80-
sink.next(message);
81-
sink.complete();
82-
} else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
83-
sink.next(message);
84-
sink.complete();
85-
} else {
86-
ReferenceCountUtil.safeRelease(message);
87-
}
88-
};
89-
9073
private final Client client;
9174

9275
private final Codecs codecs;
@@ -266,9 +249,9 @@ public Mono<Boolean> validate(ValidationDepth depth) {
266249
return Mono.just(false);
267250
}
268251

269-
return doPingInternal(client)
270-
.last()
252+
return QueryFlow.ping(client)
271253
.map(VALIDATE)
254+
.last()
272255
.onErrorResume(e -> {
273256
// `last` maybe emit a NoSuchElementException, exchange maybe emit exception by Netty.
274257
// But should NEVER emit any exception, so logging exception and emit false.
@@ -334,8 +317,4 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
334317
ConnectionContext context() {
335318
return client.getContext();
336319
}
337-
338-
static Flux<ServerMessage> doPingInternal(Client client) {
339-
return client.exchange(PingMessage.INSTANCE, PING);
340-
}
341320
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/PingStatement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Flux<MySqlResult> execute() {
6969
client,
7070
codecs,
7171
null,
72-
MySqlSimpleConnection.doPingInternal(client)
72+
QueryFlow.ping(client)
7373
)));
7474
}
7575
}

r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.List;
6464
import java.util.concurrent.atomic.AtomicBoolean;
6565
import java.util.concurrent.atomic.AtomicInteger;
66+
import java.util.function.BiConsumer;
6667
import java.util.function.Consumer;
6768
import java.util.function.Predicate;
6869

@@ -86,6 +87,18 @@ final class QueryFlow {
8687
}
8788
};
8889

90+
private static final BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> PING = (message, sink) -> {
91+
if (message instanceof ErrorMessage) {
92+
sink.next(message);
93+
sink.complete();
94+
} else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
95+
sink.next(message);
96+
sink.complete();
97+
} else {
98+
ReferenceCountUtil.safeRelease(message);
99+
}
100+
};
101+
89102
/**
90103
* Execute multiple bindings of a server-preparing statement with one-by-one binary execution. The execution
91104
* terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. If client receives a
@@ -252,6 +265,16 @@ static Mono<Void> createSavepoint(Client client, String name, boolean batchSuppo
252265
return client.exchange(new TransactionMultiExchangeable(savepointState)).then();
253266
}
254267

268+
/**
269+
* Executes a ping command to the server.
270+
*
271+
* @param client the {@link Client} to exchange messages with.
272+
* @return complete or error messages received in response to this exchange.
273+
*/
274+
static Flux<ServerMessage> ping(Client client) {
275+
return client.exchange(PingMessage.INSTANCE, PING);
276+
}
277+
255278
/**
256279
* Sets a session variable to the server.
257280
*

0 commit comments

Comments
 (0)