Skip to content

Commit 64031b6

Browse files
authored
Add tracing spans to validator client duty cycles (sigp#8482)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com> Co-Authored-By: Jimmy Chen <jimmy@sigmaprime.io>
1 parent 7cee5d6 commit 64031b6

File tree

3 files changed

+63
-6
lines changed

3 files changed

+63
-6
lines changed

validator_client/validator_services/src/attestation_service.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::ops::Deref;
88
use std::sync::Arc;
99
use task_executor::TaskExecutor;
1010
use tokio::time::{Duration, Instant, sleep, sleep_until};
11-
use tracing::{debug, error, info, trace, warn};
11+
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
1212
use tree_hash::TreeHash;
1313
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
1414
use validator_store::{Error as ValidatorStoreError, ValidatorStore};
@@ -243,6 +243,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
243243
///
244244
/// The given `validator_duties` should already be filtered to only contain those that match
245245
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
246+
#[instrument(
247+
name = "attestation_duty_cycle",
248+
skip_all,
249+
fields(%slot, %committee_index)
250+
)]
246251
async fn publish_attestations_and_aggregates(
247252
self,
248253
slot: Slot,
@@ -328,6 +333,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
328333
///
329334
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
330335
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
336+
#[instrument(skip_all, fields(%slot, %committee_index))]
331337
async fn produce_and_publish_attestations(
332338
&self,
333339
slot: Slot,
@@ -357,6 +363,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
357363
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
358364
.map(|result| result.data)
359365
})
366+
.instrument(info_span!("fetch_attestation_data"))
360367
.await
361368
.map_err(|e| e.to_string())?;
362369

@@ -439,6 +446,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
439446

440447
// Execute all the futures in parallel, collecting any successful results.
441448
let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures)
449+
.instrument(info_span!(
450+
"sign_attestations",
451+
count = validator_duties.len()
452+
))
442453
.await
443454
.into_iter()
444455
.flatten()
@@ -487,6 +498,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
487498
.post_beacon_pool_attestations_v2::<S::E>(single_attestations, fork_name)
488499
.await
489500
})
501+
.instrument(info_span!(
502+
"publish_attestations",
503+
count = attestations.len()
504+
))
490505
.await
491506
{
492507
Ok(()) => info!(
@@ -523,6 +538,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
523538
/// Only one aggregated `Attestation` is downloaded from the BN. It is then cloned and signed
524539
/// by each validator and the list of individually-signed `SignedAggregateAndProof` objects is
525540
/// returned to the BN.
541+
#[instrument(skip_all, fields(slot = %attestation_data.slot, %committee_index))]
526542
async fn produce_and_publish_aggregates(
527543
&self,
528544
attestation_data: &AttestationData,
@@ -575,6 +591,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
575591
.map(|result| result.data)
576592
}
577593
})
594+
.instrument(info_span!("fetch_aggregate_attestation"))
578595
.await
579596
.map_err(|e| e.to_string())?;
580597

@@ -617,7 +634,12 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
617634
});
618635

619636
// Execute all the futures in parallel, collecting any successful results.
637+
let aggregator_count = validator_duties
638+
.iter()
639+
.filter(|d| d.selection_proof.is_some())
640+
.count();
620641
let signed_aggregate_and_proofs = join_all(signing_futures)
642+
.instrument(info_span!("sign_aggregates", count = aggregator_count))
621643
.await
622644
.into_iter()
623645
.flatten()
@@ -647,6 +669,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
647669
.await
648670
}
649671
})
672+
.instrument(info_span!(
673+
"publish_aggregates",
674+
count = signed_aggregate_and_proofs.len()
675+
))
650676
.await
651677
{
652678
Ok(()) => {

validator_client/validator_services/src/block_service.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
use std::time::Duration;
1212
use task_executor::TaskExecutor;
1313
use tokio::sync::mpsc;
14-
use tracing::{debug, error, info, trace, warn};
14+
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
1515
use types::{BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes, Slot};
1616
use validator_store::{Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore};
1717

@@ -320,6 +320,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
320320
}
321321

322322
#[allow(clippy::too_many_arguments)]
323+
#[instrument(skip_all, fields(%slot, ?validator_pubkey))]
323324
async fn sign_and_publish_block(
324325
&self,
325326
proposer_fallback: ProposerFallback<T>,
@@ -333,6 +334,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
333334
let res = self
334335
.validator_store
335336
.sign_block(*validator_pubkey, unsigned_block, slot)
337+
.instrument(info_span!("sign_block"))
336338
.await;
337339

338340
let signed_block = match res {
@@ -389,6 +391,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
389391
Ok(())
390392
}
391393

394+
#[instrument(
395+
name = "block_proposal_duty_cycle",
396+
skip_all,
397+
fields(%slot, ?validator_pubkey)
398+
)]
392399
async fn publish_block(
393400
self,
394401
slot: Slot,
@@ -483,6 +490,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
483490
Ok(())
484491
}
485492

493+
#[instrument(skip_all)]
486494
async fn publish_signed_block_contents(
487495
&self,
488496
signed_block: &SignedBlock<S::E>,
@@ -518,6 +526,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> BlockService<S, T> {
518526
Ok::<_, BlockError>(())
519527
}
520528

529+
#[instrument(skip_all, fields(%slot))]
521530
async fn get_validator_block(
522531
beacon_node: &BeaconNodeHttpClient,
523532
slot: Slot,

validator_client/validator_services/src/sync_committee_service.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
use std::sync::atomic::{AtomicBool, Ordering};
1212
use task_executor::TaskExecutor;
1313
use tokio::time::{Duration, Instant, sleep, sleep_until};
14-
use tracing::{debug, error, info, trace, warn};
14+
use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn};
1515
use types::{
1616
ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription,
1717
SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId,
@@ -208,7 +208,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
208208
.publish_sync_committee_signatures(slot, block_root, validator_duties)
209209
.map(|_| ())
210210
.await
211-
},
211+
}
212+
.instrument(info_span!("sync_committee_signature_publish", %slot)),
212213
"sync_committee_signature_publish",
213214
);
214215

@@ -225,14 +226,16 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
225226
)
226227
.map(|_| ())
227228
.await
228-
},
229+
}
230+
.instrument(info_span!("sync_committee_aggregate_publish", %slot)),
229231
"sync_committee_aggregate_publish",
230232
);
231233

232234
Ok(())
233235
}
234236

235237
/// Publish sync committee signatures.
238+
#[instrument(skip_all, fields(%slot, ?beacon_block_root))]
236239
async fn publish_sync_committee_signatures(
237240
&self,
238241
slot: Slot,
@@ -277,6 +280,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
277280

278281
// Execute all the futures in parallel, collecting any successful results.
279282
let committee_signatures = &join_all(signature_futures)
283+
.instrument(info_span!(
284+
"sign_sync_signatures",
285+
count = validator_duties.len()
286+
))
280287
.await
281288
.into_iter()
282289
.flatten()
@@ -288,6 +295,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
288295
.post_beacon_pool_sync_committee_signatures(committee_signatures)
289296
.await
290297
})
298+
.instrument(info_span!(
299+
"publish_sync_signatures",
300+
count = committee_signatures.len()
301+
))
291302
.await
292303
.map_err(|e| {
293304
error!(
@@ -328,7 +339,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
328339
)
329340
.map(|_| ())
330341
.await
331-
},
342+
}
343+
.instrument(info_span!("publish_sync_committee_aggregate_for_subnet", %slot, ?beacon_block_root, %subnet_id)),
332344
"sync_committee_aggregate_publish_subnet",
333345
);
334346
}
@@ -357,6 +369,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
357369
.get_validator_sync_committee_contribution(&sync_contribution_data)
358370
.await
359371
})
372+
.instrument(info_span!("fetch_sync_contribution"))
360373
.await
361374
.map_err(|e| {
362375
crit!(
@@ -372,6 +385,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
372385
.data;
373386

374387
// Create futures to produce signed contributions.
388+
let aggregator_count = subnet_aggregators.len();
375389
let signature_futures = subnet_aggregators.into_iter().map(
376390
|(aggregator_index, aggregator_pk, selection_proof)| async move {
377391
match self
@@ -405,6 +419,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
405419

406420
// Execute all the futures in parallel, collecting any successful results.
407421
let signed_contributions = &join_all(signature_futures)
422+
.instrument(info_span!(
423+
"sign_sync_contributions",
424+
count = aggregator_count
425+
))
408426
.await
409427
.into_iter()
410428
.flatten()
@@ -417,6 +435,10 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> SyncCommitteeService<S
417435
.post_validator_contribution_and_proofs(signed_contributions)
418436
.await
419437
})
438+
.instrument(info_span!(
439+
"publish_sync_contributions",
440+
count = signed_contributions.len()
441+
))
420442
.await
421443
.map_err(|e| {
422444
error!(

0 commit comments

Comments
 (0)