Skip to content

Commit a8a9baa

Browse files
committed
snapshots: wait for init ack before sending fail/next/done
1 parent 696ed15 commit a8a9baa

File tree

1 file changed

+17
-3
lines changed

1 file changed

+17
-3
lines changed

src/discof/restore/fd_snapct_tile.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ init_load( fd_snapct_tile_t * ctx,
406406
if( !file ) out->addr = ctx->addr;
407407
fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
408408
ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
409+
ctx->flush_ack = 0;
409410

410411
if( file ) {
411412
/* When loading from a local file and not from HTTP, there is no
@@ -753,6 +754,7 @@ after_credit( fd_snapct_tile_t * ctx,
753754

754755
/* ============================================================== */
755756
case FD_SNAPCT_STATE_READING_FULL_FILE:
757+
if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
756758
if( FD_UNLIKELY( ctx->malformed ) ) {
757759
ctx->malformed = 0;
758760
fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
@@ -772,6 +774,7 @@ after_credit( fd_snapct_tile_t * ctx,
772774

773775
/* ============================================================== */
774776
case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
777+
if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
775778
if( FD_UNLIKELY( ctx->malformed ) ) {
776779
ctx->malformed = 0;
777780
fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
@@ -790,6 +793,7 @@ after_credit( fd_snapct_tile_t * ctx,
790793

791794
/* ============================================================== */
792795
case FD_SNAPCT_STATE_READING_FULL_HTTP:
796+
if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
793797
if( FD_UNLIKELY( ctx->malformed ) ) {
794798
ctx->malformed = 0;
795799
fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
@@ -811,6 +815,7 @@ after_credit( fd_snapct_tile_t * ctx,
811815

812816
/* ============================================================== */
813817
case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
818+
if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
814819
if( FD_UNLIKELY( ctx->malformed ) ) {
815820
ctx->malformed = 0;
816821
fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
@@ -1060,10 +1065,19 @@ snapin_frag( fd_snapct_tile_t * ctx,
10601065
ulong sig ) {
10611066
switch( sig ) {
10621067
case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
1068+
if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
1069+
ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
1070+
FD_TEST( !ctx->flush_ack );
1071+
ctx->flush_ack = 1;
1072+
} else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1073+
break;
1074+
10631075
case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
1064-
/* Note: We do not need to wait for the init control message to
1065-
be flushed through the entire pipeline, like we do for fail and
1066-
done. It is safe to immediately send a fail message downstream. */
1076+
if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
1077+
ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
1078+
FD_TEST( !ctx->flush_ack );
1079+
ctx->flush_ack = 1;
1080+
} else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
10671081
break;
10681082

10691083
case FD_SNAPSHOT_MSG_CTRL_NEXT:

0 commit comments

Comments
 (0)