Skip to content

Commit 6445fa9

Browse files
authored
Merge pull request #224 from skmatti/master
Add cudf support for Streamz DataFrame
2 parents 35b8e9c + d22149d commit 6445fa9

File tree

6 files changed

+590
-51
lines changed

6 files changed

+590
-51
lines changed

streamz/collection.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import operator
2+
import types
23

34
from streamz import Stream, core
45

@@ -41,10 +42,9 @@ def map_partitions(func, *args, **kwargs):
4142
if not isinstance(arg, Streaming)]
4243
stream = s.stream.map(partial_by_order, function=func, other=other,
4344
**kwargs)
44-
45-
for typ, s_type in _stream_types[stream_type]:
46-
if isinstance(example, typ):
47-
return s_type(stream, example)
45+
s_type = get_stream_type(example, stream_type)
46+
if s_type:
47+
return s_type(stream, example)
4848
return Streaming(stream, example, stream_type=stream_type)
4949

5050

@@ -204,11 +204,11 @@ def accumulate_partitions(self, func, *args, **kwargs):
204204
if returns_state:
205205
_, example = example
206206
stream = self.stream.accumulate(func, *args, start=start,
207-
returns_state=returns_state, **kwargs)
207+
returns_state=returns_state, **kwargs)
208208

209-
for typ, s_type in _stream_types[stream_type]:
210-
if isinstance(example, typ):
211-
return s_type(stream, example)
209+
s_type = get_stream_type(example, stream_type)
210+
if s_type:
211+
return s_type(stream, example)
212212
return Streaming(stream, example, stream_type=stream_type)
213213

214214
def __repr__(self):
@@ -242,12 +242,16 @@ def verify(self, x):
242242
(self._subtype, type(x)))
243243

244244

245-
def stream_type(example, stream_type='streaming'):
245+
def get_stream_type(example, stream_type='streaming'):
246246
for typ, s_type in _stream_types[stream_type]:
247-
if isinstance(example, typ):
247+
if isinstance(typ, types.FunctionType):
248+
"""For Frame like objects we use utility functions to check type.
249+
i.e., DataFrame-like objects are checked using is_dataframe_like."""
250+
if typ(example):
251+
return s_type
252+
elif isinstance(example, typ):
248253
return s_type
249-
raise TypeError("No streaming equivalent found for type %s" %
250-
type(example).__name__)
254+
return None
251255

252256

253257
def partial_by_order(*args, **kwargs):

streamz/dataframe/aggregations.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import numpy as np
77
import pandas as pd
8+
from .utils import is_series_like, is_index_like
89

910

1011
class Aggregation(object):
@@ -324,7 +325,7 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
324325
acc = {'dfs': [],
325326
'state': agg.initial(new, grouper=grouper),
326327
'size-state': size.initial(new, grouper=grouper)}
327-
if isinstance(grouper, (pd.Series, pd.Index, np.ndarray)):
328+
if isinstance(grouper, np.ndarray) or is_series_like(grouper) or is_index_like(grouper):
328329
acc['groupers'] = deque([])
329330

330331
dfs = acc['dfs']
@@ -416,7 +417,7 @@ def on_old(self, acc, old, grouper=None):
416417
def initial(self, new, grouper=None):
417418
if hasattr(grouper, 'iloc'):
418419
grouper = grouper.iloc[:0]
419-
if isinstance(grouper, (pd.Index, np.ndarray)):
420+
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
420421
grouper = grouper[:0]
421422
return self.grouped(new.iloc[:0], grouper=grouper).sum()
422423

@@ -437,7 +438,7 @@ def on_old(self, acc, old, grouper=None):
437438
def initial(self, new, grouper=None):
438439
if hasattr(grouper, 'iloc'):
439440
grouper = grouper.iloc[:0]
440-
if isinstance(grouper, (pd.Index, np.ndarray)):
441+
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
441442
grouper = grouper[:0]
442443
return self.grouped(new.iloc[:0], grouper=grouper).count()
443444

@@ -458,7 +459,7 @@ def on_old(self, acc, old, grouper=None):
458459
def initial(self, new, grouper=None):
459460
if hasattr(grouper, 'iloc'):
460461
grouper = grouper.iloc[:0]
461-
if isinstance(grouper, (pd.Index, np.ndarray)):
462+
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
462463
grouper = grouper[:0]
463464
return self.grouped(new.iloc[:0], grouper=grouper).size()
464465

@@ -496,7 +497,7 @@ def on_old(self, acc, old, grouper=None):
496497
def initial(self, new, grouper=None):
497498
if hasattr(grouper, 'iloc'):
498499
grouper = grouper.iloc[:0]
499-
if isinstance(grouper, (pd.Index, np.ndarray)):
500+
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
500501
grouper = grouper[:0]
501502
g = self.grouped(new.iloc[:0], grouper=grouper)
502503
return (g.sum(), g.count())
@@ -532,7 +533,7 @@ def on_old(self, acc, old, grouper=None):
532533
def initial(self, new, grouper=None):
533534
if hasattr(grouper, 'iloc'):
534535
grouper = grouper.iloc[:0]
535-
if isinstance(grouper, (pd.Index, np.ndarray)):
536+
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
536537
grouper = grouper[:0]
537538

538539
new = new.iloc[:0]

streamz/dataframe/core.py

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from __future__ import division, print_function
22

3-
from collections import OrderedDict
43
import operator
4+
from collections import OrderedDict
55
from time import time
6-
76
import numpy as np
87
import pandas as pd
98
import toolz
@@ -14,6 +13,7 @@
1413
from ..sources import Source
1514
from ..utils import M
1615
from . import aggregations
16+
from .utils import is_dataframe_like, is_series_like, is_index_like, get_base_frame_type
1717

1818

1919
class BaseFrame(Streaming):
@@ -245,14 +245,16 @@ def __setitem__(self, key, value):
245245
else:
246246
example = self.example.copy()
247247
example[key] = value
248-
result = self.map_partitions(pd.DataFrame.assign, self, **{key: value})
248+
df_type = type(self.example)
249+
result = self.map_partitions(df_type.assign, self, **{key: value})
249250

250251
self.stream = result.stream
251252
self.example = result.example
252253
return self
253254

254255
def query(self, expr, **kwargs):
255-
return self.map_partitions(pd.DataFrame.query, self, expr, **kwargs)
256+
df_type = type(self.example)
257+
return self.map_partitions(df_type.query, self, expr, **kwargs)
256258

257259

258260
class DataFrame(Frame, _DataFrameMixin):
@@ -266,10 +268,9 @@ class DataFrame(Frame, _DataFrameMixin):
266268
--------
267269
Series
268270
"""
269-
_subtype = pd.DataFrame
270271

271272
def __init__(self, *args, **kwargs):
272-
# {'x': sdf.x + 1, 'y': sdf.y - 1}
273+
# Dict-of-columns form, e.g. {'x': sdf.x + 1, 'y': sdf.y - 1} - works only with pandas (columns are joined with pd.concat)
273274
if len(args) == 1 and not kwargs and isinstance(args[0], dict):
274275
def concat(tup, columns=None):
275276
result = pd.concat(tup, axis=1)
@@ -283,7 +284,14 @@ def concat(tup, columns=None):
283284
for k, v in args[0].items()})
284285
DataFrame.__init__(self, stream, example)
285286
else:
286-
return super(DataFrame, self).__init__(*args, **kwargs)
287+
example = None
288+
if "example" in kwargs:
289+
example = kwargs.get('example')
290+
elif len(args) > 1:
291+
example = args[1]
292+
self._subtype = get_base_frame_type(self.__class__.__name__,
293+
is_dataframe_like, example)
294+
super(DataFrame, self).__init__(*args, **kwargs)
287295

288296
def verify(self, x):
289297
""" Verify consistency of elements that pass through this stream """
@@ -314,7 +322,20 @@ class Series(Frame, _SeriesMixin):
314322
--------
315323
DataFrame
316324
"""
317-
_subtype = pd.Series
325+
326+
def __init__(self, *args, **kwargs):
327+
example = None
328+
if "example" in kwargs:
329+
example = kwargs.get('example')
330+
elif len(args) > 1:
331+
example = args[1]
332+
if isinstance(self, Index):
333+
self._subtype = get_base_frame_type(self.__class__.__name__,
334+
is_index_like, example)
335+
else:
336+
self._subtype = get_base_frame_type(self.__class__.__name__,
337+
is_series_like, example)
338+
super(Series, self).__init__(*args, **kwargs)
318339

319340
def value_counts(self):
320341
return self.accumulate_partitions(aggregations.accumulator,
@@ -324,7 +345,7 @@ def value_counts(self):
324345

325346

326347
class Index(Series):
327-
_subtype = pd.Index
348+
pass
328349

329350

330351
class DataFrames(Frames, _DataFrameMixin):
@@ -362,6 +383,7 @@ class Rolling(object):
362383
>>> sdf.rolling(10).x.mean() # doctest: +SKIP
363384
>>> sdf.rolling('100ms').x.mean() # doctest: +SKIP
364385
"""
386+
365387
def __init__(self, sdf, window, min_periods):
366388
self.root = sdf
367389
if not isinstance(window, int):
@@ -382,12 +404,12 @@ def __getattr__(self, key):
382404

383405
def _known_aggregation(self, op, *args, **kwargs):
384406
return self.root.accumulate_partitions(rolling_accumulator,
385-
window=self.window,
386-
op=op,
387-
args=args,
388-
kwargs=kwargs,
389-
start=(),
390-
returns_state=True)
407+
window=self.window,
408+
op=op,
409+
args=args,
410+
kwargs=kwargs,
411+
start=(),
412+
returns_state=True)
391413

392414
def sum(self):
393415
""" Rolling sum """
@@ -440,6 +462,7 @@ class Window(OperatorMixin):
440462
--------
441463
DataFrame.window: contains full docstring
442464
"""
465+
443466
def __init__(self, sdf, n=None, value=None):
444467
if value is None and isinstance(n, (str, pd.Timedelta)):
445468
value = n
@@ -492,12 +515,12 @@ def aggregate(self, agg):
492515
diff = aggregations.diff_loc
493516
window = self.value
494517
return self.root.accumulate_partitions(aggregations.window_accumulator,
495-
diff=diff,
496-
window=window,
497-
agg=agg,
498-
start=None,
499-
returns_state=True,
500-
stream_type='updating')
518+
diff=diff,
519+
window=window,
520+
agg=agg,
521+
start=None,
522+
returns_state=True,
523+
stream_type='updating')
501524

502525
def full(self):
503526
return self.aggregate(aggregations.Full())
@@ -573,6 +596,7 @@ def _accumulate_size(accumulator, new):
573596

574597
class GroupBy(object):
575598
""" Groupby aggregations on streaming dataframes """
599+
576600
def __init__(self, root, grouper, index=None):
577601
self.root = root
578602
self.grouper = grouper
@@ -603,7 +627,7 @@ def _accumulate(self, Agg, **kwargs):
603627
state = agg.initial(self.root.example, grouper=grouper_example)
604628
if hasattr(grouper_example, 'iloc'):
605629
grouper_example = grouper_example.iloc[:0]
606-
elif isinstance(grouper_example, (np.ndarray, pd.Index)):
630+
elif isinstance(grouper_example, np.ndarray) or is_index_like(grouper_example):
607631
grouper_example = grouper_example[:0]
608632
_, example = agg.on_new(state,
609633
self.root.example.iloc[:0],
@@ -614,8 +638,9 @@ def _accumulate(self, Agg, **kwargs):
614638
start=None,
615639
returns_state=True)
616640

617-
for typ, s_type in _stream_types[stream_type]:
618-
if isinstance(example, typ):
641+
for fn, s_type in _stream_types[stream_type]:
642+
"""Function checks if example is of a specific frame type"""
643+
if fn(example):
619644
return s_type(outstream, example)
620645
return Streaming(outstream, example, stream_type=stream_type)
621646

@@ -646,6 +671,7 @@ def var(self, ddof=1):
646671

647672
class WindowedGroupBy(GroupBy):
648673
""" Groupby aggregations over a window of data """
674+
649675
def __init__(self, root, grouper, index=None, n=None, value=None):
650676
self.root = root
651677
self.grouper = grouper
@@ -678,7 +704,7 @@ def _accumulate(self, Agg, **kwargs):
678704
state = agg.initial(self.root.example, grouper=grouper_example)
679705
if hasattr(grouper_example, 'iloc'):
680706
grouper_example = grouper_example.iloc[:0]
681-
elif isinstance(grouper_example, (np.ndarray, pd.Index)):
707+
elif isinstance(grouper_example, np.ndarray) or is_index_like(grouper_example):
682708
grouper_example = grouper_example[:0]
683709
_, example = agg.on_new(state,
684710
self.root.example.iloc[:0],
@@ -698,8 +724,9 @@ def _accumulate(self, Agg, **kwargs):
698724
diff=diff,
699725
window=window)
700726

701-
for typ, s_type in _stream_types[stream_type]:
702-
if isinstance(example, typ):
727+
for fn, s_type in _stream_types[stream_type]:
728+
"""Function checks if example is of a specific frame type"""
729+
if fn(example):
703730
return s_type(outstream, example)
704731
return Streaming(outstream, example, stream_type=stream_type)
705732

@@ -712,7 +739,7 @@ def _random_df(tup):
712739
df = pd.DataFrame({'x': np.random.random(len(index)),
713740
'y': np.random.poisson(size=len(index)),
714741
'z': np.random.normal(0, 1, size=len(index))},
715-
index=index)
742+
index=index)
716743
return df
717744

718745

@@ -737,6 +764,7 @@ class Random(DataFrame):
737764
-------
738765
>>> source = Random(freq='100ms', interval='1s') # doctest: +SKIP
739766
"""
767+
740768
def __init__(self, freq='100ms', interval='500ms', dask=False):
741769
if dask:
742770
from streamz.dask import DaskStream
@@ -775,8 +803,8 @@ def _cb(interval, freq, source, continue_):
775803
last = now
776804

777805

778-
_stream_types['streaming'].append((pd.DataFrame, DataFrame))
779-
_stream_types['streaming'].append((pd.Index, Index))
780-
_stream_types['streaming'].append((pd.Series, Series))
781-
_stream_types['updating'].append((pd.DataFrame, DataFrames))
782-
_stream_types['updating'].append((pd.Series, Seriess))
806+
_stream_types['streaming'].append((is_dataframe_like, DataFrame))
807+
_stream_types['streaming'].append((is_index_like, Index))
808+
_stream_types['streaming'].append((is_series_like, Series))
809+
_stream_types['updating'].append((is_dataframe_like, DataFrames))
810+
_stream_types['updating'].append((is_series_like, Seriess))

0 commit comments

Comments
 (0)