Skip to content

Commit a932795

Browse files
committed
http proxy pass trailers
1 parent 7674c5a commit a932795

File tree

3 files changed

+189
-1
lines changed

3 files changed

+189
-1
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package io.vertx.httpproxy.impl;
2+
3+
import io.vertx.codegen.annotations.Nullable;
4+
import io.vertx.core.Future;
5+
import io.vertx.core.Handler;
6+
import io.vertx.core.MultiMap;
7+
import io.vertx.core.buffer.Buffer;
8+
import io.vertx.core.http.HttpClientResponse;
9+
import io.vertx.core.http.HttpServerResponse;
10+
import io.vertx.core.streams.Pipe;
11+
import io.vertx.core.streams.ReadStream;
12+
import io.vertx.core.streams.WriteStream;
13+
import io.vertx.httpproxy.Body;
14+
15+
16+
/**
17+
* created by wang007 on 2025/9/15
18+
*/
19+
public class HttpClientResponseBody implements Body, ReadStream<Buffer> {
20+
21+
private final HttpClientResponse response;
22+
private final String mediaType;
23+
private final long length;
24+
25+
private volatile Handler<Void> endHandler;
26+
27+
private volatile HttpServerResponse dst;
28+
29+
public HttpClientResponseBody(HttpClientResponse response, long length, String mediaType) {
30+
this.response = response;
31+
this.mediaType = mediaType;
32+
this.length = length;
33+
}
34+
35+
@Override
36+
public String mediaType() {
37+
return mediaType;
38+
}
39+
40+
@Override
41+
public long length() {
42+
return length;
43+
}
44+
45+
@Override
46+
public ReadStream<Buffer> stream() {
47+
return this;
48+
}
49+
50+
51+
@Override
52+
public ReadStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
53+
response.exceptionHandler(handler);
54+
return this;
55+
}
56+
57+
@Override
58+
public ReadStream<Buffer> handler(@Nullable Handler<Buffer> handler) {
59+
response.handler(handler);
60+
return this;
61+
}
62+
63+
@Override
64+
public ReadStream<Buffer> pause() {
65+
response.pause();
66+
return this;
67+
}
68+
69+
@Override
70+
public ReadStream<Buffer> resume() {
71+
response.resume();
72+
return this;
73+
}
74+
75+
@Override
76+
public ReadStream<Buffer> fetch(long amount) {
77+
response.fetch(amount);
78+
return this;
79+
}
80+
81+
@Override
82+
public ReadStream<Buffer> endHandler(@Nullable Handler<Void> endHandler) {
83+
if (endHandler == null) {
84+
response.endHandler(null);
85+
return this;
86+
}
87+
Handler<Void> current = this.endHandler;
88+
this.endHandler = endHandler;
89+
if (current != null) {
90+
return this;
91+
}
92+
93+
response.endHandler(v -> {
94+
try {
95+
MultiMap trailers = response.trailers();
96+
if (trailers.isEmpty()) {
97+
return;
98+
}
99+
HttpServerResponse dst = this.dst;
100+
if (dst == null) {
101+
return;
102+
}
103+
MultiMap dstTrailers = dst.trailers();
104+
dstTrailers.addAll(trailers);
105+
} finally {
106+
Handler<Void> h = this.endHandler;
107+
if (h != null) {
108+
h.handle(null);
109+
}
110+
}
111+
});
112+
113+
return this;
114+
}
115+
116+
117+
@Override
118+
public Future<Void> pipeTo(WriteStream<Buffer> dst) {
119+
if (dst instanceof HttpServerResponse) {
120+
this.dst = (HttpServerResponse) dst;
121+
}
122+
return ReadStream.super.pipeTo(dst);
123+
}
124+
125+
@Override
126+
public Pipe<Buffer> pipe() {
127+
Pipe<Buffer> pipe = ReadStream.super.pipe();
128+
return new Pipe<>() {
129+
@Override
130+
public Pipe<Buffer> endOnFailure(boolean end) {
131+
pipe.endOnFailure(end);
132+
return this;
133+
}
134+
135+
@Override
136+
public Pipe<Buffer> endOnSuccess(boolean end) {
137+
pipe.endOnSuccess(end);
138+
return this;
139+
}
140+
141+
@Override
142+
public Pipe<Buffer> endOnComplete(boolean end) {
143+
pipe.endOnComplete(end);
144+
return this;
145+
}
146+
147+
@Override
148+
public Future<Void> to(WriteStream<Buffer> dst) {
149+
if (dst instanceof HttpServerResponse) {
150+
HttpClientResponseBody.this.dst = (HttpServerResponse) dst;
151+
}
152+
return pipe.to(dst);
153+
}
154+
155+
@Override
156+
public void close() {
157+
pipe.close();
158+
}
159+
};
160+
161+
}
162+
}

src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class ProxiedResponse implements ProxyResponse {
7474
this.proxiedResponse = proxiedResponse;
7575
this.statusCode = response.statusCode();
7676
this.statusMessage = response.statusMessage();
77-
this.body = Body.body(response, contentLength, contentType);
77+
this.body = new HttpClientResponseBody(response, contentLength, contentType);
7878

7979
long maxAge = -1;
8080
boolean publicCacheControl = false;

src/test/java/io/vertx/tests/ProxyRequestTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,32 @@ public void testChunkedFrontendRequest(TestContext ctx) {
121121
.onComplete(ctx.asyncAssertSuccess());
122122
}
123123

124+
@Test
125+
public void testResponseTrailer(TestContext ctx) {
126+
runHttpTest(ctx, req -> {
127+
String te = req.getHeader(HttpHeaders.TRANSFER_ENCODING);
128+
if (te == null || !te.equalsIgnoreCase("chunked")) {
129+
ctx.fail("got non chunked request");
130+
}
131+
HttpServerResponse response = req.response();
132+
response.setChunked(true);
133+
response.trailers().add("marked", "1");
134+
response.end("Hello World");
135+
}, ctx.asyncAssertSuccess());
136+
httpClient = vertx.createHttpClient();
137+
httpClient
138+
.request(HttpMethod.POST, 8080, "localhost", "/somepath")
139+
.compose(req -> req
140+
.setChunked(true)
141+
.send("chunk")
142+
.andThen(ctx.asyncAssertSuccess(resp -> ctx.assertEquals(200, resp.statusCode())))
143+
.compose(response -> response.end()
144+
.map(response)
145+
.andThen(ctx.asyncAssertSuccess(resp -> ctx.assertEquals("1", resp.getTrailer("marked"))))))
146+
.onComplete(ctx.asyncAssertSuccess());
147+
}
148+
149+
124150
@Test
125151
public void testNonChunkedFrontendRequest(TestContext ctx) {
126152
runHttpTest(ctx, req -> {

0 commit comments

Comments
 (0)