Skip to content

Commit a56c85c

Browse files
committed
repair: handling tricky edge cases with dropping peers, and being at head of turbine, invalidating iterator on publish
1 parent fbff822 commit a56c85c

File tree

6 files changed

+131
-54
lines changed

6 files changed

+131
-54
lines changed

src/discof/forest/fd_forest.c

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ requests_remove( fd_forest_t * forest, ulong pool_idx ) {
149149
fd_forest_ref_t * pool = fd_forest_reqspool( forest );
150150
fd_forest_ref_t * ele;
151151
if( FD_LIKELY( ele = fd_forest_requests_ele_remove( requests, &pool_idx, NULL, pool ) ) ) {
152+
/* invalidate the iterator if it is on the removed slot. */
153+
if( FD_UNLIKELY( forest->iter.ele_idx == pool_idx ) ) {
154+
forest->iter.ele_idx = ULONG_MAX;
155+
}
152156
fd_forest_reqslist_ele_remove( fd_forest_reqslist( forest ), ele, pool );
153157
fd_forest_reqspool_ele_release( pool, ele );
154158
}
@@ -632,8 +636,7 @@ fd_forest_blk_insert( fd_forest_t * forest, ulong slot, ulong parent_slot ) {
632636
fd_forest_frontier_ele_query( frontier, &ele->slot, NULL, pool ) ) ) {
633637
/* There is a chance that we connected this ele to the main tree. If
634638
this ele doesn't have a parent in the consumed/requests map, add it to the
635-
consumed/requests map. If there are no requests in the deque though
636-
(common case after catchup), don't even bother iterating. */
639+
consumed/requests map. */
637640
ulong ancestor = fd_forest_pool_idx( pool, ele );
638641
int has_requests_anc = 0;
639642
int has_consumed_anc = 0;
@@ -760,6 +763,7 @@ fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) {
760763
fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest );
761764
fd_forest_frontier_t * frontier = fd_forest_frontier( forest );
762765
fd_forest_subtrees_t * subtrees = fd_forest_subtrees( forest );
766+
fd_forest_ref_t * conspool = fd_forest_conspool( forest );
763767
fd_forest_blk_t * pool = fd_forest_pool( forest );
764768
ulong null = fd_forest_pool_idx_null( pool );
765769
ulong * queue = fd_forest_deque( forest );
@@ -849,7 +853,7 @@ fd_forest_publish( fd_forest_t * forest, ulong new_root_slot ) {
849853
In that case we need to continue repairing from the new root, so
850854
add it to the consumed map. */
851855

852-
if( FD_UNLIKELY( fd_forest_conspool_used( fd_forest_conspool( forest ) ) == 0 ) ) {
856+
if( FD_UNLIKELY( fd_forest_conslist_is_empty( fd_forest_conslist( forest ), conspool ) ) ) {
853857
consumed_insert( forest, fd_forest_pool_idx( pool, new_root_ele ) );
854858
requests_insert( forest, fd_forest_pool_idx( pool, new_root_ele ) );
855859
new_root_ele->complete_idx = 0;
@@ -956,14 +960,23 @@ fd_forest_iter_next( fd_forest_t * forest ) {
956960
requests_insert( forest, fd_forest_pool_idx( pool, child ) );
957961
child = fd_forest_pool_ele_const( pool, child->sibling );
958962
}
959-
requests_remove( forest, iter->ele_idx ); /* remove finished slot from head of requests deque */
960-
if( FD_UNLIKELY( ele->complete_idx == UINT_MAX ) ) {
961-
/* if we just made a highest_window_idx request, add this slot back to the requests deque at the end */
963+
/* So annoying: can't call requests_remove because it'll invalidate the current iter->ele_idx,
964+
so we explicitly pop the head and free the ele here. */
965+
fd_forest_ref_t * head = fd_forest_reqslist_ele_pop_head( fd_forest_reqslist( forest ), reqspool );
966+
fd_forest_requests_ele_remove ( fd_forest_requests( forest ), &head->idx, NULL, reqspool );
967+
fd_forest_reqspool_ele_release( reqspool, head );
968+
969+
if( FD_UNLIKELY( iter->shred_idx == UINT_MAX && ( ele->buffered_idx == UINT_MAX || ele->buffered_idx < ele->complete_idx ) ) ) {
970+
/* If we just made a highest_window_idx request, add this slot
971+
back to the requests deque at the end. Also condition on
972+
whether or not this slot is still incomplete. If the slot
973+
is complete and we add it back to the loop, we will end up
974+
infinite looping. */
962975
requests_insert( forest, iter->ele_idx );
963976
}
964977
}
965978

966-
/* move onto the next slot */
979+
/* Move onto the next slot */
967980
if( FD_UNLIKELY( fd_forest_reqslist_is_empty( reqslist, reqspool ) ) ) {
968981
iter->ele_idx = fd_forest_pool_idx_null( pool );
969982
iter->shred_idx = UINT_MAX;

src/discof/forest/test_forest.c

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,25 @@ test_iter_publish( fd_wksp_t * wksp ) {
862862

863863
}
864864

865+
void
866+
test_iter_caught_up( fd_wksp_t * wksp ) {
867+
ulong ele_max = 8;
868+
void * mem = fd_wksp_alloc_laddr( wksp, fd_forest_align(), fd_forest_footprint( ele_max ), 1UL );
869+
FD_TEST( mem );
870+
fd_forest_t * forest = fd_forest_join( fd_forest_new( mem, ele_max, 42UL /* seed */ ) );
871+
872+
fd_forest_init( forest, 0 );
873+
fd_forest_blk_fec_insert ( forest, 1, 0, 0, 0, 1 );
874+
fd_forest_blk_fec_insert ( forest, 2, 1, 0, 0, 1 ); /* fully caught up */
875+
fd_forest_blk_data_shred_insert( forest, 3, 2, 0, 0, 0, 0 );
876+
877+
for( int i = 0; i < 10; i++ ) {
878+
fd_forest_iter_next( forest );
879+
FD_LOG_NOTICE(("iter: slot %lu, idx %u", idx_slot( forest, forest->iter.ele_idx ), forest->iter.shred_idx));
880+
}
881+
882+
}
883+
865884

866885
int
867886
main( int argc, char ** argv ) {
@@ -873,18 +892,19 @@ main( int argc, char ** argv ) {
873892
fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( page_sz ), page_cnt, fd_shmem_cpu_idx( numa_idx ), "wksp", 0UL );
874893
FD_TEST( wksp );
875894

876-
test_invalid_frontier_insert( wksp );
877-
test_publish( wksp );
878-
test_publish_incremental( wksp );
879-
test_out_of_order( wksp );
880-
test_forks( wksp );
881-
test_print_tree( wksp );
882-
// test_large_print_tree( wksp);
883-
test_linear_forest_iterator( wksp );
884-
test_branched_forest_iterator( wksp );
885-
test_frontier( wksp );
886-
test_fec_clear( wksp );
887-
test_iter_publish( wksp );
895+
//test_invalid_frontier_insert( wksp );
896+
//test_publish( wksp );
897+
//test_publish_incremental( wksp );
898+
//test_out_of_order( wksp );
899+
//test_forks( wksp );
900+
//test_print_tree( wksp );
901+
//// test_large_print_tree( wksp);
902+
//test_linear_forest_iterator( wksp );
903+
//test_branched_forest_iterator( wksp );
904+
//test_frontier( wksp );
905+
//test_fec_clear( wksp );
906+
//test_iter_publish( wksp );
907+
test_iter_caught_up( wksp );
888908

889909
fd_halt();
890910
return 0;

src/discof/repair/fd_inflight.h

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,18 @@
44
#include "../../flamenco/types/fd_types.h"
55

66
/* fd_inflights tracks repair requests that are inflight to other
7-
validators. This module is not necessary for the repair protocol and
8-
strategy, but is useful for metrics and reporting. Incorrect updates
9-
and removals from this module are non-critical. Requests are key-ed
10-
by nonce as in the current strategy (see fd_policy.h), all requests
11-
have a unique nonce. The chances that an inflight request does not
12-
get a response are non-negligible due to shred tile upstream deduping
13-
duplicates. */
7+
validators. This module is useful for metrics and reporting.
8+
Inexact updates of orphan requests and highest window requests from
9+
this module are non-critical, but exact updates of shred requests are
10+
critical. Repair tile relies on this module to be able to re-request
11+
any shreds that it has sent, because policy next does not request any
12+
shred twice.
13+
(TODO should this be rolled into policy.h?)
14+
15+
Requests are key-ed by nonce as in the current strategy (see
16+
fd_policy.h), all requests have a unique nonce. The chances that an
17+
inflight request does not get a response are non-negligible due to
18+
shred tile upstream deduping duplicates. */
1419

1520
/* Max number of pending requests */
1621
#define FD_INFLIGHT_REQ_MAX (1<<20)
@@ -96,7 +101,7 @@ fd_inflights_should_drain( fd_inflights_t * table, long now ) {
96101
if( FD_UNLIKELY( fd_inflight_dlist_is_empty( table->dlist, table->pool ) ) ) return 0;
97102

98103
fd_inflight_t * inflight_req = fd_inflight_dlist_ele_peek_head( table->dlist, table->pool );
99-
if( FD_UNLIKELY( inflight_req->timestamp_ns + 100e6L < now ) ) return 1;
104+
if( FD_UNLIKELY( inflight_req->timestamp_ns + 90e6L < now ) ) return 1;
100105
return 0;
101106
}
102107

src/discof/repair/fd_policy.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ fd_policy_peer_select( fd_policy_t * policy ) {
164164
fd_peer_dlist_t * worst_dlist = policy->peers.slow;
165165
fd_peer_t * pool = policy->peers.pool;
166166

167+
if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;
168+
167169
fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
168170

169171
while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
@@ -235,10 +237,10 @@ fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair
235237
means that the shred_idx of the iterf is likely to be UINT_MAX,
236238
which means calling fd_forest_iter_next will advance the iterf
237239
to the next slot. */
238-
//forest->iter.shred_idx = UINT_MAX; // heinous... i'm sorry
239-
//fd_forest_iter_next( forest );
240-
//if( FD_UNLIKELY( fd_forest_iter_done( forest->iter, forest ) ) ) break;
241-
//continue;
240+
forest->iter.shred_idx = UINT_MAX;
241+
/* TODO: Heinous... I'm sorry. Easiest way to ensure this slot gets added back to the requests deque.
242+
but maybe there should be an explicit API for it. */
243+
return NULL;
242244
}
243245

244246
if( FD_UNLIKELY( forest->iter.shred_idx == UINT_MAX ) ) {
@@ -300,7 +302,7 @@ fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
300302

301303
if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
302304
/* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
303-
fd_peer_dlist_t * dlist = policy->peers.select.stage == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
305+
fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
304306
policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
305307
}
306308

src/discof/repair/fd_repair_metrics.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ struct fd_slot_metrics {
1818
};
1919
typedef struct fd_slot_metrics fd_slot_metrics_t;
2020

21-
#define FD_CATCHUP_METRICS_MAX 256
21+
#define FD_CATCHUP_METRICS_MAX 16384
2222

2323
struct fd_repair_metrics_t {
2424
fd_slot_metrics_t slots[ FD_CATCHUP_METRICS_MAX ];

src/discof/repair/fd_repair_tile.c

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -591,40 +591,71 @@ after_sign( ctx_t * ctx,
591591
}
592592
}
593593

594-
sign_req_t * pending = fd_signs_map_query( ctx->signs_map, pending_key, NULL );
595-
if( FD_UNLIKELY( !pending ) ) FD_LOG_CRIT(( "No pending request found for key %lu", pending_key ));
594+
sign_req_t * pending_ = fd_signs_map_query( ctx->signs_map, pending_key, NULL );
595+
sign_req_t pending[1] = { *pending_ }; /* Make a copy of the pending request so we can sign_map_remove immediately. */
596+
sign_map_remove( ctx, pending_key );
597+
598+
if( FD_UNLIKELY( !pending_ ) ) FD_LOG_CRIT(( "No pending request found for key %lu", pending_key ));
596599

600+
/* This is a pong message */
597601
if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_PONG ) ) {
598602
fd_memcpy( pending->msg.pong.sig, ctx->buffer, 64UL );
599603
send_packet( ctx, stem, 1, pending->pong_data.peer_addr.addr, pending->pong_data.peer_addr.port, pending->pong_data.daddr, pending->buf, fd_repair_sz( &pending->msg ), fd_frag_meta_ts_comp( fd_tickcount() ) );
600-
sign_map_remove( ctx, pending_key );
601604
return;
602605
}
603606

604-
/* else: regular repair shred request format */
605-
607+
/* Inject the signature into the pending request */
606608
fd_memcpy( pending->buf + 4, ctx->buffer, 64UL );
607609
uint src_ip4 = 0U;
608-
fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
610+
611+
/* This is a warmup message */
612+
if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.slot == 0 ) ) {
613+
fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
614+
if( FD_UNLIKELY( active ) ) send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
615+
else { /* This is a warmup request for a peer that is no longer active. There's no reason to pick another peer for a warmup rq, so just drop it. */ }
616+
return;
617+
}
618+
619+
/* This is a regular repair shred request
620+
621+
TODO: anyways to make this less complicated? Essentially we need to
622+
ensure we always send out any shred requests we have, because policy_next
623+
has no way to revisit a shred. But the fact that peers can drop out
624+
of the active peer list makes this complicated.
625+
626+
1. If the peer is still there (common), it's fine.
627+
2. If the peer is not there, we can select another peer and send the request.
628+
3. If the peer is not there, and we have no other peers, we can add
629+
this request to the inflights table, pretend we've sent it and
630+
let the inflight timeout request it down the line.
631+
*/
632+
fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
633+
int is_regular_req = pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.nonce > 0; // not a highest/orphan request
609634

610635
if( FD_UNLIKELY( !active ) ) {
611-
FD_LOG_INFO(( "Signed a message for %s, but it is no longer in the active peer list", FD_BASE58_ENC_32_ALLOCA( &pending->msg.shred.to ) ));
612-
/* Happens extremely rarely, so we can just pick a new peer and
613-
try to resign here. */
614636
fd_pubkey_t const * new_peer = fd_policy_peer_select( ctx->policy );
615-
pending->msg.shred.to = *new_peer;
616-
sign_map_remove( ctx, pending_key );
617-
fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = pending->msg } );
637+
if( FD_LIKELY( new_peer ) ) {
638+
/* We have a new peer, so we can send the request */
639+
pending->msg.shred.to = *new_peer;
640+
fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = pending->msg } );
641+
}
642+
643+
if( FD_UNLIKELY( !new_peer && is_regular_req ) ) {
644+
/* This is real devastation - we clearly had a peer at the time of
645+
making this request, but for some reason we now have ZERO
646+
peers. The only thing we can do is to add this artificially to
647+
the inflights table, pretend we've sent it and let the inflight
648+
timeout request it down the line. */
649+
fd_inflights_request_insert( ctx->inflight, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
650+
}
618651
return;
619652
}
620-
621-
int is_regular_request = pending->msg.kind != FD_REPAIR_KIND_PONG && pending->msg.shred.nonce > 0;
622-
if( FD_LIKELY( is_regular_request && pending->msg.kind == FD_REPAIR_KIND_SHRED ) ) {
653+
/* Happy path - all is well, our peer didn't drop out from beneath us. */
654+
if( FD_LIKELY( is_regular_req ) ) {
623655
fd_inflights_request_insert( ctx->inflight, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
624656
fd_policy_peer_request_update( ctx->policy, &pending->msg.shred.to );
625657
}
626658
send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
627-
sign_map_remove( ctx, pending_key );
628659
}
629660

630661
static inline void
@@ -850,6 +881,7 @@ after_frag( ctx_t * ctx,
850881
}
851882
}
852883
/* update metrics */
884+
ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
853885
return;
854886
}
855887

@@ -898,9 +930,16 @@ after_credit( ctx_t * ctx,
898930
fd_forest_blk_t * blk = fd_forest_query( ctx->forest, slot );
899931
if( FD_UNLIKELY( !fd_forest_blk_idxs_test( blk->idxs, shred_idx ) ) ) {
900932
fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
901-
fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)((ulong)now / 1e6L), (uint)nonce, slot, shred_idx );
902-
fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
903-
return;
933+
if( FD_UNLIKELY( !peer ) ) {
934+
/* No peers. But we CANNOT lose this request. */
935+
/* Add this request to the inflights table, pretend we've sent it and let the inflight timeout request it down the line. */
936+
fd_hash_t hash = { .ul[0] = 0 };
937+
fd_inflights_request_insert( ctx->inflight, nonce, &hash, slot, shred_idx );
938+
} else {
939+
fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)((ulong)now / 1e6L), (uint)nonce, slot, shred_idx );
940+
fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
941+
return;
942+
}
904943
}
905944
}
906945

@@ -1115,8 +1154,6 @@ populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
11151154

11161155
static inline void
11171156
metrics_write( ctx_t * ctx ) {
1118-
ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
1119-
11201157
FD_MCNT_SET( REPAIR, CURRENT_SLOT, ctx->metrics->current_slot );
11211158
FD_MCNT_SET( REPAIR, REPAIRED_SLOTS, ctx->metrics->repaired_slots );
11221159
FD_MCNT_SET( REPAIR, REQUEST_PEERS, fd_peer_pool_used( ctx->policy->peers.pool ) );

0 commit comments

Comments
 (0)