|
1 | 1 | import logging |
2 | 2 | import requests |
3 | 3 | import time |
4 | | -from typing import TYPE_CHECKING, Optional |
| 4 | +from typing import TYPE_CHECKING, Optional, Dict, Any |
| 5 | +from functools import lru_cache |
5 | 6 |
|
6 | 7 | from labelbox.exceptions import ResourceNotFoundError |
7 | 8 | from labelbox.orm.db_object import DbObject |
@@ -32,13 +33,14 @@ class Task(DbObject): |
32 | 33 | name = Field.String("name") |
33 | 34 | status = Field.String("status") |
34 | 35 | completion_percentage = Field.Float("completion_percentage") |
35 | | - result = Field.String("result") |
| 36 | + result_url = Field.String("result_url", "result") |
36 | 37 | _user: Optional["User"] = None |
37 | 38 |
|
38 | 39 | # Relationships |
39 | 40 | created_by = Relationship.ToOne("User", False, "created_by") |
40 | 41 | organization = Relationship.ToOne("Organization") |
41 | 42 |
|
| 43 | + |
42 | 44 | def refresh(self) -> None: |
43 | 45 | """ Refreshes Task data from the server. """ |
44 | 46 | assert self._user is not None |
@@ -68,12 +70,36 @@ def wait_till_done(self, timeout_seconds=300) -> None: |
68 | 70 | time.sleep(sleep_time_seconds) |
69 | 71 | self.refresh() |
70 | 72 |
|
71 | | - def errors(self): |
| 73 | + @property |
| 74 | + def errors(self) -> Optional[Dict[str, Any]]: |
72 | 75 | """ Downloads the result file from Task |
73 | 76 | """ |
74 | | - if self.status == "FAILED" and self.result: |
75 | | - response = requests.get(self.result) |
76 | | - response.raise_for_status() |
77 | | - data = response.json() |
78 | | - return data.get('error') |
| 77 | + self.wait_till_done(timeout_seconds = 600) |
| 78 | + if self.status == "FAILED": |
| 79 | + data = self._fetch_remote(self.result_url) |
| 80 | + if data: |
| 81 | + return data.get('error', None) |
| 82 | + elif self.status == "IN_PROGRESS": |
| 83 | + raise Exception("Job state IN_PROGRESS. Result not available.") |
79 | 84 | return None |
| 85 | + |
| 86 | + @property |
| 87 | + def result(self) -> Dict[str, Any]: |
| 88 | + """ Fetch the result for a task |
| 89 | + """ |
| 90 | + self.wait_till_done(timeout_seconds = 600) |
| 91 | + if self.status == "COMPLETE": |
| 92 | + return self._fetch_remote(self.result_url) |
| 93 | + elif self.status == "FAILED": |
| 94 | + raise Exception(f"Job failed. Errors : {self.errors()}") |
| 95 | + else: |
| 96 | + raise Exception("Job state IN_PROGRESS. Result not available.") |
| 97 | + |
| 98 | + @lru_cache() |
| 99 | + def _fetch_remote(self, result_url) -> Dict[str, Any]: |
| 100 | + """ Function for fetching and caching the result data. |
| 101 | + """ |
| 102 | + response = requests.get(result_url) |
| 103 | + response.raise_for_status() |
| 104 | + return response.json() |
| 105 | + |
0 commit comments