Skip to content

Commit d60a6e4

Browse files
authored
Merge pull request #268 from chinmaychandak/master
Add groupby aggregate tests, along with a few additions for cudf integration.
2 parents a00b6e3 + e691394 commit d60a6e4

File tree

2 files changed

+339
-171
lines changed

2 files changed

+339
-171
lines changed

streamz/dataframe/aggregations.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from numbers import Number
55

66
import numpy as np
7+
import pandas as pd
78
from .utils import is_series_like, is_index_like, get_dataframe_package
89

910

@@ -202,10 +203,10 @@ def diff_loc(dfs, new, window=None):
202203
"""
203204
dfs = deque(dfs)
204205
dfs.append(new)
205-
mx = max(df.index.max() for df in dfs)
206+
mx = pd.Timestamp(max(df.index.max() for df in dfs))
206207
mn = mx - window
207208
old = []
208-
while dfs[0].index.min() < mn:
209+
while pd.Timestamp(dfs[0].index.min()) < mn:
209210
o = dfs[0].loc[:mn]
210211
old.append(o) # TODO: avoid copy if fully lost
211212
dfs[0] = dfs[0].iloc[len(o):]
@@ -347,8 +348,6 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
347348
for o, og in zip(old, old_groupers):
348349
if 'groupers' in acc:
349350
assert len(o) == len(og)
350-
if hasattr(og, 'index'):
351-
assert (o.index == og.index).all()
352351
if len(o):
353352
state, result = agg.on_old(state, o, grouper=og)
354353
size_state, _ = size.on_old(size_state, o, grouper=og)
@@ -407,11 +406,13 @@ class GroupbySum(GroupbyAggregation):
407406
def on_new(self, acc, new, grouper=None):
408407
g = self.grouped(new, grouper=grouper)
409408
result = acc.add(g.sum(), fill_value=0)
409+
result.index.name = acc.index.name
410410
return result, result
411411

412412
def on_old(self, acc, old, grouper=None):
413413
g = self.grouped(old, grouper=grouper)
414414
result = acc.sub(g.sum(), fill_value=0)
415+
result.index.name = acc.index.name
415416
return result, result
416417

417418
def initial(self, new, grouper=None):
@@ -427,12 +428,14 @@ def on_new(self, acc, new, grouper=None):
427428
g = self.grouped(new, grouper=grouper)
428429
result = acc.add(g.count(), fill_value=0)
429430
result = result.astype(int)
431+
result.index.name = acc.index.name
430432
return result, result
431433

432434
def on_old(self, acc, old, grouper=None):
433435
g = self.grouped(old, grouper=grouper)
434436
result = acc.sub(g.count(), fill_value=0)
435437
result = result.astype(int)
438+
result.index.name = acc.index.name
436439
return result, result
437440

438441
def initial(self, new, grouper=None):
@@ -448,12 +451,14 @@ def on_new(self, acc, new, grouper=None):
448451
g = self.grouped(new, grouper=grouper)
449452
result = acc.add(g.size(), fill_value=0)
450453
result = result.astype(int)
454+
result.index.name = acc.index.name
451455
return result, result
452456

453457
def on_old(self, acc, old, grouper=None):
454458
g = self.grouped(old, grouper=grouper)
455459
result = acc.sub(g.size(), fill_value=0)
456460
result = result.astype(int)
461+
result.index.name = acc.index.name
457462
return result, result
458463

459464
def initial(self, new, grouper=None):
@@ -467,10 +472,12 @@ def initial(self, new, grouper=None):
467472
class ValueCounts(Aggregation):
468473
def on_new(self, acc, new, grouper=None):
469474
result = acc.add(new.value_counts(), fill_value=0).astype(int)
475+
result.index.name = acc.index.name
470476
return result, result
471477

472478
def on_old(self, acc, new, grouper=None):
473479
result = acc.sub(new.value_counts(), fill_value=0).astype(int)
480+
result.index.name = acc.index.name
474481
return result, result
475482

476483
def initial(self, new, grouper=None):
@@ -483,15 +490,17 @@ def on_new(self, acc, new, grouper=None):
483490
g = self.grouped(new, grouper=grouper)
484491
totals = totals.add(g.sum(), fill_value=0)
485492
counts = counts.add(g.count(), fill_value=0)
486-
493+
totals.index.name = acc[0].index.name
494+
counts.index.name = acc[1].index.name
487495
return (totals, counts), totals / counts
488496

489497
def on_old(self, acc, old, grouper=None):
490498
totals, counts = acc
491499
g = self.grouped(old, grouper=grouper)
492500
totals = totals.sub(g.sum(), fill_value=0)
493501
counts = counts.sub(g.count(), fill_value=0)
494-
502+
totals.index.name = acc[0].index.name
503+
counts.index.name = acc[1].index.name
495504
return (totals, counts), totals / counts
496505

497506
def initial(self, new, grouper=None):

0 commit comments

Comments
 (0)