Add function to cancel item job instance #952
Changes from all commits
```diff
@@ -12,6 +12,8 @@
 )
 from uuid import UUID
 import sempy_labs._icons as icons
+import time
+from sempy.fabric.exceptions import FabricHTTPException


 @log
```
@@ -527,3 +529,91 @@ def create_item_schedule_weekly(

```python
    print(
        f"{icons.green_dot} The schedule for the '{item_name}' {type.lower()} has been created."
    )


@log
def cancel_item_job_instance(
    item: str | UUID,
    job_instance_id: UUID,
    type: Optional[str] = None,
    workspace: Optional[str | UUID] = None,
):
    """
    Cancel an item's job instance.

    This is a wrapper function for the following API: `Job Scheduler - Cancel Item Job Instance <https://learn.microsoft.com/rest/api/fabric/core/job-scheduler/cancel-item-job-instance>`_.

    Service Principal Authentication is supported (see `here <https://github.com/microsoft/semantic-link-labs/blob/main/notebooks/Service%20Principal.ipynb>`_ for examples).

    Parameters
    ----------
    item : str | uuid.UUID
        The item name or ID.
    job_instance_id : uuid.UUID
        The job instance ID.
    type : str, default=None
        The item `type <https://learn.microsoft.com/rest/api/fabric/core/items/list-items?tabs=HTTP#itemtype>`_. If specifying the item name as the item, the item type is required.
    workspace : str | uuid.UUID, default=None
        The Fabric workspace name or ID used by the lakehouse.
        Defaults to None which resolves to the workspace of the attached lakehouse
        or if no lakehouse attached, resolves to the workspace of the notebook.
    """

    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
    (item_name, item_id) = resolve_item_name_and_id(
        item=item, type=type, workspace=workspace
    )

    try:
        response = _base_api(
            request=f"v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}/cancel",
            method="post",
            status_codes=[200, 202],
            client="fabric_sp",
        )
    except FabricHTTPException as e:
        if (
            e.status_code == 400
            and e.response.json().get("errorCode") == "JobAlreadyCompleted"
        ):
            # If Job Instance is already completed, skip the rest of the process and output the Job Instance details.
            print(
                f"{icons.green_dot} The Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has been already cancelled."
            )

            return _get_item_job_instance(
                url=f"v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}"
            )
        else:
            print(
                f"{icons.red_dot} An error occurred while cancelling '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace: {e}"
            )
            raise

    print(
        f"{icons.in_progress} The cancellation of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has been initiated."
    )

    status_url = response.headers.get("Location").split("fabric.microsoft.com")[1]
    status = None
    while status not in ["Completed", "Failed", "Cancelled"]:
        response = _base_api(request=status_url)
        status = response.json().get("status")
        time.sleep(3)

    df = _get_item_job_instance(url=status_url)

    # Check what is the final status of the Job Instance.
    if status in ["Completed", "Failed", "Cancelled"]:
```
Collaborator: All three of these outcomes mean cancelled successfully? I don't think this is correct.

Contributor (author): As far as I know (I might be wrong), we don't have a separate API endpoint to check the status of the cancel operation for job instances. Unlike Long Running Operations (where we can get the status of the operation), cancelling a job instance does not return an operation in its headers; it returns only the location of the job instance. So what I am checking is the status of the job instance itself, from https://learn.microsoft.com/en-us/rest/api/fabric/core/job-scheduler/get-item-job-instance?tabs=HTTP (also returned in the headers as the location). We wait until its status is no longer "not started" / "in progress". For example, if we cancel an active notebook Spark session, the job instance is marked as "Completed"; if we cancel a running pipeline, the job instance is marked as "Cancelled". Do you know of another approach?

Collaborator: If the status shows as 'Cancelled' then the cancel job ran successfully. If the status shows as 'Failed' then the job failed. If the status shows as 'Completed' then the cancel job did not succeed and the job succeeded. Correct?

Contributor (author): Not really (as far as I know). The status reflects the job instance itself, not the cancel operation. I have not found a way to retrieve the status of the cancel operation (not sure there is one; unlike Long Running Operations, I didn't find it in the response headers or the documented APIs). For example, cancelling a running Spark job instance (stopping it) marks it as "Completed" instead of "Cancelled".
```python
        print(
            f"{icons.green_dot} The Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has been cancelled successfully."
        )
    else:
```
Collaborator: The 'else' portion would never actually be reached, because execution would still be inside the while loop.
```python
        print(
            f"Latest status of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace: {status}"
        )
        print(
            f"{icons.red_dot} The cancellation of Job Instance '{job_instance_id}' of '{item_name}' {type.lower()} within the '{workspace_name}' workspace has failed."
        )

    return df
```
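
For illustration, a call to the new function might look like the following. The workspace name, item name, item type, and job instance ID are hypothetical placeholders, and the snippet assumes `cancel_item_job_instance` is exported at the package level like the library's other job-scheduler functions.

```python
from uuid import UUID

import sempy_labs as labs  # assumption: the function is re-exported from the package root

# Hypothetical identifiers, for illustration only.
df = labs.cancel_item_job_instance(
    item="Daily Sales Pipeline",  # item name (or its UUID)
    job_instance_id=UUID("00000000-0000-0000-0000-000000000000"),
    type="DataPipeline",  # required because the item is passed by name
    workspace="Sales Workspace",  # omit to use the attached lakehouse / notebook workspace
)

# The function returns the job instance details once the instance reaches a terminal state.
print(df)
```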
Collaborator: The try/except here won't do much because the exception is already handled within the `_base_api` function.

Contributor (author): The try/except will catch the exception thrown by `_base_api` (which is raised after `_base_api` handles it internally) and will "silence" it if it is a `400` with error code `JobAlreadyCompleted`; any other exception is re-raised with `raise` in the else branch.

This is how it looks when `_base_api` returns `400` and `JobAlreadyCompleted` and we handle it: (screenshot omitted). So the only added benefit of this try/except is that the code does not fail when we cancel an already finished/cancelled job instance.

This is how it looks when `_base_api` returns an exception that we don't handle: (screenshot omitted). In this case the exception is not the one we handle, so we re-raise the exception from `_base_api` and output additional information to the console.

Let me know if you'd like this to be changed or the try/except removed completely. The reasoning behind it was to keep the code/notebook from failing if we try to cancel an already finished/cancelled job instance (this exception could also be handled by end users themselves inside their code/notebooks).
Collaborator: I'd rather not use try/except, as it can lead to strange things not being surfaced properly. What if instead we checked the status of the job and, if it is 'Completed', just printed an output saying the job is completed and then returned? If the job is not completed, it can simply run the cancel API step. But the status check should not loop; it should be a one-time check.

Contributor (author): Ay, can do that. The flow then would be to check the status of the job instance before calling the cancel API. Do you think that flow makes sense? At the end, since we don't loop, if the cancel operation takes a bit longer (and we don't catch it), it is left to the end user to implement a wait if needed.

P.S. This complication exists only because we can't get information about the cancel operation and we don't get a `Retry-After` header, for example.
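
For reference, a minimal sketch of the agreed flow (one-time status check, no try/except, no polling loop), assuming the same imports and helpers as the diff above (`_base_api`, `_get_item_job_instance`, the resolver functions, `icons`). The messages and the 'Completed' short-circuit follow the discussion; this is a sketch, not the final merged implementation.

```python
@log
def cancel_item_job_instance(
    item: str | UUID,
    job_instance_id: UUID,
    type: Optional[str] = None,
    workspace: Optional[str | UUID] = None,
):
    # Sketch of the proposed flow only; relies on the helpers shown in the diff above.
    (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)
    (item_name, item_id) = resolve_item_name_and_id(
        item=item, type=type, workspace=workspace
    )
    instance_url = (
        f"v1/workspaces/{workspace_id}/items/{item_id}/jobs/instances/{job_instance_id}"
    )

    # 1. One-time check of the job instance status (Get Item Job Instance); no loop.
    status = _base_api(request=instance_url).json().get("status")

    if status == "Completed":
        # 2a. Nothing to cancel; report it and return the job instance details.
        print(
            f"{icons.green_dot} The Job Instance '{job_instance_id}' of '{item_name}' "
            f"{type.lower()} within the '{workspace_name}' workspace is already completed."
        )
        return _get_item_job_instance(url=instance_url)

    # 2b. Otherwise issue the cancel request; any HTTP error surfaces from _base_api itself.
    _base_api(
        request=f"{instance_url}/cancel",
        method="post",
        status_codes=[200, 202],
        client="fabric_sp",
    )
    print(
        f"{icons.in_progress} The cancellation of Job Instance '{job_instance_id}' of "
        f"'{item_name}' {type.lower()} within the '{workspace_name}' workspace has been initiated."
    )

    # 2c. No polling: the caller can wait and re-check the instance if the cancel takes longer.
    return _get_item_job_instance(url=instance_url)
```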