Commit 4fc7ef4

Merge branch 'feature/improve-cli-tables' into experiment/try-partial-selective-ci-tests

2 parents 5254d53 + f11d4f5

24 files changed: +965 −370 lines

.github/workflows/claude-code-review.yml
1 addition, 1 deletion

```diff
@@ -30,7 +30,7 @@ jobs:
       needs.check-team-member.outputs.is-team-member == 'true' &&
       github.actor != 'github-actions[bot]' &&
       github.event.issue.pull_request &&
-      contains(github.event.comment.body, '/claude /full-review')
+      contains(github.event.comment.body, '@claude /full-review')
     runs-on: ubuntu-latest
     timeout-minutes: 10
     concurrency:
```

.github/workflows/claude.yml
4 additions, 4 deletions

```diff
@@ -36,10 +36,10 @@ jobs:
       needs.check-team-member.outputs.is-team-member == 'true' &&
       github.actor != 'github-actions[bot]' &&
       (
-        (github.event_name == 'issue_comment' && contains(github.event.comment.body, '/claude') && !contains(github.event.comment.body, '/claude /full-review')) ||
-        (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '/claude') && !contains(github.event.comment.body, '/claude /full-review')) ||
-        (github.event_name == 'pull_request_review' && contains(github.event.review.body, '/claude') && !contains(github.event.review.body, '/claude /full-review')) ||
-        (github.event_name == 'issues' && (contains(github.event.issue.body, '/claude') || contains(github.event.issue.title, '/claude')))
+        (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude') && !contains(github.event.comment.body, '@claude /full-review')) ||
+        (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude') && !contains(github.event.comment.body, '@claude /full-review')) ||
+        (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude') && !contains(github.event.review.body, '@claude /full-review')) ||
+        (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
       )
     runs-on: ubuntu-latest
     timeout-minutes: 10
```
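The dispatch logic in the two workflow conditions can be modeled in plain Python to see which comment bodies route where after the switch from `/claude` to `@claude`. This is a hedged sketch: GitHub evaluates `contains()` as a substring check in its own expression syntax, and the real workflow also inspects the issue title and review body, which this simplified model folds into a single `body` argument.

```python
def triggers_general_claude(event_name: str, body: str) -> bool:
    """Approximate the claude.yml `if:` expression for a single text body.

    GitHub's `contains()` is a plain substring check, so any body that
    mentions `@claude /full-review` is excluded here; that case is handled
    by the separate claude-code-review.yml workflow instead.
    """
    comment_events = (
        "issue_comment",
        "pull_request_review_comment",
        "pull_request_review",
    )
    if event_name in comment_events:
        return "@claude" in body and "@claude /full-review" not in body
    if event_name == "issues":
        # The real workflow checks both issue body and title.
        return "@claude" in body
    return False

print(triggers_general_claude("issue_comment", "@claude please fix the lint error"))  # True
print(triggers_general_claude("issue_comment", "@claude /full-review"))               # False
```

Because both conditions use substring matching, the `!contains(...)` guard is what keeps a `/full-review` mention from triggering both workflows at once.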

docs/book/how-to/steps-pipelines/dynamic_pipelines.md
36 additions, 18 deletions

````diff
@@ -5,7 +5,7 @@ description: Write dynamic pipelines
 # Dynamic Pipelines (Experimental)
 
 {% hint style="warning" %}
-**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local` and `kubernetes` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
+**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local`, `kubernetes`, `sagemaker` and `vertex` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
 {% endhint %}
 
 {% hint style="info" %}
@@ -180,6 +180,39 @@ def unmapped_example():
     consumer.map(a=a, b=unmapped(b))
 ```
 
+#### Unpacking mapped outputs
+
+If a mapped step returns multiple outputs, you can split them into separate lists (one per output) using `unpack()`. This returns a tuple of lists of artifact futures, aligned by mapped invocation.
+
+```python
+from zenml import pipeline, step
+
+@step
+def create_int_list() -> list[int]:
+    return [1, 2]
+
+@step
+def compute(a: int) -> tuple[int, int]:
+    return a * 2, a * 3
+
+@pipeline(dynamic=True)
+def map_pipeline():
+    ints = create_int_list()
+    results = compute.map(a=ints)  # Map over [1, 2]
+
+    # Unpack per-output across all mapped invocations
+    double, triple = results.unpack()
+
+    # Each element is an ArtifactFuture; load to get concrete values
+    doubles = [f.load() for f in double]  # [2, 4]
+    triples = [f.load() for f in triple]  # [3, 6]
+```
+
+Notes:
+- `results` is a future that refers to all outputs of all steps, and `unpack()` works for both `.map(...)` and `.product(...)`.
+- Each list contains future objects that refer to a single artifact.
+
+
 ### Parallel Step Execution
 
 Dynamic pipelines support true parallel execution using `step.submit()`. This method returns a `StepRunFuture` that you can use to wait for results or pass to downstream steps:
@@ -276,26 +309,11 @@ When running multiple steps concurrently using `step.submit()`, a failure in one
 Dynamic pipelines are currently only supported by:
 - `local` orchestrator
 - `kubernetes` orchestrator
+- `sagemaker` orchestrator
+- `vertex` orchestrator
 
 Other orchestrators will raise an error if you try to run a dynamic pipeline with them.
 
-### Remote Execution Requirement
-
-When running dynamic pipelines remotely (e.g., with the `kubernetes` orchestrator), you **must** include `depends_on` for at least one step in your pipeline definition. This is currently required due to a bug in remote execution.
-
-{% hint style="warning" %}
-**Required for Remote Execution**: Without `depends_on`, remote execution will fail. This requirement does not apply when running locally with the `local` orchestrator.
-{% endhint %}
-
-For example:
-
-```python
-@pipeline(dynamic=True, depends_on=[some_step])
-def dynamic_pipeline():
-    some_step()
-    # ... rest of your pipeline
-```
-
 ### Artifact Loading
 
 When you call `.load()` on an artifact in a dynamic pipeline, it synchronously loads the data. For large artifacts or when you want to maintain parallelism, consider passing the step outputs (future or artifact) directly to downstream steps instead of loading them.
````
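The `unpack()` behavior documented in this file is essentially a transpose: N mapped invocations, each producing M outputs, become M lists of length N. A minimal sketch of that reshaping with plain tuples (no ZenML runtime involved):

```python
# Simulate the per-invocation outputs of compute.map(a=[1, 2]):
# each tuple is (a * 2, a * 3) for one mapped invocation.
mapped_outputs = [(2, 3), (4, 6)]

# unpack() transposes "per invocation" into "per output":
double, triple = (list(column) for column in zip(*mapped_outputs))

print(double)  # [2, 4]
print(triple)  # [3, 6]
```

The alignment guarantee in the docs ("aligned by mapped invocation") follows directly from this transpose: `double[i]` and `triple[i]` always come from the same invocation `i`.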

src/zenml/cli/utils.py
13 additions, 2 deletions

```diff
@@ -3215,7 +3215,7 @@ def handle_output(
         logger.warning("Failed to write clean output: %s", err)
         print(cli_output)
 
-    if page:
+    if page and output_format == "table":
         print_page_info(page)
 
 
@@ -3321,16 +3321,27 @@ def prepare_output(
 def _syntax_highlight(content: str, lexer: str) -> str:
     """Apply syntax highlighting to content if colors are enabled.
 
+    Syntax highlighting is only applied when output goes to an interactive
+    terminal (TTY). When output is redirected to a file or piped to another
+    program, plain text is returned to ensure machine-readable output.
+
     Args:
         content: The text content to highlight
         lexer: The lexer to use (e.g., "json", "yaml")
 
     Returns:
-        Syntax-highlighted string if colors enabled, otherwise original content
+        Syntax-highlighted string if colors enabled and output is a TTY,
+        otherwise the original content unchanged.
     """
     if os.getenv("NO_COLOR"):
         return content
 
+    # Import here to avoid circular imports at module load time
+    from zenml_cli import is_terminal_output
+
+    if not is_terminal_output():
+        return content
+
     syntax = Syntax(
         content, lexer, theme="ansi_dark", background_color="default"
     )
```
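The diff above imports an `is_terminal_output` helper whose implementation is not shown. Its likely behavior can be sketched with the standard library alone; this is an assumption, and the real helper in the CLI module may differ:

```python
import sys

def is_terminal_output() -> bool:
    """Return True when stdout is attached to an interactive terminal.

    When CLI output is piped (e.g. `zenml ... | jq`) or redirected to a
    file, isatty() is False, so callers should emit plain text that other
    programs can parse.
    """
    return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()

def maybe_highlight(content: str, highlight) -> str:
    # Skip ANSI coloring unless a human is watching the terminal.
    return highlight(content) if is_terminal_output() else content
```

This is the same principle behind both changes in the diff: pagination info and syntax colors are presentation concerns, so they are suppressed whenever the output is meant for machine consumption.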

src/zenml/config/docker_settings.py
115 additions, 6 deletions

```diff
@@ -55,20 +55,129 @@ class PythonPackageInstaller(Enum):
     UV = "uv"
 
 
+# (docker_sdk_argument, attribute_name)
+BUILD_OPTION_CONVERSIONS = [
+    ("buildargs", "build_args"),
+    ("cachefrom", "cache_from"),
+    ("nocache", "no_cache"),
+    ("shmsize", "shm_size"),
+]
+
+
+class DockerBuildOptions(BaseModel):
+    """Docker build options.
+
+    This class only specifies a subset of the options which require explicit
+    conversion as they require different names in the Docker CLI and Python SDK.
+    However, you can still specify any other options as extra model attributes
+    which will be passed unmodified to the build method of the image builder.
+    """
+
+    pull: Optional[bool] = None
+    rm: Optional[bool] = None
+    no_cache: Optional[bool] = None
+    shm_size: Optional[int] = None
+    labels: Optional[Dict[str, Any]] = None
+    build_args: Optional[Dict[str, Any]] = None
+    cache_from: Optional[List[str]] = None
+
+    model_config = ConfigDict(extra="allow")
+
+    @model_validator(mode="before")
+    @classmethod
+    @before_validator_handler
+    def _migrate_sdk_arguments(cls, data: Dict[str, Any]) -> Dict[str, Any]:
+        """Migrate Docker SDK arguments to attributes.
+
+        Args:
+            data: The model data.
+
+        Returns:
+            The migrated data.
+        """
+        for sdk_argument, attribute_name in BUILD_OPTION_CONVERSIONS:
+            if sdk_argument in data and attribute_name not in data:
+                data[attribute_name] = data.pop(sdk_argument)
+
+        return data
+
+    def to_docker_cli_options(self) -> List[str]:
+        """Convert the build options to a list of Docker CLI options.
+
+        https://docs.docker.com/reference/cli/docker/buildx/build/#options
+
+        Returns:
+            A list of Docker CLI options.
+        """
+        options = []
+        if self.pull:
+            options.append("--pull")
+        if self.rm:
+            options.append("--rm")
+        if self.no_cache:
+            options.append("--no-cache")
+        if self.shm_size:
+            options.extend(["--shm-size", str(self.shm_size)])
+        if self.labels:
+            for key, value in self.labels.items():
+                options.extend(["--label", f"{key}={value}"])
+        if self.build_args:
+            for key, value in self.build_args.items():
+                options.extend(["--build-arg", f"{key}={value}"])
+        if self.cache_from:
+            for value in self.cache_from:
+                options.extend(["--cache-from", value])
+
+        if self.model_extra:
+            for key, value in self.model_extra.items():
+                option = f"--{key.replace('_', '-')}"
+                if isinstance(value, Dict):
+                    for key, value in value.items():
+                        options.extend([option, f"{key}={value}"])
+                elif isinstance(value, List):
+                    for val in value:
+                        options.extend([option, str(val)])
+                elif value in (True, None):
+                    options.extend([option])
+                elif value is False:
+                    pass
+                else:
+                    options.extend([option, str(value)])
+
+        return options
+
+    def to_docker_python_sdk_options(self) -> Dict[str, Any]:
+        """Get the build options as a dictionary of Docker Python SDK options.
+
+        https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
+
+        Returns:
+            A dictionary of Docker Python SDK options.
+        """
+        options = self.model_dump(exclude_unset=True)
+        for sdk_argument, attribute_name in BUILD_OPTION_CONVERSIONS:
+            if attribute_name in options:
+                options[sdk_argument] = options.pop(attribute_name)
+
+        return options
+
+
 class DockerBuildConfig(BaseModel):
     """Configuration for a Docker build.
 
     Attributes:
-        build_options: Additional options that will be passed unmodified to the
-            Docker build call when building an image. You can use this to for
-            example specify build args or a target stage. See
-            https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
-            for a full list of available options.
+        build_options: Additional options that will be passed when building an
+            image. Depending on the image builder that is used, different
+            options are available.
+            For image builders that use the Docker CLI:
+            - https://docs.docker.com/reference/cli/docker/buildx/build/#options
+            For image builders that use the Docker Python SDK:
+            - https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
         dockerignore: Path to a dockerignore file to use when building the
             Docker image.
     """
 
-    build_options: Dict[str, Any] = {}
+    build_options: Optional[DockerBuildOptions] = None
     dockerignore: Optional[str] = None
```
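The dual naming handled by `BUILD_OPTION_CONVERSIONS` can be exercised without pydantic. Below is a simplified, dependency-free sketch of the same key migration the `_migrate_sdk_arguments` validator performs (not the real model; the actual class runs this inside a `mode="before"` validator):

```python
# (docker_sdk_argument, attribute_name) pairs, as in the diff above
BUILD_OPTION_CONVERSIONS = [
    ("buildargs", "build_args"),
    ("cachefrom", "cache_from"),
    ("nocache", "no_cache"),
    ("shmsize", "shm_size"),
]

def migrate_sdk_arguments(data: dict) -> dict:
    """Rename Docker SDK keys to attribute names without overwriting.

    If the attribute name is already present, the SDK-named key is left
    untouched, mirroring the guard in the validator.
    """
    data = dict(data)  # avoid mutating the caller's dict
    for sdk_argument, attribute_name in BUILD_OPTION_CONVERSIONS:
        if sdk_argument in data and attribute_name not in data:
            data[attribute_name] = data.pop(sdk_argument)
    return data
```

This keeps old configs that used docker-py argument names (`buildargs`, `nocache`, ...) working, while the new attribute names become the canonical spelling that `to_docker_cli_options()` and `to_docker_python_sdk_options()` translate from.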

src/zenml/execution/pipeline/dynamic/outputs.py
90 additions, 2 deletions

````diff
@@ -14,7 +14,7 @@
 """Dynamic pipeline execution outputs."""
 
 from concurrent.futures import Future
-from typing import Any, List, Optional, Tuple, Union, overload
+from typing import Any, Iterator, List, Optional, Tuple, Union, overload
 
 from zenml.logger import get_logger
 from zenml.models import ArtifactVersionResponse
@@ -270,4 +270,92 @@ def __len__(self) -> int:
         return len(self._output_keys)
 
 
-StepRunFuture = Union[ArtifactFuture, StepRunOutputsFuture]
+class MapResultsFuture:
+    """Future that represents the results of a `step.map/product(...)` call."""
+
+    def __init__(self, futures: List[StepRunOutputsFuture]) -> None:
+        """Initialize the map results future.
+
+        Args:
+            futures: The step run futures.
+        """
+        self.futures = futures
+
+    def result(self) -> List[StepRunOutputs]:
+        """Get the step run outputs this future represents.
+
+        Returns:
+            The step run outputs.
+        """
+        return [future.result() for future in self.futures]
+
+    def unpack(self) -> Tuple[List[ArtifactFuture], ...]:
+        """Unpack the map results future.
+
+        This method can be used to get lists of artifact futures that represent
+        the outputs of all the step runs that are part of this map result.
+
+        Example:
+            ```python
+            from zenml import pipeline, step
+
+            @step
+            def create_int_list() -> list[int]:
+                return [1, 2]
+
+            @step
+            def do_something(a: int) -> Tuple[int, int]:
+                return a * 2, a * 3
+
+            @pipeline
+            def map_pipeline():
+                int_list = create_int_list()
+                results = do_something.map(a=int_list)
+                double, triple = results.unpack()
+
+                # [future.load() for future in double] will return [2, 4]
+                # [future.load() for future in triple] will return [3, 6]
+            ```
+
+        Returns:
+            The unpacked map results.
+        """
+        return tuple(map(list, zip(*self.futures)))
+
+    @overload
+    def __getitem__(self, key: int) -> StepRunOutputsFuture: ...
+
+    @overload
+    def __getitem__(self, key: slice) -> List[StepRunOutputsFuture]: ...
+
+    def __getitem__(
+        self, key: Union[int, slice]
+    ) -> Union[StepRunOutputsFuture, List[StepRunOutputsFuture]]:
+        """Get a step run future.
+
+        Args:
+            key: The index or slice of the step run futures.
+
+        Returns:
+            The step run futures.
+        """
+        return self.futures[key]
+
+    def __iter__(self) -> Iterator[StepRunOutputsFuture]:
+        """Iterate over the step run futures.
+
+        Yields:
+            The step run futures.
+        """
+        yield from self.futures
+
+    def __len__(self) -> int:
+        """Get the number of step run futures.
+
+        Returns:
+            The number of step run futures.
+        """
+        return len(self.futures)
+
+
+StepRunFuture = Union[ArtifactFuture, StepRunOutputsFuture, MapResultsFuture]
````
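The one-liner `tuple(map(list, zip(*self.futures)))` in `unpack()` works because each `StepRunOutputsFuture` is itself iterable over its per-output futures, so `zip(*...)` transposes "futures per step run" into "futures per output". A minimal stand-in (the `FakeOutputsFuture` class below is hypothetical, not the real one) shows the mechanics:

```python
class FakeOutputsFuture:
    """Stand-in for StepRunOutputsFuture: iterable over its output values."""

    def __init__(self, *outputs):
        self.outputs = outputs

    def __iter__(self):
        return iter(self.outputs)

# Two mapped step runs, each with two outputs (double, triple):
futures = [FakeOutputsFuture(2, 3), FakeOutputsFuture(4, 6)]

# Same expression as MapResultsFuture.unpack():
double, triple = tuple(map(list, zip(*futures)))

print(double)  # [2, 4]
print(triple)  # [3, 6]
```

Note that `zip` truncates to the shortest iterable, so the expression silently assumes every step run yields the same number of outputs, which holds here because all mapped invocations run the same step.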
