|
1 | 1 | import logging |
| 2 | +import json |
2 | 3 | import requests |
3 | 4 | import time |
4 | | -from typing import TYPE_CHECKING, Optional, Dict, Any |
5 | | -from functools import lru_cache |
| 5 | +from typing import TYPE_CHECKING, TypeVar, Callable, Optional, Dict, Any, List |
6 | 6 |
|
7 | 7 | from labelbox.exceptions import ResourceNotFoundError |
8 | 8 | from labelbox.orm.db_object import DbObject |
|
11 | 11 | if TYPE_CHECKING: |
12 | 12 | from labelbox import User |
13 | 13 |
|
| 14 | + def lru_cache() -> Callable[..., Callable[..., Dict[str, Any]]]: |
| 15 | + pass |
| 16 | +else: |
| 17 | + from functools import lru_cache |
| 18 | + |
14 | 19 | logger = logging.getLogger(__name__) |
15 | 20 |
|
16 | 21 |
|
@@ -73,33 +78,40 @@ def wait_till_done(self, timeout_seconds=300) -> None: |
73 | 78 | def errors(self) -> Optional[Dict[str, Any]]: |
74 | 79 | """ Downloads the result file from Task |
75 | 80 | """ |
76 | | - self.wait_till_done(timeout_seconds=600) |
77 | 81 | if self.status == "FAILED": |
78 | | - data = self._fetch_remote(self.result_url) |
79 | | - if data: |
80 | | - return data.get('error', None) |
81 | | - elif self.status == "IN_PROGRESS": |
82 | | - raise Exception("Job state IN_PROGRESS. Result not available.") |
| 82 | + result = self._fetch_remote_json() |
| 83 | + return result['error'] |
83 | 84 | return None |
84 | 85 |
|
85 | 86 | @property |
86 | | - def result(self) -> Dict[str, Any]: |
| 87 | + def result(self) -> List[Dict[str, Any]]: |
87 | 88 | """ Fetch the result for a task |
88 | 89 | """ |
89 | | - self.wait_till_done(timeout_seconds=600) |
90 | | - if self.status == "COMPLETE": |
91 | | - return self._fetch_remote(self.result_url) |
92 | | - elif self.status == "FAILED": |
93 | | - errors = self.errors |
94 | | - message = errors.get('message') or errors |
95 | | - raise Exception(f"Job failed. Errors : {message}") |
| 90 | + if self.status == "FAILED": |
| 91 | + raise ValueError(f"Job failed. Errors : {self.errors}") |
96 | 92 | else: |
97 | | - raise Exception("Job state IN_PROGRESS. Result not available.") |
| 93 | + result = self._fetch_remote_json() |
| 94 | + return [{ |
| 95 | + 'id': data_row['id'], |
| 96 | + 'external_id': data_row.get('externalId'), |
| 97 | + 'row_data': data_row['rowData'] |
| 98 | + } for data_row in result['createdDataRows']] |
98 | 99 |
|
99 | 100 | @lru_cache() |
100 | | - def _fetch_remote(self, result_url) -> Dict[str, Any]: |
| 101 | + def _fetch_remote_json(self) -> Dict[str, Any]: |
101 | 102 | """ Function for fetching and caching the result data. |
102 | 103 | """ |
103 | | - response = requests.get(result_url) |
| 104 | + if self.name != 'JSON Import': |
| 105 | + raise ValueError( |
| 106 | + "Task result is only supported for `JSON Import` tasks." |
| 107 | + " Download task.result_url manually to access the result for other tasks." |
| 108 | + ) |
| 109 | + self.wait_till_done(timeout_seconds=600) |
| 110 | + if self.status == "IN_PROGRESS": |
| 111 | + raise ValueError( |
| 112 | + "Job status still in `IN_PROGRESS`. The result is not available. Increase timeout or contact support." |
| 113 | + ) |
| 114 | + |
| 115 | + response = requests.get(self.result_url) |
104 | 116 | response.raise_for_status() |
105 | 117 | return response.json() |
0 commit comments