Skip to content

Commit 04cdb3a

Browse files
authored
Merge pull request #245 from martindurant/docs
update some docs and examples
2 parents c48f93a + 5a30a37 commit 04cdb3a

File tree

5 files changed

+49
-21
lines changed

5 files changed

+49
-21
lines changed

docs/source/api.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ Sources
4141
.. autosummary::
4242
filenames
4343
from_kafka
44+
from_kafka_batched
4445
from_process
4546
from_textfile
46-
from_socket
47+
from_tcp
48+
from_http_server
4749

4850
DaskStream
4951
----------

docs/source/conf.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@
5656
# built documents.
5757
#
5858
# The short X.Y version.
59-
version = '0.0.1'
59+
import streamz
60+
version = streamz.__version__
6061
# The full version, including alpha/beta/rc tags.
61-
release = '0.0.1'
62+
release = streamz.__version__
6263

6364
# The language for content autogenerated by Sphinx. Refer to documentation
6465
# for a list of supported languages.

docs/source/index.rst

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,5 +88,4 @@ data streaming systems like `Apache Flink <https://flink.apache.org/>`_,
8888
collections.rst
8989
api.rst
9090
collections-api.rst
91-
dataframe-aggregations.rst
9291
async.rst

examples/network_wordcount.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/usr/bin/env python
2+
""" a recreation of spark-streaming's network_wordcount
3+
4+
https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#a-quick-example
5+
"""
6+
import time
7+
from streamz import Stream
8+
9+
# absolute port on localhost for now
10+
s = Stream.from_tcp(9999)
11+
s.map(bytes.split).flatten().frequencies().sink(print)
12+
13+
print(
14+
"""In another terminal execute
15+
> nc 127.0.0.1 9999
16+
and then start typing content
17+
"""
18+
)
19+
20+
s.start()
21+
time.sleep(600)

streamz/sources.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,22 @@ class from_textfile(Source):
5252
Parameters
5353
----------
5454
f: file or string
55+
Source of the data. If string, will be opened.
5556
poll_interval: Number
5657
Interval to poll file for new data in seconds
57-
delimiter: str ("\n")
58+
delimiter: str
5859
Character(s) to use to split the data into parts
59-
start: bool (False)
60+
start: bool
6061
Whether to start running immediately; otherwise call stream.start()
6162
explicitly.
62-
from_end: bool (False)
63+
from_end: bool
6364
Whether to begin streaming from the end of the file (i.e., only emit
6465
lines appended after the stream starts).
6566
66-
Example
67-
-------
67+
Examples
68+
--------
6869
>>> source = Stream.from_textfile('myfile.json') # doctest: +SKIP
6970
>>> js.map(json.loads).pluck('value').sum().sink(print) # doctest: +SKIP
70-
7171
>>> source.start() # doctest: +SKIP
7272
7373
Returns
@@ -186,8 +186,8 @@ class from_tcp(Source):
186186
server_kwargs : dict or None
187187
If given, additional arguments to pass to TCPServer
188188
189-
Example
190-
-------
189+
Examples
190+
--------
191191
192192
>>> source = Source.from_tcp(4567) # doctest: +SKIP
193193
"""
@@ -253,9 +253,11 @@ class from_http_server(Source):
253253
server_kwargs : dict or None
254254
If given, set of further parameters to pass on to HTTPServer
255255
256-
Example
257-
-------
256+
Examples
257+
--------
258+
258259
>>> source = Source.from_http_server(4567) # doctest: +SKIP
260+
259261
"""
260262

261263
def __init__(self, port, path='/.*', start=False, server_kwargs=None):
@@ -382,20 +384,22 @@ class from_kafka(Source):
382384
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
383385
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
384386
Examples:
385-
bootstrap.servers: Connection string(s) (host:port) by which to reach Kafka
386-
group.id: Identity of the consumer. If multiple sources share the same
387-
group, each message will be passed to only one of them.
387+
bootstrap.servers, Connection string(s) (host:port) by which to reach
388+
Kafka;
389+
group.id, Identity of the consumer. If multiple sources share the same
390+
group, each message will be passed to only one of them.
388391
poll_interval: number
389392
Seconds that elapse between polling Kafka for new messages
390393
start: bool (False)
391394
Whether to start polling upon instantiation
392395
393-
Example
394-
-------
396+
Examples
397+
--------
395398
396399
>>> source = Stream.from_kafka(['mytopic'],
397400
... {'bootstrap.servers': 'localhost:9092',
398401
... 'group.id': 'streamz'}) # doctest: +SKIP
402+
399403
"""
400404
def __init__(self, topics, consumer_params, poll_interval=0.1, start=False, **kwargs):
401405
self.cpars = consumer_params
@@ -532,12 +536,13 @@ def from_kafka_batched(topic, consumer_params, poll_interval='1s',
532536
start: bool (False)
533537
Whether to start polling upon instantiation
534538
535-
Example
536-
-------
539+
Examples
540+
--------
537541
538542
>>> source = Stream.from_kafka_batched('mytopic',
539543
... {'bootstrap.servers': 'localhost:9092',
540544
... 'group.id': 'streamz'}, npartitions=4) # doctest: +SKIP
545+
541546
"""
542547
if dask:
543548
from distributed.client import default_client

0 commit comments

Comments (0)