@@ -7,8 +7,11 @@ use rustc_hash::FxHashMap;
77use spacetimedb_client_api_messages:: energy:: EnergyQuanta ;
88use spacetimedb_lib:: scheduler:: ScheduleAt ;
99use spacetimedb_lib:: Timestamp ;
10+ use spacetimedb_primitives:: ReducerId ;
1011use spacetimedb_primitives:: { ColId , TableId } ;
1112use spacetimedb_sats:: { bsatn:: ToBsatn as _, AlgebraicValue } ;
13+ use spacetimedb_schema:: def:: deserialize:: ArgsSeed ;
14+ use spacetimedb_schema:: def:: ReducerDef ;
1215use spacetimedb_table:: table:: RowRef ;
1316use tokio:: sync:: mpsc;
1417use tokio:: time:: Instant ;
@@ -28,11 +31,11 @@ use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHED
2831use spacetimedb_datastore:: traits:: IsolationLevel ;
2932
3033#[ derive( Copy , Clone , Eq , PartialEq , Hash ) ]
31- pub struct ScheduledReducerId {
32- /// The ID of the table whose rows hold the scheduled reducers.
33- /// This table should have a entry in `ST_SCHEDULED`.
34+ pub struct ScheduledFunctionId {
35+ /// The ID of the table whose rows hold the scheduled reducers or procedures .
36+ /// This table should have an entry in `ST_SCHEDULED`.
3437 table_id : TableId ,
35- /// The particular schedule row in the reducer scheduling table referred to by `self.table_id`.
38+ /// The particular schedule row in the scheduling table referred to by `self.table_id`.
3639 schedule_id : u64 ,
3740 // These may seem redundant, but they're actually free - they fit in the struct padding.
3841 // `scheduled_id: u64, table_id: u32, id_column: u16, at_column: u16` == 16 bytes, same as
@@ -43,7 +46,7 @@ pub struct ScheduledReducerId {
4346 at_column : ColId ,
4447}
4548
46- spacetimedb_table:: static_assert_size!( ScheduledReducerId , 16 ) ;
49+ spacetimedb_table:: static_assert_size!( ScheduledFunctionId , 16 ) ;
4750
4851enum MsgOrExit < T > {
4952 Msg ( T ) ,
@@ -52,20 +55,20 @@ enum MsgOrExit<T> {
5255
5356enum SchedulerMessage {
5457 Schedule {
55- id : ScheduledReducerId ,
58+ id : ScheduledFunctionId ,
5659 /// The timestamp we'll tell the reducer it is.
5760 effective_at : Timestamp ,
5861 /// The actual instant we're scheduling for.
5962 real_at : Instant ,
6063 } ,
6164 ScheduleImmediate {
62- reducer_name : String ,
65+ function_name : String ,
6366 args : FunctionArgs ,
6467 } ,
6568}
6669
67- pub struct ScheduledReducer {
68- reducer : Box < str > ,
70+ pub struct ScheduledFunction {
71+ function : Box < str > ,
6972 bsatn_args : Vec < u8 > ,
7073}
7174
@@ -121,7 +124,7 @@ impl SchedulerStarter {
121124 // calculate duration left to call the scheduled reducer
122125 let duration = schedule_at. to_duration_from ( now_ts) ;
123126 let at = schedule_at. to_timestamp_from ( now_ts) ;
124- let id = ScheduledReducerId {
127+ let id = ScheduledFunctionId {
125128 table_id,
126129 schedule_id,
127130 id_column,
@@ -130,7 +133,7 @@ impl SchedulerStarter {
130133 let key = queue. insert_at ( QueueItem :: Id { id, at } , now_instant + duration) ;
131134
132135 // This should never happen as duplicate entries should be gated by unique
133- // constraint voilation in scheduled tables.
136+ // constraint violation in scheduled tables.
134137 if key_map. insert ( id, key) . is_some ( ) {
135138 return Err ( anyhow ! (
136139 "Duplicate key found in scheduler queue: table_id {}, schedule_id {}" ,
@@ -195,21 +198,24 @@ pub enum ScheduleError {
195198}
196199
197200impl Scheduler {
198- /// Schedule a reducer to run from a scheduled table.
201+ /// Schedule a reducer/procedure to run from a scheduled table.
199202 ///
200- /// `reducer_start ` is the timestamp of the start of the current reducer.
203+ /// `fn_start ` is the timestamp of the start of the current reducer/procedure .
201204 pub ( super ) fn schedule (
202205 & self ,
203206 table_id : TableId ,
204207 schedule_id : u64 ,
205208 schedule_at : ScheduleAt ,
206209 id_column : ColId ,
207210 at_column : ColId ,
208- reducer_start : Timestamp ,
211+ fn_start : Timestamp ,
209212 ) -> Result < ( ) , ScheduleError > {
210213 // if `Timestamp::now()` is properly monotonic, use it; otherwise, use
211214 // the start of the reducer run as "now" for purposes of scheduling
212- let now = reducer_start. max ( Timestamp :: now ( ) ) ;
215+ // TODO(procedure-tx): when we do `with_tx` in a procedure,
216+ // it inherits the timestamp of the procedure,
217+ // which could become a problem here for long running procedures.
218+ let now = fn_start. max ( Timestamp :: now ( ) ) ;
213219
214220 // Check that `at` is within `tokio_utils::time::DelayQueue`'s
215221 // accepted time-range.
@@ -229,7 +235,7 @@ impl Scheduler {
229235 // if the actor has exited, it's fine to ignore; it means that the host actor calling
230236 // schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
231237 let _ = self . tx . send ( MsgOrExit :: Msg ( SchedulerMessage :: Schedule {
232- id : ScheduledReducerId {
238+ id : ScheduledFunctionId {
233239 table_id,
234240 schedule_id,
235241 id_column,
@@ -242,9 +248,9 @@ impl Scheduler {
242248 Ok ( ( ) )
243249 }
244250
245- pub fn volatile_nonatomic_schedule_immediate ( & self , reducer_name : String , args : FunctionArgs ) {
251+ pub fn volatile_nonatomic_schedule_immediate ( & self , function_name : String , args : FunctionArgs ) {
246252 let _ = self . tx . send ( MsgOrExit :: Msg ( SchedulerMessage :: ScheduleImmediate {
247- reducer_name ,
253+ function_name ,
248254 args,
249255 } ) ) ;
250256 }
@@ -261,13 +267,13 @@ impl Scheduler {
261267struct SchedulerActor {
262268 rx : mpsc:: UnboundedReceiver < MsgOrExit < SchedulerMessage > > ,
263269 queue : DelayQueue < QueueItem > ,
264- key_map : FxHashMap < ScheduledReducerId , delay_queue:: Key > ,
270+ key_map : FxHashMap < ScheduledFunctionId , delay_queue:: Key > ,
265271 module_host : WeakModuleHost ,
266272}
267273
268274pub ( crate ) enum QueueItem {
269- Id { id : ScheduledReducerId , at : Timestamp } ,
270- VolatileNonatomicImmediate { reducer_name : String , args : FunctionArgs } ,
275+ Id { id : ScheduledFunctionId , at : Timestamp } ,
276+ VolatileNonatomicImmediate { function_name : String , args : FunctionArgs } ,
271277}
272278
273279#[ cfg( target_pointer_width = "64" ) ]
@@ -304,9 +310,9 @@ impl SchedulerActor {
304310 let key = self . queue . insert_at ( QueueItem :: Id { id, at : effective_at } , real_at) ;
305311 self . key_map . insert ( id, key) ;
306312 }
307- SchedulerMessage :: ScheduleImmediate { reducer_name , args } => {
313+ SchedulerMessage :: ScheduleImmediate { function_name , args } => {
308314 self . queue . insert (
309- QueueItem :: VolatileNonatomicImmediate { reducer_name , args } ,
315+ QueueItem :: VolatileNonatomicImmediate { function_name , args } ,
310316 Duration :: ZERO ,
311317 ) ;
312318 }
@@ -332,43 +338,44 @@ impl SchedulerActor {
332338 let res = tokio:: spawn ( async move { module_host. call_scheduled_reducer ( item) . await } ) . await ;
333339
334340 match res {
335- // if we didn't actually call the reducer because the module exited or it was already deleted, leave
336- // the ScheduledReducer in the database for when the module restarts
341+ // If we didn't actually call the function
342+ // because the module exited or it was already deleted,
343+ // leave the `ScheduledFunction` in the database for when the module restarts.
337344 Ok ( Err ( ReducerCallError :: NoSuchModule ( _) ) | Err ( ReducerCallError :: ScheduleReducerNotFound ) ) => { }
338345
339346 Ok ( Ok ( ( _, ts) ) ) => {
340347 if let Some ( id) = id {
341- let _ = self . delete_scheduled_reducer_row ( & db, id, module_host_clone, ts) . await ;
348+ let _ = self . delete_scheduled_function_row ( & db, id, module_host_clone, ts) . await ;
342349 }
343350 }
344351
345- // delete the scheduled reducer row if its not repeated reducer
352+ // Delete the scheduled function row if its not repeated function.
346353 Ok ( _) | Err ( _) => {
347354 if let Some ( id) = id {
348355 // TODO: Handle errors here?
349356 let _ = self
350- . delete_scheduled_reducer_row ( & db, id, module_host_clone, Timestamp :: now ( ) )
357+ . delete_scheduled_function_row ( & db, id, module_host_clone, Timestamp :: now ( ) )
351358 . await ;
352359 }
353360 }
354361 }
355362
356363 if let Err ( e) = res {
357- log:: error!( "invoking scheduled reducer failed: {e:#}" ) ;
364+ log:: error!( "invoking scheduled function failed: {e:#}" ) ;
358365 } ;
359366 }
360367
361- async fn delete_scheduled_reducer_row (
368+ async fn delete_scheduled_function_row (
362369 & mut self ,
363370 db : & RelationalDB ,
364- id : ScheduledReducerId ,
371+ id : ScheduledFunctionId ,
365372 module_host : ModuleHost ,
366373 ts : Timestamp ,
367374 ) -> anyhow:: Result < ( ) > {
368375 let host_clone = module_host. clone ( ) ;
369376 let db = db. clone ( ) ;
370377 let schedule_at = host_clone
371- . on_module_thread ( "delete_scheduled_reducer_row " , move || {
378+ . on_module_thread ( "delete_scheduled_function_row " , move || {
372379 let mut tx = db. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
373380
374381 match get_schedule_row_mut ( & tx, & db, id) {
@@ -426,19 +433,16 @@ pub(crate) fn handle_queued_call_reducer_params(
426433 let Ok ( schedule_row) = get_schedule_row_mut ( tx, db, id) else {
427434 // if the row is not found, it means the schedule is cancelled by the user
428435 log:: debug!(
429- "table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}" ,
436+ "table row corresponding to yield scheduler id not found: table_id {}, scheduler_id {}" ,
430437 id. table_id,
431438 id. schedule_id
432439 ) ;
433440 return Ok ( None ) ;
434441 } ;
435442
436- let ScheduledReducer { reducer , bsatn_args } = process_schedule ( tx, db, id. table_id , & schedule_row) ?;
443+ let ScheduledFunction { function , bsatn_args } = process_schedule ( tx, db, id. table_id , & schedule_row) ?;
437444
438- let ( reducer_id, reducer_seed) = module_info
439- . module_def
440- . reducer_arg_deserialize_seed ( & reducer[ ..] )
441- . ok_or_else ( || anyhow ! ( "Reducer not found: {reducer}" ) ) ?;
445+ let ( reducer_id, reducer_seed) = find_reducer ( module_info, & function) ?;
442446
443447 let reducer_args = FunctionArgs :: Bsatn ( bsatn_args. into ( ) ) . into_tuple ( reducer_seed) ?;
444448
@@ -453,11 +457,8 @@ pub(crate) fn handle_queued_call_reducer_params(
453457 reducer_args,
454458 ) ) )
455459 }
456- QueueItem :: VolatileNonatomicImmediate { reducer_name, args } => {
457- let ( reducer_id, reducer_seed) = module_info
458- . module_def
459- . reducer_arg_deserialize_seed ( & reducer_name[ ..] )
460- . ok_or_else ( || anyhow ! ( "Reducer not found: {reducer_name}" ) ) ?;
460+ QueueItem :: VolatileNonatomicImmediate { function_name, args } => {
461+ let ( reducer_id, reducer_seed) = find_reducer ( module_info, & function_name) ?;
461462 let reducer_args = args. into_tuple ( reducer_seed) ?;
462463
463464 Ok ( Some ( CallReducerParams :: from_system (
@@ -470,6 +471,13 @@ pub(crate) fn handle_queued_call_reducer_params(
470471 }
471472}
472473
474+ fn find_reducer < ' a > ( module_info : & ' a ModuleInfo , name : & str ) -> anyhow:: Result < ( ReducerId , ArgsSeed < ' a , ReducerDef > ) > {
475+ module_info
476+ . module_def
477+ . reducer_arg_deserialize_seed ( name)
478+ . ok_or_else ( || anyhow ! ( "Reducer not found: {name}" ) )
479+ }
480+
473481fn commit_and_broadcast_deletion_event ( tx : MutTxId , module_host : ModuleHost ) {
474482 let caller_identity = module_host. info ( ) . database_identity ;
475483
@@ -495,40 +503,41 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) {
495503 }
496504}
497505
498- /// Generate `ScheduledReducer` for given `ScheduledReducerId`
506+ /// Generate [`ScheduledFunction`] for given [`ScheduledFunctionId`].
499507fn process_schedule (
500508 tx : & MutTxId ,
501509 db : & RelationalDB ,
502510 table_id : TableId ,
503511 schedule_row : & RowRef < ' _ > ,
504- ) -> Result < ScheduledReducer , anyhow:: Error > {
505- // get reducer name from `ST_SCHEDULED` table
512+ ) -> Result < ScheduledFunction , anyhow:: Error > {
513+ // Get reducer name from `ST_SCHEDULED` table.
506514 let table_id_col = StScheduledFields :: TableId . col_id ( ) ;
507- let reducer_name_col = StScheduledFields :: ReducerName . col_id ( ) ;
515+ let function_name_col = StScheduledFields :: ReducerName . col_id ( ) ;
508516 let st_scheduled_row = db
509517 . iter_by_col_eq_mut ( tx, ST_SCHEDULED_ID , table_id_col, & table_id. into ( ) ) ?
510518 . next ( )
511519 . ok_or_else ( || anyhow ! ( "Scheduled table with id {table_id} entry does not exist in `st_scheduled`" ) ) ?;
512- let reducer = st_scheduled_row. read_col :: < Box < str > > ( reducer_name_col ) ?;
520+ let function = st_scheduled_row. read_col :: < Box < str > > ( function_name_col ) ?;
513521
514- Ok ( ScheduledReducer {
515- reducer ,
522+ Ok ( ScheduledFunction {
523+ function ,
516524 bsatn_args : schedule_row. to_bsatn_vec ( ) ?,
517525 } )
518526}
519527
520- /// Helper to get schedule_row with `MutTxId`
528+ /// Helper to get ` schedule_row` with `MutTxId`.
521529fn get_schedule_row_mut < ' a > (
522530 tx : & ' a MutTxId ,
523531 db : & ' a RelationalDB ,
524- id : ScheduledReducerId ,
532+ id : ScheduledFunctionId ,
525533) -> anyhow:: Result < RowRef < ' a > > {
526534 db. iter_by_col_eq_mut ( tx, id. table_id , id. id_column , & id. schedule_id . into ( ) ) ?
527535 . next ( )
528536 . ok_or_else ( || anyhow ! ( "Schedule with ID {} not found in table {}" , id. schedule_id, id. table_id) )
529537}
530538
531- /// Helper to get schedule_id and schedule_at from schedule_row product value
539+ /// Helper to get `schedule_id` and `schedule_at`
540+ /// from `schedule_row` product value.
532541pub fn get_schedule_from_row (
533542 row : & RowRef < ' _ > ,
534543 id_column : ColId ,
0 commit comments