Skip to content

Commit 91c8114

Browse files
Satish KumarSatish Kumar
authored andcommitted
Merge remote-tracking branch 'upstream/master'
Pull latest commits from original repo
2 parents ed328a3 + 35b8e9c commit 91c8114

File tree

5 files changed

+271
-4
lines changed

5 files changed

+271
-4
lines changed

docs/source/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Sources
4141
filenames
4242
from_kafka
4343
from_textfile
44+
from_socket
4445

4546
DaskStream
4647
----------

docs/source/dask.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,23 @@ did.
117117
``source.emit``.
118118

119119
.. _Dask: https://dask.pydata.org/en/latest/
120+
121+
122+
Gotchas
123+
+++++++
124+
125+
126+
An important gotcha with ``DaskStream`` is that it is a subclass ``Stream``, and so can be used as an input
127+
to any function expecting a ``Stream``. If there is no intervening ``.gather()``, then the downstream node will
128+
receive Dask futures instead of the data they represent::
129+
130+
source = Stream()
131+
source2 = Stream()
132+
a = source.scatter().map(inc)
133+
b = source2.combine_latest(a)
134+
135+
In this case, the combine operation will get real values from ``source2``, and Dask futures.
136+
Downstream nodes would be free to operate on the futures, but more likely, the line should be::
137+
138+
b = source2.combine_latest(a.gather())
139+

streamz/sources.py

Lines changed: 148 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ class from_textfile(Source):
5050
start: bool (False)
5151
Whether to start running immediately; otherwise call stream.start()
5252
explicitly.
53+
from_end: bool (False)
54+
Whether to begin streaming from the end of the file (i.e., only emit
55+
lines appended after the stream starts).
5356
5457
Example
5558
-------
@@ -63,26 +66,33 @@ class from_textfile(Source):
6366
Stream
6467
"""
6568
def __init__(self, f, poll_interval=0.100, delimiter='\n', start=False,
66-
**kwargs):
69+
from_end=False, **kwargs):
6770
if isinstance(f, str):
6871
f = open(f)
6972
self.file = f
73+
self.from_end = from_end
7074
self.delimiter = delimiter
7175

7276
self.poll_interval = poll_interval
7377
super(from_textfile, self).__init__(ensure_io_loop=True, **kwargs)
7478
self.stopped = True
79+
self.started = False
7580
if start:
7681
self.start()
7782

7883
def start(self):
7984
self.stopped = False
85+
self.started = False
8086
self.loop.add_callback(self.do_poll)
8187

8288
@gen.coroutine
8389
def do_poll(self):
8490
buffer = ''
85-
while True:
91+
if self.from_end:
92+
# this only happens when we are ready to read
93+
self.file.seek(0, 2)
94+
while not self.stopped:
95+
self.started = True
8696
line = self.file.read()
8797
if line:
8898
buffer = buffer + line
@@ -93,8 +103,6 @@ def do_poll(self):
93103
yield self._emit(part + self.delimiter)
94104
else:
95105
yield gen.sleep(self.poll_interval)
96-
if self.stopped:
97-
break
98106

99107

100108
@Stream.register_api(staticmethod)
@@ -147,6 +155,142 @@ def do_poll(self):
147155
break
148156

149157

158+
@Stream.register_api(staticmethod)
159+
class from_tcp(Source):
160+
"""
161+
Creates events by reading from a socket using tornado TCPServer
162+
163+
The stream of incoming bytes is split on a given delimiter, and the parts
164+
become the emitted events.
165+
166+
Parameters
167+
----------
168+
port : int
169+
The port to open and listen on. It only gets opened when the source
170+
is started, and closed upon ``stop()``
171+
delimiter : bytes
172+
The incoming data will be split on this value. The resulting events
173+
will still have the delimiter at the end.
174+
start : bool
175+
Whether to immediately initiate the source. You probably want to
176+
set up downstream nodes first.
177+
server_kwargs : dict or None
178+
If given, additional arguments to pass to TCPServer
179+
180+
Example
181+
-------
182+
183+
>>> source = Source.from_tcp(4567) # doctest: +SKIP
184+
"""
185+
def __init__(self, port, delimiter=b'\n', start=False,
186+
server_kwargs=None):
187+
super(from_tcp, self).__init__(ensure_io_loop=True)
188+
self.stopped = True
189+
self.server_kwargs = server_kwargs or {}
190+
self.port = port
191+
self.server = None
192+
self.delimiter = delimiter
193+
if start: # pragma: no cover
194+
self.start()
195+
196+
@gen.coroutine
197+
def _start_server(self):
198+
from tornado.tcpserver import TCPServer
199+
from tornado.iostream import StreamClosedError
200+
201+
class EmitServer(TCPServer):
202+
source = self
203+
204+
@gen.coroutine
205+
def handle_stream(self, stream, address):
206+
while True:
207+
try:
208+
data = yield stream.read_until(self.source.delimiter)
209+
yield self.source._emit(data)
210+
except StreamClosedError:
211+
break
212+
213+
self.server = EmitServer(**self.server_kwargs)
214+
self.server.listen(self.port)
215+
216+
def start(self):
217+
if self.stopped:
218+
self.loop.add_callback(self._start_server)
219+
self.stopped = False
220+
221+
def stop(self):
222+
if not self.stopped:
223+
self.server.stop()
224+
self.server = None
225+
self.stopped = True
226+
227+
228+
@Stream.register_api(staticmethod)
229+
class from_http_server(Source):
230+
"""Listen for HTTP POSTs on given port
231+
232+
Each connection will emit one event, containing the body data of
233+
the request
234+
235+
Parameters
236+
----------
237+
port : int
238+
The port to listen on
239+
path : str
240+
Specific path to listen on. Can be regex, but content is not used.
241+
start : bool
242+
Whether to immediately startup the server. Usually you want to connect
243+
downstream nodes first, and then call ``.start()``.
244+
server_kwargs : dict or None
245+
If given, set of further parameters to pass on to HTTPServer
246+
247+
Example
248+
-------
249+
>>> source = Source.from_http_server(4567) # doctest: +SKIP
250+
"""
251+
252+
def __init__(self, port, path='/.*', start=False, server_kwargs=None):
253+
self.port = port
254+
self.path = path
255+
self.server_kwargs = server_kwargs or {}
256+
super(from_http_server, self).__init__(ensure_io_loop=True)
257+
self.stopped = True
258+
self.server = None
259+
if start: # pragma: no cover
260+
self.start()
261+
262+
def _start_server(self):
263+
from tornado.web import Application, RequestHandler
264+
from tornado.httpserver import HTTPServer
265+
266+
class Handler(RequestHandler):
267+
source = self
268+
269+
@gen.coroutine
270+
def post(self):
271+
yield self.source._emit(self.request.body)
272+
self.write('OK')
273+
274+
application = Application([
275+
(self.path, Handler),
276+
])
277+
self.server = HTTPServer(application, **self.server_kwargs)
278+
self.server.listen(self.port)
279+
280+
def start(self):
281+
"""Start HTTP server and listen"""
282+
if self.stopped:
283+
self.loop.add_callback(self._start_server)
284+
self.stopped = False
285+
286+
def stop(self):
287+
"""Shutdown HTTP server"""
288+
if not self.stopped:
289+
self.server.stop()
290+
self.server = None
291+
self.stopped = True
292+
293+
150294
@Stream.register_api(staticmethod)
151295
class from_kafka(Source):
152296
""" Accepts messages from Kafka

streamz/tests/test_core.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,25 @@ def test_from_file():
787787
assert time() < start + 2 # reads within 2s
788788

789789

790+
@gen_test()
791+
def test_from_file_end():
792+
with tmpfile() as fn:
793+
with open(fn, 'wt') as f:
794+
f.write('data1\n')
795+
f.flush()
796+
797+
source = Stream.from_textfile(fn, poll_interval=0.010,
798+
start=False, from_end=True)
799+
out = source.sink_to_list()
800+
source.start()
801+
assert out == []
802+
yield await_for(lambda: source.started, 2, period=0.02)
803+
804+
f.write('data2\n')
805+
f.flush()
806+
yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1)
807+
808+
790809
@gen_test()
791810
def test_filenames():
792811
with tmpfile() as fn:

streamz/tests/test_sources.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import pytest
2+
from streamz import Source
3+
from streamz.utils_test import wait_for, await_for, gen_test
4+
import socket
5+
6+
7+
def test_tcp():
8+
port = 9876
9+
s = Source.from_tcp(port)
10+
out = s.sink_to_list()
11+
s.start()
12+
wait_for(lambda: s.server is not None, 2, period=0.02)
13+
14+
try:
15+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
16+
sock.connect(("localhost", port))
17+
sock.send(b'data\n')
18+
sock.close()
19+
20+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
21+
sock.connect(("localhost", port))
22+
sock.send(b'data\n')
23+
24+
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
25+
sock2.connect(("localhost", port))
26+
sock2.send(b'data2\n')
27+
wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
28+
period=0.01)
29+
finally:
30+
s.stop()
31+
sock.close()
32+
sock2.close()
33+
34+
35+
@gen_test(timeout=60)
36+
def test_tcp_async():
37+
port = 9876
38+
s = Source.from_tcp(port)
39+
out = s.sink_to_list()
40+
s.start()
41+
yield await_for(lambda: s.server is not None, 2, period=0.02)
42+
43+
try:
44+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45+
sock.connect(("localhost", port))
46+
sock.send(b'data\n')
47+
sock.close()
48+
49+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50+
sock.connect(("localhost", port))
51+
sock.send(b'data\n')
52+
53+
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
54+
sock2.connect(("localhost", port))
55+
sock2.send(b'data2\n')
56+
yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
57+
period=0.01)
58+
finally:
59+
s.stop()
60+
sock.close()
61+
sock2.close()
62+
63+
64+
def test_http():
65+
requests = pytest.importorskip('requests')
66+
port = 9875
67+
s = Source.from_http_server(port)
68+
out = s.sink_to_list()
69+
s.start()
70+
wait_for(lambda: s.server is not None, 2, period=0.02)
71+
72+
r = requests.post('http://localhost:%i/' % port, data=b'data')
73+
wait_for(lambda: out == [b'data'], 2, period=0.01)
74+
assert r.ok
75+
76+
r = requests.post('http://localhost:%i/other' % port, data=b'data2')
77+
wait_for(lambda: out == [b'data', b'data2'], 2, period=0.01)
78+
assert r.ok
79+
80+
s.stop()
81+
82+
with pytest.raises(requests.exceptions.RequestException):
83+
requests.post('http://localhost:%i/other' % port, data=b'data2')

0 commit comments

Comments
 (0)