Skip to content

Commit fd196bd

Browse files
authored
[SDK-225] Add new classes/interfaces to support streaming export task results (#1279)
1 parent a0eaa08 commit fd196bd

File tree

8 files changed

+818
-109
lines changed

8 files changed

+818
-109
lines changed

labelbox/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from labelbox.schema.user import User
1717
from labelbox.schema.organization import Organization
1818
from labelbox.schema.task import Task
19+
from labelbox.schema.export_task import StreamType, ExportTask, JsonConverter, JsonConverterOutput, FileConverter, FileConverterOutput
1920
from labelbox.schema.labeling_frontend import LabelingFrontend, LabelingFrontendOptions
2021
from labelbox.schema.asset_attachment import AssetAttachment
2122
from labelbox.schema.webhook import Webhook

labelbox/schema/data_row.py

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
from labelbox.exceptions import ResourceNotFoundError
55

66
from labelbox.orm import query
7-
from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable
7+
from labelbox.orm.db_object import DbObject, Updateable, BulkDeletable, experimental
88
from labelbox.orm.model import Entity, Field, Relationship
99
from labelbox.schema.data_row_metadata import DataRowMetadataField # type: ignore
1010
from labelbox.schema.export_filters import DatarowExportFilters, build_filters, validate_at_least_one_of_data_row_ids_or_global_keys
1111
from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params
12+
from labelbox.schema.export_task import ExportTask
1213
from labelbox.schema.task import Task
1314
from labelbox.schema.user import User # type: ignore
1415

@@ -156,12 +157,54 @@ def create_attachment(self,
156157
return Entity.AssetAttachment(self.client,
157158
res["createDataRowAttachment"])
158159

160+
@experimental
159161
@staticmethod
160-
def export_v2(client: 'Client',
161-
data_rows: List[Union[str, 'DataRow']] = None,
162-
global_keys: List[str] = None,
163-
task_name: Optional[str] = None,
164-
params: Optional[CatalogExportParams] = None) -> Task:
162+
def export(
163+
client: "Client",
164+
data_rows: Optional[List[Union[str, "DataRow"]]] = None,
165+
global_keys: Optional[List[str]] = None,
166+
task_name: Optional[str] = None,
167+
params: Optional[CatalogExportParams] = None,
168+
) -> ExportTask:
169+
"""
170+
Creates a data rows export task with the given list, params and returns the task.
171+
Args:
172+
client (Client): client to use to make the export request
173+
data_rows (list of DataRow or str): list of data row objects or data row ids to export
174+
task_name (str): name of remote task
175+
params (CatalogExportParams): export params
176+
177+
>>> dataset = client.get_dataset(DATASET_ID)
178+
>>> task = DataRow.export(
179+
>>> data_rows=[data_row.uid for data_row in dataset.data_rows.list()],
180+
>>> # or a list of DataRow objects: data_rows = data_set.data_rows.list()
181+
>>> # or a list of global_keys=["global_key_1", "global_key_2"],
182+
>>> # Note that exactly one of: data_rows or global_keys parameters can be passed in at a time
183+
>>> # and if data rows ids is present, global keys will be ignored
184+
>>> params={
185+
>>> "performance_details": False,
186+
>>> "label_details": True
187+
>>> })
188+
>>> task.wait_till_done()
189+
>>> task.result
190+
"""
191+
task = DataRow.export_v2(client,
192+
data_rows,
193+
global_keys,
194+
task_name,
195+
params,
196+
streamable=True)
197+
return ExportTask(task)
198+
199+
@staticmethod
200+
def export_v2(
201+
client: "Client",
202+
data_rows: Optional[List[Union[str, "DataRow"]]] = None,
203+
global_keys: Optional[List[str]] = None,
204+
task_name: Optional[str] = None,
205+
params: Optional[CatalogExportParams] = None,
206+
streamable: bool = False,
207+
) -> Task:
165208
"""
166209
Creates a data rows export task with the given list, params and returns the task.
167210
Args:
@@ -202,9 +245,10 @@ def export_v2(client: 'Client',
202245
validate_catalog_export_params(_params)
203246

204247
mutation_name = "exportDataRowsInCatalog"
205-
create_task_query_str = """mutation exportDataRowsInCatalogPyApi($input: ExportDataRowsInCatalogInput!){
206-
%s(input: $input) {taskId} }
207-
""" % (mutation_name)
248+
create_task_query_str = (
249+
f"mutation {mutation_name}PyApi"
250+
f"($input: ExportDataRowsInCatalogInput!)"
251+
f"{{{mutation_name}(input: $input){{taskId}}}}")
208252

209253
data_row_ids = []
210254
if data_rows is not None:
@@ -227,7 +271,7 @@ def export_v2(client: 'Client',
227271
media_type_override = _params.get('media_type_override', None)
228272

229273
if task_name is None:
230-
task_name = f"Export v2: data rows (%s)" % len(data_row_ids)
274+
task_name = f"Export v2: data rows {len(data_row_ids)}"
231275
query_params = {
232276
"input": {
233277
"taskName": task_name,
@@ -260,6 +304,7 @@ def export_v2(client: 'Client',
260304
"modelRunIds":
261305
_params.get('model_run_ids', None),
262306
},
307+
"streamable": streamable
263308
}
264309
}
265310

@@ -269,14 +314,4 @@ def export_v2(client: 'Client',
269314
print(res)
270315
res = res[mutation_name]
271316
task_id = res["taskId"]
272-
user: User = client.get_user()
273-
tasks: List[Task] = list(
274-
user.created_tasks(where=Entity.Task.uid == task_id))
275-
# Cache user in a private variable as the relationship can't be
276-
# resolved due to server-side limitations (see Task.created_by)
277-
# for more info.
278-
if len(tasks) != 1:
279-
raise ResourceNotFoundError(Entity.Task, task_id)
280-
task: Task = tasks[0]
281-
task._user = user
282-
return task
317+
return Task.get_task(client, task_id)

labelbox/schema/dataset.py

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@
1717
from labelbox import pagination
1818
from labelbox.exceptions import InvalidQueryError, LabelboxError, ResourceNotFoundError, InvalidAttributeError
1919
from labelbox.orm.comparison import Comparison
20-
from labelbox.orm.db_object import DbObject, Updateable, Deletable
20+
from labelbox.orm.db_object import DbObject, Updateable, Deletable, experimental
2121
from labelbox.orm.model import Entity, Field, Relationship
2222
from labelbox.orm import query
2323
from labelbox.exceptions import MalformedQueryException
2424
from labelbox.pagination import PaginatedCollection
2525
from labelbox.schema.data_row import DataRow
2626
from labelbox.schema.export_filters import DatasetExportFilters, build_filters
2727
from labelbox.schema.export_params import CatalogExportParams, validate_catalog_export_params
28+
from labelbox.schema.export_task import ExportTask
2829
from labelbox.schema.task import Task
2930
from labelbox.schema.user import User
3031

@@ -601,10 +602,40 @@ def export_data_rows(self,
601602
self.uid)
602603
time.sleep(sleep_time)
603604

604-
def export_v2(self,
605-
task_name: Optional[str] = None,
606-
filters: Optional[DatasetExportFilters] = None,
607-
params: Optional[CatalogExportParams] = None) -> Task:
605+
@experimental
606+
def export(
607+
self,
608+
task_name: Optional[str] = None,
609+
filters: Optional[DatasetExportFilters] = None,
610+
params: Optional[CatalogExportParams] = None,
611+
) -> ExportTask:
612+
"""
613+
Creates a dataset export task with the given params and returns the task.
614+
615+
>>> dataset = client.get_dataset(DATASET_ID)
616+
>>> task = dataset.export(
617+
>>> filters={
618+
>>> "last_activity_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
619+
>>> "label_created_at": ["2000-01-01 00:00:00", "2050-01-01 00:00:00"],
620+
>>> "data_row_ids": [DATA_ROW_ID_1, DATA_ROW_ID_2, ...] # or global_keys: [DATA_ROW_GLOBAL_KEY_1, DATA_ROW_GLOBAL_KEY_2, ...]
621+
>>> },
622+
>>> params={
623+
>>> "performance_details": False,
624+
>>> "label_details": True
625+
>>> })
626+
>>> task.wait_till_done()
627+
>>> task.result
628+
"""
629+
task = self.export_v2(task_name, filters, params, streamable=True)
630+
return ExportTask(task)
631+
632+
def export_v2(
633+
self,
634+
task_name: Optional[str] = None,
635+
filters: Optional[DatasetExportFilters] = None,
636+
params: Optional[CatalogExportParams] = None,
637+
streamable: bool = False,
638+
) -> Task:
608639
"""
609640
Creates a dataset export task with the given params and returns the task.
610641
@@ -645,10 +676,10 @@ def export_v2(self,
645676
})
646677

647678
mutation_name = "exportDataRowsInCatalog"
648-
create_task_query_str = """mutation exportDataRowsInCatalogPyApi($input: ExportDataRowsInCatalogInput!){
649-
%s(input: $input) {taskId} }
650-
""" % (mutation_name)
651-
679+
create_task_query_str = (
680+
f"mutation {mutation_name}PyApi"
681+
f"($input: ExportDataRowsInCatalogInput!)"
682+
f"{{{mutation_name}(input: $input){{taskId}}}}")
652683
media_type_override = _params.get('media_type_override', None)
653684

654685
if task_name is None:
@@ -685,6 +716,7 @@ def export_v2(self,
685716
"modelRunIds":
686717
_params.get('model_run_ids', None),
687718
},
719+
"streamable": streamable,
688720
}
689721
}
690722

@@ -702,14 +734,4 @@ def export_v2(self,
702734
error_log_key="errors")
703735
res = res[mutation_name]
704736
task_id = res["taskId"]
705-
user: User = self.client.get_user()
706-
tasks: List[Task] = list(
707-
user.created_tasks(where=Entity.Task.uid == task_id))
708-
# Cache user in a private variable as the relationship can't be
709-
# resolved due to server-side limitations (see Task.created_by)
710-
# for more info.
711-
if len(tasks) != 1:
712-
raise ResourceNotFoundError(Entity.Task, task_id)
713-
task: Task = tasks[0]
714-
task._user = user
715-
return task
737+
return Task.get_task(self.client, task_id)

0 commit comments

Comments
 (0)