Skip to content

Commit 9d42d51

Browse files
committed
fix: Index AppMaps in parallel
This prevents one long-running CPU-bound appmap analysis from holding up the entire queue.
1 parent e7a85ca commit 9d42d51

File tree

8 files changed

+163
-40
lines changed

8 files changed

+163
-40
lines changed

packages/cli/src/fingerprint/appmapIndex.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,11 @@ export default class AppMapIndex {
3535
* abort.
3636
*/
3737
async initialize(): Promise<boolean> {
38-
try {
39-
const stats = await fStat(this.appmapFileName);
40-
if (!stats.isFile()) return false;
41-
this.appmapCreatedAt = stats.mtimeMs;
42-
this.size = stats.size;
43-
} catch (error) {
44-
console.log(`File ${this.appmapFileName} does not exist or is not a file.`);
45-
return false;
46-
}
38+
const stats = await fStat(this.appmapFileName);
39+
// note ENOENT and similar errors should be handled upstream
40+
if (!stats.isFile()) return false;
41+
this.appmapCreatedAt = stats.mtimeMs;
42+
this.size = stats.size;
4743

4844
return true;
4945
}

packages/cli/src/fingerprint/fileTooLargeError.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ export default class FileTooLargeError extends Error {
55
super();
66
}
77

8+
name = 'FileTooLargeError';
9+
810
get message(): string {
911
const size = prettyBytes(this.bytes);
1012
const maxSize = prettyBytes(this.maxBytes);

packages/cli/src/fingerprint/fingerprintDirectoryCommand.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class FingerprintDirectoryCommand {
1616
}
1717

1818
const fpQueue = new FingerprintQueue();
19-
fpQueue.handler.on('index', ({ numEvents, metadata }) => {
19+
fpQueue.on('index', ({ numEvents, metadata }) => {
2020
this.appmaps += 1;
2121
this.events += numEvents;
2222
this.metadata = metadata;
Lines changed: 82 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,107 @@
1-
import { queue, QueueObject } from 'async';
1+
import { EventEmitter } from 'node:events';
2+
3+
import WorkerPool from '../lib/workerPool';
4+
import { isNodeError, verbose } from '../utils';
25
import FileTooLargeError from './fileTooLargeError';
3-
import Fingerprinter from './fingerprinter';
4-
import { isNodeError } from '../utils';
6+
import {
7+
FINGERPRINT_WORKER_FILE,
8+
type FingerprintTask,
9+
type FingerprintTaskResult,
10+
} from './fingerprintWorker';
11+
import type { FingerprintEvent } from './fingerprinter';
512

6-
export default class FingerprintQueue {
7-
public handler: Fingerprinter;
8-
public failOnError = true;
13+
/**
14+
* FingerprintQueue manages parallel processing of AppMap files using a worker pool.
15+
*
16+
* Each AppMap file is processed by a worker thread which:
17+
* 1. Reads and validates the AppMap file
18+
* 2. Generates a fingerprint (canonicalized representation)
19+
* 3. Saves the fingerprint to the index directory
20+
*
21+
* Error handling:
22+
* - FileTooLargeError: Logs a warning and skips the file
23+
* - ENOENT (file not found): Logs a warning and skips the file
24+
* - Other errors: Stores the error and continues processing other files
25+
* The error will be thrown when process() is called
26+
*
27+
* Events:
28+
* - 'index': Emitted when an AppMap is successfully fingerprinted
29+
* Payload: FingerprintEvent containing numEvents and metadata
30+
*
31+
* Example:
32+
* ```typescript
33+
* const queue = new FingerprintQueue(4); // 4 worker threads
34+
* queue.on('index', ({ numEvents, metadata }) => {
35+
* console.log(`Indexed AppMap with ${numEvents} events`);
36+
* });
37+
* queue.push('path/to/appmap.json');
38+
* await queue.process(); // Wait for all files to be processed
39+
* ```
40+
*
41+
* @emits index - Emitted when an AppMap is successfully fingerprinted
42+
*/
43+
// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging
44+
class FingerprintQueue extends EventEmitter {
945
private lastError: unknown;
10-
private queue: QueueObject<string>;
1146
private pending = new Set<string>();
47+
private pool: WorkerPool;
1248

13-
constructor(private size = 2) {
14-
// eslint-disable-next-line no-use-before-define
15-
this.handler = new Fingerprinter();
16-
this.queue = queue(async (appmapFileName) => {
17-
try {
18-
await this.handler.fingerprint(appmapFileName);
19-
} catch (e) {
20-
console.warn(`Error fingerprinting ${appmapFileName}: ${e}`);
21-
}
22-
this.pending.delete(appmapFileName);
23-
}, this.size);
24-
this.queue.drain(() => (this.handler.checkVersion = false));
25-
this.queue.error((error) => {
26-
if (error instanceof FileTooLargeError) {
49+
constructor(numThreads = 2) {
50+
super();
51+
this.pool = new WorkerPool(FINGERPRINT_WORKER_FILE, numThreads);
52+
}
53+
54+
#processed(err: Error | null, { error, result, appmapFile }: FingerprintTaskResult) {
55+
this.pending.delete(appmapFile);
56+
if (err) {
57+
console.warn(`Error fingerprinting ${appmapFile}: ${String(err)}`);
58+
return;
59+
}
60+
61+
if (error) {
62+
// eslint-disable-next-line no-param-reassign
63+
error = WorkerPool.recoverError(error);
64+
// Note errors transported over IPC won't have the same prototype.
65+
if (error.name === 'FileTooLargeError') {
2766
console.warn(
2867
[
2968
`Skipped: ${error.message}`,
3069
'Tip: consider recording a shorter interaction or removing some classes from appmap.yml.',
3170
].join('\n')
3271
);
3372
} else if (isNodeError(error, 'ENOENT')) {
34-
console.warn(`Skipped: ${error.path}\nThe file does not exist.`);
35-
} else if (this.failOnError) {
36-
this.lastError = error;
73+
console.warn(`Skipped: ${appmapFile}\nThe file does not exist.`);
3774
} else {
38-
console.warn(`Skipped: ${error}`);
75+
console.warn(`Skipped: ${String(error)}`);
76+
this.lastError = err ?? error;
3977
}
40-
});
78+
}
79+
if (result) this.emit('index', result);
4180
}
4281

4382
async process() {
44-
if (!this.queue.idle()) await this.queue.drain();
83+
await this.pool.drain();
4584
if (this.lastError) throw this.lastError;
4685
}
4786

4887
push(job: string) {
4988
if (this.pending.has(job)) return;
5089
this.pending.add(job);
51-
this.queue.push(job);
90+
this.pool.runTask(makeTask(job), this.#processed.bind(this));
5291
}
5392
}
93+
94+
function makeTask(appmapFile: string): FingerprintTask {
95+
return {
96+
verbose: verbose(),
97+
appmapFile,
98+
};
99+
}
100+
101+
// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging
102+
interface FingerprintQueue {
103+
on(event: 'index', listener: (data: FingerprintEvent) => void): this;
104+
emit(event: 'index', data: FingerprintEvent): boolean;
105+
}
106+
107+
export default FingerprintQueue;

packages/cli/src/fingerprint/fingerprintWatchCommand.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export default class FingerprintWatchCommand {
3838
await this.reportUsage(events);
3939
this.numProcessed += events.length;
4040
});
41-
this.eventAggregator.attach(this.fpQueue.handler, 'index');
41+
this.eventAggregator.attach(this.fpQueue, 'index');
4242
}
4343

4444
removePidfile() {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import assert from 'node:assert';
2+
import { existsSync } from 'node:fs';
3+
import { join } from 'node:path';
4+
import { isNativeError } from 'node:util/types';
5+
import { parentPort } from 'node:worker_threads';
6+
7+
import { verbose } from '../utils';
8+
import Fingerprinter, { FingerprintEvent } from './fingerprinter';
9+
10+
export type FingerprintTask = {
11+
verbose: boolean;
12+
appmapFile: string;
13+
};
14+
15+
export type FingerprintTaskResult = {
16+
appmapFile: string;
17+
error?: Error;
18+
result?: FingerprintEvent;
19+
};
20+
21+
if (parentPort) {
22+
const fingerprinter = new Fingerprinter();
23+
24+
// eslint-disable-next-line @typescript-eslint/no-misused-promises
25+
parentPort.on('message', async (task: FingerprintTask) => {
26+
assert(parentPort);
27+
verbose(task.verbose);
28+
const result: FingerprintTaskResult = {
29+
appmapFile: task.appmapFile,
30+
};
31+
try {
32+
result.result = await fingerprinter.fingerprint(task.appmapFile);
33+
} catch (e) {
34+
assert(isNativeError(e));
35+
result.error = {
36+
...e,
37+
name: e.name,
38+
message: e.message,
39+
stack: e.stack,
40+
cause: e.cause,
41+
};
42+
}
43+
parentPort.postMessage(result);
44+
});
45+
}
46+
47+
const builtWorkerFile = join(__dirname, '../../built/fingerprint/fingerprintWorker.js');
48+
export const FINGERPRINT_WORKER_FILE = existsSync(builtWorkerFile) ? builtWorkerFile : __filename;

packages/cli/src/lib/workerPool.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Worker } from 'worker_threads';
77
import assert from 'assert';
88
import { warn } from 'console';
99
import { verbose } from '../utils';
10+
import { isNativeError } from 'util/types';
1011

1112
const kTaskInfo = Symbol('kTaskInfo');
1213
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
@@ -169,4 +170,26 @@ export default class WorkerPool extends EventEmitter {
169170
this.freeWorkers.push(worker);
170171
this.emit(kWorkerFreedEvent);
171172
}
173+
174+
/**
175+
* Errors passed over IPC lose all properties.
176+
* This function restores the properties of the original error.
177+
*/
178+
static recoverError(e: unknown): Error {
179+
if (typeof e !== 'object') return new Error(String(e));
180+
if (e === null) return new Error('null');
181+
if (isNativeError(e)) return e;
182+
183+
// If the error is not a native error, we need to create a new one
184+
// and copy the properties over.
185+
// This is a workaround for the fact that the error object is not
186+
// serialized correctly over IPC.
187+
const error = new Error('message' in e ? String(e.message) : undefined);
188+
for (const key of Object.keys(e)) {
189+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
190+
error[key] = e[key];
191+
}
192+
193+
return error;
194+
}
172195
}

packages/cli/tests/unit/fingerprint/fingerprintQueue.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ describe(FingerprintQueue, () => {
55
beforeEach(() => verbose(process.env.DEBUG === 'true'));
66

77
it('gracefully handles files which cannot be read', async () => {
8-
const logSpy = jest.spyOn(console, 'log');
8+
const logSpy = jest.spyOn(console, 'warn');
99

1010
const queue = new FingerprintQueue();
1111
queue.push('missing-file.appmap.json');

0 commit comments

Comments
 (0)