1+ use std:: num:: NonZeroI32 ;
2+
13use etl_config:: shared:: { IntoConnectOptions , PgConnectionConfig } ;
24use tokio:: runtime:: Handle ;
35use tokio_postgres:: types:: { ToSql , Type } ;
46use tokio_postgres:: { Client , GenericClient , NoTls , Transaction } ;
57use tracing:: info;
68
9+ use crate :: replication:: extract_server_version;
710use crate :: types:: { ColumnSchema , TableId , TableName } ;
811
912/// Table modification operations for ALTER TABLE statements.
@@ -34,10 +37,15 @@ pub enum TableModification<'a> {
3437pub struct PgDatabase < G > {
3538 pub config : PgConnectionConfig ,
3639 pub client : Option < G > ,
40+ server_version : Option < NonZeroI32 > ,
3741 destroy_on_drop : bool ,
3842}
3943
4044impl < G : GenericClient > PgDatabase < G > {
45+ pub fn server_version ( & self ) -> Option < NonZeroI32 > {
46+ self . server_version
47+ }
48+
4149 /// Creates a Postgres publication for the specified tables.
4250 ///
4351 /// Sets up logical replication by creating a publication that includes
@@ -71,19 +79,51 @@ impl<G: GenericClient> PgDatabase<G> {
7179 publication_name : & str ,
7280 schema : Option < & str > ,
7381 ) -> Result < ( ) , tokio_postgres:: Error > {
74- let create_publication_query = match schema {
75- Some ( schema_name) => format ! (
76- "create publication {} for tables in schema {}" ,
77- publication_name, schema_name
78- ) ,
79- None => format ! ( "create publication {} for all tables" , publication_name) ,
80- } ;
81-
82- self . client
83- . as_ref ( )
84- . unwrap ( )
85- . execute ( & create_publication_query, & [ ] )
86- . await ?;
82+ let client = self . client . as_ref ( ) . unwrap ( ) ;
83+
84+ if let Some ( server_version) = self . server_version
85+ && server_version. get ( ) >= 150000
86+ {
87+ // PostgreSQL 15+ supports FOR ALL TABLES IN SCHEMA syntax
88+ let create_publication_query = match schema {
89+ Some ( schema_name) => format ! (
90+ "create publication {} for tables in schema {}" ,
91+ publication_name, schema_name
92+ ) ,
93+ None => format ! ( "create publication {} for all tables" , publication_name) ,
94+ } ;
95+
96+ client. execute ( & create_publication_query, & [ ] ) . await ?;
97+ } else {
98+ // PostgreSQL 14 and earlier: create publication and add tables individually
99+ match schema {
100+ Some ( schema_name) => {
101+ let create_pub_query = format ! ( "create publication {}" , publication_name) ;
102+ client. execute ( & create_pub_query, & [ ] ) . await ?;
103+
104+ let tables_query = format ! (
105+ "select schemaname, tablename from pg_tables where schemaname = '{}'" ,
106+ schema_name
107+ ) ;
108+ let rows = client. query ( & tables_query, & [ ] ) . await ?;
109+
110+ for row in rows {
111+ let schema: String = row. get ( 0 ) ;
112+ let table: String = row. get ( 1 ) ;
113+ let add_table_query = format ! (
114+ "alter publication {} add table {}.{}" ,
115+ publication_name, schema, table
116+ ) ;
117+ client. execute ( & add_table_query, & [ ] ) . await ?;
118+ }
119+ }
120+ None => {
121+ let create_publication_query =
122+ format ! ( "create publication {} for all tables" , publication_name) ;
123+ client. execute ( & create_publication_query, & [ ] ) . await ?;
124+ }
125+ }
126+ }
87127
88128 Ok ( ( ) )
89129 }
@@ -369,7 +409,8 @@ impl PgDatabase<Client> {
369409
370410 Self {
371411 config,
372- client : Some ( client) ,
412+ client : Some ( client. 0 ) ,
413+ server_version : client. 1 ,
373414 destroy_on_drop : true ,
374415 }
375416 }
@@ -386,7 +427,8 @@ impl PgDatabase<Client> {
386427
387428 Self {
388429 config,
389- client : Some ( client) ,
430+ client : Some ( client. 0 ) ,
431+ server_version : client. 1 ,
390432 destroy_on_drop : true ,
391433 }
392434 }
@@ -401,6 +443,7 @@ impl PgDatabase<Client> {
401443 PgDatabase {
402444 config : self . config . clone ( ) ,
403445 client : Some ( transaction) ,
446+ server_version : self . server_version ,
404447 destroy_on_drop : false ,
405448 }
406449 }
@@ -450,7 +493,7 @@ pub fn id_column_schema() -> ColumnSchema {
450493///
451494/// # Panics
452495/// Panics if connection or database creation fails.
453- pub async fn create_pg_database ( config : & PgConnectionConfig ) -> Client {
496+ pub async fn create_pg_database ( config : & PgConnectionConfig ) -> ( Client , Option < NonZeroI32 > ) {
454497 // Create the database via a single connection
455498 let ( client, connection) = {
456499 let config: tokio_postgres:: Config = config. without_db ( ) ;
@@ -474,14 +517,16 @@ pub async fn create_pg_database(config: &PgConnectionConfig) -> Client {
474517 . expect ( "Failed to create database" ) ;
475518
476519 // Connects to the actual Postgres database
477- connect_to_pg_database ( config) . await
520+ let ( client, server_version) = connect_to_pg_database ( config) . await ;
521+
522+ ( client, server_version)
478523}
479524
480525/// Connects to an existing Postgres database.
481526///
482527/// Establishes a client connection to the database specified in the configuration.
483528/// Assumes the database already exists.
484- pub async fn connect_to_pg_database ( config : & PgConnectionConfig ) -> Client {
529+ pub async fn connect_to_pg_database ( config : & PgConnectionConfig ) -> ( Client , Option < NonZeroI32 > ) {
485530 // Create a new client connected to the created database
486531 let ( client, connection) = {
487532 let config: tokio_postgres:: Config = config. with_db ( ) ;
@@ -490,6 +535,9 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
490535 . await
491536 . expect ( "Failed to connect to Postgres" )
492537 } ;
538+ let server_version = connection
539+ . parameter ( "server_version" )
540+ . and_then ( extract_server_version) ;
493541
494542 // Spawn the connection on a new task
495543 tokio:: spawn ( async move {
@@ -498,7 +546,7 @@ pub async fn connect_to_pg_database(config: &PgConnectionConfig) -> Client {
498546 }
499547 } ) ;
500548
501- client
549+ ( client, server_version )
502550}
503551
504552/// Drops a Postgres database and cleans up all resources.
0 commit comments