Skip to content

Commit fa66474

Browse files
authored
fix(worker): re-impmlement decision events iterator with comments (#48)
<!-- Describe what has changed in this PR --> **What changed?** * re-implementing the batching logic <!-- Tell your future self why have you made these changes --> **Why?** current implementation has problems and is based off java SDK with confusing naming. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test that matches java SDK cadence-workflow/cadence-java-client#1035 <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent 1123f7b commit fa66474

File tree

4 files changed

+288
-465
lines changed

4 files changed

+288
-465
lines changed

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 114 additions & 210 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@
66
particularly focusing on decision-related events for replay and execution.
77
"""
88

9-
from dataclasses import dataclass, field
10-
from typing import List, Optional
9+
from dataclasses import dataclass
10+
from typing import Iterator, List, Optional
1111

12+
from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator
1213
from cadence.api.v1.history_pb2 import HistoryEvent
1314
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1415

@@ -19,33 +20,21 @@ class DecisionEvents:
1920
Represents events for a single decision iteration.
2021
"""
2122

22-
events: List[HistoryEvent] = field(default_factory=list)
23-
markers: List[HistoryEvent] = field(default_factory=list)
24-
replay: bool = False
25-
replay_current_time_milliseconds: Optional[int] = None
26-
next_decision_event_id: Optional[int] = None
23+
input: List[HistoryEvent]
24+
output: List[HistoryEvent]
25+
markers: List[HistoryEvent]
26+
replay: bool
27+
replay_current_time_milliseconds: int
28+
next_decision_event_id: int
2729

28-
def get_events(self) -> List[HistoryEvent]:
29-
"""Return all events in this decision iteration."""
30-
return self.events
31-
32-
def get_markers(self) -> List[HistoryEvent]:
33-
"""Return marker events."""
34-
return self.markers
35-
36-
def is_replay(self) -> bool:
37-
"""Check if this decision is in replay mode."""
38-
return self.replay
39-
40-
def get_event_by_id(self, event_id: int) -> Optional[HistoryEvent]:
41-
"""Retrieve a specific event by ID, returns None if not found."""
42-
for event in self.events:
30+
def get_output_event_by_id(self, event_id: int) -> Optional[HistoryEvent]:
31+
for event in self.input:
4332
if hasattr(event, "event_id") and event.event_id == event_id:
4433
return event
4534
return None
4635

4736

48-
class DecisionEventsIterator:
37+
class DecisionEventsIterator(Iterator[DecisionEvents]):
4938
"""
5039
Iterator for processing decision events from workflow history.
5140
@@ -54,207 +43,122 @@ class DecisionEventsIterator:
5443
"""
5544

5645
def __init__(
57-
self, decision_task: PollForDecisionTaskResponse, events: List[HistoryEvent]
46+
self,
47+
decision_task: PollForDecisionTaskResponse,
48+
events: List[HistoryEvent],
5849
):
5950
self._decision_task = decision_task
60-
self._events: List[HistoryEvent] = events
61-
self._decision_task_started_event: Optional[HistoryEvent] = None
62-
self._next_decision_event_id = 1
63-
self._replay = True
51+
self._events: HistoryEventsIterator = HistoryEventsIterator(events)
52+
self._next_decision_event_id: Optional[int] = None
6453
self._replay_current_time_milliseconds: Optional[int] = None
6554

66-
self._event_index = 0
67-
# Find first decision task started event
68-
for i, event in enumerate(self._events):
69-
if _is_decision_task_started(event):
70-
self._event_index = i
71-
break
72-
73-
async def has_next_decision_events(self) -> bool:
74-
# Look for the next DecisionTaskStarted event from current position
75-
for i in range(self._event_index, len(self._events)):
76-
if _is_decision_task_started(self._events[i]):
77-
return True
78-
79-
return False
80-
81-
async def next_decision_events(self) -> DecisionEvents:
82-
# Find next DecisionTaskStarted event
83-
start_index = None
84-
for i in range(self._event_index, len(self._events)):
85-
if _is_decision_task_started(self._events[i]):
86-
start_index = i
87-
break
88-
89-
if start_index is None:
90-
raise StopIteration("No more decision events")
91-
92-
decision_events = DecisionEvents()
93-
decision_events.replay = self._replay
94-
decision_events.replay_current_time_milliseconds = (
95-
self._replay_current_time_milliseconds
96-
)
97-
decision_events.next_decision_event_id = self._next_decision_event_id
98-
99-
# Process DecisionTaskStarted event
100-
decision_task_started = self._events[start_index]
101-
self._decision_task_started_event = decision_task_started
102-
decision_events.events.append(decision_task_started)
103-
104-
# Update replay time if available
105-
if decision_task_started.event_time:
106-
self._replay_current_time_milliseconds = (
107-
decision_task_started.event_time.seconds * 1000
108-
)
109-
decision_events.replay_current_time_milliseconds = (
110-
self._replay_current_time_milliseconds
111-
)
112-
113-
# Process subsequent events until we find the corresponding DecisionTask completion
114-
current_index = start_index + 1
115-
while current_index < len(self._events):
116-
event = self._events[current_index]
117-
decision_events.events.append(event)
118-
119-
# Categorize the event
120-
if _is_marker_recorded(event):
121-
decision_events.markers.append(event)
122-
elif _is_decision_task_completion(event):
123-
# This marks the end of this decision iteration
124-
self._process_decision_completion_event(event, decision_events)
125-
current_index += 1 # Move past this event
126-
break
127-
128-
current_index += 1
129-
130-
# Update the event index for next iteration
131-
self._event_index = current_index
132-
133-
# Update the next decision event ID
134-
if decision_events.events:
135-
last_event = decision_events.events[-1]
136-
if hasattr(last_event, "event_id"):
137-
self._next_decision_event_id = last_event.event_id + 1
55+
def __iter__(self):
56+
return self
13857

139-
# Check if this is the last decision events
140-
# Set replay to false only if there are no more decision events after this one
141-
# Check directly without calling has_next_decision_events to avoid recursion
142-
has_more = False
143-
for i in range(self._event_index, len(self._events)):
144-
if _is_decision_task_started(self._events[i]):
145-
has_more = True
58+
def __next__(self) -> DecisionEvents:
59+
"""
60+
Process the next decision batch.
61+
1. Find the next valid decision task started event during replay or last scheduled decision task events for non-replay
62+
2. Collect the decision input events before the decision task
63+
3. Collect the decision output events after the decision task
64+
65+
Relay mode is determined by checking if the decision task is completed or not
66+
"""
67+
decision_input_events: List[HistoryEvent] = []
68+
decision_output_events: List[HistoryEvent] = []
69+
decision_event: Optional[HistoryEvent] = None
70+
for event in self._events:
71+
match event.WhichOneof("attributes"):
72+
case "decision_task_started_event_attributes":
73+
next_event = self._events.peek()
74+
75+
# latest event, not replay, assign started event as decision event insteaad
76+
if next_event is None:
77+
decision_event = event
78+
break
79+
80+
match next_event.WhichOneof("attributes"):
81+
case (
82+
"decision_task_failed_event_attributes"
83+
| "decision_task_timed_out_event_attributes"
84+
):
85+
# skip failed / timed out decision tasks and continue searching
86+
next(self._events)
87+
continue
88+
case "decision_task_completed_event_attributes":
89+
# found decision task completed event, stop
90+
decision_event = next(self._events)
91+
break
92+
case _:
93+
raise ValueError(
94+
f"unexpected event type after decision task started event: {next_event}"
95+
)
96+
97+
case _:
98+
decision_input_events.append(event)
99+
100+
if not decision_event:
101+
raise StopIteration("no decision event found")
102+
103+
# collect decision output events
104+
while self._events.has_next():
105+
nxt = self._events.peek() if self._events.has_next() else None
106+
if nxt and not is_decision_event(nxt):
146107
break
108+
decision_output_events.append(next(self._events))
147109

148-
if not has_more:
149-
self._replay = False
150-
decision_events.replay = False
151-
152-
return decision_events
110+
replay_current_time_milliseconds = decision_event.event_time.ToMilliseconds()
153111

154-
def _process_decision_completion_event(
155-
self, event: HistoryEvent, decision_events: DecisionEvents
156-
):
157-
"""Process the decision completion event and update state."""
158-
159-
# Check if we're still in replay mode
160-
# This is determined by comparing event IDs with the current decision task's started event ID
112+
replay: bool
113+
next_decision_event_id: int
161114
if (
162-
self._decision_task_started_event
163-
and hasattr(self._decision_task_started_event, "event_id")
164-
and hasattr(event, "event_id")
165-
):
166-
# If this completion event ID is >= the current decision task's started event ID,
167-
# we're no longer in replay mode
168-
current_task_started_id = (
169-
getattr(self._decision_task.started_event_id, "value", 0)
170-
if hasattr(self._decision_task, "started_event_id")
171-
else 0
172-
)
173-
174-
if event.event_id >= current_task_started_id:
175-
self._replay = False
176-
decision_events.replay = False
177-
178-
def get_replay_current_time_milliseconds(self) -> Optional[int]:
179-
"""Get the current replay time in milliseconds."""
180-
return self._replay_current_time_milliseconds
181-
182-
def is_replay_mode(self) -> bool:
183-
"""Check if the iterator is currently in replay mode."""
184-
return self._replay
185-
186-
def __aiter__(self):
187-
return self
188-
189-
async def __anext__(self) -> DecisionEvents:
190-
if not await self.has_next_decision_events():
191-
raise StopAsyncIteration
192-
return await self.next_decision_events()
115+
decision_event.WhichOneof("attributes")
116+
== "decision_task_completed_event_attributes"
117+
): # completed decision task
118+
replay = True
119+
next_decision_event_id = decision_event.event_id + 1
120+
else:
121+
replay = False
122+
next_decision_event_id = decision_event.event_id + 2
123+
124+
# collect marker events
125+
markers = [m for m in decision_output_events if is_marker_event(m)]
126+
127+
return DecisionEvents(
128+
input=decision_input_events,
129+
output=decision_output_events,
130+
markers=markers,
131+
replay=replay,
132+
replay_current_time_milliseconds=replay_current_time_milliseconds,
133+
next_decision_event_id=next_decision_event_id,
134+
)
193135

194136

195-
# Utility functions
196137
def is_decision_event(event: HistoryEvent) -> bool:
197-
"""Check if an event is a decision-related event."""
198-
return (
199-
_is_decision_task_started(event)
200-
or _is_decision_task_completed(event)
201-
or _is_decision_task_failed(event)
202-
or _is_decision_task_timed_out(event)
138+
"""Check if an event is a decision output event."""
139+
return event is not None and event.WhichOneof("attributes") in set(
140+
[
141+
"activity_task_scheduled_event_attributes",
142+
"start_child_workflow_execution_initiated_event_attributes",
143+
"timer_started_event_attributes",
144+
"workflow_execution_completed_event_attributes",
145+
"workflow_execution_failed_event_attributes",
146+
"workflow_execution_canceled_event_attributes",
147+
"workflow_execution_continued_as_new_event_attributes",
148+
"activity_task_cancel_requested_event_attributes",
149+
"request_cancel_activity_task_failed_event_attributes",
150+
"timer_canceled_event_attributes",
151+
"cancel_timer_failed_event_attributes",
152+
"request_cancel_external_workflow_execution_initiated_event_attributes",
153+
"marker_recorded_event_attributes",
154+
"signal_external_workflow_execution_initiated_event_attributes",
155+
"upsert_workflow_search_attributes_event_attributes",
156+
]
203157
)
204158

205159

206160
def is_marker_event(event: HistoryEvent) -> bool:
207-
"""Check if an event is a marker event."""
208-
return _is_marker_recorded(event)
209-
210-
211-
def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]:
212-
"""Extract timestamp from an event in milliseconds."""
213-
if hasattr(event, "event_time") and event.HasField("event_time"):
214-
seconds = getattr(event.event_time, "seconds", 0)
215-
return seconds * 1000 if seconds > 0 else None
216-
return None
217-
218-
219-
def _is_decision_task_started(event: HistoryEvent) -> bool:
220-
"""Check if event is DecisionTaskStarted."""
221-
return hasattr(event, "decision_task_started_event_attributes") and event.HasField(
222-
"decision_task_started_event_attributes"
223-
)
224-
225-
226-
def _is_decision_task_completed(event: HistoryEvent) -> bool:
227-
"""Check if event is DecisionTaskCompleted."""
228-
return hasattr(
229-
event, "decision_task_completed_event_attributes"
230-
) and event.HasField("decision_task_completed_event_attributes")
231-
232-
233-
def _is_decision_task_failed(event: HistoryEvent) -> bool:
234-
"""Check if event is DecisionTaskFailed."""
235-
return hasattr(event, "decision_task_failed_event_attributes") and event.HasField(
236-
"decision_task_failed_event_attributes"
237-
)
238-
239-
240-
def _is_decision_task_timed_out(event: HistoryEvent) -> bool:
241-
"""Check if event is DecisionTaskTimedOut."""
242-
return hasattr(
243-
event, "decision_task_timed_out_event_attributes"
244-
) and event.HasField("decision_task_timed_out_event_attributes")
245-
246-
247-
def _is_marker_recorded(event: HistoryEvent) -> bool:
248-
"""Check if event is MarkerRecorded."""
249-
return hasattr(event, "marker_recorded_event_attributes") and event.HasField(
250-
"marker_recorded_event_attributes"
251-
)
252-
253-
254-
def _is_decision_task_completion(event: HistoryEvent) -> bool:
255-
"""Check if event is any kind of decision task completion."""
256-
return (
257-
_is_decision_task_completed(event)
258-
or _is_decision_task_failed(event)
259-
or _is_decision_task_timed_out(event)
161+
return bool(
162+
event is not None
163+
and event.WhichOneof("attributes") == "marker_recorded_event_attributes"
260164
)

cadence/_internal/workflow/history_event_iterator.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from typing import Iterator, List, Optional
2+
from cadence.api.v1.history_pb2 import HistoryEvent
13
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
24
from cadence.api.v1.service_workflow_pb2 import (
35
GetWorkflowExecutionHistoryRequest,
@@ -32,3 +34,25 @@ async def iterate_history_events(
3234
)
3335
current_page = response.history.events
3436
next_page_token = response.next_page_token
37+
38+
39+
class HistoryEventsIterator(Iterator[HistoryEvent]):
40+
def __init__(self, events: List[HistoryEvent]):
41+
self._iter = iter(events)
42+
self._current = next(self._iter, None)
43+
44+
def __iter__(self):
45+
return self
46+
47+
def __next__(self) -> HistoryEvent:
48+
if not self._current:
49+
raise StopIteration("No more events")
50+
event = self._current
51+
self._current = next(self._iter, None)
52+
return event
53+
54+
def has_next(self) -> bool:
55+
return self._current is not None
56+
57+
def peek(self) -> Optional[HistoryEvent]:
58+
return self._current

0 commit comments

Comments
 (0)