Skip to content

Commit d6e06ab

Browse files
Fixed issue with chunked rowsets when data packets are coalesced
1 parent 66aa5b5 commit d6e06ab

File tree

10 files changed

+133
-70
lines changed

10 files changed

+133
-70
lines changed

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# https://bun.sh/guides/ecosystem/docker
12
# docker build -t sqlitcloud-gateway .
23
# docker run -p 8090:8090 -p 4000:4000 --name sqlitecloud-gateway sqlitecloud-gateway
34

package-lock.json

Lines changed: 27 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
"@typescript-eslint/eslint-plugin": "^4.22.0",
6262
"@typescript-eslint/parser": "^4.22.0",
6363
"dotenv": "^16.4.1",
64+
"dotenv-cli": "^7.3.0",
6465
"eslint": "^7.32.0",
6566
"eslint-config-prettier": "^8.10.0",
6667
"eslint-plugin-node": "^11.1.0",

public/index.html

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ <h2>Messages:</h2>
9191
const sql = messageInput.value
9292
const startTime = Date.now()
9393
socket.emit('v1/sql', { sql }, response => {
94-
console.debug(`sql - sql: ${sql}, elapsed: ${Date.now() - startTime}`, response)
94+
console.debug(`sql - sql: ${sql} (${Date.now() - startTime}ms)`, response)
9595
appendMessage(`sql | ${JSON.stringify(response)}`)
9696
})
9797
})
@@ -100,11 +100,10 @@ <h2>Messages:</h2>
100100
if (!socket) {
101101
socket = setupSocket()
102102
}
103-
socket.emit('v1/info', { }, response => {
104-
appendMessage(`info | rss: ${(response.data.rss/1024/1024).toFixed(2)}mb, ${JSON.stringify(response)}`)
103+
socket.emit('v1/info', {}, response => {
104+
appendMessage(`info | rss: ${(response.data.rss / 1024 / 1024).toFixed(2)}mb, ${JSON.stringify(response)}`)
105105
})
106106
})
107-
108-
</script>
107+
</script>
109108
</body>
110109
</html>

src/drivers/connection-tls.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,17 @@
44

55
import { SQLiteCloudConfig, SQLiteCloudError, ErrorCallback, ResultsCallback } from './types'
66
import { SQLiteCloudConnection } from './connection'
7-
import { formatCommand, hasCommandLength, parseCommandLength, popData, decompressBuffer, CMD_COMPRESSED, CMD_ROWSET_CHUNK } from './protocol'
7+
import {
8+
formatCommand,
9+
hasCommandLength,
10+
parseCommandLength,
11+
popData,
12+
decompressBuffer,
13+
CMD_COMPRESSED,
14+
CMD_ROWSET_CHUNK,
15+
bufferEndsWith,
16+
ROWSET_CHUNKS_END
17+
} from './protocol'
818
import { getInitializationCommands, anonimizeError, anonimizeCommand } from './utilities'
919
import { parseRowsetChunks } from './protocol'
1020

@@ -181,19 +191,15 @@ export class SQLiteCloudTlsConnection extends SQLiteCloudConnection {
181191
const { data } = popData(buffer)
182192
finish(null, data)
183193
} else {
184-
// @ts-expect-error
185194
// check if rowset received the ending chunk
186-
if (data.subarray(data.indexOf(' ') + 1, data.length).toString() === '0 0 0 ') {
195+
if (bufferEndsWith(buffer, ROWSET_CHUNKS_END)) {
196+
rowsetChunks.push(buffer)
187197
const parsedData = parseRowsetChunks(rowsetChunks)
188198
finish?.call(this, null, parsedData)
189199
} else {
190200
// no ending string? ask server for another chunk
191201
rowsetChunks.push(buffer)
192202
buffer = Buffer.alloc(0)
193-
194-
// no longer need to ack the server
195-
// const okCommand = formatCommand('OK')
196-
// this.socket?.write(okCommand)
197203
}
198204
}
199205
}

src/drivers/protocol.ts

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import { SQLiteCloudError, type SQLCloudRowsetMetadata, type SQLiteCloudDataTypes } from './types'
66
import { SQLiteCloudRowset } from './rowset'
7+
import { debug } from 'console'
78

89
const lz4 = require('lz4js')
910

@@ -28,6 +29,10 @@ export const CMD_ARRAY = '='
2829
// const CMD_PUBSUB = '|'
2930
// const CMD_RECONNECT = '@'
3031

32+
// To mark the end of the Rowset, the special string /LEN 0 0 0 is sent (LEN is always 6 in this case)
33+
// https://github.com/sqlitecloud/sdk/blob/master/PROTOCOL.md#scsp-rowset-chunk
34+
export const ROWSET_CHUNKS_END = '/6 0 0 0 '
35+
3136
//
3237
// utility functions
3338
//
@@ -190,22 +195,34 @@ function parseRowset(buffer: Buffer, spaceIndex: number): SQLiteCloudRowset {
190195
return new SQLiteCloudRowset(metadata, data)
191196
}
192197

198+
export function bufferStartsWith(buffer: Buffer, prefix: string): boolean {
199+
return buffer.length >= prefix.length && buffer.subarray(0, prefix.length).toString('utf8') === prefix
200+
}
201+
202+
export function bufferEndsWith(buffer: Buffer, suffix: string): boolean {
203+
return buffer.length >= suffix.length && buffer.subarray(buffer.length - suffix.length, buffer.length).toString('utf8') === suffix
204+
}
205+
193206
/**
194207
* Parse a chunk of a chunked rowset command, eg:
195208
* *LEN 0:VERS NROWS NCOLS DATA
209+
* @see https://github.com/sqlitecloud/sdk/blob/master/PROTOCOL.md#scsp-rowset-chunk
196210
*/
197211
export function parseRowsetChunks(buffers: Buffer[]): SQLiteCloudRowset {
212+
let buffer = Buffer.concat(buffers)
213+
if (!bufferStartsWith(buffer, CMD_ROWSET_CHUNK) || !bufferEndsWith(buffer, ROWSET_CHUNKS_END)) {
214+
throw new Error('SQLiteCloudConnection.parseRowsetChunks - invalid chunks buffer')
215+
}
216+
198217
let metadata: SQLCloudRowsetMetadata = { version: 1, numberOfColumns: 0, numberOfRows: 0, columns: [] }
199218
const data: any[] = []
200219

201-
for (let i = 0; i < buffers.length; i++) {
202-
let buffer: Buffer = buffers[i]
203-
204-
// validate and skip data type
205-
const dataType = buffer.subarray(0, 1).toString()
206-
console.assert(dataType === CMD_ROWSET_CHUNK)
207-
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
220+
// validate and skip data type
221+
const dataType = buffer.subarray(0, 1).toString()
222+
console.assert(dataType === CMD_ROWSET_CHUNK)
223+
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
208224

225+
while (true) {
209226
// chunk header, eg: 0:VERS NROWS NCOLS
210227
const { index: chunkIndex, metadata: chunkMetadata, fwdBuffer } = parseRowsetHeader(buffer)
211228
buffer = fwdBuffer
@@ -224,10 +241,17 @@ export function parseRowsetChunks(buffers: Buffer[]): SQLiteCloudRowset {
224241
data.push(itemData)
225242
buffer = fwdBuffer
226243
}
244+
245+
// no more chunks?
246+
if (bufferStartsWith(buffer, ROWSET_CHUNKS_END)) {
247+
break
248+
}
227249
}
228250

229-
console.assert(data && data.length === metadata.numberOfRows * metadata.numberOfColumns, 'SQLiteCloudConnection.parseRowsetChunks - invalid rowset data')
230-
return new SQLiteCloudRowset(metadata, data)
251+
console.assert(data && data.length === metadata.numberOfRows * metadata.numberOfColumns, 'parseRowsetChunks - invalid rowset data')
252+
const rowset = new SQLiteCloudRowset(metadata, data)
253+
// console.debug(`parseRowsetChunks - ${rowset.numberOfRows} rows, ${rowset.numberOfColumns} columns`)
254+
return rowset
231255
}
232256

233257
/** Pop one or more space separated integers from beginning of buffer, move buffer forward */

src/gateway/connection-bun.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
import { SQLiteCloudError } from '../drivers/types'
99
import { SQLiteCloudBunConnection } from './connection-bun'
1010

11-
const CHINOOK_DATABASE_URL = process.env['CHINOOK_DATABASE_URL'] as string
11+
let CHINOOK_DATABASE_URL = process.env['CHINOOK_DATABASE_URL'] as string
1212
console.assert(CHINOOK_DATABASE_URL, 'CHINOOK_DATABASE_URL is required')
13+
CHINOOK_DATABASE_URL = 'sqlitecloud://admin:uN3ARhdcKQ@oggdnp3zm.sqlite.cloud:8860/chinook.db?verbose=true'
1314

1415
import { expect, test, describe, beforeEach, afterEach } from 'bun:test'
1516

@@ -177,7 +178,7 @@ describe('SQLiteCloudBunConnection', () => {
177178
expect(results.columnsNames).toEqual(['key', 'value'])
178179
})
179180

180-
test('should test chunked rowset', async () => {
181+
test.only('should test chunked rowset', async () => {
181182
const results = await sendCommands(chinookConnection, 'TEST ROWSET_CHUNK')
182183
expect(results.numberOfRows).toBe(147)
183184
expect(results.numberOfColumns).toBe(1)

src/gateway/connection-bun.ts

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import {
1313
decompressBuffer,
1414
parseRowsetChunks,
1515
CMD_COMPRESSED,
16-
CMD_ROWSET_CHUNK
16+
CMD_ROWSET_CHUNK,
17+
bufferEndsWith,
18+
ROWSET_CHUNKS_END
1719
} from '../drivers/protocol'
1820
import type { Socket } from 'bun'
1921

@@ -125,9 +127,8 @@ export class SQLiteCloudBunConnection extends SQLiteCloudConnection {
125127

126128
// reset buffer and rowset chunks, define response callback
127129
this.buffer = Buffer.alloc(0)
128-
this.rowsetChunks = []
129-
this.processCallback = callback
130130
this.startedOn = new Date()
131+
this.processCallback = callback
131132

132133
// compose commands following SCPC protocol
133134
const formattedCommands = formatCommand(commands)
@@ -144,9 +145,8 @@ export class SQLiteCloudBunConnection extends SQLiteCloudConnection {
144145
// onData is called when data is received, it will process the data until all data is retrieved for a response
145146
// when response is complete or there's an error, finish is called to call the results callback set by processCommands...
146147

147-
// buffer to store incoming data
148+
// buffer to accumulate incoming data until an whole command is received and can be parsed
148149
private buffer: Buffer = Buffer.alloc(0)
149-
private rowsetChunks: Buffer[] = []
150150
private startedOn: Date = new Date()
151151

152152
// callback to be called when a command is finished processing
@@ -155,14 +155,16 @@ export class SQLiteCloudBunConnection extends SQLiteCloudConnection {
155155
/** Handles data received in response to an outbound command sent by processCommands */
156156
private processCommandsData(socket: Socket<any>, data: Buffer) {
157157
try {
158-
// on first ondata event, dataType is read from data, on subsequent ondata event, is read from buffer that is the concatanations of data received on each ondata event
159-
let dataType = this.buffer.length === 0 ? data.subarray(0, 1).toString() : this.buffer.subarray(0, 1).toString('utf8')
160-
this.buffer = Buffer.concat([this.buffer, data])
161-
const commandLength = hasCommandLength(dataType)
158+
// append data to buffer as it arrives
159+
if (data.length && data.length > 0) {
160+
this.buffer = Buffer.concat([this.buffer, data])
161+
}
162162

163-
if (commandLength) {
163+
let dataType = this.buffer?.subarray(0, 1).toString()
164+
if (hasCommandLength(dataType)) {
164165
const commandLength = parseCommandLength(this.buffer)
165166
const hasReceivedEntireCommand = this.buffer.length - this.buffer.indexOf(' ') - 1 >= commandLength ? true : false
167+
console.debug(`dataType: ${dataType} buffer.length: ${this.buffer.length}, commandLenght: ${commandLength}`)
166168
if (hasReceivedEntireCommand) {
167169
if (this.config?.verbose) {
168170
let bufferString = this.buffer.toString('utf8')
@@ -182,18 +184,10 @@ export class SQLiteCloudBunConnection extends SQLiteCloudConnection {
182184
const { data } = popData(this.buffer)
183185
this.processCommandsFinish?.call(this, null, data)
184186
} else {
185-
// check if rowset received the ending chunk
186-
if (data.subarray(data.indexOf(' ') + 1, data.length).toString() === '0 0 0 ') {
187-
const parsedData = parseRowsetChunks(this.rowsetChunks)
187+
// check if rowset received the ending chunk in which case it can be unpacked
188+
if (bufferEndsWith(this.buffer, ROWSET_CHUNKS_END)) {
189+
const parsedData = parseRowsetChunks([this.buffer])
188190
this.processCommandsFinish?.call(this, null, parsedData)
189-
} else {
190-
// no ending string? ask server for another chunk
191-
this.rowsetChunks.push(this.buffer)
192-
this.buffer = Buffer.alloc(0)
193-
194-
// no longer need to ack the server
195-
// const okCommand = formatCommand('OK')
196-
// this.socket?.write(okCommand)
197191
}
198192
}
199193
}
@@ -221,6 +215,7 @@ export class SQLiteCloudBunConnection extends SQLiteCloudConnection {
221215
// console.debug('BunTransport.finish - result', result)
222216
}
223217
if (this.processCallback) {
218+
// console.error(`SQLiteCloudBunConnection.processCommandsFinish - error:${error}, result: ${result}`, error, result)
224219
this.processCallback(error, result)
225220
}
226221
}

test/connection-tls.test.ts

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ describe('connection-tls', () => {
8989
})
9090

9191
it('should connect with insecure connection string', done => {
92-
if (INSECURE_DATABASE_URL) {
92+
if (!INSECURE_DATABASE_URL) {
93+
done()
94+
} else {
9395
expect(INSECURE_DATABASE_URL).toBeDefined()
9496
const conn = new SQLiteCloudTlsConnection(INSECURE_DATABASE_URL, error => {
9597
expect(error).toBeNull()
@@ -295,29 +297,7 @@ describe('connection-tls', () => {
295297
expect(results[1]['key']).toBe('INDEXED')
296298
expect(results[2]['key']).toBe('INDEX')
297299
expect(results[3]['key']).toBe('DESC')
298-
299-
database.close()
300-
done()
301-
})
302-
},
303-
LONG_TIMEOUT
304-
)
305-
306-
it(
307-
'should test chunked rowset via ',
308-
done => {
309-
// this operation sends 150 packets, so we need to increase the timeout
310-
const database = getChinookTlsConnection(undefined, { timeout: 60 * 1000 })
311-
database.sendCommands('TEST ROWSET_CHUNK', (error, results) => {
312-
expect(error).toBeNull()
313-
expect(results.numberOfRows).toBe(147)
314-
expect(results.numberOfColumns).toBe(1)
315-
expect(results.columnsNames).toEqual(['key'])
316-
317-
expect(results[0]['key']).toBe('REINDEX')
318-
expect(results[1]['key']).toBe('INDEXED')
319-
expect(results[2]['key']).toBe('INDEX')
320-
expect(results[3]['key']).toBe('DESC')
300+
expect(results[146]['key']).toBe('PRIMARY')
321301

322302
database.close()
323303
done()

test/protocol.test.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//
2+
// protocol.test.ts
3+
//
4+
5+
import { parseRowsetChunks } from '../src/drivers/protocol'
6+
7+
// response sent by the server when we TEST ROWSET_CHUNK
8+
const CHUNKED_RESPONSE = Buffer.from(
9+
'/24 1:1 1 1 +3 key+7 REINDEX/18 2:1 1 1 +7 INDEXED/16 3:1 1 1 +5 INDEX/15 4:1 1 1 +4 DESC/17 5:1 1 1 +6 ESCAPE/15 6:1 1 1 +4 EACH/16 7:1 1 1 +5 CHECK/14 8:1 1 1 +3 KEY/17 9:1 1 1 +6 BEFORE/19 10:1 1 1 +7 FOREIGN/15 11:1 1 1 +3 FOR/18 12:1 1 1 +6 IGNORE/18 13:1 1 1 +6 REGEXP/19 14:1 1 1 +7 EXPLAIN/19 15:1 1 1 +7 INSTEAD/15 16:1 1 1 +3 ADD/20 17:1 1 1 +8 DATABASE/14 18:1 1 1 +2 AS/18 19:1 1 1 +6 SELECT/17 20:1 1 1 +5 TABLE/16 21:1 1 1 +4 LEFT/16 22:1 1 1 +4 THEN/15 23:1 1 1 +3 END/23 24:1 1 1 +10 DEFERRABLE/16 25:1 1 1 +4 ELSE/19 26:1 1 1 +7 EXCLUDE/18 27:1 1 1 +6 DELETE/21 28:1 1 1 +9 TEMPORARY/16 29:1 1 1 +4 TEMP/14 30:1 1 1 +2 OR/18 31:1 1 1 +6 ISNULL/17 32:1 1 1 +5 NULLS/21 33:1 1 1 +9 SAVEPOINT/21 34:1 1 1 +9 INTERSECT/16 35:1 1 1 +4 TIES/19 36:1 1 1 +7 NOTNULL/15 37:1 1 1 +3 NOT/14 38:1 1 1 +2 NO/16 39:1 1 1 +4 NULL/16 40:1 1 1 +4 LIKE/18 41:1 1 1 +6 EXCEPT/24 42:1 1 1 +11 TRANSACTION/18 43:1 1 1 +6 ACTION/14 44:1 1 1 +2 ON/19 45:1 1 1 +7 NATURAL/17 46:1 1 1 +5 ALTER/17 47:1 1 1 +5 RAISE/21 48:1 1 1 +9 EXCLUSIVE/18 49:1 1 1 +6 EXISTS/23 50:1 1 1 +10 CONSTRAINT/16 51:1 1 1 +4 INTO/18 52:1 1 1 +6 OFFSET/14 53:1 1 1 +2 OF/15 54:1 1 1 +3 SET/19 55:1 1 1 +7 TRIGGER/17 56:1 1 1 +5 RANGE/21 57:1 1 1 +9 GENERATED/18 58:1 1 1 +6 DETACH/18 59:1 1 1 +6 HAVING/16 60:1 1 1 +4 GLOB/17 61:1 1 1 +5 BEGIN/17 62:1 1 1 +5 INNER/23 63:1 1 1 +10 REFERENCES/18 64:1 1 1 +6 UNIQUE/17 65:1 1 1 +5 QUERY/19 66:1 1 1 +7 WITHOUT/16 67:1 1 1 +4 WITH/17 68:1 1 1 +5 OUTER/19 69:1 1 1 +7 RELEASE/18 70:1 1 1 +6 ATTACH/19 71:1 1 1 +7 BETWEEN/19 72:1 1 1 +7 NOTHING/18 73:1 1 1 +6 GROUPS/17 74:1 1 1 +5 GROUP/19 75:1 1 1 +7 CASCADE/15 76:1 1 1 +3 ASC/19 77:1 1 1 +7 DEFAULT/16 78:1 1 1 +4 CASE/19 79:1 1 1 +7 COLLATE/18 80:1 1 1 +6 CREATE/25 81:1 1 1 +12 CURRENT_DATE/21 82:1 1 1 +9 IMMEDIATE/16 83:1 1 1 +4 JOIN/18 84:1 1 1 +6 INSERT/17 85:1 1 1 +5 MATCH/16 86:1 1 1 +4 PLAN/19 87:1 1 1 +7 ANALYZE/18 88:1 1 1 +6 PRAGMA/25 89:1 1 1 +12 MATERIALIZED/20 90:1 1 1 +8 DEFERRED/20 91:1 1 1 +8 DISTINCT/14 92:1 1 1 +2 IS/18 93:1 1 1 +6 UPDATE/18 94:1 1 1 +6 VALUES/19 95:1 1 1 +7 VIRTUAL/18 96:1 1 1 +6 ALWAYS/16 97:1 1 1 +4 WHEN/17 98:1 1 1 +5 WHERE/21 99:1 1 1 +9 RECURSIVE/18 100:1 1 1 +5 ABORT/18 101:1 1 1 +5 AFTER/19 102:1 1 1 +6 RENAME/16 103:1 1 1 +3 AND/17 104:1 1 1 +4 DROP/22 105:1 1 1 +9 PARTITION/27 106:1 1 1 +13 AUTOINCREMENT/15 107:1 1 1 +2 TO/15 108:1 1 1 +2 IN/17 109:1 1 1 +4 CAST/19 110:1 1 1 +6 COLUMN/19 111:1 1 1 +6 COMMIT/21 112:1 1 1 +8 CONFLICT/18 113:1 1 1 +5 CROSS/31 114:1 1 1 +17 CURRENT_TIMESTAMP/26 115:1 1 1 +12 CURRENT_TIME/20 116:1 1 1 +7 CURRENT/22 117:1 1 1 +9 PRECEDING/17 118:1 1 1 +4 FAIL/17 119:1 1 1 +4 LAST/19 120:1 1 1 +6 FILTER/20 121:1 1 1 +7 REPLACE/18 122:1 1 1 +5 FIRST/22 123:1 1 1 +9 FOLLOWING/17 124:1 1 1 +4 FROM/17 125:1 1 1 +4 FULL/18 126:1 1 1 +5 LIMIT/15 127:1 1 1 +2 IF/18 128:1 1 1 +5 ORDER/21 129:1 1 1 +8 RESTRICT/19 130:1 1 1 +6 OTHERS/17 131:1 1 1 +4 OVER/22 132:1 1 1 +9 RETURNING/18 133:1 1 1 +5 RIGHT/21 134:1 1 1 +8 ROLLBACK/17 135:1 1 1 +4 ROWS/16 136:1 1 1 +3 ROW/22 137:1 1 1 +9 UNBOUNDED/18 138:1 1 1 +5 UNION/18 139:1 1 1 +5 USING/19 140:1 1 1 +6 VACUUM/17 141:1 1 1 +4 VIEW/19 142:1 1 1 +6 WINDOW/15 143:1 1 1 +2 DO/15 144:1 1 1 +2 BY/22 145:1 1 1 +9 INITIALLY/16 146:1 1 1 +3 ALL/20 147:1 1 1 +7 PRIMARY/6 0 0 0 '
10+
)
11+
12+
describe('parseRowsetChunks', () => {
13+
it('should extract rowset from single buffer', () => {
14+
const rowset = parseRowsetChunks([CHUNKED_RESPONSE])
15+
expect(rowset.length).toBe(147)
16+
expect(rowset[0]['key']).toBe('REINDEX')
17+
expect(rowset[146]['key']).toBe('PRIMARY')
18+
})
19+
20+
it('should extract rowset from segmented buffers', () => {
21+
// split CHUNKED_RESPONSE into 3 random sized buffers
22+
const buffer1 = CHUNKED_RESPONSE.slice(0, 100)
23+
const buffer2 = CHUNKED_RESPONSE.slice(100, 200)
24+
const buffer3 = CHUNKED_RESPONSE.slice(200)
25+
26+
const rowset = parseRowsetChunks([buffer1, buffer2, buffer3])
27+
expect(rowset.length).toBe(147)
28+
expect(rowset[0]['key']).toBe('REINDEX')
29+
expect(rowset[146]['key']).toBe('PRIMARY')
30+
})
31+
})

0 commit comments

Comments
 (0)