Skip to content

Commit 31f2972

Browse files
committed
feat: add BufferV3 and support for DECIMAL column values in IBuffer interface
1 parent 008c5ed commit 31f2972

File tree

5 files changed

+182
-46
lines changed

5 files changed

+182
-46
lines changed

src/net-questdb-client/Buffers/Buffer.cs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,13 @@ public static class Buffer
4242
/// <exception cref="NotImplementedException"></exception>
4343
public static IBuffer Create(int bufferSize, int maxNameLen, int maxBufSize, ProtocolVersion version)
4444
{
45-
switch (version)
45+
return version switch
4646
{
47-
case ProtocolVersion.V1:
48-
return new BufferV1(bufferSize, maxNameLen, maxBufSize);
49-
case ProtocolVersion.V2:
50-
case ProtocolVersion.Auto:
51-
return new BufferV2(bufferSize, maxNameLen, maxBufSize);
52-
}
53-
54-
throw new NotImplementedException();
47+
ProtocolVersion.V1 => new BufferV1(bufferSize, maxNameLen, maxBufSize),
48+
ProtocolVersion.V2 => new BufferV2(bufferSize, maxNameLen, maxBufSize),
49+
ProtocolVersion.V3 => new BufferV3(bufferSize, maxNameLen, maxBufSize),
50+
ProtocolVersion.Auto => new BufferV2(bufferSize, maxNameLen, maxBufSize),
51+
_ => throw new NotImplementedException(),
52+
};
5553
}
56-
}
54+
}

src/net-questdb-client/Buffers/BufferV1.cs

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,17 @@ public void AtNanos(long timestampNanos)
136136
public void Clear()
137137
{
138138
_currentBufferIndex = 0;
139-
Chunk = _buffers[_currentBufferIndex].Buffer;
139+
Chunk = _buffers[_currentBufferIndex].Buffer;
140140
for (var i = 0; i < _buffers.Count; i++)
141141
{
142142
_buffers[i] = (_buffers[i].Buffer, 0);
143143
}
144144

145-
Position = 0;
146-
RowCount = 0;
147-
Length = 0;
148-
WithinTransaction = false;
149-
_currentTableName = "";
145+
Position = 0;
146+
RowCount = 0;
147+
Length = 0;
148+
WithinTransaction = false;
149+
_currentTableName = "";
150150
_lineStartBufferIndex = 0;
151151
_lineStartBufferPosition = 0;
152152
}
@@ -164,10 +164,10 @@ public void TrimExcessBuffers()
164164
/// <inheritdoc />
165165
public void CancelRow()
166166
{
167-
_currentBufferIndex = _lineStartBufferIndex;
168-
Length -= Position - _lineStartBufferPosition;
169-
Position = _lineStartBufferPosition;
170-
_hasTable = false;
167+
_currentBufferIndex = _lineStartBufferIndex;
168+
Length -= Position - _lineStartBufferPosition;
169+
Position = _lineStartBufferPosition;
170+
_hasTable = false;
171171
}
172172

173173
/// <inheritdoc />
@@ -236,10 +236,10 @@ public IBuffer Table(ReadOnlySpan<char> name)
236236
GuardTableAlreadySet();
237237
GuardInvalidTableName(name);
238238

239-
_quoted = false;
239+
_quoted = false;
240240
_hasTable = true;
241241

242-
_lineStartBufferIndex = _currentBufferIndex;
242+
_lineStartBufferIndex = _currentBufferIndex;
243243
_lineStartBufferPosition = Position;
244244

245245
EncodeUtf8(name);
@@ -394,14 +394,14 @@ public IBuffer Put(long value)
394394
new ArgumentOutOfRangeException());
395395
}
396396

397-
Span<byte> num = stackalloc byte[20];
398-
var pos = num.Length;
399-
var remaining = Math.Abs(value);
397+
Span<byte> num = stackalloc byte[20];
398+
var pos = num.Length;
399+
var remaining = Math.Abs(value);
400400
do
401401
{
402402
var digit = remaining % 10;
403-
num[--pos] = (byte)('0' + digit);
404-
remaining /= 10;
403+
num[--pos] = (byte)('0' + digit);
404+
remaining /= 10;
405405
} while (remaining != 0);
406406

407407
if (value < 0)
@@ -414,7 +414,7 @@ public IBuffer Put(long value)
414414

415415
num.Slice(pos, len).CopyTo(Chunk.AsSpan(Position));
416416
Position += len;
417-
Length += len;
417+
Length += len;
418418

419419
return this;
420420
}
@@ -457,7 +457,7 @@ public IBuffer Put(byte value)
457457
internal void Advance(int by)
458458
{
459459
Position += by;
460-
Length += by;
460+
Length += by;
461461
}
462462

463463
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -474,8 +474,8 @@ private void FinishLine()
474474
{
475475
PutAscii('\n');
476476
RowCount++;
477-
_hasTable = false;
478-
_noFields = true;
477+
_hasTable = false;
478+
_noFields = true;
479479
_noSymbols = true;
480480
GuardExceededMaxBufferSize();
481481
}
@@ -540,9 +540,9 @@ private void PutUtf8(char c)
540540
NextBuffer();
541541
}
542542

543-
var bytes = Chunk.AsSpan(Position);
544-
Span<char> chars = stackalloc char[1] { c, };
545-
var byteLength = Encoding.UTF8.GetBytes(chars, bytes);
543+
var bytes = Chunk.AsSpan(Position);
544+
Span<char> chars = stackalloc char[1] { c, };
545+
var byteLength = Encoding.UTF8.GetBytes(chars, bytes);
546546
Advance(byteLength);
547547
}
548548

@@ -789,4 +789,9 @@ private void GuardFsFileNameLimit(ReadOnlySpan<char> name)
789789
$"Name is too long, must be under {_maxNameLen} bytes.");
790790
}
791791
}
792+
793+
public virtual IBuffer Column(ReadOnlySpan<char> name, decimal? value)
794+
{
795+
throw new IngressError(ErrorCode.ProtocolVersionError, "Protocol Version does not support DECIMAL types");
796+
}
792797
}

src/net-questdb-client/Buffers/BufferV2.cs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ private void PutBinaryLE<T>(T value) where T : struct
9595
{
9696
var size = Marshal.SizeOf<T>();
9797
EnsureCapacity(size);
98-
var length = Marshal.SizeOf<T>();
99-
var mem = MemoryMarshal.Cast<byte, T>(Chunk.AsSpan(Position, length));
98+
var mem = MemoryMarshal.Cast<byte, T>(Chunk.AsSpan(Position, size));
10099
mem[0] = value;
101-
Advance(length);
100+
Advance(size);
102101
}
103102

104103
// ReSharper disable once InconsistentNaming
@@ -114,16 +113,16 @@ private void PutBinaryBE<T>(T value) where T : struct
114113
// ReSharper disable once InconsistentNaming
115114
private void PutBinaryManyLE<T>(ReadOnlySpan<T> value) where T : struct
116115
{
117-
var srcSpan = MemoryMarshal.Cast<T, byte>(value);
116+
var srcSpan = MemoryMarshal.Cast<T, byte>(value);
118117
var byteSize = Marshal.SizeOf<T>();
119118

120119
while (srcSpan.Length > 0)
121120
{
122-
var dstLength = GetSpareCapacity(); // length
121+
var dstLength = GetSpareCapacity(); // length
123122
if (dstLength < byteSize)
124123
{
125124
NextBuffer();
126-
dstLength = GetSpareCapacity();
125+
dstLength = GetSpareCapacity();
127126
}
128127
var availLength = dstLength - dstLength % byteSize; // rounded length
129128

@@ -133,7 +132,7 @@ private void PutBinaryManyLE<T>(ReadOnlySpan<T> value) where T : struct
133132
Advance(srcSpan.Length);
134133
return;
135134
}
136-
var dstSpan = Chunk.AsSpan(Position, availLength);
135+
var dstSpan = Chunk.AsSpan(Position, availLength);
137136
srcSpan.Slice(0, availLength).CopyTo(dstSpan);
138137
Advance(availLength);
139138
srcSpan = srcSpan.Slice(availLength);
@@ -180,7 +179,7 @@ public override IBuffer Column<T>(ReadOnlySpan<char> name, ReadOnlySpan<T> value
180179
return PutDoubleArray(name, value);
181180
}
182181

183-
private IBuffer PutDoubleArray<T>(ReadOnlySpan<char> name, ReadOnlySpan<T> value) where T : struct
182+
private IBuffer PutDoubleArray<T>(ReadOnlySpan<char> name, ReadOnlySpan<T> value) where T : struct
184183
{
185184
SetTableIfAppropriate();
186185
PutArrayOfDoubleHeader(name);
@@ -199,15 +198,15 @@ public override IBuffer Column(ReadOnlySpan<char> name, Array? value)
199198
// The value is null, do not include the column in the message
200199
return this;
201200
}
202-
201+
203202
var type = value.GetType().GetElementType();
204203
GuardAgainstNonDoubleTypes(type ?? throw new InvalidOperationException());
205204
if (value.Rank == 1)
206205
{
207206
// Fast path, one dim array
208207
return PutDoubleArray(name, (ReadOnlySpan<double>)value!);
209208
}
210-
209+
211210
SetTableIfAppropriate();
212211
PutArrayOfDoubleHeader(name);
213212

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*******************************************************************************
2+
* ___ _ ____ ____
3+
* / _ \ _ _ ___ ___| |_| _ \| __ )
4+
* | | | | | | |/ _ \/ __| __| | | | _ \
5+
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
6+
* \__\_\\__,_|\___||___/\__|____/|____/
7+
*
8+
* Copyright (c) 2014-2019 Appsicle
9+
* Copyright (c) 2019-2024 QuestDB
10+
*
11+
* Licensed under the Apache License, Version 2.0 (the "License");
12+
* you may not use this file except in compliance with the License.
13+
* You may obtain a copy of the License at
14+
*
15+
* http://www.apache.org/licenses/LICENSE-2.0
16+
*
17+
* Unless required by applicable law or agreed to in writing, software
18+
* distributed under the License is distributed on an "AS IS" BASIS,
19+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20+
* See the License for the specific language governing permissions and
21+
* limitations under the License.
22+
*
23+
******************************************************************************/
24+
25+
using System.Buffers.Binary;
26+
using System.Runtime.InteropServices;
27+
using QuestDB.Enums;
28+
using QuestDB.Utils;
29+
30+
namespace QuestDB.Buffers;
31+
32+
/// <summary />
33+
public class BufferV3 : BufferV2
34+
{
35+
/// <summary />
36+
public BufferV3(int bufferSize, int maxNameLen, int maxBufSize) : base(bufferSize, maxNameLen, maxBufSize)
37+
{
38+
}
39+
40+
// Sign mask for the flags field. A value of zero in this bit indicates a
41+
// positive Decimal value, and a value of one in this bit indicates a
42+
// negative Decimal value.
43+
private const int SignMask = unchecked((int)0x80000000);
44+
45+
// Scale mask for the flags field. This byte in the flags field contains
46+
// the power of 10 to divide the Decimal value by. The scale byte must
47+
// contain a value between 0 and 28 inclusive.
48+
private const int ScaleMask = 0x00FF0000;
49+
50+
// Number of bits scale is shifted by.
51+
private const int ScaleShift = 16;
52+
53+
/// <inheritdoc />
54+
public override IBuffer Column(ReadOnlySpan<char> name, decimal? value)
55+
{
56+
// # Binary Format
57+
// 1. Binary format marker: `'='` (0x3D)
58+
// 2. Type identifier: BinaryFormatType.DECIMAL byte
59+
// 3. Scale: 1 byte (0-76 inclusive) - number of decimal places
60+
// 4. Length: 1 byte - number of bytes in the unscaled value
61+
// 5. Unscaled value: variable-length byte array in two's complement format, big-endian
62+
SetTableIfAppropriate();
63+
Column(name)
64+
.PutAscii(Constants.BINARY_FORMAT_FLAG)
65+
.Put((byte)BinaryFormatType.DECIMAL);
66+
if (value is null)
67+
{
68+
Put((byte)0); // Scale
69+
Put((byte)0); // Length
70+
return this;
71+
}
72+
73+
Span<int> parts = stackalloc int[4];
74+
decimal.GetBits(value.Value, parts);
75+
76+
int flags = parts[3];
77+
byte scale = (byte)((flags & ScaleMask) >> ScaleShift);
78+
79+
// 3. Scale
80+
Put(scale);
81+
82+
int low = parts[0];
83+
int mid = parts[1];
84+
int high = parts[2];
85+
bool bitSign = false;
86+
bool negative = (flags & SignMask) != 0;
87+
88+
if (negative)
89+
{
90+
// QuestDB expects negative mantissas in two's complement.
91+
low = ~low + 1;
92+
int c = low == 0 ? 1 : 0;
93+
mid = ~mid + c;
94+
c = mid == 0 && c == 1 ? 1 : 0;
95+
high = ~high + c;
96+
// We may overflow, we need an extra byte to convey the sign.
97+
bitSign = high == 0 && c == 1;
98+
}
99+
else if ((high & 0x80000000) != 0)
100+
{
101+
// If the highest bit is set, we need an extra byte of 0 to convey the sign.
102+
bitSign = true;
103+
}
104+
105+
var size = bitSign ? 13 : 12;
106+
107+
// 4. Length
108+
Put((byte)size);
109+
110+
// 5. Unscaled value
111+
EnsureCapacity(size);
112+
var span = Chunk.AsSpan(Position, size);
113+
var offset = 0;
114+
if (bitSign)
115+
{
116+
span[offset++] = (byte)(negative ? 255 : 0);
117+
}
118+
BinaryPrimitives.WriteInt32BigEndian(span.Slice(offset, 4), high);
119+
offset += 4;
120+
BinaryPrimitives.WriteInt32BigEndian(span.Slice(offset, 4), mid);
121+
offset += 4;
122+
BinaryPrimitives.WriteInt32BigEndian(span.Slice(offset, 4), low);
123+
Advance(size);
124+
125+
return this;
126+
}
127+
}

src/net-questdb-client/Buffers/IBuffer.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,14 @@ public interface IBuffer
230230

231231
/// <summary />
232232
public IBuffer Column(ReadOnlySpan<char> name, Array? value);
233-
233+
234234
/// <summary />
235235
public IBuffer Column<T>(ReadOnlySpan<char> name, IEnumerable<T> value, IEnumerable<int> shape) where T : struct;
236-
}
236+
237+
/// <summary>
238+
/// Records a DECIMAL column value using the ILP binary decimal layout:
239+
/// '=' marker, decimal type id (23), scale byte, mantissa length, and a big-endian
240+
/// two's complement mantissa sourced from <see cref="decimal.GetBits(decimal)" />.
241+
/// </summary>
242+
public IBuffer Column(ReadOnlySpan<char> name, decimal? value);
243+
}

0 commit comments

Comments
 (0)