Skip to content

Commit 1aa46b6

Browse files
NeatGuyCodingonobc
authored andcommitted
Use Flux.interval in Webflux samples
This commit improves the Webflux samples by leveraging Flux.interval instead of Thread.sleep. Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com>
1 parent cb84310 commit 1aa46b6

File tree

2 files changed

+18
-30
lines changed

2 files changed

+18
-30
lines changed

samples/grpc-webflux-secure/src/main/java/org/springframework/grpc/sample/GrpcServerService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.springframework.grpc.sample;
22

3+
import java.time.Duration;
4+
35
import org.apache.commons.logging.Log;
46
import org.apache.commons.logging.LogFactory;
57
import org.springframework.grpc.sample.proto.HelloReply;
@@ -8,6 +10,7 @@
810
import org.springframework.stereotype.Service;
911

1012
import io.grpc.stub.StreamObserver;
13+
import reactor.core.publisher.Flux;
1114

1215
@Service
1316
public class GrpcServerService extends SimpleGrpc.SimpleImplBase {
@@ -31,21 +34,12 @@ public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserv
3134
@Override
3235
public void streamHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
3336
log.info("Hello " + req.getName());
34-
int count = 0;
35-
while (count < 10) {
36-
HelloReply reply = HelloReply.newBuilder().setMessage("Hello(" + count + ") ==> " + req.getName()).build();
37-
responseObserver.onNext(reply);
38-
count++;
39-
try {
40-
Thread.sleep(1000L);
41-
}
42-
catch (InterruptedException e) {
43-
Thread.currentThread().interrupt();
44-
responseObserver.onError(e);
45-
return;
46-
}
47-
}
48-
responseObserver.onCompleted();
37+
38+
// Use reactive Flux.interval() instead of blocking Thread.sleep()
39+
Flux.interval(Duration.ofSeconds(1))
40+
.take(10)
41+
.map(count -> HelloReply.newBuilder().setMessage("Hello(" + count + ") ==> " + req.getName()).build())
42+
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
4943
}
5044

5145
}

samples/grpc-webflux/src/main/java/org/springframework/grpc/sample/GrpcServerService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.springframework.grpc.sample;
22

3+
import java.time.Duration;
4+
35
import org.apache.commons.logging.Log;
46
import org.apache.commons.logging.LogFactory;
57
import org.springframework.grpc.sample.proto.HelloReply;
@@ -8,6 +10,7 @@
810
import org.springframework.stereotype.Service;
911

1012
import io.grpc.stub.StreamObserver;
13+
import reactor.core.publisher.Flux;
1114

1215
@Service
1316
public class GrpcServerService extends SimpleGrpc.SimpleImplBase {
@@ -31,21 +34,12 @@ public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserv
3134
@Override
3235
public void streamHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
3336
log.info("Hello " + req.getName());
34-
int count = 0;
35-
while (count < 10) {
36-
HelloReply reply = HelloReply.newBuilder().setMessage("Hello(" + count + ") ==> " + req.getName()).build();
37-
responseObserver.onNext(reply);
38-
count++;
39-
try {
40-
Thread.sleep(1000L);
41-
}
42-
catch (InterruptedException e) {
43-
Thread.currentThread().interrupt();
44-
responseObserver.onError(e);
45-
return;
46-
}
47-
}
48-
responseObserver.onCompleted();
37+
38+
// Use reactive Flux.interval() instead of blocking Thread.sleep()
39+
Flux.interval(Duration.ofSeconds(1))
40+
.take(10)
41+
.map(count -> HelloReply.newBuilder().setMessage("Hello(" + count + ") ==> " + req.getName()).build())
42+
.subscribe(responseObserver::onNext, responseObserver::onError, responseObserver::onCompleted);
4943
}
5044

5145
}

0 commit comments

Comments
 (0)