Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
1922030
doc tweak
amunra Jun 3, 2025
f5dcf86
fixed a typo
amunra Jun 3, 2025
8464f73
consolidating error codes
amunra Jun 3, 2025
9d85550
fixed C++ formatting
amunra Jun 3, 2025
dee903c
fix to run against merged array branch
amunra Jun 3, 2025
0417764
fixed TestVsQuestDBMaster CI job
amunra Jun 3, 2025
19c47cb
add tests for `max_name_len` and fix one bug.
kafka1991 Jun 3, 2025
d8843d4
add c-major array layout c api.
kafka1991 Jun 4, 2025
56d171b
optimize strideArrayView performance.
kafka1991 Jun 4, 2025
680bdea
add temp benchmark code.
kafka1991 Jun 5, 2025
dee8aea
revert temp benchmark code.
kafka1991 Jun 5, 2025
28a900f
add flush_and_keep_with_flags
kafka1991 Jun 5, 2025
0a900e9
refactor `strideArrayView` to make iterator more efficient.
kafka1991 Jun 5, 2025
1a8df97
add 3d and 4d iterator tests.
kafka1991 Jun 6, 2025
94db703
fix protocol_version docs and remove warning.
kafka1991 Jun 6, 2025
3c4209f
fix protocol_version docs and remove warning.
kafka1991 Jun 6, 2025
d98a255
introduce low-level c++ api to ingest array.
kafka1991 Jun 6, 2025
892fe9d
introduce low-level c++ api to ingest array.
kafka1991 Jun 6, 2025
8fba187
add array support docs.
kafka1991 Jun 6, 2025
5152445
fix protocol_version docs and remove warning.
kafka1991 Jun 6, 2025
48389d5
change ssize_t to intptr_t
kafka1991 Jun 6, 2025
87e237c
add high level c++ api
kafka1991 Jun 6, 2025
28bd22c
add inline(always) for strideArrayView Iterator
kafka1991 Jun 11, 2025
6201662
Merge remote-tracking branch 'origin/main' into v5
amunra Jun 13, 2025
816698e
Merge remote-tracking branch 'origin/main' into v5
amunra Jun 17, 2025
4a656bc
C++ API improvements
amunra Jun 17, 2025
aa918b4
little refactor c api for array.
kafka1991 Jun 18, 2025
d2b1440
fix c tests
kafka1991 Jun 18, 2025
71373cd
C++ API customization point for arrays
amunra Jun 18, 2025
cf62b6b
cleanup
amunra Jun 18, 2025
c78ea1f
doc clean-up
amunra Jun 19, 2025
d6f2db8
Added C++ array customisation point example
amunra Jun 19, 2025
9f144e4
reformatted example code with clang-format
amunra Jun 19, 2025
415cf86
updated example sections for C and C++
amunra Jun 19, 2025
4781337
Fixed questdb-rs docs.rs not building
amunra Jun 19, 2025
268ce5d
examples summary for questdb-rs
amunra Jun 19, 2025
e03c2c2
dummy test version
amunra Jun 19, 2025
8d7b770
upgraded to maintained bump-my-version tool
amunra Jun 19, 2025
2586e55
fixed one that got missed.. which is why we have this stuff in the fi…
amunra Jun 19, 2025
91b7cf3
fixed up .bumpversion.toml
amunra Jun 19, 2025
8bb4eec
updated version to 5.0.0
amunra Jun 19, 2025
8d6ce96
fixed readme compile issue
amunra Jun 19, 2025
2c43600
Comment issue spotted by Copilot AI
amunra Jun 19, 2025
5af7515
.bumpversion.toml fixup
amunra Jun 20, 2025
80cb3f8
lower limit for MAX_ARRAY_BUFFER_SIZE
amunra Jun 26, 2025
860891b
cargo clippy --fix
amunra Jul 2, 2025
e9cace2
bumped named version supporting arrays from 8.4.0to 9.0.0
amunra Jul 2, 2025
75602c4
Refactoring and general improvements (#113)
amunra Jul 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
line_sender_c_example_array_elem_strides
examples/concat.c
examples/line_sender_c_example_array_elem_strides.c)
compile_example(
line_sender_c_example_array_c_major
examples/concat.c
examples/line_sender_c_example_array_c_major.c)
compile_example(
line_sender_c_example_auth
examples/concat.c
Expand Down Expand Up @@ -141,6 +145,9 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
compile_example(
line_sender_cpp_example_auth
examples/line_sender_cpp_example_auth.cpp)
compile_example(
line_sender_cpp_example_array_c_major
examples/line_sender_cpp_example_array_c_major.cpp)
compile_example(
line_sender_cpp_example_tls_ca
examples/line_sender_cpp_example_tls_ca.cpp)
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ The library supports the following ILP protocol versions.

These protocol versions are supported over both HTTP and TCP.

If you use HTTP, the library will automatically detect the server's
latest supported protocol version and use it. If you use TCP, you can specify the
`protocol_version=N` parameter when constructing the `Sender` object.

| Version | Description | Server Comatibility |
* If you use HTTP and `protocol_version=auto` or unset, the library will
automatically detect the server's
latest supported protocol version and use it (recommended).
* If you use TCP, you can specify the
`protocol_version=N` parameter when constructing the `Sender` object
(TCP defaults to `protocol_version=1`).

| Version | Description | Server Compatibility |
| ------- | ------------------------------------------------------- | --------------------- |
| **1** | Over HTTP it's compatible InfluxDB Line Protocol (ILP) | All QuestDB versions |
| **2** | 64-bit floats sent as binary, adds n-dimentional arrays | 8.4.0+ (2023-10-30) |
Expand Down
176 changes: 164 additions & 12 deletions cpp_test/test_line_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ std::string& push_double_arr_to_buffer(
return buffer;
}

std::string& push_double_arr_to_buffer(
std::string& buffer,
std::vector<double>& data,
size_t rank,
uintptr_t* shape)
{
buffer.push_back(14);
buffer.push_back(10);
buffer.push_back(static_cast<char>(rank));
for (size_t i = 0; i < rank; ++i)
buffer.append(
reinterpret_cast<const char*>(&shape[i]), sizeof(uint32_t));
buffer.append(
reinterpret_cast<const char*>(data.data()),
data.size() * sizeof(double));
return buffer;
}

std::string& push_double_to_buffer(std::string& buffer, double data)
{
buffer.push_back(16);
Expand Down Expand Up @@ -199,15 +217,26 @@ TEST_CASE("line_sender c api basics")
reinterpret_cast<uint8_t*>(arr_data.data()),
sizeof(arr_data),
&err));
line_sender_column_name arr_name3 = QDB_COLUMN_NAME_LITERAL("a3");
CHECK(
::line_sender_buffer_column_f64_arr_c_major(
buffer,
arr_name3,
rank,
shape,
reinterpret_cast<uint8_t*>(arr_data.data()),
sizeof(arr_data),
&err));
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
CHECK(server.recv() == 0);
CHECK(::line_sender_buffer_size(buffer) == 266);
CHECK(::line_sender_buffer_size(buffer) == 382);
CHECK(::line_sender_flush(sender, buffer, &err));
::line_sender_buffer_free(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1 f1=="};
push_double_to_buffer(expect, 0.5).append(",a1==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
CHECK(server.msgs(0) == expect);
}
Expand Down Expand Up @@ -262,8 +291,8 @@ TEST_CASE("line_sender c++ api basics")
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
// 3D array of doubles
size_t rank = 3;
std::vector<uintptr_t> shape{2, 3, 2};
std::vector<intptr_t> strides{48, 16, 8};
uintptr_t shape[] = {2, 3, 2};
intptr_t strides[] = {48, 16, 8};
std::array<double, 12> arr_data = {
48123.5,
2.4,
Expand All @@ -277,28 +306,131 @@ TEST_CASE("line_sender c++ api basics")
2.7,
48121.5,
4.3};
std::vector<intptr_t> elem_strides{6, 2, 1};
intptr_t elem_strides[] = {6, 2, 1};
buffer.table("test")
.symbol("t1", "v1")
.symbol("t2", "")
.column("f1", 0.5)
.column<true>("a1", rank, shape, strides, arr_data)
.column<false>("a2", rank, shape, elem_strides, arr_data)
.column<questdb::ingress::array_strides_size_mode::bytes>(
"a1", rank, shape, strides, arr_data)
.column<questdb::ingress::array_strides_size_mode::elems>(
"a2", rank, shape, elem_strides, arr_data)
.column("a3", rank, shape, arr_data)
.column<questdb::ingress::array_strides_size_mode::bytes>(
"a4", rank, shape, strides, arr_data.data(), arr_data.size())
.column<questdb::ingress::array_strides_size_mode::elems>(
"a5", rank, shape, elem_strides, arr_data.data(), arr_data.size())
.column("a6", rank, shape, arr_data.data(), arr_data.size())
.at(questdb::ingress::timestamp_nanos{10000000});

CHECK(server.recv() == 0);
CHECK(buffer.size() == 270);
CHECK(buffer.size() == 734);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= f1=="};
push_double_to_buffer(expect, 0.5).append(",a1==");
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
.append(",a2==");
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a4==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a5==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a6==");
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
CHECK(server.msgs(0) == expect);
}

TEST_CASE("line_sender array vector API")
{
questdb::ingress::test::mock_server server;
questdb::ingress::opts opts{
questdb::ingress::protocol::tcp,
std::string("127.0.0.1"),
std::to_string(server.port())};
opts.protocol_version(questdb::ingress::protocol_version::v2);
questdb::ingress::line_sender sender{opts};
CHECK_FALSE(sender.must_close());
server.accept();
CHECK(server.recv() == 0);
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
std::vector<double> arr_data = {
48123.5,
2.4,
48124.0,
1.8,
48124.5,
0.9,
48122.5,
3.1,
48122.0,
2.7,
48121.5,
4.3};
buffer.table("test")
.symbol("t1", "v1")
.symbol("t2", "")
.column("a1", arr_data)
.at(questdb::ingress::timestamp_nanos{10000000});

uintptr_t test_shape[] = {12};
CHECK(server.recv() == 0);
CHECK(buffer.size() == 132);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= a1=="};
push_double_arr_to_buffer(expect, arr_data, 1, test_shape)
.append(" 10000000\n");
CHECK(server.msgs(0) == expect);
}

#if __cplusplus >= 202002L
TEST_CASE("line_sender array span API")
{
questdb::ingress::test::mock_server server;
questdb::ingress::opts opts{
questdb::ingress::protocol::tcp,
std::string("127.0.0.1"),
std::to_string(server.port())};
opts.protocol_version(questdb::ingress::protocol_version::v2);
questdb::ingress::line_sender sender{opts};
CHECK_FALSE(sender.must_close());
server.accept();
CHECK(server.recv() == 0);

questdb::ingress::line_sender_buffer buffer = sender.new_buffer();

std::vector<double> arr_data = {
48123.5,
2.4,
48124.0,
1.8,
48124.5,
0.9,
48122.5,
3.1,
48122.0,
2.7,
48121.5,
4.3};
std::span<const double> data_span = arr_data;
buffer.table("test")
.symbol("t1", "v1")
.symbol("t2", "")
.column("a1", data_span.subspan(1, 8))
.at(questdb::ingress::timestamp_nanos{10000000});
std::vector<double> expect_arr_data = {
2.4, 48124.0, 1.8, 48124.5, 0.9, 48122.5, 3.1, 48122.0};

uintptr_t test_shape[] = {8};
CHECK(server.recv() == 0);
CHECK(buffer.size() == 100);
sender.flush(buffer);
CHECK(server.recv() == 1);
std::string expect{"test,t1=v1,t2= a1=="};
push_double_arr_to_buffer(expect, expect_arr_data, 1, test_shape)
.append(" 10000000\n");
CHECK(server.msgs(0) == expect);
}
#endif

TEST_CASE("test multiple lines")
{
questdb::ingress::test::mock_server server;
Expand Down Expand Up @@ -810,15 +942,35 @@ TEST_CASE("Test timestamp column.")
const auto exp = ss.str();
CHECK(buffer.peek() == exp);

sender.flush_and_keep(buffer);
try
{
sender.flush_and_keep_with_flags(buffer, true);
CHECK_MESSAGE(false, "Expected exception");
}
catch (const questdb::ingress::line_sender_error& se)
{
std::string msg{se.what()};
CHECK_MESSAGE(
msg.rfind(
"Transactional flushes are not supported for ILP over TCP",
0) == 0,
msg);
}
catch (...)
{
CHECK_MESSAGE(false, "Other exception raised.");
}

sender.flush_and_keep(buffer);
sender.flush_and_keep_with_flags(buffer, false);
CHECK(buffer.peek() == exp);

server.accept();
sender.close();

CHECK(server.recv() == 1);
CHECK(server.recv() == 2);
CHECK(server.msgs(0) == exp);
CHECK(server.msgs(1) == exp);
}

TEST_CASE("test timestamp_micros and timestamp_nanos::now()")
Expand Down
3 changes: 3 additions & 0 deletions examples/line_sender_c_example_array_byte_strides.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include <string.h>
#include "concat.h"

/*
* QuestDB server version 8.4.0 or later is required for array support.
*/
static bool example(const char* host, const char* port)
{
line_sender_error* err = NULL;
Expand Down
99 changes: 99 additions & 0 deletions examples/line_sender_c_example_array_c_major.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include <questdb/ingress/line_sender.h>
#include <stdio.h>
#include <stdbool.h>
#include <string.h>
#include "concat.h"

/*
* QuestDB server version 8.4.0 or later is required for array support.
*/
static bool example(const char* host, const char* port)
{
line_sender_error* err = NULL;
line_sender* sender = NULL;
line_sender_buffer* buffer = NULL;
char* conf_str = concat("tcp::addr=", host, ":", port, ";protocol_version=2;");
if (!conf_str)
{
fprintf(stderr, "Could not concatenate configuration string.\n");
return false;
}

line_sender_utf8 conf_str_utf8 = {0, NULL};
if (!line_sender_utf8_init(
&conf_str_utf8, strlen(conf_str), conf_str, &err))
goto on_error;

sender = line_sender_from_conf(conf_str_utf8, &err);
if (!sender)
goto on_error;

free(conf_str);
conf_str = NULL;

buffer = line_sender_buffer_new_for_sender(sender);
line_sender_buffer_reserve(buffer, 64 * 1024);

line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("market_orders_c_major");
line_sender_column_name symbol_col = QDB_COLUMN_NAME_LITERAL("symbol");
line_sender_column_name book_col = QDB_COLUMN_NAME_LITERAL("order_book");

if (!line_sender_buffer_table(buffer, table_name, &err))
goto on_error;

line_sender_utf8 symbol_val = QDB_UTF8_LITERAL("BTC-USD");
if (!line_sender_buffer_symbol(buffer, symbol_col, symbol_val, &err))
goto on_error;

size_t array_rank = 3;
uintptr_t array_shape[] = {2, 3, 2};
double array_data[] = {
48123.5,
2.4,
48124.0,
1.8,
48124.5,
0.9,
48122.5,
3.1,
48122.0,
2.7,
48121.5,
4.3};

if (!line_sender_buffer_column_f64_arr_c_major(
buffer,
book_col,
array_rank,
array_shape,
(const uint8_t*)array_data,
sizeof(array_data),
&err))
goto on_error;

if (!line_sender_buffer_at_nanos(buffer, line_sender_now_nanos(), &err))
goto on_error;

if (!line_sender_flush(sender, buffer, &err))
goto on_error;

line_sender_close(sender);
return true;

on_error:;
size_t err_len = 0;
const char* err_msg = line_sender_error_msg(err, &err_len);
fprintf(stderr, "Error: %.*s\n", (int)err_len, err_msg);
free(conf_str);
line_sender_error_free(err);
line_sender_buffer_free(buffer);
line_sender_close(sender);
return false;
}

int main(int argc, const char* argv[])
{
const char* host = (argc >= 2) ? argv[1] : "localhost";
const char* port = (argc >= 3) ? argv[2] : "9009";
return !example(host, port);
}
Loading