@@ -814,31 +814,12 @@ impl PgReplicationClient {
814814 . collect :: < Vec < _ > > ( )
815815 . join ( ", " ) ;
816816
817- let copy_query = if self . is_partitioned_table ( table_id) . await ?
818- && let leaf_partitions = self . get_leaf_partition_ids ( table_id) . await ?
819- && !leaf_partitions. is_empty ( )
820- {
821- let mut selects = Vec :: with_capacity ( leaf_partitions. len ( ) ) ;
822- for child_id in leaf_partitions {
823- let child_name = self . get_table_name ( child_id) . await ?;
824- let select = format ! (
825- "select {} from {}" ,
826- column_list,
827- child_name. as_quoted_identifier( )
828- ) ;
829- selects. push ( select) ;
830- }
831-
832- let union_query = selects. join ( " union all " ) ;
833- format ! ( r#"copy ({union_query}) to stdout with (format text);"# )
834- } else {
835- let table_name = self . get_table_name ( table_id) . await ?;
836- format ! (
837- r#"copy {} ({}) to stdout with (format text);"# ,
838- table_name. as_quoted_identifier( ) ,
839- column_list
840- )
841- } ;
817+ let table_name = self . get_table_name ( table_id) . await ?;
818+ let copy_query = format ! (
819+ r#"copy (select {} from {}) to stdout with (format text);"# ,
820+ column_list,
821+ table_name. as_quoted_identifier( )
822+ ) ;
842823
843824 // TODO: allow passing in format binary or text
844825 let stream = self . client . copy_out_simple ( & copy_query) . await ?;
@@ -876,53 +857,4 @@ impl PgReplicationClient {
876857 )
877858 } )
878859 }
879-
880- /// Returns true if the given table id refers to a partitioned table (relkind = 'p').
881- async fn is_partitioned_table ( & self , table_id : TableId ) -> EtlResult < bool > {
882- let query = format ! ( "select c.relkind from pg_class c where c.oid = {table_id}" ) ;
883-
884- for msg in self . client . simple_query ( & query) . await ? {
885- if let SimpleQueryMessage :: Row ( row) = msg {
886- let relkind = Self :: get_row_value :: < String > ( & row, "relkind" , "pg_class" ) . await ?;
887- return Ok ( relkind == "p" ) ;
888- }
889- }
890-
891- bail ! (
892- ErrorKind :: SourceSchemaError ,
893- "Table not found" ,
894- format!( "Table not found in database (table id: {})" , table_id)
895- ) ;
896- }
897-
898- /// Returns all leaf partition OIDs for a partitioned table.
899- async fn get_leaf_partition_ids ( & self , parent_id : TableId ) -> EtlResult < Vec < TableId > > {
900- let query = format ! (
901- r#"
902- with recursive parts(relid) as (
903- select i.inhrelid
904- from pg_inherits i
905- where i.inhparent = {parent_id}
906- union all
907- select i.inhrelid
908- from pg_inherits i
909- join parts p on p.relid = i.inhparent
910- )
911- select p.relid as oid
912- from parts p
913- left join pg_inherits i on i.inhparent = p.relid
914- where i.inhrelid is null
915- "#
916- ) ;
917-
918- let mut ids = Vec :: new ( ) ;
919- for msg in self . client . simple_query ( & query) . await ? {
920- if let SimpleQueryMessage :: Row ( row) = msg {
921- let id = Self :: get_row_value :: < TableId > ( & row, "oid" , "pg_inherits" ) . await ?;
922- ids. push ( id) ;
923- }
924- }
925-
926- Ok ( ids)
927- }
928860}
0 commit comments