Skip to content

Commit 18d335e

Browse files
authored
Store Kubernetes step job names in run metadata (#4180)
* Store Kubernetes step job names in run metadata * Tests * Mypy
1 parent b5ee953 commit 18d335e

File tree

3 files changed

+79
-5
lines changed

3 files changed

+79
-5
lines changed

src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from zenml.entrypoints.step_entrypoint_configuration import (
2929
StepEntrypointConfiguration,
3030
)
31-
from zenml.enums import ExecutionMode, ExecutionStatus
31+
from zenml.enums import ExecutionMode, ExecutionStatus, MetadataResourceTypes
3232
from zenml.exceptions import AuthorizationException
3333
from zenml.integrations.kubernetes import kube_utils
3434
from zenml.integrations.kubernetes.constants import (
@@ -61,6 +61,7 @@
6161
PipelineRunResponse,
6262
PipelineRunUpdate,
6363
PipelineSnapshotResponse,
64+
RunMetadataResource,
6465
)
6566
from zenml.orchestrators import publish_utils
6667
from zenml.orchestrators.step_run_utils import (
@@ -524,6 +525,16 @@ def start_step_job(node: Node) -> NodeStatus:
524525
job_manifest=job_manifest,
525526
)
526527

528+
Client().create_run_metadata(
529+
metadata={"step_jobs": {step_name: job_name}},
530+
resources=[
531+
RunMetadataResource(
532+
id=pipeline_run.id,
533+
type=MetadataResourceTypes.PIPELINE_RUN,
534+
)
535+
],
536+
)
537+
527538
node.metadata["job_name"] = job_name
528539

529540
return NodeStatus.RUNNING

src/zenml/zen_stores/schemas/utils.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,23 @@ def fetch_metadata(self, **kwargs: Any) -> Dict[str, MetadataType]:
127127
and the values represent the latest entry with this key.
128128
"""
129129
metadata_collection = self.fetch_metadata_collection(**kwargs)
130-
return {
131-
k: sorted(v, key=lambda x: x.created, reverse=True)[0].value
132-
for k, v in metadata_collection.items()
133-
}
130+
metadata: Dict[str, MetadataType] = {}
131+
132+
for key, values in metadata_collection.items():
133+
values = sorted(values, key=lambda x: x.created, reverse=False)
134+
135+
if all(isinstance(item.value, dict) for item in values):
136+
# All metadata values for this key are dictionaries, so we can
137+
# merge them into a single dictionary
138+
metadata[key] = {
139+
k: v
140+
for item in values
141+
for k, v in item.value.items() # type: ignore[union-attr]
142+
}
143+
else:
144+
metadata[key] = values[-1].value
145+
146+
return metadata
134147

135148

136149
def get_resource_type_name(schema_class: Type[BaseSchema]) -> str:

tests/integration/functional/test_client.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,56 @@ def test_create_run_metadata_for_artifact(clean_client_with_run: Client):
538538
assert rm["axel"] == "is awesome"
539539

540540

541+
def test_create_run_metadata_overwrite(clean_client_with_run: Client):
542+
"""Test creating run metadata with the same key but different value."""
543+
pipeline_run = clean_client_with_run.list_pipeline_runs()[0]
544+
545+
clean_client_with_run.create_run_metadata(
546+
metadata={"axel": "is not awesome"},
547+
resources=[
548+
RunMetadataResource(
549+
id=pipeline_run.id, type=MetadataResourceTypes.PIPELINE_RUN
550+
)
551+
],
552+
)
553+
clean_client_with_run.create_run_metadata(
554+
metadata={"axel": "is awesome"},
555+
resources=[
556+
RunMetadataResource(
557+
id=pipeline_run.id, type=MetadataResourceTypes.PIPELINE_RUN
558+
)
559+
],
560+
)
561+
562+
rm = clean_client_with_run.get_pipeline_run(pipeline_run.id).run_metadata
563+
assert rm["axel"] == "is awesome"
564+
565+
566+
def test_create_run_metadata_dict_merge(clean_client_with_run: Client):
567+
"""Test creating run metadata with the same key merges dictionaries."""
568+
pipeline_run = clean_client_with_run.list_pipeline_runs()[0]
569+
570+
clean_client_with_run.create_run_metadata(
571+
metadata={"key": {"a": 1}},
572+
resources=[
573+
RunMetadataResource(
574+
id=pipeline_run.id, type=MetadataResourceTypes.PIPELINE_RUN
575+
)
576+
],
577+
)
578+
clean_client_with_run.create_run_metadata(
579+
metadata={"key": {"a": 2, "b": 2}},
580+
resources=[
581+
RunMetadataResource(
582+
id=pipeline_run.id, type=MetadataResourceTypes.PIPELINE_RUN
583+
)
584+
],
585+
)
586+
587+
rm = clean_client_with_run.get_pipeline_run(pipeline_run.id).run_metadata
588+
assert rm["key"] == {"a": 2, "b": 2}
589+
590+
541591
# .---------.
542592
# | SECRETS |
543593
# '---------'

0 commit comments

Comments
 (0)