-
Notifications
You must be signed in to change notification settings - Fork 559
Feature:4135 Orchestrator generic graceful stopping #4247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Feature:4135 Orchestrator generic graceful stopping #4247
Conversation
6dee130 to
9f6475d
Compare
9f6475d to
e89d745
Compare
| RuntimeError: If steps fail to be set to STOPPING or steps don't have heartbeat enabled. | ||
| """ | ||
|
|
||
| from zenml.client import Client |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, I think to avoid any race conditions and make this much simpler, we should set the pipeline run status instead and adjust the server-side logic to look at this one when deciding on the heartbeat response.
30d9131 to
5aba3ed
Compare
ZenML CLI Performance Comparison (Threshold: 1.0s, Timeout: 60s, Slow: 5s)❌ Failed Commands on Current Branch (feature/4135-generalised-early-stopping)
🚨 New Failures IntroducedThe following commands fail on your branch but worked on the target branch:
Performance Comparison
Summary
Environment Info
|
53c3806 to
d817e63
Compare
43b78fd to
e4a8533
Compare
- Early stopping on unhealthy heartbeat - Use pipeline status for heartbeat - Some extra utils and improvements
e4a8533 to
221ad75
Compare
221ad75 to
c2b3d01
Compare
| title="The substitutions of the step run.", | ||
| default={}, | ||
| ) | ||
| cached_heartbeat_threshold: Optional[int] = Field(title="", default=None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing title here I think?
Also, I'm not sure what the cached in the name here indicates? We can also just call this heartbeat_threshold right? I'm wondering whether we even need this attribute on the response, or if it's enough just on the DB schema. When we have the model already, we might as well just do .config.heartbeat_threshold I assume?
| return False | ||
|
|
||
| if latest_heartbeat: | ||
| heartbeat_diff = datetime.now(tz=timezone.utc) - to_local_tz( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to convert to the local timezone here, and then subtract that from a utc timezone datetime?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to have both objects timezone-aware. Basically this part of the utiltiy:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
| ) | ||
| ) | ||
|
|
||
| cached_heartbeat_threshold: Optional[int] = Field(nullable=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would call this just heartbeat_threshold, the cached_ IMO just confuses things. This is really just the configured threshold right, whether this is a duplicate from the config doesn't really play a role when using it?
| return items | ||
|
|
||
|
|
||
| def depaginate_stream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not used anymore? Feel free to leave it in though, I've always thought we should have an option to get them as an iterator, potentially even on our client methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yy decided to leave it a more memory-safe option.
| "substitutions": substitutions, | ||
| "cache_policy": cache_policy, | ||
| "runtime": runtime, | ||
| "heartbeat_healthy_threshold": heartbeat_healthy_threshold, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also BaseStep.with_options(...) that needs to have this new argument
| self._terminated = True | ||
|
|
||
|
|
||
| def cached_is_heartbeat_unhealthy( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the cached in the name here either?
| "Heartbeat for step `%s` indicates unhealthy status.", | ||
| step_name, | ||
| ) | ||
| return NodeStatus.FAILED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we get to this point, it means the Kubernetes job is still actively running but for some reason the heartbeat stopped, correct? In that case, it would probably be a good idea to also stop the job from continuing?
| step_run_id=step_run_info.step_run_id, | ||
| step_run_update=StepRunUpdate( | ||
| exception_info=exception_info, | ||
| status=ExecutionStatus.STOPPING, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you update to STOPPING here instead of STOPPED?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic behind this is that step runner sets to stopping and the step launcher which could be in a different process does any post execution steps and sets to stopped. It could be set to stopped directly I guess but to me it makes more sense as status transitions go (it is truly stopped after the step-launcher handles the signal/interruption).
Describe changes
I implemented/fixed _ to achieve _.
Pre-requisites
Please ensure you have done the following:
developand the open PR is targetingdevelop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.Types of changes