File tree Expand file tree Collapse file tree 1 file changed +2
-1
lines changed Expand file tree Collapse file tree 1 file changed +2
-1
lines changed Original file line number Diff line number Diff line change 1111use Rx \Observable ;
1212use Rx \Operator \CutOperator ;
1313use Rx \React \Promise ;
14+ use Rx \Scheduler \ImmediateScheduler ;
1415
1516final class AsyncStreamingClient implements AsyncStreamingClientInterface
1617{
@@ -60,7 +61,7 @@ protected function stream(RequestInterface $request): Observable
6061 return Promise::toObservable ($ this ->client ->handle (new StreamingRequestCommand (
6162 $ request
6263 )))->switchLatest ()->lift (function () {
63- return new CutOperator (self ::STREAM_DELIMITER );
64+ return new CutOperator (self ::STREAM_DELIMITER , new ImmediateScheduler () );
6465 })->filter (function (string $ json ) {
6566 return trim ($ json ) !== '' ; // To keep the stream alive Twitter sends an empty line at times
6667 })->_ApiClients_jsonDecode ()->flatMap (function (array $ document ) {
You can’t perform that action at this time.
0 commit comments