Skip to content

Commit b3a576a

Browse files
E2e configurable heartbeat_healthy_threshold
1 parent b8a7159 commit b3a576a

File tree

5 files changed

+20
-16
lines changed

5 files changed

+20
-16
lines changed

src/zenml/config/step_configurations.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,13 @@ class StepConfigurationUpdate(FrozenBaseModel):
221221
"run inline unless a step operator or docker/resource settings "
222222
"are configured. This is only applicable for dynamic pipelines.",
223223
)
224-
heartbeat_healthy_threshold: int | None = Field(
225-
default=None,
224+
heartbeat_healthy_threshold: int = Field(
225+
default=30,
226226
description="The amount of time (in minutes) that a running step "
227-
"has not received heartbeat and is considered healthy. Set null value"
228-
"disable healthiness checks via heartbeat.",
227+
"has not received heartbeat and is considered healthy. By default, "
228+
"set to the maximum value (30 minutes).",
229229
ge=1,
230+
le=30,
230231
)
231232

232233
outputs: Mapping[str, PartialArtifactConfiguration] = {}

src/zenml/steps/base_step.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ def __init__(
133133
substitutions: Optional[Dict[str, str]] = None,
134134
cache_policy: Optional[CachePolicyOrString] = None,
135135
runtime: Optional[StepRuntime] = None,
136+
heartbeat_healthy_threshold: Optional[int] = None,
136137
) -> None:
137138
"""Initializes a step.
138139
@@ -170,6 +171,9 @@ def __init__(
170171
run inline unless a step operator or docker/resource settings
171172
are configured. This is only applicable for dynamic
172173
pipelines.
174+
heartbeat_healthy_threshold: The amount of time (in minutes) that a
175+
running step has not received heartbeat and is considered healthy.
176+
By default, set to the maximum value (30 minutes).
173177
"""
174178
from zenml.config.step_configurations import PartialStepConfiguration
175179

@@ -237,6 +241,7 @@ def __init__(
237241
substitutions=substitutions,
238242
cache_policy=cache_policy,
239243
runtime=runtime,
244+
heartbeat_healthy_threshold=heartbeat_healthy_threshold,
240245
)
241246

242247
notebook_utils.try_to_save_notebook_cell_code(self.source_object)
@@ -876,6 +881,7 @@ def configure(
876881
cache_policy: Optional[CachePolicyOrString] = None,
877882
runtime: Optional[StepRuntime] = None,
878883
merge: bool = True,
884+
heartbeat_healthy_threshold: Optional[int] = None,
879885
) -> T:
880886
"""Configures the step.
881887
@@ -925,6 +931,9 @@ def configure(
925931
configurations. If `False` the given configurations will
926932
overwrite all existing ones. See the general description of this
927933
method for an example.
934+
heartbeat_healthy_threshold: The amount of time (in minutes) that a
935+
running step has not received heartbeat and is considered healthy.
936+
By default, set to the maximum value (30 minutes).
928937
929938
Returns:
930939
The step instance that this method was called on.
@@ -999,6 +1008,7 @@ def _convert_to_tuple(value: Any) -> Tuple[Source, ...]:
9991008
"substitutions": substitutions,
10001009
"cache_policy": cache_policy,
10011010
"runtime": runtime,
1011+
"heartbeat_healthy_threshold": heartbeat_healthy_threshold,
10021012
}
10031013
)
10041014
config = StepConfigurationUpdate(**values)

src/zenml/steps/heartbeat.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,6 @@ def is_heartbeat_unhealthy(step_run: "StepRunResponse") -> bool:
185185
if not step_run.spec.enable_heartbeat:
186186
return False
187187

188-
if not step_run.config.heartbeat_healthy_threshold:
189-
return False
190-
191188
if step_run.status.is_finished:
192189
return False
193190

src/zenml/steps/step_decorator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def step(
107107
substitutions: Optional[Dict[str, str]] = None,
108108
cache_policy: Optional["CachePolicyOrString"] = None,
109109
runtime: Optional[StepRuntime] = None,
110+
heartbeat_healthy_threshold: Optional[int] = None,
110111
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
111112
"""Decorator to create a ZenML step.
112113
@@ -146,6 +147,9 @@ def step(
146147
run inline unless a step operator or docker/resource settings
147148
are configured. This is only applicable for dynamic
148149
pipelines.
150+
heartbeat_healthy_threshold: The amount of time (in minutes) that a
151+
running step has not received heartbeat and is considered healthy.
152+
By default, set to the maximum value (30 minutes).
149153
150154
Returns:
151155
The step instance.
@@ -184,6 +188,7 @@ def inner_decorator(func: "F") -> "BaseStep":
184188
substitutions=substitutions,
185189
cache_policy=cache_policy,
186190
runtime=runtime,
191+
heartbeat_healthy_threshold=heartbeat_healthy_threshold,
187192
)
188193

189194
return step_instance

tests/unit/steps/test_heartbeat_worker.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,6 @@ def now(cls, tz=None):
188188
step_run.body.latest_heartbeat = None
189189
assert not heartbeat.is_heartbeat_unhealthy(step_run)
190190

191-
# if no threshold set = healthy (default response)
192-
step_run.body.start_time = non_passable_diff_time
193-
step_run.body.latest_heartbeat = None
194-
assert heartbeat.is_heartbeat_unhealthy(step_run)
195-
step_run.metadata.config = StepConfiguration(
196-
name="test", heartbeat_healthy_threshold=None
197-
)
198-
assert not heartbeat.is_heartbeat_unhealthy(step_run)
199-
200191
# if step heartbeat not enabled = healthy (default response)
201192
step_run.metadata.config = StepConfiguration(
202193
name="test", heartbeat_healthy_threshold=5

0 commit comments

Comments
 (0)