diff --git a/book/api/metrics-generated.md b/book/api/metrics-generated.md
index 768aa1dba8c..0d670023095 100644
--- a/book/api/metrics-generated.md
+++ b/book/api/metrics-generated.md
@@ -887,6 +887,18 @@
+## Snaplt Tile
+
+
+
+| Metric | Type | Description |
+|--------|------|-------------|
+| snaplt_state | gauge | State of the tile. 0=hashing, 1=done, 2=shutdown |
+| snaplt_full_accounts_hashed | gauge | Number of accounts hashed for the full snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted |
+| snaplt_incremental_accounts_hashed | gauge | Number of accounts hashed for the incremental snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted |
+
+
+
## Ipecho Tile
diff --git a/src/app/firedancer-dev/commands/backtest.c b/src/app/firedancer-dev/commands/backtest.c
index e5a8561635e..68a0d7f4f50 100644
--- a/src/app/firedancer-dev/commands/backtest.c
+++ b/src/app/firedancer-dev/commands/backtest.c
@@ -23,6 +23,7 @@
#include "../../../disco/metrics/fd_metrics.h"
#include "../../../util/pod/fd_pod_format.h"
#include "../../../discof/restore/utils/fd_ssmsg.h"
+#include "../../../discof/restore/utils/fd_ssctrl.h"
#include "../../../discof/tower/fd_tower_tile.h"
#include "../../../discof/reasm/fd_reasm.h"
#include "../../../discof/replay/fd_exec.h" /* FD_RUNTIME_PUBLIC_ACCOUNT_UPDATE_MSG_MTU */
@@ -38,6 +39,8 @@ static void
backtest_topo( config_t * config ) {
ulong exec_tile_cnt = config->firedancer.layout.exec_tile_count;
ulong writer_tile_cnt = config->firedancer.layout.writer_tile_count;
+ ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count;
+ int snaplt_disabled = config->development.snapshots.disable_lthash_verification;
int disable_snap_loader = !config->gossip.entrypoints_cnt;
int solcap_enabled = strlen( config->capture.solcap_capture )>0;
@@ -100,6 +103,14 @@ backtest_topo( config_t * config ) {
snaprd_tile->allow_shutdown = 1;
snapdc_tile->allow_shutdown = 1;
snapin_tile->allow_shutdown = 1;
+
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_wksp( topo, "snaplt" );
+ for( ulong i=0UL; iallow_shutdown = 1;
+ }
+ }
} else {
fd_topob_wksp( topo, "genesi" );
fd_topob_tile( topo, "genesi", "genesi", "metric_in", cpu_idx++, 0, 0 )->allow_shutdown = 1;
@@ -128,7 +139,12 @@ backtest_topo( config_t * config ) {
fd_topob_wksp( topo, "snapdc_rd" );
fd_topob_wksp( topo, "snapin_rd" );
fd_topob_wksp( topo, "snap_out" );
- fd_topob_wksp( topo, "replay_manif" );
+
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_wksp( topo, "snapin_lt" );
+ fd_topob_wksp( topo, "snaplt_out" );
+ fd_topob_wksp( topo, "snaplt_rd" );
+ }
/* TODO: Should be depth of 1 or 2, not 4, but it causes backpressure
from the replay tile parsing the manifest, remove when this is
fixed. */
@@ -139,6 +155,12 @@ backtest_topo( config_t * config ) {
fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL );
fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL );
+ }
+
fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL );
fd_topob_tile_in ( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL );
@@ -150,6 +172,15 @@ backtest_topo( config_t * config ) {
fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL );
fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );
+
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", "snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_out", i );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i );
+ }
} else {
fd_topob_wksp( topo, "genesi_out" );
fd_topob_link( topo, "genesi_out", "genesi_out", 2UL, 10UL*1024UL*1024UL+32UL+sizeof(fd_lthash_value_t), 1UL );
@@ -302,14 +333,7 @@ backtest_topo( config_t * config ) {
}
if( FD_LIKELY( !disable_snap_loader ) ) {
- /* Replay decoded manifest dcache topo obj */
- fd_topo_obj_t * replay_manifest_dcache = fd_topob_obj( topo, "dcache", "replay_manif" );
- fd_pod_insertf_ulong( topo->props, 2UL << 30UL, "obj.%lu.data_sz", replay_manifest_dcache->id );
- fd_pod_insert_ulong( topo->props, "manifest_dcache", replay_manifest_dcache->id );
-
fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
- fd_topob_tile_uses( topo, snapin_tile, replay_manifest_dcache, FD_SHMEM_JOIN_MODE_READ_WRITE );
- fd_topob_tile_uses( topo, replay_tile, replay_manifest_dcache, FD_SHMEM_JOIN_MODE_READ_ONLY );
}
for( ulong i=0UL; itile_cnt; i++ ) {
@@ -394,6 +418,16 @@ backtest_cmd_fn( args_t * args FD_PARAM_UNUSED,
ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
+ ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ];
+ ulong snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" );
+
+ for( ulong i=0UL; itiles[ snaplt_tile_idx ];
+ snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics );
+ }
+
ulong total_off_old = 0UL;
ulong snaprd_backp_old = 0UL;
ulong snaprd_wait_old = 0UL;
@@ -401,16 +435,34 @@ backtest_cmd_fn( args_t * args FD_PARAM_UNUSED,
ulong snapdc_wait_old = 0UL;
ulong snapin_backp_old = 0UL;
ulong snapin_wait_old = 0UL;
+ ulong snaplt_backp_old = 0UL;
+ ulong snaplt_wait_old = 0UL;
ulong acc_cnt_old = 0UL;
sleep( 1 );
- puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
+ puts( "" );
+ puts( "Columns:" );
+ puts( "- bw: Uncompressed bandwidth" );
+ puts( "- backp: Backpressured by downstream tile" );
+ puts( "- stall: Waiting on upstream tile" );
+ puts( "- acc: Number of accounts" );
+ puts( "" );
+ puts( "-------------backp=(snaprd,snapdc,snapin,snaplt) busy=(snaprd,snapdc,snapin,snaplt)---------------" );
long next = start+1000L*1000L*1000L;
for(;;) {
ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
+ ulong snaplt_status = ULONG_MAX;
+
+ ulong snaplt_status_sum = 0UL;
+ for( ulong i=0UL; i0UL ? 1UL : 2UL;
- if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
+ if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL && snaplt_status==2UL ) ) break;
long cur = fd_log_wallclock();
if( FD_UNLIKELY( cur
@@ -25,7 +26,9 @@ snapshot_load_topo( config_t * config,
args_t const * args ) {
fd_topo_t * topo = &config->topo;
fd_topob_new( &config->topo, config->name );
- topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
+ topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
+ ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count;
+ int snaplt_disabled = config->development.snapshots.disable_lthash_verification;
fd_topob_wksp( topo, "funk" );
fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk",
@@ -37,7 +40,7 @@ snapshot_load_topo( config_t * config,
static ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
if( args->snapshot_load.tile_cpus[0] ) {
ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu );
- if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt ));
+ if( FD_UNLIKELY( cpu_cnt<4UL + (snaplt_disabled?0:snaplt_tile_cnt) ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least %lu", cpu_cnt, 4UL + (snaplt_disabled?snaplt_tile_cnt:0) ));
}
/* metrics tile *****************************************************/
@@ -77,6 +80,25 @@ snapshot_load_topo( config_t * config,
fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 );
snapin_tile->allow_shutdown = 1;
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_wksp( topo, "snaplt" );
+ fd_topob_wksp( topo, "snapin_lt" );
+ fd_topob_wksp( topo, "snaplt_out" );
+ fd_topob_wksp( topo, "snaplt_rd" );
+
+ #define FOR(cnt) for( ulong i=0UL; iallow_shutdown = 1;
+ }
+ }
+
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL );
+ }
+
/* uncompressed stream -> snapin tile */
fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
@@ -86,7 +108,7 @@ snapshot_load_topo( config_t * config,
/* snapshot manifest out link */
fd_topob_wksp( topo, "snap_out" );
- fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL );
+ fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 4UL, sizeof(fd_snapshot_manifest_t), 1UL );
snap_out_link->permit_no_consumers = 1;
fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL );
@@ -99,6 +121,15 @@ snapshot_load_topo( config_t * config,
fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );
+ if( FD_LIKELY( !snaplt_disabled ) ) {
+ fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", "snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_out", i );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i );
+ }
+
for( ulong i=0UL; itile_cnt; i++ ) {
fd_topo_tile_t * tile = &topo->tiles[ i ];
fd_topo_configure_tile( tile, config );
@@ -159,6 +190,16 @@ snapshot_load_cmd_fn( args_t * args,
fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
+ ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ];
+ ulong snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" );
+
+ for( ulong i=0UL; itiles[ snaplt_tile_idx ];
+ snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics );
+ }
+
ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
@@ -170,6 +211,8 @@ snapshot_load_cmd_fn( args_t * args,
ulong snapdc_wait_old = 0UL;
ulong snapin_backp_old = 0UL;
ulong snapin_wait_old = 0UL;
+ ulong snaplt_backp_old = 0UL;
+ ulong snaplt_wait_old = 0UL;
ulong acc_cnt_old = 0UL;
sleep( 1 );
puts( "" );
@@ -179,14 +222,23 @@ snapshot_load_cmd_fn( args_t * args,
puts( "- stall: Waiting on upstream tile" );
puts( "- acc: Number of accounts" );
puts( "" );
- puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
+ puts( "-------------backp=(snaprd,snapdc,snapin,snaplt) busy=(snaprd,snapdc,snapin,snaplt)---------------" );
long next = start+1000L*1000L*1000L;
for(;;) {
ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
+ ulong snaplt_status = ULONG_MAX;
+
+ ulong snaplt_status_sum = 0UL;
+ for( ulong i=0UL; i0UL ? 1UL : 2UL;
- if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
+ if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL && snaplt_status==2UL ) ) break;
long cur = fd_log_wallclock();
if( FD_UNLIKELY( curfiredancer.layout.exec_tile_count;
ulong writer_tile_cnt = config->firedancer.layout.writer_tile_count;
ulong sign_tile_cnt = config->firedancer.layout.sign_tile_count;
+ ulong snaplt_tile_cnt = config->firedancer.layout.snaplt_tile_count;
int snapshots_enabled = !!config->gossip.entrypoints_cnt;
- int solcap_enabled = strcmp( "", config->capture.solcap_capture );
+ int snaplt_disabled = config->development.snapshots.disable_lthash_verification;
+ int solcap_enabled = strcmp( "", config->capture.solcap_capture );
fd_topo_t * topo = fd_topob_new( &config->topo, config->name );
@@ -316,6 +319,13 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "snap_out" ); /* TODO: Rename ... */
}
+ if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) {
+ fd_topob_wksp( topo, "snaplt" );
+ fd_topob_wksp( topo, "snapin_lt" );
+ fd_topob_wksp( topo, "snaplt_out" );
+ fd_topob_wksp( topo, "snaplt_rd" );
+ }
+
#define FOR(cnt) for( ulong i=0UL; inet.ingress_buffer_size, FD_NET_MTU, 1UL );
if( FD_LIKELY( snapshots_enabled ) ) {
- /**/ fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384UL, 1UL ); /* TODO: Rename */
- /**/ fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL ); /* TODO: Rename */
- /**/ fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL ); /* TODO: Rename */
- /**/ fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL );
- /**/ fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
+ /**/ fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384UL, 1UL ); /* TODO: Rename */
+ /**/ fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL ); /* TODO: Rename */
+ /**/ fd_topob_link( topo, "snap_out", "snap_out", 4UL, sizeof(fd_snapshot_manifest_t), 1UL ); /* TODO: Rename */
+ /**/ fd_topob_link( topo, "snapdc_rd", "snapdc_rd", 128UL, 0UL, 1UL );
+ /**/ fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
+ }
+
+ if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) {
+ fd_topob_link( topo, "snapin_lt", "snapin_lt", 128UL, sizeof(fd_snapshot_existing_account_t), 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_out", "snaplt_out", 128UL, 2048UL, 1UL );
+ FOR(snaplt_tile_cnt) fd_topob_link( topo, "snaplt_rd", "snaplt_rd", 128UL, 0UL, 1UL );
}
/**/ fd_topob_link( topo, "genesi_out", "genesi_out", 2UL, 10UL*1024UL*1024UL+32UL+sizeof(fd_lthash_value_t), 1UL );
@@ -434,6 +450,10 @@ fd_topo_initialize( config_t * config ) {
fd_topob_tile( topo, "snapin", "snapin", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 )->allow_shutdown = 1;
}
+ if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) {
+ for( ulong i=0UL; itile_cnt ], 0, 0 )->allow_shutdown = 1;
+ }
+
/**/ fd_topob_tile( topo, "genesi", "genesi", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 )->allow_shutdown = 1;
/**/ fd_topob_tile( topo, "ipecho", "ipecho", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
FOR(gossvf_tile_cnt) fd_topob_tile( topo, "gossvf", "gossvf", "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 1 );
@@ -506,6 +526,15 @@ fd_topo_initialize( config_t * config ) {
fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "snap_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
}
+ if( FD_LIKELY( snapshots_enabled && !snaplt_disabled ) ) {
+ fd_topob_tile_out( topo, "snapin", 0UL, "snapin_lt", 0UL );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snaplt_out", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaplt", i, "metric_in", "snapin_lt", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_in ( topo, "snaprd", 0UL, "metric_in", "snaplt_rd", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_out", i );
+ FOR(snaplt_tile_cnt) fd_topob_tile_out( topo, "snaplt", i, "snaplt_rd", i );
+ }
+
/**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "genesi_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "gossip_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "repair", 0UL, "metric_in", "tower_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
@@ -959,7 +988,9 @@ fd_topo_configure_tile( fd_topo_tile_t * tile,
tile->snapin.funk_obj_id = fd_pod_query_ulong( config->topo.props, "funk", ULONG_MAX );
- } else if( FD_UNLIKELY( !strcmp( tile->name, "repair" ) ) ) {
+ } else if( FD_UNLIKELY( !strcmp( tile->name, "snaplt" ) ) ) {
+
+ }else if( FD_UNLIKELY( !strcmp( tile->name, "repair" ) ) ) {
tile->repair.max_pending_shred_sets = config->tiles.shred.max_pending_shred_sets;
tile->repair.repair_intake_listen_port = config->tiles.repair.repair_intake_listen_port;
tile->repair.repair_serve_listen_port = config->tiles.repair.repair_serve_listen_port;
diff --git a/src/app/shared/fd_config.c b/src/app/shared/fd_config.c
index e4f5ebaa347..c78f4f35c89 100644
--- a/src/app/shared/fd_config.c
+++ b/src/app/shared/fd_config.c
@@ -457,6 +457,7 @@ fd_config_fill( fd_config_t * config,
static void
fd_config_validatef( fd_configf_t const * config ) {
CFG_HAS_NON_ZERO( layout.sign_tile_count );
+ CFG_HAS_NON_ZERO( layout.snaplt_tile_count );
if( FD_UNLIKELY( config->layout.sign_tile_count < 2 ) ) {
FD_LOG_ERR(( "layout.sign_tile_count must be >= 2" ));
}
diff --git a/src/app/shared/fd_config.h b/src/app/shared/fd_config.h
index dbba7e93ff7..ee50c219221 100644
--- a/src/app/shared/fd_config.h
+++ b/src/app/shared/fd_config.h
@@ -117,6 +117,7 @@ struct fd_configf {
uint writer_tile_count;
uint sign_tile_count;
uint gossvf_tile_count;
+ uint snaplt_tile_count;
} layout;
struct {
@@ -349,6 +350,10 @@ struct fd_config {
char affinity[ AFFINITY_SZ ];
} udpecho;
+ struct {
+ int disable_lthash_verification;
+ } snapshots;
+
struct {
char affinity[ AFFINITY_SZ ];
} snapshot_load;
diff --git a/src/app/shared/fd_config_parse.c b/src/app/shared/fd_config_parse.c
index 1492ffb2721..3b26daf56dc 100644
--- a/src/app/shared/fd_config_parse.c
+++ b/src/app/shared/fd_config_parse.c
@@ -82,6 +82,7 @@ fd_config_extract_podf( uchar * pod,
CFG_POP ( uint, layout.writer_tile_count );
CFG_POP ( uint, layout.sign_tile_count );
CFG_POP ( uint, layout.gossvf_tile_count );
+ CFG_POP ( uint, layout.snaplt_tile_count );
CFG_POP ( ulong, blockstore.shred_max );
CFG_POP ( ulong, blockstore.block_max );
@@ -322,7 +323,8 @@ fd_config_extract_pod( uchar * pod,
CFG_POP ( cstr, development.pktgen.affinity );
CFG_POP ( cstr, development.pktgen.fake_dst_ip );
- CFG_POP ( cstr, development.udpecho.affinity );
+ CFG_POP ( cstr, development.udpecho.affinity );
+ CFG_POP ( bool, development.snapshots.disable_lthash_verification );
CFG_POP ( bool, development.gui.websocket_compression );
diff --git a/src/app/shared/fd_tile_unit_test.c b/src/app/shared/fd_tile_unit_test.c
index 58e68245664..4933e065556 100644
--- a/src/app/shared/fd_tile_unit_test.c
+++ b/src/app/shared/fd_tile_unit_test.c
@@ -81,6 +81,7 @@ fd_topo_run_tile_t dummy_tile_rpcsrv = { .name = "rpcsrv" };
fd_topo_run_tile_t dummy_tile_snaprd = { .name = "snaprd" };
fd_topo_run_tile_t dummy_tile_snapdc = { .name = "snapdc" };
fd_topo_run_tile_t dummy_tile_snapin = { .name = "snapin" };
+fd_topo_run_tile_t dummy_tile_snaplt = { .name = "snaplt" };
fd_topo_run_tile_t dummy_tile_arch_f = { .name = "arch_f" };
fd_topo_run_tile_t dummy_tile_arch_w = { .name = "arch_w" };
fd_topo_run_tile_t dummy_tile_scap = { .name = "scap" };
@@ -123,6 +124,7 @@ fd_topo_run_tile_t * TILES[] = {
&dummy_tile_snaprd,
&dummy_tile_snapdc,
&dummy_tile_snapin,
+ &dummy_tile_snaplt,
&dummy_tile_arch_f,
&dummy_tile_arch_w,
&dummy_tile_scap,
diff --git a/src/disco/metrics/generate/types.py b/src/disco/metrics/generate/types.py
index 4656d1f834b..e8313fafb62 100644
--- a/src/disco/metrics/generate/types.py
+++ b/src/disco/metrics/generate/types.py
@@ -30,10 +30,11 @@ class Tile(Enum):
SNAPRD = 24
SNAPDC = 25
SNAPIN = 26
- IPECHO = 27
- GOSSVF = 28
- BANKF = 29
- RESOLF = 30
+ SNAPLT = 27
+ IPECHO = 28
+ GOSSVF = 29
+ BANKF = 30
+ RESOLF = 31
class MetricType(Enum):
COUNTER = 0
diff --git a/src/disco/metrics/generated/fd_metrics_all.c b/src/disco/metrics/generated/fd_metrics_all.c
index a19db0267a8..cf6d16d2e24 100644
--- a/src/disco/metrics/generated/fd_metrics_all.c
+++ b/src/disco/metrics/generated/fd_metrics_all.c
@@ -59,6 +59,7 @@ const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT] = {
"snaprd",
"snapdc",
"snapin",
+ "snaplt",
"ipecho",
"gossvf",
"bankf",
@@ -89,6 +90,7 @@ const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT] = {
FD_METRICS_SNAPRD_TOTAL,
FD_METRICS_SNAPDC_TOTAL,
FD_METRICS_SNAPIN_TOTAL,
+ FD_METRICS_SNAPLT_TOTAL,
FD_METRICS_IPECHO_TOTAL,
FD_METRICS_GOSSVF_TOTAL,
FD_METRICS_BANKF_TOTAL,
@@ -118,6 +120,7 @@ const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT]
FD_METRICS_SNAPRD,
FD_METRICS_SNAPDC,
FD_METRICS_SNAPIN,
+ FD_METRICS_SNAPLT,
FD_METRICS_IPECHO,
FD_METRICS_GOSSVF,
FD_METRICS_BANKF,
diff --git a/src/disco/metrics/generated/fd_metrics_all.h b/src/disco/metrics/generated/fd_metrics_all.h
index cb843373cee..54ff87d1101 100644
--- a/src/disco/metrics/generated/fd_metrics_all.h
+++ b/src/disco/metrics/generated/fd_metrics_all.h
@@ -27,6 +27,7 @@
#include "fd_metrics_snaprd.h"
#include "fd_metrics_snapdc.h"
#include "fd_metrics_snapin.h"
+#include "fd_metrics_snaplt.h"
#include "fd_metrics_metric.h"
#include "fd_metrics_ipecho.h"
/* Start of LINK OUT metrics */
@@ -165,7 +166,7 @@ extern const fd_metrics_meta_t FD_METRICS_ALL_LINK_OUT[FD_METRICS_ALL_LINK_OUT_T
#define FD_METRICS_TOTAL_SZ (8UL*253UL)
-#define FD_METRICS_TILE_KIND_CNT 27
+#define FD_METRICS_TILE_KIND_CNT 28
extern const char * FD_METRICS_TILE_KIND_NAMES[FD_METRICS_TILE_KIND_CNT];
extern const ulong FD_METRICS_TILE_KIND_SIZES[FD_METRICS_TILE_KIND_CNT];
extern const fd_metrics_meta_t * FD_METRICS_TILE_KIND_METRICS[FD_METRICS_TILE_KIND_CNT];
diff --git a/src/disco/metrics/generated/fd_metrics_snaplt.c b/src/disco/metrics/generated/fd_metrics_snaplt.c
new file mode 100644
index 00000000000..acbd267fdac
--- /dev/null
+++ b/src/disco/metrics/generated/fd_metrics_snaplt.c
@@ -0,0 +1,8 @@
+/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */
+#include "fd_metrics_snaplt.h"
+
+const fd_metrics_meta_t FD_METRICS_SNAPLT[FD_METRICS_SNAPLT_TOTAL] = {
+ DECLARE_METRIC( SNAPLT_STATE, GAUGE ),
+ DECLARE_METRIC( SNAPLT_FULL_ACCOUNTS_HASHED, GAUGE ),
+ DECLARE_METRIC( SNAPLT_INCREMENTAL_ACCOUNTS_HASHED, GAUGE ),
+};
diff --git a/src/disco/metrics/generated/fd_metrics_snaplt.h b/src/disco/metrics/generated/fd_metrics_snaplt.h
new file mode 100644
index 00000000000..e9145be08fe
--- /dev/null
+++ b/src/disco/metrics/generated/fd_metrics_snaplt.h
@@ -0,0 +1,25 @@
+/* THIS FILE IS GENERATED BY gen_metrics.py. DO NOT HAND EDIT. */
+
+#include "../fd_metrics_base.h"
+#include "fd_metrics_enums.h"
+
+#define FD_METRICS_GAUGE_SNAPLT_STATE_OFF (16UL)
+#define FD_METRICS_GAUGE_SNAPLT_STATE_NAME "snaplt_state"
+#define FD_METRICS_GAUGE_SNAPLT_STATE_TYPE (FD_METRICS_TYPE_GAUGE)
+#define FD_METRICS_GAUGE_SNAPLT_STATE_DESC "State of the tile. 0=hashing, 1=done, 2=shutdown"
+#define FD_METRICS_GAUGE_SNAPLT_STATE_CVT (FD_METRICS_CONVERTER_NONE)
+
+#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_OFF (17UL)
+#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_NAME "snaplt_full_accounts_hashed"
+#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_TYPE (FD_METRICS_TYPE_GAUGE)
+#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_DESC "Number of accounts hashed for the full snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted"
+#define FD_METRICS_GAUGE_SNAPLT_FULL_ACCOUNTS_HASHED_CVT (FD_METRICS_CONVERTER_NONE)
+
+#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_OFF (18UL)
+#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_NAME "snaplt_incremental_accounts_hashed"
+#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_TYPE (FD_METRICS_TYPE_GAUGE)
+#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_DESC "Number of accounts hashed for the incremental snapshot during snapshot loading. Might decrease if snapshot load is aborted and restarted"
+#define FD_METRICS_GAUGE_SNAPLT_INCREMENTAL_ACCOUNTS_HASHED_CVT (FD_METRICS_CONVERTER_NONE)
+
+#define FD_METRICS_SNAPLT_TOTAL (3UL)
+extern const fd_metrics_meta_t FD_METRICS_SNAPLT[FD_METRICS_SNAPLT_TOTAL];
diff --git a/src/disco/metrics/metrics.xml b/src/disco/metrics/metrics.xml
index 38655d25a56..677da8b64d4 100644
--- a/src/disco/metrics/metrics.xml
+++ b/src/disco/metrics/metrics.xml
@@ -1004,6 +1004,12 @@ metric introduced.
+
+
+
+
+
+
diff --git a/src/disco/topo/fd_topob.c b/src/disco/topo/fd_topob.c
index 12786f5abbb..3597c78eb1e 100644
--- a/src/disco/topo/fd_topob.c
+++ b/src/disco/topo/fd_topob.c
@@ -383,6 +383,7 @@ fd_topob_auto_layout( fd_topo_t * topo,
"snaprd", /* FIREDANCER only */
"snapdc", /* FIREDANCER only */
"snapin", /* FIREDANCER only */
+ "snaplt", /* FIREDANCER only */
"arch_f", /* FIREDANCER only */
"arch_w", /* FIREDANCER only */
};
diff --git a/src/discof/restore/Local.mk b/src/discof/restore/Local.mk
index a4c1d484542..0811e38e988 100644
--- a/src/discof/restore/Local.mk
+++ b/src/discof/restore/Local.mk
@@ -6,6 +6,7 @@ endif
ifdef FD_HAS_INT128
$(call add-objs,fd_snapin_tile,fd_discof)
endif
+$(call add-objs,fd_snaplt_tile,fd_discof)
endif
ifdef FD_HAS_INT128
$(call add-objs,utils/fd_snapshot_parser,fd_discof)
diff --git a/src/discof/restore/fd_snapin_tile.c b/src/discof/restore/fd_snapin_tile.c
index befe8fc2e91..76168c11644 100644
--- a/src/discof/restore/fd_snapin_tile.c
+++ b/src/discof/restore/fd_snapin_tile.c
@@ -7,6 +7,8 @@
#include "../../flamenco/runtime/fd_acc_mgr.h"
#include "../../flamenco/types/fd_types.h"
#include "../../funk/fd_funk.h"
+#include "../../ballet/lthash/fd_lthash.h"
+#include "../../flamenco/runtime/fd_hashes.h"
#define NAME "snapin"
@@ -20,20 +22,32 @@
#define FD_SNAPIN_STATE_MALFORMED (1) /* The snapshot is malformed, we are waiting for a reset notification */
#define FD_SNAPIN_STATE_SHUTDOWN (2) /* The tile is done, been told to shut down, and has likely already exited */
+#define FD_SNAPIN_HSH_IDX (2UL)
+
struct fd_snapin_tile {
- int full;
- int state;
+ int full;
+ int state;
+ int pending_ack;
ulong seed;
long boot_timestamp;
fd_funk_t funk[1];
+ fd_funk_txn_t * root_funk_txn;
fd_funk_txn_t * funk_txn;
uchar * acc_data;
- fd_stem_context_t * stem;
+ fd_stem_context_t * stem;
fd_snapshot_parser_t * ssparse;
+ struct {
+ int enabled;
+ ulong received_lthashes;
+ ulong num_hash_tiles;
+ fd_lthash_value_t expected_lthash;
+ fd_lthash_value_t calculated_lthash;
+ } hash_info;
+
struct {
ulong full_bytes_read;
ulong incremental_bytes_read;
@@ -45,6 +59,7 @@ struct fd_snapin_tile {
ulong chunk0;
ulong wmark;
ulong mtu;
+ ulong chunk_offset;
} in;
struct {
@@ -54,6 +69,21 @@ struct fd_snapin_tile {
ulong chunk;
ulong mtu;
} manifest_out;
+
+ struct {
+ fd_wksp_t * wksp;
+ ulong chunk0;
+ ulong wmark;
+ ulong chunk;
+ ulong mtu;
+ } hash_out;
+
+ struct {
+ fd_wksp_t * wksp;
+ ulong chunk0;
+ ulong wmark;
+ ulong mtu;
+ } hash_in[ FD_MAX_SNAPLT_TILES ];
};
typedef struct fd_snapin_tile fd_snapin_tile_t;
@@ -89,14 +119,34 @@ metrics_write( fd_snapin_tile_t * ctx ) {
FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
}
+static void
+transition_malformed( fd_snapin_tile_t * ctx,
+ fd_stem_context_t * stem ) {
+ ctx->state = FD_SNAPIN_STATE_MALFORMED;
+ fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
+}
+
static void
manifest_cb( void * _ctx ) {
fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
+ ulong sz = sizeof(fd_snapshot_manifest_t);
+
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) {
+ fd_snapshot_manifest_t const * manifest = fd_chunk_to_laddr_const( ctx->manifest_out.wksp, ctx->manifest_out.chunk );
+ if( FD_LIKELY( manifest->has_accounts_lthash ) ) {
+ fd_memcpy( &ctx->hash_info.expected_lthash, manifest->accounts_lthash, sizeof(fd_lthash_value_t) );
+ } else {
+ FD_LOG_WARNING(( "snapshot manifest doesn't have an accounts lthash" ));
+ transition_malformed( ctx, ctx->stem );
+ }
+ }
+
+ FD_TEST( sz<=ctx->manifest_out.mtu );
ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
- fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
- ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
+ fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sz, 0UL, 0UL, 0UL );
+ ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sz, ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
}
static void
@@ -106,10 +156,41 @@ account_cb( void * _ctx,
fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t*)hdr->meta.pubkey );
fd_funk_rec_query_t query[1];
- fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query );
+ fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query );
+ fd_funk_rec_t const * existing_rec = rec;
int should_publish = 0;
fd_funk_rec_prepare_t prepare[1];
+ if( FD_LIKELY( !existing_rec && !ctx->full ) ) {
+ /* An existing record may exist in an ancestor transaction when
+ loading the incremental snapshot. */
+ existing_rec = fd_funk_rec_clone( ctx->funk, ctx->funk_txn, &id, prepare, NULL );
+ }
+ if( FD_LIKELY( existing_rec ) ) {
+ /* If a record exists either in the current txn, its hash needs to
+ be subtracted from the running hash in the hashing tiles. */
+ fd_account_meta_t * meta = fd_funk_val( existing_rec, ctx->funk->wksp );
+ if( FD_UNLIKELY( meta ) ) {
+ if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) {
+ /* Existing record has a higher slot than the current account
+ being inserted. Ignore the current account and keep the
+ existing record as is. */
+ ctx->acc_data = NULL;
+ return;
+ }
+
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) {
+ /* Account is a duplicate, need to subtract its hash before
+ updating the record in funk. */
+ fd_snapshot_existing_account_t * existing_account = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk );
+ fd_snapshot_account_init( &existing_account->hdr, hdr->meta.pubkey, meta->owner, meta->lamports, meta->executable, meta->dlen );
+ fd_memcpy( existing_account->data, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen );
+ fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, FD_SNAPSHOT_HASH_MSG_SUB, ctx->hash_out.chunk, sizeof(fd_snapshot_existing_account_t), 0UL, 0UL, 0UL );
+ ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_snapshot_existing_account_t), ctx->hash_out.chunk0, ctx->hash_out.wmark );
+ }
+ }
+ }
+
if( FD_LIKELY( !rec ) ) {
should_publish = 1;
rec = fd_funk_rec_prepare( ctx->funk, ctx->funk_txn, &id, prepare, NULL );
@@ -117,24 +198,13 @@ account_cb( void * _ctx,
}
fd_account_meta_t * meta = fd_funk_val( rec, ctx->funk->wksp );
- if( FD_UNLIKELY( meta ) ) {
- if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) {
- ctx->acc_data = NULL;
- return;
- }
-
- /* TODO: Reaching here means the existing value is a duplicate
- account. We need to hash the existing account and subtract that
- hash from the running lthash. */
- }
-
if( FD_LIKELY( rec->val_szmeta.data_len ) ) {
meta = fd_funk_val_truncate( (fd_funk_rec_t*)rec, ctx->funk->alloc, ctx->funk->wksp, 0UL, sizeof(fd_account_meta_t)+hdr->meta.data_len, NULL );
FD_TEST( meta );
}
- meta->dlen = (uint)hdr->meta.data_len;
- meta->slot = ctx->ssparse->accv_slot;
+ meta->dlen = (uint)hdr->meta.data_len;
+ meta->slot = ctx->ssparse->accv_slot;
memcpy( meta->owner, hdr->info.owner, sizeof(fd_pubkey_t) );
meta->lamports = hdr->info.lamports;
meta->executable = hdr->info.executable;
@@ -143,6 +213,14 @@ account_cb( void * _ctx,
ctx->metrics.accounts_inserted++;
if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( ctx->funk, prepare );
+
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) {
+ /* send account hdr to snaplt tile */
+ fd_snapshot_account_t * account = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk );
+ fd_snapshot_account_init( account, hdr->meta.pubkey, hdr->info.owner, hdr->info.lamports, hdr->info.executable, hdr->meta.data_len );
+ fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR, ctx->hash_out.chunk, sizeof(fd_snapshot_account_t), 0UL, 0UL, 0UL );
+ ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_snapshot_account_t), ctx->hash_out.chunk0, ctx->hash_out.wmark );
+ }
}
static void
@@ -154,21 +232,23 @@ account_data_cb( void * _ctx,
fd_memcpy( ctx->acc_data, buf, data_sz );
ctx->acc_data += data_sz;
-}
-static void
-transition_malformed( fd_snapin_tile_t * ctx,
- fd_stem_context_t * stem ) {
- ctx->state = FD_SNAPIN_STATE_MALFORMED;
- fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) {
+ FD_TEST( data_sz<=ctx->hash_out.mtu );
+ /* send acc data to snaplt tile */
+ uchar * snaplt_acc_data = fd_chunk_to_laddr( ctx->hash_out.wksp, ctx->hash_out.chunk );
+ fd_memcpy( snaplt_acc_data, buf, data_sz );
+ fd_stem_publish( ctx->stem, FD_SNAPIN_HSH_IDX, FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA, ctx->hash_out.chunk, data_sz, 0UL, 0UL, 0UL );
+ ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, data_sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
+ }
}
-static void
+static int
handle_data_frag( fd_snapin_tile_t * ctx,
ulong chunk,
ulong sz,
fd_stem_context_t * stem ) {
- if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return;
+ if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return 0;
FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
@@ -176,49 +256,61 @@ handle_data_frag( fd_snapin_tile_t * ctx,
if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
FD_LOG_WARNING(( "received data fragment while in done state" ));
transition_malformed( ctx, stem );
- return;
+ return 0;
}
uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
- uchar const * const chunk_end = chunk_start + sz;
- uchar const * cur = chunk_start;
-
- for(;;) {
- if( FD_UNLIKELY( cur>=chunk_end ) ) {
- break;
- }
-
- cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
- if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
- if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
- transition_malformed( ctx, stem );
- return;
- }
+ uchar const * const chunk_end = chunk_start + sz;
+ uchar const * cur = chunk_start + ctx->in.chunk_offset;
+
+ cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
+ if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
+ if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
+ transition_malformed( ctx, stem );
+ return 0;
}
}
+ ctx->in.chunk_offset = (ulong)(cur - chunk_start);
+
if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE;
if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz;
else ctx->metrics.incremental_bytes_read += sz;
+
+ if( FD_UNLIKELY( ctx->in.chunk_offset==sz ) ) {
+ ctx->in.chunk_offset = 0UL;
+ return 0;
+ }
+
+ return 1;
}
static void
handle_control_frag( fd_snapin_tile_t * ctx,
fd_stem_context_t * stem,
- ulong sig ) {
+ ulong sig,
+ ulong in_idx,
+ ulong chunk ) {
+ /* 1. Pass the control message downstream to the next consumer. */
+ if( FD_LIKELY( ctx->hash_info.enabled && sig!=FD_SNAPSHOT_HASH_MSG_RESULT ) )
+ fd_stem_publish( stem, FD_SNAPIN_HSH_IDX, sig, ctx->hash_out.chunk, 0UL, 0UL, 0UL, 0UL );
+
switch( sig ) {
case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
ctx->full = 1;
fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
- fd_funk_txn_cancel_root( ctx->funk );
+ fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
+ fd_lthash_zero( &ctx->hash_info.expected_lthash );
+ fd_lthash_zero( &ctx->hash_info.calculated_lthash );
ctx->state = FD_SNAPIN_STATE_LOADING;
break;
case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
ctx->full = 0;
fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
- if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk );
- else fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
+ fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
+ fd_lthash_zero( &ctx->hash_info.expected_lthash );
+ fd_lthash_zero( &ctx->hash_info.calculated_lthash );
ctx->state = FD_SNAPIN_STATE_LOADING;
break;
case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
@@ -229,10 +321,10 @@ handle_control_frag( fd_snapin_tile_t * ctx,
break;
}
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) ctx->pending_ack = 1;
fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
-
fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
- ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
+ ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->root_funk_txn, &incremental_xid, 0 );
FD_TEST( ctx->funk_txn );
ctx->full = 0;
@@ -245,18 +337,50 @@ handle_control_frag( fd_snapin_tile_t * ctx,
break;
}
- if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
- fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) ctx->pending_ack = 1;
break;
case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
+ FD_TEST( ctx->pending_ack==0 );
ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
+
+ fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
+ fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
+ break;
+ case FD_SNAPSHOT_HASH_MSG_RESULT: {
+ FD_TEST( ctx->hash_info.enabled && ctx->pending_ack );
+ /* TODO: more robust in indexing */
+ fd_lthash_value_t const * calculated_lthash = fd_chunk_to_laddr_const( ctx->hash_in[ in_idx - 1UL ].wksp, chunk );
+ fd_lthash_add( &ctx->hash_info.calculated_lthash, calculated_lthash );
+ ctx->hash_info.received_lthashes++;
+
+ if( FD_LIKELY( ctx->hash_info.received_lthashes!=ctx->hash_info.num_hash_tiles ) ) break;
+
+ ctx->pending_ack = 0;
+ if( FD_UNLIKELY( memcmp( &ctx->hash_info.expected_lthash, &ctx->hash_info.calculated_lthash, sizeof(fd_lthash_value_t) ) ) ) {
+ FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in snapshot manifest",
+ FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.calculated_lthash ),
+ FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.expected_lthash ) ));
+ transition_malformed( ctx, stem );
+ } else {
+ FD_LOG_NOTICE(( "calculated accounts lthash %s matches accounts lthash %s in snapshot manifest",
+ FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.calculated_lthash ),
+ FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_info.expected_lthash ) ));
+ }
+ ctx->hash_info.received_lthashes = 0UL;
break;
+ }
default:
FD_LOG_ERR(( "unexpected control sig %lu", sig ));
return;
}
+ /* snapin waits for result lthashes to come back from the snaplt tiles.
+ Until these hashes come back, snapin cannot ack the control message
+ sent from snaprd because snaprd would advance before snapin
+ is ready to receive another snapshot byte stream or shutdown. */
+ if( FD_UNLIKELY( ctx->pending_ack ) ) return;
+
/* We must acknowledge after handling the control frag, because if it
causes us to generate a malformed transition, that must be sent
back to the snaprd controller before the acknowledgement. */
@@ -284,8 +408,8 @@ returnable_frag( fd_snapin_tile_t * ctx,
FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
- if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
- else handle_control_frag( ctx, stem, sig );
+ if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem );
+ else handle_control_frag( ctx, stem, sig, in_idx, chunk );
return 0;
}
@@ -310,23 +434,31 @@ unprivileged_init( fd_topo_t * topo,
fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
- ctx->full = 1;
- ctx->state = FD_SNAPIN_STATE_LOADING;
+ ctx->full = 1;
+ ctx->state = FD_SNAPIN_STATE_LOADING;
+ ctx->pending_ack = 0;
ctx->boot_timestamp = fd_log_wallclock();
FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
- ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
+ ctx->root_funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
+ ctx->funk_txn = ctx->root_funk_txn;
- ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
+ ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
FD_TEST( ctx->ssparse );
fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
+ ulong num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplt" );
+ ctx->hash_info.num_hash_tiles = num_hash_tiles;
+ ctx->hash_info.received_lthashes = 0UL;
+ ctx->hash_info.enabled = num_hash_tiles>0UL;
+ ulong snapin_out_cnt = ctx->hash_info.enabled ? 3UL : 2UL;
+
if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
- if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
- if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
+ if( FD_UNLIKELY( tile->in_cnt!=1UL + num_hash_tiles ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 2", tile->in_cnt ));
+ if( FD_UNLIKELY( tile->out_cnt!=snapin_out_cnt ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 3", tile->out_cnt ));
fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
@@ -343,9 +475,36 @@ unprivileged_init( fd_topo_t * topo,
ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
ctx->in.mtu = in_link->mtu;
+ ctx->in.chunk_offset = 0UL;
+
+ if( FD_LIKELY( ctx->hash_info.enabled ) ) {
+ for( ulong i=0UL; ilinks[ tile->in_link_id[ i+1UL ] ];
+ FD_TEST( strcmp( hash_in_link->name, "snaplt_out" )==0 );
+ fd_topo_wksp_t const * hash_in_wksp = &topo->workspaces[ topo->objs[ hash_in_link->dcache_obj_id ].wksp_id ];
+ ctx->hash_in[ i ].wksp = hash_in_wksp->wksp;
+ ctx->hash_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->hash_in[ i ].wksp, hash_in_link->dcache );
+ ctx->hash_in[ i ].wmark = fd_dcache_compact_wmark( ctx->hash_in[ i ].wksp, hash_in_link->dcache, hash_in_link->mtu );
+ ctx->hash_in[ i ].mtu = hash_in_link->mtu;
+ }
+
+ fd_topo_link_t const * hash_out_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_HSH_IDX ] ];
+ FD_TEST( strcmp( hash_out_link->name, "snapin_lt" )==0 );
+ ctx->hash_out.wksp = topo->workspaces[ topo->objs[ hash_out_link->dcache_obj_id ].wksp_id ].wksp;
+ ctx->hash_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( hash_out_link->dcache ), hash_out_link->dcache );
+ ctx->hash_out.wmark = fd_dcache_compact_wmark ( ctx->hash_out.wksp, hash_out_link->dcache, hash_out_link->mtu );
+ ctx->hash_out.chunk = ctx->hash_out.chunk0;
+ ctx->hash_out.mtu = hash_out_link->mtu;
+ }
+
+ fd_lthash_zero( &ctx->hash_info.expected_lthash );
+ fd_lthash_zero( &ctx->hash_info.calculated_lthash );
}
-#define STEM_BURST 2UL /* For control fragments, one acknowledgement, and one malformed message */
+/* For control fragments, one acknowledgement, and one malformed
+ message. Or one FD_SNAPSHOT_HASH_MSG_SUB, one
+ FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR, and one malformed message */
+#define STEM_BURST 3UL
#define STEM_LAZY 1000L
#define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
diff --git a/src/discof/restore/fd_snaplt_tile.c b/src/discof/restore/fd_snaplt_tile.c
new file mode 100644
index 00000000000..86aed118831
--- /dev/null
+++ b/src/discof/restore/fd_snaplt_tile.c
@@ -0,0 +1,306 @@
+#include "../../disco/topo/fd_topo.h"
+#include "../../disco/metrics/fd_metrics.h"
+#include "../../ballet/lthash/fd_lthash.h"
+#include "../../flamenco/runtime/fd_hashes.h"
+#include "../../flamenco/runtime/fd_acc_mgr.h"
+
+#include "utils/fd_ssctrl.h"
+
+#define NAME "snaplt"
+
+/* The snaplt tile is a state machine that hashes accounts from an
+ account input stream that it receives from the snapin tile.
+
+ An account input stream starts with a SNAPSHOT_HASH_MSG_RESET
+ message, which indicates the start of an account input stream.
+ An account input stream ends with a SNAPSHOT_HASH_MSG_FINI message,
+ indicating snaplt should send its calculated accounts hash to snapin
+ with a SNAPSHOT_HASH_MSG_RESULT message. */
+
+#define FD_SNAPLT_STATE_HASHING (0)
+#define FD_SNAPLT_STATE_DONE (1)
+#define FD_SNAPLT_STATE_SHUTDOWN (2)
+
+struct fd_snaplt_tile {
+ int state;
+ int full;
+
+ fd_lthash_value_t running_lthash;
+
+ fd_snapshot_account_t account;
+ ulong acc_data_sz;
+
+ int hash_account;
+ ulong num_hash_tiles;
+ ulong hash_tile_idx;
+ fd_blake3_t b3[1];
+
+
+ struct {
+ struct {
+ ulong accounts_hashed;
+ } full;
+
+ struct {
+ ulong accounts_hashed;
+ } incremental;
+ } metrics;
+
+ struct {
+ fd_wksp_t * wksp;
+ ulong chunk0;
+ ulong wmark;
+ ulong mtu;
+ } in;
+
+ struct {
+ fd_wksp_t * wksp;
+ ulong chunk0;
+ ulong wmark;
+ ulong chunk;
+ ulong mtu;
+ } out;
+
+};
+
+typedef struct fd_snaplt_tile fd_snaplt_tile_t;
+
+static int
+should_hash_account( fd_snaplt_tile_t * ctx,
+ uchar const account_pubkey[ static FD_HASH_FOOTPRINT ] ) {
+ return fd_hash( account_pubkey[ 4UL ], account_pubkey, sizeof(fd_pubkey_t) )%ctx->num_hash_tiles==ctx->hash_tile_idx;
+}
+
+static inline int
+should_shutdown( fd_snaplt_tile_t * ctx ) {
+ return ctx->state==FD_SNAPLT_STATE_SHUTDOWN;
+}
+
+static ulong
+scratch_align( void ) {
+ return 128UL;
+}
+
+static ulong
+scratch_footprint( fd_topo_tile_t const * tile ) {
+ (void)tile;
+ ulong l = FD_LAYOUT_INIT;
+ l = FD_LAYOUT_APPEND( l, alignof(fd_snaplt_tile_t), sizeof(fd_snaplt_tile_t) );
+ return FD_LAYOUT_FINI( l, alignof(fd_snaplt_tile_t) );
+}
+
+static void
+metrics_write( fd_snaplt_tile_t * ctx ) {
+ FD_MGAUGE_SET( SNAPLT, FULL_ACCOUNTS_HASHED, ctx->metrics.full.accounts_hashed );
+ FD_MGAUGE_SET( SNAPLT, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
+ FD_MGAUGE_SET( SNAPLT, STATE, (ulong)(ctx->state) );
+}
+
+static void
+handle_data_frag( fd_snaplt_tile_t * ctx,
+ ulong sig,
+ ulong chunk,
+ ulong sz ) {
+ switch( sig ) {
+ case FD_SNAPSHOT_HASH_MSG_SUB: {
+ FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING );
+
+ fd_snapshot_existing_account_t const * prev_acc = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
+
+ if( !should_hash_account( ctx, prev_acc->hdr.pubkey) ) break;
+
+ fd_lthash_value_t prev_lthash[1];
+ fd_hashes_account_lthash_simple( prev_acc->hdr.pubkey,
+ prev_acc->hdr.owner,
+ prev_acc->hdr.lamports,
+ prev_acc->hdr.executable,
+ prev_acc->data,
+ prev_acc->hdr.data_len,
+ prev_lthash );
+ fd_lthash_sub( &ctx->running_lthash, prev_lthash );
+ return;
+ }
+ case FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR: {
+ FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING && ctx->acc_data_sz==0UL );
+ fd_snapshot_account_t const * account = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
+ if( !should_hash_account( ctx, account->pubkey) ) break;
+
+ if( FD_LIKELY( account->lamports!=0UL ) ) {
+ ctx->hash_account = 1;
+ fd_blake3_init( ctx->b3 );
+ fd_blake3_append( ctx->b3, &account->lamports, sizeof( ulong ) );
+ fd_memcpy( &ctx->account, account, sizeof(fd_snapshot_account_t) );
+ }
+ break;
+ }
+ case FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA: {
+ FD_TEST( ctx->state==FD_SNAPLT_STATE_HASHING );
+ if( FD_LIKELY( ctx->hash_account ) ) {
+ fd_blake3_append( ctx->b3, fd_chunk_to_laddr_const( ctx->in.wksp, chunk ), sz );
+ ctx->acc_data_sz += sz;
+ }
+ break;
+ }
+ default:
+ FD_LOG_ERR(( "unexpected sig %lu in handle_data_frag", sig ));
+ return;
+ }
+
+ /* Additive account hashing */
+ if( FD_LIKELY( ctx->acc_data_sz==ctx->account.data_len && ctx->hash_account ) ) {
+ fd_lthash_value_t account_lthash[1];
+ fd_lthash_zero( account_lthash );
+
+ uchar executable_flag = ctx->account.executable & 0x1;
+ fd_blake3_append( ctx->b3, &executable_flag, sizeof( uchar ) );
+ fd_blake3_append( ctx->b3, ctx->account.owner, FD_HASH_FOOTPRINT );
+ fd_blake3_append( ctx->b3, ctx->account.pubkey, FD_HASH_FOOTPRINT );
+ fd_blake3_fini_2048( ctx->b3, account_lthash->bytes );
+
+ fd_lthash_add( &ctx->running_lthash, account_lthash );
+ ctx->acc_data_sz = 0UL;
+ ctx->hash_account = 0;
+
+ if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
+ else ctx->metrics.incremental.accounts_hashed++;
+ }
+}
+
+static void
+handle_control_frag( fd_snaplt_tile_t * ctx,
+ fd_stem_context_t * stem,
+ ulong sig ) {
+ switch( sig ) {
+ case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
+ ctx->full = 1;
+ ctx->state = FD_SNAPLT_STATE_HASHING;
+ fd_lthash_zero( &ctx->running_lthash );
+
+ ctx->metrics.full.accounts_hashed = 0UL;
+ ctx->metrics.incremental.accounts_hashed = 0UL;
+ break;
+ case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
+ ctx->full = 0;
+ ctx->state = FD_SNAPLT_STATE_HASHING;
+ fd_lthash_zero( &ctx->running_lthash );
+ ctx->metrics.incremental.accounts_hashed = 0UL;
+ break;
+ case FD_SNAPSHOT_MSG_CTRL_EOF_FULL: {
+ uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
+ fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
+ fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
+ ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk0, ctx->out.wmark );
+ ctx->full = 0;
+ fd_lthash_zero( &ctx->running_lthash );
+ break;
+ }
+ case FD_SNAPSHOT_MSG_CTRL_DONE: {
+ uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
+ fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
+ fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
+ ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_SNAPSHOT_HASH_MSG_RESULT, ctx->out.chunk0, ctx->out.wmark );
+ ctx->state = FD_SNAPLT_STATE_DONE;
+ break;
+ }
+ case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
+ FD_LOG_INFO(( "num hashed accounts in full snapshot is %lu and in incremental snapshot is %lu", ctx->metrics.full.accounts_hashed, ctx->metrics.incremental.accounts_hashed ));
+ FD_TEST( ctx->state==FD_SNAPLT_STATE_DONE );
+ ctx->state = FD_SNAPLT_STATE_SHUTDOWN;
+ break;
+ default:
+ FD_LOG_ERR(( "unexpected sig %lu in handle_control_frag", sig ));
+ return;
+ }
+ /* We must acknowledge after handling the control frag, because if it
+ causes us to generate a malformed transition, that must be sent
+ back to the snaprd controller before the acknowledgement. */
+ fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
+}
+
+static inline int
+returnable_frag( fd_snaplt_tile_t * ctx,
+ ulong in_idx,
+ ulong seq,
+ ulong sig,
+ ulong chunk,
+ ulong sz,
+ ulong ctl,
+ ulong tsorig,
+ ulong tspub,
+ fd_stem_context_t * stem ) {
+ (void)in_idx;
+ (void)seq;
+ (void)ctl;
+ (void)tsorig;
+ (void)tspub;
+
+ FD_TEST( ctx->state!=FD_SNAPLT_STATE_SHUTDOWN );
+
+ if( sig==FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR ||
+ sig==FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA ||
+ sig==FD_SNAPSHOT_HASH_MSG_SUB ) handle_data_frag( ctx, sig, chunk, sz );
+ else handle_control_frag( ctx, stem, sig );
+
+ return 0;
+}
+
+static void
+unprivileged_init( fd_topo_t * topo,
+ fd_topo_tile_t * tile ) {
+ void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
+
+ FD_SCRATCH_ALLOC_INIT( l, scratch );
+ fd_snaplt_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplt_tile_t), sizeof(fd_snaplt_tile_t) );
+
+ if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
+ if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
+
+ fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
+ fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
+ ctx->in.wksp = in_wksp->wksp;;
+ ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
+ ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
+ ctx->in.mtu = in_link->mtu;
+
+ fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
+ ctx->out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
+ ctx->out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
+ ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, writer_link->dcache, writer_link->mtu );
+ ctx->out.chunk = ctx->out.chunk0;
+ ctx->out.mtu = writer_link->mtu;
+
+ FD_TEST( strncmp( topo->links[ tile->out_link_id[ 1UL ] ].name, "snaplt_rd", 9UL )==0 );
+ ctx->metrics.full.accounts_hashed = 0UL;
+ ctx->metrics.incremental.accounts_hashed = 0UL;
+
+ ctx->state = FD_SNAPLT_STATE_HASHING;
+ ctx->full = 1;
+ ctx->acc_data_sz = 0UL;
+ ctx->hash_account = 0;
+ ctx->num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplt" );
+ ctx->hash_tile_idx = tile->kind_id;
+
+ fd_lthash_zero( &ctx->running_lthash );
+}
+
+#define STEM_BURST 1UL
+#define STEM_LAZY 1000L
+
+#define STEM_CALLBACK_CONTEXT_TYPE fd_snaplt_tile_t
+#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplt_tile_t)
+
+#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
+#define STEM_CALLBACK_METRICS_WRITE metrics_write
+#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
+
+#include "../../disco/stem/fd_stem.c"
+
+fd_topo_run_tile_t fd_tile_snaplt = {
+ .name = NAME,
+ .scratch_align = scratch_align,
+ .scratch_footprint = scratch_footprint,
+ .unprivileged_init = unprivileged_init,
+ .run = stem_run,
+};
+
+#undef NAME
diff --git a/src/discof/restore/fd_snaprd_tile.c b/src/discof/restore/fd_snaprd_tile.c
index 5bdb42a4ae2..da13ef7d3d1 100644
--- a/src/discof/restore/fd_snaprd_tile.c
+++ b/src/discof/restore/fd_snaprd_tile.c
@@ -41,9 +41,9 @@
#define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */
-#define IN_KIND_SNAPCTL (0)
-#define IN_KIND_GOSSIP (1)
-#define MAX_IN_LINKS (3)
+#define IN_KIND_SNAPCTL (0)
+#define IN_KIND_GOSSIP (1)
+#define MAX_IN_LINK_KINDS (3)
struct fd_snaprd_tile {
fd_ssping_t * ssping;
@@ -72,7 +72,7 @@ struct fd_snaprd_tile {
int incremental_snapshot_fd;
} local_out;
- uchar in_kind[ MAX_IN_LINKS ];
+ uchar in_kind[ MAX_IN_LINK_KINDS ];
struct {
ulong full_snapshot_slot;
@@ -115,6 +115,7 @@ struct fd_snaprd_tile {
/* TODO: Don't do this ... should be in the monitor instead */
struct {
+ ulong snaplt_tile_cnt;
ulong prev_bytes_read;
ulong prev_accounts_inserted; volatile ulong * cur_accounts_inserted;
@@ -124,6 +125,8 @@ struct fd_snaprd_tile {
ulong prev_snapdc_wait; volatile ulong * cur_snapdc_caughtup_postfrag;
ulong prev_snapin_backp_prefrag; volatile ulong * cur_snapin_backp_prefrag;
ulong prev_snapin_wait; volatile ulong * cur_snapin_caughtup_postfrag;
+ ulong prev_snaplt_backp_prefrag; volatile ulong * cur_snaplt_backp_prefrag[ FD_MAX_SNAPLT_TILES ];
+ ulong prev_snaplt_wait; volatile ulong * cur_snaplt_caughtup_postfrag[ FD_MAX_SNAPLT_TILES ];
} diagnostics;
struct {
@@ -388,6 +391,13 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
ulong snapin_backp = *ctx->diagnostics.cur_snapin_backp_prefrag;
ulong snapin_wait = *ctx->diagnostics.cur_snapin_caughtup_postfrag + snapin_backp;
+ ulong snaplt_backp = 0UL;
+ ulong snaplt_wait = 0UL;
+ for( ulong i=0UL; idiagnostics.snaplt_tile_cnt; i++ ) {
+ snaplt_backp += *ctx->diagnostics.cur_snaplt_backp_prefrag[ i ];
+ snaplt_wait += *ctx->diagnostics.cur_snaplt_caughtup_postfrag[ i ] + snaplt_backp;
+ }
+
ulong accounts_inserted = *ctx->diagnostics.cur_accounts_inserted;
double ns_per_tick = 1.0/fd_tempo_tick_per_ns( NULL );
@@ -402,15 +412,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
case FD_SNAPRD_STATE_READING_FULL_FILE: {
double progress = 0.0;
if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
- FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
+ double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0;
+ double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0;
+ FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
progress,
bandwidth,
((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
+ snaplt_backp_val,
100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
+ snaplt_busy_val,
(double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
break;
}
@@ -424,15 +438,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE: {
double progress = 0.0;
if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
- FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
+ double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0;
+ double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0;
+ FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
progress,
bandwidth,
((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
+ snaplt_backp_val,
100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
+ snaplt_busy_val,
(double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
break;
}
@@ -443,15 +461,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
case FD_SNAPRD_STATE_READING_FULL_HTTP: {
double progress = 0.0;
if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
- FD_LOG_NOTICE(( "restoring full from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
+ double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0;
+ double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0;
+ FD_LOG_NOTICE(( "restoring full from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
progress,
bandwidth,
((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
+ snaplt_backp_val,
100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
+ snaplt_busy_val,
(double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
break;
}
@@ -466,15 +488,19 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
double progress = 0.0;
if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
- FD_LOG_NOTICE(( "restoring incremental from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
+ double snaplt_backp_val = ctx->diagnostics.snaplt_tile_cnt ? ((double)( snaplt_backp-ctx->diagnostics.prev_snaplt_backp_prefrag )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt : 0.0;
+ double snaplt_busy_val = ctx->diagnostics.snaplt_tile_cnt ? 100-(((double)( snaplt_wait-ctx->diagnostics.prev_snaplt_wait )*ns_per_tick )/1e7/(double)ctx->diagnostics.snaplt_tile_cnt ) : 0.0;
+ FD_LOG_NOTICE(( "restoring incremental from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
progress,
bandwidth,
((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
+ snaplt_backp_val,
100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
+ snaplt_busy_val,
(double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
break;
}
@@ -497,6 +523,8 @@ print_diagnostics( fd_snaprd_tile_t * ctx ) {
ctx->diagnostics.prev_snapdc_wait = snapdc_wait;
ctx->diagnostics.prev_snapin_backp_prefrag = snapin_backp;
ctx->diagnostics.prev_snapin_wait = snapin_wait;
+ ctx->diagnostics.prev_snaplt_backp_prefrag = snaplt_backp;
+ ctx->diagnostics.prev_snaplt_wait = snaplt_wait;
ctx->diagnostics.prev_accounts_inserted = accounts_inserted;
}
@@ -519,9 +547,9 @@ after_credit( fd_snaprd_tile_t * ctx,
/* All control fragments sent by the snaprd tile must be fully
acknowledged by all downstream consumers before processing can
proceed, to prevent tile state machines from getting out of sync
- (see fd_ssctrl.h for more details). Currently there are two
- downstream consumers, snapdc and snapin. */
-#define NUM_SNAP_CONSUMERS (2UL)
+ (see fd_ssctrl.h for more details). Currently there are three
+ downstream consumers, snapdc, snapin, and snaplt. */
+#define NUM_SNAP_CONSUMERS (2UL + ctx->diagnostics.snaplt_tile_cnt)
if( FD_UNLIKELY( now>ctx->diagnostic_deadline_nanos ) ) {
ctx->diagnostic_deadline_nanos = now+(long)1e9;
@@ -926,19 +954,35 @@ unprivileged_init( fd_topo_t * topo,
ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
+
+ ulong volatile * snaplt_metrics[ FD_MAX_SNAPLT_TILES ];
+ ctx->diagnostics.snaplt_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplt" );
+
+ for( ulong i=0UL; idiagnostics.snaplt_tile_cnt; i++ ) {
+ ulong snaplt_tile_idx = fd_topo_find_tile( topo, "snaplt", i );
+ FD_TEST( snaplt_tile_idx!=ULONG_MAX );
+ fd_topo_tile_t * snaplt_tile = &topo->tiles[ snaplt_tile_idx ];
+ snaplt_metrics[ i ] = fd_metrics_tile( snaplt_tile->metrics );
+ }
+
ctx->diagnostics.cur_snaprd_backp_prefrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
ctx->diagnostics.cur_snaprd_caughtup_postfrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
ctx->diagnostics.cur_snapdc_backp_prefrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
ctx->diagnostics.cur_snapdc_caughtup_postfrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
ctx->diagnostics.cur_snapin_backp_prefrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
ctx->diagnostics.cur_snapin_caughtup_postfrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
+
+ for( ulong i=0UL; idiagnostics.snaplt_tile_cnt; i++ ) {
+ ctx->diagnostics.cur_snaplt_backp_prefrag[ i ] = snaplt_metrics[ i ]+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
+ ctx->diagnostics.cur_snaplt_caughtup_postfrag[ i ] = snaplt_metrics[ i ]+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
+ }
ctx->diagnostics.cur_accounts_inserted = snapin_metrics+MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED );
ctx->gossip.ci_table = _ci_table;
/* zero-out memory so that we can perform null checks in after_frag */
fd_memset( ctx->gossip.ci_table, 0, sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
- FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
+ FD_TEST( tile->in_cnt<=MAX_IN_LINK_KINDS + ctx->diagnostics.snaplt_tile_cnt );
for( ulong i=0UL; i<(tile->in_cnt); i++ ){
fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
if( 0==strcmp( in_link->name, "gossip_out" ) ) {
@@ -949,7 +993,8 @@ unprivileged_init( fd_topo_t * topo,
// ctx->gossip_in.wmark = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu );
// ctx->gossip_in.mtu = in_link->mtu;
} else if( 0==strcmp( in_link->name, "snapdc_rd" ) ||
- 0==strcmp( in_link->name, "snapin_rd" ) ) {
+ 0==strcmp( in_link->name, "snapin_rd" ) ||
+ 0==strcmp( in_link->name, "snaplt_rd" ) ) {
ctx->in_kind[ i ] = IN_KIND_SNAPCTL;
}
}
diff --git a/src/discof/restore/utils/fd_ssctrl.h b/src/discof/restore/utils/fd_ssctrl.h
index f21fe288b49..fe3d44398fa 100644
--- a/src/discof/restore/utils/fd_ssctrl.h
+++ b/src/discof/restore/utils/fd_ssctrl.h
@@ -1,6 +1,9 @@
#ifndef HEADER_fd_src_discof_restore_utils_fd_ssctrl_h
#define HEADER_fd_src_discof_restore_utils_fd_ssctrl_h
+#include "../../../flamenco/types/fd_types_custom.h"
+#include "../../../flamenco/runtime/fd_runtime_const.h"
+
/* The snapshot tiles have a somewhat involved state machine, which is
controlled by snaprd. Imagine first the following sequence:
@@ -49,4 +52,56 @@
#define FD_SNAPSHOT_MSG_CTRL_ACK (6UL) /* Sent from tiles back to snaprd, meaning they ACK whatever control message was pending */
#define FD_SNAPSHOT_MSG_CTRL_MALFORMED (7UL) /* Sent from tiles back to snaprd, meaning they consider the current snapshot malformed */
+/* snapin -> snaplt */
+#define FD_SNAPSHOT_HASH_MSG_SUB (8UL) /* Indicates snapin has encountered a duplicate account whose hash must be subtracted */
+#define FD_SNAPSHOT_HASH_MSG_ACCOUNT_HDR (9UL) /* Indicates snapin has encountered a new account metadata */
+#define FD_SNAPSHOT_HASH_MSG_ACCOUNT_DATA (10UL) /* Account data that is sent as snapin processes a new account */
+
+/* snaplt -> snapin */
+#define FD_SNAPSHOT_HASH_MSG_RESULT (11UL) /* Hash result sent from snaplt to snapin */
+
+#define FD_MAX_SNAPLT_TILES (16UL)
+
+/* fd_snapshot_account is the contents of the
+ SNAPSHOT_HASH_MSG_ACCOUNT_HDR message. It contains account metadata
+ that is contained in the accounts hash. */
+struct fd_snapshot_account {
+ uchar pubkey[ FD_HASH_FOOTPRINT ];
+ uchar owner[ FD_HASH_FOOTPRINT ];
+ ulong lamports;
+ uchar executable;
+ ulong data_len;
+};
+typedef struct fd_snapshot_account fd_snapshot_account_t;
+
+/* fd_snapshot_account_init initializes a fd_snapshot_account_t struct
+ with the appropriate account metadata fields. */
+static inline void
+fd_snapshot_account_init( fd_snapshot_account_t * account,
+ uchar const pubkey[ FD_HASH_FOOTPRINT ],
+ uchar const owner[ FD_PUBKEY_FOOTPRINT ],
+ ulong lamports,
+ uchar executable,
+ ulong data_len ) {
+ fd_memcpy( account->pubkey, pubkey, FD_HASH_FOOTPRINT );
+ fd_memcpy( account->owner, owner, FD_PUBKEY_FOOTPRINT );
+ account->lamports = lamports;
+ account->executable = executable;
+ account->data_len = data_len;
+}
+
+/* fd_snapshot_existing_account is the contents of the
+ SNAPSHOT_HASH_MSG_SUB message. It contains a fd_snapshot_account_t
+ header and the corresponding account data in a single message.
+
+ For simplicity and conformance to burst limitations in snapin, the
+ entire duplicate account is sent in one message (one frag). Consider
+ caching the lthash of the duplicate account so we do not have to
+ send the entire account over. */
+struct fd_snapshot_existing_account {
+ fd_snapshot_account_t hdr;
+ uchar data[ FD_RUNTIME_ACC_SZ_MAX ];
+};
+typedef struct fd_snapshot_existing_account fd_snapshot_existing_account_t;
+
#endif /* HEADER_fd_src_discof_restore_utils_fd_ssctrl_h */
diff --git a/src/flamenco/runtime/fd_hashes.c b/src/flamenco/runtime/fd_hashes.c
index 45c48f47a80..e3c3f6debd5 100644
--- a/src/flamenco/runtime/fd_hashes.c
+++ b/src/flamenco/runtime/fd_hashes.c
@@ -13,21 +13,38 @@ fd_hashes_account_lthash( fd_pubkey_t const * pubkey,
fd_account_meta_t const * account,
uchar const * data,
fd_lthash_value_t * lthash_out ) {
+ fd_hashes_account_lthash_simple( pubkey->uc,
+ account->owner,
+ account->lamports,
+ account->executable,
+ data,
+ account->dlen,
+ lthash_out );
+}
+
+void
+fd_hashes_account_lthash_simple( uchar const pubkey[ static FD_HASH_FOOTPRINT ],
+ uchar const owner[ static FD_PUBKEY_FOOTPRINT ],
+ ulong lamports,
+ uchar executable,
+ uchar const * data,
+ ulong data_len,
+ fd_lthash_value_t * lthash_out ) {
fd_lthash_zero( lthash_out );
/* Accounts with zero lamports are not included in the hash, so they should always be treated as zero */
- if( FD_UNLIKELY( account->lamports == 0 ) ) {
+ if( FD_UNLIKELY( lamports == 0 ) ) {
return;
}
- uchar executable = account->executable & 0x1;
+ uchar executable_flag = executable & 0x1;
fd_blake3_t b3[1];
fd_blake3_init( b3 );
- fd_blake3_append( b3, &account->lamports, sizeof( ulong ) );
- fd_blake3_append( b3, data, account->dlen );
- fd_blake3_append( b3, &executable, sizeof( uchar ) );
- fd_blake3_append( b3, account->owner, FD_PUBKEY_FOOTPRINT );
+ fd_blake3_append( b3, &lamports, sizeof( ulong ) );
+ fd_blake3_append( b3, data, data_len );
+ fd_blake3_append( b3, &executable_flag, sizeof( uchar ) );
+ fd_blake3_append( b3, owner, FD_PUBKEY_FOOTPRINT );
fd_blake3_append( b3, pubkey, FD_PUBKEY_FOOTPRINT );
fd_blake3_fini_2048( b3, lthash_out->bytes );
}
diff --git a/src/flamenco/runtime/fd_hashes.h b/src/flamenco/runtime/fd_hashes.h
index fb8b0ed1a8b..928f134f635 100644
--- a/src/flamenco/runtime/fd_hashes.h
+++ b/src/flamenco/runtime/fd_hashes.h
@@ -56,6 +56,29 @@ fd_hashes_account_lthash( fd_pubkey_t const * pubkey,
uchar const * data,
fd_lthash_value_t * lthash_out );
+/* fd_hashes_account_lthash_simple is functionally the same as
+ fd_hashes_account_lthash, but with simpler arguments that detail
+ the exact parameters that go into the lthash.
+
+ pubkey points to the account's public key (32 bytes). owner points
+ to the account's owner (32 bytes). lamports is the account's
+ lamports. executable is the account's executable flag. data points
+ to the account data. data_len is the length of the account data.
+ lthash_out points to where the computed lthash value will be written
+ (2048 bytes).
+
+ On return, lthash_out contains the computed lthash. This function
+ assumes all pointers are valid and properly aligned. The account
+ data pointer must be readable for account->dlen bytes. */
+void
+fd_hashes_account_lthash_simple( uchar const pubkey[ static FD_HASH_FOOTPRINT ],
+ uchar const owner[ static FD_PUBKEY_FOOTPRINT ],
+ ulong lamports,
+ uchar executable,
+ uchar const * data,
+ ulong data_len,
+ fd_lthash_value_t * lthash_out );
+
/* fd_hashes_update_lthash updates the bank's incremental lthash when an
account is modified during transaction execution. The bank lthash is
maintained incrementally by subtracting the old account hash and
diff --git a/src/flamenco/runtime/tests/run_backtest_ci.sh b/src/flamenco/runtime/tests/run_backtest_ci.sh
index db316955b4d..16faa8a4af3 100755
--- a/src/flamenco/runtime/tests/run_backtest_ci.sh
+++ b/src/flamenco/runtime/tests/run_backtest_ci.sh
@@ -3,7 +3,7 @@ set -e
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-308392063-v2.3.0 -y 5 -m 2000000 -e 308392090 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l devnet-350814254-v2.3.0 -y 3 -m 2000000 -e 350814284 -c 2.3.0
-src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0
+src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0 -lt
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-324823213-v2.3.0 -y 4 -m 2000000 -e 324823214 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-325467935-v2.3.0 -y 4 -m 2000000 -e 325467936 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-283927487-v2.3.0 -y 3 -m 2000000 -e 283927497 -c 2.3.0
diff --git a/src/flamenco/runtime/tests/run_backtest_tests_all.sh b/src/flamenco/runtime/tests/run_backtest_tests_all.sh
index b2930f87a5d..4bf9ee4ce8c 100755
--- a/src/flamenco/runtime/tests/run_backtest_tests_all.sh
+++ b/src/flamenco/runtime/tests/run_backtest_tests_all.sh
@@ -63,7 +63,7 @@ src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-307395181-v2.3.0 -y
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-308392063-v2.3.0 -y 5 -m 2000000 -e 308392090 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l devnet-350814254-v2.3.0 -y 3 -m 2000000 -e 350814284 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-311586340-v2.3.0 -y 3 -m 2000000 -e 311586380 -c 2.3.0
-src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0
+src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-281546597-v2.3.0 -y 3 -m 2000000 -e 281546597 -c 2.3.0 -lt
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-324823213-v2.3.0 -y 4 -m 2000000 -e 324823214 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l mainnet-325467935-v2.3.0 -y 4 -m 2000000 -e 325467936 -c 2.3.0
src/flamenco/runtime/tests/run_ledger_backtest.sh -l testnet-283927487-v2.3.0 -y 3 -m 2000000 -e 283927497 -c 2.3.0
diff --git a/src/flamenco/runtime/tests/run_ledger_backtest.sh b/src/flamenco/runtime/tests/run_ledger_backtest.sh
index 627bbd403ef..f4a053e67b2 100755
--- a/src/flamenco/runtime/tests/run_ledger_backtest.sh
+++ b/src/flamenco/runtime/tests/run_ledger_backtest.sh
@@ -22,6 +22,7 @@ HUGE_TLBFS_MOUNT_PATH=${HUGE_TLBFS_MOUNT_PATH:="/mnt/.fd"}
HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE=${HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE:="true"}
HAS_INCREMENTAL="false"
REDOWNLOAD=1
+DISABLE_LTHASH_VERIFICATION="true"
while [[ $# -gt 0 ]]; do
case $1 in
@@ -100,6 +101,10 @@ while [[ $# -gt 0 ]]; do
REDOWNLOAD=0
shift
;;
+ -lt|--enable-lthash-verification)
+ DISABLE_LTHASH_VERIFICATION="false"
+ shift
+ ;;
-*|--*)
echo "unknown option $1"
exit 1
@@ -166,6 +171,7 @@ echo "
maximum_download_retry_abort = 0
[layout]
shred_tile_count = 4
+ snaplt_tile_count = 4
[tiles]
[tiles.archiver]
enabled = true
@@ -200,7 +206,10 @@ echo "
snapshots = \"$DUMP/$LEDGER\"
[hugetlbfs]
mount_path = \"$HUGE_TLBFS_MOUNT_PATH\"
- allow_hugepage_increase = $HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE" > $DUMP_DIR/${LEDGER}_backtest.toml
+ allow_hugepage_increase = $HUGE_TLBFS_ALLOW_HUGEPAGE_INCREASE
+[development]
+ [development.snapshots]
+ disable_lthash_verification = $DISABLE_LTHASH_VERIFICATION" > $DUMP_DIR/${LEDGER}_backtest.toml
if [[ -z "$GENESIS" ]]; then
echo "[gossip]