44from collections import namedtuple
55from datetime import datetime , timezone
66from pathlib import Path
7- from typing import TYPE_CHECKING , Dict , Union , Iterable , List , Optional , Any
7+ from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Optional , Union
88from urllib .parse import urlparse
99
1010import ndjson
1111import requests
1212
1313from labelbox import utils
14- from labelbox .exceptions import InvalidQueryError , LabelboxError
14+ from labelbox .exceptions import (InvalidQueryError , LabelboxError ,
15+ ProcessingWaitTimeout , ResourceConflict )
1516from labelbox .orm import query
16- from labelbox .orm .db_object import DbObject , Updateable , Deletable
17+ from labelbox .orm .db_object import DbObject , Deletable , Updateable
1718from labelbox .orm .model import Entity , Field , Relationship
1819from labelbox .pagination import PaginatedCollection
1920from labelbox .schema .consensus_settings import ConsensusSettings
@@ -90,6 +91,9 @@ class Project(DbObject, Updateable, Deletable):
9091 benchmarks = Relationship .ToMany ("Benchmark" , False )
9192 ontology = Relationship .ToOne ("Ontology" , True )
9293
94+ # Maximum time, in seconds, that create_batch waits for data rows to finish processing
95+ _wait_processing_max_seconds = 3600
96+
9397 def update (self , ** kwargs ):
9498 """ Updates this project with the specified attributes
9599
@@ -319,7 +323,7 @@ def _validate_datetime(string_date: str) -> bool:
319323 return True
320324 except ValueError :
321325 pass
322- raise ValueError (f"""Incorrect format for: { string_date } .
326+ raise ValueError (f"""Incorrect format for: { string_date } .
323327 Format must be \" YYYY-MM-DD\" or \" YYYY-MM-DD hh:mm:ss\" """ )
324328 return True
325329
@@ -507,6 +511,9 @@ def setup_editor(self, ontology) -> None:
507511 Args:
508512 ontology (Ontology): The ontology to attach to the project
509513 """
514+ if self .labeling_frontend () is not None :
515+ raise ResourceConflict ("Editor is already set up." )
516+
510517 labeling_frontend = next (
511518 self .client .get_labeling_frontends (
512519 where = Entity .LabelingFrontend .name == "Editor" ))
@@ -546,6 +553,9 @@ def setup(self, labeling_frontend, labeling_frontend_options) -> None:
546553 to `str` using `json.dumps`.
547554 """
548555
556+ if self .labeling_frontend () is not None :
557+ raise ResourceConflict ("Editor is already set up." )
558+
549559 if not isinstance (labeling_frontend_options , str ):
550560 labeling_frontend_options = json .dumps (labeling_frontend_options )
551561
@@ -595,11 +605,16 @@ def create_batch(self,
595605 if not len (dr_ids ):
596606 raise ValueError ("You need at least one data row in a batch" )
597607
598- method = 'createBatch'
608+ self ._wait_until_data_rows_are_processed (
609+ data_rows , self ._wait_processing_max_seconds )
610+ method = 'createBatchV2'
599611 query_str = """mutation %sPyApi($projectId: ID!, $batchInput: CreateBatchInput!) {
600612 project(where: {id: $projectId}) {
601613 %s(input: $batchInput) {
602- %s
614+ batch {
615+ %s
616+ }
617+ failedDataRowIds
603618 }
604619 }
605620 }
@@ -622,9 +637,12 @@ def create_batch(self,
622637 params ,
623638 timeout = 180.0 ,
624639 experimental = True )["project" ][method ]
625-
626- res ['size' ] = len (dr_ids )
627- return Entity .Batch (self .client , self .uid , res )
640+ batch = res ['batch' ]
641+ batch ['size' ] = len (dr_ids )
642+ return Entity .Batch (self .client ,
643+ self .uid ,
644+ batch ,
645+ failed_data_row_ids = res ['failedDataRowIds' ])
628646
629647 def _update_queue_mode (self , mode : "QueueMode" ) -> "QueueMode" :
630648 """
@@ -977,6 +995,42 @@ def _is_url_valid(url: Union[str, Path]) -> bool:
977995 raise ValueError (
978996 f'Invalid annotations given of type: { type (annotations )} ' )
979997
def _wait_until_data_rows_are_processed(self,
                                        data_row_ids: List[str],
                                        wait_processing_max_seconds: int,
                                        sleep_interval=30) -> None:
    """ Wait until all the specified data rows are processed.

    Repeatedly asks the backend whether every data row in `data_row_ids`
    has been processed, sleeping between polls, and returns as soon as the
    answer is truthy.

    Args:
        data_row_ids (List[str]): Ids of the data rows to wait for.
        wait_processing_max_seconds (int): Maximum total time to wait,
            in seconds. A value <= 0 raises immediately without polling.
        sleep_interval (int): Seconds to sleep between two polls.
    Raises:
        ProcessingWaitTimeout: If the data rows are still being processed
            once `wait_processing_max_seconds` have elapsed.
    """
    # A monotonic clock is used so that system clock adjustments (NTP sync,
    # DST, manual changes) can neither cut the wait short nor extend it.
    deadline = time.monotonic() + wait_processing_max_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise ProcessingWaitTimeout(
                "Maximum wait time exceeded while waiting for data rows to be processed. Try creating a batch a bit later"
            )

        if self.__check_data_rows_have_been_processed(data_row_ids):
            return

        logger.debug(
            'Some of the data rows are still being processed, waiting...')
        # Never sleep past the deadline: the timeout is then reported on
        # time instead of up to one full `sleep_interval` late.
        time.sleep(min(sleep_interval, remaining))
1018+
def __check_data_rows_have_been_processed(self, data_row_ids: List[str]):
    """ Ask the backend whether all the given data rows have been processed.

    Executes the `queryAllDataRowsHaveBeenProcessed` GraphQL query through
    `self.client`.

    Args:
        data_row_ids (List[str]): Ids of the data rows to check.
    Returns:
        The `allDataRowsHaveBeenProcessed` value from the response
        (presumably a boolean; the caller only tests its truthiness).
    """
    param_name = "data_row_ids"

    # The same variable name is used in the query signature and as the
    # argument value, hence it is interpolated twice.
    query_str = """query CheckAllDataRowsHaveBeenProcessedPyApi($%s: [ID!]!) {
        queryAllDataRowsHaveBeenProcessed(dataRowIds:$%s) {
            allDataRowsHaveBeenProcessed
        }
    }""" % (param_name, param_name)

    result = self.client.execute(query_str, {param_name: data_row_ids})
    status = result["queryAllDataRowsHaveBeenProcessed"]
    return status["allDataRowsHaveBeenProcessed"]
1033+
9801034
9811035class ProjectMember (DbObject ):
9821036 user = Relationship .ToOne ("User" , cache = True )
0 commit comments