1212
1313#include "copy_stmt_hooking.h"
1414#include "init.h"
15+ #include "partition_filter.h"
1516#include "relation_info.h"
1617
1718#include "access/htup_details.h"
2627#include "nodes/makefuncs.h"
2728#include "utils/builtins.h"
2829#include "utils/lsyscache.h"
30+ #include "utils/memutils.h"
2931#include "utils/rel.h"
3032#include "utils/rls.h"
3133
3234#include "libpq/libpq.h"
3335
3436
37+ static uint64 PathmanCopyFrom (CopyState cstate ,
38+ Relation parent_rel ,
39+ List * range_table ,
40+ bool old_protocol );
41+ static ResultRelInfoHolder * select_partition_for_copy (const PartRelationInfo * prel ,
42+ ResultPartsStorage * parts_storage ,
43+ Datum value , EState * estate );
44+
45+
3546/*
3647 * Is pg_pathman supposed to handle this COPY stmt?
3748 */
@@ -283,6 +294,11 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
283294 /* COPY ... FROM ... */
284295 if (is_from )
285296 {
297+ bool is_old_protocol ;
298+
299+ is_old_protocol = PG_PROTOCOL_MAJOR (FrontendProtocol ) < 3 &&
300+ stmt -> filename == NULL ;
301+
286302 /* There should be relation */
287303 Assert (rel );
288304
@@ -293,9 +309,7 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
293309
294310 cstate = BeginCopyFrom (rel , stmt -> filename , stmt -> is_program ,
295311 stmt -> attlist , stmt -> options );
296- /* TODO: copy files to DB */
297- heap_close (rel , NoLock );
298- * processed = 0 ;
312+ * processed = PathmanCopyFrom (cstate , rel , range_table , is_old_protocol );
299313 EndCopyFrom (cstate );
300314 }
301315 /* COPY ... TO ... */
@@ -314,4 +328,233 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
314328 /* Call standard DoCopy using a new CopyStmt */
315329 DoCopy (& modified_copy_stmt , queryString , processed );
316330 }
331+
332+ /*
333+ * Close the relation. If reading, we can release the AccessShareLock we
334+ * got; if writing, we should hold the lock until end of transaction to
335+ * ensure that updates will be committed before lock is released.
336+ */
337+ if (rel != NULL )
338+ heap_close (rel , (is_from ? NoLock : AccessShareLock ));
339+ }
340+
341+ /*
342+ * Copy FROM file to relation.
343+ */
344+ static uint64
345+ PathmanCopyFrom (CopyState cstate , Relation parent_rel ,
346+ List * range_table , bool old_protocol )
347+ {
348+ HeapTuple tuple ;
349+ TupleDesc tupDesc ;
350+ Datum * values ;
351+ bool * nulls ;
352+
353+ ResultPartsStorage parts_storage ;
354+ ResultRelInfo * parent_result_rel ;
355+
356+ EState * estate = CreateExecutorState (); /* for ExecConstraints() */
357+ ExprContext * econtext ;
358+ TupleTableSlot * myslot ;
359+ MemoryContext oldcontext = CurrentMemoryContext ;
360+
361+ uint64 processed = 0 ;
362+
363+
364+ tupDesc = RelationGetDescr (parent_rel );
365+
366+ parent_result_rel = makeNode (ResultRelInfo );
367+ InitResultRelInfo (parent_result_rel ,
368+ parent_rel ,
369+ 1 , /* dummy rangetable index */
370+ 0 );
371+ ExecOpenIndices (parent_result_rel , false);
372+
373+ estate -> es_result_relations = parent_result_rel ;
374+ estate -> es_num_result_relations = 1 ;
375+ estate -> es_result_relation_info = parent_result_rel ;
376+ estate -> es_range_table = range_table ;
377+
378+ /* Initialize ResultPartsStorage */
379+ init_result_parts_storage (& parts_storage , estate , false,
380+ ResultPartsStorageStandard ,
381+ check_acl_for_partition , NULL );
382+ parts_storage .saved_rel_info = parent_result_rel ;
383+
384+ /* Set up a tuple slot too */
385+ myslot = ExecInitExtraTupleSlot (estate );
386+ ExecSetSlotDescriptor (myslot , tupDesc );
387+ /* Triggers might need a slot as well */
388+ estate -> es_trig_tuple_slot = ExecInitExtraTupleSlot (estate );
389+
390+ /* Prepare to catch AFTER triggers. */
391+ AfterTriggerBeginQuery ();
392+
393+ /*
394+ * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
395+ * should do this for COPY, since it's not really an "INSERT" statement as
396+ * such. However, executing these triggers maintains consistency with the
397+ * EACH ROW triggers that we already fire on COPY.
398+ */
399+ ExecBSInsertTriggers (estate , parent_result_rel );
400+
401+ values = (Datum * ) palloc (tupDesc -> natts * sizeof (Datum ));
402+ nulls = (bool * ) palloc (tupDesc -> natts * sizeof (bool ));
403+
404+ econtext = GetPerTupleExprContext (estate );
405+
406+ for (;;)
407+ {
408+ TupleTableSlot * slot ;
409+ bool skip_tuple ;
410+ Oid tuple_oid = InvalidOid ;
411+
412+ const PartRelationInfo * prel ;
413+ ResultRelInfoHolder * rri_holder_child ;
414+ ResultRelInfo * child_result_rel ;
415+
416+ CHECK_FOR_INTERRUPTS ();
417+
418+ ResetPerTupleExprContext (estate );
419+
420+ /* Fetch PartRelationInfo for parent relation */
421+ prel = get_pathman_relation_info (RelationGetRelid (parent_rel ));
422+
423+ /* Switch into its memory context */
424+ MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
425+
426+ if (!NextCopyFrom (cstate , econtext , values , nulls , & tuple_oid ))
427+ break ;
428+
429+ /* Search for a matching partition */
430+ rri_holder_child = select_partition_for_copy (prel , & parts_storage ,
431+ values [prel -> attnum - 1 ],
432+ estate );
433+ child_result_rel = rri_holder_child -> result_rel_info ;
434+ estate -> es_result_relation_info = child_result_rel ;
435+
436+ /* And now we can form the input tuple. */
437+ tuple = heap_form_tuple (tupDesc , values , nulls );
438+ if (tuple_oid != InvalidOid )
439+ HeapTupleSetOid (tuple , tuple_oid );
440+
441+ /*
442+ * Constraints might reference the tableoid column, so initialize
443+ * t_tableOid before evaluating them.
444+ */
445+ tuple -> t_tableOid = RelationGetRelid (child_result_rel -> ri_RelationDesc );
446+
447+ /* Triggers and stuff need to be invoked in query context. */
448+ MemoryContextSwitchTo (oldcontext );
449+
450+ /* Place tuple in tuple slot --- but slot shouldn't free it */
451+ slot = myslot ;
452+ ExecStoreTuple (tuple , slot , InvalidBuffer , false);
453+
454+ skip_tuple = false;
455+
456+ /* BEFORE ROW INSERT Triggers */
457+ if (child_result_rel -> ri_TrigDesc &&
458+ child_result_rel -> ri_TrigDesc -> trig_insert_before_row )
459+ {
460+ slot = ExecBRInsertTriggers (estate , child_result_rel , slot );
461+
462+ if (slot == NULL ) /* "do nothing" */
463+ skip_tuple = true;
464+ else /* trigger might have changed tuple */
465+ tuple = ExecMaterializeSlot (slot );
466+ }
467+
468+ /* Proceed if we still have a tuple */
469+ if (!skip_tuple )
470+ {
471+ List * recheckIndexes = NIL ;
472+
473+ /* Check the constraints of the tuple */
474+ if (child_result_rel -> ri_RelationDesc -> rd_att -> constr )
475+ ExecConstraints (child_result_rel , slot , estate );
476+
477+ /* OK, store the tuple and create index entries for it */
478+ simple_heap_insert (child_result_rel -> ri_RelationDesc , tuple );
479+
480+ if (child_result_rel -> ri_NumIndices > 0 )
481+ recheckIndexes = ExecInsertIndexTuples (slot , & (tuple -> t_self ),
482+ estate , false, NULL ,
483+ NIL );
484+
485+ /* AFTER ROW INSERT Triggers */
486+ ExecARInsertTriggers (estate , child_result_rel , tuple ,
487+ recheckIndexes );
488+
489+ list_free (recheckIndexes );
490+
491+ /*
492+ * We count only tuples not suppressed by a BEFORE INSERT trigger;
493+ * this is the same definition used by execMain.c for counting
494+ * tuples inserted by an INSERT command.
495+ */
496+ processed ++ ;
497+ }
498+ }
499+
500+ MemoryContextSwitchTo (oldcontext );
501+
502+ /*
503+ * In the old protocol, tell pqcomm that we can process normal protocol
504+ * messages again.
505+ */
506+ if (old_protocol )
507+ pq_endmsgread ();
508+
509+ /* Execute AFTER STATEMENT insertion triggers */
510+ ExecASInsertTriggers (estate , parent_result_rel );
511+
512+ /* Handle queued AFTER triggers */
513+ AfterTriggerEndQuery (estate );
514+
515+ pfree (values );
516+ pfree (nulls );
517+
518+ ExecResetTupleTable (estate -> es_tupleTable , false);
519+ fini_result_parts_storage (& parts_storage );
520+
521+ FreeExecutorState (estate );
522+
523+ return processed ;
524+ }
525+
526+ /*
527+ * Smart wrapper for scan_result_parts_storage().
528+ */
529+ static ResultRelInfoHolder *
530+ select_partition_for_copy (const PartRelationInfo * prel ,
531+ ResultPartsStorage * parts_storage ,
532+ Datum value , EState * estate )
533+ {
534+ ExprContext * econtext ;
535+ ResultRelInfoHolder * rri_holder ;
536+ Oid selected_partid = InvalidOid ;
537+ Oid * parts ;
538+ int nparts ;
539+
540+ econtext = GetPerTupleExprContext (estate );
541+
542+ /* Search for matching partitions using partitioned column */
543+ parts = find_partitions_for_value (value , prel , econtext , & nparts );
544+
545+ if (nparts > 1 )
546+ elog (ERROR , "PATHMAN COPY selected more than one partition" );
547+ else if (nparts == 0 )
548+ elog (ERROR ,
549+ "There is no suitable partition for key '%s'" ,
550+ datum_to_cstring (value , prel -> atttype ));
551+ else
552+ selected_partid = parts [0 ];
553+
554+ /* Replace parent table with a suitable partition */
555+ MemoryContextSwitchTo (estate -> es_query_cxt );
556+ rri_holder = scan_result_parts_storage (selected_partid , parts_storage );
557+ MemoryContextSwitchTo (GetPerTupleMemoryContext (estate ));
558+
559+ return rri_holder ;
317560}
0 commit comments