@@ -24,17 +24,16 @@ use std::sync::Arc;
2424use async_trait:: async_trait;
2525use datafusion:: arrow:: datatypes:: SchemaRef as ArrowSchemaRef ;
2626use datafusion:: catalog:: Session ;
27- use datafusion:: common:: DataFusionError ;
2827use datafusion:: datasource:: { TableProvider , TableType } ;
29- use datafusion:: error:: Result as DFResult ;
28+ use datafusion:: error:: { DataFusionError , Result as DFResult } ;
3029use datafusion:: logical_expr:: dml:: InsertOp ;
3130use datafusion:: logical_expr:: { Expr , TableProviderFilterPushDown } ;
3231use datafusion:: physical_plan:: ExecutionPlan ;
3332use datafusion:: physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
3433use iceberg:: arrow:: schema_to_arrow_schema;
3534use iceberg:: inspect:: MetadataTableType ;
3635use iceberg:: table:: Table ;
37- use iceberg:: { Catalog , Error , ErrorKind , NamespaceIdent , Result , TableIdent } ;
36+ use iceberg:: { Catalog , Error , ErrorKind , Result , TableIdent } ;
3837use metadata_table:: IcebergMetadataTableProvider ;
3938
4039use crate :: physical_plan:: commit:: IcebergCommitExec ;
@@ -43,7 +42,7 @@ use crate::physical_plan::write::IcebergWriteExec;
4342
4443/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
4544/// managing access to a [`Table`].
46- #[ derive( Debug , Clone ) ]
45+ #[ derive( Clone ) ]
4746pub struct IcebergTableProvider {
4847 /// A table in the catalog.
4948 table : Table ,
@@ -55,6 +54,16 @@ pub struct IcebergTableProvider {
5554 catalog : Option < Arc < dyn Catalog > > ,
5655}
5756
57+ impl std:: fmt:: Debug for IcebergTableProvider {
58+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
59+ f. debug_struct ( "IcebergTableProvider" )
60+ . field ( "table" , & self . table )
61+ . field ( "snapshot_id" , & self . snapshot_id )
62+ . field ( "schema" , & self . schema )
63+ . finish_non_exhaustive ( )
64+ }
65+ }
66+
5867impl IcebergTableProvider {
5968 pub ( crate ) fn new ( table : Table , schema : ArrowSchemaRef ) -> Self {
6069 IcebergTableProvider {
@@ -67,13 +76,8 @@ impl IcebergTableProvider {
6776 /// Asynchronously tries to construct a new [`IcebergTableProvider`]
6877 /// using the given client and table name to fetch an actual [`Table`]
6978 /// in the provided namespace.
70- pub ( crate ) async fn try_new (
71- client : Arc < dyn Catalog > ,
72- namespace : NamespaceIdent ,
73- name : impl Into < String > ,
74- ) -> Result < Self > {
75- let ident = TableIdent :: new ( namespace, name. into ( ) ) ;
76- let table = client. load_table ( & ident) . await ?;
79+ pub async fn try_new ( client : Arc < dyn Catalog > , table_name : TableIdent ) -> Result < Self > {
80+ let table = client. load_table ( & table_name) . await ?;
7781
7882 let schema = Arc :: new ( schema_to_arrow_schema ( table. metadata ( ) . current_schema ( ) ) ?) ;
7983
@@ -151,8 +155,19 @@ impl TableProvider for IcebergTableProvider {
151155 filters : & [ Expr ] ,
152156 _limit : Option < usize > ,
153157 ) -> DFResult < Arc < dyn ExecutionPlan > > {
158+ // Get the latest table metadata from the catalog if it exists
159+ let table = if let Some ( catalog) = & self . catalog {
160+ catalog
161+ . load_table ( self . table . identifier ( ) )
162+ . await
163+ . map_err ( |e| {
164+ DataFusionError :: Execution ( format ! ( "Error getting Iceberg table metadata: {e}" ) )
165+ } ) ?
166+ } else {
167+ self . table . clone ( )
168+ } ;
154169 Ok ( Arc :: new ( IcebergTableScan :: new (
155- self . table . clone ( ) ,
170+ table,
156171 self . snapshot_id ,
157172 self . schema . clone ( ) ,
158173 projection,
0 commit comments