From 8a0e6dfa808b23b7ec6d02cf47ab97a1b81e35c8 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Mon, 13 Oct 2025 16:44:05 +0200 Subject: [PATCH 01/19] feat: Add support for decimal.Decimal object --- c-questdb-client | 2 +- src/questdb/dataframe.md | 55 +++++++++++++++++++++++++++++++++++++ src/questdb/dataframe.pxi | 45 ++++++++++++++++++++++++++++-- src/questdb/ingress.pyi | 14 ++++++++-- src/questdb/ingress.pyx | 8 +++++- src/questdb/line_sender.pxd | 12 ++++++-- 6 files changed, 127 insertions(+), 9 deletions(-) diff --git a/c-questdb-client b/c-questdb-client index 924bc390..20959036 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 924bc3905388d24dbebb31dfe326fd64123cf52f +Subproject commit 2095903623fd976b7a991db1318873b1472b8a2e diff --git a/src/questdb/dataframe.md b/src/questdb/dataframe.md index 5d42d389..7b41e9b4 100644 --- a/src/questdb/dataframe.md +++ b/src/questdb/dataframe.md @@ -93,6 +93,7 @@ We need to extract: * 64-bit floats * UTF-8 string buffers * Nanosecond-precision UTC unix epoch 64-bit signed int timestamps +* decimals ```python import pandas as pd @@ -100,6 +101,60 @@ import pyarrow as pa import datetime as dt ``` +### Decimals + +Decimals aren't natively supported by pandas nor numpy, they use the `decimal.Decimal` objects. + +#### Pandas + +Decimals stored as Python objects in an 'object' dtype column. + +```python +>>> df = pd.DataFrame({ 'decimals': [Decimal('123.456')] }) +>>> df.dtypes +decimals object +dtype: object +``` + +#### Numpy + +Similarly, numpy stores decimals as Python objects. + +```python +>>> arr = numpy.array([Decimal('123.456')]) +>>> arr +array([Decimal('123.456')], dtype=object) +``` + +#### PyArrow + +PyArrow provides native decimal support with configurable precision and scale. +The data is stored in a fixed-width binary format. + +```python +import pyarrow as pa +from decimal import Decimal + +# Create decimal array: decimal128(precision, scale) +# precision = total digits, scale = digits after decimal point +decimal_array = pa.array( + [Decimal('123.456'), Decimal('789.012'), Decimal('-456.789'), None], + type=pa.decimal128(10, 3) # 10 total digits, 3 after decimal +) + +# Use in DataFrame with ArrowDtype +df = pd.DataFrame({ + 'prices': pd.array( + [Decimal('123.45'), Decimal('678.90'), None], + dtype=pd.ArrowDtype(pa.decimal128(10, 2)) + ) +}) +``` + +Notes: +- 4 datatypes: `decimal32`, `decimal64`, `decimal128` and `decimal256` +- Nulls are supported via Arrow's validity bitmap + ### Booleans ```python diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 1dffc643..7d36f5b4 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -1,5 +1,7 @@ # See: dataframe.md for technical overview. +from decimal import Decimal + # Auto-flush settings. # The individual `interval`, `row_count` and `byte_count` # settings are set to `-1` when disabled. @@ -73,7 +75,8 @@ cdef enum col_target_t: col_target_column_str = 6 col_target_column_ts = 7 col_target_column_arr_f64 = 8 - col_target_at = 9 + col_target_column_decimal = 9 + col_target_at = 10 cdef dict _TARGET_NAMES = { @@ -86,6 +89,7 @@ cdef dict _TARGET_NAMES = { col_target_t.col_target_column_str: "string", col_target_t.col_target_column_ts: "timestamp", col_target_t.col_target_column_arr_f64: "array", + col_target_t.col_target_column_decimal: "decimal", col_target_t.col_target_at: "designated timestamp", } @@ -127,6 +131,7 @@ cdef enum col_source_t: col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 + col_source_decimal_pyobj = 701100 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: @@ -149,6 +154,7 @@ cdef dict _PYOBJ_SOURCE_DESCR = { col_source_t.col_source_int_pyobj: "int", col_source_t.col_source_float_pyobj: "float", col_source_t.col_source_str_pyobj: "str", + col_source_t.col_source_decimal_pyobj: "Decimal", } @@ -218,6 +224,9 @@ cdef dict _TARGET_TO_SOURCES = { col_target_t.col_target_column_arr_f64: { col_source_t.col_source_arr_f64_numpyobj, }, + col_target_t.col_target_column_decimal: { + col_source_t.col_source_decimal_pyobj, + }, col_target_t.col_target_at: { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, @@ -233,7 +242,8 @@ cdef tuple _FIELD_TARGETS = ( col_target_t.col_target_column_f64, col_target_t.col_target_column_str, col_target_t.col_target_column_ts, - col_target_t.col_target_column_arr_f64) + col_target_t.col_target_column_arr_f64, + col_target_t.col_target_column_decimal) # Targets that map directly from a meta target. @@ -358,6 +368,9 @@ cdef enum col_dispatch_code_t: col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \ col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj + col_dispatch_code_column_decimal__decimal_pyobj = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal_pyobj + # Int values in order for sorting (as needed for API's sequential coupling). cdef enum meta_target_t: @@ -971,6 +984,8 @@ cdef void_int _dataframe_series_sniff_pyobj( 'Unsupported object column containing bytes.' + 'If this is a string column, decode it first. ' + 'See: https://stackoverflow.com/questions/40389764/') + elif isinstance(obj, Decimal): + col.setup.source = col_source_t.col_source_decimal_pyobj else: raise IngressError( IngressErrorCode.BadDataFrame, @@ -1086,6 +1101,11 @@ cdef void_int _dataframe_resolve_source_and_buffers( _dataframe_series_as_arrow(pandas_col, col) elif isinstance(dtype, _NUMPY_OBJECT): _dataframe_series_sniff_pyobj(pandas_col, col) + elif isinstance(dtype, _PANDAS.ArrowDtype): + raise IngressError( + IngressErrorCode.BadDataFrame, + 'blabla' + ) else: raise IngressError( IngressErrorCode.BadDataFrame, @@ -2089,6 +2109,25 @@ cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj( &err): raise c_err_to_py(err) + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col) except -1: + cdef line_sender_error* err = NULL + cdef PyObject** access = col.cursor.chunk.buffers[1] + cdef PyObject* cell = access[col.cursor.offset] + cdef line_sender_utf8 value_utf8 + + if _dataframe_is_null_pyobj(cell): + pass + else: + decimal_str = str(cell) + str_to_utf8(b, decimal_str, &value_utf8) + if not line_sender_buffer_column_decimal_str(ls_buf, col.name, value_utf8, &err): + raise c_err_to_py(err) + + cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( line_sender_buffer* ls_buf, qdb_pystr_buf* b, @@ -2247,6 +2286,8 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj: _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj: + _dataframe_serialize_cell_column_decimal__decimal_pyobj(ls_buf, b, col) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow: _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy: diff --git a/src/questdb/ingress.pyi b/src/questdb/ingress.pyi index 855e5f36..7989a79a 100644 --- a/src/questdb/ingress.pyi +++ b/src/questdb/ingress.pyi @@ -40,6 +40,7 @@ from typing import Any, Dict, List, Optional, Union import numpy as np import pandas as pd +from decimal import Decimal class IngressErrorCode(Enum): """Category of Error.""" @@ -57,6 +58,7 @@ class IngressErrorCode(Enum): ConfigError = ... ArrayError = ... ProtocolVersionError = ... + DecimalError = ... BadDataFrame = ... @@ -202,7 +204,7 @@ class SenderTransaction: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray, Decimal]] ] = None, at: Union[ServerTimestampType, TimestampNanos, datetime], ) -> SenderTransaction: @@ -381,7 +383,7 @@ class Buffer: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray, Decimal]] ] = None, at: Union[ServerTimestampType, TimestampNanos, datetime], ) -> Buffer: @@ -402,7 +404,8 @@ class Buffer: 'col5': TimestampMicros(123456789), 'col6': datetime(2019, 1, 1, 12, 0, 0), 'col7': np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]), - 'col8': None}, + 'col8': None, + 'col9': Decimal('123.456')}, at=TimestampNanos(123456789)) # Only symbols specified. Designated timestamp assigned by the db. @@ -449,6 +452,8 @@ class Buffer: - `ARRAY `_ * - ``datetime.datetime`` and ``TimestampMicros`` - `TIMESTAMP `_ + * - ``Decimal`` + - `DECIMAL `_ * - ``None`` - *Column is skipped and not serialized.* @@ -701,6 +706,9 @@ class Buffer: * - ``'datetime64[ns, tz]'`` - Y - ``TIMESTAMP`` **ζ** + * - ``'object'`` (``Decimal`` objects) + - Y (``NaN``) + - ``DECIMAL`` .. note:: diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 0c620f17..bbff6091 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -142,7 +142,8 @@ class IngressErrorCode(Enum): ConfigError = line_sender_error_config_error ArrayError = line_sender_error_array_error ProtocolVersionError = line_sender_error_protocol_version_error - BadDataFrame = line_sender_error_protocol_version_error + 1 + DecimalError = line_sender_error_invalid_decimal + BadDataFrame = line_sender_error_invalid_decimal + 1 def __str__(self) -> str: """Return the name of the enum.""" @@ -188,6 +189,8 @@ cdef inline object c_err_code_to_py(line_sender_error_code code): return IngressErrorCode.ArrayError elif code == line_sender_error_protocol_version_error: return IngressErrorCode.ProtocolVersionError + elif code == line_sender_error_invalid_decimal: + return IngressErrorCode.DecimalError else: raise ValueError('Internal error converting error code.') @@ -1442,6 +1445,9 @@ cdef class Buffer: * - ``'datetime64[ns, tz]'`` - Y - ``TIMESTAMP`` **ζ** + * - ``'object'`` (``Decimal`` objects) + - Y (``NaN``) + - ``DECIMAL`` .. note:: diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index 2b00404e..1bb154e5 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -40,8 +40,9 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error_http_not_supported, line_sender_error_server_flush_error, line_sender_error_config_error, - line_sender_error_array_error - line_sender_error_protocol_version_error + line_sender_error_array_error, + line_sender_error_protocol_version_error, + line_sender_error_invalid_decimal cdef enum line_sender_protocol: line_sender_protocol_tcp, @@ -263,6 +264,13 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_buffer_column_decimal_str( + line_sender_buffer* buffer, + line_sender_column_name name, + line_sender_utf8 value, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_buffer_at_nanos( line_sender_buffer* buffer, int64_t epoch_nanos, From 4826301e1d4c64e10335cf1bab47163866a62755 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Wed, 15 Oct 2025 17:50:31 +0200 Subject: [PATCH 02/19] feat: add support to arrow's decimals --- c-questdb-client | 2 +- src/questdb/dataframe.pxi | 164 ++++++++++++++++++++++++++++++++++-- src/questdb/ingress.pyx | 4 +- src/questdb/line_sender.pxd | 12 ++- 4 files changed, 170 insertions(+), 12 deletions(-) diff --git a/c-questdb-client b/c-questdb-client index 20959036..fe7cceb4 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 2095903623fd976b7a991db1318873b1472b8a2e +Subproject commit fe7cceb4c88aa092b98c90faa9dbd7e17ac6dd85 diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 7d36f5b4..a7f3f17d 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -53,6 +53,21 @@ cdef bint should_auto_flush( return False +cdef inline uint32_t bswap32(uint32_t value): + return (((value & 0xFF000000u) >> 24u) | + ((value & 0x00FF0000u) >> 8u) | + ((value & 0x0000FF00u) << 8u) | + ((value & 0x000000FFu) << 24u)) + +cdef inline uint64_t bswap64(uint64_t value): + return (((value & 0xFF00000000000000u) >> 56u) | + ((value & 0x00FF000000000000u) >> 40u) | + ((value & 0x0000FF0000000000u) >> 24u) | + ((value & 0x000000FF00000000u) >> 8u) | + ((value & 0x00000000FF000000u) << 8u) | + ((value & 0x0000000000FF0000u) << 24u) | + ((value & 0x000000000000FF00u) << 40u) | + ((value & 0x00000000000000FFu) << 56u)) cdef struct col_chunks_t: size_t n_chunks @@ -132,6 +147,10 @@ cdef enum col_source_t: col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 col_source_decimal_pyobj = 701100 + col_source_decimal32_arrow = 702100 + col_source_decimal64_arrow = 703100 + col_source_decimal128_arrow = 704100 + col_source_decimal256_arrow = 705100 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: @@ -226,6 +245,10 @@ cdef dict _TARGET_TO_SOURCES = { }, col_target_t.col_target_column_decimal: { col_source_t.col_source_decimal_pyobj, + col_source_t.col_source_decimal32_arrow, + col_source_t.col_source_decimal64_arrow, + col_source_t.col_source_decimal128_arrow, + col_source_t.col_source_decimal256_arrow, }, col_target_t.col_target_at: { col_source_t.col_source_dt64ns_numpy, @@ -370,6 +393,14 @@ cdef enum col_dispatch_code_t: col_dispatch_code_column_decimal__decimal_pyobj = \ col_target_t.col_target_column_decimal + col_source_t.col_source_decimal_pyobj + col_dispatch_code_column_decimal__decimal32_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal32_arrow + col_dispatch_code_column_decimal__decimal64_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal64_arrow + col_dispatch_code_column_decimal__decimal128_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal128_arrow + col_dispatch_code_column_decimal__decimal256_arrow = \ + col_target_t.col_target_column_decimal + col_source_t.col_source_decimal256_arrow # Int values in order for sorting (as needed for API's sequential coupling). @@ -395,6 +426,7 @@ cdef struct col_t: line_sender_column_name name col_cursor_t cursor col_setup_t* setup # Grouping to reduce size of struct. + uint8_t scale # For arrow decimal types only, else 0. cdef void col_t_release(col_t* col) noexcept: @@ -918,6 +950,30 @@ cdef void_int _dataframe_category_series_as_arrow( 'Expected a category of strings, ' + f'got a category of {pandas_col.series.dtype.categories.dtype}.') +cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1: + _dataframe_series_as_arrow(pandas_col, col) + if arrowtype.id == _PYARROW.lib.Type_DECIMAL32: + col.setup.source = col_source_t.col_source_decimal32_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64: + col.setup.source = col_source_t.col_source_decimal64_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128: + col.setup.source = col_source_t.col_source_decimal128_arrow + elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256: + col.setup.source = col_source_t.col_source_decimal256_arrow + else: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' + + 'Raise an issue if you think it should be supported: ' + + 'https://github.com/questdb/py-questdb-client/issues.') + if arrowtype.scale < 0 or arrowtype.scale > 76: + raise IngressError( + IngressErrorCode.BadDataFrame, + f'Bad column {pandas_col.name!r}: ' + + f'Unsupported decimal scale {arrowtype.scale}: ' + + 'Must be in the range 0 to 76 inclusive.') + col.scale = arrowtype.scale + return 0 cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept: return PyFloat_CheckExact(obj) and isnan(PyFloat_AS_DOUBLE(obj)) @@ -1102,10 +1158,7 @@ cdef void_int _dataframe_resolve_source_and_buffers( elif isinstance(dtype, _NUMPY_OBJECT): _dataframe_series_sniff_pyobj(pandas_col, col) elif isinstance(dtype, _PANDAS.ArrowDtype): - raise IngressError( - IngressErrorCode.BadDataFrame, - 'blabla' - ) + _dataframe_series_resolve_arrow(pandas_col, dtype.pyarrow_dtype, col) else: raise IngressError( IngressErrorCode.BadDataFrame, @@ -1113,7 +1166,6 @@ cdef void_int _dataframe_resolve_source_and_buffers( 'Raise an issue if you think it should be supported: ' + 'https://github.com/questdb/py-questdb-client/issues.') - cdef void_int _dataframe_resolve_target( PandasCol pandas_col, col_t* col) except -1: cdef col_target_t target @@ -1245,7 +1297,8 @@ cdef void_int _dataframe_resolve_args( cdef inline bint _dataframe_arrow_get_bool(col_cursor_t* cursor) noexcept nogil: return ( (cursor.chunk.buffers[1])[cursor.offset // 8] & - (1 << (cursor.offset % 8))) + (1 << (cursor.offset % 8)) + ) cdef inline bint _dataframe_arrow_is_valid(col_cursor_t* cursor) noexcept nogil: @@ -1254,7 +1307,9 @@ cdef inline bint _dataframe_arrow_is_valid(col_cursor_t* cursor) noexcept nogil: cursor.chunk.null_count == 0 or ( (cursor.chunk.buffers[0])[cursor.offset // 8] & - (1 << (cursor.offset % 8)))) + (1 << (cursor.offset % 8)) + ) + ) cdef inline void _dataframe_arrow_get_cat_value( @@ -2124,8 +2179,93 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( else: decimal_str = str(cell) str_to_utf8(b, decimal_str, &value_utf8) - if not line_sender_buffer_column_decimal_str(ls_buf, col.name, value_utf8, &err): + if not line_sender_buffer_column_dec_str(ls_buf, col.name, value_utf8, &err): + raise c_err_to_py(err) + + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal32_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint32_t value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + value = bswap32((col.cursor.chunk.buffers[1])[col.cursor.offset]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, &value, sizeof(value), &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal64_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + value = bswap64((col.cursor.chunk.buffers[1])[col.cursor.offset]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, &value, sizeof(value), &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal128_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t *cell + cdef uint64_t[2] value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + cell = &(col.cursor.chunk.buffers[1])[col.cursor.offset << 1] + value[0] = bswap64(cell[1]) + value[1] = bswap64(cell[0]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, value, 16, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + return 0 + +cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef line_sender_error* err = NULL + cdef bint valid = _dataframe_arrow_is_valid(&col.cursor) + cdef uint64_t *cell + cdef uint64_t[4] value + if not valid: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) + else: + cell = &(col.cursor.chunk.buffers[1])[col.cursor.offset << 2] + value[0] = bswap64(cell[3]) + value[1] = bswap64(cell[2]) + value[2] = bswap64(cell[1]) + value[3] = bswap64(cell[0]) + if not line_sender_buffer_column_dec(ls_buf, col.name, col.scale, value, 32, &err): + _ensure_has_gil(gs) raise c_err_to_py(err) + return 0 cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( @@ -2288,6 +2428,14 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col) elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj: _dataframe_serialize_cell_column_decimal__decimal_pyobj(ls_buf, b, col) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal32_arrow: + _dataframe_serialize_cell_column_decimal__decimal32_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal64_arrow: + _dataframe_serialize_cell_column_decimal__decimal64_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal128_arrow: + _dataframe_serialize_cell_column_decimal__decimal128_arrow(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal256_arrow: + _dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow: _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy: diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index bbff6091..d2ea427f 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -827,10 +827,10 @@ cdef class Buffer: :param int init_buf_size: Initial capacity of the buffer in bytes. :param int max_name_len: Maximum length of a table or column name. """ - if protocol_version not in (1, 2): + if protocol_version not in range(1, 4): raise IngressError( IngressErrorCode.ProtocolVersionError, - 'Invalid protocol version. Supported versions are 1 and 2.') + 'Invalid protocol version. Supported versions are 1-3.') self._cinit_impl(protocol_version, init_buf_size, max_name_len) cdef inline _cinit_impl(self, line_sender_protocol_version version, size_t init_buf_size, size_t max_name_len): diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index 1bb154e5..059c5687 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -53,6 +53,7 @@ cdef extern from "questdb/ingress/line_sender.h": cdef enum line_sender_protocol_version: line_sender_protocol_version_1 = 1, line_sender_protocol_version_2 = 2, + line_sender_protocol_version_3 = 3, cdef enum line_sender_ca: line_sender_ca_webpki_roots, @@ -264,13 +265,22 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil - bint line_sender_buffer_column_decimal_str( + bint line_sender_buffer_column_dec_str( line_sender_buffer* buffer, line_sender_column_name name, line_sender_utf8 value, line_sender_error** err_out ) noexcept nogil + bint line_sender_buffer_column_dec( + line_sender_buffer* buffer, + line_sender_column_name name, + const unsigned int scale, + const uint8_t* data, + size_t data_len, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_buffer_at_nanos( line_sender_buffer* buffer, int64_t epoch_nanos, From 7aae3a82d9512b96fcc0299e3015b87a51c29619 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Thu, 16 Oct 2025 09:28:10 +0200 Subject: [PATCH 03/19] tests: add serialization tests for decimals --- src/questdb/ingress.pyx | 6 ++- test/test.py | 11 ++-- test/test_dataframe.py | 116 ++++++++++++++++++++++++++++++++-------- 3 files changed, 106 insertions(+), 27 deletions(-) diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index d2ea427f..c6303411 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -1942,10 +1942,14 @@ cdef class Sender: if not line_sender_opts_protocol_version( self._opts, line_sender_protocol_version_2, &err): raise c_err_to_py(err) + elif (protocol_version == 3) or (protocol_version == '3'): + if not line_sender_opts_protocol_version( + self._opts, line_sender_protocol_version_3, &err): + raise c_err_to_py(err) else: raise IngressError( IngressErrorCode.ConfigError, - '"protocol_version" must be None, "auto", 1 or 2' + + '"protocol_version" must be None, "auto", 1-3' + f' not {protocol_version!r}') if auth_timeout is not None: diff --git a/test/test.py b/test/test.py index cb0267aa..759df795 100755 --- a/test/test.py +++ b/test/test.py @@ -42,6 +42,7 @@ if pd is not None: from test_dataframe import TestPandasProtocolVersionV1 from test_dataframe import TestPandasProtocolVersionV2 + from test_dataframe import TestPandasProtocolVersionV3 else: class TestNoPandas(unittest.TestCase): def test_no_pandas(self): @@ -393,8 +394,8 @@ def test_bad_protocol_versions(self): '0', 'automatic', 0, - 3, - '3', + 4, + '4', 1.5, '1.5', '2.0', @@ -403,8 +404,8 @@ def test_bad_protocol_versions(self): for version in bad_versions: with self.assertRaisesRegex( qi.IngressError, - '"protocol_version" must be None, "auto", 1 or 2'): - self.builder('tcp', '127.0.0.1', 12345, protocol_version='3') + '"protocol_version" must be None, "auto", 1-3'): + self.builder('tcp', '127.0.0.1', 12345, protocol_version=version) self.fail('Should not have reached here - constructing sender') bad_versions.append(None) @@ -1443,7 +1444,7 @@ class TestBufferProtocolVersionV1(TestBases.TestBuffer): class TestBufferProtocolVersionV2(TestBases.TestBuffer): - name = 'protocol version 1' + name = 'protocol version 2' version = 2 diff --git a/test/test_dataframe.py b/test/test_dataframe.py index df1822e2..4890641b 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -8,6 +8,7 @@ import functools import tempfile import pathlib +from decimal import Decimal from test_tools import _float_binary_bytes, _array_binary_bytes BROKEN_TIMEZONES = True @@ -80,6 +81,23 @@ def _dataframe(protocol_version: int, *args, **kwargs): pd.Timestamp('20180312')]} ) +DECIMAL_BINARY_FORMAT_TYPE = 23 + +def _decimal_from_unscaled(unscaled, scale: int): + if unscaled is None: + return None + return Decimal(unscaled).scaleb(-scale) + + +def _decimal_binary_payload(unscaled, scale: int, byte_width: int) -> bytes: + if unscaled is None: + return b'=' + bytes([DECIMAL_BINARY_FORMAT_TYPE, 0, 0]) + return ( + b'=' + + bytes([DECIMAL_BINARY_FORMAT_TYPE, scale, byte_width]) + + int(unscaled).to_bytes(byte_width, byteorder='big', signed=True) + ) + def with_tmp_dir(func): @functools.wraps(func) @@ -524,6 +542,57 @@ def test_f64_numpy_col(self): b'tbl1 a' + _float_binary_bytes(float('NAN'), self.version == 1) + b'\n' + b'tbl1 a' + _float_binary_bytes(1.7976931348623157e308, self.version == 1) + b'\n') + def test_decimal_pyobj_column(self): + df = pd.DataFrame({'dec': [Decimal('123.45'), Decimal('-0.5')]}) + if self.version < 3: + with self.assertRaisesRegex( + qi.IngressError, + 'does not support the decimal datatype'): + _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + return + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + self.assertEqual( + buf.splitlines(), + [b'tbl dec=123.45d', b'tbl dec=-0.5d']) + + def test_decimal_arrow_columns(self): + if self.version < 3: + arr = pd.array( + [Decimal('1.23')], + dtype=pd.ArrowDtype(pa.decimal128(10, 2))) + df = pd.DataFrame({'dec': arr, 'count': [0]}) + with self.assertRaisesRegex( + qi.IngressError, + 'does not support the decimal datatype'): + _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + return + + arrow_cases = [ + (pa.decimal32(7, 2), [12345, -6789]), + (pa.decimal64(14, 4), [123456789, -987654321]), + (pa.decimal128(38, 6), [123456789012345, -987654321012345, None]), + (pa.decimal256(76, 10), [1234567890123456789012345, -987654321098765432109876, None]), + ] + + for arrow_type, unscaled_values in arrow_cases: + values = [_decimal_from_unscaled(unscaled, arrow_type.scale) for unscaled in unscaled_values] + arr = pd.array(values, dtype=pd.ArrowDtype(arrow_type)) + counts = list(range(len(values))) + df = pd.DataFrame({'dec': arr, 'count': counts}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + offset = 0 + prefix = b'tbl dec=' + for unscaled, count in zip(unscaled_values, counts): + suffix = f',count={count}i\n'.encode('ascii') + end = buf.index(suffix, offset) + line = buf[offset:end + len(suffix)] + self.assertTrue(line.startswith(prefix), line) + payload = line[len(prefix):len(line) - len(suffix)] if len(suffix) else line[len(prefix):] + expected_payload = _decimal_binary_payload(unscaled, arrow_type.scale, arrow_type.byte_width) + self.assertEqual(payload, expected_payload) + offset = end + len(suffix) + self.assertEqual(offset, len(buf)) + def test_u8_arrow_col(self): df = pd.DataFrame({ 'a': pd.Series([ @@ -839,12 +908,12 @@ def test_datetime64_numpy_col(self): buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) self.assertEqual( buf, - b'tbl1 a=1546300800000000t,b="a"\n' + - b'tbl1 a=1546300801000000t,b="b"\n' + - b'tbl1 a=1546300802000000t,b="c"\n' + - b'tbl1 a=1546300803000000t,b="d"\n' + - b'tbl1 a=1546300804000000t,b="e"\n' + - b'tbl1 a=1546300805000000t,b="f"\n' + + b'tbl1 a=1546300800000000000n,b="a"\n' + + b'tbl1 a=1546300801000000000n,b="b"\n' + + b'tbl1 a=1546300802000000000n,b="c"\n' + + b'tbl1 a=1546300803000000000n,b="d"\n' + + b'tbl1 a=1546300804000000000n,b="e"\n' + + b'tbl1 a=1546300805000000000n,b="f"\n' + b'tbl1 b="g"\n' + b'tbl1 b="h"\n' + b'tbl1 b="i"\n') @@ -856,9 +925,9 @@ def test_datetime64_numpy_col(self): buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) self.assertEqual( buf, - b'tbl1 a=0t\n' + - b'tbl1 a=1000000t\n' + - b'tbl1 a=2000000t\n') + b'tbl1 a=0n\n' + + b'tbl1 a=1000000000n\n' + + b'tbl1 a=2000000000n\n') def test_datetime64_tz_arrow_col(self): df = pd.DataFrame({ @@ -878,10 +947,10 @@ def test_datetime64_tz_arrow_col(self): self.assertEqual( buf, # Note how these are 5hr offset from `test_datetime64_numpy_col`. - b'tbl1,b=sym1 a=1546318800000000t\n' + - b'tbl1,b=sym2 a=1546318801000000t\n' + + b'tbl1,b=sym1 a=1546318800000000000n\n' + + b'tbl1,b=sym2 a=1546318801000000000n\n' + b'tbl1,b=sym3\n' + - b'tbl1,b=sym4 a=1546318803000000t\n') + b'tbl1,b=sym4 a=1546318803000000000n\n') # Not epoch 0. df = pd.DataFrame({ @@ -900,9 +969,9 @@ def test_datetime64_tz_arrow_col(self): self.assertEqual( buf, # Note how these are 5hr offset from `test_datetime64_numpy_col`. - b'tbl1,b=sym1 a=18000000000t\n' + - b'tbl1,b=sym2 a=18001000000t\n' + - b'tbl1,b=sym3 a=18002000000t\n') + b'tbl1,b=sym1 a=18000000000000n\n' + + b'tbl1,b=sym2 a=18001000000000n\n' + + b'tbl1,b=sym3 a=18002000000000n\n') # Actual epoch 0. df = pd.DataFrame({ @@ -920,9 +989,9 @@ def test_datetime64_tz_arrow_col(self): buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) self.assertEqual( buf, - b'tbl1,b=sym1 a=0t\n' + - b'tbl1,b=sym2 a=1000000t\n' + - b'tbl1,b=sym3 a=2000000t\n') + b'tbl1,b=sym1 a=0n\n' + + b'tbl1,b=sym2 a=1000000000n\n' + + b'tbl1,b=sym3 a=2000000000n\n') df2 = pd.DataFrame({ 'a': [ @@ -936,8 +1005,8 @@ def test_datetime64_tz_arrow_col(self): # Mostly, here assert that negative timestamps are allowed. self.assertIn( buf, - [b'tbl1,b=sym1 a=-2208970800000000t\n', - b'tbl1,b=sym1 a=-2208971040000000t\n']) + [b'tbl1,b=sym1 a=-2208970800000000000n\n', + b'tbl1,b=sym1 a=-2208971040000000000n\n']) def test_datetime64_numpy_at(self): df = pd.DataFrame({ @@ -1581,7 +1650,7 @@ def test_arrow_chunked_array(self): # need to, so - as for now - we just test that we raise a nice error. with self.assertRaisesRegex( qi.IngressError, - r"Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): + r"Unsupported arrow type int16 for column 'a'.*github"): _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) @unittest.skipIf(not fastparquet, 'fastparquet not installed') @@ -1680,6 +1749,11 @@ class TestPandasProtocolVersionV2(TestPandasBase.TestPandas): version = 2 +class TestPandasProtocolVersionV3(TestPandasBase.TestPandas): + name = 'protocol version 3' + version = 3 + + if __name__ == '__main__': if os.environ.get('TEST_QUESTDB_PROFILE') == '1': import cProfile From 856c809a47cad37d8db89de9b300f159c120d399 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Thu, 16 Oct 2025 12:34:51 +0200 Subject: [PATCH 04/19] feat: uses binary format for decimal object --- src/questdb/dataframe.pxi | 39 +++++++++++++++--- src/questdb/mpdecimal_compat.h | 56 +++++++++++++++++++++++++ src/questdb/mpdecimal_compat.pxd | 71 ++++++++++++++++++++++++++++++++ test/test_dataframe.py | 53 ++++++++++++++++++++++-- 4 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 src/questdb/mpdecimal_compat.h create mode 100644 src/questdb/mpdecimal_compat.pxd diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index a7f3f17d..e26a2745 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -2,6 +2,9 @@ from decimal import Decimal +from cpython.bytes cimport PyBytes_AsString +from .mpdecimal_compat cimport decimal_pyobj_to_binary + # Auto-flush settings. # The individual `interval`, `row_count` and `byte_count` # settings are set to `-1` when disabled. @@ -2172,15 +2175,39 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( cdef line_sender_error* err = NULL cdef PyObject** access = col.cursor.chunk.buffers[1] cdef PyObject* cell = access[col.cursor.offset] - cdef line_sender_utf8 value_utf8 + cdef unsigned int scale = 0 + cdef object mantissa + cdef const uint8_t* mantissa_ptr + cdef Py_ssize_t mantissa_len if _dataframe_is_null_pyobj(cell): - pass - else: - decimal_str = str(cell) - str_to_utf8(b, decimal_str, &value_utf8) - if not line_sender_buffer_column_dec_str(ls_buf, col.name, value_utf8, &err): + return 0 + + # Convert the Python Decimal into (scale, mantissa) bytes; returns None for special values. + mantissa = decimal_pyobj_to_binary( + cell, + &scale, + IngressError, + IngressErrorCode.BadDataFrame) + if mantissa is None: + if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): raise c_err_to_py(err) + return 0 + + if len(mantissa) > 127: + raise IngressError( + IngressErrorCode.BadDataFrame, + 'Decimal mantissa too large; maximum supported size is 127 bytes.') + + mantissa_ptr = PyBytes_AsString(mantissa) + if mantissa_ptr is NULL: + raise MemoryError() + mantissa_len = PyBytes_GET_SIZE(mantissa) + + if not line_sender_buffer_column_dec(ls_buf, col.name, scale, mantissa_ptr, mantissa_len, &err): + raise c_err_to_py(err) + + return 0 cdef void_int _dataframe_serialize_cell_column_decimal__decimal32_arrow( diff --git a/src/questdb/mpdecimal_compat.h b/src/questdb/mpdecimal_compat.h new file mode 100644 index 00000000..9abdaafb --- /dev/null +++ b/src/questdb/mpdecimal_compat.h @@ -0,0 +1,56 @@ +#ifndef MPDECIMAL_COMPAT_H +#define MPDECIMAL_COMPAT_H + +#include +#include +#include + +/* Determine the limb type used by CPython's libmpdec build. */ +#if SIZE_MAX == UINT64_MAX +typedef uint64_t mpd_uint_t; +typedef int64_t mpd_ssize_t; +#define MPD_RADIX_CONST UINT64_C(10000000000000000000) /* 10**19 */ +#elif SIZE_MAX == UINT32_MAX +typedef uint32_t mpd_uint_t; +typedef int32_t mpd_ssize_t; +#define MPD_RADIX_CONST UINT32_C(1000000000) /* 10**9 */ +#else +#error "Unsupported platform: mpdecimal compatibility requires 32-bit or 64-bit size_t." +#endif + +typedef struct { + uint8_t flags; + mpd_ssize_t exp; + mpd_ssize_t digits; + mpd_ssize_t len; + mpd_ssize_t alloc; + mpd_uint_t* data; +} mpd_t; + +typedef struct { + PyObject_HEAD + Py_hash_t hash; + mpd_t dec; + mpd_uint_t data[4]; +} PyDecObject; + +static inline mpd_t* decimal_mpd(PyObject* obj) { + return &((PyDecObject*)obj)->dec; +} + +static inline mpd_uint_t* decimal_digits(PyObject* obj) { + PyDecObject* dec = (PyDecObject*)obj; + return dec->dec.data != NULL ? dec->dec.data : dec->data; +} + +enum { + MPD_FLAG_SIGN = 0x01, + MPD_FLAG_INF = 0x02, + MPD_FLAG_NAN = 0x04, + MPD_FLAG_SNAN = 0x08, + MPD_FLAG_SPECIAL_MASK = MPD_FLAG_INF | MPD_FLAG_NAN | MPD_FLAG_SNAN +}; + +static const mpd_uint_t MPD_RADIX = MPD_RADIX_CONST; + +#endif /* MPDECIMAL_COMPAT_H */ diff --git a/src/questdb/mpdecimal_compat.pxd b/src/questdb/mpdecimal_compat.pxd new file mode 100644 index 00000000..8a8b412e --- /dev/null +++ b/src/questdb/mpdecimal_compat.pxd @@ -0,0 +1,71 @@ +from libc.stdint cimport uint8_t +from libc.stddef cimport size_t +from cpython.object cimport PyObject + +# Mirror the subset of libmpdec types that CPython embeds in Decimal objects. +ctypedef size_t mpd_uint_t +ctypedef Py_ssize_t mpd_ssize_t + +cdef extern from "mpdecimal_compat.h": + ctypedef struct mpd_t: + uint8_t flags + mpd_ssize_t exp + mpd_ssize_t digits + mpd_ssize_t len + mpd_ssize_t alloc + mpd_uint_t* data + + mpd_t* decimal_mpd(PyObject* obj) + mpd_uint_t* decimal_digits(PyObject* obj) + const mpd_uint_t MPD_RADIX + const uint8_t MPD_FLAG_SIGN + const uint8_t MPD_FLAG_SPECIAL_MASK + +cdef inline object decimal_pyobj_to_binary( + PyObject* cell, + unsigned int* encoded_scale, + object ingress_error_cls, + object bad_dataframe_code) except *: + """Convert a Python ``Decimal`` to ILP binary components.""" + cdef mpd_t* mpd + cdef mpd_uint_t* digits_ptr + cdef unsigned long long flag_low + cdef Py_ssize_t idx + cdef Py_ssize_t scale_value + cdef object unscaled_obj + + mpd = decimal_mpd(cell) + + flag_low = mpd.flags & 0xFF + if (flag_low & MPD_FLAG_SPECIAL_MASK) != 0: + # NaN/Inf values propagate as ILP nulls (caller will emit empty payload). + encoded_scale[0] = 0 + return None + + digits_ptr = decimal_digits(cell) + + if mpd.len <= 0: + unscaled_obj = 0 + else: + unscaled_obj = digits_ptr[mpd.len - 1] + for idx in range(mpd.len - 2, -1, -1): + # Each limb stores MPD_RADIX (10^9 or 10^19) digits in little-endian order. + unscaled_obj = unscaled_obj * MPD_RADIX + digits_ptr[idx] + + if mpd.exp >= 0: + # Decimal ILP does not support negative scales; adjust the unscaled value instead. + if mpd.exp != 0: + unscaled_obj = unscaled_obj * (10 ** mpd.exp) + scale_value = 0 + else: + scale_value = -mpd.exp + if scale_value > 76: + raise ingress_error_cls( + bad_dataframe_code, + f'Decimal scale {scale_value} exceeds the maximum supported scale of 76') + + if (flag_low & MPD_FLAG_SIGN) != 0: + unscaled_obj = -unscaled_obj + + encoded_scale[0] = scale_value + return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 7) // 8, byteorder='big', signed=True) diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 4890641b..50bef67a 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -83,6 +83,27 @@ def _dataframe(protocol_version: int, *args, **kwargs): DECIMAL_BINARY_FORMAT_TYPE = 23 + +def _decode_decimal_payload(line: bytes, prefix: bytes = b'tbl dec=') -> tuple[int, bytes]: + """Extract (scale, mantissa-bytes) from a serialized decimal line.""" + if not line.startswith(prefix): + raise AssertionError(f'Unexpected decimal prefix in line: {line!r}') + payload = line[len(prefix):] + if len(payload) < 4: + raise AssertionError(f'Invalid decimal payload length: {len(payload)}') + if payload[0] != ord('='): + raise AssertionError(f'Unexpected decimal type marker: {payload[0]}') + if payload[1] != DECIMAL_BINARY_FORMAT_TYPE: + raise AssertionError(f'Unexpected decimal format type: {payload[1]}') + scale = payload[2] + byte_width = payload[3] + mantissa = payload[4:] + if len(mantissa) != byte_width: + raise AssertionError( + f'Expected {byte_width} mantissa bytes, got {len(mantissa)}') + return scale, mantissa + + def _decimal_from_unscaled(unscaled, scale: int): if unscaled is None: return None @@ -551,9 +572,35 @@ def test_decimal_pyobj_column(self): _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) return buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) - self.assertEqual( - buf.splitlines(), - [b'tbl dec=123.45d', b'tbl dec=-0.5d']) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + expected = [Decimal('123.45'), Decimal('-0.5')] + self.assertEqual(len(decoded), len(expected)) + for (scale, mantissa), expected_value in zip(decoded, expected): + unscaled = int.from_bytes(mantissa, byteorder='big', signed=True) + self.assertEqual(Decimal(unscaled).scaleb(-scale), expected_value) + + def test_decimal_pyobj_trailing_zeros_and_integer(self): + if self.version < 3: + self.skipTest('decimal datatype requires ILP version 3 or later') + df = pd.DataFrame({'dec': [Decimal('1.2300'), Decimal('1000')]}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + expected = [Decimal('1.23'), Decimal('1000')] + self.assertEqual(len(decoded), len(expected)) + for (scale, mantissa), expected_value in zip(decoded, expected): + unscaled = int.from_bytes(mantissa, byteorder='big', signed=True) + self.assertEqual(Decimal(unscaled).scaleb(-scale), expected_value) + + def test_decimal_pyobj_special_values(self): + if self.version < 3: + self.skipTest('decimal datatype requires ILP version 3 or later') + df = pd.DataFrame({'dec': [Decimal('NaN'), Decimal('Infinity'), Decimal('-Infinity')]}) + buf = _dataframe(self.version, df, table_name='tbl', at=qi.ServerTimestamp) + decoded = [_decode_decimal_payload(line) for line in buf.splitlines()] + self.assertEqual(len(decoded), 3) + for scale, mantissa in decoded: + self.assertEqual(scale, 0) + self.assertEqual(len(mantissa), 0) def test_decimal_arrow_columns(self): if self.version < 3: From 5772e0e15fca62b3bd826e6bedad08486d7e0e00 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 17 Oct 2025 15:31:29 +0200 Subject: [PATCH 05/19] fix: correct enum value for col_source_decimal_pyobj --- src/questdb/dataframe.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index e26a2745..c3fad59e 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -149,7 +149,7 @@ cdef enum col_source_t: col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 - col_source_decimal_pyobj = 701100 + col_source_decimal_pyobj = 701000 col_source_decimal32_arrow = 702100 col_source_decimal64_arrow = 703100 col_source_decimal128_arrow = 704100 From b62520a86b2c35d9d4cd4b4d1924541048f62ab6 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 09:07:49 +0100 Subject: [PATCH 06/19] fix: hold GIL when working with py_obj --- src/questdb/dataframe.pxi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index c3fad59e..e26a2745 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -149,7 +149,7 @@ cdef enum col_source_t: col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 - col_source_decimal_pyobj = 701000 + col_source_decimal_pyobj = 701100 col_source_decimal32_arrow = 702100 col_source_decimal64_arrow = 703100 col_source_decimal128_arrow = 704100 From 5d7ee5ea0105301d5ec8349f669c322d8907bb26 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 09:17:12 +0100 Subject: [PATCH 07/19] tests: fix test to work with new protocol version 3 --- test/mock_server.py | 5 +++-- test/test.py | 10 +++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/test/mock_server.py b/test/mock_server.py index bcac584d..6178a4f7 100644 --- a/test/mock_server.py +++ b/test/mock_server.py @@ -117,11 +117,12 @@ def __exit__(self, _ex_type, _ex_value, _ex_tb): SETTINGS_WITH_PROTOCOL_VERSION_V1 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[1],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' SETTINGS_WITH_PROTOCOL_VERSION_V2 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[2],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' SETTINGS_WITH_PROTOCOL_VERSION_V3 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[3],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' -SETTINGS_WITH_PROTOCOL_VERSION_V1_V2 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[1,2],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' +SETTINGS_WITH_PROTOCOL_VERSION_V4 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[4],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' +SETTINGS_WITH_PROTOCOL_VERSION_V1_V2_V3 = '{"config":{"release.type":"OSS","release.version":"[DEVELOPMENT]","line.proto.support.versions":[1,2,3],"ilp.proto.transports":["tcp","http"],"posthog.enabled":false,"posthog.api.key":null,"cairo.max.file.name.length":127},"preferences.version":0,"preferences":{}}' SETTINGS_WITHOUT_PROTOCOL_VERSION = '{ "release.type": "OSS", "release.version": "[DEVELOPMENT]", "acl.enabled": false, "posthog.enabled": false, "posthog.api.key": null }' class HttpServer: - def __init__(self, settings=SETTINGS_WITH_PROTOCOL_VERSION_V1_V2, delay_seconds=0): + def __init__(self, settings=SETTINGS_WITH_PROTOCOL_VERSION_V1_V2_V3, delay_seconds=0): self.delay_seconds = delay_seconds self.requests = [] self.responses = [] diff --git a/test/test.py b/test/test.py index 9b8d7262..a620bc4c 100755 --- a/test/test.py +++ b/test/test.py @@ -24,7 +24,7 @@ from mock_server import (Server, HttpServer, SETTINGS_WITHOUT_PROTOCOL_VERSION, SETTINGS_WITH_PROTOCOL_VERSION_V1, SETTINGS_WITH_PROTOCOL_VERSION_V2, - SETTINGS_WITH_PROTOCOL_VERSION_V1_V2,SETTINGS_WITH_PROTOCOL_VERSION_V3) + SETTINGS_WITH_PROTOCOL_VERSION_V1_V2_V3,SETTINGS_WITH_PROTOCOL_VERSION_V4) import questdb.ingress as qi @@ -1148,8 +1148,8 @@ def test_http_auto_protocol_version_only_v1(self): def test_http_auto_protocol_version_only_v2(self): self._test_sender_http_auto_protocol_version(SETTINGS_WITH_PROTOCOL_VERSION_V2, 2) - def test_http_auto_protocol_version_v1_v2(self): - self._test_sender_http_auto_protocol_version(SETTINGS_WITH_PROTOCOL_VERSION_V1_V2, 2) + def test_http_auto_protocol_version_v1_v2_v3(self): + self._test_sender_http_auto_protocol_version(SETTINGS_WITH_PROTOCOL_VERSION_V1_V2_V3, 3) def test_http_auto_protocol_version_without_version(self): self._test_sender_http_auto_protocol_version(SETTINGS_WITHOUT_PROTOCOL_VERSION, 1) @@ -1172,8 +1172,8 @@ def _test_sender_http_auto_protocol_version(self, settings, expected_version: in self.assertEqual(server.requests[0], exp) def test_http_auto_protocol_version_unsupported_client(self): - with self.assertRaisesRegex(qi.IngressError, 'Server does not support current client'): - with HttpServer(SETTINGS_WITH_PROTOCOL_VERSION_V3) as server, self.builder('http', '127.0.0.1', server.port) as sender: + with self.assertRaisesRegex(qi.IngressError, r'Server does not support any of the client protocol versions.*'): + with HttpServer(SETTINGS_WITH_PROTOCOL_VERSION_V4) as server, self.builder('http', '127.0.0.1', server.port) as sender: sender.row('tbl1', columns={'x': 42}) def test_specify_line_protocol_explicitly(self): From 0d7017f718aef6a52ee2bd973a9b32eec2188e4f Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 09:33:36 +0100 Subject: [PATCH 08/19] adding print to debug failing test --- test/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test.py b/test/test.py index a620bc4c..0b115941 100755 --- a/test/test.py +++ b/test/test.py @@ -434,6 +434,7 @@ def test_bad_protocol_versions(self): bad_versions.append(None) for version in bad_versions: with self.assertRaises(Exception) as capture: + print("Testing bad protocol version:", version) qi.Buffer(protocol_version=version) self.fail('Should not have reached here - constructing buffer') From 48e97093b75a008130ee97a20e22f6f8ec3e6d1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Dalmon?= <38668811+RaphDal@users.noreply.github.com> Date: Fri, 7 Nov 2025 09:34:34 +0100 Subject: [PATCH 09/19] Remove GIL requirement for arrow decimals Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/questdb/dataframe.pxi | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index e26a2745..71e8621d 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -150,10 +150,10 @@ cdef enum col_source_t: col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 col_source_decimal_pyobj = 701100 - col_source_decimal32_arrow = 702100 - col_source_decimal64_arrow = 703100 - col_source_decimal128_arrow = 704100 - col_source_decimal256_arrow = 705100 + col_source_decimal32_arrow = 702000 + col_source_decimal64_arrow = 703000 + col_source_decimal128_arrow = 704000 + col_source_decimal256_arrow = 705000 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: From 5b4cbfaed65356f12b57ea0422b95d252e68c18e Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 09:41:26 +0100 Subject: [PATCH 10/19] fix: update decimal source enum values for compatibility --- src/questdb/dataframe.pxi | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 71e8621d..e26a2745 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -150,10 +150,10 @@ cdef enum col_source_t: col_source_dt64ns_tz_arrow = 502000 col_source_arr_f64_numpyobj = 601100 col_source_decimal_pyobj = 701100 - col_source_decimal32_arrow = 702000 - col_source_decimal64_arrow = 703000 - col_source_decimal128_arrow = 704000 - col_source_decimal256_arrow = 705000 + col_source_decimal32_arrow = 702100 + col_source_decimal64_arrow = 703100 + col_source_decimal128_arrow = 704100 + col_source_decimal256_arrow = 705100 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: From c2c19d70a455279385acd31a54f0c26b4639bc2c Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 10:04:35 +0100 Subject: [PATCH 11/19] test: add debug print for bad protocol version handling --- src/questdb/ingress.pyx | 1 + test/test.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 0b45f627..b783fc8b 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -827,6 +827,7 @@ cdef class Buffer: :param int init_buf_size: Initial capacity of the buffer in bytes. :param int max_name_len: Maximum length of a table or column name. """ + print('Buffer protocol_version:', protocol_version) if protocol_version not in range(1, 4): raise IngressError( IngressErrorCode.ProtocolVersionError, diff --git a/test/test.py b/test/test.py index 0b115941..bae9349d 100755 --- a/test/test.py +++ b/test/test.py @@ -436,6 +436,7 @@ def test_bad_protocol_versions(self): with self.assertRaises(Exception) as capture: print("Testing bad protocol version:", version) qi.Buffer(protocol_version=version) + print("Should not have reached here - constructed buffer", version) self.fail('Should not have reached here - constructing buffer') self.assertIn(type(capture.exception), (qi.IngressError, TypeError)) From 9b00b39b8838ec1f8b9fbc3ca875e7ca4d628b32 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 10:25:57 +0100 Subject: [PATCH 12/19] refactor: remove debug print statements for protocol version handling --- src/questdb/ingress.pyx | 1 - test/test.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index b783fc8b..0b45f627 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -827,7 +827,6 @@ cdef class Buffer: :param int init_buf_size: Initial capacity of the buffer in bytes. :param int max_name_len: Maximum length of a table or column name. """ - print('Buffer protocol_version:', protocol_version) if protocol_version not in range(1, 4): raise IngressError( IngressErrorCode.ProtocolVersionError, diff --git a/test/test.py b/test/test.py index bae9349d..095b70f7 100755 --- a/test/test.py +++ b/test/test.py @@ -419,7 +419,6 @@ def test_bad_protocol_versions(self): 0, 4, '4', - 1.5, '1.5', '2.0', ] @@ -434,9 +433,7 @@ def test_bad_protocol_versions(self): bad_versions.append(None) for version in bad_versions: with self.assertRaises(Exception) as capture: - print("Testing bad protocol version:", version) qi.Buffer(protocol_version=version) - print("Should not have reached here - constructed buffer", version) self.fail('Should not have reached here - constructing buffer') self.assertIn(type(capture.exception), (qi.IngressError, TypeError)) From cad427c92c789073a678ee48473b56cd138cdad3 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 14:11:57 +0100 Subject: [PATCH 13/19] fix: restore conditional for expected timestamp type based on QuestDB version --- test/system_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/system_test.py b/test/system_test.py index 76805205..cae26171 100755 --- a/test/system_test.py +++ b/test/system_test.py @@ -212,9 +212,7 @@ def test_http(self): resp = self.qdb_plain.retry_check_table(table_name, min_rows=3) - # Re-enable the line below once https://github.com/questdb/questdb/pull/6220 is merged - # exp_ts_type = 'TIMESTAMP' if self.qdb_plain.version <= (9, 1, 0) else 'TIMESTAMP_NS' - exp_ts_type = 'TIMESTAMP' + exp_ts_type = 'TIMESTAMP' if self.qdb_plain.version <= (9, 1, 0) else 'TIMESTAMP_NS' exp_columns = [ {'name': 'name_a', 'type': 'SYMBOL'}, From 674e83c168e7952606050634a5e6c69b862c6b79 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:13:14 +0100 Subject: [PATCH 14/19] tests: add decimal support tests for pyarrow and decimal types --- test/system_test.py | 66 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/test/system_test.py b/test/system_test.py index cae26171..b9a60b35 100755 --- a/test/system_test.py +++ b/test/system_test.py @@ -8,6 +8,8 @@ import uuid import pathlib import numpy as np +import decimal +import pyarrow as pa import patch_path PROJ_ROOT = patch_path.PROJ_ROOT @@ -27,10 +29,11 @@ import questdb.ingress as qi -QUESTDB_VERSION = '9.1.0' +QUESTDB_VERSION = '9.1.1' QUESTDB_PLAIN_INSTALL_PATH = None QUESTDB_AUTH_INSTALL_PATH = None FIRST_ARRAY_RELEASE = (8, 4, 0) +FIRST_DECIMAL_RELEASE = (9, 2, 0) def may_install_questdb(): global QUESTDB_PLAIN_INSTALL_PATH @@ -75,6 +78,10 @@ def setUpClass(cls): QUESTDB_AUTH_INSTALL_PATH, auth=True, wrap_tls=True) cls.qdb_auth.start() + if os.environ.get('TEST_QUESTDB_INTEGRATION_FORCE_MAX_VERSION') == '1': + cls.qdb_plain.version = (999, 999, 999) + cls.qdb_auth.version = (999, 999, 999) + @classmethod def tearDownClass(cls): if cls.qdb_auth: @@ -251,7 +258,6 @@ def test_f64_arr(self): 'f64_arr2': array2, 'f64_arr3': array3}, at=qi.ServerTimestamp) - resp = self.qdb_plain.retry_check_table(table_name) exp_columns = [{'dim': 3, 'elemType': 'DOUBLE', 'name': 'f64_arr1', 'type': 'ARRAY'}, {'dim': 3, 'elemType': 'DOUBLE', 'name': 'f64_arr2', 'type': 'ARRAY'}, @@ -264,5 +270,61 @@ def test_f64_arr(self): scrubbed_data = [row[:-1] for row in resp['dataset']] self.assertEqual(scrubbed_data, expected_data) + def test_decimal_py_obj(self): + if self.qdb_plain.version < FIRST_DECIMAL_RELEASE: + self.skipTest('old server does not support decimal') + table_name = uuid.uuid4().hex + pending = None + with qi.Sender('http', 'localhost', self.qdb_plain.http_server_port) as sender: + sender.row( + table_name, + columns={ + 'dec_col': decimal.Decimal('12345.678')}, + at=qi.ServerTimestamp) + pending = bytes(sender) + + resp = self.qdb_plain.retry_check_table(table_name, min_rows=1, log_ctx=pending) + exp_columns = [{'name': 'dec_col', 'type': 'DECIMAL(18,3)'}, + {'name': 'timestamp', 'type': 'TIMESTAMP'}] + self.assertEqual(resp['columns'], exp_columns) + expected_data = [['12345.678']] + scrubbed_data = [row[:-1] for row in resp['dataset']] + self.assertEqual(scrubbed_data, expected_data) + + @unittest.skipIf(not pa, 'pyarrow not installed') + @unittest.skipIf(not pd, 'pandas not installed') + def test_decimal_pyarrow(self): + if self.qdb_plain.version < FIRST_DECIMAL_RELEASE: + self.skipTest('old server does not support decimal') + df = pd.DataFrame({ + 'prices': pd.array( + [ + decimal.Decimal('-99999.99'), + decimal.Decimal('-678'), + None + ], + dtype=pd.ArrowDtype(pa.decimal128(18, 2)) + ) + }) + + table_name = uuid.uuid4().hex + pending = None + with qi.Sender('http', 'localhost', self.qdb_plain.http_server_port) as sender: + sender.dataframe(df, table_name=table_name, at=qi.ServerTimestamp) + pending = bytes(sender) + + resp = self.qdb_plain.retry_check_table(table_name, min_rows=1, log_ctx=pending) + exp_columns = [{'name': 'prices', 'type': 'DECIMAL(18,3)'}, + {'name': 'timestamp', 'type': 'TIMESTAMP'}] + self.assertEqual(resp['columns'], exp_columns) + expected_data = [ + ['-99999.990'], + ['-678.000'], + [None] + ] + scrubbed_data = [row[:-1] for row in resp['dataset']] + self.assertEqual(scrubbed_data, expected_data) + + if __name__ == '__main__': unittest.main() \ No newline at end of file From 784d8b9f42060bc97bf6eebe01fe3f217578e36b Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:13:43 +0100 Subject: [PATCH 15/19] feat: support decimal serialization directly from column --- src/questdb/dataframe.pxi | 30 +++++++++++++++++------------- src/questdb/ingress.pyx | 8 ++++++++ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index e26a2745..694a70ee 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -2167,30 +2167,21 @@ cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj( &err): raise c_err_to_py(err) - -cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( - line_sender_buffer* ls_buf, - qdb_pystr_buf* b, - col_t* col) except -1: +cdef void_int serialize_decimal_py_obj(line_sender_buffer *buf, line_sender_column_name c_name, PyObject* value) except -1: cdef line_sender_error* err = NULL - cdef PyObject** access = col.cursor.chunk.buffers[1] - cdef PyObject* cell = access[col.cursor.offset] cdef unsigned int scale = 0 cdef object mantissa cdef const uint8_t* mantissa_ptr cdef Py_ssize_t mantissa_len - if _dataframe_is_null_pyobj(cell): - return 0 - # Convert the Python Decimal into (scale, mantissa) bytes; returns None for special values. mantissa = decimal_pyobj_to_binary( - cell, + value, &scale, IngressError, IngressErrorCode.BadDataFrame) if mantissa is None: - if not line_sender_buffer_column_dec(ls_buf, col.name, 0, NULL, 0, &err): + if not line_sender_buffer_column_dec(buf, c_name, 0, NULL, 0, &err): raise c_err_to_py(err) return 0 @@ -2204,12 +2195,25 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( raise MemoryError() mantissa_len = PyBytes_GET_SIZE(mantissa) - if not line_sender_buffer_column_dec(ls_buf, col.name, scale, mantissa_ptr, mantissa_len, &err): + if not line_sender_buffer_column_dec(buf, c_name, scale, mantissa_ptr, mantissa_len, &err): raise c_err_to_py(err) return 0 +cdef void_int _dataframe_serialize_cell_column_decimal__decimal_pyobj( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col) except -1: + cdef PyObject** access = col.cursor.chunk.buffers[1] + cdef PyObject* cell = access[col.cursor.offset] + + if _dataframe_is_null_pyobj(cell): + return 0 + + return serialize_decimal_py_obj(ls_buf, col.name, cell) + + cdef void_int _dataframe_serialize_cell_column_decimal__decimal32_arrow( line_sender_buffer* ls_buf, qdb_pystr_buf* b, diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 0b45f627..b763f909 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -944,6 +944,10 @@ cdef class Buffer: if not line_sender_buffer_column_bool(self._impl, c_name, value, &err): raise c_err_to_py(err) + cdef inline void_int _column_decimal( + self, line_sender_column_name c_name, object value) except -1: + return serialize_decimal_py_obj(self._impl, c_name, value) + cdef inline void_int _column_i64( self, line_sender_column_name c_name, int64_t value) except -1: cdef line_sender_error* err = NULL @@ -1038,6 +1042,8 @@ cdef class Buffer: self._column_numpy(c_name, value) elif isinstance(value, cp_datetime): self._column_dt(c_name, value) + elif isinstance(value, Decimal): + self._column_decimal(c_name, value) else: valid = ', '.join(( 'bool', @@ -1192,6 +1198,8 @@ cdef class Buffer: - Serialized as ILP type * - ``bool`` - `BOOLEAN `_ + * - ``decimal`` + - `DECIMAL `_ * - ``int`` - `INTEGER `_ * - ``float`` From 94dba610cbe4948e451bb8ee19ab2d602f2346c3 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:13:51 +0100 Subject: [PATCH 16/19] fix: adjust byte length calculation for unscaled object in decimal conversion --- src/questdb/mpdecimal_compat.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/mpdecimal_compat.pxd b/src/questdb/mpdecimal_compat.pxd index 8a8b412e..b30801f2 100644 --- a/src/questdb/mpdecimal_compat.pxd +++ b/src/questdb/mpdecimal_compat.pxd @@ -68,4 +68,4 @@ cdef inline object decimal_pyobj_to_binary( unscaled_obj = -unscaled_obj encoded_scale[0] = scale_value - return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 7) // 8, byteorder='big', signed=True) + return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 7) // 8 + 1, byteorder='big', signed=True) From 3a57ea477ff7ea275b6ddcbaa5d5981bf191cbf7 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:14:08 +0100 Subject: [PATCH 17/19] fix: add decimal repo test in ci --- ci/run_tests_pipeline.yaml | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 98842056..a27ac434 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -36,7 +36,7 @@ stages: submodules: true - task: UsePythonVersion@0 inputs: - versionSpec: '3.12' + versionSpec: "3.12" - script: | python3 --version python3 -m pip install cython @@ -44,20 +44,31 @@ stages: - script: python3 ci/pip_install_deps.py displayName: "Install pandas latest" condition: eq(variables.pandasVersion, '') - - script: python3 ci/pip_install_deps.py --pandas-version==$(pandasVersion) + - script: python3 ci/pip_install_deps.py --pandas-version==$(pandasVersion) displayName: "Install pandas older" condition: ne(variables.pandasVersion, '') - script: python3 proj.py build displayName: "Build" - script: | git clone --depth 1 https://github.com/questdb/questdb.git - displayName: git clone questdb + displayName: git clone questdb master condition: eq(variables.vsQuestDbMaster, true) - task: Maven@3 - displayName: "Compile QuestDB" + displayName: "Compile QuestDB master" inputs: - mavenPOMFile: 'questdb/pom.xml' - jdkVersionOption: '1.17' + mavenPOMFile: "questdb/pom.xml" + jdkVersionOption: "1.17" + options: "-DskipTests -Pbuild-web-console" + condition: eq(variables.vsQuestDbMaster, true) + - script: | + git clone --depth 1 --branch rd_decimal_integration https://github.com/questdb/questdb.git questdb-decimal + displayName: git clone questdb decimal branch + condition: eq(variables.vsQuestDbMaster, true) + - task: Maven@3 + displayName: "Compile QuestDB decimal branch" + inputs: + mavenPOMFile: "questdb-decimal/pom.xml" + jdkVersionOption: "1.17" options: "-DskipTests -Pbuild-web-console" condition: eq(variables.vsQuestDbMaster, true) - script: python3 proj.py test 1 @@ -68,7 +79,15 @@ stages: displayName: "Test vs master" env: JAVA_HOME: $(JAVA_HOME_17_X64) - QDB_REPO_PATH: './questdb' + QDB_REPO_PATH: "./questdb" + condition: eq(variables.vsQuestDbMaster, true) + # Remove before merging decimal support PR + - script: python3 proj.py test 1 + displayName: "Test vs decimal" + env: + JAVA_HOME: $(JAVA_HOME_17_X64) + QDB_REPO_PATH: "./questdb-decimal" + TEST_QUESTDB_INTEGRATION_FORCE_MAX_VERSION: "1" condition: eq(variables.vsQuestDbMaster, true) - job: TestsAgainstVariousNumpyVersion1x pool: @@ -82,7 +101,7 @@ stages: submodules: true - task: UsePythonVersion@0 inputs: - versionSpec: '3.9' + versionSpec: "3.9" - script: | python3 --version python3 -m pip install uv @@ -106,7 +125,7 @@ stages: submodules: true - task: UsePythonVersion@0 inputs: - versionSpec: '3.11' + versionSpec: "3.11" - script: | python3 --version python3 -m pip install uv From 00ff8a45db0a14b0bd220c2ed84dd7e54a99c483 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:34:41 +0100 Subject: [PATCH 18/19] fix: correct byte length calculation for unscaled object in decimal conversion --- src/questdb/mpdecimal_compat.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/questdb/mpdecimal_compat.pxd b/src/questdb/mpdecimal_compat.pxd index b30801f2..d1169b12 100644 --- a/src/questdb/mpdecimal_compat.pxd +++ b/src/questdb/mpdecimal_compat.pxd @@ -68,4 +68,4 @@ cdef inline object decimal_pyobj_to_binary( unscaled_obj = -unscaled_obj encoded_scale[0] = scale_value - return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 7) // 8 + 1, byteorder='big', signed=True) + return unscaled_obj.to_bytes((unscaled_obj.bit_length() + 8) // 8, byteorder='big', signed=True) From 6990a2089ab400086bfe40bd88ddd20a0761dc39 Mon Sep 17 00:00:00 2001 From: Raphael DALMON Date: Fri, 7 Nov 2025 15:34:55 +0100 Subject: [PATCH 19/19] fix: update pyarrow import handling --- test/system_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/system_test.py b/test/system_test.py index b9a60b35..982f3ad0 100755 --- a/test/system_test.py +++ b/test/system_test.py @@ -9,7 +9,6 @@ import pathlib import numpy as np import decimal -import pyarrow as pa import patch_path PROJ_ROOT = patch_path.PROJ_ROOT @@ -24,6 +23,7 @@ import pyarrow except ImportError: pd = None + pyarrow = None import questdb.ingress as qi @@ -219,7 +219,7 @@ def test_http(self): resp = self.qdb_plain.retry_check_table(table_name, min_rows=3) - exp_ts_type = 'TIMESTAMP' if self.qdb_plain.version <= (9, 1, 0) else 'TIMESTAMP_NS' + exp_ts_type = 'TIMESTAMP' if self.qdb_plain.version < (9, 1, 0) else 'TIMESTAMP_NS' exp_columns = [ {'name': 'name_a', 'type': 'SYMBOL'}, @@ -291,7 +291,7 @@ def test_decimal_py_obj(self): scrubbed_data = [row[:-1] for row in resp['dataset']] self.assertEqual(scrubbed_data, expected_data) - @unittest.skipIf(not pa, 'pyarrow not installed') + @unittest.skipIf(not pyarrow, 'pyarrow not installed') @unittest.skipIf(not pd, 'pandas not installed') def test_decimal_pyarrow(self): if self.qdb_plain.version < FIRST_DECIMAL_RELEASE: @@ -303,7 +303,7 @@ def test_decimal_pyarrow(self): decimal.Decimal('-678'), None ], - dtype=pd.ArrowDtype(pa.decimal128(18, 2)) + dtype=pd.ArrowDtype(pyarrow.decimal128(18, 2)) ) })