33import io .vertx .core .*;
44import io .vertx .core .http .*;
55import io .vertx .core .json .JsonObject ;
6+ import io .vertx .core .net .SocketAddress ;
67import io .vertx .ext .web .Route ;
78import io .vertx .ext .web .Router ;
89import io .vertx .ext .web .RoutingContext ;
@@ -85,7 +86,17 @@ public class ReverseHttpProxy {
8586 /**
8687 * 请求发送的毫秒时间戳
8788 */
88- public static final String P_SEND_TIMESTAMP = "send.timestamp" ;
89+ protected static final String INTERNAL_SEND_TIMESTAMP = "internal.send.timestamp" ;
90+
91+ /**
92+ * 连接状态:客户端--代理服务
93+ */
94+ protected static final String INTERNAL_CLIENT_CONNECTION_OPEN = "internal.client.connection.open" ;
95+
96+ /**
97+ * 连接状态:代理服务--后端服务
98+ */
99+ protected static final String INTERNAL_PROXY_SERVER_CONNECTION_OPEN = "internal.client.proxyServer.connection.open" ;
89100
90101
91102 protected static final char [] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" .toCharArray ();
@@ -152,12 +163,12 @@ public static ReverseHttpProxy create(Router router, HttpServer httpServer, Http
152163 }
153164
154165 protected static String generateName () {
155- final String prefix = "VertxHTTPReverseProxy -" ;
166+ final String prefix = "ReverseHttpProxy -" ;
156167 try {
157168 // 池号对于虚拟机来说是全局的,以避免在类加载器范围的环境中池号重叠
158169 synchronized (System .getProperties ()) {
159- final String next = String .valueOf (Integer .getInteger ("top.meethigher.proxy.http.VertxHTTPReverseProxy .name" , 0 ) + 1 );
160- System .setProperty ("top.meethigher.proxy.http.VertxHTTPReverseProxy .name" , next );
170+ final String next = String .valueOf (Integer .getInteger ("top.meethigher.proxy.http.ReverseHttpProxy .name" , 0 ) + 1 );
171+ System .setProperty ("top.meethigher.proxy.http.ReverseHttpProxy .name" , next );
161172 return prefix + next ;
162173 }
163174 } catch (Exception e ) {
@@ -387,7 +398,7 @@ protected void doLog(Route route, HttpServerRequest serverReq, HttpServerRespons
387398 .replace ("{source}" , serverReq .uri ())
388399 .replace ("{target}" , proxyUrl )
389400 .replace ("{statusCode}" , String .valueOf (serverResp .getStatusCode ()))
390- .replace ("{consumedMills}" , String .valueOf (System .currentTimeMillis () - (Long ) route .getMetadata (P_SEND_TIMESTAMP )));
401+ .replace ("{consumedMills}" , String .valueOf (System .currentTimeMillis () - (Long ) route .getMetadata (INTERNAL_SEND_TIMESTAMP )));
391402 log .info (logInfo );
392403 }
393404 }
@@ -408,13 +419,16 @@ protected Handler<AsyncResult<HttpClientResponse>> sendRequestHandler(Route rout
408419 }
409420 // 设置响应码
410421 serverResp .setStatusCode (clientResp .statusCode ());
411- // 流输出
412- clientResp .pipeTo (serverResp ).onSuccess (v -> {
413- doLog (route , serverReq , serverResp , proxyUrl );
414- }).onFailure (e -> {
415- badGateway (route , serverReq , serverResp , proxyUrl );
416- log .error ("{} {} proxy response copy error" , serverReq .method ().name (), proxyUrl , e );
417- });
422+ if ((boolean ) route .getMetadata (INTERNAL_PROXY_SERVER_CONNECTION_OPEN ) && (boolean ) route .getMetadata (INTERNAL_CLIENT_CONNECTION_OPEN )) {
423+ // 流输出
424+ clientResp .pipeTo (serverResp ).onSuccess (v -> {
425+ doLog (route , serverReq , serverResp , proxyUrl );
426+ }).onFailure (e -> {
427+ badGateway (route , serverReq , serverResp , proxyUrl );
428+ log .error ("{} {} proxy response copy error" , serverReq .method ().name (), proxyUrl , e );
429+ });
430+ }
431+
418432 } else {
419433 badGateway (route , serverReq , serverResp , proxyUrl );
420434 Throwable e = ar .cause ();
@@ -430,13 +444,33 @@ protected Handler<AsyncResult<HttpClientRequest>> connectHandler(Route route, Ht
430444 return ar -> {
431445 if (ar .succeeded ()) {
432446 HttpClientRequest clientReq = ar .result ();
447+ // 记录连接状态
448+ route .putMetadata (INTERNAL_PROXY_SERVER_CONNECTION_OPEN , true );
449+
450+ // 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
451+ HttpConnection connection = clientReq .connection ();
452+ SocketAddress remoteAddress = connection .remoteAddress ();
453+ SocketAddress localAddress = connection .localAddress ();
454+ connection .closeHandler (v -> {
455+ route .putMetadata (INTERNAL_PROXY_SERVER_CONNECTION_OPEN , false );
456+ log .debug ("proxyServer connection {}:{} -- {}:{} closed" ,
457+ localAddress .hostAddress (), localAddress .port (),
458+ remoteAddress .hostAddress (), remoteAddress .port ());
459+ });
460+
433461 // 复制请求头。复制的过程中忽略逐跳标头
434462 copyRequestHeaders (route , serverReq , clientReq );
435- // 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
436- if (clientReq .headers ().contains ("Content-Length" ) || clientReq .headers ().contains ("Transfer-Encoding" )) {
437- clientReq .send (serverReq ).onComplete (sendRequestHandler (route , serverReq , serverResp , proxyUrl ));
438- } else {
439- clientReq .send ().onComplete (sendRequestHandler (route , serverReq , serverResp , proxyUrl ));
463+
464+ if ((boolean ) route .getMetadata (INTERNAL_PROXY_SERVER_CONNECTION_OPEN ) && (boolean ) route .getMetadata (INTERNAL_CLIENT_CONNECTION_OPEN )) {
465+ // 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
466+ if (clientReq .headers ().contains ("Content-Length" ) || clientReq .headers ().contains ("Transfer-Encoding" )) {
467+ clientReq .send (serverReq ).onComplete (sendRequestHandler (route , serverReq , serverResp , proxyUrl ));
468+ } else {
469+ clientReq .send ().onComplete (sendRequestHandler (route , serverReq , serverResp , proxyUrl ));
470+ }
471+ } else if ((boolean ) route .getMetadata (INTERNAL_PROXY_SERVER_CONNECTION_OPEN ) && !(boolean ) route .getMetadata (INTERNAL_CLIENT_CONNECTION_OPEN )) {
472+ // 整体链路连接不可用,释放资源
473+ connection .close ();
440474 }
441475 } else {
442476 badGateway (route , serverReq , serverResp , proxyUrl );
@@ -463,7 +497,9 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
463497 Route route = ctx .currentRoute ();
464498
465499 // 记录请求开始时间
466- route .putMetadata (P_SEND_TIMESTAMP , System .currentTimeMillis ());
500+ route .putMetadata (INTERNAL_SEND_TIMESTAMP , System .currentTimeMillis ());
501+ // 记录连接状态
502+ route .putMetadata (INTERNAL_CLIENT_CONNECTION_OPEN , true );
467503
468504 String result = route .getMetadata (P_TARGET_URL ).toString ();
469505 HttpServerRequest serverReq = ctx .request ();
@@ -472,20 +508,34 @@ protected Handler<RoutingContext> routingContextHandler(HttpClient httpClient) {
472508 // 暂停流读取
473509 serverReq .pause ();
474510
511+
475512 String absoluteURI = serverReq .absoluteURI ();
476513 UrlParser .ParsedUrl parsedUrl = UrlParser .parseUrl (absoluteURI );
477514 String prefix = parsedUrl .getFormatHostPort () + (route .getMetadata (P_SOURCE_URL ).toString ().replace ("/*" , "" ));
478515 String proxyUrl = result + (parsedUrl .getFormatUrl ().replace (prefix , "" ));
479516
480-
481517 // 构建请求参数
482518 RequestOptions requestOptions = new RequestOptions ();
483519 requestOptions .setAbsoluteURI (proxyUrl );
484520 requestOptions .setMethod (serverReq .method ());
485521 requestOptions .setFollowRedirects (route .getMetadata (P_FOLLOW_REDIRECTS ) != null && Boolean .parseBoolean (route .getMetadata (P_FOLLOW_REDIRECTS )));
486522
523+
524+ // 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
525+ HttpConnection connection = serverReq .connection ();
526+ SocketAddress remoteAddress = connection .remoteAddress ();
527+ SocketAddress localAddress = connection .localAddress ();
528+ connection .closeHandler (v -> {
529+ route .putMetadata (INTERNAL_CLIENT_CONNECTION_OPEN , false );
530+ log .debug ("client connection {}:{} -- {}:{} closed" ,
531+ remoteAddress .hostAddress (), remoteAddress .port (),
532+ localAddress .hostAddress (), localAddress .port ());
533+ });
534+
487535 // 请求
488- httpClient .request (requestOptions ).onComplete (connectHandler (route , serverReq , serverResp , proxyUrl ));
536+ if ((boolean ) route .getMetadata (INTERNAL_CLIENT_CONNECTION_OPEN )) {
537+ httpClient .request (requestOptions ).onComplete (connectHandler (route , serverReq , serverResp , proxyUrl ));
538+ }
489539 };
490540 }
491541
0 commit comments