|
4 | 4 |
|
5 | 5 | from mindee.error.mindee_error import MindeeClientError, MindeeError |
6 | 6 | from mindee.error.mindee_http_error import handle_error |
| 7 | +from mindee.input import WorkflowOptions |
7 | 8 | from mindee.input.local_response import LocalResponse |
8 | 9 | from mindee.input.page_options import PageOptions |
9 | 10 | from mindee.input.sources import ( |
|
22 | 23 | is_valid_async_response, |
23 | 24 | is_valid_sync_response, |
24 | 25 | ) |
| 26 | +from mindee.mindee_http.workflow_endpoint import WorkflowEndpoint |
| 27 | +from mindee.mindee_http.workflow_settings import WorkflowSettings |
25 | 28 | from mindee.parsing.common.async_predict_response import AsyncPredictResponse |
26 | 29 | from mindee.parsing.common.feedback_response import FeedbackResponse |
27 | 30 | from mindee.parsing.common.inference import Inference |
28 | 31 | from mindee.parsing.common.predict_response import PredictResponse |
29 | 32 | from mindee.parsing.common.string_dict import StringDict |
| 33 | +from mindee.parsing.common.workflow_response import WorkflowResponse |
| 34 | +from mindee.product import GeneratedV1 |
30 | 35 |
|
31 | 36 | OTS_OWNER = "mindee" |
32 | 37 |
|
@@ -230,6 +235,41 @@ def parse_queued( |
230 | 235 |
|
231 | 236 | return self._get_queued_document(product_class, endpoint, queue_id) |
232 | 237 |
|
def execute_workflow(
    self,
    input_source: Union[LocalInputSource, UrlInputSource],
    workflow_id: str,
    options: Optional[WorkflowOptions] = None,
    page_options: Optional[PageOptions] = None,
) -> WorkflowResponse:
    """
    Send a document to a workflow for execution.

    :param input_source: The document/source file to send.
        Has to be created beforehand.
    :param workflow_id: ID of the workflow to execute.
    :param options: Options for the workflow. When not provided, a default
        set is built: no alias, no priority, no public URL and full text
        disabled.
    :param page_options: If set, remove pages from the document as specified.
        Only applied to local PDF sources, and done before the file is sent
        to the server. It is useful to avoid page limitations.
    :return: The response produced by the workflow execution request.
    """
    # Page cutting only makes sense for a local PDF; URL sources and other
    # file types are forwarded untouched.
    if (
        isinstance(input_source, LocalInputSource)
        and page_options
        and input_source.is_pdf()
    ):
        input_source.process_pdf(
            page_options.operation,
            page_options.on_min_pages,
            page_options.page_indexes,
        )

    logger.debug("Sending document to workflow: %s", workflow_id)

    # Fall back to an explicit default set when the caller gave no options.
    effective_options = options or WorkflowOptions(
        alias=None, priority=None, full_text=False, public_url=None
    )

    return self._send_to_workflow(
        GeneratedV1, input_source, workflow_id, effective_options
    )
| 272 | + |
233 | 273 | def _validate_async_params( |
234 | 274 | self, initial_delay_sec: float, delay_sec: float, max_retries: int |
235 | 275 | ) -> None: |
@@ -438,6 +478,44 @@ def _get_queued_document( |
438 | 478 |
|
439 | 479 | return AsyncPredictResponse(product_class, queue_response.json()) |
440 | 480 |
|
def _send_to_workflow(
    self,
    product_class: Type[Inference],
    input_source: Union[LocalInputSource, UrlInputSource],
    workflow_id: str,
    options: WorkflowOptions,
) -> WorkflowResponse:
    """
    Send a document to a workflow and wrap the server's reply.

    :param product_class: The document class to use.
        The response object will be instantiated based on this parameter.
    :param input_source: The document/source file to use.
        Has to be created beforehand.
    :param workflow_id: ID of the workflow.
    :param options: Options for the workflow execution.
    :return: A ``WorkflowResponse`` built from ``product_class`` and the
        parsed JSON body of the reply.
    :raises MindeeClientError: If ``input_source`` is ``None``.
    :raises: The error built by ``handle_error`` when the reply is rejected
        by ``is_valid_async_response``.
    """
    if input_source is None:
        raise MindeeClientError("No input document provided")

    workflow_endpoint = WorkflowEndpoint(
        WorkflowSettings(api_key=self.api_key, workflow_id=workflow_id)
    )
    response = workflow_endpoint.workflow_execution_post(input_source, options)

    # Validate before parsing the body: a failed request is not guaranteed to
    # carry JSON, and decoding it first would surface a decoding exception
    # instead of the error built by handle_error.
    # NOTE(review): assumes is_valid_async_response tolerates a non-JSON
    # body -- confirm against its implementation.
    if not is_valid_async_response(response):
        raise handle_error(
            str(product_class.endpoint_name),
            clean_request_json(response),
        )

    return WorkflowResponse(product_class, response.json())
| 518 | + |
441 | 519 | def _initialize_ots_endpoint(self, product_class: Type[Inference]) -> Endpoint: |
442 | 520 | if product_class.__name__ == "CustomV1": |
443 | 521 | raise MindeeClientError("Missing endpoint specifications for custom build.") |
|
0 commit comments