@@ -15,6 +15,8 @@ use rand::random;
1515use chrono:: { DateTime , NaiveDate , NaiveDateTime , NaiveTime , Utc } ;
1616use etl:: types:: PgNumeric ;
1717use serde_json:: json;
18+ use std:: collections:: HashMap ;
19+ use std:: num:: NonZeroU64 ;
1820use std:: str:: FromStr ;
1921use std:: sync:: Arc ;
2022use uuid:: Uuid ;
@@ -1213,3 +1215,86 @@ async fn test_large_transaction_batching() {
12131215 // Due to the batch timeout, in practice, there will be more commits than the batch size.
12141216 assert ! ( commits. len( ) >= ( insert_count / batch_size) ) ;
12151217}
1218+
1219+ #[ tokio:: test( flavor = "multi_thread" ) ]
1220+ async fn compaction_minimizes_small_files ( ) {
1221+ init_test_tracing ( ) ;
1222+
1223+ let database = spawn_source_database ( ) . await ;
1224+ let database_schema = setup_test_database_schema ( & database, TableSelection :: UsersOnly ) . await ;
1225+
1226+ let delta_database = setup_delta_connection ( ) . await ;
1227+
1228+ let store = NotifyingStore :: new ( ) ;
1229+
1230+ // Configure compaction to run after every commit for the users table.
1231+ let mut table_config: HashMap < String , Arc < etl_destinations:: deltalake:: DeltaTableConfig > > =
1232+ HashMap :: new ( ) ;
1233+ table_config. insert (
1234+ database_schema. users_schema ( ) . name . name . clone ( ) ,
1235+ Arc :: new ( etl_destinations:: deltalake:: DeltaTableConfig {
1236+ compact_after_commits : Some ( NonZeroU64 :: new ( 1 ) . unwrap ( ) ) ,
1237+ ..Default :: default ( )
1238+ } ) ,
1239+ ) ;
1240+
1241+ let raw_destination = delta_database
1242+ . build_destination_with_config ( store. clone ( ) , table_config)
1243+ . await ;
1244+ let destination = TestDestinationWrapper :: wrap ( raw_destination) ;
1245+
1246+ // Use a batch size of 1 so each insert becomes a separate commit and small file.
1247+ let pipeline_id: PipelineId = random ( ) ;
1248+ let mut pipeline = create_pipeline_with (
1249+ & database. config ,
1250+ pipeline_id,
1251+ database_schema. publication_name ( ) ,
1252+ store. clone ( ) ,
1253+ destination. clone ( ) ,
1254+ Some ( BatchConfig {
1255+ max_size : 1 ,
1256+ max_fill_ms : 1000 ,
1257+ } ) ,
1258+ ) ;
1259+
1260+ let users_state_notify = store
1261+ . notify_on_table_state_type (
1262+ database_schema. users_schema ( ) . id ,
1263+ TableReplicationPhaseType :: SyncDone ,
1264+ )
1265+ . await ;
1266+
1267+ pipeline. start ( ) . await . unwrap ( ) ;
1268+ users_state_notify. notified ( ) . await ;
1269+
1270+ // Generate several inserts to create many small files (one per commit).
1271+ let insert_count: u64 = 12 ;
1272+ let event_notify = destination
1273+ . wait_for_events_count ( vec ! [ ( EventType :: Insert , insert_count) ] )
1274+ . await ;
1275+
1276+ for i in 1 ..=insert_count {
1277+ database
1278+ . insert_values (
1279+ database_schema. users_schema ( ) . name . clone ( ) ,
1280+ & [ "name" , "age" ] ,
1281+ & [ & format ! ( "c_user_{i}" ) , & ( i as i32 ) ] ,
1282+ )
1283+ . await
1284+ . unwrap ( ) ;
1285+ }
1286+
1287+ event_notify. notified ( ) . await ;
1288+
1289+ tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 1 ) ) . await ;
1290+
1291+ pipeline. shutdown_and_wait ( ) . await . unwrap ( ) ;
1292+
1293+ let users_table = delta_database
1294+ . load_table ( & database_schema. users_schema ( ) . name )
1295+ . await
1296+ . unwrap ( ) ;
1297+
1298+ assert_table_snapshot ! ( "compaction_minimizes_small_files" , users_table. clone( ) ) ;
1299+ assert ! ( users_table. snapshot( ) . unwrap( ) . file_paths_iter( ) . count( ) <= 12 ) ;
1300+ }
0 commit comments