88 * ------------------------------------------------------------------------
99 */
1010
11- #include "partition_filter .h"
11+ #include "init .h"
1212#include "nodes_common.h"
13+ #include "partition_filter.h"
1314#include "utils.h"
14- #include "init.h"
1515
16+ #include "foreign/fdwapi.h"
17+ #include "foreign/foreign.h"
18+ #include "nodes/nodeFuncs.h"
1619#include "utils/guc.h"
1720#include "utils/memutils.h"
18- #include "nodes/nodeFuncs.h"
1921#include "utils/lsyscache.h"
2022#include "utils/syscache.h"
2123
@@ -35,8 +37,26 @@ typedef struct
3537 bool estate_not_modified ; /* did we modify EState somehow? */
3638} estate_mod_data ;
3739
40+ /*
41+ * Allow INSERTs into any FDW \ postgres_fdw \ no FDWs at all.
42+ */
43+ typedef enum
44+ {
45+ PF_FDW_INSERT_DISABLED = 0 , /* INSERTs into FDWs are prohibited */
46+ PF_FDW_INSERT_POSTGRES , /* INSERTs into postgres_fdw are OK */
47+ PF_FDW_INSERT_ANY_FDW /* INSERTs into any FDWs are OK */
48+ } PF_insert_fdw_mode ;
49+
50+ static const struct config_enum_entry pg_pathman_insert_into_fdw_options [] = {
51+ { "disabled" , PF_FDW_INSERT_DISABLED , false },
52+ { "postgres" , PF_FDW_INSERT_POSTGRES , false },
53+ { "any_fdw" , PF_FDW_INSERT_ANY_FDW , false },
54+ { NULL , 0 , false }
55+ };
56+
3857
3958bool pg_pathman_enable_partition_filter = true;
59+ int pg_pathman_insert_into_fdw = PF_FDW_INSERT_POSTGRES ;
4060
4161CustomScanMethods partition_filter_plan_methods ;
4262CustomExecMethods partition_filter_exec_methods ;
@@ -47,6 +67,9 @@ static void partition_filter_visitor(Plan *plan, void *context);
4767static List * pfilter_build_tlist (List * tlist );
4868static Index append_rte_to_estate (EState * estate , RangeTblEntry * rte );
4969static int append_rri_to_estate (EState * estate , ResultRelInfo * rri );
70+ static void prepare_rri_fdw_for_insert (EState * estate ,
71+ ResultRelInfoHolder * rri_holder ,
72+ void * arg );
5073
5174
5275void
@@ -74,6 +97,18 @@ init_partition_filter_static_data(void)
7497 NULL ,
7598 NULL ,
7699 NULL );
100+
101+ DefineCustomEnumVariable ("pg_pathman.insert_into_fdw" ,
102+ "Allow INSERTS into FDW partitions." ,
103+ NULL ,
104+ & pg_pathman_insert_into_fdw ,
105+ PF_FDW_INSERT_POSTGRES ,
106+ pg_pathman_insert_into_fdw_options ,
107+ PGC_SUSET ,
108+ 0 ,
109+ NULL ,
110+ NULL ,
111+ NULL );
77112}
78113
79114
@@ -179,6 +214,7 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
179214 Index child_rte_idx ;
180215 ResultRelInfo * part_result_rel_info ;
181216
217+ /* Lock partition and check if it exists */
182218 LockRelationOid (partid , parts_storage -> head_open_lock_mode );
183219 if (!SearchSysCacheExists1 (RELOID , ObjectIdGetDatum (partid )))
184220 {
@@ -236,18 +272,18 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
236272 /* ri_ConstraintExprs will be initialized by ExecRelCheck() */
237273 part_result_rel_info -> ri_ConstraintExprs = NULL ;
238274
239- /* Now fill the ResultRelInfo holder */
275+ /* Finally fill the ResultRelInfo holder */
240276 rri_holder -> partid = partid ;
241277 rri_holder -> result_rel_info = part_result_rel_info ;
242278
243- /* Append ResultRelInfo to storage->es_alloc_result_rels */
244- append_rri_to_estate (parts_storage -> estate , part_result_rel_info );
245-
246279 /* Call on_new_rri_holder_callback() if needed */
247280 if (parts_storage -> on_new_rri_holder_callback )
248281 parts_storage -> on_new_rri_holder_callback (parts_storage -> estate ,
249282 rri_holder ,
250283 parts_storage -> callback_arg );
284+
285+ /* Append ResultRelInfo to storage->es_alloc_result_rels */
286+ append_rri_to_estate (parts_storage -> estate , part_result_rel_info );
251287 }
252288
253289 return rri_holder ;
@@ -351,7 +387,7 @@ partition_filter_begin(CustomScanState *node, EState *estate, int eflags)
351387 /* Init ResultRelInfo cache */
352388 init_result_parts_storage (& state -> result_parts , estate ,
353389 state -> on_conflict_action != ONCONFLICT_NONE ,
354- ResultPartsStorageStandard , NULL , NULL );
390+ ResultPartsStorageStandard , prepare_rri_fdw_for_insert , NULL );
355391
356392 state -> warning_triggered = false;
357393}
@@ -499,6 +535,148 @@ select_partition_for_insert(const PartRelationInfo *prel,
499535 return rri_holder ;
500536}
501537
538+ /*
539+ * Callback to be executed on FDW partitions.
540+ */
541+ static void
542+ prepare_rri_fdw_for_insert (EState * estate ,
543+ ResultRelInfoHolder * rri_holder ,
544+ void * arg )
545+ {
546+ ResultRelInfo * rri = rri_holder -> result_rel_info ;
547+ FdwRoutine * fdw_routine = rri -> ri_FdwRoutine ;
548+ Oid partid ;
549+
550+ /* Nothing to do if not FDW */
551+ if (fdw_routine == NULL )
552+ return ;
553+
554+ partid = RelationGetRelid (rri -> ri_RelationDesc );
555+
556+ /* Perform some checks according to 'pg_pathman_insert_into_fdw' */
557+ switch (pg_pathman_insert_into_fdw )
558+ {
559+ case PF_FDW_INSERT_DISABLED :
560+ elog (ERROR , "INSERTs into FDW partitions are disabled" );
561+ break ;
562+
563+ case PF_FDW_INSERT_POSTGRES :
564+ {
565+ ForeignDataWrapper * fdw ;
566+ ForeignServer * fserver ;
567+
568+ /* Check if it's PostgreSQL FDW */
569+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
570+ fdw = GetForeignDataWrapper (fserver -> fdwid );
571+ if (strcmp ("postgres_fdw" , fdw -> fdwname ) != 0 )
572+ elog (ERROR , "FDWs other than postgres_fdw are restricted" );
573+ }
574+ break ;
575+
576+ case PF_FDW_INSERT_ANY_FDW :
577+ {
578+ ForeignDataWrapper * fdw ;
579+ ForeignServer * fserver ;
580+
581+ fserver = GetForeignServer (GetForeignTable (partid )-> serverid );
582+ fdw = GetForeignDataWrapper (fserver -> fdwid );
583+ if (strcmp ("postgres_fdw" , fdw -> fdwname ) != 0 )
584+ elog (WARNING , "unrestricted FDW mode may lead to \"%s\" crashes" ,
585+ fdw -> fdwname );
586+ }
587+ break ; /* do nothing */
588+
589+ default :
590+ elog (ERROR , "Mode is not implemented yet" );
591+ break ;
592+ }
593+
594+ if (fdw_routine -> PlanForeignModify )
595+ {
596+ RangeTblEntry * rte ;
597+ ModifyTableState mtstate ;
598+ List * fdw_private ;
599+ Query query ;
600+ PlannedStmt * plan ;
601+ TupleDesc tupdesc ;
602+ int i ,
603+ target_attr ;
604+
605+ /* Fetch RangeTblEntry for partition */
606+ rte = rt_fetch (rri -> ri_RangeTableIndex , estate -> es_range_table );
607+
608+ /* Fetch tuple descriptor */
609+ tupdesc = RelationGetDescr (rri -> ri_RelationDesc );
610+
611+ /* Create fake Query node */
612+ memset ((void * ) & query , 0 , sizeof (Query ));
613+ NodeSetTag (& query , T_Query );
614+
615+ query .commandType = CMD_INSERT ;
616+ query .querySource = QSRC_ORIGINAL ;
617+ query .resultRelation = 1 ;
618+ query .rtable = list_make1 (copyObject (rte ));
619+ query .jointree = makeNode (FromExpr );
620+
621+ query .targetList = NIL ;
622+ query .returningList = NIL ;
623+
624+ /* Generate 'query.targetList' using 'tupdesc' */
625+ target_attr = 1 ;
626+ for (i = 0 ; i < tupdesc -> natts ; i ++ )
627+ {
628+ Form_pg_attribute attr ;
629+ TargetEntry * te ;
630+ Param * param ;
631+
632+ attr = tupdesc -> attrs [i ];
633+
634+ if (attr -> attisdropped )
635+ continue ;
636+
637+ param = makeNode (Param );
638+ param -> paramkind = PARAM_EXTERN ;
639+ param -> paramid = target_attr ;
640+ param -> paramtype = attr -> atttypid ;
641+ param -> paramtypmod = attr -> atttypmod ;
642+ param -> paramcollid = attr -> attcollation ;
643+ param -> location = -1 ;
644+
645+ te = makeTargetEntry ((Expr * ) param , target_attr ,
646+ pstrdup (NameStr (attr -> attname )),
647+ false);
648+
649+ query .targetList = lappend (query .targetList , te );
650+
651+ target_attr ++ ;
652+ }
653+
654+ /* Create fake ModifyTableState */
655+ memset ((void * ) & mtstate , 0 , sizeof (ModifyTableState ));
656+ NodeSetTag (& mtstate , T_ModifyTableState );
657+ mtstate .ps .state = estate ;
658+ mtstate .operation = CMD_INSERT ;
659+ mtstate .resultRelInfo = rri ;
660+ mtstate .mt_onconflict = ONCONFLICT_NONE ;
661+
662+ /* Plan fake query in for FDW access to be planned as well */
663+ elog (DEBUG1 , "FDW(%u): plan fake query for fdw_private" , partid );
664+ plan = standard_planner (& query , 0 , NULL );
665+
666+ /* Extract fdw_private from useless plan */
667+ elog (DEBUG1 , "FDW(%u): extract fdw_private" , partid );
668+ fdw_private = (List * )
669+ linitial (((ModifyTable * ) plan -> planTree )-> fdwPrivLists );
670+
671+ /* call BeginForeignModify on 'rri' */
672+ elog (DEBUG1 , "FDW(%u): call BeginForeignModify on a fake INSERT node" , partid );
673+ fdw_routine -> BeginForeignModify (& mtstate , rri , fdw_private , 0 , 0 );
674+
675+ /* Report success */
676+ elog (DEBUG1 , "FDW(%u): success" , partid );
677+ }
678+ }
679+
502680/*
503681 * Used by fetch_estate_mod_data() to find estate_mod_data.
504682 */
@@ -581,7 +759,11 @@ append_rri_to_estate(EState *estate, ResultRelInfo *rri)
581759 estate -> es_num_result_relations * sizeof (ResultRelInfo ));
582760 }
583761
584- /* Append ResultRelInfo to 'es_result_relations' array */
762+ /*
763+ * Append ResultRelInfo to 'es_result_relations' array.
764+ * NOTE: this is probably safe since ResultRelInfo
765+ * contains nothing but pointers to various structs.
766+ */
585767 estate -> es_result_relations [estate -> es_num_result_relations ] = * rri ;
586768
587769 /* Update estate_mod_data */
0 commit comments