Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
.vscode/
.vim
.zed
.cache/
.clangd

# Rust
target/
Expand Down
4 changes: 3 additions & 1 deletion ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ mod tests {
))
};

let write_context = unsafe { get_write_context(txn_with_engine_info.shallow_copy()) };
let write_context = ok_or_panic(unsafe {
get_write_context(txn_with_engine_info.shallow_copy(), engine.shallow_copy())
});

// Ensure we get the correct schema
let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };
Expand Down
14 changes: 10 additions & 4 deletions ffi/src/transaction/write_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::error::{ExternResult, IntoExternResult};

Check warning on line 1 in ffi/src/transaction/write_context.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/delta-kernel-rs/delta-kernel-rs/ffi/src/transaction/write_context.rs
use crate::handle::Handle;
use crate::{kernel_string_slice, AllocateStringFn, NullableCvoid, SharedSchema};
use crate::{kernel_string_slice, AllocateStringFn, NullableCvoid, SharedExternEngine, SharedSchema};
use delta_kernel::transaction::WriteContext;
use delta_kernel_ffi_macros::handle_descriptor;

Expand All @@ -19,13 +20,18 @@
///
/// # Safety
///
/// Caller is responsible for passing a [valid][Handle#Validity] transaction handle.
/// Caller is responsible for passing valid transaction and engine handles.
#[no_mangle]
pub unsafe extern "C" fn get_write_context(
txn: Handle<ExclusiveTransaction>,
) -> Handle<SharedWriteContext> {
engine: Handle<SharedExternEngine>,
) -> ExternResult<Handle<SharedWriteContext>> {
let txn = unsafe { txn.as_ref() };
Arc::new(txn.get_write_context()).into()
let engine = unsafe { engine.as_ref() };
let write_context = txn.get_write_context();
write_context
.map(|wc| Arc::new(wc).into())
.into_extern_result(&engine)
}

#[no_mangle]
Expand Down
76 changes: 11 additions & 65 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::fs::{create_dir_all, write};
use std::fs::create_dir_all;
use std::path::Path;
use std::process::ExitCode;
use std::sync::Arc;
Expand All @@ -9,9 +9,7 @@ use arrow::util::pretty::print_batches;
use clap::Parser;
use common::{LocationArgs, ParseWithExamples};
use itertools::Itertools;
use serde_json::{json, to_vec};
use url::Url;
use uuid::Uuid;

use delta_kernel::arrow::array::TimestampMicrosecondArray;
use delta_kernel::committer::FileSystemCommitter;
Expand All @@ -20,6 +18,7 @@ use delta_kernel::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt};
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::table_manager::TableManager;
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

Expand Down Expand Up @@ -94,7 +93,7 @@ async fn try_main() -> DeltaResult<()> {
.with_data_change(true);

// Write the data using the engine
let write_context = Arc::new(txn.get_write_context());
let write_context = Arc::new(txn.get_write_context()?);
let file_metadata = engine
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
.await?;
Expand Down Expand Up @@ -152,7 +151,7 @@ async fn create_or_get_base_snapshot(
// Create new table
println!("Creating new Delta table...");
let schema = parse_schema(schema_str)?;
create_table(url, &schema).await?;
create_table(url, &schema, engine).await?;
Snapshot::builder_for(url.clone()).build(engine)
}
}
Expand Down Expand Up @@ -192,66 +191,13 @@ fn parse_schema(schema_str: &str) -> DeltaResult<SchemaRef> {
Ok(Arc::new(StructType::try_new(fields)?))
}

/// Create a new Delta table with the given schema.
///
/// Creating a Delta table is not officially supported by kernel-rs yet, so we manually create the
/// initial transaction log.
async fn create_table(table_url: &Url, schema: &SchemaRef) -> DeltaResult<()> {
let table_id = Uuid::new_v4().to_string();
let schema_str = serde_json::to_string(&schema)?;

let (reader_features, writer_features) = {
let reader_features: Vec<&'static str> = vec![];
let writer_features: Vec<&'static str> = vec![];

// TODO: Support adding specific table features
(reader_features, writer_features)
};

let protocol = json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": reader_features,
"writerFeatures": writer_features,
}
});
let partition_columns: Vec<String> = vec![];
let metadata = json!({
"metaData": {
"id": table_id,
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": schema_str,
"partitionColumns": partition_columns,
"configuration": {},
"createdTime": 1677811175819u64
}
});

let data = [
to_vec(&protocol).unwrap(),
b"\n".to_vec(),
to_vec(&metadata).unwrap(),
]
.concat();

// Write the initial transaction with protocol and metadata to 0.json
let delta_log_path = table_url
.join("_delta_log/")?
.to_file_path()
.map_err(|_e| Error::generic("URL cannot be converted to local file path"))?;
let file_path = delta_log_path.join("00000000000000000000.json");

// Create the _delta_log directory if it doesn't exist
create_dir_all(&delta_log_path)
.map_err(|e| Error::generic(format!("Failed to create _delta_log directory: {e}")))?;

// Write the file using standard filesystem operations
write(&file_path, data)
.map_err(|e| Error::generic(format!("Failed to write initial transaction log: {e}")))?;
/// Create a new Delta table with the given schema using the official CreateTable API.
async fn create_table(table_url: &Url, schema: &SchemaRef, engine: &dyn Engine) -> DeltaResult<()> {
// Use the new TableManager API to create the table
let table_path = table_url.as_str();
let _result = TableManager::create_table(table_path, schema.clone(), "write-table-example/1.0")
.build(engine, Box::new(FileSystemCommitter::new()))?
.commit(engine)?;

println!("✓ Created Delta table with schema: {schema:#?}");
Ok(())
Expand Down
10 changes: 10 additions & 0 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";

pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";

/// Minimum reader version for tables that use table features.
/// When set to 3, the protocol requires an explicit `readerFeatures` array.
#[internal_api]
pub(crate) const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;

/// Minimum writer version for tables that use table features.
/// When set to 7, the protocol requires an explicit `writerFeatures` array.
#[internal_api]
pub(crate) const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;

static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
Expand Down
3 changes: 3 additions & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub mod snapshot;
pub mod table_changes;
pub mod table_configuration;
pub mod table_features;
pub mod table_manager;
pub mod table_properties;
pub mod transaction;
pub(crate) mod transforms;
Expand Down Expand Up @@ -158,6 +159,8 @@ pub use log_compaction::{should_compact, LogCompactionWriter};
pub use metrics::MetricsReporter;
pub use snapshot::Snapshot;
pub use snapshot::SnapshotRef;
pub use table_manager::TableManager;
pub use transaction::create_table::CreateTableBuilder;

use expressions::literal_expression_transform::LiteralExpressionTransform;
use expressions::Scalar;
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl Snapshot {

/// Create a [`Transaction`] for this `SnapshotRef`. With the specified [`Committer`].
pub fn transaction(self: Arc<Self>, committer: Box<dyn Committer>) -> DeltaResult<Transaction> {
Transaction::try_new(self, committer)
Transaction::try_new_existing_table(self, committer)
}

/// Fetch the latest version of the provided `application_id` for this snapshot. Filters the txn based on the SetTransactionRetentionDuration property and lastUpdated
Expand Down
Loading
Loading