Commit f8f8522

feat: add allocation reconciliation (#882)
* feat(tap-agent): add periodic allocation reconciliation

  Add a background task that periodically triggers allocation reconciliation to ensure recovery after subgraph connectivity issues.

  Previously, the allocation watcher was purely event-driven and only fired when the allocation list changed. If allocations were closed during a connectivity outage and the list became static afterward, no messages would fire and stale SenderAllocation actors would keep running, never triggering mark_rav_last().

  This change adds:
  - New config option (default: 5 min)
  - ReconcileAllocations message type for SenderAccount
  - Periodic task that sends ReconcileAllocations every interval
  - Handler that forces re-check of all allocations against the watcher

  This ensures closed allocations are detected and RAVs are properly marked as 'last' for redemption, even after connectivity issues.

* test(tap-agent): add tests and validation for allocation reconciliation

  - Add unit test for ReconcileAllocations message handling that verifies it triggers UpdateAllocationIds with current allocations from watcher
  - Add config validation for allocation_reconciliation_interval_secs: error if 0, warn if < 60s
  - Add test for periodic task lifecycle verifying task spawn/abort
  - Update create_sender_account() test helper to expose indexer_allocations_tx and configurable reconciliation interval
  - Add tokio test-util feature for time control in tests

* chore(tap-agent): add observability improvements for allocation reconciliation

  - Add ALLOCATION_RECONCILIATION_RUNS counter metric
  - Elevate reconciliation logs from debug to info
  - Enhance config documentation explaining the connectivity failure scenario
1 parent d9e7d3e commit f8f8522
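
Reviewer note: the commit message describes reconciliation as a forced re-check of the running allocations against the watcher's current list. The sketch below illustrates that idea as a plain set difference. It is not the tap-agent code: `ReconcilePlan` and `plan_reconciliation` are hypothetical names, allocation IDs are modeled as strings, and the real handler works through actor messages (`ReconcileAllocations`, `UpdateAllocationIds`) rather than a pure function.

```rust
use std::collections::HashSet;

/// Hypothetical result of comparing running actors with the watcher's view.
#[derive(Debug, PartialEq, Eq)]
pub struct ReconcilePlan {
    /// Allocations that still have a running actor but are no longer listed.
    pub to_close: Vec<String>,
    /// Allocations that are listed but have no running actor yet.
    pub to_spawn: Vec<String>,
}

/// Hypothetical helper: diff the running set against the watcher's current set.
pub fn plan_reconciliation(
    running: &HashSet<String>,
    current: &HashSet<String>,
) -> ReconcilePlan {
    let mut to_close: Vec<String> = running.difference(current).cloned().collect();
    let mut to_spawn: Vec<String> = current.difference(running).cloned().collect();
    // Sort so the plan does not depend on hash iteration order.
    to_close.sort();
    to_spawn.sort();
    ReconcilePlan { to_close, to_spawn }
}
```

Running this diff on a timer, rather than only when the list changes, is what would let a stale actor be noticed even if the list never changes again after an outage.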

6 files changed: +401 -20 lines changed

crates/config/src/config.rs

Lines changed: 31 additions & 0 deletions
@@ -226,6 +226,20 @@ impl Config {
             );
         }
 
+        if self.tap.allocation_reconciliation_interval_secs == Duration::ZERO {
+            return Err(
+                "tap.allocation_reconciliation_interval_secs must be greater than 0".to_string(),
+            );
+        }
+
+        if self.tap.allocation_reconciliation_interval_secs < Duration::from_secs(60) {
+            tracing::warn!(
+                "Your `tap.allocation_reconciliation_interval_secs` value is too low. \
+                This may cause unnecessary load on the system. \
+                A recommended value is at least 60 seconds."
+            );
+        }
+
         // Horizon configuration validation
         // Explicit toggle via `horizon.enabled`. When enabled, require both
         // `blockchain.subgraph_service_address` and
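
Reviewer note: the two checks in this hunk (reject zero, warn below 60 seconds) can be exercised in isolation. The sketch below is a standalone restatement under assumptions: `check_reconciliation_interval` is a hypothetical function, and the warning is returned as a value instead of being emitted with `tracing::warn!`, so that it can be asserted on without a logging setup.

```rust
use std::time::Duration;

/// Hypothetical standalone version of the interval check.
/// - `Err(msg)`: the interval is zero and the config is rejected.
/// - `Ok(Some(msg))`: the interval is accepted but below 60 seconds.
/// - `Ok(None)`: the interval is accepted without a warning.
pub fn check_reconciliation_interval(interval: Duration) -> Result<Option<String>, String> {
    if interval == Duration::ZERO {
        return Err(
            "tap.allocation_reconciliation_interval_secs must be greater than 0".to_string(),
        );
    }
    if interval < Duration::from_secs(60) {
        return Ok(Some(
            "Your `tap.allocation_reconciliation_interval_secs` value is too low. \
             A recommended value is at least 60 seconds."
                .to_string(),
        ));
    }
    Ok(None)
}
```

Because the field is a `Duration` rather than an integer, a sub-second value such as 0.5 passes the zero check and only triggers the warning branch in this sketch.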
@@ -441,6 +455,23 @@ pub struct TapConfig {
     /// over the escrow balance
     #[serde(default)]
     pub trusted_senders: HashSet<Address>,
+
+    /// Interval in seconds for periodic allocation reconciliation.
+    ///
+    /// Allocation state is normally updated via watcher events from the network subgraph.
+    /// However, if connectivity to the subgraph is lost, allocation closure events may be
+    /// missed. This periodic reconciliation forces a re-check of all allocations against
+    /// the current subgraph state, ensuring stale allocations are detected and processed
+    /// even after connectivity failures.
+    ///
+    /// Default: 300 (5 minutes)
+    #[serde(default = "default_allocation_reconciliation_interval_secs")]
+    #[serde_as(as = "DurationSecondsWithFrac<f64>")]
+    pub allocation_reconciliation_interval_secs: Duration,
+}
+
+fn default_allocation_reconciliation_interval_secs() -> Duration {
+    Duration::from_secs(300)
 }
 
 #[derive(Debug, Deserialize)]
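
Reviewer note: the new field combines a default of 300 seconds with a fractional-seconds representation. The sketch below mimics that behavior with the standard library only, to show what values the field is meant to accept. It is an approximation, not the `serde_with` implementation: `reconciliation_interval` and `DEFAULT_RECONCILIATION_SECS` are hypothetical names, and `None` stands in for the key being absent from the config file.

```rust
use std::time::Duration;

/// Hypothetical constant mirroring the default in the diff (5 minutes).
pub const DEFAULT_RECONCILIATION_SECS: u64 = 300;

/// Hypothetical helper: turn an optional seconds value into a `Duration`.
/// A missing value falls back to the default; negative, NaN, or
/// out-of-range values are reported as errors instead of panicking.
pub fn reconciliation_interval(raw_secs: Option<f64>) -> Result<Duration, String> {
    match raw_secs {
        None => Ok(Duration::from_secs(DEFAULT_RECONCILIATION_SECS)),
        Some(secs) => Duration::try_from_secs_f64(secs)
            .map_err(|e| format!("invalid allocation_reconciliation_interval_secs: {e}")),
    }
}
```

Using `Duration::try_from_secs_f64` rather than `Duration::from_secs_f64` is a deliberate choice in this sketch, since the latter panics on negative or non-finite input.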

crates/tap-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -67,3 +67,4 @@ test-log = { workspace = true, features = ["trace"] }
 rstest.workspace = true
 stdext.workspace = true
 insta.workspace = true
+tokio = { workspace = true, features = ["test-util"] }
