Skip to content

Commit 9096533

Browse files
committed
feat: add decimal support to buffer
1 parent b3f08fa commit 9096533

File tree

8 files changed

+978
-7
lines changed

8 files changed

+978
-7
lines changed

questdb-rs-ffi/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ pub enum line_sender_error_code {
224224

225225
/// Line sender protocol version error.
226226
line_sender_error_protocol_version_error,
227+
228+
/// The supplied decimal is invalid.
229+
line_sender_error_invalid_decimal,
227230
}
228231

229232
impl From<ErrorCode> for line_sender_error_code {
@@ -252,6 +255,9 @@ impl From<ErrorCode> for line_sender_error_code {
252255
ErrorCode::ProtocolVersionError => {
253256
line_sender_error_code::line_sender_error_protocol_version_error
254257
}
258+
ErrorCode::InvalidDecimal => {
259+
line_sender_error_code::line_sender_error_invalid_decimal
260+
}
255261
}
256262
}
257263
}

questdb-rs/Cargo.toml

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,26 @@ itoa = "1.0"
2828
aws-lc-rs = { version = "1.13", optional = true }
2929
ring = { version = "0.17.14", optional = true }
3030
rustls-pki-types = "1.0.1"
31-
rustls = { version = "0.23.25", default-features = false, features = ["logging", "std", "tls12"] }
31+
rustls = { version = "0.23.25", default-features = false, features = [
32+
"logging",
33+
"std",
34+
"tls12",
35+
] }
3236
rustls-native-certs = { version = "0.8.1", optional = true }
3337
webpki-roots = { version = "1.0.1", default-features = false, optional = true }
3438
chrono = { version = "0.4.40", optional = true }
3539

3640
# We need to limit the `ureq` version to 3.0.x since we use
3741
# the `ureq::unversioned` module which does not respect semantic versioning.
38-
ureq = { version = "3.0.10, <3.1.0", default-features = false, features = ["_tls"], optional = true }
42+
ureq = { version = "3.0.10, <3.1.0", default-features = false, features = [
43+
"_tls",
44+
], optional = true }
3945
serde_json = { version = "1", optional = true }
4046
questdb-confstr = "0.1.1"
4147
rand = { version = "0.9.0", optional = true }
4248
ndarray = { version = "0.16", optional = true }
49+
rust_decimal = { version = "1.38.0", optional = true }
50+
bigdecimal = { version = "0.4.8", optional = true }
4351

4452
[target.'cfg(windows)'.dependencies]
4553
winapi = { version = "0.3.9", features = ["ws2def"] }
@@ -68,7 +76,13 @@ sync-sender = ["sync-sender-tcp", "sync-sender-http"]
6876
sync-sender-tcp = ["_sync-sender", "_sender-tcp", "dep:socket2"]
6977

7078
## Sync ILP/HTTP
71-
sync-sender-http = ["_sync-sender", "_sender-http", "dep:ureq", "dep:serde_json", "dep:rand"]
79+
sync-sender-http = [
80+
"_sync-sender",
81+
"_sender-http",
82+
"dep:ureq",
83+
"dep:serde_json",
84+
"dep:rand",
85+
]
7286

7387
## Allow use OS-provided root TLS certificates
7488
tls-native-certs = ["dep:rustls-native-certs"]
@@ -91,6 +105,12 @@ json_tests = []
91105
## Enable methods to create timestamp objects from chrono::DateTime objects.
92106
chrono_timestamp = ["chrono"]
93107

108+
## Enable serialization of rust_decimal::Decimal in ILP
109+
rust_decimal = ["dep:rust_decimal"]
110+
111+
## Enable serialization of bigdecimal::BigDecimal in ILP
112+
bigdecimal = ["dep:bigdecimal"]
113+
94114
# Hidden derived features, used in code to enable-disable code sections. Don't use directly.
95115
_sender-tcp = []
96116
_sender-http = []
@@ -109,7 +129,9 @@ almost-all-features = [
109129
"insecure-skip-verify",
110130
"json_tests",
111131
"chrono_timestamp",
112-
"ndarray"
132+
"ndarray",
133+
"rust_decimal",
134+
"bigdecimal",
113135
]
114136

115137
[[example]]
@@ -126,8 +148,8 @@ required-features = ["chrono_timestamp"]
126148

127149
[[example]]
128150
name = "http"
129-
required-features = ["sync-sender-http", "ndarray"]
151+
required-features = ["sync-sender-http", "ndarray", "rust_decimal"]
130152

131153
[[example]]
132154
name = "protocol_version"
133-
required-features = ["sync-sender-http", "ndarray"]
155+
required-features = ["sync-sender-http", "ndarray", "bigdecimal"]

questdb-rs/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ pub enum ErrorCode {
7878

7979
/// Validate protocol version error.
8080
ProtocolVersionError,
81+
82+
/// The supplied decimal is invalid.
83+
InvalidDecimal,
8184
}
8285

8386
/// An error that occurred when using QuestDB client library.

questdb-rs/src/ingress/buffer.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* limitations under the License.
2222
*
2323
******************************************************************************/
24+
use crate::ingress::decimal::DecimalSerializer;
2425
use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
2526
use crate::ingress::{
2627
ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos,
@@ -71,7 +72,7 @@ where
7172
quoting_fn(output);
7273
}
7374

74-
fn must_escape_unquoted(c: u8) -> bool {
75+
pub fn must_escape_unquoted(c: u8) -> bool {
7576
matches!(c, b' ' | b',' | b'=' | b'\n' | b'\r' | b'\\')
7677
}
7778

@@ -974,6 +975,22 @@ impl Buffer {
974975
Ok(self)
975976
}
976977

978+
/// Record a decimal value for the given column.
979+
/// ```
980+
pub fn column_decimal<'a, N, S>(&mut self, name: N, value: S) -> crate::Result<&mut Self>
981+
where
982+
N: TryInto<ColumnName<'a>>,
983+
S: DecimalSerializer,
984+
Error: From<N::Error>,
985+
{
986+
self.write_column_key(name)?;
987+
value.serialize(
988+
&mut self.output,
989+
self.protocol_version == ProtocolVersion::V2,
990+
)?;
991+
Ok(self)
992+
}
993+
977994
/// Record a multidimensional array value for the given column.
978995
///
979996
/// Supports arrays with up to [`MAX_ARRAY_DIMS`] dimensions. The array elements must

questdb-rs/src/ingress/decimal.rs

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*******************************************************************************
2+
* ___ _ ____ ____
3+
* / _ \ _ _ ___ ___| |_| _ \| __ )
4+
* | | | | | | |/ _ \/ __| __| | | | _ \
5+
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
6+
* \__\_\\__,_|\___||___/\__|____/|____/
7+
*
8+
* Copyright (c) 2014-2019 Appsicle
9+
* Copyright (c) 2019-2025 QuestDB
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*
23+
******************************************************************************/
24+
25+
use crate::{error, ingress::must_escape_unquoted, Result};
26+
27+
/// Trait for types that can be serialized as decimal values in the InfluxDB Line Protocol (ILP).
28+
///
29+
/// Decimal values can be serialized in two formats:
30+
///
31+
/// # Text Format
32+
/// The decimal is written as a string representation followed by a `'d'` suffix.
33+
///
34+
/// Example: `"123.45d"` or `"1.5e-3d"`
35+
///
36+
/// Implementers must:
37+
/// - Write the decimal's text representation to the output buffer
38+
/// - Append the `'d'` suffix
39+
/// - Ensure no ILP reserved characters are present (space, comma, equals, newline, carriage return, backslash)
40+
///
41+
/// # Binary Format
42+
/// A more compact binary encoding consisting of:
43+
///
44+
/// 1. Binary format marker: `'='` (0x3D)
45+
/// 2. Type identifier: [`DECIMAL_BINARY_FORMAT_TYPE`](crate::ingress::DECIMAL_BINARY_FORMAT_TYPE) byte
46+
/// 3. Scale: 1 byte (0-76 inclusive) - number of decimal places
47+
/// 4. Length: 1 byte - number of bytes in the unscaled value
48+
/// 5. Unscaled value: variable-length byte array in two's complement format, big-endian
49+
///
50+
/// Example: For decimal `123.45` with scale 2 and unscaled value 12345:
51+
/// ```text
52+
/// = [DECIMAL_BINARY_FORMAT_TYPE] [2] [2] [0x30] [0x39]
53+
/// ```
54+
///
55+
/// # Binary Format Notes
56+
/// - Binary format is only supported when `support_binary` is `true` (Protocol V2)
57+
/// - The unscaled value must be encoded in two's complement big-endian format
58+
/// - Maximum scale is 76
59+
/// - Length byte indicates how many bytes follow for the unscaled value
60+
pub trait DecimalSerializer {
61+
/// Serialize this value as a decimal in ILP format.
62+
///
63+
/// # Parameters
64+
///
65+
/// * `out` - The output buffer to write the serialized decimal to
66+
/// * `support_binary` - If `true`, binary format may be used (Protocol V2).
67+
/// If `false`, text format must be used (Protocol V1).
68+
fn serialize(self, out: &mut Vec<u8>, support_binary: bool) -> Result<()>;
69+
}
70+
71+
/// Implementation for string slices containing decimal representations.
72+
///
73+
/// This implementation always uses the text format, regardless of the `support_binary` parameter,
74+
/// as it cannot parse the string to extract scale and unscaled value needed for binary encoding.
75+
///
76+
/// # Format
77+
/// The string is validated and written as-is, followed by the 'd' suffix.
78+
///
79+
/// # Validation
80+
/// The implementation performs **partial validation only**:
81+
/// - Rejects ILP reserved characters (space, comma, equals, newline, carriage return, backslash)
82+
/// - Does NOT validate the actual decimal syntax (e.g., "not-a-number" would pass)
83+
///
84+
/// This is intentional: full parsing would add overhead. The QuestDB server performs complete
85+
/// validation and will reject malformed decimals.
86+
///
87+
/// # Examples
88+
/// - `"123.45"` → `"123.45d"`
89+
/// - `"1.5e-3"` → `"1.5e-3d"`
90+
/// - `"-0.001"` → `"-0.001d"`
91+
///
92+
/// # Errors
93+
/// Returns [`Error`] with [`ErrorCode::InvalidDecimal`](crate::error::ErrorCode::InvalidDecimal)
94+
/// if the string contains ILP reserved characters.
95+
impl DecimalSerializer for &str {
96+
fn serialize(self, out: &mut Vec<u8>, _support_binary: bool) -> Result<()> {
97+
// Pre-allocate space for the string content plus the 'd' suffix
98+
out.reserve(self.len() + 1);
99+
100+
// Validate and copy each byte, rejecting ILP reserved characters
101+
// that would break the protocol (space, comma, equals, newline, etc.)
102+
for b in self.bytes() {
103+
if must_escape_unquoted(b) {
104+
return Err(error::fmt!(
105+
InvalidDecimal,
106+
"Unexpected character {:?} in decimal str",
107+
b
108+
));
109+
}
110+
out.push(b);
111+
}
112+
113+
// Append the 'd' suffix to mark this as a decimal value
114+
out.push(b'd');
115+
116+
Ok(())
117+
}
118+
}
119+
120+
use crate::ingress::DECIMAL_BINARY_FORMAT_TYPE;
121+
122+
/// Helper to format decimal values directly to a byte buffer without heap allocation.
123+
#[cfg(any(feature = "rust_decimal", feature = "bigdecimal"))]
124+
struct DecimalWriter<'a> {
125+
buf: &'a mut Vec<u8>,
126+
}
127+
128+
#[cfg(any(feature = "rust_decimal", feature = "bigdecimal"))]
129+
impl<'a> std::fmt::Write for DecimalWriter<'a> {
130+
fn write_str(&mut self, s: &str) -> std::fmt::Result {
131+
self.buf.extend_from_slice(s.as_bytes());
132+
Ok(())
133+
}
134+
}
135+
136+
#[cfg(feature = "rust_decimal")]
137+
impl DecimalSerializer for &rust_decimal::Decimal {
138+
fn serialize(self, out: &mut Vec<u8>, support_binary: bool) -> Result<()> {
139+
if !support_binary {
140+
// Text format
141+
use std::fmt::Write;
142+
write!(DecimalWriter { buf: out }, "{}", self)
143+
.map_err(|_| error::fmt!(InvalidDecimal, "Failed to format decimal value"))?;
144+
out.push(b'd');
145+
return Ok(());
146+
}
147+
148+
// Binary format: '=' marker + type + scale + length + mantissa bytes
149+
out.push(b'=');
150+
out.push(DECIMAL_BINARY_FORMAT_TYPE);
151+
152+
// rust_decimal::Decimal guarantees:
153+
// - MAX_SCALE is 28, which is within QuestDB's limit of 76
154+
// - Mantissa is always 96 bits (12 bytes), never exceeds this size
155+
debug_assert!(rust_decimal::Decimal::MAX_SCALE <= 76);
156+
debug_assert!(
157+
rust_decimal::Decimal::MAX.mantissa() & 0x7FFF_FFFF_0000_0000_0000_0000_0000_0000i128
158+
== 0
159+
);
160+
161+
out.push(self.scale() as u8);
162+
163+
// We skip the upper 3 bytes (which are sign-extended) and write the lower 13 bytes
164+
let mantissa = self.mantissa();
165+
out.push(13);
166+
out.extend_from_slice(&mantissa.to_be_bytes()[3..]); // Skip upper 4 bytes, write lower 12
167+
168+
Ok(())
169+
}
170+
}
171+
172+
#[cfg(feature = "bigdecimal")]
173+
impl DecimalSerializer for &bigdecimal::BigDecimal {
174+
fn serialize(self, out: &mut Vec<u8>, support_binary: bool) -> Result<()> {
175+
if !support_binary {
176+
// Text format
177+
use std::fmt::Write;
178+
write!(DecimalWriter { buf: out }, "{}", self)
179+
.map_err(|_| error::fmt!(InvalidDecimal, "Failed to format decimal value"))?;
180+
out.push(b'd');
181+
return Ok(());
182+
}
183+
184+
// Binary format: '=' marker + type + scale + length + mantissa bytes
185+
out.push(b'=');
186+
out.push(DECIMAL_BINARY_FORMAT_TYPE);
187+
188+
let (unscaled, mut scale) = self.as_bigint_and_scale();
189+
if scale > 76 {
190+
return Err(error::fmt!(
191+
InvalidDecimal,
192+
"QuestDB ILP does not support scale greater than 76, got {}",
193+
scale
194+
));
195+
}
196+
197+
// QuestDB binary ILP doesn't support negative scale, we need to upscale the
198+
// unscaled value to be compliant
199+
let bytes = if scale < 0 {
200+
use bigdecimal::num_bigint;
201+
let unscaled =
202+
unscaled.into_owned() * num_bigint::BigInt::from(10).pow((-scale) as u32);
203+
scale = 0;
204+
unscaled.to_signed_bytes_be()
205+
} else {
206+
unscaled.to_signed_bytes_be()
207+
};
208+
209+
if bytes.len() > i8::MAX as usize {
210+
return Err(error::fmt!(
211+
InvalidDecimal,
212+
"QuestDB ILP does not support values greater than {} bytes, got {}",
213+
i8::MAX,
214+
bytes.len()
215+
));
216+
}
217+
218+
out.push(scale as u8);
219+
220+
// Write length byte and mantissa bytes
221+
out.push(bytes.len() as u8);
222+
out.extend_from_slice(&bytes);
223+
224+
Ok(())
225+
}
226+
}

questdb-rs/src/ingress/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ pub use buffer::*;
6262
mod sender;
6363
pub use sender::*;
6464

65+
mod decimal;
66+
pub use decimal::DecimalSerializer;
67+
6568
const MAX_NAME_LEN_DEFAULT: usize = 127;
6669

6770
/// The maximum allowed dimensions for arrays.
@@ -71,6 +74,7 @@ pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF; // 1 << 28 - 1
7174

7275
pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
7376
pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
77+
pub(crate) const DECIMAL_BINARY_FORMAT_TYPE: u8 = 23;
7478

7579
/// The version of InfluxDB Line Protocol used to communicate with the server.
7680
#[derive(Debug, Copy, Clone, PartialEq)]

0 commit comments

Comments
 (0)