Skip to content

Commit d62a539

Browse files
committed
Support object streams by buffering original write chunks in array
1 parent 00e269d commit d62a539

File tree

3 files changed

+52
-12
lines changed

3 files changed

+52
-12
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Alternatively, you can also refer to them with their fully-qualified name:
3333

3434
```php
3535
\React\Promise\Stream\buffer(…);
36-
```
36+
```
3737

3838
### buffer()
3939

@@ -190,8 +190,9 @@ a `Promise` which resolves with a `WritableStreamInterface`.
190190

191191
This function returns a writable stream instance (implementing `WritableStreamInterface`)
192192
right away which acts as a proxy for the future promise resolution.
193+
Any writes to this instance will be buffered in memory for when the promise resolves.
193194
Once the given Promise resolves with a `WritableStreamInterface`, any data you
194-
wrote to the proxy will be piped to the inner stream.
195+
have written to the proxy will be forwarded transparently to the inner stream.
195196

196197
```php
197198
//$promise = someFunctionWhichResolvesWithAStream();

src/UnwrapWritableStream.php

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class UnwrapWritableStream extends EventEmitter implements WritableStreamInterfa
1616
{
1717
private $promise;
1818
private $stream;
19-
private $buffer = '';
19+
private $buffer = array();
2020
private $closed = false;
2121
private $ending = false;
2222

@@ -69,10 +69,15 @@ function (WritableStreamInterface $stream) use ($out, &$store, &$buffer, &$endin
6969
$stream->on('close', array($out, 'close'));
7070
$out->on('close', array($stream, 'close'));
7171

72-
if ($buffer !== '') {
72+
if ($buffer) {
7373
// flush buffer to stream and check if its buffer is not exceeded
74-
$drained = $stream->write($buffer) !== false;
75-
$buffer = '';
74+
$drained = true;
75+
foreach ($buffer as $chunk) {
76+
if (!$stream->write($chunk)) {
77+
$drained = false;
78+
}
79+
}
80+
$buffer = array();
7681

7782
if ($drained) {
7883
// signal drain event, because the output stream previous signalled a full buffer
@@ -109,7 +114,7 @@ public function write($data)
109114
}
110115

111116
// append to buffer and signal the buffer is full
112-
$this->buffer .= $data;
117+
$this->buffer[] = $data;
113118
return false;
114119
}
115120

@@ -128,7 +133,7 @@ public function end($data = null)
128133

129134
// append to buffer
130135
if ($data !== null) {
131-
$this->buffer .= $data;
136+
$this->buffer[] = $data;
132137
}
133138
}
134139

@@ -143,7 +148,7 @@ public function close()
143148
return;
144149
}
145150

146-
$this->buffer = '';
151+
$this->buffer = array();
147152
$this->ending = true;
148153
$this->closed = true;
149154

tests/UnwrapWritableTest.php

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Clue\React\Block;
66
use React\EventLoop\Factory;
77
use React\Promise;
8+
use React\Promise\Deferred;
89
use React\Promise\Stream;
910
use React\Promise\Timer;
1011
use React\Stream\ThroughStream;
@@ -149,11 +150,30 @@ public function testForwardsDataImmediatelyIfPromiseIsAlreadyResolved()
149150
$stream->write('hello');
150151
}
151152

152-
public function testForwardsDataInOneGoOncePromiseResolves()
153+
public function testForwardsOriginalDataOncePromiseResolves()
153154
{
155+
$data = new \stdClass();
156+
154157
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
155158
$input->expects($this->once())->method('isWritable')->willReturn(true);
156-
$input->expects($this->once())->method('write')->with('helloworld');
159+
$input->expects($this->once())->method('write')->with($data);
160+
$input->expects($this->never())->method('end');
161+
162+
$promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) {
163+
return $input;
164+
});
165+
$stream = Stream\unwrapWritable($promise);
166+
167+
$stream->write($data);
168+
169+
$this->loop->run();
170+
}
171+
172+
public function testForwardsDataInOriginalChunksOncePromiseResolves()
173+
{
174+
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
175+
$input->expects($this->once())->method('isWritable')->willReturn(true);
176+
$input->expects($this->exactly(2))->method('write')->withConsecutive(array('hello'), array('world'));
157177
$input->expects($this->never())->method('end');
158178

159179
$promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) {
@@ -185,7 +205,7 @@ public function testForwardsDataAndEndOncePromiseResolves()
185205
{
186206
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
187207
$input->expects($this->once())->method('isWritable')->willReturn(true);
188-
$input->expects($this->once())->method('write')->with('helloworld!');
208+
$input->expects($this->exactly(3))->method('write')->withConsecutive(array('hello'), array('world'), array('!'));
189209
$input->expects($this->once())->method('end');
190210

191211
$promise = Timer\resolve(0.001, $this->loop)->then(function () use ($input) {
@@ -247,6 +267,20 @@ public function testEmitsErrorAndClosesWhenInputEmitsError()
247267
$this->assertFalse($stream->isWritable());
248268
}
249269

270+
public function testDoesNotEmitDrainWhenStreamBufferExceededAfterForwardingData()
271+
{
272+
$input = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
273+
$input->expects($this->once())->method('isWritable')->willReturn(true);
274+
$input->expects($this->once())->method('write')->with('hello')->willReturn(false);
275+
276+
$deferred = new Deferred();
277+
$stream = Stream\unwrapWritable($deferred->promise());
278+
$stream->write('hello');
279+
280+
$stream->on('drain', $this->expectCallableNever());
281+
$deferred->resolve($input);
282+
}
283+
250284
public function testEmitsDrainWhenInputEmitsDrain()
251285
{
252286
$input = new ThroughStream();

0 commit comments

Comments
 (0)