Skip to content

Commit ab09aea

Browse files
committed
fix
1 parent 3700e80 commit ab09aea

File tree

10 files changed

+65
-273
lines changed

10 files changed

+65
-273
lines changed

kernel/src/actions/mod.rs

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)
33
44
use std::collections::HashMap;
5-
use std::fmt::{Debug, Display};
6-
use std::hash::Hash;
5+
use std::fmt::Debug;
76
use std::str::FromStr;
87
use std::sync::{Arc, LazyLock};
98

@@ -24,7 +23,6 @@ use url::Url;
2423
use visitors::{MetadataVisitor, ProtocolVisitor};
2524

2625
use delta_kernel_derive::{internal_api, IntoEngineData, ToSchema};
27-
use itertools::Itertools;
2826
use serde::{Deserialize, Serialize};
2927

3028
const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -568,33 +566,6 @@ impl Protocol {
568566
self.has_table_feature(&TableFeature::CatalogManaged)
569567
|| self.has_table_feature(&TableFeature::CatalogOwnedPreview)
570568
}
571-
572-
pub(crate) fn is_cdf_supported(&self) -> bool {
573-
// TODO: we should probably expand this to ~all supported reader features instead of gating
574-
// on a subset here. missing ones are:
575-
// - variantType
576-
// - typeWidening
577-
// - catalogManaged
578-
static CDF_SUPPORTED_READER_FEATURES: LazyLock<Vec<TableFeature>> = LazyLock::new(|| {
579-
vec![
580-
TableFeature::DeletionVectors,
581-
TableFeature::ColumnMapping,
582-
TableFeature::TimestampWithoutTimezone,
583-
TableFeature::V2Checkpoint,
584-
TableFeature::VacuumProtocolCheck,
585-
]
586-
});
587-
match self.reader_features() {
588-
// if min_reader_version = 3 and all reader features are subset of supported => OK
589-
Some(reader_features) if self.min_reader_version() == 3 => {
590-
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
591-
}
592-
// if min_reader_version = 1 or 2 and there are no reader features => OK
593-
None => (1..=2).contains(&self.min_reader_version()),
594-
// any other protocol is not supported
595-
_ => false,
596-
}
597-
}
598569
}
599570

600571
// TODO: implement Scalar::From<HashMap<K, V>> so we can derive IntoEngineData using a macro (issue#1083)
@@ -636,43 +607,6 @@ impl IntoEngineData for Protocol {
636607
}
637608
}
638609

639-
// given `table_features`, check if they are subset of `supported_features`
640-
pub(crate) fn ensure_supported_features<T>(
641-
table_features: &[T],
642-
supported_features: &[T],
643-
) -> DeltaResult<()>
644-
where
645-
T: Display + FromStr + Hash + Eq,
646-
<T as FromStr>::Err: Display,
647-
{
648-
// first check if all features are supported, else we proceed to craft an error message
649-
if table_features
650-
.iter()
651-
.all(|feature| supported_features.contains(feature))
652-
{
653-
return Ok(());
654-
}
655-
656-
// we get the type name (TableFeature) for better error messages
657-
let features_type = std::any::type_name::<T>()
658-
.rsplit("::")
659-
.next()
660-
.unwrap_or("table feature");
661-
662-
// NB: we didn't do this above to avoid allocation in the common case
663-
let mut unsupported = table_features
664-
.iter()
665-
.filter(|feature| !supported_features.contains(*feature));
666-
667-
Err(Error::Unsupported(format!(
668-
"Found unsupported {}s: \"{}\". Supported {}s: \"{}\"",
669-
features_type,
670-
unsupported.join("\", \""),
671-
features_type,
672-
supported_features.iter().join("\", \""),
673-
)))
674-
}
675-
676610
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
677611
#[internal_api]
678612
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]

kernel/src/checkpoint/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ use crate::log_replay::LogReplayProcessor;
101101
use crate::path::ParsedLogPath;
102102
use crate::schema::{DataType, SchemaRef, StructField, StructType, ToSchema as _};
103103
use crate::snapshot::SnapshotRef;
104+
use crate::table_features::TableFeature;
104105
use crate::table_properties::TableProperties;
105106
use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, FileMeta};
106107

@@ -232,7 +233,7 @@ impl CheckpointWriter {
232233
let is_v2_checkpoints_supported = self
233234
.snapshot
234235
.table_configuration()
235-
.is_v2_checkpoint_write_supported();
236+
.is_feature_supported(&TableFeature::V2Checkpoint);
236237

237238
let actions = self.snapshot.log_segment().read_actions(
238239
engine,

kernel/src/table_changes/log_replay.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ use crate::scan::state::DvInfo;
1818
use crate::schema::{
1919
ColumnNamesAndTypes, DataType, SchemaRef, StructField, StructType, ToSchema as _,
2020
};
21-
use crate::table_changes::check_cdf_table_properties;
2221
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
2322
use crate::table_configuration::TableConfiguration;
23+
use crate::table_features::Operation;
24+
use crate::table_features::TableFeature;
2425
use crate::utils::require;
2526
use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor};
2627

@@ -186,15 +187,10 @@ impl LogReplayScanner {
186187
visitor.visit_rows_of(actions.as_ref())?;
187188

188189
let metadata_opt = Metadata::try_new_from_data(actions.as_ref())?;
190+
let has_metadata_update = metadata_opt.is_some();
189191
let protocol_opt = Protocol::try_new_from_data(actions.as_ref())?;
192+
let has_protocol_update = protocol_opt.is_some();
190193

191-
// Validate protocol and metadata if present
192-
if let Some(ref protocol) = protocol_opt {
193-
require!(
194-
protocol.is_cdf_supported(),
195-
Error::change_data_feed_unsupported(commit_file.version)
196-
);
197-
}
198194
if let Some(ref metadata) = metadata_opt {
199195
let schema = metadata.parse_schema()?;
200196
// Currently, schema compatibility is defined as having equal schema types. In the
@@ -204,20 +200,32 @@ impl LogReplayScanner {
204200
table_schema.as_ref() == &schema,
205201
Error::change_data_feed_incompatible_schema(table_schema, &schema)
206202
);
207-
let table_properties = metadata.parse_table_properties();
208-
check_cdf_table_properties(&table_properties)
209-
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
210203
}
211204

212205
// Update table configuration with any new Protocol or Metadata from this commit
213-
if metadata_opt.is_some() || protocol_opt.is_some() {
206+
if has_metadata_update || has_protocol_update {
214207
*table_configuration = TableConfiguration::try_new_from(
215208
table_configuration,
216209
metadata_opt,
217210
protocol_opt,
218211
commit_file.version,
219212
)?;
220213
}
214+
215+
// If metadata is updated, check that Change Data Feed is still enabled
216+
if has_metadata_update {
217+
require!(
218+
table_configuration.is_feature_enabled(&TableFeature::ChangeDataFeed),
219+
Error::change_data_feed_unsupported(commit_file.version)
220+
);
221+
}
222+
223+
// If protocol is updated, check if Change Data Feed is supported
224+
if has_protocol_update {
225+
table_configuration
226+
.ensure_operation_supported(Operation::Cdf)
227+
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
228+
}
221229
}
222230
// We resolve the remove deletion vector map after visiting the entire commit.
223231
if has_cdc_action {

kernel/src/table_changes/log_replay/tests.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ fn get_default_table_config(table_root: &url::Url) -> TableConfiguration {
4141
]),
4242
)
4343
.unwrap();
44-
let protocol = Protocol::try_new(1, 1, None::<Vec<String>>, None::<Vec<String>>).unwrap();
44+
// CDF requires min_writer_version = 4
45+
let protocol = Protocol::try_new(1, 4, None::<Vec<String>>, None::<Vec<String>>).unwrap();
4546
TableConfiguration::try_new(metadata, protocol, table_root.clone(), 0).unwrap()
4647
}
4748

@@ -167,7 +168,7 @@ async fn metadata_protocol() {
167168
3,
168169
7,
169170
Some([TableFeature::DeletionVectors]),
170-
Some([TableFeature::DeletionVectors]),
171+
Some([TableFeature::DeletionVectors, TableFeature::ChangeDataFeed]),
171172
)
172173
.unwrap(),
173174
),
@@ -230,8 +231,14 @@ async fn unsupported_reader_feature() {
230231
Protocol::try_new(
231232
3,
232233
7,
233-
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
234-
Some([TableFeature::DeletionVectors, TableFeature::TypeWidening]),
234+
Some([
235+
TableFeature::DeletionVectors,
236+
TableFeature::unknown("unsupportedReaderFeature"),
237+
]),
238+
Some([
239+
TableFeature::DeletionVectors,
240+
TableFeature::unknown("unsupportedReaderFeature"),
241+
]),
235242
)
236243
.unwrap(),
237244
)])
@@ -323,25 +330,24 @@ async fn unsupported_protocol_feature_midstream() {
323330
// First commit: Basic protocol with CDF enabled
324331
mock_table
325332
.commit([
326-
protocol_action(1, 1, None, None),
333+
protocol_action(2, 6, None, None),
327334
metadata_with_cdf(get_schema()),
328335
])
329336
.await;
330337

331-
// Second commit: Protocol update with unsupported feature (TypeWidening)
338+
// Second commit: Protocol update with unsupported feature
332339
mock_table
333340
.commit([protocol_action(
334341
3,
335342
7,
336-
Some(vec![TableFeature::TypeWidening]),
337-
Some(vec![TableFeature::TypeWidening]),
343+
Some(vec![TableFeature::unknown("unsupportedFeature")]),
344+
Some(vec![TableFeature::unknown("unsupportedFeature")]),
338345
)])
339346
.await;
340347

341348
assert_midstream_failure(engine, &mock_table);
342349
}
343350

344-
// Note: This should be removed once type widening support is added for CDF
345351
#[tokio::test]
346352
async fn incompatible_schemas_fail() {
347353
async fn assert_incompatible_schema(commit_schema: StructType, cdf_schema: StructType) {

kernel/src/table_changes/mod.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ use crate::schema::{DataType, Schema, StructField, StructType};
4242
use crate::snapshot::{Snapshot, SnapshotRef};
4343
use crate::table_configuration::TableConfiguration;
4444
use crate::table_features::Operation;
45-
use crate::table_properties::TableProperties;
46-
use crate::utils::require;
45+
use crate::table_features::TableFeature;
4746
use crate::{DeltaResult, Engine, Error, Version};
4847

4948
mod log_replay;
@@ -168,12 +167,12 @@ impl TableChanges {
168167
// [`TableConfiguration::is_feature_enabled`] to fail early. This also ensures that column mapping is
169168
// disabled.
170169
//
171-
// We also check the [`Protocol`] using [`ensure_cdf_read_supported`] to verify that
172-
// we support CDF with those features enabled.
173-
//
174170
// Note: We must still check each metadata and protocol action in the CDF range.
175171
let check_table_config = |snapshot: &Snapshot| {
176-
if snapshot.table_configuration().is_cdf_read_supported() {
172+
if snapshot
173+
.table_configuration()
174+
.is_feature_enabled(&TableFeature::ChangeDataFeed)
175+
{
177176
Ok(())
178177
} else {
179178
Err(Error::change_data_feed_unsupported(snapshot.version()))
@@ -240,17 +239,6 @@ impl TableChanges {
240239
}
241240
}
242241

243-
/// Ensures that change data feed is enabled in `table_properties`. See the documentation
244-
/// of [`TableChanges`] for more details.
245-
// TODO: move to TableProperties and normalize with the check in TableConfiguration
246-
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
247-
require!(
248-
table_properties.enable_change_data_feed.unwrap_or(false),
249-
Error::unsupported("Change data feed is not enabled")
250-
);
251-
Ok(())
252-
}
253-
254242
#[cfg(test)]
255243
mod tests {
256244
use super::*;

0 commit comments

Comments
 (0)