Skip to content

Commit 0a286ea

Browse files
committed
#20 - ENH: Add async().stream(...) for streaming results to beans (typically x-json-stream response)
1 parent fec0486 commit 0a286ea

File tree

4 files changed

+97
-11
lines changed

4 files changed

+97
-11
lines changed

client/src/main/java/io/avaje/http/client/DHttpAsync.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.net.http.HttpResponse;
44
import java.util.List;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.stream.Stream;
67

78
class DHttpAsync implements HttpAsyncResponse {
89

@@ -44,4 +45,11 @@ public <E> CompletableFuture<List<E>> list(Class<E> type) {
4445
.performSendAsync(true, HttpResponse.BodyHandlers.ofByteArray())
4546
.thenApply(httpResponse -> request.asyncList(type, httpResponse));
4647
}
48+
49+
@Override
50+
public <E> CompletableFuture<Stream<E>> stream(Class<E> type) {
51+
return request
52+
.performSendAsync(false, HttpResponse.BodyHandlers.ofLines())
53+
.thenApply(httpResponse -> request.asyncStream(type, httpResponse));
54+
}
4755
}

client/src/main/java/io/avaje/http/client/DHttpClientRequest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,17 @@ protected <E> List<E> asyncList(Class<E> type, HttpResponse<byte[]> response) {
409409
return context.readList(type, encodedResponseBody);
410410
}
411411

412+
protected <E> Stream<E> asyncStream(Class<E> type, HttpResponse<Stream<String>> response) {
413+
requestTimeNanos = System.nanoTime() - startAsyncNanos;
414+
httpResponse = response;
415+
context.afterResponse(this);
416+
if (response.statusCode() >= 300) {
417+
throw new HttpException(response, context);
418+
}
419+
final BodyReader<E> bodyReader = context.beanReader(type);
420+
return response.body().map(bodyReader::readBody);
421+
}
422+
412423
private void afterAsyncEncoded(HttpResponse<byte[]> response) {
413424
requestTimeNanos = System.nanoTime() - startAsyncNanos;
414425
httpResponse = response;

client/src/main/java/io/avaje/http/client/HttpAsyncResponse.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.net.http.HttpResponse;
44
import java.util.List;
55
import java.util.concurrent.CompletableFuture;
6+
import java.util.stream.Stream;
67

78
/**
89
* Async processing of the request with responses as CompletableFuture.
@@ -105,8 +106,9 @@ public interface HttpAsyncResponse {
105106
/**
106107
* Process expecting a bean response body (typically from json content).
107108
* <p>
108-
* If the HTTP statusCode is 300 or above a HttpException is throw which
109-
* contains the HttpResponse.
109+
* If the HTTP statusCode is 300 or above a HttpException is throw
110+
* which contains the HttpResponse. This is the cause in the
111+
* CompletionException.
110112
*
111113
* <pre>{@code
112114
*
@@ -125,7 +127,7 @@ public interface HttpAsyncResponse {
125127
* ..
126128
*
127129
* } else {
128-
* // use helloDto
130+
* // process helloDto
129131
* ...
130132
* }
131133
* });
@@ -139,8 +141,9 @@ public interface HttpAsyncResponse {
139141
/**
140142
* Process expecting a list of beans response body (typically from json content).
141143
* <p>
142-
* If the HTTP statusCode is 300 or above a HttpException is throw which
143-
* contains the HttpResponse.
144+
* If the HTTP statusCode is 300 or above a HttpException is throw
145+
* which contains the HttpResponse. This is the cause in the
146+
* CompletionException.
144147
*
145148
* <pre>{@code
146149
*
@@ -156,7 +159,7 @@ public interface HttpAsyncResponse {
156159
* ...
157160
*
158161
* } else {
159-
* // use list of helloDto
162+
* // process list of helloDto
160163
* ...
161164
* }
162165
* });
@@ -167,4 +170,35 @@ public interface HttpAsyncResponse {
167170
*/
168171
<E> CompletableFuture<List<E>> list(Class<E> type);
169172

173+
/**
174+
* Process response as a stream of beans (x-json-stream).
175+
* <p>
176+
* If the HTTP statusCode is 300 or above a HttpException is throw
177+
* which contains the HttpResponse. This is the cause in the
178+
* CompletionException.
179+
*
180+
* <pre>{@code
181+
*
182+
* CompletableFuture<Stream<Customer>> future = clientContext.request()
183+
* .path("customers/stream")
184+
* .GET().async()
185+
* .stream(Customer.class);
186+
*
187+
* future.whenComplete((stream, throwable) -> {
188+
* // if throwable != null ... handle error
189+
*
190+
* // else process Stream<Customer> ...
191+
* try (stream) {
192+
* stream.forEach(customer -> {
193+
* ...
194+
* });
195+
* }
196+
* });
197+
*
198+
* }</pre>
199+
*
200+
* @param type The bean type to convert the content to
201+
* @return The CompletableFuture of the response
202+
*/
203+
<E> CompletableFuture<Stream<E>> stream(Class<E> type);
170204
}

client/src/test/java/io/avaje/http/client/HelloControllerTest.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,37 @@ void get_stream() {
4141
}
4242

4343
@Test
44-
void async_stream() throws ExecutionException, InterruptedException {
44+
void async_get_stream() throws ExecutionException, InterruptedException {
45+
46+
final CompletableFuture<Stream<SimpleData>> future = clientContext.request()
47+
.path("hello").path("stream")
48+
.GET().async()
49+
.stream(SimpleData.class);
50+
51+
future.whenComplete((stream, throwable) -> {
52+
assertThat(throwable).isNull();
53+
54+
final List<SimpleData> data = stream.collect(Collectors.toList());
55+
assertThat(data).hasSize(4);
56+
final SimpleData first = data.get(0);
57+
assertThat(first.id).isEqualTo(1);
58+
assertThat(first.name).isEqualTo("one");
59+
60+
try (stream) {
61+
// more typically process with forEach ...
62+
stream.forEach(simpleData -> {
63+
System.out.println("process " + simpleData.id + " " + simpleData.name);
64+
});
65+
}
66+
});
67+
68+
// wait ...
69+
future.get();
70+
71+
}
72+
73+
@Test
74+
void async_stream_fromLineSubscriber() throws ExecutionException, InterruptedException {
4575

4676
AtomicReference<HttpResponse<Void>> hresRef = new AtomicReference<>();
4777
AtomicReference<Throwable> errRef = new AtomicReference<>();
@@ -59,14 +89,17 @@ public void onSubscribe(Flow.Subscription subscription) {
5989
subscription.request(Long.MAX_VALUE);
6090
onSubscribeRef.set(true);
6191
}
92+
6293
@Override
6394
public void onNext(String item) {
6495
lines.add(item);
6596
}
97+
6698
@Override
6799
public void onError(Throwable throwable) {
68100
errRef.set(throwable);
69101
}
102+
70103
@Override
71104
public void onComplete() {
72105
completeRef.set(true);
@@ -274,11 +307,11 @@ void async_exceptionally_style() {
274307
.bean(HelloDto.class);
275308

276309
future.exceptionally(throwable -> {
277-
final HttpException httpException = (HttpException) throwable.getCause();
278-
causeRef.set(httpException);
279-
assertThat(httpException.getStatusCode()).isEqualTo(422);
310+
final HttpException httpException = (HttpException) throwable.getCause();
311+
causeRef.set(httpException);
312+
assertThat(httpException.getStatusCode()).isEqualTo(422);
280313

281-
return new HelloDto(0, "ErrorResponse", "");
314+
return new HelloDto(0, "ErrorResponse", "");
282315

283316
}).thenAccept(helloDto -> {
284317
assertThat(helloDto.name).isEqualTo("ErrorResponse");

0 commit comments

Comments
 (0)