Skip to content

Commit 7e9ea27

Browse files
committed
Submit packages via bitcoind rpc
1 parent 4cbd1bb commit 7e9ea27

File tree

1 file changed

+90
-5
lines changed

1 file changed

+90
-5
lines changed

src/chain/bitcoind.rs

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,8 @@ impl BitcoindChainSource {
532532
}
533533

534534
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
535-
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
536-
// features, we should eventually switch to use `submitpackage` via the
537-
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
538-
// transactions.
539-
for tx in &package {
535+
if package.len() == 1 {
536+
let tx = &package[0];
540537
let txid = tx.compute_txid();
541538
let timeout_fut = tokio::time::timeout(
542539
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
@@ -571,6 +568,39 @@ impl BitcoindChainSource {
571568
);
572569
},
573570
}
571+
} else if package.len() > 1 {
572+
let txids: Vec<_> = package.iter().map(|tx| tx.compute_txid()).collect();
573+
let timeout_fut = tokio::time::timeout(
574+
Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS),
575+
self.api_client.submit_package(&package),
576+
);
577+
match timeout_fut.await {
578+
Ok(res) => match res {
579+
Ok(ids) => {
580+
debug_assert_eq!(ids, txids);
581+
log_trace!(self.logger, "Successfully broadcast package {:?}", txids);
582+
},
583+
Err(e) => {
584+
log_error!(self.logger, "Failed to broadcast package {:?}: {}", txids, e);
585+
log_trace!(self.logger, "Failed broadcast package bytes:");
586+
for tx in package {
587+
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
588+
}
589+
},
590+
},
591+
Err(e) => {
592+
log_error!(
593+
self.logger,
594+
"Failed to broadcast package due to timeout {:?}: {}",
595+
txids,
596+
e
597+
);
598+
log_trace!(self.logger, "Failed broadcast package bytes:");
599+
for tx in package {
600+
log_trace!(self.logger, "{}", log_bytes!(tx.encode()));
601+
}
602+
},
603+
}
574604
}
575605
}
576606
}
@@ -667,6 +697,34 @@ impl BitcoindClient {
667697
rpc_client.call_method::<Txid>("sendrawtransaction", &[tx_json]).await
668698
}
669699

700+
/// Submits the provided package
701+
pub(crate) async fn submit_package(
702+
&self, package: &[Transaction],
703+
) -> std::io::Result<Vec<Txid>> {
704+
match self {
705+
BitcoindClient::Rpc { rpc_client, .. } => {
706+
Self::submit_package_inner(Arc::clone(rpc_client), package).await
707+
},
708+
BitcoindClient::Rest { rpc_client, .. } => {
709+
// Bitcoin Core's REST interface does not support submitting packages
710+
// so we use the RPC client.
711+
Self::submit_package_inner(Arc::clone(rpc_client), package).await
712+
},
713+
}
714+
}
715+
716+
async fn submit_package_inner(
717+
rpc_client: Arc<RpcClient>, package: &[Transaction],
718+
) -> std::io::Result<Vec<Txid>> {
719+
let package_serialized: Vec<_> =
720+
package.iter().map(|tx| bitcoin::consensus::encode::serialize_hex(tx)).collect();
721+
let package_json = serde_json::json!(package_serialized);
722+
rpc_client
723+
.call_method::<SubmitPackageResponse>("submitpackage", &[package_json])
724+
.await
725+
.map(|resp| resp.0)
726+
}
727+
670728
/// Retrieve the fee estimate needed for a transaction to begin
671729
/// confirmation within the provided `num_blocks`.
672730
pub(crate) async fn get_fee_estimate_for_target(
@@ -1302,6 +1360,33 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
13021360
}
13031361
}
13041362

1363+
pub struct SubmitPackageResponse(Vec<Txid>);
1364+
1365+
impl TryInto<SubmitPackageResponse> for JsonResponse {
1366+
type Error = std::io::Error;
1367+
fn try_into(self) -> std::io::Result<SubmitPackageResponse> {
1368+
let tx_results = self.0["tx-results"].as_object().ok_or(std::io::Error::new(
1369+
std::io::ErrorKind::Other,
1370+
"Failed to parse submitpackage response",
1371+
))?;
1372+
let mut txids = Vec::with_capacity(tx_results.len());
1373+
for tx_result in tx_results.values() {
1374+
let txid_string = tx_result["txid"].as_str().ok_or(std::io::Error::new(
1375+
std::io::ErrorKind::Other,
1376+
"Failed to parse submitpackage response",
1377+
))?;
1378+
let txid: Txid = txid_string.parse().map_err(|_| {
1379+
std::io::Error::new(
1380+
std::io::ErrorKind::Other,
1381+
"Failed to parse submitpackage response",
1382+
)
1383+
})?;
1384+
txids.push(txid);
1385+
}
1386+
Ok(SubmitPackageResponse(txids))
1387+
}
1388+
}
1389+
13051390
#[derive(Debug, Clone)]
13061391
pub(crate) struct MempoolEntry {
13071392
/// The transaction id

0 commit comments

Comments
 (0)