Skip to content

Commit d3fcc27

Browse files
authored
feat: JournalDb trait (#13)
* feat: JournalDb trait * feat: ingestor task * refactor: introduce a chill type alias
1 parent 7d3ec30 commit d3fcc27

File tree

9 files changed

+167
-21
lines changed

9 files changed

+167
-21
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ signet-zenith = "0.9.0"
5353
ajj = { version = "0.3.4" }
5454

5555
# trevm
56-
trevm = { version = "0.27.0", features = ["full_env_cfg"] }
56+
trevm = { version = "0.27.1", features = ["full_env_cfg"] }
5757

5858
# Alloy periphery crates
5959
alloy = { version = "1.0.19", features = [

crates/db/Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ repository.workspace = true
1111
[dependencies]
1212
signet-node-types.workspace = true
1313

14-
1514
signet-evm.workspace = true
1615
signet-types.workspace = true
1716
signet-zenith.workspace = true
1817

18+
trevm.workspace = true
19+
1920
alloy.workspace = true
2021

2122
reth.workspace = true
@@ -24,4 +25,7 @@ reth-prune-types.workspace = true
2425

2526
itertools.workspace = true
2627
serde.workspace = true
27-
tracing.workspace = true
28+
tracing.workspace = true
29+
futures-util = "0.3.31"
30+
tokio.workspace = true
31+
auto_impl = "1.3.0"

crates/db/src/aliases.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
use reth::providers::DatabaseProviderRW;
2+
use signet_node_types::SignetNodeTypes;
3+
4+
/// A Convenience alias for a [`DatabaseProviderRW`] using [`SignetNodeTypes`].
5+
pub type SignetDbRw<Db> = DatabaseProviderRW<Db, SignetNodeTypes<Db>>;

crates/db/src/chain.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@ use reth::providers::Chain;
33
use signet_zenith::{Passage, Transactor, Zenith};
44
use std::collections::BTreeMap;
55

6-
/// The host extraction contents for a block.
6+
/// Host extraction contrents for a single block.
7+
///
8+
/// This is a container type for DB operations. It is held by the [`RuChain`]
9+
/// struct, and constructed during the [`RuWriter::get_extraction_results`]
10+
/// method.
11+
///
12+
/// [`RuWriter::get_extraction_results`]:
13+
/// crate::traits::RuWriter::get_extraction_results
714
#[derive(Debug, Clone, Default, PartialEq, Eq)]
815
pub struct DbExtractionResults {
916
/// The zenith header for the block.
@@ -16,7 +23,8 @@ pub struct DbExtractionResults {
1623
pub enter_tokens: Vec<Passage::EnterToken>,
1724
}
1825

19-
/// Equivalent of [`Chain`] but also containing zenith events and information.
26+
/// Equivalent of [`Chain`] but also containing [`DbExtractionResults`] for
27+
/// each block.
2028
#[derive(Debug, Clone, Default, PartialEq, Eq)]
2129
pub struct RuChain {
2230
/// Inner chain of RU blocks.

crates/db/src/journal/mod.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//! Utilities for working with Signet journals in a reth database.
2+
3+
mod r#trait;
4+
pub use r#trait::JournalDb;
5+
6+
use crate::SignetDbRw;
7+
use futures_util::{Stream, StreamExt};
8+
use reth::providers::ProviderResult;
9+
use signet_node_types::NodeTypesDbTrait;
10+
use std::sync::Arc;
11+
use tokio::task::JoinHandle;
12+
use trevm::journal::BlockUpdate;
13+
14+
/// A task that ingests journals into a reth database.
15+
#[derive(Debug)]
16+
pub struct JournalIngestor<Db: NodeTypesDbTrait> {
17+
db: Arc<SignetDbRw<Db>>,
18+
}
19+
20+
impl<Db: NodeTypesDbTrait> From<SignetDbRw<Db>> for JournalIngestor<Db> {
21+
fn from(value: SignetDbRw<Db>) -> Self {
22+
Self::new(value.into())
23+
}
24+
}
25+
26+
impl<Db: NodeTypesDbTrait> From<Arc<SignetDbRw<Db>>> for JournalIngestor<Db> {
27+
fn from(value: Arc<SignetDbRw<Db>>) -> Self {
28+
Self::new(value)
29+
}
30+
}
31+
32+
impl<Db: NodeTypesDbTrait> JournalIngestor<Db> {
33+
/// Create a new `JournalIngestor` with the given database provider.
34+
pub const fn new(db: Arc<SignetDbRw<Db>>) -> Self {
35+
Self { db }
36+
}
37+
38+
async fn task_future<S>(self, mut stream: S) -> ProviderResult<()>
39+
where
40+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
41+
{
42+
while let Some(item) = stream.next().await {
43+
// FUTURE: Sanity check that the header height matches the update
44+
// height. Sanity check that both heights are 1 greater than the
45+
// last height in the database.
46+
47+
let db = self.db.clone();
48+
let (header, block_update) = item;
49+
50+
// DB interaction is sync, so we spawn a blocking task for it. We
51+
// immediately await that task. This prevents blocking the worker
52+
// thread
53+
tokio::task::spawn_blocking(move || db.ingest(&header, block_update))
54+
.await
55+
.expect("ingestion should not panic")?;
56+
}
57+
// Stream has ended, return Ok
58+
Ok(())
59+
}
60+
61+
/// Spawn a task to ingest journals from the provided stream.
62+
pub fn spawn<S>(self, stream: S) -> JoinHandle<ProviderResult<()>>
63+
where
64+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
65+
{
66+
tokio::spawn(self.task_future(stream))
67+
}
68+
}
69+
70+
/// Ingest journals from a stream into a reth database.
71+
pub async fn ingest_journals<Db, S>(db: Arc<SignetDbRw<Db>>, stream: S) -> ProviderResult<()>
72+
where
73+
Db: NodeTypesDbTrait,
74+
S: Stream<Item = (alloy::consensus::Header, BlockUpdate<'static>)> + Send + Unpin + 'static,
75+
{
76+
let ingestor = JournalIngestor::new(db);
77+
ingestor.task_future(stream).await
78+
}

crates/db/src/journal/trait.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::RuWriter;
2+
use alloy::consensus::{BlockHeader, Header};
3+
use reth::{providers::ProviderResult, revm::db::BundleState};
4+
use signet_evm::{BlockResult, ExecutionOutcome};
5+
use signet_types::primitives::{RecoveredBlock, SealedBlock, SealedHeader, TransactionSigned};
6+
use trevm::journal::BlockUpdate;
7+
8+
/// A database that can be updated with journals.
9+
pub trait JournalDb: RuWriter {
10+
/// Ingest a journal into the database.
11+
///
12+
/// This will create a [`BlockResult`] from the provided header and update,
13+
/// and append it to the database using [`RuWriter::append_host_block`].
14+
///
15+
/// This DOES NOT update tables containing historical transactions,
16+
/// receipts, events, etc. It only updates tables related to headers,
17+
/// and state.
18+
///
19+
/// This is intended to be used for tx simulation, and other purposes that
20+
/// need fast state access WITHTOUT needing to retrieve historical data.
21+
fn ingest(&self, header: &Header, update: BlockUpdate<'_>) -> ProviderResult<()> {
22+
let journal_hash = update.journal_hash();
23+
24+
// TODO: remove the clone in future versions. This can be achieved by
25+
// _NOT_ making a `BlockResult` and instead manually updating relevan
26+
// tables. However, this means diverging more fro the underlying reth
27+
// logic that we are currently re-using.
28+
let bundle_state: BundleState = update.journal().clone().into();
29+
let execution_outcome = ExecutionOutcome::new(bundle_state, vec![], header.number());
30+
31+
let block: SealedBlock<TransactionSigned, Header> =
32+
SealedBlock { header: SealedHeader::new(header.to_owned()), body: Default::default() };
33+
let block_result =
34+
BlockResult { sealed_block: RecoveredBlock::new(block, vec![]), execution_outcome };
35+
36+
self.append_host_block(
37+
None,
38+
std::iter::empty(),
39+
std::iter::empty(),
40+
std::iter::empty(),
41+
&block_result,
42+
journal_hash,
43+
)
44+
}
45+
}
46+
47+
impl<T> JournalDb for T where T: RuWriter {}

crates/db/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@
1111
#![deny(unused_must_use, rust_2018_idioms)]
1212
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
1313

14+
mod aliases;
15+
pub use aliases::SignetDbRw;
16+
1417
mod chain;
1518
pub use chain::{DbExtractionResults, RuChain};
1619

1720
mod convert;
1821
pub use convert::DataCompat;
1922

23+
pub mod journal;
24+
2025
mod provider;
2126

2227
mod tables;

crates/db/src/provider.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
DataCompat, DbZenithHeader, RuChain, ZenithHeaders,
2+
DataCompat, DbZenithHeader, RuChain, SignetDbRw, ZenithHeaders,
33
tables::{DbSignetEvent, JournalHashes, SignetEvents},
44
traits::RuWriter,
55
};
@@ -11,9 +11,9 @@ use reth::{
1111
primitives::{Account, StaticFileSegment},
1212
providers::{
1313
AccountReader, BlockBodyIndicesProvider, BlockNumReader, BlockReader, BlockWriter, Chain,
14-
DBProvider, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderError,
15-
ProviderResult, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
16-
StaticFileWriter, StorageLocation,
14+
DBProvider, HistoryWriter, OriginalValuesKnown, ProviderError, ProviderResult,
15+
StageCheckpointWriter, StateWriter, StaticFileProviderFactory, StaticFileWriter,
16+
StorageLocation,
1717
},
1818
};
1919
use reth_db::{
@@ -25,7 +25,7 @@ use reth_db::{
2525
};
2626
use reth_prune_types::{MINIMUM_PRUNING_DISTANCE, PruneMode};
2727
use signet_evm::BlockResult;
28-
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
28+
use signet_node_types::NodeTypesDbTrait;
2929
use signet_types::primitives::RecoveredBlock;
3030
use signet_zenith::{
3131
Passage::{self, Enter, EnterToken},
@@ -35,7 +35,7 @@ use signet_zenith::{
3535
use std::ops::RangeInclusive;
3636
use tracing::{debug, instrument, trace, warn};
3737

38-
impl<Db> RuWriter for DatabaseProviderRW<Db, SignetNodeTypes<Db>>
38+
impl<Db> RuWriter for SignetDbRw<Db>
3939
where
4040
Db: NodeTypesDbTrait,
4141
{
@@ -430,7 +430,6 @@ where
430430
/// see the documentation for each function.
431431
fn append_host_block(
432432
&self,
433-
host_height: u64,
434433
header: Option<Zenith::BlockHeader>,
435434
transacts: impl IntoIterator<Item = Transact>,
436435
enters: impl IntoIterator<Item = Passage::Enter>,
@@ -480,7 +479,7 @@ where
480479

481480
self.update_pipeline_stages(ru_height, false)?;
482481

483-
debug!(target: "signet_db_lifecycle", host_height, ru_height, "Appended blocks");
482+
debug!(target: "signet_db_lifecycle", ru_height, "Appended blocks");
484483

485484
Ok(())
486485
}

crates/db/src/traits.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
use crate::{DbExtractionResults, DbSignetEvent, RuChain};
1+
use crate::{DbExtractionResults, DbSignetEvent, RuChain, SignetDbRw};
22
use alloy::primitives::{Address, B256, BlockNumber, U256};
33
use itertools::Itertools;
44
use reth::{
55
primitives::Account,
6-
providers::{DatabaseProviderRW, OriginalValuesKnown, ProviderResult, StorageLocation},
6+
providers::{OriginalValuesKnown, ProviderResult, StorageLocation},
77
};
88
use reth_db::models::StoredBlockBodyIndices;
99
use signet_evm::BlockResult;
10-
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
10+
use signet_node_types::NodeTypesDbTrait;
1111
use signet_types::primitives::RecoveredBlock;
1212
use signet_zenith::{Passage, Transactor, Zenith};
1313
use std::{collections::BTreeMap, ops::RangeInclusive};
1414
use tracing::trace;
1515

1616
/// Writer for [`Passage::Enter`] events.
17+
#[auto_impl::auto_impl(&, Arc, Box)]
1718
pub trait RuWriter {
1819
/// Get the last block number
1920
fn last_block_number(&self) -> ProviderResult<BlockNumber>;
@@ -254,7 +255,6 @@ pub trait RuWriter {
254255
#[allow(clippy::too_many_arguments)]
255256
fn append_host_block(
256257
&self,
257-
host_height: u64,
258258
header: Option<Zenith::BlockHeader>,
259259
transacts: impl IntoIterator<Item = Transactor::Transact>,
260260
enters: impl IntoIterator<Item = Passage::Enter>,
@@ -293,7 +293,7 @@ pub trait RuWriter {
293293
}
294294

295295
/// Extend the [`DatabaseProviderRW`] with a guarded commit function.
296-
pub trait DbProviderExt<Db>: Into<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>
296+
pub trait DbProviderExt<Db>: Into<SignetDbRw<Db>>
297297
where
298298
Db: NodeTypesDbTrait,
299299
{
@@ -302,18 +302,18 @@ where
302302
/// transaction is rolled back.
303303
fn update(
304304
self,
305-
f: impl FnOnce(&mut DatabaseProviderRW<Db, SignetNodeTypes<Db>>) -> ProviderResult<()>,
305+
f: impl FnOnce(&mut SignetDbRw<Db>) -> ProviderResult<()>,
306306
) -> ProviderResult<()>;
307307
}
308308

309309
impl<T, Db> DbProviderExt<Db> for T
310310
where
311311
Db: NodeTypesDbTrait,
312-
T: Into<DatabaseProviderRW<Db, SignetNodeTypes<Db>>>,
312+
T: Into<SignetDbRw<Db>>,
313313
{
314314
fn update(
315315
self,
316-
f: impl FnOnce(&mut DatabaseProviderRW<Db, SignetNodeTypes<Db>>) -> ProviderResult<()>,
316+
f: impl FnOnce(&mut SignetDbRw<Db>) -> ProviderResult<()>,
317317
) -> ProviderResult<()> {
318318
let mut this = self.into();
319319
f(&mut this)?;

0 commit comments

Comments
 (0)