Skip to content

Commit 9a47e63

Browse files
committed
Refactor subscriptions
1 parent e0fa40b commit 9a47e63

File tree

4 files changed

+14
-4
lines changed

4 files changed

+14
-4
lines changed

graphql-java-kickstart/src/main/java/graphql/kickstart/execution/subscriptions/DefaultSubscriptionSession.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ public class DefaultSubscriptionSession implements SubscriptionSession {
2323
@Override
2424
public void send(String message) {
2525
Objects.requireNonNull(message, "message is required");
26-
log.info("Offer message: {}", message);
2726
publisher.offer(message);
2827
}
2928

3029
@Override
3130
public void sendMessage(Object payload) {
3231
Objects.requireNonNull(payload, "payload is required");
33-
log.info("Send message: {}", payload);
3432
send(mapper.serialize(payload));
3533
}
3634

@@ -66,6 +64,8 @@ public void sendCompleteMessage(String id) {
6664

6765
@Override
6866
public void close(String reason) {
67+
log.debug("Closing subscription session {}", getId());
68+
subscriptions.close();
6969
publisher.noMoreData();
7070
}
7171

@@ -99,4 +99,9 @@ public Publisher<String> getPublisher() {
9999
return publisher;
100100
}
101101

102+
@Override
103+
public String toString() {
104+
return getId();
105+
}
106+
102107
}

graphql-java-kickstart/src/main/java/graphql/kickstart/execution/subscriptions/SessionSubscriber.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class SessionSubscriber implements Subscriber<ExecutionResult> {
1818

1919
@Override
2020
public void onSubscribe(Subscription subscription) {
21-
log.info("Subscribe to execution result: {}", subscription);
21+
log.debug("Subscribe to execution result: {}", subscription);
2222
subscriptionReference.set(subscription);
2323
subscriptionReference.get().request(1);
2424

@@ -27,7 +27,6 @@ public void onSubscribe(Subscription subscription) {
2727

2828
@Override
2929
public void onNext(ExecutionResult executionResult) {
30-
log.info("Next execution result: {}", executionResult);
3130
Map<String, Object> result = new HashMap<>();
3231
result.put("data", executionResult.getData());
3332
session.sendDataMessage(id, result);

graphql-java-kickstart/src/main/java/graphql/kickstart/execution/subscriptions/apollo/SubscriptionConnectionInitCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
import graphql.kickstart.execution.subscriptions.apollo.OperationMessage.Type;
55
import java.util.Collection;
66
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
78

9+
@Slf4j
810
@RequiredArgsConstructor
911
class SubscriptionConnectionInitCommand implements SubscriptionCommand {
1012

1113
private final Collection<ApolloSubscriptionConnectionListener> connectionListeners;
1214

1315
@Override
1416
public void apply(SubscriptionSession session, OperationMessage message) {
17+
log.info("Apollo subscription connection init: {}", session);
1518
try {
1619
connectionListeners.forEach(it -> it.onConnect(session, message));
1720
session.sendMessage(new OperationMessage(Type.GQL_CONNECTION_ACK, message.getId(), null));

graphql-java-kickstart/src/main/java/graphql/kickstart/execution/subscriptions/apollo/SubscriptionStartCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
import java.util.Objects;
1414
import java.util.concurrent.CompletableFuture;
1515
import lombok.RequiredArgsConstructor;
16+
import lombok.extern.slf4j.Slf4j;
1617

18+
@Slf4j
1719
@RequiredArgsConstructor
1820
class SubscriptionStartCommand implements SubscriptionCommand {
1921

@@ -24,6 +26,7 @@ class SubscriptionStartCommand implements SubscriptionCommand {
2426

2527
@Override
2628
public void apply(SubscriptionSession session, OperationMessage message) {
29+
log.info("Apollo subscription start: {} --> {}", session, message.getPayload());
2730
connectionListeners.forEach(it -> it.onStart(session, message));
2831
CompletableFuture<ExecutionResult> executionResult = executeAsync(message.getPayload(), session);
2932
executionResult.thenAccept(result -> handleSubscriptionStart(session, message.getId(), result));

0 commit comments

Comments
 (0)