8 changes: 4 additions & 4 deletions README.md
@@ -101,11 +101,11 @@ for file_name, file_url in notebook_files.items():
## Once installed, run this code to import the library into your notebook
```diff
 import sempy_labs as labs
-from sempy_labs import migration, directlake, admin, graph
-from sempy_labs import lakehouse as lake
-from sempy_labs import report as rep
+import sempy_labs.lakehouse as lake
+import sempy_labs.report as rep
+from sempy_labs import migration, directlake, admin, graph, mirrored_azure_databricks_catalog
 from sempy_labs.tom import connect_semantic_model
-from sempy_labs.report import ReportWrapper
+from sempy_labs.report import connect_report
```
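
After this change, a quick way to sanity-check the imports in a notebook is a minimal sketch like the one below; `Sales Model` and `MyWorkspace` are hypothetical placeholders for your own items.

```python
import sempy_labs as labs
from sempy_labs.tom import connect_semantic_model

# Open a read-only TOM connection and list the model's tables.
# "Sales Model" and "MyWorkspace" are placeholders for your own items.
with connect_semantic_model(
    dataset="Sales Model", workspace="MyWorkspace", readonly=True
) as tom:
    for table in tom.model.Tables:
        print(table.Name)
```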

## Load Semantic Link Labs into a custom [Fabric environment](https://learn.microsoft.com/fabric/data-engineering/create-and-use-environment)
82 changes: 81 additions & 1 deletion src/sempy_labs/_dataflows.py
@@ -11,7 +11,7 @@
    _conv_b64,
    get_jsonpath_value,
)
-from typing import Optional, Tuple
+from typing import Optional, Tuple, List, Literal
import sempy_labs._icons as icons
from uuid import UUID
from jsonpath_ng.ext import parse
@@ -504,3 +504,83 @@ def create_dataflow(
    print(
        f"{icons.green_dot} The dataflow '{name}' has been created within the '{workspace_name}' workspace."
    )


def run_dataflow(
    dataflow: str | UUID,
    workspace: Optional[str | UUID] = None,
    job_type: Literal["Execute", "ApplyChanges"] = "Execute",
    parameters: Optional[List[dict]] = None,
):
    """
    Executes a dataflow.

    This is a wrapper function for the following APIs: `Background Jobs - Run On Demand Execute <https://learn.microsoft.com/rest/api/fabric/dataflow/background-jobs/run-on-demand-execute>`_ and `Background Jobs - Run On Demand Apply Changes <https://learn.microsoft.com/rest/api/fabric/dataflow/background-jobs/run-on-demand-apply-changes>`_.

    Parameters
    ----------
    dataflow : str | uuid.UUID
        The name or ID of the dataflow.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    job_type : Literal["Execute", "ApplyChanges"], default="Execute"
        The type of job to run. Can be either "Execute" or "ApplyChanges".
    parameters : List[dict], default=None
        A list of parameters to pass to the dataflow. Example:

        [
            {
                "parameterName": "OrderKey",
                "type": "Automatic",
                "value": 25
            },
            {
                "parameterName": "Threshold",
                "type": "Automatic",
                "value": "start"
            }
        ]
    """
    if job_type not in ["Execute", "ApplyChanges"]:
        raise ValueError(
            f"{icons.red_dot} The job_type parameter must be either 'Execute' or 'ApplyChanges'."
        )
    if job_type == "ApplyChanges" and parameters:
        print(
            f"{icons.info} The job type is set to '{job_type}'. Parameters are not accepted for this job type."
        )
        return

    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)

    (dataflow_name, dataflow_id, generation) = (
        _resolve_dataflow_name_and_id_and_generation(dataflow, workspace_id)
    )

    if generation != "Gen2 CI/CD":
        print(
            f"{icons.info} The dataflow '{dataflow_name}' is not a Fabric Dataflow Gen2 CI/CD item. This function only supports Dataflow Gen2 CI/CD."
        )
        return

    payload = None
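    # The Execute job accepts an optional "executionData" object carrying the
    # parameter list (see the Run On Demand Execute API linked in the docstring).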
    if parameters:
        payload = {
            "executionData": {
                "executeOption": "ApplyChangesIfNeeded",
                "parameters": parameters,
            }
        }

    _base_api(
        request=f"/v1/workspaces/{workspace_id}/dataflows/{dataflow_id}/jobs/instances?jobType={job_type}",
        method="post",
        payload=payload,
        lro_return_json=True,
        status_codes=[200, 202],
    )

    print(
        f"{icons.green_dot} The dataflow '{dataflow_name}' has been run within the '{workspace_name}' workspace."
    )
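
A minimal usage sketch for the new function (the names are hypothetical, and it assumes `run_dataflow` is re-exported at the package level like the module's other dataflow helpers):

```python
import sempy_labs as labs

# "Sales Refresh" and "MyWorkspace" are placeholder names; the target must be a
# Dataflow Gen2 CI/CD item, otherwise run_dataflow prints a notice and returns.
labs.run_dataflow(
    dataflow="Sales Refresh",
    workspace="MyWorkspace",
    job_type="Execute",
    parameters=[
        {"parameterName": "OrderKey", "type": "Automatic", "value": 25},
    ],
)
```

Parameters are only honored for `Execute` runs; an `ApplyChanges` run that passes parameters returns early without submitting a job.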