|
| 1 | +/** |
| 2 | + * 1brc.test.ts - insert lots of data, synthesize, extract, benchmark |
| 3 | + * https://github.com/gunnarmorling/1brc/tree/main |
| 4 | + */ |
| 5 | + |
| 6 | +import { SQLiteCloudRowset } from '../src' |
| 7 | +import { SQLiteCloudConnection } from '../src/drivers/connection' |
| 8 | +import { EXTRA_LONG_TIMEOUT, LONG_TIMEOUT, getChinookTlsConnection, getTestingDatabaseName, sendCommandsAsync } from './shared' |
| 9 | +import * as util from 'util' |
| 10 | + |
| 11 | +const fs = require('fs') |
| 12 | +const path = require('path') |
| 13 | + |
/** Number of distinct station names expected in weather_stations.csv, in every generated dataset and in the grouped query result. */
const BRC_UNIQUE_STATIONS = 41_343
| 15 | + |
/**
 * Obtains a connection and creates a new database for a challenge run, then switches the connection to it.
 *
 * @param numberOfRows dataset size; only used to build the database name (`1brc-<numberOfRows>`)
 * @returns the connection together with the name of the database that was created
 */
async function createDatabaseAsync(numberOfRows: number): Promise<{ connection: SQLiteCloudConnection; database: string }> {
  const connection = getChinookTlsConnection()
  const database = getTestingDatabaseName(`1brc-${numberOfRows}`)

  // leave whatever database is currently selected, create the new one, then select it
  const statements = ['UNUSE DATABASE;', `CREATE DATABASE ${database};`, `USE DATABASE ${database};`]
  const outcome = await sendCommandsAsync(connection, statements.join(' '))
  expect(outcome).toBe('OK')

  return { database, connection }
}
| 24 | + |
/**
 * Removes a testing database and closes the connection that was used to work with it.
 *
 * The connection is closed even when the cleanup command throws or does not
 * answer 'OK', so a failed cleanup does not also leave the connection open.
 *
 * @param connection connection to send the cleanup commands on; closed before returning
 * @param database name of the database to remove
 */
async function destroyDatabaseAsync(connection: SQLiteCloudConnection, database: string) {
  try {
    const cleanupResults = await sendCommandsAsync(connection, `UNUSE DATABASE; REMOVE DATABASE ${database}`)
    expect(cleanupResults).toBe('OK')
  } finally {
    connection.close()
  }
}
| 30 | + |
// Timeout applied through jest.setTimeout below: 15 minutes, expressed in milliseconds
const BRC_TIMEOUT = 15 * 60_000
jest.setTimeout(BRC_TIMEOUT)
| 33 | + |
describe('1 billion row challenge', () => {
  // [label used in the test names, number of rows]; for each size the CSV is
  // generated first and then consumed by the challenge test registered right after it
  const datasets: Array<[string, number]> = [
    ['50_000', 50_000],
    ['500_000', 500_000]
  ]

  for (const [label, rows] of datasets) {
    it(`should create ${label} measurements`, async () => {
      await createMeasurements(rows)
    })
    it(`should run ${label} row challenge`, async () => {
      await testChallenge(rows)
    })
  }
})
| 49 | + |
| 50 | +// |
| 51 | +// generate data on the fly |
| 52 | +// |
| 53 | + |
/**
 * A weather station identified by `id` that produces random temperature
 * readings scattered around its `meanTemperature`.
 */
class WeatherStation {
  constructor(public id: string, public meanTemperature: number) {}

  /**
   * Produces one random reading.
   *
   * @returns a value drawn around `meanTemperature` with a standard deviation of 10, rounded to one decimal place
   */
  measurement(): number {
    const reading = this.randomGaussian(this.meanTemperature, 10)
    return Math.round(reading * 10.0) / 10.0
  }

  /**
   * Draws a normally distributed sample using the Box-Muller transform.
   *
   * @param mean center of the distribution
   * @param stdDev standard deviation of the distribution
   * @returns a finite sample
   */
  private randomGaussian(mean: number, stdDev: number): number {
    // Math.random() yields [0, 1); flip it to (0, 1] so Math.log never receives 0,
    // which would make the sample NaN or infinite
    const u1 = 1.0 - Math.random()
    const u2 = Math.random()
    const randStdNormal = Math.sqrt(-2.0 * Math.log(u1)) * Math.sin(2.0 * Math.PI * u2)
    return mean + stdDev * randStdNormal
  }
}
| 69 | + |
/**
 * Generates `assets/1brc/1brc_<numberOfRows>_rows.csv` with `station;temperature` lines.
 *
 * Stations and their mean temperatures are read from `assets/1brc/weather_stations.csv`.
 * One measurement is written for every station first, then random stations are
 * picked until `numberOfRows` lines exist. Because of that first pass the file
 * never has fewer lines than there are stations, even if `numberOfRows` is smaller.
 *
 * @param numberOfRows total number of measurements to write
 */
async function createMeasurements(numberOfRows: number = 1000000) {
  const startedOn = Date.now()

  const srcPathname = path.resolve(__dirname, 'assets/1brc', 'weather_stations.csv')
  const srcText = fs.readFileSync(srcPathname, 'utf8')

  // parse into array of city/temperature
  const lines = srcText.trim().split('\n') // Split the CSV text by newline
  const stations: WeatherStation[] = lines.map((line: string) => {
    const [city, temp] = line.split(';') // Split each line by semicolon
    return new WeatherStation(city, parseFloat(temp)) // Parse the temperature as a number
  })
  const uniqueStations = new Set(stations.map(station => station.id))
  expect(uniqueStations.size).toBe(BRC_UNIQUE_STATIONS)

  const csvPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows.csv`)
  const writeStream = fs.createWriteStream(csvPathname)
  const write = util.promisify(writeStream.write).bind(writeStream)

  try {
    // write initial data (otherwise some stations may be missing in the random selection)
    for (const station of stations) {
      await write(`${station.id};${station.measurement()}\n`)
    }

    // make up the rest of the data
    for (let i = stations.length; i < numberOfRows; i++) {
      if (i > 0 && i % 50_000_000 === 0) {
        console.log(`Wrote ${i} measurements in ${Date.now() - startedOn}ms`)
      }
      const station = stations[Math.floor(Math.random() * stations.length)]
      await write(`${station.id};${station.measurement()}\n`)
    }

    // end the stream and wait for it, so the file is complete and its descriptor released before returning
    await new Promise<void>((resolve, reject) => {
      writeStream.once('error', reject)
      writeStream.end((err?: Error | null) => (err ? reject(err) : resolve()))
    })
  } catch (error) {
    // release the file descriptor when generation fails part way through
    writeStream.destroy()
    throw error
  }

  console.log(`Wrote 1brc_${numberOfRows}_rows.csv in ${Date.now() - startedOn}ms`)
}
| 105 | + |
/**
 * Runs the challenge for a previously generated dataset: loads
 * `assets/1brc/1brc_<numberOfRows>_rows.csv`, inserts every row into a fresh
 * database with a single INSERT statement and aggregates min/avg/max per city.
 *
 * The generated INSERT statement is also saved next to the CSV as
 * `1brc_<numberOfRows>_rows.sql`. The database created for the run is removed
 * and its connection closed when the run ends, whether it passed or failed.
 *
 * @param numberOfRows number of rows the CSV file is expected to contain
 */
async function testChallenge(numberOfRows: number): Promise<void> {
  const startedOn = Date.now()

  // declared outside the try block so the finally block can clean up
  let connection: SQLiteCloudConnection | undefined
  let database: string | undefined

  try {
    const csvPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows.csv`)
    const csvText = fs.readFileSync(csvPathname, 'utf8')

    // parse into array of city/temperature
    const lines = csvText.trim().split('\n') // Split the CSV text by newline
    const data: { city: string; temp: number }[] = lines.map((line: string) => {
      const [city, temp] = line.split(';') // Split each line by semicolon
      return { city, temp: parseFloat(temp) } // Parse the temperature as a number
    })
    expect(lines.length).toBe(numberOfRows)

    const uniqueStations = new Set(data.map(item => item.city))
    expect(uniqueStations.size).toBe(BRC_UNIQUE_STATIONS)

    // create database and table
    const created = await createDatabaseAsync(lines.length)
    connection = created.connection
    database = created.database
    const createResult = await sendCommandsAsync(connection, `CREATE TABLE measurements(city VARCHAR(26), temp FLOAT);`)
    expect(createResult).toBe('OK')

    // build one multi-row insert; single quotes in city names are doubled to keep the SQL literal valid
    const values = data.map(({ city, temp }) => `('${city.replace(/'/g, "''")}', ${temp})`).join(',\n')
    const insertSql = `INSERT INTO measurements (city, temp) VALUES \n${values};`
    const sqlPathname = path.resolve(__dirname, 'assets/1brc', `1brc_${numberOfRows}_rows.sql`)
    fs.writeFileSync(sqlPathname, insertSql)

    // insert values into database
    const insertResult = (await sendCommandsAsync(connection, insertSql)) as Array<number>
    expect(Array.isArray(insertResult)).toBeTruthy()
    // NOTE(review): index 2 is presumably the number of affected rows - confirm against the driver
    expect(insertResult[2]).toBe(numberOfRows)

    // calculate averages, etc
    const selectSql = 'SELECT city, MIN(temp), AVG(temp), MAX(temp) FROM measurements GROUP BY city'
    const selectResult = (await sendCommandsAsync(connection, selectSql)) as SQLiteCloudRowset
    expect(selectResult).toBeTruthy()
    expect(selectResult.length).toBe(BRC_UNIQUE_STATIONS)

    console.log(`Ran ${numberOfRows} challenge in ${Date.now() - startedOn}ms`)
  } catch (error) {
    console.error(`An error occoured while running 1brc, error: ${error}`)
    throw error
  } finally {
    // only clean up when the database was actually created; the original failure,
    // if any, has already been logged by the catch block above
    if (connection && database) {
      await destroyDatabaseAsync(connection, database)
    }
  }
}
0 commit comments