Skip to content

Commit b4a9d70

Browse files
authored
Merge pull request #443 from theory/insert-block
Add BeginInsert/InsertData/EndInsert flow
2 parents 6919524 + 67e923e commit b4a9d70

File tree

5 files changed

+309
-9
lines changed

5 files changed

+309
-9
lines changed

README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,47 @@ target_link_libraries(${PROJECT_NAME} PRIVATE clickhouse-cpp-lib)
157157
- run `rm -rf build && cmake -B build -S . && cmake --build build -j32` to remove remainders of the previous builds, run CMake and build the
158158
application. The generated binary is located in location `build/application-example`.
159159

160+
## Batch Insertion
161+
162+
In addition to the `Insert` method, which inserts all the data in a block in a
163+
single call, you can use the `BeginInsert` / `InsertData` / `EndInsert`
164+
pattern to insert batches of data. This can be useful for managing larger data
165+
sets without inflating memory with the entire set.
166+
167+
To use it, pass `BeginInsert` an `INSERT` statement ending in `VALUES` but with
168+
no actual values. Use the resulting `Block` to append batches of data, sending
169+
each to the server with `InsertData`. Finally, call `EndInsert` (or let the
170+
client go out of scope) to signal the server that insertion is complete.
171+
Example:
172+
173+
```cpp
174+
// Start the insertion.
175+
auto block = client->BeginInsert("INSERT INTO foo (id, name) VALUES");
176+
177+
// Grab the columns from the block.
178+
auto col1 = block[0]->As<ColumnUInt64>();
179+
auto col2 = block[1]->As<ColumnString>();
180+
181+
// Add a couple of records to the block.
182+
col1->Append(1);
183+
col1->Append(2);
184+
col2->Append("holden");
185+
col2->Append("naomi");
186+
187+
// Send those records.
188+
block.RefreshRowCount();
189+
client->InsertData(block);
190+
block.Clear();
191+
192+
// Add another record.
193+
col1->Append(3);
194+
col2->Append("amos");
195+
196+
// Send it and finish.
197+
block.RefreshRowCount();
198+
client->EndInsert(block);
199+
```
200+
160201
## Thread-safety
161202
⚠ Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques. ⚠
162203

clickhouse/block.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ class Block {
8585
return columns_.at(idx).name;
8686
}
8787

88-
/// Convinience method to wipe out all rows from all columns
88+
/// Convenience method to wipe out all rows from all columns
8989
void Clear();
9090

91-
/// Convinience method to do Reserve() on all columns
91+
/// Convenience method to do Reserve() on all columns
9292
void Reserve(size_t new_cap);
9393

9494
/// Reference to column by index in the block.

clickhouse/client.cpp

Lines changed: 163 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ class Client::Impl {
161161

162162
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
163163

164+
Block BeginInsert(Query query);
165+
166+
void InsertData(const Block& block);
167+
168+
void EndInsert();
169+
164170
void Ping();
165171

166172
void ResetConnection();
@@ -175,6 +181,7 @@ class Client::Impl {
175181
bool Handshake();
176182

177183
bool ReceivePacket(uint64_t* server_packet = nullptr);
184+
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
178185

179186
void SendQuery(const Query& query, bool finalize = true);
180187
void FinalizeQuery();
@@ -208,6 +215,7 @@ class Client::Impl {
208215
}
209216

210217
private:
218+
bool inserting;
211219
/// In case of network errors tries to reconnect to server and
212220
/// call func several times.
213221
void RetryGuard(std::function<void()> func);
@@ -280,10 +288,15 @@ Client::Impl::Impl(const ClientOptions& opts,
280288
}
281289
}
282290

283-
Client::Impl::~Impl()
284-
{ }
291+
Client::Impl::~Impl() {
292+
// Wrap up an insert if one is in progress.
293+
EndInsert();
294+
}
285295

286296
void Client::Impl::ExecuteQuery(Query query) {
297+
if (inserting) {
298+
throw ProtocolError("cannot execute query while inserting");
299+
}
287300
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
288301

289302
if (options_.ping_before_query) {
@@ -299,6 +312,9 @@ void Client::Impl::ExecuteQuery(Query query) {
299312

300313

301314
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
315+
if (inserting) {
316+
throw ProtocolError("cannot execute query while inserting");
317+
}
302318
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
303319
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
304320
}
@@ -362,6 +378,9 @@ std::string NameToQueryString(const std::string &input)
362378
}
363379

364380
void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
381+
if (inserting) {
382+
throw ProtocolError("cannot execute query while inserting");
383+
}
365384
if (options_.ping_before_query) {
366385
RetryGuard([this]() { Ping(); });
367386
}
@@ -397,10 +416,24 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
397416
}
398417

399418
// Send data.
419+
inserting = true;
400420
SendData(block);
401-
// Send empty block as marker of
402-
// end of data.
421+
EndInsert();
422+
}
423+
424+
void Client::Impl::InsertData(const Block& block) {
425+
if (!inserting) {
426+
throw ProtocolError("illegal call to InsertData without first calling BeginInsert");
427+
}
428+
SendData(block);
429+
}
430+
431+
void Client::Impl::EndInsert() {
432+
if (!inserting) return;
433+
434+
// Send empty block as marker of end of data.
403435
SendData(Block());
436+
inserting = false;
404437

405438
// Wait for EOS.
406439
uint64_t eos_packet{0};
@@ -416,6 +449,9 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
416449
}
417450

418451
void Client::Impl::Ping() {
452+
if (inserting) {
453+
throw ProtocolError("cannot execute query while inserting");
454+
}
419455
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
420456
output_->Flush();
421457

@@ -429,6 +465,7 @@ void Client::Impl::Ping() {
429465

430466
void Client::Impl::ResetConnection() {
431467
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
468+
inserting = false;
432469

433470
if (!Handshake()) {
434471
throw ProtocolError("fail to connect to " + options_.host);
@@ -648,6 +685,78 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
648685
}
649686
}
650687

688+
bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
689+
uint64_t packet_type = 0;
690+
691+
while (true) {
692+
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
693+
throw std::runtime_error("unexpected package type " +
694+
std::to_string((int)packet_type) + " for insert query");
695+
}
696+
if (server_packet) {
697+
*server_packet = packet_type;
698+
}
699+
700+
switch (packet_type) {
701+
case ServerCodes::Data: {
702+
if (!ReceiveData()) {
703+
throw ProtocolError("can't read data packet from input stream");
704+
}
705+
return true;
706+
}
707+
708+
case ServerCodes::Exception: {
709+
ReceiveException();
710+
return false;
711+
}
712+
713+
case ServerCodes::ProfileInfo:
714+
case ServerCodes::Progress:
715+
case ServerCodes::Pong:
716+
case ServerCodes::Hello:
717+
continue;
718+
719+
case ServerCodes::Log: {
720+
// log tag
721+
if (!WireFormat::SkipString(*input_)) {
722+
return false;
723+
}
724+
Block block;
725+
726+
// Use uncompressed stream since log blocks usually contain only one row
727+
if (!ReadBlock(*input_, &block)) {
728+
return false;
729+
}
730+
731+
if (events_) {
732+
events_->OnServerLog(block);
733+
}
734+
continue;
735+
}
736+
737+
case ServerCodes::TableColumns: {
738+
// external table name
739+
if (!WireFormat::SkipString(*input_)) {
740+
return false;
741+
}
742+
743+
// columns metadata
744+
if (!WireFormat::SkipString(*input_)) {
745+
return false;
746+
}
747+
continue;
748+
}
749+
750+
// No others expected.
751+
case ServerCodes::EndOfStream:
752+
case ServerCodes::ProfileEvents:
753+
default:
754+
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
755+
break;
756+
}
757+
}
758+
}
759+
651760
bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
652761
// Additional information about block.
653762
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -923,7 +1032,6 @@ void Client::Impl::FinalizeQuery() {
9231032
output_->Flush();
9241033
}
9251034

926-
9271035
void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
9281036
// Additional information about block.
9291037
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1063,7 +1171,7 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10631171
}
10641172
}
10651173
}
1066-
// Connectiong with current_endpoint_ are broken.
1174+
// Connections with current_endpoint_ are broken.
10671175
// Trying to establish a connection with another one from the list.
10681176
size_t connection_attempts_count = GetConnectionAttempts();
10691177
for (size_t i = 0; i < connection_attempts_count;)
@@ -1085,6 +1193,34 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
10851193
}
10861194
}
10871195

1196+
Block Client::Impl::BeginInsert(Query query) {
1197+
if (inserting) {
1198+
throw ProtocolError("cannot execute query while inserting");
1199+
}
1200+
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
1201+
1202+
if (options_.ping_before_query) {
1203+
RetryGuard([this]() { Ping(); });
1204+
}
1205+
1206+
// Create a callback to extract the block with the proper query columns.
1207+
Block block;
1208+
query.OnData([&block](const Block& b) {
1209+
block = std::move(b);
1210+
return true;
1211+
});
1212+
1213+
SendQuery(query.GetText());
1214+
1215+
// Receive data packet but keep the query/connection open.
1216+
if (!ReceivePreparePackets()) {
1217+
throw std::runtime_error("fail to receive data packet");
1218+
}
1219+
1220+
inserting = true;
1221+
return block;
1222+
}
1223+
10881224
Client::Client(const ClientOptions& opts)
10891225
: options_(opts)
10901226
, impl_(new Impl(opts))
@@ -1149,6 +1285,27 @@ void Client::Insert(const std::string& table_name, const std::string& query_id,
11491285
impl_->Insert(table_name, query_id, block);
11501286
}
11511287

1288+
Block Client::BeginInsert(const std::string& query) {
1289+
return impl_->BeginInsert(Query(query));
1290+
}
1291+
1292+
Block Client::BeginInsert(const std::string& query, const std::string& query_id) {
1293+
return impl_->BeginInsert(Query(query, query_id));
1294+
}
1295+
1296+
void Client::InsertData(const Block& block) {
1297+
impl_->InsertData(block);
1298+
}
1299+
1300+
void Client::EndInsert(const Block& block) {
1301+
impl_->InsertData(block);
1302+
impl_->EndInsert();
1303+
}
1304+
1305+
void Client::EndInsert() {
1306+
impl_->EndInsert();
1307+
}
1308+
11521309
void Client::Ping() {
11531310
impl_->Ping();
11541311
}

clickhouse/client.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,17 @@ class Client {
273273
void Insert(const std::string& table_name, const Block& block);
274274
void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
275275

276+
/// Start an \p INSERT statement, insert batches of data, then finish the insert.
277+
Block BeginInsert(const std::string& query);
278+
Block BeginInsert(const std::string& query, const std::string& query_id);
279+
280+
/// Insert data using a \p block returned by \p BeginInsert.
281+
void InsertData(const Block& block);
282+
283+
/// End an \p INSERT session started by \p BeginInsert.
284+
void EndInsert();
285+
void EndInsert(const Block& block);
286+
276287
/// Ping server for aliveness.
277288
void Ping();
278289

0 commit comments

Comments
 (0)