Skip to content

Commit 85f0089

Browse files
committed
fix
1 parent 78c2bb6 commit 85f0089

File tree

4 files changed

+18
-4
lines changed

4 files changed

+18
-4
lines changed

kernel/src/table_changes/log_replay.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::schema::{
2121
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
2222
use crate::table_configuration::TableConfiguration;
2323
use crate::table_features::Operation;
24+
use crate::table_features::TableFeature;
2425
use crate::utils::require;
2526
use crate::{DeltaResult, Engine, EngineData, Error, PredicateRef, RowVisitor};
2627

@@ -186,7 +187,9 @@ impl LogReplayScanner {
186187
visitor.visit_rows_of(actions.as_ref())?;
187188

188189
let metadata_opt = Metadata::try_new_from_data(actions.as_ref())?;
190+
let has_metadata_update = metadata_opt.is_some();
189191
let protocol_opt = Protocol::try_new_from_data(actions.as_ref())?;
192+
let has_protocol_update = protocol_opt.is_some();
190193

191194
if let Some(ref metadata) = metadata_opt {
192195
let schema = metadata.parse_schema()?;
@@ -200,15 +203,25 @@ impl LogReplayScanner {
200203
}
201204

202205
// Update table configuration with any new Protocol or Metadata from this commit
203-
if metadata_opt.is_some() || protocol_opt.is_some() {
206+
if has_metadata_update || has_protocol_update {
204207
*table_configuration = TableConfiguration::try_new_from(
205208
table_configuration,
206209
metadata_opt,
207210
protocol_opt,
208211
commit_file.version,
209212
)?;
213+
}
214+
215+
// If metadata is updated, check if Change Data Feed is supported
216+
if has_metadata_update {
217+
require!(
218+
table_configuration.is_feature_enabled(&TableFeature::ChangeDataFeed),
219+
Error::change_data_feed_unsupported(commit_file.version)
220+
);
221+
}
210222

211-
// If protocol or metadata is updated, check if Change Data Feed is supported
223+
// If protocol is updated, check if Change Data Feed is supported
224+
if has_protocol_update {
212225
table_configuration
213226
.ensure_operation_supported(Operation::Cdf)
214227
.map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;

kernel/src/table_changes/log_replay/tests.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ fn get_default_table_config(table_root: &url::Url) -> TableConfiguration {
4141
]),
4242
)
4343
.unwrap();
44-
let protocol = Protocol::try_new(1, 1, None::<Vec<String>>, None::<Vec<String>>).unwrap();
44+
// CDF requires min_writer_version = 4
45+
let protocol = Protocol::try_new(1, 4, None::<Vec<String>>, None::<Vec<String>>).unwrap();
4546
TableConfiguration::try_new(metadata, protocol, table_root.clone(), 0).unwrap()
4647
}
4748

@@ -329,7 +330,7 @@ async fn unsupported_protocol_feature_midstream() {
329330
// First commit: Basic protocol with CDF enabled
330331
mock_table
331332
.commit([
332-
protocol_action(1, 1, None, None),
333+
protocol_action(2, 6, None, None),
333334
metadata_with_cdf(get_schema()),
334335
])
335336
.await;
-555 Bytes
Binary file not shown.
46 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)