Skip to content

Commit 800f82e

Browse files
committed
multiple pulse Txs per block
1 parent 40945a9 commit 800f82e

File tree

1 file changed

+85
-57
lines changed

1 file changed

+85
-57
lines changed

pallets/drand/src/lib.rs

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -309,18 +309,21 @@ pub mod pallet {
309309
signature,
310310
&payload.block_number,
311311
&payload.public,
312+
None,
312313
)
313314
}
314315
Call::write_pulse {
315316
pulses_payload: payload,
316317
signature,
317318
} => {
318319
let signature = signature.as_ref().ok_or(InvalidTransaction::BadSigner)?;
320+
let rounds: Vec<RoundNumber> = payload.pulses.iter().map(|p| p.round).collect();
319321
Self::validate_signature_and_parameters(
320322
payload,
321323
signature,
322324
&payload.block_number,
323325
&payload.public,
326+
Some(&rounds),
324327
)
325328
}
326329
_ => InvalidTransaction::Call.into(),
@@ -482,32 +485,37 @@ impl<T: Config> Pallet<T> {
482485
pulses.push(pulse);
483486
}
484487

485-
let signer = Signer::<T, T::AuthorityId>::all_accounts();
486-
487-
let results = signer.send_unsigned_transaction(
488-
|account| PulsesPayload {
489-
block_number,
490-
pulses: pulses.clone(),
491-
public: account.public.clone(),
492-
},
493-
|pulses_payload, signature| Call::write_pulse {
494-
pulses_payload,
495-
signature: Some(signature),
496-
},
497-
);
498-
499-
for (acc, res) in &results {
500-
match res {
501-
Ok(()) => log::debug!(
502-
"Drand: [{:?}] Submitted new pulses up to round: {:?}",
503-
acc.id,
504-
last_stored_round.saturating_add(rounds_to_fetch)
505-
),
506-
Err(e) => log::error!(
507-
"Drand: [{:?}] Failed to submit transaction: {:?}",
508-
acc.id,
509-
e
510-
),
488+
let signer = Signer::<T, T::AuthorityId>::any_account();
489+
490+
// Submit one tx per pulse, ascending rounds.
491+
for pulse in pulses.into_iter() {
492+
let round = pulse.round;
493+
494+
if let Some((acc, res)) = signer.send_unsigned_transaction(
495+
|account| PulsesPayload {
496+
block_number,
497+
pulses: vec![pulse.clone()],
498+
public: account.public.clone(),
499+
},
500+
|pulses_payload, signature| Call::write_pulse {
501+
pulses_payload,
502+
signature: Some(signature),
503+
},
504+
) {
505+
match res {
506+
Ok(()) => log::debug!("Drand: [{:?}] submitted round {:?}", acc.id, round),
507+
Err(e) => log::debug!(
508+
"Drand: [{:?}] failed to submit round {:?}: {:?}",
509+
acc.id,
510+
round,
511+
e
512+
),
513+
}
514+
} else {
515+
log::debug!(
516+
"Drand: No local account available to submit round {:?}",
517+
round
518+
);
511519
}
512520
}
513521
}
@@ -633,54 +641,74 @@ impl<T: Config> Pallet<T> {
633641
signature: &T::Signature,
634642
block_number: &BlockNumberFor<T>,
635643
public: &T::Public,
644+
rounds: Option<&[RoundNumber]>,
636645
) -> TransactionValidity {
637646
let signature_valid =
638647
SignedPayload::<T>::verify::<T::AuthorityId>(payload, signature.clone());
639648
if !signature_valid {
640649
return InvalidTransaction::BadProof.into();
641650
}
642-
Self::validate_transaction_parameters(block_number, public)
651+
Self::validate_transaction_parameters(block_number, public, rounds)
643652
}
644653

645654
fn validate_transaction_parameters(
646655
block_number: &BlockNumberFor<T>,
647656
public: &T::Public,
657+
rounds: Option<&[RoundNumber]>,
648658
) -> TransactionValidity {
649-
// Now let's check if the transaction has any chance to succeed.
650659
let next_unsigned_at = NextUnsignedAt::<T>::get();
651-
if &next_unsigned_at > block_number {
652-
return InvalidTransaction::Stale.into();
653-
}
654-
// Let's make sure to reject transactions from the future.
655660
let current_block = frame_system::Pallet::<T>::block_number();
656-
if &current_block < block_number {
661+
662+
if current_block < *block_number {
657663
return InvalidTransaction::Future.into();
658664
}
659665

660-
let provides_tag = (next_unsigned_at, public.encode()).using_encoded(blake2_256);
661-
662-
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
663-
// We set the priority to the value stored at `UnsignedPriority`.
664-
.priority(T::UnsignedPriority::get())
665-
// This transaction does not require anything else to go before into the pool.
666-
// In theory we could require `previous_unsigned_at` transaction to go first,
667-
// but it's not necessary in our case.
668-
// We set the `provides` tag to be the same as `next_unsigned_at`. This makes
669-
// sure only one transaction produced after `next_unsigned_at` will ever
670-
// get to the transaction pool and will end up in the block.
671-
// We can still have multiple transactions compete for the same "spot",
672-
// and the one with higher priority will replace other one in the pool.
673-
.and_provides(provides_tag)
674-
// The transaction is only valid for next block. After that it's
675-
// going to be revalidated by the pool.
676-
.longevity(1)
677-
// It's fine to propagate that transaction to other peers, which means it can be
678-
// created even by nodes that don't produce blocks.
679-
// Note that sometimes it's better to keep it for yourself (if you are the block
680-
// producer), since for instance in some schemes others may copy your solution and
681-
// claim a reward.
682-
.propagate(true)
683-
.build()
666+
match rounds {
667+
Some(rs) => {
668+
let r_opt = rs.first().copied();
669+
let has_second = rs.get(1).is_some();
670+
let r = match (r_opt, has_second) {
671+
(Some(round), false) => round,
672+
_ => return InvalidTransaction::Call.into(),
673+
};
674+
675+
// Allow multiple unsigned txs in the same block even after the first updates the gate.
676+
if next_unsigned_at > current_block {
677+
return InvalidTransaction::Stale.into();
678+
}
679+
680+
// Drop stale rounds at mempool time to avoid re-including last block's rounds.
681+
let last = LastStoredRound::<T>::get();
682+
if r <= last {
683+
return InvalidTransaction::Stale.into();
684+
}
685+
686+
// Priority favors lower rounds first.
687+
let priority =
688+
T::UnsignedPriority::get().saturating_add(u64::MAX.saturating_sub(r));
689+
690+
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
691+
.priority(priority)
692+
.and_provides((b"drand", r).using_encoded(blake2_256))
693+
.longevity(3)
694+
.propagate(false)
695+
.build()
696+
}
697+
698+
None => {
699+
if next_unsigned_at > *block_number {
700+
return InvalidTransaction::Stale.into();
701+
}
702+
703+
let provides_tag = (next_unsigned_at, public.encode()).using_encoded(blake2_256);
704+
ValidTransaction::with_tag_prefix("DrandOffchainWorker")
705+
.priority(T::UnsignedPriority::get())
706+
.and_provides(provides_tag)
707+
.longevity(1)
708+
.propagate(true)
709+
.build()
710+
}
711+
}
684712
}
685713

686714
fn prune_old_pulses(last_stored_round: RoundNumber) {

0 commit comments

Comments
 (0)