1- import type { PersistedConn } from "./connection" ;
21import type { Logger } from "@/common//log" ;
3- import { type ActorTags , isJsonSerializable , stringifyError } from "@/common//utils" ;
2+ import {
3+ type ActorTags ,
4+ isJsonSerializable ,
5+ stringifyError ,
6+ } from "@/common//utils" ;
47import onChange from "on-change" ;
58import type { ActorConfig } from "./config" ;
69import { Conn , type ConnId } from "./connection" ;
@@ -9,15 +12,15 @@ import type { ConnDriver } from "./driver";
912import * as errors from "./errors" ;
1013import { processMessage } from "./protocol/message/mod" ;
1114import { instanceLogger , logger } from "./log" ;
12- import { ActionContext } from "./action" ;
15+ import type { ActionContext } from "./action" ;
1316import { Lock , deadline } from "./utils" ;
1417import { Schedule } from "./schedule" ;
15- import { KEYS } from "./keys" ;
1618import type * as wsToServer from "@/actor/protocol/message/to-server" ;
1719import { CachedSerializer } from "./protocol/serde" ;
1820import { ActorInspector } from "@/inspector/actor" ;
1921import { ActorContext } from "./context" ;
2022import invariant from "invariant" ;
23+ import type { PersistedActor , PersistedConn , PersistedScheduleEvents } from "./persisted" ;
2124
2225/**
2326 * Options for the `_saveState` method.
@@ -72,14 +75,6 @@ export type ExtractActorConnState<A extends AnyActorInstance> =
7275 ? ConnState
7376 : never ;
7477
75- /** State object that gets automatically persisted to storage. */
76- interface PersistedActor < S , CP , CS > {
77- // State
78- s : S ;
79- // Connections
80- c : PersistedConn < CP , CS > [ ] ;
81- }
82-
8378export class ActorInstance < S , CP , CS , V > {
8479 // Shared actor context for this instance
8580 actorContext : ActorContext < S , CP , CS , V > ;
@@ -155,7 +150,7 @@ export class ActorInstance<S, CP, CS, V> {
155150 this . #name = name ;
156151 this . #tags = tags ;
157152 this . #region = region ;
158- this . #schedule = new Schedule ( this , actorDriver ) ;
153+ this . #schedule = new Schedule ( this ) ;
159154 this . inspector = new ActorInspector ( this ) ;
160155
161156 // Initialize server
@@ -171,7 +166,12 @@ export class ActorInstance<S, CP, CS, V> {
171166 let vars : V | undefined = undefined ;
172167 if ( "createVars" in this . #config) {
173168 const dataOrPromise = this . #config. createVars (
174- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
169+ this . actorContext as unknown as ActorContext <
170+ undefined ,
171+ undefined ,
172+ undefined ,
173+ undefined
174+ > ,
175175 this . #actorDriver. getContext ( this . #actorId) ,
176176 ) ;
177177 if ( dataOrPromise instanceof Promise ) {
@@ -200,8 +200,101 @@ export class ActorInstance<S, CP, CS, V> {
200200 this . #ready = true ;
201201 }
202202
203+ async scheduleEvent (
204+ timestamp : number ,
205+ fn : string ,
206+ args : unknown [ ] ,
207+ ) : Promise < void > {
208+ // Build event
209+ const eventId = crypto . randomUUID ( ) ;
210+ const newEvent : PersistedScheduleEvents = {
211+ e : eventId ,
212+ t : timestamp ,
213+ a : fn ,
214+ ar : args ,
215+ } ;
216+
217+ this . actorContext . log . info ( "scheduling event" , {
218+ event : eventId ,
219+ timestamp,
220+ action : fn
221+ } ) ;
222+
223+ // Insert event in to index
224+ const insertIndex = this . #persist. e . findIndex ( ( x ) => x . t > newEvent . t ) ;
225+ if ( insertIndex === - 1 ) {
226+ this . #persist. e . push ( newEvent ) ;
227+ } else {
228+ this . #persist. e . splice ( insertIndex , 0 , newEvent ) ;
229+ }
230+
231+ // Update alarm if:
232+ // - this is the newest event (i.e. at beginning of array) or
233+ // - this is the only event (i.e. the only event in the array)
234+ if ( insertIndex === 0 || this . #persist. e . length === 1 ) {
235+ this . actorContext . log . info ( "setting alarm" , { timestamp } ) ;
236+ await this . #actorDriver. setAlarm ( this , newEvent . t ) ;
237+ }
238+ }
239+
203240 async onAlarm ( ) {
204- await this . #schedule. __onAlarm ( ) ;
241+ const now = Date . now ( ) ;
242+ this . actorContext . log . debug ( "alarm triggered" , { now, events : this . #persist. e . length } ) ;
243+
244+ // Remove events from schedule that we're about to run
245+ const runIndex = this . #persist. e . findIndex ( ( x ) => x . t <= now ) ;
246+ if ( runIndex === - 1 ) {
247+ this . actorContext . log . debug ( "no events to run" , { now } ) ;
248+ return ;
249+ }
250+ const scheduleEvents = this . #persist. e . splice ( 0 , runIndex + 1 ) ;
251+ this . actorContext . log . debug ( "running events" , { count : scheduleEvents . length } ) ;
252+
253+ // Set alarm for next event
254+ if ( this . #persist. e . length > 0 ) {
255+ await this . #actorDriver. setAlarm ( this , this . #persist. e [ 0 ] . t ) ;
256+ }
257+
258+ // Iterate by event key in order to ensure we call the events in order
259+ for ( const event of scheduleEvents ) {
260+ try {
261+ this . actorContext . log . info ( "running action for event" , {
262+ event : event . e ,
263+ timestamp : event . t ,
264+ action : event . a ,
265+ args : event . ar
266+ } ) ;
267+
268+ // Look up function
269+ const fn : unknown = this . #config. actions [ event . a ] ;
270+ if ( ! fn ) throw new Error ( `Missing action for alarm ${ event . a } ` ) ;
271+ if ( typeof fn !== "function" )
272+ throw new Error (
273+ `Alarm function lookup for ${ event . a } returned ${ typeof fn } ` ,
274+ ) ;
275+
276+ // Call function
277+ try {
278+ await fn . call ( undefined , this . actorContext , ...event . ar ) ;
279+ } catch ( error ) {
280+ this . actorContext . log . error ( "error while running event" , {
281+ error : stringifyError ( error ) ,
282+ event : event . e ,
283+ timestamp : event . t ,
284+ action : event . a ,
285+ args : event . ar ,
286+ } ) ;
287+ }
288+ } catch ( error ) {
289+ this . actorContext . log . error ( "internal error while running event" , {
290+ error : stringifyError ( error ) ,
291+ event : event . e ,
292+ timestamp : event . t ,
293+ action : event . a ,
294+ args : event . ar ,
295+ } ) ;
296+ }
297+ }
205298 }
206299
207300 get stateEnabled ( ) {
@@ -268,9 +361,8 @@ export class ActorInstance<S, CP, CS, V> {
268361 this . #persistChanged = false ;
269362
270363 // Write to KV
271- await this . #actorDriver. kvPut (
364+ await this . #actorDriver. writePersistedData (
272365 this . #actorId,
273- KEYS . STATE . DATA ,
274366 this . #persistRaw,
275367 ) ;
276368
@@ -359,12 +451,11 @@ export class ActorInstance<S, CP, CS, V> {
359451
360452 async #initialize( ) {
361453 // Read initial state
362- const [ initialized , persistData ] = ( await this . #actorDriver. kvGetBatch (
454+ const persistData = ( await this . #actorDriver. readPersistedData (
363455 this . #actorId,
364- [ KEYS . STATE . INITIALIZED , KEYS . STATE . DATA ] ,
365- ) ) as [ boolean , PersistedActor < S , CP , CS > ] ;
456+ ) ) as PersistedActor < S , CP , CS > ;
366457
367- if ( initialized ) {
458+ if ( persistData !== undefined ) {
368459 logger ( ) . info ( "actor restoring" , {
369460 connections : persistData . c . length ,
370461 } ) ;
@@ -406,7 +497,12 @@ export class ActorInstance<S, CP, CS, V> {
406497
407498 // Convert state to undefined since state is not defined yet here
408499 stateData = await this . #config. createState (
409- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
500+ this . actorContext as unknown as ActorContext <
501+ undefined ,
502+ undefined ,
503+ undefined ,
504+ undefined
505+ > ,
410506 ) ;
411507 } else if ( "state" in this . #config) {
412508 stateData = structuredClone ( this . #config. state ) ;
@@ -420,14 +516,12 @@ export class ActorInstance<S, CP, CS, V> {
420516 const persist : PersistedActor < S , CP , CS > = {
421517 s : stateData as S ,
422518 c : [ ] ,
519+ e : [ ] ,
423520 } ;
424521
425522 // Update state
426523 logger ( ) . debug ( "writing state" ) ;
427- await this . #actorDriver. kvPutBatch ( this . #actorId, [
428- [ KEYS . STATE . INITIALIZED , true ] ,
429- [ KEYS . STATE . DATA , persist ] ,
430- ] ) ;
524+ await this . #actorDriver. writePersistedData ( this . #actorId, persist ) ;
431525
432526 this . #setPersist( persist ) ;
433527 }
@@ -509,7 +603,12 @@ export class ActorInstance<S, CP, CS, V> {
509603 if ( this . #connStateEnabled) {
510604 if ( "createConnState" in this . #config) {
511605 const dataOrPromise = this . #config. createConnState (
512- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
606+ this . actorContext as unknown as ActorContext <
607+ undefined ,
608+ undefined ,
609+ undefined ,
610+ undefined
611+ > ,
513612 onBeforeConnectOpts ,
514613 ) ;
515614 if ( dataOrPromise instanceof Promise ) {
@@ -723,6 +822,8 @@ export class ActorInstance<S, CP, CS, V> {
723822 rpcName : string ,
724823 args : unknown [ ] ,
725824 ) : Promise < unknown > {
825+ invariant ( this . #ready, "exucuting rpc before ready" ) ;
826+
726827 // Prevent calling private or reserved methods
727828 if ( ! ( rpcName in this . #config. actions ) ) {
728829 logger ( ) . warn ( "rpc does not exist" , { rpcName } ) ;
0 commit comments