Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/src/table_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mod tests {
}}
{"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
"minWriterVersion": 4
}}
{"metaData": {
"id": "5fba94ed-9794-4965-ba6e-6ee3c0d22af9",
Expand Down
83 changes: 1 addition & 82 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};

Expand All @@ -24,7 +23,6 @@ use url::Url;
use visitors::{MetadataVisitor, ProtocolVisitor};

use delta_kernel_derive::{internal_api, IntoEngineData, ToSchema};
use itertools::Itertools;
use serde::{Deserialize, Serialize};

const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -568,33 +566,6 @@ impl Protocol {
self.has_table_feature(&TableFeature::CatalogManaged)
|| self.has_table_feature(&TableFeature::CatalogOwnedPreview)
}

pub(crate) fn is_cdf_supported(&self) -> bool {
// TODO: we should probably expand this to ~all supported reader features instead of gating
// on a subset here. missing ones are:
// - variantType
// - typeWidening
// - catalogManaged
static CDF_SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> = LazyLock::new(|| {
vec![
TableFeature::DeletionVectors,
TableFeature::ColumnMapping,
TableFeature::TimestampWithoutTimezone,
TableFeature::V2Checkpoint,
TableFeature::VacuumProtocolCheck,
]
});
match self.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if self.min_reader_version() == 3 => {
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
}
// if min_reader_version = 1 or 2 and there are no reader features => OK
None => (1..=2).contains(&self.min_reader_version()),
// any other protocol is not supported
_ => false,
}
}
}

// TODO: implement Scalar::From<HashMap<K, V>> so we can derive IntoEngineData using a macro (issue#1083)
Expand Down Expand Up @@ -636,43 +607,6 @@ impl IntoEngineData for Protocol {
}
}

// given `table_features`, check if they are subset of `supported_features`
pub(crate) fn ensure_supported_features<T>(
table_features: &[T],
supported_features: &[T],
) -> DeltaResult<()>
where
T: Display + FromStr + Hash + Eq,
<T as FromStr>::Err: Display,
{
// first check if all features are supported, else we proceed to craft an error message
if table_features
.iter()
.all(|feature| supported_features.contains(feature))
{
return Ok(());
}

// we get the type name (TableFeature) for better error messages
let features_type = std::any::type_name::<T>()
.rsplit("::")
.next()
.unwrap_or("table feature");

// NB: we didn't do this above to avoid allocation in the common case
let mut unsupported = table_features
.iter()
.filter(|feature| !supported_features.contains(*feature));

Err(Error::Unsupported(format!(
"Found unsupported {}s: \"{}\". Supported {}s: \"{}\"",
features_type,
unsupported.join("\", \""),
features_type,
supported_features.iter().join("\", \""),
)))
}

#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[internal_api]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
Expand Down Expand Up @@ -1553,21 +1487,6 @@ mod tests {
assert!(protocol.validate_table_features().is_err());
}

#[test]
fn test_ensure_supported_features() {
let supported_features = [TableFeature::ColumnMapping, TableFeature::DeletionVectors];
let table_features = vec![TableFeature::ColumnMapping];
ensure_supported_features(&table_features, &supported_features).unwrap();
// test unknown features
let table_features = vec![TableFeature::ColumnMapping, TableFeature::unknown("idk")];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
match error {
Error::Unsupported(e) if e ==
"Found unsupported TableFeatures: \"idk\". Supported TableFeatures: \"columnMapping\", \"deletionVectors\""
=> {},
_ => panic!("Expected unsupported error, got: {error}"),
}
}
#[test]
fn test_parse_table_feature_never_fails() {
// parse a non-str
Expand Down
3 changes: 2 additions & 1 deletion kernel/src/checkpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ use crate::log_replay::LogReplayProcessor;
use crate::path::ParsedLogPath;
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
use crate::snapshot::SnapshotRef;
use crate::table_features::TableFeature;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, FileMeta};

Expand Down Expand Up @@ -232,7 +233,7 @@ impl CheckpointWriter {
let is_v2_checkpoints_supported = self
.snapshot
.table_configuration()
.is_v2_checkpoint_write_supported();
.is_feature_supported(&TableFeature::V2Checkpoint);

let actions = self.snapshot.log_segment().read_actions(
engine,
Expand Down
32 changes: 20 additions & 12 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use crate::scan::state::DvInfo;
use crate::schema::{
ColumnNamesAndTypes, DataType, SchemaRef, StructField, StructType, ToSchema as _,
};
use crate::table_changes::check_cdf_table_properties;
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_configuration::TableConfiguration;
use crate::table_features::Operation;
use crate::table_features::TableFeature;
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor};

Expand Down Expand Up @@ -186,15 +187,10 @@ impl LogReplayScanner {
visitor.visit_rows_of(actions.as_ref())?;

let metadata_opt = Metadata::try_new_from_data(actions.as_ref())?;
let has_metadata_update = metadata_opt.is_some();
let protocol_opt = Protocol::try_new_from_data(actions.as_ref())?;
let has_protocol_update = protocol_opt.is_some();

// Validate protocol and metadata if present
if let Some(ref protocol) = protocol_opt {
require!(
protocol.is_cdf_supported(),
Error::change_data_feed_unsupported(commit_file.version)
);
}
if let Some(ref metadata) = metadata_opt {
let schema = metadata.parse_schema()?;
// Currently, schema compatibility is defined as having equal schema types. In the
Expand All @@ -204,20 +200,32 @@ impl LogReplayScanner {
table_schema.as_ref() == &schema,
Error::change_data_feed_incompatible_schema(table_schema, &schema)
);
let table_properties = metadata.parse_table_properties();
check_cdf_table_properties(&table_properties)
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
}

// Update table configuration with any new Protocol or Metadata from this commit
if metadata_opt.is_some() || protocol_opt.is_some() {
if has_metadata_update || has_protocol_update {
*table_configuration = TableConfiguration::try_new_from(
table_configuration,
metadata_opt,
protocol_opt,
commit_file.version,
)?;
}

// If metadata is updated, check if Change Data Feed is supported
if has_metadata_update {
require!(
table_configuration.is_feature_enabled(&TableFeature::ChangeDataFeed),
Error::change_data_feed_unsupported(commit_file.version)
);
}

// If protocol is updated, check if Change Data Feed is supported
if has_protocol_update {
table_configuration
.ensure_operation_supported(Operation::Cdf)
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
}
}
// We resolve the remove deletion vector map after visiting the entire commit.
if has_cdc_action {
Expand Down
24 changes: 15 additions & 9 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ fn get_default_table_config(table_root: &url::Url) -> TableConfiguration {
]),
)
.unwrap();
let protocol = Protocol::try_new(1, 1, None::<Vec<String>>, None::<Vec<String>>).unwrap();
// CDF requires min_writer_version = 4
let protocol = Protocol::try_new(1, 4, None::<Vec<String>>, None::<Vec<String>>).unwrap();
TableConfiguration::try_new(metadata, protocol, table_root.clone(), 0).unwrap()
}

Expand Down Expand Up @@ -167,7 +168,7 @@ async fn metadata_protocol() {
3,
7,
Some([TableFeature::DeletionVectors]),
Some([TableFeature::DeletionVectors]),
Some([TableFeature::DeletionVectors, TableFeature::ChangeDataFeed]),
)
.unwrap(),
),
Expand Down Expand Up @@ -230,8 +231,14 @@ async fn unsupported_reader_feature() {
Protocol::try_new(
3,
7,
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
Some([
TableFeature::DeletionVectors,
TableFeature::unknown("unsupportedReaderFeature"),
]),
Some([
TableFeature::DeletionVectors,
TableFeature::unknown("unsupportedReaderFeature"),
]),
)
.unwrap(),
)])
Expand Down Expand Up @@ -323,25 +330,24 @@ async fn unsupported_protocol_feature_midstream() {
// First commit: Basic protocol with CDF enabled
mock_table
.commit([
protocol_action(1, 1, None, None),
protocol_action(2, 6, None, None),
metadata_with_cdf(get_schema()),
])
.await;

// Second commit: Protocol update with unsupported feature (TypeWidening)
// Second commit: Protocol update with unsupported feature
mock_table
.commit([protocol_action(
3,
7,
Some(vec![TableFeature::TypeWidening]),
Some(vec![TableFeature::TypeWidening]),
Some(vec![TableFeature::unknown("unsupportedFeature")]),
Some(vec![TableFeature::unknown("unsupportedFeature")]),
)])
.await;

assert_midstream_failure(engine, &mock_table);
}

// Note: This should be removed once type widening support is added for CDF
#[tokio::test]
async fn incompatible_schemas_fail() {
async fn assert_incompatible_schema(commit_schema: StructType, cdf_schema: StructType) {
Expand Down
22 changes: 5 additions & 17 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::{Snapshot, SnapshotRef};
use crate::table_configuration::TableConfiguration;
use crate::table_features::Operation;
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::table_features::TableFeature;
use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
Expand Down Expand Up @@ -168,12 +167,12 @@ impl TableChanges {
// [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is
// disabled.
//
// We also check the [`Protocol`] using [`ensure_cdf_read_supported`] to verify that
// we support CDF with those features enabled.
//
// Note: We must still check each metadata and protocol action in the CDF range.
let check_table_config = |snapshot: &Snapshot| {
if snapshot.table_configuration().is_cdf_read_supported() {
if snapshot
.table_configuration()
.is_feature_enabled(&TableFeature::ChangeDataFeed)
{
Ok(())
} else {
Err(Error::change_data_feed_unsupported(snapshot.version()))
Expand Down Expand Up @@ -240,17 +239,6 @@ impl TableChanges {
}
}

/// Ensures that change data feed is enabled in `table_properties`. See the documentation
/// of [`TableChanges`] for more details.
// TODO: move to TableProperties and normalize with the check in TableConfiguration
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
Error::unsupported("Change data feed is not enabled")
);
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading