Skip to content

Commit 22aa8bc

Browse files
committed
remove LineTransformStream
I went in favor of a native solution. LineTransformStream had some bugs.
1 parent 9c7be5f commit 22aa8bc

File tree

6 files changed

+88
-46
lines changed

6 files changed

+88
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
### Changed
33
- **BREAKING** Default python path changed back to `python` on Windows. [#237](https://github.com/extrabacon/python-shell/issues/237)
44
- **BREAKING** `error` event renamed to `pythonError` event. [#118](https://github.com/extrabacon/python-shell/issues/118)
5+
- **BREAKING** `receive` methods removed in favor of `splitter` arguments in the constructor. This lets the default splitting logic reside in a reuseable stream transformer. Now if you have extra pipes you can reuse `newlineTransformer` to split incoming data into newline-seperated lines.
56

67
### Added
78
- `error` event that is fired upon failure to launch process, among other things. [#118](https://github.com/extrabacon/python-shell/issues/118)

README.md

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -161,17 +161,19 @@ Creates an instance of `PythonShell` and starts the Python process
161161
* `script`: the path of the script to execute
162162
* `options`: the execution options, consisting of:
163163
* `mode`: Configures how data is exchanged when data flows through stdin and stdout. The possible values are:
164-
* `text`: each line of data (ending with "\n") is emitted as a message (default)
165-
* `json`: each line of data (ending with "\n") is parsed as JSON and emitted as a message
164+
* `text`: each line of data is emitted as a message (default)
165+
* `json`: each line of data is parsed as JSON and emitted as a message
166166
* `binary`: data is streamed as-is through `stdout` and `stdin`
167-
* `formatter`: each message to send is transformed using this method, then appended with "\n"
168-
* `parser`: each line of data (ending with "\n") is parsed with this function and its result is emitted as a message
169-
* `stderrParser`: each line of logs (ending with "\n") is parsed with this function and its result is emitted as a message
167+
* `formatter`: each message to send is transformed using this method, then appended with a newline
168+
* `parser`: each line of data is parsed with this function and its result is emitted as a message
169+
* `stderrParser`: each line of logs is parsed with this function and its result is emitted as a message
170170
* `encoding`: the text encoding to apply on the child process streams (default: "utf8")
171171
* `pythonPath`: The path where to locate the "python" executable. Default: "python3" ("python" for Windows)
172172
* `pythonOptions`: Array of option switches to pass to "python"
173173
* `scriptPath`: The default path where to look for scripts. Default is the current working directory.
174174
* `args`: Array of arguments to pass to the script
175+
* `stdoutSplitter`: splits stdout into chunks, defaulting to splitting into newline-seperated lines
176+
* `stderrSplitter`: splits stderr into chunks, defaulting to splitting into newline-seperated lines
175177

176178
Other options are forwarded to `child_process.spawn`.
177179

@@ -269,14 +271,6 @@ let shell = new PythonShell('script.py', { mode: 'json'});
269271
shell.send({ command: "do_stuff", args: [1, 2, 3] });
270272
```
271273

272-
#### `.receive(data)`
273-
274-
Parses incoming data from the Python script written via stdout and emits `message` events. This method is called automatically as data is being received from stdout.
275-
276-
#### `.receiveStderr(data)`
277-
278-
Parses incoming logs from the Python script written via stderr and emits `stderr` events. This method is called automatically as data is being received from stderr.
279-
280274
#### `.end(callback)`
281275

282276
Closes the stdin stream, allowing the Python script to finish and exit. The optional callback is invoked when the process is terminated.
@@ -287,7 +281,7 @@ Terminates the python script. A kill signal may be provided by `signal`, if `sig
287281

288282
#### event: `message`
289283

290-
Fires when a chunk of data is parsed from the stdout stream via the `receive` method. If a `parser` method is specified, the result of this function will be the message value. This event is not emitted in binary mode.
284+
After the stdout stream is split into chunks by stdoutSplitter the chunks are parsed by the parser and a message event is emitted for each parsed chunk. This event is not emitted in binary mode.
291285

292286
Example:
293287

@@ -307,7 +301,7 @@ shell.on('message', function (message) {
307301

308302
#### event: `stderr`
309303

310-
Fires when a chunk of logs is parsed from the stderr stream via the `receiveStderr` method. If a `stderrParser` method is specified, the result of this function will be the message value. This event is not emitted in binary mode.
304+
After the stderr stream is split into chunks by stderrSplitter the chunks are parsed by the parser and a message event is emitted for each parsed chunk. This event is not emitted in binary mode.
311305

312306
Example:
313307

@@ -336,6 +330,10 @@ Fires when:
336330

337331
If the process could not be spawned please double-check that python can be launched from the terminal.
338332

333+
### NewlineTransformer
334+
335+
A utility class for splitting stream data into newlines. Used as the default for stdoutSplitter and stderrSplitter if they are unspecified. You can use this class for any extra python streams if you'd like.
336+
339337
## Used By:
340338

341339
Python-Shell is used by [arepl-vscode](https://github.com/almenon/arepl-vscode), [gitinspector](https://github.com/ejwa/gitinspector), [pyspreadsheet](https://github.com/extrabacon/pyspreadsheet), [AtlantOS Ocean Data QC](https://github.com/ocean-data-qc/ocean-data-qc) and more!

index.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ import { EventEmitter } from 'events';
22
import { ChildProcess, spawn, SpawnOptions, exec, execSync } from 'child_process';
33
import { EOL as newline, tmpdir } from 'os';
44
import { join, sep } from 'path'
5-
import { Readable, Writable } from 'stream'
5+
import { Readable, Transform, TransformCallback, Writable } from 'stream'
66
import { writeFile, writeFileSync } from 'fs';
77
import { promisify } from 'util';
8-
const LineTransformStream = require('line-transform-stream')
98

109
function toArray<T>(source?: T | T[]): T[] {
1110
if (typeof source === 'undefined' || source === null) {
@@ -44,9 +43,9 @@ export interface Options extends SpawnOptions {
4443
* if binary is enabled message and stderr events will not be emitted
4544
*/
4645
mode?: 'text' | 'json' | 'binary'
47-
formatter?: (param: string) => any
48-
parser?: (param: string) => any
49-
stderrParser?: (param: string) => any
46+
formatter?: string | ((param: string) => any)
47+
parser?: string | ((param: string) => any)
48+
stderrParser?: string | ((param: string) => any)
5049
encoding?: string
5150
pythonPath?: string
5251
/**
@@ -68,6 +67,28 @@ export class PythonShellError extends Error {
6867
exitCode?: number;
6968
}
7069

70+
/**
71+
* Takes in a string stream and emits batches seperated by newlines
72+
*/
73+
export class NewlineTransformer extends Transform {
74+
// NewlineTransformer: Megatron's little known once-removed cousin
75+
private _lastLineData: string;
76+
_transform(chunk: any, encoding: string, callback: TransformCallback){
77+
let data: string = chunk.toString()
78+
if (this._lastLineData) data = this._lastLineData + data
79+
const lines = data.split(newline)
80+
this._lastLineData = lines.pop()
81+
//@ts-ignore this works, node ignores the encoding if it's a number
82+
lines.forEach(this.push.bind(this))
83+
callback()
84+
}
85+
_flush(done: TransformCallback){
86+
if (this._lastLineData) this.push(this._lastLineData)
87+
this._lastLineData = null;
88+
done()
89+
}
90+
}
91+
7192
/**
7293
* An interactive Python shell exchanging data through stdio
7394
* @param {string} script The python script to execute
@@ -103,7 +124,7 @@ export class PythonShell extends EventEmitter {
103124
* @param scriptPath path to script. Relative to current directory or options.scriptFolder if specified
104125
* @param options
105126
*/
106-
constructor(scriptPath: string, options?: Options) {
127+
constructor(scriptPath: string, options?: Options, stdoutSplitter: Transform = null, stderrSplitter: Transform = null) {
107128
super();
108129

109130
/**
@@ -151,16 +172,24 @@ export class PythonShell extends EventEmitter {
151172
// Node buffers stdout&stderr in batches regardless of newline placement
152173
// This is troublesome if you want to recieve distinct individual messages
153174
// for example JSON parsing breaks if it recieves partial JSON
154-
// so we use LineTransformStream to emit each batch seperated by newline
175+
// so we use newlineTransformer to emit each batch seperated by newline
155176
if (this.parser && this.stdout) {
156-
this.stdout.pipe(new LineTransformStream((data) => {
157-
this.emit('message', self.parser(data));
158-
}))
177+
if(!stdoutSplitter) stdoutSplitter = new NewlineTransformer()
178+
// note that setting the encoding turns the chunk into a string
179+
stdoutSplitter.setEncoding(options.encoding || 'utf8')
180+
this.stdout.pipe(stdoutSplitter).on('data', (chunk: string) => {
181+
this.emit('message', self.parser(chunk));
182+
});
159183
}
184+
185+
// listen to stderr and emit errors for incoming data
160186
if (this.stderrParser && this.stderr) {
161-
this.stderr.pipe(new LineTransformStream((data) => {
162-
this.emit('stderr', self.stderrParser(data));
163-
}))
187+
if(!stderrSplitter) stderrSplitter = new NewlineTransformer()
188+
// note that setting the encoding turns the chunk into a string
189+
stderrSplitter.setEncoding(options.encoding || 'utf8')
190+
this.stderr.pipe(stderrSplitter).on('data', (chunk: string) => {
191+
this.emit('stderr', self.stderrParser(chunk));
192+
});
164193
}
165194

166195
if (this.stderr) {

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,10 @@
1010
"appveyorTest": "tsc -p ./ && nyc mocha --reporter mocha-appveyor-reporter test/*.js",
1111
"compile": "tsc -watch -p ./"
1212
},
13-
"dependencies": {
14-
"line-transform-stream": "^0.1.0"
15-
},
1613
"devDependencies": {
1714
"@types/mocha": "^8.2.1",
1815
"@types/node": "^10.5.2",
16+
"@types/should": "^13.0.0",
1917
"mocha": "^8.2.1",
2018
"mocha-appveyor-reporter": "^0.4.0",
2119
"should": "^13.2.1",
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import sys, json, os
2+
3+
for line in sys.stdin:
4+
line = line.replace('$', os.linesep)
5+
print(line[:-1], end='')

test/test-python-shell.ts

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ describe('PythonShell', function () {
328328
});
329329
});
330330

331-
describe('.receive(data)', function () {
331+
describe('stdout', function () {
332332
it('should emit messages as strings when mode is "text"', function (done) {
333333
let pyshell = new PythonShell('echo_text.py', {
334334
mode: 'text'
@@ -358,15 +358,16 @@ describe('PythonShell', function () {
358358
}).end(done);
359359
});
360360
it('should properly buffer partial messages', function (done) {
361-
let pyshell = new PythonShell('echo_text.py', {
362-
mode: 'json'
361+
// echo_text_with_newline_control echoes text with $'s replaced with newlines
362+
let pyshell = new PythonShell('echo_text_with_newline_control.py', {
363+
mode: 'text'
363364
});
364365
pyshell.on('message', (message) => {
365366
console.log(message)
366-
message.should.be.an.Object;
367-
message.should.eql({ a: true });
368-
}).send('{"a"').send(':').send('true}' + newline + '{').send('"a":true}' + newline).end(() => {
369-
console.log('done called')
367+
let messageObject = JSON.parse(message)
368+
messageObject.should.be.an.Object;
369+
messageObject.should.eql({ a: true });
370+
}).send('{"a"').send(':').send('true}${').send('"a":true}$').end(() => {
370371
done()
371372
});
372373
});
@@ -375,10 +376,10 @@ describe('PythonShell', function () {
375376
args: ['hello', 'world'],
376377
mode: 'binary'
377378
});
378-
pyshell.receive = function () {
379+
pyshell.on('message', ()=>{
379380
done('should not emit messages in binary mode');
380381
return undefined
381-
};
382+
});
382383
pyshell.end(done);
383384
});
384385
it('should use a custom parser function', function (done) {
@@ -399,7 +400,7 @@ describe('PythonShell', function () {
399400
});
400401
});
401402

402-
describe('.receiveStderr(data)', function () {
403+
describe('stderr', function () {
403404
it('should emit stderr logs as strings when mode is "text"', function (done) {
404405
let pyshell = new PythonShell('stderrLogging.py', {
405406
mode: 'text'
@@ -415,13 +416,14 @@ describe('PythonShell', function () {
415416
});
416417
it('should not be invoked when mode is "binary"', function (done) {
417418
let pyshell = new PythonShell('stderrLogging.py', {
418-
mode: 'binary'
419+
stderrParser: 'binary'
419420
});
420-
pyshell.receiveStderr = function () {
421+
pyshell.on('stderr', ()=>{
421422
done('should not emit stderr in binary mode');
422-
return undefined
423-
};
424-
pyshell.end(done);
423+
});
424+
pyshell.end(()=>{
425+
done()
426+
});
425427
});
426428
it('should use a custom parser function', function (done) {
427429
let pyshell = new PythonShell('stderrLogging.py', {
@@ -501,6 +503,15 @@ describe('PythonShell', function () {
501503
done();
502504
});
503505
});
506+
it('should work in json mode', function (done) {
507+
let pyshell = new PythonShell('error.py', {mode: 'json'});
508+
pyshell.on('pythonError', function (err) {
509+
err.stack.should.containEql('----- Python Traceback -----');
510+
err.stack.should.containEql('File "test' + sep + 'python' + sep + 'error.py", line 4');
511+
err.stack.should.containEql('File "test' + sep + 'python' + sep + 'error.py", line 6');
512+
done();
513+
});
514+
});
504515
});
505516

506517
describe('.terminate()', function () {

0 commit comments

Comments
 (0)