
Commit db4e5ba

Merge pull request #919 from Labelbox/rsun/QQC-1484
[QQC-1484] Support move to task action
2 parents fd3e888 + 7195550 commit db4e5ba

9 files changed: +246 −61 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -4,6 +4,8 @@
 ## Added
 * All imports are available via `import labelbox as lb` and `import labelbox.types as lb_types`.
 * Attachment_name support to create_attachment()
+* New method `Project.task_queues()` to obtain the task queues for a project.
+* New method `Project.move_data_rows_to_task_queue()` for moving data rows to a specified task queue.

 ## Changed
 * `LabelImport.create_from_objects()`, `MALPredictionImport.create_from_objects()`, `MEAPredictionImport.create_from_objects()`, `Project.upload_annotations()`, `ModelRun.add_predictions()` now support Python Types for annotations.
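
A brief usage sketch of the two new methods; the API key, project ID, data row IDs, and queue name below are placeholders, and the actual queue names depend on the project's workflow configuration.

import labelbox as lb

client = lb.Client(api_key="<API_KEY>")
project = client.get_project("<PROJECT_ID>")

# List every task queue configured for the project
task_queues = project.task_queues()
for tq in task_queues:
    print(tq.name, tq.queue_type, tq.data_row_count)

# Move two data rows into a specific queue; the queue name here is hypothetical
review_queue = next(tq for tq in task_queues if tq.name == "Review task")
project.move_data_rows_to_task_queue(["<DATA_ROW_ID_1>", "<DATA_ROW_ID_2>"],
                                     review_queue.uid)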

docs/source/index.rst

Lines changed: 6 additions & 0 deletions
@@ -118,6 +118,12 @@ Task
    :members:
    :show-inheritance:

+Task Queue
+---------------------------
+.. automodule:: labelbox.schema.task_queue
+   :members:
+   :show-inheritance:
+
 User
 ---------------------------

labelbox/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -29,3 +29,4 @@
 from labelbox.schema.media_type import MediaType
 from labelbox.schema.slice import Slice, CatalogSlice
 from labelbox.schema.queue_mode import QueueMode
+from labelbox.schema.task_queue import TaskQueue

labelbox/orm/model.py

Lines changed: 1 addition & 0 deletions
@@ -378,6 +378,7 @@ class Entity(metaclass=EntityMeta):
     Project: Type[labelbox.Project]
     Batch: Type[labelbox.Batch]
     CatalogSlice: Type[labelbox.CatalogSlice]
+    TaskQueue: Type[labelbox.TaskQueue]

     @classmethod
     def _attributes_of_type(cls, attr_type):
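
Registering the attribute on Entity is what lets the queries in labelbox/schema/project.py further down expand TaskQueue into a GraphQL field selection. A minimal illustration, assuming the same query helper module already used elsewhere in this diff:

from labelbox.orm import query
from labelbox.orm.model import Entity

# Expands to the GraphQL selection for TaskQueue fields (uid, name, description, ...)
task_queue_fields = query.results_query_part(Entity.TaskQueue)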

labelbox/schema/project.py

Lines changed: 102 additions & 45 deletions
@@ -26,6 +26,7 @@
 from labelbox.schema.resource_tag import ResourceTag
 from labelbox.schema.task import Task
 from labelbox.schema.user import User
+from labelbox.schema.task_queue import TaskQueue

 if TYPE_CHECKING:
     from labelbox import BulkImportRequest
@@ -69,6 +70,7 @@ class Project(DbObject, Updateable, Deletable):
         webhooks (Relationship): `ToMany` relationship to Webhook
         benchmarks (Relationship): `ToMany` relationship to Benchmark
         ontology (Relationship): `ToOne` relationship to Ontology
+        task_queues (Relationship): `ToMany` relationship to TaskQueue
     """

     name = Field.String("name")
@@ -806,54 +808,34 @@ def _create_batch_async(self,

         task_id = res['taskId']

-        timeout_seconds = 600
-        sleep_time = 2
-        get_task_query_str = """query %s($taskId: ID!) {
-            task(where: {id: $taskId}) {
-                status
+        task = self._wait_for_task(task_id)
+        if task.status != "COMPLETE":
+            raise LabelboxError(f"Batch was not created successfully: " +
+                                json.dumps(task.errors))
+
+        # obtain batch entity to return
+        get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
+              project(where: {id: $projectId}) {
+                 batches(where: {id: $batchId}) {
+                    nodes {
+                       %s
+                    }
+                 }
             }
         }
-        """ % "getTaskPyApi"
+        """ % ("getProjectBatchPyApi",
+               query.results_query_part(Entity.Batch))

-        while True:
-            task_status = self.client.execute(
-                get_task_query_str, {'taskId': task_id},
-                experimental=True)['task']['status']
-
-            if task_status == "COMPLETE":
-                # obtain batch entity to return
-                get_batch_str = """query %s($projectId: ID!, $batchId: ID!) {
-                      project(where: {id: $projectId}) {
-                         batches(where: {id: $batchId}) {
-                            nodes {
-                               %s
-                            }
-                         }
-                      }
-                }
-                """ % ("getProjectBatchPyApi",
-                       query.results_query_part(Entity.Batch))
-
-                batch = self.client.execute(
-                    get_batch_str, {
-                        "projectId": self.uid,
-                        "batchId": batch_id
-                    },
-                    timeout=180.0,
-                    experimental=True)["project"]["batches"]["nodes"][0]
-
-                # TODO async endpoints currently do not provide failed_data_row_ids in response
-                return Entity.Batch(self.client, self.uid, batch)
-            elif task_status == "IN_PROGRESS":
-                timeout_seconds -= sleep_time
-                if timeout_seconds <= 0:
-                    raise LabelboxError(
-                        f"Timed out while waiting for batch to be created.")
-                logger.debug("Creating batch, waiting for server...", self.uid)
-                time.sleep(sleep_time)
-                continue
-            else:
-                raise LabelboxError(f"Batch was not created successfully.")
+        batch = self.client.execute(
+            get_batch_str, {
+                "projectId": self.uid,
+                "batchId": batch_id
+            },
+            timeout=180.0,
+            experimental=True)["project"]["batches"]["nodes"][0]
+
+        # TODO async endpoints currently do not provide failed_data_row_ids in response
+        return Entity.Batch(self.client, self.uid, batch)

     def _update_queue_mode(self, mode: "QueueMode") -> "QueueMode":
         """
@@ -1139,6 +1121,81 @@ def batches(self) -> PaginatedCollection:
             cursor_path=['project', 'batches', 'pageInfo', 'endCursor'],
             experimental=True)

+    def task_queues(self) -> List[TaskQueue]:
+        """ Fetch all task queues that belong to this project
+
+        Returns:
+            A `List` of `TaskQueue`s
+        """
+        query_str = """query GetProjectTaskQueuesPyApi($projectId: ID!) {
+              project(where: {id: $projectId}) {
+                taskQueues {
+                  %s
+                }
+              }
+            }
+        """ % (query.results_query_part(Entity.TaskQueue))
+
+        task_queue_values = self.client.execute(
+            query_str, {"projectId": self.uid},
+            timeout=180.0,
+            experimental=True)["project"]["taskQueues"]
+
+        return [
+            Entity.TaskQueue(self.client, field_values)
+            for field_values in task_queue_values
+        ]
+
+    def move_data_rows_to_task_queue(self, data_row_ids: List[str],
+                                     task_queue_id: str):
+        """
+        Moves data rows to the specified task queue.
+
+        Args:
+            data_row_ids: a list of data row ids to be moved
+            task_queue_id: the task queue id to move the data rows to, or None to specify the "Done" queue
+
+        Returns:
+            None if successful, or a raised error on failure
+        """
+        method = "createBulkAddRowsToQueueTask"
+        query_str = """mutation AddDataRowsToTaskQueueAsyncPyApi(
+          $projectId: ID!
+          $queueId: ID
+          $dataRowIds: [ID!]!
+        ) {
+          project(where: { id: $projectId }) {
+            %s(
+              data: { queueId: $queueId, dataRowIds: $dataRowIds }
+            ) {
+              taskId
+            }
+          }
+        }
+        """ % method
+
+        task_id = self.client.execute(
+            query_str, {
+                "projectId": self.uid,
+                "queueId": task_queue_id,
+                "dataRowIds": data_row_ids
+            },
+            timeout=180.0,
+            experimental=True)["project"][method]["taskId"]
+
+        task = self._wait_for_task(task_id)
+        if task.status != "COMPLETE":
+            raise LabelboxError(f"Data rows were not moved successfully: " +
+                                json.dumps(task.errors))
+
+    def _wait_for_task(self, task_id: str) -> Task:
+        task = Task.get_task(self.client, task_id)
+        task.wait_till_done()
+
+        return task
+
     def upload_annotations(
             self,
             name: str,
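
Per the docstring above, passing None as the task queue id routes data rows to the "Done" queue. A short sketch with placeholder IDs:

# None targets the "Done" queue, as documented in move_data_rows_to_task_queue above
project.move_data_rows_to_task_queue(
    data_row_ids=["<DATA_ROW_ID_1>", "<DATA_ROW_ID_2>"],
    task_queue_id=None)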

labelbox/schema/task.py

Lines changed: 24 additions & 2 deletions
@@ -1,3 +1,4 @@
+import json
 import logging
 import requests
 import time
@@ -6,7 +7,7 @@

 from labelbox.exceptions import ResourceNotFoundError
 from labelbox.orm.db_object import DbObject
-from labelbox.orm.model import Field, Relationship
+from labelbox.orm.model import Field, Relationship, Entity

 if TYPE_CHECKING:
     from labelbox import User
@@ -55,7 +56,7 @@ def refresh(self) -> None:
         for field in self.fields():
             setattr(self, field.name, getattr(tasks[0], field.name))

-    def wait_till_done(self, timeout_seconds=300) -> None:
+    def wait_till_done(self, timeout_seconds: int = 300) -> None:
         """ Waits until the task is completed. Periodically queries the server
         to update the task attributes.
@@ -83,9 +84,16 @@ def wait_till_done(self, timeout_seconds=300) -> None:
     def errors(self) -> Optional[Dict[str, Any]]:
         """ Fetch the error associated with an import task.
         """
+        if self.type == "add-data-rows-to-batch" or self.type == "send-to-task-queue":
+            if self.status == "FAILED":
+                # for these tasks, the error is embedded in the result itself
+                return json.loads(self.result_url)
+            return None
+
         # TODO: We should handle error messages for export v2 tasks in the future.
         if self.name != 'JSON Import':
             return None
+
         if self.status == "FAILED":
             result = self._fetch_remote_json()
             return result["error"]
@@ -153,3 +161,17 @@ def download_result():
                 "Job status still in `IN_PROGRESS`. The result is not available. Call task.wait_till_done() with a larger timeout or contact support."
             )
         return download_result()
+
+    @staticmethod
+    def get_task(client, task_id):
+        user: User = client.get_user()
+        tasks: List[Task] = list(
+            user.created_tasks(where=Entity.Task.uid == task_id))
+        # Cache user in a private variable as the relationship can't be
+        # resolved due to server-side limitations (see Task.created_by
+        # for more info).
+        if len(tasks) != 1:
+            raise ResourceNotFoundError(Entity.Task, {task_id: task_id})
+        task: Task = tasks[0]
+        task._user = user
+        return task
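
A sketch of how the new Task.get_task helper is intended to be used, mirroring Project._wait_for_task in this PR; the client instance and task id below are placeholders.

from labelbox.schema.task import Task

task = Task.get_task(client, "<TASK_ID>")  # raises ResourceNotFoundError if no such task exists
task.wait_till_done(timeout_seconds=300)
if task.status == "FAILED":
    # For "add-data-rows-to-batch" and "send-to-task-queue" tasks the error
    # payload is embedded in the result itself (see Task.errors above).
    print(task.errors)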

labelbox/schema/task_queue.py

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+from labelbox.orm.db_object import DbObject
+from labelbox.orm.model import Field
+
+
+class TaskQueue(DbObject):
+    """
+    A task queue.
+
+    Attributes
+        name
+        description
+        queue_type
+        data_row_count
+
+    Relationships
+        project
+        organization
+        pass_queue
+        fail_queue
+    """
+
+    name = Field.String("name")
+    description = Field.String("description")
+    queue_type = Field.String("queue_type")
+    data_row_count = Field.Int("data_row_count")
+
+    def __init__(self, client, *args, **kwargs):
+        super().__init__(client, *args, **kwargs)
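
TaskQueue behaves like the other DbObject schema classes, so its fields (plus the uid inherited from DbObject) can be read directly. A quick sketch using the queues returned by Project.task_queues():

for tq in project.task_queues():
    print(f"{tq.uid}: {tq.name} ({tq.queue_type}), {tq.data_row_count} data rows")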

tests/integration/conftest.py

Lines changed: 45 additions & 14 deletions
@@ -311,18 +311,39 @@ def configured_project_with_label(client, rand_gen, image_url, project, dataset,
     One label is already created and yielded when using fixture
     """
     project.datasets.connect(dataset)
-    editor = list(
-        project.client.get_labeling_frontends(
-            where=LabelingFrontend.name == "editor"))[0]

-    ontology_builder = OntologyBuilder(tools=[
-        Tool(tool=Tool.Type.BBOX, name="test-bbox-class"),
-    ])
-    project.setup(editor, ontology_builder.asdict())
-    # TODO: ontology may not be synchronous after setup. remove sleep when api is more consistent
-    time.sleep(2)
+    ontology = _setup_ontology(project)
+    label = _create_label(project, datarow, ontology, wait_for_label_processing)
+
+    yield [project, dataset, datarow, label]
+
+    for label in project.labels():
+        label.delete()
+
+
+@pytest.fixture
+def configured_batch_project_with_label(client, rand_gen, image_url,
+                                        batch_project, dataset, datarow,
+                                        wait_for_label_processing):
+    """Project with a batch having one datarow
+    Project contains an ontology with 1 bbox tool
+    Additionally includes a create_label method for any needed extra labels
+    One label is already created and yielded when using fixture
+    """
+    data_rows = [dr.uid for dr in list(dataset.data_rows())]
+    batch_project.create_batch("test-batch", data_rows)
+
+    ontology = _setup_ontology(batch_project)
+    label = _create_label(batch_project, datarow, ontology,
+                          wait_for_label_processing)
+
+    yield [batch_project, dataset, datarow, label]
+
+    for label in batch_project.labels():
+        label.delete()
+

-    ontology = ontology_builder.from_project(project)
+def _create_label(project, datarow, ontology, wait_for_label_processing):
     predictions = [{
         "uuid": str(uuid.uuid4()),
         "schemaId": ontology.tools[0].feature_schema_id,
@@ -342,7 +363,8 @@ def create_label():
         Creates a LabelImport task which will create a label
         """
         upload_task = LabelImport.create_from_objects(
-            client, project.uid, f'label-import-{uuid.uuid4()}', predictions)
+            project.client, project.uid, f'label-import-{uuid.uuid4()}',
+            predictions)
         upload_task.wait_until_done(sleep_time_seconds=5)
         assert upload_task.state == AnnotationImportState.FINISHED, "Label Import did not finish"
         assert len(
@@ -352,11 +374,20 @@ def create_label():
     project.create_label = create_label
     project.create_label()
     label = wait_for_label_processing(project)[0]
+    return label

-    yield [project, dataset, datarow, label]

-    for label in project.labels():
-        label.delete()
+def _setup_ontology(project):
+    editor = list(
+        project.client.get_labeling_frontends(
+            where=LabelingFrontend.name == "editor"))[0]
+    ontology_builder = OntologyBuilder(tools=[
+        Tool(tool=Tool.Type.BBOX, name="test-bbox-class"),
+    ])
+    project.setup(editor, ontology_builder.asdict())
+    # TODO: ontology may not be synchronous after setup. remove sleep when api is more consistent
+    time.sleep(2)
+    return ontology_builder.from_project(project)


 @pytest.fixture
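
A hypothetical integration test built on the new configured_batch_project_with_label fixture, sketched to show how it pairs with the new task-queue API; the test itself is not part of this diff.

def test_move_data_rows_to_done_queue(configured_batch_project_with_label):
    project, dataset, data_row, label = configured_batch_project_with_label

    # The project should expose at least one task queue
    assert len(project.task_queues()) > 0

    # None targets the "Done" queue, per Project.move_data_rows_to_task_queue
    project.move_data_rows_to_task_queue([data_row.uid], None)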
