111111//!
112112//! [`BufReader`]: buffered::BufReader
113113//! [`BufWriter`]: buffered::BufWriter
114+ //! [`Read`]: std::io::Read
115+ //! [`Seek`]: std::io::Seek
114116//!
115117//! # Adapters
116118//!
238240//! assert_eq!(object.len() as u64, meta.size);
239241//!
240242//! // Alternatively stream the bytes from object storage
241- //! let stream = object_store.get(&path).await.unwrap().into_stream() ;
243+ //! let stream = object_store.get(&path).await.unwrap().payload ;
242244//!
243245//! // Count the '0's using `try_fold` from `TryStreamExt` trait
244246//! let num_zeros = stream
@@ -575,8 +577,6 @@ use bytes::Bytes;
575577use chrono:: { DateTime , Utc } ;
576578use futures:: { stream:: BoxStream , StreamExt , TryStreamExt } ;
577579use std:: fmt:: { Debug , Formatter } ;
578- #[ cfg( all( feature = "fs" , not( target_arch = "wasm32" ) ) ) ]
579- use std:: io:: { Read , Seek , SeekFrom } ;
580580use std:: ops:: Range ;
581581use std:: sync:: Arc ;
582582
@@ -1035,10 +1035,9 @@ impl GetOptions {
10351035}
10361036
10371037/// Result for a get request
1038- #[ derive( Debug ) ]
10391038pub struct GetResult {
1040- /// The [`GetResultPayload`]
1041- pub payload : GetResultPayload ,
1039+ /// The payload.
1040+ pub payload : BoxStream < ' static , Result < Bytes > > ,
10421041 /// The [`ObjectMeta`] for this object
10431042 pub meta : ObjectMeta ,
10441043 /// The range of bytes returned by this request
@@ -1049,82 +1048,29 @@ pub struct GetResult {
10491048 pub attributes : Attributes ,
10501049}
10511050
1052- /// The kind of a [`GetResult`]
1053- ///
1054- /// This special cases the case of a local file, as some systems may
1055- /// be able to optimise the case of a file already present on local disk
1056- pub enum GetResultPayload {
1057- /// The file, path
1058- #[ cfg( all( feature = "fs" , not( target_arch = "wasm32" ) ) ) ]
1059- File ( std:: fs:: File , std:: path:: PathBuf ) ,
1060- /// An opaque stream of bytes
1061- Stream ( BoxStream < ' static , Result < Bytes > > ) ,
1062- }
1063-
1064- impl Debug for GetResultPayload {
1051+ impl std:: fmt:: Debug for GetResult {
10651052 fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: fmt:: Result {
1066- match self {
1067- #[ cfg( all( feature = "fs" , not( target_arch = "wasm32" ) ) ) ]
1068- Self :: File ( _, _) => write ! ( f, "GetResultPayload(File)" ) ,
1069- Self :: Stream ( _) => write ! ( f, "GetResultPayload(Stream)" ) ,
1070- }
1053+ let Self {
1054+ payload : _,
1055+ meta,
1056+ range,
1057+ attributes,
1058+ } = self ;
1059+
1060+ f. debug_struct ( "GetResult" )
1061+ . field ( "payload" , & "<STREAM>" )
1062+ . field ( "meta" , meta)
1063+ . field ( "range" , range)
1064+ . field ( "attributes" , attributes)
1065+ . finish ( )
10711066 }
10721067}
10731068
10741069impl GetResult {
10751070 /// Collects the data into a [`Bytes`]
10761071 pub async fn bytes ( self ) -> Result < Bytes > {
10771072 let len = self . range . end - self . range . start ;
1078- match self . payload {
1079- #[ cfg( all( feature = "fs" , not( target_arch = "wasm32" ) ) ) ]
1080- GetResultPayload :: File ( mut file, path) => {
1081- maybe_spawn_blocking ( move || {
1082- file. seek ( SeekFrom :: Start ( self . range . start as _ ) )
1083- . map_err ( |source| local:: Error :: Seek {
1084- source,
1085- path : path. clone ( ) ,
1086- } ) ?;
1087-
1088- let mut buffer = if let Ok ( len) = len. try_into ( ) {
1089- Vec :: with_capacity ( len)
1090- } else {
1091- Vec :: new ( )
1092- } ;
1093- file. take ( len as _ )
1094- . read_to_end ( & mut buffer)
1095- . map_err ( |source| local:: Error :: UnableToReadBytes { source, path } ) ?;
1096-
1097- Ok ( buffer. into ( ) )
1098- } )
1099- . await
1100- }
1101- GetResultPayload :: Stream ( s) => collect_bytes ( s, Some ( len) ) . await ,
1102- }
1103- }
1104-
1105- /// Converts this into a byte stream
1106- ///
1107- /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1108- /// otherwise will return the [`GetResultPayload::Stream`].
1109- ///
1110- /// # Tokio Compatibility
1111- ///
1112- /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1113- /// no major operating systems have stable async file APIs. Therefore if called from
1114- /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1115- /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1116- ///
1117- /// If not called from a tokio context, this will perform IO on the current thread with
1118- /// no additional complexity or overheads
1119- pub fn into_stream ( self ) -> BoxStream < ' static , Result < Bytes > > {
1120- match self . payload {
1121- #[ cfg( all( feature = "fs" , not( target_arch = "wasm32" ) ) ) ]
1122- GetResultPayload :: File ( file, path) => {
1123- const CHUNK_SIZE : usize = 8 * 1024 ;
1124- local:: chunked_stream ( file, path, self . range , CHUNK_SIZE )
1125- }
1126- GetResultPayload :: Stream ( s) => s,
1127- }
1073+ collect_bytes ( self . payload , Some ( len) ) . await
11281074 }
11291075}
11301076
0 commit comments