@@ -17,23 +17,84 @@ use std::path::Path;
1717use databend_common_base:: base:: tokio;
1818use databend_common_exception:: ErrorCode ;
1919use databend_common_exception:: Result ;
20+ use databend_common_expression:: block_debug:: pretty_format_blocks;
21+ use databend_common_expression:: DataBlock ;
2022use databend_enterprise_query:: test_kits:: context:: EESetup ;
2123use databend_query:: sessions:: QueryContext ;
2224use databend_query:: sessions:: TableContext ;
2325use databend_query:: test_kits:: TestFixture ;
26+ use futures_util:: TryStreamExt ;
27+ use goldenfile:: Mint ;
2428
2529// TODO investigate this
2630// NOTE: SHOULD specify flavor = "multi_thread", otherwise query execution might be hanged
2731#[ tokio:: test( flavor = "multi_thread" ) ]
28- async fn test_vacuum2_all ( ) -> Result < ( ) > {
32+ async fn test_table_function_fuse_vacuum2_all ( ) -> Result < ( ) > {
2933 let ee_setup = EESetup :: new ( ) ;
3034 let fixture = TestFixture :: setup_with_custom ( ee_setup) . await ?;
35+
36+ setup ( & fixture) . await ?;
37+
38+
39+ // vacuum them all
40+ let res = fixture. execute_command ( "call system$fuse_vacuum2()" ) . await ;
41+
42+ // Check that:
43+
44+ // 1. non-fuse tables should not stop us
45+
46+ assert ! ( res. is_ok( ) ) ;
47+
48+ // 2. fuse table data should be vacuumed
49+
50+ let storage_root = fixture. storage_root ( ) ;
51+
52+ let ctx = fixture. new_query_ctx ( ) . await ?;
53+ check_files_left ( & ctx, storage_root, "db1" , "t1" ) . await ?;
54+ check_files_left ( & ctx, storage_root, "default" , "t1" ) . await ?;
55+
56+ Ok ( ( ) )
57+ }
58+
59+ #[ tokio:: test( flavor = "multi_thread" ) ]
60+ async fn test_vacuum_all_stmt ( ) -> Result < ( ) > {
61+ let ee_setup = EESetup :: new ( ) ;
62+ let fixture = TestFixture :: setup_with_custom ( ee_setup) . await ?;
63+
64+ setup ( & fixture) . await ?;
65+
66+ let mut mint = Mint :: new ( "tests/it/testdata" ) ;
67+ let file = & mut mint. new_goldenfile ( "vacuum_all_stmt.txt" ) . unwrap ( ) ;
68+
69+ // execute the VACUUM TABLE ALL
70+ let res_block = fixture
71+ . execute_query ( "vacuum table all" )
72+ . await ?
73+ . try_collect :: < Vec < DataBlock > > ( )
74+ . await ?;
75+
76+ let block_string = pretty_format_blocks ( & res_block) . unwrap ( ) ;
77+ use std:: io:: Write ;
78+ writeln ! ( file, "{}" , block_string) . unwrap ( ) ;
79+
80+ // Check: fuse table data should be vacuumed
81+
82+ let storage_root = fixture. storage_root ( ) ;
83+
84+ let ctx = fixture. new_query_ctx ( ) . await ?;
85+ check_files_left ( & ctx, storage_root, "db1" , "t1" ) . await ?;
86+ check_files_left ( & ctx, storage_root, "default" , "t1" ) . await ?;
87+
88+ Ok ( ( ) )
89+ }
90+
91+ async fn setup ( fixture : & TestFixture ) -> Result < ( ) > {
92+
3193 // Adjust retention period to 0, so that dropped tables will be vacuumed immediately
3294 let session = fixture. default_session ( ) ;
3395 session. get_settings ( ) . set_data_retention_time_in_days ( 0 ) ?;
3496
35- let ctx = fixture. new_query_ctx ( ) . await ?;
36-
97+ // Prepare test db / tables
3798 let setup_statements = vec ! [
3899 // create non-system db1, create fuse and non-fuse table in it.
39100 "create database db1" ,
@@ -53,67 +114,49 @@ async fn test_vacuum2_all() -> Result<()> {
53114 for stmt in setup_statements {
54115 fixture. execute_command ( stmt) . await ?;
55116 }
56-
57- // vacuum them all
58- let res = fixture. execute_command ( "call system$fuse_vacuum2()" ) . await ;
59-
60- // Check that:
61-
62- // 1. non-fuse tables should not stop us
63-
64- assert ! ( res. is_ok( ) ) ;
65-
66- // 2. fuse table data should be vacuumed
67-
68- let storage_root = fixture. storage_root ( ) ;
69-
70- async fn check_files_left (
71- ctx : & QueryContext ,
72- storage_root : & str ,
73- db_name : & str ,
74- tbl_name : & str ,
75- ) -> Result < ( ) > {
76- let tenant = ctx. get_tenant ( ) ;
77- let table = ctx
78- . get_default_catalog ( ) ?
79- . get_table ( & tenant, db_name, tbl_name)
80- . await ?;
81-
82- let db = ctx
83- . get_default_catalog ( ) ?
84- . get_database ( & tenant, db_name)
85- . await ?;
86-
87- let path = Path :: new ( storage_root)
88- . join ( db. get_db_info ( ) . database_id . db_id . to_string ( ) )
89- . join ( table. get_id ( ) . to_string ( ) ) ;
90-
91- let walker = walkdir:: WalkDir :: new ( path) . into_iter ( ) ;
92-
93- let mut files_left = Vec :: new ( ) ;
94- for entry in walker {
95- let entry = entry. unwrap ( ) ;
96- if entry. file_type ( ) . is_file ( ) {
97- files_left. push ( entry) ;
98- }
117+ Ok ( ( ) )
118+ }
119+ async fn check_files_left (
120+ ctx : & QueryContext ,
121+ storage_root : & str ,
122+ db_name : & str ,
123+ tbl_name : & str ,
124+ ) -> Result < ( ) > {
125+ let tenant = ctx. get_tenant ( ) ;
126+ let table = ctx
127+ . get_default_catalog ( ) ?
128+ . get_table ( & tenant, db_name, tbl_name)
129+ . await ?;
130+
131+ let db = ctx
132+ . get_default_catalog ( ) ?
133+ . get_database ( & tenant, db_name)
134+ . await ?;
135+
136+ let path = Path :: new ( storage_root)
137+ . join ( db. get_db_info ( ) . database_id . db_id . to_string ( ) )
138+ . join ( table. get_id ( ) . to_string ( ) ) ;
139+
140+ let walker = walkdir:: WalkDir :: new ( path) . into_iter ( ) ;
141+
142+ let mut files_left = Vec :: new ( ) ;
143+ for entry in walker {
144+ let entry = entry. unwrap ( ) ;
145+ if entry. file_type ( ) . is_file ( ) {
146+ files_left. push ( entry) ;
99147 }
100-
101- // There should be one snapshot file and one snapshot hint file left
102- assert_eq ! ( files_left. len( ) , 2 ) ;
103-
104- files_left. sort_by ( |a, b| a. file_name ( ) . cmp ( b. file_name ( ) ) ) ;
105- // First is the only snapshot left
106- files_left[ 0 ] . path ( ) . to_string_lossy ( ) . contains ( "/_ss/" ) ;
107- // Second one is the last snapshot location hint
108- files_left[ 1 ]
109- . path ( )
110- . to_string_lossy ( )
111- . contains ( "last_snapshot_location_hint_v2" ) ;
112- Ok :: < ( ) , ErrorCode > ( ( ) )
113148 }
114149
115- check_files_left ( & ctx, storage_root, "db1" , "t1" ) . await ?;
116- check_files_left ( & ctx, storage_root, "default" , "t1" ) . await ?;
117-
118- Ok ( ( ) )
150+ // There should be one snapshot file and one snapshot hint file left
151+ assert_eq ! ( files_left. len( ) , 2 ) ;
152+
153+ files_left. sort_by ( |a, b| a. file_name ( ) . cmp ( b. file_name ( ) ) ) ;
154+ // First is the only snapshot left
155+ files_left[ 0 ] . path ( ) . to_string_lossy ( ) . contains ( "/_ss/" ) ;
156+ // Second one is the last snapshot location hint
157+ files_left[ 1 ]
158+ . path ( )
159+ . to_string_lossy ( )
160+ . contains ( "last_snapshot_location_hint_v2" ) ;
161+ Ok :: < ( ) , ErrorCode > ( ( ) )
119162}
0 commit comments