Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/ethereum/payload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use reth_transaction_pool::{
ValidPoolTransaction,
};
use revm::context_interface::Block as _;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tracing::{debug, trace, warn};

mod config;
Expand Down Expand Up @@ -114,7 +117,13 @@ where
&self,
config: PayloadConfig<Self::Attributes>,
) -> Result<EthBuiltPayload, PayloadBuilderError> {
let args = BuildArguments::new(Default::default(), config, Default::default(), None);
let args = BuildArguments::new(
Default::default(),
config,
Default::default(),
None,
Arc::new(AtomicBool::new(false)),
);

default_ethereum_payload(
self.evm_config.clone(),
Expand Down Expand Up @@ -149,7 +158,7 @@ where
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>>,
F: FnOnce(BestTransactionsAttributes) -> BestTransactionsIter<Pool>,
{
let BuildArguments { mut cached_reads, config, cancel, best_payload } = args;
let BuildArguments { mut cached_reads, config, cancel, best_payload, is_resolving } = args;
let PayloadConfig { parent_header, attributes } = config;

let state_provider = client.state_by_block_hash(parent_header.hash())?;
Expand Down Expand Up @@ -228,6 +237,13 @@ where
return Ok(BuildOutcome::Cancelled)
}

// check if the payload is being resolved, if so we should stop adding more transactions
// and return whatever payload we have built so far
if is_resolving.load(Ordering::Relaxed) {
debug!(target: "payload_builder", "payload is being resolved, stopping transaction processing");
break
}

// convert tx to a signed transaction
let tx = pool_tx.to_consensus();

Expand Down
29 changes: 27 additions & 2 deletions crates/optimism/payload/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ use reth_revm::{
use reth_storage_api::{errors::ProviderError, StateProvider, StateProviderFactory};
use reth_transaction_pool::{BestTransactionsAttributes, PoolTransaction, TransactionPool};
use revm::context::{Block, BlockEnv};
use std::{marker::PhantomData, sync::Arc};
use std::{
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tracing::{debug, trace, warn};

/// Optimism's payload builder
Expand Down Expand Up @@ -180,7 +186,7 @@ where
Txs:
PayloadTransactions<Transaction: PoolTransaction<Consensus = N::SignedTx> + OpPooledTx>,
{
let BuildArguments { mut cached_reads, config, cancel, best_payload } = args;
let BuildArguments { mut cached_reads, config, cancel, best_payload, is_resolving } = args;

let ctx = OpPayloadBuilderCtx {
evm_config: self.evm_config.clone(),
Expand All @@ -189,6 +195,7 @@ where
config,
cancel,
best_payload,
is_resolving,
};

let builder = OpBuilder::new(best);
Expand Down Expand Up @@ -225,6 +232,7 @@ where
config,
cancel: Default::default(),
best_payload: Default::default(),
is_resolving: Arc::new(AtomicBool::new(false)),
};

let state_provider = self.client.state_by_block_hash(ctx.parent().hash())?;
Expand Down Expand Up @@ -279,6 +287,7 @@ where
cached_reads: Default::default(),
cancel: Default::default(),
best_payload: None,
is_resolving: Arc::new(AtomicBool::new(false)),
};
self.build_payload(args, |_| NoopPayloadTransactions::<Pool::Transaction>::default())?
.into_payload()
Expand Down Expand Up @@ -558,6 +567,8 @@ pub struct OpPayloadBuilderCtx<
pub cancel: CancelOnDrop,
/// The currently best payload.
pub best_payload: Option<OpBuiltPayload<Evm::Primitives>>,
/// Flag indicating whether the payload job is being resolved.
pub is_resolving: Arc<AtomicBool>,
}

impl<Evm, ChainSpec, Attrs> OpPayloadBuilderCtx<Evm, ChainSpec, Attrs>
Expand Down Expand Up @@ -597,6 +608,13 @@ where
is_better_payload(self.best_payload.as_ref(), total_fees)
}

/// Returns `true` if the payload job is being resolved.
///
/// When this returns `true`, the builder should finish as quickly as possible.
pub fn is_resolving(&self) -> bool {
self.is_resolving.load(Ordering::Relaxed)
}

/// Prepares a [`BlockBuilder`] for the next block.
pub fn block_builder<'a, DB: Database>(
&'a self,
Expand Down Expand Up @@ -740,6 +758,13 @@ where
return Ok(Some(()))
}

// check if the payload is being resolved, if so we should stop adding more transactions
// and return whatever payload we have built so far
if self.is_resolving() {
debug!(target: "payload_builder", "payload is being resolved, stopping transaction processing");
return Ok(Some(()))
}

let gas_used = match builder.execute_transaction(tx.clone()) {
Ok(gas_used) => gas_used,
Err(BlockExecutionError::Validation(BlockValidationError::InvalidTx {
Expand Down
41 changes: 37 additions & 4 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use std::{
future::Future,
ops::Deref,
pin::Pin,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
Expand Down Expand Up @@ -177,6 +180,7 @@ where
payload_task_guard: self.payload_task_guard.clone(),
metrics: Default::default(),
builder: self.builder.clone(),
is_resolving: Arc::new(AtomicBool::new(false)),
};

// start the first job right away
Expand Down Expand Up @@ -328,6 +332,12 @@ where
///
/// See [`PayloadBuilder`]
builder: Builder,
/// Flag to indicate that the payload is being resolved.
///
/// This is set to `true` when [`PayloadJob::resolve`] is called, signaling to any
/// in-progress build tasks that they should finish as quickly as possible and return
/// whatever payload they have built so far rather than waiting for more transactions.
is_resolving: Arc<AtomicBool>,
}

impl<Tasks, Builder> BasicPayloadJob<Tasks, Builder>
Expand All @@ -349,11 +359,17 @@ where
self.metrics.inc_initiated_payload_builds();
let cached_reads = self.cached_reads.take().unwrap_or_default();
let builder = self.builder.clone();
let is_resolving = self.is_resolving.clone();
self.executor.spawn_blocking(Box::pin(async move {
// acquire the permit for executing the task
let _permit = guard.acquire().await;
let args =
BuildArguments { cached_reads, config: payload_config, cancel, best_payload };
let args = BuildArguments {
cached_reads,
config: payload_config,
cancel,
best_payload,
is_resolving,
};
let result = builder.try_build(args);
let _ = tx.send(result);
}));
Expand Down Expand Up @@ -463,6 +479,9 @@ where
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
// Signal to any in-progress build tasks that we're resolving
self.is_resolving.store(true, Ordering::SeqCst);

let best_payload = self.best_payload.payload().cloned();
if best_payload.is_none() && self.pending_block.is_none() {
// ensure we have a job scheduled if we don't have a best payload yet and none is active
Expand All @@ -480,6 +499,7 @@ where
config: self.config.clone(),
cancel: CancelOnDrop::default(),
best_payload: None,
is_resolving: self.is_resolving.clone(),
};

match self.builder.on_missing_payload(args) {
Expand Down Expand Up @@ -804,6 +824,11 @@ pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
pub cancel: CancelOnDrop,
/// The best payload achieved so far.
pub best_payload: Option<Payload>,
/// Flag indicating whether the payload job is being resolved.
///
/// When `true`, the builder should finish as quickly as possible and return whatever
/// payload it has built so far, rather than waiting for more transactions.
pub is_resolving: Arc<AtomicBool>,
}

impl<Attributes, Payload: BuiltPayload> BuildArguments<Attributes, Payload> {
Expand All @@ -813,8 +838,16 @@ impl<Attributes, Payload: BuiltPayload> BuildArguments<Attributes, Payload> {
config: PayloadConfig<Attributes, HeaderTy<Payload::Primitives>>,
cancel: CancelOnDrop,
best_payload: Option<Payload>,
is_resolving: Arc<AtomicBool>,
) -> Self {
Self { cached_reads, config, cancel, best_payload }
Self { cached_reads, config, cancel, best_payload, is_resolving }
}

/// Returns `true` if the payload job is being resolved.
///
/// When this returns `true`, the builder should finish as quickly as possible.
pub fn is_resolving(&self) -> bool {
self.is_resolving.load(Ordering::Relaxed)
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/payload/basic/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ where
&self,
args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError> {
let BuildArguments { cached_reads, config, cancel, best_payload } = args;
let BuildArguments { cached_reads, config, cancel, best_payload, is_resolving } = args;
let PayloadConfig { parent_header, attributes } = config;

match attributes {
Expand All @@ -210,6 +210,7 @@ where
None
}
}),
is_resolving,
};
self.left.try_build(left_args).map(|out| out.map_payload(Either::Left))
}
Expand All @@ -225,6 +226,7 @@ where
None
}
}),
is_resolving,
};
self.right.try_build(right_args).map(|out| out.map_payload(Either::Right))
}
Expand Down