Skip to content

Commit a34b163

Browse files
committed
Migrates to json-event-parser
Keeps buffering all the data in memory
1 parent df0af6d commit a34b163

File tree

6 files changed

+730
-744
lines changed

6 files changed

+730
-744
lines changed

lib/ContextTree.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import {JsonLdContextNormalized} from "jsonld-context-parser";
88
*/
99
export class ContextTree {
1010

11-
private readonly subTrees: {[key: string]: ContextTree} = {};
11+
private readonly subTrees: {[key: string | number]: ContextTree} = {};
1212
private context: Promise<JsonLdContextNormalized> | null;
1313

14-
public getContext(keys: string[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
14+
public getContext(keys: (string | number)[]): Promise<{ context: JsonLdContextNormalized, depth: number }> | null {
1515
if (keys.length > 0) {
1616
const [head, ...tail] = keys;
1717
const subTree = this.subTrees[head];
@@ -25,7 +25,7 @@ export class ContextTree {
2525
return this.context ? this.context.then((context) => ({ context, depth: 0 })) : null;
2626
}
2727

28-
public setContext(keys: any[], context: Promise<JsonLdContextNormalized> | null) {
28+
public setContext(keys: (string | number)[], context: Promise<JsonLdContextNormalized> | null) {
2929
if (keys.length === 0) {
3030
this.context = context;
3131
} else {

lib/JsonLdParser.ts

Lines changed: 93 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import * as RDF from "@rdfjs/types";
2-
// tslint:disable-next-line:no-var-requires
3-
const Parser = require('jsonparse');
42
import {ERROR_CODES, ErrorCoded, IDocumentLoader, JsonLdContext, Util as ContextUtil} from "jsonld-context-parser";
5-
import {PassThrough, Transform, Readable} from "readable-stream";
3+
// @ts-ignore The types are not updated yet
4+
import {PassThrough, Transform, Stream, pipeline} from "readable-stream";
65
import {EntryHandlerArrayValue} from "./entryhandler/EntryHandlerArrayValue";
76
import {EntryHandlerContainer} from "./entryhandler/EntryHandlerContainer";
87
import {EntryHandlerInvalidFallback} from "./entryhandler/EntryHandlerInvalidFallback";
@@ -20,6 +19,9 @@ import {EntryHandlerKeywordValue} from "./entryhandler/keyword/EntryHandlerKeywo
2019
import {ParsingContext} from "./ParsingContext";
2120
import {Util} from "./Util";
2221
import {parse as parseLinkHeader} from "http-link-header";
22+
import {JsonEventParser} from "json-event-parser";
23+
import {JsonEvent} from "json-event-parser/lib/JsonEventParser";
24+
2325

2426
/**
2527
* A stream transformer that parses JSON-LD (text) streams to an {@link RDF.Stream}.
@@ -46,11 +48,10 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
4648
private readonly parsingContext: ParsingContext;
4749
private readonly util: Util;
4850

49-
private readonly jsonParser: any;
5051
// Jobs that are not started yet that process a @context (only used if streamingProfile is false)
5152
private readonly contextJobs: (() => Promise<void>)[][];
5253
// Jobs that are not started yet that process a @type (only used if streamingProfile is false)
53-
private readonly typeJobs: { job: () => Promise<void>, keys: string[] }[];
54+
private readonly typeJobs: { job: () => Promise<void>, keys: (string | number)[] }[];
5455
// Jobs that are not started yet because of a missing @context or @type (only used if streamingProfile is false)
5556
private readonly contextAwaitingJobs: { job: () => Promise<void>, keys: string[] }[];
5657

@@ -60,30 +61,27 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
6061
private lastKeys: any[];
6162
// A promise representing the last job
6263
private lastOnValueJob: Promise<void>;
64+
// The keys inside of the JSON tree
65+
private readonly jsonKeyStack: (string | number)[];
66+
// The value inside of the JSON tree
67+
private readonly jsonValueStack: any[];
6368

6469
constructor(options?: IJsonLdParserOptions) {
65-
super({ readableObjectMode: true });
70+
super({ readableObjectMode: true, writableObjectMode: true });
6671
options = options || {};
6772
this.options = options;
6873
this.parsingContext = new ParsingContext({ parser: this, ...options });
6974
this.util = new Util({ dataFactory: options.dataFactory, parsingContext: this.parsingContext });
7075

71-
this.jsonParser = new Parser();
7276
this.contextJobs = [];
7377
this.typeJobs = [];
7478
this.contextAwaitingJobs = [];
7579

7680
this.lastDepth = 0;
7781
this.lastKeys = [];
7882
this.lastOnValueJob = Promise.resolve();
79-
80-
this.attachJsonParserListeners();
81-
82-
this.on('end', () => {
83-
if (typeof this.jsonParser.mode !== 'undefined') {
84-
this.emit('error', new Error('Unclosed document'))
85-
}
86-
})
83+
this.jsonKeyStack = [];
84+
this.jsonValueStack = [];
8785
}
8886

8987
/**
@@ -157,22 +155,20 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
157155
* @return {RDF.Stream} A quad stream.
158156
*/
159157
public import(stream: EventEmitter): RDF.Stream {
160-
if('pipe' in stream) {
161-
stream.on('error', (error) => parsed.emit('error', error));
162-
const parsed = (<Readable>stream).pipe(new JsonLdParser(this.options));
163-
return parsed;
164-
} else {
165-
const output = new PassThrough({ readableObjectMode: true });
166-
stream.on('error', (error) => parsed.emit('error', error));
167-
stream.on('data', (data) => output.push(data));
168-
stream.on('end', () => output.push(null));
169-
const parsed = output.pipe(new JsonLdParser(this.options));
170-
return parsed;
158+
let input: Stream = (<Stream>stream);
159+
if(!('pipe' in stream)) {
160+
input = new PassThrough({ readableObjectMode: true });
161+
stream.on('error', (error) => input.emit('error', error));
162+
stream.on('data', (data) => input.push(data));
163+
stream.on('end', () => input.push(null));
171164
}
165+
return pipeline(input, new JsonEventParser(), new JsonLdParser(this.options), (err: any) => {
166+
// We ignore the error?
167+
});
172168
}
173169

174-
public _transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void): void {
175-
this.jsonParser.write(chunk);
170+
public _transform(event: any, _encoding: string, callback: (error?: Error | null, data?: any) => void): void {
171+
this.onJsonEvent(event);
176172
this.lastOnValueJob
177173
.then(() => callback(), (error) => callback(error));
178174
}
@@ -199,7 +195,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
199195
if (listPointer) {
200196
// Terminate the list if the had at least one value
201197
if (listPointer.value) {
202-
this.emit('data', this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
198+
this.push(this.util.dataFactory.quad(listPointer.value, this.util.rdfRest, this.util.rdfNil,
203199
this.util.getDefaultGraph()));
204200
}
205201

@@ -413,65 +409,79 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
413409
*
414410
* This should only be called once.
415411
*/
416-
protected attachJsonParserListeners() {
417-
// Listen to json parser events
418-
this.jsonParser.onValue = (value: any) => {
419-
const depth = this.jsonParser.stack.length;
420-
const keys = (new Array(depth + 1).fill(0)).map((v, i) => {
421-
return i === depth ? this.jsonParser.key : this.jsonParser.stack[i].key;
422-
});
423-
424-
if (!this.isParsingContextInner(depth)) { // Don't parse inner nodes inside @context
425-
const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
426-
if (!this.parsingContext.streamingProfile
412+
protected onJsonEvent(event: JsonEvent) {
413+
let key: any;
414+
let value: any;
415+
switch (event.type) {
416+
case 'open-object':
417+
this.insertInStack(event.key, {}, true);
418+
return;
419+
case 'open-array':
420+
this.insertInStack(event.key, [], true);
421+
return;
422+
case 'value':
423+
this.insertInStack(event.key, event.value, false);
424+
key = event.key;
425+
value = event.value;
426+
break;
427+
case 'close-object':
428+
case 'close-array':
429+
key = this.jsonKeyStack[this.jsonKeyStack.length - 1];
430+
value = this.jsonValueStack[this.jsonValueStack.length - 1];
431+
}
432+
433+
const depth = this.jsonKeyStack.length;
434+
const keys = <string[]><any[]>[undefined, ...this.jsonKeyStack];
435+
436+
if (!this.isParsingContextInner()) { // Don't parse inner nodes inside @context
437+
const valueJobCb = () => this.newOnValueJob(keys, value, depth, true);
438+
if (!this.parsingContext.streamingProfile
427439
&& !this.parsingContext.contextTree.getContext(keys.slice(0, -1))) {
428440
// If an out-of-order context is allowed,
429441
// we have to buffer everything.
430442
// We store jobs for @context's and @type's separately,
431443
// because at the end, we have to process them first.
432444
// We also handle @type because these *could* introduce a type-scoped context.
433-
if (keys[depth] === '@context') {
434-
let jobs = this.contextJobs[depth];
435-
if (!jobs) {
436-
jobs = this.contextJobs[depth] = [];
437-
}
438-
jobs.push(valueJobCb);
439-
} else if (keys[depth] === '@type'
440-
|| typeof keys[depth] === 'number' && keys[depth - 1] === '@type') { // Also capture @type with array values
441-
// Remove @type from keys, because we want it to apply to parent later on
442-
this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
443-
} else {
444-
this.contextAwaitingJobs.push({ job: valueJobCb, keys });
445+
if (key === '@context') {
446+
let jobs = this.contextJobs[depth];
447+
if (!jobs) {
448+
jobs = this.contextJobs[depth] = [];
445449
}
450+
jobs.push(valueJobCb);
451+
} else if (key === '@type'
452+
|| typeof key === 'number' && this.jsonKeyStack[this.jsonKeyStack.length - 2] === '@type') { // Also capture @type with array values
453+
// Remove @type from keys, because we want it to apply to parent later on
454+
this.typeJobs.push({ job: valueJobCb, keys: keys.slice(0, keys.length - 1) });
446455
} else {
447-
// Make sure that our value jobs are chained synchronously
448-
this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
456+
this.contextAwaitingJobs.push({ job: valueJobCb, keys });
449457
}
458+
} else {
459+
// Make sure that our value jobs are chained synchronously
460+
this.lastOnValueJob = this.lastOnValueJob.then(valueJobCb);
461+
}
450462

451463
// Execute all buffered jobs on deeper levels
452-
if (!this.parsingContext.streamingProfile && depth === 0) {
453-
this.lastOnValueJob = this.lastOnValueJob
464+
if (!this.parsingContext.streamingProfile && depth === 0) {
465+
this.lastOnValueJob = this.lastOnValueJob
454466
.then(() => this.executeBufferedJobs());
455-
}
456467
}
457-
};
458-
this.jsonParser.onError = (error: Error) => {
459-
this.emit('error', error);
460-
};
468+
}
469+
470+
switch (event.type) {
471+
case 'close-object':
472+
case 'close-array':
473+
this.jsonValueStack.pop();
474+
case "value":
475+
this.jsonKeyStack.pop();
476+
}
461477
}
462478

463479
/**
464480
* Check if the parser is currently parsing an element that is part of an @context entry.
465-
* @param {number} depth A depth.
466481
* @return {boolean} A boolean.
467482
*/
468-
protected isParsingContextInner(depth: number) {
469-
for (let i = depth; i > 0; i--) {
470-
if (this.jsonParser.stack[i - 1].key === '@context') {
471-
return true;
472-
}
473-
}
474-
return false;
483+
protected isParsingContextInner() {
484+
return this.jsonKeyStack.slice(0, -1).includes('@context');
475485
}
476486

477487
/**
@@ -497,7 +507,7 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
497507
// We check all possible parent nodes for the current job, from root to leaves.
498508
if (this.typeJobs.length > 0) {
499509
// First collect all applicable type jobs
500-
const applicableTypeJobs: { job: () => Promise<void>, keys: string[] }[] = [];
510+
const applicableTypeJobs: { job: () => Promise<void>, keys: (string | number)[] }[] = [];
501511
const applicableTypeJobIds: number[] = [];
502512
for (let i = 0; i < this.typeJobs.length; i++) {
503513
const typeJob = this.typeJobs[i];
@@ -526,6 +536,19 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
526536
await job.job();
527537
}
528538
}
539+
540+
private insertInStack(key: string | number | undefined, value: any, push: boolean): void {
541+
if (typeof key === 'string') {
542+
this.jsonKeyStack.push(key);
543+
this.jsonValueStack[this.jsonValueStack.length - 1][key] = value;
544+
} else if (typeof key === 'number') {
545+
this.jsonKeyStack.push(key);
546+
this.jsonValueStack[this.jsonValueStack.length - 1].push(value);
547+
}
548+
if (push) {
549+
this.jsonValueStack.push(value);
550+
}
551+
}
529552
}
530553

531554
/**

lib/Util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class Util {
156156
* @param needle An array to check if it is a prefix.
157157
* @param haystack An array to look in.
158158
*/
159-
public static isPrefixArray(needle: string[], haystack: string[]): boolean {
159+
public static isPrefixArray(needle: any[], haystack: any[]): boolean {
160160
if (needle.length > haystack.length) {
161161
return false;
162162
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"canonicalize": "^1.0.1",
3737
"http-link-header": "^1.0.2",
3838
"jsonld-context-parser": "^2.1.3",
39-
"jsonparse": "^1.3.1",
39+
"json-event-parser": "1.0.0-beta.1",
4040
"rdf-data-factory": "^1.1.0",
4141
"readable-stream": "^4.0.0"
4242
},

0 commit comments

Comments
 (0)