@@ -532,11 +532,8 @@ impl BitcoindChainSource {
532532 }
533533
534534 pub ( crate ) async fn process_broadcast_package ( & self , package : Vec < Transaction > ) {
535- // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
536- // features, we should eventually switch to use `submitpackage` via the
537- // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
538- // transactions.
539- for tx in & package {
535+ if package. len ( ) == 1 {
536+ let tx = & package[ 0 ] ;
540537 let txid = tx. compute_txid ( ) ;
541538 let timeout_fut = tokio:: time:: timeout (
542539 Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
@@ -571,6 +568,48 @@ impl BitcoindChainSource {
571568 ) ;
572569 } ,
573570 }
571+ } else if package. len ( ) > 1 {
572+ let txids: Vec < _ > = package. iter ( ) . map ( |tx| tx. compute_txid ( ) ) . collect ( ) ;
573+ let timeout_fut = tokio:: time:: timeout (
574+ Duration :: from_secs ( TX_BROADCAST_TIMEOUT_SECS ) ,
575+ self . api_client . submit_package ( & package) ,
576+ ) ;
577+ match timeout_fut. await {
578+ Ok ( res) => match res {
579+ Ok ( ( package_msg, mut ids) ) => {
580+ // TODO: seems we don't get the txids back in the same order...
581+ ids. sort_unstable ( ) ;
582+ let mut sorted_txids = txids. clone ( ) ;
583+ sorted_txids. sort_unstable ( ) ;
584+ debug_assert_eq ! ( ids, sorted_txids) ;
585+ log_trace ! (
586+ self . logger,
587+ "Package broadcast message {}, txids: {:?}" ,
588+ package_msg,
589+ txids,
590+ ) ;
591+ } ,
592+ Err ( e) => {
593+ log_error ! ( self . logger, "Failed to broadcast package {:?}: {}" , txids, e) ;
594+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
595+ for tx in package {
596+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
597+ }
598+ } ,
599+ } ,
600+ Err ( e) => {
601+ log_error ! (
602+ self . logger,
603+ "Failed to broadcast package due to timeout {:?}: {}" ,
604+ txids,
605+ e
606+ ) ;
607+ log_trace ! ( self . logger, "Failed broadcast package bytes:" ) ;
608+ for tx in package {
609+ log_trace ! ( self . logger, "{}" , log_bytes!( tx. encode( ) ) ) ;
610+ }
611+ } ,
612+ }
574613 }
575614 }
576615}
@@ -667,6 +706,34 @@ impl BitcoindClient {
667706 rpc_client. call_method :: < Txid > ( "sendrawtransaction" , & [ tx_json] ) . await
668707 }
669708
709+ /// Submits the provided package
710+ pub ( crate ) async fn submit_package (
711+ & self , package : & [ Transaction ] ,
712+ ) -> std:: io:: Result < ( String , Vec < Txid > ) > {
713+ match self {
714+ BitcoindClient :: Rpc { rpc_client, .. } => {
715+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
716+ } ,
717+ BitcoindClient :: Rest { rpc_client, .. } => {
718+ // Bitcoin Core's REST interface does not support submitting packages
719+ // so we use the RPC client.
720+ Self :: submit_package_inner ( Arc :: clone ( rpc_client) , package) . await
721+ } ,
722+ }
723+ }
724+
725+ async fn submit_package_inner (
726+ rpc_client : Arc < RpcClient > , package : & [ Transaction ] ,
727+ ) -> std:: io:: Result < ( String , Vec < Txid > ) > {
728+ let package_serialized: Vec < _ > =
729+ package. iter ( ) . map ( |tx| bitcoin:: consensus:: encode:: serialize_hex ( tx) ) . collect ( ) ;
730+ let package_json = serde_json:: json!( package_serialized) ;
731+ rpc_client
732+ . call_method :: < SubmitPackageResponse > ( "submitpackage" , & [ package_json] )
733+ . await
734+ . map ( |resp| ( resp. package_msg , resp. txids ) )
735+ }
736+
670737 /// Retrieve the fee estimate needed for a transaction to begin
671738 /// confirmation within the provided `num_blocks`.
672739 pub ( crate ) async fn get_fee_estimate_for_target (
@@ -1302,6 +1369,43 @@ impl TryInto<GetMempoolEntryResponse> for JsonResponse {
13021369 }
13031370}
13041371
1372+ pub struct SubmitPackageResponse {
1373+ package_msg : String ,
1374+ txids : Vec < Txid > ,
1375+ }
1376+
1377+ impl TryInto < SubmitPackageResponse > for JsonResponse {
1378+ type Error = std:: io:: Error ;
1379+ fn try_into ( self ) -> std:: io:: Result < SubmitPackageResponse > {
1380+ let package_msg = self . 0 [ "package_msg" ]
1381+ . as_str ( )
1382+ . ok_or ( std:: io:: Error :: new (
1383+ std:: io:: ErrorKind :: Other ,
1384+ "Failed to parse submitpackage response" ,
1385+ ) ) ?
1386+ . to_string ( ) ;
1387+ let tx_results = self . 0 [ "tx-results" ] . as_object ( ) . ok_or ( std:: io:: Error :: new (
1388+ std:: io:: ErrorKind :: Other ,
1389+ "Failed to parse submitpackage response" ,
1390+ ) ) ?;
1391+ let mut txids = Vec :: with_capacity ( tx_results. len ( ) ) ;
1392+ for tx_result in tx_results. values ( ) {
1393+ let txid_string = tx_result[ "txid" ] . as_str ( ) . ok_or ( std:: io:: Error :: new (
1394+ std:: io:: ErrorKind :: Other ,
1395+ "Failed to parse submitpackage response" ,
1396+ ) ) ?;
1397+ let txid: Txid = txid_string. parse ( ) . map_err ( |_| {
1398+ std:: io:: Error :: new (
1399+ std:: io:: ErrorKind :: Other ,
1400+ "Failed to parse submitpackage response" ,
1401+ )
1402+ } ) ?;
1403+ txids. push ( txid) ;
1404+ }
1405+ Ok ( SubmitPackageResponse { package_msg, txids } )
1406+ }
1407+ }
1408+
13051409#[ derive( Debug , Clone ) ]
13061410pub ( crate ) struct MempoolEntry {
13071411 /// The transaction id
0 commit comments