@@ -411,9 +411,6 @@ def export_v2(self,
                   task_name: Optional[str] = None,
                   params: Optional[ProjectExportParams] = None) -> Task:

-        if (task_name is None):
-            task_name = f'Export Data Rows in Project - {self.name}'
-
         _params = params or ProjectExportParams({
             "attachments": False,
             "metadata_fields": False,
@@ -669,16 +666,20 @@ def setup(self, labeling_frontend, labeling_frontend_options) -> None:
         timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
         self.update(setup_complete=timestamp)

-    def create_batch(self,
-                     name: str,
-                     data_rows: List[Union[str, DataRow]],
-                     priority: int = 5,
-                     consensus_settings: Optional[Dict[str, float]] = None):
-        """Create a new batch for a project. Batches is in Beta and subject to change
+    def create_batch(
+        self,
+        name: str,
+        data_rows: Optional[List[Union[str, DataRow]]] = None,
+        priority: int = 5,
+        consensus_settings: Optional[Dict[str, float]] = None,
+        global_keys: Optional[List[str]] = None,
+    ):
+        """Create a new batch for a project. One of `global_keys` or `data_rows` must be provided, but not both.

         Args:
             name: a name for the batch, must be unique within a project
-            data_rows: Either a list of `DataRows` or Data Row ids
+            data_rows: Either a list of `DataRows` or Data Row ids.
+            global_keys: global keys for data rows to add to the batch.
             priority: An optional priority for the Data Rows in the Batch. 1 highest -> 5 lowest
             consensus_settings: An optional dictionary with consensus settings: {'number_of_labels': 3, 'coverage_percentage': 0.1}
         """
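For context, a minimal usage sketch of the new signature; the client setup, project ID, and key values below are placeholders for illustration, not part of this change:

import labelbox as lb

client = lb.Client(api_key="<API_KEY>")
project = client.get_project("<PROJECT_ID>")

# Address rows by Data Row id, as before...
batch = project.create_batch(name="batch-by-ids",
                             data_rows=["<DATA_ROW_ID>"],
                             priority=3)

# ...or, with this change, by global key (one of the two, never both).
batch = project.create_batch(name="batch-by-keys",
                             global_keys=["<GLOBAL_KEY>"])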
@@ -688,35 +689,45 @@ def create_batch(self,
             raise ValueError("Project must be in batch mode")

         dr_ids = []
-        for dr in data_rows:
-            if isinstance(dr, Entity.DataRow):
-                dr_ids.append(dr.uid)
-            elif isinstance(dr, str):
-                dr_ids.append(dr)
-            else:
-                raise ValueError("You can DataRow ids or DataRow objects")
+        if data_rows is not None:
+            for dr in data_rows:
+                if isinstance(dr, Entity.DataRow):
+                    dr_ids.append(dr.uid)
+                elif isinstance(dr, str):
+                    dr_ids.append(dr)
+                else:
+                    raise ValueError(
+                        "`data_rows` must be DataRow ids or DataRow objects")
+
+        if data_rows is not None:
+            row_count = len(data_rows)
+        elif global_keys is not None:
+            row_count = len(global_keys)
+        else:
+            row_count = 0

-        if len(dr_ids) > 100_000:
+        if row_count > 100_000:
             raise ValueError(
                 f"Batch exceeds max size, break into smaller batches")
-        if not len(dr_ids):
+        if not row_count:
             raise ValueError("You need at least one data row in a batch")

         self._wait_until_data_rows_are_processed(
-            dr_ids, self._wait_processing_max_seconds)
+            dr_ids, global_keys, self._wait_processing_max_seconds)

         if consensus_settings:
             consensus_settings = ConsensusSettings(**consensus_settings).dict(
                 by_alias=True)

         if len(dr_ids) >= 10_000:
-            return self._create_batch_async(name, dr_ids, priority,
+            return self._create_batch_async(name, dr_ids, global_keys, priority,
                                             consensus_settings)
         else:
-            return self._create_batch_sync(name, dr_ids, priority,
+            return self._create_batch_sync(name, dr_ids, global_keys, priority,
                                            consensus_settings)

-    def _create_batch_sync(self, name, dr_ids, priority, consensus_settings):
+    def _create_batch_sync(self, name, dr_ids, global_keys, priority,
+                           consensus_settings):
         method = 'createBatchV2'
         query_str = """mutation %sPyApi($projectId: ID!, $batchInput: CreateBatchInput!) {
             project(where: {id: $projectId}) {
@@ -734,6 +745,7 @@ def _create_batch_sync(self, name, dr_ids, priority, consensus_settings):
             "batchInput": {
                 "name": name,
                 "dataRowIds": dr_ids,
+                "globalKeys": global_keys,
                 "priority": priority,
                 "consensusSettings": consensus_settings
             }
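To illustrate the payload above: when a batch is created from global keys, `dr_ids` stays empty and `globalKeys` carries the identifiers, so the mutation variables would look roughly like this (the values are hypothetical):

params = {
    "projectId": "<PROJECT_ID>",
    "batchInput": {
        "name": "batch-by-keys",
        "dataRowIds": [],  # empty when rows are addressed by global key
        "globalKeys": ["<GLOBAL_KEY>"],
        "priority": 5,
        "consensusSettings": None,
    }
}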
@@ -751,7 +763,8 @@ def _create_batch_sync(self, name, dr_ids, priority, consensus_settings):

     def _create_batch_async(self,
                             name: str,
-                            dr_ids: List[str],
+                            dr_ids: Optional[List[str]] = None,
+                            global_keys: Optional[List[str]] = None,
                             priority: int = 5,
                             consensus_settings: Optional[Dict[str,
                                                               float]] = None):
@@ -794,6 +807,7 @@ def _create_batch_async(self,
             "input": {
                 "batchId": batch_id,
                 "dataRowIds": dr_ids,
+                "globalKeys": global_keys,
                 "priority": priority,
             }
         }
@@ -1260,38 +1274,50 @@ def _is_url_valid(url: Union[str, Path]) -> bool:
             raise ValueError(
                 f'Invalid annotations given of type: {type(annotations)}')

-    def _wait_until_data_rows_are_processed(self,
-                                            data_row_ids: List[str],
-                                            wait_processing_max_seconds: int,
-                                            sleep_interval=30):
+    def _wait_until_data_rows_are_processed(
+            self,
+            data_row_ids: Optional[List[str]] = None,
+            global_keys: Optional[List[str]] = None,
+            wait_processing_max_seconds: int = _wait_processing_max_seconds,
+            sleep_interval=30):
         """ Wait until all the specified data rows are processed"""
         start_time = datetime.now()
+
         while True:
             if (datetime.now() -
                     start_time).total_seconds() >= wait_processing_max_seconds:
                 raise ProcessingWaitTimeout(
                     "Maximum wait time exceeded while waiting for data rows to be processed. Try creating a batch a bit later"
                 )

-            all_good = self.__check_data_rows_have_been_processed(data_row_ids)
+            all_good = self.__check_data_rows_have_been_processed(
+                data_row_ids, global_keys)
             if all_good:
                 return

             logger.debug(
                 'Some of the data rows are still being processed, waiting...')
             time.sleep(sleep_interval)

-    def __check_data_rows_have_been_processed(self, data_row_ids: List[str]):
-        data_row_ids_param = "data_row_ids"
+    def __check_data_rows_have_been_processed(
+            self,
+            data_row_ids: Optional[List[str]] = None,
+            global_keys: Optional[List[str]] = None):
+
+        if data_row_ids is not None and len(data_row_ids) > 0:
+            param_name = "dataRowIds"
+            params = {param_name: data_row_ids}
+        else:
+            param_name = "globalKeys"
+            global_keys = global_keys if global_keys is not None else []
+            params = {param_name: global_keys}

-        query_str = """query CheckAllDataRowsHaveBeenProcessedPyApi($%s: [ID!]!) {
-            queryAllDataRowsHaveBeenProcessed(dataRowIds:$%s) {
+        query_str = """query CheckAllDataRowsHaveBeenProcessedPyApi($%s: [ID!]) {
+            queryAllDataRowsHaveBeenProcessed(%s:$%s) {
                 allDataRowsHaveBeenProcessed
             }
-        }""" % (data_row_ids_param, data_row_ids_param)
+        }""" % (param_name, param_name, param_name)

-        params = {}
-        params[data_row_ids_param] = data_row_ids
         response = self.client.execute(query_str, params)
         return response["queryAllDataRowsHaveBeenProcessed"][
             "allDataRowsHaveBeenProcessed"]
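For clarity, when only global keys are supplied, the `%` substitution above renders the query roughly as follows:

query CheckAllDataRowsHaveBeenProcessedPyApi($globalKeys: [ID!]) {
    queryAllDataRowsHaveBeenProcessed(globalKeys:$globalKeys) {
        allDataRowsHaveBeenProcessed
    }
}

with `params = {"globalKeys": [...]}` supplying the variable; the `dataRowIds` spelling is used instead whenever a non-empty `data_row_ids` list is passed.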