Skip to content

Commit bbcb988

Browse files
author
kamillecao
committed
expose low level api about builder and sender.
1 parent d7e9606 commit bbcb988

File tree

1 file changed

+39
-27
lines changed

1 file changed

+39
-27
lines changed

src/insert.rs

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,14 @@ impl RowsBuilder {
9292
}
9393

9494
/// inserted rows sender
95-
pub struct RowsSender {
95+
pub struct RowsSender<T> {
9696
state: RowsSenderState,
9797
send_timeout: Option<Duration>,
9898
end_timeout: Option<Duration>,
9999
// Use boxed `Sleep` to reuse a timer entry, it improves performance.
100100
// Also, `tokio::time::timeout()` significantly increases a future's size.
101101
sleep: Pin<Box<Sleep>>,
102+
_marker: PhantomData<fn() -> T>, // TODO: test contravariance.
102103
}
103104

104105
// It should be a regular function, but it decreases performance.
@@ -115,7 +116,30 @@ macro_rules! timeout {
115116
}};
116117
}
117118

118-
impl RowsSender {
119+
impl<T> RowsSender<T> {
120+
pub fn new(client: &Client, table: &str) -> Self
121+
where
122+
T: Row,
123+
{
124+
let fields = row::join_column_names::<T>()
125+
.expect("the row type must be a struct or a wrapper around it");
126+
127+
// TODO: what about escaping a table name?
128+
// https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
129+
let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);
130+
131+
Self {
132+
state: RowsSenderState::NotStarted {
133+
client: Box::new(client.clone()),
134+
sql,
135+
},
136+
send_timeout: None,
137+
end_timeout: None,
138+
sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
139+
_marker: PhantomData,
140+
}
141+
}
142+
119143
/// Send one chunk and keep sender active
120144
pub async fn send_chunk(&mut self, chunk: Bytes) -> Result<()> {
121145
debug_assert!(matches!(self.state, RowsSenderState::Active { .. }));
@@ -279,7 +303,7 @@ impl RowsSender {
279303
}
280304
}
281305

282-
pub enum RowsSenderState {
306+
pub(crate) enum RowsSenderState {
283307
NotStarted {
284308
client: Box<Client>,
285309
sql: String,
@@ -347,8 +371,7 @@ impl RowsSenderState {
347371
#[must_use]
348372
pub struct Insert<T> {
349373
builder: RowsBuilder,
350-
sender: RowsSender,
351-
_marker: PhantomData<fn() -> T>, // TODO: test contravariance.
374+
sender: RowsSender<T>,
352375
}
353376

354377
impl<T> Insert<T> {
@@ -357,33 +380,14 @@ impl<T> Insert<T> {
357380
where
358381
T: Row,
359382
{
360-
let fields = row::join_column_names::<T>()
361-
.expect("the row type must be a struct or a wrapper around it");
362-
363-
// TODO: what about escaping a table name?
364-
// https://clickhouse.com/docs/en/sql-reference/syntax#identifiers
365-
let sql = format!("INSERT INTO {}({}) FORMAT RowBinary", table, fields);
383+
let sender = RowsSender::new(client, table);
366384

367385
#[cfg(feature = "lz4")]
368386
let builder = RowsBuilder::new(client.compression);
369387
#[cfg(not(feature = "lz4"))]
370388
let builder = RowsBuilder::new();
371389

372-
let sender = RowsSender {
373-
state: RowsSenderState::NotStarted {
374-
client: Box::new(client.clone()),
375-
sql,
376-
},
377-
send_timeout: None,
378-
end_timeout: None,
379-
sleep: Box::pin(tokio::time::sleep(Duration::new(0, 0))),
380-
};
381-
382-
Ok(Self {
383-
sender,
384-
builder,
385-
_marker: PhantomData,
386-
})
390+
Ok(Self { sender, builder })
387391
}
388392

389393
/// Sets timeouts for different operations.
@@ -405,10 +409,18 @@ impl<T> Insert<T> {
405409
send_timeout: Option<Duration>,
406410
end_timeout: Option<Duration>,
407411
) -> Self {
408-
self.sender.set_timeouts(send_timeout, end_timeout);
412+
self.set_timeouts(send_timeout, end_timeout);
409413
self
410414
}
411415

416+
pub(crate) fn set_timeouts(
417+
&mut self,
418+
send_timeout: Option<Duration>,
419+
end_timeout: Option<Duration>,
420+
) {
421+
self.sender.set_timeouts(send_timeout, end_timeout);
422+
}
423+
412424
/// Similar to [`Client::with_option`], but for this particular INSERT
413425
/// statement only.
414426
///

0 commit comments

Comments
 (0)