1818#include "pathman_workers.h"
1919#include "relation_info.h"
2020#include "utils.h"
21+ #include "xact_handling.h"
2122
2223#include "access/htup_details.h"
2324#include "access/xact.h"
3132#include "storage/latch.h"
3233#include "utils/builtins.h"
3334#include "utils/datum.h"
35+ #include "utils/memutils.h"
3436#include "utils/lsyscache.h"
3537#include "utils/typcache.h"
3638#include "utils/resowner.h"
@@ -351,6 +353,17 @@ bgw_main_spawn_partitions(Datum main_arg)
351353 DebugPrintDatum (value , args -> value_type ), MyProcPid );
352354#endif
353355
356+ /* Check again if there's a conflicting lock */
357+ if (xact_conflicting_lock_exists (args -> partitioned_table ))
358+ {
359+ elog (LOG , "%s: there's a conflicting lock on relation \"%s\"" ,
360+ spawn_partitions_bgw ,
361+ get_rel_name_or_relid (args -> partitioned_table ));
362+
363+ dsm_detach (segment );
364+ return ; /* exit quickly */
365+ }
366+
354367 /* Create partitions and save the Oid of the last one */
355368 args -> result = create_partitions_internal (args -> partitioned_table ,
356369 value , /* unpacked Datum */
@@ -378,45 +391,51 @@ bgw_main_spawn_partitions(Datum main_arg)
378391static void
379392bgw_main_concurrent_part (Datum main_arg )
380393{
381- ConcurrentPartSlot * args ;
382- Oid types [2 ] = { OIDOID , INT4OID };
383- Datum vals [2 ];
384- bool nulls [2 ] = { false, false };
385394 int rows ;
386- int slot_idx = DatumGetInt32 (main_arg );
387- MemoryContext worker_context = CurrentMemoryContext ;
388- int failures_count = 0 ;
389395 bool failed ;
396+ int failures_count = 0 ;
390397 char * sql = NULL ;
391-
392- /* Create resource owner */
393- CurrentResourceOwner = ResourceOwnerCreate (NULL , "PartitionDataWorker" );
394-
395- args = & concurrent_part_slots [slot_idx ];
396- args -> pid = MyProcPid ;
397- vals [0 ] = args -> relid ;
398- vals [1 ] = 10000 ;
398+ ConcurrentPartSlot * part_slot ;
399399
400400 /* Establish signal handlers before unblocking signals. */
401401 pqsignal (SIGTERM , handle_sigterm );
402402
403403 /* We're now ready to receive signals */
404404 BackgroundWorkerUnblockSignals ();
405405
406+ /* Create resource owner */
407+ CurrentResourceOwner = ResourceOwnerCreate (NULL , concurrent_part_bgw );
408+
409+ /* Update concurrent part slot */
410+ part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
411+ part_slot -> pid = MyProcPid ;
412+
406413 /* Establish connection and start transaction */
407- BackgroundWorkerInitializeConnectionByOid (args -> dbid , InvalidOid );
414+ BackgroundWorkerInitializeConnectionByOid (part_slot -> dbid , part_slot -> userid );
408415
416+ /* Initialize pg_pathman's local config */
417+ StartTransactionCommand ();
418+ bg_worker_load_config (concurrent_part_bgw );
419+ CommitTransactionCommand ();
420+
421+ /* Do the job */
409422 do
410423 {
424+ Oid types [2 ] = { OIDOID , INT4OID };
425+ Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
426+ bool nulls [2 ] = { false, false };
427+
428+ /* Reset loop variables */
411429 failed = false;
412430 rows = 0 ;
431+
432+ /* Start new transaction (syscache access etc.) */
413433 StartTransactionCommand ();
414- bg_worker_load_config ("PartitionDataWorker" );
415434
416435 SPI_connect ();
417436 PushActiveSnapshot (GetTransactionSnapshot ());
418437
419- /* Do some preparation within the first iteration */
438+ /* Prepare the query if needed */
420439 if (sql == NULL )
421440 {
422441 MemoryContext oldcontext ;
@@ -425,78 +444,104 @@ bgw_main_concurrent_part(Datum main_arg)
425444 * Allocate as SQL query in top memory context because current
426445 * context will be destroyed after transaction finishes
427446 */
428- oldcontext = MemoryContextSwitchTo (worker_context );
447+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
429448 sql = psprintf ("SELECT %s._partition_data_concurrent($1::oid, p_limit:=$2)" ,
430- get_namespace_name (get_pathman_schema ()));
449+ get_namespace_name (get_pathman_schema ()));
431450 MemoryContextSwitchTo (oldcontext );
432451 }
433452
453+ /* Exec ret = _partition_data_concurrent() */
434454 PG_TRY ();
435455 {
436456 int ret ;
437457 bool isnull ;
438458
439459 ret = SPI_execute_with_args (sql , 2 , types , vals , nulls , false, 0 );
440- if (ret > 0 )
460+ if (ret == SPI_OK_SELECT )
441461 {
442462 TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
443463 HeapTuple tuple = SPI_tuptable -> vals [0 ];
444464
445- Assert (SPI_processed == 1 );
465+ Assert (SPI_processed == 1 ); /* there should be 1 result at most */
446466
447467 rows = DatumGetInt32 (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
468+
469+ Assert (!isnull ); /* ... and ofc it must not be NULL */
448470 }
449471 }
450472 PG_CATCH ();
451473 {
452474 ErrorData * error ;
475+
453476 EmitErrorReport ();
477+
454478 error = CopyErrorData ();
455- elog (LOG , "Worker error : %s" , error -> message );
479+ elog (LOG , "%s : %s" , concurrent_part_bgw , error -> message );
456480 FlushErrorState ();
481+ FreeErrorData (error );
457482
458483 /*
459484 * The most common exception we can catch here is a deadlock with
460485 * concurrent user queries. Check that attempts count doesn't exceed
461486 * some reasonable value
462487 */
463- if (100 <= failures_count ++ )
488+ if (failures_count ++ > PART_WORKER_MAX_ATTEMPTS )
464489 {
465- pfree (sql );
466- args -> worker_status = WS_FREE ;
490+ /* Mark slot as FREE */
491+ part_slot -> worker_status = WS_FREE ;
492+
467493 elog (LOG ,
468- "The concurrent partitioning worker exiting because the "
469- "maximum attempts count exceeded. See the error message below" );
470- exit (1 );
494+ "Concurrent partitioning worker has canceled the task because "
495+ "maximum amount of attempts (%d) had been exceeded. "
496+ "See the error message below" ,
497+ PART_WORKER_MAX_ATTEMPTS );
498+
499+ return ; /* exit quickly */
471500 }
501+
502+ /* Set 'failed' flag */
472503 failed = true;
473504 }
474505 PG_END_TRY ();
475506
476507 SPI_finish ();
477508 PopActiveSnapshot ();
509+
478510 if (failed )
479511 {
480- /* abort transaction and sleep for a second */
512+ #ifdef USE_ASSERT_CHECKING
513+ elog (DEBUG2 , "%s: could not relocate batch, total: %lu [%u]" ,
514+ concurrent_part_bgw , part_slot -> total_rows , MyProcPid );
515+ #endif
516+
517+ /* Abort transaction and sleep for a second */
481518 AbortCurrentTransaction ();
482- DirectFunctionCall1 (pg_sleep , Float8GetDatum (1 ));
519+ DirectFunctionCall1 (pg_sleep , Float8GetDatum (part_slot -> sleep_time ));
483520 }
484521 else
485522 {
486- /* Reset failures counter and commit transaction */
523+ /* Commit transaction and reset 'failures_count' */
487524 CommitTransactionCommand ();
488525 failures_count = 0 ;
489- args -> total_rows += rows ;
526+
527+ /* Add rows to total_rows */
528+ part_slot -> total_rows += rows ;
529+
530+ #ifdef USE_ASSERT_CHECKING
531+ elog (DEBUG2 , "%s: relocated %d rows, total: %lu [%u]" ,
532+ concurrent_part_bgw , rows , part_slot -> total_rows , MyProcPid );
533+ #endif
490534 }
491535
492- /* If other backend requested to stop worker then quit */
493- if (args -> worker_status == WS_STOPPING )
536+ /* If other backend requested to stop us, quit */
537+ if (part_slot -> worker_status == WS_STOPPING )
494538 break ;
495539 }
496- while (rows > 0 || failed ); /* do while there is still rows to relocate */
540+ while (rows > 0 || failed ); /* do while there's still rows to be relocated */
497541
542+ /* Reclaim the resources */
498543 pfree (sql );
499- args -> worker_status = WS_FREE ;
544+ part_slot -> worker_status = WS_FREE ;
500545}
501546
502547
@@ -513,6 +558,8 @@ bgw_main_concurrent_part(Datum main_arg)
513558Datum
514559partition_table_concurrently (PG_FUNCTION_ARGS )
515560{
561+ #define tostr (str ) ( #str )
562+
516563 Oid relid = PG_GETARG_OID (0 );
517564 ConcurrentPartSlot * my_slot = NULL ;
518565 int empty_slot_idx = -1 ;
@@ -550,7 +597,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
550597 elog (ERROR , "No empty worker slots found" );
551598
552599 /* Initialize concurrent part slot */
553- InitConcurrentPartSlot (my_slot , WS_WORKING , MyDatabaseId , relid );
600+ InitConcurrentPartSlot (my_slot , GetAuthenticatedUserId (),
601+ WS_WORKING , MyDatabaseId , relid ,
602+ 1000 , 1.0 );
554603
555604 /* Start worker (we should not wait) */
556605 start_bg_worker (concurrent_part_bgw ,
@@ -560,8 +609,9 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
560609
561610 /* Tell user everything's fine */
562611 elog (NOTICE ,
563- "Worker started. You can stop it with the following command: "
564- "select stop_concurrent_part_task('%s');" ,
612+ "Worker started. You can stop it "
613+ "with the following command: select %s('%s');" ,
614+ tostr (stop_concurrent_part_task ), /* convert function's name to literal */
565615 get_rel_name (relid ));
566616
567617 PG_RETURN_VOID ();
@@ -594,12 +644,20 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
594644 userctx -> cur_idx = 0 ;
595645
596646 /* Create tuple descriptor */
597- tupdesc = CreateTemplateTupleDesc (5 , false);
598- TupleDescInitEntry (tupdesc , (AttrNumber ) 1 , "pid" , INT4OID , -1 , 0 );
599- TupleDescInitEntry (tupdesc , (AttrNumber ) 2 , "dbid" , OIDOID , -1 , 0 );
600- TupleDescInitEntry (tupdesc , (AttrNumber ) 3 , "relid" , REGCLASSOID , -1 , 0 );
601- TupleDescInitEntry (tupdesc , (AttrNumber ) 4 , "processed" , INT4OID , -1 , 0 );
602- TupleDescInitEntry (tupdesc , (AttrNumber ) 5 , "status" , TEXTOID , -1 , 0 );
647+ tupdesc = CreateTemplateTupleDesc (Natts_pathman_cp_tasks , false);
648+
649+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_userid ,
650+ "userid" , REGROLEOID , -1 , 0 );
651+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_pid ,
652+ "pid" , INT4OID , -1 , 0 );
653+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_dbid ,
654+ "dbid" , OIDOID , -1 , 0 );
655+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_relid ,
656+ "relid" , REGCLASSOID , -1 , 0 );
657+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_processed ,
658+ "processed" , INT4OID , -1 , 0 );
659+ TupleDescInitEntry (tupdesc , Anum_pathman_cp_tasks_status ,
660+ "status" , TEXTOID , -1 , 0 );
603661
604662 funcctx -> tuple_desc = BlessTupleDesc (tupdesc );
605663 funcctx -> user_fctx = (void * ) userctx ;
@@ -610,35 +668,39 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
610668 funcctx = SRF_PERCALL_SETUP ();
611669 userctx = (active_workers_cxt * ) funcctx -> user_fctx ;
612670
613- /*
614- * Iterate through worker slots
615- */
671+ /* Iterate through worker slots */
616672 for (i = userctx -> cur_idx ; i < PART_WORKER_SLOTS ; i ++ )
617673 {
618- if (concurrent_part_slots [i ].worker_status != WS_FREE )
674+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
675+
676+ if (cur_slot -> worker_status != WS_FREE )
619677 {
620678 HeapTuple tuple ;
621- Datum values [5 ];
622- bool isnull [5 ] = { false, false, false, false, false };
679+ Datum values [Natts_pathman_cp_tasks ];
680+ bool isnull [Natts_pathman_cp_tasks ] = { 0 , 0 , 0 , 0 , 0 , 0 };
623681
624- values [0 ] = concurrent_part_slots [i ].pid ;
625- values [1 ] = concurrent_part_slots [i ].dbid ;
626- values [2 ] = concurrent_part_slots [i ].relid ;
627- values [3 ] = concurrent_part_slots [i ].total_rows ;
682+ values [Anum_pathman_cp_tasks_userid - 1 ] = cur_slot -> userid ;
683+ values [Anum_pathman_cp_tasks_pid - 1 ] = cur_slot -> pid ;
684+ values [Anum_pathman_cp_tasks_dbid - 1 ] = cur_slot -> dbid ;
685+ values [Anum_pathman_cp_tasks_relid - 1 ] = cur_slot -> relid ;
686+ values [Anum_pathman_cp_tasks_processed - 1 ] = cur_slot -> total_rows ;
628687
629688 /* Now build a status string */
630- switch (concurrent_part_slots [ i ]. worker_status )
689+ switch (cur_slot -> worker_status )
631690 {
632691 case WS_WORKING :
633- values [4 ] = PointerGetDatum (pstrdup ("working" ));
692+ values [Anum_pathman_cp_tasks_status - 1 ] =
693+ PointerGetDatum (cstring_to_text ("working" ));
634694 break ;
635695
636696 case WS_STOPPING :
637- values [4 ] = PointerGetDatum (pstrdup ("stopping" ));
697+ values [Anum_pathman_cp_tasks_status - 1 ] =
698+ PointerGetDatum (cstring_to_text ("stopping" ));
638699 break ;
639700
640701 default :
641- values [4 ] = PointerGetDatum (pstrdup ("[unknown]" ));
702+ values [Anum_pathman_cp_tasks_status - 1 ] =
703+ PointerGetDatum (cstring_to_text ("[unknown]" ));
642704 }
643705
644706 /* Form output tuple */
@@ -670,7 +732,7 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
670732 concurrent_part_slots [i ].dbid == MyDatabaseId )
671733 {
672734 concurrent_part_slots [i ].worker_status = WS_STOPPING ;
673- elog (NOTICE , "Worker will stop after current batch's finished " );
735+ elog (NOTICE , "Worker will stop after it finishes current batch" );
674736
675737 PG_RETURN_BOOL (true);
676738 }
0 commit comments