Skip to content

Commit f14cff3

Browse files
Fix partitioned tables
1 parent 41c2945 commit f14cff3

File tree

3 files changed

+184
-163
lines changed

3 files changed

+184
-163
lines changed

etl/src/replication/client.rs

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,8 @@ impl PgReplicationClient {
657657
when 0 then true
658658
else (a.attnum in (select * from pub_attrs))
659659
end
660-
)",
660+
)"
661+
.to_string(),
661662
)
662663
} else {
663664
// No column-level filtering, check if table is in publication
@@ -672,20 +673,90 @@ impl PgReplicationClient {
672673
)",
673674
publication = quote_literal(publication),
674675
),
675-
"and (select count(*) from pub_table) > 0",
676+
format!(
677+
"and ((select count(*) from pub_table) > 0 or exists(
678+
-- Also allow if parent table is in publication (for partitioned tables)
679+
select 1 from pg_inherits i
680+
join pg_publication_rel r on r.prrelid = i.inhparent
681+
join pg_publication p on p.oid = r.prpubid
682+
where i.inhrelid = {table_id} and p.pubname = {publication}
683+
))",
684+
publication = quote_literal(publication),
685+
),
676686
)
677687
}
678688
} else {
679-
("".into(), "")
689+
("".to_string(), "".to_string())
690+
};
691+
692+
let has_pub_cte = !pub_cte.is_empty();
693+
694+
let cte_prefix = if has_pub_cte {
695+
// If there's already a pub_cte WITH clause, add our CTEs to it with a comma
696+
format!("{pub_cte},")
697+
} else {
698+
// If no pub_cte, start our own WITH clause (no need for RECURSIVE)
699+
"with ".to_string()
680700
};
681701

682702
let column_info_query = format!(
683-
"{pub_cte}
684-
select a.attname,
703+
"{cte_prefix}
704+
-- Find direct parent of current table (if it's a partition)
705+
direct_parent as (
706+
select i.inhparent as parent_oid
707+
from pg_inherits i
708+
where i.inhrelid = {table_id}::oid
709+
limit 1
710+
),
711+
-- Get parent table's primary key columns
712+
parent_pk_cols as (
713+
select array_agg(a.attname order by x.n) as pk_column_names
714+
from pg_constraint con
715+
join unnest(con.conkey) with ordinality as x(attnum, n) on true
716+
join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
717+
join direct_parent dp on con.conrelid = dp.parent_oid
718+
where con.contype = 'p'
719+
group by con.conname
720+
),
721+
-- Check if current table has a unique index on the parent PK columns
722+
partition_has_pk_index as (
723+
select case
724+
when exists (select 1 from direct_parent)
725+
and exists (select 1 from parent_pk_cols)
726+
and exists (
727+
-- Check if there's a unique, valid index on the parent PK columns
728+
select 1
729+
from pg_index ix
730+
cross join parent_pk_cols pk
731+
where ix.indrelid = {table_id}::oid
732+
and ix.indisunique = true
733+
and ix.indisvalid = true
734+
and array(
735+
select a.attname
736+
from unnest(ix.indkey) with ordinality k(attnum, ord)
737+
join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
738+
where ord <= ix.indnkeyatts -- exclude INCLUDE columns
739+
order by ord
740+
) = pk.pk_column_names
741+
) then true
742+
else false
743+
end as has_inherited_pk
744+
)
745+
SELECT a.attname,
685746
a.atttypid,
686747
a.atttypmod,
687748
a.attnotnull,
688-
coalesce(i.indisprimary, false) as primary
749+
case
750+
-- First check for direct primary key
751+
when coalesce(i.indisprimary, false) = true then true
752+
-- Then check for inherited primary key from partitioned table parent
753+
when (select has_inherited_pk from partition_has_pk_index) = true
754+
and exists (
755+
select 1 from parent_pk_cols pk
756+
where a.attname = any(pk.pk_column_names)
757+
) then true
758+
else false
759+
end as primary
689760
from pg_attribute a
690761
left join pg_index i
691762
on a.attrelid = i.indrelid

etl/src/test_utils/test_schema.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,77 @@ pub async fn setup_test_database_schema<G: GenericClient>(
127127
}
128128
}
129129

130+
/// Creates a partitioned table with the given name and partitions.
131+
///
132+
/// This function creates:
133+
/// 1. A parent partitioned table with a primary key
134+
/// 2. Several child partitions based on the provided partition specifications
135+
///
136+
/// Returns the table ID of the parent table and a list of partition table IDs.
137+
pub async fn create_partitioned_table<G: GenericClient>(
138+
database: &PgDatabase<G>,
139+
table_name: TableName,
140+
partition_specs: &[(&str, &str)], // (partition_name, partition_constraint)
141+
) -> Result<(TableId, Vec<TableId>), tokio_postgres::Error> {
142+
let create_parent_query = format!(
143+
"CREATE TABLE {} (
144+
id bigserial,
145+
data text NOT NULL,
146+
partition_key integer NOT NULL,
147+
PRIMARY KEY (id, partition_key)
148+
) PARTITION BY RANGE (partition_key)",
149+
table_name.as_quoted_identifier()
150+
);
151+
152+
database.run_sql(&create_parent_query).await?;
153+
154+
let parent_row = database
155+
.client
156+
.as_ref()
157+
.unwrap()
158+
.query_one(
159+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
160+
WHERE n.nspname = $1 AND c.relname = $2",
161+
&[&table_name.schema, &table_name.name],
162+
)
163+
.await?;
164+
165+
let parent_table_id: TableId = parent_row.get(0);
166+
let mut partition_table_ids = Vec::new();
167+
168+
for (partition_name, partition_constraint) in partition_specs {
169+
let partition_table_name = TableName::new(
170+
table_name.schema.clone(),
171+
format!("{}_{}", table_name.name, partition_name),
172+
);
173+
174+
let create_partition_query = format!(
175+
"CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
176+
partition_table_name.as_quoted_identifier(),
177+
table_name.as_quoted_identifier(),
178+
partition_constraint
179+
);
180+
181+
database.run_sql(&create_partition_query).await?;
182+
183+
let partition_row = database
184+
.client
185+
.as_ref()
186+
.unwrap()
187+
.query_one(
188+
"SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace
189+
WHERE n.nspname = $1 AND c.relname = $2",
190+
&[&partition_table_name.schema, &partition_table_name.name],
191+
)
192+
.await?;
193+
194+
let partition_table_id: TableId = partition_row.get(0);
195+
partition_table_ids.push(partition_table_id);
196+
}
197+
198+
Ok((parent_table_id, partition_table_ids))
199+
}
200+
130201
/// Inserts users data into the database for testing purposes.
131202
pub async fn insert_users_data<G: GenericClient>(
132203
client: &mut PgDatabase<G>,

0 commit comments

Comments
 (0)