@@ -95,12 +95,19 @@ init_concurrent_part_task_slots(void)
9595{
9696 bool found ;
9797 Size size = estimate_concurrent_part_task_slots_size ();
98+ int i ;
9899
99100 concurrent_part_slots = (ConcurrentPartSlot * )
100101 ShmemInitStruct ("array of ConcurrentPartSlots" , size , & found );
101102
102103 /* Initialize 'concurrent_part_slots' if needed */
103- if (!found ) memset (concurrent_part_slots , 0 , size );
104+ if (!found )
105+ {
106+ memset (concurrent_part_slots , 0 , size );
107+
108+ for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
109+ pg_atomic_init_flag_impl (& concurrent_part_slots [i ].slot_used );
110+ }
104111}
105112
106113
@@ -423,9 +430,9 @@ bgw_main_concurrent_part(Datum main_arg)
423430 {
424431 MemoryContext old_mcxt ;
425432
426- Oid types [2 ] = { OIDOID , INT4OID };
427- Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
428- bool nulls [2 ] = { false, false };
433+ Oid types [2 ] = { OIDOID , INT4OID };
434+ Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
435+ bool nulls [2 ] = { false, false };
429436
430437 /* Reset loop variables */
431438 failed = false;
@@ -506,6 +513,7 @@ bgw_main_concurrent_part(Datum main_arg)
506513 {
507514 /* Mark slot as FREE */
508515 part_slot -> worker_status = WS_FREE ;
516+ pg_atomic_clear_flag (& part_slot -> slot_used );
509517
510518 elog (LOG ,
511519 "Concurrent partitioning worker has canceled the task because "
@@ -561,7 +569,10 @@ bgw_main_concurrent_part(Datum main_arg)
561569
562570 /* Reclaim the resources */
563571 pfree (sql );
572+
573+ /* Set slot free */
564574 part_slot -> worker_status = WS_FREE ;
575+ pg_atomic_clear_flag (& part_slot -> slot_used );
565576}
566577
567578
@@ -596,16 +607,24 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
596607 */
597608 for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
598609 {
599- if (concurrent_part_slots [i ].worker_status == WS_FREE )
610+ /*
611+ * Attempt to acquire the flag. If it has alread been used then skip
612+ * this slot and try another one
613+ */
614+ if (!pg_atomic_test_set_flag (& concurrent_part_slots [i ].slot_used ))
615+ continue ;
616+
617+ /* If atomic flag wasn't used then status should be WS_FREE */
618+ Assert (concurrent_part_slots [i ].worker_status == WS_FREE );
619+
620+ if (empty_slot_idx < 0 )
600621 {
601- if (empty_slot_idx < 0 )
602- {
603- my_slot = & concurrent_part_slots [i ];
604- empty_slot_idx = i ;
605- }
622+ my_slot = & concurrent_part_slots [i ];
623+ empty_slot_idx = i ;
606624 }
607- else if (concurrent_part_slots [i ].relid == relid &&
608- concurrent_part_slots [i ].dbid == MyDatabaseId )
625+
626+ if (concurrent_part_slots [i ].relid == relid &&
627+ concurrent_part_slots [i ].dbid == MyDatabaseId )
609628 {
610629 elog (ERROR ,
611630 "Table \"%s\" is already being partitioned" ,
@@ -745,13 +764,16 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
745764{
746765 Oid relid = PG_GETARG_OID (0 );
747766 int i ;
767+ ConcurrentPartSlot * slot ;
748768
749769 for (i = 0 ; i < PART_WORKER_SLOTS ; i ++ )
750- if (concurrent_part_slots [i ].worker_status != WS_FREE &&
751- concurrent_part_slots [i ].relid == relid &&
752- concurrent_part_slots [i ].dbid == MyDatabaseId )
770+ slot = & concurrent_part_slots [i ];
771+
772+ if (slot -> worker_status != WS_FREE &&
773+ slot -> relid == relid &&
774+ slot -> dbid == MyDatabaseId )
753775 {
754- concurrent_part_slots [ i ]. worker_status = WS_STOPPING ;
776+ slot -> worker_status = WS_STOPPING ;
755777 elog (NOTICE , "Worker will stop after it finishes current batch" );
756778
757779 PG_RETURN_BOOL (true);
0 commit comments