Skip to content

Commit 30d9131

Browse files
Feature:4015 Early stopping on unhealthy heartbeat
1 parent e89d745 commit 30d9131

File tree

4 files changed

+67
-3
lines changed

4 files changed

+67
-3
lines changed

src/zenml/config/step_configurations.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,13 @@ class StepConfigurationUpdate(FrozenBaseModel):
221221
"run inline unless a step operator or docker/resource settings "
222222
"are configured. This is only applicable for dynamic pipelines.",
223223
)
224+
# Maximum heartbeat silence (in minutes) tolerated for a running step.
# A falsy value (``None``) disables the heartbeat health check; the
# ``ge=1`` constraint rejects zero and negative thresholds.
heartbeat_healthy_threshold: int | None = Field(
    default=None,
    description="The maximum amount of time (in minutes) that a running "
    "step may go without sending a heartbeat while still being considered "
    "healthy. Set to null to disable healthiness checks via heartbeat.",
    ge=1,
)
224231

225232
outputs: Mapping[str, PartialArtifactConfiguration] = {}
226233

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,20 @@ def stop_step(node: Node) -> None:
601601
)
602602
break
603603

604+
def is_node_heartbeat_unhealthy(node: Node) -> bool:
    """Check whether the step run behind a node has an unhealthy heartbeat.

    Looks up the step runs of the current pipeline run whose name equals
    the node id and evaluates the first match.

    Args:
        node: The node whose step run should be inspected.

    Returns:
        True if a matching step run exists and its heartbeat is reported
        as unhealthy, False otherwise (including when no step run was
        found for the node).
    """
    # Imported lazily, exactly like the original helper did.
    from zenml.steps.heartbeat import is_heartbeat_unhealthy

    # NOTE(review): `client` and `pipeline_run` are not defined in this
    # helper; presumably they come from the enclosing scope - confirm.
    matching_step_runs = client.list_run_steps(
        name=node.id, pipeline_run_id=pipeline_run.id
    )

    # Nothing registered for this node yet: no evidence of a problem.
    if not matching_step_runs.items:
        return False

    return is_heartbeat_unhealthy(step_run=matching_step_runs[0])
617+
604618
def check_job_status(node: Node) -> NodeStatus:
605619
"""Check the status of a job.
606620
@@ -641,6 +655,12 @@ def check_job_status(node: Node) -> NodeStatus:
641655
error_message,
642656
)
643657
return NodeStatus.FAILED
658+
elif is_node_heartbeat_unhealthy(node):
659+
logger.error(
660+
"Heartbeat for step `%s` indicates unhealthy status.",
661+
step_name,
662+
)
663+
return NodeStatus.FAILED
644664
else:
645665
return NodeStatus.RUNNING
646666

src/zenml/orchestrators/base_orchestrator.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,6 @@ def stop_run(
681681
Raises:
682682
IllegalOperationError: If the run has no orchestrator run id yet.
683683
"""
684-
685684
if not run.orchestrator_run_id:
686685
raise IllegalOperationError(
687686
"Cannot stop a pipeline run that has no orchestrator run id "
@@ -711,7 +710,6 @@ def _stop_run(
711710
graceful: If True, allows for graceful shutdown where possible.
712711
If False, forces immediate termination. Default is True.
713712
"""
714-
715713
if graceful:
716714
self._stop_run_gracefully(pipeline_run=run)
717715
else:
@@ -730,7 +728,6 @@ def _stop_run_gracefully(pipeline_run: "PipelineRunResponse") -> None:
730728
Raises:
731729
RuntimeError: If steps fail to be set to STOPPING or steps don't have heartbeat enabled.
732730
"""
733-
734731
from zenml.client import Client
735732
from zenml.models import StepRunUpdate
736733

src/zenml/steps/heartbeat.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717
import logging
1818
import threading
1919
import time
20+
from datetime import datetime, timezone
21+
from typing import TYPE_CHECKING
2022
from uuid import UUID
2123

2224
from zenml.enums import ExecutionStatus
2325

26+
if TYPE_CHECKING:
27+
from zenml.models import StepRunResponse
28+
2429
logger = logging.getLogger(__name__)
2530

2631

@@ -166,3 +171,38 @@ def _heartbeat(self) -> None:
166171
ExecutionStatus.STOPPING,
167172
}:
168173
self._terminated = True
174+
175+
176+
def _as_utc(moment: datetime) -> datetime:
    """Return `moment` as a timezone-aware datetime.

    Naive values are tagged as UTC so that they can be subtracted from
    aware values without raising `TypeError`.
    """
    # NOTE(review): assumes naive timestamps are expressed in UTC -
    # confirm against the server's storage convention.
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


def is_heartbeat_unhealthy(step_run: "StepRunResponse") -> bool:
    """Check whether a step run's heartbeat indicates unhealthy execution.

    The time elapsed since the last sign of life (the latest heartbeat or,
    if none was received yet, the step start time) is compared with the
    configured `heartbeat_healthy_threshold` (in minutes). For finished
    steps the elapsed time is measured up to the step end time, otherwise
    up to the current time.

    Args:
        step_run: Information regarding a step run.

    Returns:
        True if the step heartbeat is unhealthy, False otherwise. False is
        also returned when heartbeats are disabled, no threshold is
        configured, or the timestamps needed for the comparison are
        missing.
    """
    if not step_run.spec.enable_heartbeat:
        return False

    threshold_minutes = step_run.config.heartbeat_healthy_threshold
    if not threshold_minutes:
        return False

    last_sign_of_life = step_run.latest_heartbeat or step_run.start_time
    if last_sign_of_life is None:
        # Neither a heartbeat nor a start time: nothing to measure from.
        return False

    if step_run.status.is_finished:
        reference_time = step_run.end_time
        if reference_time is None:
            # Finished without a recorded end time: the gap is unknown.
            return False
    else:
        reference_time = datetime.now(tz=timezone.utc)

    # Normalize both sides so naive and aware datetimes are never mixed.
    heartbeat_diff = _as_utc(reference_time) - _as_utc(last_sign_of_life)

    logger.info("%s heartbeat diff=%s", step_run.name, heartbeat_diff)

    return heartbeat_diff.total_seconds() > threshold_minutes * 60

0 commit comments

Comments
 (0)