@@ -9,10 +9,10 @@ import type { Logger } from "@/common/log";
99import { isCborSerializable , stringifyError } from "@/common/utils" ;
1010import type { UniversalWebSocket } from "@/common/websocket-interface" ;
1111import { ActorInspector } from "@/inspector/actor" ;
12- import type { Registry , RegistryConfig } from "@/mod" ;
12+ import type { Registry } from "@/mod" ;
1313import type { ActionContext } from "./action" ;
1414import type { ActorConfig } from "./config" ;
15- import { Conn , type ConnId } from "./connection" ;
15+ import { Conn , type ConnId , generatePing } from "./connection" ;
1616import { ActorContext } from "./context" ;
1717import type { AnyDatabaseProvider , InferDatabaseClient } from "./database" ;
1818import type { ActorDriver , ConnDriver , ConnDrivers } from "./driver" ;
@@ -157,6 +157,11 @@ export class ActorInstance<
157157 #ready = false ;
158158
159159 #connections = new Map < ConnId , Conn < S , CP , CS , V , I , AD , DB > > ( ) ;
160+ // This is used to track the last ping sent to the client, when restoring a connection
161+ #connectionsPingRequests = new Map <
162+ ConnId ,
163+ { ping : string ; resolve : ( ) => void }
164+ > ( ) ;
160165 #subscriptionIndex = new Map < string , Set < Conn < S , CP , CS , V , I , AD , DB > > > ( ) ;
161166
162167 #schedule! : Schedule ;
@@ -591,6 +596,8 @@ export class ActorInstance<
591596 // Set initial state
592597 this . #setPersist( persistData ) ;
593598
599+ const restorePromises = [ ] ;
600+
594601 // Load connections
595602 for ( const connPersist of this . #persist. c ) {
596603 // Create connections
@@ -601,13 +608,60 @@ export class ActorInstance<
601608 driver ,
602609 this . #connStateEnabled,
603610 ) ;
604- this . #connections. set ( conn . id , conn ) ;
605611
606- // Register event subscriptions
607- for ( const sub of connPersist . su ) {
608- this . #addSubscription( sub . n , conn , true ) ;
609- }
612+ // send ping, to ensure the connection is alive
613+
614+ const { promise, resolve } = Promise . withResolvers < void > ( ) ;
615+ restorePromises . push (
616+ Promise . race ( [
617+ promise ,
618+ new Promise < void > ( ( _ , reject ) => {
619+ setTimeout ( ( ) => {
620+ reject ( ) ;
621+ } , 2500 ) ;
622+ } ) ,
623+ ] )
624+ . catch ( ( ) => {
625+ process . nextTick ( ( ) => {
626+ logger ( ) . debug ( "connection restore failed" , {
627+ connId : conn . id ,
628+ } ) ;
629+ this . #connections. delete ( conn . id ) ;
630+ conn . disconnect (
631+ "Connection restore failed, connection is not alive" ,
632+ ) ;
633+ this . __removeConn ( conn ) ;
634+ } ) ;
635+ } )
636+ . then ( ( ) => {
637+ logger ( ) . debug ( "connection restored" , {
638+ connId : conn . id ,
639+ } ) ;
640+ this . #connections. set ( conn . id , conn ) ;
641+
642+ // Register event subscriptions
643+ for ( const sub of connPersist . su ) {
644+ this . #addSubscription( sub . n , conn , true ) ;
645+ }
646+ } ) ,
647+ ) ;
648+
649+ const ping = generatePing ( ) ;
650+ this . #connectionsPingRequests. set ( conn . id , { ping, resolve } ) ;
651+ conn . _sendMessage (
652+ new CachedSerializer < wsToClient . ToClient > ( {
653+ b : {
654+ p : ping ,
655+ } ,
656+ } ) ,
657+ ) ;
610658 }
659+
660+ const result = await Promise . allSettled ( restorePromises ) ;
661+ logger ( ) . info ( "connections restored" , {
662+ success : result . filter ( ( r ) => r . status === "fulfilled" ) . length ,
663+ failed : result . filter ( ( r ) => r . status === "rejected" ) . length ,
664+ } ) ;
611665 } else {
612666 logger ( ) . info ( "actor creating" ) ;
613667
@@ -818,6 +872,8 @@ export class ActorInstance<
818872 this . #persist. c . push ( persist ) ;
819873 this . saveState ( { immediate : true } ) ;
820874
875+ this . inspector . emitter . emit ( "connectionUpdated" ) ;
876+
821877 // Handle connection
822878 if ( this . #config. onConnect ) {
823879 try {
@@ -841,8 +897,6 @@ export class ActorInstance<
841897 }
842898 }
843899
844- this . inspector . emitter . emit ( "connectionUpdated" ) ;
845-
846900 // Send init message
847901 conn . _sendMessage (
848902 new CachedSerializer < wsToClient . ToClient > ( {
@@ -890,6 +944,14 @@ export class ActorInstance<
890944 } ) ;
891945 this . #removeSubscription( eventName , conn , false ) ;
892946 } ,
947+ onPong : async ( pong , conn ) => {
948+ const pingRequest = this . #connectionsPingRequests. get ( conn . id ) ;
949+ if ( pingRequest ?. ping === pong ) {
950+ // Resolve the ping request if it matches the last sent ping
951+ pingRequest . resolve ( ) ;
952+ this . #connectionsPingRequests. delete ( conn . id ) ;
953+ }
954+ } ,
893955 } ) ;
894956 }
895957
0 commit comments