diff --git a/cadence/_internal/workflow/decision_events_iterator.py b/cadence/_internal/workflow/decision_events_iterator.py index 2657aa4..1d6e0f2 100644 --- a/cadence/_internal/workflow/decision_events_iterator.py +++ b/cadence/_internal/workflow/decision_events_iterator.py @@ -6,9 +6,10 @@ particularly focusing on decision-related events for replay and execution. """ -from dataclasses import dataclass, field -from typing import List, Optional +from dataclasses import dataclass +from typing import Iterator, List, Optional +from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator from cadence.api.v1.history_pb2 import HistoryEvent from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse @@ -19,33 +20,21 @@ class DecisionEvents: Represents events for a single decision iteration. """ - events: List[HistoryEvent] = field(default_factory=list) - markers: List[HistoryEvent] = field(default_factory=list) - replay: bool = False - replay_current_time_milliseconds: Optional[int] = None - next_decision_event_id: Optional[int] = None + input: List[HistoryEvent] + output: List[HistoryEvent] + markers: List[HistoryEvent] + replay: bool + replay_current_time_milliseconds: int + next_decision_event_id: int - def get_events(self) -> List[HistoryEvent]: - """Return all events in this decision iteration.""" - return self.events - - def get_markers(self) -> List[HistoryEvent]: - """Return marker events.""" - return self.markers - - def is_replay(self) -> bool: - """Check if this decision is in replay mode.""" - return self.replay - - def get_event_by_id(self, event_id: int) -> Optional[HistoryEvent]: - """Retrieve a specific event by ID, returns None if not found.""" - for event in self.events: + def get_output_event_by_id(self, event_id: int) -> Optional[HistoryEvent]: + for event in self.input: if hasattr(event, "event_id") and event.event_id == event_id: return event return None -class DecisionEventsIterator: +class 
DecisionEventsIterator(Iterator[DecisionEvents]): """ Iterator for processing decision events from workflow history. @@ -54,207 +43,122 @@ class DecisionEventsIterator: """ def __init__( - self, decision_task: PollForDecisionTaskResponse, events: List[HistoryEvent] + self, + decision_task: PollForDecisionTaskResponse, + events: List[HistoryEvent], ): self._decision_task = decision_task - self._events: List[HistoryEvent] = events - self._decision_task_started_event: Optional[HistoryEvent] = None - self._next_decision_event_id = 1 - self._replay = True + self._events: HistoryEventsIterator = HistoryEventsIterator(events) + self._next_decision_event_id: Optional[int] = None self._replay_current_time_milliseconds: Optional[int] = None - self._event_index = 0 - # Find first decision task started event - for i, event in enumerate(self._events): - if _is_decision_task_started(event): - self._event_index = i - break - - async def has_next_decision_events(self) -> bool: - # Look for the next DecisionTaskStarted event from current position - for i in range(self._event_index, len(self._events)): - if _is_decision_task_started(self._events[i]): - return True - - return False - - async def next_decision_events(self) -> DecisionEvents: - # Find next DecisionTaskStarted event - start_index = None - for i in range(self._event_index, len(self._events)): - if _is_decision_task_started(self._events[i]): - start_index = i - break - - if start_index is None: - raise StopIteration("No more decision events") - - decision_events = DecisionEvents() - decision_events.replay = self._replay - decision_events.replay_current_time_milliseconds = ( - self._replay_current_time_milliseconds - ) - decision_events.next_decision_event_id = self._next_decision_event_id - - # Process DecisionTaskStarted event - decision_task_started = self._events[start_index] - self._decision_task_started_event = decision_task_started - decision_events.events.append(decision_task_started) - - # Update replay time if 
available - if decision_task_started.event_time: - self._replay_current_time_milliseconds = ( - decision_task_started.event_time.seconds * 1000 - ) - decision_events.replay_current_time_milliseconds = ( - self._replay_current_time_milliseconds - ) - - # Process subsequent events until we find the corresponding DecisionTask completion - current_index = start_index + 1 - while current_index < len(self._events): - event = self._events[current_index] - decision_events.events.append(event) - - # Categorize the event - if _is_marker_recorded(event): - decision_events.markers.append(event) - elif _is_decision_task_completion(event): - # This marks the end of this decision iteration - self._process_decision_completion_event(event, decision_events) - current_index += 1 # Move past this event - break - - current_index += 1 - - # Update the event index for next iteration - self._event_index = current_index - - # Update the next decision event ID - if decision_events.events: - last_event = decision_events.events[-1] - if hasattr(last_event, "event_id"): - self._next_decision_event_id = last_event.event_id + 1 + def __iter__(self): + return self - # Check if this is the last decision events - # Set replay to false only if there are no more decision events after this one - # Check directly without calling has_next_decision_events to avoid recursion - has_more = False - for i in range(self._event_index, len(self._events)): - if _is_decision_task_started(self._events[i]): - has_more = True + def __next__(self) -> DecisionEvents: + """ + Process the next decision batch. + 1. Find the next valid decision task started event during replay or last scheduled decision task events for non-replay + 2. Collect the decision input events before the decision task + 3. 
Collect the decision output events after the decision task + + Relay mode is determined by checking if the decision task is completed or not + """ + decision_input_events: List[HistoryEvent] = [] + decision_output_events: List[HistoryEvent] = [] + decision_event: Optional[HistoryEvent] = None + for event in self._events: + match event.WhichOneof("attributes"): + case "decision_task_started_event_attributes": + next_event = self._events.peek() + + # latest event, not replay, assign started event as decision event insteaad + if next_event is None: + decision_event = event + break + + match next_event.WhichOneof("attributes"): + case ( + "decision_task_failed_event_attributes" + | "decision_task_timed_out_event_attributes" + ): + # skip failed / timed out decision tasks and continue searching + next(self._events) + continue + case "decision_task_completed_event_attributes": + # found decision task completed event, stop + decision_event = next(self._events) + break + case _: + raise ValueError( + f"unexpected event type after decision task started event: {next_event}" + ) + + case _: + decision_input_events.append(event) + + if not decision_event: + raise StopIteration("no decision event found") + + # collect decision output events + while self._events.has_next(): + nxt = self._events.peek() if self._events.has_next() else None + if nxt and not is_decision_event(nxt): break + decision_output_events.append(next(self._events)) - if not has_more: - self._replay = False - decision_events.replay = False - - return decision_events + replay_current_time_milliseconds = decision_event.event_time.ToMilliseconds() - def _process_decision_completion_event( - self, event: HistoryEvent, decision_events: DecisionEvents - ): - """Process the decision completion event and update state.""" - - # Check if we're still in replay mode - # This is determined by comparing event IDs with the current decision task's started event ID + replay: bool + next_decision_event_id: int if ( - 
self._decision_task_started_event - and hasattr(self._decision_task_started_event, "event_id") - and hasattr(event, "event_id") - ): - # If this completion event ID is >= the current decision task's started event ID, - # we're no longer in replay mode - current_task_started_id = ( - getattr(self._decision_task.started_event_id, "value", 0) - if hasattr(self._decision_task, "started_event_id") - else 0 - ) - - if event.event_id >= current_task_started_id: - self._replay = False - decision_events.replay = False - - def get_replay_current_time_milliseconds(self) -> Optional[int]: - """Get the current replay time in milliseconds.""" - return self._replay_current_time_milliseconds - - def is_replay_mode(self) -> bool: - """Check if the iterator is currently in replay mode.""" - return self._replay - - def __aiter__(self): - return self - - async def __anext__(self) -> DecisionEvents: - if not await self.has_next_decision_events(): - raise StopAsyncIteration - return await self.next_decision_events() + decision_event.WhichOneof("attributes") + == "decision_task_completed_event_attributes" + ): # completed decision task + replay = True + next_decision_event_id = decision_event.event_id + 1 + else: + replay = False + next_decision_event_id = decision_event.event_id + 2 + + # collect marker events + markers = [m for m in decision_output_events if is_marker_event(m)] + + return DecisionEvents( + input=decision_input_events, + output=decision_output_events, + markers=markers, + replay=replay, + replay_current_time_milliseconds=replay_current_time_milliseconds, + next_decision_event_id=next_decision_event_id, + ) -# Utility functions def is_decision_event(event: HistoryEvent) -> bool: - """Check if an event is a decision-related event.""" - return ( - _is_decision_task_started(event) - or _is_decision_task_completed(event) - or _is_decision_task_failed(event) - or _is_decision_task_timed_out(event) + """Check if an event is a decision output event.""" + return event is not 
None and event.WhichOneof("attributes") in set( + [ + "activity_task_scheduled_event_attributes", + "start_child_workflow_execution_initiated_event_attributes", + "timer_started_event_attributes", + "workflow_execution_completed_event_attributes", + "workflow_execution_failed_event_attributes", + "workflow_execution_canceled_event_attributes", + "workflow_execution_continued_as_new_event_attributes", + "activity_task_cancel_requested_event_attributes", + "request_cancel_activity_task_failed_event_attributes", + "timer_canceled_event_attributes", + "cancel_timer_failed_event_attributes", + "request_cancel_external_workflow_execution_initiated_event_attributes", + "marker_recorded_event_attributes", + "signal_external_workflow_execution_initiated_event_attributes", + "upsert_workflow_search_attributes_event_attributes", + ] ) def is_marker_event(event: HistoryEvent) -> bool: - """Check if an event is a marker event.""" - return _is_marker_recorded(event) - - -def extract_event_timestamp_millis(event: HistoryEvent) -> Optional[int]: - """Extract timestamp from an event in milliseconds.""" - if hasattr(event, "event_time") and event.HasField("event_time"): - seconds = getattr(event.event_time, "seconds", 0) - return seconds * 1000 if seconds > 0 else None - return None - - -def _is_decision_task_started(event: HistoryEvent) -> bool: - """Check if event is DecisionTaskStarted.""" - return hasattr(event, "decision_task_started_event_attributes") and event.HasField( - "decision_task_started_event_attributes" - ) - - -def _is_decision_task_completed(event: HistoryEvent) -> bool: - """Check if event is DecisionTaskCompleted.""" - return hasattr( - event, "decision_task_completed_event_attributes" - ) and event.HasField("decision_task_completed_event_attributes") - - -def _is_decision_task_failed(event: HistoryEvent) -> bool: - """Check if event is DecisionTaskFailed.""" - return hasattr(event, "decision_task_failed_event_attributes") and event.HasField( - 
"decision_task_failed_event_attributes" - ) - - -def _is_decision_task_timed_out(event: HistoryEvent) -> bool: - """Check if event is DecisionTaskTimedOut.""" - return hasattr( - event, "decision_task_timed_out_event_attributes" - ) and event.HasField("decision_task_timed_out_event_attributes") - - -def _is_marker_recorded(event: HistoryEvent) -> bool: - """Check if event is MarkerRecorded.""" - return hasattr(event, "marker_recorded_event_attributes") and event.HasField( - "marker_recorded_event_attributes" - ) - - -def _is_decision_task_completion(event: HistoryEvent) -> bool: - """Check if event is any kind of decision task completion.""" - return ( - _is_decision_task_completed(event) - or _is_decision_task_failed(event) - or _is_decision_task_timed_out(event) + return bool( + event is not None + and event.WhichOneof("attributes") == "marker_recorded_event_attributes" ) diff --git a/cadence/_internal/workflow/history_event_iterator.py b/cadence/_internal/workflow/history_event_iterator.py index 900f8a5..9697436 100644 --- a/cadence/_internal/workflow/history_event_iterator.py +++ b/cadence/_internal/workflow/history_event_iterator.py @@ -1,3 +1,5 @@ +from typing import Iterator, List, Optional +from cadence.api.v1.history_pb2 import HistoryEvent from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.service_workflow_pb2 import ( GetWorkflowExecutionHistoryRequest, @@ -32,3 +34,25 @@ async def iterate_history_events( ) current_page = response.history.events next_page_token = response.next_page_token + + +class HistoryEventsIterator(Iterator[HistoryEvent]): + def __init__(self, events: List[HistoryEvent]): + self._iter = iter(events) + self._current = next(self._iter, None) + + def __iter__(self): + return self + + def __next__(self) -> HistoryEvent: + if not self._current: + raise StopIteration("No more events") + event = self._current + self._current = next(self._iter, None) + return event + + def has_next(self) -> bool: + 
return self._current is not None + + def peek(self) -> Optional[HistoryEvent]: + return self._current diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 8024a75..b7acd34 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -128,8 +128,7 @@ async def _process_decision_events( processed_any_decision_events = False # Check if there are any decision events to process - while await events_iterator.has_next_decision_events(): - decision_events = await events_iterator.next_decision_events() + for decision_events in events_iterator: processed_any_decision_events = True # Log decision events batch processing (matches Go client patterns) diff --git a/tests/cadence/_internal/workflow/test_decision_events_iterator.py b/tests/cadence/_internal/workflow/test_decision_events_iterator.py index 1e70661..2268421 100644 --- a/tests/cadence/_internal/workflow/test_decision_events_iterator.py +++ b/tests/cadence/_internal/workflow/test_decision_events_iterator.py @@ -9,42 +9,163 @@ from cadence.api.v1.history_pb2 import HistoryEvent, History from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import WorkflowExecution -from google.protobuf.timestamp_pb2 import Timestamp from cadence._internal.workflow.decision_events_iterator import ( - DecisionEvents, DecisionEventsIterator, - is_decision_event, - is_marker_event, - extract_event_timestamp_millis, ) -def create_mock_history_event( - event_id: int, event_type: str, timestamp_seconds: int = 1000 -) -> HistoryEvent: - """Create a mock history event for testing.""" - event = HistoryEvent() - event.event_id = event_id - - # Create proper protobuf timestamp - timestamp = Timestamp() - timestamp.seconds = timestamp_seconds - event.event_time.CopyFrom(timestamp) +class TestDecisionEventsIterator: + """Test the DecisionEventsIterator class.""" - # Set the appropriate 
class TestDecisionEventsIterator:
    """Test the DecisionEventsIterator class."""

    @pytest.mark.parametrize(
        "name, event_types, expected",
        [
            (
                "workflow_started",
                [
                    "workflow_execution_started",
                    "decision_task_scheduled",
                    "decision_task_started",
                ],
                [
                    {
                        "input": 2,
                        "output": 0,
                        "markers": 0,
                        "replay": False,
                        "replay_time": 3000,
                        "next_decision_event_id": 5,
                    },
                ],
            ),
            (
                "workflow_with_activity_scheduled",
                [
                    "workflow_execution_started",
                    "decision_task_scheduled",
                    "decision_task_started",
                    "decision_task_completed",
                    "activity_scheduled",
                ],
                [
                    {
                        "input": 2,
                        "output": 1,
                        "markers": 0,
                        "replay": True,
                        "replay_time": 4000,
                        "next_decision_event_id": 5,
                    },
                ],
            ),
            (
                "workflow_with_activity_completed",
                [
                    "workflow_execution_started",
                    "decision_task_scheduled",
                    "decision_task_started",
                    "decision_task_completed",
                    "activity_scheduled",
                    "activity_started",
                    "activity_completed",
                    "decision_task_scheduled",
                    "decision_task_started",
                ],
                [
                    {
                        "input": 2,
                        "output": 1,
                        "markers": 0,
                        "replay": True,
                        "replay_time": 4000,
                        "next_decision_event_id": 5,
                    },
                    {
                        "input": 3,
                        "output": 0,
                        "markers": 0,
                        "replay": False,
                        "replay_time": 9000,
                        "next_decision_event_id": 11,
                    },
                ],
            ),
        ],
    )
    def test_successful_cases(self, name, event_types, expected):
        events = create_mock_history_event(event_types)
        decision_task = create_mock_decision_task(events)
        iterator = DecisionEventsIterator(decision_task, events)

        batches = list(iterator)
        assert len(expected) == len(batches)

        for expect, batch in zip(expected, batches):
            assert len(batch.input) == expect["input"]
            assert len(batch.output) == expect["output"]
            assert len(batch.markers) == expect["markers"]
            assert batch.replay == expect["replay"]
            assert batch.replay_current_time_milliseconds == expect["replay_time"]
            assert batch.next_decision_event_id == expect["next_decision_event_id"]


# Maps the test-friendly event-type alias onto the HistoryEvent oneof field
# name. Aliases absent from this table (e.g. "decision_task_scheduled")
# intentionally produce an event with no attributes set, matching the
# behavior of the original if/elif chain this table replaces.
_EVENT_TYPE_TO_ATTRIBUTE = {
    "decision_task_started": "decision_task_started_event_attributes",
    "decision_task_completed": "decision_task_completed_event_attributes",
    "decision_task_failed": "decision_task_failed_event_attributes",
    "decision_task_timed_out": "decision_task_timed_out_event_attributes",
    "marker_recorded": "marker_recorded_event_attributes",
    "activity_scheduled": "activity_task_scheduled_event_attributes",
    "activity_started": "activity_task_started_event_attributes",
    "activity_completed": "activity_task_completed_event_attributes",
    "activity_failed": "activity_task_failed_event_attributes",
    "activity_timed_out": "activity_task_timed_out_event_attributes",
    "activity_cancel_requested": "activity_task_cancel_requested_event_attributes",
    "request_cancel_activity_task_failed": "request_cancel_activity_task_failed_event_attributes",
    "activity_canceled": "activity_task_canceled_event_attributes",
    "timer_started": "timer_started_event_attributes",
    "timer_fired": "timer_fired_event_attributes",
    "timer_canceled": "timer_canceled_event_attributes",
    "cancel_timer_failed": "cancel_timer_failed_event_attributes",
    "request_cancel_external_workflow_execution_initiated": "request_cancel_external_workflow_execution_initiated_event_attributes",
    "request_cancel_external_workflow_execution_failed": "request_cancel_external_workflow_execution_failed_event_attributes",
    "external_workflow_execution_cancel_requested": "external_workflow_execution_cancel_requested_event_attributes",
    "workflow_execution_started": "workflow_execution_started_event_attributes",
    "workflow_execution_completed": "workflow_execution_completed_event_attributes",
}


def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]:
    """Build sequential HistoryEvents: ids 1.., event times 1s apart.

    Each event's oneof attribute is selected from _EVENT_TYPE_TO_ATTRIBUTE;
    unknown aliases yield an event with no attribute set.
    """
    events = []
    for i, event_type in enumerate(event_types, start=1):
        event = HistoryEvent()
        event.event_id = i
        event.event_time.FromMilliseconds(i * 1000)

        attribute = _EVENT_TYPE_TO_ATTRIBUTE.get(event_type)
        if attribute is not None:
            # SetInParent marks the (otherwise empty) submessage as present,
            # so WhichOneof("attributes") reports it.
            getattr(event, attribute).SetInParent()

        events.append(event)

    return events
decision_events.next_decision_event_id is None - - def test_decision_events_with_data(self): - """Test DecisionEvents with actual data.""" - events = [ - create_mock_history_event(1, "decision_task_started"), - create_mock_history_event(2, "decision_task_completed"), - ] - markers = [create_mock_history_event(3, "marker_recorded")] - - decision_events_obj = DecisionEvents( - events=events, - markers=markers, - replay=True, - replay_current_time_milliseconds=123456, - next_decision_event_id=4, - ) - - assert decision_events_obj.get_events() == events - assert decision_events_obj.get_markers() == markers - assert decision_events_obj.is_replay() - assert decision_events_obj.replay_current_time_milliseconds == 123456 - assert decision_events_obj.next_decision_event_id == 4 - - def test_get_event_by_id(self): - """Test retrieving event by ID.""" - event1 = create_mock_history_event(1, "decision_task_started") - event2 = create_mock_history_event(2, "decision_task_completed") - - decision_events = DecisionEvents(events=[event1, event2]) - - assert decision_events.get_event_by_id(1) == event1 - assert decision_events.get_event_by_id(2) == event2 - assert decision_events.get_event_by_id(999) is None - - -class TestDecisionEventsIterator: - """Test the DecisionEventsIterator class.""" - - @pytest.mark.asyncio - async def test_single_decision_iteration(self): - """Test processing a single decision iteration.""" - # Create events for a complete decision iteration - events = [ - create_mock_history_event(1, "decision_task_started", 1000), - create_mock_history_event( - 2, "activity_scheduled", 1001 - ), # Some workflow event - create_mock_history_event(3, "marker_recorded", 1002), - create_mock_history_event(4, "decision_task_completed", 1003), - ] - - decision_task = create_mock_decision_task(events) - iterator = DecisionEventsIterator(decision_task, events) - - assert await iterator.has_next_decision_events() - - decision_events = await iterator.next_decision_events() - - 
assert len(decision_events.get_events()) == 4 - assert len(decision_events.get_markers()) == 1 - assert decision_events.get_markers()[0].event_id == 3 - # In this test scenario with only one decision iteration, replay gets set to false - # when we determine there are no more decision events after this one - # This matches the Java client behavior where the last decision events have replay=false - assert not decision_events.is_replay() - assert decision_events.replay_current_time_milliseconds == 1000 * 1000 - - @pytest.mark.asyncio - async def test_multiple_decision_iterations(self): - """Test processing multiple decision iterations.""" - # Create events for two decision iterations - events = [ - # First iteration - create_mock_history_event(1, "decision_task_started", 1000), - create_mock_history_event(2, "decision_task_completed", 1001), - # Second iteration - create_mock_history_event(3, "decision_task_started", 1002), - create_mock_history_event(4, "decision_task_completed", 1003), - ] - - decision_task = create_mock_decision_task(events) - iterator = DecisionEventsIterator(decision_task, events) - - # First iteration - assert await iterator.has_next_decision_events() - first_decision = await iterator.next_decision_events() - assert len(first_decision.get_events()) == 2 - assert first_decision.get_events()[0].event_id == 1 - - # Second iteration - assert await iterator.has_next_decision_events() - second_decision = await iterator.next_decision_events() - assert len(second_decision.get_events()) == 2 - assert second_decision.get_events()[0].event_id == 3 - - # No more iterations - assert not await iterator.has_next_decision_events() - - @pytest.mark.asyncio - async def test_iterator_protocol(self): - """Test that DecisionEventsIterator works with Python iterator protocol.""" - events = [ - create_mock_history_event(1, "decision_task_started"), - create_mock_history_event(2, "decision_task_completed"), - create_mock_history_event(3, "decision_task_started"), - 
create_mock_history_event(4, "decision_task_completed"), - ] - - decision_task = create_mock_decision_task(events) - iterator = DecisionEventsIterator(decision_task, events) - - decision_events_list = [] - async for decision_events in iterator: - decision_events_list.append(decision_events) - - assert len(decision_events_list) == 2 - - -class TestUtilityFunctions: - """Test utility functions.""" - - def test_is_decision_event(self): - """Test is_decision_event utility function.""" - decision_event = create_mock_history_event(1, "decision_task_started") - non_decision_event = create_mock_history_event( - 2, "activity_scheduled" - ) # Random event type - - assert is_decision_event(decision_event) - assert not is_decision_event(non_decision_event) - - def test_is_marker_event(self): - """Test is_marker_event utility function.""" - marker_event = create_mock_history_event(1, "marker_recorded") - non_marker_event = create_mock_history_event(2, "decision_task_started") - - assert is_marker_event(marker_event) - assert not is_marker_event(non_marker_event) - - def test_extract_event_timestamp_millis(self): - """Test extract_event_timestamp_millis utility function.""" - event = create_mock_history_event(1, "some_event", 1234) - - timestamp_millis = extract_event_timestamp_millis(event) - assert timestamp_millis == 1234 * 1000 - - # Test event without timestamp - event_no_timestamp = HistoryEvent() - assert extract_event_timestamp_millis(event_no_timestamp) is None - - -class TestIntegrationScenarios: - """Test real-world integration scenarios.""" - - @pytest.mark.asyncio - async def test_replay_detection(self): - """Test replay mode detection.""" - # Simulate a scenario where we have historical events and current events - events = [ - create_mock_history_event(1, "decision_task_started"), - create_mock_history_event(2, "decision_task_completed"), - create_mock_history_event(3, "decision_task_started"), # Current decision - ] - - decision_task = 
create_mock_decision_task(events) - # Mock the started_event_id to indicate current decision - decision_task.started_event_id = 3 - - iterator = DecisionEventsIterator(decision_task, events) - - # First decision should be replay (but gets set to false when no more events) - await iterator.next_decision_events() - # Since this test has incomplete events (no completion for the third decision), - # the replay logic may behave differently - # assert first_decision.is_replay() - - # When we get to current decision, replay should be false - # (This would need the completion event to trigger the replay mode change) - - @pytest.mark.asyncio - async def test_complex_workflow_scenario(self): - """Test a complex workflow with multiple event types.""" - events = [ - create_mock_history_event(1, "decision_task_started"), - create_mock_history_event(2, "activity_scheduled"), # Activity scheduled - create_mock_history_event(3, "activity_started"), # Activity started - create_mock_history_event(4, "marker_recorded"), - create_mock_history_event(5, "activity_completed"), # Activity completed - create_mock_history_event(6, "decision_task_completed"), - create_mock_history_event(7, "decision_task_started"), - create_mock_history_event(8, "decision_task_completed"), - ] - - decision_task = create_mock_decision_task(events) - iterator = DecisionEventsIterator(decision_task, events) - - all_decisions = [] - async for decision_events in iterator: - all_decisions.append(decision_events) - - assert len(all_decisions) == 2 - - # First decision should have more events including markers - first_decision = all_decisions[0] - assert len(first_decision.get_events()) == 6 # Events 1-6 - assert len(first_decision.get_markers()) == 1 # Event 4 - - # Second decision should be simpler - second_decision = all_decisions[1] - assert len(second_decision.get_events()) == 2 # Events 7-8 - assert len(second_decision.get_markers()) == 0