Skip to content

Commit 3c259e6

Browse files
authored
feat: support for additional arrow datatypes and microsecond timestamps (#118)
1 parent a4711b4 commit 3c259e6

File tree

2 files changed

+401
-39
lines changed

2 files changed

+401
-39
lines changed

src/questdb/dataframe.pxi

Lines changed: 188 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ cdef enum col_source_t:
148148
col_source_str_lrg_utf8_arrow = 406000
149149
col_source_dt64ns_numpy = 501000
150150
col_source_dt64ns_tz_arrow = 502000
151-
col_source_arr_f64_numpyobj = 601100
152-
col_source_decimal_pyobj = 701100
153-
col_source_decimal32_arrow = 702000
154-
col_source_decimal64_arrow = 703000
155-
col_source_decimal128_arrow = 704000
156-
col_source_decimal256_arrow = 705000
151+
col_source_dt64us_numpy = 601000
152+
col_source_dt64us_tz_arrow = 602000
153+
col_source_arr_f64_numpyobj = 701100
154+
col_source_decimal_pyobj = 801100
155+
col_source_decimal32_arrow = 802000
156+
col_source_decimal64_arrow = 803000
157+
col_source_decimal128_arrow = 804000
158+
col_source_decimal256_arrow = 805000
157159

158160

159161
cdef bint col_source_needs_gil(col_source_t source) noexcept nogil:
@@ -242,6 +244,8 @@ cdef dict _TARGET_TO_SOURCES = {
242244
col_target_t.col_target_column_ts: {
243245
col_source_t.col_source_dt64ns_numpy,
244246
col_source_t.col_source_dt64ns_tz_arrow,
247+
col_source_t.col_source_dt64us_numpy,
248+
col_source_t.col_source_dt64us_tz_arrow
245249
},
246250
col_target_t.col_target_column_arr_f64: {
247251
col_source_t.col_source_arr_f64_numpyobj,
@@ -256,6 +260,8 @@ cdef dict _TARGET_TO_SOURCES = {
256260
col_target_t.col_target_at: {
257261
col_source_t.col_source_dt64ns_numpy,
258262
col_source_t.col_source_dt64ns_tz_arrow,
263+
col_source_t.col_source_dt64us_numpy,
264+
col_source_t.col_source_dt64us_tz_arrow,
259265
},
260266
}
261267

@@ -386,11 +392,22 @@ cdef enum col_dispatch_code_t:
386392
col_target_t.col_target_column_ts + \
387393
col_source_t.col_source_dt64ns_tz_arrow
388394

395+
col_dispatch_code_column_ts__dt64us_numpy = \
396+
col_target_t.col_target_column_ts + col_source_t.col_source_dt64us_numpy
397+
col_dispatch_code_column_ts__dt64us_tz_arrow = \
398+
col_target_t.col_target_column_ts + \
399+
col_source_t.col_source_dt64us_tz_arrow
400+
389401
col_dispatch_code_at__dt64ns_numpy = \
390402
col_target_t.col_target_at + col_source_t.col_source_dt64ns_numpy
391403
col_dispatch_code_at__dt64ns_tz_arrow = \
392404
col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow
393405

406+
col_dispatch_code_at__dt64us_numpy = \
407+
col_target_t.col_target_at + col_source_t.col_source_dt64us_numpy
408+
col_dispatch_code_at__dt64us_tz_arrow = \
409+
col_target_t.col_target_at + col_source_t.col_source_dt64us_tz_arrow
410+
394411
col_dispatch_code_column_arr_f64__arr_f64_numpyobj = \
395412
col_target_t.col_target_column_arr_f64 + col_source_t.col_source_arr_f64_numpyobj
396413

@@ -508,6 +525,7 @@ cdef object _NUMPY_INT64 = None
508525
cdef object _NUMPY_FLOAT32 = None
509526
cdef object _NUMPY_FLOAT64 = None
510527
cdef object _NUMPY_DATETIME64_NS = None
528+
cdef object _NUMPY_DATETIME64_US = None
511529
cdef object _NUMPY_OBJECT = None
512530
cdef object _PANDAS = None # module object
513531
cdef object _PANDAS_NA = None # pandas.NA
@@ -541,6 +559,7 @@ cdef object _dataframe_may_import_deps():
541559
global _NUMPY_FLOAT32
542560
global _NUMPY_FLOAT64
543561
global _NUMPY_DATETIME64_NS
562+
global _NUMPY_DATETIME64_US
544563
global _NUMPY_OBJECT
545564
if _NUMPY is not None:
546565
return
@@ -567,6 +586,7 @@ cdef object _dataframe_may_import_deps():
567586
_NUMPY_FLOAT32 = type(_NUMPY.dtype('float32'))
568587
_NUMPY_FLOAT64 = type(_NUMPY.dtype('float64'))
569588
_NUMPY_DATETIME64_NS = type(_NUMPY.dtype('datetime64[ns]'))
589+
_NUMPY_DATETIME64_US = type(_NUMPY.dtype('datetime64[us]'))
570590
_NUMPY_OBJECT = type(_NUMPY.dtype('object'))
571591
_PANDAS = pandas
572592
_PANDAS_NA = pandas.NA
@@ -781,16 +801,47 @@ cdef int64_t _AT_IS_SERVER_NOW = -2
781801
cdef int64_t _AT_IS_SET_BY_COLUMN = -1
782802

783803

784-
cdef str _SUPPORTED_DATETIMES = 'datetime64[ns] or datetime64[ns, tz]'
804+
cdef str _SUPPORTED_DATETIMES = 'datetime64[ns], datetime64[us], datetime64[ns, tz], timestamp[ns][pyarrow], or timestamp[us][pyarrow]'
785805

786806

787-
cdef object _dataframe_is_supported_datetime(object dtype):
788-
if (isinstance(dtype, _NUMPY_DATETIME64_NS) and
789-
(str(dtype) == 'datetime64[ns]')):
790-
return True
791-
if isinstance(dtype, _PANDAS.DatetimeTZDtype):
792-
return dtype.unit == 'ns'
793-
return False
807+
cdef int _dataframe_classify_timestamp_dtype(object dtype) except -1:
808+
"""
809+
Classify the dtype and determine if it's supported for use as a timestamp.
810+
811+
Returns:
812+
> 0 - a value castable to a `col_source_t`.
813+
0 - dtype is not a supported timestamp datatype.
814+
"""
815+
cdef object arrow_type
816+
if isinstance(dtype, _NUMPY_DATETIME64_NS) and str(dtype) == "datetime64[ns]":
817+
return col_source_t.col_source_dt64ns_numpy
818+
elif isinstance(dtype, _NUMPY_DATETIME64_US) and str(dtype) == "datetime64[us]":
819+
return col_source_t.col_source_dt64us_numpy
820+
elif isinstance(dtype, _PANDAS.DatetimeTZDtype):
821+
# Docs say this should always be nanos, but best assert in case the API changes in the future.
822+
# https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html
823+
if dtype.unit == 'ns':
824+
return col_source_t.col_source_dt64ns_tz_arrow
825+
else:
826+
raise IngressError(
827+
IngressErrorCode.BadDataFrame,
828+
f'Unsupported pandas dtype {dtype} unit {dtype.unit}. ' +
829+
'Raise an issue if you think it should be supported: ' +
830+
'https://github.com/questdb/py-questdb-client/issues.')
831+
elif isinstance(dtype, _PANDAS.ArrowDtype):
832+
arrow_type = dtype.pyarrow_dtype
833+
if arrow_type.id == _PYARROW.lib.Type_TIMESTAMP:
834+
if arrow_type.unit == "ns":
835+
return col_source_t.col_source_dt64ns_tz_arrow
836+
elif arrow_type.unit == "us":
837+
return col_source_t.col_source_dt64us_tz_arrow
838+
else:
839+
raise IngressError(
840+
IngressErrorCode.BadDataFrame,
841+
f'Unsupported arrow dtype {dtype} unit {arrow_type.unit}. ' +
842+
'Raise an issue if you think it should be supported: ' +
843+
'https://github.com/questdb/py-questdb-client/issues.')
844+
return 0
794845

795846

796847
cdef ssize_t _dataframe_resolve_at(
@@ -827,7 +878,7 @@ cdef ssize_t _dataframe_resolve_at(
827878
'Must be one of: None, TimestampNanos, datetime, ' +
828879
'int (column index), str (colum name)')
829880
dtype = df.dtypes.iloc[col_index]
830-
if _dataframe_is_supported_datetime(dtype):
881+
if _dataframe_classify_timestamp_dtype(dtype) != 0:
831882
at_value_out[0] = _AT_IS_SET_BY_COLUMN
832883
col = &cols.d[col_index]
833884
col.setup.meta_target = meta_target_t.meta_target_at
@@ -954,28 +1005,52 @@ cdef void_int _dataframe_category_series_as_arrow(
9541005
f'got a category of {pandas_col.series.dtype.categories.dtype}.')
9551006

9561007
cdef void_int _dataframe_series_resolve_arrow(PandasCol pandas_col, object arrowtype, col_t *col) except -1:
1008+
cdef bint is_decimal_col = False
9571009
_dataframe_series_as_arrow(pandas_col, col)
9581010
if arrowtype.id == _PYARROW.lib.Type_DECIMAL32:
9591011
col.setup.source = col_source_t.col_source_decimal32_arrow
1012+
is_decimal_col = True
9601013
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL64:
9611014
col.setup.source = col_source_t.col_source_decimal64_arrow
1015+
is_decimal_col = True
9621016
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL128:
9631017
col.setup.source = col_source_t.col_source_decimal128_arrow
1018+
is_decimal_col = True
9641019
elif arrowtype.id == _PYARROW.lib.Type_DECIMAL256:
9651020
col.setup.source = col_source_t.col_source_decimal256_arrow
1021+
is_decimal_col = True
1022+
elif arrowtype.id == _PYARROW.lib.Type_BOOL:
1023+
col.setup.source = col_source_t.col_source_bool_arrow
1024+
elif arrowtype.id == _PYARROW.lib.Type_LARGE_STRING:
1025+
col.setup.source = col_source_t.col_source_str_lrg_utf8_arrow
1026+
elif arrowtype.id == _PYARROW.lib.Type_FLOAT:
1027+
col.setup.source = col_source_t.col_source_f32_arrow
1028+
elif arrowtype.id == _PYARROW.lib.Type_DOUBLE:
1029+
col.setup.source = col_source_t.col_source_f64_arrow
1030+
elif arrowtype.id == _PYARROW.lib.Type_INT8:
1031+
col.setup.source = col_source_t.col_source_i8_arrow
1032+
elif arrowtype.id == _PYARROW.lib.Type_INT16:
1033+
col.setup.source = col_source_t.col_source_i16_arrow
1034+
elif arrowtype.id == _PYARROW.lib.Type_INT32:
1035+
col.setup.source = col_source_t.col_source_i32_arrow
1036+
elif arrowtype.id == _PYARROW.lib.Type_INT64:
1037+
col.setup.source = col_source_t.col_source_i64_arrow
9661038
else:
9671039
raise IngressError(
9681040
IngressErrorCode.BadDataFrame,
9691041
f'Unsupported arrow type {arrowtype} for column {pandas_col.name!r}. ' +
9701042
'Raise an issue if you think it should be supported: ' +
9711043
'https://github.com/questdb/py-questdb-client/issues.')
972-
if arrowtype.scale < 0 or arrowtype.scale > 76:
973-
raise IngressError(
974-
IngressErrorCode.BadDataFrame,
975-
f'Bad column {pandas_col.name!r}: ' +
976-
f'Unsupported decimal scale {arrowtype.scale}: ' +
977-
'Must be in the range 0 to 76 inclusive.')
978-
col.scale = <uint8_t>arrowtype.scale
1044+
if is_decimal_col:
1045+
if arrowtype.scale < 0 or arrowtype.scale > 76:
1046+
raise IngressError(
1047+
IngressErrorCode.BadDataFrame,
1048+
f'Bad column {pandas_col.name!r}: ' +
1049+
f'Unsupported decimal scale {arrowtype.scale}: ' +
1050+
'Must be in the range 0 to 76 inclusive.')
1051+
col.scale = <uint8_t>arrowtype.scale
1052+
else:
1053+
col.scale = 0
9791054
return 0
9801055

9811056
cdef inline bint _dataframe_is_float_nan(PyObject* obj) noexcept:
@@ -1061,7 +1136,15 @@ cdef void_int _dataframe_series_sniff_pyobj(
10611136
cdef void_int _dataframe_resolve_source_and_buffers(
10621137
PandasCol pandas_col, col_t* col) except -1:
10631138
cdef object dtype = pandas_col.dtype
1064-
if isinstance(dtype, _NUMPY_BOOL):
1139+
cdef int ts_col_source = _dataframe_classify_timestamp_dtype(dtype)
1140+
if ts_col_source != 0:
1141+
col.setup.source = <col_source_t>ts_col_source
1142+
if ((col.setup.source == col_source_t.col_source_dt64ns_numpy) or
1143+
(col.setup.source == col_source_t.col_source_dt64us_numpy)):
1144+
_dataframe_series_as_pybuf(pandas_col, col)
1145+
else:
1146+
_dataframe_series_as_arrow(pandas_col, col)
1147+
elif isinstance(dtype, _NUMPY_BOOL):
10651148
col.setup.source = col_source_t.col_source_bool_numpy
10661149
_dataframe_series_as_pybuf(pandas_col, col)
10671150
elif isinstance(dtype, _PANDAS.BooleanDtype):
@@ -1150,14 +1233,6 @@ cdef void_int _dataframe_resolve_source_and_buffers(
11501233
f'for column {pandas_col.name} of dtype {dtype}.')
11511234
elif isinstance(dtype, _PANDAS.CategoricalDtype):
11521235
_dataframe_category_series_as_arrow(pandas_col, col)
1153-
elif (isinstance(dtype, _NUMPY_DATETIME64_NS) and
1154-
_dataframe_is_supported_datetime(dtype)):
1155-
col.setup.source = col_source_t.col_source_dt64ns_numpy
1156-
_dataframe_series_as_pybuf(pandas_col, col)
1157-
elif (isinstance(dtype, _PANDAS.DatetimeTZDtype) and
1158-
_dataframe_is_supported_datetime(dtype)):
1159-
col.setup.source = col_source_t.col_source_dt64ns_tz_arrow
1160-
_dataframe_series_as_arrow(pandas_col, col)
11611236
elif isinstance(dtype, _NUMPY_OBJECT):
11621237
_dataframe_series_sniff_pyobj(pandas_col, col)
11631238
elif isinstance(dtype, _PANDAS.ArrowDtype):
@@ -2126,6 +2201,21 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy(
21262201
_ensure_has_gil(gs)
21272202
raise c_err_to_py(err)
21282203

2204+
2205+
cdef void_int _dataframe_serialize_cell_column_ts__dt64us_numpy(
2206+
line_sender_buffer* ls_buf,
2207+
qdb_pystr_buf* b,
2208+
col_t* col,
2209+
PyThreadState** gs) except -1:
2210+
cdef line_sender_error* err = NULL
2211+
cdef int64_t* access = <int64_t*>col.cursor.chunk.buffers[1]
2212+
cdef int64_t cell = access[col.cursor.offset]
2213+
if cell != _NAT:
2214+
if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
2215+
_ensure_has_gil(gs)
2216+
raise c_err_to_py(err)
2217+
2218+
21292219
cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(
21302220
line_sender_buffer* ls_buf,
21312221
qdb_pystr_buf* b,
@@ -2265,6 +2355,7 @@ cdef void_int _dataframe_serialize_cell_column_decimal__decimal256_arrow(
22652355
_ensure_has_gil(gs)
22662356
raise c_err_to_py(err)
22672357

2358+
22682359
cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
22692360
line_sender_buffer* ls_buf,
22702361
qdb_pystr_buf* b,
@@ -2282,6 +2373,23 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
22822373
raise c_err_to_py(err)
22832374

22842375

2376+
cdef void_int _dataframe_serialize_cell_column_ts__dt64us_tz_arrow(
2377+
line_sender_buffer* ls_buf,
2378+
qdb_pystr_buf* b,
2379+
col_t* col,
2380+
PyThreadState** gs) except -1:
2381+
cdef line_sender_error* err = NULL
2382+
cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
2383+
cdef int64_t cell
2384+
cdef int64_t* access
2385+
if valid:
2386+
access = <int64_t*>col.cursor.chunk.buffers[1]
2387+
cell = access[col.cursor.offset]
2388+
if not line_sender_buffer_column_ts_micros(ls_buf, col.name, cell, &err):
2389+
_ensure_has_gil(gs)
2390+
raise c_err_to_py(err)
2391+
2392+
22852393
cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy(
22862394
line_sender_buffer* ls_buf,
22872395
qdb_pystr_buf* b,
@@ -2301,6 +2409,25 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_numpy(
23012409
raise c_err_to_py(err)
23022410

23032411

2412+
cdef void_int _dataframe_serialize_cell_at_dt64us_numpy(
2413+
line_sender_buffer* ls_buf,
2414+
qdb_pystr_buf* b,
2415+
col_t* col,
2416+
PyThreadState** gs) except -1:
2417+
cdef line_sender_error* err = NULL
2418+
cdef int64_t* access = <int64_t*>col.cursor.chunk.buffers[1]
2419+
cdef int64_t cell = access[col.cursor.offset]
2420+
if cell == _NAT:
2421+
if not line_sender_buffer_at_now(ls_buf, &err):
2422+
_ensure_has_gil(gs)
2423+
raise c_err_to_py(err)
2424+
else:
2425+
# Note: ls_buf will validate against negative numbers.
2426+
if not line_sender_buffer_at_micros(ls_buf, cell, &err):
2427+
_ensure_has_gil(gs)
2428+
raise c_err_to_py(err)
2429+
2430+
23042431
cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow(
23052432
line_sender_buffer* ls_buf,
23062433
qdb_pystr_buf* b,
@@ -2323,6 +2450,28 @@ cdef void_int _dataframe_serialize_cell_at_dt64ns_tz_arrow(
23232450
raise c_err_to_py(err)
23242451

23252452

2453+
cdef void_int _dataframe_serialize_cell_at_dt64us_tz_arrow(
2454+
line_sender_buffer* ls_buf,
2455+
qdb_pystr_buf* b,
2456+
col_t* col,
2457+
PyThreadState** gs) except -1:
2458+
cdef line_sender_error* err = NULL
2459+
cdef bint valid = _dataframe_arrow_is_valid(&col.cursor)
2460+
cdef int64_t* access
2461+
cdef int64_t cell
2462+
if valid:
2463+
access = <int64_t*>col.cursor.chunk.buffers[1]
2464+
cell = access[col.cursor.offset]
2465+
# Note: ls_buf will validate against negative numbers.
2466+
if not line_sender_buffer_at_micros(ls_buf, cell, &err):
2467+
_ensure_has_gil(gs)
2468+
raise c_err_to_py(err)
2469+
else:
2470+
if not line_sender_buffer_at_now(ls_buf, &err):
2471+
_ensure_has_gil(gs)
2472+
raise c_err_to_py(err)
2473+
2474+
23262475
cdef void_int _dataframe_serialize_cell(
23272476
line_sender_buffer* ls_buf,
23282477
qdb_pystr_buf* b,
@@ -2421,6 +2570,8 @@ cdef void_int _dataframe_serialize_cell(
24212570
_dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs)
24222571
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy:
24232572
_dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs)
2573+
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_numpy:
2574+
_dataframe_serialize_cell_column_ts__dt64us_numpy(ls_buf, b, col, gs)
24242575
elif dc == col_dispatch_code_t.col_dispatch_code_column_arr_f64__arr_f64_numpyobj:
24252576
_dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj(ls_buf, b, col)
24262577
elif dc == col_dispatch_code_t.col_dispatch_code_column_decimal__decimal_pyobj:
@@ -2435,10 +2586,16 @@ cdef void_int _dataframe_serialize_cell(
24352586
_dataframe_serialize_cell_column_decimal__decimal256_arrow(ls_buf, b, col, gs)
24362587
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow:
24372588
_dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs)
2589+
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64us_tz_arrow:
2590+
_dataframe_serialize_cell_column_ts__dt64us_tz_arrow(ls_buf, b, col, gs)
24382591
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy:
24392592
_dataframe_serialize_cell_at_dt64ns_numpy(ls_buf, b, col, gs)
2593+
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_numpy:
2594+
_dataframe_serialize_cell_at_dt64us_numpy(ls_buf, b, col, gs)
24402595
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_tz_arrow:
24412596
_dataframe_serialize_cell_at_dt64ns_tz_arrow(ls_buf, b, col, gs)
2597+
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64us_tz_arrow:
2598+
_dataframe_serialize_cell_at_dt64us_tz_arrow(ls_buf, b, col, gs)
24422599
else:
24432600
_ensure_has_gil(gs)
24442601
raise RuntimeError(f"Unknown column dispatch code: {dc}")

0 commit comments

Comments
 (0)