@@ -170,9 +170,9 @@ def __init__(
170170 self ._tasks = TaskRegistry ()
171171 self ._idempotent_tasks_to_submit = set ()
172172 self ._execution = None
173- self .current_priority = None
174- self .handled_failures = {}
175- self .created_activity_types = set ()
173+ self .current_priority : str | int | None = None
174+ self .handled_failures : dict [ int , Any ] = {}
175+ self .created_activity_types : set [ tuple [ str , str ]] = set ()
176176
177177 def reset (self ):
178178 """
@@ -223,15 +223,15 @@ def _make_task_id(
223223 # It makes the workflow resistant to retries or variations on the
224224 # same task name (see #11).
225225 arguments = json_dumps ({"args" : args , "kwargs" : kwargs })
226- suffix = hashlib .md5 (arguments .encode ("utf-8" )).hexdigest () # nosec
226+ suffix = hashlib .md5 (arguments .encode ()).hexdigest () # nosec
227227
228228 if isinstance (a_task , WorkflowTask ):
229229 # Some task types must have globally unique names.
230230 suffix = f"{ workflow_id } --{ hex_hash (run_id )} --{ suffix } "
231231
232232 task_id = f"{ a_task .name } -{ suffix } "
233233 if len (task_id ) > 256 : # Better safe than sorry...
234- task_id = task_id [0 :223 ] + "-" + hashlib .md5 (task_id .encode ("utf-8" )).hexdigest () # nosec
234+ task_id = task_id [0 :223 ] + "-" + hashlib .md5 (task_id .encode ()).hexdigest () # nosec
235235 return task_id
236236
237237 def _get_future_from_activity_event (self , event : dict [str , Any ]) -> futures .Future | None :
@@ -248,16 +248,7 @@ def _get_future_from_activity_event(self, event: dict[str, Any]) -> futures.Futu
248248 name = event ["activity_type" ]["name" ]
249249 version = event ["activity_type" ]["version" ]
250250 if event ["cause" ] == "ACTIVITY_TYPE_DOES_NOT_EXIST" and (name , version ) not in self .created_activity_types :
251- self .created_activity_types .add ((name , version ))
252- activity_type = simpleflow .swf .mapper .models .ActivityType (self .domain , name = name , version = version )
253- logger .info (f"creating activity type { activity_type .name } in domain { self .domain .name } " )
254- try :
255- activity_type .save ()
256- except simpleflow .swf .mapper .exceptions .AlreadyExistsError :
257- logger .info (
258- f"oops: Activity type { activity_type .name } in domain { self .domain .name } already exists,"
259- f" creation failed, continuing..."
260- )
251+ self .create_activity_type (name , version )
261252 return None
262253 logger .info (f"failed to schedule { name } : { event ['cause' ]} " )
263254 return None
@@ -283,6 +274,18 @@ def _get_future_from_activity_event(self, event: dict[str, Any]) -> futures.Futu
283274
284275 return future
285276
277+ def create_activity_type (self , name : str , version : str ) -> None :
278+ self .created_activity_types .add ((name , version ))
279+ activity_type = simpleflow .swf .mapper .models .ActivityType (self .domain , name = name , version = version )
280+ logger .info (f"creating activity type { activity_type .name } in domain { self .domain .name } " )
281+ try :
282+ activity_type .save ()
283+ except simpleflow .swf .mapper .exceptions .AlreadyExistsError :
284+ logger .info (
285+ f"oops: Activity type { activity_type .name } in domain { self .domain .name } already exists,"
286+ f" creation failed, continuing..."
287+ )
288+
286289 def _get_future_from_child_workflow_event (self , event : dict [str , Any ]) -> futures .Future | None :
287290 """Maps a child workflow event to a Future with the corresponding
288291 state.
@@ -797,7 +800,7 @@ def resume(
797800 # ... but only keep the event if the task was successful
798801 if former_event and former_event ["state" ] == "completed" :
799802 logger .info (f"faking task completed successfully in previous workflow: { former_event ['id' ]} " )
800- json_hash = hashlib .md5 (json_dumps (former_event ).encode ("utf-8" )).hexdigest () # nosec
803+ json_hash = hashlib .md5 (json_dumps (former_event ).encode ()).hexdigest () # nosec
801804 fake_task_list = "FAKE-" + json_hash
802805
803806 # schedule task on a fake task list
0 commit comments