Skip to content

Commit c3c60d0

Browse files
authored
feat(catalog): implement catalog loader for glue (#1603)
## Which issue does this PR close?

- Closes [#1259](#1259).

## What changes are included in this PR?

- Added `GlueCatalogBuilder`.
- Implemented the `CatalogBuilder` trait for `GlueCatalogBuilder`.
- Included glue in the catalog loader.

## Are these changes tested?

Yes. A glue case was added to the loader tests, and the glue integration tests were updated.
1 parent 4e0dd84 commit c3c60d0

File tree

8 files changed

+157
-22
lines changed

8 files changed

+157
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ hive_metastore = "0.1"
7777
http = "1.2"
7878
iceberg = { version = "0.6.0", path = "./crates/iceberg" }
7979
iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" }
80+
iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" }
8081
iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" }
8182
indicatif = "0.17"
8283
itertools = "0.13"

crates/catalog/glue/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ iceberg = { workspace = true }
3737
serde_json = { workspace = true }
3838
tokio = { workspace = true }
3939
tracing = { workspace = true }
40-
typed-builder = { workspace = true }
4140

4241
[dev-dependencies]
4342
ctor = { workspace = true }

crates/catalog/glue/src/catalog.rs

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ use iceberg::io::{
2626
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
2727
use iceberg::table::Table;
2828
use iceberg::{
29-
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
30-
TableCreation, TableIdent,
29+
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
30+
TableCommit, TableCreation, TableIdent,
3131
};
32-
use typed_builder::TypedBuilder;
3332

3433
use crate::error::{from_aws_build_error, from_aws_sdk_error};
3534
use crate::utils::{
@@ -40,15 +39,90 @@ use crate::{
4039
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
4140
};
4241

43-
#[derive(Debug, TypedBuilder)]
42+
/// Property key for the Glue catalog URI. Optional: when present, `GlueCatalogBuilder::load`
/// moves the value into the catalog config instead of the generic properties.
43+
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
44+
/// Property key for the Glue catalog id. Optional: when present, `GlueCatalogBuilder::load`
/// moves the value into the catalog config instead of the generic properties.
45+
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
46+
/// Property key for the Glue catalog warehouse location. Required: `GlueCatalogBuilder::load`
/// returns a `DataInvalid` error when the resulting warehouse is empty.
47+
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
48+
49+
/// Builder for [`GlueCatalog`].
///
/// A newtype over the crate-private `GlueCatalogConfig`. Create one with
/// `GlueCatalogBuilder::default()` and turn it into a catalog through its
/// `CatalogBuilder::load` implementation.
50+
#[derive(Debug)]
51+
pub struct GlueCatalogBuilder(GlueCatalogConfig);
52+
53+
impl Default for GlueCatalogBuilder {
54+
fn default() -> Self {
55+
Self(GlueCatalogConfig {
56+
name: None,
57+
uri: None,
58+
catalog_id: None,
59+
warehouse: "".to_string(),
60+
props: HashMap::new(),
61+
})
62+
}
63+
}
64+
65+
impl CatalogBuilder for GlueCatalogBuilder {
66+
type C = GlueCatalog;
67+
68+
fn load(
69+
mut self,
70+
name: impl Into<String>,
71+
props: HashMap<String, String>,
72+
) -> impl Future<Output = Result<Self::C>> + Send {
73+
self.0.name = Some(name.into());
74+
75+
if props.contains_key(GLUE_CATALOG_PROP_URI) {
76+
self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
77+
}
78+
79+
if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
80+
self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
81+
}
82+
83+
if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
84+
self.0.warehouse = props
85+
.get(GLUE_CATALOG_PROP_WAREHOUSE)
86+
.cloned()
87+
.unwrap_or_default();
88+
}
89+
90+
// Collect other remaining properties
91+
self.0.props = props
92+
.into_iter()
93+
.filter(|(k, _)| {
94+
k != GLUE_CATALOG_PROP_URI
95+
&& k != GLUE_CATALOG_PROP_CATALOG_ID
96+
&& k != GLUE_CATALOG_PROP_WAREHOUSE
97+
})
98+
.collect();
99+
100+
async move {
101+
if self.0.name.is_none() {
102+
return Err(Error::new(
103+
ErrorKind::DataInvalid,
104+
"Catalog name is required",
105+
));
106+
}
107+
if self.0.warehouse.is_empty() {
108+
return Err(Error::new(
109+
ErrorKind::DataInvalid,
110+
"Catalog warehouse is required",
111+
));
112+
}
113+
114+
GlueCatalog::new(self.0).await
115+
}
116+
}
117+
}
118+
119+
#[derive(Debug)]
44120
/// Glue Catalog configuration
45-
pub struct GlueCatalogConfig {
46-
#[builder(default, setter(strip_option(fallback = uri_opt)))]
121+
pub(crate) struct GlueCatalogConfig {
122+
name: Option<String>,
47123
uri: Option<String>,
48-
#[builder(default, setter(strip_option(fallback = catalog_id_opt)))]
49124
catalog_id: Option<String>,
50125
warehouse: String,
51-
#[builder(default)]
52126
props: HashMap<String, String>,
53127
}
54128

@@ -71,7 +145,7 @@ impl Debug for GlueCatalog {
71145

72146
impl GlueCatalog {
73147
/// Create a new glue catalog
74-
pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
148+
async fn new(config: GlueCatalogConfig) -> Result<Self> {
75149
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
76150
let mut file_io_props = config.props.clone();
77151
if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {

crates/catalog/glue/src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,30 @@
1616
// under the License.
1717

1818
//! Iceberg Glue Catalog implementation.
19+
//!
20+
//! Build a Glue catalog by passing a catalog name and a map of properties to
//! `GlueCatalogBuilder::load`.
//!
21+
//! # Example
22+
//!
23+
//! ```rust, no_run
24+
//! use std::collections::HashMap;
25+
//!
26+
//! use iceberg::CatalogBuilder;
27+
//! use iceberg_catalog_glue::{GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalogBuilder};
28+
//!
29+
//! #[tokio::main]
30+
//! async fn main() {
31+
//! let catalog = GlueCatalogBuilder::default()
32+
//! .load(
33+
//! "glue",
34+
//! HashMap::from([(
35+
//! GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
36+
//! "s3://warehouse".to_string(),
37+
//! )]),
38+
//! )
39+
//! .await
40+
//! .unwrap();
41+
//! }
42+
//! ```
1943
2044
#![deny(missing_docs)]
2145

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@ use std::sync::RwLock;
2424
use ctor::{ctor, dtor};
2525
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
2626
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
27-
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent};
27+
use iceberg::{
28+
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent,
29+
};
2830
use iceberg_catalog_glue::{
29-
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog, GlueCatalogConfig,
31+
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GLUE_CATALOG_PROP_URI,
32+
GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder,
3033
};
3134
use iceberg_test_utils::docker::DockerCompose;
3235
use iceberg_test_utils::{normalize_test_name, set_up};
@@ -112,13 +115,22 @@ async fn get_catalog() -> GlueCatalog {
112115
retries += 1;
113116
}
114117

115-
let config = GlueCatalogConfig::builder()
116-
.uri(format!("http://{}", glue_socket_addr))
117-
.warehouse("s3a://warehouse/hive".to_string())
118-
.props(props.clone())
119-
.build();
118+
let mut glue_props = HashMap::from([
119+
(
120+
GLUE_CATALOG_PROP_URI.to_string(),
121+
format!("http://{}", glue_socket_addr),
122+
),
123+
(
124+
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
125+
"s3a://warehouse/hive".to_string(),
126+
),
127+
]);
128+
glue_props.extend(props.clone());
120129

121-
GlueCatalog::new(config).await.unwrap()
130+
GlueCatalogBuilder::default()
131+
.load("glue", glue_props)
132+
.await
133+
.unwrap()
122134
}
123135

124136
async fn set_test_namespace(catalog: &GlueCatalog, namespace: &NamespaceIdent) -> Result<()> {

crates/catalog/loader/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ repository = { workspace = true }
3030

3131
[dependencies]
3232
iceberg = { workspace = true }
33-
iceberg-catalog-rest = {workspace = true}
33+
iceberg-catalog-rest = { workspace = true }
34+
iceberg-catalog-glue = { workspace = true }
3435
tokio = { workspace = true }
35-
async-trait = {workspace = true}
36+
async-trait = { workspace = true }

crates/catalog/loader/src/lib.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020

2121
use async_trait::async_trait;
2222
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
23+
use iceberg_catalog_glue::GlueCatalogBuilder;
2324
use iceberg_catalog_rest::RestCatalogBuilder;
2425

2526
#[async_trait]
@@ -46,6 +47,7 @@ impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
4647
pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
4748
match r#type {
4849
"rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>),
50+
"glue" => Ok(Box::new(GlueCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>),
4951
_ => Err(Error::new(
5052
ErrorKind::FeatureUnsupported,
5153
format!("Unsupported catalog type: {}", r#type),
@@ -57,12 +59,12 @@ pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
5759
mod tests {
5860
use std::collections::HashMap;
5961

60-
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
61-
6262
use crate::load;
6363

6464
#[tokio::test]
6565
async fn test_load_rest_catalog() {
66+
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
67+
6668
let catalog_loader = load("rest").unwrap();
6769
let catalog = catalog_loader
6870
.load(
@@ -79,4 +81,25 @@ mod tests {
7981

8082
assert!(catalog.is_ok());
8183
}
84+
85+
#[tokio::test]
86+
async fn test_load_glue_catalog() {
87+
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
88+
89+
let catalog_loader = load("glue").unwrap();
90+
let catalog = catalog_loader
91+
.load(
92+
"glue".to_string(),
93+
HashMap::from([
94+
(
95+
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
96+
"s3://test".to_string(),
97+
),
98+
("key".to_string(), "value".to_string()),
99+
]),
100+
)
101+
.await;
102+
103+
assert!(catalog.is_ok());
104+
}
82105
}

0 commit comments

Comments
 (0)