
Commit fa07ec6

fix: fix read parquet file when schema change (apache#1750)
1 parent 826d98f commit fa07ec6

File tree

1 file changed (+106, -23 lines)


crates/iceberg/src/arrow/reader.rs

Lines changed: 106 additions & 23 deletions
@@ -583,35 +583,25 @@ impl ArrowReader {
                 true
             });
 
-            if column_map.len() != leaf_field_ids.len() {
-                let missing_fields = leaf_field_ids
-                    .iter()
-                    .filter(|field_id| !column_map.contains_key(field_id))
-                    .collect::<Vec<_>>();
-                return Err(Error::new(
-                    ErrorKind::DataInvalid,
-                    format!(
-                        "Parquet schema {} and Iceberg schema {} do not match.",
-                        iceberg_schema, iceberg_schema_of_task
-                    ),
-                )
-                .with_context("column_map", format! {"{:?}", column_map})
-                .with_context("field_ids", format! {"{:?}", leaf_field_ids})
-                .with_context("missing_fields", format! {"{:?}", missing_fields}));
-            }
-
+            // Only project columns that exist in the Parquet file.
+            // Missing columns will be added by RecordBatchTransformer with default/NULL values.
+            // This supports schema evolution where new columns are added to the table schema
+            // but old Parquet files don't have them yet.
             let mut indices = vec![];
             for field_id in leaf_field_ids {
                 if let Some(col_idx) = column_map.get(&field_id) {
                     indices.push(*col_idx);
-                } else {
-                    return Err(Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Field {} is not found in Parquet schema.", field_id),
-                    ));
                 }
+                // Skip fields that don't exist in the Parquet file - they will be added later
             }
+
+            if indices.is_empty() {
+                // If no columns from the projection exist in the file, project all columns
+                // This can happen if all requested columns are new and need to be added by the transformer
+                Ok(ProjectionMask::all())
+            } else {
+                Ok(ProjectionMask::leaves(parquet_schema, indices))
             }
-            Ok(ProjectionMask::leaves(parquet_schema, indices))
         }
     }
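For orientation, here is a minimal standalone sketch of the selection logic this hunk introduces. The function select_leaf_indices and its Option return are illustrative stand-ins, not the crate's API: None plays the role of ProjectionMask::all(), Some(indices) the role of ProjectionMask::leaves(parquet_schema, indices), and the map argument mirrors the column_map (field id to leaf column index) built earlier in get_arrow_projection_mask.

use std::collections::HashMap;

/// Illustrative stand-in for the new projection logic: keep only the
/// requested fields that actually exist in the file; `None` stands in
/// for `ProjectionMask::all()`.
fn select_leaf_indices(
    column_map: &HashMap<i32, usize>, // field id -> leaf column index in the file
    leaf_field_ids: &[i32],           // field ids requested by the scan
) -> Option<Vec<usize>> {
    // Missing fields are simply skipped here; RecordBatchTransformer
    // fills them in later with default/NULL values.
    let indices: Vec<usize> = leaf_field_ids
        .iter()
        .filter_map(|id| column_map.get(id).copied())
        .collect();

    // If nothing matched (every requested column is newer than the file),
    // fall back to projecting all columns instead of an empty projection.
    if indices.is_empty() {
        None
    } else {
        Some(indices)
    }
}

fn main() {
    // Old file contains only field id 1 ('a') at leaf index 0.
    let column_map = HashMap::from([(1, 0)]);

    // Scan asks for 'a' (id 1) and the newer 'b' (id 2): only 'a' is projected.
    assert_eq!(select_leaf_indices(&column_map, &[1, 2]), Some(vec![0]));

    // Scan asks only for columns the file doesn't have: project everything.
    assert_eq!(select_leaf_indices(&column_map, &[2, 3]), None);
}

The empty-indices fallback avoids constructing an empty projection when every requested column postdates the file; whatever is read instead gets reshaped to the task schema by RecordBatchTransformer.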

@@ -1958,4 +1948,97 @@ message schema {
 
         Arc::new(SchemaDescriptor::new(Arc::new(schema)))
     }
+
+    /// Test schema evolution: reading old Parquet file (with only column 'a')
+    /// using a newer table schema (with columns 'a' and 'b').
+    /// This tests that:
+    /// 1. get_arrow_projection_mask allows missing columns
+    /// 2. RecordBatchTransformer adds missing column 'b' with NULL values
+    #[tokio::test]
+    async fn test_schema_evolution_add_column() {
+        use arrow_array::{Array, Int32Array};
+
+        // New table schema: columns 'a' and 'b' (b was added later, file only has 'a')
+        let new_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(2)
+                .with_fields(vec![
+                    NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Create Arrow schema for old Parquet file (only has column 'a')
+        let arrow_schema_old = Arc::new(ArrowSchema::new(vec![
+            Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        // Write old Parquet file with only column 'a'
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let data_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+        let to_write = RecordBatch::try_new(arrow_schema_old.clone(), vec![data_a]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(format!("{}/old_file.parquet", &table_location)).unwrap();
+        let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
+        writer.write(&to_write).expect("Writing batch");
+        writer.close().unwrap();
+
+        // Read the old Parquet file using the NEW schema (with column 'b')
+        let reader = ArrowReaderBuilder::new(file_io).build();
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/old_file.parquet", table_location),
+                data_file_format: DataFileFormat::Parquet,
+                schema: new_schema.clone(),
+                project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
+                predicate: None,
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        // Verify we got the correct data
+        assert_eq!(result.len(), 1);
+        let batch = &result[0];
+
+        // Should have 2 columns now
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+
+        // Column 'a' should have the original data
+        let col_a = batch
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_a.values(), &[1, 2, 3]);
+
+        // Column 'b' should be all NULLs (it didn't exist in the old file)
+        let col_b = batch
+            .column(1)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(col_b.null_count(), 3);
+        assert!(col_b.is_null(0));
+        assert!(col_b.is_null(1));
+        assert!(col_b.is_null(2));
+    }
 }
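The NULL assertions at the end of this test rely on the fill step RecordBatchTransformer performs after the projected read. Here is a minimal sketch of that step written against plain arrow-rs APIs; the hard-coded schemas and the by-hand batch construction are illustrative assumptions, not the transformer's actual code path.

use std::sync::Arc;

use arrow_array::{new_null_array, Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // A batch as read from the old file: only column 'a'.
    let old_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let old_batch = RecordBatch::try_new(old_schema, vec![a]).unwrap();

    // The evolved table schema: 'a' plus a nullable 'b' added later.
    let new_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, true),
    ]));

    // Fill the missing column with a NULL array of the right length.
    let b: ArrayRef = new_null_array(&DataType::Int32, old_batch.num_rows());
    let filled =
        RecordBatch::try_new(new_schema, vec![old_batch.column(0).clone(), b]).unwrap();

    assert_eq!(filled.num_columns(), 2);
    assert_eq!(filled.column(1).null_count(), 3);
}

new_null_array yields an all-null array of the requested length, which is the shape the test asserts for column 'b'.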
