@@ -883,3 +883,54 @@ def test_session_window_string_key_extension(
883883 assert updated [0 ][1 ]["start" ] == 30000
884884 assert updated [0 ][1 ]["end" ] == 30000 # timestamp of last event
885885 assert updated [0 ][1 ]["value" ] == 25
886+
887+ def test_out_of_order_events_end_time (
888+ self , session_window_definition_factory , state_manager
889+ ):
890+ """Test that out-of-order events correctly maintain the latest timestamp as end time"""
891+ window_def = session_window_definition_factory (
892+ inactivity_gap_ms = 10000 , grace_ms = 5000
893+ )
894+ window = window_def .sum ()
895+ window .final (closing_strategy = "key" )
896+
897+ store = state_manager .get_store (stream_id = "test" , store_name = window .name )
898+ store .assign_partition (0 )
899+
900+ with store .start_partition_transaction (0 ) as tx :
901+ key = b"key"
902+
903+ # 1. Start session with event at timestamp 1000
904+ updated , expired = process (
905+ window , value = 1 , key = key , transaction = tx , timestamp_ms = 1000
906+ )
907+ assert updated [0 ][1 ]["start" ] == 1000
908+ assert updated [0 ][1 ]["end" ] == 1000 # End should be 1000
909+ assert updated [0 ][1 ]["value" ] == 1
910+
911+ # 2. Add event at timestamp 8000 (in order)
912+ updated , expired = process (
913+ window , value = 2 , key = key , transaction = tx , timestamp_ms = 8000
914+ )
915+ assert updated [0 ][1 ]["start" ] == 1000
916+ assert updated [0 ][1 ]["end" ] == 8000 # End should be 8000 (latest event)
917+ assert updated [0 ][1 ]["value" ] == 3
918+
919+ # 3. Add OUT-OF-ORDER event at timestamp 3000 (before 8000)
920+ # This should be accepted (within grace period) but should NOT change the end time
921+ updated , expired = process (
922+ window , value = 10 , key = key , transaction = tx , timestamp_ms = 3000
923+ )
924+ assert updated [0 ][1 ]["start" ] == 1000
925+ # KEY TEST: End time should remain 8000, not become 3000!
926+ assert updated [0 ][1 ]["end" ] == 8000
927+ assert updated [0 ][1 ]["value" ] == 13
928+
929+ # 4. Add event NEWER than current end (timestamp 9000)
930+ updated , expired = process (
931+ window , value = 4 , key = key , transaction = tx , timestamp_ms = 9000
932+ )
933+ assert updated [0 ][1 ]["start" ] == 1000
934+ # NOW the end time should update to 9000
935+ assert updated [0 ][1 ]["end" ] == 9000
936+ assert updated [0 ][1 ]["value" ] == 17
0 commit comments