Skip to content

Commit 313a13e

Browse files
authored
Apply interceptors for WebSocket handshake (#95)
* apply interceptors for socket handshake * configuration for applying websocket * add test and doc * fix batch 1 * fix batch 2
1 parent 32381a3 commit 313a13e

File tree

11 files changed

+326
-63
lines changed

11 files changed

+326
-63
lines changed

src/main/asciidoc/index.adoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ Likewise with the proxy response
120120
{@link examples.HttpProxyExamples#outboundInterceptor}
121121
----
122122

123+
Interceptors will not apply to WebSocket handshake packets by default. While in some use cases (e.g. changing the WebSocket request path), you can overwrite {@link io.vertx.httpproxy.ProxyInterceptor#allowApplyToWebSocket} method to allow interceptors apply to WebSocket.
124+
123125
==== Body filtering
124126

125127
You can filter body by simply replacing the original {@link io.vertx.httpproxy.Body} with a new one
@@ -174,3 +176,12 @@ You can use body interceptor to create body transformations for common data type
174176

175177
Please check the {@link io.vertx.httpproxy.interceptors.BodyTransformer} for other supported transformations.
176178

179+
==== WebSocket interceptor
180+
181+
You can use WebSocket interceptor to wrap an interceptor to let it allow WebSocket handling:
182+
183+
[source,java]
184+
----
185+
{@link examples.HttpProxyExamples#webSocketInterceptorPath}
186+
----
187+

src/main/java/examples/HttpProxyExamples.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,7 @@
1313
import io.vertx.core.net.SocketAddress;
1414
import io.vertx.httpproxy.*;
1515
import io.vertx.httpproxy.cache.CacheOptions;
16-
import io.vertx.httpproxy.interceptors.BodyInterceptor;
17-
import io.vertx.httpproxy.interceptors.BodyTransformer;
18-
import io.vertx.httpproxy.interceptors.HeadersInterceptor;
19-
import io.vertx.httpproxy.interceptors.QueryInterceptor;
16+
import io.vertx.httpproxy.interceptors.*;
2017

2118
import java.util.Set;
2219

@@ -129,6 +126,12 @@ public void bodyInterceptorJson(HttpProxy proxy) {
129126
));
130127
}
131128

129+
public void webSocketInterceptorPath(HttpProxy proxy) {
130+
proxy.addInterceptor(
131+
WebSocketInterceptor.allow(PathInterceptor.addPrefix("/api"))
132+
);
133+
}
134+
132135
public void immediateResponse(HttpProxy proxy) {
133136
proxy.addInterceptor(new ProxyInterceptor() {
134137
@Override

src/main/java/io/vertx/httpproxy/ProxyContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public interface ProxyContext {
2929
*/
3030
Future<Void> sendResponse();
3131

32+
/**
33+
* @return if this request or response is the handshake of WebSocket
34+
*/
35+
boolean isWebSocket();
36+
3237
/**
3338
* Attach a payload to the context
3439
*

src/main/java/io/vertx/httpproxy/ProxyInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,13 @@ default Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
2828
default Future<Void> handleProxyResponse(ProxyContext context) {
2929
return context.sendResponse();
3030
}
31+
32+
/**
33+
* Used to set whether to apply the interceptor to the WebSocket
34+
* handshake packet. The default value is false.
35+
* @return the boolean value
36+
*/
37+
default boolean allowApplyToWebSocket() {
38+
return false;
39+
}
3140
}

src/main/java/io/vertx/httpproxy/ProxyResponse.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
import io.vertx.core.Future;
1717
import io.vertx.core.MultiMap;
1818
import io.vertx.core.buffer.Buffer;
19+
import io.vertx.core.http.HttpClient;
20+
import io.vertx.core.http.HttpClientResponse;
21+
import io.vertx.core.http.HttpServerRequest;
22+
import io.vertx.core.http.HttpServerResponse;
1923
import io.vertx.core.streams.ReadStream;
2024

2125
import java.util.function.Function;
@@ -108,6 +112,11 @@ public interface ProxyResponse {
108112
@Fluent
109113
ProxyResponse setBody(Body body);
110114

115+
/**
116+
* @return the proxied HTTP server response
117+
*/
118+
HttpClientResponse proxiedResponse();
119+
111120
boolean publicCacheControl();
112121

113122
long maxAge();

src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ public ProxyResponse setBody(Body body) {
134134
return this;
135135
}
136136

137+
@Override
138+
public HttpClientResponse proxiedResponse() {
139+
return response;
140+
}
141+
137142
@Override
138143
public boolean publicCacheControl() {
139144
return publicCacheControl;

src/main/java/io/vertx/httpproxy/impl/ReverseProxy.java

Lines changed: 59 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
package io.vertx.httpproxy.impl;
1212

1313
import io.vertx.core.Future;
14+
import io.vertx.core.buffer.Buffer;
1415
import io.vertx.core.http.*;
1516
import io.vertx.core.internal.logging.Logger;
1617
import io.vertx.core.internal.logging.LoggerFactory;
1718
import io.vertx.core.net.NetSocket;
19+
import io.vertx.core.streams.ReadStream;
1820
import io.vertx.httpproxy.*;
1921
import io.vertx.httpproxy.cache.CacheOptions;
2022
import io.vertx.httpproxy.spi.cache.Cache;
@@ -64,13 +66,8 @@ public void handle(HttpServerRequest request) {
6466
return;
6567
}
6668

67-
// WebSocket upgrade tunneling
68-
if (supportWebSocket && request.canUpgradeToWebSocket()) {
69-
handleWebSocketUpgrade(proxyRequest);
70-
return;
71-
}
72-
73-
Proxy proxy = new Proxy(proxyRequest);
69+
boolean isWebSocket = supportWebSocket && request.canUpgradeToWebSocket();
70+
Proxy proxy = new Proxy(proxyRequest, isWebSocket);
7471
proxy.filters = interceptors.listIterator();
7572
proxy.sendRequest()
7673
.recover(throwable -> {
@@ -84,57 +81,6 @@ public void handle(HttpServerRequest request) {
8481
});
8582
}
8683

87-
private void handleWebSocketUpgrade(ProxyRequest proxyRequest) {
88-
HttpServerRequest proxiedRequest = proxyRequest.proxiedRequest();
89-
resolveOrigin(proxiedRequest).onComplete(ar -> {
90-
if (ar.succeeded()) {
91-
HttpClientRequest request = ar.result();
92-
request.setMethod(HttpMethod.GET);
93-
request.setURI(proxiedRequest.uri());
94-
request.headers().addAll(proxiedRequest.headers());
95-
Future<HttpClientResponse> fut2 = request.connect();
96-
proxiedRequest.handler(request::write);
97-
proxiedRequest.endHandler(v -> request.end());
98-
proxiedRequest.resume();
99-
fut2.onComplete(ar2 -> {
100-
if (ar2.succeeded()) {
101-
HttpClientResponse proxiedResponse = ar2.result();
102-
if (proxiedResponse.statusCode() == 101) {
103-
HttpServerResponse response = proxiedRequest.response();
104-
response.setStatusCode(101);
105-
response.headers().addAll(proxiedResponse.headers());
106-
Future<NetSocket> otherso = proxiedRequest.toNetSocket();
107-
otherso.onComplete(ar3 -> {
108-
if (ar3.succeeded()) {
109-
NetSocket responseSocket = ar3.result();
110-
NetSocket proxyResponseSocket = proxiedResponse.netSocket();
111-
responseSocket.handler(proxyResponseSocket::write);
112-
proxyResponseSocket.handler(responseSocket::write);
113-
responseSocket.closeHandler(v -> proxyResponseSocket.close());
114-
proxyResponseSocket.closeHandler(v -> responseSocket.close());
115-
} else {
116-
// Find reproducer
117-
System.err.println("Handle this case");
118-
ar3.cause().printStackTrace();
119-
}
120-
});
121-
} else {
122-
// Rejection
123-
proxiedRequest.resume();
124-
end(proxyRequest, proxiedResponse.statusCode());
125-
}
126-
} else {
127-
proxiedRequest.resume();
128-
end(proxyRequest, 502);
129-
}
130-
});
131-
} else {
132-
proxiedRequest.resume();
133-
end(proxyRequest, 502);
134-
}
135-
});
136-
}
137-
13884
private void end(ProxyRequest proxyRequest, int sc) {
13985
proxyRequest
14086
.response()
@@ -155,9 +101,16 @@ private class Proxy implements ProxyContext {
155101
private ProxyResponse response;
156102
private final Map<String, Object> attachments = new HashMap<>();
157103
private ListIterator<ProxyInterceptor> filters;
104+
private final boolean isWebSocket;
158105

159-
private Proxy(ProxyRequest request) {
106+
private Proxy(ProxyRequest request, boolean isWebSocket) {
160107
this.request = request;
108+
this.isWebSocket = isWebSocket;
109+
}
110+
111+
@Override
112+
public boolean isWebSocket() {
113+
return isWebSocket;
161114
}
162115

163116
@Override
@@ -180,8 +133,25 @@ public ProxyRequest request() {
180133
public Future<ProxyResponse> sendRequest() {
181134
if (filters.hasNext()) {
182135
ProxyInterceptor next = filters.next();
136+
if (isWebSocket && !next.allowApplyToWebSocket()) {
137+
return sendRequest();
138+
}
183139
return next.handleProxyRequest(this);
184140
} else {
141+
if (isWebSocket) {
142+
HttpServerRequest proxiedRequest = request().proxiedRequest();
143+
return resolveOrigin(proxiedRequest).compose(request -> {
144+
request.setMethod(request().getMethod());
145+
request.setURI(request().getURI());
146+
request.headers().addAll(request().headers());
147+
Future<HttpClientResponse> responseFuture = request.connect();
148+
ReadStream<Buffer> readStream = request().getBody().stream();
149+
readStream.handler(request::write);
150+
readStream.resume();
151+
proxiedRequest.resume();
152+
return responseFuture;
153+
}).map(response -> new ProxiedResponse((ProxiedRequest) request(), request().proxiedRequest().response(), response));
154+
}
185155
return sendProxyRequest(request);
186156
}
187157
}
@@ -195,8 +165,38 @@ public ProxyResponse response() {
195165
public Future<Void> sendResponse() {
196166
if (filters.hasPrevious()) {
197167
ProxyInterceptor filter = filters.previous();
168+
if (isWebSocket && !filter.allowApplyToWebSocket()) {
169+
return sendResponse();
170+
}
198171
return filter.handleProxyResponse(this);
199172
} else {
173+
if (isWebSocket) {
174+
HttpClientResponse proxiedResponse = response().proxiedResponse();
175+
if (response.getStatusCode() == 101) {
176+
HttpServerResponse clientResponse = request().proxiedRequest().response();
177+
clientResponse.setStatusCode(101);
178+
clientResponse.headers().addAll(response.headers());
179+
Future<NetSocket> otherso = request.proxiedRequest().toNetSocket();
180+
otherso.onComplete(ar3 -> {
181+
if (ar3.succeeded()) {
182+
NetSocket responseSocket = ar3.result();
183+
NetSocket proxyResponseSocket = proxiedResponse.netSocket();
184+
responseSocket.handler(proxyResponseSocket::write);
185+
proxyResponseSocket.handler(responseSocket::write);
186+
responseSocket.closeHandler(v -> proxyResponseSocket.close());
187+
proxyResponseSocket.closeHandler(v -> responseSocket.close());
188+
} else {
189+
// Find reproducer
190+
System.err.println("Handle this case");
191+
ar3.cause().printStackTrace();
192+
}
193+
});
194+
} else {
195+
request().proxiedRequest().resume();
196+
end(request(), proxiedResponse.statusCode());
197+
}
198+
return Future.succeededFuture();
199+
}
200200
return response.send();
201201
}
202202
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.vertx.httpproxy.interceptors;
2+
3+
import io.vertx.codegen.annotations.Unstable;
4+
import io.vertx.codegen.annotations.VertxGen;
5+
import io.vertx.httpproxy.ProxyInterceptor;
6+
import io.vertx.httpproxy.interceptors.impl.WebSocketInterceptorImpl;
7+
8+
/**
9+
* Interceptor settings for WebSocket.
10+
*/
11+
@Unstable
12+
@VertxGen
13+
public interface WebSocketInterceptor {
14+
15+
/**
16+
* A wrapper to allow interceptor apply to WebSocket handshake packages.
17+
*
18+
* @param interceptor the original interceptor
19+
* @return the generated interceptor
20+
*/
21+
static ProxyInterceptor allow(ProxyInterceptor interceptor) {
22+
return new WebSocketInterceptorImpl(interceptor);
23+
}
24+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package io.vertx.httpproxy.interceptors.impl;
2+
3+
import io.vertx.core.Future;
4+
import io.vertx.httpproxy.ProxyContext;
5+
import io.vertx.httpproxy.ProxyInterceptor;
6+
import io.vertx.httpproxy.ProxyResponse;
7+
8+
public class WebSocketInterceptorImpl implements ProxyInterceptor {
9+
private final ProxyInterceptor interceptor;
10+
11+
public WebSocketInterceptorImpl(ProxyInterceptor interceptor) {
12+
this.interceptor = interceptor;
13+
}
14+
15+
@Override
16+
public Future<ProxyResponse> handleProxyRequest(ProxyContext context) {
17+
return interceptor.handleProxyRequest(context);
18+
}
19+
20+
@Override
21+
public Future<Void> handleProxyResponse(ProxyContext context) {
22+
return interceptor.handleProxyResponse(context);
23+
}
24+
25+
@Override
26+
public boolean allowApplyToWebSocket() {
27+
return true;
28+
}
29+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.vertx.tests;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
import io.vertx.core.net.SocketAddress;
5+
import io.vertx.ext.unit.Async;
6+
import io.vertx.ext.unit.TestContext;
7+
import io.vertx.ext.unit.junit.VertxUnitRunner;
8+
import io.vertx.httpproxy.ProxyOptions;
9+
import io.vertx.httpproxy.cache.CacheOptions;
10+
import org.junit.Test;
11+
import org.junit.runner.RunWith;
12+
13+
@RunWith(VertxUnitRunner.class)
14+
public class WebSocketCacheTest extends ProxyTestBase {
15+
16+
public WebSocketCacheTest() {
17+
super(new ProxyOptions().setCacheOptions(new CacheOptions()));
18+
}
19+
20+
@Test
21+
public void testWsWithCache(TestContext ctx) {
22+
Async latch = ctx.async();
23+
SocketAddress backend = startHttpBackend(ctx, 8081, req -> {
24+
req.toWebSocket().onSuccess(ws -> {
25+
ws.handler(ws::write);
26+
});
27+
});
28+
startProxy(backend);
29+
vertx.createWebSocketClient().connect(8080, "localhost", "/").onComplete(ctx.asyncAssertSuccess(ws1 -> {
30+
ws1.handler(buffer -> {
31+
ctx.assertEquals(buffer.toString(), "v1");
32+
ws1.close().onComplete(ctx.asyncAssertSuccess(v -> {
33+
vertx.createWebSocketClient().connect(8080, "localhost", "/").onComplete(ctx.asyncAssertSuccess(ws2 -> {
34+
ws2.handler(buffer2 -> {
35+
ctx.assertEquals(buffer2.toString(), "v2");
36+
ws2.close().onComplete(ctx.asyncAssertSuccess(v2 -> {
37+
latch.complete();
38+
}));
39+
});
40+
ws2.write(Buffer.buffer("v2")); // second WebSocket, send and reply "v2"
41+
}));
42+
}));
43+
});
44+
ws1.write(Buffer.buffer("v1")); // first WebSocket, send and reply "v1"
45+
}));
46+
}
47+
}

0 commit comments

Comments
 (0)