Skip to content

Commit 807680f

Browse files
Fix issue with compression in chunked results, close #32
1 parent 23e96b6 commit 807680f

File tree

5 files changed

+144
-51
lines changed

5 files changed

+144
-51
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/connection.ts

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ export class SQLiteCloudConnection {
228228

229229
let buffer = Buffer.alloc(0)
230230
const rowsetChunks: Buffer[] = []
231-
this.log(`Sending: ${commands}`)
231+
this.log(`Send: ${commands}`)
232232

233233
// define what to do if an answer does not arrive within the set timeout
234234
let socketTimeout: number
@@ -251,31 +251,30 @@ export class SQLiteCloudConnection {
251251

252252
// define the Promise that waits for the server response
253253
const readData = (data: Uint8Array) => {
254-
this.log(`Received: ${data.length > 100 ? data.toString().substring(0, 100) + '...' : data.toString()}`)
255254
try {
256255
// on first ondata event, dataType is read from data, on subsequent ondata event, is read from buffer that is the concatanations of data received on each ondata event
257-
const dataType = buffer.length === 0 ? data.subarray(0, 1).toString() : buffer.subarray(0, 1).toString('utf8')
256+
let dataType = buffer.length === 0 ? data.subarray(0, 1).toString() : buffer.subarray(0, 1).toString('utf8')
258257
buffer = Buffer.concat([buffer, data])
259258
const commandLength = hasCommandLength(dataType)
260259

261260
if (commandLength) {
262261
const commandLength = parseCommandLength(buffer)
263-
264-
// in case of compressed data, extract the dataType of compressed data
265-
let compressedDataType = null
266-
if (dataType === CMD_COMPRESSED) {
267-
// remove LEN
268-
let compressedBuffer = buffer.subarray(buffer.indexOf(' ') + 1, buffer.length)
269-
// remove compressed size
270-
compressedBuffer = compressedBuffer.subarray(compressedBuffer.indexOf(' ') + 1, compressedBuffer.length)
271-
// remove decompressed size
272-
compressedBuffer = compressedBuffer.subarray(compressedBuffer.indexOf(' ') + 1, compressedBuffer.length)
273-
compressedDataType = compressedBuffer.subarray(0, 1).toString('utf8')
274-
}
275-
276262
const hasReceivedEntireCommand = buffer.length - buffer.indexOf(' ') - 1 >= commandLength ? true : false
277263
if (hasReceivedEntireCommand) {
278-
if (dataType !== CMD_ROWSET_CHUNK && compressedDataType !== CMD_ROWSET_CHUNK) {
264+
if (this.config.verbose) {
265+
let bufferString = buffer.toString('utf8')
266+
if (bufferString.length > 1000) {
267+
bufferString = bufferString.substring(0, 100) + '...' + bufferString.substring(bufferString.length - 40)
268+
}
269+
this.log(`Receive: ${bufferString}`)
270+
}
271+
272+
// need to decompress this buffer before decoding?
273+
if (dataType === CMD_COMPRESSED) {
274+
;({ buffer, dataType } = decompressBuffer(buffer))
275+
}
276+
277+
if (dataType !== CMD_ROWSET_CHUNK) {
279278
this.socket?.off('data', readData)
280279
const { data } = popData(buffer)
281280
finish(null, data)
@@ -289,7 +288,9 @@ export class SQLiteCloudConnection {
289288
// no ending string? ask server for another chunk
290289
rowsetChunks.push(buffer)
291290
buffer = Buffer.alloc(0)
292-
this.socket?.write(formatCommand('OK'))
291+
const okCommand = formatCommand('OK')
292+
this.log(`Send: ${okCommand}`)
293+
this.socket?.write(okCommand)
293294
}
294295
}
295296
}
@@ -403,20 +404,21 @@ function parseCommandLength(data: Buffer) {
403404
/** Receive a compressed buffer, decompress with lz4, return buffer and datatype */
404405
function decompressBuffer(buffer: Buffer): { buffer: Buffer; dataType: string } {
405406
const spaceIndex = buffer.indexOf(' ')
406-
buffer = buffer.subarray(spaceIndex + 1, buffer.length)
407+
buffer = buffer.subarray(spaceIndex + 1)
407408

408409
// extract compressed size
409410
const compressedSize = parseInt(buffer.subarray(0, buffer.indexOf(' ') + 1).toString('utf8'))
410-
buffer = buffer.subarray(buffer.indexOf(' ') + 1, buffer.length)
411+
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
411412

412413
// extract decompressed size
413414
const decompressedSize = parseInt(buffer.subarray(0, buffer.indexOf(' ') + 1).toString('utf8'))
414-
buffer = buffer.subarray(buffer.indexOf(' ') + 1, buffer.length)
415+
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
415416

416417
// extract compressed dataType
417418
const dataType = buffer.subarray(0, 1).toString('utf8')
418419
const decompressedBuffer = Buffer.alloc(decompressedSize)
419-
const decompressionResult = lz4.decodeBlock(buffer.subarray(buffer.length - compressedSize, buffer.length), decompressedBuffer)
420+
const compressedBuffer = buffer.subarray(buffer.length - compressedSize)
421+
const decompressionResult = lz4.decompressBlock(compressedBuffer, decompressedBuffer, 0, compressedSize, 0)
420422
buffer = Buffer.concat([buffer.subarray(0, buffer.length - compressedSize), decompressedBuffer])
421423
if (decompressionResult <= 0 || decompressionResult !== decompressedSize) {
422424
throw new Error(`lz4 decompression error at offset ${decompressionResult}`)
@@ -548,7 +550,11 @@ function parseRowsetChunks(buffers: Buffer[]) {
548550
const data = []
549551

550552
for (let i = 0; i < buffers.length; i++) {
551-
let buffer = buffers[i]
553+
let buffer: Buffer = buffers[i]
554+
555+
// validate and skip data type
556+
const dataType = buffer.subarray(0, 1).toString()
557+
console.assert(dataType === CMD_ROWSET_CHUNK)
552558
buffer = buffer.subarray(buffer.indexOf(' ') + 1)
553559

554560
// chunk header, eg: 0:VERS NROWS NCOLS
@@ -595,11 +601,8 @@ function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCloudRows
595601
// first character is the data type
596602
console.assert(buffer && buffer instanceof Buffer)
597603
let dataType: string = buffer.subarray(0, 1).toString('utf8')
598-
599-
// need to decompress this buffer before decoding?
600-
if (dataType === CMD_COMPRESSED) {
601-
;({ buffer, dataType } = decompressBuffer(buffer))
602-
}
604+
console.assert(dataType !== CMD_COMPRESSED, "Compressed data shouldn't be decompressed before parsing")
605+
console.assert(dataType !== CMD_ROWSET_CHUNK, 'Chunked data should be parsed by parseRowsetChunks')
603606

604607
let spaceIndex = buffer.indexOf(' ')
605608
if (spaceIndex === -1) {
@@ -635,9 +638,6 @@ function popData(buffer: Buffer): { data: SQLiteCloudDataTypes | SQLiteCloudRows
635638
return popResults(parseArray(buffer, spaceIndex))
636639
case CMD_ROWSET:
637640
return popResults(parseRowset(buffer, spaceIndex))
638-
case CMD_ROWSET_CHUNK:
639-
console.assert(false)
640-
break
641641
case CMD_ERROR:
642642
parseError(buffer, spaceIndex) // throws custom error
643643
break

test/connection.test.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { SQLiteCloudError } from '../src/index'
66
import { SQLiteCloudConnection, anonimizeCommand } from '../src/connection'
77
import { parseConnectionString } from '../src/utilities'
88
import { CHINOOK_DATABASE_URL, TESTING_DATABASE_URL, LONG_TIMEOUT, getTestingConfig, getChinookConfig, getChinookConnection } from './shared'
9+
import { Database } from 'sqlite3'
910

1011
describe('connection', () => {
1112
let chinook: SQLiteCloudConnection
@@ -281,6 +282,20 @@ describe('connection', () => {
281282
})
282283

283284
describe('send select commands', () => {
285+
it('should select results with no colum names', done => {
286+
chinook.sendCommands("select 42, 'hello'", (error, results) => {
287+
expect(error).toBeNull()
288+
expect(results.numberOfColumns).toBe(2)
289+
expect(results.numberOfRows).toBe(1)
290+
expect(results.version == 1 || results.version == 2).toBeTruthy()
291+
expect(results.columnsNames).toEqual(['42', "'hello'"]) // column name should be hello, not 'hello'
292+
expect(results.getItem(0, 0)).toBe(42)
293+
expect(results.getItem(0, 1)).toBe('hello')
294+
295+
done()
296+
})
297+
})
298+
284299
it('should select long formatted string', done => {
285300
chinook.sendCommands("USE DATABASE :memory:; select printf('%.*c', 1000, 'x') AS DDD", (error, results) => {
286301
expect(error).toBeNull()
@@ -398,7 +413,7 @@ describe('connection', () => {
398413
if (++completed >= numQueries) {
399414
const queryMs = (Date.now() - startTime) / numQueries
400415
console.log(`${numQueries}x batched selects, ${queryMs.toFixed(0)}ms per query`)
401-
expect(queryMs).toBeLessThan(2000)
416+
expect(queryMs).toBeLessThan(5000)
402417
done()
403418
}
404419
}

test/shared.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,25 +68,28 @@ export const CHINOOK_FIRST_TRACK = {
6868
UnitPrice: 0.99
6969
}
7070

71-
export function getChinookConfig(url = CHINOOK_DATABASE_URL): SQLiteCloudConfig {
72-
const chinookConfig = parseConnectionString(url)
71+
export function getChinookConfig(url = CHINOOK_DATABASE_URL, extraConfig?: Partial<SQLiteCloudConfig>): SQLiteCloudConfig {
72+
let chinookConfig = parseConnectionString(url)
7373
if (chinookConfig.host === 'localhost' && chinookConfig.tlsOptions === undefined) {
7474
chinookConfig.tlsOptions = {
7575
ca: SELF_SIGNED_CERTIFICATE
7676
}
7777
}
7878
chinookConfig.timeout = 10 * 1000 // 10 seconds
79+
if (extraConfig) {
80+
chinookConfig = { ...chinookConfig, ...extraConfig }
81+
}
7982
return chinookConfig
8083
}
8184

82-
export function getChinookConnection(callback?: ResultsCallback): SQLiteCloudConnection {
83-
const chinookConfig = getChinookConfig()
85+
export function getChinookConnection(callback?: ResultsCallback, extraConfig?: Partial<SQLiteCloudConfig>): SQLiteCloudConnection {
86+
const chinookConfig = getChinookConfig(CHINOOK_DATABASE_URL, extraConfig)
8487
return new SQLiteCloudConnection(chinookConfig, callback)
8588
}
8689

8790
/** Returns a chinook.db connection, caller is responsible for closing the database */
88-
export function getChinookDatabase(callback?: ResultsCallback): Database {
89-
const chinookConfig = getChinookConfig()
91+
export function getChinookDatabase(callback?: ResultsCallback, extraConfig?: Partial<SQLiteCloudConfig>): Database {
92+
const chinookConfig = getChinookConfig(CHINOOK_DATABASE_URL, extraConfig)
9093
return new Database(chinookConfig, callback)
9194
}
9295

test/stress.test.ts

Lines changed: 86 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
* stress.test.ts - opens lots of connections and queries
33
*/
44

5-
import { Database } from '../src/database'
6-
import { getChinookDatabase, getTestingDatabaseAsync, removeDatabaseAsync } from './shared'
7-
import { SEQUENCE_TEST_SIZE, SIMULTANEOUS_TEST_SIZE, EXTRA_LONG_TIMEOUT } from './shared'
5+
import { Database, SQLiteCloudRowset } from '../src'
6+
import { getChinookDatabase, getTestingDatabaseAsync, removeDatabaseAsync, SEQUENCE_TEST_SIZE, SIMULTANEOUS_TEST_SIZE, EXTRA_LONG_TIMEOUT } from './shared'
7+
8+
const WARN_SPEED_MS = 500 // will warn if slower than this
9+
const EXPECT_SPEED_MS = 6 * 1000 // will throw error if slower than this
810

911
describe('stress testing', () => {
1012
it(
@@ -20,8 +22,10 @@ describe('stress testing', () => {
2022
connection.close()
2123
if (i % 25 === 0) {
2224
const connectionMs = (Date.now() - startTime) / i
23-
console.log(`${i}x open and close, ${connectionMs.toFixed(0)}ms per connection`)
24-
expect(connectionMs).toBeLessThan(2000)
25+
if (connectionMs > WARN_SPEED_MS) {
26+
console.warn(`${i}x open and close, ${connectionMs.toFixed(0)}ms per connection`)
27+
expect(connectionMs).toBeLessThan(EXPECT_SPEED_MS)
28+
}
2529
}
2630
}
2731
} catch (error) {
@@ -46,8 +50,10 @@ describe('stress testing', () => {
4650
await removeDatabaseAsync(database)
4751
if (i % 25 === 0) {
4852
const connectionMs = (Date.now() - startTime) / i
49-
console.log(`${i}x open, read, write and close, ${connectionMs.toFixed(0)}ms per connection`)
50-
expect(connectionMs).toBeLessThan(2000)
53+
if (connectionMs > WARN_SPEED_MS) {
54+
console.warn(`${i}x open, read, write and close, ${connectionMs.toFixed(0)}ms per connection`)
55+
expect(connectionMs).toBeLessThan(EXPECT_SPEED_MS)
56+
}
5157
}
5258
}
5359
} catch (error) {
@@ -86,8 +92,10 @@ describe('stress testing', () => {
8692
}
8793

8894
const connectionMs = (Date.now() - startTime) / SIMULTANEOUS_TEST_SIZE
89-
console.log(`${SIMULTANEOUS_TEST_SIZE}x open simultaneously, ${connectionMs.toFixed(0)}ms per connection`)
90-
expect(connectionMs).toBeLessThan(2000)
95+
if (connectionMs > WARN_SPEED_MS) {
96+
console.warn(`${SIMULTANEOUS_TEST_SIZE}x open simultaneously, ${connectionMs.toFixed(0)}ms per connection`)
97+
expect(connectionMs).toBeLessThan(EXPECT_SPEED_MS)
98+
}
9199
} catch (error) {
92100
console.error(`Error opening connection ${connections.length}: ${error}`)
93101
expect(error).toBeNull()
@@ -110,8 +118,75 @@ describe('stress testing', () => {
110118
}
111119

112120
const queryMs = (Date.now() - startTime) / numQueries
113-
console.log(`${numQueries}x database.sql selects, ${queryMs.toFixed(0)}ms per query`)
114-
expect(queryMs).toBeLessThan(2000)
121+
if (queryMs > WARN_SPEED_MS) {
122+
console.warn(`${numQueries}x database.sql selects, ${queryMs.toFixed(0)}ms per query`)
123+
expect(queryMs).toBeLessThan(EXPECT_SPEED_MS)
124+
}
125+
},
126+
EXTRA_LONG_TIMEOUT
127+
)
128+
129+
it('should receive small responses with uncompressed data', async () => {
130+
const chinook = getChinookDatabase()
131+
132+
// small response is NOT compressed
133+
const smallResults = (await chinook.sql`SELECT hex(randomblob(1000)) 'small'`) as SQLiteCloudRowset // 1 KB
134+
expect(smallResults).toHaveLength(1)
135+
expect(smallResults.metadata.columns[0].name).toBe('small')
136+
expect(smallResults[0].small).toHaveLength(2 * 1000) // hex encoded
137+
})
138+
139+
it(
140+
'should receive large responses with uncompressed data',
141+
async () => {
142+
const chinook = getChinookDatabase()
143+
144+
// large response is NOT compressed
145+
const largeResults = (await chinook.sql`SELECT hex(zeroblob(10000000)) 'columName'`) as SQLiteCloudRowset // 10 MB
146+
expect(largeResults).toHaveLength(1)
147+
expect(largeResults.metadata.columns[0].name).toBe('columName')
148+
const largeResultString = largeResults[0].columName as string
149+
expect(largeResultString).toHaveLength(2 * 10000000) // 20 MB
150+
},
151+
EXTRA_LONG_TIMEOUT
152+
)
153+
154+
it(
155+
'should receive medium responses with compressed data',
156+
async () => {
157+
const chinook = getChinookDatabase()
158+
159+
// enable compression
160+
const enable = await chinook.sql`SET CLIENT KEY COMPRESSION TO 1;`
161+
162+
// large response is compressed
163+
const blobSize = 20 * 1024 // 20 KB
164+
const largeCompressedResults = (await chinook.sql`SELECT hex(randomblob(${blobSize})) AS 'columnName'`) as SQLiteCloudRowset
165+
expect(largeCompressedResults).toHaveLength(1)
166+
expect(largeCompressedResults.metadata.columns[0].name).toBe('columnName')
167+
expect(largeCompressedResults[0].columnName).toHaveLength(2 * blobSize)
168+
},
169+
EXTRA_LONG_TIMEOUT
170+
)
171+
172+
it(
173+
'should receive large responses with compressed data',
174+
async () => {
175+
const chinook = getChinookDatabase()
176+
177+
// enable compression
178+
const enable = await chinook.sql`SET CLIENT KEY COMPRESSION TO 1;`
179+
180+
// large response is compressed
181+
const blobSize = 10 * 1024 * 1024 // 10 MB
182+
const largeCompressedResults = (await chinook.sql`SELECT hex(randomblob(${blobSize})) AS 'columnName'`) as SQLiteCloudRowset
183+
expect(largeCompressedResults).toHaveLength(1)
184+
expect(largeCompressedResults.metadata.columns[0].name).toBe('columnName')
185+
expect(largeCompressedResults[0].columnName).toHaveLength(2 * blobSize)
186+
187+
const postfix = await chinook.sql`SELECT 1`
188+
expect(postfix).toHaveLength(1)
189+
expect(postfix[0]['1']).toBe(1)
115190
},
116191
EXTRA_LONG_TIMEOUT
117192
)

0 commit comments

Comments
 (0)