@@ -8,8 +8,8 @@ use std::sync::Arc;
88use byteorder:: { BigEndian , LittleEndian , ReadBytesExt } ;
99use bytes:: buf:: Reader ;
1010use bytes:: { Buf , Bytes } ;
11- use futures:: future:: { BoxFuture , FutureExt , TryFutureExt } ;
12- use object_store :: ObjectStore ;
11+ use futures:: future:: { BoxFuture , FutureExt } ;
12+ use futures :: TryFutureExt ;
1313
1414use crate :: error:: { AsyncTiffError , AsyncTiffResult } ;
1515
@@ -67,45 +67,75 @@ impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
6767 }
6868}
6969
70- // #[cfg(feature = "tokio")]
71- // impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Debug + Send + Sync> AsyncFileReader
72- // for T
73- // {
74- // fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
75- // use tokio::io::{AsyncReadExt, AsyncSeekExt};
76-
77- // async move {
78- // self.seek(std::io::SeekFrom::Start(range.start)).await?;
79-
80- // let to_read = (range.end - range.start).try_into().unwrap();
81- // let mut buffer = Vec::with_capacity(to_read);
82- // let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
83- // if read != to_read {
84- // return Err(AsyncTiffError::EndOfFile(to_read, read));
85- // }
86-
87- // Ok(buffer.into())
88- // }
89- // .boxed()
90- // }
91- // }
70+ /// A wrapper for things that implement [AsyncRead] and [AsyncSeek] to also implement
71+ /// [AsyncFileReader].
72+ ///
73+ /// This wrapper is needed because `AsyncRead` and `AsyncSeek` require mutable access to seek and
74+ /// read data, while the `AsyncFileReader` trait requires immutable access to read data.
75+ ///
76+ /// This wrapper stores the inner reader in a `Mutex`.
77+ ///
78+ /// [AsyncRead]: tokio::io::AsyncRead
79+ /// [AsyncSeek]: tokio::io::AsyncSeek
80+ #[ cfg( feature = "tokio" ) ]
81+ #[ derive( Debug ) ]
82+ pub struct TokioReader < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > (
83+ tokio:: sync:: Mutex < T > ,
84+ ) ;
85+
86+ #[ cfg( feature = "tokio" ) ]
87+ impl < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > TokioReader < T > {
88+ /// Create a new TokioReader from a reader.
89+ pub fn new ( inner : T ) -> Self {
90+ Self ( tokio:: sync:: Mutex :: new ( inner) )
91+ }
92+ }
93+
94+ #[ cfg( feature = "tokio" ) ]
95+ impl < T : tokio:: io:: AsyncRead + tokio:: io:: AsyncSeek + Unpin + Send + Debug > AsyncFileReader
96+ for TokioReader < T >
97+ {
98+ fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
99+ use std:: io:: SeekFrom ;
100+ use tokio:: io:: { AsyncReadExt , AsyncSeekExt } ;
101+
102+ async move {
103+ let mut file = self . 0 . lock ( ) . await ;
104+
105+ file. seek ( SeekFrom :: Start ( range. start ) ) . await ?;
106+
107+ let to_read = range. end - range. start ;
108+ let mut buffer = Vec :: with_capacity ( to_read as usize ) ;
109+ let read = file. read ( & mut buffer) . await ? as u64 ;
110+ if read != to_read {
111+ return Err ( AsyncTiffError :: EndOfFile ( to_read, read) ) ;
112+ }
113+
114+ Ok ( buffer. into ( ) )
115+ }
116+ . boxed ( )
117+ }
118+ }
92119
93120/// An AsyncFileReader that reads from an [`ObjectStore`] instance.
121+ #[ cfg( feature = "object_store" ) ]
94122#[ derive( Clone , Debug ) ]
95123pub struct ObjectReader {
96- store : Arc < dyn ObjectStore > ,
124+ store : Arc < dyn object_store :: ObjectStore > ,
97125 path : object_store:: path:: Path ,
98126}
99127
128+ #[ cfg( feature = "object_store" ) ]
100129impl ObjectReader {
101130 /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
102131 ///
103132 /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
104- pub fn new ( store : Arc < dyn ObjectStore > , path : object_store:: path:: Path ) -> Self {
133+ pub fn new ( store : Arc < dyn object_store :: ObjectStore > , path : object_store:: path:: Path ) -> Self {
105134 Self { store, path }
106135 }
107136}
108137
138+ #[ cfg( feature = "object_store" ) ]
109139impl AsyncFileReader for ObjectReader {
110140 fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
111141 let range = range. start as _ ..range. end as _ ;
@@ -134,19 +164,22 @@ impl AsyncFileReader for ObjectReader {
134164}
135165
136166/// An AsyncFileReader that reads from a URL using reqwest.
167+ #[ cfg( feature = "reqwest" ) ]
137168#[ derive( Debug , Clone ) ]
138169pub struct ReqwestReader {
139170 client : reqwest:: Client ,
140171 url : reqwest:: Url ,
141172}
142173
174+ #[ cfg( feature = "reqwest" ) ]
143175impl ReqwestReader {
144176 /// Construct a new ReqwestReader from a reqwest client and URL.
145177 pub fn new ( client : reqwest:: Client , url : reqwest:: Url ) -> Self {
146178 Self { client, url }
147179 }
148180}
149181
182+ #[ cfg( feature = "reqwest" ) ]
150183impl AsyncFileReader for ReqwestReader {
151184 fn get_bytes ( & self , range : Range < u64 > ) -> BoxFuture < ' _ , AsyncTiffResult < Bytes > > {
152185 let url = self . url . clone ( ) ;
0 commit comments