Skip to content

Commit 78a6ea5

Browse files
authored
Merge pull request #171 from clue-labs/drain-throughstream
Fix `drain` event of `ThroughStream` to handle potential race condition
2 parents 3ec71bc + 6273e35 commit 78a6ea5

File tree

2 files changed

+70
-7
lines changed

2 files changed

+70
-7
lines changed

src/ThroughStream.php

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,19 @@ public function __construct($callback = null)
9393

9494
public function pause()
9595
{
96-
$this->paused = true;
96+
// only allow pause if still readable, false otherwise
97+
$this->paused = $this->readable;
9798
}
9899

99100
public function resume()
100101
{
102+
$this->paused = false;
103+
104+
// emit drain event if previous write was paused (throttled)
101105
if ($this->drain) {
102106
$this->drain = false;
103107
$this->emit('drain');
104108
}
105-
$this->paused = false;
106109
}
107110

108111
public function pipe(WritableStreamInterface $dest, array $options = array())
@@ -139,12 +142,13 @@ public function write($data)
139142

140143
$this->emit('data', array($data));
141144

145+
// emit drain event on next resume if currently paused (throttled)
142146
if ($this->paused) {
143147
$this->drain = true;
144-
return false;
145148
}
146149

147-
return true;
150+
// continue writing if still writable and not paused (throttled), false otherwise
151+
return $this->writable && !$this->paused;
148152
}
149153

150154
public function end($data = null)
@@ -164,7 +168,7 @@ public function end($data = null)
164168

165169
$this->readable = false;
166170
$this->writable = false;
167-
$this->paused = true;
171+
$this->paused = false;
168172
$this->drain = false;
169173

170174
$this->emit('end');
@@ -179,9 +183,10 @@ public function close()
179183

180184
$this->readable = false;
181185
$this->writable = false;
182-
$this->closed = true;
183-
$this->paused = true;
186+
$this->paused = false;
184187
$this->drain = false;
188+
189+
$this->closed = true;
185190
$this->callback = null;
186191

187192
$this->emit('close');

tests/ThroughStreamTest.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,30 @@ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
9595
$this->assertFalse($ret);
9696
}
9797

98+
/** @test */
99+
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventEndsStream()
100+
{
101+
$through = new ThroughStream();
102+
$through->on('data', function () use ($through) {
103+
$through->end();
104+
});
105+
$ret = $through->write('foo');
106+
107+
$this->assertFalse($ret);
108+
}
109+
110+
/** @test */
111+
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventClosesStream()
112+
{
113+
$through = new ThroughStream();
114+
$through->on('data', function () use ($through) {
115+
$through->close();
116+
});
117+
$ret = $through->write('foo');
118+
119+
$this->assertFalse($ret);
120+
}
121+
98122
/** @test */
99123
public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused()
100124
{
@@ -106,6 +130,40 @@ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWh
106130
$through->resume();
107131
}
108132

133+
/** @test */
134+
public function itShouldNotEmitDrainOnResumeAfterClose()
135+
{
136+
$through = new ThroughStream();
137+
$through->close();
138+
139+
$through->on('drain', $this->expectCallableNever());
140+
$through->resume();
141+
}
142+
143+
/** @test */
144+
public function itShouldNotEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenThatCausesStreamToClose()
145+
{
146+
$through = new ThroughStream();
147+
$through->on('data', function () use ($through) { $through->close(); });
148+
$through->write('foo');
149+
150+
$through->on('drain', $this->expectCallableNever());
151+
$through->resume();
152+
}
153+
154+
/** @test */
155+
public function itShouldReturnFalseForAnyDataWrittenToItAfterPausingFromDrainEvent()
156+
{
157+
$through = new ThroughStream();
158+
$through->pause();
159+
$through->write('foo');
160+
161+
$through->on('drain', function () use ($through) { $through->pause(); });
162+
$through->resume();
163+
164+
$this->assertFalse($through->write('bar'));
165+
}
166+
109167
/** @test */
110168
public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
111169
{

0 commit comments

Comments
 (0)