Skip to content

Commit e248048

Browse files
authored
Feature: Early Window Expiration with Triggers (#1044)
1 parent 812cd08 commit e248048

File tree

11 files changed

+984
-20
lines changed

11 files changed

+984
-20
lines changed

docs/windowing.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,76 @@ if __name__ == '__main__':
593593
```
594594

595595

596+
### Early window expiration with triggers
597+
!!! info New in v3.24.0
598+
599+
To expire windows before their natural expiration time based on custom conditions, you can pass `before_update` or `after_update` callbacks to `.tumbling_window()` and `.hopping_window()` methods.
600+
601+
This is useful when you want to emit results as soon as certain conditions are met, rather than waiting for the window to close naturally.
602+
603+
**How it works**:
604+
605+
- The `before_update` callback is invoked before the window aggregation is updated with a new value.
606+
- The `after_update` callback is invoked after the window aggregation has been updated with a new value.
607+
- Both callbacks receive: `aggregated` (current or updated aggregated value), `value` (incoming value), `key`, `timestamp`, and `headers`.
608+
- For `collect()` operations without aggregation, `aggregated` contains the list of collected values.
609+
- If either callback returns `True`, the window is immediately expired and emitted downstream.
610+
- The window metadata is deleted from state, but collected values (if using `.collect()`) remain until natural expiration.
611+
- This means a triggered window can be "resurrected" if new data arrives within its time range - a new window will be created with the previously collected values still present.
612+
613+
**Example with after_update**:
614+
615+
```python
616+
from typing import Any
617+
618+
from datetime import timedelta
619+
from quixstreams import Application
620+
621+
app = Application(...)
622+
sdf = app.dataframe(...)
623+
624+
625+
def trigger_on_threshold(
626+
aggregated: int, value: Any, key: Any, timestamp: int, headers: Any
627+
) -> bool:
628+
"""
629+
Expire the window early when the sum exceeds 1000.
630+
"""
631+
return aggregated > 1000
632+
633+
634+
# Define a 1-hour tumbling window with early expiration trigger
635+
sdf = (
636+
sdf.tumbling_window(timedelta(hours=1), after_update=trigger_on_threshold)
637+
.sum()
638+
.final()
639+
)
640+
641+
# Start the application
642+
if __name__ == '__main__':
643+
app.run()
644+
645+
```
646+
647+
**Example with before_update**:
648+
649+
```python
650+
def trigger_before_large_value(
651+
aggregated: int, value: Any, key: Any, timestamp: int, headers: Any
652+
) -> bool:
653+
"""
654+
Expire the window before adding a value if it would make the sum too large.
655+
"""
656+
return (aggregated + value) > 1000
657+
658+
659+
sdf = (
660+
sdf.tumbling_window(timedelta(hours=1), before_update=trigger_before_large_value)
661+
.sum()
662+
.final()
663+
)
664+
```
665+
596666

597667
## Emitting results
598668

quixstreams/dataframe/dataframe.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@
7272
TumblingCountWindowDefinition,
7373
TumblingTimeWindowDefinition,
7474
)
75-
from .windows.base import WindowOnLateCallback
75+
from .windows.base import (
76+
WindowAfterUpdateCallback,
77+
WindowBeforeUpdateCallback,
78+
WindowOnLateCallback,
79+
)
7680

7781
if typing.TYPE_CHECKING:
7882
from quixstreams.processing import ProcessingContext
@@ -1085,6 +1089,8 @@ def tumbling_window(
10851089
grace_ms: Union[int, timedelta] = 0,
10861090
name: Optional[str] = None,
10871091
on_late: Optional[WindowOnLateCallback] = None,
1092+
before_update: Optional[WindowBeforeUpdateCallback] = None,
1093+
after_update: Optional[WindowAfterUpdateCallback] = None,
10881094
) -> TumblingTimeWindowDefinition:
10891095
"""
10901096
Create a time-based tumbling window transformation on this StreamingDataFrame.
@@ -1151,6 +1157,20 @@ def tumbling_window(
11511157
(default behavior).
11521158
Otherwise, no message will be logged.
11531159
1160+
:param before_update: an optional callback to trigger early window expiration
1161+
before the window is updated.
1162+
The callback receives `aggregated` (current aggregated value or default/None),
1163+
`value`, `key`, `timestamp`, and `headers`.
1164+
If it returns `True`, the window will be expired immediately.
1165+
Default - `None`.
1166+
1167+
:param after_update: an optional callback to trigger early window expiration
1168+
after the window is updated.
1169+
The callback receives `aggregated` (updated aggregated value), `value`, `key`,
1170+
`timestamp`, and `headers`.
1171+
If it returns `True`, the window will be expired immediately.
1172+
Default - `None`.
1173+
11541174
:return: `TumblingTimeWindowDefinition` instance representing the tumbling window
11551175
configuration.
11561176
This object can be further configured with aggregation functions
@@ -1166,6 +1186,8 @@ def tumbling_window(
11661186
dataframe=self,
11671187
name=name,
11681188
on_late=on_late,
1189+
before_update=before_update,
1190+
after_update=after_update,
11691191
)
11701192

11711193
def tumbling_count_window(
@@ -1225,6 +1247,8 @@ def hopping_window(
12251247
grace_ms: Union[int, timedelta] = 0,
12261248
name: Optional[str] = None,
12271249
on_late: Optional[WindowOnLateCallback] = None,
1250+
before_update: Optional[WindowBeforeUpdateCallback] = None,
1251+
after_update: Optional[WindowAfterUpdateCallback] = None,
12281252
) -> HoppingTimeWindowDefinition:
12291253
"""
12301254
Create a time-based hopping window transformation on this StreamingDataFrame.
@@ -1302,6 +1326,20 @@ def hopping_window(
13021326
(default behavior).
13031327
Otherwise, no message will be logged.
13041328
1329+
:param before_update: an optional callback to trigger early window expiration
1330+
before the window is updated.
1331+
The callback receives `aggregated` (current aggregated value or default/None),
1332+
`value`, `key`, `timestamp`, and `headers`.
1333+
If it returns `True`, the window will be expired immediately.
1334+
Default - `None`.
1335+
1336+
:param after_update: an optional callback to trigger early window expiration
1337+
after the window is updated.
1338+
The callback receives `aggregated` (updated aggregated value), `value`, `key`,
1339+
`timestamp`, and `headers`.
1340+
If it returns `True`, the window will be expired immediately.
1341+
Default - `None`.
1342+
13051343
:return: `HoppingTimeWindowDefinition` instance representing the hopping
13061344
window configuration.
13071345
This object can be further configured with aggregation functions
@@ -1319,6 +1357,8 @@ def hopping_window(
13191357
dataframe=self,
13201358
name=name,
13211359
on_late=on_late,
1360+
before_update=before_update,
1361+
after_update=after_update,
13221362
)
13231363

13241364
def hopping_count_window(

quixstreams/dataframe/windows/base.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
WindowResult: TypeAlias = dict[str, Any]
3535
WindowKeyResult: TypeAlias = tuple[Any, WindowResult]
3636
Message: TypeAlias = tuple[WindowResult, Any, int, Any]
37+
WindowBeforeUpdateCallback: TypeAlias = Callable[[Any, Any, Any, int, Any], bool]
38+
WindowAfterUpdateCallback: TypeAlias = Callable[[Any, Any, Any, int, Any], bool]
3739

3840
WindowAggregateFunc = Callable[[Any, Any], Any]
3941

@@ -65,6 +67,7 @@ def process_window(
6567
value: Any,
6668
key: Any,
6769
timestamp_ms: int,
70+
headers: Any,
6871
transaction: WindowedPartitionTransaction,
6972
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
7073
pass
@@ -134,6 +137,7 @@ def window_callback(
134137
value=value,
135138
key=key,
136139
timestamp_ms=timestamp_ms,
140+
headers=_headers,
137141
transaction=transaction,
138142
)
139143
# Use window start timestamp as a new record timestamp
@@ -176,7 +180,11 @@ def window_callback(
176180
transaction: WindowedPartitionTransaction,
177181
) -> Iterable[Message]:
178182
updated_windows, expired_windows = self.process_window(
179-
value=value, key=key, timestamp_ms=timestamp_ms, transaction=transaction
183+
value=value,
184+
key=key,
185+
timestamp_ms=timestamp_ms,
186+
headers=_headers,
187+
transaction=transaction,
180188
)
181189

182190
# loop over the expired_windows generator to ensure the windows

quixstreams/dataframe/windows/count_based.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def process_window(
5858
value: Any,
5959
key: Any,
6060
timestamp_ms: int,
61+
headers: Any,
6162
transaction: WindowedPartitionTransaction[str, CountWindowsData],
6263
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
6364
"""

quixstreams/dataframe/windows/definitions.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
)
1616
from .base import (
1717
Window,
18+
WindowAfterUpdateCallback,
19+
WindowBeforeUpdateCallback,
1820
WindowOnLateCallback,
1921
)
2022
from .count_based import (
@@ -54,11 +56,15 @@ def __init__(
5456
name: Optional[str],
5557
dataframe: "StreamingDataFrame",
5658
on_late: Optional[WindowOnLateCallback] = None,
59+
before_update: Optional[WindowBeforeUpdateCallback] = None,
60+
after_update: Optional[WindowAfterUpdateCallback] = None,
5761
) -> None:
5862
super().__init__()
5963

6064
self._name = name
6165
self._on_late = on_late
66+
self._before_update = before_update
67+
self._after_update = after_update
6268
self._dataframe = dataframe
6369

6470
@abstractmethod
@@ -239,6 +245,8 @@ def __init__(
239245
name: Optional[str] = None,
240246
step_ms: Optional[int] = None,
241247
on_late: Optional[WindowOnLateCallback] = None,
248+
before_update: Optional[WindowBeforeUpdateCallback] = None,
249+
after_update: Optional[WindowAfterUpdateCallback] = None,
242250
):
243251
if not isinstance(duration_ms, int):
244252
raise TypeError("Window size must be an integer")
@@ -253,7 +261,7 @@ def __init__(
253261
f"got {step_ms}ms"
254262
)
255263

256-
super().__init__(name, dataframe, on_late)
264+
super().__init__(name, dataframe, on_late, before_update, after_update)
257265

258266
self._duration_ms = duration_ms
259267
self._grace_ms = grace_ms
@@ -281,6 +289,8 @@ def __init__(
281289
dataframe: "StreamingDataFrame",
282290
name: Optional[str] = None,
283291
on_late: Optional[WindowOnLateCallback] = None,
292+
before_update: Optional[WindowBeforeUpdateCallback] = None,
293+
after_update: Optional[WindowAfterUpdateCallback] = None,
284294
):
285295
super().__init__(
286296
duration_ms=duration_ms,
@@ -289,6 +299,8 @@ def __init__(
289299
name=name,
290300
step_ms=step_ms,
291301
on_late=on_late,
302+
before_update=before_update,
303+
after_update=after_update,
292304
)
293305

294306
def _get_name(self, func_name: Optional[str]) -> str:
@@ -320,6 +332,8 @@ def _create_window(
320332
aggregators=aggregators or {},
321333
collectors=collectors or {},
322334
on_late=self._on_late,
335+
before_update=self._before_update,
336+
after_update=self._after_update,
323337
)
324338

325339

@@ -331,13 +345,17 @@ def __init__(
331345
dataframe: "StreamingDataFrame",
332346
name: Optional[str] = None,
333347
on_late: Optional[WindowOnLateCallback] = None,
348+
before_update: Optional[WindowBeforeUpdateCallback] = None,
349+
after_update: Optional[WindowAfterUpdateCallback] = None,
334350
):
335351
super().__init__(
336352
duration_ms=duration_ms,
337353
grace_ms=grace_ms,
338354
dataframe=dataframe,
339355
name=name,
340356
on_late=on_late,
357+
before_update=before_update,
358+
after_update=after_update,
341359
)
342360

343361
def _get_name(self, func_name: Optional[str]) -> str:
@@ -368,6 +386,8 @@ def _create_window(
368386
aggregators=aggregators or {},
369387
collectors=collectors or {},
370388
on_late=self._on_late,
389+
before_update=self._before_update,
390+
after_update=self._after_update,
371391
)
372392

373393

@@ -379,13 +399,22 @@ def __init__(
379399
dataframe: "StreamingDataFrame",
380400
name: Optional[str] = None,
381401
on_late: Optional[WindowOnLateCallback] = None,
402+
before_update: Optional[WindowBeforeUpdateCallback] = None,
403+
after_update: Optional[WindowAfterUpdateCallback] = None,
382404
):
405+
if before_update is not None or after_update is not None:
406+
raise ValueError(
407+
"Sliding windows do not support trigger callbacks (before_update/after_update). "
408+
"Use tumbling or hopping windows instead."
409+
)
383410
super().__init__(
384411
duration_ms=duration_ms,
385412
grace_ms=grace_ms,
386413
dataframe=dataframe,
387414
name=name,
388415
on_late=on_late,
416+
before_update=before_update,
417+
after_update=after_update,
389418
)
390419

391420
def _get_name(self, func_name: Optional[str]) -> str:
@@ -417,6 +446,8 @@ def _create_window(
417446
aggregators=aggregators or {},
418447
collectors=collectors or {},
419448
on_late=self._on_late,
449+
before_update=self._before_update,
450+
after_update=self._after_update,
420451
)
421452

422453

quixstreams/dataframe/windows/sliding.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def process_window(
3535
value: Any,
3636
key: Any,
3737
timestamp_ms: int,
38+
headers: Any,
3839
transaction: WindowedPartitionTransaction,
3940
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
4041
"""

0 commit comments

Comments
 (0)