Skip to content

Commit 5a30a37

Browse files
author
Martin Durant
committed
Merge branch 'master' into docs
2 parents f16b464 + c48f93a commit 5a30a37

File tree

7 files changed

+247
-9
lines changed

7 files changed

+247
-9
lines changed

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Stream
2525
rate_limit
2626
scatter
2727
sink
28+
slice
2829
sliding_window
2930
starmap
3031
timed_window
@@ -41,6 +42,7 @@ Sources
4142
filenames
4243
from_kafka
4344
from_kafka_batched
45+
from_process
4446
from_textfile
4547
from_tcp
4648
from_http_server

examples/fib_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ def run_asyncio_loop():
2424
finally:
2525
loop.close()
2626

27+
2728
run_asyncio_loop()

streamz/collection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ def _repr_html_(self):
229229
return "<h5>%s - elements like<h5>\n%s" % (type(self).__name__, body)
230230

231231
def _ipython_display_(self, **kwargs):
232-
return self.stream.latest().rate_limit(0.5).gather()._ipython_display_(**kwargs)
232+
return self.stream.latest().rate_limit(
233+
0.5).gather()._ipython_display_(**kwargs)
233234

234235
def emit(self, x):
235236
self.verify(x)

streamz/core.py

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,10 @@ class filter(Stream):
622622
predicate : function
623623
The predicate. Should return True or False, where
624624
True means that the predicate is satisfied.
625+
*args :
626+
The arguments to pass to the predicate.
627+
**kwargs:
628+
Keyword arguments to pass to predicate
625629
626630
Examples
627631
--------
@@ -633,15 +637,19 @@ class filter(Stream):
633637
2
634638
4
635639
"""
636-
def __init__(self, upstream, predicate, **kwargs):
640+
641+
def __init__(self, upstream, predicate, *args, **kwargs):
637642
if predicate is None:
638643
predicate = _truthy
639644
self.predicate = predicate
645+
stream_name = kwargs.pop("stream_name", None)
646+
self.kwargs = kwargs
647+
self.args = args
640648

641-
Stream.__init__(self, upstream, **kwargs)
649+
Stream.__init__(self, upstream, stream_name=stream_name)
642650

643651
def update(self, x, who=None):
644-
if self.predicate(x):
652+
if self.predicate(x, *self.args, **self.kwargs):
645653
return self._emit(x)
646654

647655

@@ -652,13 +660,19 @@ class accumulate(Stream):
652660
This performs running or cumulative reductions, applying the function
653661
to the previous total and the new element. The function should take
654662
two arguments, the previous accumulated state and the next element and
655-
it should return a new accumulated state.
663+
it should return a new accumulated state,
664+
- ``state = func(previous_state, new_value)`` (returns_state=False)
665+
- ``state, result = func(previous_state, new_value)`` (returns_state=True)
666+
667+
where the new_state is passed to the next invocation. The state or result
668+
is emitted downstream for the two cases.
656669
657670
Parameters
658671
----------
659672
func: callable
660673
start: object
661-
Initial value. Defaults to the first submitted element
674+
Initial value, passed as the value of ``previous_state`` on the first
675+
invocation. Defaults to the first submitted element
662676
returns_state: boolean
663677
If true then func should return both the state and the value to emit
664678
If false then both values are the same, and func returns one value
@@ -667,14 +681,41 @@ class accumulate(Stream):
667681
668682
Examples
669683
--------
684+
A running total, producing triangular numbers
685+
670686
>>> source = Stream()
671687
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
672688
>>> for i in range(5):
673689
... source.emit(i)
690+
0
674691
1
675692
3
676693
6
677694
10
695+
696+
A count of the number of events (including the current one)
697+
698+
>>> source = Stream()
699+
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
700+
>>> for _ in range(5):
701+
... source.emit(0)
702+
1
703+
2
704+
3
705+
4
706+
5
707+
708+
Like the builtin "enumerate".
709+
710+
>>> source = Stream()
711+
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
712+
... start=(0, 0), returns_state=True
713+
... ).sink(print)
714+
>>> for i in range(3):
715+
... source.emit(0)
716+
(0, 0)
717+
(1, 0)
718+
(2, 0)
678719
"""
679720
_graphviz_shape = 'box'
680721

@@ -706,6 +747,54 @@ def update(self, x, who=None):
706747
return self._emit(result)
707748

708749

750+
@Stream.register_api()
751+
class slice(Stream):
752+
"""
753+
Get only some events in a stream by position. Works like list[] syntax.
754+
755+
Parameters
756+
----------
757+
start : int
758+
First event to use. If None, start from the beginning
759+
end : int
760+
Last event to use (non-inclusive). If None, continue without stopping.
761+
Does not support negative indexing.
762+
step : int
763+
Pass on every Nth event. If None, pass every one.
764+
765+
Examples
766+
--------
767+
>>> source = Stream()
768+
>>> source.slice(2, 6, 2).sink(print)
769+
>>> for i in range(5):
770+
... source.emit(i)
771+
2
772+
4
773+
"""
774+
775+
def __init__(self, upstream, start=None, end=None, step=None, **kwargs):
776+
self.state = 0
777+
self.star = start or 0
778+
self.end = end
779+
self.step = step or 1
780+
if any((_ or 0) < 0 for _ in [start, end, step]):
781+
raise ValueError("Negative indices not supported by slice")
782+
stream_name = kwargs.pop('stream_name', None)
783+
Stream.__init__(self, upstream, stream_name=stream_name)
784+
self._check_end()
785+
786+
def update(self, x, who=None):
787+
if self.state >= self.star and self.state % self.step == 0:
788+
self.emit(x)
789+
self.state += 1
790+
self._check_end()
791+
792+
def _check_end(self):
793+
if self.end and self.state >= self.end:
794+
# we're done
795+
self.upstream.downstreams.remove(self)
796+
797+
709798
@Stream.register_api()
710799
class partition(Stream):
711800
""" Partition stream into tuples of equal size
@@ -740,10 +829,17 @@ def update(self, x, who=None):
740829
class sliding_window(Stream):
741830
""" Produce overlapping tuples of size n
742831
832+
Parameters
833+
----------
834+
return_partial : bool
835+
If True, yield tuples as soon as any events come in, each tuple being
836+
smaller than or equal to the window size. If False, only start yielding
837+
tuples once a full window has accrued.
838+
743839
Examples
744840
--------
745841
>>> source = Stream()
746-
>>> source.sliding_window(3).sink(print)
842+
>>> source.sliding_window(3, return_partial=False).sink(print)
747843
>>> for i in range(8):
748844
... source.emit(i)
749845
(0, 1, 2)
@@ -755,14 +851,15 @@ class sliding_window(Stream):
755851
"""
756852
_graphviz_shape = 'diamond'
757853

758-
def __init__(self, upstream, n, **kwargs):
854+
def __init__(self, upstream, n, return_partial=True, **kwargs):
759855
self.n = n
760856
self.buffer = deque(maxlen=n)
857+
self.partial = return_partial
761858
Stream.__init__(self, upstream, **kwargs)
762859

763860
def update(self, x, who=None):
764861
self.buffer.append(x)
765-
if len(self.buffer) == self.n:
862+
if self.partial or len(self.buffer) == self.n:
766863
return self._emit(tuple(self.buffer))
767864
else:
768865
return []

streamz/sources.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ def write(text):
3535
class Source(Stream):
3636
_graphviz_shape = 'doubleoctagon'
3737

38+
def __init__(self, **kwargs):
39+
self.stopped = True
40+
super(Source, self).__init__(**kwargs)
41+
42+
def stop(self): # pragma: no cover
43+
# fallback stop method - for poll functions with while not self.stopped
44+
if not self.stopped:
45+
self.stopped = True
46+
3847

3948
@Stream.register_api(staticmethod)
4049
class from_textfile(Source):
@@ -293,6 +302,71 @@ def stop(self):
293302
self.stopped = True
294303

295304

305+
@Stream.register_api(staticmethod)
306+
class from_process(Source):
307+
"""Messages from a running external process
308+
309+
This doesn't work on Windows
310+
311+
Parameters
312+
----------
313+
cmd : list of str or str
314+
Command to run: program name, followed by arguments
315+
open_kwargs : dict
316+
To pass on to the process open function, see ``subprocess.Popen``.
317+
with_stderr : bool
318+
Whether to include the process STDERR in the stream
319+
start : bool
320+
Whether to immediately startup the process. Usually you want to connect
321+
downstream nodes first, and then call ``.start()``.
322+
323+
Examples
324+
--------
325+
>>> source = Source.from_process(['ping', 'localhost']) # doctest: +SKIP
326+
"""
327+
328+
def __init__(self, cmd, open_kwargs=None, with_stderr=False, start=False):
329+
self.cmd = cmd
330+
self.open_kwargs = open_kwargs or {}
331+
self.with_stderr = with_stderr
332+
super(from_process, self).__init__(ensure_io_loop=True)
333+
self.stopped = True
334+
self.process = None
335+
if start: # pragma: no cover
336+
self.start()
337+
338+
@gen.coroutine
339+
def _start_process(self):
340+
# should be done in asyncio (py3 only)? Apparently can handle Windows
341+
# with appropriate config.
342+
from tornado.process import Subprocess
343+
from tornado.iostream import StreamClosedError
344+
import subprocess
345+
stderr = subprocess.STDOUT if self.with_stderr else subprocess.PIPE
346+
process = Subprocess(self.cmd, stdout=Subprocess.STREAM,
347+
stderr=stderr, **self.open_kwargs)
348+
while not self.stopped:
349+
try:
350+
out = yield process.stdout.read_until(b'\n')
351+
except StreamClosedError:
352+
# process exited
353+
break
354+
yield self._emit(out)
355+
yield process.stdout.close()
356+
process.proc.terminate()
357+
358+
def start(self):
359+
"""Start external process"""
360+
if self.stopped:
361+
self.loop.add_callback(self._start_process)
362+
self.stopped = False
363+
364+
def stop(self):
365+
"""Shutdown external process"""
366+
if not self.stopped:
367+
self.stopped = True
368+
369+
296370
@Stream.register_api(staticmethod)
297371
class from_kafka(Source):
298372
""" Accepts messages from Kafka

streamz/tests/test_core.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,26 @@ def test_filter():
8383
assert L == [0, 2, 4, 6, 8]
8484

8585

86+
def test_filter_args():
87+
source = Stream()
88+
L = source.filter(lambda x, n: x % n == 0, 2).sink_to_list()
89+
90+
for i in range(10):
91+
source.emit(i)
92+
93+
assert L == [0, 2, 4, 6, 8]
94+
95+
96+
def test_filter_kwargs():
97+
source = Stream()
98+
L = source.filter(lambda x, n=1: x % n == 0, n=2).sink_to_list()
99+
100+
for i in range(10):
101+
source.emit(i)
102+
103+
assert L == [0, 2, 4, 6, 8]
104+
105+
86106
def test_filter_none():
87107
source = Stream()
88108
L = source.filter(None).sink_to_list()
@@ -148,6 +168,14 @@ def test_sliding_window():
148168
source = Stream()
149169
L = source.sliding_window(2).sink_to_list()
150170

171+
for i in range(10):
172+
source.emit(i)
173+
174+
assert L == [(0, ), (0, 1), (1, 2), (2, 3), (3, 4), (4, 5),
175+
(5, 6), (6, 7), (7, 8), (8, 9)]
176+
177+
L = source.sliding_window(2, return_partial=False).sink_to_list()
178+
151179
for i in range(10):
152180
source.emit(i)
153181

@@ -1099,6 +1127,31 @@ def test_share_common_ioloop(clean): # noqa: F811
10991127
assert aa.loop is bb.loop
11001128

11011129

1130+
@pytest.mark.parametrize('data', [
1131+
[[], [0, 1, 2, 3, 4, 5]],
1132+
[[None, None, None], [0, 1, 2, 3, 4, 5]],
1133+
[[1, None, None], [1, 2, 3, 4, 5]],
1134+
[[None, 4, None], [0, 1, 2, 3]],
1135+
[[None, 4, 2], [0, 2]],
1136+
[[3, 1, None], []]
1137+
1138+
])
1139+
def test_slice(data):
1140+
pars, expected = data
1141+
a = Stream()
1142+
b = a.slice(*pars)
1143+
out = b.sink_to_list()
1144+
for i in range(6):
1145+
a.emit(i)
1146+
assert out == expected
1147+
1148+
1149+
def test_slice_err():
1150+
a = Stream()
1151+
with pytest.raises(ValueError):
1152+
a.slice(end=-1)
1153+
1154+
11021155
def test_start():
11031156
flag = []
11041157

streamz/tests/test_sources.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,13 @@ def test_http():
8181

8282
with pytest.raises(requests.exceptions.RequestException):
8383
requests.post('http://localhost:%i/other' % port, data=b'data2')
84+
85+
86+
@gen_test(timeout=60)
87+
def test_process():
88+
cmd = ["python", "-c", "for i in range(4): print(i)"]
89+
s = Source.from_process(cmd)
90+
out = s.sink_to_list()
91+
s.start()
92+
yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
93+
s.stop()

0 commit comments

Comments
 (0)