From 1993e59214654c464162bede780b818ef754105a Mon Sep 17 00:00:00 2001 From: cali-jumptrading Date: Wed, 6 Aug 2025 20:50:21 +0000 Subject: [PATCH] snapshots: lthash tile --- book/api/metrics-generated.md | 12 + src/app/firedancer-dev/commands/backtest.c | 94 +++++- .../firedancer-dev/commands/snapshot_load.c | 83 ++++- src/app/firedancer-dev/main.c | 2 + src/app/firedancer/config/default.toml | 21 ++ src/app/firedancer/main.c | 9 + src/app/firedancer/topology.c | 45 ++- src/app/shared/fd_config.c | 1 + src/app/shared/fd_config.h | 5 + src/app/shared/fd_config_parse.c | 4 +- src/app/shared/fd_tile_unit_test.c | 2 + src/disco/metrics/generate/types.py | 9 +- src/disco/metrics/generated/fd_metrics_all.c | 3 + src/disco/metrics/generated/fd_metrics_all.h | 3 +- .../metrics/generated/fd_metrics_snaplt.c | 8 + .../metrics/generated/fd_metrics_snaplt.h | 25 ++ src/disco/metrics/metrics.xml | 6 + src/disco/topo/fd_topob.c | 1 + src/discof/restore/Local.mk | 1 + src/discof/restore/fd_snapin_tile.c | 277 ++++++++++++---- src/discof/restore/fd_snaplt_tile.c | 306 ++++++++++++++++++ src/discof/restore/fd_snaprd_tile.c | 71 +++- src/discof/restore/utils/fd_ssctrl.h | 55 ++++ src/flamenco/runtime/fd_hashes.c | 29 +- src/flamenco/runtime/fd_hashes.h | 23 ++ src/flamenco/runtime/tests/run_backtest_ci.sh | 2 +- .../runtime/tests/run_backtest_tests_all.sh | 2 +- .../runtime/tests/run_ledger_backtest.sh | 11 +- 28 files changed, 995 insertions(+), 115 deletions(-) create mode 100644 src/disco/metrics/generated/fd_metrics_snaplt.c create mode 100644 src/disco/metrics/generated/fd_metrics_snaplt.h create mode 100644 src/discof/restore/fd_snaplt_tile.c diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md index 768aa1dba8c..0d670023095 100644 --- a/book/api/metrics-generated.md +++ b/book/api/metrics-generated.md @@ -887,6 +887,18 @@ +## Snaplt Tile + +
+ +| Metric | Type | Description | +|--------|------|-------------| +| snaplt_​state | gauge | State of the tile. 0=hashing, 1=done, 2=shutdown | +| snaplt_​full_​accounts_​hashed | gauge | Number of accounts hashed for the full snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted | +| snaplt_​incremental_​accounts_​hashed | gauge | Number of accounts hashed for the incremental snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted | + +
+ ## Ipecho Tile
diff --git a/src/app/firedancer-dev/commands/backtest.c b/src/app/firedancer-dev/commands/backtest.c index e5a8561635e..68a0d7f4f50 100644 --- a/src/app/firedancer-dev/commands/backtest.c +++ b/src/app/firedancer-dev/commands/backtest.c @@ -23,6 +23,7 @@ #include "../../../disco/metrics/fd_metrics.h" #include "../../../util/pod/fd_pod_format.h" #include "../../../discof/restore/utils/fd_ssmsg.h" +#include "../../../discof/restore/utils/fd_ssctrl.h" #include "../../../discof/tower/fd_tower_tile.h" #include "../../../discof/reasm/fd_reasm.h" #include "../../../discof/replay/fd_exec.h" /* FD_RUNTIME_PUBLIC_ACCOUNT_UPDATE_MSG_MTU */ @@ -38,6 +39,8 @@ static void backtest_topo( config_t * config ) { ulong exec_tile_cnt = config->firedancer.layout.exec_tile_count; ulong writer_tile_cnt = config->firedancer.layout.writer_tile_count; + ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count; + int snaplt_disabled = config->development.snapshots.disable_lthash_verification; int disable_snap_loader = !config->gossip.entrypoints_cnt; int solcap_enabled = strlen( config->capture.solcap_capture )>0; @@ -100,6 +103,14 @@ backtest_topo( config_t * config ) { snaprd_tile->allow_shutdown = 1; snapdc_tile->allow_shutdown = 1; snapin_tile->allow_shutdown = 1; + + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_wksp( topo, "snaplt" ); + for( ulong i=0UL; iallow_shutdown = 1; + } + } } else { fd_topob_wksp( topo, "genesi" ); fd_topob_tile( topo, "genesi", "genesi", "metric_in", cpu_idx++, 0, 0 )->allow_shutdown = 1; @@ -128,7 +139,12 @@ backtest_topo( config_t * config ) { fd_topob_wksp( topo, "snapdc_rd" ); fd_topob_wksp( topo, "snapin_rd" ); fd_topob_wksp( topo, "snap_out" ); - fd_topob_wksp( topo, "replay_manif" ); + + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_wksp( topo, "snapin_lt" ); + fd_topob_wksp( topo, "snaplt_out" ); + fd_topob_wksp( topo, "snaplt_rd" ); + } /* TODO: Should be depth of 1 or 2, not 4, but it causes backpressure from the replay tile parsing 
the manifest, remove when this is fixed. */ @@ -139,6 +155,12 @@ backtest_topo( config_t * config ) { fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL ); fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL ); + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL ); + } + fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL ); fd_topob_tile_in ( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL ); @@ -150,6 +172,15 @@ backtest_topo( config_t * config ) { fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL ); fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL ); + + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", "snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_out", i ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i ); + } } else { fd_topob_wksp( topo, "genesi_out" ); fd_topob_link( topo, "genesi_out", "genesi_out", 2UL, 10UL*1024UL*1024UL+32UL+sizeof(fd_lthash_value_t), 1UL ); @@ -302,14 +333,7 @@ backtest_topo( config_t * config ) { } if( FD_LIKELY( !disable_snap_loader ) ) 
{ - /* Replay decoded manifest dcache topo obj */ - fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" ); - fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id ); - fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id ); - fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE ); - fd_topob_tile_uses( topo, snapin_tile, replay_manifest_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE ); - fd_topob_tile_uses( topo, replay_tile, replay_manifest_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY ); } for( ulong i=0UL; itile_cnt; i++ ) { @@ -394,6 +418,16 @@ backtest_cmd_fn( args_t * args FD_PARAM_UNUSED, ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics ); ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics ); + ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ]; + ulong snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" ); + + for( ulong i=0UL; itiles[ snaplt_tile_idx ]; + snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics ); + } + ulong total_off_old = 0UL; ulong snaprd_backp_old = 0UL; ulong snaprd_wait_old = 0UL; @@ -401,16 +435,34 @@ backtest_cmd_fn( args_t * args FD_PARAM_UNUSED, ulong snapdc_wait_old = 0UL; ulong snapin_backp_old = 0UL; ulong snapin_wait_old = 0UL; + ulong snaplt_backp_old = 0UL; + ulong snaplt_wait_old = 0UL; ulong acc_cnt_old = 0UL; sleep( 1 ); - puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" ); + puts( "" ); + puts( "Columns:" ); + puts( "- bw: Uncompressed bandwidth" ); + puts( "- backp: Backpressured by downstream tile" ); + puts( "- stall: Waiting on upstream tile" ); + puts( "- acc: Number of accounts" ); + puts( "" ); + puts( "-------------backp=(snaprd,snapdc,snapin,snaplt) busy=(snaprd,snapdc,snapin,snaplt)---------------" ); long next = start+1000L*1000L*1000L; for(;;) { ulong snaprd_status = FD_VOLATILE_CONST( 
snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); + ulong snaplt_status = ULONG_MAX; + + ulong snaplt_status_sum = 0UL; + for( ulong i=0UL; i0UL ? 1UL : 2UL; - if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break; + if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL && snaplt_status==2UL ) ) break; long cur = fd_log_wallclock(); if( FD_UNLIKELY( cur @@ -25,7 +26,9 @@ snapshot_load_topo( config_t * config, args_t const * args ) { fd_topo_t * topo = &config->topo; fd_topob_new( &config->topo, config->name ); - topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size ); + topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size ); + ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count; + int snaplt_disabled = config->development.snapshots.disable_lthash_verification; fd_topob_wksp( topo, "funk" ); fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk", @@ -37,7 +40,7 @@ snapshot_load_topo( config_t * config, static ushort tile_to_cpu[ FD_TILE_MAX ] = {0}; if( args->snapshot_load.tile_cpus[0] ) { ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu ); - if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt )); + if( FD_UNLIKELY( cpu_cnt<4UL + (snaplt_disabled?0:snaplt_tile_cnt) ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least %lu", cpu_cnt, 4UL + (snaplt_disabled?0:snaplt_tile_cnt) )); /* 4 base CPUs plus one per snaplt tile when lthash verification is enabled; the message reports the same minimum the check enforces */ } /* metrics tile *****************************************************/ @@ -77,6 +80,25 @@ snapshot_load_topo( config_t * config, fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 ); snapin_tile->allow_shutdown = 1; + if( 
FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_wksp( topo, "snaplt" ); + fd_topob_wksp( topo, "snapin_lt" ); + fd_topob_wksp( topo, "snaplt_out" ); + fd_topob_wksp( topo, "snaplt_rd" ); + + #define FOR(cnt) for( ulong i=0UL; iallow_shutdown = 1; + } + } + + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL ); + } + /* uncompressed stream -> snapin tile */ fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); @@ -86,7 +108,7 @@ snapshot_load_topo( config_t * config, /* snapshot manifest out link */ fd_topob_wksp( topo, "snap_out" ); - fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL ); + fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 4UL, sizeof(fd_snapshot_manifest_t), 1UL ); snap_out_link->permit_no_consumers = 1; fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL ); @@ -99,6 +121,15 @@ snapshot_load_topo( config_t * config, fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL ); + if( FD_LIKELY( !snaplt_disabled ) ) { + fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL ); + FOR(snaplt_tile_cnt) fd_topob_tile_in( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", "snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, 
"snaplt_out", i ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i ); + } + for( ulong i=0UL; itile_cnt; i++ ) { fd_topo_tile_t * tile = &topo->tiles[ i ]; fd_topo_configure_tile( tile, config ); @@ -159,6 +190,16 @@ snapshot_load_cmd_fn( args_t * args, fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ]; fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ]; + ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ]; + ulong snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" ); + + for( ulong i=0UL; itiles[ snaplt_tile_idx ]; + snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics ); + } + ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics ); ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics ); ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics ); @@ -170,6 +211,8 @@ snapshot_load_cmd_fn( args_t * args, ulong snapdc_wait_old = 0UL; ulong snapin_backp_old = 0UL; ulong snapin_wait_old = 0UL; + ulong snaplt_backp_old = 0UL; + ulong snaplt_wait_old = 0UL; ulong acc_cnt_old = 0UL; sleep( 1 ); puts( "" ); @@ -179,14 +222,23 @@ snapshot_load_cmd_fn( args_t * args, puts( "- stall: Waiting on upstream tile" ); puts( "- acc: Number of accounts" ); puts( "" ); - puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" ); + puts( "-------------backp=(snaprd,snapdc,snapin,snaplt) busy=(snaprd,snapdc,snapin,snaplt)---------------" ); long next = start+1000L*1000L*1000L; for(;;) { ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] ); + ulong snaplt_status = ULONG_MAX; + + ulong snaplt_status_sum = 0UL; + for( ulong i=0UL; i0UL ? 
1UL : 2UL; - if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break; + if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL && snaplt_status==2UL ) ) break; long cur = fd_log_wallclock(); if( FD_UNLIKELY( curfiredancer.layout.exec_tile_count; ulong writer_tile_cnt = config->firedancer.layout.writer_tile_count; ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count; + ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count; int snapshots_enabled = !!config->gossip.entrypoints_cnt; - int solcap_enabled = strcmp( "", config->capture.solcap_capture ); + int snaplt_disabled = config->development.snapshots.disable_lthash_verification; + int solcap_enabled = strcmp( "", config->capture.solcap_capture ); fd_topo_t * topo = fd_topob_new( &config->topo, config->name ); @@ -316,6 +319,13 @@ fd_topo_initialize( config_t * config ) { fd_topob_wksp( topo, "snap_out" ); /* TODO: Rename ... */ } + if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) { + fd_topob_wksp( topo, "snaplt" ); + fd_topob_wksp( topo, "snapin_lt" ); + fd_topob_wksp( topo, "snaplt_out" ); + fd_topob_wksp( topo, "snaplt_rd" ); + } + #define FOR(cnt) for( ulong i=0UL; inet.ingress_buffer_size, FD_NET_MTU, 1UL ); if( FD_LIKELY( snapshots_enabled ) ) { - /**/ fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384UL, 1UL ); /* TODO: Rename */ - /**/ fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL ); /* TODO: Rename */ - /**/ fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL ); /* TODO: Rename */ - /**/ fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL ); - /**/ fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL ); + /**/ fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384UL, 1UL ); /* TODO: Rename */ + /**/ fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL ); /* TODO: Rename */ + /**/ 
fd_topob_link( topo, "snap_out", "snap_out", 4UL, sizeof(fd_snapshot_manifest_t), 1UL ); /* TODO: Rename */ + /**/ fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL ); + /**/ fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL ); + } + + if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) { + fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL ); + FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL ); } /**/ fd_topob_link( topo, "genesi_out", "genesi_out", 2UL, 10UL*1024UL*1024UL+32UL+sizeof(fd_lthash_value_t), 1UL ); @@ -434,6 +450,10 @@ fd_topo_initialize( config_t * config ) { fd_topob_tile( topo, "snapin", "snapin", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 )->allow_shutdown = 1; } + if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) { + for( ulong i=0UL; itile_cnt ], 0, 0 )->allow_shutdown = 1; + } + /**/ fd_topob_tile( topo, "genesi", "genesi", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 )->allow_shutdown = 1; /**/ fd_topob_tile( topo, "ipecho", "ipecho", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 ); FOR(gossvf_tile_cnt) fd_topob_tile( topo, "gossvf", "gossvf", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 ); @@ -506,6 +526,15 @@ fd_topo_initialize( config_t * config ) { fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "snap_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); } + if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) { + fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", 
"snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_out", i ); + FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i ); + } + /**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "genesi_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "gossip_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); /**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "tower_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED ); @@ -959,7 +988,9 @@ fd_topo_configure_tile( fd_topo_tile_t * tile, tile->snapin.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX ); - } else if( FD_UNLIKELY( !strcmp( tile->name, "repair" ) ) ) { + } else if( FD_UNLIKELY( !strcmp( tile->name, "snaplt" ) ) ) { + + }else if( FD_UNLIKELY( !strcmp( tile->name, "repair" ) ) ) { tile->repair.max_pending_shred_sets = config->tiles.shred.max_pending_shred_sets; tile->repair.repair_intake_listen_port = config->tiles.repair.repair_intake_listen_port; tile->repair.repair_serve_listen_port = config->tiles.repair.repair_serve_listen_port; diff --git a/src/app/shared/fd_config.c b/src/app/shared/fd_config.c index e4f5ebaa347..c78f4f35c89 100644 --- a/src/app/shared/fd_config.c +++ b/src/app/shared/fd_config.c @@ -457,6 +457,7 @@ fd_config_fill( fd_config_t * config, static void fd_config_validatef( fd_configf_t const * config ) { CFG_HAS_NON_ZERO( layout.sign_tile_count ); + CFG_HAS_NON_ZERO( layout.snaplt_tile_count ); if( FD_UNLIKELY( config->layout.sign_tile_count < 2 ) ) { FD_LOG_ERR(( "layout.sign_tile_count must be >= 2" )); } diff --git a/src/app/shared/fd_config.h b/src/app/shared/fd_config.h index dbba7e93ff7..ee50c219221 100644 --- a/src/app/shared/fd_config.h +++ b/src/app/shared/fd_config.h @@ -117,6 +117,7 @@ struct fd_configf { uint writer_tile_count; uint sign_tile_count; uint gossvf_tile_count; + uint snaplt_tile_count; } layout; struct { @@ 
-349,6 +350,10 @@ struct fd_config { char affinity[ AFFINITY_SZ ]; } udpecho; + struct { + int disable_lthash_verification; + } snapshots; + struct { char affinity[ AFFINITY_SZ ]; } snapshot_load; diff --git a/src/app/shared/fd_config_parse.c b/src/app/shared/fd_config_parse.c index 1492ffb2721..3b26daf56dc 100644 --- a/src/app/shared/fd_config_parse.c +++ b/src/app/shared/fd_config_parse.c @@ -82,6 +82,7 @@ fd_config_extract_podf( uchar * pod, CFG_POP ( uint, layout.writer_tile_count ); CFG_POP ( uint, layout.sign_tile_count ); CFG_POP ( uint, layout.gossvf_tile_count ); + CFG_POP ( uint, layout.snaplt_tile_count ); CFG_POP ( ulong, blockstore.shred_max ); CFG_POP ( ulong, blockstore.block_max ); @@ -322,7 +323,8 @@ fd_config_extract_pod( uchar * pod, CFG_POP ( cstr, development.pktgen.affinity ); CFG_POP ( cstr, development.pktgen.fake_dst_ip ); - CFG_POP ( cstr, development.udpecho.affinity ); + CFG_POP ( cstr, development.udpecho.affinity ); + CFG_POP ( bool, development.snapshots.disable_lthash_verification ); CFG_POP ( bool, development.gui.websocket_compression ); diff --git a/src/app/shared/fd_tile_unit_test.c b/src/app/shared/fd_tile_unit_test.c index 58e68245664..4933e065556 100644 --- a/src/app/shared/fd_tile_unit_test.c +++ b/src/app/shared/fd_tile_unit_test.c @@ -81,6 +81,7 @@ fd_topo_run_tile_t dummy_tile_rpcsrv = { .name = "rpcsrv" }; fd_topo_run_tile_t dummy_tile_snaprd = { .name = "snaprd" }; fd_topo_run_tile_t dummy_tile_snapdc = { .name = "snapdc" }; fd_topo_run_tile_t dummy_tile_snapin = { .name = "snapin" }; +fd_topo_run_tile_t dummy_tile_snaplt = { .name = "snaplt" }; fd_topo_run_tile_t dummy_tile_arch_f = { .name = "arch_f" }; fd_topo_run_tile_t dummy_tile_arch_w = { .name = "arch_w" }; fd_topo_run_tile_t dummy_tile_scap = { .name = "scap" }; @@ -123,6 +124,7 @@ fd_topo_run_tile_t * TILES[] = { &dummy_tile_snaprd, &dummy_tile_snapdc, &dummy_tile_snapin, + &dummy_tile_snaplt, &dummy_tile_arch_f, &dummy_tile_arch_w, &dummy_tile_scap, diff --git 
a/src/disco/metrics/generate/types.py b/src/disco/metrics/generate/types.py index 4656d1f834b..e8313fafb62 100644 --- a/src/disco/metrics/generate/types.py +++ b/src/disco/metrics/generate/types.py @@ -30,10 +30,11 @@ class Tile(Enum): SNAPRD = 24 SNAPDC = 25 SNAPIN = 26 - IPECHO = 27 - GOSSVF = 28 - BANKF = 29 - RESOLF = 30 + SNAPLT = 27 + IPECHO = 28 + GOSSVF = 29 + BANKF = 30 + RESOLF = 31 class MetricType(Enum): COUNTER = 0 diff --git a/src/disco/metrics/generated/fd_metrics_all.c b/src/disco/metrics/generated/fd_metrics_all.c index a19db0267a8..cf6d16d2e24 100644 --- a/src/disco/metrics/generated/fd_metrics_all.c +++ b/src/disco/metrics/generated/fd_metrics_all.c @@ -59,6 +59,7 @@ const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT] = { "snaprd", "snapdc", "snapin", + "snaplt", "ipecho", "gossvf", "bankf", @@ -89,6 +90,7 @@ const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = { FD_METRICS_SNAPRD_TOTAL, FD_METRICS_SNAPDC_TOTAL, FD_METRICS_SNAPIN_TOTAL, + FD_METRICS_SNAPLT_TOTAL, FD_METRICS_IPECHO_TOTAL, FD_METRICS_GOSSVF_TOTAL, FD_METRICS_BANKF_TOTAL, @@ -118,6 +120,7 @@ const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT] FD_METRICS_SNAPRD, FD_METRICS_SNAPDC, FD_METRICS_SNAPIN, + FD_METRICS_SNAPLT, FD_METRICS_IPECHO, FD_METRICS_GOSSVF, FD_METRICS_BANKF, diff --git a/src/disco/metrics/generated/fd_metrics_all.h b/src/disco/metrics/generated/fd_metrics_all.h index cb843373cee..54ff87d1101 100644 --- a/src/disco/metrics/generated/fd_metrics_all.h +++ b/src/disco/metrics/generated/fd_metrics_all.h @@ -27,6 +27,7 @@ #include "fd_metrics_snaprd.h" #include "fd_metrics_snapdc.h" #include "fd_metrics_snapin.h" +#include "fd_metrics_snaplt.h" #include "fd_metrics_metric.h" #include "fd_metrics_ipecho.h" /* Start of LINK OUT metrics */ @@ -165,7 +166,7 @@ extern const fd_metrics_meta_t FD_METRICS_ALL_LINK_OUT[FD_METRICS_ALL_LINK_OUT_T #define FD_METRICS_TOTAL_SZ (8UL*253UL) -#define FD_METRICS_TILE_KIND_CNT 27 
+#define FD_METRICS_TILE_KIND_CNT 28 extern const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT]; extern const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT]; extern const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT]; diff --git a/src/disco/metrics/generated/fd_metrics_snaplt.c b/src/disco/metrics/generated/fd_metrics_snaplt.c new file mode 100644 index 00000000000..acbd267fdac --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_snaplt.c @@ -0,0 +1,8 @@ +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */ +#include "fd_metrics_snaplt.h" + +const fd_metrics_meta_t FD_METRICS_SNAPLT[FD_METRICS_SNAPLT_TOTAL] = { + DECLARE_METRIC( SNAPLT_STATE, GAUGE ), + DECLARE_METRIC( SNAPLT_FULL_ACCOUNTS_HASHED, GAUGE ), + DECLARE_METRIC( SNAPLT_INCREMENTAL_ACCOUNTS_HASHED, GAUGE ), +}; diff --git a/src/disco/metrics/generated/fd_metrics_snaplt.h b/src/disco/metrics/generated/fd_metrics_snaplt.h new file mode 100644 index 00000000000..e9145be08fe --- /dev/null +++ b/src/disco/metrics/generated/fd_metrics_snaplt.h @@ -0,0 +1,25 @@ +/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */ + +#include "../fd_metrics_base.h" +#include "fd_metrics_enums.h" + +#define FD_METRICS_GAUGE_SNAPLT_STATE_OFF (16UL) +#define FD_METRICS_GAUGE_SNAPLT_STATE_NAME "snaplt_state" +#define FD_METRICS_GAUGE_SNAPLT_STATE_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_SNAPLT_STATE_DESC "State of the tile. 0=hashing, 1=done, 2=shutdown" +#define FD_METRICS_GAUGE_SNAPLT_STATE_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_OFF (17UL) +#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_NAME "snaplt_full_accounts_hashed" +#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_DESC "Number of accounts hashed for the full snapshot during snapshot loading. 
Might decrease if snapshot load is aborted and restarted" +#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_OFF (18UL) +#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_NAME "snaplt_incremental_accounts_hashed" +#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_TYPE (FD_METRICS_TYPE_GAUGE) +#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_DESC "Number of accounts hashed for the incremental snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted" +#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_CVT (FD_METRICS_CONVERTER_NONE) + +#define FD_METRICS_SNAPLT_TOTAL (3UL) +extern const fd_metrics_meta_t FD_METRICS_SNAPLT[FD_METRICS_SNAPLT_TOTAL]; diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml index 38655d25a56..677da8b64d4 100644 --- a/src/disco/metrics/metrics.xml +++ b/src/disco/metrics/metrics.xml @@ -1004,6 +1004,12 @@ metric introduced. 
+ + + + + + diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c index 12786f5abbb..3597c78eb1e 100644 --- a/src/disco/topo/fd_topob.c +++ b/src/disco/topo/fd_topob.c @@ -383,6 +383,7 @@ fd_topob_auto_layout( fd_topo_t * topo, "snaprd", /* FIREDANCER only */ "snapdc", /* FIREDANCER only */ "snapin", /* FIREDANCER only */ + "snaplt", /* FIREDANCER only */ "arch_f", /* FIREDANCER only */ "arch_w", /* FIREDANCER only */ }; diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk index a4c1d484542..0811e38e988 100644 --- a/src/discof/restore/Local.mk +++ b/src/discof/restore/Local.mk @@ -6,6 +6,7 @@ endif ifdef FD_HAS_INT128 $(call add-objs,fd_snapin_tile,fd_discof) endif +$(call add-objs,fd_snaplt_tile,fd_discof) endif ifdef FD_HAS_INT128 $(call add-objs,utils/fd_snapshot_parser,fd_discof) diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c index befe8fc2e91..76168c11644 100644 --- a/src/discof/restore/fd_snapin_tile.c +++ b/src/discof/restore/fd_snapin_tile.c @@ -7,6 +7,8 @@ #include "../../flamenco/runtime/fd_acc_mgr.h" #include "../../flamenco/types/fd_types.h" #include "../../funk/fd_funk.h" +#include "../../ballet/lthash/fd_lthash.h" +#include "../../flamenco/runtime/fd_hashes.h" #define NAME "snapin" @@ -20,20 +22,32 @@ #define FD_SNAPIN_STATE_MALFORMED (1) /* The snapshot is malformed, we are waiting for a reset notification */ #define FD_SNAPIN_STATE_SHUTDOWN (2) /* The tile is done, been told to shut down, and has likely already exited */ +#define FD_SNAPIN_HSH_IDX (2UL) + struct fd_snapin_tile { - int full; - int state; + int full; + int state; + int pending_ack; ulong seed; long boot_timestamp; fd_funk_t funk[1]; + fd_funk_txn_t * root_funk_txn; fd_funk_txn_t * funk_txn; uchar * acc_data; - fd_stem_context_t * stem; + fd_stem_context_t * stem; fd_snapshot_parser_t * ssparse; + struct { + int enabled; + ulong received_lthashes; + ulong num_hash_tiles; + fd_lthash_value_t expected_lthash; + 
fd_lthash_value_t calculated_lthash; + } hash_info; + struct { ulong full_bytes_read; ulong incremental_bytes_read; @@ -45,6 +59,7 @@ struct fd_snapin_tile { ulong chunk0; ulong wmark; ulong mtu; + ulong chunk_offset; } in; struct { @@ -54,6 +69,21 @@ struct fd_snapin_tile { ulong chunk; ulong mtu; } manifest_out; + + struct { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong chunk; + ulong mtu; + } hash_out; + + struct { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong mtu; + } hash_in[ FD_MAX_SNAPLT_TILES ]; }; typedef struct fd_snapin_tile fd_snapin_tile_t; @@ -89,14 +119,34 @@ metrics_write( fd_snapin_tile_t * ctx ) { FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state ); } +static void +transition_malformed( fd_snapin_tile_t * ctx, + fd_stem_context_t * stem ) { + ctx->state = FD_SNAPIN_STATE_MALFORMED; + fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL ); +} + static void manifest_cb( void * _ctx ) { fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx; + ulong sz = sizeof(fd_snapshot_manifest_t); + + if( FD_LIKELY( ctx->hash_info.enabled ) ) { + fd_snapshot_manifest_t const * manifest = fd_chunk_to_laddr_const( ctx->manifest_out.wksp, ctx->manifest_out.chunk ); + if( FD_LIKELY( manifest->has_accounts_lthash ) ) { + fd_memcpy( &ctx->hash_info.expected_lthash, manifest->accounts_lthash, sizeof(fd_lthash_value_t) ); + } else { + FD_LOG_WARNING(( "snapshot manifest doesn't have an accounts lthash" )); + transition_malformed( ctx, ctx->stem ); + } + } + + FD_TEST( sz<=ctx->manifest_out.mtu ); ulong sig = ctx->full ? 
fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL ); - fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL ); - ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark ); + fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sz, 0UL, 0UL, 0UL ); + ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sz, ctx->manifest_out.chunk0, ctx->manifest_out.wmark ); } static void @@ -106,10 +156,41 @@ account_cb( void * _ctx, fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t*)hdr->meta.pubkey ); fd_funk_rec_query_t query[1]; - fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query ); + fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query ); + fd_funk_rec_t const * existing_rec = rec; int should_publish = 0; fd_funk_rec_prepare_t prepare[1]; + if( FD_LIKELY( !existing_rec && !ctx->full ) ) { + /* An existing record may exist in an ancestor transaction when + loading the incremental snapshot. */ + existing_rec = fd_funk_rec_clone( ctx->funk, ctx->funk_txn, &id, prepare, NULL ); + } + if( FD_LIKELY( existing_rec ) ) { + /* If a record exists, either in the current txn or cloned from an ancestor txn, its hash needs to + be subtracted from the running hash in the hashing tiles. */ + fd_account_meta_t * meta = fd_funk_val( existing_rec, ctx->funk->wksp ); + if( FD_UNLIKELY( meta ) ) { + if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) { + /* Existing record has a higher slot than the current account + being inserted. Ignore the current account and keep the + existing record as is. */ + ctx->acc_data = NULL; + return; + } + + if( FD_LIKELY( ctx->hash_info.enabled ) ) { + /* Account is a duplicate, need to subtract its hash before + updating the record in funk. 
*/ + fd_snapshot_existing_account_t * existing_account = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk ); + fd_snapshot_account_init( &existing_account->hdr, hdr->meta.pubkey, meta->owner, meta->lamports, meta->executable, meta->dlen ); + fd_memcpy( existing_account->data, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen ); + fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, FD_SNAPSHOT_HASH_MSG_SUB, ctx->hash_out.chunk, sizeof(fd_snapshot_existing_account_t), 0UL, 0UL, 0UL ); + ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_snapshot_existing_account_t), ctx->hash_out.chunk0, ctx->hash_out.wmark ); + } + } + } + if( FD_LIKELY( !rec ) ) { should_publish = 1; rec = fd_funk_rec_prepare( ctx->funk, ctx->funk_txn, &id, prepare, NULL ); @@ -117,24 +198,13 @@ account_cb( void * _ctx, } fd_account_meta_t * meta = fd_funk_val( rec, ctx->funk->wksp ); - if( FD_UNLIKELY( meta ) ) { - if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) { - ctx->acc_data = NULL; - return; - } - - /* TODO: Reaching here means the existing value is a duplicate - account. We need to hash the existing account and subtract that - hash from the running lthash. 
*/ - } - if( FD_LIKELY( rec->val_szmeta.data_len ) ) { meta = fd_funk_val_truncate( (fd_funk_rec_t*)rec, ctx->funk->alloc, ctx->funk->wksp, 0UL, sizeof(fd_account_meta_t)+hdr->meta.data_len, NULL ); FD_TEST( meta ); } - meta->dlen = (uint)hdr->meta.data_len; - meta->slot = ctx->ssparse->accv_slot; + meta->dlen = (uint)hdr->meta.data_len; + meta->slot = ctx->ssparse->accv_slot; memcpy( meta->owner, hdr->info.owner, sizeof(fd_pubkey_t) ); meta->lamports = hdr->info.lamports; meta->executable = hdr->info.executable; @@ -143,6 +213,14 @@ account_cb( void * _ctx, ctx->metrics.accounts_inserted++; if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( ctx->funk, prepare ); + + if( FD_LIKELY( ctx->hash_info.enabled ) ) { + /* send account hdr to snaplt tile */ + fd_snapshot_account_t * account = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk ); + fd_snapshot_account_init( account, hdr->meta.pubkey, hdr->info.owner, hdr->info.lamports, hdr->info.executable, hdr->meta.data_len ); + fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR, ctx->hash_out.chunk, sizeof(fd_snapshot_account_t), 0UL, 0UL, 0UL ); + ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_snapshot_account_t), ctx->hash_out.chunk0, ctx->hash_out.wmark ); + } } static void @@ -154,21 +232,23 @@ account_data_cb( void * _ctx, fd_memcpy( ctx->acc_data, buf, data_sz ); ctx->acc_data += data_sz; -} -static void -transition_malformed( fd_snapin_tile_t * ctx, - fd_stem_context_t * stem ) { - ctx->state = FD_SNAPIN_STATE_MALFORMED; - fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL ); + if( FD_LIKELY( ctx->hash_info.enabled ) ) { + FD_TEST( data_sz<=ctx->hash_out.mtu ); + /* send acc data to snaplt tile */ + uchar * snaplt_acc_data = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk ); + fd_memcpy( snaplt_acc_data, buf, data_sz ); + fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, 
FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA, ctx->hash_out.chunk, data_sz, 0UL, 0UL, 0UL ); + ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, data_sz, ctx->hash_out.chunk0, ctx->hash_out.wmark ); + } } -static void +static int handle_data_frag( fd_snapin_tile_t * ctx, ulong chunk, ulong sz, fd_stem_context_t * stem ) { - if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return; + if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return 0; FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE ); FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu ); @@ -176,49 +256,61 @@ handle_data_frag( fd_snapin_tile_t * ctx, if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) { FD_LOG_WARNING(( "received data fragment while in done state" )); transition_malformed( ctx, stem ); - return; + return 0; } uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk ); - uchar const * const chunk_end = chunk_start + sz; - uchar const * cur = chunk_start; - - for(;;) { - if( FD_UNLIKELY( cur>=chunk_end ) ) { - break; - } - - cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) ); - if( FD_UNLIKELY( ctx->ssparse->flags ) ) { - if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) { - transition_malformed( ctx, stem ); - return; - } + uchar const * const chunk_end = chunk_start + sz; + uchar const * cur = chunk_start + ctx->in.chunk_offset; + + cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) ); + if( FD_UNLIKELY( ctx->ssparse->flags ) ) { + if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) { + transition_malformed( ctx, stem ); + return 0; } } + ctx->in.chunk_offset = (ulong)(cur - chunk_start); + if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE; if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz; else ctx->metrics.incremental_bytes_read += sz; + + if( 
FD_UNLIKELY( ctx->in.chunk_offset==sz ) ) { + ctx->in.chunk_offset = 0UL; + return 0; + } + + return 1; } static void handle_control_frag( fd_snapin_tile_t * ctx, fd_stem_context_t * stem, - ulong sig ) { + ulong sig, + ulong in_idx, + ulong chunk ) { + /* 1. Pass the control message downstream to the next consumer. */ + if( FD_LIKELY( ctx->hash_info.enabled && sig!=FD_SNAPSHOT_HASH_MSG_RESULT ) ) + fd_stem_publish( stem, FD_SNAPIN_HSH_IDX, sig, ctx->hash_out.chunk, 0UL, 0UL, 0UL, 0UL ); + switch( sig ) { case FD_SNAPSHOT_MSG_CTRL_RESET_FULL: ctx->full = 1; fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu ); - fd_funk_txn_cancel_root( ctx->funk ); + fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 ); + fd_lthash_zero( &ctx->hash_info.expected_lthash ); + fd_lthash_zero( &ctx->hash_info.calculated_lthash ); ctx->state = FD_SNAPIN_STATE_LOADING; break; case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL: ctx->full = 0; fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu ); - if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk ); - else fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 ); + fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 ); + fd_lthash_zero( &ctx->hash_info.expected_lthash ); + fd_lthash_zero( &ctx->hash_info.calculated_lthash ); ctx->state = FD_SNAPIN_STATE_LOADING; break; case FD_SNAPSHOT_MSG_CTRL_EOF_FULL: @@ -229,10 +321,10 @@ handle_control_frag( fd_snapin_tile_t * ctx, break; } + if( FD_LIKELY( ctx->hash_info.enabled ) ) ctx->pending_ack = 1; fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu ); - fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid(); - ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 ); + ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, 
ctx->root_funk_txn, &incremental_xid, 0 ); FD_TEST( ctx->funk_txn ); ctx->full = 0; @@ -245,18 +337,50 @@ handle_control_frag( fd_snapin_tile_t * ctx, break; } - if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 ); - fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL ); + if( FD_LIKELY( ctx->hash_info.enabled ) ) ctx->pending_ack = 1; break; case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: + FD_TEST( ctx->pending_ack==0 ); ctx->state = FD_SNAPIN_STATE_SHUTDOWN; metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */ + + fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 ); + fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL ); + break; + case FD_SNAPSHOT_HASH_MSG_RESULT: { + FD_TEST( ctx->hash_info.enabled && ctx->pending_ack ); + /* TODO: more robust in indexing */ + fd_lthash_value_t const * calculated_lthash = fd_chunk_to_laddr_const( ctx->hash_in[ in_idx - 1UL ].wksp, chunk ); + fd_lthash_add( &ctx->hash_info.calculated_lthash, calculated_lthash ); + ctx->hash_info.received_lthashes++; + + if( FD_LIKELY( ctx->hash_info.received_lthashes!=ctx->hash_info.num_hash_tiles ) ) break; + + ctx->pending_ack = 0; + if( FD_UNLIKELY( memcmp( &ctx->hash_info.expected_lthash, &ctx->hash_info.calculated_lthash, sizeof(fd_lthash_value_t) ) ) ) { + FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in snapshot manifest", + FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.calculated_lthash ), + FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.expected_lthash ) )); + transition_malformed( ctx, stem ); + } else { + FD_LOG_NOTICE(( "calculated accounts lthash %s matches accounts lthash %s in snapshot manifest", + FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.calculated_lthash ), + FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.expected_lthash ) )); + } + ctx->hash_info.received_lthashes = 0UL; break; + } 
default: FD_LOG_ERR(( "unexpected control sig %lu", sig )); return; } + /* snapin waits for result lthashes to come back from the snaplt tiles. + Until these hashes come back, snapin cannot ack the control message + sent from snaprd because snaprd would advance before snapin + is ready to receive another snapshot byte stream or shutdown. */ + if( FD_UNLIKELY( ctx->pending_ack ) ) return; + /* We must acknowledge after handling the control frag, because if it causes us to generate a malformed transition, that must be sent back to the snaprd controller before the acknowledgement. */ @@ -284,8 +408,8 @@ returnable_frag( fd_snapin_tile_t * ctx, FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN ); - if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem ); - else handle_control_frag( ctx, stem, sig ); + if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem ); + else handle_control_frag( ctx, stem, sig, in_idx, chunk ); return 0; } @@ -310,23 +434,31 @@ unprivileged_init( fd_topo_t * topo, fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) ); void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) ); - ctx->full = 1; - ctx->state = FD_SNAPIN_STATE_LOADING; + ctx->full = 1; + ctx->state = FD_SNAPIN_STATE_LOADING; + ctx->pending_ack = 0; ctx->boot_timestamp = fd_log_wallclock(); FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) ); - ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map ); + ctx->root_funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map ); + ctx->funk_txn = ctx->root_funk_txn; - ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb ); + ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, 
account_cb, account_data_cb ); FD_TEST( ctx->ssparse ); fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) ); + ulong num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplt" ); + ctx->hash_info.num_hash_tiles = num_hash_tiles; + ctx->hash_info.received_lthashes = 0UL; + ctx->hash_info.enabled = num_hash_tiles>0UL; + ulong snapin_out_cnt = ctx->hash_info.enabled ? 3UL : 2UL; + if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" )); - if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt )); - if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt )); + if( FD_UNLIKELY( tile->in_cnt!=1UL + num_hash_tiles ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 2", tile->in_cnt )); + if( FD_UNLIKELY( tile->out_cnt!=snapin_out_cnt ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 3", tile->out_cnt )); fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ]; ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp; @@ -343,9 +475,36 @@ unprivileged_init( fd_topo_t * topo, ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache ); ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu ); ctx->in.mtu = in_link->mtu; + ctx->in.chunk_offset = 0UL; + + if( FD_LIKELY( ctx->hash_info.enabled ) ) { + for( ulong i=0UL; ilinks[ tile->in_link_id[ i+1UL ] ]; + FD_TEST( strcmp( hash_in_link->name, "snaplt_out" )==0 ); + fd_topo_wksp_t const * hash_in_wksp = &topo->workspaces[ topo->objs[ hash_in_link->dcache_obj_id ].wksp_id ]; + ctx->hash_in[ i ].wksp = hash_in_wksp->wksp; + ctx->hash_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->hash_in[ i ].wksp, hash_in_link->dcache ); + ctx->hash_in[ i ].wmark = fd_dcache_compact_wmark( ctx->hash_in[ i ].wksp, hash_in_link->dcache, hash_in_link->mtu ); + ctx->hash_in[ i ].mtu = 
hash_in_link->mtu; + } + + fd_topo_link_t const * hash_out_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_HSH_IDX ] ]; + FD_TEST( strcmp( hash_out_link->name, "snapin_lt" )==0 ); + ctx->hash_out.wksp = topo->workspaces[ topo->objs[ hash_out_link->dcache_obj_id ].wksp_id ].wksp; + ctx->hash_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( hash_out_link->dcache ), hash_out_link->dcache ); + ctx->hash_out.wmark = fd_dcache_compact_wmark ( ctx->hash_out.wksp, hash_out_link->dcache, hash_out_link->mtu ); + ctx->hash_out.chunk = ctx->hash_out.chunk0; + ctx->hash_out.mtu = hash_out_link->mtu; + } + + fd_lthash_zero( &ctx->hash_info.expected_lthash ); + fd_lthash_zero( &ctx->hash_info.calculated_lthash ); } -#define STEM_BURST 2UL /* For control fragments, one acknowledgement, and one malformed message */ +/* For control fragments, one acknowledgement, and one malformed + message. Or one FD_SNAPSHOT_HASH_MSG_SUB, one + FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR, and one malformed message */ +#define STEM_BURST 3UL #define STEM_LAZY 1000L #define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t diff --git a/src/discof/restore/fd_snaplt_tile.c b/src/discof/restore/fd_snaplt_tile.c new file mode 100644 index 00000000000..86aed118831 --- /dev/null +++ b/src/discof/restore/fd_snaplt_tile.c @@ -0,0 +1,306 @@ +#include "../../disco/topo/fd_topo.h" +#include "../../disco/metrics/fd_metrics.h" +#include "../../ballet/lthash/fd_lthash.h" +#include "../../flamenco/runtime/fd_hashes.h" +#include "../../flamenco/runtime/fd_acc_mgr.h" + +#include "utils/fd_ssctrl.h" + +#define NAME "snaplt" + +/* The snaplt tile is a state machine that hashes accounts from an + account input stream that it receives from the snapin tile. + + An account input stream starts with a SNAPSHOT_HASH_MSG_RESET + message, which indicates the start of an account input stream. 
+ An account input stream ends with a SNAPSHOT_HASH_MSG_FINI message, + indicating snaplt should send its calculated accounts hash to snapin + with a SNAPSHOT_HASH_MSG_RESULT message. */ + +#define FD_SNAPLT_STATE_HASHING (0) +#define FD_SNAPLT_STATE_DONE (1) +#define FD_SNAPLT_STATE_SHUTDOWN (2) + +struct fd_snaplt_tile { + int state; + int full; + + fd_lthash_value_t running_lthash; + + fd_snapshot_account_t account; + ulong acc_data_sz; + + int hash_account; + ulong num_hash_tiles; + ulong hash_tile_idx; + fd_blake3_t b3[1]; + + + struct { + struct { + ulong accounts_hashed; + } full; + + struct { + ulong accounts_hashed; + } incremental; + } metrics; + + struct { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong mtu; + } in; + + struct { + fd_wksp_t * wksp; + ulong chunk0; + ulong wmark; + ulong chunk; + ulong mtu; + } out; + +}; + +typedef struct fd_snaplt_tile fd_snaplt_tile_t; + +static int +should_hash_account( fd_snaplt_tile_t * ctx, + uchar const account_pubkey[ static FD_HASH_FOOTPRINT ] ) { + return fd_hash( account_pubkey[ 4UL ], account_pubkey, sizeof(fd_pubkey_t) )%ctx->num_hash_tiles==ctx->hash_tile_idx; +} + +static inline int +should_shutdown( fd_snaplt_tile_t * ctx ) { + return ctx->state==FD_SNAPLT_STATE_SHUTDOWN; +} + +static ulong +scratch_align( void ) { + return 128UL; +} + +static ulong +scratch_footprint( fd_topo_tile_t const * tile ) { + (void)tile; + ulong l = FD_LAYOUT_INIT; + l = FD_LAYOUT_APPEND( l, alignof(fd_snaplt_tile_t), sizeof(fd_snaplt_tile_t) ); + return FD_LAYOUT_FINI( l, alignof(fd_snaplt_tile_t) ); +} + +static void +metrics_write( fd_snaplt_tile_t * ctx ) { + FD_MGAUGE_SET( SNAPLT, FULL_ACCOUNTS_HASHED, ctx->metrics.full.accounts_hashed ); + FD_MGAUGE_SET( SNAPLT, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed ); + FD_MGAUGE_SET( SNAPLT, STATE, (ulong)(ctx->state) ); +} + +static void +handle_data_frag( fd_snaplt_tile_t * ctx, + ulong sig, + ulong chunk, + ulong sz ) { + switch( sig ) { + 
case FD_SNAPSHOT_HASH_MSG_SUB: { + FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING ); + + fd_snapshot_existing_account_t const * prev_acc = fd_chunk_to_laddr_const( ctx->in.wksp, chunk ); + + if( !should_hash_account( ctx, prev_acc->hdr.pubkey) ) break; + + fd_lthash_value_t prev_lthash[1]; + fd_hashes_account_lthash_simple( prev_acc->hdr.pubkey, + prev_acc->hdr.owner, + prev_acc->hdr.lamports, + prev_acc->hdr.executable, + prev_acc->data, + prev_acc->hdr.data_len, + prev_lthash ); + fd_lthash_sub( &ctx->running_lthash, prev_lthash ); + return; + } + case FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR: { + FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING && ctx->acc_data_sz==0UL ); + fd_snapshot_account_t const * account = fd_chunk_to_laddr_const( ctx->in.wksp, chunk ); + if( !should_hash_account( ctx, account->pubkey) ) break; + + if( FD_LIKELY( account->lamports!=0UL ) ) { + ctx->hash_account = 1; + fd_blake3_init( ctx->b3 ); + fd_blake3_append( ctx->b3, &account->lamports, sizeof( ulong ) ); + fd_memcpy( &ctx->account, account, sizeof(fd_snapshot_account_t) ); + } + break; + } + case FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA: { + FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING ); + if( FD_LIKELY( ctx->hash_account ) ) { + fd_blake3_append( ctx->b3, fd_chunk_to_laddr_const( ctx->in.wksp, chunk ), sz ); + ctx->acc_data_sz += sz; + } + break; + } + default: + FD_LOG_ERR(( "unexpected sig %lu in handle_data_frag", sig )); + return; + } + + /* Additive account hashing */ + if( FD_LIKELY( ctx->acc_data_sz==ctx->account.data_len && ctx->hash_account ) ) { + fd_lthash_value_t account_lthash[1]; + fd_lthash_zero( account_lthash ); + + uchar executable_flag = ctx->account.executable & 0x1; + fd_blake3_append( ctx->b3, &executable_flag, sizeof( uchar ) ); + fd_blake3_append( ctx->b3, ctx->account.owner, FD_HASH_FOOTPRINT ); + fd_blake3_append( ctx->b3, ctx->account.pubkey, FD_HASH_FOOTPRINT ); + fd_blake3_fini_2048( ctx->b3, account_lthash->bytes ); + + fd_lthash_add( &ctx->running_lthash, account_lthash 
); + ctx->acc_data_sz = 0UL; + ctx->hash_account = 0; + + if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++; + else ctx->metrics.incremental.accounts_hashed++; + } +} + +static void +handle_control_frag( fd_snaplt_tile_t * ctx, + fd_stem_context_t * stem, + ulong sig ) { + switch( sig ) { + case FD_SNAPSHOT_MSG_CTRL_RESET_FULL: + ctx->full = 1; + ctx->state = FD_SNAPLT_STATE_HASHING; + fd_lthash_zero( &ctx->running_lthash ); + + ctx->metrics.full.accounts_hashed = 0UL; + ctx->metrics.incremental.accounts_hashed = 0UL; + break; + case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL: + ctx->full = 0; + ctx->state = FD_SNAPLT_STATE_HASHING; + fd_lthash_zero( &ctx->running_lthash ); + ctx->metrics.incremental.accounts_hashed = 0UL; + break; + case FD_SNAPSHOT_MSG_CTRL_EOF_FULL: { + uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk ); + fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) ); + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL ); + ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark ); /* advance by the payload size just published, not by the message sig */ + ctx->full = 0; + fd_lthash_zero( &ctx->running_lthash ); + break; + } + case FD_SNAPSHOT_MSG_CTRL_DONE: { + uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk ); + fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) ); + fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL ); + ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark ); /* advance by the payload size just published, not by the message sig */ + ctx->state = FD_SNAPLT_STATE_DONE; + break; + } + case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: + FD_LOG_INFO(( "num hashed accounts in full snapshot is %lu and in incremental snapshot is %lu", ctx->metrics.full.accounts_hashed, ctx->metrics.incremental.accounts_hashed )); + FD_TEST( ctx->state==FD_SNAPLT_STATE_DONE ); +
ctx->state = FD_SNAPLT_STATE_SHUTDOWN; + break; + default: + FD_LOG_ERR(( "unexpected sig %lu in handle_control_frag", sig )); + return; + } + /* We must acknowledge after handling the control frag, because if it + causes us to generate a malformed transition, that must be sent + back to the snaprd controller before the acknowledgement. */ + fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL ); +} + +static inline int +returnable_frag( fd_snaplt_tile_t * ctx, + ulong in_idx, + ulong seq, + ulong sig, + ulong chunk, + ulong sz, + ulong ctl, + ulong tsorig, + ulong tspub, + fd_stem_context_t * stem ) { + (void)in_idx; + (void)seq; + (void)ctl; + (void)tsorig; + (void)tspub; + + FD_TEST( ctx->state!=FD_SNAPLT_STATE_SHUTDOWN ); + + if( sig==FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR || + sig==FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA || + sig==FD_SNAPSHOT_HASH_MSG_SUB ) handle_data_frag( ctx, sig, chunk, sz ); + else handle_control_frag( ctx, stem, sig ); + + return 0; +} + +static void +unprivileged_init( fd_topo_t * topo, + fd_topo_tile_t * tile ) { + void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id ); + + FD_SCRATCH_ALLOC_INIT( l, scratch ); + fd_snaplt_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplt_tile_t), sizeof(fd_snaplt_tile_t) ); + + if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt )); + if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt )); + + fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ]; + fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ]; + ctx->in.wksp = in_wksp->wksp;; + ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache ); + ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu ); + ctx->in.mtu = in_link->mtu; + + fd_topo_link_t * writer_link = &topo->links[ 
tile->out_link_id[ 0UL ] ]; + ctx->out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp; + ctx->out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache ); + ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, writer_link->dcache, writer_link->mtu ); + ctx->out.chunk = ctx->out.chunk0; + ctx->out.mtu = writer_link->mtu; + + FD_TEST( strncmp( topo->links[ tile->out_link_id[ 1UL ] ].name, "snaplt_rd", 9UL )==0 ); + ctx->metrics.full.accounts_hashed = 0UL; + ctx->metrics.incremental.accounts_hashed = 0UL; + + ctx->state = FD_SNAPLT_STATE_HASHING; + ctx->full = 1; + ctx->acc_data_sz = 0UL; + ctx->hash_account = 0; + ctx->num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplt" ); + ctx->hash_tile_idx = tile->kind_id; + + fd_lthash_zero( &ctx->running_lthash ); +} + +#define STEM_BURST 1UL +#define STEM_LAZY 1000L + +#define STEM_CALLBACK_CONTEXT_TYPE fd_snaplt_tile_t +#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplt_tile_t) + +#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown +#define STEM_CALLBACK_METRICS_WRITE metrics_write +#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag + +#include "../../disco/stem/fd_stem.c" + +fd_topo_run_tile_t fd_tile_snaplt = { + .name = NAME, + .scratch_align = scratch_align, + .scratch_footprint = scratch_footprint, + .unprivileged_init = unprivileged_init, + .run = stem_run, +}; + +#undef NAME diff --git a/src/discof/restore/fd_snaprd_tile.c b/src/discof/restore/fd_snaprd_tile.c index 5bdb42a4ae2..da13ef7d3d1 100644 --- a/src/discof/restore/fd_snaprd_tile.c +++ b/src/discof/restore/fd_snaprd_tile.c @@ -41,9 +41,9 @@ #define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */ -#define IN_KIND_SNAPCTL (0) -#define IN_KIND_GOSSIP (1) -#define MAX_IN_LINKS (3) +#define IN_KIND_SNAPCTL (0) +#define IN_KIND_GOSSIP (1) +#define MAX_IN_LINK_KINDS (3) struct fd_snaprd_tile { fd_ssping_t * ssping; @@ -72,7 +72,7 @@ struct fd_snaprd_tile { int 
incremental_snapshot_fd; } local_out; - uchar in_kind[ MAX_IN_LINKS ]; + uchar in_kind[ MAX_IN_LINK_KINDS ]; struct { ulong full_snapshot_slot; @@ -115,6 +115,7 @@ struct fd_snaprd_tile { /* TODO: Don't do this ... should be in the monitor instead */ struct { + ulong snaplt_tile_cnt; ulong prev_bytes_read; ulong prev_accounts_inserted; volatile ulong * cur_accounts_inserted; @@ -124,6 +125,8 @@ struct fd_snaprd_tile { ulong prev_snapdc_wait; volatile ulong * cur_snapdc_caughtup_postfrag; ulong prev_snapin_backp_prefrag; volatile ulong * cur_snapin_backp_prefrag; ulong prev_snapin_wait; volatile ulong * cur_snapin_caughtup_postfrag; + ulong prev_snaplt_backp_prefrag; volatile ulong * cur_snaplt_backp_prefrag[ FD_MAX_SNAPLT_TILES ]; + ulong prev_snaplt_wait; volatile ulong * cur_snaplt_caughtup_postfrag[ FD_MAX_SNAPLT_TILES ]; } diagnostics; struct { @@ -388,6 +391,13 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { ulong snapin_backp = *ctx->diagnostics.cur_snapin_backp_prefrag; ulong snapin_wait = *ctx->diagnostics.cur_snapin_caughtup_postfrag + snapin_backp; + ulong snaplt_backp = 0UL; + ulong snaplt_wait = 0UL; + for( ulong i=0UL; i<ctx->diagnostics.snaplt_tile_cnt; i++ ) { + snaplt_backp += *ctx->diagnostics.cur_snaplt_backp_prefrag[ i ]; + snaplt_wait += *ctx->diagnostics.cur_snaplt_caughtup_postfrag[ i ] + *ctx->diagnostics.cur_snaplt_backp_prefrag[ i ]; /* add this tile's own backpressure, not the running cross-tile total */ + } + ulong accounts_inserted = *ctx->diagnostics.cur_accounts_inserted; double ns_per_tick = 1.0/fd_tempo_tick_per_ns( NULL ); @@ -402,15 +412,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { case FD_SNAPRD_STATE_READING_FULL_FILE: { double progress = 0.0; if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total; - FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", + double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ?
((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0; + double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0; + FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", progress, bandwidth, ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7, + snaplt_backp_val, 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ), + snaplt_busy_val, (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 )); break; } @@ -424,15 +438,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE: { double progress = 0.0; if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total; - FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", + double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0; + double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 
100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0; + FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", progress, bandwidth, ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7, + snaplt_backp_val, 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ), + snaplt_busy_val, (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 )); break; } @@ -443,15 +461,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { case FD_SNAPRD_STATE_READING_FULL_HTTP: { double progress = 0.0; if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total; - FD_LOG_NOTICE(( "restoring full from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", + double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0; + double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0; + FD_LOG_NOTICE(( "restoring full from http ... 
(%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", progress, bandwidth, ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7, + snaplt_backp_val, 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ), + snaplt_busy_val, (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 )); break; } @@ -466,15 +488,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: { double progress = 0.0; if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total; - FD_LOG_NOTICE(( "restoring incremental from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", + double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0; + double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0; + FD_LOG_NOTICE(( "restoring incremental from http ... 
(%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s", progress, bandwidth, ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7, ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7, + snaplt_backp_val, 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ), 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ), + snaplt_busy_val, (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 )); break; } @@ -497,6 +523,8 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) { ctx->diagnostics.prev_snapdc_wait = snapdc_wait; ctx->diagnostics.prev_snapin_backp_prefrag = snapin_backp; ctx->diagnostics.prev_snapin_wait = snapin_wait; + ctx->diagnostics.prev_snaplt_backp_prefrag = snaplt_backp; + ctx->diagnostics.prev_snaplt_wait = snaplt_wait; ctx->diagnostics.prev_accounts_inserted = accounts_inserted; } @@ -519,9 +547,9 @@ after_credit( fd_snaprd_tile_t * ctx, /* All control fragments sent by the snaprd tile must be fully acknowledged by all downstream consumers before processing can proceed, to prevent tile state machines from getting out of sync - (see fd_ssctrl.h for more details). Currently there are two - downstream consumers, snapdc and snapin. */ -#define NUM_SNAP_CONSUMERS (2UL) + (see fd_ssctrl.h for more details). Currently there are three + downstream consumers, snapdc, snapin, and snaplt. 
*/ +#define NUM_SNAP_CONSUMERS (2UL + ctx->diagnostics.snaplt_tile_cnt) if( FD_UNLIKELY( now>ctx->diagnostic_deadline_nanos ) ) { ctx->diagnostic_deadline_nanos = now+(long)1e9; @@ -926,19 +954,35 @@ unprivileged_init( fd_topo_t * topo, ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics ); ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics ); ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics ); + + ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ]; + ctx->diagnostics.snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" ); + + for( ulong i=0UL; i<ctx->diagnostics.snaplt_tile_cnt; i++ ) { + ulong snaplt_tile_idx = fd_topo_find_tile( topo, "snaplt", i ); + FD_TEST( snaplt_tile_idx!=ULONG_MAX ); + fd_topo_tile_t * snaplt_tile = &topo->tiles[ snaplt_tile_idx ]; + snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics ); + } + ctx->diagnostics.cur_snaprd_backp_prefrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ); ctx->diagnostics.cur_snaprd_caughtup_postfrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ); ctx->diagnostics.cur_snapdc_backp_prefrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ); ctx->diagnostics.cur_snapdc_caughtup_postfrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ); ctx->diagnostics.cur_snapin_backp_prefrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ); ctx->diagnostics.cur_snapin_caughtup_postfrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ); + + for( ulong i=0UL; i<ctx->diagnostics.snaplt_tile_cnt; i++ ) { + ctx->diagnostics.cur_snaplt_backp_prefrag[ i ] = snaplt_metrics[ i ]+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ); + ctx->diagnostics.cur_snaplt_caughtup_postfrag[ i ] = snaplt_metrics[ i ]+MIDX( COUNTER, TILE, 
REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ); + } ctx->diagnostics.cur_accounts_inserted = snapin_metrics+MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ); ctx->gossip.ci_table = _ci_table; /* zero-out memory so that we can perform null checks in after_frag */ fd_memset( ctx->gossip.ci_table, 0, sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE ); - FD_TEST( tile->in_cnt<=MAX_IN_LINKS ); + FD_TEST( tile->in_cnt<=MAX_IN_LINK_KINDS + ctx->diagnostics.snaplt_tile_cnt ); for( ulong i=0UL; i<(tile->in_cnt); i++ ){ fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ]; if( 0==strcmp( in_link->name, "gossip_out" ) ) { @@ -949,7 +993,8 @@ unprivileged_init( fd_topo_t * topo, // ctx->gossip_in.wmark = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu ); // ctx->gossip_in.mtu = in_link->mtu; } else if( 0==strcmp( in_link->name, "snapdc_rd" ) || - 0==strcmp( in_link->name, "snapin_rd" ) ) { + 0==strcmp( in_link->name, "snapin_rd" ) || + 0==strcmp( in_link->name, "snaplt_rd" ) ) { ctx->in_kind[ i ] = IN_KIND_SNAPCTL; } } diff --git a/src/discof/restore/utils/fd_ssctrl.h b/src/discof/restore/utils/fd_ssctrl.h index f21fe288b49..fe3d44398fa 100644 --- a/src/discof/restore/utils/fd_ssctrl.h +++ b/src/discof/restore/utils/fd_ssctrl.h @@ -1,6 +1,9 @@ #ifndef HEADER_fd_src_discof_restore_utils_fd_ssctrl_h #define HEADER_fd_src_discof_restore_utils_fd_ssctrl_h +#include "../../../flamenco/types/fd_types_custom.h" +#include "../../../flamenco/runtime/fd_runtime_const.h" + /* The snapshot tiles have a somewhat involved state machine, which is controlled by snaprd. 
Imagine first the following sequence: @@ -49,4 +52,56 @@ #define FD_SNAPSHOT_MSG_CTRL_ACK (6UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */ #define FD_SNAPSHOT_MSG_CTRL_MALFORMED (7UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */ +/* snapin -> snaplt */ +#define FD_SNAPSHOT_HASH_MSG_SUB (8UL) /* Indicates snapin has encountered a duplicate account whose hash must be subtracted */ +#define FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR (9UL) /* Indicates snapin has encountered a new account metadata */ +#define FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA (10UL) /* Account data that is sent as snapin processes a new account */ + +/* snaplt -> snapin */ +#define FD_SNAPSHOT_HASH_MSG_RESULT (11UL) /* Hash result sent from snaplt to snapin */ + +#define FD_MAX_SNAPLT_TILES (16UL) + +/* fd_snapshot_account is the contents of the + SNAPSHOT_HASH_MSG_ACCOUNT_HDR message. It contains account metadata + that is contained in the accounts hash. */ +struct fd_snapshot_account { + uchar pubkey[ FD_HASH_FOOTPRINT ]; + uchar owner[ FD_HASH_FOOTPRINT ]; + ulong lamports; + uchar executable; + ulong data_len; +}; +typedef struct fd_snapshot_account fd_snapshot_account_t; + +/* fd_snapshot_account_init initializes a fd_snapshot_account_t struct + with the appropriate account metadata fields. */ +static inline void +fd_snapshot_account_init( fd_snapshot_account_t * account, + uchar const pubkey[ FD_HASH_FOOTPRINT ], + uchar const owner[ FD_PUBKEY_FOOTPRINT ], + ulong lamports, + uchar executable, + ulong data_len ) { + fd_memcpy( account->pubkey, pubkey, FD_HASH_FOOTPRINT ); + fd_memcpy( account->owner, owner, FD_PUBKEY_FOOTPRINT ); + account->lamports = lamports; + account->executable = executable; + account->data_len = data_len; +} + +/* fd_snapshot_existing_account is the contents of the + SNAPSHOT_HASH_MSG_SUB message. 
It contains a fd_snapshot_account_t + header and the corresponding account data in a single message. + + For simplicity and conformance to burst limitations in snapin, the + entire duplicate account is sent in one message (one frag). Consider + caching the lthash of the duplicate account so we do not have to + send the entire account over. */ +struct fd_snapshot_existing_account { + fd_snapshot_account_t hdr; + uchar data[ FD_RUNTIME_ACC_SZ_MAX ]; +}; +typedef struct fd_snapshot_existing_account fd_snapshot_existing_account_t; + #endif /* HEADER_fd_src_discof_restore_utils_fd_ssctrl_h */ diff --git a/src/flamenco/runtime/fd_hashes.c b/src/flamenco/runtime/fd_hashes.c index 45c48f47a80..e3c3f6debd5 100644 --- a/src/flamenco/runtime/fd_hashes.c +++ b/src/flamenco/runtime/fd_hashes.c @@ -13,21 +13,38 @@ fd_hashes_account_lthash( fd_pubkey_t const * pubkey, fd_account_meta_t const * account, uchar const * data, fd_lthash_value_t * lthash_out ) { + fd_hashes_account_lthash_simple( pubkey->uc, + account->owner, + account->lamports, + account->executable, + data, + account->dlen, + lthash_out ); +} + +void +fd_hashes_account_lthash_simple( uchar const pubkey[ static FD_HASH_FOOTPRINT ], + uchar const owner[ static FD_PUBKEY_FOOTPRINT ], + ulong lamports, + uchar executable, + uchar const * data, + ulong data_len, + fd_lthash_value_t * lthash_out ) { fd_lthash_zero( lthash_out ); /* Accounts with zero lamports are not included in the hash, so they should always be treated as zero */ - if( FD_UNLIKELY( account->lamports == 0 ) ) { + if( FD_UNLIKELY( lamports == 0 ) ) { return; } - uchar executable = account->executable & 0x1; + uchar executable_flag = executable & 0x1; fd_blake3_t b3[1]; fd_blake3_init( b3 ); - fd_blake3_append( b3, &account->lamports, sizeof( ulong ) ); - fd_blake3_append( b3, data, account->dlen ); - fd_blake3_append( b3, &executable, sizeof( uchar ) ); - fd_blake3_append( b3, account->owner, FD_PUBKEY_FOOTPRINT ); + fd_blake3_append( b3, &lamports, 
sizeof( ulong ) ); + fd_blake3_append( b3, data, data_len ); + fd_blake3_append( b3, &executable_flag, sizeof( uchar ) ); + fd_blake3_append( b3, owner, FD_PUBKEY_FOOTPRINT ); fd_blake3_append( b3, pubkey, FD_PUBKEY_FOOTPRINT ); fd_blake3_fini_2048( b3, lthash_out->bytes ); } diff --git a/src/flamenco/runtime/fd_hashes.h b/src/flamenco/runtime/fd_hashes.h index fb8b0ed1a8b..928f134f635 100644 --- a/src/flamenco/runtime/fd_hashes.h +++ b/src/flamenco/runtime/fd_hashes.h @@ -56,6 +56,29 @@ fd_hashes_account_lthash( fd_pubkey_t const * pubkey, uchar const * data, fd_lthash_value_t * lthash_out ); +/* fd_hashes_account_lthash_simple is functionally the same as + fd_hashes_account_lthash, but with simpler arguments that detail + the exact parameters that go into the lthash. + + pubkey points to the account's public key (32 bytes). owner points + to the account's owner (32 bytes). lamports is the account's + lamports. executable is the account's executable flag. data points + to the account data. data_len is the length of the account data. + lthash_out points to where the computed lthash value will be written + (2048 bytes). + + On return, lthash_out contains the computed lthash. This function + assumes all pointers are valid and properly aligned. The account + data pointer must be readable for data_len bytes. */ +void +fd_hashes_account_lthash_simple( uchar const pubkey[ static FD_HASH_FOOTPRINT ], + uchar const owner[ static FD_PUBKEY_FOOTPRINT ], + ulong lamports, + uchar executable, + uchar const * data, + ulong data_len, + fd_lthash_value_t * lthash_out ); + /* fd_hashes_update_lthash updates the bank's incremental lthash when an account is modified during transaction execution. 
The bank lthash is maintained incrementally by subtracting the old account hash and diff --git a/src/flamenco/runtime/tests/run_backtest_ci.sh b/src/flamenco/runtime/tests/run_backtest_ci.sh index db316955b4d..16faa8a4af3 100755 --- a/src/flamenco/runtime/tests/run_backtest_ci.sh +++ b/src/flamenco/runtime/tests/run_backtest_ci.sh @@ -3,7 +3,7 @@ set -e src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-308392063-v2.3.0 -y 5 -m 2000000 -e 308392090 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l devnet-350814254-v2.3.0 -y 3 -m 2000000 -e 350814284 -c 2.3.0 -src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0 +src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0 -lt src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-324823213-v2.3.0 -y 4 -m 2000000 -e 324823214 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-325467935-v2.3.0 -y 4 -m 2000000 -e 325467936 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-283927487-v2.3.0 -y 3 -m 2000000 -e 283927497 -c 2.3.0 diff --git a/src/flamenco/runtime/tests/run_backtest_tests_all.sh b/src/flamenco/runtime/tests/run_backtest_tests_all.sh index b2930f87a5d..4bf9ee4ce8c 100755 --- a/src/flamenco/runtime/tests/run_backtest_tests_all.sh +++ b/src/flamenco/runtime/tests/run_backtest_tests_all.sh @@ -63,7 +63,7 @@ src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-307395181-v2.3.0 -y src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-308392063-v2.3.0 -y 5 -m 2000000 -e 308392090 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l devnet-350814254-v2.3.0 -y 3 -m 2000000 -e 350814284 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-311586340-v2.3.0 -y 3 -m 2000000 -e 311586380 -c 2.3.0 -src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 
281546597 -c 2.3.0 +src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0 -lt src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-324823213-v2.3.0 -y 4 -m 2000000 -e 324823214 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-325467935-v2.3.0 -y 4 -m 2000000 -e 325467936 -c 2.3.0 src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-283927487-v2.3.0 -y 3 -m 2000000 -e 283927497 -c 2.3.0 diff --git a/src/flamenco/runtime/tests/run_ledger_backtest.sh b/src/flamenco/runtime/tests/run_ledger_backtest.sh index 627bbd403ef..f4a053e67b2 100755 --- a/src/flamenco/runtime/tests/run_ledger_backtest.sh +++ b/src/flamenco/runtime/tests/run_ledger_backtest.sh @@ -22,6 +22,7 @@ HUGE_TLBFS_MOUNT_PATH=${HUGE_TLBFS_MOUNT_PATH:="/mnt/.fd"} HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE=${HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE:="true"} HAS_INCREMENTAL="false" REDOWNLOAD=1 +DISABLE_LTHASH_VERIFICATION="true" while [[ $# -gt 0 ]]; do case $1 in @@ -100,6 +101,10 @@ while [[ $# -gt 0 ]]; do REDOWNLOAD=0 shift ;; + -lt|--enable-lthash-verification) + DISABLE_LTHASH_VERIFICATION="false" + shift + ;; -*|--*) echo "unknown option $1" exit 1 @@ -166,6 +171,7 @@ echo " maximum_download_retry_abort = 0 [layout] shred_tile_count = 4 + snaplt_tile_count = 4 [tiles] [tiles.archiver] enabled = true @@ -200,7 +206,10 @@ echo " snapshots = \"$DUMP/$LEDGER\" [hugetlbfs] mount_path = \"$HUGE_TLBFS_MOUNT_PATH\" - allow_hugepage_increase = $HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE" > $DUMP_DIR/${LEDGER}_backtest.toml + allow_hugepage_increase = $HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE +[development] + [development.snapshots] + disable_lthash_verification = $DISABLE_LTHASH_VERIFICATION" > $DUMP_DIR/${LEDGER}_backtest.toml if [[ -z "$GENESIS" ]]; then echo "[gossip]