Skip to content

Commit bf5e471

Browse files
committed
fix: dummy el race condition
1 parent 377eb98 commit bf5e471

File tree

2 files changed

+112
-22
lines changed

2 files changed

+112
-22
lines changed

beacon_node/client/src/builder.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,24 @@ where
205205
jwt_secret_path: None,
206206
};
207207

208+
// Create a channel to wait for the dummy EL to be ready
209+
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
210+
208211
// Spawn the dummy EL in a background task
209212
tokio::spawn(async move {
210-
if let Err(e) = dummy_el::start_dummy_el(dummy_el_config).await {
213+
if let Err(e) = dummy_el::prepare_and_start_dummy_el(dummy_el_config, ready_tx).await
214+
{
211215
eprintln!("Error starting dummy execution layer: {:?}", e);
212216
}
213217
});
218+
219+
// Wait for the dummy EL to be ready before continuing
220+
if let Err(_) = ready_rx.await {
221+
return Err(
222+
"Dummy execution layer failed to start or signal readiness".to_string(),
223+
);
224+
}
225+
info!("Dummy execution layer is ready");
214226
}
215227

216228
let kzg_err_msg = |e| format!("Failed to load trusted setup: {:?}", e);

dummy_el/src/lib.rs

Lines changed: 99 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use serde_json::{json, Value as JsonValue};
2020
use std::net::SocketAddr;
2121
use std::path::PathBuf;
2222
use std::sync::Arc;
23+
use tokio::sync::oneshot;
2324
use tracing::{debug, error, warn};
2425

2526
const JSONRPC_VERSION: &str = "2.0";
@@ -36,6 +37,20 @@ pub struct DummyElConfig {
3637
pub jwt_secret_path: Option<PathBuf>,
3738
}
3839

40+
/// Represents a prepared dummy execution layer ready to run
41+
pub struct PreparedDummyEl {
42+
engine_listener: tokio::net::TcpListener,
43+
engine_app: Router,
44+
rpc_listener: tokio::net::TcpListener,
45+
rpc_app: Router,
46+
ws_listener: tokio::net::TcpListener,
47+
ws_app: Router,
48+
metrics_listener: tokio::net::TcpListener,
49+
metrics_app: Router,
50+
p2p_tcp_task: tokio::task::JoinHandle<()>,
51+
p2p_udp_task: tokio::task::JoinHandle<()>,
52+
}
53+
3954
#[derive(Debug, Clone)]
4055
struct AppState {
4156
jwt_secret: Option<Vec<u8>>,
@@ -160,21 +175,37 @@ async fn handle_rpc(
160175
| "engine_newPayloadV2"
161176
| "engine_newPayloadV3"
162177
| "engine_newPayloadV4" => {
163-
debug!("{}: returning SYNCING status", request.method);
178+
debug!("{}: returning VALID status", request.method);
179+
// Extract blockHash from the ExecutionPayload (params[0])
180+
let block_hash = request
181+
.params
182+
.get(0)
183+
.and_then(|payload| payload.get("blockHash"))
184+
.and_then(|hash| hash.as_str())
185+
.unwrap_or("0x0000000000000000000000000000000000000000000000000000000000000000");
186+
164187
Ok(json!({
165-
"status": "SYNCING",
166-
"latestValidHash": null,
188+
"status": "VALID",
189+
"latestValidHash": block_hash,
167190
"validationError": null
168191
}))
169192
}
170193
"engine_forkchoiceUpdatedV1"
171194
| "engine_forkchoiceUpdatedV2"
172195
| "engine_forkchoiceUpdatedV3" => {
173-
debug!("{}: returning SYNCING status", request.method);
196+
debug!("{}: returning VALID status", request.method);
197+
// Extract headBlockHash from the ForkchoiceState (params[0])
198+
let head_block_hash = request
199+
.params
200+
.get(0)
201+
.and_then(|state| state.get("headBlockHash"))
202+
.and_then(|hash| hash.as_str())
203+
.unwrap_or("0x0000000000000000000000000000000000000000000000000000000000000000");
204+
174205
Ok(json!({
175206
"payloadStatus": {
176-
"status": "SYNCING",
177-
"latestValidHash": null,
207+
"status": "VALID",
208+
"latestValidHash": head_block_hash,
178209
"validationError": null
179210
},
180211
"payloadId": null
@@ -322,11 +353,29 @@ fn read_jwt_secret(path: &PathBuf) -> anyhow::Result<Vec<u8>> {
322353
Ok(bytes)
323354
}
324355

325-
/// Start the dummy execution layer server
356+
/// Prepare the dummy execution layer for startup
326357
///
327-
/// This spawns the dummy EL HTTP servers on the configured ports.
328-
/// Returns a task handle that should be spawned or awaited.
329-
pub async fn start_dummy_el(config: DummyElConfig) -> anyhow::Result<()> {
358+
/// This function binds all necessary ports and prepares the servers,
359+
/// then signals readiness via the oneshot channel before running the servers.
360+
/// The function does not return until the servers are shut down.
361+
pub async fn prepare_and_start_dummy_el(
362+
config: DummyElConfig,
363+
ready_tx: oneshot::Sender<()>,
364+
) -> anyhow::Result<()> {
365+
let prepared = prepare_dummy_el(config).await?;
366+
367+
// Signal that we're ready
368+
let _ = ready_tx.send(());
369+
370+
// Now run the servers
371+
prepared.run().await
372+
}
373+
374+
/// Prepare the dummy execution layer server without starting it
375+
///
376+
/// This binds all ports and prepares the servers but does not start accepting connections.
377+
/// Returns a `PreparedDummyEl` that can be run with the `run()` method.
378+
pub async fn prepare_dummy_el(config: DummyElConfig) -> anyhow::Result<PreparedDummyEl> {
330379
// Read JWT secret if provided
331380
let jwt_secret = match &config.jwt_secret_path {
332381
Some(path) => match read_jwt_secret(path) {
@@ -426,22 +475,51 @@ pub async fn start_dummy_el(config: DummyElConfig) -> anyhow::Result<()> {
426475
}
427476
});
428477

429-
debug!("Ready to accept requests on all ports");
430-
431-
// Spawn all servers concurrently
478+
// Bind all servers without starting them
432479
let engine_listener = tokio::net::TcpListener::bind(engine_addr).await?;
433480
let rpc_listener = tokio::net::TcpListener::bind(rpc_addr).await?;
434481
let ws_listener = tokio::net::TcpListener::bind(ws_addr).await?;
435482
let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?;
436483

437-
tokio::select! {
438-
result = axum::serve(engine_listener, engine_app) => result?,
439-
result = axum::serve(rpc_listener, rpc_app) => result?,
440-
result = axum::serve(ws_listener, ws_app) => result?,
441-
result = axum::serve(metrics_listener, metrics_app) => result?,
442-
_ = p2p_tcp_task => {},
443-
_ = p2p_udp_task => {},
484+
debug!("All listeners bound and ready");
485+
486+
Ok(PreparedDummyEl {
487+
engine_listener,
488+
engine_app,
489+
rpc_listener,
490+
rpc_app,
491+
ws_listener,
492+
ws_app,
493+
metrics_listener,
494+
metrics_app,
495+
p2p_tcp_task,
496+
p2p_udp_task,
497+
})
498+
}
499+
500+
impl PreparedDummyEl {
501+
/// Run the prepared dummy execution layer servers
502+
pub async fn run(self) -> anyhow::Result<()> {
503+
debug!("Running dummy execution layer servers");
504+
505+
tokio::select! {
506+
result = axum::serve(self.engine_listener, self.engine_app) => result?,
507+
result = axum::serve(self.rpc_listener, self.rpc_app) => result?,
508+
result = axum::serve(self.ws_listener, self.ws_app) => result?,
509+
result = axum::serve(self.metrics_listener, self.metrics_app) => result?,
510+
_ = self.p2p_tcp_task => {},
511+
_ = self.p2p_udp_task => {},
512+
}
513+
514+
Ok(())
444515
}
516+
}
445517

446-
Ok(())
518+
/// Start the dummy execution layer server (legacy function)
519+
///
520+
/// This is a convenience function that prepares and starts the dummy EL.
521+
/// For more control, use `prepare_dummy_el()` and `prepare_and_start_dummy_el()`.
522+
pub async fn start_dummy_el(config: DummyElConfig) -> anyhow::Result<()> {
523+
let prepared = prepare_dummy_el(config).await?;
524+
prepared.run().await
447525
}

0 commit comments

Comments
 (0)