22import os
33from concurrent .futures import ThreadPoolExecutor , as_completed
44
5- from typing import Iterable , List , Union
5+ from typing import Iterable , List
66
77from labelbox .exceptions import InvalidQueryError
88from labelbox .exceptions import InvalidAttributeError
99from labelbox .exceptions import MalformedQueryException
1010from labelbox .orm .model import Entity
1111from labelbox .orm .model import Field
1212from labelbox .schema .embedding import EmbeddingVector
13- from labelbox .schema .internal .data_row_create_upsert import DataRowItemBase
14- from labelbox .schema .internal .datarow_upload_constants import MAX_DATAROW_PER_API_OPERATION
13+ from labelbox .pydantic_compat import BaseModel
14+ from labelbox .schema .internal .datarow_upload_constants import (
15+ MAX_DATAROW_PER_API_OPERATION , FILE_UPLOAD_THREAD_COUNT )
16+ from labelbox .schema .internal .data_row_upsert_item import DataRowUpsertItem
1517
1618
17- class UploadManifest :
18-
19- def __init__ (self , source : str , item_count : int , chunk_uris : List [str ]):
20- self .source = source
21- self .item_count = item_count
22- self .chunk_uris = chunk_uris
23-
24- def to_dict (self ):
25- return {
26- "source" : self .source ,
27- "item_count" : self .item_count ,
28- "chunk_uris" : self .chunk_uris
29- }
19+ class UploadManifest (BaseModel ):
20+ source : str
21+ item_count : int
22+ chunk_uris : List [str ]
3023
3124
3225class DataRowUploader :
@@ -39,15 +32,15 @@ def create_descriptor_file(client,
3932 """
4033 This function is shared by `Dataset.create_data_rows`, `Dataset.create_data_rows_sync` and `Dataset.update_data_rows`.
4134 It is used to prepare the input file. The user defined input is validated, processed, and json stringified.
42- Finally the json data is uploaded to gcs and a uri is returned. This uri can be passed to
35+ Finally the json data is uploaded to gcs and a uri is returned. This uri can be passed as a parameter to a mutation that uploads data rows.
4336
4437 Each element in `items` can be either a `str` or a `dict`. If
4538 it is a `str`, then it is interpreted as a local file path. The file
4639 is uploaded to Labelbox and a DataRow referencing it is created.
4740
4841 If an item is a `dict`, then it could support one of the two following structures
4942 1. For static imagery, video, and text it should map `DataRow` field names to values.
50- At the minimum an `item ` passed as a `dict` must contain a `row_data` key and value.
43+ At the minimum an element of `items` passed as a `dict` must contain a `row_data` key and value.
5144 If the value for row_data is a local file path and the path exists,
5245 then the local file will be uploaded to labelbox.
5346
@@ -81,7 +74,7 @@ def create_descriptor_file(client,
8174 a DataRow.
8275 ValueError: When the upload parameters are invalid
8376 """
84- file_upload_thread_count = 20
77+ file_upload_thread_count = FILE_UPLOAD_THREAD_COUNT
8578 DataRow = Entity .DataRow
8679 AssetAttachment = Entity .AssetAttachment
8780
@@ -192,7 +185,7 @@ def validate_keys(item):
192185 raise InvalidAttributeError (DataRow , invalid_keys )
193186 return item
194187
195- def formatLegacyConversationalData (item ):
188+ def format_legacy_conversational_data (item ):
196189 messages = item .pop ("conversationalData" )
197190 version = item .pop ("version" , 1 )
198191 type = item .pop ("type" , "application/vnd.labelbox.conversational" )
@@ -213,7 +206,7 @@ def formatLegacyConversationalData(item):
213206 return item
214207
215208 def convert_item (data_row_item ):
216- if isinstance (data_row_item , DataRowItemBase ):
209+ if isinstance (data_row_item , DataRowUpsertItem ):
217210 item = data_row_item .payload
218211 else :
219212 item = data_row_item
@@ -223,7 +216,7 @@ def convert_item(data_row_item):
223216 return item
224217
225218 if "conversationalData" in item :
226- formatLegacyConversationalData (item )
219+ format_legacy_conversational_data (item )
227220
228221 # Convert all payload variations into the same dict format
229222 item = format_row (item )
@@ -238,7 +231,7 @@ def convert_item(data_row_item):
238231 # Upload any local file paths
239232 item = upload_if_necessary (item )
240233
241- if isinstance (data_row_item , DataRowItemBase ):
234+ if isinstance (data_row_item , DataRowUpsertItem ):
242235 return {'id' : data_row_item .id , 'payload' : item }
243236 else :
244237 return item
@@ -263,7 +256,7 @@ def convert_item(data_row_item):
263256 filename = "json_import.json" )
264257
265258 @staticmethod
266- def upload_in_chunks (client , specs : List [DataRowItemBase ],
259+ def upload_in_chunks (client , specs : List [DataRowUpsertItem ],
267260 file_upload_thread_count : int ,
268261 upsert_chunk_size : int ) -> UploadManifest :
269262 empty_specs = list (filter (lambda spec : spec .is_empty (), specs ))
@@ -278,9 +271,9 @@ def upload_in_chunks(client, specs: List[DataRowItemBase],
278271 for i in range (0 , len (specs ), upsert_chunk_size )
279272 ]
280273
281- def _upload_chunk (_chunk ):
274+ def _upload_chunk (chunk ):
282275 return DataRowUploader .create_descriptor_file (client ,
283- _chunk ,
276+ chunk ,
284277 is_upsert = True )
285278
286279 with ThreadPoolExecutor (file_upload_thread_count ) as executor :
0 commit comments