@@ -512,7 +512,7 @@ bgw_main_concurrent_part(Datum main_arg)
512512 if (failures_count ++ >= PART_WORKER_MAX_ATTEMPTS )
513513 {
514514 /* Mark slot as FREE */
515- cps_set_status (part_slot , WS_FREE );
515+ cps_set_status (part_slot , CPS_FREE );
516516
517517 elog (LOG ,
518518 "Concurrent partitioning worker has canceled the task because "
@@ -555,7 +555,7 @@ bgw_main_concurrent_part(Datum main_arg)
555555 }
556556
557557 /* If other backend requested to stop us, quit */
558- if (cps_check_status (part_slot ) == WS_STOPPING )
558+ if (cps_check_status (part_slot ) == CPS_STOPPING )
559559 break ;
560560 }
561561 while (rows > 0 || failed ); /* do while there's still rows to be relocated */
@@ -564,7 +564,7 @@ bgw_main_concurrent_part(Datum main_arg)
564564 pfree (sql );
565565
566566 /* Mark slot as FREE */
567- cps_set_status (part_slot , WS_FREE );
567+ cps_set_status (part_slot , CPS_FREE );
568568}
569569
570570
@@ -603,7 +603,8 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
603603
604604 SpinLockAcquire (& cur_slot -> mutex );
605605
606- if (empty_slot_idx < 0 )
606+ /* Should we take this slot into account? */
607+ if (empty_slot_idx < 0 && cur_slot -> worker_status == CPS_FREE )
607608 {
608609 empty_slot_idx = i ;
609610 keep_this_lock = true;
@@ -630,7 +631,7 @@ partition_table_concurrently(PG_FUNCTION_ARGS)
630631 {
631632 /* Initialize concurrent part slot */
632633 InitConcurrentPartSlot (& concurrent_part_slots [empty_slot_idx ],
633- GetAuthenticatedUserId (), WS_WORKING ,
634+ GetAuthenticatedUserId (), CPS_WORKING ,
634635 MyDatabaseId , relid , 1000 , 1.0 );
635636
636637 SpinLockRelease (& concurrent_part_slots [empty_slot_idx ].mutex );
@@ -707,12 +708,13 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
707708 for (i = userctx -> cur_idx ; i < PART_WORKER_SLOTS ; i ++ )
708709 {
709710 ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
711+ HeapTuple htup = NULL ;
710712
713+ HOLD_INTERRUPTS ();
711714 SpinLockAcquire (& cur_slot -> mutex );
712715
713- if (cur_slot -> worker_status != WS_FREE )
716+ if (cur_slot -> worker_status != CPS_FREE )
714717 {
715- HeapTuple tuple ;
716718 Datum values [Natts_pathman_cp_tasks ];
717719 bool isnull [Natts_pathman_cp_tasks ] = { 0 };
718720
@@ -725,12 +727,12 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
725727 /* Now build a status string */
726728 switch (cur_slot -> worker_status )
727729 {
728- case WS_WORKING :
730+ case CPS_WORKING :
729731 values [Anum_pathman_cp_tasks_status - 1 ] =
730732 PointerGetDatum (cstring_to_text ("working" ));
731733 break ;
732734
733- case WS_STOPPING :
735+ case CPS_STOPPING :
734736 values [Anum_pathman_cp_tasks_status - 1 ] =
735737 PointerGetDatum (cstring_to_text ("stopping" ));
736738 break ;
@@ -741,15 +743,18 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
741743 }
742744
743745 /* Form output tuple */
744- tuple = heap_form_tuple (funcctx -> tuple_desc , values , isnull );
746+ htup = heap_form_tuple (funcctx -> tuple_desc , values , isnull );
745747
746748 /* Switch to next worker */
747749 userctx -> cur_idx = i + 1 ;
748-
749- SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
750750 }
751751
752752 SpinLockRelease (& cur_slot -> mutex );
753+ RESUME_INTERRUPTS ();
754+
755+ /* Return tuple if needed */
756+ if (htup )
757+ SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (htup ));
753758 }
754759
755760 SRF_RETURN_DONE (funcctx );
@@ -770,19 +775,22 @@ stop_concurrent_part_task(PG_FUNCTION_ARGS)
770775 {
771776 ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
772777
778+ HOLD_INTERRUPTS ();
773779 SpinLockAcquire (& cur_slot -> mutex );
774780
775- if (cur_slot -> worker_status != WS_FREE &&
781+ if (cur_slot -> worker_status != CPS_FREE &&
776782 cur_slot -> relid == relid &&
777783 cur_slot -> dbid == MyDatabaseId )
778784 {
779785 elog (NOTICE , "Worker will stop after it finishes current batch" );
780786
781- cur_slot -> worker_status = WS_STOPPING ;
787+ /* Change worker's state & set 'worker_found' */
788+ cur_slot -> worker_status = CPS_STOPPING ;
782789 worker_found = true;
783790 }
784791
785792 SpinLockRelease (& cur_slot -> mutex );
793+ RESUME_INTERRUPTS ();
786794 }
787795
788796 if (worker_found )
0 commit comments