Skip to content

Commit d269266

Browse files
Allow user to invoke callbacks, better define callback behavior when nested faults occur
1 parent 9716e22 commit d269266

File tree

7 files changed

+68
-21
lines changed

7 files changed

+68
-21
lines changed

include/fenix.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,11 @@ int Fenix_Callback_register(void (*recover)(MPI_Comm, int, void *),
279279
*/
280280
int Fenix_Callback_pop();
281281

282+
/**
283+
* @brief Invoke all callbacks with information from the last recovered fault
284+
*/
285+
void Fenix_Callback_invoke_all();
286+
282287
/**
283288
* @brief Check for any failed ranks
284289
*

include/fenix.hpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,23 +87,29 @@ constexpr ResumeMode JUMP = FENIX_RESUME_JUMP;
8787
constexpr ResumeMode RETURN = FENIX_RESUME_RETURN;
8888
constexpr ResumeMode THROW = FENIX_RESUME_THROW;
8989

90+
// Selects what the callback dispatcher (__fenix_callback_invoke_all) does when
// a recovery callback lets a CommException escape.
enum CallbackExceptionMode {
91+
// Propagate the CommException to whoever invoked the callbacks.
RETHROW,
92+
// Swallow the CommException; callbacks not yet run in that pass are skipped.
SQUASH
93+
};
94+
9095
using UnhandledMode = Fenix_Unhandled_mode;
9196
constexpr UnhandledMode SILENT = FENIX_UNHANDLED_SILENT;
9297
constexpr UnhandledMode PRINT = FENIX_UNHANDLED_PRINT;
9398
constexpr UnhandledMode ABORT = FENIX_UNHANDLED_ABORT;
9499

95100
namespace Args {
96101
struct FenixInitArgs {
97-
int* role = nullptr;
98-
MPI_Comm in_comm = MPI_COMM_WORLD;
99-
MPI_Comm* out_comm = nullptr;
100-
int* argc = nullptr;
101-
char*** argv = nullptr;
102-
int spares = 0;
103-
int spawn = 0;
104-
ResumeMode resume_mode = THROW;
105-
UnhandledMode unhandled_mode = ABORT;
106-
int* err = nullptr;
102+
int* role = nullptr;
103+
MPI_Comm in_comm = MPI_COMM_WORLD;
104+
MPI_Comm* out_comm = nullptr;
105+
int* argc = nullptr;
106+
char*** argv = nullptr;
107+
int spares = 0;
108+
int spawn = 0;
109+
ResumeMode resume_mode = THROW;
110+
CallbackExceptionMode callback_exception_mode = RETHROW;
111+
UnhandledMode unhandled_mode = ABORT;
112+
int* err = nullptr;
107113
};
108114
}
109115

@@ -127,6 +133,9 @@ int callback_register(std::function<void(MPI_Comm, int)> callback);
127133
//! @brief Overload of #Fenix_Callback_pop
128134
int callback_pop();
129135

136+
//! @brief Overload of #Fenix_Callback_invoke_all
137+
void callback_invoke_all();
138+
130139
/**
131140
* @brief Get the failed ranks from the most recent recovery
132141
* @return vector of failed ranks

include/fenix_ext.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,13 @@ typedef struct {
7474
int spare_ranks; // Spare ranks entered by user to repair failed ranks
7575

7676
ResumeMode resume_mode = JUMP;
77+
CallbackExceptionMode callback_exception_mode = RETHROW;
7778
UnhandledMode unhandled_mode = ABORT;
7879
int ignore_errs = false; // Temporarily ignore all errors & recovery
7980
int spawn_policy; // Indicate dynamic process spawning
8081
jmp_buf *recover_environment; // Calling environment to fill the jmp_buf structure
8182

83+
int mpi_fail_code = MPI_SUCCESS;
8284
int repair_result = FENIX_SUCCESS; // Internal variable to store the result of MPI comm repair
8385
int role = FENIX_ROLE_INITIAL_RANK;
8486

include/fenix_process_recovery.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ int __fenix_callback_register(fenix_callback_func& recover);
9898

9999
int __fenix_callback_pop();
100100

101-
void __fenix_callback_invoke_all(int error);
101+
void __fenix_callback_invoke_all();
102102

103103
int* __fenix_get_fail_ranks(int *, int, int);
104104

src/fenix.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ int Fenix_Callback_pop() {
8888
return __fenix_callback_pop();
8989
}
9090

91+
// Public entry point that lets user code trigger the registered callbacks;
// it simply forwards to the internal dispatcher __fenix_callback_invoke_all().
void Fenix_Callback_invoke_all() {
92+
__fenix_callback_invoke_all();
93+
}
94+
9195
int Fenix_Initialized(int *flag) {
9296
*flag = (fenix.fenix_init_flag) ? 1 : 0;
9397
return FENIX_SUCCESS;
@@ -298,6 +302,10 @@ int callback_pop() {
298302
return __fenix_callback_pop();
299303
}
300304

305+
// C++-API counterpart of Fenix_Callback_invoke_all; forwards to the same
// internal dispatcher, __fenix_callback_invoke_all().
void callback_invoke_all() {
306+
__fenix_callback_invoke_all();
307+
}
308+
301309
std::vector<int> fail_list(){
302310
if(fenix.fail_world_size == 0) return {};
303311
return {fenix.fail_world, fenix.fail_world+fenix.fail_world_size};

src/fenix_callbacks.cpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@
6262
#include "fenix_data_recovery.hpp"
6363
#include "fenix_opt.hpp"
6464
#include "fenix_util.hpp"
65+
#include "fenix_exception.hpp"
6566
#include <mpi.h>
6667

68+
using namespace Fenix;
6769

6870
int __fenix_callback_register(fenix_callback_func& recover)
6971
{
@@ -83,9 +85,28 @@ int __fenix_callback_pop(){
8385
return FENIX_SUCCESS;
8486
}
8587

86-
void __fenix_callback_invoke_all(int error)
87-
{
88-
for(auto it = fenix.callbacks.rbegin(); it != fenix.callbacks.rend(); it++){
89-
(*it)(*fenix.user_world, error);
88+
void __fenix_callback_invoke_all(){
89+
//If callbacks are invoked in a nested manner due to caught exceptions
90+
//within a callback, we want to only finish the most recent call. All prior
91+
//calls should exit as soon as control returns.
92+
static int callbacks_depth = 0;
93+
int m_callbacks_layer = callbacks_depth++;
94+
95+
try {
96+
for(auto& cb : fenix.callbacks) {
97+
if(callbacks_depth != m_callbacks_layer+1) break;
98+
cb(*fenix.user_world, fenix.mpi_fail_code);
99+
}
100+
} catch (const CommException& e) {
101+
switch(fenix.callback_exception_mode){
102+
case(RETHROW):
103+
if(m_callbacks_layer == 0) callbacks_depth = 0;
104+
throw;
105+
case(SQUASH):
106+
break;
107+
}
90108
}
109+
110+
//Reset the callback depth when leaving the outermost call
111+
if(m_callbacks_layer == 0) callbacks_depth = 0;
91112
}

src/fenix_process_recovery.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ int fenix_preinit(const Args::FenixInitArgs& args, jmp_buf* jump_env){
112112
fenix.spawn_policy = args.spawn;
113113
fenix.recover_environment = jump_env;
114114
fenix.resume_mode = args.resume_mode;
115+
fenix.callback_exception_mode = args.callback_exception_mode;
115116
fenix.unhandled_mode = args.unhandled_mode;
116117
fenix.ret_role = args.role ? args.role : &fenix.role;
117118
fenix.ret_error = args.err ? args.err : &fenix.repair_result;
@@ -663,9 +664,10 @@ void __fenix_postinit()
663664
34095347, fenix.new_world, &fenix.check_failures_req);
664665
}
665666

666-
if (fenix.role == FENIX_ROLE_SURVIVOR_RANK) {
667-
__fenix_callback_invoke_all(*fenix.ret_error);
667+
if(fenix.role != FENIX_ROLE_INITIAL_RANK) {
668+
__fenix_callback_invoke_all();
668669
}
670+
669671
if (fenix.options.verbose == 9) {
670672
verbose_print("After barrier. current_rank: %d, role: %d\n", __fenix_get_current_rank(fenix.new_world),
671673
fenix.role);
@@ -798,17 +800,17 @@ void __fenix_test_MPI(MPI_Comm *pcomm, int *pret, ...)
798800
{
799801
int ret_repair;
800802
int index;
801-
int ret = *pret;
803+
fenix.mpi_fail_code = *pret;
804+
802805
if(!fenix.fenix_init_flag || __fenix_spare_rank() == 1 || fenix.ignore_errs) {
803806
return;
804807
}
805808

806-
switch (ret) {
809+
switch (fenix.mpi_fail_code) {
807810
case MPI_ERR_PROC_FAILED_PENDING:
808811
case MPI_ERR_PROC_FAILED:
809812
MPIX_Comm_revoke(*fenix.world);
810813
MPIX_Comm_revoke(fenix.new_world);
811-
812814
if(fenix.user_world_exists) MPIX_Comm_revoke(*fenix.user_world);
813815

814816
fenix.repair_result = __fenix_repair_ranks();
@@ -819,7 +821,7 @@ void __fenix_test_MPI(MPI_Comm *pcomm, int *pret, ...)
819821
default:
820822
int len;
821823
char errstr[MPI_MAX_ERROR_STRING];
822-
MPI_Error_string(ret, errstr, &len);
824+
MPI_Error_string(fenix.mpi_fail_code, errstr, &len);
823825
switch (fenix.unhandled_mode) {
824826
case ABORT:
825827
fprintf(stderr, "UNHANDLED ERR: %s\n", errstr);

0 commit comments

Comments
 (0)