3838public class ReverseTcpProxyTunnelClient extends TunnelClient {
3939 private static final Logger log = LoggerFactory .getLogger (ReverseTcpProxyTunnelClient .class );
4040 protected static final char [] ID_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" .toCharArray ();
41- protected static final long HEARTBEAT_DELAY_DEFAULT = 5000 ; // 毫秒
41+
4242 protected static final long MIN_DELAY_DEFAULT = 1000 ;// 毫秒
4343 protected static final long MAX_DELAY_DEFAULT = 64000 ;// 毫秒
4444
4545
46- protected final long heartbeatDelay ;
4746 protected final String secret ;
4847 protected final String name ;
4948
50-
49+ protected long heartbeatDelay ;
5150 protected String backendHost = "meethigher.top" ;
5251 protected int backendPort = 22 ;
5352 protected String dataProxyHost = "127.0.0.1" ;
@@ -75,10 +74,9 @@ protected static String generateName() {
7574 }
7675
7776 protected ReverseTcpProxyTunnelClient (Vertx vertx , NetClient netClient ,
78- long minDelay , long maxDelay , long heartbeatDelay ,
77+ long minDelay , long maxDelay ,
7978 String secret , String name ) {
8079 super (vertx , netClient , minDelay , maxDelay );
81- this .heartbeatDelay = heartbeatDelay ;
8280 this .secret = secret ;
8381 this .name = name ;
8482 addMessageHandler ();
@@ -106,24 +104,25 @@ public ReverseTcpProxyTunnelClient dataProxyPort(int dataProxyPort) {
106104
107105 public ReverseTcpProxyTunnelClient dataProxyName (String dataProxyName ) {
108106 this .dataProxyName = dataProxyName ;
107+ super .name = this .dataProxyName ;
109108 return this ;
110109 }
111110
112- public static ReverseTcpProxyTunnelClient create (Vertx vertx , NetClient netClient , long minDelay , long maxDelay , long heartbeatDelay , String secret , String name ) {
113- return new ReverseTcpProxyTunnelClient (vertx , netClient , minDelay , maxDelay , heartbeatDelay , secret , name );
111+ public static ReverseTcpProxyTunnelClient create (Vertx vertx , NetClient netClient , long minDelay , long maxDelay , String secret , String name ) {
112+ return new ReverseTcpProxyTunnelClient (vertx , netClient , minDelay , maxDelay , secret , name );
114113 }
115114
116115 public static ReverseTcpProxyTunnelClient create (Vertx vertx , NetClient netClient , String secret ) {
117- return new ReverseTcpProxyTunnelClient (vertx , netClient , MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , HEARTBEAT_DELAY_DEFAULT , secret , generateName ());
116+ return new ReverseTcpProxyTunnelClient (vertx , netClient , MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , secret , generateName ());
118117 }
119118
120119
121120 public static ReverseTcpProxyTunnelClient create (Vertx vertx , NetClient netClient ) {
122- return new ReverseTcpProxyTunnelClient (vertx , netClient , MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , HEARTBEAT_DELAY_DEFAULT , SECRET_DEFAULT , generateName ());
121+ return new ReverseTcpProxyTunnelClient (vertx , netClient , MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , SECRET_DEFAULT , generateName ());
123122 }
124123
125124 public static ReverseTcpProxyTunnelClient create (Vertx vertx ) {
126- return new ReverseTcpProxyTunnelClient (vertx , vertx .createNetClient (), MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , HEARTBEAT_DELAY_DEFAULT , SECRET_DEFAULT , generateName ());
125+ return new ReverseTcpProxyTunnelClient (vertx , vertx .createNetClient (), MIN_DELAY_DEFAULT , MAX_DELAY_DEFAULT , SECRET_DEFAULT , generateName ());
127126 }
128127
129128
@@ -135,6 +134,7 @@ protected void addMessageHandler() {
135134 this .onConnected ((vertx , netSocket , buffer ) -> netSocket .write (encode (TunnelMessageType .OPEN_DATA_PORT ,
136135 TunnelMessage .OpenDataPort .newBuilder ()
137136 .setSecret (secret )
137+ .setDataProxyHost (dataProxyHost )
138138 .setDataProxyPort (dataProxyPort )
139139 .setDataProxyName (dataProxyName )
140140 .build ().toByteArray ())));
@@ -149,6 +149,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
149149 if (parsed .getSuccess ()) {
150150 // 如果认证 + 开通端口成功,那么就需要进行长连接保持,并开启定期心跳。
151151 result = true ;
152+ heartbeatDelay = parsed .getHeartbeatDelay ();
152153 vertx .setTimer (heartbeatDelay , id -> netSocket .write (encode (TunnelMessageType .HEARTBEAT ,
153154 TunnelMessage .Heartbeat .newBuilder ().setTimestamp (System .currentTimeMillis ()).build ().toByteArray ())));
154155 } else {
@@ -186,45 +187,55 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
186187 if (ar .succeeded ()) {
187188 final NetSocket dataSocket = ar .result ();
188189 dataSocket .pause ();
190+ // 连接建立成功后,立马发送消息告诉数据服务,我是数据连接,并与用户连接进行绑定
191+ dataSocket .write (Buffer .buffer ()
192+ .appendBytes (DATA_CONN_FLAG )
193+ .appendInt (sessionId ));
194+ log .debug ("{}: data connection {} established, notify data proxy server of current session id {}. wait for backend connection" ,
195+ dataProxyName ,
196+ dataSocket .remoteAddress (),
197+ sessionId );
189198 netClient .connect (backendPort , backendHost ).onComplete (rst -> {
190199 if (rst .succeeded ()) {
191200 atomicResult .set (rst .succeeded ());
192201 final NetSocket backendSocket = rst .result ();
193202 backendSocket .pause ();
194- dataSocket .write (Buffer .buffer ()
195- .appendBytes (DATA_CONN_FLAG )
196- .appendInt (sessionId ));
203+ log .debug ("{}: backend connection {} established" , dataProxyName , backendSocket .remoteAddress ());
197204 // 双向生命周期绑定、双向数据转发
198205 dataSocket .closeHandler (v -> {
199- log .debug ("data connection {} closed" , dataSocket .remoteAddress ());
206+ log .debug ("{}: data connection {} closed" , dataProxyName , dataSocket .remoteAddress ());
200207 backendSocket .close ();
201208 }).pipeTo (backendSocket ).onFailure (e -> {
202- log .error ("data connection {} pipe to backend connection {} failed, connection will be closed" ,
209+ log .error ("{}: data connection {} pipe to backend connection {} failed, connection will be closed" ,
210+ dataProxyName ,
203211 dataSocket .remoteAddress (), backendSocket .remoteAddress (), e );
204212 dataSocket .close ();
205213 });
206214 backendSocket .closeHandler (v -> {
207- log .debug ("backend connection {} closed" , backendSocket .remoteAddress ());
215+ log .debug ("{}: backend connection {} closed" , dataProxyName , backendSocket .remoteAddress ());
208216 dataSocket .close ();
209217 }).pipeTo (dataSocket ).onFailure (e -> {
210- log .error ("backend connection {} pipe to data connection {} failed, connection will be closed" ,
218+ log .error ("{}: backend connection {} pipe to data connection {} failed, connection will be closed" ,
219+ dataProxyName ,
211220 backendSocket .remoteAddress (), dataSocket .remoteAddress (), e );
212221 backendSocket .close ();
213222 });
214223 backendSocket .resume ();
215224 dataSocket .resume ();
216- log .debug ("data connection {} bound to backend connection {} for session id {}" ,
225+ log .debug ("{}: data connection {} bound to backend connection {} for session id {}" ,
226+ dataProxyName ,
217227 dataSocket .remoteAddress (),
218228 backendSocket .remoteAddress (),
219229 sessionId );
220230 } else {
221- log .error ("client open backend connection to {}:{} failed" ,
231+ log .error ("{}: client open backend connection to {}:{} failed" ,
232+ dataProxyName ,
222233 backendHost , backendPort , rst .cause ());
223234 }
224235 latch .countDown ();
225236 });
226237 } else {
227- log .error ("client open data connection to {}:{} failed" , dataProxyHost , dataProxyPort , ar .cause ());
238+ log .error ("{}: client open data connection to {}:{} failed" , dataProxyName , dataProxyHost , dataProxyPort , ar .cause ());
228239 latch .countDown ();
229240 }
230241
0 commit comments