From 9318a8469f881113b5731f6669445eddcab82515 Mon Sep 17 00:00:00 2001 From: dorzeidman Date: Fri, 7 Mar 2025 21:53:51 +0200 Subject: [PATCH 1/3] added extend least thread capabilites --- src/conductor/client/automator/task_runner.py | 114 +++++++++++++++++- .../client/http/models/task_result.py | 23 +++- src/conductor/client/worker/worker.py | 3 + .../client/worker/worker_interface.py | 9 ++ 4 files changed, 145 insertions(+), 4 deletions(-) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 187442944..18372e78f 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -1,6 +1,7 @@ import logging import os import sys +import threading import time import traceback @@ -11,7 +12,7 @@ from conductor.client.http.models.task import Task from conductor.client.http.models.task_exec_log import TaskExecLog from conductor.client.http.models.task_result import TaskResult -from conductor.client.http.rest import AuthorizationException +from conductor.client.http.rest import ApiException, AuthorizationException from conductor.client.telemetry.metrics_collector import MetricsCollector from conductor.client.worker.worker_interface import WorkerInterface @@ -123,10 +124,21 @@ def __execute_task(self, task: Task) -> TaskResult: task_definition_name=task_definition_name ) ) + + extend_lease_stop_event : threading.Event | None = None + try: + if self.worker.extend_lease_interval > 0: + extend_lease_stop_event = threading.Event() + self.__execute_extend_lease(task, task_definition_name, extend_lease_stop_event) + start_time = time.time() task_result = self.worker.execute(task) finish_time = time.time() + + if extend_lease_stop_event is not None: + extend_lease_stop_event.set() + time_spent = finish_time - start_time if self.metrics_collector is not None: self.metrics_collector.record_task_execute_time( @@ -145,6 +157,9 @@ def __execute_task(self, task: Task) -> TaskResult: ) ) except Exception as e: + if extend_lease_stop_event is not None: + extend_lease_stop_event.set() + if self.metrics_collector is not None: self.metrics_collector.increment_task_execution_error( task_definition_name, type(e) @@ -255,3 +270,100 @@ def __get_property_value_from_env(self, prop, task_type): key_upper = prefix.upper() + "_" + task_type + "_" + prop.upper() value = os.getenv(key_small, os.getenv(key_upper, value_all)) return value + + def __execute_extend_lease(self, task: Task, task_definition_name: str, stop_event: threading.Event): + interval = self.worker.extend_lease_interval + + task_result = TaskResult( + task_id=task.task_id, + workflow_instance_id=task.workflow_instance_id, + worker_id=self.worker.get_identity(), + extend_lease=True + ) + + def extend_lease_target(): + logger.debug( + 'Start Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( + task_id=task.task_id, + workflow_instance_id=task.workflow_instance_id, + task_definition_name=task_definition_name + )) + + while not stop_event.is_set(): + stop_event.wait(interval) + + if stop_event.is_set(): + break + + logger.debug( + 'Sending Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name + ) + ) + + try: + response = self.task_client.update_task(body=task_result) + + logger.debug( + 'Extend Lease for task sent, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + response=response + ) + ) + + except ApiException as ae: + if ae.status == 404: + logger.debug( + 'Extend Lease stopping because received a 404 response for, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + response=response + ) + ) + break + else: + if self.metrics_collector is not None: + self.metrics_collector.increment_task_update_error( + task_definition_name, type(e) + ) + logger.error( + 'Failed to extend task lease, id: {task_id}, workflow_instance_id: {workflow_instance_id}, ' + 'task_definition_name: {task_definition_name}, reason: {reason}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + reason=traceback.format_exc() + ) + ) + + except Exception as e: + if self.metrics_collector is not None: + self.metrics_collector.increment_task_update_error( + task_definition_name, type(e) + ) + logger.error( + 'Failed to extend task lease, id: {task_id}, workflow_instance_id: {workflow_instance_id}, ' + 'task_definition_name: {task_definition_name}, reason: {reason}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + reason=traceback.format_exc() + ) + ) + + logger.debug( + 'Extend lease for task ended, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( + task_id=task.task_id, + workflow_instance_id=task.workflow_instance_id, + task_definition_name=task_definition_name + )) + + + + thread = threading.Thread(target=extend_lease_target) + thread.start() diff --git a/src/conductor/client/http/models/task_result.py b/src/conductor/client/http/models/task_result.py index eaa0921d7..1f025ad21 100644 --- a/src/conductor/client/http/models/task_result.py +++ b/src/conductor/client/http/models/task_result.py @@ -28,7 +28,8 @@ class TaskResult(object): 'output_data': 'dict(str, object)', 'logs': 'list[TaskExecLog]', 'external_output_payload_storage_path': 'str', - 'sub_workflow_id': 'str' + 'sub_workflow_id': 'str', + 'extend_lease': 'bool' } attribute_map = { @@ -41,12 +42,13 @@ class TaskResult(object): 'output_data': 'outputData', 'logs': 'logs', 'external_output_payload_storage_path': 'externalOutputPayloadStoragePath', - 'sub_workflow_id': 'subWorkflowId' + 'sub_workflow_id': 'subWorkflowId', + 'extend_lease': 'extendLease' } def __init__(self, workflow_instance_id=None, task_id=None, reason_for_incompletion=None, callback_after_seconds=None, worker_id=None, status=None, output_data=None, logs=None, - external_output_payload_storage_path=None, sub_workflow_id=None): # noqa: E501 + external_output_payload_storage_path=None, sub_workflow_id=None, extend_lease=None): # noqa: E501 """TaskResult - a model defined in Swagger""" # noqa: E501 self._workflow_instance_id = None self._task_id = None @@ -77,6 +79,8 @@ def __init__(self, workflow_instance_id=None, task_id=None, reason_for_incomplet self.external_output_payload_storage_path = external_output_payload_storage_path if sub_workflow_id is not None: self.sub_workflow_id = sub_workflow_id + if extend_lease is not None: + self.extend_lease = extend_lease @property def workflow_instance_id(self): @@ -294,6 +298,19 @@ def sub_workflow_id(self, sub_workflow_id): self._sub_workflow_id = sub_workflow_id + @property + def extend_lease(self): + return self._extend_lease + + @extend_lease.setter + def extend_lease(self, extend_lease): + """Sets the extend_lease of this TaskResult. + :param extend_lease: The extend_lease of this TaskResult. # noqa: E501 + :type: str + """ + + self._extend_lease = extend_lease + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/src/conductor/client/worker/worker.py b/src/conductor/client/worker/worker.py index 121f5b984..548d9d603 100644 --- a/src/conductor/client/worker/worker.py +++ b/src/conductor/client/worker/worker.py @@ -53,6 +53,7 @@ def __init__(self, poll_interval: float = None, domain: str = None, worker_id: str = None, + extend_lease_interval: float = None ) -> Self: super().__init__(task_definition_name) self.api_client = ApiClient() @@ -66,6 +67,8 @@ def __init__(self, else: self.worker_id = deepcopy(worker_id) self.execute_function = deepcopy(execute_function) + if extend_lease_interval is None: + self.extend_lease_interval = 0 def execute(self, task: Task) -> TaskResult: task_input = {} diff --git a/src/conductor/client/worker/worker_interface.py b/src/conductor/client/worker/worker_interface.py index 08e95f9cf..8f49674cd 100644 --- a/src/conductor/client/worker/worker_interface.py +++ b/src/conductor/client/worker/worker_interface.py @@ -15,6 +15,7 @@ def __init__(self, task_definition_name: Union[str, list]): self._task_definition_name_cache = None self._domain = None self._poll_interval = DEFAULT_POLLING_INTERVAL + self._extend_lease_interval = 0 @abc.abstractmethod def execute(self, task: Task) -> TaskResult: @@ -117,3 +118,11 @@ def poll_interval(self): @poll_interval.setter def poll_interval(self, value): self._poll_interval = value + + @property + def extend_lease_interval(self): + return self._extend_lease_interval + + @extend_lease_interval.setter + def extend_lease_interval(self, value): + self._extend_lease_interval = value From 6e2e49a2b74b9a5db112ba36dc0d1df44da684da Mon Sep 17 00:00:00 2001 From: dorzeidman Date: Sun, 9 Mar 2025 21:13:40 +0200 Subject: [PATCH 2/3] impoved exception handling + retries --- src/conductor/client/automator/task_runner.py | 64 +++++++++---------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/src/conductor/client/automator/task_runner.py b/src/conductor/client/automator/task_runner.py index 18372e78f..c4c8d37f0 100644 --- a/src/conductor/client/automator/task_runner.py +++ b/src/conductor/client/automator/task_runner.py @@ -292,45 +292,56 @@ def extend_lease_target(): while not stop_event.is_set(): stop_event.wait(interval) - if stop_event.is_set(): - break + for attempt in range(4): + if stop_event.is_set(): + break - logger.debug( - 'Sending Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( - task_id=task_result.task_id, - workflow_instance_id=task_result.workflow_instance_id, - task_definition_name=task_definition_name - ) - ) - - try: - response = self.task_client.update_task(body=task_result) + if attempt > 0: + # Wait for [10s, 20s, 30s] before next attempt + time.sleep(attempt * 10) + + if stop_event.is_set(): + break logger.debug( - 'Extend Lease for task sent, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + 'Sending Extend lease for task, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( task_id=task_result.task_id, workflow_instance_id=task_result.workflow_instance_id, - task_definition_name=task_definition_name, - response=response + task_definition_name=task_definition_name ) ) + + try: + response = self.task_client.update_task(body=task_result) - except ApiException as ae: - if ae.status == 404: logger.debug( - 'Extend Lease stopping because received a 404 response for, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + 'Extend Lease for task sent, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( task_id=task_result.task_id, workflow_instance_id=task_result.workflow_instance_id, task_definition_name=task_definition_name, response=response ) ) + break - else: + except Exception as e: if self.metrics_collector is not None: self.metrics_collector.increment_task_update_error( task_definition_name, type(e) ) + + if isinstance(e, ApiException): + if e.status == 404: + logger.debug( + 'Extend Lease stopping because received a 404 response for, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}, response: {response}'.format( + task_id=task_result.task_id, + workflow_instance_id=task_result.workflow_instance_id, + task_definition_name=task_definition_name, + response=response + ) + ) + break + logger.error( 'Failed to extend task lease, id: {task_id}, workflow_instance_id: {workflow_instance_id}, ' 'task_definition_name: {task_definition_name}, reason: {reason}'.format( @@ -340,21 +351,6 @@ def extend_lease_target(): reason=traceback.format_exc() ) ) - - except Exception as e: - if self.metrics_collector is not None: - self.metrics_collector.increment_task_update_error( - task_definition_name, type(e) - ) - logger.error( - 'Failed to extend task lease, id: {task_id}, workflow_instance_id: {workflow_instance_id}, ' - 'task_definition_name: {task_definition_name}, reason: {reason}'.format( - task_id=task_result.task_id, - workflow_instance_id=task_result.workflow_instance_id, - task_definition_name=task_definition_name, - reason=traceback.format_exc() - ) - ) logger.debug( 'Extend lease for task ended, id: {task_id}, workflow_instance_id: {workflow_instance_id}, task_definition_name: {task_definition_name}'.format( From f936f6dd5d15eb0629e5847a18f1e50fc2e5b1ae Mon Sep 17 00:00:00 2001 From: dorzeidman Date: Sun, 9 Mar 2025 21:17:08 +0200 Subject: [PATCH 3/3] fix typo - type is bool --- src/conductor/client/http/models/task_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/conductor/client/http/models/task_result.py b/src/conductor/client/http/models/task_result.py index 1f025ad21..85aa52537 100644 --- a/src/conductor/client/http/models/task_result.py +++ b/src/conductor/client/http/models/task_result.py @@ -306,7 +306,7 @@ def extend_lease(self): def extend_lease(self, extend_lease): """Sets the extend_lease of this TaskResult. :param extend_lease: The extend_lease of this TaskResult. # noqa: E501 - :type: str + :type: bool """ self._extend_lease = extend_lease