2 changes: 2 additions & 0 deletions src/sempy_labs/__init__.py

@@ -44,6 +44,7 @@
create_item_schedule_cron,
create_item_schedule_daily,
create_item_schedule_weekly,
cancel_item_job_instance,
)
from ._delta_analyzer import (
delta_analyzer,
@@ -529,6 +530,7 @@
"bind_semantic_model_to_gateway",
"list_semantic_model_errors",
"list_item_job_instances",
"cancel_item_job_instance",
"list_item_schedules",
"list_skus",
"list_skus_for_capacity",
90 changes: 90 additions & 0 deletions src/sempy_labs/_job_scheduler.py

@@ -12,6 +12,8 @@
)
from uuid import UUID
import sempy_labs._icons as icons
import time
from sempy.fabric.exceptions import FabricHTTPException


@log
@@ -527,3 +529,91 @@ def create_item_schedule_weekly(
    print(
        f"{icons.green_dot} The schedule for the '{item_name}' {type.lower()} has been created."
    )


@log
def cancel_item_job_instance(
    item: str | UUID,
    job_instance_id: UUID,
    type: Optional[str] = None,
    workspace: Optional[str | UUID] = None,
):
    """
    Cancel an item's job instance.

    This is a wrapper function for the following API: `Job Scheduler - Cancel Item Job Instance <https://learn.microsoft.com/rest/api/fabric/core/job-scheduler/cancel-item-job-instance>`_.

    Service Principal Authentication is supported (see `here <https://github.com/microsoft/semantic-link-labs/blob/main/notebooks/Service%20Principal.ipynb>`_ for examples).

    Parameters
    ----------
    item : str | uuid.UUID
        The item name or ID.
    job_instance_id : uuid.UUID
        The job instance ID.
    type : str, default=None
        The item `type <https://learn.microsoft.com/rest/api/fabric/core/items/list-items?tabs=HTTP#itemtype>`_. If the item is specified by name, the item type is required.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID.
        Defaults to None, which resolves to the workspace of the attached lakehouse
        or, if no lakehouse is attached, to the workspace of the notebook.
    """

    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
    (item_name, item_id) = resolve_item_name_and_id(
        item=item, type=type, workspace=workspace
    )

    try:
Collaborator:
The try/except here won't do much because the exception is already handled within the _base_api function.

Contributor (author):
The try/except will catch the exception thrown by the _base_api function (handled internally by _base_api) and "silence" it if it is a 400 with errorCode JobAlreadyCompleted; any other exception is re-raised via the raise in the else branch.

This is how it looks when _base_api returns 400 with JobAlreadyCompleted and we handle it:
[screenshot]
So the only added benefit of this try/except is that the code does not fail when cancelling an already finished/cancelled Job Instance.

This is how it looks when _base_api returns an exception that we don't handle:
[screenshot]
In this case the exception is not the one we handle, so we re-raise the exception caused by _base_api and print additional information to the console.

Let me know if you'd like this changed or the try/except removed entirely. The reasoning was to keep the code/notebook from failing when cancelling an already finished/cancelled Job Instance (end users could also handle this exception themselves inside their code/notebooks).
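The "silence one known error, re-raise everything else" pattern the author describes can be sketched in isolation. This is a minimal, self-contained sketch: StubHTTPError and the helper functions are stand-ins invented here, not the real FabricHTTPException or sempy-labs APIs.

```python
class StubHTTPError(Exception):
    """Stand-in for FabricHTTPException: carries a status code and an error code."""

    def __init__(self, status_code, error_code):
        super().__init__(f"{status_code}: {error_code}")
        self.status_code = status_code
        self.error_code = error_code


def cancel(raise_with=None):
    """Pretend to call the cancel endpoint; raise if instructed (for the demo)."""
    if raise_with is not None:
        raise raise_with
    return "cancel accepted"


def safe_cancel(raise_with=None):
    try:
        return cancel(raise_with)
    except StubHTTPError as e:
        # Only 400 / JobAlreadyCompleted is expected and safe to swallow.
        if e.status_code == 400 and e.error_code == "JobAlreadyCompleted":
            return "already completed"
        raise  # anything else is unexpected: re-raise for the caller
```

With no error, `safe_cancel()` returns "cancel accepted"; with a 400/JobAlreadyCompleted it returns "already completed"; any other error propagates to the caller, which matches the behavior argued for above.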

Collaborator (@m-kovalsky, Nov 10, 2025):
I'd rather not use try/except, as it can lead to problems not being surfaced properly. What if instead we checked the status of the job: if it is 'Completed', just print that the job is completed and return; if it is not completed, simply run the cancel API step. But the status check should not loop. It should be a one-time check.

Contributor (author):
Sure, can do that. The flow would then be to check the status of the Job Instance before calling the cancel API.

Does this flow make sense?

  1. Check status.
  2. If Completed/Failed/Cancelled: stop, report that the job is finished, and return the status.
  3. Otherwise (not started / in progress): call the cancel API.
  4. Wait a few seconds, then check the status again and return.
  5. If the job is finished: report that the job is cancelled and return the status.
  6. If the job has not yet finished: report that the cancellation is in progress and return the status.

Since we don't loop, if the cancel operation takes a bit longer (and we miss it), it is left to the end user to implement a wait if needed.

P.S. This complication exists only because we can't get information about the cancel operation itself; we don't get a Retry-After header, for example.
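The six-step flow proposed above can be sketched with stand-in callables. This is a sketch of the proposed behavior only: get_status and post_cancel are hypothetical stubs injected for illustration, not sempy-labs functions.

```python
import time

# Terminal Job Instance states, per the flow discussed above.
TERMINAL = {"Completed", "Failed", "Cancelled"}


def cancel_once(get_status, post_cancel, wait_seconds=3):
    """One-shot cancel flow: check, cancel, re-check once; never loop."""
    status = get_status()                        # 1. check status
    if status in TERMINAL:                       # 2. already finished -> stop
        print(f"Job already finished: {status}")
        return status
    post_cancel()                                # 3. run the cancel API
    time.sleep(wait_seconds)                     # 4. wait, then check once more
    status = get_status()
    if status in TERMINAL:                       # 5. finished -> cancelled
        print(f"Job cancelled: {status}")
    else:                                        # 6. not finished -> in progress
        print(f"Job cancellation in progress: {status}")
    return status
```

For example, if get_status returns "InProgress" and then "Cancelled", cancel_once calls post_cancel exactly once and returns "Cancelled"; if the first check already returns "Completed", post_cancel is never called.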

        response = _base_api(
            request=f"v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}/cancel",
            method="post",
            status_codes=[200, 202],
            client="fabric_sp",
        )
    except FabricHTTPException as e:
        if (
            e.status_code == 400
            and e.response.json().get("errorCode") == "JobAlreadyCompleted"
        ):
            # If the Job Instance is already completed, skip the rest of the process and output the Job Instance details.
            print(
                f"{icons.green_dot} The Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has already been cancelled."
            )

            return _get_item_job_instance(
                url=f"v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}"
            )
        else:
            print(
                f"{icons.red_dot} An error occurred while cancelling '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace: {e}"
            )
            raise

    print(
        f"{icons.in_progress} The cancellation of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has been initiated."
    )

    status_url = response.headers.get("Location").split("fabric.microsoft.com")[1]
    status = None
    while status not in ["Completed", "Failed", "Cancelled"]:
        response = _base_api(request=status_url)
        status = response.json().get("status")
        time.sleep(3)

    df = _get_item_job_instance(url=status_url)

    # Check the final status of the Job Instance.
    if status in ["Completed", "Failed", "Cancelled"]:
Collaborator:
All 3 of these outcomes mean cancelled successfully? I don't think this is correct.

Contributor (author):
As far as I know (I might be wrong), there is no separate API endpoint to check the status of the cancel operation for Job Instances. Unlike Long Running Operations (where we can get the status of the operation), cancelling a Job Instance does not return an operation in its headers; it returns only the location of the job instance.

So what I am checking is the status of the Job Instance from https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/get-item-job-instance?tabs=HTTP (also returned in the headers as the Location). We wait until its status is no longer "not started" / "in progress".

For example, if we cancel an active notebook Spark session, the job instance is marked as "Completed"; if we cancel a running pipeline, the job instance is marked as "Cancelled".

Do you know of another approach, perhaps?

Collaborator:
If the status shows 'Cancelled', then the cancel job ran successfully. If the status shows 'Failed', then the job failed. If the status shows 'Completed', then the cancel did not succeed and the job succeeded. Correct?

Contributor (author):
Not really (as far as I know). The status reflects the Job Instance itself, not the cancel operation. I have not found a way to retrieve the status of the cancel operation (not sure there is one; it's not like Long Running Operations, and I didn't find it in the response headers or the documented APIs).

For example, cancelling (stopping) a running Spark Job Instance marks it as "Completed" instead of "Cancelled".

        print(
            f"{icons.green_dot} The Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has been cancelled successfully."
        )
    else:
Collaborator:
The 'else' portion would never actually be reached, because execution would still be in the while loop.

        print(
            f"Latest status of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace: {status}"
        )
        print(
            f"{icons.red_dot} The cancellation of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has failed."
        )

    return df
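As the reviewer notes, the while loop above only exits on a terminal status, so the else branch is unreachable and a job that never reaches a terminal state would be polled forever. One way to make both branches meaningful is a bounded poll. This is a sketch under stated assumptions, not the merged implementation: poll is a stand-in for the _base_api status call.

```python
import time

TERMINAL = {"Completed", "Failed", "Cancelled"}


def wait_for_terminal(poll, timeout_seconds=60, interval_seconds=3):
    """Poll until a terminal status or the deadline; return the last status seen.

    Returning a non-terminal status once the deadline passes (instead of
    spinning forever) lets the caller take a 'cancellation still in
    progress' branch.
    """
    deadline = time.monotonic() + timeout_seconds
    status = poll()
    while status not in TERMINAL and time.monotonic() < deadline:
        time.sleep(interval_seconds)
        status = poll()
    return status
```

With this shape, if poll keeps returning "InProgress", wait_for_terminal eventually returns "InProgress", so a subsequent check of `status in TERMINAL` can genuinely go either way.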