From 5c6907a84a95a563fb057f85376b32360d9fe65a Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 12 Sep 2025 15:28:17 -0700
Subject: [PATCH 01/17] Concentrated Storage trait, need to fix a lot

---
 crates/iceberg/src/io/file_io.rs | 133 ++++++++++++++++---------------
 crates/iceberg/src/io/storage.rs |  95 +++++++++++++++++++---
 2 files changed, 154 insertions(+), 74 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 6e2d152ed7..d2020ad062 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -17,16 +17,49 @@

 use std::any::{Any, TypeId};
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
-
+use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::Operator;
 use url::Url;

-use super::storage::Storage;
+use super::storage::OpenDALStorage;
 use crate::{Error, ErrorKind, Result};

+#[async_trait]
+pub trait Storage: Debug + Send + Sync {
+    /// Check if a file exists at the given path
+    async fn exists(&self, path: &str) -> Result<bool>;
+
+    /// Get metadata from an input path
+    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
+
+    /// Read bytes from a path
+    async fn read(&self, path: &str) -> Result<Bytes>;
+
+    /// Get FileRead from a path
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
+
+    /// Write bytes to an output path
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
+
+    /// Get FileWrite from a path
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
+
+    /// Delete a file at the given path
+    async fn delete(&self, path: &str) -> Result<()>;
+
+    /// Remove a directory and all its contents recursively
+    async fn remove_dir_all(&self, path: &str) -> Result<()>;
+
+    /// Create a new input file for reading
+    fn new_input(&self, path: &str) -> Result<InputFile>;
+
+    /// Create a new output file for writing
+    fn new_output(&self, path: &str) -> Result<OutputFile>;
+}
+
 /// FileIO implementation, used to manipulate files in underlying storage.
 ///
 /// # Note
@@ -48,7 +81,7 @@ use crate::{Error, ErrorKind, Result};
 pub struct FileIO {
     builder: FileIOBuilder,

-    inner: Arc<Storage>,
+    inner: Arc<dyn Storage>,
 }

 impl FileIO {
@@ -89,8 +122,9 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.delete(relative_path).await?)
+        // let (op, relative_path) = self.inner.create_operator(&path)?;
+        // Ok(op.delete(relative_path).await?)
+        self.inner.delete(path.as_ref()).await
     }

     /// Remove the path and all nested dirs and files recursively.
@@ -100,8 +134,8 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     #[deprecated(note = "use remove_dir_all instead", since = "0.4.0")]
     pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.remove_all(relative_path).await?)
+        // todo this should be removed as it doesn't exist in the new trait
+        self.inner.remove_dir_all(path.as_ref()).await
     }

     /// Remove the path and all nested dirs and files recursively.
@@ -116,13 +150,7 @@ impl FileIO {
     ///
     /// # Arguments
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     ///
     /// # Behavior
     ///
     /// - If the path is a file or not exist, this function will be no-op.
     /// - If the path is an empty directory, this function will remove the directory itself.
     /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories.
     pub async fn remove_dir_all(&self, path: impl AsRef<str>) -> Result<()> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = if relative_path.ends_with('/') {
-            relative_path.to_string()
-        } else {
-            format!("{relative_path}/")
-        };
-        Ok(op.remove_all(&path).await?)
+        self.inner.remove_dir_all(path.as_ref()).await
     }

     /// Check file exists.
@@ -131,8 +159,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        Ok(op.exists(relative_path).await?)
+        self.inner.exists(path.as_ref()).await
     }

     /// Creates input file.
@@ -141,14 +168,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(InputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_input(path.as_ref())
     }

     /// Creates output file.
@@ -157,14 +177,7 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
-        let (op, relative_path) = self.inner.create_operator(&path)?;
-        let path = path.as_ref().to_string();
-        let relative_path_pos = path.len() - relative_path.len();
-        Ok(OutputFile {
-            op,
-            path,
-            relative_path_pos,
-        })
+        self.inner.new_output(path.as_ref())
     }
 }

@@ -273,7 +286,7 @@ impl FileIOBuilder {

     /// Builds [`FileIO`].
     pub fn build(self) -> Result<FileIO> {
-        let storage = Storage::build(self.clone())?;
+        let storage = OpenDALStorage::build(self.clone())?;
         Ok(FileIO {
             builder: self,
             inner: Arc::new(storage),
@@ -313,11 +326,12 @@ impl FileRead for opendal::Reader {
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
-    op: Operator,
+    pub storage: Arc<dyn Storage>,
     // Absolute path of file.
-    path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
+    pub path: String,
+    // todo should remove this? Should always pass down a full path
+    // // Relative path of file to uri, starts at [`relative_path_pos`]
+    // relative_path_pos: usize,
 }

 impl InputFile {
@@ -328,34 +342,29 @@ impl InputFile {

     /// Check if file exists.
     pub async fn exists(&self) -> crate::Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+        self.storage.exists(&self.path).await
     }

     /// Fetches and returns metadata of file.
     pub async fn metadata(&self) -> crate::Result<FileMetadata> {
-        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
-
-        Ok(FileMetadata {
-            size: meta.content_length(),
-        })
+        self.storage.metadata(&self.path).await
     }

     /// Reads and returns whole content of file.
     ///
     /// For continuous reading, use [`Self::reader`] instead.
     pub async fn read(&self) -> crate::Result<Bytes> {
-        Ok(self
-            .op
-            .read(&self.path[self.relative_path_pos..])
-            .await?
-            .to_bytes())
+        self
            .storage
            .read(&self.path)
            .await
     }

     /// Creates [`FileRead`] for continuous reading.
     ///
     /// For one-time reading, use [`Self::read`] instead.
-    pub async fn reader(&self) -> crate::Result<impl FileRead> {
-        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
+    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
+        self.storage.reader(&self.path).await
     }
 }

@@ -404,11 +413,12 @@ impl FileWrite for Box<dyn FileWrite> {
 /// Output file is used for writing to files.
 #[derive(Debug)]
 pub struct OutputFile {
-    op: Operator,
+    pub storage: Arc<dyn Storage>,
     // Absolute path of file.
-    path: String,
-    // Relative path of file to uri, starts at [`relative_path_pos`]
-    relative_path_pos: usize,
+    pub path: String,
+    // todo should always pass down a full path
+    // // Relative path of file to uri, starts at [`relative_path_pos`]
+    // relative_path_pos: usize,
 }

 impl OutputFile {
@@ -419,22 +429,21 @@ impl OutputFile {

     /// Checks if file exists.
     pub async fn exists(&self) -> Result<bool> {
-        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
+        Ok(self.storage.exists(&self.path).await?)
     }

     /// Deletes file.
     ///
     /// If the file does not exist, it will not return error.
     pub async fn delete(&self) -> Result<()> {
-        Ok(self.op.delete(&self.path[self.relative_path_pos..]).await?)
+        Ok(self.storage.delete(&self.path).await?)
     }

     /// Converts into [`InputFile`].
     pub fn to_input_file(self) -> InputFile {
         InputFile {
-            op: self.op,
+            storage: self.storage,
             path: self.path,
-            relative_path_pos: self.relative_path_pos,
         }
     }

@@ -445,9 +454,7 @@ impl OutputFile {
     /// Calling `write` will overwrite the file if it exists.
     /// For continuous writing, use [`Self::writer`].
     pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
-        let mut writer = self.writer().await?;
-        writer.write(bs).await?;
-        writer.close().await
+        self.storage.write(self.path.as_str(), bs).await
     }

     /// Creates output file for continuous writing.
@@ -457,7 +464,7 @@ impl OutputFile {
     /// For one-time writing, use [`Self::write`] instead.
     pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
         Ok(Box::new(
-            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+            self.storage.writer(&self.path).await?
         ))
     }
 }
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 3de4f10dbf..e0fae92abe 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -16,7 +16,7 @@
 // under the License.

 use std::sync::Arc;
-
+use bytes::Bytes;
 use opendal::layers::RetryLayer;
 #[cfg(feature = "storage-azdls")]
 use opendal::services::AzdlsConfig;
@@ -30,14 +30,14 @@ use opendal::{Operator, Scheme};

 #[cfg(feature = "storage-azdls")]
 use super::AzureStorageScheme;
-use super::FileIOBuilder;
+use super::{FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage};
 #[cfg(feature = "storage-s3")]
 use crate::io::CustomAwsCredentialLoader;
 use crate::{Error, ErrorKind};

 /// The storage carries all supported storage services in iceberg
-#[derive(Debug)]
-pub(crate) enum Storage {
+#[derive(Debug, Clone)]
+pub(crate) enum OpenDALStorage {
     #[cfg(feature = "storage-memory")]
     Memory(Operator),
     #[cfg(feature = "storage-fs")]
@@ -67,7 +67,80 @@ pub(crate) enum Storage {
     },
 }

-impl Storage {
+#[async_trait::async_trait]
+impl Storage for OpenDALStorage {
+    async fn exists(&self, path: &str) -> crate::Result<bool> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> crate::Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        let meta = op.stat(relative_path).await?;
+
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> crate::Result<Bytes> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> crate::Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> crate::Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> crate::Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(Box::new(
+            op.writer(relative_path).await?,
+        ))
+    }
+
+    async fn delete(&self, path: &str) -> crate::Result<()> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> crate::Result<()> {
+        let (op, relative_path) = self.create_operator(&path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+        let storage = Arc::new(self.clone());
+        let path = path.to_string();
+        Ok(InputFile {
+            storage,
+            path,
+        })
+    }
+
+    fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
+        let storage = Arc::new(self.clone());
+        let path = path.to_string();
+        Ok(OutputFile {
+            storage,
+            path,
+        })
+    }
+}
+
+impl OpenDALStorage {
     /// Convert iceberg config to opendal config.
     pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
         let (scheme_str, props, extensions) = file_io_builder.into_parts();
@@ -129,7 +202,7 @@ impl Storage {
         let path = path.as_ref();
         let (operator, relative_path): (Operator, &str) = match self {
             #[cfg(feature = "storage-memory")]
-            Storage::Memory(op) => {
+            OpenDALStorage::Memory(op) => {
                 if let Some(stripped) = path.strip_prefix("memory:/") {
                     Ok::<_, crate::Error>((op.clone(), stripped))
                 } else {
@@ -137,7 +210,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-fs")]
-            Storage::LocalFs => {
+            OpenDALStorage::LocalFs => {
                 let op = super::fs_config_build()?;

                 if let Some(stripped) = path.strip_prefix("file:/") {
@@ -147,7 +220,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-s3")]
-            Storage::S3 {
+            OpenDALStorage::S3 {
                 configured_scheme,
                 config,
                 customized_credential_load,
@@ -167,7 +240,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-gcs")]
-            Storage::Gcs { config } => {
+            OpenDALStorage::Gcs { config } => {
                 let operator = super::gcs_config_build(config, path)?;
                 let prefix = format!("gs://{}/", operator.info().name());
                 if path.starts_with(&prefix) {
@@ -180,7 +253,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-oss")]
-            Storage::Oss { config } => {
+            OpenDALStorage::Oss { config } => {
                 let op = super::oss_config_build(config, path)?;

                 // Check prefix of oss path.
@@ -195,7 +268,7 @@ impl Storage {
                 }
             }
             #[cfg(feature = "storage-azdls")]
-            Storage::Azdls {
+            OpenDALStorage::Azdls {
                 configured_scheme,
                 config,
             } => super::azdls_create_operator(path, config, configured_scheme),

From 9a27198ce8f22efaf10433718a28c0918972d32e Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 17 Sep 2025 16:26:56 -0700
Subject: [PATCH 02/17] pure storage, fixed compile issues

---
 crates/iceberg/src/arrow/reader.rs    | 12 ++++++------
 crates/iceberg/src/io/file_io.rs      | 17 +++++++++++------
 crates/iceberg/src/io/storage.rs      | 26 ++++++++++++++------------
 crates/iceberg/src/puffin/metadata.rs |  5 ++---
 crates/iceberg/src/puffin/reader.rs   |  2 +-
 5 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index ff4cff0a64..077961d493 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -327,7 +327,7 @@ impl ArrowReader {
         data_file_path: &str,
         file_io: FileIO,
         should_load_page_index: bool,
-    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader<impl FileRead + Sized>>> {
+    ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
@@ -1312,18 +1312,18 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
 }

 /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
-pub struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader {
     meta: FileMetadata,
     preload_column_index: bool,
     preload_offset_index: bool,
     preload_page_index: bool,
     metadata_size_hint: Option<usize>,
-    r: R,
+    r: Box<dyn FileRead>,
 }

-impl<R: FileRead> ArrowFileReader<R> {
+impl ArrowFileReader {
     /// Create a new ArrowFileReader
-    pub fn new(meta: FileMetadata, r: R) -> Self {
+    pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
         Self {
             meta,
             preload_column_index: false,
@@ -1362,7 +1362,7 @@ impl<R: FileRead> ArrowFileReader<R> {
     }
 }

-impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+impl AsyncFileReader for ArrowFileReader {
     fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index d2020ad062..d45a52462d 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -27,6 +27,7 @@ use url::Url;
 use super::storage::OpenDALStorage;
 use crate::{Error, ErrorKind, Result};

+/// todo doc
 #[async_trait]
 pub trait Storage: Debug + Send + Sync {
     /// Check if a file exists at the given path
@@ -326,8 +327,10 @@ impl FileRead for opendal::Reader {
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
+    /// todo doc
     pub storage: Arc<dyn Storage>,
     // Absolute path of file.
+    /// todo doc
     pub path: String,
     // todo should remove this? Should always pass down a full path
     // // Relative path of file to uri, starts at [`relative_path_pos`]
@@ -353,7 +356,7 @@ impl InputFile {
     /// Reads and returns whole content of file.
     ///
     /// For continuous reading, use [`Self::reader`] instead.
-    pub async fn read(&self) -> crate::Result<Bytes> {
+    pub async fn read(&self) -> Result<Bytes> {
         self
             .storage
             .read(&self.path)
@@ -363,7 +366,7 @@ impl InputFile {
     /// Creates [`FileRead`] for continuous reading.
     ///
     /// For one-time reading, use [`Self::read`] instead.
-    pub async fn reader(&self) -> crate::Result<Box<dyn FileRead>> {
+    pub async fn reader(&self) -> Result<Box<dyn FileRead>> {
         self.storage.reader(&self.path).await
     }
 }
@@ -374,17 +377,17 @@ impl InputFile {
 ///
 /// It's possible for us to remove the async_trait, but we need to figure
 /// out how to handle the object safety.
-#[async_trait::async_trait]
-pub trait FileWrite: Send + Unpin + 'static {
+#[async_trait]
+pub trait FileWrite: Send + Sync + Unpin + 'static {
     /// Write bytes to file.
     ///
     /// TODO: we can support writing non-contiguous bytes in the future.
-    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
+    async fn write(&mut self, bs: Bytes) -> Result<()>;

     /// Close file.
     ///
     /// Calling close on closed file will generate an error.
-    async fn close(&mut self) -> crate::Result<()>;
+    async fn close(&mut self) -> Result<()>;
 }

 #[async_trait::async_trait]
@@ -413,8 +416,10 @@ impl FileWrite for Box<dyn FileWrite> {
 /// Output file is used for writing to files.
 #[derive(Debug)]
 pub struct OutputFile {
+    /// todo fix pub qualifier
     pub storage: Arc<dyn Storage>,
     // Absolute path of file.
+    /// todo fix pub qualifier
     pub path: String,
     // todo should always pass down a full path
     // // Relative path of file to uri, starts at [`relative_path_pos`]
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index e0fae92abe..0a8274de21 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.

+use crate::Result;
 use std::sync::Arc;
+use async_trait::async_trait;
 use bytes::Bytes;
 use opendal::layers::RetryLayer;
 #[cfg(feature = "storage-azdls")]
@@ -67,14 +69,14 @@ pub(crate) enum OpenDALStorage {
 }

-#[async_trait::async_trait]
+#[async_trait]
 impl Storage for OpenDALStorage {
-    async fn exists(&self, path: &str) -> crate::Result<bool> {
+    async fn exists(&self, path: &str) -> Result<bool> {
         let (op, relative_path) = self.create_operator(&path)?;
         Ok(op.exists(relative_path).await?)
     }

-    async fn metadata(&self, path: &str) -> crate::Result<FileMetadata> {
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
         let (op, relative_path) = self.create_operator(&path)?;
         let meta = op.stat(relative_path).await?;

@@ -83,35 +85,35 @@ impl Storage for OpenDALStorage {
         })
     }

-    async fn read(&self, path: &str) -> crate::Result<Bytes> {
+    async fn read(&self, path: &str) -> Result<Bytes> {
         let (op, relative_path) = self.create_operator(&path)?;
         Ok(op.read(relative_path).await?.to_bytes())
     }

-    async fn reader(&self, path: &str) -> crate::Result<Box<dyn FileRead>> {
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
         let (op, relative_path) = self.create_operator(&path)?;
         Ok(Box::new(op.reader(relative_path).await?))
     }

-    async fn write(&self, path: &str, bs: Bytes) -> crate::Result<()> {
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
         let mut writer = self.writer(path).await?;
         writer.write(bs).await?;
         writer.close().await
     }

-    async fn writer(&self, path: &str) -> crate::Result<Box<dyn FileWrite>> {
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
         let (op, relative_path) = self.create_operator(&path)?;
         Ok(Box::new(
             op.writer(relative_path).await?,
         ))
     }

-    async fn delete(&self, path: &str) -> crate::Result<()> {
+    async fn delete(&self, path: &str) -> Result<()> {
         let (op, relative_path) = self.create_operator(&path)?;
         Ok(op.delete(relative_path).await?)
     }

-    async fn remove_dir_all(&self, path: &str) -> crate::Result<()> {
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
         let (op, relative_path) = self.create_operator(&path)?;
         let path = if relative_path.ends_with('/') {
             relative_path.to_string()
         } else {
             format!("{relative_path}/")
         };
         Ok(op.remove_all(&path).await?)
     }

-    fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+    fn new_input(&self, path: &str) -> Result<InputFile> {
         let storage = Arc::new(self.clone());
         let path = path.to_string();
         Ok(InputFile {
@@ -130,7 +132,7 @@ impl Storage for OpenDALStorage {
         })
     }

-    fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
         let storage = Arc::new(self.clone());
         let path = path.to_string();
         Ok(OutputFile {
@@ -142,7 +144,7 @@ impl Storage for OpenDALStorage {

 impl OpenDALStorage {
     /// Convert iceberg config to opendal config.
-    pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
+    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
         let (scheme_str, props, extensions) = file_io_builder.into_parts();
         let scheme = Self::parse_scheme(&scheme_str)?;

diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index 6039c7f820..e2fea212b7 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -16,7 +16,6 @@
 // under the License.

 use std::collections::{HashMap, HashSet};
-
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};

@@ -288,9 +287,9 @@ impl FileMetadata {
         let input_file_length = input_file.metadata().await?.size;

         let footer_payload_length =
-            FileMetadata::read_footer_payload_length(&file_read, input_file_length).await?;
+            FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
         let footer_bytes =
-            FileMetadata::read_footer_bytes(&file_read, input_file_length, footer_payload_length)
+            FileMetadata::read_footer_bytes(file_read.as_ref(), input_file_length, footer_payload_length)
                 .await?;

         let magic_length = FileMetadata::MAGIC_LENGTH as usize;
diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs
index dce53d93f0..8941d4a8fc 100644
--- a/crates/iceberg/src/puffin/reader.rs
+++ b/crates/iceberg/src/puffin/reader.rs
@@ -18,7 +18,7 @@
 use tokio::sync::OnceCell;

 use crate::Result;
-use crate::io::{FileRead, InputFile};
+use crate::io::InputFile;
 use crate::puffin::blob::Blob;
 use crate::puffin::metadata::{BlobMetadata, FileMetadata};

From 535de901901b65c5c553261a46942e5329a95319 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Wed, 17 Sep 2025 16:56:08 -0700
Subject: [PATCH 03/17] fmt is good

---
 crates/iceberg/src/io/file_io.rs      | 24 ++++++++++--------------
 crates/iceberg/src/io/storage.rs      | 20 ++++++--------------
 crates/iceberg/src/puffin/metadata.rs | 10 +++++++---
 3 files changed, 23 insertions(+), 31 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index d45a52462d..e362e67ea7 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use url::Url;
@@ -32,16 +33,16 @@ use crate::{Error, ErrorKind, Result};
 pub trait Storage: Debug + Send + Sync {
     /// Check if a file exists at the given path
     async fn exists(&self, path: &str) -> Result<bool>;
-    
+
     /// Get metadata from an input path
     async fn metadata(&self, path: &str) -> Result<FileMetadata>;
-    
+
     /// Read bytes from a path
     async fn read(&self, path: &str) -> Result<Bytes>;
-    
+
     /// Get FileRead from a path
     async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
-    
+
     /// Write bytes to an output path
     async fn write(&self, path: &str, bs: Bytes) -> Result<()>;

@@ -319,7 +320,7 @@ pub trait FileRead: Send + Sync + Unpin + 'static {

 #[async_trait::async_trait]
 impl FileRead for opendal::Reader {
-    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
+    async fn read(&self, range: Range<u64>) -> Result<Bytes> {
         Ok(opendal::Reader::read(self, range).await?.to_bytes())
     }
 }
@@ -344,12 +345,12 @@ impl InputFile {
     }

     /// Check if file exists.
-    pub async fn exists(&self) -> crate::Result<bool> {
+    pub async fn exists(&self) -> Result<bool> {
         self.storage.exists(&self.path).await
     }

     /// Fetches and returns metadata of file.
-    pub async fn metadata(&self) -> crate::Result<FileMetadata> {
+    pub async fn metadata(&self) -> Result<FileMetadata> {
         self.storage.metadata(&self.path).await
     }

@@ -357,10 +358,7 @@ impl InputFile {
     /// Reads and returns whole content of file.
     ///
     /// For continuous reading, use [`Self::reader`] instead.
     pub async fn read(&self) -> Result<Bytes> {
-        self
-            .storage
-            .read(&self.path)
-            .await
+        self.storage.read(&self.path).await
     }

     /// Creates [`FileRead`] for continuous reading.
@@ -468,9 +466,7 @@ impl OutputFile {
     ///
     /// For one-time writing, use [`Self::write`] instead.
     pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
-        Ok(Box::new(
-            self.storage.writer(&self.path).await?
-        ))
+        Ok(Box::new(self.storage.writer(&self.path).await?))
     }
 }

diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 0a8274de21..62c6b8a31e 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.

-use crate::Result;
 use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use opendal::layers::RetryLayer;
@@ -35,7 +35,7 @@ use super::AzureStorageScheme;
 use super::{FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage};
 #[cfg(feature = "storage-s3")]
 use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};

 /// The storage carries all supported storage services in iceberg
 #[derive(Debug, Clone)]
@@ -79,7 +79,7 @@ impl Storage for OpenDALStorage {
     async fn metadata(&self, path: &str) -> Result<FileMetadata> {
         let (op, relative_path) = self.create_operator(&path)?;
         let meta = op.stat(relative_path).await?;
-        
+
         Ok(FileMetadata {
             size: meta.content_length(),
         })
@@ -103,9 +103,7 @@ impl Storage for OpenDALStorage {

     async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
         let (op, relative_path) = self.create_operator(&path)?;
-        Ok(Box::new(
-            op.writer(relative_path).await?,
-        ))
+        Ok(Box::new(op.writer(relative_path).await?))
     }

     async fn delete(&self, path: &str) -> Result<()> {
@@ -126,19 +124,13 @@ impl Storage for OpenDALStorage {
     fn new_input(&self, path: &str) -> Result<InputFile> {
         let storage = Arc::new(self.clone());
         let path = path.to_string();
-        Ok(InputFile {
-            storage,
-            path,
-        })
+        Ok(InputFile { storage, path })
     }

     fn new_output(&self, path: &str) -> Result<OutputFile> {
         let storage = Arc::new(self.clone());
         let path = path.to_string();
-        Ok(OutputFile {
-            storage,
-            path,
-        })
+        Ok(OutputFile { storage, path })
     }
 }

diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs
index e2fea212b7..2686db06d6 100644
--- a/crates/iceberg/src/puffin/metadata.rs
+++ b/crates/iceberg/src/puffin/metadata.rs
@@ -16,6 +16,7 @@
 // under the License.

 use std::collections::{HashMap, HashSet};
+
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};

@@ -288,9 +289,12 @@ impl FileMetadata {
         let input_file_length = input_file.metadata().await?.size;

         let footer_payload_length =
             FileMetadata::read_footer_payload_length(file_read.as_ref(), input_file_length).await?;
-        let footer_bytes =
-            FileMetadata::read_footer_bytes(file_read.as_ref(), input_file_length, footer_payload_length)
-                .await?;
+        let footer_bytes = FileMetadata::read_footer_bytes(
+            file_read.as_ref(),
+            input_file_length,
+            footer_payload_length,
+        )
+        .await?;

         let magic_length = FileMetadata::MAGIC_LENGTH as usize;
         // check first four bytes of footer

From 901db63d41a28acc5bcc2b32027834ebfde69630 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 16 Oct 2025 19:07:07 -0700
Subject: [PATCH 04/17] builder, registry, split by schemes

---
 crates/iceberg/src/io/file_io.rs         |  26 ++-
 crates/iceberg/src/io/mod.rs             |   2 +
 crates/iceberg/src/io/storage_azdls.rs   | 124 ++++++++++++-
 crates/iceberg/src/io/storage_builder.rs | 210 +++++++++++++++++++++++
 crates/iceberg/src/io/storage_fs.rs      | 113 +++++++++++-
 crates/iceberg/src/io/storage_gcs.rs     | 117 ++++++++++++-
 crates/iceberg/src/io/storage_memory.rs  | 109 +++++++++++-
 crates/iceberg/src/io/storage_oss.rs     | 114 ++++++++++++
 crates/iceberg/src/io/storage_s3.rs      | 147 +++++++++++++++-
 9 files changed, 951 insertions(+), 11 deletions(-)
 create mode 100644 crates/iceberg/src/io/storage_builder.rs

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index e362e67ea7..4ce81d8f04 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -25,10 +25,9 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use url::Url;

-use super::storage::OpenDALStorage;
 use crate::{Error, ErrorKind, Result};

-/// todo doc
+/// Trait for storage operations in Iceberg
 #[async_trait]
 pub trait Storage: Debug + Send + Sync {
     /// Check if a file exists at the given path
@@ -62,6 +61,16 @@ pub trait Storage: Debug + Send + Sync {
     fn new_output(&self, path: &str) -> Result<OutputFile>;
 }

+/// Common interface for all storage builders.
+pub trait StorageBuilder: Debug + Send + Sync {
+    /// Create a new storage instance with the given properties and extensions.
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>>;
+}
+
 /// FileIO implementation, used to manipulate files in underlying storage.
 ///
 /// # Note
@@ -288,10 +297,19 @@ impl FileIOBuilder {

     /// Builds [`FileIO`].
     pub fn build(self) -> Result<FileIO> {
-        let storage = OpenDALStorage::build(self.clone())?;
+        // Use the scheme to determine the storage type
+        let scheme = self.scheme_str.clone().unwrap_or_default();
+
+        // Create registry and get builder
+        let registry = crate::io::StorageBuilderRegistry::new();
+        let builder = registry.get_builder(scheme.as_str())?;
+
+        // Build storage with props and extensions
+        let storage = builder.build(self.props.clone(), self.extensions.clone())?;
+
         Ok(FileIO {
             builder: self,
-            inner: Arc::new(storage),
+            inner: storage,
         })
     }
 }
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 5eb5964345..5f234961fc 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -68,8 +68,10 @@

 mod file_io;
 mod storage;
+mod storage_builder;

 pub use file_io::*;
+pub use storage_builder::StorageBuilderRegistry;

 pub(crate) mod object_cache;

 #[cfg(feature = "storage-azdls")]
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index fe12167f6f..2afccd5a01 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -18,11 +18,18 @@
 use std::collections::HashMap;
 use std::fmt::Display;
 use std::str::FromStr;
+use std::sync::Arc;

-use opendal::Configurator;
+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::AzdlsConfig;
+use opendal::{Configurator, Operator};
 use url::Url;

+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageBuilder,
+};
 use crate::{Error, ErrorKind, Result, ensure_data_valid};

 /// A connection string.
@@ -125,7 +132,7 @@ pub(crate) fn azdls_create_operator<'a>(
 /// paths are expected to contain the `dfs` storage service.
 /// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
 ///   expected to contain the `blob` storage service.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub(crate) enum AzureStorageScheme {
     Abfs,
     Abfss,
@@ -597,3 +604,116 @@ mod tests {
         }
     }
 }
+
+/// Azure Data Lake Storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALAzdlsStorage {
+    /// Because Azdls accepts multiple possible schemes, we store the full
+    /// passed scheme here to later validate schemes passed via paths.
+    configured_scheme: AzureStorageScheme,
+    config: Arc<AzdlsConfig>,
+}
+
+impl OpenDALAzdlsStorage {
+    /// Creates operator from path.
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
+        let (op, relative_path) = azdls_create_operator(path, &self.config, &self.configured_scheme)?;
+        let op = op.layer(opendal::layers::RetryLayer::new());
+        Ok((op, relative_path))
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALAzdlsStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for Azure Data Lake Storage
+#[derive(Debug)]
+pub struct OpenDALAzdlsStorageBuilder;
+
+impl StorageBuilder for OpenDALAzdlsStorageBuilder {
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        _extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        // Get the scheme string from the props or use default
+        let scheme_str = props
+            .get("scheme_str")
+            .cloned()
+            .unwrap_or_else(|| "abfs".to_string());
+
+        // Parse the scheme
+        let scheme = scheme_str.parse::<AzureStorageScheme>()?;
+
+        // Parse Azdls config from props
+        let config = azdls_config_parse(props)?;
+
+        Ok(Arc::new(OpenDALAzdlsStorage {
+            configured_scheme: scheme,
+            config: Arc::new(config),
+        }))
+    }
+}
diff --git a/crates/iceberg/src/io/storage_builder.rs b/crates/iceberg/src/io/storage_builder.rs
new file mode 100644
index 0000000000..aacd68bb00
--- /dev/null
+++ b/crates/iceberg/src/io/storage_builder.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use crate::io::StorageBuilder;
+#[cfg(feature = "storage-azdls")]
+use crate::io::storage_azdls::OpenDALAzdlsStorageBuilder;
+#[cfg(feature = "storage-fs")]
+use crate::io::storage_fs::OpenDALFsStorageBuilder;
+#[cfg(feature = "storage-gcs")]
+use crate::io::storage_gcs::OpenDALGcsStorageBuilder;
+#[cfg(feature = "storage-memory")]
+use crate::io::storage_memory::OpenDALMemoryStorageBuilder;
+#[cfg(feature = "storage-oss")]
+use crate::io::storage_oss::OpenDALOssStorageBuilder;
+#[cfg(feature = "storage-s3")]
+use crate::io::storage_s3::OpenDALS3StorageBuilder;
+use crate::{Error, ErrorKind, Result};
+
+/// A registry of storage builders.
+///
+/// The registry allows you to register custom storage builders for different URI schemes.
+/// By default, it includes builders for all enabled storage features.
+///
+/// # Example
+///
+/// ```rust
+/// use iceberg::io::StorageBuilderRegistry;
+///
+/// // Create a new registry with default builders
+/// let registry = StorageBuilderRegistry::new();
+///
+/// // Get supported storage types
+/// let types = registry.supported_types();
+/// println!("Supported storage types: {:?}", types);
+///
+/// // Get a builder for a specific scheme
+/// # #[cfg(feature = "storage-memory")]
+/// # {
+/// let builder = registry.get_builder("memory").unwrap();
+/// # }
+/// ```
+///
+/// You can also register custom storage builders:
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::{StorageBuilderRegistry, StorageBuilder};
+///
+/// let mut registry = StorageBuilderRegistry::new();
+///
+/// // Register a custom storage builder
+/// registry.register("custom", Arc::new(MyCustomStorageBuilder));
+/// ```
+#[derive(Debug, Clone)]
+pub struct StorageBuilderRegistry {
+    builders: HashMap<String, Arc<dyn StorageBuilder>>,
+}
+
+impl StorageBuilderRegistry {
+    /// Create a new storage registry with default builders based on enabled features.
+    pub fn new() -> Self {
+        let mut builders: HashMap<String, Arc<dyn StorageBuilder>> = HashMap::new();
+
+        #[cfg(feature = "storage-memory")]
+        {
+            let builder = Arc::new(OpenDALMemoryStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("memory".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-fs")]
+        {
+            let builder = Arc::new(OpenDALFsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("file".to_string(), builder.clone());
+            builders.insert("".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-s3")]
+        {
+            let builder = Arc::new(OpenDALS3StorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("s3".to_string(), builder.clone());
+            builders.insert("s3a".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-gcs")]
+        {
+            let builder = Arc::new(OpenDALGcsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("gs".to_string(), builder.clone());
+            builders.insert("gcs".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-oss")]
+        {
+            let builder = Arc::new(OpenDALOssStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("oss".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-azdls")]
+        {
+            let builder = Arc::new(OpenDALAzdlsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("abfs".to_string(), builder.clone());
+            builders.insert("abfss".to_string(), builder.clone());
+            builders.insert("wasb".to_string(), builder.clone());
+            builders.insert("wasbs".to_string(), builder);
+        }
+
+        Self { builders }
+    }
+
+    /// Register a custom storage builder for a given scheme.
+    pub fn register(&mut self, scheme: impl Into<String>, builder: Arc<dyn StorageBuilder>) {
+        self.builders.insert(scheme.into(), builder);
+    }
+
+    /// Get a storage builder by scheme.
+    pub fn get_builder(&self, scheme: &str) -> Result<Arc<dyn StorageBuilder>> {
+        let key = scheme.trim();
+        self.builders
+            .iter()
+            .find(|(k, _)| k.eq_ignore_ascii_case(key))
+            .map(|(_, builder)| builder.clone())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!(
+                        "Unsupported storage type: {}. Supported types: {}",
+                        scheme,
+                        self.supported_types().join(", ")
+                    ),
+                )
+            })
+    }
+
+    /// Return the list of supported storage types.
+    pub fn supported_types(&self) -> Vec<String> {
+        self.builders.keys().cloned().collect()
+    }
+}
+
+impl Default for StorageBuilderRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_storage_builder_registry_new() {
+        let registry = StorageBuilderRegistry::new();
+        let types = registry.supported_types();
+
+        // At least one storage type should be available
+        assert!(!types.is_empty());
+    }
+
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_storage_builder_registry_get_builder() {
+        let registry = StorageBuilderRegistry::new();
+
+        // Should be able to get memory storage builder
+        let builder = registry.get_builder("memory");
+        assert!(builder.is_ok());
+
+        // Should be case-insensitive
+        let builder = registry.get_builder("MEMORY");
+        assert!(builder.is_ok());
+    }
+
+    #[test]
+    fn test_storage_builder_registry_unsupported_type() {
+        let registry = StorageBuilderRegistry::new();
+
+        // Should return error for unsupported type
+        let result = registry.get_builder("unsupported");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_storage_builder_registry_clone() {
+        let registry = StorageBuilderRegistry::new();
+        let cloned = registry.clone();
+
+        // Both should have the same builders
+        assert_eq!(
+            registry.supported_types().len(),
+            cloned.supported_types().len()
+        );
+    }
+}
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs
index d3e121a085..eb3a59fa7e 100644
--- a/crates/iceberg/src/io/storage_fs.rs
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -15,9 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.

-use opendal::Operator;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::FsConfig;
+use opendal::Operator;

+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageBuilder,
+};
 use crate::Result;

 /// Build new opendal operator from given path.
@@ -27,3 +36,105 @@ pub(crate) fn fs_config_build() -> Result<Operator> {

     Ok(Operator::from_config(cfg)?.finish())
 }
+
+/// Filesystem storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALFsStorage;
+
+impl OpenDALFsStorage {
+    /// Extract relative path from file:// URLs
+    fn extract_relative_path<'a>(&self, path: &'a str) -> &'a str {
+        if let Some(stripped) = path.strip_prefix("file:/") {
+            stripped
+        } else {
+            &path[1..]
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALFsStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let relative_path = self.extract_relative_path(path);
+        let op = fs_config_build()?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for OpenDAL Filesystem storage
+#[derive(Debug)]
+pub struct OpenDALFsStorageBuilder;
+
+impl StorageBuilder for OpenDALFsStorageBuilder {
+    fn build(
+        &self,
+        _props: HashMap<String, String>,
+        _extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        Ok(Arc::new(OpenDALFsStorage))
+    }
+}
diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs
index 8c3d914c86..e0fed11024 100644
--- a/crates/iceberg/src/io/storage_gcs.rs
+++ b/crates/iceberg/src/io/storage_gcs.rs
@@ -17,12 +17,18 @@
 //! Google Cloud Storage properties

 use std::collections::HashMap;
+use std::sync::Arc;

-use opendal::Operator;
+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::GcsConfig;
+use opendal::Operator;
 use url::Url;

-use crate::io::is_truthy;
+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageBuilder, is_truthy,
+};
 use crate::{Error, ErrorKind, Result};

 // Reference: https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -104,3 +110,110 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result<Operator> {
     cfg.bucket = bucket.to_string();
     Ok(Operator::from_config(cfg)?.finish())
 }
+
+/// GCS storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALGcsStorage {
+    config: Arc<GcsConfig>,
+}
+
+impl OpenDALGcsStorage {
+    /// Creates operator from path.
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
+        let operator = gcs_config_build(&self.config, path)?;
+        let prefix = format!("gs://{}/", operator.info().name());
+
+        if path.starts_with(&prefix) {
+            let op = operator.layer(opendal::layers::RetryLayer::new());
+            Ok((op, &path[prefix.len()..]))
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid gcs url: {}, should start with {}", path, prefix),
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALGcsStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for OpenDAL GCS storage
+#[derive(Debug)]
+pub struct OpenDALGcsStorageBuilder;
+
+impl StorageBuilder for OpenDALGcsStorageBuilder {
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        _extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        let cfg = gcs_config_parse(props)?;
+        Ok(Arc::new(OpenDALGcsStorage {
+            config: Arc::new(cfg),
+        }))
+    }
+}
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs
index b8023717b6..2c51d5af37 100644
--- a/crates/iceberg/src/io/storage_memory.rs
+++ b/crates/iceberg/src/io/storage_memory.rs
@@ -15,11 +15,118 @@
 // specific language governing permissions and limitations
 // under the License.

-use opendal::Operator;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::MemoryConfig;
+use opendal::Operator;

+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageBuilder,
+};
 use crate::Result;

 pub(crate) fn memory_config_build() -> Result<Operator> {
     Ok(Operator::from_config(MemoryConfig::default())?.finish())
 }
+
+/// Memory storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALMemoryStorage {
+    op: Operator,
+}
+
+impl OpenDALMemoryStorage {
+    /// Extract relative path from memory:// URLs
+    fn extract_relative_path<'a>(&self, path: &'a str) -> &'a str {
+        if let Some(stripped) = path.strip_prefix("memory:/") {
+            stripped
+        } else {
+            &path[1..]
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALMemoryStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let relative_path = self.extract_relative_path(path);
+        Ok(self.op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let relative_path = self.extract_relative_path(path);
+        let meta = self.op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let relative_path = self.extract_relative_path(path);
+        Ok(self.op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let relative_path = self.extract_relative_path(path);
+        Ok(Box::new(self.op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let relative_path = self.extract_relative_path(path);
+        Ok(Box::new(self.op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let relative_path = self.extract_relative_path(path);
+        Ok(self.op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let relative_path = self.extract_relative_path(path);
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(self.op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for OpenDAL Memory storage
+#[derive(Debug)]
+pub struct OpenDALMemoryStorageBuilder;
+
+impl StorageBuilder for OpenDALMemoryStorageBuilder {
+    fn build(
+        &self,
+        _props: HashMap<String, String>,
+        _extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        let op = Operator::from_config(MemoryConfig::default())?.finish();
+        Ok(Arc::new(OpenDALMemoryStorage { op }))
+    }
+}
diff --git a/crates/iceberg/src/io/storage_oss.rs b/crates/iceberg/src/io/storage_oss.rs
index 8bfffc6ca8..43c5cbb5c8 100644
--- a/crates/iceberg/src/io/storage_oss.rs
+++ b/crates/iceberg/src/io/storage_oss.rs
@@ -16,11 +16,18 @@
 // under the License.

 use std::collections::HashMap;
+use std::sync::Arc;

+use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::OssConfig;
 use opendal::{Configurator, Operator};
 use url::Url;

+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+    StorageBuilder,
+};
 use crate::{Error, ErrorKind, Result};

 /// Required configuration arguments for creating an Aliyun OSS Operator with OpenDAL:
@@ -64,3 +71,110 @@ pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result<Operator> {

     Ok(Operator::new(builder)?.finish())
 }
+
+/// OSS storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALOssStorage {
+    config: Arc<OssConfig>,
+}
+
+impl OpenDALOssStorage {
+    /// Creates operator from path.
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
+        let op = oss_config_build(&self.config, path)?;
+        let prefix = format!("oss://{}/", op.info().name());
+
+        if path.starts_with(&prefix) {
+            let op = op.layer(opendal::layers::RetryLayer::new());
+            Ok((op, &path[prefix.len()..]))
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid oss url: {}, should start with {}", path, prefix),
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALOssStorage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let meta = op.stat(relative_path).await?;
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for OpenDAL OSS storage
+#[derive(Debug)]
+pub struct OpenDALOssStorageBuilder;
+
+impl StorageBuilder for OpenDALOssStorageBuilder {
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        _extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        let cfg = oss_config_parse(props)?;
+        Ok(Arc::new(OpenDALOssStorage {
+            config: Arc::new(cfg),
+        }))
+    }
+}
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index f2408331c5..24d96a2874 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -19,13 +19,17 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use async_trait::async_trait;
+use bytes::Bytes;
 use opendal::services::S3Config;
 use opendal::{Configurator, Operator};
 pub use reqsign::{AwsCredential, AwsCredentialLoad};
 use reqwest::Client;
 use url::Url;

-use crate::io::is_truthy;
+use crate::io::{
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
+    is_truthy,
+};
 use crate::{Error, ErrorKind, Result};

 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -214,3 +218,144 @@ impl AwsCredentialLoad for CustomAwsCredentialLoader {
         self.0.load_credential(client).await
     }
 }
+
+/// S3 storage implementation using OpenDAL
+#[derive(Debug, Clone)]
+pub struct OpenDALS3Storage {
+    /// s3 storage could have `s3://` and `s3a://`.
+    /// Storing the scheme string here to return the correct path.
+    configured_scheme: String,
+    config: Arc<S3Config>,
+    customized_credential_load: Option<CustomAwsCredentialLoader>,
+}
+
+impl OpenDALS3Storage {
+    /// Creates operator from path.
+    ///
+    /// # Arguments
+    ///
+    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
+    ///
+    /// # Returns
+    ///
+    /// The return value consists of two parts:
+    ///
+    /// * An [`opendal::Operator`] instance used to operate on file.
+    /// * Relative path to the root uri of [`opendal::Operator`].
+    fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
+        let op = s3_config_build(&self.config, &self.customized_credential_load, path)?;
+        let op_info = op.info();
+
+        // Check prefix of s3 path.
+        let prefix = format!("{}://{}/", self.configured_scheme, op_info.name());
+        if path.starts_with(&prefix) {
+            // Add retry layer for transient errors
+            let op = op.layer(opendal::layers::RetryLayer::new());
+            Ok((op, &path[prefix.len()..]))
+        } else {
+            Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid s3 url: {}, should start with {}", path, prefix),
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for OpenDALS3Storage {
+    async fn exists(&self, path: &str) -> Result<bool> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.exists(relative_path).await?)
+    }
+
+    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let meta = op.stat(relative_path).await?;
+
+        Ok(FileMetadata {
+            size: meta.content_length(),
+        })
+    }
+
+    async fn read(&self, path: &str) -> Result<Bytes> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.read(relative_path).await?.to_bytes())
+    }
+
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.reader(relative_path).await?))
+    }
+
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
+        let mut writer = self.writer(path).await?;
+        writer.write(bs).await?;
+        writer.close().await
+    }
+
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(Box::new(op.writer(relative_path).await?))
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        Ok(op.delete(relative_path).await?)
+    }
+
+    async fn remove_dir_all(&self, path: &str) -> Result<()> {
+        let (op, relative_path) = self.create_operator(path)?;
+        let path = if relative_path.ends_with('/') {
+            relative_path.to_string()
+        } else {
+            format!("{relative_path}/")
+        };
+        Ok(op.remove_all(&path).await?)
+    }
+
+    fn new_input(&self, path: &str) -> Result<InputFile> {
+        Ok(InputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+
+    fn new_output(&self, path: &str) -> Result<OutputFile> {
+        Ok(OutputFile {
+            storage: Arc::new(self.clone()),
+            path: path.to_string(),
+        })
+    }
+}
+
+/// Builder for S3 storage
+#[derive(Debug)]
+pub struct OpenDALS3StorageBuilder;
+
+impl StorageBuilder for OpenDALS3StorageBuilder {
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>> {
+        // Get the scheme string from the props or use "s3" as default
+        let scheme_str = props
+            .get("scheme_str")
+            .cloned()
+            .unwrap_or_else(|| "s3".to_string());
+
+        // Parse S3 config from props
+        let config = s3_config_parse(props)?;
+
+        // Get customized credential loader from extensions if available
+        let customized_credential_load = extensions
+            .get::<CustomAwsCredentialLoader>()
+            .map(Arc::unwrap_or_clone);
+
+        Ok(Arc::new(OpenDALS3Storage {
+            configured_scheme: scheme_str,
+            config: Arc::new(config),
+            customized_credential_load,
+        }))
+    }
+}

From e5516b8db8db4a694de672eb60a1f98640daba20 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 16 Oct 2025 19:24:07 -0700
Subject: [PATCH 05/17] Cleanup OpenDALStorage enum

---
 crates/iceberg/src/io/file_io.rs         |  55 +--
 crates/iceberg/src/io/mod.rs             |   7 +-
 crates/iceberg/src/io/storage.rs         | 517 ++++++++++++-----------
 crates/iceberg/src/io/storage_azdls.rs   |   6 +-
 crates/iceberg/src/io/storage_builder.rs | 210 ---------
 crates/iceberg/src/io/storage_fs.rs      |   7 +-
 crates/iceberg/src/io/storage_gcs.rs     |   8 +-
 crates/iceberg/src/io/storage_memory.rs  |   7 +-
 crates/iceberg/src/io/storage_oss.rs     |   5 +-
 9 files changed, 285 insertions(+), 537 deletions(-)
 delete mode 100644 crates/iceberg/src/io/storage_builder.rs

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 4ce81d8f04..d64b45db3f 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -25,52 +25,11 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use url::Url;
 
+// Re-export traits from storage module
+pub use super::storage::{Storage, StorageBuilder};
+use crate::io::StorageBuilderRegistry;
 use crate::{Error, ErrorKind, Result};
 
-/// Trait for storage operations in Iceberg
-#[async_trait]
-pub trait Storage: Debug + Send + Sync {
-    /// Check if a file exists at the given path
-    async fn exists(&self, path: &str) -> Result<bool>;
-
-    /// Get metadata from an input path
-    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
-
-    /// Read bytes from a path
-    async fn read(&self, path: &str) -> Result<Bytes>;
-
-    /// Get FileRead from a path
-    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
-
-    /// Write bytes to an output path
-    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
-
-    /// Get FileWrite from a path
-    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
-
-    /// Delete a file at the given path
-    async fn delete(&self, path: &str) -> Result<()>;
-
-    /// Remove a directory and all its contents recursively
-    async fn remove_dir_all(&self, path: &str) -> Result<()>;
-
-    /// Create a new input file for reading
-    fn new_input(&self, path: &str) -> Result<InputFile>;
-
-    /// Create a new output file for writing
-    fn new_output(&self, path: &str) -> Result<OutputFile>;
-}
-
-/// Common interface for all storage builders.
-pub trait StorageBuilder: Debug + Send + Sync {
-    /// Create a new storage instance with the given properties and extensions.
-    fn build(
-        &self,
-        props: HashMap<String, String>,
-        extensions: Extensions,
-    ) -> Result<Arc<dyn Storage>>;
-}
-
 /// FileIO implementation, used to manipulate files in underlying storage.
 ///
 /// # Note
@@ -133,8 +92,6 @@ impl FileIO {
     ///
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
-        // let (op, relative_path) = self.inner.create_operator(&path)?;
-        // Ok(op.delete(relative_path).await?)
         self.inner.delete(path.as_ref()).await
     }
 
@@ -299,11 +256,11 @@ impl FileIOBuilder {
     pub fn build(self) -> Result<FileIO> {
         // Use the scheme to determine the storage type
         let scheme = self.scheme_str.clone().unwrap_or_default();
-        
+
         // Create registry and get builder
-        let registry = crate::io::StorageBuilderRegistry::new();
+        let registry = StorageBuilderRegistry::new();
         let builder = registry.get_builder(scheme.as_str())?;
-        
+
         // Build storage with props and extensions
         let storage = builder.build(self.props.clone(), self.extensions.clone())?;
 
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 5f234961fc..2397e2561f 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -68,10 +68,9 @@
 
 mod file_io;
 mod storage;
-mod storage_builder;
 
 pub use file_io::*;
-pub use storage_builder::StorageBuilderRegistry;
+pub use storage::{Storage, StorageBuilder, StorageBuilderRegistry};
 pub(crate) mod object_cache;
 
 #[cfg(feature = "storage-azdls")]
@@ -89,12 +88,8 @@ mod storage_s3;
 
 #[cfg(feature = "storage-azdls")]
 pub use storage_azdls::*;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
 #[cfg(feature = "storage-oss")]
 pub use storage_oss::*;
 #[cfg(feature = "storage-s3")]
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 62c6b8a31e..03f5b77155 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -15,287 +15,296 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Storage traits and implementations for Iceberg.
+//!
+//! This module provides the core storage abstraction used throughout Iceberg Rust.
+//! Storage implementations handle reading and writing files across different backends
+//! (S3, GCS, Azure, local filesystem, etc.).
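+//!
+//! As a minimal, hedged sketch of the intended flow (the in-memory backend and
+//! the path below are illustrative, not part of this module):
+//!
+//! ```rust,ignore
+//! let file_io = FileIOBuilder::new("memory").build()?;
+//! let output = file_io.new_output("memory://tmp/example.txt")?;
+//! output.write(bytes::Bytes::from("hello")).await?;
+//! let input = file_io.new_input("memory://tmp/example.txt")?;
+//! assert_eq!(input.read().await?, bytes::Bytes::from("hello"));
+//! ```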
+
+use std::collections::HashMap;
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::layers::RetryLayer;
-#[cfg(feature = "storage-azdls")]
-use opendal::services::AzdlsConfig;
-#[cfg(feature = "storage-gcs")]
-use opendal::services::GcsConfig;
-#[cfg(feature = "storage-oss")]
-use opendal::services::OssConfig;
-#[cfg(feature = "storage-s3")]
-use opendal::services::S3Config;
-use opendal::{Operator, Scheme};
-
-#[cfg(feature = "storage-azdls")]
-use super::AzureStorageScheme;
-use super::{FileIOBuilder, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage};
-#[cfg(feature = "storage-s3")]
-use crate::io::CustomAwsCredentialLoader;
-use crate::{Error, ErrorKind, Result};
-
-/// The storage carries all supported storage services in iceberg
+
+use super::{Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
+use crate::Result;
+
+/// Trait for storage operations in Iceberg.
+///
+/// This trait defines the interface for all storage backends. Implementations
+/// provide access to different storage systems like S3, GCS, Azure, local filesystem, etc.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::io::Storage;
+///
+/// async fn example(storage: Arc<dyn Storage>) -> Result<()> {
+///     // Check if file exists
+///     if storage.exists("s3://bucket/path/file.parquet").await? {
+///         // Read file
+///         let data = storage.read("s3://bucket/path/file.parquet").await?;
+///     }
+///     Ok(())
+/// }
+/// ```
+#[async_trait]
+pub trait Storage: Debug + Send + Sync {
+    /// Check if a file exists at the given path
+    async fn exists(&self, path: &str) -> Result<bool>;
+
+    /// Get metadata from an input path
+    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
+
+    /// Read bytes from a path
+    async fn read(&self, path: &str) -> Result<Bytes>;
+
+    /// Get FileRead from a path
+    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>>;
+
+    /// Write bytes to an output path
+    async fn write(&self, path: &str, bs: Bytes) -> Result<()>;
+
+    /// Get FileWrite from a path
+    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>>;
+
+    /// Delete a file at the given path
+    async fn delete(&self, path: &str) -> Result<()>;
+
+    /// Remove a directory and all its contents recursively
+    async fn remove_dir_all(&self, path: &str) -> Result<()>;
+
+    /// Create a new input file for reading
+    fn new_input(&self, path: &str) -> Result<InputFile>;
+
+    /// Create a new output file for writing
+    fn new_output(&self, path: &str) -> Result<OutputFile>;
+}
+
+/// Common interface for all storage builders.
+///
+/// Storage builders are responsible for creating storage instances from configuration
+/// properties and extensions. Each storage backend (S3, GCS, etc.) provides its own
+/// builder implementation.
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::io::{StorageBuilder, Extensions};
+/// use std::collections::HashMap;
+///
+/// struct MyStorageBuilder;
+///
+/// impl StorageBuilder for MyStorageBuilder {
+///     fn build(
+///         &self,
+///         props: HashMap<String, String>,
+///         extensions: Extensions,
+///     ) -> Result<Arc<dyn Storage>> {
+///         // Parse configuration and create storage
+///         Ok(Arc::new(MyStorage::new(props)?))
+///     }
+/// }
+/// ```
+pub trait StorageBuilder: Debug + Send + Sync {
+    /// Create a new storage instance with the given properties and extensions.
+    ///
+    /// # Arguments
+    ///
+    /// * `props` - Configuration properties for the storage backend
+    /// * `extensions` - Additional extensions (e.g., custom credential loaders)
+    ///
+    /// # Returns
+    ///
+    /// An `Arc<dyn Storage>` that can be used for file operations.
+    fn build(
+        &self,
+        props: HashMap<String, String>,
+        extensions: Extensions,
+    ) -> Result<Arc<dyn Storage>>;
+}
+
+/// A registry of storage builders.
+///
+/// The registry allows you to register custom storage builders for different URI schemes.
+/// By default, it includes builders for all enabled storage features.
+///
+/// # Example
+///
+/// ```rust
+/// use iceberg::io::StorageBuilderRegistry;
+///
+/// // Create a new registry with default builders
+/// let registry = StorageBuilderRegistry::new();
+///
+/// // Get supported storage types
+/// let types = registry.supported_types();
+/// println!("Supported storage types: {:?}", types);
+///
+/// // Get a builder for a specific scheme
+/// # #[cfg(feature = "storage-memory")]
+/// # {
+/// let builder = registry.get_builder("memory").unwrap();
+/// # }
+/// ```
+///
+/// You can also register custom storage builders:
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use iceberg::io::{StorageBuilderRegistry, StorageBuilder};
+///
+/// let mut registry = StorageBuilderRegistry::new();
+///
+/// // Register a custom storage builder
+/// registry.register("custom", Arc::new(MyCustomStorageBuilder));
+/// ```
 #[derive(Debug, Clone)]
-pub(crate) enum OpenDALStorage {
-    #[cfg(feature = "storage-memory")]
-    Memory(Operator),
-    #[cfg(feature = "storage-fs")]
-    LocalFs,
-    /// Expects paths of the form `s3[a]://<bucket>/<path>`.
-    #[cfg(feature = "storage-s3")]
-    S3 {
-        /// s3 storage could have `s3://` and `s3a://`.
-        /// Storing the scheme string here to return the correct path.
-        configured_scheme: String,
-        config: Arc<S3Config>,
-        customized_credential_load: Option<CustomAwsCredentialLoader>,
-    },
-    #[cfg(feature = "storage-gcs")]
-    Gcs { config: Arc<GcsConfig> },
-    #[cfg(feature = "storage-oss")]
-    Oss { config: Arc<OssConfig> },
-    /// Expects paths of the form
-    /// `abfs[s]://<container>@<account>.dfs.<endpoint>/<path>` or
-    /// `wasb[s]://<container>@<account>.blob.<endpoint>/<path>`.
-    #[cfg(feature = "storage-azdls")]
-    Azdls {
-        /// Because Azdls accepts multiple possible schemes, we store the full
-        /// passed scheme here to later validate schemes passed via paths.
-        configured_scheme: AzureStorageScheme,
-        config: Arc<AzdlsConfig>,
-    },
+pub struct StorageBuilderRegistry {
+    builders: HashMap<String, Arc<dyn StorageBuilder>>,
 }
 
-#[async_trait]
-impl Storage for OpenDALStorage {
-    async fn exists(&self, path: &str) -> Result<bool> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.exists(relative_path).await?)
-    }
+impl StorageBuilderRegistry {
+    /// Create a new storage registry with default builders based on enabled features.
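+    ///
+    /// Scheme keys are matched case-insensitively by [`Self::get_builder`], and a
+    /// single builder may be registered under several schemes (for example `s3`
+    /// and `s3a` when the `storage-s3` feature is enabled).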
+    pub fn new() -> Self {
+        let mut builders: HashMap<String, Arc<dyn StorageBuilder>> = HashMap::new();
 
-    async fn metadata(&self, path: &str) -> Result<FileMetadata> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        let meta = op.stat(relative_path).await?;
+        #[cfg(feature = "storage-memory")]
+        {
+            use crate::io::storage_memory::OpenDALMemoryStorageBuilder;
+            let builder = Arc::new(OpenDALMemoryStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("memory".to_string(), builder);
+        }
 
-        Ok(FileMetadata {
-            size: meta.content_length(),
-        })
-    }
+        #[cfg(feature = "storage-fs")]
+        {
+            use crate::io::storage_fs::OpenDALFsStorageBuilder;
+            let builder = Arc::new(OpenDALFsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("file".to_string(), builder.clone());
+            builders.insert("".to_string(), builder);
+        }
 
-    async fn read(&self, path: &str) -> Result<Bytes> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.read(relative_path).await?.to_bytes())
-    }
+        #[cfg(feature = "storage-s3")]
+        {
+            use crate::io::storage_s3::OpenDALS3StorageBuilder;
+            let builder = Arc::new(OpenDALS3StorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("s3".to_string(), builder.clone());
+            builders.insert("s3a".to_string(), builder);
+        }
 
-    async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        Ok(Box::new(op.reader(relative_path).await?))
-    }
+        #[cfg(feature = "storage-gcs")]
+        {
+            use crate::io::storage_gcs::OpenDALGcsStorageBuilder;
+            let builder = Arc::new(OpenDALGcsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("gs".to_string(), builder.clone());
+            builders.insert("gcs".to_string(), builder);
+        }
 
-    async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
-        let mut writer = self.writer(path).await?;
-        writer.write(bs).await?;
-        writer.close().await
-    }
+        #[cfg(feature = "storage-oss")]
+        {
+            use crate::io::storage_oss::OpenDALOssStorageBuilder;
+            let builder = Arc::new(OpenDALOssStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("oss".to_string(), builder);
+        }
+
+        #[cfg(feature = "storage-azdls")]
+        {
+            use crate::io::storage_azdls::OpenDALAzdlsStorageBuilder;
+            let builder = Arc::new(OpenDALAzdlsStorageBuilder) as Arc<dyn StorageBuilder>;
+            builders.insert("abfs".to_string(), builder.clone());
+            builders.insert("abfss".to_string(), builder.clone());
+            builders.insert("wasb".to_string(), builder.clone());
+            builders.insert("wasbs".to_string(), builder);
+        }
 
-    async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        Ok(Box::new(op.writer(relative_path).await?))
+        Self { builders }
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        Ok(op.delete(relative_path).await?)
+    /// Register a custom storage builder for a given scheme.
+    pub fn register(&mut self, scheme: impl Into<String>, builder: Arc<dyn StorageBuilder>) {
+        self.builders.insert(scheme.into(), builder);
     }
 
-    async fn remove_dir_all(&self, path: &str) -> Result<()> {
-        let (op, relative_path) = self.create_operator(&path)?;
-        let path = if relative_path.ends_with('/') {
-            relative_path.to_string()
-        } else {
-            format!("{relative_path}/")
-        };
-        Ok(op.remove_all(&path).await?)
+    /// Get a storage builder by scheme.
+    pub fn get_builder(&self, scheme: &str) -> Result<Arc<dyn StorageBuilder>> {
+        let key = scheme.trim();
+        self.builders
+            .iter()
+            .find(|(k, _)| k.eq_ignore_ascii_case(key))
+            .map(|(_, builder)| builder.clone())
+            .ok_or_else(|| {
+                use crate::{Error, ErrorKind};
+                Error::new(
+                    ErrorKind::FeatureUnsupported,
+                    format!(
+                        "Unsupported storage type: {}. Supported types: {}",
+                        scheme,
+                        self.supported_types().join(", ")
+                    ),
+                )
+            })
     }
 
-    fn new_input(&self, path: &str) -> Result<InputFile> {
-        let storage = Arc::new(self.clone());
-        let path = path.to_string();
-        Ok(InputFile { storage, path })
+    /// Return the list of supported storage types.
+    pub fn supported_types(&self) -> Vec<String> {
+        self.builders.keys().cloned().collect()
     }
+}
 
-    fn new_output(&self, path: &str) -> Result<OutputFile> {
-        let storage = Arc::new(self.clone());
-        let path = path.to_string();
-        Ok(OutputFile { storage, path })
+impl Default for StorageBuilderRegistry {
+    fn default() -> Self {
+        Self::new()
     }
 }
 
-impl OpenDALStorage {
-    /// Convert iceberg config to opendal config.
-    pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
-        let (scheme_str, props, extensions) = file_io_builder.into_parts();
-        let scheme = Self::parse_scheme(&scheme_str)?;
-
-        match scheme {
-            #[cfg(feature = "storage-memory")]
-            Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
-            #[cfg(feature = "storage-fs")]
-            Scheme::Fs => Ok(Self::LocalFs),
-            #[cfg(feature = "storage-s3")]
-            Scheme::S3 => Ok(Self::S3 {
-                configured_scheme: scheme_str,
-                config: super::s3_config_parse(props)?.into(),
-                customized_credential_load: extensions
-                    .get::<CustomAwsCredentialLoader>()
-                    .map(Arc::unwrap_or_clone),
-            }),
-            #[cfg(feature = "storage-gcs")]
-            Scheme::Gcs => Ok(Self::Gcs {
-                config: super::gcs_config_parse(props)?.into(),
-            }),
-            #[cfg(feature = "storage-oss")]
-            Scheme::Oss => Ok(Self::Oss {
-                config: super::oss_config_parse(props)?.into(),
-            }),
-            #[cfg(feature = "storage-azdls")]
-            Scheme::Azdls => {
-                let scheme = scheme_str.parse::<AzureStorageScheme>()?;
-                Ok(Self::Azdls {
-                    config: super::azdls_config_parse(props)?.into(),
-                    configured_scheme: scheme,
-                })
-            }
-            // Update doc on [`FileIO`] when adding new schemes.
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                format!("Constructing file io from scheme: {scheme} not supported now",),
-            )),
-        }
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_storage_builder_registry_new() {
+        let registry = StorageBuilderRegistry::new();
+        let types = registry.supported_types();
+
+        // At least one storage type should be available
+        assert!(!types.is_empty());
     }
 
-    /// Creates operator from path.
-    ///
-    /// # Arguments
-    ///
-    /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
-    ///
-    /// # Returns
-    ///
-    /// The return value consists of two parts:
-    ///
-    /// * An [`opendal::Operator`] instance used to operate on file.
-    /// * Relative path to the root uri of [`opendal::Operator`].
-    pub(crate) fn create_operator<'a>(
-        &self,
-        path: &'a impl AsRef<str>,
-    ) -> crate::Result<(Operator, &'a str)> {
-        let path = path.as_ref();
-        let (operator, relative_path): (Operator, &str) = match self {
-            #[cfg(feature = "storage-memory")]
-            OpenDALStorage::Memory(op) => {
-                if let Some(stripped) = path.strip_prefix("memory:/") {
-                    Ok::<_, crate::Error>((op.clone(), stripped))
-                } else {
-                    Ok::<_, crate::Error>((op.clone(), &path[1..]))
-                }
-            }
-            #[cfg(feature = "storage-fs")]
-            OpenDALStorage::LocalFs => {
-                let op = super::fs_config_build()?;
-
-                if let Some(stripped) = path.strip_prefix("file:/") {
-                    Ok::<_, crate::Error>((op, stripped))
-                } else {
-                    Ok::<_, crate::Error>((op, &path[1..]))
-                }
-            }
-            #[cfg(feature = "storage-s3")]
-            OpenDALStorage::S3 {
-                configured_scheme,
-                config,
-                customized_credential_load,
-            } => {
-                let op = super::s3_config_build(config, customized_credential_load, path)?;
-                let op_info = op.info();
-
-                // Check prefix of s3 path.
-                let prefix = format!("{}://{}/", configured_scheme, op_info.name());
-                if path.starts_with(&prefix) {
-                    Ok((op, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid s3 url: {}, should start with {}", path, prefix),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-gcs")]
-            OpenDALStorage::Gcs { config } => {
-                let operator = super::gcs_config_build(config, path)?;
-                let prefix = format!("gs://{}/", operator.info().name());
-                if path.starts_with(&prefix) {
-                    Ok((operator, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid gcs url: {}, should start with {}", path, prefix),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-oss")]
-            OpenDALStorage::Oss { config } => {
-                let op = super::oss_config_build(config, path)?;
-
-                // Check prefix of oss path.
-                let prefix = format!("oss://{}/", op.info().name());
-                if path.starts_with(&prefix) {
-                    Ok((op, &path[prefix.len()..]))
-                } else {
-                    Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Invalid oss url: {}, should start with {}", path, prefix),
-                    ))
-                }
-            }
-            #[cfg(feature = "storage-azdls")]
-            OpenDALStorage::Azdls {
-                configured_scheme,
-                config,
-            } => super::azdls_create_operator(path, config, configured_scheme),
-            #[cfg(all(
-                not(feature = "storage-s3"),
-                not(feature = "storage-fs"),
-                not(feature = "storage-gcs"),
-                not(feature = "storage-oss"),
-                not(feature = "storage-azdls"),
-            ))]
-            _ => Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "No storage service has been enabled",
-            )),
-        }?;
-
-        // Transient errors are common for object stores; however there's no
-        // harm in retrying temporary failures for other storage backends as well.
-        let operator = operator.layer(RetryLayer::new());
-
-        Ok((operator, relative_path))
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_storage_builder_registry_get_builder() {
+        let registry = StorageBuilderRegistry::new();
+
+        // Should be able to get memory storage builder
+        let builder = registry.get_builder("memory");
+        assert!(builder.is_ok());
+
+        // Should be case-insensitive
+        let builder = registry.get_builder("MEMORY");
+        assert!(builder.is_ok());
     }
 
-    /// Parse scheme.
-    fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
-        match scheme {
-            "memory" => Ok(Scheme::Memory),
-            "file" | "" => Ok(Scheme::Fs),
-            "s3" | "s3a" => Ok(Scheme::S3),
-            "gs" | "gcs" => Ok(Scheme::Gcs),
-            "oss" => Ok(Scheme::Oss),
-            "abfss" | "abfs" | "wasbs" | "wasb" => Ok(Scheme::Azdls),
-            s => Ok(s.parse::<Scheme>()?),
-        }
+    #[test]
+    fn test_storage_builder_registry_unsupported_type() {
+        let registry = StorageBuilderRegistry::new();
+
+        // Should return error for unsupported type
+        let result = registry.get_builder("unsupported");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    #[cfg(feature = "storage-memory")]
+    fn test_storage_builder_registry_clone() {
+        let registry = StorageBuilderRegistry::new();
+        let cloned = registry.clone();
+
+        // Both should have the same builders
+        assert_eq!(
+            registry.supported_types().len(),
+            cloned.supported_types().len()
+        );
     }
 }
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index 2afccd5a01..ff9b5f6a5a 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -27,8 +27,7 @@ use opendal::{Configurator, Operator};
 use url::Url;
 
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-    StorageBuilder,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
 };
 use crate::{Error, ErrorKind, Result, ensure_data_valid};
 
@@ -617,7 +616,7 @@ pub struct OpenDALAzdlsStorage {
 impl OpenDALAzdlsStorage {
     /// Creates operator from path.
     fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
-        let (op, relative_path) = azdls_create_operator(path, &self.config, &self.configured_scheme)?;
+        let (op, relative_path) =
+            azdls_create_operator(path, &self.config, &self.configured_scheme)?;
         let op = op.layer(opendal::layers::RetryLayer::new());
         Ok((op, relative_path))
     }
diff --git a/crates/iceberg/src/io/storage_builder.rs b/crates/iceberg/src/io/storage_builder.rs
deleted file mode 100644
index aacd68bb00..0000000000
--- a/crates/iceberg/src/io/storage_builder.rs
+++ /dev/null
@@ -1,210 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use crate::io::StorageBuilder;
-#[cfg(feature = "storage-azdls")]
-use crate::io::storage_azdls::OpenDALAzdlsStorageBuilder;
-#[cfg(feature = "storage-fs")]
-use crate::io::storage_fs::OpenDALFsStorageBuilder;
-#[cfg(feature = "storage-gcs")]
-use crate::io::storage_gcs::OpenDALGcsStorageBuilder;
-#[cfg(feature = "storage-memory")]
-use crate::io::storage_memory::OpenDALMemoryStorageBuilder;
-#[cfg(feature = "storage-oss")]
-use crate::io::storage_oss::OpenDALOssStorageBuilder;
-#[cfg(feature = "storage-s3")]
-use crate::io::storage_s3::OpenDALS3StorageBuilder;
-use crate::{Error, ErrorKind, Result};
-
-/// A registry of storage builders.
-///
-/// The registry allows you to register custom storage builders for different URI schemes.
-/// By default, it includes builders for all enabled storage features.
-///
-/// # Example
-///
-/// ```rust
-/// use iceberg::io::StorageBuilderRegistry;
-///
-/// // Create a new registry with default builders
-/// let registry = StorageBuilderRegistry::new();
-///
-/// // Get supported storage types
-/// let types = registry.supported_types();
-/// println!("Supported storage types: {:?}", types);
-///
-/// // Get a builder for a specific scheme
-/// # #[cfg(feature = "storage-memory")]
-/// # {
-/// let builder = registry.get_builder("memory").unwrap();
-/// # }
-/// ```
-///
-/// You can also register custom storage builders:
-///
-/// ```rust,ignore
-/// use std::sync::Arc;
-/// use iceberg::io::{StorageBuilderRegistry, StorageBuilder};
-///
-/// let mut registry = StorageBuilderRegistry::new();
-///
-/// // Register a custom storage builder
-/// registry.register("custom", Arc::new(MyCustomStorageBuilder));
-/// ```
-#[derive(Debug, Clone)]
-pub struct StorageBuilderRegistry {
-    builders: HashMap<String, Arc<dyn StorageBuilder>>,
-}
-
-impl StorageBuilderRegistry {
-    /// Create a new storage registry with default builders based on enabled features.
-    pub fn new() -> Self {
-        let mut builders: HashMap<String, Arc<dyn StorageBuilder>> = HashMap::new();
-
-        #[cfg(feature = "storage-memory")]
-        {
-            let builder = Arc::new(OpenDALMemoryStorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("memory".to_string(), builder);
-        }
-
-        #[cfg(feature = "storage-fs")]
-        {
-            let builder = Arc::new(OpenDALFsStorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("file".to_string(), builder.clone());
-            builders.insert("".to_string(), builder);
-        }
-
-        #[cfg(feature = "storage-s3")]
-        {
-            let builder = Arc::new(OpenDALS3StorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("s3".to_string(), builder.clone());
-            builders.insert("s3a".to_string(), builder);
-        }
-
-        #[cfg(feature = "storage-gcs")]
-        {
-            let builder = Arc::new(OpenDALGcsStorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("gs".to_string(), builder.clone());
-            builders.insert("gcs".to_string(), builder);
-        }
-
-        #[cfg(feature = "storage-oss")]
-        {
-            let builder = Arc::new(OpenDALOssStorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("oss".to_string(), builder);
-        }
-
-        #[cfg(feature = "storage-azdls")]
-        {
-            let builder = Arc::new(OpenDALAzdlsStorageBuilder) as Arc<dyn StorageBuilder>;
-            builders.insert("abfs".to_string(), builder.clone());
-            builders.insert("abfss".to_string(), builder.clone());
-            builders.insert("wasb".to_string(), builder.clone());
-            builders.insert("wasbs".to_string(), builder);
-        }
-
-        Self { builders }
-    }
-
-    /// Register a custom storage builder for a given scheme.
-    pub fn register(&mut self, scheme: impl Into<String>, builder: Arc<dyn StorageBuilder>) {
-        self.builders.insert(scheme.into(), builder);
-    }
-
-    /// Get a storage builder by scheme.
-    pub fn get_builder(&self, scheme: &str) -> Result<Arc<dyn StorageBuilder>> {
-        let key = scheme.trim();
-        self.builders
-            .iter()
-            .find(|(k, _)| k.eq_ignore_ascii_case(key))
-            .map(|(_, builder)| builder.clone())
-            .ok_or_else(|| {
-                Error::new(
-                    ErrorKind::FeatureUnsupported,
-                    format!(
-                        "Unsupported storage type: {}. Supported types: {}",
-                        scheme,
-                        self.supported_types().join(", ")
-                    ),
-                )
-            })
-    }
-
-    /// Return the list of supported storage types.
-    pub fn supported_types(&self) -> Vec<String> {
-        self.builders.keys().cloned().collect()
-    }
-}
-
-impl Default for StorageBuilderRegistry {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_storage_builder_registry_new() {
-        let registry = StorageBuilderRegistry::new();
-        let types = registry.supported_types();
-
-        // At least one storage type should be available
-        assert!(!types.is_empty());
-    }
-
-    #[test]
-    #[cfg(feature = "storage-memory")]
-    fn test_storage_builder_registry_get_builder() {
-        let registry = StorageBuilderRegistry::new();
-
-        // Should be able to get memory storage builder
-        let builder = registry.get_builder("memory");
-        assert!(builder.is_ok());
-
-        // Should be case-insensitive
-        let builder = registry.get_builder("MEMORY");
-        assert!(builder.is_ok());
-    }
-
-    #[test]
-    fn test_storage_builder_registry_unsupported_type() {
-        let registry = StorageBuilderRegistry::new();
-
-        // Should return error for unsupported type
-        let result = registry.get_builder("unsupported");
-        assert!(result.is_err());
-    }
-
-    #[test]
-    #[cfg(feature = "storage-memory")]
-    fn test_storage_builder_registry_clone() {
-        let registry = StorageBuilderRegistry::new();
-        let cloned = registry.clone();
-
-        // Both should have the same builders
-        assert_eq!(
-            registry.supported_types().len(),
-            cloned.supported_types().len()
-        );
-    }
-}
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs
index eb3a59fa7e..791ab3e9fc 100644
--- a/crates/iceberg/src/io/storage_fs.rs
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -20,14 +20,13 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::services::FsConfig;
 use opendal::Operator;
+use opendal::services::FsConfig;
 
+use crate::Result;
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-    StorageBuilder,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
 };
-use crate::Result;
 
 /// Build new opendal operator from given path.
 pub(crate) fn fs_config_build() -> Result<Operator> {
diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs
index e0fed11024..65cff80de3 100644
--- a/crates/iceberg/src/io/storage_gcs.rs
+++ b/crates/iceberg/src/io/storage_gcs.rs
@@ -21,13 +21,13 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::services::GcsConfig;
 use opendal::Operator;
+use opendal::services::GcsConfig;
 use url::Url;
 
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-    StorageBuilder, is_truthy,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
+    is_truthy,
 };
 use crate::{Error, ErrorKind, Result};
 
@@ -122,7 +122,7 @@ impl OpenDALGcsStorage {
     fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
         let operator = gcs_config_build(&self.config, path)?;
         let prefix = format!("gs://{}/", operator.info().name());
-        
+
         if path.starts_with(&prefix) {
             let op = operator.layer(opendal::layers::RetryLayer::new());
             Ok((op, &path[prefix.len()..]))
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs
index 2c51d5af37..c406929109 100644
--- a/crates/iceberg/src/io/storage_memory.rs
+++ b/crates/iceberg/src/io/storage_memory.rs
@@ -20,14 +20,13 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use opendal::services::MemoryConfig;
 use opendal::Operator;
+use opendal::services::MemoryConfig;
 
+use crate::Result;
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-    StorageBuilder,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
 };
-use crate::Result;
 
 pub(crate) fn memory_config_build() -> Result<Operator> {
     Ok(Operator::from_config(MemoryConfig::default())?.finish())
diff --git a/crates/iceberg/src/io/storage_oss.rs b/crates/iceberg/src/io/storage_oss.rs
index 43c5cbb5c8..fe230c7289 100644
--- a/crates/iceberg/src/io/storage_oss.rs
+++ b/crates/iceberg/src/io/storage_oss.rs
@@ -25,8 +25,7 @@ use opendal::{Configurator, Operator};
 use url::Url;
 
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-    StorageBuilder,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
 };
 use crate::{Error, ErrorKind, Result};
 
@@ -83,7 +82,7 @@ impl OpenDALOssStorage {
     fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
         let op = oss_config_build(&self.config, path)?;
         let prefix = format!("oss://{}/", op.info().name());
-        
+
         if path.starts_with(&prefix) {
             let op = op.layer(opendal::layers::RetryLayer::new());
             Ok((op, &path[prefix.len()..]))

From f065f8b5de1f78137e7f34394d2edc44f532a67e Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 16 Oct 2025 19:31:30 -0700
Subject: [PATCH 06/17] clean up

---
 crates/iceberg/src/io/file_io.rs        | 4 ++--
 crates/iceberg/src/io/storage_memory.rs | 4 ----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index d64b45db3f..71b903a22a 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -407,14 +407,14 @@ impl OutputFile {
 
     /// Checks if file exists.
     pub async fn exists(&self) -> Result<bool> {
-        Ok(self.storage.exists(&self.path).await?)
+        self.storage.exists(&self.path).await?
    }
 
     /// Deletes file.
     ///
     /// If the file does not exist, it will not return error.
     pub async fn delete(&self) -> Result<()> {
-        Ok(self.storage.delete(&self.path).await?)
+        self.storage.delete(&self.path).await?
     }
 
     /// Converts into [`InputFile`].
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs
index c406929109..9bad67d30c 100644
--- a/crates/iceberg/src/io/storage_memory.rs
+++ b/crates/iceberg/src/io/storage_memory.rs
@@ -28,10 +28,6 @@ use crate::io::{
     Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
 };
 
-pub(crate) fn memory_config_build() -> Result<Operator> {
-    Ok(Operator::from_config(MemoryConfig::default())?.finish())
-}
-
 /// Memory storage implementation using OpenDAL
 #[derive(Debug, Clone)]
 pub struct OpenDALMemoryStorage {

From 3b161ed54f22186ec2d344e4b8b933b4d091bd0e Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 16 Oct 2025 19:34:24 -0700
Subject: [PATCH 07/17] minor

---
 crates/iceberg/src/io/file_io.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 71b903a22a..82dd98b733 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -407,14 +407,14 @@ impl OutputFile {
 
     /// Checks if file exists.
     pub async fn exists(&self) -> Result<bool> {
-        self.storage.exists(&self.path).await?
+        self.storage.exists(&self.path).await
     }
 
     /// Deletes file.
     ///
     /// If the file does not exist, it will not return error.
     pub async fn delete(&self) -> Result<()> {
-        self.storage.delete(&self.path).await?
+        self.storage.delete(&self.path).await
     }
 
     /// Converts into [`InputFile`].

From 9b5c1aac0c1c037a2a9557da3015df6f05b4af0d Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 15:48:54 -0700
Subject: [PATCH 08/17] encapsulate input and output file better

---
 crates/iceberg/src/io/file_io.rs        | 61 ++++++++++++++++---------
 crates/iceberg/src/io/storage_azdls.rs  | 10 +---
 crates/iceberg/src/io/storage_fs.rs     | 10 +---
 crates/iceberg/src/io/storage_gcs.rs    | 10 +---
 crates/iceberg/src/io/storage_memory.rs | 10 +---
 crates/iceberg/src/io/storage_oss.rs    | 10 +---
 crates/iceberg/src/io/storage_s3.rs     | 10 +---
 7 files changed, 52 insertions(+), 69 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 82dd98b733..5c8d994ae3 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -303,17 +303,28 @@ impl FileRead for opendal::Reader {
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
-    /// todo doc
-    pub storage: Arc<dyn Storage>,
-    // Absolution path of file.
-    /// todo doc
-    pub path: String,
-    // todo should remove this? Should always pass down a full path
-    // // Relative path of file to uri, starts at [`relative_path_pos`]
-    // relative_path_pos: usize,
+    /// Storage backend for this input file.
+    storage: Arc<dyn Storage>,
+    /// Absolute path to the file.
+    path: String,
 }
 
 impl InputFile {
+    /// Creates a new input file.
+    ///
+    /// # Arguments
+    ///
+    /// * `storage` - The storage backend to use
+    /// * `path` - Absolute path to the file
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
+    /// Returns the storage backend for this input file.
+    pub fn storage(&self) -> &Arc<dyn Storage> {
+        &self.storage
+    }
+
     /// Absolute path to root uri.
     pub fn location(&self) -> &str {
         &self.path
@@ -386,20 +397,31 @@ impl FileWrite for Box<dyn FileWrite> {
     }
 }
 
-/// Output file is used for writing to files..
+/// Output file is used for writing to files.
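+///
+/// A minimal usage sketch (assuming a `FileIO` named `file_io` built elsewhere;
+/// the path is illustrative):
+///
+/// ```rust,ignore
+/// let output = file_io.new_output("memory://tmp/out.bin")?;
+/// output.write(Bytes::from("payload")).await?;
+/// ```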
 #[derive(Debug)]
 pub struct OutputFile {
-    /// todo fix pub qualifier
-    pub storage: Arc<dyn Storage>,
-    // Absolution path of file.
-    /// todo fix pub qualifier
-    pub path: String,
-    // todo should always pass down a full path
-    // // Relative path of file to uri, starts at [`relative_path_pos`]
-    // relative_path_pos: usize,
+    /// Storage backend for this output file.
+    storage: Arc<dyn Storage>,
+    /// Absolute path to the file.
+    path: String,
 }
 
 impl OutputFile {
+    /// Creates a new output file.
+    ///
+    /// # Arguments
+    ///
+    /// * `storage` - The storage backend to use
+    /// * `path` - Absolute path to the file
+    pub fn new(storage: Arc<dyn Storage>, path: String) -> Self {
+        Self { storage, path }
+    }
+
+    /// Returns the storage backend for this output file.
+    pub fn storage(&self) -> &Arc<dyn Storage> {
+        &self.storage
+    }
+
     /// Relative path to root uri.
     pub fn location(&self) -> &str {
         &self.path
@@ -419,10 +441,7 @@ impl OutputFile {
 
     /// Converts into [`InputFile`].
     pub fn to_input_file(self) -> InputFile {
-        InputFile {
-            storage: self.storage,
-            path: self.path,
-        }
+        InputFile::new(self.storage, self.path)
     }
 
     /// Create a new output file with given bytes.
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index ff9b5f6a5a..22da4dd125 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -675,17 +675,11 @@ impl Storage for OpenDALAzdlsStorage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 
diff --git a/crates/iceberg/src/io/storage_fs.rs b/crates/iceberg/src/io/storage_fs.rs
index 791ab3e9fc..0b612351f7 100644
--- a/crates/iceberg/src/io/storage_fs.rs
+++ b/crates/iceberg/src/io/storage_fs.rs
@@ -110,17 +110,11 @@ impl Storage for OpenDALFsStorage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 
diff --git a/crates/iceberg/src/io/storage_gcs.rs b/crates/iceberg/src/io/storage_gcs.rs
index 65cff80de3..f3dc0212a9 100644
--- a/crates/iceberg/src/io/storage_gcs.rs
+++ b/crates/iceberg/src/io/storage_gcs.rs
@@ -187,17 +187,11 @@ impl Storage for OpenDALGcsStorage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 
diff --git a/crates/iceberg/src/io/storage_memory.rs b/crates/iceberg/src/io/storage_memory.rs
index 9bad67d30c..b72a0b7abc 100644
--- a/crates/iceberg/src/io/storage_memory.rs
+++ b/crates/iceberg/src/io/storage_memory.rs
@@ -97,17 +97,11 @@ impl Storage for OpenDALMemoryStorage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 
diff --git a/crates/iceberg/src/io/storage_oss.rs b/crates/iceberg/src/io/storage_oss.rs
index fe230c7289..c070edef01 100644
--- a/crates/iceberg/src/io/storage_oss.rs
+++ b/crates/iceberg/src/io/storage_oss.rs
@@ -147,17 +147,11 @@ impl Storage for OpenDALOssStorage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index 24d96a2874..be7bb623af 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -314,17 +314,11 @@ impl Storage for OpenDALS3Storage {
     }
 
     fn new_input(&self, path: &str) -> Result<InputFile> {
-        Ok(InputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 
     fn new_output(&self, path: &str) -> Result<OutputFile> {
-        Ok(OutputFile {
-            storage: Arc::new(self.clone()),
-            path: path.to_string(),
-        })
+        Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
     }
 }
 

From f0bd0abff3210075019456ff74e986e16f74c83e Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 16:38:46 -0700
Subject: [PATCH 09/17] add with_registry

---
 crates/iceberg/src/io/file_io.rs | 296 ++++++++++++++++++++++++++++++-
 1 file changed, 293 insertions(+), 3 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 5c8d994ae3..1077014d0d 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -175,6 +175,29 @@ impl Extensions {
 }
 
 /// Builder for [`FileIO`].
+///
+/// # Custom Storage Implementations
+///
+/// You can use custom storage implementations by creating a custom
+/// [`StorageBuilderRegistry`] and registering your storage builder:
+///
+/// ```rust,ignore
+/// use iceberg::io::{StorageBuilderRegistry, StorageBuilder, FileIOBuilder};
+/// use std::sync::Arc;
+///
+/// // Create your custom storage builder
+/// let my_builder = Arc::new(MyCustomStorageBuilder);
+///
+/// // Register it with a custom scheme
+/// let mut registry = StorageBuilderRegistry::new();
+/// registry.register("mycustom", my_builder);
+///
+/// // Use it to build FileIO
+/// let file_io = FileIOBuilder::new("mycustom")
+///     .with_prop("key", "value")
+///     .with_registry(registry)
+///     .build()?;
+/// ```
 #[derive(Clone, Debug)]
 pub struct FileIOBuilder {
     /// This is used to infer scheme of operator.
@@ -185,6 +208,8 @@ pub struct FileIOBuilder {
     props: HashMap<String, String>,
     /// Optional extensions to configure the underlying FileIO behavior.
     extensions: Extensions,
+    /// Optional custom registry. If None, a default registry will be created.
+    registry: Option<StorageBuilderRegistry>,
 }
 
 impl FileIOBuilder {
@@ -195,6 +220,7 @@ impl FileIOBuilder {
             scheme_str: Some(scheme_str.to_string()),
             props: HashMap::default(),
             extensions: Extensions::default(),
+            registry: None,
         }
     }
 
@@ -204,17 +230,26 @@ impl FileIOBuilder {
             scheme_str: None,
             props: HashMap::default(),
             extensions: Extensions::default(),
+            registry: None,
         }
     }
 
     /// Fetch the scheme string.
     ///
     /// The scheme_str will be empty if it's None.
-    pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
+    pub fn into_parts(
+        self,
+    ) -> (
+        String,
+        HashMap<String, String>,
+        Extensions,
+        Option<StorageBuilderRegistry>,
+    ) {
         (
             self.scheme_str.unwrap_or_default(),
             self.props,
             self.extensions,
+            self.registry,
         )
     }
 
@@ -252,13 +287,38 @@ impl FileIOBuilder {
         self.extensions.get::<T>()
     }
 
+    /// Sets a custom storage builder registry.
+    ///
+    /// This allows you to register custom storage implementations that can be used
+    /// when building the FileIO. If not set, a default registry with built-in
+    /// storage types will be used.
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use iceberg::io::{StorageBuilderRegistry, FileIOBuilder};
+    /// use std::sync::Arc;
+    ///
+    /// let mut registry = StorageBuilderRegistry::new();
+    /// registry.register("mycustom", Arc::new(MyCustomStorageBuilder));
+    ///
+    /// let file_io = FileIOBuilder::new("mycustom")
+    ///     .with_registry(registry)
+    ///     .build()?;
+    /// ```
+    pub fn with_registry(mut self, registry: StorageBuilderRegistry) -> Self {
+        self.registry = Some(registry);
+        self
+    }
+
     /// Builds [`FileIO`].
     pub fn build(self) -> Result<FileIO> {
         // Use the scheme to determine the storage type
         let scheme = self.scheme_str.clone().unwrap_or_default();
 
-        // Create registry and get builder
-        let registry = StorageBuilderRegistry::new();
+        // Use custom registry if provided, otherwise create default
+        let registry = self.registry.clone().unwrap_or_default();
+
         let builder = registry.get_builder(scheme.as_str())?;
 
         // Build storage with props and extensions
@@ -466,16 +526,94 @@
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::fs::{File, create_dir_all};
     use std::io::Write;
     use std::path::Path;
+    use std::sync::Arc;
 
+    use async_trait::async_trait;
     use bytes::Bytes;
     use futures::AsyncReadExt;
     use futures::io::AllowStdIo;
     use tempfile::TempDir;
 
     use super::{FileIO, FileIOBuilder};
+    use crate::io::{
+        Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
+        StorageBuilder, StorageBuilderRegistry,
+    };
+
+    // Dummy storage implementation for testing custom registries
+    #[derive(Debug, Clone)]
+    struct DummyStorage {
+        _scheme: String,
+    }
+
+    #[async_trait]
+    impl Storage for DummyStorage {
+        async fn exists(&self, _path: &str) -> crate::Result<bool> {
+            Ok(true)
+        }
+
+        async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
+            Ok(FileMetadata { size: 0 })
+        }
+
+        async fn read(&self, _path: &str) -> crate::Result<Bytes> {
+            Ok(Bytes::new())
+        }
+
+        async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
+            Err(crate::Error::new(
+                crate::ErrorKind::FeatureUnsupported,
+                "DummyStorage does not support reader",
+            ))
+        }
+
+        async fn write(&self, _path: &str, _bs: Bytes) -> crate::Result<()> {
+            Ok(())
+        }
+
+        async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
+            Err(crate::Error::new(
+                crate::ErrorKind::FeatureUnsupported,
+                "DummyStorage does not support writer",
+            ))
+        }
+
+        async fn delete(&self, _path: &str) -> crate::Result<()> {
+            Ok(())
+        }
+
+        async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
+            Ok(())
+        }
+
+        fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+            Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+        }
+
+        fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
+            Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+        }
+    }
+
+    // Dummy storage builder for testing
+    #[derive(Debug)]
+    struct DummyStorageBuilder;
+
+    impl StorageBuilder for DummyStorageBuilder {
+        fn build(
+            &self,
+            _props: HashMap<String, String>,
+            _extensions: Extensions,
+        ) -> crate::Result<Arc<dyn Storage>> {
+            Ok(Arc::new(DummyStorage {
+                _scheme: "dummy".to_string(),
+            }))
+        }
+    }
 
     fn create_local_file_io() -> FileIO {
         FileIOBuilder::new_fs_io().build().unwrap()
@@ -619,4 +757,156 @@ mod tests {
         io.delete(&path).await.unwrap();
         assert!(!io.exists(&path).await.unwrap());
     }
+
+    #[test]
+    fn test_custom_registry() {
+        // Create a custom registry and register the dummy storage
+        let mut registry = StorageBuilderRegistry::new();
+        registry.register("dummy", Arc::new(DummyStorageBuilder));
+
+        // Build FileIO with custom storage
+        let file_io = FileIOBuilder::new("dummy")
+            .with_registry(registry)
+            .build()
+            .unwrap();
+
+        // Verify we can create files with the custom storage
+        assert!(file_io.new_output("dummy://test.txt").is_ok());
+        assert!(file_io.new_input("dummy://test.txt").is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_custom_registry_operations() {
+        // Define a dummy storage that tracks operations
+        #[derive(Debug, Clone)]
+        struct TrackingStorage {
+            written: Arc<std::sync::Mutex<Vec<String>>>,
+        }
+
+        #[async_trait]
+        impl Storage for TrackingStorage {
+            async fn exists(&self, _path: &str) -> crate::Result<bool> {
+                Ok(true)
+            }
+
+            async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
+                Ok(FileMetadata { size: 42 })
+            }
+
+            async fn read(&self, _path: &str) -> crate::Result<Bytes> {
+                Ok(Bytes::from("test data"))
+            }
+
+            async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
+                Err(crate::Error::new(
+                    crate::ErrorKind::FeatureUnsupported,
+                    "TrackingStorage does not support reader",
+                ))
+            }
+
+            async fn write(&self, path: &str, _bs: Bytes) -> crate::Result<()> {
+                self.written.lock().unwrap().push(path.to_string());
+                Ok(())
+            }
+
+            async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
+                Err(crate::Error::new(
+                    crate::ErrorKind::FeatureUnsupported,
+                    "TrackingStorage does not support writer",
+                ))
+            }
+
+            async fn delete(&self, _path: &str) -> crate::Result<()> {
+                Ok(())
+            }
+
+            async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
+                Ok(())
+            }
+
+            fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+                Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
+            }
+
+            fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
+                Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
+            }
+        }
+
+        // Define a builder for tracking storage
+        #[derive(Debug)]
+        struct TrackingStorageBuilder {
+            written: Arc<std::sync::Mutex<Vec<String>>>,
+        }
+
+        impl StorageBuilder for TrackingStorageBuilder {
+            fn build(
+                &self,
+                _props: HashMap<String, String>,
+                _extensions: Extensions,
+            ) -> crate::Result<Arc<dyn Storage>> {
+                Ok(Arc::new(TrackingStorage {
+                    written: self.written.clone(),
+                }))
+            }
+        }
+
+        // Create tracking storage
+        let written = Arc::new(std::sync::Mutex::new(Vec::new()));
+        let mut registry = StorageBuilderRegistry::new();
+        registry.register(
+            "tracking",
+            Arc::new(TrackingStorageBuilder {
+                written: written.clone(),
+            }),
+        );
+
+        // Build FileIO with tracking storage
+        let file_io = FileIOBuilder::new("tracking")
+            .with_registry(registry)
+            .build()
+            .unwrap();
+
+        // Perform operations
+        let output = file_io.new_output("tracking://bucket/file.txt").unwrap();
+        output.write(Bytes::from("test")).await.unwrap();
+
+        let input = file_io.new_input("tracking://bucket/file.txt").unwrap();
+        let data = input.read().await.unwrap();
+        assert_eq!(data, Bytes::from("test data"));
+
+        let metadata = input.metadata().await.unwrap();
+        assert_eq!(metadata.size, 42);
+
+        // Verify write was tracked
+        let tracked = written.lock().unwrap();
+        assert_eq!(tracked.len(), 1);
+        assert_eq!(tracked[0], "tracking://bucket/file.txt");
+    }
+
+    #[test]
+    fn test_into_parts_includes_registry() {
+        let registry = StorageBuilderRegistry::new();
+
+        let builder = FileIOBuilder::new("memory")
+            .with_prop("key", "value")
+            .with_registry(registry.clone());
+
+        let (scheme, props, _extensions, returned_registry) = builder.into_parts();
+
+        assert_eq!(scheme, "memory");
+        assert_eq!(props.get("key"), Some(&"value".to_string()));
+        assert!(returned_registry.is_some());
+    }
+
+    #[test]
+    fn test_into_parts_without_registry() {
+        let builder = FileIOBuilder::new("memory").with_prop("key", "value");
+
+        let (scheme, props, _extensions, returned_registry) = builder.into_parts();
+
+        assert_eq!(scheme, "memory");
+        assert_eq!(props.get("key"), Some(&"value".to_string()));
+        assert!(returned_registry.is_none());
+    }
 }

From 463c545de1715e69a1287ba900ef17f1acaf4ef6 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 16:46:43 -0700
Subject: [PATCH 10/17] minor

---
 crates/iceberg/src/io/mod.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 2397e2561f..f4c186a5c6 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -88,8 +88,12 @@ mod storage_s3;
 
 #[cfg(feature = "storage-azdls")]
 pub use storage_azdls::*;
+#[cfg(feature = "storage-fs")]
+use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
+#[cfg(feature = "storage-memory")]
+use storage_memory::*;
 #[cfg(feature = "storage-oss")]
 pub use storage_oss::*;
 #[cfg(feature = "storage-s3")]

From 96fe4a0a65f1369ab165418b0163687ed5867ea5 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 16:50:15 -0700
Subject: [PATCH 11/17] minor

---
 crates/iceberg/src/io/file_io.rs | 4 ----
 crates/iceberg/src/io/mod.rs     | 4 ----
 2 files changed, 8 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 1077014d0d..2ca5a77ddd 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -363,9 +363,7 @@ impl FileRead for opendal::Reader {
 /// Input file is used for reading from files.
 #[derive(Debug)]
 pub struct InputFile {
-    /// Storage backend for this input file.
     storage: Arc<dyn Storage>,
-    /// Absolute path to the file.
     path: String,
 }
 
@@ -458,9 +458,7 @@ impl FileWrite for Box<dyn FileWrite> {
 /// Output file is used for writing to files.
 #[derive(Debug)]
 pub struct OutputFile {
-    /// Storage backend for this output file.
     storage: Arc<dyn Storage>,
-    /// Absolute path to the file.
     path: String,
 }
 
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index f4c186a5c6..2397e2561f 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -88,12 +88,8 @@ mod storage_s3;
 
 #[cfg(feature = "storage-azdls")]
 pub use storage_azdls::*;
-#[cfg(feature = "storage-fs")]
-use storage_fs::*;
 #[cfg(feature = "storage-gcs")]
 pub use storage_gcs::*;
-#[cfg(feature = "storage-memory")]
-use storage_memory::*;
 #[cfg(feature = "storage-oss")]
 pub use storage_oss::*;
 #[cfg(feature = "storage-s3")]

From cf6998e88b61b04306001fa8d656a945ccc7dbc1 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 17:29:14 -0700
Subject: [PATCH 12/17] Pass scheme strings cleanly

---
 crates/iceberg/src/io/file_io.rs       | 7 +++++--
 crates/iceberg/src/io/mod.rs           | 6 ++++++
 crates/iceberg/src/io/storage_azdls.rs | 3 ++-
 crates/iceberg/src/io/storage_s3.rs    | 4 ++--
 4 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 2ca5a77ddd..5ef8818033 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -27,7 +27,7 @@ use url::Url;
 
 // Re-export traits from storage module
 pub use super::storage::{Storage, StorageBuilder};
-use crate::io::StorageBuilderRegistry;
+use crate::io::{StorageBuilderRegistry, STORAGE_LOCATION_SCHEME};
 use crate::{Error, ErrorKind, Result};
 
 /// FileIO implementation, used to manipulate files in underlying storage.
@@ -321,8 +321,11 @@ impl FileIOBuilder {
 
         let builder = registry.get_builder(scheme.as_str())?;
 
+        let mut props_with_scheme = self.props.clone();
+        props_with_scheme.insert(STORAGE_LOCATION_SCHEME.to_string(), scheme);
+
         // Build storage with props and extensions
-        let storage = builder.build(self.props.clone(), self.extensions.clone())?;
+        let storage = builder.build(props_with_scheme, self.extensions.clone())?;
 
         Ok(FileIO {
             builder: self,
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index 2397e2561f..f1a7600777 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -73,6 +73,12 @@ pub use file_io::*;
 pub use storage::{Storage, StorageBuilder, StorageBuilderRegistry};
 pub(crate) mod object_cache;
 
+/// Property key used to pass the scheme string to storage builders.
+///
+/// Custom storage implementations can use this constant to retrieve the scheme
+/// from the properties map passed to `StorageBuilder::build()`.
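+///
+/// A hedged sketch of how a custom `StorageBuilder` might consume it (the
+/// fallback scheme below is illustrative):
+///
+/// ```rust,ignore
+/// let scheme = props
+///     .get(STORAGE_LOCATION_SCHEME)
+///     .cloned()
+///     .unwrap_or_else(|| "custom".to_string());
+/// ```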
+pub const STORAGE_LOCATION_SCHEME: &str = "iceberg.storage.location.scheme";
+
 #[cfg(feature = "storage-azdls")]
 mod storage_azdls;
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index 22da4dd125..2f963956b2 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -28,6 +28,7 @@ use url::Url;
 
 use crate::io::{
     Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
+    STORAGE_LOCATION_SCHEME,
 };
 use crate::{Error, ErrorKind, Result, ensure_data_valid};
 
@@ -695,7 +696,7 @@ impl StorageBuilder for OpenDALAzdlsStorageBuilder {
     ) -> Result<Arc<dyn Storage>> {
         // Get the scheme string from the props or use default
         let scheme_str = props
-            .get("scheme_str")
+            .get(STORAGE_LOCATION_SCHEME)
             .cloned()
             .unwrap_or_else(|| "abfs".to_string());
 
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index be7bb623af..63640ae591 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -28,7 +28,7 @@ use url::Url;
 
 use crate::io::{
     Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
-    is_truthy,
+    is_truthy, STORAGE_LOCATION_SCHEME,
 };
 use crate::{Error, ErrorKind, Result};
 
@@ -334,7 +334,7 @@ impl StorageBuilder for OpenDALS3StorageBuilder {
     ) -> Result<Arc<dyn Storage>> {
         // Get the scheme string from the props or use "s3" as default
         let scheme_str = props
-            .get("scheme_str")
+            .get(STORAGE_LOCATION_SCHEME)
             .cloned()
             .unwrap_or_else(|| "s3".to_string());
 
From a46f94712c0a660c406de174c0fba8ddd2e4db19 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 17:32:01 -0700
Subject: [PATCH 13/17] daily fmt fix

---
 crates/iceberg/src/io/file_io.rs       | 2 +-
 crates/iceberg/src/io/mod.rs           | 5 +----
 crates/iceberg/src/io/storage_azdls.rs | 4 ++--
 crates/iceberg/src/io/storage_s3.rs    | 4 ++--
 4 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 5ef8818033..f519613844 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -27,7 +27,7 @@ use url::Url;
 
 // Re-export traits from storage module
 pub use super::storage::{Storage, StorageBuilder};
-use crate::io::{StorageBuilderRegistry, STORAGE_LOCATION_SCHEME};
+use crate::io::{STORAGE_LOCATION_SCHEME, StorageBuilderRegistry};
 use crate::{Error, ErrorKind, Result};
 
 /// FileIO implementation, used to manipulate files in underlying storage.
diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs
index f1a7600777..209be5321d 100644
--- a/crates/iceberg/src/io/mod.rs
+++ b/crates/iceberg/src/io/mod.rs
@@ -73,10 +73,7 @@ pub use file_io::*;
 pub use storage::{Storage, StorageBuilder, StorageBuilderRegistry};
 pub(crate) mod object_cache;
 
-/// Property key used to pass the scheme string to storage builders.
-///
-/// Custom storage implementations can use this constant to retrieve the scheme
-/// from the properties map passed to `StorageBuilder::build()`.
+/// Property key used to pass the scheme string from FileIOBuilder to StorageBuilder.
pub const STORAGE_LOCATION_SCHEME: &str = "iceberg.storage.location.scheme";
 
 #[cfg(feature = "storage-azdls")]
diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs
index 2f963956b2..f0a85e44b1 100644
--- a/crates/iceberg/src/io/storage_azdls.rs
+++ b/crates/iceberg/src/io/storage_azdls.rs
@@ -27,8 +27,8 @@ use opendal::{Configurator, Operator};
 use url::Url;
 
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
-    STORAGE_LOCATION_SCHEME,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, STORAGE_LOCATION_SCHEME,
+    Storage, StorageBuilder,
 };
 use crate::{Error, ErrorKind, Result, ensure_data_valid};
 
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index 63640ae591..cc8d84ca77 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -27,8 +27,8 @@ use reqwest::Client;
 use url::Url;
 
 use crate::io::{
-    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageBuilder,
-    is_truthy, STORAGE_LOCATION_SCHEME,
+    Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, STORAGE_LOCATION_SCHEME,
+    Storage, StorageBuilder, is_truthy,
 };
 use crate::{Error, ErrorKind, Result};
 
From 12f20605018650fa6cc8f0d44aff25895dcd1e04 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 20:33:03 -0700
Subject: [PATCH 14/17] optimize tests

---
 crates/iceberg/src/io/file_io.rs | 240 +++++++++++++++----------------
 1 file changed, 115 insertions(+), 125 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index f519613844..e72dd1061f 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -529,7 +529,7 @@ mod tests {
     use std::fs::{File, create_dir_all};
     use std::io::Write;
     use std::path::Path;
-    use std::sync::Arc;
+    use std::sync::{Arc, Mutex, MutexGuard};
 
     use async_trait::async_trait;
     use bytes::Bytes;
@@ -538,78 +538,107 @@
     use tempfile::TempDir;
 
     use super::{FileIO, FileIOBuilder};
+    use crate::{Error, ErrorKind, Result};
     use crate::io::{
-        Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
-        StorageBuilder, StorageBuilderRegistry,
+        Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile,
+        STORAGE_LOCATION_SCHEME, Storage, StorageBuilder, StorageBuilderRegistry,
     };
 
-    // Dummy storage implementation for testing custom registries
+    // Test storage implementation that tracks write operations
     #[derive(Debug, Clone)]
-    struct DummyStorage {
-        _scheme: String,
+    struct TestStorage {
+        written: Arc<Mutex<Vec<String>>>,
+        received_props: HashMap<String, String>,
+    }
+
+    #[allow(dead_code)]
+    impl TestStorage {
+        pub fn written(&self) -> MutexGuard<'_, Vec<String>> {
+            self.written.lock().unwrap()
+        }
+
+        pub fn received_props(&self) -> &HashMap<String, String> {
+            &self.received_props
+        }
     }
 
     #[async_trait]
-    impl Storage for DummyStorage {
-        async fn exists(&self, _path: &str) -> crate::Result<bool> {
+    impl Storage for TestStorage {
+        async fn exists(&self, _path: &str) -> Result<bool> {
             Ok(true)
         }
 
-        async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
-            Ok(FileMetadata { size: 0 })
+        async fn metadata(&self, _path: &str) -> Result<FileMetadata> {
+            Ok(FileMetadata { size: 42 })
         }
 
-        async fn read(&self, _path: &str) -> crate::Result<Bytes> {
-            Ok(Bytes::new())
+        async fn read(&self, _path: &str) -> Result<Bytes> {
+            Ok(Bytes::from("test data"))
         }
 
-        async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
-            Err(crate::Error::new(
-                
crate::ErrorKind::FeatureUnsupported,
-                "DummyStorage does not support reader",
+        async fn reader(&self, _path: &str) -> Result<Box<dyn FileRead>> {
+            Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "TestStorage does not support reader",
             ))
         }
 
-        async fn write(&self, _path: &str, _bs: Bytes) -> crate::Result<()> {
+        async fn write(&self, path: &str, _bs: Bytes) -> Result<()> {
+            self.written.lock().unwrap().push(path.to_string());
             Ok(())
         }
 
-        async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
-            Err(crate::Error::new(
-                crate::ErrorKind::FeatureUnsupported,
-                "DummyStorage does not support writer",
+        async fn writer(&self, _path: &str) -> Result<Box<dyn FileWrite>> {
+            Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "TestStorage does not support writer",
             ))
         }
 
-        async fn delete(&self, _path: &str) -> crate::Result<()> {
+        async fn delete(&self, _path: &str) -> Result<()> {
             Ok(())
         }
 
-        async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
+        async fn remove_dir_all(&self, _path: &str) -> Result<()> {
             Ok(())
         }
 
-        fn new_input(&self, path: &str) -> crate::Result<InputFile> {
+        fn new_input(&self, path: &str) -> Result<InputFile> {
             Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
         }
 
-        fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
+        fn new_output(&self, path: &str) -> Result<OutputFile> {
             Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
         }
     }
 
-    // Dummy storage builder for testing
+    // Test storage builder
     #[derive(Debug)]
-    struct DummyStorageBuilder;
+    struct TestStorageBuilder {
+        written: Arc<Mutex<Vec<String>>>,
+        received_props: Arc<Mutex<HashMap<String, String>>>,
+    }
+
+    impl TestStorageBuilder {
+        pub fn written(&self) -> MutexGuard<'_, Vec<String>> {
+            self.written.lock().unwrap()
+        }
 
-    impl StorageBuilder for DummyStorageBuilder {
+        pub fn received_props(&self) -> MutexGuard<'_, HashMap<String, String>> {
+            self.received_props.lock().unwrap()
+        }
+    }
+
+    impl StorageBuilder for TestStorageBuilder {
         fn build(
             &self,
-            _props: HashMap<String, String>,
+            props: HashMap<String, String>,
             _extensions: Extensions,
-        ) -> crate::Result<Arc<dyn Storage>> {
-            Ok(Arc::new(DummyStorage {
-                _scheme: "dummy".to_string(),
+        ) -> Result<Arc<dyn Storage>> {
+            *self.received_props.lock().unwrap() = props.clone();
+            Ok(Arc::new(TestStorage {
+                written: self.written.clone(),
+                received_props: props,
             }))
         }
     }
@@ -759,118 +788,48 @@ mod tests {
 
     #[test]
     fn test_custom_registry() {
-        // Create a custom registry and register the dummy storage
+        // Create a custom registry and register test storage
+        let builder = Arc::new(TestStorageBuilder {
+            written: Arc::new(Mutex::new(Vec::new())),
+            received_props: Arc::new(Mutex::new(HashMap::new())),
+        });
+
         let mut registry = StorageBuilderRegistry::new();
-        registry.register("dummy", Arc::new(DummyStorageBuilder));
+        registry.register("test", builder.clone());
 
         // Build FileIO with custom storage
-        let file_io = FileIOBuilder::new("dummy")
+        let file_io = FileIOBuilder::new("test")
             .with_registry(registry)
             .build()
             .unwrap();
 
         // Verify we can create files with the custom storage
-        assert!(file_io.new_output("dummy://test.txt").is_ok());
-        assert!(file_io.new_input("dummy://test.txt").is_ok());
+        assert!(file_io.new_output("test://test.txt").is_ok());
+        assert!(file_io.new_input("test://test.txt").is_ok());
     }
 
     #[tokio::test]
     async fn test_custom_registry_operations() {
-        // Define a dummy storage that tracks operations
-        #[derive(Debug, Clone)]
-        struct TrackingStorage {
-            written: Arc<std::sync::Mutex<Vec<String>>>,
-        }
-
-        #[async_trait]
-        impl Storage for TrackingStorage {
-            async fn exists(&self, _path: &str) -> crate::Result<bool> {
-                Ok(true)
-            }
-
-            async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
-                Ok(FileMetadata { size: 42 })
-            }
-
-            async fn 
read(&self, _path: &str) -> crate::Result<Bytes> {
-                Ok(Bytes::from("test data"))
-            }
-
-            async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
-                Err(crate::Error::new(
-                    crate::ErrorKind::FeatureUnsupported,
-                    "TrackingStorage does not support reader",
-                ))
-            }
-
-            async fn write(&self, path: &str, _bs: Bytes) -> crate::Result<()> {
-                self.written.lock().unwrap().push(path.to_string());
-                Ok(())
-            }
-
-            async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
-                Err(crate::Error::new(
-                    crate::ErrorKind::FeatureUnsupported,
-                    "TrackingStorage does not support writer",
-                ))
-            }
-
-            async fn delete(&self, _path: &str) -> crate::Result<()> {
-                Ok(())
-            }
-
-            async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
-                Ok(())
-            }
-
-            fn new_input(&self, path: &str) -> crate::Result<InputFile> {
-                Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
-            }
-
-            fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
-                Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
-            }
-        }
-
-        // Define a builder for tracking storage
-        #[derive(Debug)]
-        struct TrackingStorageBuilder {
-            written: Arc<std::sync::Mutex<Vec<String>>>,
-        }
-
-        impl StorageBuilder for TrackingStorageBuilder {
-            fn build(
-                &self,
-                _props: HashMap<String, String>,
-                _extensions: Extensions,
-            ) -> crate::Result<Arc<dyn Storage>> {
-                Ok(Arc::new(TrackingStorage {
-                    written: self.written.clone(),
-                }))
-            }
-        }
-
-        // Create tracking storage
-        let written = Arc::new(std::sync::Mutex::new(Vec::new()));
+        // Create test storage with write tracking
+        let builder = Arc::new(TestStorageBuilder {
+            written: Arc::new(Mutex::new(Vec::new())),
+            received_props: Arc::new(Mutex::new(HashMap::new())),
+        });
+
         let mut registry = StorageBuilderRegistry::new();
-        registry.register(
-            "tracking",
-            Arc::new(TrackingStorageBuilder {
-                written: written.clone(),
-            }),
-        );
+        registry.register("test", builder.clone());
 
-        // Build FileIO with tracking storage
-        let file_io = FileIOBuilder::new("tracking")
+        // Build FileIO with test storage
+        let file_io = FileIOBuilder::new("test")
             .with_registry(registry)
             .build()
             .unwrap();
 
         // Perform operations
-        let output = file_io.new_output("tracking://bucket/file.txt").unwrap();
+        let output = file_io.new_output("test://bucket/file.txt").unwrap();
         output.write(Bytes::from("test")).await.unwrap();
 
-        let input = file_io.new_input("tracking://bucket/file.txt").unwrap();
+        let input = file_io.new_input("test://bucket/file.txt").unwrap();
         let data = input.read().await.unwrap();
         assert_eq!(data, Bytes::from("test data"));
 
@@ -878,9 +837,40 @@ mod tests {
         assert_eq!(metadata.size, 42);
 
         // Verify write was tracked
-        let tracked = written.lock().unwrap();
+        let tracked = builder.written();
         assert_eq!(tracked.len(), 1);
-        assert_eq!(tracked[0], "tracking://bucket/file.txt");
+        assert_eq!(tracked[0], "test://bucket/file.txt");
+    }
+
+    #[test]
+    fn test_scheme_and_props_propagation() {
+        // Create test storage that captures props
+        let builder = Arc::new(TestStorageBuilder {
+            written: Arc::new(Mutex::new(Vec::new())),
+            received_props: Arc::new(Mutex::new(HashMap::new())),
+        });
+
+        let mut registry = StorageBuilderRegistry::new();
+        registry.register("myscheme", builder.clone());
+
+        // Build FileIO with custom scheme and additional props
+        let file_io = FileIOBuilder::new("myscheme")
+            .with_prop("custom.prop", "custom_value")
+            .with_registry(registry)
+            .build()
+            .unwrap();
+
+        // Verify the storage was created
+        assert!(file_io.new_output("myscheme://test.txt").is_ok());
+
+        // Verify the scheme was propagated to the builder
+        let props = builder.received_props();
+        
assert_eq!(
+            props.get(STORAGE_LOCATION_SCHEME),
+            Some(&"myscheme".to_string())
+        );
+        // Verify custom props were also passed
+        assert_eq!(props.get("custom.prop"), Some(&"custom_value".to_string()));
     }
 
     #[test]

From 1ae79f8c587bb8e30b5e4dbd7113abbf71a2b571 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 20:34:22 -0700
Subject: [PATCH 15/17] fmt fix

---
 crates/iceberg/src/io/file_io.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index e72dd1061f..276b71d063 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -538,11 +538,11 @@ mod tests {
     use tempfile::TempDir;
 
     use super::{FileIO, FileIOBuilder};
-    use crate::{Error, ErrorKind, Result};
     use crate::io::{
         Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile,
         STORAGE_LOCATION_SCHEME, Storage, StorageBuilder, StorageBuilderRegistry,
     };
+    use crate::{Error, ErrorKind, Result};
 
     // Test storage implementation that tracks write operations
     #[derive(Debug, Clone)]
@@ -793,7 +793,7 @@ mod tests {
             written: Arc::new(Mutex::new(Vec::new())),
             received_props: Arc::new(Mutex::new(HashMap::new())),
         });
-        
+
         let mut registry = StorageBuilderRegistry::new();
         registry.register("test", builder.clone());
 
@@ -815,7 +815,7 @@ mod tests {
             written: Arc::new(Mutex::new(Vec::new())),
             received_props: Arc::new(Mutex::new(HashMap::new())),
         });
-        
+
         let mut registry = StorageBuilderRegistry::new();
         registry.register("test", builder.clone());
 
@@ -849,7 +849,7 @@ mod tests {
             written: Arc::new(Mutex::new(Vec::new())),
             received_props: Arc::new(Mutex::new(HashMap::new())),
         });
-        
+
         let mut registry = StorageBuilderRegistry::new();
         registry.register("myscheme", builder.clone());
 

From b7d7937a907b4ffdfe577fbb3213630ea17634b2 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 20:48:36 -0700
Subject: [PATCH 16/17] minor

---
 crates/iceberg/src/io/file_io.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 276b71d063..d0c8a3a1b8 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -102,7 +102,6 @@ impl FileIO {
     /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
     #[deprecated(note = "use remove_dir_all instead", since = "0.4.0")]
     pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
-        // todo this should be removed as it doesn't exist in the new trait
         self.inner.remove_dir_all(path.as_ref()).await
     }
 

From c2b5c830570284c88922ea325036b86a0393e6e6 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Fri, 17 Oct 2025 20:51:02 -0700
Subject: [PATCH 17/17] minor again

---
 crates/iceberg/src/io/file_io.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index d0c8a3a1b8..48b9396bc9 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -26,8 +26,8 @@ use bytes::Bytes;
 use url::Url;
 
 // Re-export traits from storage module
-pub use super::storage::{Storage, StorageBuilder};
-use crate::io::{STORAGE_LOCATION_SCHEME, StorageBuilderRegistry};
+pub use super::storage::{Storage, StorageBuilder, StorageBuilderRegistry};
+use crate::io::STORAGE_LOCATION_SCHEME;
 use crate::{Error, ErrorKind, Result};
 
 /// FileIO implementation, used to manipulate files in underlying storage.
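
End-to-end, a downstream user of this series would wire in a custom backend roughly as in the minimal sketch below. It only uses APIs introduced above (`Storage`, `StorageBuilder`, `StorageBuilderRegistry`, `STORAGE_LOCATION_SCHEME`, `FileIOBuilder::with_registry`); the `MyStorageBuilder` type, the `mystore` scheme, and the exact `iceberg::io` re-export paths are illustrative assumptions, not part of the patches.

    use std::collections::HashMap;
    use std::sync::Arc;

    use iceberg::Result;
    use iceberg::io::{
        Extensions, FileIO, FileIOBuilder, STORAGE_LOCATION_SCHEME, Storage, StorageBuilder,
        StorageBuilderRegistry,
    };

    // Hypothetical builder for a custom backend; "mystore" is a made-up scheme.
    #[derive(Debug)]
    struct MyStorageBuilder;

    impl StorageBuilder for MyStorageBuilder {
        fn build(
            &self,
            props: HashMap<String, String>,
            _extensions: Extensions,
        ) -> Result<Arc<dyn Storage>> {
            // The scheme passed to FileIOBuilder::new is injected under this key
            // (see PATCH 12), so one builder can serve several schemes.
            let _scheme = props.get(STORAGE_LOCATION_SCHEME).cloned();
            todo!("construct the custom backend and return it as Arc<dyn Storage>")
        }
    }

    fn custom_file_io() -> Result<FileIO> {
        let mut registry = StorageBuilderRegistry::new();
        registry.register("mystore", Arc::new(MyStorageBuilder));
        FileIOBuilder::new("mystore")
            .with_registry(registry)
            .build()
    }

From there, every `FileIO` call (`new_input`, `new_output`, `delete`, `exists`, `remove_dir_all`) delegates to the returned `Arc<dyn Storage>`, which is exactly what the tests in PATCH 14 exercise with `TestStorage`.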