@@ -582,11 +582,8 @@ impl BitcoindChainSource {
582582 }
583583
584584 pub ( crate ) async fn process_broadcast_package ( & self , package : Vec < Transaction > ) {
585- // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
586- // features, we should eventually switch to use `submitpackage` via the
587- // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
588- // transactions.
589- for tx in & package {
585+ if package. len ( ) == 1 {
586+ let tx = & package[ 0 ] ;
590587 let txid = tx. compute_txid ( ) ;
591588 let timeout_fut = tokio:: time:: timeout (
592589 Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
@@ -621,6 +618,48 @@ impl BitcoindChainSource {
621618 ) ;
622619 } ,
623620 }
621+ } else if package. len ( ) > 1 {
622+ let txids: Vec < _ > = package. iter ( ) . map ( |tx| tx. compute_txid ( ) ) . collect ( ) ;
623+ let timeout_fut = tokio:: time:: timeout (
624+ Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
625+ self . api_client . submit_package ( & package) ,
626+ ) ;
627+ match timeout_fut. await {
628+ Ok ( res) => match res {
629+ Ok ( ( package_msg, mut ids) ) => {
630+ // TODO: seems we don't get the txids back in the same order...
631+ ids. sort_unstable ( ) ;
632+ let mut sorted_txids = txids. clone ( ) ;
633+ sorted_txids. sort_unstable ( ) ;
634+ debug_assert_eq ! ( ids, sorted_txids) ;
635+ log_trace ! (
636+ self . logger,
637+ "Package broadcast message {}, txids: {:?}" ,
638+ package_msg,
639+ txids,
640+ ) ;
641+ } ,
642+ Err ( e) => {
643+ log_error ! ( self . logger, "Failed to broadcast package {:?}: {}" , txids, e) ;
644+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
645+ for tx in package {
646+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
647+ }
648+ } ,
649+ } ,
650+ Err ( e) => {
651+ log_error ! (
652+ self . logger,
653+ "Failed to broadcast package due to timeout {:?}: {}" ,
654+ txids,
655+ e
656+ ) ;
657+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
658+ for tx in package {
659+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
660+ }
661+ } ,
662+ }
624663 }
625664 }
626665}
@@ -717,6 +756,34 @@ impl BitcoindClient {
717756 rpc_client. call_method :: < Txid > ( "sendrawtransaction" , & [ tx_json] ) . await
718757 }
719758
759+ /// Submits the provided package
760+ pub ( crate ) async fn submit_package (
761+ & self , package : & [ Transaction ] ,
762+ ) -> std:: io:: Result < ( String , Vec < Txid > ) > {
763+ match self {
764+ BitcoindClient :: Rpc { rpc_client, .. } => {
765+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
766+ } ,
767+ BitcoindClient :: Rest { rpc_client, .. } => {
768+ // Bitcoin Core's REST interface does not support submitting packages
769+ // so we use the RPC client.
770+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
771+ } ,
772+ }
773+ }
774+
775+ async fn submit_package_inner (
776+ rpc_client : Arc < RpcClient > , package : & [ Transaction ] ,
777+ ) -> std:: io:: Result < ( String , Vec < Txid > ) > {
778+ let package_serialized: Vec < _ > =
779+ package. iter ( ) . map ( |tx| bitcoin:: consensus:: encode:: serialize_hex ( tx) ) . collect ( ) ;
780+ let package_json = serde_json:: json!( package_serialized) ;
781+ rpc_client
782+ . call_method :: < SubmitPackageResponse > ( "submitpackage" , & [ package_json] )
783+ . await
784+ . map ( |resp| ( resp. package_msg , resp. txids ) )
785+ }
786+
720787 /// Retrieve the fee estimate needed for a transaction to begin
721788 /// confirmation within the provided `num_blocks`.
722789 pub ( crate ) async fn get_fee_estimate_for_target (
@@ -1352,6 +1419,43 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
13521419 }
13531420}
13541421
1422+ pub struct SubmitPackageResponse {
1423+ package_msg : String ,
1424+ txids : Vec < Txid > ,
1425+ }
1426+
1427+ impl TryInto < SubmitPackageResponse > for JsonResponse {
1428+ type Error = std:: io:: Error ;
1429+ fn try_into ( self ) -> std:: io:: Result < SubmitPackageResponse > {
1430+ let package_msg = self . 0 [ "package_msg" ]
1431+ . as_str ( )
1432+ . ok_or ( std:: io:: Error :: new (
1433+ std:: io:: ErrorKind :: Other ,
1434+ "Failed to parse submitpackage response" ,
1435+ ) ) ?
1436+ . to_string ( ) ;
1437+ let tx_results = self . 0 [ "tx-results" ] . as_object ( ) . ok_or ( std:: io:: Error :: new (
1438+ std:: io:: ErrorKind :: Other ,
1439+ "Failed to parse submitpackage response" ,
1440+ ) ) ?;
1441+ let mut txids = Vec :: with_capacity ( tx_results. len ( ) ) ;
1442+ for tx_result in tx_results. values ( ) {
1443+ let txid_string = tx_result[ "txid" ] . as_str ( ) . ok_or ( std:: io:: Error :: new (
1444+ std:: io:: ErrorKind :: Other ,
1445+ "Failed to parse submitpackage response" ,
1446+ ) ) ?;
1447+ let txid: Txid = txid_string. parse ( ) . map_err ( |_| {
1448+ std:: io:: Error :: new (
1449+ std:: io:: ErrorKind :: Other ,
1450+ "Failed to parse submitpackage response" ,
1451+ )
1452+ } ) ?;
1453+ txids. push ( txid) ;
1454+ }
1455+ Ok ( SubmitPackageResponse { package_msg, txids } )
1456+ }
1457+ }
1458+
13551459#[ derive( Debug , Clone ) ]
13561460pub ( crate ) struct MempoolEntry {
13571461 /// The transaction id
0 commit comments