55// Copyright 2023 Oxide Computer Company
66
77use std:: {
8- borrow:: Cow ,
9- fmt,
10- ops:: ControlFlow ,
11- pin:: Pin ,
12- sync:: {
13- atomic:: { AtomicUsize , Ordering } ,
14- Mutex ,
15- } ,
16- task:: Poll ,
8+ borrow:: Cow , fmt, ops:: ControlFlow , pin:: Pin , sync:: Mutex , task:: Poll ,
179} ;
1810
1911use cancel_safe_futures:: coop_cancel;
@@ -184,14 +176,12 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
184176 async fn execute_impl (
185177 mut self ,
186178 ) -> Result < CompletionContext < S > , ExecutionError < S > > {
187- // TODO: this absolutely does not need to be an atomic! However it is
188- // currently so because of a bug in rustc, fixed in Rust 1.70. Fix this
189- // once omicron is on Rust 1.70.
190- //
191- // https://github.com/rust-lang/rust/pull/107844
192- let event_index = AtomicUsize :: new ( 0 ) ;
193- let next_event_index = || event_index. fetch_add ( 1 , Ordering :: SeqCst ) ;
194- let exec_cx = ExecutionContext :: new (
179+ let mut event_index = 0 ;
180+ let next_event_index = || {
181+ event_index += 1 ;
182+ event_index - 1
183+ } ;
184+ let mut exec_cx = ExecutionContext :: new (
195185 self . execution_id ,
196186 next_event_index,
197187 self . sender . clone ( ) ,
@@ -243,7 +233,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
243233 self . sender . send ( Event :: Step ( StepEvent {
244234 spec : S :: schema_name ( ) ,
245235 execution_id : self . execution_id ,
246- event_index : next_event_index ( ) ,
236+ event_index : ( exec_cx . next_event_index ) ( ) ,
247237 total_elapsed : exec_cx. total_start . elapsed ( ) ,
248238 kind : StepEventKind :: NoStepsDefined ,
249239 } ) ) . await ?;
@@ -264,7 +254,7 @@ impl<'a, S: StepSpec + 'a> UpdateEngine<'a, S> {
264254 let event = Event :: Step ( StepEvent {
265255 spec : S :: schema_name ( ) ,
266256 execution_id : self . execution_id ,
267- event_index : next_event_index ( ) ,
257+ event_index : ( exec_cx . next_event_index ) ( ) ,
268258 total_elapsed : exec_cx. total_start . elapsed ( ) ,
269259 kind : StepEventKind :: ExecutionStarted {
270260 steps : step_infos,
@@ -771,7 +761,7 @@ struct StepExec<'a, S: StepSpec> {
771761}
772762
773763impl < ' a , S : StepSpec > StepExec < ' a , S > {
774- async fn execute < F : Fn ( ) -> usize > (
764+ async fn execute < F : FnMut ( ) -> usize > (
775765 self ,
776766 log : & slog:: Logger ,
777767 step_exec_cx : StepExecutionContext < S , F > ,
@@ -884,12 +874,12 @@ impl<S: StepSpec, F> ExecutionContext<S, F> {
884874 }
885875
886876 fn create (
887- & self ,
877+ & mut self ,
888878 step_info : StepInfoWithMetadata < S > ,
889- ) -> StepExecutionContext < S , & F > {
879+ ) -> StepExecutionContext < S , & mut F > {
890880 StepExecutionContext {
891881 execution_id : self . execution_id ,
892- next_event_index : DebugIgnore ( & self . next_event_index . 0 ) ,
882+ next_event_index : DebugIgnore ( & mut self . next_event_index . 0 ) ,
893883 total_start : self . total_start ,
894884 step_info,
895885 sender : self . sender . clone ( ) ,
@@ -941,7 +931,7 @@ struct StepProgressReporter<S: StepSpec, F> {
941931 sender : mpsc:: Sender < Event < S > > ,
942932}
943933
944- impl < S : StepSpec , F : Fn ( ) -> usize > StepProgressReporter < S , F > {
934+ impl < S : StepSpec , F : FnMut ( ) -> usize > StepProgressReporter < S , F > {
945935 fn new ( step_exec_cx : StepExecutionContext < S , F > ) -> Self {
946936 let step_start = Instant :: now ( ) ;
947937 Self {
@@ -1069,7 +1059,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
10691059 }
10701060 }
10711061
1072- async fn handle_abort ( self , message : String ) -> ExecutionError < S > {
1062+ async fn handle_abort ( mut self , message : String ) -> ExecutionError < S > {
10731063 // Send the abort message over the channel.
10741064 //
10751065 // The only way this can fail is if the event receiver is closed or
@@ -1104,7 +1094,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
11041094 }
11051095
11061096 async fn next_step (
1107- self ,
1097+ mut self ,
11081098 step_res : Result < StepOutcome < S > , S :: Error > ,
11091099 next_step_info : & StepInfoWithMetadata < S > ,
11101100 ) -> Result < ( ) , ExecutionError < S > > {
@@ -1144,7 +1134,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
11441134 }
11451135
11461136 async fn last_step (
1147- self ,
1137+ mut self ,
11481138 step_res : Result < StepOutcome < S > , S :: Error > ,
11491139 ) -> Result < ( ) , ExecutionError < S > > {
11501140 match step_res {
@@ -1182,7 +1172,7 @@ impl<S: StepSpec, F: Fn() -> usize> StepProgressReporter<S, F> {
11821172 }
11831173
11841174 async fn send_error (
1185- self ,
1175+ mut self ,
11861176 error : & S :: Error ,
11871177 ) -> Result < ( ) , mpsc:: error:: SendError < Event < S > > > {
11881178 // Stringify `error` into a message + list causes; this is written the
0 commit comments