Skip to content

Commit c48f93a

Browse files
authored
Merge pull request #241 from martindurant/slice
extra functionality
2 parents faf61ca + 1adb8c9 commit c48f93a

File tree

7 files changed

+216
-6
lines changed

7 files changed

+216
-6
lines changed

docs/source/api.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Stream
2525
rate_limit
2626
scatter
2727
sink
28+
slice
2829
sliding_window
2930
starmap
3031
timed_window
@@ -40,6 +41,7 @@ Sources
4041
.. autosummary::
4142
filenames
4243
from_kafka
44+
from_process
4345
from_textfile
4446
from_socket
4547

examples/fib_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ def run_asyncio_loop():
2424
finally:
2525
loop.close()
2626

27+
2728
run_asyncio_loop()

streamz/collection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ def _repr_html_(self):
229229
return "<h5>%s - elements like<h5>\n%s" % (type(self).__name__, body)
230230

231231
def _ipython_display_(self, **kwargs):
232-
return self.stream.latest().rate_limit(0.5).gather()._ipython_display_(**kwargs)
232+
return self.stream.latest().rate_limit(
233+
0.5).gather()._ipython_display_(**kwargs)
233234

234235
def emit(self, x):
235236
self.verify(x)

streamz/core.py

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -660,13 +660,19 @@ class accumulate(Stream):
660660
This performs running or cumulative reductions, applying the function
661661
to the previous total and the new element. The function should take
662662
two arguments, the previous accumulated state and the next element and
663-
it should return a new accumulated state.
663+
it should return a new accumulated state,
664+
- ``state = func(previous_state, new_value)`` (returns_state=False)
665+
- ``state, result = func(previous_state, new_value)`` (returns_state=True)
666+
667+
where the returned ``state`` is passed as ``previous_state`` to the next invocation. The state or result
668+
is emitted downstream for the two cases, respectively.
664669
665670
Parameters
666671
----------
667672
func: callable
668673
start: object
669-
Initial value. Defaults to the first submitted element
674+
Initial value, passed as the value of ``previous_state`` on the first
675+
invocation. Defaults to the first submitted element
670676
returns_state: boolean
671677
If true then func should return both the state and the value to emit
672678
If false then both values are the same, and func returns one value
@@ -675,14 +681,41 @@ class accumulate(Stream):
675681
676682
Examples
677683
--------
684+
A running total, producing triangular numbers
685+
678686
>>> source = Stream()
679687
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
680688
>>> for i in range(5):
681689
... source.emit(i)
690+
0
682691
1
683692
3
684693
6
685694
10
695+
696+
A count of number of events (including the current one)
697+
698+
>>> source = Stream()
699+
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
700+
>>> for _ in range(5):
701+
... source.emit(0)
702+
1
703+
2
704+
3
705+
4
706+
5
707+
708+
Like the builtin "enumerate".
709+
710+
>>> source = Stream()
711+
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
712+
... start=(0, 0), returns_state=True
713+
... ).sink(print)
714+
>>> for i in range(3):
715+
... source.emit(0)
716+
(0, 0)
717+
(1, 0)
718+
(2, 0)
686719
"""
687720
_graphviz_shape = 'box'
688721

@@ -714,6 +747,54 @@ def update(self, x, who=None):
714747
return self._emit(result)
715748

716749

750+
@Stream.register_api()
class slice(Stream):
    """
    Get only some events in a stream by position. Works like list[] syntax.

    Events are counted from zero in arrival order. An event is passed on
    when its position is at least ``start``, lies a whole number of ``step``
    past ``start``, and is below ``end``.

    Parameters
    ----------
    start : int
        First event to use. If None, start from the beginning
    end : int
        Last event to use (non-inclusive). If None, continue without stopping.
        Does not support negative indexing.
    step : int
        Pass on every Nth event, counted from ``start``. If None, pass every
        one.

    Raises
    ------
    ValueError
        If any of ``start``, ``end`` or ``step`` is negative.

    Examples
    --------
    >>> source = Stream()
    >>> source.slice(2, 6, 2).sink(print)
    >>> for i in range(5):
    ...     source.emit(i)
    2
    4
    """

    def __init__(self, upstream, start=None, end=None, step=None, **kwargs):
        # Number of events seen so far, i.e. the position of the next event.
        self.state = 0
        # NOTE(review): the attribute is spelled ``star``, presumably so it
        # does not shadow a ``start`` method on Stream -- confirm before
        # renaming.
        self.star = start or 0
        self.end = end
        self.step = step or 1
        if any((_ or 0) < 0 for _ in [start, end, step]):
            raise ValueError("Negative indices not supported by slice")
        stream_name = kwargs.pop('stream_name', None)
        Stream.__init__(self, upstream, stream_name=stream_name)
        # An ``end`` of 0 means nothing is ever passed on, so detach at once.
        self._check_end()

    def update(self, x, who=None):
        """Pass ``x`` downstream if its position is selected by the slice.

        Returns whatever the downstream nodes returned (an empty list when
        the event is skipped), following the convention of the other nodes.
        """
        result = []
        # Measure the step from ``start`` so that, like list slicing,
        # slice(1, None, 2) selects positions 1, 3, 5, ...
        offset = self.state - self.star
        if offset >= 0 and offset % self.step == 0:
            result = self._emit(x)
        self.state += 1
        self._check_end()
        return result

    def _check_end(self):
        """Detach from the upstream node once ``end`` has been reached."""
        # Compare against None explicitly: ``end=0`` is a valid (empty) slice.
        if self.end is not None and self.state >= self.end:
            # we're done
            self.upstream.downstreams.remove(self)
796+
797+
717798
@Stream.register_api()
718799
class partition(Stream):
719800
""" Partition stream into tuples of equal size
@@ -748,10 +829,17 @@ def update(self, x, who=None):
748829
class sliding_window(Stream):
749830
""" Produce overlapping tuples of size n
750831
832+
Parameters
833+
----------
834+
return_partial : bool
835+
If True, yield tuples as soon as any events come in, each tuple being
836+
smaller than or equal to the window size. If False, only start yielding
837+
tuples once a full window has accrued.
838+
751839
Examples
752840
--------
753841
>>> source = Stream()
754-
>>> source.sliding_window(3).sink(print)
842+
>>> source.sliding_window(3, return_partial=False).sink(print)
755843
>>> for i in range(8):
756844
... source.emit(i)
757845
(0, 1, 2)
@@ -763,14 +851,15 @@ class sliding_window(Stream):
763851
"""
764852
_graphviz_shape = 'diamond'
765853

766-
def __init__(self, upstream, n, **kwargs):
854+
def __init__(self, upstream, n, return_partial=True, **kwargs):
767855
self.n = n
768856
self.buffer = deque(maxlen=n)
857+
self.partial = return_partial
769858
Stream.__init__(self, upstream, **kwargs)
770859

771860
def update(self, x, who=None):
772861
self.buffer.append(x)
773-
if len(self.buffer) == self.n:
862+
if self.partial or len(self.buffer) == self.n:
774863
return self._emit(tuple(self.buffer))
775864
else:
776865
return []

streamz/sources.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ def write(text):
3535
class Source(Stream):
    """Base class for nodes that feed data into a stream.

    Instances carry a ``stopped`` flag, initially True, which polling
    loops in subclasses can test to know when to finish.
    """
    _graphviz_shape = 'doubleoctagon'

    def __init__(self, **kwargs):
        # Set the flag first so it already exists while the base class
        # initialiser runs.
        self.stopped = True
        super(Source, self).__init__(**kwargs)

    def stop(self):  # pragma: no cover
        """Mark the source as stopped.

        Fallback implementation for poll functions written as
        ``while not self.stopped``.
        """
        if self.stopped:
            return
        self.stopped = True
46+
3847

3948
@Stream.register_api(staticmethod)
4049
class from_textfile(Source):
@@ -291,6 +300,71 @@ def stop(self):
291300
self.stopped = True
292301

293302

303+
@Stream.register_api(staticmethod)
class from_process(Source):
    """Messages from a running external process

    Each line the process writes to its standard output is emitted as one
    ``bytes`` message, including the trailing newline.

    This doesn't work on Windows

    Parameters
    ----------
    cmd : list of str or str
        Command to run: program name, followed by arguments
    open_kwargs : dict
        To pass on to the process open function, see ``subprocess.Popen``.
    with_stderr : bool
        Whether to include the process STDERR in the stream
    start : bool
        Whether to immediately startup the process. Usually you want to connect
        downstream nodes first, and then call ``.start()``.

    Examples
    --------
    >>> source = Source.from_process(['ping', 'localhost'])  # doctest: +SKIP
    """

    def __init__(self, cmd, open_kwargs=None, with_stderr=False, start=False):
        self.cmd = cmd
        self.open_kwargs = open_kwargs or {}
        self.with_stderr = with_stderr
        super(from_process, self).__init__(ensure_io_loop=True)
        self.stopped = True
        # Handle of the currently running subprocess, or None when idle.
        self.process = None
        if start:  # pragma: no cover
            self.start()

    @gen.coroutine
    def _start_process(self):
        """Launch the command and emit its output line by line.

        Runs until the source is stopped or the process closes its output,
        then closes the output stream and terminates the process.
        """
        # should be done in asyncio (py3 only)? Apparently can handle Windows
        # with appropriate config.
        from tornado.process import Subprocess
        from tornado.iostream import StreamClosedError
        import subprocess
        # NOTE(review): when with_stderr is False, stderr goes to a pipe that
        # this code never reads; a process that writes a lot to stderr could
        # block once that pipe fills -- confirm whether this is intended.
        stderr = subprocess.STDOUT if self.with_stderr else subprocess.PIPE
        process = Subprocess(self.cmd, stdout=Subprocess.STREAM,
                             stderr=stderr, **self.open_kwargs)
        self.process = process
        try:
            while not self.stopped:
                try:
                    out = yield process.stdout.read_until(b'\n')
                except StreamClosedError:
                    # process exited
                    break
                yield self._emit(out)
        finally:
            # Always release the child, even if a downstream node raised.
            process.stdout.close()
            process.proc.terminate()
            # Only reset the shared state if no newer process has replaced
            # this one (stop() followed quickly by start()).
            if self.process is process:
                self.process = None
                self.stopped = True

    def start(self):
        """Start external process"""
        if self.stopped:
            self.loop.add_callback(self._start_process)
            self.stopped = False

    def stop(self):
        """Shutdown external process

        Only sets the ``stopped`` flag; the read loop checks it between
        lines, so shutdown happens after the pending read completes.
        """
        if not self.stopped:
            self.stopped = True
366+
367+
294368
@Stream.register_api(staticmethod)
295369
class from_kafka(Source):
296370
""" Accepts messages from Kafka

streamz/tests/test_core.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@ def test_sliding_window():
168168
source = Stream()
169169
L = source.sliding_window(2).sink_to_list()
170170

171+
for i in range(10):
172+
source.emit(i)
173+
174+
assert L == [(0, ), (0, 1), (1, 2), (2, 3), (3, 4), (4, 5),
175+
(5, 6), (6, 7), (7, 8), (8, 9)]
176+
177+
L = source.sliding_window(2, return_partial=False).sink_to_list()
178+
171179
for i in range(10):
172180
source.emit(i)
173181

@@ -1119,6 +1127,31 @@ def test_share_common_ioloop(clean): # noqa: F811
11191127
assert aa.loop is bb.loop
11201128

11211129

1130+
@pytest.mark.parametrize('data', [
    ([], [0, 1, 2, 3, 4, 5]),
    ([None, None, None], [0, 1, 2, 3, 4, 5]),
    ([1, None, None], [1, 2, 3, 4, 5]),
    ([None, 4, None], [0, 1, 2, 3]),
    ([None, 4, 2], [0, 2]),
    ([3, 1, None], []),
])
def test_slice(data):
    """Check which of six consecutive integers each slice lets through."""
    slice_args, wanted = data
    source = Stream()
    collected = source.slice(*slice_args).sink_to_list()
    for value in range(6):
        source.emit(value)
    assert collected == wanted
1147+
1148+
1149+
def test_slice_err():
    """A negative start, end or step must each be rejected."""
    a = Stream()
    for bad_kwargs in ({'start': -1}, {'end': -1}, {'step': -1}):
        with pytest.raises(ValueError):
            a.slice(**bad_kwargs)
1153+
1154+
11221155
def test_start():
11231156
flag = []
11241157

streamz/tests/test_sources.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,13 @@ def test_http():
8181

8282
with pytest.raises(requests.exceptions.RequestException):
8383
requests.post('http://localhost:%i/other' % port, data=b'data2')
84+
85+
86+
@gen_test(timeout=60)
def test_process():
    """Lines printed by a child process arrive as newline-terminated bytes."""
    import sys

    # Use the interpreter running the tests: a bare "python" may be missing
    # from PATH or may resolve to a different installation.
    cmd = [sys.executable, "-c", "for i in range(4): print(i)"]
    s = Source.from_process(cmd)
    out = s.sink_to_list()
    s.start()
    try:
        yield await_for(
            lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
    finally:
        # Stop the source even when the wait above times out.
        s.stop()

0 commit comments

Comments
 (0)