2020use std:: collections:: HashMap ;
2121
2222use async_trait:: async_trait;
23- use futures:: lock:: Mutex ;
23+ use futures:: lock:: { Mutex , MutexGuard } ;
2424use itertools:: Itertools ;
25- use uuid:: Uuid ;
2625
2726use super :: namespace_state:: NamespaceState ;
2827use crate :: io:: FileIO ;
2928use crate :: spec:: { TableMetadata , TableMetadataBuilder } ;
3029use crate :: table:: Table ;
3130use crate :: {
32- Catalog , Error , ErrorKind , Namespace , NamespaceIdent , Result , TableCommit , TableCreation ,
33- TableIdent ,
31+ Catalog , Error , ErrorKind , MetadataLocation , Namespace , NamespaceIdent , Result , TableCommit ,
32+ TableCreation , TableIdent ,
3433} ;
3534
3635/// namespace `location` property
@@ -45,14 +44,31 @@ pub struct MemoryCatalog {
4544}
4645
4746impl MemoryCatalog {
48- /// Creates an memory catalog.
47+ /// Creates a memory catalog.
4948 pub fn new ( file_io : FileIO , warehouse_location : Option < String > ) -> Self {
5049 Self {
5150 root_namespace_state : Mutex :: new ( NamespaceState :: default ( ) ) ,
5251 file_io,
5352 warehouse_location,
5453 }
5554 }
55+
56+ /// Loads a table from the locked namespace state.
57+ async fn load_table_from_locked_state (
58+ & self ,
59+ table_ident : & TableIdent ,
60+ root_namespace_state : & MutexGuard < ' _ , NamespaceState > ,
61+ ) -> Result < Table > {
62+ let metadata_location = root_namespace_state. get_existing_table_location ( table_ident) ?;
63+ let metadata = TableMetadata :: read_from ( & self . file_io , metadata_location) . await ?;
64+
65+ Table :: builder ( )
66+ . identifier ( table_ident. clone ( ) )
67+ . metadata ( metadata)
68+ . metadata_location ( metadata_location. to_string ( ) )
69+ . file_io ( self . file_io . clone ( ) )
70+ . build ( )
71+ }
5672}
5773
5874#[ async_trait]
@@ -203,12 +219,7 @@ impl Catalog for MemoryCatalog {
203219 let metadata = TableMetadataBuilder :: from_table_creation ( table_creation) ?
204220 . build ( ) ?
205221 . metadata ;
206- let metadata_location = format ! (
207- "{}/metadata/{}-{}.metadata.json" ,
208- & location,
209- 0 ,
210- Uuid :: new_v4( )
211- ) ;
222+ let metadata_location = MetadataLocation :: new_with_table_location ( location) . to_string ( ) ;
212223
213224 metadata. write_to ( & self . file_io , & metadata_location) . await ?;
214225
@@ -226,15 +237,8 @@ impl Catalog for MemoryCatalog {
226237 async fn load_table ( & self , table_ident : & TableIdent ) -> Result < Table > {
227238 let root_namespace_state = self . root_namespace_state . lock ( ) . await ;
228239
229- let metadata_location = root_namespace_state. get_existing_table_location ( table_ident) ?;
230- let metadata = TableMetadata :: read_from ( & self . file_io , metadata_location) . await ?;
231-
232- Table :: builder ( )
233- . file_io ( self . file_io . clone ( ) )
234- . metadata_location ( metadata_location. clone ( ) )
235- . metadata ( metadata)
236- . identifier ( table_ident. clone ( ) )
237- . build ( )
240+ self . load_table_from_locked_state ( table_ident, & root_namespace_state)
241+ . await
238242 }
239243
240244 /// Drop a table from the catalog.
@@ -289,12 +293,30 @@ impl Catalog for MemoryCatalog {
289293 . build ( )
290294 }
291295
292- /// Update a table to the catalog.
293- async fn update_table ( & self , _commit : TableCommit ) -> Result < Table > {
294- Err ( Error :: new (
295- ErrorKind :: FeatureUnsupported ,
296- "MemoryCatalog does not currently support updating tables." ,
297- ) )
296+ /// Update a table in the catalog.
297+ async fn update_table ( & self , commit : TableCommit ) -> Result < Table > {
298+ let mut root_namespace_state = self . root_namespace_state . lock ( ) . await ;
299+
300+ let current_table = self
301+ . load_table_from_locked_state ( commit. identifier ( ) , & root_namespace_state)
302+ . await ?;
303+
304+ // Apply TableCommit to get staged table
305+ let staged_table = commit. apply ( current_table) ?;
306+
307+ // Write table metadata to the new location
308+ staged_table
309+ . metadata ( )
310+ . write_to (
311+ staged_table. file_io ( ) ,
312+ staged_table. metadata_location_result ( ) ?,
313+ )
314+ . await ?;
315+
316+ // Flip the pointer to reference the new metadata file.
317+ let updated_table = root_namespace_state. commit_table_update ( staged_table) ?;
318+
319+ Ok ( updated_table)
298320 }
299321}
300322
@@ -303,13 +325,15 @@ mod tests {
303325 use std:: collections:: HashSet ;
304326 use std:: hash:: Hash ;
305327 use std:: iter:: FromIterator ;
328+ use std:: vec;
306329
307330 use regex:: Regex ;
308331 use tempfile:: TempDir ;
309332
310333 use super :: * ;
311334 use crate :: io:: FileIOBuilder ;
312335 use crate :: spec:: { NestedField , PartitionSpec , PrimitiveType , Schema , SortOrder , Type } ;
336+ use crate :: transaction:: { ApplyTransactionAction , Transaction } ;
313337
314338 fn temp_path ( ) -> String {
315339 let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
@@ -335,7 +359,7 @@ mod tests {
335359 }
336360 }
337361
/// Collects the elements of `vec` into a `HashSet`; duplicate elements collapse to one entry.
fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
    vec.into_iter().collect()
}
341365
@@ -348,8 +372,8 @@ mod tests {
348372 . unwrap ( )
349373 }
350374
351- async fn create_table < C : Catalog > ( catalog : & C , table_ident : & TableIdent ) {
352- let _ = catalog
375+ async fn create_table < C : Catalog > ( catalog : & C , table_ident : & TableIdent ) -> Table {
376+ catalog
353377 . create_table (
354378 & table_ident. namespace ,
355379 TableCreation :: builder ( )
@@ -358,7 +382,7 @@ mod tests {
358382 . build ( ) ,
359383 )
360384 . await
361- . unwrap ( ) ;
385+ . unwrap ( )
362386 }
363387
364388 async fn create_tables < C : Catalog > ( catalog : & C , table_idents : Vec < & TableIdent > ) {
@@ -367,6 +391,14 @@ mod tests {
367391 }
368392 }
369393
394+ async fn create_table_with_namespace < C : Catalog > ( catalog : & C ) -> Table {
395+ let namespace_ident = NamespaceIdent :: new ( "abc" . into ( ) ) ;
396+ create_namespace ( catalog, & namespace_ident) . await ;
397+
398+ let table_ident = TableIdent :: new ( namespace_ident, "test" . to_string ( ) ) ;
399+ create_table ( catalog, & table_ident) . await
400+ }
401+
370402 fn assert_table_eq ( table : & Table , expected_table_ident : & TableIdent , expected_schema : & Schema ) {
371403 assert_eq ! ( table. identifier( ) , expected_table_ident) ;
372404
@@ -411,7 +443,12 @@ mod tests {
411443 fn assert_table_metadata_location_matches ( table : & Table , regex_str : & str ) {
412444 let actual = table. metadata_location ( ) . unwrap ( ) . to_string ( ) ;
413445 let regex = Regex :: new ( regex_str) . unwrap ( ) ;
414- assert ! ( regex. is_match( & actual) )
446+ assert ! (
447+ regex. is_match( & actual) ,
448+ "Expected metadata location to match regex, but got location: {} and regex: {}" ,
449+ actual,
450+ regex
451+ )
415452 }
416453
417454 #[ tokio:: test]
@@ -1063,7 +1100,7 @@ mod tests {
10631100 let table_name = "tbl1" ;
10641101 let expected_table_ident = TableIdent :: new ( namespace_ident. clone ( ) , table_name. into ( ) ) ;
10651102 let expected_table_metadata_location_regex = format ! (
1066- "^{}/tbl1/metadata/0 -{}.metadata.json$" ,
1103+ "^{}/tbl1/metadata/00000 -{}.metadata.json$" ,
10671104 namespace_location, UUID_REGEX_STR ,
10681105 ) ;
10691106
@@ -1116,7 +1153,7 @@ mod tests {
11161153 let expected_table_ident =
11171154 TableIdent :: new ( nested_namespace_ident. clone ( ) , table_name. into ( ) ) ;
11181155 let expected_table_metadata_location_regex = format ! (
1119- "^{}/tbl1/metadata/0 -{}.metadata.json$" ,
1156+ "^{}/tbl1/metadata/00000 -{}.metadata.json$" ,
11201157 nested_namespace_location, UUID_REGEX_STR ,
11211158 ) ;
11221159
@@ -1157,7 +1194,7 @@ mod tests {
11571194 let table_name = "tbl1" ;
11581195 let expected_table_ident = TableIdent :: new ( namespace_ident. clone ( ) , table_name. into ( ) ) ;
11591196 let expected_table_metadata_location_regex = format ! (
1160- "^{}/a/tbl1/metadata/0 -{}.metadata.json$" ,
1197+ "^{}/a/tbl1/metadata/00000 -{}.metadata.json$" ,
11611198 warehouse_location, UUID_REGEX_STR
11621199 ) ;
11631200
@@ -1205,7 +1242,7 @@ mod tests {
12051242 let expected_table_ident =
12061243 TableIdent :: new ( nested_namespace_ident. clone ( ) , table_name. into ( ) ) ;
12071244 let expected_table_metadata_location_regex = format ! (
1208- "^{}/a/b/tbl1/metadata/0 -{}.metadata.json$" ,
1245+ "^{}/a/b/tbl1/metadata/00000 -{}.metadata.json$" ,
12091246 warehouse_location, UUID_REGEX_STR
12101247 ) ;
12111248
@@ -1705,7 +1742,7 @@ mod tests {
17051742 . unwrap_err( )
17061743 . to_string( ) ,
17071744 format!(
1708- "TableAlreadyExists => Cannot create table {:? }. Table already exists." ,
1745+ "TableAlreadyExists => Cannot create table {:?}. Table already exists." ,
17091746 & dst_table_ident
17101747 ) ,
17111748 ) ;
@@ -1754,4 +1791,87 @@ mod tests {
17541791 metadata_location
17551792 ) ;
17561793 }
1794+
1795+ #[ tokio:: test]
1796+ async fn test_update_table ( ) {
1797+ let catalog = new_memory_catalog ( ) ;
1798+
1799+ let table = create_table_with_namespace ( & catalog) . await ;
1800+
1801+ // Assert the table doesn't contain the update yet
1802+ assert ! ( !table. metadata( ) . properties( ) . contains_key( "key" ) ) ;
1803+
1804+ // Update table metadata
1805+ let tx = Transaction :: new ( & table) ;
1806+ let updated_table = tx
1807+ . update_table_properties ( )
1808+ . set ( "key" . to_string ( ) , "value" . to_string ( ) )
1809+ . apply ( tx)
1810+ . unwrap ( )
1811+ . commit ( & catalog)
1812+ . await
1813+ . unwrap ( ) ;
1814+
1815+ assert_eq ! (
1816+ updated_table. metadata( ) . properties( ) . get( "key" ) . unwrap( ) ,
1817+ "value"
1818+ ) ;
1819+
1820+ assert_eq ! ( table. identifier( ) , updated_table. identifier( ) ) ;
1821+ assert_eq ! ( table. metadata( ) . uuid( ) , updated_table. metadata( ) . uuid( ) ) ;
1822+ assert ! ( table. metadata( ) . last_updated_ms( ) < updated_table. metadata( ) . last_updated_ms( ) ) ;
1823+ assert_ne ! ( table. metadata_location( ) , updated_table. metadata_location( ) ) ;
1824+
1825+ assert ! (
1826+ table. metadata( ) . metadata_log( ) . len( ) < updated_table. metadata( ) . metadata_log( ) . len( )
1827+ ) ;
1828+ }
1829+
1830+ #[ tokio:: test]
1831+ async fn test_update_table_fails_if_table_doesnt_exist ( ) {
1832+ let catalog = new_memory_catalog ( ) ;
1833+
1834+ let namespace_ident = NamespaceIdent :: new ( "a" . into ( ) ) ;
1835+ create_namespace ( & catalog, & namespace_ident) . await ;
1836+
1837+ // This table is not known to the catalog.
1838+ let table_ident = TableIdent :: new ( namespace_ident, "test" . to_string ( ) ) ;
1839+ let table = build_table ( table_ident) ;
1840+
1841+ let tx = Transaction :: new ( & table) ;
1842+ let err = tx
1843+ . update_table_properties ( )
1844+ . set ( "key" . to_string ( ) , "value" . to_string ( ) )
1845+ . apply ( tx)
1846+ . unwrap ( )
1847+ . commit ( & catalog)
1848+ . await
1849+ . unwrap_err ( ) ;
1850+ assert_eq ! ( err. kind( ) , ErrorKind :: TableNotFound ) ;
1851+ }
1852+
1853+ fn build_table ( ident : TableIdent ) -> Table {
1854+ let file_io = FileIOBuilder :: new_fs_io ( ) . build ( ) . unwrap ( ) ;
1855+
1856+ let temp_dir = TempDir :: new ( ) . unwrap ( ) ;
1857+ let location = temp_dir. path ( ) . to_str ( ) . unwrap ( ) . to_string ( ) ;
1858+
1859+ let table_creation = TableCreation :: builder ( )
1860+ . name ( ident. name ( ) . to_string ( ) )
1861+ . schema ( simple_table_schema ( ) )
1862+ . location ( location)
1863+ . build ( ) ;
1864+ let metadata = TableMetadataBuilder :: from_table_creation ( table_creation)
1865+ . unwrap ( )
1866+ . build ( )
1867+ . unwrap ( )
1868+ . metadata ;
1869+
1870+ Table :: builder ( )
1871+ . identifier ( ident)
1872+ . metadata ( metadata)
1873+ . file_io ( file_io)
1874+ . build ( )
1875+ . unwrap ( )
1876+ }
17571877}
0 commit comments