Skip to content

Commit 0b93c94

Browse files
martindurantmrocklin
authored andcommitted
Add TCPServer and HTTPServer sources (#227)
1 parent c26a84c commit 0b93c94

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed

docs/source/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Sources
4141
filenames
4242
from_kafka
4343
from_textfile
44+
from_socket
4445

4546
DaskStream
4647
----------

streamz/sources.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,142 @@ def do_poll(self):
147147
break
148148

149149

150+
@Stream.register_api(staticmethod)
151+
class from_tcp(Source):
152+
"""
153+
Creates events by reading from a socket using tornado TCPServer
154+
155+
The stream of incoming bytes is split on a given delimiter, and the parts
156+
become the emitted events.
157+
158+
Parameters
159+
----------
160+
port : int
161+
The port to open and listen on. It only gets opened when the source
162+
is started, and closed upon ``stop()``
163+
delimiter : bytes
164+
The incoming data will be split on this value. The resulting events
165+
will still have the delimiter at the end.
166+
start : bool
167+
Whether to immediately initiate the source. You probably want to
168+
set up downstream nodes first.
169+
server_kwargs : dict or None
170+
If given, additional arguments to pass to TCPServer
171+
172+
Example
173+
-------
174+
175+
>>> source = Source.from_tcp(4567) # doctest: +SKIP
176+
"""
177+
def __init__(self, port, delimiter=b'\n', start=False,
178+
server_kwargs=None):
179+
super(from_tcp, self).__init__(ensure_io_loop=True)
180+
self.stopped = True
181+
self.server_kwargs = server_kwargs or {}
182+
self.port = port
183+
self.server = None
184+
self.delimiter = delimiter
185+
if start: # pragma: no cover
186+
self.start()
187+
188+
@gen.coroutine
189+
def _start_server(self):
190+
from tornado.tcpserver import TCPServer
191+
from tornado.iostream import StreamClosedError
192+
193+
class EmitServer(TCPServer):
194+
source = self
195+
196+
@gen.coroutine
197+
def handle_stream(self, stream, address):
198+
while True:
199+
try:
200+
data = yield stream.read_until(self.source.delimiter)
201+
yield self.source._emit(data)
202+
except StreamClosedError:
203+
break
204+
205+
self.server = EmitServer(**self.server_kwargs)
206+
self.server.listen(self.port)
207+
208+
def start(self):
209+
if self.stopped:
210+
self.loop.add_callback(self._start_server)
211+
self.stopped = False
212+
213+
def stop(self):
214+
if not self.stopped:
215+
self.server.stop()
216+
self.server = None
217+
self.stopped = True
218+
219+
220+
@Stream.register_api(staticmethod)
221+
class from_http_server(Source):
222+
"""Listen for HTTP POSTs on given port
223+
224+
Each connection will emit one event, containing the body data of
225+
the request
226+
227+
Parameters
228+
----------
229+
port : int
230+
The port to listen on
231+
path : str
232+
Specific path to listen on. Can be regex, but content is not used.
233+
start : bool
234+
Whether to immediately startup the server. Usually you want to connect
235+
downstream nodes first, and then call ``.start()``.
236+
server_kwargs : dict or None
237+
If given, set of further parameters to pass on to HTTPServer
238+
239+
Example
240+
-------
241+
>>> source = Source.from_http_server(4567) # doctest: +SKIP
242+
"""
243+
244+
def __init__(self, port, path='/.*', start=False, server_kwargs=None):
245+
self.port = port
246+
self.path = path
247+
self.server_kwargs = server_kwargs or {}
248+
super(from_http_server, self).__init__(ensure_io_loop=True)
249+
self.stopped = True
250+
self.server = None
251+
if start: # pragma: no cover
252+
self.start()
253+
254+
def _start_server(self):
255+
from tornado.web import Application, RequestHandler
256+
from tornado.httpserver import HTTPServer
257+
258+
class Handler(RequestHandler):
259+
source = self
260+
261+
@gen.coroutine
262+
def post(self):
263+
yield self.source._emit(self.request.body)
264+
self.write('OK')
265+
266+
application = Application([
267+
(self.path, Handler),
268+
])
269+
self.server = HTTPServer(application, **self.server_kwargs)
270+
self.server.listen(self.port)
271+
272+
def start(self):
273+
"""Start HTTP server and listen"""
274+
if self.stopped:
275+
self.loop.add_callback(self._start_server)
276+
self.stopped = False
277+
278+
def stop(self):
279+
"""Shutdown HTTP server"""
280+
if not self.stopped:
281+
self.server.stop()
282+
self.server = None
283+
self.stopped = True
284+
285+
150286
@Stream.register_api(staticmethod)
151287
class from_kafka(Source):
152288
""" Accepts messages from Kafka

streamz/tests/test_sources.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import pytest
2+
from streamz import Source
3+
from streamz.utils_test import wait_for, await_for, gen_test
4+
import socket
5+
6+
7+
def test_tcp():
8+
port = 9876
9+
s = Source.from_tcp(port)
10+
out = s.sink_to_list()
11+
s.start()
12+
wait_for(lambda: s.server is not None, 2, period=0.02)
13+
14+
try:
15+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
16+
sock.connect(("localhost", port))
17+
sock.send(b'data\n')
18+
sock.close()
19+
20+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
21+
sock.connect(("localhost", port))
22+
sock.send(b'data\n')
23+
24+
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
25+
sock2.connect(("localhost", port))
26+
sock2.send(b'data2\n')
27+
wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
28+
period=0.01)
29+
finally:
30+
s.stop()
31+
sock.close()
32+
sock2.close()
33+
34+
35+
@gen_test(timeout=60)
36+
def test_tcp_async():
37+
port = 9876
38+
s = Source.from_tcp(port)
39+
out = s.sink_to_list()
40+
s.start()
41+
yield await_for(lambda: s.server is not None, 2, period=0.02)
42+
43+
try:
44+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45+
sock.connect(("localhost", port))
46+
sock.send(b'data\n')
47+
sock.close()
48+
49+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50+
sock.connect(("localhost", port))
51+
sock.send(b'data\n')
52+
53+
sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
54+
sock2.connect(("localhost", port))
55+
sock2.send(b'data2\n')
56+
yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
57+
period=0.01)
58+
finally:
59+
s.stop()
60+
sock.close()
61+
sock2.close()
62+
63+
64+
def test_http():
65+
requests = pytest.importorskip('requests')
66+
port = 9875
67+
s = Source.from_http_server(port)
68+
out = s.sink_to_list()
69+
s.start()
70+
wait_for(lambda: s.server is not None, 2, period=0.02)
71+
72+
r = requests.post('http://localhost:%i/' % port, data=b'data')
73+
wait_for(lambda: out == [b'data'], 2, period=0.01)
74+
assert r.ok
75+
76+
r = requests.post('http://localhost:%i/other' % port, data=b'data2')
77+
wait_for(lambda: out == [b'data', b'data2'], 2, period=0.01)
78+
assert r.ok
79+
80+
s.stop()
81+
82+
with pytest.raises(requests.exceptions.RequestException):
83+
requests.post('http://localhost:%i/other' % port, data=b'data2')

0 commit comments

Comments
 (0)