Skip to content

Commit 23345cf

Browse files
committed
fix(rolling.skew): handle outliers in window
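This change updates the mean with an online (incremental) update instead of pre-centering the values. Additionally, it checks for possible catastrophic cancellation, indicated by large changes in the 3rd central moment; when such cancellation is detected, the window is recomputed from scratch.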
1 parent 7401a36 commit 23345cf

File tree

2 files changed

+85
-124
lines changed

2 files changed

+85
-124
lines changed

pandas/_libs/window/aggregations.pyx

Lines changed: 76 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# cython: boundscheck=False, wraparound=False, cdivision=True
22

33
from libc.math cimport (
4+
fabs,
45
round,
56
signbit,
67
sqrt,
@@ -482,196 +483,149 @@ def roll_var(const float64_t[:] values, ndarray[int64_t] start,
482483

483484

484485
cdef float64_t calc_skew(int64_t minp, int64_t nobs,
485-
float64_t x, float64_t xx, float64_t xxx,
486-
int64_t num_consecutive_same_value
486+
float64_t mean, float64_t m2, float64_t m3,
487487
) noexcept nogil:
488488
cdef:
489489
float64_t result, dnobs
490-
float64_t A, B, C, R
490+
float64_t moments_ratio, correction
491491

492492
if nobs >= minp:
493493
dnobs = <float64_t>nobs
494-
A = x / dnobs
495-
B = xx / dnobs - A * A
496-
C = xxx / dnobs - A * A * A - 3 * A * B
497494

498495
if nobs < 3:
499496
result = NaN
500-
# GH 42064 46431
501-
# uniform case, force result to be 0
502-
elif num_consecutive_same_value >= nobs:
503-
result = 0.0
504-
# #18044: with uniform distribution, floating issue will
505-
# cause B != 0. and cause the result is a very
497+
# #18044: with degenerate distribution, floating issue will
498+
# cause m2 != 0 and make the result a very
506499
# large number.
507500
#
508501
# in core/nanops.py nanskew/nankurt call the function
509502
# _zero_out_fperr(m2) to fix floating error.
510503
# if the variance is less than 1e-14, it could be
511504
# treated as zero; here we follow the original
512-
# skew/kurt behaviour to check B <= 1e-14
513-
elif B <= 1e-14:
505+
# skew/kurt behaviour to check m2 < n * (eps * eps * mean * mean)
506+
elif m2 < dnobs * (1e-28 * mean * mean if fabs(mean) > 1e-14 else 1e-14):
514507
result = NaN
515508
else:
516-
R = sqrt(B)
517-
result = ((sqrt(dnobs * (dnobs - 1.)) * C) /
518-
((dnobs - 2) * R * R * R))
509+
moments_ratio = m3 / (m2 * sqrt(m2))
510+
correction = dnobs * sqrt((dnobs - 1)) / (dnobs - 2)
511+
result = moments_ratio * correction
519512
else:
520513
result = NaN
521514

522515
return result
523516

524517

525518
cdef void add_skew(float64_t val, int64_t *nobs,
526-
float64_t *x, float64_t *xx,
527-
float64_t *xxx,
528-
float64_t *compensation_x,
529-
float64_t *compensation_xx,
530-
float64_t *compensation_xxx,
531-
int64_t *num_consecutive_same_value,
532-
float64_t *prev_value,
519+
float64_t *mean, float64_t *m2,
520+
float64_t *m3,
521+
bint *numerically_unstable
533522
) noexcept nogil:
534523
""" add a value from the skew calc """
535524
cdef:
536-
float64_t y, t
525+
float64_t n, delta, delta_n, term1, m3_update, new_m3
537526

538527
# Not NaN
539528
if val == val:
540-
nobs[0] = nobs[0] + 1
529+
nobs[0] += 1
530+
n = <float64_t>(nobs[0])
531+
delta = val - mean[0]
532+
delta_n = delta / n
533+
term1 = delta * delta_n * (n - 1.0)
541534

542-
y = val - compensation_x[0]
543-
t = x[0] + y
544-
compensation_x[0] = t - x[0] - y
545-
x[0] = t
546-
y = val * val - compensation_xx[0]
547-
t = xx[0] + y
548-
compensation_xx[0] = t - xx[0] - y
549-
xx[0] = t
550-
y = val * val * val - compensation_xxx[0]
551-
t = xxx[0] + y
552-
compensation_xxx[0] = t - xxx[0] - y
553-
xxx[0] = t
535+
m3_update = delta_n * (term1 * (n - 2.0) - 3.0 * m2[0])
536+
new_m3 = m3[0] + m3_update
537+
if fabs(m3_update) + fabs(m3[0]) > 1e10 * fabs(new_m3):
538+
# possible catastrophic cancellation
539+
numerically_unstable[0] = True
554540

555-
# GH#42064, record num of same values to remove floating point artifacts
556-
if val == prev_value[0]:
557-
num_consecutive_same_value[0] += 1
558-
else:
559-
# reset to 1 (include current value itself)
560-
num_consecutive_same_value[0] = 1
561-
prev_value[0] = val
541+
m3[0] = new_m3
542+
m2[0] += term1
543+
mean[0] += delta_n
562544

563545

564546
cdef void remove_skew(float64_t val, int64_t *nobs,
565-
float64_t *x, float64_t *xx,
566-
float64_t *xxx,
567-
float64_t *compensation_x,
568-
float64_t *compensation_xx,
569-
float64_t *compensation_xxx) noexcept nogil:
547+
float64_t *mean, float64_t *m2,
548+
float64_t *m3,
549+
bint *numerically_unstable) noexcept nogil:
570550
""" remove a value from the skew calc """
571551
cdef:
572-
float64_t y, t
552+
float64_t n, delta, delta_n, term1, m3_update, new_m3
573553

574554
# Not NaN
575555
if val == val:
576-
nobs[0] = nobs[0] - 1
556+
nobs[0] -= 1
557+
n = <float64_t>(nobs[0])
558+
delta = val - mean[0]
559+
delta_n = delta / n
560+
term1 = delta_n * delta * (n + 1.0)
577561

578-
y = - val - compensation_x[0]
579-
t = x[0] + y
580-
compensation_x[0] = t - x[0] - y
581-
x[0] = t
582-
y = - val * val - compensation_xx[0]
583-
t = xx[0] + y
584-
compensation_xx[0] = t - xx[0] - y
585-
xx[0] = t
586-
y = - val * val * val - compensation_xxx[0]
587-
t = xxx[0] + y
588-
compensation_xxx[0] = t - xxx[0] - y
589-
xxx[0] = t
562+
m3_update = delta_n * (term1 * (n + 2.0) - 3.0 * m2[0])
563+
new_m3 = m3[0] - m3_update
564+
565+
if fabs(m3_update) + fabs(m3[0]) > 1e10 * fabs(new_m3):
566+
# possible catastrophic cancellation
567+
numerically_unstable[0] = True
568+
569+
m3[0] = new_m3
570+
m2[0] -= term1
571+
mean[0] -= delta_n
590572

591573

592574
def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start,
593575
ndarray[int64_t] end, int64_t minp) -> np.ndarray:
594576
cdef:
595577
Py_ssize_t i, j
596-
float64_t val, min_val, mean_val, sum_val = 0
597-
float64_t compensation_xxx_add, compensation_xxx_remove
598-
float64_t compensation_xx_add, compensation_xx_remove
599-
float64_t compensation_x_add, compensation_x_remove
600-
float64_t x, xx, xxx
601-
float64_t prev_value
602-
int64_t nobs = 0, N = len(start), V = len(values), nobs_mean = 0
603-
int64_t s, e, num_consecutive_same_value
604-
ndarray[float64_t] output, values_copy
605-
bint is_monotonic_increasing_bounds
578+
float64_t val
579+
float64_t mean, m2, m3
580+
int64_t nobs = 0, N = len(start)
581+
int64_t s, e
582+
ndarray[float64_t] output
583+
bint requires_recompute, numerically_unstable = False
606584

607585
minp = max(minp, 3)
608586
is_monotonic_increasing_bounds = is_monotonic_increasing_start_end_bounds(
609587
start, end
610588
)
611589
output = np.empty(N, dtype=np.float64)
612-
min_val = np.nanmin(values)
613-
values_copy = np.copy(values)
614590

615591
with nogil:
616-
for i in range(0, V):
617-
val = values_copy[i]
618-
if val == val:
619-
nobs_mean += 1
620-
sum_val += val
621-
mean_val = sum_val / nobs_mean
622-
# Other cases would lead to imprecision for smallest values
623-
if min_val - mean_val > -1e5:
624-
mean_val = round(mean_val)
625-
for i in range(0, V):
626-
values_copy[i] = values_copy[i] - mean_val
627-
628592
for i in range(0, N):
629-
630593
s = start[i]
631594
e = end[i]
632595

633-
# Over the first window, observations can only be added
634-
# never removed
635-
if i == 0 or not is_monotonic_increasing_bounds or s >= end[i - 1]:
636-
637-
prev_value = values[s]
638-
num_consecutive_same_value = 0
639-
640-
compensation_xxx_add = compensation_xxx_remove = 0
641-
compensation_xx_add = compensation_xx_remove = 0
642-
compensation_x_add = compensation_x_remove = 0
643-
x = xx = xxx = 0
644-
nobs = 0
645-
for j in range(s, e):
646-
val = values_copy[j]
647-
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
648-
&compensation_xx_add, &compensation_xxx_add,
649-
&num_consecutive_same_value, &prev_value)
650-
651-
else:
596+
requires_recompute = (
597+
i == 0
598+
or not is_monotonic_increasing_bounds
599+
or s >= end[i - 1]
600+
or numerically_unstable
601+
)
652602

653-
# After the first window, observations can both be added
654-
# and removed
655-
# calculate deletes
603+
if not requires_recompute:
656604
for j in range(start[i - 1], s):
657-
val = values_copy[j]
658-
remove_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_remove,
659-
&compensation_xx_remove, &compensation_xxx_remove)
605+
val = values[j]
606+
remove_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)
660607

661608
# calculate adds
662609
for j in range(end[i - 1], e):
663-
val = values_copy[j]
664-
add_skew(val, &nobs, &x, &xx, &xxx, &compensation_x_add,
665-
&compensation_xx_add, &compensation_xxx_add,
666-
&num_consecutive_same_value, &prev_value)
610+
val = values[j]
611+
add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)
667612

668-
output[i] = calc_skew(minp, nobs, x, xx, xxx, num_consecutive_same_value)
613+
if requires_recompute or numerically_unstable:
614+
numerically_unstable = False
615+
mean = m2 = m3 = 0.0
616+
nobs = 0
617+
618+
for j in range(s, e):
619+
val = values[j]
620+
add_skew(val, &nobs, &mean, &m2, &m3, &numerically_unstable)
621+
622+
output[i] = calc_skew(minp, nobs, mean, m2, m3)
669623

670624
if not is_monotonic_increasing_bounds:
671625
nobs = 0
672-
x = 0.0
673-
xx = 0.0
674-
xxx = 0.0
626+
mean = 0.0
627+
m2 = 0.0
628+
m3 = 0.0
675629

676630
return output
677631

pandas/tests/window/test_rolling.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,9 @@ def test_rolling_decreasing_indices(method):
11721172
increasing = getattr(df.rolling(window=5), method)()
11731173
decreasing = getattr(df_reverse.rolling(window=5), method)()
11741174

1175-
assert np.abs(decreasing.values[::-1][:-4] - increasing.values[4:]).max() < 1e-12
1175+
tm.assert_almost_equal(
1176+
decreasing.values[::-1][:-4], increasing.values[4:], atol=1e-12
1177+
)
11761178

11771179

11781180
@pytest.mark.parametrize(
@@ -1438,13 +1440,18 @@ def test_rolling_skew_kurt_numerical_stability(method):
14381440

14391441

14401442
@pytest.mark.parametrize(
1441-
"method, data, values",
1443+
("method", "data", "values"),
14421444
[
14431445
(
14441446
"skew",
14451447
[3000000, 1, 1, 2, 3, 4, 999],
14461448
[np.nan] * 3 + [2.0, 0.854563, 0.0, 1.999984],
14471449
),
1450+
(
1451+
"skew",
1452+
[1e6, -1e6, 1, 2, 3, 4, 5, 6],
1453+
[np.nan] * 3 + [-5.51135192e-06, -2.0, 0.0, 0.0, 0.0],
1454+
),
14481455
(
14491456
"kurt",
14501457
[3000000, 1, 1, 2, 3, 4, 999],

0 commit comments

Comments
 (0)