@@ -69,13 +69,111 @@ def create_data_row(self, **kwargs):
         row_data = kwargs[DataRow.row_data.name]
         if os.path.exists(row_data):
             kwargs[DataRow.row_data.name] = self.client.upload_file(row_data)
-
         kwargs[DataRow.dataset.name] = self
-
         return self.client._create(DataRow, kwargs)

+    def create_data_rows_sync(self, items):
+        """ Synchronously bulk upload data rows.
+
+        Use this instead of `Dataset.create_data_rows` for smaller batches of
+        data rows that need to be uploaded quickly. This method cannot be used
+        for uploads containing more than 1000 data rows, and each data row is
+        limited to 5 attachments.
+
+        Args:
+            items (iterable of (dict or str)):
+                See the docstring for `Dataset._create_descriptor_file` for more information.
+        Returns:
+            None. If the function does not raise an exception, the import was successful.
+
+        Raises:
+            InvalidQueryError: If the `items` parameter does not conform to
+                the specification in `Dataset._create_descriptor_file` or if
+                the server did not accept the DataRow creation request
+                (unknown reason).
+            InvalidAttributeError: If there are fields in `items` not valid for
+                a DataRow.
+            ValueError: If the upload parameters are invalid.
+        """
+        max_data_rows_supported = 1000
+        max_attachments_per_data_row = 5
+        if len(items) > max_data_rows_supported:
+            raise ValueError(
+                f"Dataset.create_data_rows_sync() supports a max of {max_data_rows_supported} data rows."
+                " For larger imports use the async function Dataset.create_data_rows()"
+            )
+        descriptor_url = self._create_descriptor_file(
+            items, max_attachments_per_data_row=max_attachments_per_data_row)
+        dataset_param = "datasetId"
+        url_param = "jsonUrl"
+        query_str = """mutation AppendRowsToDatasetSyncPyApi($%s: ID!, $%s: String!){
+            appendRowsToDatasetSync(data:{datasetId: $%s, jsonFileUrl: $%s}
+            ){dataset{id}}} """ % (dataset_param, url_param, dataset_param,
+                                   url_param)
+        self.client.execute(query_str, {
+            dataset_param: self.uid,
+            url_param: descriptor_url
+        })
+
     def create_data_rows(self, items):
-        """ Creates multiple DataRow objects based on the given `items`.
117+ """ Asynchronously bulk upload data rows
118+
119+ Use this instead of `Dataset.create_data_rows_sync` uploads for batches that contain more than 100 data rows.
120+
121+ Args:
122+ items (iterable of (dict or str)): See the docstring for `Dataset._create_descriptor_file` for more information
123+
124+ Returns:
125+ Task representing the data import on the server side. The Task
126+ can be used for inspecting task progress and waiting until it's done.
127+
128+ Raises:
129+ InvalidQueryError: If the `items` parameter does not conform to
130+ the specification above or if the server did not accept the
131+ DataRow creation request (unknown reason).
132+ ResourceNotFoundError: If unable to retrieve the Task for the
133+ import process. This could imply that the import failed.
134+ InvalidAttributeError: If there are fields in `items` not valid for
135+ a DataRow.
136+ ValueError: When the upload parameters are invalid
137+ """
138+ descriptor_url = self ._create_descriptor_file (items )
139+ # Create data source
140+ dataset_param = "datasetId"
141+ url_param = "jsonUrl"
142+ query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
143+ appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
144+ ){ taskId accepted errorMessage } } """ % (dataset_param , url_param ,
145+ dataset_param , url_param )
146+
147+ res = self .client .execute (query_str , {
148+ dataset_param : self .uid ,
149+ url_param : descriptor_url
150+ })
151+ res = res ["appendRowsToDataset" ]
152+ if not res ["accepted" ]:
153+ msg = res ['errorMessage' ]
154+ raise InvalidQueryError (
155+ f"Server did not accept DataRow creation request. { msg } " )
156+
157+ # Fetch and return the task.
158+ task_id = res ["taskId" ]
159+ user = self .client .get_user ()
160+ task = list (user .created_tasks (where = Entity .Task .uid == task_id ))
161+ # Cache user in a private variable as the relationship can't be
162+ # resolved due to server-side limitations (see Task.created_by)
163+ # for more info.
164+ if len (task ) != 1 :
165+ raise ResourceNotFoundError (Entity .Task , task_id )
166+ task = task [0 ]
167+ task ._user = user
168+ return task
169+
+    def _create_descriptor_file(self, items, max_attachments_per_data_row=None):
+        """
+        This function is shared by both `Dataset.create_data_rows` and
+        `Dataset.create_data_rows_sync` to prepare the input file. The
+        user-defined input is validated, processed, and json stringified.
+        Finally the json data is uploaded to gcs and a uri is returned. This
+        uri can be passed to the mutations that create data rows on the server.
+
 
         Each element in `items` can be either a `str` or a `dict`. If
         it is a `str`, then it is interpreted as a local file path. The file
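A minimal usage sketch of the two upload paths added above. The client setup, dataset UID, and URLs are placeholders rather than part of this diff, and `wait_till_done` is assumed to be the Task polling helper the docstring's Returns section refers to:

    from labelbox import Client

    client = Client(api_key="YOUR_API_KEY")      # placeholder credentials
    dataset = client.get_dataset("DATASET_UID")  # placeholder dataset

    # Sync path: small batches (<= 1000 rows, <= 5 attachments per row);
    # blocks until the import finishes and returns None on success.
    dataset.create_data_rows_sync([
        {"row_data": "https://example.com/image-1.jpg", "external_id": "img-1"},
        "path/to/local-file.jpg",  # str items are treated as local file paths
    ])

    # Async path: returns a server-side Task that can be polled.
    task = dataset.create_data_rows(
        [{"row_data": f"https://example.com/image-{i}.jpg"} for i in range(5000)])
    task.wait_till_done()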
@@ -102,19 +200,19 @@ def create_data_rows(self, items):
 
         Args:
             items (iterable of (dict or str)): See above for details.
+            max_attachments_per_data_row (Optional[int]): Parameter used during
+                attachment validation to determine whether the user has
+                provided too many attachments.
 
         Returns:
-            Task representing the data import on the server side. The Task
-            can be used for inspecting task progress and waiting until it's done.
+            uri (string): A reference to the uploaded json data.
 
         Raises:
             InvalidQueryError: If the `items` parameter does not conform to
                 the specification above or if the server did not accept the
                 DataRow creation request (unknown reason).
-            ResourceNotFoundError: If unable to retrieve the Task for the
-                import process. This could imply that the import failed.
             InvalidAttributeError: If there are fields in `items` not valid for
                 a DataRow.
+            ValueError: If the upload parameters are invalid.
         """
         file_upload_thread_count = 20
         DataRow = Entity.DataRow
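For reference, a hypothetical `items` payload satisfying the spec in this docstring. The exact attachment fields are enforced by `AssetAttachment.validate_attachment_json`, so the `type`/`value` keys shown here are an assumption:

    items = [
        # dict: explicit DataRow fields plus optional attachments
        {
            "row_data": "https://example.com/image-1.jpg",
            "external_id": "img-1",
            "attachments": [
                # assumed attachment shape
                {"type": "TEXT", "value": "A note about this data row"},
            ],
        },
        # str: interpreted as a local file path; the file is uploaded first
        "path/to/local-file.jpg",
    ]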
@@ -135,6 +233,12 @@ def validate_attachments(item):
             attachments = item.get('attachments')
             if attachments:
                 if isinstance(attachments, list):
+                    if max_attachments_per_data_row and len(
+                            attachments) > max_attachments_per_data_row:
+                        raise ValueError(
+                            f"Max number of supported attachments per data row is {max_attachments_per_data_row}."
+                            f" Found {len(attachments)}. Condense multiple attachments into one with the HTML attachment type if necessary."
+                        )
                     for attachment in attachments:
                         AssetAttachment.validate_attachment_json(attachment)
                 else:
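Since `max_attachments_per_data_row` is only supplied by `create_data_rows_sync`, this limit applies to the sync path alone. A sketch of how it surfaces to callers, reusing the assumed attachment shape from above:

    row = {
        "row_data": "https://example.com/image.jpg",
        "attachments": [{"type": "TEXT", "value": str(i)} for i in range(6)],
    }

    try:
        dataset.create_data_rows_sync([row])  # 6 attachments > the sync limit of 5
    except ValueError as err:
        print(err)

    dataset.create_data_rows([row])           # async path passes no limit through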
@@ -198,40 +302,9 @@ def convert_item(item):
         with ThreadPoolExecutor(file_upload_thread_count) as executor:
             futures = [executor.submit(convert_item, item) for item in items]
             items = [future.result() for future in as_completed(futures)]
-
         # Prepare and upload the descriptor file
         data = json.dumps(items)
-        descriptor_url = self.client.upload_data(data)
-        # Create data source
-        dataset_param = "datasetId"
-        url_param = "jsonUrl"
-        query_str = """mutation AppendRowsToDatasetPyApi($%s: ID!, $%s: String!){
-            appendRowsToDataset(data:{datasetId: $%s, jsonFileUrl: $%s}
-            ){ taskId accepted errorMessage } } """ % (dataset_param, url_param,
-                                                       dataset_param, url_param)
-
-        res = self.client.execute(query_str, {
-            dataset_param: self.uid,
-            url_param: descriptor_url
-        })
-        res = res["appendRowsToDataset"]
-        if not res["accepted"]:
-            msg = res['errorMessage']
-            raise InvalidQueryError(
-                f"Server did not accept DataRow creation request. {msg}")
-
-        # Fetch and return the task.
-        task_id = res["taskId"]
-        user = self.client.get_user()
-        task = list(user.created_tasks(where=Entity.Task.uid == task_id))
-        # Cache user in a private variable as the relationship can't be
-        # resolved due to server-side limitations (see Task.created_by)
-        # for more info.
-        if len(task) != 1:
-            raise ResourceNotFoundError(Entity.Task, task_id)
-        task = task[0]
-        task._user = user
-        return task
+        return self.client.upload_data(data)

     def data_rows_for_external_id(self, external_id, limit=10):
         """ Convenience method for getting a single `DataRow` belonging to this