Skip to content

Commit 7b507b8

Browse files
committed
Add CreateTable API with typestate pattern and comprehensive testing
This commit introduces a complete CreateTable API for Delta Kernel Rust, enabling programmatic creation of Delta tables with proper metadata tracking and validation.

The API uses a typestate pattern to enforce correct usage at compile time:

1. **CreateTableBuilder** (configuration stage)
   - Configure table properties, schema, partition columns
   - Builder methods: with_table_properties(), with_partition_columns()
   - Validates configuration before transitioning
2. **CreateTableTransaction** (commit stage)
   - Commits Protocol, Metadata, and CommitInfo actions
   - Returns Transaction for future data operations
   - Enforces that metadata is committed before data operations

This pattern is extensible for future table features:

- Add builder methods for clustering, constraints, invariants
- Add table feature flags to Protocol action
- Extend validation in build() method
- Maintain type safety throughout

Example extensibility:

```rust
CreateTableBuilder::new(path, schema, engine_info)
    .with_table_properties(props)
    .with_partition_columns(vec!["date", "region"]) // ✓ Supported
    .with_clustering(vec!["user_id"])               // Future
    .with_check_constraints(constraints)            // Future
    .build(engine)?
```

**Tests**

Table creation:

- test_create_simple_table: Basic table creation with events schema
- test_create_table_with_properties: Financial table with Delta properties
- test_create_table_already_exists: Duplicate creation prevention
- test_create_table_multiple_properties: Builder pattern chaining

**Coverage**: Verifies tables load correctly and schemas are preserved

Log contents:

- test_commit_info_is_written_to_log: Validates all 3 actions present
  * Parses actual JSON log file
  * Verifies Protocol (minReaderVersion=3, minWriterVersion=7)
  * Verifies Metadata (id, schemaString, createdTime)
  * Verifies CommitInfo (timestamp, operation, engineInfo, txnId, kernelVersion)
- test_log_action_order: Ensures correct action ordering

**Coverage**: Guarantees Delta protocol compliance and complete audit trail

Validation:

- test_create_table_empty_schema: Schema validation
- Realistic schemas: DECIMAL types, TIMESTAMP, DATE, BOOLEAN

**Coverage**: Prevents invalid table configurations

**Usage example**

```rust
use delta_kernel::table_manager::TableManager;
use delta_kernel::schema::{StructType, StructField, DataType};
use delta_kernel::committer::FileSystemCommitter;
use std::sync::Arc;
use std::collections::HashMap;

// Define schema
let schema = Arc::new(StructType::try_new(vec![
    StructField::new("user_id", DataType::LONG, false),
    StructField::new("event_type", DataType::STRING, false),
    StructField::new("timestamp", DataType::TIMESTAMP, false),
])?);

// Configure table properties
let mut properties = HashMap::new();
properties.insert("delta.appendOnly".to_string(), "true".to_string());
properties.insert("delta.checkpointInterval".to_string(), "10".to_string());

// Create table with typestate pattern
let create_txn = TableManager::create_table(
    "/path/to/table",
    schema,
    "MyApplication/1.0"
)
.with_table_properties(properties)
.build(&engine)?;

// Commit metadata (creates _delta_log/00000000000000000000.json)
let txn = create_txn.commit_metadata(&engine, Box::new(FileSystemCommitter::new()))?;

// Future: Use returned transaction for data operations
// txn.add_files(data);
// txn.commit(&engine)?;
```

**Files changed**

- kernel/src/actions/mod.rs: Protocol version constants
- kernel/src/transaction/create_table.rs: CommitInfo generation
- kernel/tests/create_table.rs: Comprehensive test suite (7 tests)

**Breaking changes**

None - this is a new API addition.

**Future work**

- Add support for partition columns in builder
- Add clustering configuration
- Add check constraints and invariants
- Support column mapping modes
- Enable schema evolution from CreateTableTransaction
1 parent 74e75d3 commit 7b507b8

File tree

7 files changed

+857
-63
lines changed

7 files changed

+857
-63
lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::collections::HashMap;
2-
use std::fs::{create_dir_all, write};
2+
use std::fs::create_dir_all;
33
use std::path::Path;
44
use std::process::ExitCode;
55
use std::sync::Arc;
@@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
99
use clap::Parser;
1010
use common::{LocationArgs, ParseWithExamples};
1111
use itertools::Itertools;
12-
use serde_json::{json, to_vec};
1312
use url::Url;
14-
use uuid::Uuid;
1513

1614
use delta_kernel::arrow::array::TimestampMicrosecondArray;
1715
use delta_kernel::committer::FileSystemCommitter;
@@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
2018
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
2119
use delta_kernel::engine::default::DefaultEngine;
2220
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
21+
use delta_kernel::table_manager::TableManager;
2322
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
2423
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};
2524

@@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
152151
// Create new table
153152
println!("Creating new Delta table...");
154153
let schema = parse_schema(schema_str)?;
155-
create_table(url, &schema).await?;
154+
create_table(url, &schema, engine).await?;
156155
Snapshot::builder_for(url.clone()).build(engine)
157156
}
158157
}
@@ -192,66 +191,16 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
192191
Ok(Arc::new(StructType::try_new(fields)?))
193192
}
194193

195-
/// Create a new Delta table with the given schema.
196-
///
197-
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
198-
/// initial transaction log.
199-
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
200-
let table_id = Uuid::new_v4().to_string();
201-
let schema_str = serde_json::to_string(&schema)?;
194+
/// Create a new Delta table with the given schema using the official CreateTable API.
195+
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
196+
// Use the new TableManager API to create the table
197+
let table_path = table_url.as_str();
198+
let create_txn =
199+
TableManager::create_table(table_path, schema.clone(), "write-table-example/1.0")
200+
.build(engine)?;
202201

203-
let (reader_features, writer_features) = {
204-
let reader_features: Vec<&'static str> = vec![];
205-
let writer_features: Vec<&'static str> = vec![];
206-
207-
// TODO: Support adding specific table features
208-
(reader_features, writer_features)
209-
};
210-
211-
let protocol = json!({
212-
"protocol": {
213-
"minReaderVersion": 3,
214-
"minWriterVersion": 7,
215-
"readerFeatures": reader_features,
216-
"writerFeatures": writer_features,
217-
}
218-
});
219-
let partition_columns: Vec<String> = vec![];
220-
let metadata = json!({
221-
"metaData": {
222-
"id": table_id,
223-
"format": {
224-
"provider": "parquet",
225-
"options": {}
226-
},
227-
"schemaString": schema_str,
228-
"partitionColumns": partition_columns,
229-
"configuration": {},
230-
"createdTime": 1677811175819u64
231-
}
232-
});
233-
234-
let data = [
235-
to_vec(&protocol).unwrap(),
236-
b"\n".to_vec(),
237-
to_vec(&metadata).unwrap(),
238-
]
239-
.concat();
240-
241-
// Write the initial transaction with protocol and metadata to 0.json
242-
let delta_log_path = table_url
243-
.join("_delta_log/")?
244-
.to_file_path()
245-
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
246-
let file_path = delta_log_path.join("00000000000000000000.json");
247-
248-
// Create the _delta_log directory if it doesn't exist
249-
create_dir_all(&delta_log_path)
250-
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;
251-
252-
// Write the file using standard filesystem operations
253-
write(&file_path, data)
254-
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
202+
// Commit the metadata - this creates the table
203+
let _txn = create_txn.commit_metadata(engine, Box::new(FileSystemCommitter::new()))?;
255204

256205
println!("✓ Created Delta table with schema: {schema:#?}");
257206
Ok(())

kernel/src/actions/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";
6868

6969
pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";
7070

71+
/// Minimum reader version for tables that use table features.
72+
/// At reader version 3 the protocol action lists its features in an explicit `readerFeatures` array.
73+
#[internal_api]
74+
pub(crate) const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
75+
76+
/// Minimum writer version for tables that use table features.
77+
/// At writer version 7 the protocol action lists its features in an explicit `writerFeatures` array.
78+
#[internal_api]
79+
pub(crate) const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
80+
7181
static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
7282
Arc::new(StructType::new_unchecked([
7383
StructField::nullable(ADD_NAME, Add::to_schema()),

kernel/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ pub mod snapshot;
9999
pub mod table_changes;
100100
pub mod table_configuration;
101101
pub mod table_features;
102+
pub mod table_manager;
102103
pub mod table_properties;
103104
pub mod transaction;
104105
pub(crate) mod transforms;
@@ -156,6 +157,8 @@ pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
156157
pub use log_compaction::{should_compact, LogCompactionWriter};
157158
pub use snapshot::Snapshot;
158159
pub use snapshot::SnapshotRef;
160+
pub use table_manager::TableManager;
161+
pub use transaction::create_table::{CreateTableBuilder, CreateTableTransaction};
159162

160163
use expressions::literal_expression_transform::LiteralExpressionTransform;
161164
use expressions::Scalar;

kernel/src/table_manager.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//! The `table_manager` module provides the entry point for creating and managing Delta tables.
2+
//!
3+
//! This module exposes the [`TableManager`] struct which provides static factory methods for
4+
//! creating new Delta tables with a fluent builder API.
5+
6+
use crate::schema::SchemaRef;
7+
use crate::transaction::create_table::CreateTableBuilder;
8+
9+
/// Entry point for creating and managing Delta tables.
10+
///
11+
/// `TableManager` provides static factory methods that return builders for configuring
12+
/// and creating new Delta tables.
13+
pub struct TableManager;
14+
15+
impl TableManager {
16+
/// Creates a builder for creating a new Delta table.
17+
///
18+
/// This method returns a [`CreateTableBuilder`] that can be configured with table
19+
/// properties and other options before building the transaction.
20+
///
21+
/// # Arguments
22+
///
23+
/// * `path` - The location (file system path or table URL string) where the Delta table will be created
24+
/// * `schema` - The schema for the new table
25+
/// * `engine_info` - Information about the engine creating the table (e.g., "MyApp/1.0")
26+
///
27+
/// # Example
28+
///
29+
/// ```rust,no_run
30+
/// use delta_kernel::table_manager::TableManager;
31+
/// use delta_kernel::schema::{StructType, StructField, DataType};
32+
/// use std::sync::Arc;
33+
/// # use delta_kernel::Engine;
34+
/// # fn example(engine: &dyn Engine) -> delta_kernel::DeltaResult<()> {
35+
///
36+
/// let schema = Arc::new(StructType::try_new(vec![
37+
/// StructField::new("id", DataType::INTEGER, false),
38+
/// StructField::new("name", DataType::STRING, true),
39+
/// ])?);
40+
///
41+
/// let create_txn = TableManager::create_table("/path/to/table", schema, "MyApp/1.0")
42+
/// .build(engine)?;
43+
/// # Ok(())
44+
/// # }
45+
/// ```
46+
pub fn create_table(
47+
path: impl AsRef<str>,
48+
schema: SchemaRef,
49+
engine_info: impl Into<String>,
50+
) -> CreateTableBuilder {
51+
CreateTableBuilder::new(path, schema, engine_info)
52+
}
53+
}

0 commit comments

Comments
 (0)