1+ /* ------------------------------------------------------------------------
2+ *
3+ * copy_stmt_hooking.c
4+ * Override COPY TO/FROM statement for partitioned tables
5+ *
6+ * Copyright (c) 2016, Postgres Professional
7+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8+ * Portions Copyright (c) 1994, Regents of the University of California
9+ *
10+ * ------------------------------------------------------------------------
11+ */
12+
113#include "copy_stmt_hooking.h"
14+ #include "init.h"
215#include "relation_info.h"
316
17+ #include "access/htup_details.h"
18+ #include "access/sysattr.h"
19+ #include "access/xact.h"
420#include "catalog/namespace.h"
21+ #include "catalog/pg_attribute.h"
522#include "commands/copy.h"
23+ #include "commands/trigger.h"
24+ #include "executor/executor.h"
25+ #include "miscadmin.h"
26+ #include "nodes/makefuncs.h"
27+ #include "utils/builtins.h"
28+ #include "utils/lsyscache.h"
29+ #include "utils/rel.h"
30+ #include "utils/rls.h"
31+
32+ #include "libpq/libpq.h"
633
734
835/*
@@ -14,6 +41,12 @@ is_pathman_related_copy(Node *parsetree)
1441 CopyStmt * copy_stmt = (CopyStmt * ) parsetree ;
1542 Oid partitioned_table ;
1643
44+ if (!IsOverrideCopyEnabled ())
45+ {
46+ elog (DEBUG1 , "COPY statement hooking is disabled" );
47+ return false;
48+ }
49+
1750 /* Check that it's a CopyStmt */
1851 if (!IsA (parsetree , CopyStmt ))
1952 return false;
@@ -23,11 +56,266 @@ is_pathman_related_copy(Node *parsetree)
2356 return false;
2457
2558 /* TODO: select appropriate lock for COPY */
26- partitioned_table = RangeVarGetRelid (copy_stmt -> relation , NoLock , false);
59+ partitioned_table = RangeVarGetRelid (copy_stmt -> relation ,
60+ (copy_stmt -> is_from ?
61+ RowExclusiveLock :
62+ AccessShareLock ),
63+ false);
2764
2865 /* Check that relation is partitioned */
2966 if (get_pathman_relation_info (partitioned_table ))
67+ {
68+ elog (DEBUG1 , "Overriding default behavior for COPY (%u)" , partitioned_table );
3069 return true;
70+ }
3171
3272 return false;
3373}
74+
75+ /*
76+ * CopyGetAttnums - build an integer list of attnums to be copied
77+ *
78+ * The input attnamelist is either the user-specified column list,
79+ * or NIL if there was none (in which case we want all the non-dropped
80+ * columns).
81+ *
82+ * rel can be NULL ... it's only used for error reports.
83+ */
84+ static List *
85+ CopyGetAttnums (TupleDesc tupDesc , Relation rel , List * attnamelist )
86+ {
87+ List * attnums = NIL ;
88+
89+ if (attnamelist == NIL )
90+ {
91+ /* Generate default column list */
92+ Form_pg_attribute * attr = tupDesc -> attrs ;
93+ int attr_count = tupDesc -> natts ;
94+ int i ;
95+
96+ for (i = 0 ; i < attr_count ; i ++ )
97+ {
98+ if (attr [i ]-> attisdropped )
99+ continue ;
100+ attnums = lappend_int (attnums , i + 1 );
101+ }
102+ }
103+ else
104+ {
105+ /* Validate the user-supplied list and extract attnums */
106+ ListCell * l ;
107+
108+ foreach (l , attnamelist )
109+ {
110+ char * name = strVal (lfirst (l ));
111+ int attnum ;
112+ int i ;
113+
114+ /* Lookup column name */
115+ attnum = InvalidAttrNumber ;
116+ for (i = 0 ; i < tupDesc -> natts ; i ++ )
117+ {
118+ if (tupDesc -> attrs [i ]-> attisdropped )
119+ continue ;
120+ if (namestrcmp (& (tupDesc -> attrs [i ]-> attname ), name ) == 0 )
121+ {
122+ attnum = tupDesc -> attrs [i ]-> attnum ;
123+ break ;
124+ }
125+ }
126+ if (attnum == InvalidAttrNumber )
127+ {
128+ if (rel != NULL )
129+ ereport (ERROR ,
130+ (errcode (ERRCODE_UNDEFINED_COLUMN ),
131+ errmsg ("column \"%s\" of relation \"%s\" does not exist" ,
132+ name , RelationGetRelationName (rel ))));
133+ else
134+ ereport (ERROR ,
135+ (errcode (ERRCODE_UNDEFINED_COLUMN ),
136+ errmsg ("column \"%s\" does not exist" ,
137+ name )));
138+ }
139+ /* Check for duplicates */
140+ if (list_member_int (attnums , attnum ))
141+ ereport (ERROR ,
142+ (errcode (ERRCODE_DUPLICATE_COLUMN ),
143+ errmsg ("column \"%s\" specified more than once" ,
144+ name )));
145+ attnums = lappend_int (attnums , attnum );
146+ }
147+ }
148+
149+ return attnums ;
150+ }
151+
152+ /*
153+ * Execute COPY TO/FROM statement for a partitioned table.
154+ * NOTE: based on DoCopy() (see copy.c).
155+ */
156+ void
157+ PathmanDoCopy (const CopyStmt * stmt , const char * queryString , uint64 * processed )
158+ {
159+ CopyState cstate ;
160+ bool is_from = stmt -> is_from ;
161+ bool pipe = (stmt -> filename == NULL );
162+ Relation rel ;
163+ Oid relid ;
164+ Node * query = NULL ;
165+ List * range_table = NIL ;
166+
167+ /* Disallow COPY TO/FROM file or program except to superusers. */
168+ if (!pipe && !superuser ())
169+ {
170+ if (stmt -> is_program )
171+ ereport (ERROR ,
172+ (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
173+ errmsg ("must be superuser to COPY to or from an external program" ),
174+ errhint ("Anyone can COPY to stdout or from stdin. "
175+ "psql's \\copy command also works for anyone." )));
176+ else
177+ ereport (ERROR ,
178+ (errcode (ERRCODE_INSUFFICIENT_PRIVILEGE ),
179+ errmsg ("must be superuser to COPY to or from a file" ),
180+ errhint ("Anyone can COPY to stdout or from stdin. "
181+ "psql's \\copy command also works for anyone." )));
182+ }
183+
184+ if (stmt -> relation )
185+ {
186+ TupleDesc tupDesc ;
187+ AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT );
188+ List * attnums ;
189+ ListCell * cur ;
190+ RangeTblEntry * rte ;
191+
192+ Assert (!stmt -> query );
193+
194+ /* Open the relation (we've locked it in is_pathman_related_copy()) */
195+ rel = heap_openrv (stmt -> relation , NoLock );
196+
197+ relid = RelationGetRelid (rel );
198+
199+ rte = makeNode (RangeTblEntry );
200+ rte -> rtekind = RTE_RELATION ;
201+ rte -> relid = RelationGetRelid (rel );
202+ rte -> relkind = rel -> rd_rel -> relkind ;
203+ rte -> requiredPerms = required_access ;
204+ range_table = list_make1 (rte );
205+
206+ tupDesc = RelationGetDescr (rel );
207+ attnums = CopyGetAttnums (tupDesc , rel , stmt -> attlist );
208+ foreach (cur , attnums )
209+ {
210+ int attno = lfirst_int (cur ) - FirstLowInvalidHeapAttributeNumber ;
211+
212+ if (is_from )
213+ rte -> insertedCols = bms_add_member (rte -> insertedCols , attno );
214+ else
215+ rte -> selectedCols = bms_add_member (rte -> selectedCols , attno );
216+ }
217+ ExecCheckRTPerms (range_table , true);
218+
219+ /*
220+ * We should perform a query instead of low-level heap scan whenever:
221+ * a) table has a RLS policy;
222+ * b) table is partitioned & it's COPY FROM.
223+ */
224+ if (check_enable_rls (rte -> relid , InvalidOid , false) == RLS_ENABLED ||
225+ is_from == false) /* rewrite COPY table TO statements */
226+ {
227+ SelectStmt * select ;
228+ ColumnRef * cr ;
229+ ResTarget * target ;
230+ RangeVar * from ;
231+
232+ if (is_from )
233+ ereport (ERROR ,
234+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
235+ errmsg ("COPY FROM not supported with row-level security" ),
236+ errhint ("Use INSERT statements instead." )));
237+
238+ /* Build target list */
239+ cr = makeNode (ColumnRef );
240+
241+ if (!stmt -> attlist )
242+ cr -> fields = list_make1 (makeNode (A_Star ));
243+ else
244+ cr -> fields = stmt -> attlist ;
245+
246+ cr -> location = 1 ;
247+
248+ target = makeNode (ResTarget );
249+ target -> name = NULL ;
250+ target -> indirection = NIL ;
251+ target -> val = (Node * ) cr ;
252+ target -> location = 1 ;
253+
254+ /*
255+ * Build RangeVar for from clause, fully qualified based on the
256+ * relation which we have opened and locked.
257+ */
258+ from = makeRangeVar (get_namespace_name (RelationGetNamespace (rel )),
259+ RelationGetRelationName (rel ), -1 );
260+
261+ /* Build query */
262+ select = makeNode (SelectStmt );
263+ select -> targetList = list_make1 (target );
264+ select -> fromClause = list_make1 (from );
265+
266+ query = (Node * ) select ;
267+
268+ /*
269+ * Close the relation for now, but keep the lock on it to prevent
270+ * changes between now and when we start the query-based COPY.
271+ *
272+ * We'll reopen it later as part of the query-based COPY.
273+ */
274+ heap_close (rel , NoLock );
275+ rel = NULL ;
276+ }
277+ }
278+ else
279+ {
280+ Assert (stmt -> query );
281+
282+ query = stmt -> query ;
283+ relid = InvalidOid ;
284+ rel = NULL ;
285+ }
286+
287+ /* COPY ... FROM ... */
288+ if (is_from )
289+ {
290+ /* There should be relation */
291+ Assert (rel );
292+
293+ /* check read-only transaction and parallel mode */
294+ if (XactReadOnly && !rel -> rd_islocaltemp )
295+ PreventCommandIfReadOnly ("PATHMAN COPY FROM" );
296+ PreventCommandIfParallelMode ("PATHMAN COPY FROM" );
297+
298+ cstate = BeginCopyFrom (rel , stmt -> filename , stmt -> is_program ,
299+ stmt -> attlist , stmt -> options );
300+ /* TODO: copy files to DB */
301+ heap_close (rel , NoLock );
302+ * processed = 0 ;
303+ EndCopyFrom (cstate );
304+ }
305+ /* COPY ... TO ... */
306+ else
307+ {
308+ CopyStmt modified_copy_stmt ;
309+
310+ /* We should've created a query */
311+ Assert (query );
312+
313+ /* Copy 'stmt' and override some of the fields */
314+ modified_copy_stmt = * stmt ;
315+ modified_copy_stmt .relation = NULL ;
316+ modified_copy_stmt .query = query ;
317+
318+ /* Call standard DoCopy using a new CopyStmt */
319+ DoCopy (& modified_copy_stmt , queryString , processed );
320+ }
321+ }
0 commit comments