Skip to content

Commit e3338c2

Browse files
authored
Merge pull request #254 from skmatti/cudf-docs-examples
Add docs and examples for cudf support
2 parents ffed062 + 4d8e081 commit e3338c2

File tree

3 files changed

+168
-1
lines changed

3 files changed

+168
-1
lines changed

docs/source/collections.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ bring special consideration to certain types of data:
66

77
1. ``streamz.batch``: supports streams of lists of Python objects like tuples
88
or dictionaries
9-
2. ``streamz.dataframe``: supports streams of Pandas dataframes or Pandas series
9+
2. ``streamz.dataframe``: supports streams of Pandas/cudf dataframes or Pandas/cudf series.
10+
cudf support is in beta phase and has limited functionality as of cudf version ``0.8``
1011

1112
These high-level APIs help us handle common situations in data processing.
1213
They help us implement complex algorithms and also improve efficiency.

docs/source/gpu-dataframes.rst

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
Streaming GPU DataFrames(cudf)
2+
------------------------------
3+
4+
The ``streamz.dataframe`` module provides DataFrame-like interface on streaming
5+
data as described in ``dataframes`` documentation. It provides support for dataframe
6+
like libraries such as pandas and cudf. This documentation is specific to streaming GPU
7+
dataframes(cudf).
8+
9+
The example in the ``dataframes`` documentation is rewritten below using cudf dataframes
10+
just by replacing ``pandas`` module with ``cudf``:
11+
12+
.. code-block:: python
13+
14+
import cudf
15+
from streamz.dataframe import DataFrame
16+
17+
example = cudf.DataFrame({'name': [], 'amount': []})
18+
sdf = DataFrame(stream, example=example)
19+
20+
sdf[sdf.name == 'Alice'].amount.sum()
21+
22+
23+
Supported Operations
24+
--------------------
25+
26+
Streaming cudf dataframes support the following classes of operations
27+
28+
- Elementwise operations like ``df.x + 1``
29+
- Filtering like ``df[df.name == 'Alice']``
30+
- Column addition like ``df['z'] = df.x + df.y``
31+
- Reductions like ``df.amount.mean()``
32+
- Windowed aggregations (fixed length) like ``df.window(n=100).amount.sum()``
33+
34+
The following operations are not supported with cudf(as of version 0.8) yet
35+
- Groupby-aggregations like ``df.groupby(df.name).amount.mean()``
36+
- Windowed aggregations (index valued) like ``df.window(value='2h').amount.sum()``
37+
- Windowed groupby aggregations like ``df.window(value='2h').groupby('name').amount.sum()``
38+
39+
40+
Window based Aggregations with cudf are supported just as explained in ``dataframes`` documentation.
41+
The support for groupby operations will be added in future.

examples/gpu-dataframes.ipynb

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"metadata": {},
7+
"outputs": [],
8+
"source": [
9+
"from streamz.dataframe import DataFrame\n",
10+
"import cudf"
11+
]
12+
},
13+
{
14+
"cell_type": "markdown",
15+
"metadata": {},
16+
"source": [
17+
"# Basic example"
18+
]
19+
},
20+
{
21+
"cell_type": "code",
22+
"execution_count": null,
23+
"metadata": {},
24+
"outputs": [],
25+
"source": [
26+
"\n",
27+
"cu_df = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n",
28+
"\n",
29+
"sdf = DataFrame(example=cu_df)\n",
30+
"\n",
31+
"L = sdf.window(n=15).x.sum().stream.sink_to_list()"
32+
]
33+
},
34+
{
35+
"cell_type": "code",
36+
"execution_count": null,
37+
"metadata": {},
38+
"outputs": [],
39+
"source": [
40+
"sdf.emit(cu_df.iloc[:8])\n",
41+
"sdf.emit(cu_df)\n",
42+
"sdf.emit(cu_df)"
43+
]
44+
},
45+
{
46+
"cell_type": "code",
47+
"execution_count": null,
48+
"metadata": {},
49+
"outputs": [],
50+
"source": [
51+
"print(L[0])\n",
52+
"print(L[1])\n",
53+
"print(L[2])"
54+
]
55+
},
56+
{
57+
"cell_type": "markdown",
58+
"metadata": {},
59+
"source": [
60+
"# Advanced example\n",
61+
"The following pipeline reads json encoded strings from Kafka in batches and process them on GPUs and write the result back to a different Kafka topic. This pipeline can be easily extended to run on Dask Stream as well.\n",
62+
"Note: Uses cudf 0.8"
63+
]
64+
},
65+
{
66+
"cell_type": "code",
67+
"execution_count": null,
68+
"metadata": {},
69+
"outputs": [],
70+
"source": [
71+
"# read messages from kafka and create a stream\n",
72+
"\n",
73+
"consume_topic = \"my-topic\"\n",
74+
"produce_topic = \"my-out-topic\"\n",
75+
"bootstrap_servers = 'localhost:9092'\n",
76+
"consumer_conf = {'bootstrap.servers': bootstrap_servers,\n",
77+
" 'group.id': 'group-123', 'session.timeout.ms': 600}\n",
78+
"producer_conf = {'bootstrap.servers': bootstrap_servers}\n",
79+
"\n",
80+
"stream = Stream.from_kafka_batched(consume_topic, consumer_conf, poll_interval='10s',\n",
81+
" npartitions=10, asynchronous=True)"
82+
]
83+
},
84+
{
85+
"cell_type": "code",
86+
"execution_count": null,
87+
"metadata": {},
88+
"outputs": [],
89+
"source": [
90+
"# convert batch of encoded json strings to gpu dataframes\n",
91+
"cudf_stream = stream\\\n",
92+
" .map(lambda msgs: \"\\n\".join([msg.decode('utf-8') for msg in msgs]))\\\n",
93+
" .map(cudf.read_json, lines=True)\n",
94+
"\n",
95+
"# create a streamz dataframe from the above stream and sample dataframe\n",
96+
"cudf_example = cudf.DataFrame({'x': np.arange(10, dtype=float)+10, 'y': [1.0, 2.0] * 5})\n",
97+
"stdf = DataFrame(cudf_stream, example=cudf_example)\n",
98+
"\n",
99+
"# perform aggregation and write to kafka\n",
100+
"stdf.window(n=15).x.mean().stream.to_kafka(produce_topic, producer_conf)\n"
101+
]
102+
}
103+
],
104+
"metadata": {
105+
"kernelspec": {
106+
"display_name": "Python 3",
107+
"language": "python",
108+
"name": "python3"
109+
},
110+
"language_info": {
111+
"codemirror_mode": {
112+
"name": "ipython",
113+
"version": 3
114+
},
115+
"file_extension": ".py",
116+
"mimetype": "text/x-python",
117+
"name": "python",
118+
"nbconvert_exporter": "python",
119+
"pygments_lexer": "ipython3",
120+
"version": "3.6.3"
121+
}
122+
},
123+
"nbformat": 4,
124+
"nbformat_minor": 2
125+
}

0 commit comments

Comments
 (0)