@@ -66,73 +66,56 @@ def process_window(
6666 latest_timestamp = max (timestamp_ms , state_ts )
6767
6868 # Calculate session expiry threshold
69- session_expiry_threshold = latest_timestamp - grace_ms
69+ expiry_threshold = latest_timestamp - grace_ms
7070
7171 # Check if the event is too late
72- if timestamp_ms < session_expiry_threshold :
73- late_by_ms = session_expiry_threshold - timestamp_ms
72+ if timestamp_ms < expiry_threshold :
7473 self ._on_expired_window (
7574 value = value ,
7675 key = key ,
7776 start = timestamp_ms ,
78- end = timestamp_ms , # End time is the timestamp of the last event
77+ end = timestamp_ms ,
7978 timestamp_ms = timestamp_ms ,
80- late_by_ms = late_by_ms ,
79+ late_by_ms = expiry_threshold - timestamp_ms ,
8180 )
8281 return [], []
8382
84- # Look for an existing session that can be extended
85- can_extend_session = False
86- existing_aggregated = None
87- old_window_to_delete = None
88-
8983 # Search for active sessions that can accommodate the new event
90- search_start = max (0 , timestamp_ms - timeout_ms * 2 )
91- windows = state .get_windows (
92- search_start , timestamp_ms + timeout_ms + 1 , backwards = True
93- )
94-
95- for (window_start , window_end ), aggregated_value , _ in windows :
84+ for (window_start , window_end ), aggregated_value , _ in state .get_windows (
85+ start_from_ms = max (0 , timestamp_ms - timeout_ms * 2 ),
86+ start_to_ms = timestamp_ms + timeout_ms + 1 ,
87+ backwards = True ,
88+ ):
9689 # Calculate the time gap between the new event and the session's last activity
9790 # window_end now directly represents the timestamp of the last event
98- session_last_activity = window_end
99- time_gap = timestamp_ms - session_last_activity
91+ time_gap = timestamp_ms - window_end
10092
10193 # Check if this session can be extended
10294 if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start :
95+ extend_session = True
10396 session_start = window_start
104- # Only update end time if the new event is newer than the current end time
97+ # Only update end time if the new event is greater than the current end time
10598 session_end = max (window_end , timestamp_ms )
106- can_extend_session = True
10799 existing_aggregated = aggregated_value
108- old_window_to_delete = (window_start , window_end )
100+ # Delete the old window if extending an existing session
101+ state .delete_window (window_start , window_end )
109102 break
110-
111- # If no extendable session found, start a new one
112- if not can_extend_session :
113- session_start = timestamp_ms
114- session_end = timestamp_ms # End time is the timestamp of the last event
103+ else :
104+ # If no extendable session found, start a new one
105+ extend_session = False
106+ session_start = session_end = timestamp_ms
115107
116108 # Process the event for this session
117109 updated_windows : list [WindowKeyResult ] = []
118110
119- # Delete the old window if extending an existing session
120- if can_extend_session and old_window_to_delete :
121- old_start , old_end = old_window_to_delete
122- state .delete_window (old_start , old_end )
123-
124111 # Add to collection if needed
125112 if collect :
126- state .add_to_collection (
127- value = self ._collect_value (value ),
128- id = timestamp_ms ,
129- )
113+ state .add_to_collection (value = self ._collect_value (value ), id = timestamp_ms )
130114
131115 # Update the session window aggregation
132- aggregated = None
133116 if aggregate :
134117 current_value = (
135- existing_aggregated if can_extend_session else self ._initialize_value ()
118+ existing_aggregated if extend_session else self ._initialize_value ()
136119 )
137120
138121 aggregated = self ._aggregate_value (current_value , value , timestamp_ms )
@@ -142,6 +125,8 @@ def process_window(
142125 self ._results (aggregated , [], session_start , session_end ),
143126 )
144127 )
128+ else :
129+ aggregated = None
145130
146131 state .update_window (
147132 session_start , session_end , value = aggregated , timestamp_ms = timestamp_ms
@@ -150,12 +135,10 @@ def process_window(
150135 # Expire old sessions
151136 if self ._closing_strategy == ClosingStrategy .PARTITION :
152137 expired_windows = self .expire_by_partition (
153- transaction , session_expiry_threshold , collect
138+ transaction , expiry_threshold , collect
154139 )
155140 else :
156- expired_windows = self .expire_by_key (
157- key , state , session_expiry_threshold , collect
158- )
141+ expired_windows = self .expire_by_key (key , state , expiry_threshold , collect )
159142
160143 return updated_windows , expired_windows
161144
0 commit comments