From a666f869d875222bad0471199c126536188801d1 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Mon, 10 Nov 2025 12:47:57 +0000 Subject: [PATCH 1/3] Allow configuring timeout and retry settings in OpenDAL Available settings for retry/timeout (a value that is missing or fails to parse falls back to OpenDAL's default): io.max-retries - Maximum number of retries (OpenDAL default: 3) io.retry.min-delay - Minimum delay between retries, in milliseconds (OpenDAL default: 1000) io.retry.max-delay - Maximum delay between retries, in milliseconds (OpenDAL default: 60000) io.timeout - IO operation timeout, in seconds (optional; OpenDAL's TimeoutLayer default applies when unset) The exponential backoff factor is not configurable by this patch and keeps OpenDAL's default (2.0). --- crates/iceberg/src/io/file_io.rs | 57 +++++++++++++++++++++++++--- crates/iceberg/src/io/storage.rs | 64 ++++++++++++++++++++++++++++++-- 2 files changed, 111 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 7666f0dc76..eaf74dfd24 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -33,6 +33,30 @@ use crate::{Error, ErrorKind, Result}; /// like Cloudlare R2 requires all chunk sizes to be consistent except for the last. pub const IO_CHUNK_SIZE: &str = "io.write.chunk-size"; +/// Configuration property for setting the timeout duration for IO operations. +/// +/// This timeout is applied to individual operations like read, write, delete, etc. +/// Value should be in seconds. If not set, uses OpenDAL's default timeout. +pub const IO_TIMEOUT_SECONDS: &str = "io.timeout"; + +/// Configuration property for setting the maximum number of retries for IO operations. +/// +/// This controls how many times an operation will be retried upon failure. +/// If not set, uses OpenDAL's default retry count. +pub const IO_MAX_RETRIES: &str = "io.max-retries"; + +/// Configuration property for setting the minimum retry delay in milliseconds. +/// +/// This controls the minimum delay between retry attempts. +/// If not set, uses OpenDAL's default minimum delay. 
+pub const IO_RETRY_MIN_DELAY_MS: &str = "io.retry.min-delay"; + +/// Configuration property for setting the maximum retry delay in milliseconds. +/// +/// This controls the maximum delay between retry attempts. +/// If not set, uses OpenDAL's default maximum delay. +pub const IO_RETRY_MAX_DELAY_MS: &str = "io.retry.max-delay"; + /// FileIO implementation, used to manipulate files in underlying storage. /// /// # Note @@ -96,7 +120,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn delete(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; Ok(op.delete(relative_path).await?) } @@ -106,7 +130,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn remove_all(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; Ok(op.remove_all(relative_path).await?) } @@ -116,7 +140,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn exists(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; Ok(op.exists(relative_path).await?) } @@ -126,7 +150,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. 
pub fn new_input(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); Ok(InputFile { @@ -142,7 +166,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_output(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator(&path)?; + let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); @@ -426,7 +450,10 @@ mod tests { use tempfile::TempDir; use super::{FileIO, FileIOBuilder}; - use crate::io::IO_CHUNK_SIZE; + use crate::io::{ + IO_CHUNK_SIZE, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS, + IO_TIMEOUT_SECONDS, + }; fn create_local_file_io() -> FileIO { FileIOBuilder::new_fs_io().build().unwrap() @@ -574,4 +601,22 @@ mod tests { let output_file = io.new_output(&path).unwrap(); assert_eq!(Some(32 * 1024 * 1024), output_file.chunk_size); } + + #[test] + fn test_file_io_builder_with_timeout_and_retry_config() { + let builder = FileIOBuilder::new("memory") + .with_prop(IO_TIMEOUT_SECONDS, "30") + .with_prop(IO_MAX_RETRIES, "5") + .with_prop(IO_RETRY_MIN_DELAY_MS, "100") + .with_prop(IO_RETRY_MAX_DELAY_MS, "5000"); + + assert_eq!(builder.props.get(IO_TIMEOUT_SECONDS).unwrap(), "30"); + assert_eq!(builder.props.get(IO_MAX_RETRIES).unwrap(), "5"); + assert_eq!(builder.props.get(IO_RETRY_MIN_DELAY_MS).unwrap(), "100"); + assert_eq!(builder.props.get(IO_RETRY_MAX_DELAY_MS).unwrap(), "5000"); + + // Verify that FileIO can be built with these configurations + let file_io = builder.build(); + assert!(file_io.is_ok()); + } } diff --git a/crates/iceberg/src/io/storage.rs 
b/crates/iceberg/src/io/storage.rs index c9c0f46505..a31fca2d64 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use opendal::layers::{RetryLayer, TimeoutLayer}; #[cfg(feature = "storage-azblob")] @@ -32,7 +34,7 @@ use opendal::{Operator, Scheme}; #[cfg(feature = "storage-azdls")] use super::AzureStorageScheme; -use super::FileIOBuilder; +use super::{FileIOBuilder, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS, IO_TIMEOUT_SECONDS}; use crate::{Error, ErrorKind}; /// The storage carries all supported storage services in iceberg @@ -116,11 +118,12 @@ impl Storage { } } - /// Creates operator from path. + /// Creates operator from path with configuration. /// /// # Arguments /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. + /// * config: Configuration properties for timeout and retry settings. /// /// # Returns /// @@ -128,9 +131,10 @@ impl Storage { /// /// * An [`opendal::Operator`] instance used to operate on file. /// * Relative path to the root uri of [`opendal::Operator`]. - pub(crate) fn create_operator<'a>( + pub(crate) fn create_operator_with_config<'a>( &self, path: &'a impl AsRef, + config: &HashMap, ) -> crate::Result<(Operator, &'a str)> { let path = path.as_ref(); let (operator, relative_path): (Operator, &str) = match self { @@ -237,11 +241,63 @@ impl Storage { // Transient errors are common for object stores; however there's no // harm in retrying temporary failures for other storage backends as well. 
- let operator = operator.layer(TimeoutLayer::new()).layer(RetryLayer::new()); + + // Configure timeout layer + let operator = if let Some(timeout_str) = config.get(IO_TIMEOUT_SECONDS) { + if let Ok(timeout_secs) = timeout_str.parse::() { + operator.layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs))) + } else { + operator.layer(TimeoutLayer::new()) + } + } else { + operator.layer(TimeoutLayer::new()) + }; + + // Configure retry layer + let mut retry_layer = RetryLayer::new(); + + if let Some(max_retries_str) = config.get(IO_MAX_RETRIES) { + if let Ok(max_retries) = max_retries_str.parse::() { + retry_layer = retry_layer.with_max_times(max_retries); + } + } + + if let Some(min_delay_str) = config.get(IO_RETRY_MIN_DELAY_MS) { + if let Ok(min_delay_ms) = min_delay_str.parse::() { + retry_layer = retry_layer.with_min_delay(Duration::from_millis(min_delay_ms)); + } + } + + if let Some(max_delay_str) = config.get(IO_RETRY_MAX_DELAY_MS) { + if let Ok(max_delay_ms) = max_delay_str.parse::() { + retry_layer = retry_layer.with_max_delay(Duration::from_millis(max_delay_ms)); + } + } + + let operator = operator.layer(retry_layer); Ok((operator, relative_path)) } + /// Creates operator from path using default configuration. + /// + /// # Arguments + /// + /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. + /// + /// # Returns + /// + /// The return value consists of two parts: + /// + /// * An [`opendal::Operator`] instance used to operate on file. + /// * Relative path to the root uri of [`opendal::Operator`]. + pub(crate) fn create_operator<'a>( + &self, + path: &'a impl AsRef, + ) -> crate::Result<(Operator, &'a str)> { + self.create_operator_with_config(path, &HashMap::new()) + } + /// Parse scheme. 
fn parse_scheme(scheme: &str) -> crate::Result { match scheme { From 2710f5e408eacb40e0c6b96f0d1bff515db86f37 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Mon, 10 Nov 2025 12:54:14 +0000 Subject: [PATCH 2/3] Apply cargo fmt --- crates/iceberg/src/io/file_io.rs | 28 +++++++++++++++++++--------- crates/iceberg/src/io/storage.rs | 19 +++++++++++-------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index eaf74dfd24..e3dc4c07fd 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -120,7 +120,9 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn delete(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; + let (op, relative_path) = self + .inner + .create_operator_with_config(&path, &self.builder.props)?; Ok(op.delete(relative_path).await?) } @@ -130,7 +132,9 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn remove_all(&self, path: impl AsRef) -> Result<()> { - let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; + let (op, relative_path) = self + .inner + .create_operator_with_config(&path, &self.builder.props)?; Ok(op.remove_all(relative_path).await?) } @@ -140,7 +144,9 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn exists(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; + let (op, relative_path) = self + .inner + .create_operator_with_config(&path, &self.builder.props)?; Ok(op.exists(relative_path).await?) 
} @@ -150,7 +156,9 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_input(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; + let (op, relative_path) = self + .inner + .create_operator_with_config(&path, &self.builder.props)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); Ok(InputFile { @@ -166,16 +174,18 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_output(&self, path: impl AsRef) -> Result { - let (op, relative_path) = self.inner.create_operator_with_config(&path, &self.builder.props)?; + let (op, relative_path) = self + .inner + .create_operator_with_config(&path, &self.builder.props)?; let path = path.as_ref().to_string(); let relative_path_pos = path.len() - relative_path.len(); - + // ADLS requires append mode for writes #[cfg(feature = "storage-azdls")] let append_file = matches!(self.inner.as_ref(), Storage::Azdls { .. 
}); #[cfg(not(feature = "storage-azdls"))] let append_file = false; - + Ok(OutputFile { op, path, @@ -609,12 +619,12 @@ mod tests { .with_prop(IO_MAX_RETRIES, "5") .with_prop(IO_RETRY_MIN_DELAY_MS, "100") .with_prop(IO_RETRY_MAX_DELAY_MS, "5000"); - + assert_eq!(builder.props.get(IO_TIMEOUT_SECONDS).unwrap(), "30"); assert_eq!(builder.props.get(IO_MAX_RETRIES).unwrap(), "5"); assert_eq!(builder.props.get(IO_RETRY_MIN_DELAY_MS).unwrap(), "100"); assert_eq!(builder.props.get(IO_RETRY_MAX_DELAY_MS).unwrap(), "5000"); - + // Verify that FileIO can be built with these configurations let file_io = builder.build(); assert!(file_io.is_ok()); diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index a31fca2d64..38bafd8e6e 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -34,7 +34,9 @@ use opendal::{Operator, Scheme}; #[cfg(feature = "storage-azdls")] use super::AzureStorageScheme; -use super::{FileIOBuilder, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS, IO_TIMEOUT_SECONDS}; +use super::{ + FileIOBuilder, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS, IO_TIMEOUT_SECONDS, +}; use crate::{Error, ErrorKind}; /// The storage carries all supported storage services in iceberg @@ -241,39 +243,40 @@ impl Storage { // Transient errors are common for object stores; however there's no // harm in retrying temporary failures for other storage backends as well. 
- + // Configure timeout layer let operator = if let Some(timeout_str) = config.get(IO_TIMEOUT_SECONDS) { if let Ok(timeout_secs) = timeout_str.parse::() { - operator.layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs))) + operator + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs))) } else { operator.layer(TimeoutLayer::new()) } } else { operator.layer(TimeoutLayer::new()) }; - + // Configure retry layer let mut retry_layer = RetryLayer::new(); - + if let Some(max_retries_str) = config.get(IO_MAX_RETRIES) { if let Ok(max_retries) = max_retries_str.parse::() { retry_layer = retry_layer.with_max_times(max_retries); } } - + if let Some(min_delay_str) = config.get(IO_RETRY_MIN_DELAY_MS) { if let Ok(min_delay_ms) = min_delay_str.parse::() { retry_layer = retry_layer.with_min_delay(Duration::from_millis(min_delay_ms)); } } - + if let Some(max_delay_str) = config.get(IO_RETRY_MAX_DELAY_MS) { if let Ok(max_delay_ms) = max_delay_str.parse::() { retry_layer = retry_layer.with_max_delay(Duration::from_millis(max_delay_ms)); } } - + let operator = operator.layer(retry_layer); Ok((operator, relative_path)) From c7b5c96d68a188b6770fd823861cfed9fb0d6dd3 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Mon, 10 Nov 2025 13:02:10 +0000 Subject: [PATCH 3/3] Cleanup unused function --- crates/iceberg/src/io/storage.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 38bafd8e6e..7e5c344820 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -282,25 +282,6 @@ impl Storage { Ok((operator, relative_path)) } - /// Creates operator from path using default configuration. - /// - /// # Arguments - /// - /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. 
- /// - /// # Returns - /// - /// The return value consists of two parts: - /// - /// * An [`opendal::Operator`] instance used to operate on file. - /// * Relative path to the root uri of [`opendal::Operator`]. - pub(crate) fn create_operator<'a>( - &self, - path: &'a impl AsRef, - ) -> crate::Result<(Operator, &'a str)> { - self.create_operator_with_config(path, &HashMap::new()) - } - /// Parse scheme. fn parse_scheme(scheme: &str) -> crate::Result { match scheme {