@@ -94,14 +94,14 @@ def process_window(
9494
9595 for (window_start , window_end ), aggregated_value , _ in windows :
9696 # Calculate the time gap between the new event and the session's last activity
97- # window_end is stored as last_event_timestamp + 1, so subtract 1 to get actual last event time
98- session_last_activity = window_end - 1
97+ # window_end now directly represents the timestamp of the last event
98+ session_last_activity = window_end
9999 time_gap = timestamp_ms - session_last_activity
100100
101101 # Check if this session can be extended
102102 if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start :
103103 session_start = window_start
104- session_end = timestamp_ms + 1
104+ session_end = timestamp_ms
105105 can_extend_session = True
106106 existing_aggregated = aggregated_value
107107 old_window_to_delete = (window_start , window_end )
@@ -110,7 +110,7 @@ def process_window(
110110 # If no extendable session found, start a new one
111111 if not can_extend_session :
112112 session_start = timestamp_ms
113- session_end = timestamp_ms + 1
113+ session_end = timestamp_ms # End time is the timestamp of the last event
114114
115115 # Process the event for this session
116116 updated_windows : list [WindowKeyResult ] = []
@@ -138,7 +138,7 @@ def process_window(
138138 updated_windows .append (
139139 (
140140 key ,
141- self ._results (aggregated , [], session_start , session_end - 1 ),
141+ self ._results (aggregated , [], session_start , session_end ),
142142 )
143143 )
144144
@@ -221,18 +221,18 @@ def expire_by_key(
221221 windows_to_delete = []
222222 for (window_start , window_end ), aggregated , _ in all_windows :
223223 # Session expires when the session end time + timeout has passed the expiry threshold
224- # window_end is stored as last_event_timestamp + 1, so we subtract 1 and add timeout_ms
225- last_event_timestamp = window_end - 1
226- if last_event_timestamp + self ._timeout_ms <= expiry_threshold :
224+ # window_end directly represents the timestamp of the last event
225+ if window_end + self ._timeout_ms <= expiry_threshold :
227226 collected = []
228227 if collect :
229- collected = state .get_from_collection (window_start , window_end )
228+ # window_end is now the timestamp of the last event, so we need +1 to include it
229+ collected = state .get_from_collection (window_start , window_end + 1 )
230230
231231 windows_to_delete .append ((window_start , window_end ))
232232 count += 1
233233 yield (
234234 key ,
235- self ._results (aggregated , collected , window_start , window_end - 1 ),
235+ self ._results (aggregated , collected , window_start , window_end ),
236236 )
237237
238238 # Clean up expired windows
0 commit comments