@@ -21,6 +21,7 @@ use common::{
2121} ;
2222use database:: {
2323 BootstrapComponentsModel ,
24+ SystemMetadataModel ,
2425 TableModel ,
2526 Transaction ,
2627} ;
@@ -32,8 +33,14 @@ use model::{
3233 BackendStateModel ,
3334 } ,
3435 scheduled_jobs:: {
35- types:: ScheduledJobState ,
36+ types:: {
37+ args_to_bytes,
38+ ScheduledJobAttempts ,
39+ ScheduledJobMetadata ,
40+ ScheduledJobState ,
41+ } ,
3642 SchedulerModel ,
43+ SCHEDULED_JOBS_TABLE ,
3744 } ,
3845} ;
3946use runtime:: testing:: TestRuntime ;
@@ -68,6 +75,50 @@ fn insert_object_path() -> CanonicalizedComponentFunctionPath {
6875 }
6976}
7077
78+ /// Create a scheduled job in the old format with arguments inline in the
79+ /// document instead of args_id pointing to arguments in the
80+ /// `_scheduled_job_args_table`. We are not migrating docs from this old format,
81+ /// so we need to be sure the scheduler code always works with inline arguments.
82+ async fn create_scheduled_job_with_inline_args < ' a > (
83+ rt : & ' a TestRuntime ,
84+ tx : & ' a mut Transaction < TestRuntime > ,
85+ path : CanonicalizedComponentFunctionPath ,
86+ ) -> anyhow:: Result < ResolvedDocumentId > {
87+ let mut map = serde_json:: Map :: new ( ) ;
88+ map. insert (
89+ "key" . to_string ( ) ,
90+ serde_json:: Value :: String ( "value" . to_string ( ) ) ,
91+ ) ;
92+ let ( _, component) =
93+ BootstrapComponentsModel :: new ( tx) . must_component_path_to_ids ( & path. component ) ?;
94+ let mut model = SystemMetadataModel :: new ( tx, component. into ( ) ) ;
95+ let original_scheduled_ts = rt. unix_timestamp ( ) . as_system_time ( ) . try_into ( ) ?;
96+ let udf_path = & path. udf_path . clone ( ) ;
97+ let job_id = model
98+ . insert_metadata (
99+ & SCHEDULED_JOBS_TABLE ,
100+ ScheduledJobMetadata {
101+ path,
102+ udf_args_bytes : Some ( args_to_bytes ( parse_udf_args (
103+ udf_path,
104+ vec ! [ JsonValue :: Object ( map) ] ,
105+ ) ?) ?) ,
106+ args_id : None ,
107+ state : ScheduledJobState :: Pending ,
108+ next_ts : Some ( original_scheduled_ts) ,
109+ completed_ts : None ,
110+ original_scheduled_ts,
111+ attempts : ScheduledJobAttempts :: default ( ) ,
112+ }
113+ . try_into ( ) ?,
114+ )
115+ . await ?;
116+ let mut model = SchedulerModel :: new ( tx, component. into ( ) ) ;
117+ let state = model. check_status ( job_id) . await ?. unwrap ( ) ;
118+ assert_eq ! ( state, ScheduledJobState :: Pending ) ;
119+ Ok ( job_id)
120+ }
121+
71122async fn create_scheduled_job < ' a > (
72123 rt : & ' a TestRuntime ,
73124 tx : & ' a mut Transaction < TestRuntime > ,
@@ -134,6 +185,39 @@ async fn test_scheduled_jobs_success(
134185 Ok ( ( ) )
135186}
136187
188+ #[ convex_macro:: test_runtime]
189+ async fn test_scheduled_jobs_with_inline_args_success (
190+ rt : TestRuntime ,
191+ pause_controller : PauseController ,
192+ ) -> anyhow:: Result < ( ) > {
193+ let application = Application :: new_for_tests ( & rt) . await ?;
194+ application. load_udf_tests_modules ( ) . await ?;
195+
196+ let hold_guard = pause_controller. hold ( SCHEDULED_JOB_EXECUTED ) ;
197+
198+ let mut tx = application. begin ( Identity :: system ( ) ) . await ?;
199+ let job_id = create_scheduled_job_with_inline_args ( & rt, & mut tx, insert_object_path ( ) ) . await ?;
200+ assert ! (
201+ TableModel :: new( & mut tx)
202+ . table_is_empty( OBJECTS_TABLE_COMPONENT . into( ) , & OBJECTS_TABLE )
203+ . await ?
204+ ) ;
205+
206+ application. commit_test ( tx) . await ?;
207+
208+ wait_for_scheduled_job_execution ( hold_guard) . await ;
209+ tx = application. begin ( Identity :: system ( ) ) . await ?;
210+ let mut model = SchedulerModel :: new ( & mut tx, TableNamespace :: test_user ( ) ) ;
211+ let state = model. check_status ( job_id) . await ?. unwrap ( ) ;
212+ assert_eq ! ( state, ScheduledJobState :: Success ) ;
213+ assert ! (
214+ !TableModel :: new( & mut tx)
215+ . table_is_empty( OBJECTS_TABLE_COMPONENT . into( ) , & OBJECTS_TABLE )
216+ . await ?
217+ ) ;
218+ Ok ( ( ) )
219+ }
220+
137221#[ convex_macro:: test_runtime]
138222async fn test_scheduled_jobs_canceled ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
139223 let application = Application :: new_for_tests ( & rt) . await ?;
0 commit comments