Skip to content

Commit f0bd0ab

Browse files
committed
add with_registry
1 parent 9b5c1aa commit f0bd0ab

File tree

1 file changed

+293
-3
lines changed

1 file changed

+293
-3
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 293 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,29 @@ impl Extensions {
175175
}
176176

177177
/// Builder for [`FileIO`].
178+
///
179+
/// # Custom Storage Implementations
180+
///
181+
/// You can use custom storage implementations by creating a custom
182+
/// [`StorageBuilderRegistry`] and registering your storage builder:
183+
///
184+
/// ```rust,ignore
185+
/// use iceberg::io::{StorageBuilderRegistry, StorageBuilder, FileIOBuilder};
186+
/// use std::sync::Arc;
187+
///
188+
/// // Create your custom storage builder
189+
/// let my_builder = Arc::new(MyCustomStorageBuilder);
190+
///
191+
/// // Register it with a custom scheme
192+
/// let mut registry = StorageBuilderRegistry::new();
193+
/// registry.register("mycustom", my_builder);
194+
///
195+
/// // Use it to build FileIO
196+
/// let file_io = FileIOBuilder::new("mycustom")
197+
/// .with_prop("key", "value")
198+
/// .with_registry(registry)
199+
/// .build()?;
200+
/// ```
178201
#[derive(Clone, Debug)]
179202
pub struct FileIOBuilder {
180203
/// This is used to infer scheme of operator.
@@ -185,6 +208,8 @@ pub struct FileIOBuilder {
185208
props: HashMap<String, String>,
186209
/// Optional extensions to configure the underlying FileIO behavior.
187210
extensions: Extensions,
211+
/// Optional custom registry. If None, a default registry will be created.
212+
registry: Option<StorageBuilderRegistry>,
188213
}
189214

190215
impl FileIOBuilder {
@@ -195,6 +220,7 @@ impl FileIOBuilder {
195220
scheme_str: Some(scheme_str.to_string()),
196221
props: HashMap::default(),
197222
extensions: Extensions::default(),
223+
registry: None,
198224
}
199225
}
200226

@@ -204,17 +230,26 @@ impl FileIOBuilder {
204230
scheme_str: None,
205231
props: HashMap::default(),
206232
extensions: Extensions::default(),
233+
registry: None,
207234
}
208235
}
209236

210237
/// Fetch the scheme string.
211238
///
212239
/// The scheme_str will be empty if it's None.
213-
pub fn into_parts(self) -> (String, HashMap<String, String>, Extensions) {
240+
pub fn into_parts(
241+
self,
242+
) -> (
243+
String,
244+
HashMap<String, String>,
245+
Extensions,
246+
Option<StorageBuilderRegistry>,
247+
) {
214248
(
215249
self.scheme_str.unwrap_or_default(),
216250
self.props,
217251
self.extensions,
252+
self.registry,
218253
)
219254
}
220255

@@ -252,13 +287,38 @@ impl FileIOBuilder {
252287
self.extensions.get::<T>()
253288
}
254289

290+
/// Sets a custom storage builder registry.
291+
///
292+
/// This allows you to register custom storage implementations that can be used
293+
/// when building the FileIO. If not set, a default registry with built-in
294+
/// storage types will be used.
295+
///
296+
/// # Example
297+
///
298+
/// ```rust,ignore
299+
/// use iceberg::io::{StorageBuilderRegistry, FileIOBuilder};
300+
/// use std::sync::Arc;
301+
///
302+
/// let mut registry = StorageBuilderRegistry::new();
303+
/// registry.register("mycustom", Arc::new(MyCustomStorageBuilder));
304+
///
305+
/// let file_io = FileIOBuilder::new("mycustom")
306+
/// .with_registry(registry)
307+
/// .build()?;
308+
/// ```
309+
pub fn with_registry(mut self, registry: StorageBuilderRegistry) -> Self {
310+
self.registry = Some(registry);
311+
self
312+
}
313+
255314
/// Builds [`FileIO`].
256315
pub fn build(self) -> Result<FileIO> {
257316
// Use the scheme to determine the storage type
258317
let scheme = self.scheme_str.clone().unwrap_or_default();
259318

260-
// Create registry and get builder
261-
let registry = StorageBuilderRegistry::new();
319+
// Use custom registry if provided, otherwise create default
320+
let registry = self.registry.clone().unwrap_or_default();
321+
262322
let builder = registry.get_builder(scheme.as_str())?;
263323

264324
// Build storage with props and extensions
@@ -466,16 +526,94 @@ impl OutputFile {
466526

467527
#[cfg(test)]
468528
mod tests {
529+
use std::collections::HashMap;
469530
use std::fs::{File, create_dir_all};
470531
use std::io::Write;
471532
use std::path::Path;
533+
use std::sync::Arc;
472534

535+
use async_trait::async_trait;
473536
use bytes::Bytes;
474537
use futures::AsyncReadExt;
475538
use futures::io::AllowStdIo;
476539
use tempfile::TempDir;
477540

478541
use super::{FileIO, FileIOBuilder};
542+
use crate::io::{
543+
Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
544+
StorageBuilder, StorageBuilderRegistry,
545+
};
546+
547+
// Dummy storage implementation for testing custom registries
548+
#[derive(Debug, Clone)]
549+
struct DummyStorage {
550+
_scheme: String,
551+
}
552+
553+
#[async_trait]
554+
impl Storage for DummyStorage {
555+
async fn exists(&self, _path: &str) -> crate::Result<bool> {
556+
Ok(true)
557+
}
558+
559+
async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
560+
Ok(FileMetadata { size: 0 })
561+
}
562+
563+
async fn read(&self, _path: &str) -> crate::Result<Bytes> {
564+
Ok(Bytes::new())
565+
}
566+
567+
async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
568+
Err(crate::Error::new(
569+
crate::ErrorKind::FeatureUnsupported,
570+
"DummyStorage does not support reader",
571+
))
572+
}
573+
574+
async fn write(&self, _path: &str, _bs: Bytes) -> crate::Result<()> {
575+
Ok(())
576+
}
577+
578+
async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
579+
Err(crate::Error::new(
580+
crate::ErrorKind::FeatureUnsupported,
581+
"DummyStorage does not support writer",
582+
))
583+
}
584+
585+
async fn delete(&self, _path: &str) -> crate::Result<()> {
586+
Ok(())
587+
}
588+
589+
async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
590+
Ok(())
591+
}
592+
593+
fn new_input(&self, path: &str) -> crate::Result<InputFile> {
594+
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
595+
}
596+
597+
fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
598+
Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
599+
}
600+
}
601+
602+
// Dummy storage builder for testing
603+
#[derive(Debug)]
604+
struct DummyStorageBuilder;
605+
606+
impl StorageBuilder for DummyStorageBuilder {
607+
fn build(
608+
&self,
609+
_props: HashMap<String, String>,
610+
_extensions: Extensions,
611+
) -> crate::Result<Arc<dyn Storage>> {
612+
Ok(Arc::new(DummyStorage {
613+
_scheme: "dummy".to_string(),
614+
}))
615+
}
616+
}
479617

480618
fn create_local_file_io() -> FileIO {
481619
FileIOBuilder::new_fs_io().build().unwrap()
@@ -619,4 +757,156 @@ mod tests {
619757
io.delete(&path).await.unwrap();
620758
assert!(!io.exists(&path).await.unwrap());
621759
}
760+
761+
#[test]
762+
fn test_custom_registry() {
763+
// Create a custom registry and register the dummy storage
764+
let mut registry = StorageBuilderRegistry::new();
765+
registry.register("dummy", Arc::new(DummyStorageBuilder));
766+
767+
// Build FileIO with custom storage
768+
let file_io = FileIOBuilder::new("dummy")
769+
.with_registry(registry)
770+
.build()
771+
.unwrap();
772+
773+
// Verify we can create files with the custom storage
774+
assert!(file_io.new_output("dummy://test.txt").is_ok());
775+
assert!(file_io.new_input("dummy://test.txt").is_ok());
776+
}
777+
778+
#[tokio::test]
779+
async fn test_custom_registry_operations() {
780+
// Define a dummy storage that tracks operations
781+
#[derive(Debug, Clone)]
782+
struct TrackingStorage {
783+
written: Arc<std::sync::Mutex<Vec<String>>>,
784+
}
785+
786+
#[async_trait]
787+
impl Storage for TrackingStorage {
788+
async fn exists(&self, _path: &str) -> crate::Result<bool> {
789+
Ok(true)
790+
}
791+
792+
async fn metadata(&self, _path: &str) -> crate::Result<FileMetadata> {
793+
Ok(FileMetadata { size: 42 })
794+
}
795+
796+
async fn read(&self, _path: &str) -> crate::Result<Bytes> {
797+
Ok(Bytes::from("test data"))
798+
}
799+
800+
async fn reader(&self, _path: &str) -> crate::Result<Box<dyn FileRead>> {
801+
Err(crate::Error::new(
802+
crate::ErrorKind::FeatureUnsupported,
803+
"TrackingStorage does not support reader",
804+
))
805+
}
806+
807+
async fn write(&self, path: &str, _bs: Bytes) -> crate::Result<()> {
808+
self.written.lock().unwrap().push(path.to_string());
809+
Ok(())
810+
}
811+
812+
async fn writer(&self, _path: &str) -> crate::Result<Box<dyn FileWrite>> {
813+
Err(crate::Error::new(
814+
crate::ErrorKind::FeatureUnsupported,
815+
"TrackingStorage does not support writer",
816+
))
817+
}
818+
819+
async fn delete(&self, _path: &str) -> crate::Result<()> {
820+
Ok(())
821+
}
822+
823+
async fn remove_dir_all(&self, _path: &str) -> crate::Result<()> {
824+
Ok(())
825+
}
826+
827+
fn new_input(&self, path: &str) -> crate::Result<InputFile> {
828+
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
829+
}
830+
831+
fn new_output(&self, path: &str) -> crate::Result<OutputFile> {
832+
Ok(OutputFile::new(Arc::new(self.clone()), path.to_string()))
833+
}
834+
}
835+
836+
// Define a builder for tracking storage
837+
#[derive(Debug)]
838+
struct TrackingStorageBuilder {
839+
written: Arc<std::sync::Mutex<Vec<String>>>,
840+
}
841+
842+
impl StorageBuilder for TrackingStorageBuilder {
843+
fn build(
844+
&self,
845+
_props: HashMap<String, String>,
846+
_extensions: Extensions,
847+
) -> crate::Result<Arc<dyn Storage>> {
848+
Ok(Arc::new(TrackingStorage {
849+
written: self.written.clone(),
850+
}))
851+
}
852+
}
853+
854+
// Create tracking storage
855+
let written = Arc::new(std::sync::Mutex::new(Vec::new()));
856+
let mut registry = StorageBuilderRegistry::new();
857+
registry.register(
858+
"tracking",
859+
Arc::new(TrackingStorageBuilder {
860+
written: written.clone(),
861+
}),
862+
);
863+
864+
// Build FileIO with tracking storage
865+
let file_io = FileIOBuilder::new("tracking")
866+
.with_registry(registry)
867+
.build()
868+
.unwrap();
869+
870+
// Perform operations
871+
let output = file_io.new_output("tracking://bucket/file.txt").unwrap();
872+
output.write(Bytes::from("test")).await.unwrap();
873+
874+
let input = file_io.new_input("tracking://bucket/file.txt").unwrap();
875+
let data = input.read().await.unwrap();
876+
assert_eq!(data, Bytes::from("test data"));
877+
878+
let metadata = input.metadata().await.unwrap();
879+
assert_eq!(metadata.size, 42);
880+
881+
// Verify write was tracked
882+
let tracked = written.lock().unwrap();
883+
assert_eq!(tracked.len(), 1);
884+
assert_eq!(tracked[0], "tracking://bucket/file.txt");
885+
}
886+
887+
#[test]
888+
fn test_into_parts_includes_registry() {
889+
let registry = StorageBuilderRegistry::new();
890+
891+
let builder = FileIOBuilder::new("memory")
892+
.with_prop("key", "value")
893+
.with_registry(registry.clone());
894+
895+
let (scheme, props, _extensions, returned_registry) = builder.into_parts();
896+
897+
assert_eq!(scheme, "memory");
898+
assert_eq!(props.get("key"), Some(&"value".to_string()));
899+
assert!(returned_registry.is_some());
900+
}
901+
902+
#[test]
903+
fn test_into_parts_without_registry() {
904+
let builder = FileIOBuilder::new("memory").with_prop("key", "value");
905+
906+
let (scheme, props, _extensions, returned_registry) = builder.into_parts();
907+
908+
assert_eq!(scheme, "memory");
909+
assert_eq!(props.get("key"), Some(&"value".to_string()));
910+
assert!(returned_registry.is_none());
911+
}
622912
}

0 commit comments

Comments
 (0)