Skip to content

Commit ebb3319

Browse files
committed
fix(worker): re-implement decision events iterator with comments
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent 1123f7b commit ebb3319

File tree

4 files changed

+274
-469
lines changed

4 files changed

+274
-469
lines changed

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 103 additions & 215 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
"""
88

99
from dataclasses import dataclass, field
10-
from typing import List, Optional
10+
from typing import Iterator, List, Optional, Tuple
1111

12+
from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator
1213
from cadence.api.v1.history_pb2 import HistoryEvent
1314
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1415

@@ -19,33 +20,21 @@ class DecisionEvents:
1920
Represents events for a single decision iteration.
2021
"""
2122

22-
events: List[HistoryEvent] = field(default_factory=list)
23-
markers: List[HistoryEvent] = field(default_factory=list)
24-
replay: bool = False
25-
replay_current_time_milliseconds: Optional[int] = None
26-
next_decision_event_id: Optional[int] = None
23+
input: List[HistoryEvent]
24+
output: List[HistoryEvent]
25+
markers: List[HistoryEvent]
26+
replay: bool
27+
replay_current_time_milliseconds: int
28+
next_decision_event_id: int
2729

28-
def get_events(self) -> List[HistoryEvent]:
29-
"""Return all events in this decision iteration."""
30-
return self.events
31-
32-
def get_markers(self) -> List[HistoryEvent]:
33-
"""Return marker events."""
34-
return self.markers
35-
36-
def is_replay(self) -> bool:
37-
"""Check if this decision is in replay mode."""
38-
return self.replay
39-
40-
def get_event_by_id(self, event_id: int) -> Optional[HistoryEvent]:
41-
"""Retrieve a specific event by ID, returns None if not found."""
42-
for event in self.events:
30+
def get_output_event_by_id(self, event_id: int) -> Optional[HistoryEvent]:
31+
for event in self.input:
4332
if hasattr(event, "event_id") and event.event_id == event_id:
4433
return event
4534
return None
4635

4736

48-
class DecisionEventsIterator:
37+
class DecisionEventsIterator(Iterator[DecisionEvents]):
4938
"""
5039
Iterator for processing decision events from workflow history.
5140
@@ -54,207 +43,106 @@ class DecisionEventsIterator:
5443
"""
5544

5645
def __init__(
57-
self, decision_task: PollForDecisionTaskResponse, events: List[HistoryEvent]
46+
self, decision_task: PollForDecisionTaskResponse,
47+
events: List[HistoryEvent],
5848
):
5949
self._decision_task = decision_task
60-
self._events: List[HistoryEvent] = events
61-
self._decision_task_started_event: Optional[HistoryEvent] = None
62-
self._next_decision_event_id = 1
63-
self._replay = True
50+
self._events: HistoryEventsIterator = HistoryEventsIterator(events)
51+
self._next_decision_event_id: Optional[int] = None
6452
self._replay_current_time_milliseconds: Optional[int] = None
6553

66-
self._event_index = 0
67-
# Find first decision task started event
68-
for i, event in enumerate(self._events):
69-
if _is_decision_task_started(event):
70-
self._event_index = i
71-
break
72-
73-
async def has_next_decision_events(self) -> bool:
74-
# Look for the next DecisionTaskStarted event from current position
75-
for i in range(self._event_index, len(self._events)):
76-
if _is_decision_task_started(self._events[i]):
77-
return True
78-
79-
return False
54+
def __iter__(self):
55+
return self
8056

81-
async def next_decision_events(self) -> DecisionEvents:
82-
# Find next DecisionTaskStarted event
83-
start_index = None
84-
for i in range(self._event_index, len(self._events)):
85-
if _is_decision_task_started(self._events[i]):
86-
start_index = i
57+
def __next__(self) -> DecisionEvents:
58+
59+
"""
60+
Process the next decision batch.
61+
1. Find the next valid decision task started event during replay or last scheduled decision task events for non-replay
62+
2. Collect the decision input events before the decision task
63+
3. Collect the decision output events after the decision task
64+
65+
Replay mode is determined by checking whether the decision task is completed or not
66+
"""
67+
decision_input_events: List[HistoryEvent] = []
68+
decision_output_events: List[HistoryEvent] = []
69+
decision_event: Optional[HistoryEvent] = None
70+
for event in self._events:
71+
match event.WhichOneof("attributes"):
72+
case "decision_task_started_event_attributes":
73+
next_event = self._events.peek()
74+
75+
# latest event, not replay, assign started event as decision event instead
76+
if next_event == None:
77+
decision_event = event
78+
break
79+
80+
match next_event.WhichOneof("attributes"):
81+
case "decision_task_failed_event_attributes" | "decision_task_timed_out_event_attributes":
82+
# skip failed / timed out decision tasks and continue searching
83+
next(self._events)
84+
continue
85+
case "decision_task_completed_event_attributes":
86+
# found decision task completed event, stop
87+
decision_event = next(self._events)
88+
break
89+
case _:
90+
raise ValueError(f"unexpected event type after decision task started event: {next_event}")
91+
92+
case _:
93+
decision_input_events.append(event)
94+
95+
if not decision_event:
96+
raise StopIteration(f"no decision event found")
97+
98+
# collect decision output events
99+
while self._events.has_next():
100+
if not is_decision_event(self._events.peek()):
87101
break
88-
89-
if start_index is None:
90-
raise StopIteration("No more decision events")
91-
92-
decision_events = DecisionEvents()
93-
decision_events.replay = self._replay
94-
decision_events.replay_current_time_milliseconds = (
95-
self._replay_current_time_milliseconds
102+
decision_output_events.append(next(self._events))
103+
104+
replay_current_time_milliseconds = decision_event.event_time.ToMilliseconds()
105+
106+
replay : bool
107+
next_decision_event_id : int
108+
if decision_event.WhichOneof("attributes") == "decision_task_completed_event_attributes": # completed decision task
109+
replay = True
110+
next_decision_event_id = decision_event.event_id + 1
111+
else:
112+
replay = False
113+
next_decision_event_id = decision_event.event_id + 2
114+
115+
# collect marker events
116+
markers = [m for m in decision_output_events if is_marker_event(m)]
117+
118+
return DecisionEvents(
119+
input=decision_input_events,
120+
output=decision_output_events,
121+
markers=markers,
122+
replay=replay,
123+
replay_current_time_milliseconds=replay_current_time_milliseconds,
124+
next_decision_event_id=next_decision_event_id,
96125
)
97-
decision_events.next_decision_event_id = self._next_decision_event_id
98-
99-
# Process DecisionTaskStarted event
100-
decision_task_started = self._events[start_index]
101-
self._decision_task_started_event = decision_task_started
102-
decision_events.events.append(decision_task_started)
103-
104-
# Update replay time if available
105-
if decision_task_started.event_time:
106-
self._replay_current_time_milliseconds = (
107-
decision_task_started.event_time.seconds * 1000
108-
)
109-
decision_events.replay_current_time_milliseconds = (
110-
self._replay_current_time_milliseconds
111-
)
112-
113-
# Process subsequent events until we find the corresponding DecisionTask completion
114-
current_index = start_index + 1
115-
while current_index < len(self._events):
116-
event = self._events[current_index]
117-
decision_events.events.append(event)
118-
119-
# Categorize the event
120-
if _is_marker_recorded(event):
121-
decision_events.markers.append(event)
122-
elif _is_decision_task_completion(event):
123-
# This marks the end of this decision iteration
124-
self._process_decision_completion_event(event, decision_events)
125-
current_index += 1 # Move past this event
126-
break
127-
128-
current_index += 1
129-
130-
# Update the event index for next iteration
131-
self._event_index = current_index
132-
133-
# Update the next decision event ID
134-
if decision_events.events:
135-
last_event = decision_events.events[-1]
136-
if hasattr(last_event, "event_id"):
137-
self._next_decision_event_id = last_event.event_id + 1
138-
139-
# Check if this is the last decision events
140-
# Set replay to false only if there are no more decision events after this one
141-
# Check directly without calling has_next_decision_events to avoid recursion
142-
has_more = False
143-
for i in range(self._event_index, len(self._events)):
144-
if _is_decision_task_started(self._events[i]):
145-
has_more = True
146-
break
147-
148-
if not has_more:
149-
self._replay = False
150-
decision_events.replay = False
151-
152-
return decision_events
153-
154-
def _process_decision_completion_event(
155-
self, event: HistoryEvent, decision_events: DecisionEvents
156-
):
157-
"""Process the decision completion event and update state."""
158126

159-
# Check if we're still in replay mode
160-
# This is determined by comparing event IDs with the current decision task's started event ID
161-
if (
162-
self._decision_task_started_event
163-
and hasattr(self._decision_task_started_event, "event_id")
164-
and hasattr(event, "event_id")
165-
):
166-
# If this completion event ID is >= the current decision task's started event ID,
167-
# we're no longer in replay mode
168-
current_task_started_id = (
169-
getattr(self._decision_task.started_event_id, "value", 0)
170-
if hasattr(self._decision_task, "started_event_id")
171-
else 0
172-
)
173-
174-
if event.event_id >= current_task_started_id:
175-
self._replay = False
176-
decision_events.replay = False
177-
178-
def get_replay_current_time_milliseconds(self) -> Optional[int]:
179-
"""Get the current replay time in milliseconds."""
180-
return self._replay_current_time_milliseconds
181-
182-
def is_replay_mode(self) -> bool:
183-
"""Check if the iterator is currently in replay mode."""
184-
return self._replay
185-
186-
def __aiter__(self):
187-
return self
188-
189-
async def __anext__(self) -> DecisionEvents:
190-
if not await self.has_next_decision_events():
191-
raise StopAsyncIteration
192-
return await self.next_decision_events()
193-
194-
195-
# Utility functions
196127
def is_decision_event(event: HistoryEvent) -> bool:
197-
"""Check if an event is a decision-related event."""
198-
return (
199-
_is_decision_task_started(event)
200-
or _is_decision_task_completed(event)
201-
or _is_decision_task_failed(event)
202-
or _is_decision_task_timed_out(event)
203-
)
204-
128+
"""Check if an event is a decision output event."""
129+
return event != None and event.WhichOneof("attributes") in set([
130+
"activity_task_scheduled_event_attributes",
131+
"start_child_workflow_execution_initiated_event_attributes",
132+
"timer_started_event_attributes",
133+
"workflow_execution_completed_event_attributes",
134+
"workflow_execution_failed_event_attributes",
135+
"workflow_execution_canceled_event_attributes",
136+
"workflow_execution_continued_as_new_event_attributes",
137+
"activity_task_cancel_requested_event_attributes",
138+
"request_cancel_activity_task_failed_event_attributes",
139+
"timer_canceled_event_attributes",
140+
"cancel_timer_failed_event_attributes",
141+
"request_cancel_external_workflow_execution_initiated_event_attributes",
142+
"marker_recorded_event_attributes",
143+
"signal_external_workflow_execution_initiated_event_attributes",
144+
"upsert_workflow_search_attributes_event_attributes",
145+
])
205146

206147
def is_marker_event(event: HistoryEvent) -> bool:
207-
"""Check if an event is a marker event."""
208-
return _is_marker_recorded(event)
209-
210-
211-
def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
212-
"""Extract timestamp from an event in milliseconds."""
213-
if hasattr(event, "event_time") and event.HasField("event_time"):
214-
seconds = getattr(event.event_time, "seconds", 0)
215-
return seconds * 1000 if seconds > 0 else None
216-
return None
217-
218-
219-
def _is_decision_task_started(event: HistoryEvent) -> bool:
220-
"""Check if event is DecisionTaskStarted."""
221-
return hasattr(event, "decision_task_started_event_attributes") and event.HasField(
222-
"decision_task_started_event_attributes"
223-
)
224-
225-
226-
def _is_decision_task_completed(event: HistoryEvent) -> bool:
227-
"""Check if event is DecisionTaskCompleted."""
228-
return hasattr(
229-
event, "decision_task_completed_event_attributes"
230-
) and event.HasField("decision_task_completed_event_attributes")
231-
232-
233-
def _is_decision_task_failed(event: HistoryEvent) -> bool:
234-
"""Check if event is DecisionTaskFailed."""
235-
return hasattr(event, "decision_task_failed_event_attributes") and event.HasField(
236-
"decision_task_failed_event_attributes"
237-
)
238-
239-
240-
def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
241-
"""Check if event is DecisionTaskTimedOut."""
242-
return hasattr(
243-
event, "decision_task_timed_out_event_attributes"
244-
) and event.HasField("decision_task_timed_out_event_attributes")
245-
246-
247-
def _is_marker_recorded(event: HistoryEvent) -> bool:
248-
"""Check if event is MarkerRecorded."""
249-
return hasattr(event, "marker_recorded_event_attributes") and event.HasField(
250-
"marker_recorded_event_attributes"
251-
)
252-
253-
254-
def _is_decision_task_completion(event: HistoryEvent) -> bool:
255-
"""Check if event is any kind of decision task completion."""
256-
return (
257-
_is_decision_task_completed(event)
258-
or _is_decision_task_failed(event)
259-
or _is_decision_task_timed_out(event)
260-
)
148+
return event != None and event.WhichOneof("attributes") == "marker_recorded_event_attributes"

cadence/_internal/workflow/history_event_iterator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Iterator, List, Optional
2+
from cadence.api.v1.history_pb2 import HistoryEvent
13
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
24
from cadence.api.v1.service_workflow_pb2 import (
35
GetWorkflowExecutionHistoryRequest,
@@ -32,3 +34,24 @@ async def iterate_history_events(
3234
)
3335
current_page = response.history.events
3436
next_page_token = response.next_page_token
37+
38+
class HistoryEventsIterator(Iterator[HistoryEvent]):
39+
def __init__(self, events: List[HistoryEvent]):
40+
self._iter = iter(events)
41+
self._current = next(self._iter, None)
42+
43+
def __iter__(self):
44+
return self
45+
46+
def __next__(self) -> HistoryEvent:
47+
if not self._current:
48+
raise StopIteration("No more events")
49+
event = self._current
50+
self._current = next(self._iter, None)
51+
return event
52+
53+
def has_next(self) -> bool:
54+
return self._current is not None
55+
56+
def peek(self) -> Optional[HistoryEvent]:
57+
return self._current

cadence/_internal/workflow/workflow_engine.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ async def _process_decision_events(
128128
processed_any_decision_events = False
129129

130130
# Check if there are any decision events to process
131-
while await events_iterator.has_next_decision_events():
132-
decision_events = await events_iterator.next_decision_events()
131+
for decision_events in events_iterator:
133132
processed_any_decision_events = True
134133

135134
# Log decision events batch processing (matches Go client patterns)

0 commit comments

Comments
 (0)