Skip to content

Commit 4d8a841

Browse files
committed
switch to LineTransformStream
this breaks unit tests for some reason :(
1 parent 59d029e commit 4d8a841

File tree

4 files changed

+661
-131
lines changed

4 files changed

+661
-131
lines changed

index.ts

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { join, sep } from 'path'
55
import { Readable, Writable } from 'stream'
66
import { writeFile, writeFileSync } from 'fs';
77
import { promisify } from 'util';
8+
const LineTransformStream = require('line-transform-stream')
89

910
function toArray<T>(source?: T | T[]): T[] {
1011
if (typeof source === 'undefined' || source === null) {
@@ -147,14 +148,19 @@ export class PythonShell extends EventEmitter {
147148
self.parser && self[name] && self[name].setEncoding(options.encoding || 'utf8');
148149
});
149150

150-
// parse incoming data on stdout
151+
// Node buffers stdout&stderr in batches regardless of newline placement
152+
// This is troublesome if you want to recieve distinct individual messages
153+
// for example JSON parsing breaks if it recieves partial JSON
154+
// so we use LineTransformStream to emit each batch seperated by newline
151155
if (this.parser && this.stdout) {
152-
this.stdout.on('data', this.receive.bind(this));
156+
this.stdout.pipe(new LineTransformStream((data) => {
157+
this.emit('message', self.parser(data));
158+
}))
153159
}
154-
155-
// listen to stderr and emit errors for incoming data
156160
if (this.stderrParser && this.stderr) {
157-
this.stderr.on('data', this.receiveStderr.bind(this));
161+
this.stdout.pipe(new LineTransformStream((data) => {
162+
this.emit('stderr', self.stderrParser(data));
163+
}))
158164
}
159165

160166
if (this.stderr) {
@@ -178,7 +184,7 @@ export class PythonShell extends EventEmitter {
178184
self.stdoutHasEnded = true;
179185
}
180186

181-
this.childProcess.on('error', function(err: NodeJS.ErrnoException){
187+
this.childProcess.on('error', function (err: NodeJS.ErrnoException) {
182188
self.emit('error', err);
183189
})
184190
this.childProcess.on('exit', function (code, signal) {
@@ -350,50 +356,6 @@ export class PythonShell extends EventEmitter {
350356
return this;
351357
};
352358

353-
/**
354-
* Parses data received from the Python shell stdout stream and emits "message" events
355-
* This method is not used in binary mode
356-
* Override this method to parse incoming data from the Python process into messages
357-
* @param {string|Buffer} data The data to parse into messages
358-
*/
359-
receive(data: string | Buffer) {
360-
return this.receiveInternal(data, 'message');
361-
};
362-
363-
/**
364-
* Parses data received from the Python shell stderr stream and emits "stderr" events
365-
* This method is not used in binary mode
366-
* Override this method to parse incoming logs from the Python process into messages
367-
* @param {string|Buffer} data The data to parse into messages
368-
*/
369-
receiveStderr(data: string | Buffer) {
370-
return this.receiveInternal(data, 'stderr');
371-
};
372-
373-
private receiveInternal(data: string | Buffer, emitType: 'message' | 'stderr') {
374-
let self = this;
375-
let parts = ('' + data).split(newline);
376-
377-
if (parts.length === 1) {
378-
// an incomplete record, keep buffering
379-
this._remaining = (this._remaining || '') + parts[0];
380-
return this;
381-
}
382-
383-
let lastLine = parts.pop();
384-
// fix the first line with the remaining from the previous iteration of 'receive'
385-
parts[0] = (this._remaining || '') + parts[0];
386-
// keep the remaining for the next iteration of 'receive'
387-
this._remaining = lastLine;
388-
389-
parts.forEach(function (part) {
390-
if (emitType == 'message') self.emit(emitType, self.parser(part));
391-
else if (emitType == 'stderr') self.emit(emitType, self.stderrParser(part));
392-
});
393-
394-
return this;
395-
}
396-
397359
/**
398360
* Closes the stdin stream. Unless python is listening for stdin in a loop
399361
* this should cause the process to finish its work and close.

0 commit comments

Comments
 (0)