1010class StreamingDataFrame ()
1111```
1212
13- [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / dataframe / dataframe .py # L90 )
13+ [[VIEW SOURCE ]](https :// github .com / quixio / quix - streams / blob / main / quixstreams / dataframe / dataframe .py # L94 )
1414
1515`StreamingDataFrame` is the main object you will use for ETL work .
1616
@@ -73,7 +73,7 @@ sdf = sdf.to_topic(topic_obj)
7373def stream_id () -> str
7474```
7575
76- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L175 )
76+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L179 )
7777
7878An identifier of the data stream this StreamingDataFrame
7979manipulates in the application.
@@ -107,7 +107,7 @@ def apply(func: Union[
107107 metadata: bool = False ) -> " StreamingDataFrame"
108108```
109109
110- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L234 )
110+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L238 )
111111
112112Apply a function to transform the value and return a new value.
113113
@@ -165,7 +165,7 @@ def update(func: Union[
165165 metadata: bool = False ) -> " StreamingDataFrame"
166166```
167167
168- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L338 )
168+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L342 )
169169
170170Apply a function to mutate value in - place or to perform a side effect
171171
@@ -233,7 +233,7 @@ def filter(func: Union[
233233 metadata: bool = False ) -> " StreamingDataFrame"
234234```
235235
236- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L441 )
236+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L445 )
237237
238238Filter value using provided function.
239239
@@ -285,7 +285,7 @@ def group_by(key: Union[str, Callable[[Any], Any]],
285285 key_serializer: SerializerType = " json" ) -> " StreamingDataFrame"
286286```
287287
288- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L526 )
288+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L530 )
289289
290290" Groups" messages by re- keying them via the provided group_by operation
291291
@@ -350,7 +350,7 @@ a clone with this operation added (assign to keep its effect).
350350def contains(keys: Union[str , list[str ]]) -> StreamingSeries
351351```
352352
353- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L640 )
353+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L644 )
354354
355355Check if keys are present in the Row value.
356356
@@ -392,7 +392,7 @@ def to_topic(
392392 key: Optional[Callable[[Any], Any]] = None ) -> " StreamingDataFrame"
393393```
394394
395- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L684 )
395+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L688 )
396396
397397Produce current value to a topic. You can optionally specify a new key.
398398
@@ -463,7 +463,7 @@ def set_timestamp(
463463 func: Callable[[Any, Any, int , Any], int ]) -> " StreamingDataFrame"
464464```
465465
466- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L753 )
466+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L757 )
467467
468468Set a new timestamp based on the current message value and its metadata.
469469
@@ -516,7 +516,7 @@ def set_headers(
516516) -> " StreamingDataFrame"
517517```
518518
519- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L796 )
519+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L800 )
520520
521521Set new message headers based on the current message value and metadata.
522522
@@ -565,7 +565,7 @@ a new StreamingDataFrame instance
565565def print (pretty: bool = True , metadata: bool = False ) -> " StreamingDataFrame"
566566```
567567
568- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L847 )
568+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L851 )
569569
570570Print out the current message value (and optionally, the message metadata) to
571571
@@ -628,7 +628,7 @@ def print_table(
628628 int ]] = None ) -> " StreamingDataFrame"
629629```
630630
631- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L893 )
631+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L897 )
632632
633633Print a table with the most recent records.
634634
@@ -721,7 +721,7 @@ sdf.print_table(size=5, title="Live Records", slowdown=1)
721721def compose(sink: Optional[VoidExecutor] = None ) -> dict[str , VoidExecutor]
722722```
723723
724- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1009 )
724+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1013 )
725725
726726Compose all functions of this StreamingDataFrame into one big closure.
727727
@@ -775,7 +775,7 @@ def test(value: Any,
775775 topic: Optional[Topic] = None ) -> List[Any]
776776```
777777
778- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1043 )
778+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1047 )
779779
780780A shorthand to test `StreamingDataFrame` with provided value
781781
@@ -811,11 +811,13 @@ def tumbling_window(
811811 duration_ms: Union[int , timedelta],
812812 grace_ms: Union[int , timedelta] = 0 ,
813813 name: Optional[str ] = None ,
814- on_late: Optional[WindowOnLateCallback] = None
814+ on_late: Optional[WindowOnLateCallback] = None ,
815+ before_update: Optional[WindowBeforeUpdateCallback] = None ,
816+ after_update: Optional[WindowAfterUpdateCallback] = None
815817) -> TumblingTimeWindowDefinition
816818```
817819
818- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1082 )
820+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1086 )
819821
820822Create a time- based tumbling window transformation on this StreamingDataFrame.
821823
@@ -885,6 +887,18 @@ sdf = (
885887 If the callback returns `True ` , the message about a late record will be logged
886888 (default behavior).
887889 Otherwise, no message will be logged.
890+ - `before_update` : an optional callback to trigger early window expiration
891+ before the window is updated.
892+ The callback receives `aggregated` (current aggregated value or default/ None ),
893+ `value` , `key` , `timestamp` , and `headers` .
894+ If it returns `True ` , the window will be expired immediately.
895+ Default - `None ` .
896+ - `after_update` : an optional callback to trigger early window expiration
897+ after the window is updated.
898+ The callback receives `aggregated` (updated aggregated value), `value` , `key` ,
899+ `timestamp` , and `headers` .
900+ If it returns `True ` , the window will be expired immediately.
901+ Default - `None ` .
888902
889903
890904< br>
@@ -907,7 +921,7 @@ def tumbling_count_window(
907921 name: Optional[str ] = None ) -> TumblingCountWindowDefinition
908922```
909923
910- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1171 )
924+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1193 )
911925
912926Create a count- based tumbling window transformation on this StreamingDataFrame.
913927
@@ -976,11 +990,13 @@ def hopping_window(
976990 step_ms: Union[int , timedelta],
977991 grace_ms: Union[int , timedelta] = 0 ,
978992 name: Optional[str ] = None ,
979- on_late: Optional[WindowOnLateCallback] = None
993+ on_late: Optional[WindowOnLateCallback] = None ,
994+ before_update: Optional[WindowBeforeUpdateCallback] = None ,
995+ after_update: Optional[WindowAfterUpdateCallback] = None
980996) -> HoppingTimeWindowDefinition
981997```
982998
983- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1221 )
999+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1243 )
9841000
9851001Create a time- based hopping window transformation on this StreamingDataFrame.
9861002
@@ -1060,6 +1076,18 @@ sdf = (
10601076 If the callback returns `True ` , the message about a late record will be logged
10611077 (default behavior).
10621078 Otherwise, no message will be logged.
1079+ - `before_update` : an optional callback to trigger early window expiration
1080+ before the window is updated.
1081+ The callback receives `aggregated` (current aggregated value or default/ None ),
1082+ `value` , `key` , `timestamp` , and `headers` .
1083+ If it returns `True ` , the window will be expired immediately.
1084+ Default - `None ` .
1085+ - `after_update` : an optional callback to trigger early window expiration
1086+ after the window is updated.
1087+ The callback receives `aggregated` (updated aggregated value), `value` , `key` ,
1088+ `timestamp` , and `headers` .
1089+ If it returns `True ` , the window will be expired immediately.
1090+ Default - `None ` .
10631091
10641092
10651093< br>
@@ -1083,7 +1111,7 @@ def hopping_count_window(
10831111 name: Optional[str ] = None ) -> HoppingCountWindowDefinition
10841112```
10851113
1086- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1324 )
1114+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1364 )
10871115
10881116Create a count- based hopping window transformation on this StreamingDataFrame.
10891117
@@ -1161,7 +1189,7 @@ def sliding_window(
11611189) -> SlidingTimeWindowDefinition
11621190```
11631191
1164- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1381 )
1192+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1421 )
11651193
11661194Create a time- based sliding window transformation on this StreamingDataFrame.
11671195
@@ -1259,7 +1287,7 @@ def sliding_count_window(
12591287 name: Optional[str ] = None ) -> SlidingCountWindowDefinition
12601288```
12611289
1262- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1476 )
1290+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1516 )
12631291
12641292Create a count- based sliding window transformation on this StreamingDataFrame.
12651293
@@ -1329,7 +1357,7 @@ sdf = (
13291357def fill(* columns: str , ** mapping: Any) -> " StreamingDataFrame"
13301358```
13311359
1332- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1529 )
1360+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1569 )
13331361
13341362Fill missing values in the message value with a constant value.
13351363
@@ -1386,7 +1414,7 @@ def drop(columns: Union[str, List[str]],
13861414 errors: Literal[" ignore" , " raise" ] = " raise" ) -> " StreamingDataFrame"
13871415```
13881416
1389- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1581 )
1417+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1621 )
13901418
13911419Drop column(s) from the message value (value must support `del ` , like a dict ).
13921420
@@ -1430,7 +1458,7 @@ a new StreamingDataFrame instance
14301458def sink(sink: BaseSink)
14311459```
14321460
1433- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1625 )
1461+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1665 )
14341462
14351463Sink the processed data to the specified destination.
14361464
@@ -1458,7 +1486,7 @@ operations, but branches can still be generated from its originating SDF.
14581486def concat(other: " StreamingDataFrame" ) -> " StreamingDataFrame"
14591487```
14601488
1461- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1663 )
1489+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1703 )
14621490
14631491Concatenate two StreamingDataFrames together and return a new one.
14641492
@@ -1499,7 +1527,7 @@ def join_asof(right: "StreamingDataFrame",
14991527 name: Optional[str ] = None ) -> " StreamingDataFrame"
15001528```
15011529
1502- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1699 )
1530+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1739 )
15031531
15041532Join the left dataframe with the records of the right dataframe with
15051533
@@ -1582,7 +1610,7 @@ def join_interval(
15821610 forward_ms: Union[int , timedelta] = 0 ) -> " StreamingDataFrame"
15831611```
15841612
1585- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1775 )
1613+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1815 )
15861614
15871615Join the left dataframe with records from the right dataframe that fall within
15881616
@@ -1685,7 +1713,7 @@ def join_lookup(
16851713) -> " StreamingDataFrame"
16861714```
16871715
1688- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1880 )
1716+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1920 )
16891717
16901718Note: This is an experimental feature, and its API is likely to change in the future.
16911719
@@ -1746,7 +1774,7 @@ sdf = sdf.join_lookup(lookup, fields)
17461774def register_store(store_type: Optional[StoreTypes] = None ) -> None
17471775```
17481776
1749- [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L1969 )
1777+ [[VIEW SOURCE ]](https:// github.com/ quixio/ quix- streams/ blob/ main/ quixstreams/ dataframe/ dataframe.py# L2009 )
17501778
17511779Register the default store for the current stream_id in StateStoreManager.
17521780
0 commit comments