 from labelbox.orm.model import Entity, Field, Relationship
 
 logger = logging.getLogger(__name__)
+import time
 
 
 class Dataset(DbObject, Updateable, Deletable):
@@ -69,13 +70,111 @@ def create_data_row(self, **kwargs):
         row_data = kwargs[DataRow.row_data.name]
         if os.path.exists(row_data):
             kwargs[DataRow.row_data.name] = self.client.upload_file(row_data)
-
         kwargs[DataRow.dataset.name] = self
-
         return self.client._create(DataRow, kwargs)
 
+    def create_data_rows_sync(self, items):
+        """ Synchronously bulk upload data rows.
+
+        Use this instead of `Dataset.create_data_rows` for smaller batches of
+        data rows that need to be uploaded quickly. Cannot be used for uploads
+        containing more than 1000 data rows. Each data row is also limited to
+        5 attachments.
+
+        Args:
+            items (iterable of (dict or str)):
+                See the docstring for `Dataset._create_descriptor_file` for more information.
+        Returns:
+            None. If the function doesn't raise an exception then the import
+            was successful.
+
+        Raises:
+            InvalidQueryError: If the `items` parameter does not conform to
+                the specification in `Dataset._create_descriptor_file` or if
+                the server did not accept the DataRow creation request
+                (unknown reason).
+            InvalidAttributeError: If there are fields in `items` not valid
+                for a DataRow.
+            ValueError: When the upload parameters are invalid.
+        """
+        max_data_rows_supported = 1000
+        max_attachments_per_data_row = 5
+        if len(items) > max_data_rows_supported:
+            raise ValueError(
+                f"Dataset.create_data_rows_sync() supports a max of {max_data_rows_supported} data rows."
+                " For larger imports use the async function Dataset.create_data_rows()"
+            )
+        descriptor_url = self._create_descriptor_file(
+            items, max_attachments_per_data_row=max_attachments_per_data_row)
+        dataset_param = "datasetId"
+        url_param = "jsonUrl"
+        query_str = """mutation AppendRowsToDatasetSyncPyApi($%s: ID!, $%s: String!){
+            appendRowsToDatasetSync(data:{datasetId: $%s, jsonFileUrl: $%s}
+            ){dataset{id}}} """ % (dataset_param, url_param, dataset_param,
+                                   url_param)
+        self.client.execute(query_str, {
+            dataset_param: self.uid,
+            url_param: descriptor_url
+        })
+
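+    # A minimal usage sketch for `create_data_rows_sync`; the dataset id and
+    # file paths below are hypothetical:
+    #
+    #     dataset = client.get_dataset("<dataset_uid>")
+    #     dataset.create_data_rows_sync([
+    #         "path/to/local_file.jpg",
+    #         {"row_data": "https://example.com/image.png",
+    #          "external_id": "image-1"},
+    #     ])
+    #     # Returns None; a failed import surfaces as an exception.
+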
     def create_data_rows(self, items):
-        """ Creates multiple DataRow objects based on the given `items`.
+        """ Asynchronously bulk upload data rows.
+
+        Use this instead of `Dataset.create_data_rows_sync` for batches that
+        contain more than 1000 data rows.
+
+        Args:
+            items (iterable of (dict or str)): See the docstring for
+                `Dataset._create_descriptor_file` for more information.
+
+        Returns:
+            Task representing the data import on the server side. The Task
+            can be used for inspecting task progress and waiting until it's done.
+
+        Raises:
+            InvalidQueryError: If the `items` parameter does not conform to
+                the specification above or if the server did not accept the
+                DataRow creation request (unknown reason).
+            ResourceNotFoundError: If unable to retrieve the Task for the
+                import process. This could imply that the import failed.
+            InvalidAttributeError: If there are fields in `items` not valid for
+                a DataRow.
+            ValueError: When the upload parameters are invalid.
+        """
+        descriptor_url = self._create_descriptor_file(items)
+        # Create data source
+        dataset_param = "datasetId"
+        url_param = "jsonUrl"
+        query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
+            appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
+            ){ taskId accepted errorMessage } } """ % (dataset_param, url_param,
+                                                       dataset_param, url_param)
+
+        res = self.client.execute(query_str, {
+            dataset_param: self.uid,
+            url_param: descriptor_url
+        })
+        res = res["appendRowsToDataset"]
+        if not res["accepted"]:
+            msg = res['errorMessage']
+            raise InvalidQueryError(
+                f"Server did not accept DataRow creation request. {msg}")
+
+        # Fetch and return the task.
+        task_id = res["taskId"]
+        user = self.client.get_user()
+        task = list(user.created_tasks(where=Entity.Task.uid == task_id))
+        # Cache user in a private variable as the relationship can't be
+        # resolved due to server-side limitations (see Task.created_by
+        # for more info).
+        if len(task) != 1:
+            raise ResourceNotFoundError(Entity.Task, task_id)
+        task = task[0]
+        task._user = user
+        return task
+
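+    # A minimal usage sketch for `create_data_rows`; the returned Task can be
+    # polled until the server-side import finishes (paths are hypothetical):
+    #
+    #     task = dataset.create_data_rows(["path/a.jpg", "path/b.jpg"])
+    #     task.wait_till_done()
+    #     assert task.status == "COMPLETE"
+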
+    def _create_descriptor_file(self, items, max_attachments_per_data_row=None):
+        """
+        This function is shared by both `Dataset.create_data_rows` and
+        `Dataset.create_data_rows_sync` to prepare the input file. The user
+        defined input is validated, processed, and json stringified. Finally
+        the json data is uploaded to gcs and a uri is returned. This uri is
+        then passed to the server as the mutation's `jsonFileUrl`.
+
 
         Each element in `items` can be either a `str` or a `dict`. If
         it is a `str`, then it is interpreted as a local file path. The file
@@ -102,19 +201,19 @@ def create_data_rows(self, items):
 
         Args:
             items (iterable of (dict or str)): See above for details.
+            max_attachments_per_data_row (Optional[int]): Param used during
+                attachment validation to determine if the user has provided
+                too many attachments.
 
         Returns:
-            Task representing the data import on the server side. The Task
-            can be used for inspecting task progress and waiting until it's done.
+            uri (string): A reference to the uploaded json data.
 
         Raises:
             InvalidQueryError: If the `items` parameter does not conform to
                 the specification above or if the server did not accept the
                 DataRow creation request (unknown reason).
-            ResourceNotFoundError: If unable to retrieve the Task for the
-                import process. This could imply that the import failed.
             InvalidAttributeError: If there are fields in `items` not valid for
                 a DataRow.
+            ValueError: When the upload parameters are invalid.
         """
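+        # A sketch of valid `items`, following the formats described above
+        # (all values, including the attachment dict, are hypothetical):
+        #
+        #     [
+        #         "path/to/local_file.jpg",
+        #         {"row_data": "https://example.com/image.png",
+        #          "external_id": "image-1",
+        #          "attachments": [{"type": "TEXT", "value": "a note"}]},
+        #     ]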
         file_upload_thread_count = 20
         DataRow = Entity.DataRow
@@ -135,6 +234,12 @@ def validate_attachments(item):
             attachments = item.get('attachments')
             if attachments:
                 if isinstance(attachments, list):
+                    if max_attachments_per_data_row and len(
+                            attachments) > max_attachments_per_data_row:
+                        raise ValueError(
+                            f"Max number of supported attachments per data row is {max_attachments_per_data_row}."
+                            f" Found {len(attachments)}. Condense multiple attachments into one with the HTML attachment type if necessary."
+                        )
                     for attachment in attachments:
                         AssetAttachment.validate_attachment_json(attachment)
                 else:
@@ -198,40 +303,9 @@ def convert_item(item):
         with ThreadPoolExecutor(file_upload_thread_count) as executor:
             futures = [executor.submit(convert_item, item) for item in items]
             items = [future.result() for future in as_completed(futures)]
-
         # Prepare and upload the descriptor file
         data = json.dumps(items)
-        descriptor_url = self.client.upload_data(data)
-        # Create data source
-        dataset_param = "datasetId"
-        url_param = "jsonUrl"
-        query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
-            appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
-            ){ taskId accepted errorMessage } } """ % (dataset_param, url_param,
-                                                       dataset_param, url_param)
-
-        res = self.client.execute(query_str, {
-            dataset_param: self.uid,
-            url_param: descriptor_url
-        })
-        res = res["appendRowsToDataset"]
-        if not res["accepted"]:
-            msg = res['errorMessage']
-            raise InvalidQueryError(
-                f"Server did not accept DataRow creation request. {msg}")
-
-        # Fetch and return the task.
-        task_id = res["taskId"]
-        user = self.client.get_user()
-        task = list(user.created_tasks(where=Entity.Task.uid == task_id))
-        # Cache user in a private variable as the relationship can't be
-        # resolved due to server-side limitations (see Task.created_by)
-        # for more info.
-        if len(task) != 1:
-            raise ResourceNotFoundError(Entity.Task, task_id)
-        task = task[0]
-        task._user = user
-        return task
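+        # The returned uri is what `create_data_rows` and
+        # `create_data_rows_sync` send to the server as `jsonFileUrl`.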
+        return self.client.upload_data(data)
 
     def data_rows_for_external_id(self, external_id, limit=10):
         """ Convenience method for getting a single `DataRow` belonging to this