Commit adc5a94

Bugfixes for failures during data recovery
Add a recovery callback to destroy partially-recovered members when recovery is interrupted by another failure. Fix the case of one rank finishing a store while its partner fails, followed by a commit on the successful rank. Ranks now come to a consensus on timestamps on group reinitialization.
1 parent 71e59e4 commit adc5a94
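
The first fix follows an undo-log pattern: before a rank starts rebuilding a member from its partner, it registers a cleanup callback holding just enough state to delete the half-built member, and pops that callback once recovery completes, so it fires only if another failure interrupts the restore. A minimal sketch of the pattern, assuming the callback signature and the Fenix_Callback_register / Fenix_Callback_pop / Fenix_Data_member_delete calls used in the diff below; the helper names and the restore steps are placeholders:

   //Undo record: just enough to identify the partially-recovered member.
   typedef struct { int groupid, memberid; } undo_log_t;

   //Fires only if a failure interrupts recovery before the pop below.
   void undo_restore(MPI_Comm comm, int err, void* data){
      undo_log_t* log = (undo_log_t*)data;
      Fenix_Data_member_delete(log->groupid, log->memberid); //discard partial state
      free(data);
      Fenix_Callback_pop(); //remove this callback itself
   }

   void restore_member(int groupid, int memberid){
      undo_log_t* log = (undo_log_t*)malloc(sizeof(undo_log_t));
      log->groupid = groupid;
      log->memberid = memberid;
      Fenix_Callback_register(undo_restore, (void*)log); //armed before any partial state exists

      /* ...multi-step transfer of snapshots from the partner... */

      Fenix_Callback_pop(); //member restored fully: disarm the undo
      free(log);
   }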

2 files changed: +144 −54 lines changed

src/fenix_data_policy_in_memory_raid.c

Lines changed: 141 additions & 51 deletions
@@ -56,6 +56,7 @@
 
 #include <mpi.h>
 #include "fenix.h"
+#include "fenix_ext.h"
 #include "fenix_opt.h"
 #include "fenix_data_subset.h"
 #include "fenix_data_recovery.h"
@@ -123,8 +124,25 @@ typedef struct __fenix_imr_group{
    int entries_count;
    fenix_imr_mentry_t* entries;
    int num_snapshots;
+   int* timestamps;
 } fenix_imr_group_t;
 
+typedef struct __fenix_imr_undo_log{
+   int groupid, memberid;
+} fenix_imr_undo_log_t;
+
+void __imr_sync_timestamps(fenix_imr_group_t* group);
+
+void __imr_undo_restore(MPI_Comm comm, int err, void* data){
+   fenix_imr_undo_log_t* undo_log = (fenix_imr_undo_log_t*)data;
+
+   Fenix_Data_member_delete(undo_log->groupid, undo_log->memberid);
+
+   free(data);
+   Fenix_Callback_pop(); //Should be this callback itself.
+}
+
+
 void __fenix_policy_in_memory_raid_get_group(fenix_group_t** group, MPI_Comm comm,
       int timestart, int depth, void* policy_value, int* flag){
    *group = (fenix_group_t *)malloc(sizeof(fenix_imr_group_t));
@@ -257,8 +275,11 @@ void __fenix_policy_in_memory_raid_get_group(fenix_group_t** group, MPI_Comm com
    new_group->entries =
       (fenix_imr_mentry_t*) malloc(sizeof(fenix_imr_mentry_t) * __FENIX_IMR_DEFAULT_MENTRY_NUM);
    new_group->num_snapshots = 0;
-
-
+   new_group->timestamps = (int*)malloc(sizeof(int)*depth);
+
+   new_group->base.comm = comm;
+   new_group->base.current_rank = my_rank;
+   __imr_sync_timestamps(new_group);
    *flag = FENIX_SUCCESS;
 }
 
@@ -370,12 +391,7 @@ int __imr_member_create(fenix_group_t* g, fenix_member_entry_t* mentry){
       //Initialize to smallest # blocks allowed.
       __fenix_data_subset_init(1, new_imr_mentry->data_regions + i);
       new_imr_mentry->data_regions[i].specifier = __FENIX_SUBSET_EMPTY;
-
-      //-1 is not a valid timestamp, use as an indicator that the data isn't valid.
-      new_imr_mentry->timestamp[i] = -1;
    }
-   //The first commit's timestamp is the group's timestart.
-   new_imr_mentry->timestamp[0] = group->base.timestart;
 
    group->entries_count++;
 
@@ -398,7 +414,7 @@ void __imr_member_free(fenix_imr_mentry_t* mentry, int depth){
 }
 
 int __imr_member_delete(fenix_group_t* g, int member_id){
-   int retval = -1;
+   int retval = FENIX_SUCCESS;
    fenix_imr_group_t* group = (fenix_imr_group_t*)g;
    //Find the member first
    fenix_imr_mentry_t *mentry;
@@ -460,9 +476,10 @@ int __imr_member_store(fenix_group_t* g, int member_id,
    void* recv_buf = malloc(serialized_size * member_data->datatype_size);
 
    MPI_Sendrecv(serialized, serialized_size * member_data->datatype_size, MPI_BYTE,
-         group->partners[1], group->base.groupid ^ STORE_PAYLOAD_TAG, recv_buf,
-         serialized_size * member_data->datatype_size, MPI_BYTE, group->partners[0],
-         group->base.groupid ^ STORE_PAYLOAD_TAG, group->base.comm, NULL);
+         group->partners[1], group->base.groupid ^ STORE_PAYLOAD_TAG,
+         recv_buf, serialized_size * member_data->datatype_size, MPI_BYTE,
+         group->partners[0], group->base.groupid ^ STORE_PAYLOAD_TAG,
+         group->base.comm, NULL);
 
    //Expand the serialized data out and store into the partner's portion of this data entry.
    __fenix_data_subset_deserialize(&subset_specifier, recv_buf,
@@ -575,13 +592,18 @@ int __imr_commit(fenix_group_t* g){
 
    fenix_imr_group_t *group = (fenix_imr_group_t*)g;
 
+   if(group->num_snapshots == group->base.depth+1){
+      //Full of timestamps, remove the oldest and proceed as normal.
+      memcpy(group->timestamps, group->timestamps+1, group->base.depth);
+      group->num_snapshots--;
+   }
+   group->timestamps[group->num_snapshots++] = group->base.timestamp;
+
+
    //For each entry id (eid)
    for(int eid = 0; eid < group->entries_count; eid++){
       fenix_imr_mentry_t *mentry = &group->entries[eid];
 
-      //Two cases for each member entry:
-      // (1) depth has been reached, shift out the oldest commit
-      // (2) depth has not been reached, just commit and start filling a new location.
       if(mentry->current_head == group->base.depth + 1){
          //The entry is full, one snapshot should be shifted out.
 
@@ -598,24 +620,11 @@ int __imr_commit(fenix_group_t* g){
 
          mentry->data[group->base.depth + 1] = first_data;
          mentry->data_regions[group->base.depth + 1].specifier = __FENIX_SUBSET_EMPTY;
-         mentry->timestamp[group->base.depth + 1] = mentry->timestamp[group->base.depth] + 1;
-
-      } else {
-         //The entry is not full, just shift the current head.
-         mentry->current_head++;
-
-         //Everything is initialized to correct values, we just need to provide
-         //the correct timestamp for the next snapshot.
-         mentry->timestamp[mentry->current_head] = mentry->timestamp[mentry->current_head-1] + 1;
-
-         if(eid == 0){
-            //Only do this once
-            group->num_snapshots++;
-         }
+         mentry->current_head--;
       }
-   }
 
-   group->base.timestamp = group->entries[0].timestamp[group->entries[0].current_head - 1];
+      mentry->timestamp[mentry->current_head++] = group->base.timestamp;
+   }
 
    return to_return;
 }
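
The commit path above replaces per-member timestamp bookkeeping with a single group-level window of the depth+1 most recent commit timestamps. A standalone sketch of that sliding window (hypothetical helper name; it scales the shift by sizeof(int) since memcpy counts bytes, and uses memmove because the source and destination overlap):

   #include <string.h>

   //Keep the depth+1 most recent commit timestamps, oldest first.
   void push_timestamp(int* timestamps, int* num_snapshots, int depth, int now){
      if(*num_snapshots == depth + 1){
         //Window full: shift out the oldest timestamp.
         memmove(timestamps, timestamps + 1, depth * sizeof(int));
         (*num_snapshots)--;
      }
      timestamps[(*num_snapshots)++] = now;
   }

With depth = 2 and a full window {5, 6, 7}, committing at timestamp 8 leaves {6, 7, 8}.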
@@ -698,6 +707,9 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
    int retval = -1;
 
    fenix_imr_group_t* group = (fenix_imr_group_t*)g;
+   //One-time fix after a reinit.
+   if(group->base.timestamp == -1 && group->num_snapshots > 0)
+      group->base.timestamp = group->timestamps[group->num_snapshots-1];
 
    fenix_imr_mentry_t* mentry;
    //find_mentry returns the error status. We found the member (and corresponding data) if there are no errors.
@@ -711,6 +723,8 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
 
    int recovery_locally_possible;
 
+   fenix_imr_undo_log_t* undo_data; //Used for undoing partial restores interrupted by failures.
+
    if(group->raid_mode == 1){
       int my_data_found, partner_data_found;
 
@@ -722,6 +736,7 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
       MPI_Sendrecv(&found_member, 1, MPI_INT, group->partners[1], PARTNER_STATUS_TAG,
             &partner_data_found, 1, MPI_INT, group->partners[0], PARTNER_STATUS_TAG,
             group->base.comm, NULL);
+
 
       if(found_member && partner_data_found && my_data_found){
         //I have my data, and the person who's data I am backing up has theirs. We're good to go.
@@ -738,17 +753,6 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
         if(!partner_data_found)
            __fenix_data_member_send_metadata(group->base.groupid, member_id, group->partners[0]);
 
-        //Now my partner will need all of the entries. First they'll need to know how many snapshots
-        //to expect.
-        if(!partner_data_found)
-           MPI_Send((void*) &(group->num_snapshots), 1, MPI_INT, group->partners[0],
-                 RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm);
-
-        //They also need the timestamps for each snapshot, as well as the value for the next.
-        if(!partner_data_found)
-           MPI_Send((void*)mentry->timestamp, group->num_snapshots+1, MPI_INT, group->partners[0],
-                 RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm);
-
         for(int snapshot = 0; snapshot < group->num_snapshots; snapshot++){
            //send data region info next
            if(!partner_data_found)
@@ -788,21 +792,22 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
         __fenix_member_create(group->base.groupid, packet.memberid, NULL, packet.current_count,
               packet.datatype_size);
 
+        //Mark the member for deletion if another failure interrupts recovering fully.
+        undo_data = (fenix_imr_undo_log_t*)malloc(sizeof(fenix_imr_undo_log_t));
+        undo_data->groupid = group->base.groupid;
+        undo_data->memberid = member_id;
+        Fenix_Callback_register(__imr_undo_restore, (void*)undo_data);
+
        __imr_find_mentry(group, member_id, &mentry);
        int member_data_index = __fenix_search_memberid(group->base.member, member_id);
        member_data = group->base.member->member_entry[member_data_index];
-
-        MPI_Recv((void*)&(group->num_snapshots), 1, MPI_INT, group->partners[1],
-              RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm, NULL);
-
+
        mentry->current_head = group->num_snapshots;
 
-        //We also need to explicitly ask for all timestamps, since user may have deleted some and caused mischief.
-        MPI_Recv((void*)(mentry->timestamp), group->num_snapshots + 1, MPI_INT, group->partners[1],
-              RECOVER_MEMBER_ENTRY_TAG^group->base.groupid, group->base.comm, NULL);
-
        //now recover data.
        for(int snapshot = 0; snapshot < group->num_snapshots; snapshot++){
+          mentry->timestamp[snapshot] = group->timestamps[snapshot];
+
          __fenix_data_subset_free(mentry->data_regions+snapshot);
          __fenix_data_subset_recv(mentry->data_regions+snapshot, group->partners[1],
                __IMR_RECOVER_DATA_REGION_TAG ^ group->base.groupid, group->base.comm);
@@ -828,11 +833,16 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
          free(recv_buf);
        }
      }
+
+      //Member restored fully, so we don't need to mark it for undoing on failure.
+      Fenix_Callback_pop();
+      free(undo_data);
    }
 
 
    recovery_locally_possible = found_member || (my_data_found && partner_data_found);
-
+   if(recovery_locally_possible) retval = FENIX_SUCCESS;
+
 } else if (group->raid_mode == 5){
    int* set_results = malloc(sizeof(int) * group->set_size);
    MPI_Allgather((void*)&found_member, 1, MPI_INT, (void*)set_results, 1, MPI_INT,
@@ -890,6 +900,13 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
        __fenix_member_create(group->base.groupid, packet.memberid, NULL, packet.current_count,
              packet.datatype_size);
 
+        //Mark the member for deletion if another failure interrupts recovering fully.
+        undo_data = (fenix_imr_undo_log_t*)malloc(sizeof(fenix_imr_undo_log_t));
+        undo_data->groupid = group->base.groupid;
+        undo_data->memberid = member_id;
+        Fenix_Callback_register(__imr_undo_restore, (void*)undo_data);
+
+
        __imr_find_mentry(group, member_id, &mentry);
        int member_data_index = __fenix_search_memberid(group->base.member, member_id);
        member_data = group->base.member->member_entry[member_data_index];
@@ -956,6 +973,12 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
        }
      }
 
+      if(!found_member){
+         //Member restored fully, so we don't need to mark it for undoing on failure.
+         Fenix_Callback_pop();
+         free(undo_data);
+      }
+
    }
 
    retval = FENIX_SUCCESS;
@@ -1032,7 +1055,7 @@ int __imr_member_restore(fenix_group_t* g, int member_id,
    }
 
    //Dont forget to clear the commit buffer
-   mentry->data_regions[mentry->current_head].specifier = __FENIX_SUBSET_EMPTY;
+   if(recovery_locally_possible) mentry->data_regions[mentry->current_head].specifier = __FENIX_SUBSET_EMPTY;
 
 
    return retval;
@@ -1128,11 +1151,78 @@ int __imr_reinit(fenix_group_t* g, int* flag){
       MPI_Comm_create_group(g->comm, set_group, 0, &(group->set_comm));
    }
 
+   __imr_sync_timestamps(group);
+
    *flag = FENIX_SUCCESS;
 
    return FENIX_SUCCESS;
 }
 
+void __imr_sync_timestamps(fenix_imr_group_t* group){
+   int n_snapshots = group->num_snapshots;
+
+   if(group->raid_mode == 1){
+      int partner_snapshots;
+      MPI_Sendrecv(&n_snapshots, 1, MPI_INT, group->partners[0], 34560,
+            &partner_snapshots, 1, MPI_INT, group->partners[1], 34560,
+            group->base.comm, MPI_STATUS_IGNORE);
+      n_snapshots = n_snapshots > partner_snapshots ? n_snapshots : partner_snapshots;
+
+      MPI_Sendrecv(&n_snapshots, 1, MPI_INT, group->partners[1], 34561,
+            &partner_snapshots, 1, MPI_INT, group->partners[0], 34561,
+            group->base.comm, MPI_STATUS_IGNORE);
+      n_snapshots = n_snapshots > partner_snapshots ? n_snapshots : partner_snapshots;
+   } else {
+      MPI_Allreduce(MPI_IN_PLACE, &n_snapshots, 1, MPI_INT, MPI_MAX, group->set_comm);
+   }
+
+   bool need_reset = group->num_snapshots != n_snapshots;
+   for(int i = group->num_snapshots; i < n_snapshots; i++) group->timestamps[i] = -1;
+
+   if(group->raid_mode == 1){
+      int* p0_stamps = (int*)malloc(sizeof(int)*n_snapshots);
+      int* p1_stamps = (int*)malloc(sizeof(int)*n_snapshots);
+
+      MPI_Sendrecv(group->timestamps, n_snapshots, MPI_INT, group->partners[1], 34562,
+            p0_stamps, n_snapshots, MPI_INT, group->partners[0], 34562,
+            group->base.comm, MPI_STATUS_IGNORE);
+      MPI_Sendrecv(group->timestamps, n_snapshots, MPI_INT, group->partners[0], 34563,
+            p1_stamps, n_snapshots, MPI_INT, group->partners[1], 34563,
+            group->base.comm, MPI_STATUS_IGNORE);
+
+      for(int i = 0; i < n_snapshots; i++){
+         int old_stamp = group->timestamps[i];
+         group->timestamps[i] = group->timestamps[i] > p0_stamps[i] ? group->timestamps[i] : p0_stamps[i];
+         group->timestamps[i] = group->timestamps[i] > p1_stamps[i] ? group->timestamps[i] : p1_stamps[i];
+
+         need_reset |= group->timestamps[i] != old_stamp;
+      }
+
+      free(p0_stamps);
+      free(p1_stamps);
+   } else {
+      MPI_Allreduce(MPI_IN_PLACE, group->timestamps, n_snapshots, MPI_INT, MPI_MAX, group->set_comm);
+   }
+
+   group->num_snapshots = n_snapshots;
+   if(n_snapshots > 0) group->base.timestamp = group->timestamps[n_snapshots-1];
+   else group->base.timestamp = -1;
+
+   //Now fix members
+   if(need_reset && group->entries_count > 0) {
+      if(fenix.options.verbose == 1){
+         verbose_print("Outdated timestamps on rank %d. All members will require full recovery.\n",
+               group->base.current_rank);
+      }
+      //For now, just delete all members and assume partner(s) can
+      //help me rebuild fully consistent state
+      for(int i = group->entries_count-1; i >= 0; i--){
+         int memberid = group->entries[i].memberid;
+         Fenix_Data_member_delete(group->base.groupid, memberid);
+      }
+   }
+}
+
 int __imr_get_redundant_policy(fenix_group_t* group, int* policy_name,
       void* policy_value, int* flag){
    int retval = FENIX_SUCCESS;
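
__imr_sync_timestamps above is the consensus step named in the commit message: after a reinit, ranks may disagree on how many snapshots exist and what they are timestamped, so the group agrees on the maximum count, pads missing slots with -1 (never a valid timestamp), and takes an elementwise maximum so that any rank still holding the real value wins. In RAID-5 mode this reduces to a single collective; a minimal standalone sketch (function, array, and communicator names are placeholders):

   #include <mpi.h>

   //Pad the local list out to the agreed length with -1, then take an
   //elementwise max across the set: a rank holding the real timestamp for a
   //slot wins over ranks holding -1 or a stale value.
   void sync_timestamps(int* timestamps, int local_count, int agreed_count, MPI_Comm set_comm){
      for(int i = local_count; i < agreed_count; i++) timestamps[i] = -1;
      MPI_Allreduce(MPI_IN_PLACE, timestamps, agreed_count, MPI_INT, MPI_MAX, set_comm);
   }

The RAID-1 branch achieves the same effect pairwise: each rank exchanges its count and timestamp array with both partners via MPI_Sendrecv and keeps the per-slot maximum of the three values.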

src/fenix_data_recovery.c

Lines changed: 3 additions & 3 deletions
@@ -551,11 +551,11 @@ int __fenix_data_commit(int groupid, int *timestamp) {
    } else {
       fenix_group_t *group = (fenix.data_recovery->group[group_index]);
 
-      group->vtbl.commit(group);
-
-      if (group->timestamp +1 -1) group->timestamp++;
+      if (group->timestamp != -1) group->timestamp++;
       else group->timestamp = group->timestart;
 
+      group->vtbl.commit(group);
+
       if (timestamp != NULL) {
          *timestamp = group->timestamp;
       }