@@ -659,7 +659,8 @@ impl PgReplicationClient {
659659 when 0 then true
660660 else (a.attnum in (select * from pub_attrs))
661661 end
662- )" ,
662+ )"
663+ . to_string ( ) ,
663664 )
664665 } else {
665666 // Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -674,20 +675,90 @@ impl PgReplicationClient {
674675 )" ,
675676 publication = quote_literal( publication) ,
676677 ) ,
677- "and (select count(*) from pub_table) > 0" ,
678+ format ! (
679+ "and ((select count(*) from pub_table) > 0 or exists(
680+ -- Also allow if parent table is in publication (for partitioned tables)
681+ select 1 from pg_inherits i
682+ join pg_publication_rel r on r.prrelid = i.inhparent
683+ join pg_publication p on p.oid = r.prpubid
684+ where i.inhrelid = {table_id} and p.pubname = {publication}
685+ ))" ,
686+ publication = quote_literal( publication) ,
687+ ) ,
678688 )
679689 }
680690 } else {
681- ( "" . into ( ) , "" )
691+ ( "" . to_string ( ) , "" . to_string ( ) )
692+ } ;
693+
694+ let has_pub_cte = !pub_cte. is_empty ( ) ;
695+
696+ let cte_prefix = if has_pub_cte {
697+ // If there's already a pub_cte WITH clause, add our CTEs to it with a comma
698+ format ! ( "{pub_cte}," )
699+ } else {
700+ // If no pub_cte, start our own WITH clause (no need for RECURSIVE)
701+ "with " . to_string ( )
682702 } ;
683703
684704 let column_info_query = format ! (
685- "{pub_cte}
686- select a.attname,
705+ "{cte_prefix}
706+ -- Find direct parent of current table (if it's a partition)
707+ direct_parent as (
708+ select i.inhparent as parent_oid
709+ from pg_inherits i
710+ where i.inhrelid = {table_id}::oid
711+ limit 1
712+ ),
713+ -- Get parent table's primary key columns
714+ parent_pk_cols as (
715+ select array_agg(a.attname order by x.n) as pk_column_names
716+ from pg_constraint con
717+ join unnest(con.conkey) with ordinality as x(attnum, n) on true
718+ join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
719+ join direct_parent dp on con.conrelid = dp.parent_oid
720+ where con.contype = 'p'
721+ group by con.conname
722+ ),
723+ -- Check if current table has a unique index on the parent PK columns
724+ partition_has_pk_index as (
725+ select case
726+ when exists (select 1 from direct_parent)
727+ and exists (select 1 from parent_pk_cols)
728+ and exists (
729+ -- Check if there's a unique, valid index on the parent PK columns
730+ select 1
731+ from pg_index ix
732+ cross join parent_pk_cols pk
733+ where ix.indrelid = {table_id}::oid
734+ and ix.indisunique = true
735+ and ix.indisvalid = true
736+ and array(
737+ select a.attname
738+ from unnest(ix.indkey) with ordinality k(attnum, ord)
739+ join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
740+ where ord <= ix.indnkeyatts -- exclude INCLUDE columns
741+ order by ord
742+ ) = pk.pk_column_names
743+ ) then true
744+ else false
745+ end as has_inherited_pk
746+ )
747+ SELECT a.attname,
687748 a.atttypid,
688749 a.atttypmod,
689750 a.attnotnull,
690- coalesce(i.indisprimary, false) as primary
751+ case
752+ -- First check for direct primary key
753+ when coalesce(i.indisprimary, false) = true then true
754+ -- Then check for inherited primary key from partitioned table parent
755+ when (select has_inherited_pk from partition_has_pk_index) = true
756+ and exists (
757+ select 1 from parent_pk_cols pk
758+ where a.attname = any(pk.pk_column_names)
759+ ) then true
760+ else false
761+ end as primary
691762 from pg_attribute a
692763 left join pg_index i
693764 on a.attrelid = i.indrelid
0 commit comments