@@ -626,24 +626,43 @@ impl PgReplicationClient {
626626 publication : Option < & str > ,
627627 ) -> EtlResult < Vec < ColumnSchema > > {
628628 let ( pub_cte, pub_pred) = if let Some ( publication) = publication {
629- (
630- format ! (
631- "with pub_attrs as (
632- select unnest(r.prattrs)
633- from pg_publication_rel r
634- left join pg_publication p on r.prpubid = p.oid
635- where p.pubname = {publication}
636- and r.prrelid = {table_id}
629+ let is_pg14_or_earlier = self . is_postgres_14_or_earlier ( ) . await ?;
630+
631+ if !is_pg14_or_earlier {
632+ (
633+ format ! (
634+ "with pub_attrs as (
635+ select unnest(r.prattrs)
636+ from pg_publication_rel r
637+ left join pg_publication p on r.prpubid = p.oid
638+ where p.pubname = {publication}
639+ and r.prrelid = {table_id}
640+ )" ,
641+ publication = quote_literal( publication) ,
642+ ) ,
643+ "and (
644+ case (select count(*) from pub_attrs)
645+ when 0 then true
646+ else (a.attnum in (select * from pub_attrs))
647+ end
637648 )" ,
638- publication = quote_literal( publication) ,
639- ) ,
640- "and (
641- case (select count(*) from pub_attrs)
642- when 0 then true
643- else (a.attnum in (select * from pub_attrs))
644- end
645- )" ,
646- )
649+ )
650+ } else {
651+ // No column-level filtering, check if table is in publication
652+ (
653+ format ! (
654+ "with pub_table as (
655+ select 1 as exists_in_pub
656+ from pg_publication_rel r
657+ left join pg_publication p on r.prpubid = p.oid
658+ where p.pubname = {publication}
659+ and r.prrelid = {table_id}
660+ )" ,
661+ publication = quote_literal( publication) ,
662+ ) ,
663+ "and (select count(*) from pub_table) > 0" ,
664+ )
665+ }
647666 } else {
648667 ( "" . into ( ) , "" )
649668 } ;
@@ -697,6 +716,26 @@ impl PgReplicationClient {
697716 Ok ( column_schemas)
698717 }
699718
719+ async fn is_postgres_14_or_earlier ( & self ) -> EtlResult < bool > {
720+ let version_query = "SHOW server_version_num" ;
721+
722+ for message in self . client . simple_query ( version_query) . await ? {
723+ if let SimpleQueryMessage :: Row ( row) = message {
724+ let version_str =
725+ Self :: get_row_value :: < String > ( & row, "server_version_num" , "server_version_num" )
726+ . await ?;
727+ let server_version: i32 = version_str. parse ( ) . unwrap_or ( 0 ) ;
728+
729+ // PostgreSQL version format is typically: MAJOR * 10000 + MINOR * 100 + PATCH
730+ // For version 14.x.x, this would be 140000 + minor * 100 + patch
731+ // For version 15.x.x, this would be 150000 + minor * 100 + patch
732+ return Ok ( server_version < 150000 ) ;
733+ }
734+ }
735+
736+ Ok ( false )
737+ }
738+
700739 /// Creates a COPY stream for reading data from a table using its OID.
701740 ///
702741 /// The stream will include only the specified columns and use text format.
0 commit comments