Skip to content

Commit 9a27198

Browse files
committed
pure storage, fixed compile issues
1 parent 5c6907a commit 9a27198

File tree

5 files changed

+34
-28
lines changed

5 files changed

+34
-28
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ impl ArrowReader {
327327
data_file_path: &str,
328328
file_io: FileIO,
329329
should_load_page_index: bool,
330-
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
330+
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
331331
// Get the metadata for the Parquet file we need to read and build
332332
// a reader for the data within
333333
let parquet_file = file_io.new_input(data_file_path)?;
@@ -1312,18 +1312,18 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
13121312
}
13131313

13141314
/// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
1315-
pub struct ArrowFileReader<R: FileRead> {
1315+
pub struct ArrowFileReader {
13161316
meta: FileMetadata,
13171317
preload_column_index: bool,
13181318
preload_offset_index: bool,
13191319
preload_page_index: bool,
13201320
metadata_size_hint: Option<usize>,
1321-
r: R,
1321+
r: Box<dyn FileRead>,
13221322
}
13231323

1324-
impl<R: FileRead> ArrowFileReader<R> {
1324+
impl ArrowFileReader {
13251325
/// Create a new ArrowFileReader
1326-
pub fn new(meta: FileMetadata, r: R) -> Self {
1326+
pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
13271327
Self {
13281328
meta,
13291329
preload_column_index: false,
@@ -1362,7 +1362,7 @@ impl<R: FileRead> ArrowFileReader<R> {
13621362
}
13631363
}
13641364

1365-
impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
1365+
impl AsyncFileReader for ArrowFileReader {
13661366
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
13671367
Box::pin(
13681368
self.r

crates/iceberg/src/io/file_io.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use url::Url;
2727
use super::storage::OpenDALStorage;
2828
use crate::{Error, ErrorKind, Result};
2929

30+
/// todo doc
3031
#[async_trait]
3132
pub trait Storage: Debug + Send + Sync {
3233
/// Check if a file exists at the given path
@@ -326,8 +327,10 @@ impl FileRead for opendal::Reader {
326327
/// Input file is used for reading from files.
327328
#[derive(Debug)]
328329
pub struct InputFile {
330+
/// todo doc
329331
pub storage: Arc<dyn Storage>,
330332
// Absolution path of file.
333+
/// todo doc
331334
pub path: String,
332335
// todo should remove this? Should always pass down a full path
333336
// // Relative path of file to uri, starts at [`relative_path_pos`]
@@ -353,7 +356,7 @@ impl InputFile {
353356
/// Read and returns whole content of file.
354357
///
355358
/// For continuous reading, use [`Self::reader`] instead.
356-
pub async fn read(&self) -> crate::Result<Bytes> {
359+
pub async fn read(&self) -> Result<Bytes> {
357360
self
358361
.storage
359362
.read(&self.path)
@@ -363,7 +366,7 @@ impl InputFile {
363366
/// Creates [`FileRead`] for continuous reading.
364367
///
365368
/// For one-time reading, use [`Self::read`] instead.
366-
pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
369+
pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
367370
self.storage.reader(&self.path).await
368371
}
369372
}
@@ -374,17 +377,17 @@ impl InputFile {
374377
///
375378
/// It's possible for us to remove the async_trait, but we need to figure
376379
/// out how to handle the object safety.
377-
#[async_trait::async_trait]
378-
pub trait FileWrite: Send + Unpin + 'static {
380+
#[async_trait]
381+
pub trait FileWrite: Send + Sync + Unpin + 'static {
379382
/// Write bytes to file.
380383
///
381384
/// TODO: we can support writing non-contiguous bytes in the future.
382-
async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
385+
async fn write(&mut self, bs: Bytes) -> Result<()>;
383386

384387
/// Close file.
385388
///
386389
/// Calling close on closed file will generate an error.
387-
async fn close(&mut self) -> crate::Result<()>;
390+
async fn close(&mut self) -> Result<()>;
388391
}
389392

390393
#[async_trait::async_trait]
@@ -413,8 +416,10 @@ impl FileWrite for Box<dyn FileWrite> {
413416
/// Output file is used for writing to files..
414417
#[derive(Debug)]
415418
pub struct OutputFile {
419+
/// todo fix pub qualifier
416420
pub storage: Arc<dyn Storage>,
417421
// Absolution path of file.
422+
/// todo fix pub qualifier
418423
pub path: String,
419424
// todo should always pass down a full path
420425
// // Relative path of file to uri, starts at [`relative_path_pos`]

crates/iceberg/src/io/storage.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::Result;
1819
use std::sync::Arc;
20+
use async_trait::async_trait;
1921
use bytes::Bytes;
2022
use opendal::layers::RetryLayer;
2123
#[cfg(feature = "storage-azdls")]
@@ -67,14 +69,14 @@ pub(crate) enum OpenDALStorage {
6769
},
6870
}
6971

70-
#[async_trait::async_trait]
72+
#[async_trait]
7173
impl Storage for OpenDALStorage {
72-
async fn exists(&self, path: &str) -> crate::Result<bool> {
74+
async fn exists(&self, path: &str) -> Result<bool> {
7375
let (op, relative_path) = self.create_operator(&path)?;
7476
Ok(op.exists(relative_path).await?)
7577
}
7678

77-
async fn metadata(&self, path: &str) -> crate::Result<FileMetadata> {
79+
async fn metadata(&self, path: &str) -> Result<FileMetadata> {
7880
let (op, relative_path) = self.create_operator(&path)?;
7981
let meta = op.stat(relative_path).await?;
8082

@@ -83,35 +85,35 @@ impl Storage for OpenDALStorage {
8385
})
8486
}
8587

86-
async fn read(&self, path: &str) -> crate::Result<Bytes> {
88+
async fn read(&self, path: &str) -> Result<Bytes> {
8789
let (op, relative_path) = self.create_operator(&path)?;
8890
Ok(op.read(relative_path).await?.to_bytes())
8991
}
9092

91-
async fn reader(&self, path: &str) -> crate::Result<Box<dyn FileRead>> {
93+
async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
9294
let (op, relative_path) = self.create_operator(&path)?;
9395
Ok(Box::new(op.reader(relative_path).await?))
9496
}
9597

96-
async fn write(&self, path: &str, bs: Bytes) -> crate::Result<()> {
98+
async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
9799
let mut writer = self.writer(path).await?;
98100
writer.write(bs).await?;
99101
writer.close().await
100102
}
101103

102-
async fn writer(&self, path: &str) -> crate::Result<Box<dyn FileWrite>> {
104+
async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
103105
let (op, relative_path) = self.create_operator(&path)?;
104106
Ok(Box::new(
105107
op.writer(relative_path).await?,
106108
))
107109
}
108110

109-
async fn delete(&self, path: &str) -> crate::Result<()> {
111+
async fn delete(&self, path: &str) -> Result<()> {
110112
let (op, relative_path) = self.create_operator(&path)?;
111113
Ok(op.delete(relative_path).await?)
112114
}
113115

114-
async fn remove_dir_all(&self, path: &str) -> crate::Result<()> {
116+
async fn remove_dir_all(&self, path: &str) -> Result<()> {
115117
let (op, relative_path) = self.create_operator(&path)?;
116118
let path = if relative_path.ends_with('/') {
117119
relative_path.to_string()
@@ -121,7 +123,7 @@ impl Storage for OpenDALStorage {
121123
Ok(op.remove_all(&path).await?)
122124
}
123125

124-
fn new_input(&self, path: &str) -> crate::Result<InputFile> {
126+
fn new_input(&self, path: &str) -> Result<InputFile> {
125127
let storage = Arc::new(self.clone());
126128
let path = path.to_string();
127129
Ok(InputFile {
@@ -130,7 +132,7 @@ impl Storage for OpenDALStorage {
130132
})
131133
}
132134

133-
fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
135+
fn new_output(&self, path: &str) -> Result<OutputFile> {
134136
let storage = Arc::new(self.clone());
135137
let path = path.to_string();
136138
Ok(OutputFile {
@@ -142,7 +144,7 @@ impl Storage for OpenDALStorage {
142144

143145
impl OpenDALStorage {
144146
/// Convert iceberg config to opendal config.
145-
pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
147+
pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
146148
let (scheme_str, props, extensions) = file_io_builder.into_parts();
147149
let scheme = Self::parse_scheme(&scheme_str)?;
148150

crates/iceberg/src/puffin/metadata.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use std::collections::{HashMap, HashSet};
19-
2019
use bytes::Bytes;
2120
use serde::{Deserialize, Serialize};
2221

@@ -288,9 +287,9 @@ impl FileMetadata {
288287

289288
let input_file_length = input_file.metadata().await?.size;
290289
let footer_payload_length =
291-
FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
290+
FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
292291
let footer_bytes =
293-
FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
292+
FileMetadata::read_footer_bytes(file_read.as_ref(), input_file_length, footer_payload_length)
294293
.await?;
295294

296295
let magic_length = FileMetadata::MAGIC_LENGTH as usize;

crates/iceberg/src/puffin/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use tokio::sync::OnceCell;
1919

2020
use crate::Result;
21-
use crate::io::{FileRead, InputFile};
21+
use crate::io::InputFile;
2222
use crate::puffin::blob::Blob;
2323
use crate::puffin::metadata::{BlobMetadata, FileMetadata};
2424

0 commit comments

Comments
 (0)