Skip to content

Commit acd4c04

Browse files
Fixing concurrency issues
- opening socket and initialization in single operation
1 parent 16f5f34 commit acd4c04

File tree

4 files changed

+175
-158
lines changed

4 files changed

+175
-158
lines changed

src/connection.ts

Lines changed: 139 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ export const DEFAULT_TIMEOUT = 300 * 1000
3838
/** Default tls connection port */
3939
export const DEFAULT_PORT = 9960
4040

41-
/** SQLiteCloud low-level connection, will do messaging, handle socket, authentication, etc. */
41+
/**
42+
* SQLiteCloud low-level connection, will do messaging, handle socket, authentication, etc.
43+
* A connection socket is established when the connection is created and closed when the connection is closed.
44+
* All operations are serialized by waiting for any pending operations to complete. Once a connection is closed,
45+
* it cannot be reopened and you must create a new connection.
46+
*/
4247
export class SQLiteCloudConnection {
4348
/** Parse and validate provided connectionString or configuration */
4449
constructor(config: SQLiteCloudConfig | string, callback?: ErrorCallback) {
@@ -47,7 +52,6 @@ export class SQLiteCloudConnection {
4752
} else {
4853
this.config = this.validateConfiguration(config)
4954
}
50-
5155
this.connect(callback)
5256
}
5357

@@ -146,39 +150,23 @@ export class SQLiteCloudConnection {
146150
return commands
147151
}
148152

149-
//
150-
// public methods
151-
//
152-
153-
/** Enable verbose logging for debug purposes */
154-
public verbose(): void {
155-
this.config.verbose = true
156-
}
157-
158153
/* Opens a connection with the server and sends the initialization commands. Will throw in case of errors. */
159-
public connect(callback?: ErrorCallback): this {
160-
if (this.socket) {
161-
callback?.call(this, null)
162-
return this
163-
}
164-
154+
private connect(callback?: ErrorCallback): this {
165155
this.operations.enqueue(done => {
156+
// connection established while we were waiting in line?
157+
console.assert(this.socket === undefined, 'Connection already established')
158+
166159
// clear all listeners and call done in the operations queue
167160
const finish: ResultsCallback = error => {
168161
if (this.socket) {
169162
this.socket.removeAllListeners('data')
170163
this.socket.removeAllListeners('error')
171164
this.socket.removeAllListeners('close')
172165
}
173-
if (callback) {
174-
callback?.call(this, error)
175-
callback = undefined
176-
}
177166
if (error) {
178167
this.close()
179168
}
180-
// process next operation in the queue
181-
done?.call(this, error ? error : undefined)
169+
done(error)
182170
}
183171

184172
// connect to tls socket, initialize connection, setup event handlers
@@ -190,12 +178,25 @@ export class SQLiteCloudConnection {
190178
finish(new SQLiteCloudError('Connection was not authorized', { cause: anonimizedError }))
191179
} else {
192180
this.socket = client
193-
finish(null)
181+
182+
// send initialization commands
183+
const commands = this.initializationCommands
184+
this.processCommands(commands, error => {
185+
if (error) {
186+
this.close()
187+
}
188+
if (callback) {
189+
callback?.call(this, error)
190+
callback = undefined
191+
}
192+
finish(error)
193+
})
194194
}
195195
})
196196

197197
client.on('close', () => {
198198
this.log('Connection closed')
199+
this.socket = undefined
199200
finish(new SQLiteCloudError('Connection was closed'))
200201
})
201202

@@ -204,131 +205,141 @@ export class SQLiteCloudConnection {
204205
finish(new SQLiteCloudError('Connection error', { cause: error }))
205206
})
206207
})
207-
208-
// send initialization commands (will be enqued in the operations queue)
209-
const commands = this.initializationCommands
210-
this.sendCommands(commands, error => {
211-
callback?.call(this, error)
212-
})
213-
214208
return this
215209
}
216210

217-
/** Will send a command and return the resulting rowset or result or throw an error */
218-
public sendCommands(commands: string, callback?: ResultsCallback): this {
219-
this.operations.enqueue(done => {
220-
// connection needs to be established?
221-
if (!this.socket) {
222-
callback?.call(this, new SQLiteCloudError('Connection not established', { errorCode: 'ERR_CONNECTION_NOT_ESTABLISHED' }))
223-
return
224-
}
211+
/** Will send a command immediately (no queueing), return the rowset/result or throw an error */
212+
private processCommands(commands: string, callback?: ResultsCallback): this {
213+
// connection needs to be established?
214+
if (!this.socket) {
215+
callback?.call(this, new SQLiteCloudError('Connection not established', { errorCode: 'ERR_CONNECTION_NOT_ESTABLISHED' }))
216+
return this
217+
}
225218

226-
// compose commands following SCPC protocol
227-
commands = formatCommand(commands)
219+
// compose commands following SCPC protocol
220+
commands = formatCommand(commands)
228221

229-
let buffer = Buffer.alloc(0)
230-
const rowsetChunks: Buffer[] = []
231-
const startedOn = new Date()
232-
this.log(`Send: ${commands}`)
222+
let buffer = Buffer.alloc(0)
223+
const rowsetChunks: Buffer[] = []
224+
const startedOn = new Date()
225+
this.log(`Send: ${commands}`)
233226

234-
// define what to do if an answer does not arrive within the set timeout
235-
let socketTimeout: number
227+
// define what to do if an answer does not arrive within the set timeout
228+
let socketTimeout: number
236229

237-
// clear all listeners and call done in the operations queue
238-
const finish: ResultsCallback = (error, result) => {
239-
clearTimeout(socketTimeout)
240-
if (this.socket) {
241-
this.socket.removeAllListeners('data')
242-
this.socket.removeAllListeners('error')
243-
this.socket.removeAllListeners('close')
244-
}
245-
if (callback) {
246-
callback?.call(this, error, result)
247-
callback = undefined
248-
}
249-
// process next operation in the queue
250-
done?.call(this, error ? error : undefined)
230+
// clear all listeners and call done in the operations queue
231+
const finish: ResultsCallback = (error, result) => {
232+
clearTimeout(socketTimeout)
233+
if (this.socket) {
234+
this.socket.removeAllListeners('data')
235+
this.socket.removeAllListeners('error')
236+
this.socket.removeAllListeners('close')
251237
}
238+
if (callback) {
239+
callback?.call(this, error, result)
240+
callback = undefined
241+
}
242+
}
252243

253-
// define the Promise that waits for the server response
254-
const readData = (data: Uint8Array) => {
255-
try {
256-
// on first ondata event, dataType is read from data, on subsequent ondata event, is read from buffer that is the concatanations of data received on each ondata event
257-
let dataType = buffer.length === 0 ? data.subarray(0, 1).toString() : buffer.subarray(0, 1).toString('utf8')
258-
buffer = Buffer.concat([buffer, data])
259-
const commandLength = hasCommandLength(dataType)
260-
261-
if (commandLength) {
262-
const commandLength = parseCommandLength(buffer)
263-
const hasReceivedEntireCommand = buffer.length - buffer.indexOf(' ') - 1 >= commandLength ? true : false
264-
if (hasReceivedEntireCommand) {
265-
if (this.config.verbose) {
266-
let bufferString = buffer.toString('utf8')
267-
if (bufferString.length > 1000) {
268-
bufferString = bufferString.substring(0, 100) + '...' + bufferString.substring(bufferString.length - 40)
269-
}
270-
const elapsedMs = new Date().getTime() - startedOn.getTime()
271-
this.log(`Receive: ${bufferString} - ${elapsedMs}ms`)
272-
}
273-
274-
// need to decompress this buffer before decoding?
275-
if (dataType === CMD_COMPRESSED) {
276-
;({ buffer, dataType } = decompressBuffer(buffer))
244+
// define the Promise that waits for the server response
245+
const readData = (data: Uint8Array) => {
246+
try {
247+
// on first ondata event, dataType is read from data, on subsequent ondata event, is read from buffer that is the concatanations of data received on each ondata event
248+
let dataType = buffer.length === 0 ? data.subarray(0, 1).toString() : buffer.subarray(0, 1).toString('utf8')
249+
buffer = Buffer.concat([buffer, data])
250+
const commandLength = hasCommandLength(dataType)
251+
252+
if (commandLength) {
253+
const commandLength = parseCommandLength(buffer)
254+
const hasReceivedEntireCommand = buffer.length - buffer.indexOf(' ') - 1 >= commandLength ? true : false
255+
if (hasReceivedEntireCommand) {
256+
if (this.config.verbose) {
257+
let bufferString = buffer.toString('utf8')
258+
if (bufferString.length > 1000) {
259+
bufferString = bufferString.substring(0, 100) + '...' + bufferString.substring(bufferString.length - 40)
277260
}
261+
const elapsedMs = new Date().getTime() - startedOn.getTime()
262+
this.log(`Receive: ${bufferString} - ${elapsedMs}ms`)
263+
}
278264

279-
if (dataType !== CMD_ROWSET_CHUNK) {
280-
this.socket?.off('data', readData)
281-
const { data } = popData(buffer)
282-
finish(null, data)
283-
} else {
284-
// @ts-expect-error
285-
// check if rowset received the ending chunk
286-
if (data.subarray(data.indexOf(' ') + 1, data.length).toString() === '0 0 0 ') {
287-
const parsedData = parseRowsetChunks(rowsetChunks)
288-
finish?.call(this, null, parsedData)
289-
} else {
290-
// no ending string? ask server for another chunk
291-
rowsetChunks.push(buffer)
292-
buffer = Buffer.alloc(0)
293-
const okCommand = formatCommand('OK')
294-
this.log(`Send: ${okCommand}`)
295-
this.socket?.write(okCommand)
296-
}
297-
}
265+
// need to decompress this buffer before decoding?
266+
if (dataType === CMD_COMPRESSED) {
267+
;({ buffer, dataType } = decompressBuffer(buffer))
298268
}
299-
} else {
300-
// command with no explicit len so make sure that the final character is a space
301-
const lastChar = buffer.subarray(buffer.length - 1, buffer.length).toString('utf8')
302-
if (lastChar == ' ') {
269+
270+
if (dataType !== CMD_ROWSET_CHUNK) {
271+
this.socket?.off('data', readData)
303272
const { data } = popData(buffer)
304273
finish(null, data)
274+
} else {
275+
// @ts-expect-error
276+
// check if rowset received the ending chunk
277+
if (data.subarray(data.indexOf(' ') + 1, data.length).toString() === '0 0 0 ') {
278+
const parsedData = parseRowsetChunks(rowsetChunks)
279+
finish?.call(this, null, parsedData)
280+
} else {
281+
// no ending string? ask server for another chunk
282+
rowsetChunks.push(buffer)
283+
buffer = Buffer.alloc(0)
284+
const okCommand = formatCommand('OK')
285+
this.log(`Send: ${okCommand}`)
286+
this.socket?.write(okCommand)
287+
}
305288
}
306289
}
307-
} catch (error) {
308-
console.assert(error instanceof Error)
309-
if (error instanceof Error) {
310-
finish(error)
290+
} else {
291+
// command with no explicit len so make sure that the final character is a space
292+
const lastChar = buffer.subarray(buffer.length - 1, buffer.length).toString('utf8')
293+
if (lastChar == ' ') {
294+
const { data } = popData(buffer)
295+
finish(null, data)
311296
}
312297
}
298+
} catch (error) {
299+
console.assert(error instanceof Error)
300+
if (error instanceof Error) {
301+
finish(error)
302+
}
313303
}
304+
}
314305

315-
this.socket?.once('close', () => {
316-
finish(new SQLiteCloudError('Connection was closed', { cause: anonimizeCommand(commands) }))
317-
})
306+
this.socket?.once('close', () => {
307+
finish(new SQLiteCloudError('Connection was closed', { cause: anonimizeCommand(commands) }))
308+
})
318309

319-
this.socket?.write(commands, 'utf8', () => {
320-
socketTimeout = setTimeout(() => {
321-
const timeoutError = new SQLiteCloudError('Request timed out', { cause: anonimizeCommand(commands) })
322-
this.log(`Request timed out, config.timeout is ${this.config.timeout as number}ms`, timeoutError)
323-
finish(timeoutError)
324-
}, this.config.timeout)
325-
this.socket?.on('data', readData)
326-
})
310+
this.socket?.write(commands, 'utf8', () => {
311+
socketTimeout = setTimeout(() => {
312+
const timeoutError = new SQLiteCloudError('Request timed out', { cause: anonimizeCommand(commands) })
313+
this.log(`Request timed out, config.timeout is ${this.config.timeout as number}ms`, timeoutError)
314+
finish(timeoutError)
315+
}, this.config.timeout)
316+
this.socket?.on('data', readData)
317+
})
318+
319+
this.socket?.once('error', (error: any) => {
320+
this.log('Socket error', error)
321+
this.close()
322+
finish(new SQLiteCloudError('Socket error', { cause: anonimizeError(error) }))
323+
})
327324

328-
this.socket?.once('error', (error: any) => {
329-
this.log('Socket error', error)
330-
this.close()
331-
finish(new SQLiteCloudError('Socket error', { cause: anonimizeError(error) }))
325+
return this
326+
}
327+
328+
//
329+
// public methods
330+
//
331+
332+
/** Enable verbose logging for debug purposes */
333+
public verbose(): void {
334+
this.config.verbose = true
335+
}
336+
337+
/** Will enquee a command to be executed and callback with the resulting rowset/result/error */
338+
public sendCommands(commands: string, callback?: ResultsCallback): this {
339+
this.operations.enqueue(done => {
340+
this.processCommands(commands, (error, result) => {
341+
callback?.call(this, error, result)
342+
done(error)
332343
})
333344
})
334345

@@ -350,7 +361,7 @@ export class SQLiteCloudConnection {
350361
// OperationsQueue - used to linearize operations on the connection
351362
//
352363

353-
type OperationCallback = (error?: Error) => void
364+
type OperationCallback = (error: Error | null) => void
354365
type Operation = (done: OperationCallback) => void
355366

356367
export class OperationsQueue {

src/database.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ export class Database extends EventEmitter {
3232
constructor(config: SQLiteCloudConfig | string, callback?: ErrorCallback) {
3333
super()
3434
this.config = typeof config === 'string' ? { connectionString: config } : config
35-
this.connections = []
36-
3735
// opens first connection to the database automatically
3836
this.getConnection(callback as ResultsCallback)
3937
}
@@ -42,7 +40,7 @@ export class Database extends EventEmitter {
4240
private config: SQLiteCloudConfig
4341

4442
/** Database connections */
45-
private connections: SQLiteCloudConnection[]
43+
private connections: SQLiteCloudConnection[] = []
4644

4745
//
4846
// private methods
@@ -54,16 +52,17 @@ export class Database extends EventEmitter {
5452
if (this.connections?.length > 0) {
5553
callback?.call(this, null, this.connections[0])
5654
} else {
57-
const connection = new SQLiteCloudConnection(this.config)
58-
this.connections.push(connection)
59-
connection.connect(error => {
60-
if (error) {
61-
this.handleError(connection, error, callback)
62-
} else {
63-
callback?.call(this, null)
64-
this.emitEvent('open')
65-
}
66-
})
55+
this.connections.push(
56+
new SQLiteCloudConnection(this.config, error => {
57+
if (error) {
58+
this.handleError(this.connections[0], error, callback)
59+
} else {
60+
console.assert
61+
callback?.call(this, null, this.connections[0])
62+
this.emitEvent('open')
63+
}
64+
})
65+
)
6766
}
6867
}
6968

0 commit comments

Comments
 (0)