Skip to content

Commit e739656

Browse files
committed
test: Add comprehensive tests for CreateTable API
Add integration tests covering the CreateTable API functionality: Test Coverage: - test_create_simple_table: Basic table creation with a multi-column schema (event_id, user_id, event_type, timestamp, properties) - test_create_table_with_properties: Table creation with Delta properties (appendOnly, deletedFileRetentionDuration, checkpointInterval, logRetentionDuration) - test_create_table_already_exists: Verifies error when attempting to create a table at an existing path - test_create_table_empty_schema: Validates rejection of empty schemas - test_create_table_multiple_properties: Tests builder pattern with multiple with_table_properties() calls - test_commit_info_is_written_to_log: Verifies the Delta log contains correct CommitInfo, Protocol, and Metadata actions with proper fields (timestamp, engineInfo, operation, txnId, kernelVersion) - test_log_action_order: Ensures actions are written in ICT-compliant order (CommitInfo first, then Protocol, then Metadata) All tests use realistic schemas (events, financial transactions, user profiles, product catalogs) to validate real-world usage patterns.
1 parent d3c8578 commit e739656

File tree

1 file changed

+391
-0
lines changed

1 file changed

+391
-0
lines changed

kernel/tests/create_table.rs

Lines changed: 391 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
//! Integration tests for the CreateTable API
2+
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
6+
use delta_kernel::committer::FileSystemCommitter;
7+
use delta_kernel::schema::{DataType, StructField, StructType};
8+
use delta_kernel::snapshot::Snapshot;
9+
use delta_kernel::table_manager::TableManager;
10+
use delta_kernel::DeltaResult;
11+
use serde_json::Value;
12+
use tempfile::tempdir;
13+
use test_utils::create_default_engine;
14+
15+
#[tokio::test]
16+
async fn test_create_simple_table() -> DeltaResult<()> {
17+
// Setup
18+
let temp_dir = tempdir().expect("Failed to create temp dir");
19+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
20+
21+
let engine =
22+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
23+
24+
// Create schema for an events table
25+
let schema = Arc::new(StructType::try_new(vec![
26+
StructField::new("event_id", DataType::LONG, false),
27+
StructField::new("user_id", DataType::LONG, false),
28+
StructField::new("event_type", DataType::STRING, false),
29+
StructField::new("timestamp", DataType::TIMESTAMP, false),
30+
StructField::new("properties", DataType::STRING, true),
31+
])?);
32+
33+
// Create table using new API
34+
let _result = TableManager::create_table(&table_path, schema.clone(), "DeltaKernel-RS/0.17.0")
35+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
36+
.commit(engine.as_ref())?;
37+
38+
// Verify table was created
39+
let table_url = delta_kernel::try_parse_uri(&table_path)?;
40+
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
41+
42+
assert_eq!(snapshot.version(), 0);
43+
assert_eq!(snapshot.schema().fields().len(), 5);
44+
45+
// Verify schema field names
46+
let field_names: Vec<_> = snapshot
47+
.schema()
48+
.fields()
49+
.map(|f| f.name().to_string())
50+
.collect();
51+
assert!(field_names.contains(&"event_id".to_string()));
52+
assert!(field_names.contains(&"user_id".to_string()));
53+
assert!(field_names.contains(&"event_type".to_string()));
54+
assert!(field_names.contains(&"timestamp".to_string()));
55+
assert!(field_names.contains(&"properties".to_string()));
56+
57+
Ok(())
58+
}
59+
60+
#[tokio::test]
61+
async fn test_create_table_with_properties() -> DeltaResult<()> {
62+
// Setup
63+
let temp_dir = tempdir().expect("Failed to create temp dir");
64+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
65+
66+
let engine =
67+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
68+
69+
// Create schema for a financial transactions table
70+
let schema = Arc::new(StructType::try_new(vec![
71+
StructField::new("transaction_id", DataType::STRING, false),
72+
StructField::new("account_id", DataType::LONG, false),
73+
StructField::new("amount", DataType::decimal(18, 2)?, false),
74+
StructField::new("currency", DataType::STRING, false),
75+
StructField::new("transaction_date", DataType::DATE, false),
76+
StructField::new("description", DataType::STRING, true),
77+
])?);
78+
79+
// Create table with realistic Delta table properties
80+
let mut properties = HashMap::new();
81+
properties.insert("delta.appendOnly".to_string(), "true".to_string());
82+
properties.insert(
83+
"delta.deletedFileRetentionDuration".to_string(),
84+
"interval 7 days".to_string(),
85+
);
86+
properties.insert("delta.checkpointInterval".to_string(), "10".to_string());
87+
properties.insert(
88+
"delta.logRetentionDuration".to_string(),
89+
"interval 30 days".to_string(),
90+
);
91+
92+
let _result = TableManager::create_table(&table_path, schema.clone(), "FinanceApp/2.5.1")
93+
.with_table_properties(properties.clone())
94+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
95+
.commit(engine.as_ref())?;
96+
97+
// Verify table was created with properties
98+
let table_url = delta_kernel::try_parse_uri(&table_path)?;
99+
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
100+
101+
assert_eq!(snapshot.version(), 0);
102+
103+
// Check table properties
104+
let table_properties = snapshot
105+
.table_configuration()
106+
.metadata()
107+
.parse_table_properties();
108+
assert_eq!(table_properties.append_only, Some(true));
109+
110+
Ok(())
111+
}
112+
113+
#[tokio::test]
114+
async fn test_create_table_already_exists() -> DeltaResult<()> {
115+
// Setup
116+
let temp_dir = tempdir().expect("Failed to create temp dir");
117+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
118+
119+
let engine =
120+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
121+
122+
// Create schema for a user profiles table
123+
let schema = Arc::new(StructType::try_new(vec![
124+
StructField::new("user_id", DataType::LONG, false),
125+
StructField::new("username", DataType::STRING, false),
126+
StructField::new("email", DataType::STRING, false),
127+
StructField::new("created_at", DataType::TIMESTAMP, false),
128+
StructField::new("is_active", DataType::BOOLEAN, false),
129+
])?);
130+
131+
// Create table first time
132+
let _result =
133+
TableManager::create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
134+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
135+
.commit(engine.as_ref())?;
136+
137+
// Try to create again - should fail at build time (table already exists)
138+
let result =
139+
TableManager::create_table(&table_path, schema.clone(), "UserManagementService/1.2.0")
140+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
141+
142+
assert!(result.is_err());
143+
let err = result.unwrap_err();
144+
assert!(err.to_string().contains("already exists"));
145+
146+
Ok(())
147+
}
148+
149+
#[tokio::test]
150+
async fn test_create_table_empty_schema() -> DeltaResult<()> {
151+
// Setup
152+
let temp_dir = tempdir().expect("Failed to create temp dir");
153+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
154+
155+
let engine =
156+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
157+
158+
// Create empty schema
159+
let schema = Arc::new(StructType::try_new(vec![])?);
160+
161+
// Try to create table with empty schema - should fail at build time
162+
let result =
163+
TableManager::create_table(&table_path, schema, "InvalidApp/0.1.0")
164+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()));
165+
166+
assert!(result.is_err());
167+
let err = result.unwrap_err();
168+
assert!(err.to_string().contains("cannot be empty"));
169+
170+
Ok(())
171+
}
172+
173+
#[tokio::test]
174+
async fn test_create_table_multiple_properties() -> DeltaResult<()> {
175+
// Setup
176+
let temp_dir = tempdir().expect("Failed to create temp dir");
177+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
178+
179+
let engine =
180+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
181+
182+
// Create schema for a product catalog table
183+
let schema = Arc::new(StructType::try_new(vec![
184+
StructField::new("product_id", DataType::STRING, false),
185+
StructField::new("product_name", DataType::STRING, false),
186+
StructField::new("category", DataType::STRING, false),
187+
StructField::new("price", DataType::decimal(10, 2)?, false),
188+
StructField::new("inventory_count", DataType::INTEGER, false),
189+
StructField::new("last_updated", DataType::TIMESTAMP, false),
190+
StructField::new("is_available", DataType::BOOLEAN, false),
191+
])?);
192+
193+
// Create table with multiple property calls to test builder pattern
194+
let mut props1 = HashMap::new();
195+
props1.insert("delta.checkpointInterval".to_string(), "100".to_string());
196+
props1.insert(
197+
"delta.deletedFileRetentionDuration".to_string(),
198+
"interval 14 days".to_string(),
199+
);
200+
201+
let mut props2 = HashMap::new();
202+
props2.insert(
203+
"delta.logRetentionDuration".to_string(),
204+
"interval 30 days".to_string(),
205+
);
206+
props2.insert(
207+
"delta.autoOptimize.optimizeWrite".to_string(),
208+
"true".to_string(),
209+
);
210+
211+
let _result =
212+
TableManager::create_table(&table_path, schema.clone(), "InventorySystem/3.0.0")
213+
.with_table_properties(props1)
214+
.with_table_properties(props2)
215+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
216+
.commit(engine.as_ref())?;
217+
218+
// Verify table was created
219+
let table_url = delta_kernel::try_parse_uri(&table_path)?;
220+
let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
221+
222+
assert_eq!(snapshot.version(), 0);
223+
224+
Ok(())
225+
}
226+
227+
#[tokio::test]
228+
async fn test_commit_info_is_written_to_log() -> DeltaResult<()> {
229+
// Setup
230+
let temp_dir = tempdir().expect("Failed to create temp dir");
231+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
232+
233+
let engine =
234+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
235+
236+
// Create schema
237+
let schema = Arc::new(StructType::try_new(vec![
238+
StructField::new("user_id", DataType::LONG, false),
239+
StructField::new("action", DataType::STRING, false),
240+
])?);
241+
242+
let engine_info = "AuditService/2.1.0";
243+
244+
// Create table
245+
let _ = TableManager::create_table(&table_path, schema, engine_info)
246+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
247+
.commit(engine.as_ref())?;
248+
249+
// Read the actual Delta log file
250+
let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
251+
let log_contents = std::fs::read_to_string(&log_file_path).expect("Failed to read log file");
252+
253+
// Parse each line (each line is a separate JSON action)
254+
let actions: Vec<Value> = log_contents
255+
.lines()
256+
.map(|line| serde_json::from_str(line).expect("Failed to parse JSON"))
257+
.collect();
258+
259+
// Verify we have exactly 3 actions: CommitInfo, Protocol, Metadata
260+
// CommitInfo is first to comply with ICT (In-Commit Timestamps) protocol requirements
261+
assert_eq!(
262+
actions.len(),
263+
3,
264+
"Expected 3 actions (commitInfo, protocol, metaData), found {}",
265+
actions.len()
266+
);
267+
268+
// Verify CommitInfo action (first for ICT compliance)
269+
let commit_info_action = &actions[0];
270+
assert!(
271+
commit_info_action.get("commitInfo").is_some(),
272+
"First action should be commitInfo"
273+
);
274+
let commit_info = commit_info_action.get("commitInfo").unwrap();
275+
assert!(
276+
commit_info.get("timestamp").is_some(),
277+
"CommitInfo should have timestamp"
278+
);
279+
assert!(
280+
commit_info.get("engineInfo").is_some(),
281+
"CommitInfo should have engineInfo"
282+
);
283+
assert!(
284+
commit_info.get("operation").is_some(),
285+
"CommitInfo should have operation"
286+
);
287+
assert_eq!(
288+
commit_info["operation"], "CREATE TABLE",
289+
"Operation should be CREATE TABLE"
290+
);
291+
292+
// Verify Protocol action
293+
let protocol_action = &actions[1];
294+
assert!(
295+
protocol_action.get("protocol").is_some(),
296+
"Second action should be protocol"
297+
);
298+
let protocol = protocol_action.get("protocol").unwrap();
299+
assert_eq!(protocol["minReaderVersion"], 3);
300+
assert_eq!(protocol["minWriterVersion"], 7);
301+
302+
// Verify Metadata action
303+
let metadata_action = &actions[2];
304+
assert!(
305+
metadata_action.get("metaData").is_some(),
306+
"Third action should be metaData"
307+
);
308+
let metadata = metadata_action.get("metaData").unwrap();
309+
assert!(metadata.get("id").is_some(), "Metadata should have id");
310+
assert!(
311+
metadata.get("schemaString").is_some(),
312+
"Metadata should have schemaString"
313+
);
314+
assert!(
315+
metadata.get("createdTime").is_some(),
316+
"Metadata should have createdTime"
317+
);
318+
319+
// Additional CommitInfo verification (commit_info was already extracted from actions[0] above)
320+
assert_eq!(
321+
commit_info["engineInfo"], engine_info,
322+
"CommitInfo should contain the engine info we provided"
323+
);
324+
325+
assert!(
326+
commit_info.get("txnId").is_some(),
327+
"CommitInfo should have txnId"
328+
);
329+
330+
// Verify kernelVersion is present
331+
let kernel_version = commit_info.get("kernelVersion");
332+
assert!(
333+
kernel_version.is_some(),
334+
"CommitInfo should have kernelVersion"
335+
);
336+
assert!(
337+
kernel_version.unwrap().as_str().unwrap().starts_with("v"),
338+
"Kernel version should start with 'v'"
339+
);
340+
341+
Ok(())
342+
}
343+
344+
#[tokio::test]
345+
async fn test_log_action_order() -> DeltaResult<()> {
346+
// This test verifies that actions are written in the correct order:
347+
// 1. Protocol
348+
// 2. Metadata
349+
// 3. CommitInfo
350+
//
351+
// The Delta protocol doesn't strictly require this order for initial commits,
352+
// but it's a best practice and required for commits with in-commit timestamps.
353+
354+
let temp_dir = tempdir().expect("Failed to create temp dir");
355+
let table_path = temp_dir.path().to_str().expect("Invalid path").to_string();
356+
357+
let engine =
358+
create_default_engine(&url::Url::from_directory_path(&table_path).expect("Invalid URL"))?;
359+
360+
let schema = Arc::new(StructType::try_new(vec![StructField::new(
361+
"id",
362+
DataType::LONG,
363+
false,
364+
)])?);
365+
366+
let _ = TableManager::create_table(&table_path, schema, "OrderTest/1.0")
367+
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
368+
.commit(engine.as_ref())?;
369+
370+
// Read log file
371+
let log_file_path = format!("{}/_delta_log/00000000000000000000.json", table_path);
372+
let log_contents = std::fs::read_to_string(&log_file_path)?;
373+
374+
let lines: Vec<&str> = log_contents.lines().collect();
375+
376+
// Verify order: CommitInfo first (for ICT compliance), then Protocol, then Metadata
377+
assert!(
378+
lines[0].contains("\"commitInfo\""),
379+
"First action should be commitInfo (for ICT compliance)"
380+
);
381+
assert!(
382+
lines[1].contains("\"protocol\""),
383+
"Second action should be protocol"
384+
);
385+
assert!(
386+
lines[2].contains("\"metaData\""),
387+
"Third action should be metaData"
388+
);
389+
390+
Ok(())
391+
}

0 commit comments

Comments
 (0)