@@ -6,17 +6,29 @@ use crate::{
66use ahash:: AHashMap ;
77use anyhow:: Result ;
88use aptos_indexer_processor_sdk:: {
9- traits:: { async_step:: AsyncRunType , AsyncStep , NamedStep , Processable } ,
9+ traits:: {
10+ async_step:: AsyncRunType , processable:: CustomRunType , AsyncStep , IntoRunnableStep ,
11+ NamedStep , Processable , RunnableAsyncStep , RunnableStep ,
12+ } ,
1013 types:: transaction_context:: TransactionContext ,
11- utils:: errors:: ProcessorError ,
14+ utils:: {
15+ errors:: ProcessorError ,
16+ step_metrics:: { StepMetricLabels , StepMetricsBuilder } ,
17+ } ,
1218} ;
1319use async_trait:: async_trait;
20+ use bigdecimal:: Zero ;
1421use diesel:: {
1522 pg:: { upsert:: excluded, Pg } ,
1623 query_builder:: QueryFragment ,
1724 ExpressionMethods ,
1825} ;
19- use tracing:: { error, info} ;
26+ use instrumented_channel:: {
27+ instrumented_bounded_channel, InstrumentedAsyncReceiver , InstrumentedAsyncSender ,
28+ } ;
29+ use std:: time:: { Duration , Instant } ;
30+ use tokio:: task:: JoinHandle ;
31+ use tracing:: { error, info, warn} ;
2032
2133pub struct EventsStorer
2234where
@@ -55,7 +67,7 @@ fn insert_events_query(
5567impl Processable for EventsStorer {
5668 type Input = EventModel ;
5769 type Output = EventModel ;
58- type RunType = AsyncRunType ;
70+ type RunType = CustomRunType ;
5971
6072 async fn process (
6173 & mut self ,
@@ -91,3 +103,139 @@ impl NamedStep for EventsStorer {
91103 "EventsStorer" . to_string ( )
92104 }
93105}
106+
107+ // Custom spawning and task management
108+ impl IntoRunnableStep < EventModel , EventModel , EventsStorer , CustomRunType > for EventsStorer {
109+ fn into_runnable_step ( self ) -> impl RunnableStep < EventModel , EventModel > {
110+ RunnableEventsStorer :: new ( self )
111+ }
112+ }
113+
114+ pub struct RunnableEventsStorer {
115+ pub step : EventsStorer ,
116+ }
117+
118+ impl RunnableEventsStorer {
119+ pub fn new ( step : EventsStorer ) -> Self {
120+ Self { step }
121+ }
122+ }
123+
124+ impl RunnableStep < EventModel , EventModel > for RunnableEventsStorer {
125+ fn spawn (
126+ self ,
127+ input_receiver : Option < InstrumentedAsyncReceiver < TransactionContext < EventModel > > > ,
128+ output_channel_size : usize ,
129+ _input_sender : Option < InstrumentedAsyncSender < TransactionContext < EventModel > > > ,
130+ ) -> (
131+ InstrumentedAsyncReceiver < TransactionContext < EventModel > > ,
132+ JoinHandle < ( ) > ,
133+ ) {
134+ let mut step = self . step ;
135+ let step_name = step. name ( ) ;
136+ let input_receiver = input_receiver. expect ( "Input receiver must be set" ) ;
137+
138+ let ( output_sender, output_receiver) =
139+ instrumented_bounded_channel ( & step_name, output_channel_size) ;
140+
141+ // Tip: You may replace this tokio task spawning with your own code to customize the library and parallelization of this step
142+ info ! ( step_name = step_name, "Spawning processing task" ) ;
143+ let handle = tokio:: spawn ( async move {
144+ loop {
145+ let input_with_context = match input_receiver. recv ( ) . await {
146+ Ok ( input_with_context) => input_with_context,
147+ Err ( e) => {
148+ // If the previous steps have finished and the channels have closed , we should break out of the loop
149+ warn ! (
150+ step_name = step_name,
151+ error = e. to_string( ) ,
152+ "No input received from channel"
153+ ) ;
154+ break ;
155+ } ,
156+ } ;
157+ let processing_duration = Instant :: now ( ) ;
158+ let output_with_context = match step. process ( input_with_context) . await {
159+ Ok ( output_with_context) => output_with_context,
160+ Err ( e) => {
161+ error ! (
162+ step_name = step_name,
163+ error = e. to_string( ) ,
164+ "Failed to process input"
165+ ) ;
166+ break ;
167+ } ,
168+ } ;
169+ if let Some ( output_with_context) = output_with_context {
170+ match StepMetricsBuilder :: default ( )
171+ . labels ( StepMetricLabels {
172+ step_name : step. name ( ) ,
173+ } )
174+ . latest_processed_version ( output_with_context. end_version )
175+ . latest_transaction_timestamp (
176+ output_with_context. get_start_transaction_timestamp_unix ( ) ,
177+ )
178+ . num_transactions_processed_count (
179+ output_with_context. get_num_transactions ( ) ,
180+ )
181+ . processing_duration_in_secs ( processing_duration. elapsed ( ) . as_secs_f64 ( ) )
182+ . processed_size_in_bytes ( output_with_context. total_size_in_bytes )
183+ . build ( )
184+ {
185+ Ok ( mut metrics) => metrics. log_metrics ( ) ,
186+ Err ( e) => {
187+ error ! (
188+ step_name = step_name,
189+ error = e. to_string( ) ,
190+ "Failed to log metrics"
191+ ) ;
192+ break ;
193+ } ,
194+ }
195+ match output_sender. send ( output_with_context) . await {
196+ Ok ( _) => ( ) ,
197+ Err ( e) => {
198+ error ! (
199+ step_name = step_name,
200+ error = e. to_string( ) ,
201+ "Error sending output to channel"
202+ ) ;
203+ break ;
204+ } ,
205+ }
206+ }
207+ }
208+
209+ // Wait for output channel to be empty before ending the task and closing the send channel
210+ loop {
211+ let channel_size = output_sender. len ( ) ;
212+ info ! (
213+ step_name = step_name,
214+ channel_size = channel_size,
215+ "Waiting for output channel to be empty"
216+ ) ;
217+ if channel_size. is_zero ( ) {
218+ break ;
219+ }
220+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
221+ }
222+ info ! (
223+ step_name = step_name,
224+ "Output channel is empty. Closing send channel."
225+ ) ;
226+ } ) ;
227+
228+ ( output_receiver, handle)
229+ }
230+ }
231+
232+ impl NamedStep for RunnableEventsStorer {
233+ fn name ( & self ) -> String {
234+ self . step . name ( )
235+ }
236+
237+ fn type_name ( & self ) -> String {
238+ let step_type = std:: any:: type_name :: < EventsStorer > ( ) . to_string ( ) ;
239+ format ! ( "{} (via RunnableAsyncStep)" , step_type)
240+ }
241+ }
0 commit comments