22
33import java .io .EOFException ;
44import java .io .IOException ;
5+ import java .nio .ByteBuffer ;
6+ import java .util .HashMap ;
57import java .util .Set ;
68import java .util .concurrent .ConcurrentHashMap ;
9+ import java .util .concurrent .Executors ;
10+ import java .util .concurrent .ScheduledExecutorService ;
11+ import java .util .concurrent .TimeUnit ;
712
813import org .eclipse .jetty .websocket .api .Session ;
914
15+ import info .unterrainer .commons .jreutils .ShutdownHook ;
1016import info .unterrainer .oauthtokenmanager .OauthTokenManager ;
1117import io .javalin .websocket .WsBinaryMessageContext ;
1218import io .javalin .websocket .WsCloseContext ;
@@ -21,6 +27,37 @@ public class WsOauthHandlerBase extends WsHandlerBase {
2127 protected OauthTokenManager tokenHandler ;
2228 protected Set <WsConnectContext > clientsConnected = ConcurrentHashMap .newKeySet ();
2329 protected Set <WsConnectContext > clientsQuarantined = ConcurrentHashMap .newKeySet ();
30+ protected HashMap <Session , String > tenantIdsBySession = new HashMap <>();
31+
32+ protected ScheduledExecutorService hb = Executors .newSingleThreadScheduledExecutor (r -> {
33+ Thread t = new Thread (r , "ws-heartbeat" );
34+ t .setDaemon (true );
35+ return t ;
36+ });
37+
38+ public WsOauthHandlerBase () {
39+ super ();
40+ ShutdownHook .register (() -> {
41+ hb .close ();
42+ hb = null ;
43+ });
44+
45+ hb .scheduleAtFixedRate (() -> {
46+ for (WsConnectContext c : clientsConnected ) {
47+ Session s = c .session ;
48+ if (s .isOpen ()) {
49+ try {
50+ s .getRemote ().sendPing (ByteBuffer .allocate (1 ));
51+ } catch (Exception e ) {
52+ try {
53+ s .close (1000 , "heartbeat failed" );
54+ } catch (Exception ignore ) {
55+ }
56+ }
57+ }
58+ }
59+ }, 30 , 30 , TimeUnit .SECONDS );
60+ }
2461
2562 void setTokenHandler (OauthTokenManager tokenHandler ) {
2663 this .tokenHandler = tokenHandler ;
@@ -64,7 +101,8 @@ public void onConnect(WsConnectContext ctx) throws Exception {
64101 }
65102 log .debug ("New client token: [{}]" , token );
66103 try {
67- tokenHandler .checkAccess (token );
104+ String tenantId = tokenHandler .checkAccess (token );
105+ tenantIdsBySession .put (ctx .session , tenantId );
68106 clientsConnected .add (ctx );
69107 } catch (Exception e ) {
70108 log .debug ("Token validation failed for client [{}]. Disconnecting." , ctx .session .getRemoteAddress (), e );
@@ -90,7 +128,8 @@ public final void onMessage(WsMessageContext ctx) throws Exception {
90128 return ;
91129 }
92130 try {
93- tokenHandler .checkAccess (ctx .message ());
131+ String tenantId = tokenHandler .checkAccess (ctx .message ());
132+ tenantIdsBySession .put (ctx .session , tenantId );
94133 WsConnectContext client = getQuarantinedClient (ctx .session );
95134 log .debug ("Client [{}] passed token validation. Moving from quarantine to connected." ,
96135 ctx .session .getRemoteAddress ());
0 commit comments