1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- use std:: collections:: HashSet ;
1615use std:: str:: FromStr ;
1716use std:: sync:: Arc ;
1817
@@ -168,12 +167,7 @@ impl TaskMgr {
168167 name : task_name. to_string ( ) ,
169168 } ) ) ;
170169 }
171- for after in afters {
172- if task. after . contains ( after) {
173- continue ;
174- }
175- task. after . push ( after. clone ( ) ) ;
176- }
170+ return self . add_after ( & task. task_name , afters) . await ;
177171 }
178172 AlterTaskOptions :: RemoveAfter ( afters) => {
179173 if task. schedule_options . is_some ( ) {
@@ -182,7 +176,7 @@ impl TaskMgr {
182176 name : task_name. to_string ( ) ,
183177 } ) ) ;
184178 }
185- task . after . retain ( | task| ! afters. contains ( task ) ) ;
179+ return self . remove_after ( & task. task_name , afters) . await ;
186180 }
187181 }
188182 if let Err ( e) = self
@@ -268,32 +262,14 @@ impl TaskMgr {
268262
269263 #[ async_backtrace:: framed]
270264 #[ fastrace:: trace]
271- pub async fn update_after (
265+ pub async fn add_after (
272266 & self ,
273267 task_name : & str ,
274- task_after : & [ String ] ,
268+ new_afters : & [ String ] ,
275269 ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
276- let task_after_ident = DirName :: new ( TaskDependentIdent :: new_generic (
277- & self . tenant ,
278- TaskDependent :: new ( DependentType :: After , task_name. to_string ( ) , "" . to_string ( ) ) ,
279- ) ) ;
280-
281270 let mut update_ops = Vec :: new ( ) ;
282-
283- let mut new_afters: HashSet < & String > = task_after. iter ( ) . collect ( ) ;
284- let mut remove_afters: Vec < String > = Vec :: new ( ) ;
285- let mut task_after_stream = self . kv_api . list_pb_values ( & task_after_ident) . await ?;
286-
287- while let Some ( after_task_dependent) = task_after_stream. next ( ) . await {
288- let after_task_dependent = after_task_dependent?;
289-
290- debug_assert_eq ! ( after_task_dependent. ty, DependentType :: After ) ;
291-
292- if !new_afters. remove ( & after_task_dependent. target ) {
293- remove_afters. push ( after_task_dependent. target . clone ( ) ) ;
294- }
295- }
296271 let mut check_ops = Vec :: with_capacity ( new_afters. len ( ) ) ;
272+
297273 for after in new_afters {
298274 let after_dependent =
299275 TaskDependent :: new ( DependentType :: After , task_name. to_string ( ) , after. clone ( ) ) ;
@@ -320,27 +296,91 @@ impl TaskMgr {
320296 target : Some ( Target :: Seq ( 0 ) ) ,
321297 } ) ;
322298 }
323- for after in remove_afters {
324- let before_dependent_ident = TaskDependentIdent :: new_generic (
325- & self . tenant ,
326- TaskDependent :: new ( DependentType :: Before , after. clone ( ) , task_name. to_string ( ) ) ,
327- ) ;
328-
329- update_ops. push ( TxnOp :: delete ( before_dependent_ident. to_string_key ( ) ) ) ;
330- }
331299 let request = TxnRequest :: new ( check_ops, update_ops) ;
332300 let reply = self . kv_api . transaction ( request) . await ?;
333301
334302 if reply. success {
335303 return Err ( TaskApiError :: SimultaneousUpdateTaskAfter {
336304 task_name : task_name. to_string ( ) ,
337- after : task_after . join ( "," ) ,
305+ after : new_afters . join ( "," ) ,
338306 } ) ;
339307 }
340308
341309 Ok ( Ok ( ( ) ) )
342310 }
343311
312+ #[ async_backtrace:: framed]
313+ #[ fastrace:: trace]
314+ pub async fn remove_after (
315+ & self ,
316+ task_name : & str ,
317+ remove_afters : & [ String ] ,
318+ ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
319+ let task_after_dir_ident = DirName :: new ( TaskDependentIdent :: new_generic (
320+ & self . tenant ,
321+ TaskDependent :: new ( DependentType :: After , task_name. to_string ( ) , "" . to_string ( ) ) ,
322+ ) ) ;
323+ let mut task_after_stream = self . kv_api . list_pb_values ( & task_after_dir_ident) . await ?;
324+ let mut update_ops = Vec :: new ( ) ;
325+
326+ while let Some ( task_after_dependent) = task_after_stream. next ( ) . await {
327+ let task_after_dependent = task_after_dependent?;
328+
329+ debug_assert_eq ! ( task_after_dependent. ty, DependentType :: After ) ;
330+
331+ if !remove_afters. contains ( & task_after_dependent. target ) {
332+ continue ;
333+ }
334+ let task_after_ident =
335+ TaskDependentIdent :: new_generic ( & self . tenant , task_after_dependent) ;
336+ update_ops. push ( TxnOp :: delete ( task_after_ident. to_string_key ( ) ) ) ;
337+ }
338+ let request = TxnRequest :: new ( vec ! [ ] , update_ops) ;
339+ let _ = self . kv_api . transaction ( request) . await ?;
340+
341+ Ok ( Ok ( ( ) ) )
342+ }
343+
344+ // Tips: Task Before only cleans up when dropping a task
345+ #[ async_backtrace:: framed]
346+ #[ fastrace:: trace]
347+ pub async fn clean_task_state_and_dependents (
348+ & self ,
349+ task_name : & str ,
350+ ) -> Result < Result < ( ) , TaskError > , TaskApiError > {
351+ let task_before_dir_ident = DirName :: new ( TaskDependentIdent :: new_generic (
352+ & self . tenant ,
353+ TaskDependent :: new ( DependentType :: Before , task_name. to_string ( ) , "" . to_string ( ) ) ,
354+ ) ) ;
355+ let mut task_before_stream = self . kv_api . list_pb_values ( & task_before_dir_ident) . await ?;
356+ let mut update_ops = Vec :: new ( ) ;
357+
358+ while let Some ( task_before_dependent) = task_before_stream. next ( ) . await {
359+ let task_before_dependent = task_before_dependent?;
360+ debug_assert_eq ! ( task_before_dependent. ty, DependentType :: Before ) ;
361+
362+ let task_after_dependent = TaskDependent :: new (
363+ DependentType :: After ,
364+ task_before_dependent. target . to_string ( ) ,
365+ task_before_dependent. source . to_string ( ) ,
366+ ) ;
367+
368+ let task_before_ident =
369+ TaskDependentIdent :: new_generic ( & self . tenant , task_before_dependent) ;
370+ update_ops. push ( TxnOp :: delete ( task_before_ident. to_string_key ( ) ) ) ;
371+
372+ let task_after_ident =
373+ TaskDependentIdent :: new_generic ( & self . tenant , task_after_dependent) ;
374+ update_ops. push ( TxnOp :: delete ( task_after_ident. to_string_key ( ) ) ) ;
375+ }
376+ update_ops. push ( TxnOp :: delete ( TaskStateIdent :: new ( & self . tenant , task_name) . to_string_key ( ) ) ) ;
377+
378+ let request = TxnRequest :: new ( vec ! [ ] , update_ops) ;
379+ let _ = self . kv_api . transaction ( request) . await ?;
380+
381+ Ok ( Ok ( ( ) ) )
382+ }
383+
344384 #[ async_backtrace:: framed]
345385 #[ fastrace:: trace]
346386 pub async fn task_succeeded (
@@ -466,7 +506,7 @@ impl TaskMgr {
466506 }
467507 }
468508 if !task. after . is_empty ( ) {
469- if let Err ( err) = self . update_after ( & task. task_name , & task. after ) . await ? {
509+ if let Err ( err) = self . add_after ( & task. task_name , & task. after ) . await ? {
470510 return Ok ( Err ( err) ) ;
471511 }
472512 } else if task. schedule_options . is_some ( ) && !without_schedule {
0 commit comments