@@ -91,7 +91,7 @@ def test_multiaggregation(
9191 key ,
9292 {
9393 "start" : 1000 ,
94- "end" : 11000 , # 1000 + 10000 timeout
94+ "end" : 1000 , # timestamp of last event
9595 "count" : 1 ,
9696 "sum" : 1 ,
9797 "mean" : 1.0 ,
@@ -112,7 +112,7 @@ def test_multiaggregation(
112112 key ,
113113 {
114114 "start" : 1000 ,
115- "end" : 15000 , # 5000 + 10000 timeout
115+ "end" : 5000 , # timestamp of last event
116116 "count" : 2 ,
117117 "sum" : 5 ,
118118 "mean" : 2.5 ,
@@ -132,7 +132,7 @@ def test_multiaggregation(
132132 key ,
133133 {
134134 "start" : 1000 ,
135- "end" : 15000 ,
135+ "end" : 5000 , # timestamp of last event
136136 "count" : 2 ,
137137 "sum" : 5 ,
138138 "mean" : 2.5 ,
@@ -147,7 +147,7 @@ def test_multiaggregation(
147147 key ,
148148 {
149149 "start" : 26000 ,
150- "end" : 36000 , # 26000 + 10000 timeout
150+ "end" : 26000 , # timestamp of last event
151151 "count" : 1 ,
152152 "sum" : 2 ,
153153 "mean" : 2.0 ,
@@ -182,7 +182,7 @@ def test_sessionwindow_count(
182182 assert len (updated ) == 1
183183 assert updated [0 ][1 ]["value" ] == 2
184184 assert updated [0 ][1 ]["start" ] == 1000
185- assert updated [0 ][1 ]["end" ] == 15000 # 5000 + 10000
185+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
186186 assert not expired
187187
188188 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -207,7 +207,7 @@ def test_sessionwindow_sum(
207207 assert len (updated ) == 1
208208 assert updated [0 ][1 ]["value" ] == 5
209209 assert updated [0 ][1 ]["start" ] == 1000
210- assert updated [0 ][1 ]["end" ] == 15000
210+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
211211 assert not expired
212212
213213 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -232,7 +232,7 @@ def test_sessionwindow_mean(
232232 assert len (updated ) == 1
233233 assert updated [0 ][1 ]["value" ] == 3.0
234234 assert updated [0 ][1 ]["start" ] == 1000
235- assert updated [0 ][1 ]["end" ] == 15000
235+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
236236 assert not expired
237237
238238 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -260,7 +260,7 @@ def test_sessionwindow_reduce(
260260 assert len (updated ) == 1
261261 assert updated [0 ][1 ]["value" ] == [2 , 3 ]
262262 assert updated [0 ][1 ]["start" ] == 1000
263- assert updated [0 ][1 ]["end" ] == 15000
263+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
264264 assert not expired
265265
266266 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -285,7 +285,7 @@ def test_sessionwindow_max(
285285 assert len (updated ) == 1
286286 assert updated [0 ][1 ]["value" ] == 5
287287 assert updated [0 ][1 ]["start" ] == 1000
288- assert updated [0 ][1 ]["end" ] == 15000
288+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
289289 assert not expired
290290
291291 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -310,7 +310,7 @@ def test_sessionwindow_min(
310310 assert len (updated ) == 1
311311 assert updated [0 ][1 ]["value" ] == 2
312312 assert updated [0 ][1 ]["start" ] == 1000
313- assert updated [0 ][1 ]["end" ] == 15000
313+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
314314 assert not expired
315315
316316 @pytest .mark .parametrize ("expiration" , ("key" , "partition" ))
@@ -336,7 +336,7 @@ def test_sessionwindow_collect(
336336 window , value = 4 , key = key , transaction = tx , timestamp_ms = 25000
337337 )
338338 assert not updated
339- assert expired == [(key , {"start" : 1000 , "end" : 18000 , "value" : [1 , 2 , 3 ]})]
339+ assert expired == [(key , {"start" : 1000 , "end" : 8000 , "value" : [1 , 2 , 3 ]})]
340340
341341 @pytest .mark .parametrize (
342342 "timeout, grace, name" ,
@@ -391,7 +391,7 @@ def test_session_window_process_timeout_behavior(
391391 assert len (updated ) == 1
392392 assert updated [0 ][1 ]["value" ] == 1
393393 assert updated [0 ][1 ]["start" ] == 1000
394- assert updated [0 ][1 ]["end" ] == 6000 # 1000 + 5000
394+ assert updated [0 ][1 ]["end" ] == 1000 # timestamp of last event
395395 assert not expired
396396
397397 # Add to session 1 (within timeout)
@@ -401,7 +401,7 @@ def test_session_window_process_timeout_behavior(
401401 assert len (updated ) == 1
402402 assert updated [0 ][1 ]["value" ] == 3
403403 assert updated [0 ][1 ]["start" ] == 1000
404- assert updated [0 ][1 ]["end" ] == 9000 # 4000 + 5000
404+ assert updated [0 ][1 ]["end" ] == 4000 # timestamp of last event
405405 assert not expired
406406
407407 # Start session 2 (outside timeout) - should expire session 1
@@ -411,12 +411,12 @@ def test_session_window_process_timeout_behavior(
411411 assert len (updated ) == 1
412412 assert updated [0 ][1 ]["value" ] == 5
413413 assert updated [0 ][1 ]["start" ] == 15000
414- assert updated [0 ][1 ]["end" ] == 20000 # 15000 + 5000
414+ assert updated [0 ][1 ]["end" ] == 15000 # timestamp of last event
415415
416416 assert len (expired ) == 1
417417 assert expired [0 ][1 ]["value" ] == 3
418418 assert expired [0 ][1 ]["start" ] == 1000
419- assert expired [0 ][1 ]["end" ] == 9000
419+ assert expired [0 ][1 ]["end" ] == 4000 # timestamp of last event
420420
421421 def test_session_window_grace_period (
422422 self , session_window_definition_factory , state_manager
@@ -670,7 +670,7 @@ def test_session_window_merge_sessions(
670670 )
671671 assert len (updated ) == 1
672672 assert updated [0 ][1 ]["start" ] == 1000
673- assert updated [0 ][1 ]["end" ] == 11000 # 1000 + 10000
673+ assert updated [0 ][1 ]["end" ] == 1000 # timestamp of last event
674674 assert updated [0 ][1 ]["value" ] == 1
675675 assert not expired
676676
@@ -683,12 +683,12 @@ def test_session_window_merge_sessions(
683683 # First session should now be expired
684684 assert len (expired ) == 1
685685 assert expired [0 ][1 ]["start" ] == 1000
686- assert expired [0 ][1 ]["end" ] == 11000
686+ assert expired [0 ][1 ]["end" ] == 1000 # timestamp of last event
687687 assert expired [0 ][1 ]["value" ] == 1
688688
689689 assert len (updated ) == 1
690690 assert updated [0 ][1 ]["start" ] == 20000
691- assert updated [0 ][1 ]["end" ] == 30000 # 20000 + 10000
691+ assert updated [0 ][1 ]["end" ] == 20000 # timestamp of last event
692692 assert updated [0 ][1 ]["value" ] == 10
693693
694694 # Add another event to the second session
@@ -697,7 +697,7 @@ def test_session_window_merge_sessions(
697697 )
698698 assert len (updated ) == 1
699699 assert updated [0 ][1 ]["start" ] == 20000
700- assert updated [0 ][1 ]["end" ] == 35000 # 25000 + 10000
700+ assert updated [0 ][1 ]["end" ] == 25000 # timestamp of last event
701701 assert updated [0 ][1 ]["value" ] == 15 # 10 + 5
702702 assert not expired
703703
@@ -710,13 +710,13 @@ def test_session_window_merge_sessions(
710710 # Second session should be expired
711711 assert len (expired ) == 1
712712 assert expired [0 ][1 ]["start" ] == 20000
713- assert expired [0 ][1 ]["end" ] == 35000
713+ assert expired [0 ][1 ]["end" ] == 25000 # timestamp of last event
714714 assert expired [0 ][1 ]["value" ] == 15
715715
716716 # Third session starts
717717 assert len (updated ) == 1
718718 assert updated [0 ][1 ]["start" ] == 50000
719- assert updated [0 ][1 ]["end" ] == 60000 # 50000 + 10000
719+ assert updated [0 ][1 ]["end" ] == 50000 # timestamp of last event
720720 assert updated [0 ][1 ]["value" ] == 100
721721
722722 def test_session_window_bridging_event_scenario (
@@ -755,7 +755,7 @@ def test_session_window_bridging_event_scenario(
755755 )
756756 assert len (updated ) == 1
757757 assert updated [0 ][1 ]["start" ] == 1000
758- assert updated [0 ][1 ]["end" ] == 11000 # 1000 + 10000
758+ assert updated [0 ][1 ]["end" ] == 1000 # timestamp of last event
759759 assert updated [0 ][1 ]["value" ] == 5
760760 assert not expired
761761
@@ -770,7 +770,7 @@ def test_session_window_bridging_event_scenario(
770770 # Event at 12000 is before 13000, so it should extend Session A
771771 assert len (updated ) == 1
772772 assert updated [0 ][1 ]["start" ] == 1000 # Session A extended
773- assert updated [0 ][1 ]["end" ] == 22000 # 12000 + 10000
773+ assert updated [0 ][1 ]["end" ] == 12000 # timestamp of last event
774774 assert updated [0 ][1 ]["value" ] == 15 # 5 + 10
775775 assert not expired
776776
@@ -781,7 +781,7 @@ def test_session_window_bridging_event_scenario(
781781 # This should extend the already extended Session A further
782782 assert len (updated ) == 1
783783 assert updated [0 ][1 ]["start" ] == 1000 # Still Session A
784- assert updated [0 ][1 ]["end" ] == 25000 # 15000 + 10000
784+ assert updated [0 ][1 ]["end" ] == 15000 # timestamp of last event
785785 assert updated [0 ][1 ]["value" ] == 35 # 5 + 10 + 20
786786 assert not expired
787787
@@ -791,7 +791,7 @@ def test_session_window_bridging_event_scenario(
791791 )
792792 assert len (expired ) == 1
793793 assert expired [0 ][1 ]["start" ] == 1000
794- assert expired [0 ][1 ]["end" ] == 25000
794+ assert expired [0 ][1 ]["end" ] == 15000 # timestamp of last event
795795 assert expired [0 ][1 ]["value" ] == 35 # All events combined
796796
797797 assert len (updated ) == 1
@@ -828,7 +828,7 @@ def test_session_window_string_key_extension(
828828 )
829829 assert len (updated ) == 1
830830 assert updated [0 ][1 ]["start" ] == 1000
831- assert updated [0 ][1 ]["end" ] == 11000 # 1000 + 10000
831+ assert updated [0 ][1 ]["end" ] == 1000 # timestamp of last event
832832 assert updated [0 ][1 ]["value" ] == 100
833833 assert not expired
834834
@@ -839,7 +839,7 @@ def test_session_window_string_key_extension(
839839 )
840840 assert len (updated ) == 1
841841 assert updated [0 ][1 ]["start" ] == 1000 # Session extended, same start
842- assert updated [0 ][1 ]["end" ] == 15000 # 5000 + 10000 (new end time)
842+ assert updated [0 ][1 ]["end" ] == 5000 # timestamp of last event
843843 assert updated [0 ][1 ]["value" ] == 300 # 100 + 200
844844 assert not expired
845845
@@ -849,7 +849,7 @@ def test_session_window_string_key_extension(
849849 )
850850 assert len (updated ) == 1
851851 assert updated [0 ][1 ]["start" ] == 1000 # Session extended again
852- assert updated [0 ][1 ]["end" ] == 18000 # 8000 + 10000
852+ assert updated [0 ][1 ]["end" ] == 8000 # timestamp of last event
853853 assert updated [0 ][1 ]["value" ] == 350 # 100 + 200 + 50
854854 assert not expired
855855
@@ -861,7 +861,7 @@ def test_session_window_string_key_extension(
861861 assert len (updated ) == 1
862862 assert updated [0 ][0 ] == key2 # Different key
863863 assert updated [0 ][1 ]["start" ] == 9000
864- assert updated [0 ][1 ]["end" ] == 19000 # 9000 + 10000
864+ assert updated [0 ][1 ]["end" ] == 9000 # timestamp of last event
865865 assert updated [0 ][1 ]["value" ] == 75
866866 assert not expired
867867
@@ -874,12 +874,12 @@ def test_session_window_string_key_extension(
874874 assert len (expired ) == 1
875875 assert expired [0 ][0 ] == key
876876 assert expired [0 ][1 ]["start" ] == 1000
877- assert expired [0 ][1 ]["end" ] == 18000
877+ assert expired [0 ][1 ]["end" ] == 8000 # timestamp of last event
878878 assert expired [0 ][1 ]["value" ] == 350
879879
880880 # Should have started a new session for the first key
881881 assert len (updated ) == 1
882882 assert updated [0 ][0 ] == key
883883 assert updated [0 ][1 ]["start" ] == 30000
884- assert updated [0 ][1 ]["end" ] == 40000
884+ assert updated [0 ][1 ]["end" ] == 30000 # timestamp of last event
885885 assert updated [0 ][1 ]["value" ] == 25
0 commit comments