Skip to content

Commit 5c6907a

Browse files
committed
Concentrated Storage trait, need to fix a lot
1 parent fa07ec6 commit 5c6907a

File tree

2 files changed

+154
-74
lines changed

2 files changed

+154
-74
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 70 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,49 @@
1717

1818
use std::any::{Any, TypeId};
1919
use std::collections::HashMap;
20+
use std::fmt::Debug;
2021
use std::ops::Range;
2122
use std::sync::Arc;
22-
23+
use async_trait::async_trait;
2324
use bytes::Bytes;
24-
use opendal::Operator;
2525
use url::Url;
2626

27-
use super::storage::Storage;
27+
use super::storage::OpenDALStorage;
2828
use crate::{Error, ErrorKind, Result};
2929

30+
#[async_trait]
31+
pub trait Storage: Debug + Send + Sync {
32+
/// Check if a file exists at the given path
33+
async fn exists(&self, path: &str) -> Result<bool>;
34+
35+
/// Get metadata from an input path
36+
async fn metadata(&self, path: &str) -> Result<FileMetadata>;
37+
38+
/// Read bytes from a path
39+
async fn read(&self, path: &str) -> Result<Bytes>;
40+
41+
/// Get FileRead from a path
42+
async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
43+
44+
/// Write bytes to an output path
45+
async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
46+
47+
/// Get FileWrite from a path
48+
async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
49+
50+
/// Delete a file at the given path
51+
async fn delete(&self, path: &str) -> Result<()>;
52+
53+
/// Remove a directory and all its contents recursively
54+
async fn remove_dir_all(&self, path: &str) -> Result<()>;
55+
56+
/// Create a new input file for reading
57+
fn new_input(&self, path: &str) -> Result<InputFile>;
58+
59+
/// Create a new output file for writing
60+
fn new_output(&self, path: &str) -> Result<OutputFile>;
61+
}
62+
3063
/// FileIO implementation, used to manipulate files in underlying storage.
3164
///
3265
/// # Note
@@ -48,7 +81,7 @@ use crate::{Error, ErrorKind, Result};
4881
pub struct FileIO {
4982
builder: FileIOBuilder,
5083

51-
inner: Arc<Storage>,
84+
inner: Arc<dyn Storage>,
5285
}
5386

5487
impl FileIO {
@@ -89,8 +122,9 @@ impl FileIO {
89122
///
90123
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
91124
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
92-
let (op, relative_path) = self.inner.create_operator(&path)?;
93-
Ok(op.delete(relative_path).await?)
125+
// let (op, relative_path) = self.inner.create_operator(&path)?;
126+
// Ok(op.delete(relative_path).await?)
127+
self.inner.delete(path.as_ref()).await
94128
}
95129

96130
/// Remove the path and all nested dirs and files recursively.
@@ -100,8 +134,8 @@ impl FileIO {
100134
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
101135
#[deprecated(note = "use remove_dir_all instead", since = "0.4.0")]
102136
pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
103-
let (op, relative_path) = self.inner.create_operator(&path)?;
104-
Ok(op.remove_all(relative_path).await?)
137+
// todo this should be removed as it doesn't exist in the new trait
138+
self.inner.remove_dir_all(path.as_ref()).await
105139
}
106140

107141
/// Remove the path and all nested dirs and files recursively.
@@ -116,13 +150,7 @@ impl FileIO {
116150
/// - If the path is an empty directory, this function will remove the directory itself.
117151
/// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
118152
pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
119-
let (op, relative_path) = self.inner.create_operator(&path)?;
120-
let path = if relative_path.ends_with('/') {
121-
relative_path.to_string()
122-
} else {
123-
format!("{relative_path}/")
124-
};
125-
Ok(op.remove_all(&path).await?)
153+
self.inner.remove_dir_all(path.as_ref()).await
126154
}
127155

128156
/// Check file exists.
@@ -131,8 +159,7 @@ impl FileIO {
131159
///
132160
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
133161
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
134-
let (op, relative_path) = self.inner.create_operator(&path)?;
135-
Ok(op.exists(relative_path).await?)
162+
self.inner.exists(path.as_ref()).await
136163
}
137164

138165
/// Creates input file.
@@ -141,14 +168,7 @@ impl FileIO {
141168
///
142169
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
143170
pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
144-
let (op, relative_path) = self.inner.create_operator(&path)?;
145-
let path = path.as_ref().to_string();
146-
let relative_path_pos = path.len() - relative_path.len();
147-
Ok(InputFile {
148-
op,
149-
path,
150-
relative_path_pos,
151-
})
171+
self.inner.new_input(path.as_ref())
152172
}
153173

154174
/// Creates output file.
@@ -157,14 +177,7 @@ impl FileIO {
157177
///
158178
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
159179
pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
160-
let (op, relative_path) = self.inner.create_operator(&path)?;
161-
let path = path.as_ref().to_string();
162-
let relative_path_pos = path.len() - relative_path.len();
163-
Ok(OutputFile {
164-
op,
165-
path,
166-
relative_path_pos,
167-
})
180+
self.inner.new_output(path.as_ref())
168181
}
169182
}
170183

@@ -273,7 +286,7 @@ impl FileIOBuilder {
273286

274287
/// Builds [`FileIO`].
275288
pub fn build(self) -> Result<FileIO> {
276-
let storage = Storage::build(self.clone())?;
289+
let storage = OpenDALStorage::build(self.clone())?;
277290
Ok(FileIO {
278291
builder: self,
279292
inner: Arc::new(storage),
@@ -313,11 +326,12 @@ impl FileRead for opendal::Reader {
313326
/// Input file is used for reading from files.
314327
#[derive(Debug)]
315328
pub struct InputFile {
316-
op: Operator,
329+
pub storage: Arc<dyn Storage>,
317330
// Absolute path of file.
318-
path: String,
319-
// Relative path of file to uri, starts at [`relative_path_pos`]
320-
relative_path_pos: usize,
331+
pub path: String,
332+
// todo should remove this? Should always pass down a full path
333+
// // Relative path of file to uri, starts at [`relative_path_pos`]
334+
// relative_path_pos: usize,
321335
}
322336

323337
impl InputFile {
@@ -328,34 +342,29 @@ impl InputFile {
328342

329343
/// Check if file exists.
330344
pub async fn exists(&self) -> crate::Result<bool> {
331-
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
345+
self.storage.exists(&self.path).await
332346
}
333347

334348
/// Fetch and returns metadata of file.
335349
pub async fn metadata(&self) -> crate::Result<FileMetadata> {
336-
let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
337-
338-
Ok(FileMetadata {
339-
size: meta.content_length(),
340-
})
350+
self.storage.metadata(&self.path).await
341351
}
342352

343353
/// Read and returns whole content of file.
344354
///
345355
/// For continuous reading, use [`Self::reader`] instead.
346356
pub async fn read(&self) -> crate::Result<Bytes> {
347-
Ok(self
348-
.op
349-
.read(&self.path[self.relative_path_pos..])
350-
.await?
351-
.to_bytes())
357+
self
358+
.storage
359+
.read(&self.path)
360+
.await
352361
}
353362

354363
/// Creates [`FileRead`] for continuous reading.
355364
///
356365
/// For one-time reading, use [`Self::read`] instead.
357-
pub async fn reader(&self) -> crate::Result<impl FileRead + use<>> {
358-
Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
366+
pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
367+
self.storage.reader(&self.path).await
359368
}
360369
}
361370

@@ -404,11 +413,12 @@ impl FileWrite for Box<dyn FileWrite> {
404413
/// Output file is used for writing to files.
405414
#[derive(Debug)]
406415
pub struct OutputFile {
407-
op: Operator,
416+
pub storage: Arc<dyn Storage>,
408417
// Absolute path of file.
409-
path: String,
410-
// Relative path of file to uri, starts at [`relative_path_pos`]
411-
relative_path_pos: usize,
418+
pub path: String,
419+
// todo should always pass down a full path
420+
// // Relative path of file to uri, starts at [`relative_path_pos`]
421+
// relative_path_pos: usize,
412422
}
413423

414424
impl OutputFile {
@@ -419,22 +429,21 @@ impl OutputFile {
419429

420430
/// Checks if file exists.
421431
pub async fn exists(&self) -> Result<bool> {
422-
Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
432+
Ok(self.storage.exists(&self.path).await?)
423433
}
424434

425435
/// Deletes file.
426436
///
427437
/// If the file does not exist, it will not return error.
428438
pub async fn delete(&self) -> Result<()> {
429-
Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
439+
Ok(self.storage.delete(&self.path).await?)
430440
}
431441

432442
/// Converts into [`InputFile`].
433443
pub fn to_input_file(self) -> InputFile {
434444
InputFile {
435-
op: self.op,
445+
storage: self.storage,
436446
path: self.path,
437-
relative_path_pos: self.relative_path_pos,
438447
}
439448
}
440449

@@ -445,9 +454,7 @@ impl OutputFile {
445454
/// Calling `write` will overwrite the file if it exists.
446455
/// For continuous writing, use [`Self::writer`].
447456
pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
448-
let mut writer = self.writer().await?;
449-
writer.write(bs).await?;
450-
writer.close().await
457+
self.storage.write(self.path.as_str(), bs).await
451458
}
452459

453460
/// Creates output file for continuous writing.
@@ -457,7 +464,7 @@ impl OutputFile {
457464
/// For one-time writing, use [`Self::write`] instead.
458465
pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
459466
Ok(Box::new(
460-
self.op.writer(&self.path[self.relative_path_pos..]).await?,
467+
self.storage.writer(&self.path).await?
461468
))
462469
}
463470
}

0 commit comments

Comments
 (0)