@@ -114,9 +114,11 @@ class JsonConverter(Converter[JsonConverterOutput]): # pylint: disable=too-few-
114114
115115 Deprecated: This converter is deprecated and will be removed in a future release.
116116 """
117-
117+
118118 def __init__ (self ) -> None :
119- warnings .warn ("JSON converter is deprecated and will be removed in a future release" )
119+ warnings .warn (
120+ "JSON converter is deprecated and will be removed in a future release"
121+ )
120122 super ().__init__ ()
121123
122124 def _find_json_object_offsets (self , data : str ) -> List [Tuple [int , int ]]:
@@ -397,7 +399,9 @@ class _MultiGCSFileReader(_Reader): # pylint: disable=too-few-public-methods
397399 """
398400
399401 def __init__ (self ):
400- warnings .warn ("_MultiGCSFileReader is deprecated and will be removed in a future release" )
402+ warnings .warn (
403+ "_MultiGCSFileReader is deprecated and will be removed in a future release"
404+ )
401405 super ().__init__ ()
402406 self ._retrieval_strategy = None
403407
@@ -415,54 +419,6 @@ def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
415419 result = self ._retrieval_strategy .get_next_chunk ()
416420
417421
418- @dataclass
419- class BufferedJsonConverterOutput :
420- """Output with the JSON object"""
421- json : Any
422-
423-
424- class _BufferedJsonConverter (Converter [BufferedJsonConverterOutput ]):
425- """Converts JSON data in a buffered manner
426- """
427- def convert (
428- self , input_args : Converter .ConverterInputArgs
429- ) -> Iterator [BufferedJsonConverterOutput ]:
430- yield BufferedJsonConverterOutput (json = json .loads (input_args .raw_data ))
431-
432-
433- class _BufferedGCSFileReader (_Reader ):
434- """Reads data from multiple GCS files and buffer them to disk"""
435-
436- def __init__ (self ):
437- super ().__init__ ()
438- self ._retrieval_strategy = None
439-
440- def set_retrieval_strategy (self , strategy : FileRetrieverStrategy ) -> None :
441- """Sets the retrieval strategy."""
442- self ._retrieval_strategy = strategy
443-
444- def read (self ) -> Iterator [Tuple [_MetadataFileInfo , str ]]:
445- if not self ._retrieval_strategy :
446- raise ValueError ("retrieval strategy not set" )
447- # create a buffer
448- with tempfile .NamedTemporaryFile (mode = 'w+' , delete = False ) as temp_file :
449- result = self ._retrieval_strategy .get_next_chunk ()
450- while result :
451- file_info , raw_data = result
452- temp_file .seek (file_info .offsets .start )
453- temp_file .write (raw_data )
454- result = self ._retrieval_strategy .get_next_chunk ()
455- # read buffer
456- with open (temp_file .name , 'r' ) as temp_file_reopened :
457- for idx , line in enumerate (temp_file_reopened ):
458- yield _MetadataFileInfo (
459- offsets = Range (start = 0 , end = len (line ) - 1 ),
460- lines = Range (start = idx , end = idx + 1 ),
461- file = temp_file .name ), line
462- # manually delete buffer
463- os .unlink (temp_file .name )
464-
465-
466422class Stream (Generic [OutputT ]):
467423 """Streams data from a Reader."""
468424
@@ -520,6 +476,144 @@ def start(
520476 stream_handler (output )
521477
522478
class _BufferedFileRetrieverByOffset(FileRetrieverStrategy):  # pylint: disable=too-few-public-methods
    """Retrieves export file chunks sequentially, starting at a byte offset."""

    def __init__(
        self,
        ctx: _TaskContext,
        offset: int,
    ) -> None:
        """Validates the starting offset against the stream's total size.

        Args:
            ctx: Task context holding client, task id, stream type and metadata.
            offset: Byte offset at which to start retrieving chunks.

        Raises:
            ValueError: If ``offset`` is past the end of the stream.
        """
        super().__init__(ctx)
        self._current_offset = offset
        self._current_line = 0
        if self._current_offset >= self._ctx.metadata_header.total_size:
            raise ValueError(
                f"offset is out of range, max offset is {self._ctx.metadata_header.total_size - 1}"
            )

    def get_next_chunk(self) -> Optional[Tuple[_MetadataFileInfo, str]]:
        """Fetches the next chunk, or None when the stream is exhausted."""
        if self._current_offset >= self._ctx.metadata_header.total_size:
            return None
        # Plain literal: the original interpolated the constant
        # 'exportFileFromOffset' through an f-string for no effect; the
        # resulting query text is identical.
        query = (
            "query GetExportFileFromOffsetPyApi"
            "($where: WhereUniqueIdInput, $streamType: TaskStreamType!, $offset: UInt64!)"
            "{task(where: $where)"
            "{exportFileFromOffset(streamType: $streamType, offset: $offset)"
            "{offsets {start end} lines {start end} file}"
            "}}")
        variables = {
            "where": {
                "id": self._ctx.task_id
            },
            "streamType": self._ctx.stream_type.value,
            "offset": str(self._current_offset),
        }
        file_info, file_content = self._get_file_content(
            query, variables, "exportFileFromOffset")
        # Overwrite the chunk's start markers with our running cursors, then
        # advance the cursors one past the chunk's reported end for next time.
        file_info.offsets.start = self._current_offset
        file_info.lines.start = self._current_line
        self._current_offset = file_info.offsets.end + 1
        self._current_line = file_info.lines.end + 1
        return file_info, file_content
519+
520+
class BufferedStream(Generic[OutputT]):
    """Iterates over a task's exported data, buffering it to disk first."""

    def __init__(
        self,
        ctx: _TaskContext,
    ):
        self._ctx = ctx
        self._converter = _BufferedJsonConverter()
        self._reader = _BufferedGCSFileReader()
        # Always stream the whole export from the very first byte.
        retriever = _BufferedFileRetrieverByOffset(self._ctx, 0)
        self._reader.set_retrieval_strategy(retriever)

    def __iter__(self):
        # Delegates iteration to the fetch generator.
        return self._fetch()

    def _fetch(self) -> Iterator[OutputT]:
        """Yields one converted output per buffered line of the export."""
        # Nothing to stream when the task reports no size at all.
        if self._ctx.metadata_header.total_size is None:
            return

        with self._converter as converter:
            for file_info, raw_data in self._reader.read():
                args = Converter.ConverterInputArgs(self._ctx, file_info,
                                                    raw_data)
                yield from converter.convert(args)

    def start(
            self,
            stream_handler: Optional[Callable[[OutputT], None]] = None) -> None:
        """Consumes the stream, invoking stream_handler on every output."""
        # Iterating self drives __iter__, which drives _fetch.
        for output in self:
            if stream_handler:
                stream_handler(output)
562+
563+
@dataclass
class BufferedJsonConverterOutput:
    """Output with the JSON object"""
    # Parsed JSON value for one buffered line of the export stream,
    # as produced by json.loads in _BufferedJsonConverter.convert.
    json: Any
568+
569+
class _BufferedJsonConverter(Converter[BufferedJsonConverterOutput]):
    """Deserializes buffered raw JSON text into converter outputs."""

    def convert(
            self, input_args: Converter.ConverterInputArgs
    ) -> Iterator[BufferedJsonConverterOutput]:
        # Each input is a single complete JSON document; parse it and wrap
        # the decoded value in the output dataclass.
        parsed = json.loads(input_args.raw_data)
        yield BufferedJsonConverterOutput(json=parsed)
578+
579+
class _BufferedGCSFileReader(_Reader):
    """Reads data from multiple GCS files and buffers them to disk."""

    def __init__(self):
        super().__init__()
        # Must be set via set_retrieval_strategy() before read() is called.
        self._retrieval_strategy = None

    def set_retrieval_strategy(self, strategy: FileRetrieverStrategy) -> None:
        """Sets the retrieval strategy."""
        self._retrieval_strategy = strategy

    def read(self) -> Iterator[Tuple[_MetadataFileInfo, str]]:
        """Downloads all chunks into a disk buffer, then yields the buffer
        back line by line as (metadata, line) pairs.

        Raises:
            ValueError: If no retrieval strategy has been set.
        """
        if not self._retrieval_strategy:
            raise ValueError("retrieval strategy not set")
        # create a buffer; delete=False so it can be reopened for reading below
        with tempfile.NamedTemporaryFile(mode='w+', delete=False) as temp_file:
            result = self._retrieval_strategy.get_next_chunk()
            while result:
                _, raw_data = result
                # there is something wrong with the way the offsets are being
                # calculated, so just write all of the chunks as-is to the file
                # in arrival order (like what is in GCS) and do not rely on
                # offsets for file location
                temp_file.write(raw_data)
                result = self._retrieval_strategy.get_next_chunk()
        try:
            # read buffer back, synthesizing per-line metadata
            with open(temp_file.name, 'r') as temp_file_reopened:
                for idx, line in enumerate(temp_file_reopened):
                    yield _MetadataFileInfo(offsets=Range(start=0,
                                                          end=len(line) - 1),
                                            lines=Range(start=idx, end=idx + 1),
                                            file=temp_file.name), line
        finally:
            # manually delete the buffer even if the consumer abandons the
            # generator or reading raises — delete=False means nothing else
            # will clean it up
            os.unlink(temp_file.name)
615+
616+
523617class ExportTask :
524618 """
525619 An adapter class for working with task objects, providing extended functionality
@@ -645,12 +739,11 @@ def errors(self):
645739 self ._task .client , self ._task .uid , StreamType .ERRORS )
646740 if metadata_header is None :
647741 return None
648- Stream (
649- _TaskContext (self ._task .client , self ._task .uid , StreamType .ERRORS ,
650- metadata_header ),
651- _BufferedGCSFileReader (),
652- _BufferedJsonConverter (),
653- ).start (stream_handler = lambda output : data .append (output .json ))
742+ BufferedStream (
743+ _TaskContext (
744+ self ._task .client , self ._task .uid , StreamType .ERRORS ,
745+ metadata_header ),).start (
746+ stream_handler = lambda output : data .append (output .json ))
654747 return data
655748
656749 @property
@@ -667,12 +760,11 @@ def result(self):
667760 self ._task .client , self ._task .uid , StreamType .RESULT )
668761 if metadata_header is None :
669762 return []
670- Stream (
671- _TaskContext (self ._task .client , self ._task .uid ,
672- StreamType .RESULT , metadata_header ),
673- _BufferedGCSFileReader (),
674- _BufferedJsonConverter (),
675- ).start (stream_handler = lambda output : data .append (output .json ))
763+ BufferedStream (
764+ _TaskContext (
765+ self ._task .client , self ._task .uid , StreamType .RESULT ,
766+ metadata_header ),).start (
767+ stream_handler = lambda output : data .append (output .json ))
676768 return data
677769 return self ._task .result_url
678770
@@ -747,6 +839,38 @@ def has_errors(self) -> bool:
747839 total_size = self .get_total_file_size (StreamType .ERRORS )
748840 return total_size is not None and total_size > 0
749841
    def get_buffered_stream(
        self,
        stream_type: StreamType = StreamType.RESULT,
    ) -> BufferedStream:
        """
        Returns a buffered stream over the task's data for the given stream type.

        Args:
            stream_type (StreamType, optional): The type of stream to retrieve. Defaults to StreamType.RESULT.

        Returns:
            BufferedStream: The buffered stream object.

        Raises:
            ExportTask.ExportTaskException: If the task has failed or is not ready yet.
            ValueError: If the task does not have the specified stream type.
        """
        if self._task.status == "FAILED":
            raise ExportTask.ExportTaskException("Task failed")
        if self._task.status != "COMPLETE":
            raise ExportTask.ExportTaskException("Task is not ready yet")

        metadata_header = self._get_metadata_header(self._task.client,
                                                    self._task.uid, stream_type)
        if metadata_header is None:
            raise ValueError(
                f"Task {self._task.uid} does not have a {stream_type.value} stream"
            )
        return BufferedStream(
            _TaskContext(self._task.client, self._task.uid, stream_type,
                         metadata_header),)
873+
750874 @overload
751875 def get_stream (
752876 self ,
@@ -768,6 +892,9 @@ def get_stream(
768892 converter : Optional [Converter ] = None ,
769893 stream_type : StreamType = StreamType .RESULT ,
770894 ) -> Stream :
895+ warnings .warn (
896+ "get_stream is deprecated and will be removed in a future release, use get_buffered_stream"
897+ )
771898 if converter is None :
772899 converter = JsonConverter ()
773900 """Returns the result of the task."""
0 commit comments