Skip to content

Commit d59a616

Browse files
authored
[PLT-X] Make export_v2 methods use streamable backend (#1515)
1 parent 729b667 commit d59a616

File tree

7 files changed

+150
-49
lines changed

7 files changed

+150
-49
lines changed

labelbox/schema/catalog.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Dict, List, Optional, Union
1+
from typing import Any, Dict, List, Optional, Tuple, Union
22
from labelbox.orm.db_object import experimental
33
from labelbox.schema.export_filters import CatalogExportFilters, build_filters
44

@@ -23,7 +23,7 @@ def export_v2(
2323
task_name: Optional[str] = None,
2424
filters: Union[CatalogExportFilters, Dict[str, List[str]], None] = None,
2525
params: Optional[CatalogExportParams] = None,
26-
) -> Task:
26+
) -> Union[Task, ExportTask]:
2727
"""
2828
Creates a catalog export task with the given params, filters and returns the task.
2929
@@ -42,7 +42,10 @@ def export_v2(
4242
>>> task.wait_till_done()
4343
>>> task.result
4444
"""
45-
return self._export(task_name, filters, params, False)
45+
task, is_streamable = self._export(task_name, filters, params)
46+
if (is_streamable):
47+
return ExportTask(task, True)
48+
return task
4649

4750
@experimental
4851
def export(
@@ -83,15 +86,15 @@ def export(
8386
>>> stream_type=lb.StreamType.RESULT
8487
>>> ).start(stream_handler=json_stream_handler)
8588
"""
86-
task = self._export(task_name, filters, params, True)
89+
task, _ = self._export(task_name, filters, params, streamable=True)
8790
return ExportTask(task)
8891

8992
def _export(self,
9093
task_name: Optional[str] = None,
9194
filters: Union[CatalogExportFilters, Dict[str, List[str]],
9295
None] = None,
9396
params: Optional[CatalogExportParams] = None,
94-
streamable: bool = False) -> Task:
97+
streamable: bool = False) -> Tuple[Task, bool]:
9598

9699
_params = params or CatalogExportParams({
97100
"attachments": False,
@@ -120,7 +123,7 @@ def _export(self,
120123
create_task_query_str = (
121124
f"mutation {mutation_name}PyApi"
122125
f"($input: ExportDataRowsInCatalogInput!)"
123-
f"{{{mutation_name}(input: $input){{taskId}}}}")
126+
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")
124127

125128
media_type_override = _params.get('media_type_override', None)
126129
query_params: Dict[str, Any] = {
@@ -132,6 +135,7 @@ def _export(self,
132135
"query": None,
133136
}
134137
},
138+
"isStreamableReady": True,
135139
"params": {
136140
"mediaTypeOverride":
137141
media_type_override.value
@@ -171,4 +175,5 @@ def _export(self,
171175
error_log_key="errors")
172176
res = res[mutation_name]
173177
task_id = res["taskId"]
174-
return Task.get_task(self.client, task_id)
178+
is_streamable = res["isStreamable"]
179+
return Task.get_task(self.client, task_id), is_streamable

labelbox/schema/data_row.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from enum import Enum
3-
from typing import TYPE_CHECKING, List, Optional, Union, Any
3+
from typing import TYPE_CHECKING, List, Optional, Tuple, Union, Any
44
import json
55

66
from labelbox.orm import query
@@ -210,12 +210,12 @@ def export(
210210
>>> task.wait_till_done()
211211
>>> task.result
212212
"""
213-
task = DataRow._export(client,
214-
data_rows,
215-
global_keys,
216-
task_name,
217-
params,
218-
streamable=True)
213+
task, _ = DataRow._export(client,
214+
data_rows,
215+
global_keys,
216+
task_name,
217+
params,
218+
streamable=True)
219219
return ExportTask(task)
220220

221221
@staticmethod
@@ -225,7 +225,7 @@ def export_v2(
225225
global_keys: Optional[List[str]] = None,
226226
task_name: Optional[str] = None,
227227
params: Optional[CatalogExportParams] = None,
228-
) -> Task:
228+
) -> Union[Task, ExportTask]:
229229
"""
230230
Creates a data rows export task with the given list, params and returns the task.
231231
Args:
@@ -249,8 +249,11 @@ def export_v2(
249249
>>> task.wait_till_done()
250250
>>> task.result
251251
"""
252-
return DataRow._export(client, data_rows, global_keys, task_name,
253-
params)
252+
task, is_streamable = DataRow._export(client, data_rows, global_keys,
253+
task_name, params)
254+
if is_streamable:
255+
return ExportTask(task, True)
256+
return task
254257

255258
@staticmethod
256259
def _export(
@@ -260,7 +263,7 @@ def _export(
260263
task_name: Optional[str] = None,
261264
params: Optional[CatalogExportParams] = None,
262265
streamable: bool = False,
263-
) -> Task:
266+
) -> Tuple[Task, bool]:
264267
_params = params or CatalogExportParams({
265268
"attachments": False,
266269
"metadata_fields": False,
@@ -282,7 +285,7 @@ def _export(
282285
create_task_query_str = (
283286
f"mutation {mutation_name}PyApi"
284287
f"($input: ExportDataRowsInCatalogInput!)"
285-
f"{{{mutation_name}(input: $input){{taskId}}}}")
288+
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")
286289

287290
data_row_ids = []
288291
if data_rows is not None:
@@ -315,6 +318,7 @@ def _export(
315318
"query": search_query
316319
}
317320
},
321+
"isStreamableReady": True,
318322
"params": {
319323
"mediaTypeOverride":
320324
media_type_override.value
@@ -352,4 +356,5 @@ def _export(
352356
print(res)
353357
res = res[mutation_name]
354358
task_id = res["taskId"]
355-
return Task.get_task(client, task_id)
359+
is_streamable = res["isStreamable"]
360+
return Task.get_task(client, task_id), is_streamable

labelbox/schema/dataset.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import Dict, Generator, List, Optional, Any, Final
2+
from typing import Dict, Generator, List, Optional, Any, Final, Tuple, Union
33
import os
44
import json
55
import logging
@@ -669,15 +669,15 @@ def export(
669669
>>> task.wait_till_done()
670670
>>> task.result
671671
"""
672-
task = self._export(task_name, filters, params, streamable=True)
672+
task, _ = self._export(task_name, filters, params, streamable=True)
673673
return ExportTask(task)
674674

675675
def export_v2(
676676
self,
677677
task_name: Optional[str] = None,
678678
filters: Optional[DatasetExportFilters] = None,
679679
params: Optional[CatalogExportParams] = None,
680-
) -> Task:
680+
) -> Union[Task, ExportTask]:
681681
"""
682682
Creates a dataset export task with the given params and returns the task.
683683
@@ -695,15 +695,18 @@ def export_v2(
695695
>>> task.wait_till_done()
696696
>>> task.result
697697
"""
698-
return self._export(task_name, filters, params)
698+
task, is_streamable = self._export(task_name, filters, params)
699+
if (is_streamable):
700+
return ExportTask(task, True)
701+
return task
699702

700703
def _export(
701704
self,
702705
task_name: Optional[str] = None,
703706
filters: Optional[DatasetExportFilters] = None,
704707
params: Optional[CatalogExportParams] = None,
705708
streamable: bool = False,
706-
) -> Task:
709+
) -> Tuple[Task, bool]:
707710
_params = params or CatalogExportParams({
708711
"attachments": False,
709712
"metadata_fields": False,
@@ -731,7 +734,7 @@ def _export(
731734
create_task_query_str = (
732735
f"mutation {mutation_name}PyApi"
733736
f"($input: ExportDataRowsInCatalogInput!)"
734-
f"{{{mutation_name}(input: $input){{taskId}}}}")
737+
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")
735738
media_type_override = _params.get('media_type_override', None)
736739

737740
if task_name is None:
@@ -745,6 +748,7 @@ def _export(
745748
"query": None,
746749
}
747750
},
751+
"isStreamableReady": True,
748752
"params": {
749753
"mediaTypeOverride":
750754
media_type_override.value
@@ -790,7 +794,8 @@ def _export(
790794
error_log_key="errors")
791795
res = res[mutation_name]
792796
task_id = res["taskId"]
793-
return Task.get_task(self.client, task_id)
797+
is_streamable = res["isStreamable"]
798+
return Task.get_task(self.client, task_id), is_streamable
794799

795800
def upsert_data_rows(self, items, file_upload_thread_count=20) -> "Task":
796801
"""
@@ -887,4 +892,4 @@ def _convert_items_to_upsert_format(self, _items):
887892
k: v for k, v in item.items() if v is not None
888893
} # remove None values
889894
_upsert_items.append(DataRowUpsertItem(payload=item, id=key))
890-
return _upsert_items
895+
return _upsert_items

labelbox/schema/export_task.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,8 @@ class ExportTask:
467467
class ExportTaskException(Exception):
468468
"""Raised when the task is not ready yet."""
469469

470-
def __init__(self, task: Task) -> None:
470+
def __init__(self, task: Task, is_export_v2: bool = False) -> None:
471+
self._is_export_v2 = is_export_v2
471472
self._task = task
472473

473474
def __repr__(self):
@@ -530,9 +531,79 @@ def metadata(self):
530531
"""Returns the metadata of the task."""
531532
return self._task.metadata
532533

534+
@property
def result_url(self):
    """Return the URL from which this task's export results can be fetched.

    The URL is assembled as
    ``<client.rest_endpoint>/export-results/<task uid>/<organization uid>``.

    Raises:
        ExportTask.ExportTaskException: If this wrapper was not created for
            an ``export_v2`` call (``_is_export_v2`` is false).
    """
    if not self._is_export_v2:
        # The message previously pointed at "streamable errors", copied from
        # the errors properties; this property concerns the result.
        raise ExportTask.ExportTaskException(
            "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable result instead"
        )
    client = self._task.client
    base_url = client.rest_endpoint
    task_uid = self._task.uid
    # NOTE(review): get_organization() is invoked on every access; presumably
    # a remote lookup - confirm before calling this property in a loop.
    organization_uid = client.get_organization().uid
    return base_url + '/export-results/' + task_uid + '/' + organization_uid
544+
545+
@property
def errors_url(self):
    """Return the URL from which this task's export errors can be fetched.

    The URL is assembled as
    ``<client.rest_endpoint>/export-errors/<task uid>/<organization uid>``.

    Raises:
        ExportTask.ExportTaskException: If this wrapper was not created for
            an ``export_v2`` call (``_is_export_v2`` is false).
    """
    if self._is_export_v2:
        endpoint = self._task.client.rest_endpoint
        task_uid = self._task.uid
        organization = self._task.client.get_organization()
        return endpoint + '/export-errors/' + task_uid + '/' + organization.uid
    raise ExportTask.ExportTaskException(
        "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead"
    )
555+
556+
@property
def errors(self):
    """Collect the task's streamed errors into a list.

    Returns:
        ``None`` when ``has_errors()`` is false or when no metadata header
        is available for the ERRORS stream; otherwise a list holding the
        ``json_str`` of every streamed output (raw JSON strings, not
        parsed objects).

    Raises:
        ExportTask.ExportTaskException: If this wrapper was not created for
            an ``export_v2`` call, if the task status is ``"FAILED"``, or if
            the status is anything other than ``"COMPLETE"``.
    """
    if not self._is_export_v2:
        raise ExportTask.ExportTaskException(
            "This property is only available for export_v2 tasks due to compatibility reasons, please use streamable errors instead"
        )
    if self.status == "FAILED":
        raise ExportTask.ExportTaskException("Task failed")
    if self.status != "COMPLETE":
        raise ExportTask.ExportTaskException("Task is not ready yet")

    if not self.has_errors():
        return None

    header = ExportTask._get_metadata_header(self._task.client,
                                             self._task.uid,
                                             StreamType.ERRORS)
    if header is None:
        return None

    collected = []

    def _collect(output):
        # Keep the raw JSON text exactly as delivered by the stream.
        collected.append(output.json_str)

    context = _TaskContext(self._task.client, self._task.uid,
                           StreamType.ERRORS, header)
    stream = Stream(context, _MultiGCSFileReader(), JsonConverter())
    stream.start(stream_handler=_collect)
    return collected
584+
533585
@property
def result(self):
    """Return the result of the task.

    For wrappers not created for ``export_v2`` this returns the wrapped
    task's ``result_url`` unchanged. For ``export_v2`` wrappers the RESULT
    stream is read in full and every output's ``json_str`` is parsed with
    ``json.loads``; the parsed objects are returned as a list (an empty
    list when no metadata header is available).

    Raises:
        ExportTask.ExportTaskException: For ``export_v2`` wrappers, if the
            task status is ``"FAILED"`` or is anything other than
            ``"COMPLETE"``.
    """
    if not self._is_export_v2:
        return self._task.result_url

    if self.status == "FAILED":
        raise ExportTask.ExportTaskException("Task failed")
    if self.status != "COMPLETE":
        raise ExportTask.ExportTaskException("Task is not ready yet")

    header = ExportTask._get_metadata_header(self._task.client,
                                             self._task.uid,
                                             StreamType.RESULT)
    if header is None:
        return []

    rows = []

    def _collect(output):
        # Parse each streamed JSON string before storing it.
        rows.append(json.loads(output.json_str))

    context = _TaskContext(self._task.client, self._task.uid,
                           StreamType.RESULT, header)
    stream = Stream(context, _MultiGCSFileReader(), JsonConverter())
    stream.start(stream_handler=_collect)
    return rows
537608

538609
@property

labelbox/schema/model_run.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import warnings
66
from enum import Enum
77
from pathlib import Path
8-
from typing import TYPE_CHECKING, Dict, Iterable, Union, List, Optional, Any
8+
from typing import TYPE_CHECKING, Dict, Iterable, Union, Tuple, List, Optional, Any
99

1010
import requests
1111

@@ -521,33 +521,36 @@ def export(self,
521521
>>> export_task = export("my_export_task", params={"media_attributes": True})
522522
523523
"""
524-
task = self._export(task_name, params, streamable=True)
524+
task, _ = self._export(task_name, params, streamable=True)
525525
return ExportTask(task)
526526

527527
def export_v2(
528528
self,
529529
task_name: Optional[str] = None,
530530
params: Optional[ModelRunExportParams] = None,
531-
) -> Task:
531+
) -> Union[Task, ExportTask]:
532532
"""
533533
Creates a model run export task with the given params and returns the task.
534534
535535
>>> export_task = export_v2("my_export_task", params={"media_attributes": True})
536536
537537
"""
538-
return self._export(task_name, params)
538+
task, is_streamable = self._export(task_name, params)
539+
if (is_streamable):
540+
return ExportTask(task, True)
541+
return task
539542

540543
def _export(
541544
self,
542545
task_name: Optional[str] = None,
543546
params: Optional[ModelRunExportParams] = None,
544547
streamable: bool = False,
545-
) -> Task:
548+
) -> Tuple[Task, bool]:
546549
mutation_name = "exportDataRowsInModelRun"
547550
create_task_query_str = (
548551
f"mutation {mutation_name}PyApi"
549552
f"($input: ExportDataRowsInModelRunInput!)"
550-
f"{{{mutation_name}(input: $input){{taskId}}}}")
553+
f"{{{mutation_name}(input: $input){{taskId isStreamable}}}}")
551554

552555
_params = params or ModelRunExportParams()
553556

@@ -557,6 +560,7 @@ def _export(
557560
"filters": {
558561
"modelRunId": self.uid
559562
},
563+
"isStreamableReady": True,
560564
"params": {
561565
"mediaTypeOverride":
562566
_params.get('media_type_override', None),
@@ -579,7 +583,8 @@ def _export(
579583
error_log_key="errors")
580584
res = res[mutation_name]
581585
task_id = res["taskId"]
582-
return Task.get_task(self.client, task_id)
586+
is_streamable = res["isStreamable"]
587+
return Task.get_task(self.client, task_id), is_streamable
583588

584589
def send_to_annotate_from_model(
585590
self, destination_project_id: str, task_queue_id: Optional[str],

0 commit comments

Comments
 (0)