11import invariant from "invariant" ;
22import type {
3+ ActorKey ,
4+ ActorRouter ,
35 AnyActorInstance as CoreAnyActorInstance ,
46 RegistryConfig ,
57 RunConfig ,
@@ -12,10 +14,10 @@ import type {
1214 ManagerDriver ,
1315} from "rivetkit/driver-helpers" ;
1416import { promiseWithResolvers } from "rivetkit/utils" ;
15- import { buildActorId , parseActorId } from "./actor-id" ;
16- import { GLOBAL_KV_KEYS } from "./global_kv" ;
17+ import { parseActorId } from "./actor-id" ;
18+ import { kvDelete , kvGet , kvListPrefix , kvPut } from "./actor-kv" ;
19+ import { GLOBAL_KV_KEYS } from "./global-kv" ;
1720import { getCloudflareAmbientEnv } from "./handler" ;
18- import { kvDelete , kvGet , kvListPrefix , kvPut } from "./kv_query" ;
1921
2022interface DurableObjectGlobalState {
2123 ctx : DurableObjectState ;
@@ -31,8 +33,8 @@ export class CloudflareDurableObjectGlobalState {
3133 // Map of actor ID -> DO state
3234 #dos: Map < string , DurableObjectGlobalState > = new Map ( ) ;
3335
34- // Map of DO ID -> ActorHandler
35- #actors: Map < string , ActorHandler > = new Map ( ) ;
36+ // WeakMap of DO state -> ActorGlobalState for proper GC
37+ #actors: WeakMap < DurableObjectState , ActorGlobalState > = new WeakMap ( ) ;
3638
3739 getDOState ( doId : string ) : DurableObjectGlobalState {
3840 const state = this . #dos. get ( doId ) ;
@@ -47,20 +49,40 @@ export class CloudflareDurableObjectGlobalState {
4749 this . #dos. set ( doId , state ) ;
4850 }
4951
50- get actors ( ) {
51- return this . #actors;
52+ getActorState ( ctx : DurableObjectState ) : ActorGlobalState | undefined {
53+ return this . #actors. get ( ctx ) ;
54+ }
55+
56+ setActorState ( ctx : DurableObjectState , actorState : ActorGlobalState ) : void {
57+ this . #actors. set ( ctx , actorState ) ;
5258 }
5359}
5460
5561export interface DriverContext {
5662 state : DurableObjectState ;
5763}
5864
59- // Actor handler to track running instances
60- class ActorHandler {
61- actor ?: AnyActorInstance ;
62- actorPromise ?: ReturnType < typeof promiseWithResolvers < void > > =
63- promiseWithResolvers ( ) ;
65+ interface InitializedData {
66+ name : string ;
67+ key : ActorKey ;
68+ generation : number ;
69+ }
70+
71+ interface LoadedActor {
72+ actorRouter : ActorRouter ;
73+ actorDriver : ActorDriver ;
74+ generation : number ;
75+ }
76+
77+ // Actor global state to track running instances
78+ export class ActorGlobalState {
79+ // Initialization state
80+ initialized ?: InitializedData ;
81+
82+ // Loaded actor state
83+ actor ?: LoadedActor ;
84+ actorInstance ?: AnyActorInstance ;
85+ actorPromise ?: ReturnType < typeof promiseWithResolvers < void > > ;
6486
6587 /**
6688 * Indicates if `startDestroy` has been called.
@@ -70,6 +92,14 @@ class ActorHandler {
7092 * See the corresponding `destroyed` property in SQLite metadata.
7193 */
7294 destroying : boolean = false ;
95+
96+ reset ( ) {
97+ this . initialized = undefined ;
98+ this . actor = undefined ;
99+ this . actorInstance = undefined ;
100+ this . actorPromise = undefined ;
101+ this . destroying = false ;
102+ }
73103}
74104
75105export class CloudflareActorsActorDriver implements ActorDriver {
@@ -103,23 +133,36 @@ export class CloudflareActorsActorDriver implements ActorDriver {
103133 // Parse actor ID to get DO ID and generation
104134 const [ doId , expectedGeneration ] = parseActorId ( actorId ) ;
105135
136+ // Get the DO state
137+ const doState = this . #globalState. getDOState ( doId ) ;
138+
106139 // Check if actor is already loaded
107- let handler = this . #globalState. actors . get ( doId ) ;
108- if ( handler ) {
109- if ( handler . actorPromise ) await handler . actorPromise . promise ;
110- if ( ! handler . actor ) throw new Error ( "Actor should be loaded" ) ;
111- return handler . actor ;
140+ let actorState = this . #globalState. getActorState ( doState . ctx ) ;
141+ if ( actorState ?. actorInstance ) {
142+ // Actor is already loaded, return it
143+ return actorState . actorInstance ;
112144 }
113145
114- // Create new actor handler
115- handler = new ActorHandler ( ) ;
116- this . #globalState. actors . set ( doId , handler ) ;
146+ // Create new actor state if it doesn't exist
147+ if ( ! actorState ) {
148+ actorState = new ActorGlobalState ( ) ;
149+ actorState . actorPromise = promiseWithResolvers ( ) ;
150+ this . #globalState. setActorState ( doState . ctx , actorState ) ;
151+ }
117152
118- // Get the actor metadata from Durable Object storage
119- const doState = this . #globalState. getDOState ( doId ) ;
120- const sql = doState . ctx . storage . sql ;
153+ // Another request is already loading this actor, wait for it
154+ if ( actorState . actorPromise ) {
155+ await actorState . actorPromise . promise ;
156+ if ( ! actorState . actorInstance ) {
157+ throw new Error (
158+ `Actor ${ actorId } failed to load in concurrent request` ,
159+ ) ;
160+ }
161+ return actorState . actorInstance ;
162+ }
121163
122- // Load actor metadata from SQL table
164+ // Load actor metadata
165+ const sql = doState . ctx . storage . sql ;
123166 const cursor = sql . exec (
124167 "SELECT name, key, destroyed, generation FROM _rivetkit_metadata LIMIT 1" ,
125168 ) ;
@@ -150,10 +193,10 @@ export class CloudflareActorsActorDriver implements ActorDriver {
150193
151194 // Create actor instance
152195 const definition = lookupInRegistry ( this . #registryConfig, name ) ;
153- handler . actor = definition . instantiate ( ) ;
196+ actorState . actorInstance = definition . instantiate ( ) ;
154197
155198 // Start actor
156- await handler . actor . start (
199+ await actorState . actorInstance . start (
157200 this ,
158201 this . #inlineClient,
159202 actorId ,
@@ -163,10 +206,10 @@ export class CloudflareActorsActorDriver implements ActorDriver {
163206 ) ;
164207
165208 // Finish
166- handler . actorPromise ?. resolve ( ) ;
167- handler . actorPromise = undefined ;
209+ actorState . actorPromise ?. resolve ( ) ;
210+ actorState . actorPromise = undefined ;
168211
169- return handler . actor ;
212+ return actorState . actorInstance ;
170213 }
171214
172215 getContext ( actorId : string ) : DriverContext {
@@ -231,21 +274,32 @@ export class CloudflareActorsActorDriver implements ActorDriver {
231274 // Parse actor ID to get DO ID and generation
232275 const [ doId , generation ] = parseActorId ( actorId ) ;
233276
234- const handler = this . #globalState. actors . get ( doId ) ;
277+ // Get the DO state
278+ const doState = this . #globalState. getDOState ( doId ) ;
279+ const actorState = this . #globalState. getActorState ( doState . ctx ) ;
235280
236281 // Actor not loaded, nothing to destroy
237- if ( ! handler || ! handler . actor ) {
282+ if ( ! actorState ?. actorInstance ) {
238283 return ;
239284 }
240285
241286 // Check if already destroying
242- if ( handler . destroying ) {
287+ if ( actorState . destroying ) {
243288 return ;
244289 }
245- handler . destroying = true ;
290+ actorState . destroying = true ;
291+
292+ // Spawn onStop in background
293+ this . #callOnStopAsync( actorId , doId , actorState . actorInstance ) ;
294+ }
246295
296+ async #callOnStopAsync(
297+ actorId : string ,
298+ doId : string ,
299+ actor : CoreAnyActorInstance ,
300+ ) {
247301 // Stop
248- handler . actor . onStop ( "destroy" ) ;
302+ await actor . onStop ( "destroy" ) ;
249303
250304 // Remove state
251305 const doState = this . #globalState. getDOState ( doId ) ;
@@ -254,15 +308,17 @@ export class CloudflareActorsActorDriver implements ActorDriver {
254308 sql . exec ( "DELETE FROM _rivetkit_kv_storage" ) ;
255309
256310 // Clear any scheduled alarms
257- doState . ctx . storage . deleteAlarm ( ) ;
311+ await doState . ctx . storage . deleteAlarm ( ) ;
258312
259313 // Delete from ACTOR_KV in the background - use full actorId including generation
260314 const env = getCloudflareAmbientEnv ( ) ;
261315 doState . ctx . waitUntil (
262316 env . ACTOR_KV . delete ( GLOBAL_KV_KEYS . actorMetadata ( actorId ) ) ,
263317 ) ;
264318
265- this . #globalState. actors . delete ( doId ) ;
319+ // Reset global state using the DO context
320+ const actorHandle = this . #globalState. getActorState ( doState . ctx ) ;
321+ actorHandle ?. reset ( ) ;
266322 }
267323}
268324
0 commit comments