Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions docs/book/how-to/steps-pipelines/dynamic_pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description: Write dynamic pipelines
# Dynamic Pipelines (Experimental)

{% hint style="warning" %}
**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local` and `kubernetes` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
**Experimental Feature**: Dynamic pipelines are currently an experimental feature. There are known issues and limitations, and the interface is subject to change. This feature is only supported by the `local`, `kubernetes`, `sagemaker` and `vertex` orchestrators. If you encounter any issues or have feedback, please let us know at [https://github.com/zenml-io/zenml/issues](https://github.com/zenml-io/zenml/issues).
{% endhint %}

{% hint style="info" %}
Expand Down Expand Up @@ -265,26 +265,11 @@ When running multiple steps concurrently using `step.submit()`, a failure in one
Dynamic pipelines are currently only supported by:
- `local` orchestrator
- `kubernetes` orchestrator
- `sagemaker` orchestrator
- `vertex` orchestrator

Other orchestrators will raise an error if you try to run a dynamic pipeline with them.

### Remote Execution Requirement

When running dynamic pipelines remotely (e.g., with the `kubernetes` orchestrator), you **must** include `depends_on` for at least one step in your pipeline definition. This is currently required due to a bug in remote execution.

{% hint style="warning" %}
**Required for Remote Execution**: Without `depends_on`, remote execution will fail. This requirement does not apply when running locally with the `local` orchestrator.
{% endhint %}

For example:

```python
@pipeline(dynamic=True, depends_on=[some_step])
def dynamic_pipeline():
some_step()
# ... rest of your pipeline
```

### Artifact Loading

When you call `.load()` on an artifact in a dynamic pipeline, it synchronously loads the data. For large artifacts or when you want to maintain parallelism, consider passing the step outputs (future or artifact) directly to downstream steps instead of loading them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,10 @@ def _wait_for_completion() -> None:
metadata=metadata,
)

def launch_dynamic_step(
def run_isolated_step(
self, step_run_info: "StepRunInfo", environment: Dict[str, str]
) -> None:
"""Launch a dynamic step.
"""Runs an isolated step on Sagemaker.

Args:
step_run_info: The step run information.
Expand Down
25 changes: 25 additions & 0 deletions src/zenml/integrations/gcp/google_credentials_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class GoogleCredentialsConfigMixin(StackComponentConfig):
class GoogleCredentialsMixin(StackComponent):
"""StackComponent mixin to get Google Cloud Platform credentials."""

_gcp_credentials: Optional["Credentials"] = None
_gcp_project_id: Optional[str] = None

@property
def config(self) -> GoogleCredentialsConfigMixin:
"""Returns the `GoogleCredentialsConfigMixin` config.
Expand All @@ -57,6 +60,18 @@ def config(self) -> GoogleCredentialsConfigMixin:
"""
return cast(GoogleCredentialsConfigMixin, self._config)

@property
def gcp_project_id(self) -> str:
"""Get the GCP project ID.

Returns:
The GCP project ID.
"""
if self._gcp_project_id is None:
_, self._gcp_project_id = self._get_authentication()

return self._gcp_project_id

def _get_authentication(self) -> Tuple["Credentials", str]:
"""Get GCP credentials and the project ID associated with the credentials.

Expand All @@ -79,6 +94,12 @@ def _get_authentication(self) -> Tuple["Credentials", str]:
GCPServiceConnector,
)

if self.connector_has_expired():
self._gcp_credentials = None

if self._gcp_credentials and self._gcp_project_id:
return self._gcp_credentials, self._gcp_project_id

connector = self.get_connector()
if connector:
credentials = connector.connect()
Expand All @@ -90,6 +111,8 @@ def _get_authentication(self) -> Tuple["Credentials", str]:
"trying to use the linked connector, but got "
f"{type(credentials)}."
)
self._gcp_credentials = credentials
self._gcp_project_id = connector.config.gcp_project_id
return credentials, connector.config.gcp_project_id

if self.config.service_account_path:
Expand All @@ -111,4 +134,6 @@ def _get_authentication(self) -> Tuple["Credentials", str]:
# If the project was set in the configuration, use it. Otherwise, use
# the project that was used to authenticate.
project_id = self.config.project if self.config.project else project_id
self._gcp_credentials = credentials
self._gcp_project_id = project_id
return credentials, project_id
Loading
Loading