@@ -55,6 +55,8 @@ struct ompi_continuation_t {
5555 opal_atomic_int32_t cont_failed ; /**< the continution is failed */
5656 opal_atomic_int32_t cont_request_check ; /**< flag set by the failed continuation handler to block
5757 * completing threads from freeing their request */
58+ int cont_rc ; /** return code to be passed to callback */
59+ bool cont_invoke_failed ; /** if true, failed continuations will be invoked and passed the error code */
5860};
5961
6062/* Convenience typedef */
@@ -70,6 +72,8 @@ static void ompi_continuation_construct(ompi_continuation_t* cont)
7072 cont -> cont_opreqs = NULL ;
7173 cont -> cont_failed = 0 ;
7274 cont -> cont_request_check = 0 ;
75+ cont -> cont_rc = MPI_SUCCESS ;
76+ cont -> cont_invoke_failed = false;
7377}
7478
7579static void ompi_continuation_destruct (ompi_continuation_t * cont )
@@ -306,7 +310,7 @@ int ompi_continue_cont_invoke(ompi_continuation_t *cont)
306310
307311 MPIX_Continue_cb_function * fn = cont -> cont_cb ;
308312 void * cont_data = cont -> cont_data ;
309- int rc = fn (MPI_SUCCESS , cont_data );
313+ int rc = fn (cont -> cont_rc , cont_data );
310314 ompi_continue_cont_release (cont , rc );
311315 return rc ;
312316}
@@ -711,22 +715,31 @@ static void handle_failed_cont(ompi_continuation_t *cont, int status, bool have_
711715 /* wait for other threads in request_completion_cb to decrement the counter */
712716 cont -> cont_request_check = 0 ;
713717 while (cont -> cont_num_active != 1 ) { }
714- cont -> cont_num_active = 0 ;
715718 }
716719 opal_list_remove_item (& cont_req -> cont_incomplete_list , & cont -> super .super );
717- opal_list_append (& cont_req -> cont_failed_list , & cont -> super .super );
718- int32_t num_active = OPAL_THREAD_ADD_FETCH32 (& cont_req -> cont_num_active , -1 );
719- if (MPI_SUCCESS == cont_req -> super .req_status .MPI_ERROR ) {
720- cont_req -> super .req_status .MPI_ERROR = status ;
721- cont_req -> cont_errorinfo .mpi_object = error_object ;
722- cont_req -> cont_errorinfo .type = error_object_type ;
723- }
724720
725- if (0 == num_active ) {
726- opal_atomic_wmb ();
727- ompi_request_complete (& cont_req -> super , true);
728- }
721+ if (cont -> cont_invoke_failed ) {
722+ /* make sure all requests have completed and enqueue the continuation for execution */
723+ cont -> cont_rc = status ;
724+ if (0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
725+ ompi_continue_enqueue_runnable (cont );
726+ }
727+ } else {
728+ cont -> cont_num_active = 0 ;
729+ /* put the continuation into the list of failed continuations */
730+ opal_list_append (& cont_req -> cont_failed_list , & cont -> super .super );
731+ int32_t num_active = OPAL_THREAD_ADD_FETCH32 (& cont_req -> cont_num_active , -1 );
732+ if (MPI_SUCCESS == cont_req -> super .req_status .MPI_ERROR ) {
733+ cont_req -> super .req_status .MPI_ERROR = status ;
734+ cont_req -> cont_errorinfo .mpi_object = error_object ;
735+ cont_req -> cont_errorinfo .type = error_object_type ;
736+ }
729737
738+ if (0 == num_active ) {
739+ opal_atomic_wmb ();
740+ ompi_request_complete (& cont_req -> super , true);
741+ }
742+ }
730743 if (!have_cont_req_lock ) {
731744 opal_atomic_unlock (& cont_req -> cont_lock );
732745 }
@@ -770,7 +783,6 @@ static int request_completion_cb(ompi_request_t *request)
770783 ompi_request_free (& request );
771784 }
772785 opal_atomic_wmb ();
773- //int32_t num_active = OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1);
774786
775787 if (1 == cont -> cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
776788 /* the continuation is ready for execution */
@@ -783,7 +795,14 @@ static int request_completion_cb(ompi_request_t *request)
783795 handle_failed_cont (cont , request -> req_status .MPI_ERROR , false);
784796 } else {
785797 /* someone else handles the fault, so just signal that we're done with the continuation object */
786- OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 );
798+ if (1 == cont -> cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32 (& cont -> cont_num_active , -1 )) {
799+ /* we're responsible for enqueuing the continuation for execution if:
800+ * 1) we don't have access to the requests (handle_failed_cont couldn't handle all pending requests); and
801+ * 2) this is the last outstanding request
802+ */
803+ cont -> cont_num_active = 0 ;
804+ ompi_continue_enqueue_runnable (cont );
805+ }
787806 }
788807
789808 opal_free_list_return (& ompi_request_cont_data_freelist , & req_cont_data -> super );
@@ -824,6 +843,7 @@ int ompi_continue_global_wakeup(int status) {
824843 }
825844
826845 opal_mutex_atomic_unlock (& cont_req_list_mtx );
846+ return OMPI_SUCCESS ;
827847}
828848
829849int ompi_continue_attach (
@@ -839,8 +859,9 @@ int ompi_continue_attach(
839859 return OMPI_ERR_REQUEST ;
840860 }
841861
842- bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE );
862+ bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE );
843863 bool defer_complete = (flags & MPIX_CONT_DEFER_COMPLETE );
864+ bool invoke_failed = (flags & MPIX_CONT_INVOKE_FAILED ) | (flags & MPIX_CONT_REQBUF_VOLATILE );
844865
845866 ompi_cont_request_t * cont_req = (ompi_cont_request_t * )continuation_request ;
846867 ompi_continuation_t * cont = ompi_continue_cont_create (count , cont_req , cont_cb ,
@@ -857,6 +878,8 @@ int ompi_continue_attach(
857878 reset_requests = false;
858879 }
859880
881+ cont -> cont_invoke_failed = invoke_failed ;
882+
860883 /* memory barrier to make sure a thread completing a request see
861884 * a correct continuation object */
862885 opal_atomic_wmb ();
0 commit comments