Skip to content

Commit 9eb805f

Browse files
ryleung-solanaaeyakovenko
authored andcommitted
Send async quic batch of txs (solana-labs#24298)
Add an interface send_wire_transaction_batch_async to TpuConnection to allow for sending batches without waiting for completion Co-authored-by: Anatoly Yakovenko <anatoly@solana.com>
1 parent 138446c commit 9eb805f

File tree

5 files changed

+67
-15
lines changed

5 files changed

+67
-15
lines changed

client/src/connection_cache.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,25 @@ pub fn send_wire_transaction_async(
256256
r
257257
}
258258

259+
pub fn send_wire_transaction_batch_async(
260+
packets: Vec<Vec<u8>>,
261+
addr: &SocketAddr,
262+
) -> Result<(), TransportError> {
263+
let (conn, stats) = get_connection(addr);
264+
let client_stats = Arc::new(ClientStats::default());
265+
let len = packets.len();
266+
let r = match conn {
267+
Connection::Udp(conn) => {
268+
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
269+
}
270+
Connection::Quic(conn) => {
271+
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
272+
}
273+
};
274+
stats.add_client_stats(&client_stats, len, r.is_ok());
275+
r
276+
}
277+
259278
pub fn send_wire_transaction(
260279
wire_transaction: &[u8],
261280
addr: &SocketAddr,

client/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
#[macro_use]
33
extern crate serde_derive;
44

5+
#[macro_use]
6+
extern crate solana_metrics;
7+
58
pub mod blockhash_query;
69
pub mod client_error;
710
pub mod connection_cache;
@@ -27,9 +30,6 @@ pub mod tpu_connection;
2730
pub mod transaction_executor;
2831
pub mod udp_client;
2932

30-
#[macro_use]
31-
extern crate solana_metrics;
32-
3333
pub mod mock_sender_for_cli {
3434
/// Magic `SIGNATURE` value used by `solana-cli` unit tests.
3535
/// Please don't use this constant.

client/src/quic_client.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,31 @@ impl TpuConnection for QuicTpuConnection {
119119
stats: Arc<ClientStats>,
120120
) -> TransportResult<()> {
121121
let _guard = RUNTIME.enter();
122-
//drop and detach the task
123122
let client = self.client.clone();
124-
inc_new_counter_info!("send_wire_transaction_async", 1);
123+
//drop and detach the task
125124
let _ = RUNTIME.spawn(async move {
126125
let send_buffer = client.send_buffer(wire_transaction, &stats);
127126
if let Err(e) = send_buffer.await {
128-
inc_new_counter_warn!("send_wire_transaction_async_fail", 1);
129127
warn!("Failed to send transaction async to {:?}", e);
130-
} else {
131-
inc_new_counter_info!("send_wire_transaction_async_pass", 1);
128+
datapoint_warn!("send-wire-async", ("failure", 1, i64),);
129+
}
130+
});
131+
Ok(())
132+
}
133+
134+
fn send_wire_transaction_batch_async(
135+
&self,
136+
buffers: Vec<Vec<u8>>,
137+
stats: Arc<ClientStats>,
138+
) -> TransportResult<()> {
139+
let _guard = RUNTIME.enter();
140+
let client = self.client.clone();
141+
//drop and detach the task
142+
let _ = RUNTIME.spawn(async move {
143+
let send_batch = client.send_batch(&buffers, &stats);
144+
if let Err(e) = send_batch.await {
145+
warn!("Failed to send transaction batch async to {:?}", e);
146+
datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),);
132147
}
133148
});
134149
Ok(())
@@ -269,13 +284,16 @@ impl QuicClient {
269284
.iter()
270285
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
271286

272-
let futures = chunks.into_iter().map(|buffs| {
273-
join_all(
274-
buffs
275-
.into_iter()
276-
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
277-
)
278-
});
287+
let futures: Vec<_> = chunks
288+
.into_iter()
289+
.map(|buffs| {
290+
join_all(
291+
buffs
292+
.into_iter()
293+
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
294+
)
295+
})
296+
.collect();
279297

280298
for f in futures {
281299
f.await.into_iter().try_for_each(|res| res)?;

client/src/tpu_connection.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,10 @@ pub trait TpuConnection {
7070
) -> TransportResult<()>
7171
where
7272
T: AsRef<[u8]>;
73+
74+
fn send_wire_transaction_batch_async(
75+
&self,
76+
buffers: Vec<Vec<u8>>,
77+
stats: Arc<ClientStats>,
78+
) -> TransportResult<()>;
7379
}

client/src/udp_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,13 @@ impl TpuConnection for UdpTpuConnection {
6262
batch_send(&self.socket, &pkts)?;
6363
Ok(())
6464
}
65+
fn send_wire_transaction_batch_async(
66+
&self,
67+
buffers: Vec<Vec<u8>>,
68+
_stats: Arc<ClientStats>,
69+
) -> TransportResult<()> {
70+
let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect();
71+
batch_send(&self.socket, &pkts)?;
72+
Ok(())
73+
}
6574
}

0 commit comments

Comments
 (0)