Skip to content

Commit 901db63

Browse files
committed
builder, registry, split by schemes
1 parent 535de90 commit 901db63

File tree

9 files changed

+951
-11
lines changed

9 files changed

+951
-11
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ use async_trait::async_trait;
2525
use bytes::Bytes;
2626
use url::Url;
2727

28-
use super::storage::OpenDALStorage;
2928
use crate::{Error, ErrorKind, Result};
3029

31-
/// todo doc
30+
/// Trait for storage operations in Iceberg
3231
#[async_trait]
3332
pub trait Storage: Debug + Send + Sync {
3433
/// Check if a file exists at the given path
@@ -62,6 +61,16 @@ pub trait Storage: Debug + Send + Sync {
6261
fn new_output(&self, path: &str) -> Result<OutputFile>;
6362
}
6463

64+
/// Common interface for all storage builders.
65+
pub trait StorageBuilder: Debug + Send + Sync {
66+
/// Create a new storage instance with the given properties and extensions.
67+
fn build(
68+
&self,
69+
props: HashMap<String, String>,
70+
extensions: Extensions,
71+
) -> Result<Arc<dyn Storage>>;
72+
}
73+
6574
/// FileIO implementation, used to manipulate files in underlying storage.
6675
///
6776
/// # Note
@@ -288,10 +297,19 @@ impl FileIOBuilder {
288297

289298
/// Builds [`FileIO`].
290299
pub fn build(self) -> Result<FileIO> {
291-
let storage = OpenDALStorage::build(self.clone())?;
300+
// Use the scheme to determine the storage type
301+
let scheme = self.scheme_str.clone().unwrap_or_default();
302+
303+
// Create registry and get builder
304+
let registry = crate::io::StorageBuilderRegistry::new();
305+
let builder = registry.get_builder(scheme.as_str())?;
306+
307+
// Build storage with props and extensions
308+
let storage = builder.build(self.props.clone(), self.extensions.clone())?;
309+
292310
Ok(FileIO {
293311
builder: self,
294-
inner: Arc::new(storage),
312+
inner: storage,
295313
})
296314
}
297315
}

crates/iceberg/src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@
6868
6969
mod file_io;
7070
mod storage;
71+
mod storage_builder;
7172

7273
pub use file_io::*;
74+
pub use storage_builder::StorageBuilderRegistry;
7375
pub(crate) mod object_cache;
7476

7577
#[cfg(feature = "storage-azdls")]

crates/iceberg/src/io/storage_azdls.rs

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,18 @@
1818
use std::collections::HashMap;
1919
use std::fmt::Display;
2020
use std::str::FromStr;
21+
use std::sync::Arc;
2122

22-
use opendal::Configurator;
23+
use async_trait::async_trait;
24+
use bytes::Bytes;
2325
use opendal::services::AzdlsConfig;
26+
use opendal::{Configurator, Operator};
2427
use url::Url;
2528

29+
use crate::io::{
30+
Extensions, FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
31+
StorageBuilder,
32+
};
2633
use crate::{Error, ErrorKind, Result, ensure_data_valid};
2734

2835
/// A connection string.
@@ -125,7 +132,7 @@ pub(crate) fn azdls_create_operator<'a>(
125132
/// paths are expected to contain the `dfs` storage service.
126133
/// - `wasb[s]` is used to refer to files in Blob Storage directly; paths are
127134
/// expected to contain the `blob` storage service.
128-
#[derive(Debug, PartialEq)]
135+
#[derive(Debug, Clone, PartialEq)]
129136
pub(crate) enum AzureStorageScheme {
130137
Abfs,
131138
Abfss,
@@ -597,3 +604,116 @@ mod tests {
597604
}
598605
}
599606
}
607+
608+
/// Azure Data Lake Storage implementation using OpenDAL
609+
#[derive(Debug, Clone)]
610+
pub struct OpenDALAzdlsStorage {
611+
/// Because Azdls accepts multiple possible schemes, we store the full
612+
/// passed scheme here to later validate schemes passed via paths.
613+
configured_scheme: AzureStorageScheme,
614+
config: Arc<AzdlsConfig>,
615+
}
616+
617+
impl OpenDALAzdlsStorage {
618+
/// Creates operator from path.
619+
fn create_operator<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> {
620+
let (op, relative_path) = azdls_create_operator(path, &self.config, &self.configured_scheme)?;
621+
let op = op.layer(opendal::layers::RetryLayer::new());
622+
Ok((op, relative_path))
623+
}
624+
}
625+
626+
#[async_trait]
627+
impl Storage for OpenDALAzdlsStorage {
628+
async fn exists(&self, path: &str) -> Result<bool> {
629+
let (op, relative_path) = self.create_operator(path)?;
630+
Ok(op.exists(relative_path).await?)
631+
}
632+
633+
async fn metadata(&self, path: &str) -> Result<FileMetadata> {
634+
let (op, relative_path) = self.create_operator(path)?;
635+
let meta = op.stat(relative_path).await?;
636+
Ok(FileMetadata {
637+
size: meta.content_length(),
638+
})
639+
}
640+
641+
async fn read(&self, path: &str) -> Result<Bytes> {
642+
let (op, relative_path) = self.create_operator(path)?;
643+
Ok(op.read(relative_path).await?.to_bytes())
644+
}
645+
646+
async fn reader(&self, path: &str) -> Result<Box<dyn FileRead>> {
647+
let (op, relative_path) = self.create_operator(path)?;
648+
Ok(Box::new(op.reader(relative_path).await?))
649+
}
650+
651+
async fn write(&self, path: &str, bs: Bytes) -> Result<()> {
652+
let mut writer = self.writer(path).await?;
653+
writer.write(bs).await?;
654+
writer.close().await
655+
}
656+
657+
async fn writer(&self, path: &str) -> Result<Box<dyn FileWrite>> {
658+
let (op, relative_path) = self.create_operator(path)?;
659+
Ok(Box::new(op.writer(relative_path).await?))
660+
}
661+
662+
async fn delete(&self, path: &str) -> Result<()> {
663+
let (op, relative_path) = self.create_operator(path)?;
664+
Ok(op.delete(relative_path).await?)
665+
}
666+
667+
async fn remove_dir_all(&self, path: &str) -> Result<()> {
668+
let (op, relative_path) = self.create_operator(path)?;
669+
let path = if relative_path.ends_with('/') {
670+
relative_path.to_string()
671+
} else {
672+
format!("{relative_path}/")
673+
};
674+
Ok(op.remove_all(&path).await?)
675+
}
676+
677+
fn new_input(&self, path: &str) -> Result<InputFile> {
678+
Ok(InputFile {
679+
storage: Arc::new(self.clone()),
680+
path: path.to_string(),
681+
})
682+
}
683+
684+
fn new_output(&self, path: &str) -> Result<OutputFile> {
685+
Ok(OutputFile {
686+
storage: Arc::new(self.clone()),
687+
path: path.to_string(),
688+
})
689+
}
690+
}
691+
692+
/// Builder for Azure Data Lake Storage
693+
#[derive(Debug)]
694+
pub struct OpenDALAzdlsStorageBuilder;
695+
696+
impl StorageBuilder for OpenDALAzdlsStorageBuilder {
697+
fn build(
698+
&self,
699+
props: HashMap<String, String>,
700+
_extensions: Extensions,
701+
) -> Result<Arc<dyn Storage>> {
702+
// Get the scheme string from the props or use default
703+
let scheme_str = props
704+
.get("scheme_str")
705+
.cloned()
706+
.unwrap_or_else(|| "abfs".to_string());
707+
708+
// Parse the scheme
709+
let scheme = scheme_str.parse::<AzureStorageScheme>()?;
710+
711+
// Parse Azdls config from props
712+
let config = azdls_config_parse(props)?;
713+
714+
Ok(Arc::new(OpenDALAzdlsStorage {
715+
configured_scheme: scheme,
716+
config: Arc::new(config),
717+
}))
718+
}
719+
}

0 commit comments

Comments
 (0)