@@ -11,7 +11,6 @@ use crate::error::DBError;
1111use crate :: estimation:: estimate_rows_scanned;
1212use crate :: hash:: Hash ;
1313use crate :: host:: host_controller:: CallProcedureReturn ;
14- use crate :: host:: scheduler:: { handle_queued_call_reducer_params, QueueItem } ;
1514use crate :: host:: v8:: JsInstance ;
1615use crate :: host:: wasmtime:: ModuleInstance ;
1716use crate :: host:: { InvalidFunctionArguments , InvalidViewArguments } ;
@@ -41,7 +40,7 @@ use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOf
4140use spacetimedb_data_structures:: error_stream:: ErrorStream ;
4241use spacetimedb_data_structures:: map:: { HashCollectionExt as _, IntMap } ;
4342use spacetimedb_datastore:: error:: DatastoreError ;
44- use spacetimedb_datastore:: execution_context:: { ExecutionContext , ReducerContext , Workload , WorkloadType } ;
43+ use spacetimedb_datastore:: execution_context:: { Workload , WorkloadType } ;
4544use spacetimedb_datastore:: locking_tx_datastore:: { MutTxId , ViewCallInfo } ;
4645use spacetimedb_datastore:: traits:: { IsolationLevel , Program , TxData } ;
4746use spacetimedb_durability:: DurableOffset ;
@@ -56,7 +55,6 @@ use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId};
5655use spacetimedb_query:: compile_subscription;
5756use spacetimedb_sats:: { AlgebraicTypeRef , ProductValue } ;
5857use spacetimedb_schema:: auto_migrate:: { AutoMigrateError , MigrationPolicy } ;
59- use spacetimedb_schema:: def:: deserialize:: ArgsSeed ;
6058use spacetimedb_schema:: def:: { ModuleDef , ProcedureDef , ReducerDef , TableDef , ViewDef } ;
6159use spacetimedb_schema:: schema:: { Schema , TableSchema } ;
6260use spacetimedb_vm:: relation:: RelValue ;
@@ -615,59 +613,6 @@ pub fn call_identity_connected(
615613 }
616614}
617615
618- // Only for logging purposes.
619- const SCHEDULED_REDUCER : & str = "scheduled_reducer" ;
620-
621- pub ( crate ) fn call_scheduled_reducer (
622- module : & ModuleInfo ,
623- queue_item : QueueItem ,
624- call_reducer : impl FnOnce ( Option < MutTxId > , CallReducerParams ) -> ( ReducerCallResult , bool ) ,
625- ) -> ( Result < ( ReducerCallResult , Timestamp ) , ReducerCallError > , bool ) {
626- extract_trapped ( call_scheduled_reducer_inner ( module, queue_item, call_reducer) )
627- }
628-
629- fn call_scheduled_reducer_inner (
630- module : & ModuleInfo ,
631- item : QueueItem ,
632- call_reducer : impl FnOnce ( Option < MutTxId > , CallReducerParams ) -> ( ReducerCallResult , bool ) ,
633- ) -> Result < ( ( ReducerCallResult , Timestamp ) , bool ) , ReducerCallError > {
634- let db = & module. relational_db ( ) ;
635- let mut tx = db. begin_mut_tx ( IsolationLevel :: Serializable , Workload :: Internal ) ;
636-
637- match handle_queued_call_reducer_params ( & tx, module, db, item) {
638- Ok ( Some ( params) ) => {
639- // Is necessary to patch the context with the actual calling reducer
640- let reducer_def = module
641- . module_def
642- . get_reducer_by_id ( params. reducer_id )
643- . ok_or ( ReducerCallError :: ScheduleReducerNotFound ) ?;
644- let reducer = & * reducer_def. name ;
645-
646- tx. ctx = ExecutionContext :: with_workload (
647- tx. ctx . database_identity ( ) ,
648- Workload :: Reducer ( ReducerContext {
649- name : reducer. into ( ) ,
650- caller_identity : params. caller_identity ,
651- caller_connection_id : params. caller_connection_id ,
652- timestamp : Timestamp :: now ( ) ,
653- arg_bsatn : params. args . get_bsatn ( ) . clone ( ) ,
654- } ) ,
655- ) ;
656-
657- let timestamp = params. timestamp ;
658- let ( res, trapped) = call_reducer ( Some ( tx) , params) ;
659- Ok ( ( ( res, timestamp) , trapped) )
660- }
661- Ok ( None ) => Err ( ReducerCallError :: ScheduleReducerNotFound ) ,
662- Err ( err) => Err ( ReducerCallError :: Args ( InvalidReducerArguments (
663- InvalidFunctionArguments {
664- err,
665- function_name : SCHEDULED_REDUCER . into ( ) ,
666- } ,
667- ) ) ) ,
668- }
669- }
670-
671616pub struct CallReducerParams {
672617 pub timestamp : Timestamp ,
673618 pub caller_identity : Identity ,
@@ -678,6 +623,7 @@ pub struct CallReducerParams {
678623 pub reducer_id : ReducerId ,
679624 pub args : ArgsTuple ,
680625}
626+
681627impl CallReducerParams {
682628 /// Returns a set of parameters for a call that came from within
683629 /// and without a client/caller/request_id.
@@ -725,6 +671,26 @@ pub struct CallProcedureParams {
725671 pub args : ArgsTuple ,
726672}
727673
674+ impl CallProcedureParams {
675+ /// Returns a set of parameters for a call that came from within
676+ /// and without a client/caller/request_id.
677+ pub fn from_system (
678+ timestamp : Timestamp ,
679+ caller_identity : Identity ,
680+ procedure_id : ProcedureId ,
681+ args : ArgsTuple ,
682+ ) -> Self {
683+ Self {
684+ timestamp,
685+ caller_identity,
686+ caller_connection_id : ConnectionId :: ZERO ,
687+ timer : None ,
688+ procedure_id,
689+ args,
690+ }
691+ }
692+ }
693+
728694/// Holds a [`Module`] and a set of [`Instance`]s from it,
729695/// and allocates the [`Instance`]s to be used for function calls.
730696///
@@ -1449,8 +1415,9 @@ impl ModuleHost {
14491415 reducer_def : & ReducerDef ,
14501416 args : FunctionArgs ,
14511417 ) -> Result < CallReducerParams , InvalidReducerArguments > {
1452- let reducer_seed = ArgsSeed ( module. module_def . typespace ( ) . with_type ( reducer_def) ) ;
1453- let args = args. into_tuple ( reducer_seed) . map_err ( InvalidReducerArguments ) ?;
1418+ let args = args
1419+ . into_tuple_for_def ( & module. module_def , reducer_def)
1420+ . map_err ( InvalidReducerArguments ) ?;
14541421 let caller_connection_id = caller_connection_id. unwrap_or ( ConnectionId :: ZERO ) ;
14551422 Ok ( CallReducerParams {
14561423 timestamp : Timestamp :: now ( ) ,
@@ -1464,6 +1431,20 @@ impl ModuleHost {
14641431 } )
14651432 }
14661433
1434+ pub async fn call_reducer_with_params (
1435+ & self ,
1436+ reducer_name : & str ,
1437+ params : CallReducerParams ,
1438+ ) -> Result < ReducerCallResult , NoSuchModule > {
1439+ self . call (
1440+ reducer_name,
1441+ params,
1442+ |p, inst| inst. call_reducer ( None , p) ,
1443+ |p, inst| inst. call_reducer ( None , p) ,
1444+ )
1445+ . await
1446+ }
1447+
14671448 async fn call_reducer_inner (
14681449 & self ,
14691450 caller_identity : Identity ,
@@ -1475,8 +1456,9 @@ impl ModuleHost {
14751456 reducer_def : & ReducerDef ,
14761457 args : FunctionArgs ,
14771458 ) -> Result < ReducerCallResult , ReducerCallError > {
1478- let reducer_seed = ArgsSeed ( self . info . module_def . typespace ( ) . with_type ( reducer_def) ) ;
1479- let args = args. into_tuple ( reducer_seed) . map_err ( InvalidReducerArguments ) ?;
1459+ let args = args
1460+ . into_tuple_for_def ( & self . info . module_def , reducer_def)
1461+ . map_err ( InvalidReducerArguments ) ?;
14801462 let caller_connection_id = caller_connection_id. unwrap_or ( ConnectionId :: ZERO ) ;
14811463 let call_reducer_params = CallReducerParams {
14821464 timestamp : Timestamp :: now ( ) ,
@@ -1490,12 +1472,7 @@ impl ModuleHost {
14901472 } ;
14911473
14921474 Ok ( self
1493- . call (
1494- & reducer_def. name ,
1495- call_reducer_params,
1496- |p, inst| inst. call_reducer ( None , p) ,
1497- |p, inst| inst. call_reducer ( None , p) ,
1498- )
1475+ . call_reducer_with_params ( & reducer_def. name , call_reducer_params)
14991476 . await ?)
15001477 }
15011478
@@ -1600,9 +1577,10 @@ impl ModuleHost {
16001577 procedure_def : & ProcedureDef ,
16011578 args : FunctionArgs ,
16021579 ) -> Result < CallProcedureReturn , ProcedureCallError > {
1603- let procedure_seed = ArgsSeed ( self . info . module_def . typespace ( ) . with_type ( procedure_def) ) ;
1580+ let args = args
1581+ . into_tuple_for_def ( & self . info . module_def , procedure_def)
1582+ . map_err ( InvalidProcedureArguments ) ?;
16041583 let caller_connection_id = caller_connection_id. unwrap_or ( ConnectionId :: ZERO ) ;
1605- let args = args. into_tuple ( procedure_seed) . map_err ( InvalidProcedureArguments ) ?;
16061584
16071585 self . call_async_with_instance ( & procedure_def. name , async move |mut inst| {
16081586 let res = inst
@@ -1621,20 +1599,18 @@ impl ModuleHost {
16211599 . map_err ( Into :: into)
16221600 }
16231601
1624- // Scheduled reducers require a different function here to call their reducer
1625- // because their reducer arguments are stored in the database and need to be fetched
1626- // within the same transaction as the reducer call.
1627- pub ( crate ) async fn call_scheduled_reducer (
1602+ // This is not reused in `call_procedure_inner`
1603+ // due to concerns re. `Timestamp::now`.
1604+ pub async fn call_procedure_with_params (
16281605 & self ,
1629- item : QueueItem ,
1630- ) -> Result < ( ReducerCallResult , Timestamp ) , ReducerCallError > {
1631- self . call (
1632- SCHEDULED_REDUCER ,
1633- item,
1634- |item, inst| inst. call_scheduled_reducer ( item) ,
1635- |item, inst| inst. call_scheduled_reducer ( item) ,
1636- )
1637- . await ?
1606+ name : & str ,
1607+ params : CallProcedureParams ,
1608+ ) -> Result < CallProcedureReturn , NoSuchModule > {
1609+ self . call_async_with_instance ( name, async move |mut inst| {
1610+ let res = inst. call_procedure ( params) . await ;
1611+ ( res, inst)
1612+ } )
1613+ . await
16381614 }
16391615
16401616 /// Materializes the views return by the `view_collector`, if not already materialized,
@@ -1721,9 +1697,9 @@ impl ModuleHost {
17211697 let view_def = module_def. view ( view_name) . ok_or ( ViewCallError :: NoSuchView ) ?;
17221698 let fn_ptr = view_def. fn_ptr ;
17231699 let row_type = view_def. product_type_ref ;
1724- let typespace = module_def . typespace ( ) . with_type ( view_def ) ;
1725- let view_seed = ArgsSeed ( typespace ) ;
1726- let args = args . into_tuple ( view_seed ) . map_err ( InvalidViewArguments ) ?;
1700+ let args = args
1701+ . into_tuple_for_def ( module_def , view_def )
1702+ . map_err ( InvalidViewArguments ) ?;
17271703
17281704 match self
17291705 . call_view_inner ( tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type)
0 commit comments