@@ -230,18 +230,30 @@ def __init__(self, *args, **kwargs):
230230 def result (self ) -> Union [List [Dict [str , Any ]]]:
231231 if self .status == "FAILED" :
232232 raise ValueError (f"Job failed. Errors : { self .errors } " )
233- return self ._result_as_list ()
233+ return self ._results_as_list ()
234+
235+ @property
236+ def errors (self ) -> Optional [Dict [str , Any ]]:
237+ return self ._errors_as_list ()
234238
235239 @property
236240 def created_data_rows (self ) -> Optional [Dict [str , Any ]]:
237241 return self .result
238242
243+ @property
244+ def failed_data_rows (self ) -> Optional [Dict [str , Any ]]:
245+ return self .errors
246+
239247 @property
240248 def result_all (self ) -> PaginatedCollection :
241- return self ._download_result_paginated ()
249+ return self ._download_results_paginated ()
242250
243- def _download_result_paginated (self ) -> PaginatedCollection :
244- page_size = 5000 # hardcode to avoid overloading the server
251+ @property
252+ def errors_all (self ) -> PaginatedCollection :
253+ return self ._download_errors_paginated ()
254+
255+ def _download_results_paginated (self ) -> PaginatedCollection :
256+ page_size = 900 # hardcode to avoid overloading the server
245257 from_cursor = None
246258
247259 query_str = """query SuccessesfulDataRowImportsPyApi($taskId: ID!, $first: Int, $from: String) {
@@ -271,18 +283,66 @@ def _download_result_paginated(self) -> PaginatedCollection:
271283 params = params ,
272284 dereferencing = ['successesfulDataRowImports' , 'nodes' ],
273285 obj_class = lambda _ , data_row : {
274- 'id' : data_row [ 'id' ] ,
286+ 'id' : data_row . get ( 'id' ) ,
275287 'external_id' : data_row .get ('externalId' ),
276- 'row_data' : data_row [ 'rowData' ] ,
288+ 'row_data' : data_row . get ( 'rowData' ) ,
277289 'global_key' : data_row .get ('globalKey' ),
278290 },
279291 cursor_path = ['successesfulDataRowImports' , 'after' ],
280292 )
281293
282- def _result_as_list (self ) -> List [Dict [str , Any ]]:
294+ def _download_errors_paginated (self ) -> PaginatedCollection :
295+ page_size = 5000 # hardcode to avoid overloading the server
296+ from_cursor = None
297+
298+ query_str = """query FailedDataRowImportsPyApi($taskId: ID!, $first: Int, $from: String) {
299+ failedDataRowImports(data: { taskId: $taskId, first: $first, from: $from})
300+ {
301+ after
302+ total
303+ results {
304+ message
305+ spec {
306+ externalId
307+ globalKey
308+ rowData
309+ }
310+ }
311+ }
312+ }
313+ """
314+
315+ params = {
316+ 'taskId' : self .uid ,
317+ 'first' : page_size ,
318+ 'from' : from_cursor ,
319+ }
320+
321+ return PaginatedCollection (
322+ client = self .client ,
323+ query = query_str ,
324+ params = params ,
325+ dereferencing = ['failedDataRowImports' , 'results' ],
326+ obj_class = lambda _ , data_row : {
327+ 'error' :
328+ data_row .get ('message' ),
329+ 'external_id' :
330+ data_row .get ('spec' ).get ('externalId' )
331+ if data_row .get ('spec' ) else None ,
332+ 'row_data' :
333+ data_row .get ('spec' ).get ('rowData' )
334+ if data_row .get ('spec' ) else None ,
335+ 'global_key' :
336+ data_row .get ('spec' ).get ('globalKey' )
337+ if data_row .get ('spec' ) else None ,
338+ },
339+ cursor_path = ['failedDataRowImports' , 'after' ],
340+ )
341+
342+ def _results_as_list (self ) -> List [Dict [str , Any ]]:
283343 total_downloaded = 0
284344 results = []
285- data = self ._download_result_paginated ()
345+ data = self ._download_results_paginated ()
286346
287347 for row in data :
288348 results .append (row )
@@ -291,3 +351,16 @@ def _result_as_list(self) -> List[Dict[str, Any]]:
291351 break
292352
293353 return results
354+
355+ def _errors_as_list (self ) -> List [Dict [str , Any ]]:
356+ total_downloaded = 0
357+ errors = []
358+ data = self ._download_errors_paginated ()
359+
360+ for row in data :
361+ errors .append (row )
362+ total_downloaded += 1
363+ if total_downloaded >= self .__max_donwload_size :
364+ break
365+
366+ return errors
0 commit comments