2828from pydoc import splitdoc
2929from timeit import default_timer as timer
3030
31- logger = logging .getLogger (__name__ )
31+ module_logger = logging .getLogger (__name__ )
3232
3333
def group(*groups):
    """
    Mark a function as part of one or more groups.

    Return a decorator that records the given ``groups`` names on the
    decorated object through its ``groups`` attribute, then returns that same
    object. When the object already has a ``groups`` attribute, the new names
    are merged into it with ``union``; otherwise a new set is created.
    """

    def apply_groups(target):
        if not hasattr(target, "groups"):
            target.groups = set(groups)
            return target

        # ``union`` builds a new collection, the previous one is not mutated.
        target.groups = target.groups.union(groups)
        return target

    return apply_groups
45-
46-
def humanize_time(seconds):
    """
    Convert the provided ``seconds`` number into human-readable time.

    The message always starts with the value rounded to whole seconds.
    A parenthesized conversion is appended for larger values: minutes above
    60 seconds, hours above 3600 seconds. Above 86400 seconds both the days
    and the hours conversions are appended, in that order.
    """
    conversions = []

    if seconds > 86400:
        conversions.append(f" ({seconds / 86400:.1f} days)")

    # Separate ``if`` (not ``elif``): values above one day also get the hours.
    if seconds > 3600:
        conversions.append(f" ({seconds / 3600:.1f} hours)")
    elif seconds > 60:
        conversions.append(f" ({seconds / 60:.1f} minutes)")

    return f"{seconds:.0f} seconds" + "".join(conversions)
59-
60-
61- class BasePipeline :
62- """Base class for all pipeline implementations."""
34+ class PipelineDefinition :
35+ """
36+ Encapsulate the code related to a Pipeline definition:
37+ - Steps
38+ - Attributes
39+ - Documentation
40+ """
6341
6442 # Flag indicating if the Pipeline is an add-on, meaning it cannot be run first.
6543 is_addon = False
6644
67- def __init__ (self , run ):
68- """Load the Run and Project instances."""
69- self .run = run
70- self .project = run .project
71- self .pipeline_name = run .pipeline_name
72- self .env = self .project .get_env ()
73-
7445 @classmethod
7546 def steps (cls ):
7647 raise NotImplementedError
@@ -89,6 +60,9 @@ def get_steps(cls, groups=None):
8960
9061 steps = cls .steps ()
9162
63+ if initial_steps := cls .get_initial_steps ():
64+ steps = (* initial_steps , * steps )
65+
9266 if groups is not None :
9367 steps = tuple (
9468 step
@@ -152,13 +126,40 @@ def get_available_groups(cls):
152126 )
153127 )
154128
129+
130+ class PipelineRun :
131+ """
132+ Encapsulate the code related to a Pipeline run (execution):
133+ - Execution context: groups, steps
134+ - Execution logic
135+ - Logging
136+ - Results
137+ """
138+
139+ def __init__ (self , selected_groups = None , selected_steps = None ):
140+ """Load the Pipeline class."""
141+ self .pipeline_class = self .__class__
142+ self .pipeline_name = self .__class__ .__name__
143+
144+ self .selected_groups = selected_groups
145+ self .selected_steps = selected_steps or []
146+
147+ self .execution_log = []
148+ self .current_step = ""
149+
def append_to_log(self, message):
    """Add the given `message` at the end of the ``execution_log`` list."""
    self.execution_log.append(message)
152+
def set_current_step(self, message):
    """Store the given `message` as the ``current_step`` value."""
    self.current_step = message
155+
def log(self, message):
    """
    Log the given `message` to the current module logger and execution_log.

    The message is prefixed with the current local time before being sent to
    both destinations.
    """
    local_now = datetime.now(timezone.utc).astimezone()
    # "%f" renders microseconds; dropping the last 3 digits keeps milliseconds.
    stamp = local_now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    entry = f"{stamp} {message}"

    module_logger.info(entry)
    self.append_to_log(entry)
162163
163164 @staticmethod
164165 def output_from_exception (exception ):
@@ -177,23 +178,18 @@ def execute(self):
177178 """Execute each steps in the order defined on this pipeline class."""
178179 self .log (f"Pipeline [{ self .pipeline_name } ] starting" )
179180
180- steps = self .get_steps (groups = self .run .selected_groups )
181- selected_steps = self .run .selected_steps
182-
183- if initial_steps := self .get_initial_steps ():
184- steps = initial_steps + steps
185-
181+ steps = self .pipeline_class .get_steps (groups = self .selected_groups )
186182 steps_count = len (steps )
187183 pipeline_start_time = timer ()
188184
189185 for current_index , step in enumerate (steps , start = 1 ):
190186 step_name = step .__name__
191187
192- if selected_steps and step_name not in selected_steps :
188+ if self . selected_steps and step_name not in self . selected_steps :
193189 self .log (f"Step [{ step_name } ] skipped" )
194190 continue
195191
196- self .run . set_current_step (f"{ current_index } /{ steps_count } { step_name } " )
192+ self .set_current_step (f"{ current_index } /{ steps_count } { step_name } " )
197193 self .log (f"Step [{ step_name } ] starting" )
198194 step_start_time = timer ()
199195
@@ -206,8 +202,114 @@ def execute(self):
206202 step_run_time = timer () - step_start_time
207203 self .log (f"Step [{ step_name } ] completed in { humanize_time (step_run_time )} " )
208204
209- self .run . set_current_step ("" ) # Reset the `current_step` field on completion
205+ self .set_current_step ("" ) # Reset the `current_step` field on completion
210206 pipeline_run_time = timer () - pipeline_start_time
211207 self .log (f"Pipeline completed in { humanize_time (pipeline_run_time )} " )
212208
213209 return 0 , ""
210+
211+
class BasePipeline(PipelineDefinition, PipelineRun):
    """
    Base class for all pipeline implementations.

    It carries no code of its own: the pipeline definition comes from
    ``PipelineDefinition`` and the execution logic from ``PipelineRun``,
    combined through multiple inheritance.
    """
217+
218+
def group(*groups):
    """
    Mark a function as part of one or more groups.

    Return a decorator that records the given ``groups`` names on the
    decorated object through its ``groups`` attribute, then returns that same
    object. When the object already has a ``groups`` attribute, the new names
    are merged into it with ``union``; otherwise a new set is created.
    """

    def apply_groups(target):
        if not hasattr(target, "groups"):
            target.groups = set(groups)
            return target

        # ``union`` builds a new collection, the previous one is not mutated.
        target.groups = target.groups.union(groups)
        return target

    return apply_groups
230+
231+
def humanize_time(seconds):
    """
    Convert the provided ``seconds`` number into human-readable time.

    The message always starts with the value rounded to whole seconds.
    A parenthesized conversion is appended for larger values: minutes above
    60 seconds, hours above 3600 seconds. Above 86400 seconds both the days
    and the hours conversions are appended, in that order.
    """
    conversions = []

    if seconds > 86400:
        conversions.append(f" ({seconds / 86400:.1f} days)")

    # Separate ``if`` (not ``elif``): values above one day also get the hours.
    if seconds > 3600:
        conversions.append(f" ({seconds / 3600:.1f} hours)")
    elif seconds > 60:
        conversions.append(f" ({seconds / 60:.1f} minutes)")

    return f"{seconds:.0f} seconds" + "".join(conversions)
244+
245+
class LoopProgress:
    """
    A context manager for logging progress in loops.

    Usage::
        total_iterations = 100
        logger = print  # Replace with your actual logger function

        progress = LoopProgress(total_iterations, logger, progress_step=10)
        for item in progress.iter(iterator):
            "Your processing logic here"

        # As a context manager
        with LoopProgress(total_iterations, logger, progress_step=10) as progress:
            for item in progress.iter(iterator):
                "Your processing logic here"
    """

    def __init__(self, total_iterations, logger, progress_step=10):
        """
        Store the loop settings and start the timer.

        ``total_iterations`` is the expected number of items, ``logger`` is a
        callable receiving each progress message (a falsy value disables
        logging), and ``progress_step`` is the minimum increase, in percent,
        between two logged messages.
        """
        self.total_iterations = total_iterations
        self.logger = logger
        self.progress_step = progress_step
        self.start_time = timer()
        self.last_logged_progress = 0
        self.current_iteration = 0

    def get_eta(self, current_progress):
        """
        Return the estimated remaining time, in seconds, rounded to an integer.

        The estimate extrapolates the time elapsed since ``start_time`` from
        the ``current_progress`` percentage to the remaining percentage.
        Return ``None`` when ``current_progress`` is 0, since nothing can be
        extrapolated yet (this used to raise a ``ZeroDivisionError``).
        """
        if current_progress == 0:
            return None

        run_time = timer() - self.start_time
        return round(run_time / current_progress * (100 - current_progress))

    @property
    def current_progress(self):
        """
        Return the current progress as an integer percentage, truncated.

        Return 0 when ``total_iterations`` is 0 instead of dividing by zero.
        """
        if not self.total_iterations:
            return 0
        return int((self.current_iteration / self.total_iterations) * 100)

    @property
    def eta(self):
        """Return ``get_eta`` for the current progress, ``None`` at 0%."""
        return self.get_eta(self.current_progress)

    def log_progress(self):
        """
        Send a progress message to the logger when enough progress was made.

        Nothing is logged when there is no logger, when no iteration happened
        yet, or when ``total_iterations`` is not greater than
        ``progress_step``. Otherwise a message is logged each time the
        progress grew by at least ``progress_step`` percent since the last
        logged message. The ETA is appended only when it is non-zero.
        """
        if not self.logger:
            return
        if not self.current_iteration > 0:
            return
        if self.total_iterations <= self.progress_step:
            return

        # Compute the percentage once so the message, the ETA and the
        # ``last_logged_progress`` bookkeeping all use the same value.
        progress = self.current_progress
        if progress < self.last_logged_progress + self.progress_step:
            return

        msg = f"Progress: {progress}% ({self.current_iteration}/{self.total_iterations})"
        if eta := self.get_eta(progress):
            msg += f" ETA: {humanize_time(eta)}"

        self.logger(msg)
        self.last_logged_progress = progress

    def __enter__(self):
        """Return this instance, there is nothing to set up."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions raised in the block propagate."""
        pass

    def iter(self, iterator):
        """
        Yield each item of ``iterator``, tracking and logging the progress.

        The iteration counter is incremented, and the progress possibly
        logged, before each item is yielded.
        """
        for item in iterator:
            self.current_iteration += 1
            self.log_progress()
            yield item
0 commit comments