@@ -106,7 +106,7 @@ init_concurrent_part_task_slots(void)
106106 memset (concurrent_part_slots , 0 , size );
107107
108108 for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
109- pg_atomic_init_flag_impl (& concurrent_part_slots [i ].slot_used );
109+ SpinLockInit (& concurrent_part_slots [i ].mutex );
110110 }
111111}
112112
@@ -235,10 +235,10 @@ start_bg_worker(const char bgworker_name[BGW_MAXLEN],
235235static dsm_segment *
236236create_partitions_bg_worker_segment (Oid relid , Datum value , Oid value_type )
237237{
238- TypeCacheEntry * typcache ;
239- Size datum_size ;
240- Size segment_size ;
241- dsm_segment * segment ;
238+ TypeCacheEntry * typcache ;
239+ Size datum_size ;
240+ Size segment_size ;
241+ dsm_segment * segment ;
242242 SpawnPartitionArgs * args ;
243243
244244 typcache = lookup_type_cache (value_type , 0 );
@@ -314,10 +314,10 @@ create_partitions_bg_worker(Oid relid, Datum value, Oid value_type)
314314static void
315315bgw_main_spawn_partitions (Datum main_arg )
316316{
317- dsm_handle handle = DatumGetUInt32 (main_arg );
318- dsm_segment * segment ;
319- SpawnPartitionArgs * args ;
320- Datum value ;
317+ dsm_handle handle = DatumGetUInt32 (main_arg );
318+ dsm_segment * segment ;
319+ SpawnPartitionArgs * args ;
320+ Datum value ;
321321
322322 /* Establish signal handlers before unblocking signals. */
323323 pqsignal (SIGTERM , handle_sigterm );
@@ -512,8 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512512 if (failures_count ++ >= PART_WORKER_MAX_ATTEMPTS )
513513 {
514514 /* Mark slot as FREE */
515- part_slot -> worker_status = WS_FREE ;
516- pg_atomic_clear_flag (& part_slot -> slot_used );
515+ cps_set_status (part_slot , WS_FREE );
517516
518517 elog (LOG ,
519518 "Concurrent partitioning worker has canceled the task because "
@@ -534,14 +533,6 @@ bgw_main_concurrent_part(Datum main_arg)
534533
535534 if (failed )
536535 {
537- #ifdef USE_ASSERT_CHECKING
538- elog (DEBUG1 , "%s: could not relocate batch (%d/%d), total: %lu [%u]" ,
539- concurrent_part_bgw ,
540- failures_count , PART_WORKER_MAX_ATTEMPTS , /* current/max */
541- part_slot -> total_rows ,
542- MyProcPid );
543- #endif
544-
545536 /* Abort transaction and sleep for a second */
546537 AbortCurrentTransaction ();
547538 DirectFunctionCall1 (pg_sleep , Float8GetDatum (part_slot -> sleep_time ));
@@ -553,26 +544,27 @@ bgw_main_concurrent_part(Datum main_arg)
553544 failures_count = 0 ;
554545
555546 /* Add rows to total_rows */
547+ SpinLockAcquire (& part_slot -> mutex );
556548 part_slot -> total_rows += rows ;
557-
549+ /* Report debug message */
558550#ifdef USE_ASSERT_CHECKING
559551 elog (DEBUG1 , "%s: relocated %d rows, total: %lu [%u]" ,
560552 concurrent_part_bgw , rows , part_slot -> total_rows , MyProcPid );
561553#endif
554+ SpinLockRelease (& part_slot -> mutex );
562555 }
563556
564557 /* If other backend requested to stop us, quit */
565- if (part_slot -> worker_status == WS_STOPPING )
558+ if (cps_check_status ( part_slot ) == WS_STOPPING )
566559 break ;
567560 }
568561 while (rows > 0 || failed ); /* do while there's still rows to be relocated */
569562
570563 /* Reclaim the resources */
571564 pfree (sql );
572565
573- /* Set slot free */
574- part_slot -> worker_status = WS_FREE ;
575- pg_atomic_clear_flag (& part_slot -> slot_used );
566+ /* Mark slot as FREE */
567+ cps_set_status (part_slot , WS_FREE );
576568}
577569
578570
@@ -589,12 +581,11 @@ bgw_main_concurrent_part(Datum main_arg)
589581Datum
590582partition_table_concurrently (PG_FUNCTION_ARGS )
591583{
592- #define tostr (str ) ( #str )
584+ #define tostr (str ) ( #str ) /* convert function's name to literal */
593585
594- Oid relid = PG_GETARG_OID (0 );
595- ConcurrentPartSlot * my_slot = NULL ;
596- int empty_slot_idx = -1 ;
597- int i ;
586+ Oid relid = PG_GETARG_OID (0 );
587+ int empty_slot_idx = -1 ;
588+ int i ;
598589
599590 /* Check if relation is a partitioned table */
600591 shout_if_prel_is_invalid (relid ,
@@ -607,38 +598,43 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
607598 */
608599 for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
609600 {
610- /*
611- * Attempt to acquire the flag. If it has alread been used then skip
612- * this slot and try another one
613- */
614- if (!pg_atomic_test_set_flag (& concurrent_part_slots [i ].slot_used ))
615- continue ;
601+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
602+ bool keep_this_lock = false;
616603
617- /* If atomic flag wasn't used then status should be WS_FREE */
618- Assert (concurrent_part_slots [i ].worker_status == WS_FREE );
604+ SpinLockAcquire (& cur_slot -> mutex );
619605
620606 if (empty_slot_idx < 0 )
621607 {
622- my_slot = & concurrent_part_slots [i ];
623608 empty_slot_idx = i ;
609+ keep_this_lock = true;
624610 }
625611
626- if (concurrent_part_slots [ i ]. relid == relid &&
627- concurrent_part_slots [ i ]. dbid == MyDatabaseId )
612+ if (cur_slot -> relid == relid &&
613+ cur_slot -> dbid == MyDatabaseId )
628614 {
615+ if (empty_slot_idx >= 0 )
616+ SpinLockRelease (& cur_slot -> mutex );
617+
629618 elog (ERROR ,
630619 "Table \"%s\" is already being partitioned" ,
631620 get_rel_name (relid ));
632621 }
622+
623+ if (!keep_this_lock )
624+ SpinLockRelease (& cur_slot -> mutex );
633625 }
634626
635- if (my_slot == NULL )
627+ if (empty_slot_idx < 0 )
636628 elog (ERROR , "No empty worker slots found" );
629+ else
630+ {
631+ /* Initialize concurrent part slot */
632+ InitConcurrentPartSlot (& concurrent_part_slots [empty_slot_idx ],
633+ GetAuthenticatedUserId (), WS_WORKING ,
634+ MyDatabaseId , relid , 1000 , 1.0 );
637635
638- /* Initialize concurrent part slot */
639- InitConcurrentPartSlot (my_slot , GetAuthenticatedUserId (),
640- WS_WORKING , MyDatabaseId , relid ,
641- 1000 , 1.0 );
636+ SpinLockRelease (& concurrent_part_slots [empty_slot_idx ].mutex );
637+ }
642638
643639 /* Start worker (we should not wait) */
644640 start_bg_worker (concurrent_part_bgw ,
@@ -712,11 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
712708 {
713709 ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
714710
711+ SpinLockAcquire (& cur_slot -> mutex );
712+
715713 if (cur_slot -> worker_status != WS_FREE )
716714 {
717715 HeapTuple tuple ;
718716 Datum values [Natts_pathman_cp_tasks ];
719- bool isnull [Natts_pathman_cp_tasks ] = { 0 , 0 , 0 , 0 , 0 , 0 };
717+ bool isnull [Natts_pathman_cp_tasks ] = { 0 };
720718
721719 values [Anum_pathman_cp_tasks_userid - 1 ] = cur_slot -> userid ;
722720 values [Anum_pathman_cp_tasks_pid - 1 ] = cur_slot -> pid ;
@@ -750,6 +748,8 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
750748
751749 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
752750 }
751+
752+ SpinLockRelease (& cur_slot -> mutex );
753753 }
754754
755755 SRF_RETURN_DONE (funcctx );
@@ -763,22 +763,35 @@ Datum
763763stop_concurrent_part_task (PG_FUNCTION_ARGS )
764764{
765765 Oid relid = PG_GETARG_OID (0 );
766+ bool worker_found = false;
766767 int i ;
767- ConcurrentPartSlot * slot ;
768768
769- for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
770- slot = & concurrent_part_slots [i ];
769+ for (i = 0 ; i < PART_WORKER_SLOTS && !worker_found ; i ++ )
770+ {
771+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
772+
773+ SpinLockAcquire (& cur_slot -> mutex );
771774
772- if (slot -> worker_status != WS_FREE &&
773- slot -> relid == relid &&
774- slot -> dbid == MyDatabaseId )
775+ if (cur_slot -> worker_status != WS_FREE &&
776+ cur_slot -> relid == relid &&
777+ cur_slot -> dbid == MyDatabaseId )
775778 {
776- slot -> worker_status = WS_STOPPING ;
777779 elog (NOTICE , "Worker will stop after it finishes current batch" );
778780
779- PG_RETURN_BOOL (true);
781+ cur_slot -> worker_status = WS_STOPPING ;
782+ worker_found = true;
780783 }
781784
782- elog (ERROR , "Cannot find worker for relation \"%s\"" ,
783- get_rel_name_or_relid (relid ));
785+ SpinLockRelease (& cur_slot -> mutex );
786+ }
787+
788+ if (worker_found )
789+ PG_RETURN_BOOL (true);
790+ else
791+ {
792+ elog (ERROR , "Cannot find worker for relation \"%s\"" ,
793+ get_rel_name_or_relid (relid ));
794+
795+ PG_RETURN_BOOL (false); /* keep compiler happy */
796+ }
784797}
0 commit comments