
Commit 280b04b

Streaming writes to avoid memory issues in node.js
1 parent eee5958 commit 280b04b

File tree

src/gateway/gateway.ts
test/1brc.test.ts

2 files changed: +51 -36 lines changed

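The main change is in test/1brc.test.ts: rather than loading the whole measurements CSV with fs.readFileSync and mapping it into one large array, the test now streams the file line by line (readline over fs.createReadStream) and flushes rows to the database in fixed-size chunks, so memory use stays bounded no matter how many rows the file holds. Below is a minimal sketch of that pattern, separate from the test harness; flushChunk is a hypothetical stand-in for the chunked INSERT the test performs through sendCommandsAsync.

import * as fs from 'fs'
import * as readline from 'readline'

type Measurement = { city: string; temp: number }

// Stream a "city;temperature" CSV and hand rows to flushChunk in bounded chunks.
async function streamMeasurements(
  pathname: string,
  chunkSize: number,
  flushChunk: (rows: Measurement[]) => Promise<void>
): Promise<void> {
  const rl = readline.createInterface({ input: fs.createReadStream(pathname), crlfDelay: Infinity })

  let chunk: Measurement[] = []
  for await (const line of rl) {
    const [city, temp] = line.split(';')
    chunk.push({ city, temp: parseFloat(temp) })
    if (chunk.length >= chunkSize) {
      await flushChunk(chunk) // insert this chunk, then drop the reference
      chunk = []
    }
  }
  if (chunk.length > 0) {
    await flushChunk(chunk) // flush the final partial chunk
  }
}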

src/gateway/gateway.ts

Lines changed: 2 additions & 2 deletions
@@ -106,15 +106,15 @@ io.on('connection', socket => {

 // Run websocket server
 server.listen(SOCKET_PORT, () => {
-  console.debug(`WebSocket server is running on port ${SOCKET_PORT}`)
+  console.debug(`WebSocket on ws://localhost:${SOCKET_PORT}`)
 })

 //
 // HTTP server
 //

 app.listen(HTTP_PORT, () => {
-  console.debug(`HTTP server is running on port ${HTTP_PORT}`)
+  console.debug(`HTTP server on http://localhost:${HTTP_PORT}`)
 })

 app.get('/v1/info', (req, res) => {

test/1brc.test.ts

Lines changed: 49 additions & 34 deletions
@@ -10,47 +10,61 @@ import { SQLiteCloudRowset } from '../src'
 import { SQLiteCloudConnection } from '../src/drivers/connection'
 import { getChinookTlsConnection, getTestingDatabaseName, sendCommandsAsync } from './shared'
 import * as util from 'util'
+import * as readline from 'readline'

 const fs = require('fs')
 const path = require('path')

 const BRC_UNIQUE_STATIONS = 41343
-const BRC_INSERT_CHUNKS = 300_000 // insert this many rows per request
+const BRC_INSERT_CHUNKS = 350_000 // insert this many rows per request

-const BRC_TIMEOUT = 12 * 60 * 60 * 1000 // 12 hours
+const BRC_TIMEOUT = 24 * 60 * 60 * 1000 // 1 day
 jest.setTimeout(BRC_TIMEOUT) // Set global timeout

 describe('1 billion row challenge', () => {
   it('should create 50_000 measurements', async () => {
     await createMeasurements(50_000)
   })
+
   it('should run 50_000 row challenge', async () => {
     await testChallenge(50_000)
   })

   it('should create 500_000 measurements', async () => {
     await createMeasurements(500_000)
   })
+
   it('should run 500_000 row challenge with chunked inserts', async () => {
     await testChallenge(500_000)
   })
-  it('should run 500_000 row challenge with a single insert statement', async () => {
-    await testChallenge(500_000, 500_000)
-  })
+
+  // it('should run 500_000 row challenge with a single insert statement', async () => {
+  //   await testChallenge(500_000, 500_000)
+  // })

   it('should create 10_000_000 measurements', async () => {
     await createMeasurements(10_000_000)
   })
+
   it('should run 10_000_000 row challenge', async () => {
     await testChallenge(10_000_000)
   })

   it('should create 50_000_000 measurements', async () => {
     await createMeasurements(50_000_000)
   })
+
   it('should run 50_000_000 row challenge', async () => {
     await testChallenge(50_000_000)
   })
+
+  it('should create 200_000_000 measurements', async () => {
+    await createMeasurements(200_000_000)
+  })
+
+  it('should run 200_000_000 row challenge', async () => {
+    await testChallenge(200_000_000)
+  })
 })

 //
@@ -127,7 +141,7 @@ async function createMeasurements(numberOfRows: number = 1000000) {
     await write(chunkCsv)
   }

-  console.log(`Wrote 1brc_${numberOfRows}_rows.csv in ${Date.now() - startedOn}ms`)
+  console.log(`Created 1brc_${numberOfRows}_rows.csv in ${Date.now() - startedOn}ms`)
 }

 /** Read csv with measurements, insert in chunks, summarize and write out results to csv */
@@ -137,38 +151,40 @@ async function testChallenge(numberOfRows: number, insertChunks = BRC_INSERT_CHU
   const { connection, database } = await createDatabaseAsync(numberOfRows)
   try {
     const parseOn = Date.now()
-    // parse csv into array of city/temperature
-    const csvPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows.csv`)
-    const csvText = fs.readFileSync(csvPathname, 'utf8')
-    const lines = csvText.trim().split('\n') // Split the CSV text by newline
-    const data: { city: string; temp: number }[] = lines.map((line: string) => {
-      const [city, temp] = line.split(';') // Split each line by semicolon
-      return { city, temp: parseFloat(temp) } // Parse the temperature as a number
-    })
-    expect(lines.length).toBe(numberOfRows)
-    const uniqueStations = new Set(data.map(item => item.city))
-    expect(uniqueStations.size).toBe(BRC_UNIQUE_STATIONS)
-    console.debug(`Read 1brc_${numberOfRows}_rows.csv in ${Date.now() - parseOn}ms`)

     // create database and table
     const createResult = await sendCommandsAsync(connection, `CREATE TABLE measurements(city VARCHAR(26), temp FLOAT);`)
     expect(createResult).toBe('OK')

-    for (let chunk = 0, startRow = 0; startRow < numberOfRows; chunk++, startRow += BRC_INSERT_CHUNKS) {
-      const insertOn = Date.now()
-      // insert chunk of rows into sqlite database
-      const dataChunk = data.slice(startRow, Math.min(numberOfRows, startRow + BRC_INSERT_CHUNKS))
-      const values = dataChunk.map(({ city, temp }) => `('${city.replaceAll("'", "''")}', ${temp})`).join(',\n')
-      const insertSql = `INSERT INTO measurements (city, temp) VALUES \n${values};`
-
-      // const sqlPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows_${chunk}.sql`)
-      // fs.writeFileSync(sqlPathname, insertSql)
-
-      // insert values into database
-      const insertResult = (await sendCommandsAsync(connection, insertSql)) as Array<number>
-      expect(Array.isArray(insertResult)).toBeTruthy()
-      expect(insertResult[3] as number).toBe(dataChunk.length) // totalChanges
-      console.debug(`Inserted ${dataChunk.length} rows (${Math.floor(insertSql.length / 1024)}KB) in ${Date.now() - insertOn}ms`)
+    const csvPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows.csv`)
+    const fileStream = fs.createReadStream(csvPathname)
+
+    const rl = readline.createInterface({
+      input: fileStream,
+      crlfDelay: Infinity
+    })
+
+    let dataChunk = []
+    let rowCount = 0
+    for await (const line of rl) {
+      const [city, temp] = line.split(';') // Split each line by semicolon
+      dataChunk.push({ city, temp: parseFloat(temp) }) // Parse the temperature as a number
+
+      if (dataChunk.length === insertChunks || rowCount + 1 === numberOfRows) {
+        const insertOn = Date.now()
+        const values = dataChunk.map(({ city, temp }) => `('${city.replaceAll("'", "''")}', ${temp})`).join(',\n')
+        const insertSql = `INSERT INTO measurements (city, temp) VALUES \n${values};`
+
+        // insert values into database
+        const insertResult = (await sendCommandsAsync(connection, insertSql)) as Array<number>
+        expect(Array.isArray(insertResult)).toBeTruthy()
+        expect(insertResult[3] as number).toBe(dataChunk.length) // totalChanges
+        console.debug(`Inserted ${dataChunk.length} rows (${Math.floor(insertSql.length / 1024)}KB) in ${Date.now() - insertOn}ms`)
+
+        dataChunk = [] // reset data chunk
+      }
+
+      rowCount++
     }

     // calculate averages, etc
@@ -177,7 +193,6 @@ async function testChallenge(numberOfRows: number, insertChunks = BRC_INSERT_CHU
     const selectResult = (await sendCommandsAsync(connection, selectSql)) as SQLiteCloudRowset
     expect(selectResult).toBeTruthy()
     expect(selectResult.length).toBe(BRC_UNIQUE_STATIONS)
-    console.debug(`Selected ${numberOfRows} rows with aggregates in ${Date.now() - selectOn}ms`)

     console.log(`Ran ${numberOfRows} challenge in ${Date.now() - startedOn}ms`)

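One note on the flush step inside the new loop: each chunk is serialized into a single multi-row INSERT, with single quotes in station names doubled for SQL escaping. A simplified sketch of that statement builder, pulled out of the diff above as a hypothetical standalone helper:

// Hypothetical helper mirroring how the test builds one multi-row INSERT per chunk.
function buildInsertSql(rows: { city: string; temp: number }[]): string {
  const values = rows
    .map(({ city, temp }) => `('${city.replaceAll("'", "''")}', ${temp})`) // escape single quotes for SQL
    .join(',\n')
  return `INSERT INTO measurements (city, temp) VALUES \n${values};`
}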