Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
dc15bfb
feat: support decimal
RaphDal Oct 21, 2025
c186a73
refactor: remove unused constant for decimal type code
RaphDal Oct 21, 2025
6c3459f
docs: clarify comment on NewScaledDecimal regarding nil/empty unscale…
RaphDal Oct 22, 2025
f486bab
refactor: simplify protocol version handling in integration tests
RaphDal Oct 22, 2025
100bba6
fix: update error message for decimal value size limit
RaphDal Oct 22, 2025
5a3e29b
fix: improve validation for decimal text exponent handling
RaphDal Oct 22, 2025
b2e697e
fix: handle null decimals and improve error reporting in DecimalColumn
RaphDal Oct 28, 2025
a1c3981
fix: remove unnecessary check for mantissa digits in decimal validation
RaphDal Oct 28, 2025
d2b6670
refactor: rework decimal integration to conform to other types
RaphDal Oct 29, 2025
a7f4c07
fix: handle null decimals in DecimalColumn methods to prevent writing…
RaphDal Oct 29, 2025
b6e5de3
fix: adjust twos complement return value for zero in bigIntToTwosComp…
RaphDal Oct 29, 2025
f3222af
fix: correct method name in LineSender interface documentation
RaphDal Oct 29, 2025
8405773
fix: rename IsNull method to isNull for consistency in ScaledDecimal
RaphDal Oct 29, 2025
8e4947f
Add decimal serialization benchmark
puzpuzpuz Nov 10, 2025
0f8349b
refactor: cleanup decimal method names
RaphDal Nov 11, 2025
4746449
refactor: rename ScaledDecimal to Decimal for consistency across the …
RaphDal Nov 11, 2025
e777a3b
Merge branch 'rd_decimal' of github.com:questdb/go-questdb-client int…
RaphDal Nov 11, 2025
36d7070
fix: improve bigIntToTwosComplement to handle large values and normal…
RaphDal Nov 11, 2025
2d84a90
refactor: update decimal column methods in benchmark for consistency
RaphDal Nov 11, 2025
a2c068d
refactor: update decimal column type from ScaledDecimal to Decimal fo…
RaphDal Nov 11, 2025
6677246
refactor: rename DecimalColumn method to DecimalColumnFromString for …
RaphDal Nov 11, 2025
7ebdb94
test: add unit tests for NewDecimalUnsafe function to validate behavior
RaphDal Nov 11, 2025
e8d4fa1
fix: use allocation-free conversion of negative big.Int to two's comp…
RaphDal Nov 13, 2025
04d4bdc
refactor: simplify bigIntToTwosComplement by removing redundant sign …
RaphDal Nov 13, 2025
d7e05d5
Update container version
puzpuzpuz Nov 14, 2025
cf9cfb7
Fix null element type code
puzpuzpuz Nov 14, 2025
f3762c6
Improve tests
puzpuzpuz Nov 14, 2025
187208f
Fix more tests
puzpuzpuz Nov 14, 2025
60fb338
Remove null array serialization and skip the column instead
puzpuzpuz Nov 14, 2025
e7e5545
Relax validation
puzpuzpuz Nov 14, 2025
788eb0b
More fixes
puzpuzpuz Nov 14, 2025
21e47e2
Speed up integration test
puzpuzpuz Nov 14, 2025
be59b9a
Pre-create decimal columns
puzpuzpuz Nov 14, 2025
f5d1d9e
Add missing nils
puzpuzpuz Nov 14, 2025
c222868
Add missing column definition
puzpuzpuz Nov 14, 2025
d620051
More test fixes
puzpuzpuz Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,36 @@ will now also serialize ``float64`` (double-precision) columns as binary.
You might see a performance uplift if this is a dominant data type in your
ingestion workload.

## Decimal columns

QuestDB server version 9.2.0 and newer supports decimal columns with arbitrary precision and scale.
The Go client converts supported decimal values to QuestDB's text/binary wire format automatically:

- `DecimalColumnScaled`: `questdb.ScaledDecimal`, including helpers like `questdb.NewDecimalFromInt64` and `questdb.NewDecimal`.
- `DecimalColumnShopspring`: `github.com/shopspring/decimal.Decimal` values or pointers.
- `DecimalColumnString`: `string` literals representing decimal values (validated at runtime).

```go
price := qdb.NewDecimalFromInt64(12345, 2) // 123.45 with scale 2
commission := qdb.NewDecimal(big.NewInt(-750), 4) // -0.0750 with scale 4

err = sender.
Table("trades").
Symbol("symbol", "ETH-USD").
DecimalColumnScaled("price", price).
DecimalColumnScaled("commission", commission).
AtNow(ctx)
```

To emit textual decimals, pass a validated string literal:

```go
err = sender.
Table("quotes").
DecimalColumnString("mid", "1.23456").
AtNow(ctx)
```

## Pooled Line Senders

**Warning: Experimental feature designed for use with HTTP senders ONLY**
Expand Down
64 changes: 64 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,70 @@ func (b *buffer) Float64Column(name string, val float64) *buffer {
return b
}

func (b *buffer) DecimalColumnScaled(name string, val ScaledDecimal) *buffer {
if !b.prepareForField() {
return b
}
return b.decimalColumnScaled(name, val)
}

func (b *buffer) decimalColumnScaled(name string, val ScaledDecimal) *buffer {
if err := val.ensureValidScale(); err != nil {
b.lastErr = err
return b
}
if val.IsNull() {
// Don't write null decimals
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}
b.WriteByte('=')
b.WriteByte('=')
b.WriteByte(decimalBinaryTypeCode)
b.WriteByte((uint8)(val.scale))
b.WriteByte(32 - val.offset)
b.Write(val.unscaled[val.offset:])
b.hasFields = true
return b
}

func (b *buffer) DecimalColumnString(name string, val string) *buffer {
if !b.prepareForField() {
return b
}
if err := validateDecimalText(val); err != nil {
b.lastErr = err
return b
}
b.lastErr = b.writeColumnName(name)
if b.lastErr != nil {
return b
}
b.WriteByte('=')
b.WriteString(val)
b.WriteByte('d')
b.hasFields = true
return b
}

func (b *buffer) DecimalColumnShopspring(name string, val ShopspringDecimal) *buffer {
if !b.prepareForField() {
return b
}
if val == nil {
return b
}
dec, err := convertShopspringDecimal(val)
if err != nil {
b.lastErr = err
return b
}
return b.decimalColumnScaled(name, dec)
}

func (b *buffer) Float64ColumnBinary(name string, val float64) *buffer {
if !b.prepareForField() {
return b
Expand Down
242 changes: 242 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ import (

type bufWriterFn func(b *qdb.Buffer) error

type fakeShopspringDecimal struct {
coeff *big.Int
exp int32
}

func (f fakeShopspringDecimal) Coefficient() *big.Int {
return f.coeff
}

func (f fakeShopspringDecimal) Exponent() int32 {
return f.exp
}

func newTestBuffer() qdb.Buffer {
return qdb.NewBuffer(128*1024, 1024*1024, 127)
}
Expand Down Expand Up @@ -481,6 +494,235 @@ func TestFloat64ColumnBinary(t *testing.T) {
}
}

func TestDecimalColumnScaled(t *testing.T) {
negative, err := qdb.NewDecimal(big.NewInt(-12345), 3)
assert.NoError(t, err)

prefix := []byte(testTable + " price==")
testCases := []struct {
name string
value qdb.ScaledDecimal
expected []byte
}{
{
name: "positive",
value: qdb.NewDecimalFromInt64(12345, 2),
expected: append(prefix, 0x17, 0x02, 0x02, 0x30, 0x39, 0x0A),
},
{
name: "negative",
value: negative,
expected: append(prefix, 0x17, 0x03, 0x02, 0xCF, 0xC7, 0x0A),
},
{
name: "zero with scale",
value: qdb.NewDecimalFromInt64(0, 4),
expected: append(prefix, 0x17, 0x04, 0x01, 0x0, 0x0A),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnScaled("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)
assert.Equal(t, tc.expected, buf.Messages())
})
}
}

func TestDecimalColumnScaledTrimmingAndPadding(t *testing.T) {
prefix := []byte(testTable + " price==")

testCases := []struct {
name string
value qdb.ScaledDecimal
expectedBytes []byte
}{
{
name: "127 boundary",
value: qdb.NewDecimalFromInt64(127, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x7F},
},
{
name: "128 sign extension",
value: qdb.NewDecimalFromInt64(128, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0x80},
},
{
name: "255 sign extension",
value: qdb.NewDecimalFromInt64(255, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0xFF},
},
{
name: "32768 sign extension",
value: qdb.NewDecimalFromInt64(32768, 0),
expectedBytes: []byte{0x17, 0x00, 0x03, 0x00, 0x80, 0x00},
},
{
name: "-1",
value: qdb.NewDecimalFromInt64(-1, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0xFF},
},
{
name: "-2",
value: qdb.NewDecimalFromInt64(-2, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0xFE},
},
{
name: "-127",
value: qdb.NewDecimalFromInt64(-127, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x81},
},
{
name: "-128",
value: qdb.NewDecimalFromInt64(-128, 0),
expectedBytes: []byte{0x17, 0x00, 0x01, 0x80},
},
{
name: "-129",
value: qdb.NewDecimalFromInt64(-129, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0xFF, 0x7F},
},
{
name: "-256 sign extension",
value: qdb.NewDecimalFromInt64(-256, 0),
expectedBytes: []byte{0x17, 0x00, 0x02, 0xFF, 0x00},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()

err := buf.Table(testTable).DecimalColumnScaled("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)

expected := append(append([]byte{}, prefix...), tc.expectedBytes...)
expected = append(expected, '\n')
assert.Equal(t, expected, buf.Messages())
})
}
}

func TestDecimalColumnShopspring(t *testing.T) {
prefix := []byte(testTable + " price==")

testCases := []struct {
name string
value fakeShopspringDecimal
expectedBytes []byte
}{
{
name: "negative exponent scales value",
value: fakeShopspringDecimal{coeff: big.NewInt(12345), exp: -2},
expectedBytes: []byte{0x17, 0x02, 0x02, 0x30, 0x39},
},
{
name: "positive exponent multiplies coefficient",
value: fakeShopspringDecimal{coeff: big.NewInt(123), exp: 2},
expectedBytes: []byte{0x17, 0x00, 0x02, 0x30, 0x0C},
},
{
name: "positive value sign extension",
value: fakeShopspringDecimal{coeff: big.NewInt(128), exp: 0},
expectedBytes: []byte{0x17, 0x00, 0x02, 0x00, 0x80},
},
{
name: "negative value sign extension",
value: fakeShopspringDecimal{coeff: big.NewInt(-12345), exp: -3},
expectedBytes: []byte{0x17, 0x03, 0x02, 0xCF, 0xC7},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()

err := buf.Table(testTable).DecimalColumnShopspring("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)

expected := append(append([]byte{}, prefix...), tc.expectedBytes...)
expected = append(expected, '\n')
assert.Equal(t, expected, buf.Messages())
})
}
}

func TestDecimalColumnStringValidation(t *testing.T) {
t.Run("valid strings", func(t *testing.T) {
testCases := []struct {
name string
value string
expected string
}{
{"integer", "123", "123d"},
{"decimal", "123.450", "123.450d"},
{"negative", "-0.001", "-0.001d"},
{"exponent positive", "1.2e3", "1.2e3d"},
{"exponent negative", "-4.5E-2", "-4.5E-2d"},
{"nan token", "NaN", "NaNd"},
{"infinity token", "Infinity", "Infinityd"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnString("price", tc.value).At(time.Time{}, false)
assert.NoError(t, err)
expected := []byte(testTable + " price=" + tc.expected + "\n")
assert.Equal(t, expected, buf.Messages())
})
}
})

t.Run("invalid strings", func(t *testing.T) {
testCases := []struct {
name string
value string
}{
{"empty", ""},
{"sign only", "+"},
{"double dot", "12.3.4"},
{"invalid char", "12a3"},
{"exponent missing mantissa", "e10"},
{"exponent no digits", "1.2e"},
{"exponent sign no digits", "1.2e+"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnString("price", tc.value).At(time.Time{}, false)
assert.Error(t, err)
assert.Contains(t, err.Error(), "decimal")
assert.Empty(t, buf.Messages())
})
}
})
}

func TestDecimalColumnErrors(t *testing.T) {
t.Run("invalid scale", func(t *testing.T) {
buf := newTestBuffer()
dec := qdb.NewDecimalFromInt64(1, 100)
err := buf.Table(testTable).DecimalColumnScaled("price", dec).At(time.Time{}, false)
assert.ErrorContains(t, err, "decimal scale")
assert.Empty(t, buf.Messages())
})

t.Run("overflow", func(t *testing.T) {
bigVal := new(big.Int).Lsh(big.NewInt(1), 2100)
_, err := qdb.NewDecimal(bigVal, 0)
assert.ErrorContains(t, err, "exceeds 32 bytes")
})

t.Run("no column", func(t *testing.T) {
buf := newTestBuffer()
err := buf.Table(testTable).DecimalColumnShopspring("price", nil).At(time.Time{}, false)
assert.ErrorContains(t, err, "no symbols or columns were provided: invalid message")
assert.Empty(t, buf.Messages())
})
}

func TestFloat64Array1DColumn(t *testing.T) {
testCases := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions conf_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func confFromStr(conf string) (*lineSenderConfig, error) {
return nil, NewInvalidConfigStrError("invalid %s value, %q is not a valid int", k, v)
}
pVersion := protocolVersion(version)
if pVersion < ProtocolVersion1 || pVersion > ProtocolVersion2 {
return nil, NewInvalidConfigStrError("current client only supports protocol version 1 (text format for all datatypes), 2 (binary format for part datatypes) or explicitly unset")
if pVersion < ProtocolVersion1 || pVersion > ProtocolVersion3 {
return nil, NewInvalidConfigStrError("current client only supports protocol version 1 (text format for all datatypes), 2 (binary format for part datatypes), 3 (decimals) or explicitly unset")
}
senderConf.protocolVersion = pVersion
}
Expand Down
Loading
Loading