@@ -124,8 +124,11 @@ def _find_json_object_offsets(self, data: str) -> List[Tuple[int, int]]:
124124 object_offsets .append ((0 , index - 1 ))
125125 elif char == "}" and stack :
126126 stack .pop ()
127- if len (stack ) == 0 and data [
128- index + 1 ] == "\n " and current_object_start is not None :
127+ # this covers cases where the last object is either followed by a newline or
128+ # it is missing
129+ if len (stack ) == 0 and (len (data ) == index + 1 or
130+ data [index + 1 ] == "\n "
131+ ) and current_object_start is not None :
129132 object_offsets .append ((current_object_start , index + 1 ))
130133 current_object_start = None
131134
@@ -143,8 +146,8 @@ def convert(
143146 offsets = self ._find_json_object_offsets (raw_data )
144147 for line , (offset_start , offset_end ) in enumerate (offsets ):
145148 yield JsonConverterOutput (
146- current_offset + offset_start ,
147- current_line + line ,
149+ current_offset = current_offset + offset_start ,
150+ current_line = current_line + line ,
148151 json_str = raw_data [offset_start :offset_end + 1 ].strip (),
149152 )
150153
@@ -549,32 +552,30 @@ def _get_metadata_header(
549552 res = res ["task" ]["exportMetadataHeader" ]
550553 return _MetadataHeader (** res ) if res else None
551554
552- def get_total_file_size (self , task_id : str ,
553- stream_type : StreamType ) -> Union [int , None ]:
555+ def get_total_file_size (self , stream_type : StreamType ) -> Union [int , None ]:
554556 """Returns the total file size for a specific task."""
555557 if not self ._task .status in ["COMPLETE" , "FAILED" ]:
556558 raise ExportTask .TaskNotReadyException ("Task is not ready yet" )
557- header = ExportTask ._get_metadata_header (self ._task .client , task_id ,
558- stream_type )
559+ header = ExportTask ._get_metadata_header (self ._task .client ,
560+ self . _task . uid , stream_type )
559561 return header .total_size if header else None
560562
561- def get_total_lines (self , task_id : str ,
562- stream_type : StreamType ) -> Union [int , None ]:
563+ def get_total_lines (self , stream_type : StreamType ) -> Union [int , None ]:
563564 """Returns the total file size for a specific task."""
564565 if not self ._task .status in ["COMPLETE" , "FAILED" ]:
565566 raise ExportTask .TaskNotReadyException ("Task is not ready yet" )
566- header = ExportTask ._get_metadata_header (self ._task .client , task_id ,
567- stream_type )
567+ header = ExportTask ._get_metadata_header (self ._task .client ,
568+ self . _task . uid , stream_type )
568569 return header .total_lines if header else None
569570
570571 def has_result (self ) -> bool :
571572 """Returns whether the task has a result."""
572- total_size = self .get_total_file_size (self . _task . uid , StreamType .RESULT )
573+ total_size = self .get_total_file_size (StreamType .RESULT )
573574 return total_size is not None and total_size > 0
574575
575576 def has_errors (self ) -> bool :
576577 """Returns whether the task has errors."""
577- total_size = self .get_total_file_size (self . _task . uid , StreamType .ERRORS )
578+ total_size = self .get_total_file_size (StreamType .ERRORS )
578579 return total_size is not None and total_size > 0
579580
580581 @overload
0 commit comments