Skip to content

Commit df0af6d

Browse files
authored
Use .pipe in .import if available
Required for #71
1 parent 736be04 commit df0af6d

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

lib/JsonLdParser.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as RDF from "@rdfjs/types";
22
// tslint:disable-next-line:no-var-requires
33
const Parser = require('jsonparse');
44
import {ERROR_CODES, ErrorCoded, IDocumentLoader, JsonLdContext, Util as ContextUtil} from "jsonld-context-parser";
5-
import {PassThrough, Transform} from "readable-stream";
5+
import {PassThrough, Transform, Readable} from "readable-stream";
66
import {EntryHandlerArrayValue} from "./entryhandler/EntryHandlerArrayValue";
77
import {EntryHandlerContainer} from "./entryhandler/EntryHandlerContainer";
88
import {EntryHandlerInvalidFallback} from "./entryhandler/EntryHandlerInvalidFallback";
@@ -157,12 +157,18 @@ export class JsonLdParser extends Transform implements RDF.Sink<EventEmitter, RD
157157
* @return {RDF.Stream} A quad stream.
158158
*/
159159
public import(stream: EventEmitter): RDF.Stream {
160-
const output = new PassThrough({ readableObjectMode: true });
161-
stream.on('error', (error) => parsed.emit('error', error));
162-
stream.on('data', (data) => output.push(data));
163-
stream.on('end', () => output.push(null));
164-
const parsed = output.pipe(new JsonLdParser(this.options));
165-
return parsed;
160+
if('pipe' in stream) {
161+
stream.on('error', (error) => parsed.emit('error', error));
162+
const parsed = (<Readable>stream).pipe(new JsonLdParser(this.options));
163+
return parsed;
164+
} else {
165+
const output = new PassThrough({ readableObjectMode: true });
166+
stream.on('error', (error) => parsed.emit('error', error));
167+
stream.on('data', (data) => output.push(data));
168+
stream.on('end', () => output.push(null));
169+
const parsed = output.pipe(new JsonLdParser(this.options));
170+
return parsed;
171+
}
166172
}
167173

168174
public _transform(chunk: any, encoding: string, callback: (error?: Error | null, data?: any) => void): void {

test/JsonLdParser-test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {JsonLdParser} from "../index";
22
import arrayifyStream from 'arrayify-stream';
33
const streamifyString = require('streamify-string');
44
import * as RDF from "@rdfjs/types";
5+
import { EventEmitter } from 'events';
56
import {DataFactory} from "rdf-data-factory";
67
import each from 'jest-each';
78
import "jest-rdf";
@@ -12445,10 +12446,34 @@ describe('JsonLdParser', () => {
1244512446
]);
1244612447
});
1244712448

12449+
12450+
it('should parse a bad stream', async () => {
12451+
const stream = new EventEmitter();
12452+
const result = parser.import(stream);
12453+
stream.emit("data", `
12454+
{
12455+
"@id": "http://example.org/node",
12456+
"http://example.org/p": "def"
12457+
}`);
12458+
stream.emit("end");
12459+
return expect(await arrayifyStream(result)).toBeRdfIsomorphic([
12460+
DF.quad(DF.namedNode('http://example.org/node'),
12461+
DF.namedNode('http://example.org/p'),
12462+
DF.literal('def')),
12463+
]);
12464+
});
12465+
1244812466
it('should forward error events', async () => {
1244912467
const stream = new PassThrough();
1245012468
stream._read = () => stream.emit('error', new Error('my error'));
1245112469
return expect(arrayifyStream(parser.import(stream))).rejects.toThrow(new Error('my error'));
1245212470
});
12471+
12472+
it('should forward error events with a bad stream', async () => {
12473+
const stream = new EventEmitter();
12474+
const result = parser.import(stream);
12475+
stream.emit('error', new Error('my error'));
12476+
return expect(arrayifyStream(result)).rejects.toThrow(new Error('my error'));
12477+
});
1245312478
});
1245412479
});

0 commit comments

Comments
 (0)