From 459acbe2d575b9e2400123e5c99f10ddbdd0249a Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 09:57:21 +0100 Subject: [PATCH 1/9] grouping utility Signed-off-by: Assem Hafez --- .../workflow-history-grouper.test.tsx | 718 ++++++++++++++++++ .../helpers/workflow-history-grouper.ts | 522 +++++++++++++ .../helpers/workflow-history-grouper.types.ts | 57 ++ 3 files changed, 1297 insertions(+) create mode 100644 src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx create mode 100644 src/views/workflow-history/helpers/workflow-history-grouper.ts create mode 100644 src/views/workflow-history/helpers/workflow-history-grouper.types.ts diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx new file mode 100644 index 000000000..706a7e99f --- /dev/null +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -0,0 +1,718 @@ +import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; + +import { + completedActivityTaskEvents, + scheduleActivityTaskEvent, + startActivityTaskEvent, +} from '../../__fixtures__/workflow-history-activity-events'; +import { + completedDecisionTaskEvents, + scheduleDecisionTaskEvent, + startDecisionTaskEvent, +} from '../../__fixtures__/workflow-history-decision-events'; +import { + pendingActivityTaskStartEvent, + pendingDecisionTaskStartEvent, +} from '../../__fixtures__/workflow-history-pending-events'; +import type { + ActivityHistoryGroup, + PendingActivityTaskStartEvent, + PendingDecisionTaskStartEvent, +} from '../../workflow-history.types'; +import WorkflowHistoryGrouper from '../workflow-history-grouper'; +import type { Props } from '../workflow-history-grouper.types'; + +// Create pending decision that matches the scheduleDecisionTaskEvent (eventId: '2') +const pendingDecisionForScheduledEvent = { + ...pendingDecisionTaskStartEvent, + computedEventId: 'pending-2', + pendingDecisionTaskStartEventAttributes: { + ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, + scheduleId: '2', + }, +} as const satisfies PendingDecisionTaskStartEvent; + +// Helper to create a grouper with a mock onChange +function createGrouper(options: Partial = {}) { + const onChange = jest.fn(); + const grouper = new WorkflowHistoryGrouper({ + onChange, + ...options, + }); + return { grouper, onChange }; +} + +// Helper to wait for processing to complete +function waitForProcessing(onChange: jest.Mock, timeout = 5000): Promise { + return new Promise((resolve, reject) => { + const startTime = Date.now(); + + const checkComplete = () => { + // Check if onChange was called with status 'idle' or 'error' + const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; + if ( + lastCall && + (lastCall[0].status === 'idle' || lastCall[0].status === 'error') + ) { + resolve(); + return; + } + + // Check timeout + if (Date.now() - startTime > timeout) { + reject(new Error('Timeout waiting for processing to complete')); + return; + } + + // Check again soon + setTimeout(checkComplete, 10); + }; + + // Start checking after a short delay to let updateEvents kick off + setTimeout(checkComplete, 10); + }); +} + +describe(WorkflowHistoryGrouper.name, () => { + describe('basic event processing', () => { + it('should process events and create groups', async () => { + const { grouper, onChange } = createGrouper(); + + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + const groups = grouper.getGroups(); + expect(groups).toBeDefined(); + expect(groups['7']).toBeDefined(); + expect(groups['7'].groupType).toBe('Activity'); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + }); + + it('should not reprocess events on subsequent calls with same events', async () => { + const { grouper, onChange } = createGrouper(); + + // First call + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + const initialGroups = grouper.getGroups(); + const initialIndex = grouper.getLastProcessedEventIndex(); + + // Second call with same events - should not trigger processing + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + // Give it a moment, but don't wait for onChange since nothing should happen + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(grouper.getGroups()).toEqual(initialGroups); + expect(grouper.getLastProcessedEventIndex()).toBe(initialIndex); + }); + + it('should process only new events on subsequent calls', async () => { + const { grouper, onChange } = createGrouper(); + + // First call with partial events + grouper.updateEvents([ + scheduleActivityTaskEvent, + startActivityTaskEvent, + ] as HistoryEvent[]); + await waitForProcessing(onChange); + + expect(grouper.getLastProcessedEventIndex()).toBe(1); + + // Second call with all events + onChange.mockClear(); + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + const groups = grouper.getGroups(); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + expect(groups['7']).toBeDefined(); + expect((groups['7'] as ActivityHistoryGroup).events).toHaveLength(3); + }); + }); + + describe('pending activities management', () => { + it('should add new pending activities to groups', async () => { + const { grouper, onChange } = createGrouper(); + + // First call with scheduled event only + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Update with pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + const groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should remove stale pending activities from groups', async () => { + const { grouper, onChange } = createGrouper(); + + // First call with pending activity + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + const firstGroups = grouper.getGroups(); + const firstActivityGroup = firstGroups['7'] as ActivityHistoryGroup; + expect(firstActivityGroup.events).toHaveLength(2); + + // Second call without pending activity (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + }); + + it('should handle multiple pending activity state transitions', async () => { + const { grouper, onChange } = createGrouper(); + + // Initial state + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Add pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Remove pending activity (it started) + onChange.mockClear(); + grouper.updateEvents([ + scheduleActivityTaskEvent, + startActivityTaskEvent, + ] as HistoryEvent[]); + await waitForProcessing(onChange); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const activityGroup = grouper.getGroups()['7'] as ActivityHistoryGroup; + expect(activityGroup.events).toHaveLength(2); + expect( + activityGroup.events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(false); + }); + }); + + describe('pending decision management', () => { + it('should add new pending decision to groups', async () => { + const { grouper, onChange } = createGrouper(); + + // First call with scheduled event only + grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Add pending decision + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecisionForScheduledEvent, + }); + + const decisionGroup = grouper.getGroups()['2']; + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should remove stale pending decision from groups', async () => { + const { grouper, onChange } = createGrouper(); + + // First call with pending decision + grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecisionForScheduledEvent, + }); + + const firstGroups = grouper.getGroups(); + expect(firstGroups['2'].events).toHaveLength(2); + + // Second call without pending decision (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const decisionGroup = grouper.getGroups()['2']; + expect(decisionGroup.events).toHaveLength(1); + }); + }); + + describe('state management', () => { + it('should track last processed event index correctly', () => { + const { grouper } = createGrouper(); + + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + }); + + it('should return current groups without processing', () => { + const { grouper } = createGrouper(); + + const groups = grouper.getGroups(); + + expect(groups).toEqual({}); + }); + + it('should reset grouper state', async () => { + const { grouper, onChange } = createGrouper(); + + // Process some events + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + expect(grouper.getLastProcessedEventIndex()).toBe(2); + expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); + + // Reset + grouper.reset(); + + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + expect(grouper.getGroups()).toEqual({}); + }); + + it('should reprocess events after reset', async () => { + const { grouper, onChange } = createGrouper(); + + // Process events + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + const firstGroups = grouper.getGroups(); + + // Reset and reprocess + grouper.reset(); + onChange.mockClear(); + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + expect(grouper.getGroups()).toEqual(firstGroups); + }); + }); + + describe('pending event buffering', () => { + it('should buffer pending activity when group does not exist yet', async () => { + const { grouper, onChange } = createGrouper(); + + // Add pending activity BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getGroups(); + expect(groups['7']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Group should now exist with both scheduled and pending events + groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should buffer pending decision when group does not exist yet', async () => { + const { grouper, onChange } = createGrouper(); + + // Add pending decision BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecisionForScheduledEvent, + }); + + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getGroups(); + expect(groups['2']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Group should now exist with both scheduled and pending events + groups = grouper.getGroups(); + const decisionGroup = groups['2']; + expect(decisionGroup).toBeDefined(); + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should handle multiple buffered pending activities', async () => { + const { grouper, onChange } = createGrouper(); + + const pendingActivity1 = { + ...pendingActivityTaskStartEvent, + computedEventId: 'pending-7', + pendingActivityTaskStartEventAttributes: { + ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, + scheduleId: '7', + }, + } as const satisfies PendingActivityTaskStartEvent; + + const pendingActivity2 = { + ...pendingActivityTaskStartEvent, + computedEventId: 'pending-10', + pendingActivityTaskStartEventAttributes: { + ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, + scheduleId: '10', + activityId: '1', + }, + } as const satisfies PendingActivityTaskStartEvent; + + const scheduleEvent2 = { + ...scheduleActivityTaskEvent, + eventId: '10', + activityTaskScheduledEventAttributes: { + ...scheduleActivityTaskEvent.activityTaskScheduledEventAttributes, + activityId: '1', + }, + }; + + // Add multiple pending activities BEFORE their scheduled events + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivity1, pendingActivity2], + pendingStartDecision: null, + }); + + // No groups should exist yet + expect(Object.keys(grouper.getGroups()).length).toBe(0); + + // Add first scheduled event + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // First group should now exist + let groups = grouper.getGroups(); + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeUndefined(); + + // Add second scheduled event + onChange.mockClear(); + grouper.updateEvents([ + scheduleActivityTaskEvent, + scheduleEvent2, + ] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Both groups should now exist + groups = grouper.getGroups(); + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeDefined(); + expect((groups['7'] as ActivityHistoryGroup).events).toHaveLength(2); + expect((groups['10'] as ActivityHistoryGroup).events).toHaveLength(2); + }); + + it('should clear buffer when pending events are updated', async () => { + const { grouper } = createGrouper(); + + const pendingActivity1 = { + ...pendingActivityTaskStartEvent, + computedEventId: 'pending-7', + } as const satisfies PendingActivityTaskStartEvent; + + const pendingActivity2 = { + ...pendingActivityTaskStartEvent, + computedEventId: 'pending-10', + pendingActivityTaskStartEventAttributes: { + ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, + scheduleId: '10', + }, + } as const satisfies PendingActivityTaskStartEvent; + + // Add pending activity that won't have a group + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivity1], + pendingStartDecision: null, + }); + + // Update with different pending activity (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivity2], + pendingStartDecision: null, + }); + + // No groups should exist + expect(Object.keys(grouper.getGroups()).length).toBe(0); + }); + + it('should clear buffer on reset', async () => { + const { grouper, onChange } = createGrouper(); + + // Add pending activity without scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Reset the grouper + grouper.reset(); + + // Add scheduled event after reset + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Group should only have scheduled event (buffered pending was cleared) + const groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + }); + + it('should apply buffered pending events after updatePendingEvents if groups now exist', async () => { + const { grouper, onChange } = createGrouper(); + + // Add pending activity BEFORE scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // No group yet + expect(grouper.getGroups()['7']).toBeUndefined(); + + // Process scheduled event + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Call updatePendingEvents again with same pending activity + // This should trigger applyBufferedPendingEvents and merge the buffered event + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should now have both events + const groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup.events).toHaveLength(2); + }); + + it('should handle scenario where scheduled event arrives after pending event update', async () => { + const { grouper, onChange } = createGrouper(); + + // Step 1: Pending activity arrives first (buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Step 2: Scheduled event arrives + grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Step 3: Another updatePendingEvents call (maybe with different pending events) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Should have complete group with both events + const groups = grouper.getGroups(); + const activityGroup = groups['7'] as ActivityHistoryGroup; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should not create incomplete groups when pending arrives before scheduled', async () => { + const { grouper } = createGrouper(); + + // Only add pending activity (no scheduled event) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should NOT exist in the UI + const groups = grouper.getGroups(); + expect(groups['7']).toBeUndefined(); + expect(Object.keys(groups).length).toBe(0); + }); + + it('should handle pending decision buffer clearing when decision changes', async () => { + const { grouper } = createGrouper(); + + const pendingDecision1 = { + ...pendingDecisionTaskStartEvent, + computedEventId: 'pending-7', + } as const satisfies PendingDecisionTaskStartEvent; + + const pendingDecision2 = { + ...pendingDecisionTaskStartEvent, + computedEventId: 'pending-10', + pendingDecisionTaskStartEventAttributes: { + ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, + scheduleId: '10', + }, + } as const satisfies PendingDecisionTaskStartEvent; + + // Buffer first decision + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecision1, + }); + + // Update with different decision (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecision2, + }); + + // No groups should exist + expect(Object.keys(grouper.getGroups()).length).toBe(0); + }); + }); + + describe('decision group filtering', () => { + it('should filter out pending decision when decision group has more than 2 events', async () => { + const { grouper, onChange } = createGrouper(); + + // Add scheduled event and pending decision + grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecisionForScheduledEvent, + }); + + // Group should have 2 events (scheduled + pending) + let groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + + // Now add started event (makes it 3 events total) + onChange.mockClear(); + grouper.updateEvents([ + scheduleDecisionTaskEvent, + startDecisionTaskEvent, + ] as HistoryEvent[]); + await waitForProcessing(onChange); + + // Pending decision should be filtered out when there are more than 2 events + groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + + // Add completed event (makes it 3+ events) + onChange.mockClear(); + grouper.updateEvents(completedDecisionTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + // Still should not have pending decision + groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(3); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + }); + + it('should keep pending decision when decision group has exactly 2 events', async () => { + const { grouper, onChange } = createGrouper(); + + // Add scheduled event and pending decision + grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); + await waitForProcessing(onChange); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: pendingDecisionForScheduledEvent, + }); + + // Group should have 2 events (scheduled + pending) + const groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(true); + }); + }); + + describe('groups shallow copy in onChange', () => { + it('should return shallow copy of groups object in onChange callback', async () => { + const { grouper, onChange } = createGrouper(); + + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + // Get groups from onChange callback + const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; + const groupsFromCallback = lastCall[0].currentGroups; + + // Try to add a new group to the callback's groups object + groupsFromCallback['999'] = groupsFromCallback['7']; + + // Internal groups should not have the new group (shallow copy protects object structure) + const internalGroups = grouper.getGroups(); + expect(internalGroups['999']).toBeUndefined(); + }); + + it('should allow modification of group properties (shallow copy limitation)', async () => { + const { grouper, onChange } = createGrouper(); + + grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); + await waitForProcessing(onChange); + + // Get groups from onChange callback + const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; + const groupsFromCallback = lastCall[0].currentGroups; + const originalLabel = groupsFromCallback['7'].label; + + // Modify a group's property - this WILL affect internal state (shallow copy limitation) + groupsFromCallback['7'].label = 'Modified Label'; + + // Internal groups ARE modified since group objects are shared references + const internalGroups = grouper.getGroups(); + expect(internalGroups['7'].label).toBe('Modified Label'); + expect(internalGroups['7'].label).not.toBe(originalLabel); + }); + }); +}); diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts new file mode 100644 index 000000000..51cb2638b --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -0,0 +1,522 @@ +import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; +import logger from '@/utils/logger'; + +import type { + ExtendedActivityHistoryEvent, + ExtendedDecisionHistoryEvent, + HistoryEventsGroup, + HistoryEventsGroups, + PendingActivityTaskStartEvent, + PendingDecisionTaskStartEvent, +} from '../workflow-history.types'; + +import isChildWorkflowExecutionEvent from './check-history-event-group/is-child-workflow-execution-event'; +import isExtendedActivityEvent from './check-history-event-group/is-extended-activity-event'; +import isExtendedDecisionEvent from './check-history-event-group/is-extended-decision-event'; +import isRequestCancelExternalWorkflowExecutionEvent from './check-history-event-group/is-request-cancel-external-workflow-execution-event'; +import isSignalExternalWorkflowExecutionEvent from './check-history-event-group/is-signal-external-workflow-execution-event'; +import isSingleEvent from './check-history-event-group/is-single-event'; +import isTimerEvent from './check-history-event-group/is-timer-event'; +import getHistoryEventGroupId from './get-history-event-group-id'; +import getActivityGroupFromEvents from './get-history-group-from-events/get-activity-group-from-events'; +import getChildWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-child-workflow-execution-group-from-events'; +import getDecisionGroupFromEvents from './get-history-group-from-events/get-decision-group-from-events'; +import getRequestCancelExternalWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-request-cancel-external-workflow-execution-group-from-events'; +import getSignalExternalWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-signal-external-workflow-execution-group-from-events'; +import getSingleEventGroupFromEvents from './get-history-group-from-events/get-single-event-group-from-events'; +import getTimerGroupFromEvents from './get-history-group-from-events/get-timer-group-from-events'; +import placeEventInGroupEvents from './place-event-in-group-events'; +import { + type GroupingProcessState, + type ProcessEventsParams, + type Props, +} from './workflow-history-grouper.types'; + +/** + * Stateful history events grouper that processes events incrementally. + * + * This class maintains the state of processed events and groups, allowing + * efficient incremental updates as new events arrive. It tracks pending + * activities and decisions, automatically adding new ones and removing + * stale ones from groups. + */ +export default class WorkflowHistoryGrouper { + private allEvents: HistoryEvent[] = []; + private lastProcessedEventIndex: number = -1; + private groups: HistoryEventsGroups = {}; + private currentPendingActivities: PendingActivityTaskStartEvent[] = []; + private currentPendingDecision: PendingDecisionTaskStartEvent | null = null; + private onChange: (state: GroupingProcessState) => void; + private batchSize?: number; + private isProcessing: boolean = false; + + // Buffer for pending events that arrived before their group exists + private bufferedPendingActivities: PendingActivityTaskStartEvent[] = []; + private bufferedPendingDecision: PendingDecisionTaskStartEvent | null = null; + + constructor({ onChange, batchSize }: Props) { + this.onChange = onChange; + this.batchSize = batchSize; + } + + /** + * Updates the events list and automatically starts processing. + * The processor will continue batch by batch until all events are processed. + * If already processing, the new events will be queued and processed after current batch completes. + * Listen to onChange for progress updates. + * + */ + public updateEvents(events: HistoryEvent[]): void { + // Update allEvents with the latest complete list + this.allEvents = events; + + // If already processing, the loop will automatically pick up the new events + // No need to do anything - the pointer-based approach handles this + if (this.isProcessing) { + return; + } + + this.startProcessing(); + } + + /** + * Updates pending events (activities and decisions). + * This should be called separately from updateEvents. + */ + public async updatePendingEvents(params: ProcessEventsParams) { + // Update pending events (add new ones, remove stale ones) + + const currentPendingActivities = this.currentPendingActivities; + const currentPendingDecision = this.currentPendingDecision; + + this.currentPendingActivities = params.pendingStartActivities; + this.currentPendingDecision = params.pendingStartDecision; + + this.bufferedPendingActivities = []; + this.bufferedPendingDecision = null; + + this.processPendingEvents( + currentPendingActivities, + params.pendingStartActivities, + currentPendingDecision, + params.pendingStartDecision + ); + } + + /** + * Resets the grouper state, clearing all processed events and groups. + * Useful for reprocessing events from scratch. + */ + public reset(): void { + this.allEvents = []; + this.lastProcessedEventIndex = -1; + this.groups = {}; + this.currentPendingActivities = []; + this.currentPendingDecision = null; + this.bufferedPendingActivities = []; + this.bufferedPendingDecision = null; + this.isProcessing = false; + } + + /** + * Gets the current groups without processing new events. + * + * @returns Current state of groups + */ + public getGroups(): HistoryEventsGroups { + return this.groups; + } + + /** + * Gets the index of the last processed event. + */ + public getLastProcessedEventIndex(): number { + return this.lastProcessedEventIndex; + } + + // ============================================================================ + // Private Implementation + // ============================================================================ + + /** + * Starts the processing cycle. + * Schedules the first batch - all batches go through the scheduler. + */ + private startProcessing(): void { + // Check if there are events to process + if ( + this.isProcessing || + this.lastProcessedEventIndex >= this.allEvents.length - 1 + ) { + return; + } + + this.isProcessing = true; + + // Schedule the first batch (and all subsequent batches will be scheduled too) + this.scheduleNextBatch(); + } + + /** + * Schedules the next batch using the best available API. + * Uses Scheduler API if available, otherwise falls back to Promise microtask. + */ + private scheduleNextBatch(): void { + // Check if Scheduler API is available + const useScheduler = true; + if ( + useScheduler && + typeof window !== 'undefined' && + 'scheduler' in window && + 'postTask' in (window.scheduler as any) + ) { + // Use Scheduler API with background priority for non-urgent work + (window.scheduler as any) + .postTask(() => this.processBatch(), { priority: 'background' }) + .catch(() => { + // Fallback if postTask fails + setTimeout(() => this.processBatch(), 0); + }); + } else { + // Fallback to Promise microtask + setTimeout(() => this.processBatch(), 0); + } + } + + /** + * Processes a single batch of events (or all remaining events if no batchSize). + * This method handles the core grouping logic and schedules itself for the next batch. + */ + private async processBatch(): Promise { + // Check if there are events to process + if (this.lastProcessedEventIndex >= this.allEvents.length - 1) { + this.isProcessing = false; + return; + } + + // Calculate batch boundaries + const batchStart = this.lastProcessedEventIndex + 1; + const batchEnd = + this.batchSize !== undefined && this.batchSize > 0 + ? Math.min(batchStart + this.batchSize, this.allEvents.length) + : this.allEvents.length; + + // Process this batch synchronously using indices (avoids array slicing) + this.groups = this.groupEvents(batchStart, batchEnd, this.groups); + + // After processing new events, try to apply any buffered pending events + // whose groups may now exist + this.applyBufferedPendingEvents(); + + // Move pointer forward + this.lastProcessedEventIndex = batchEnd - 1; + + // Calculate progress + const processedEventsCount = this.lastProcessedEventIndex + 1; + const remainingEventsCount = this.allEvents.length - processedEventsCount; + + // Report progress + this.onChange({ + currentGroups: { ...this.groups }, + processedEventsCount, + remainingEventsCount, + status: remainingEventsCount > 0 ? 'processing' : 'idle', + }); + + // Check if there are more events to process + if (this.lastProcessedEventIndex < this.allEvents.length - 1) { + this.scheduleNextBatch(); + } else { + // All done + this.isProcessing = false; + } + } + + /** + * Groups a batch of new events and updates existing groups. + * Synchronous implementation that processes events immediately. + */ + private groupEvents( + startIndex: number, + endIndex: number, + existingGroups: HistoryEventsGroups + ): HistoryEventsGroups { + const groups = { ...existingGroups }; + + // Process new history events using indices (avoids array slicing) + for (let i = startIndex; i < endIndex; i++) { + const event = this.allEvents[i]; + const groupId = getHistoryEventGroupId(event); + if (!groupId) { + logger.warn( + { + eventId: event.eventId, + eventTime: event.eventTime, + }, + "Couldn't extract groupId from event, check event payload and extraction logic" + ); + continue; + } + + const defaultGroupDetails: Partial = { + events: [], + hasMissingEvents: false, + label: '', + }; + const currentGroup = groups[groupId] || defaultGroupDetails; + const updatedEventsArr = placeEventInGroupEvents( + event, + currentGroup.events + ); + + if (updatedEventsArr.every(isExtendedActivityEvent)) { + groups[groupId] = getActivityGroupFromEvents(updatedEventsArr); + } else if (updatedEventsArr.every(isExtendedDecisionEvent)) { + // If there are more than 2 decision events, filter out the pending decision task start event + // Pending decision task start event is only added to the group when the scheduled decision task event is added + // This logic can be moved later to getDecisionGroupFromEvents + const filteredDecisionEvents = + updatedEventsArr.length > 2 + ? updatedEventsArr.filter( + (e) => + e.attributes !== 'pendingDecisionTaskStartEventAttributes' + ) + : updatedEventsArr; + groups[groupId] = getDecisionGroupFromEvents(filteredDecisionEvents); + } else if (updatedEventsArr.every(isTimerEvent)) { + groups[groupId] = getTimerGroupFromEvents(updatedEventsArr); + } else if (updatedEventsArr.every(isChildWorkflowExecutionEvent)) { + groups[groupId] = + getChildWorkflowExecutionGroupFromEvents(updatedEventsArr); + } else if ( + updatedEventsArr.every(isSignalExternalWorkflowExecutionEvent) + ) { + groups[groupId] = + getSignalExternalWorkflowExecutionGroupFromEvents(updatedEventsArr); + } else if ( + updatedEventsArr.every(isRequestCancelExternalWorkflowExecutionEvent) + ) { + groups[groupId] = + getRequestCancelExternalWorkflowExecutionGroupFromEvents( + updatedEventsArr + ); + } else if (updatedEventsArr.every(isSingleEvent)) { + groups[groupId] = getSingleEventGroupFromEvents(updatedEventsArr); + } else { + logger.warn( + { + eventId: event.eventId, + eventTime: event.eventTime, + events: updatedEventsArr.map(({ eventId, eventTime }) => ({ + eventId, + eventTime, + })), + }, + 'No handler for grouping this event' + ); + } + } + + return groups; + } + + /** + * Adds a pending activity to a group, removing any existing pending activities first. + * Only adds the new pending activity if it has an eventTime. + */ + private addPendingActivityToGroup( + groupId: string, + pendingActivity: PendingActivityTaskStartEvent + ) { + const currentGroup = this.groups[groupId]; + if (currentGroup && currentGroup.events.every(isExtendedActivityEvent)) { + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingActivityTaskStartEventAttributes' + ) as ExtendedActivityHistoryEvent[]; + + this.groups[groupId] = getActivityGroupFromEvents([ + ...filteredEvents, + pendingActivity as ExtendedActivityHistoryEvent, + ]); + } + } + + /** + * Adds a pending decision to a group, removing any existing pending decision first. + * Only adds if the group has exactly one scheduled event. + */ + private updatePendingDecisionInGroup( + groupId: string, + pendingDecision: PendingDecisionTaskStartEvent | null + ) { + const currentGroup = this.groups[groupId]; + if (currentGroup && currentGroup.events.every(isExtendedDecisionEvent)) { + // Remove any existing pending decision + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingDecisionTaskStartEventAttributes' + ) as ExtendedDecisionHistoryEvent[]; + + // Only add pending decision if group has exactly one scheduled event + if ( + pendingDecision && + filteredEvents.length === 1 && + filteredEvents[0].attributes === 'decisionTaskScheduledEventAttributes' + ) { + const updatedEventsArr: ExtendedDecisionHistoryEvent[] = [ + ...filteredEvents, + pendingDecision, + ]; + this.groups[groupId] = getDecisionGroupFromEvents(updatedEventsArr); + } else { + // Just update without pending decision + this.groups[groupId] = getDecisionGroupFromEvents(filteredEvents); + } + } + } + + /** + * Updates pending activities and decisions. + */ + private processPendingEvents( + currentPendingActivities: PendingActivityTaskStartEvent[], + newPendingActivities: PendingActivityTaskStartEvent[], + currentPendingDecision: PendingDecisionTaskStartEvent | null, + newPendingDecision: PendingDecisionTaskStartEvent | null + ) { + this.updatePendingActivities( + currentPendingActivities, + newPendingActivities + ); + + this.updatePendingDecision(currentPendingDecision, newPendingDecision); + } + + /** + * Updates pending activities in groups by removing old ones and adding new ones. + * If a group doesn't exist yet, buffers the pending activity until the + * scheduled event arrives. + * Buffer is already cleared before this is called, so we're rebuilding from scratch. + */ + private updatePendingActivities( + currentPendingActivities: PendingActivityTaskStartEvent[], + newPendingActivities: PendingActivityTaskStartEvent[] + ): void { + const existingPendingGroups = new Set( + currentPendingActivities.map((pa) => getHistoryEventGroupId(pa)) + ); + // First, remove all current pending activities from their groups + currentPendingActivities.forEach((pa) => { + const groupId = getHistoryEventGroupId(pa); + if (groupId && existingPendingGroups.has(groupId)) { + const currentGroup = this.groups[groupId]; + if ( + currentGroup && + currentGroup.events.every(isExtendedActivityEvent) + ) { + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingActivityTaskStartEventAttributes' + ); + + this.groups[groupId] = getActivityGroupFromEvents(filteredEvents); + } + } + }); + + // Then, add all new pending activities to their groups (or buffer them) + newPendingActivities.forEach((pa) => { + const groupId = getHistoryEventGroupId(pa); + if (!groupId) { + logger.warn( + { + computedEventId: pa.computedEventId, + eventTime: pa.eventTime, + }, + "Couldn't extract groupId from pending activity event" + ); + return; + } + + if (this.groups[groupId]) { + this.addPendingActivityToGroup(groupId, pa); + } else { + this.bufferedPendingActivities.push(pa); + } + }); + } + + /** + * Adds the current pending decision to groups. + * If the group doesn't exist yet, buffers the pending decision until the + * scheduled event arrives. + * Buffer was cleared before this is called, so we're rebuilding from scratch. + */ + private updatePendingDecision( + currentPendingDecision: PendingDecisionTaskStartEvent | null, + newPendingDecision: PendingDecisionTaskStartEvent | null + ): void { + // Remove old pending decision from its group (if exists) + if (currentPendingDecision) { + const groupId = getHistoryEventGroupId(currentPendingDecision); + if (groupId) { + this.updatePendingDecisionInGroup(groupId, null); + } + } + + // Add new pending decision (to group or buffer) + if (newPendingDecision) { + const groupId = getHistoryEventGroupId(newPendingDecision); + if (!groupId) { + logger.warn( + { + computedEventId: newPendingDecision.computedEventId, + eventTime: newPendingDecision.eventTime, + }, + "Couldn't extract groupId from pending decision event" + ); + return; + } + + if (this.groups[groupId]) { + this.updatePendingDecisionInGroup(groupId, newPendingDecision); + } else { + this.bufferedPendingDecision = newPendingDecision; + } + } + } + + /** + * Applies buffered pending events to groups when their scheduled events arrive. + * This is called after processing new events to merge any pending events + * that were waiting for their groups to be created. + */ + private applyBufferedPendingEvents(): void { + // Apply buffered pending activities + const activitiesToKeepBuffered: PendingActivityTaskStartEvent[] = []; + + this.bufferedPendingActivities.forEach((activity) => { + const groupId = getHistoryEventGroupId(activity); + if (groupId && this.groups[groupId]) { + this.addPendingActivityToGroup(groupId, activity); + } else { + activitiesToKeepBuffered.push(activity); + } + }); + + this.bufferedPendingActivities = activitiesToKeepBuffered; + + // Apply buffered pending decision + if (this.bufferedPendingDecision) { + const groupId = getHistoryEventGroupId(this.bufferedPendingDecision); + if (groupId) { + // Try to add to existing group using helper + if (this.groups[groupId]) { + this.updatePendingDecisionInGroup( + groupId, + this.bufferedPendingDecision + ); + this.bufferedPendingDecision = null; + } + } + } + } +} diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts new file mode 100644 index 000000000..1674624dd --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -0,0 +1,57 @@ +import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; + +import type { + HistoryEventsGroups, + PendingActivityTaskStartEvent, + PendingDecisionTaskStartEvent, +} from '../workflow-history.types'; + +export type ProcessEventsParams = { + pendingStartActivities: PendingActivityTaskStartEvent[]; + pendingStartDecision: PendingDecisionTaskStartEvent | null; +}; + +export type ProcessEventsResult = { + groups: HistoryEventsGroups; + lastProcessedEventIndex: number; +}; + +/** + * Processing status for incremental grouping operations. + */ +export type ProcessingStatus = 'idle' | 'processing' | 'error'; + +/** + * State snapshot of the grouping process. + */ +export type GroupingProcessState = { + /** Current groups accumulated so far */ + currentGroups: HistoryEventsGroups; + /** Number of events that have been successfully processed since the grouper was created/reset */ + processedEventsCount: number; + /** Number of events that are still pending (not yet processed) */ + remainingEventsCount: number; + /** Current processing status */ + status: ProcessingStatus; +}; + +/** + * Callback invoked when grouping state changes. + */ +export type GroupingStateChangeCallback = (state: GroupingProcessState) => void; + +export type Props = { + /** + * Callback invoked when grouping state changes. + * Provides real-time updates on processing progress. + * Required to receive state updates. + */ + onChange: GroupingStateChangeCallback; + + /** + * Batch size for incremental processing. + * If specified, events will be processed in batches to allow progress updates. + * If not specified, all events are processed at once. + */ + batchSize?: number; +}; From ca4f4323963107a1b59d49d5836a39d052c6bb89 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Wed, 5 Nov 2025 23:29:48 +0100 Subject: [PATCH 2/9] update test cases Signed-off-by: Assem Hafez --- .../workflow-history-pending-events.ts | 54 + .../workflow-history-grouper.test.tsx | 1164 ++++++++--------- .../helpers/workflow-history-grouper.ts | 7 +- .../helpers/workflow-history-grouper.types.ts | 2 +- 4 files changed, 604 insertions(+), 623 deletions(-) diff --git a/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts b/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts index d08e326de..646bad461 100644 --- a/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts +++ b/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts @@ -3,6 +3,9 @@ import type { PendingDecisionTaskStartEvent, } from '../workflow-history.types'; +import { scheduleActivityTaskEvent } from './workflow-history-activity-events'; +import { scheduleDecisionTaskEvent } from './workflow-history-decision-events'; + export const pendingActivityTaskStartEvent = { eventId: null, computedEventId: 'pending-7', @@ -94,3 +97,54 @@ export const pendingDecisionTaskStartEventWithStartedState = { }, }, } as const satisfies PendingDecisionTaskStartEvent; + +// Factory functions for creating test data dynamically + +export function createPendingActivity( + scheduleId: string, + options?: { activityId?: string } +): PendingActivityTaskStartEvent { + return { + ...pendingActivityTaskStartEvent, + computedEventId: `pending-${scheduleId}`, + pendingActivityTaskStartEventAttributes: { + ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, + scheduleId, + ...(options?.activityId && { activityId: options.activityId }), + }, + } as PendingActivityTaskStartEvent; +} + +export function createPendingDecision( + scheduleId: string +): PendingDecisionTaskStartEvent { + return { + ...pendingDecisionTaskStartEvent, + computedEventId: `pending-${scheduleId}`, + pendingDecisionTaskStartEventAttributes: { + ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, + scheduleId, + }, + } as PendingDecisionTaskStartEvent; +} + +export function createScheduleActivityEvent( + eventId: string, + options?: { activityId?: string } +) { + return { + ...scheduleActivityTaskEvent, + eventId, + activityTaskScheduledEventAttributes: { + ...scheduleActivityTaskEvent.activityTaskScheduledEventAttributes, + ...(options?.activityId && { activityId: options.activityId }), + }, + }; +} + +export function createScheduleDecisionEvent(eventId: string) { + return { + ...scheduleDecisionTaskEvent, + eventId, + }; +} diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx index 706a7e99f..231644afb 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -1,718 +1,648 @@ -import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; - import { completedActivityTaskEvents, - scheduleActivityTaskEvent, startActivityTaskEvent, } from '../../__fixtures__/workflow-history-activity-events'; +import { startDecisionTaskEvent } from '../../__fixtures__/workflow-history-decision-events'; import { - completedDecisionTaskEvents, - scheduleDecisionTaskEvent, - startDecisionTaskEvent, -} from '../../__fixtures__/workflow-history-decision-events'; -import { + createPendingActivity, + createPendingDecision, + createScheduleActivityEvent, + createScheduleDecisionEvent, pendingActivityTaskStartEvent, - pendingDecisionTaskStartEvent, } from '../../__fixtures__/workflow-history-pending-events'; -import type { - ActivityHistoryGroup, - PendingActivityTaskStartEvent, - PendingDecisionTaskStartEvent, -} from '../../workflow-history.types'; import WorkflowHistoryGrouper from '../workflow-history-grouper'; -import type { Props } from '../workflow-history-grouper.types'; - -// Create pending decision that matches the scheduleDecisionTaskEvent (eventId: '2') -const pendingDecisionForScheduledEvent = { - ...pendingDecisionTaskStartEvent, - computedEventId: 'pending-2', - pendingDecisionTaskStartEventAttributes: { - ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, - scheduleId: '2', - }, -} as const satisfies PendingDecisionTaskStartEvent; - -// Helper to create a grouper with a mock onChange -function createGrouper(options: Partial = {}) { - const onChange = jest.fn(); - const grouper = new WorkflowHistoryGrouper({ - onChange, - ...options, - }); - return { grouper, onChange }; -} +import type { + GroupingStateChangeCallback, + Props, +} from '../workflow-history-grouper.types'; -// Helper to wait for processing to complete -function waitForProcessing(onChange: jest.Mock, timeout = 5000): Promise { - return new Promise((resolve, reject) => { - const startTime = Date.now(); - - const checkComplete = () => { - // Check if onChange was called with status 'idle' or 'error' - const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; - if ( - lastCall && - (lastCall[0].status === 'idle' || lastCall[0].status === 'error') - ) { - resolve(); - return; - } +// Commonly used mocks - // Check timeout - if (Date.now() - startTime > timeout) { - reject(new Error('Timeout waiting for processing to complete')); - return; - } +// Track all setups for cleanup +const allCleanups: Array<() => void> = []; - // Check again soon - setTimeout(checkComplete, 10); - }; +describe(WorkflowHistoryGrouper.name, () => { + afterEach(async () => { + // Clean up any pending timeouts from all tests + allCleanups.forEach((cleanup) => cleanup()); + allCleanups.length = 0; - // Start checking after a short delay to let updateEvents kick off - setTimeout(checkComplete, 10); + // Give time for any pending async operations to complete + await new Promise((resolve) => setTimeout(resolve, 10)); }); -} -describe(WorkflowHistoryGrouper.name, () => { - describe('basic event processing', () => { - it('should process events and create groups', async () => { - const { grouper, onChange } = createGrouper(); - - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); - - const groups = grouper.getGroups(); - expect(groups).toBeDefined(); - expect(groups['7']).toBeDefined(); - expect(groups['7'].groupType).toBe('Activity'); - expect(grouper.getLastProcessedEventIndex()).toBe(2); - }); + it('should process events and create groups', async () => { + const { grouper, waitForProcessing } = setup(); - it('should not reprocess events on subsequent calls with same events', async () => { - const { grouper, onChange } = createGrouper(); + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); - // First call - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + const groups = grouper.getGroups(); + expect(groups).toBeDefined(); + expect(groups['7']).toBeDefined(); + expect(groups['7'].groupType).toBe('Activity'); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + }); - const initialGroups = grouper.getGroups(); - const initialIndex = grouper.getLastProcessedEventIndex(); + it('should have getLastProcessedEventIndex pointing to the last processed event', async () => { + const { grouper, waitForProcessing } = setup(); + expect(grouper.getLastProcessedEventIndex()).toBe(-1); - // Second call with same events - should not trigger processing - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - // Give it a moment, but don't wait for onChange since nothing should happen - await new Promise((resolve) => setTimeout(resolve, 50)); + // First call with partial events + grouper.updateEvents([ + completedActivityTaskEvents[0], + completedActivityTaskEvents[1], + ]); + await waitForProcessing(); - expect(grouper.getGroups()).toEqual(initialGroups); - expect(grouper.getLastProcessedEventIndex()).toBe(initialIndex); - }); + expect(grouper.getLastProcessedEventIndex()).toBe(1); - it('should process only new events on subsequent calls', async () => { - const { grouper, onChange } = createGrouper(); + // Second call with all events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); - // First call with partial events - grouper.updateEvents([ - scheduleActivityTaskEvent, - startActivityTaskEvent, - ] as HistoryEvent[]); - await waitForProcessing(onChange); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + }); - expect(grouper.getLastProcessedEventIndex()).toBe(1); + it('should add new pending activities to groups', async () => { + const { grouper, waitForProcessing } = setup(); - // Second call with all events - onChange.mockClear(); - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + // First call with scheduled event only + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); - const groups = grouper.getGroups(); - expect(grouper.getLastProcessedEventIndex()).toBe(2); - expect(groups['7']).toBeDefined(); - expect((groups['7'] as ActivityHistoryGroup).events).toHaveLength(3); + // Update with pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); + + const groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); }); - describe('pending activities management', () => { - it('should add new pending activities to groups', async () => { - const { grouper, onChange } = createGrouper(); - - // First call with scheduled event only - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Update with pending activity - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - const groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup.events).toHaveLength(2); - expect(activityGroup.events[1].attributes).toBe( - 'pendingActivityTaskStartEventAttributes' - ); + it('should add new pending decision to groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with scheduled event only + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + // Add pending decision + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), }); - it('should remove stale pending activities from groups', async () => { - const { grouper, onChange } = createGrouper(); - - // First call with pending activity - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - const firstGroups = grouper.getGroups(); - const firstActivityGroup = firstGroups['7'] as ActivityHistoryGroup; - expect(firstActivityGroup.events).toHaveLength(2); - - // Second call without pending activity (it completed) - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: null, - }); - - const groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup.events).toHaveLength(1); - expect(activityGroup.events[0].attributes).toBe( - 'activityTaskScheduledEventAttributes' - ); + const decisionGroup = grouper.getGroups()['2']; + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should remove stale pending activities from groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with pending activity + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); - it('should handle multiple pending activity state transitions', async () => { - const { grouper, onChange } = createGrouper(); - - // Initial state - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Add pending activity - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Remove pending activity (it started) - onChange.mockClear(); - grouper.updateEvents([ - scheduleActivityTaskEvent, - startActivityTaskEvent, - ] as HistoryEvent[]); - await waitForProcessing(onChange); - - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: null, - }); - - const activityGroup = grouper.getGroups()['7'] as ActivityHistoryGroup; - expect(activityGroup.events).toHaveLength(2); - expect( - activityGroup.events.some( - (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' - ) - ).toBe(false); + const firstGroups = grouper.getGroups(); + const firstActivityGroup = firstGroups['7']; + expect(firstActivityGroup.events).toHaveLength(2); + + // Second call without pending activity (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, }); + + const groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); }); - describe('pending decision management', () => { - it('should add new pending decision to groups', async () => { - const { grouper, onChange } = createGrouper(); + it('should remove stale pending decision from groups', async () => { + const { grouper, waitForProcessing } = setup(); - // First call with scheduled event only - grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); + // First call with pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); - // Add pending decision - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecisionForScheduledEvent, - }); + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + const firstGroups = grouper.getGroups(); + expect(firstGroups['2'].events).toHaveLength(2); - const decisionGroup = grouper.getGroups()['2']; - expect(decisionGroup.groupType).toBe('Decision'); - expect(decisionGroup.events).toHaveLength(2); + // Second call without pending decision (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, }); - it('should remove stale pending decision from groups', async () => { - const { grouper, onChange } = createGrouper(); + const decisionGroup = grouper.getGroups()['2']; + expect(decisionGroup.events).toHaveLength(1); + }); - // First call with pending decision - grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); + it('should handle multiple pending activity state transitions', async () => { + const { grouper, waitForProcessing } = setup(); - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecisionForScheduledEvent, - }); + // Initial state + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); - const firstGroups = grouper.getGroups(); - expect(firstGroups['2'].events).toHaveLength(2); + // Add pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); - // Second call without pending decision (it completed) - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: null, - }); + // Remove pending activity (it started) + grouper.updateEvents([ + createScheduleActivityEvent('7'), + startActivityTaskEvent, + ]); + await waitForProcessing(); - const decisionGroup = grouper.getGroups()['2']; - expect(decisionGroup.events).toHaveLength(1); + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, }); + + const activityGroup = grouper.getGroups()['7']; + expect(activityGroup.events).toHaveLength(2); + expect( + activityGroup.events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(false); }); - describe('state management', () => { - it('should track last processed event index correctly', () => { - const { grouper } = createGrouper(); + it('should return current groups without processing', () => { + const { grouper } = setup(); - expect(grouper.getLastProcessedEventIndex()).toBe(-1); - }); + const groups = grouper.getGroups(); - it('should return current groups without processing', () => { - const { grouper } = createGrouper(); + expect(groups).toEqual({}); + }); - const groups = grouper.getGroups(); + it('should reset grouper state', async () => { + const { grouper, waitForProcessing } = setup(); - expect(groups).toEqual({}); - }); + // Process some events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); - it('should reset grouper state', async () => { - const { grouper, onChange } = createGrouper(); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); - // Process some events - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + // Reset + grouper.reset(); - expect(grouper.getLastProcessedEventIndex()).toBe(2); - expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + expect(grouper.getGroups()).toEqual({}); + }); - // Reset - grouper.reset(); + it('should reprocess events after reset', async () => { + const { grouper, waitForProcessing } = setup(); - expect(grouper.getLastProcessedEventIndex()).toBe(-1); - expect(grouper.getGroups()).toEqual({}); - }); + // Process events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); - it('should reprocess events after reset', async () => { - const { grouper, onChange } = createGrouper(); + const firstGroups = grouper.getGroups(); - // Process events - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + // Reset and reprocess + grouper.reset(); + expect(grouper.getGroups()).toEqual({}); - const firstGroups = grouper.getGroups(); + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); - // Reset and reprocess - grouper.reset(); - onChange.mockClear(); - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + expect(grouper.getGroups()).toEqual(firstGroups); + }); - expect(grouper.getGroups()).toEqual(firstGroups); + it('should buffer pending activity when group does not exist yet', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending activity BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); + + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getGroups(); + expect(groups['7']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Group should now exist with both scheduled and pending events + groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); }); - describe('pending event buffering', () => { - it('should buffer pending activity when group does not exist yet', async () => { - const { grouper, onChange } = createGrouper(); - - // Add pending activity BEFORE scheduled event exists - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Group should NOT exist yet (pending event is buffered) - let groups = grouper.getGroups(); - expect(groups['7']).toBeUndefined(); - - // Now add the scheduled event - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Group should now exist with both scheduled and pending events - groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup).toBeDefined(); - expect(activityGroup.events).toHaveLength(2); - expect(activityGroup.events[0].attributes).toBe( - 'activityTaskScheduledEventAttributes' - ); - expect(activityGroup.events[1].attributes).toBe( - 'pendingActivityTaskStartEventAttributes' - ); - }); + it('should buffer pending decision when group does not exist yet', async () => { + const { grouper, waitForProcessing } = setup(); - it('should buffer pending decision when group does not exist yet', async () => { - const { grouper, onChange } = createGrouper(); - - // Add pending decision BEFORE scheduled event exists - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecisionForScheduledEvent, - }); - - // Group should NOT exist yet (pending event is buffered) - let groups = grouper.getGroups(); - expect(groups['2']).toBeUndefined(); - - // Now add the scheduled event - grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Group should now exist with both scheduled and pending events - groups = grouper.getGroups(); - const decisionGroup = groups['2']; - expect(decisionGroup).toBeDefined(); - expect(decisionGroup.groupType).toBe('Decision'); - expect(decisionGroup.events).toHaveLength(2); + // Add pending decision BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), }); - it('should handle multiple buffered pending activities', async () => { - const { grouper, onChange } = createGrouper(); - - const pendingActivity1 = { - ...pendingActivityTaskStartEvent, - computedEventId: 'pending-7', - pendingActivityTaskStartEventAttributes: { - ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, - scheduleId: '7', - }, - } as const satisfies PendingActivityTaskStartEvent; - - const pendingActivity2 = { - ...pendingActivityTaskStartEvent, - computedEventId: 'pending-10', - pendingActivityTaskStartEventAttributes: { - ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, - scheduleId: '10', - activityId: '1', - }, - } as const satisfies PendingActivityTaskStartEvent; - - const scheduleEvent2 = { - ...scheduleActivityTaskEvent, - eventId: '10', - activityTaskScheduledEventAttributes: { - ...scheduleActivityTaskEvent.activityTaskScheduledEventAttributes, - activityId: '1', - }, - }; - - // Add multiple pending activities BEFORE their scheduled events - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivity1, pendingActivity2], - pendingStartDecision: null, - }); - - // No groups should exist yet - expect(Object.keys(grouper.getGroups()).length).toBe(0); - - // Add first scheduled event - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // First group should now exist - let groups = grouper.getGroups(); - expect(groups['7']).toBeDefined(); - expect(groups['10']).toBeUndefined(); - - // Add second scheduled event - onChange.mockClear(); - grouper.updateEvents([ - scheduleActivityTaskEvent, - scheduleEvent2, - ] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Both groups should now exist - groups = grouper.getGroups(); - expect(groups['7']).toBeDefined(); - expect(groups['10']).toBeDefined(); - expect((groups['7'] as ActivityHistoryGroup).events).toHaveLength(2); - expect((groups['10'] as ActivityHistoryGroup).events).toHaveLength(2); - }); + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getGroups(); + expect(groups['2']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); - it('should clear buffer when pending events are updated', async () => { - const { grouper } = createGrouper(); - - const pendingActivity1 = { - ...pendingActivityTaskStartEvent, - computedEventId: 'pending-7', - } as const satisfies PendingActivityTaskStartEvent; - - const pendingActivity2 = { - ...pendingActivityTaskStartEvent, - computedEventId: 'pending-10', - pendingActivityTaskStartEventAttributes: { - ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, - scheduleId: '10', - }, - } as const satisfies PendingActivityTaskStartEvent; - - // Add pending activity that won't have a group - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivity1], - pendingStartDecision: null, - }); - - // Update with different pending activity (old one should be removed from buffer) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivity2], - pendingStartDecision: null, - }); - - // No groups should exist - expect(Object.keys(grouper.getGroups()).length).toBe(0); + // Group should now exist with both scheduled and pending events + groups = grouper.getGroups(); + const decisionGroup = groups['2']; + expect(decisionGroup).toBeDefined(); + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should handle multiple buffered pending activities', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add multiple pending activities BEFORE their scheduled events + await grouper.updatePendingEvents({ + pendingStartActivities: [ + createPendingActivity('7'), + createPendingActivity('10', { activityId: '1' }), + ], + pendingStartDecision: null, }); - it('should clear buffer on reset', async () => { - const { grouper, onChange } = createGrouper(); - - // Add pending activity without scheduled event (will be buffered) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Reset the grouper - grouper.reset(); - - // Add scheduled event after reset - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Group should only have scheduled event (buffered pending was cleared) - const groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup.events).toHaveLength(1); - expect(activityGroup.events[0].attributes).toBe( - 'activityTaskScheduledEventAttributes' - ); + // No groups should exist yet + expect(Object.keys(grouper.getGroups()).length).toBe(0); + + // Add first scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // First group should now exist + let groups = grouper.getGroups(); + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeUndefined(); + + // Add second scheduled event + grouper.updateEvents([ + createScheduleActivityEvent('7'), + createScheduleActivityEvent('10', { activityId: '1' }), + ]); + await waitForProcessing(); + + // Both groups should now exist + groups = grouper.getGroups(); + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeDefined(); + expect(groups['7'].events).toHaveLength(2); + expect(groups['10'].events).toHaveLength(2); + }); + + it('should clear pending activities buffer when pending events are updated', async () => { + const { grouper, waitForProcessing } = setup(); + + // Buffer first pending activity for scheduleId: '7' + await grouper.updatePendingEvents({ + pendingStartActivities: [createPendingActivity('7')], + pendingStartDecision: null, }); - it('should apply buffered pending events after updatePendingEvents if groups now exist', async () => { - const { grouper, onChange } = createGrouper(); - - // Add pending activity BEFORE scheduled event (will be buffered) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // No group yet - expect(grouper.getGroups()['7']).toBeUndefined(); - - // Process scheduled event - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Call updatePendingEvents again with same pending activity - // This should trigger applyBufferedPendingEvents and merge the buffered event - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Group should now have both events - const groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup.events).toHaveLength(2); + // Update with different pending activity for scheduleId: '10' (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [ + createPendingActivity('10', { activityId: '1' }), + ], + pendingStartDecision: null, }); - it('should handle scenario where scheduled event arrives after pending event update', async () => { - const { grouper, onChange } = createGrouper(); - - // Step 1: Pending activity arrives first (buffered) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Step 2: Scheduled event arrives - grouper.updateEvents([scheduleActivityTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Step 3: Another updatePendingEvents call (maybe with different pending events) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); - - // Should have complete group with both events - const groups = grouper.getGroups(); - const activityGroup = groups['7'] as ActivityHistoryGroup; - expect(activityGroup).toBeDefined(); - expect(activityGroup.events).toHaveLength(2); - expect(activityGroup.events[0].attributes).toBe( - 'activityTaskScheduledEventAttributes' - ); - expect(activityGroup.events[1].attributes).toBe( - 'pendingActivityTaskStartEventAttributes' - ); + // No groups should exist yet (still buffered) + expect(Object.keys(grouper.getGroups()).length).toBe(0); + + // Now add scheduled events for both activities + grouper.updateEvents([ + createScheduleActivityEvent('7'), // scheduleId: '7' + createScheduleActivityEvent('10', { activityId: '1' }), // scheduleId: '10' + ]); + await waitForProcessing(); + + const groups = grouper.getGroups(); + + // Group '7' should only have scheduled event (pending was cleared from buffer) + expect(groups['7']).toBeDefined(); + expect(groups['7'].events).toHaveLength(1); + expect( + groups['7'].events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(false); + + // Group '10' should have both scheduled and pending events (current pending in buffer) + expect(groups['10']).toBeDefined(); + expect(groups['10'].events).toHaveLength(2); + expect( + groups['10'].events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(true); + }); + + it('should clear buffer on reset', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending activity without scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); - it('should not create incomplete groups when pending arrives before scheduled', async () => { - const { grouper } = createGrouper(); + // Reset the grouper + grouper.reset(); + + // Add scheduled event after reset + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Group should only have scheduled event (buffered pending was cleared) + const groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + }); - // Only add pending activity (no scheduled event) - await grouper.updatePendingEvents({ - pendingStartActivities: [pendingActivityTaskStartEvent], - pendingStartDecision: null, - }); + it('should apply buffered pending events after updatePendingEvents if groups now exist', async () => { + const { grouper, waitForProcessing } = setup(); - // Group should NOT exist in the UI - const groups = grouper.getGroups(); - expect(groups['7']).toBeUndefined(); - expect(Object.keys(groups).length).toBe(0); + // Add pending activity BEFORE scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); - it('should handle pending decision buffer clearing when decision changes', async () => { - const { grouper } = createGrouper(); - - const pendingDecision1 = { - ...pendingDecisionTaskStartEvent, - computedEventId: 'pending-7', - } as const satisfies PendingDecisionTaskStartEvent; - - const pendingDecision2 = { - ...pendingDecisionTaskStartEvent, - computedEventId: 'pending-10', - pendingDecisionTaskStartEventAttributes: { - ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, - scheduleId: '10', - }, - } as const satisfies PendingDecisionTaskStartEvent; - - // Buffer first decision - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecision1, - }); - - // Update with different decision (old one should be removed from buffer) - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecision2, - }); - - // No groups should exist - expect(Object.keys(grouper.getGroups()).length).toBe(0); + // No group yet + expect(grouper.getGroups()['7']).toBeUndefined(); + + // Process scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Call updatePendingEvents again with same pending activity + // This should trigger applyBufferedPendingEvents and merge the buffered event + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); + + // Group should now have both events + const groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(2); }); - describe('decision group filtering', () => { - it('should filter out pending decision when decision group has more than 2 events', async () => { - const { grouper, onChange } = createGrouper(); - - // Add scheduled event and pending decision - grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecisionForScheduledEvent, - }); - - // Group should have 2 events (scheduled + pending) - let groups = grouper.getGroups(); - expect(groups['2'].events).toHaveLength(2); - - // Now add started event (makes it 3 events total) - onChange.mockClear(); - grouper.updateEvents([ - scheduleDecisionTaskEvent, - startDecisionTaskEvent, - ] as HistoryEvent[]); - await waitForProcessing(onChange); - - // Pending decision should be filtered out when there are more than 2 events - groups = grouper.getGroups(); - expect(groups['2'].events).toHaveLength(2); - expect( - groups['2'].events.some( - (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' - ) - ).toBe(false); - - // Add completed event (makes it 3+ events) - onChange.mockClear(); - grouper.updateEvents(completedDecisionTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); - - // Still should not have pending decision - groups = grouper.getGroups(); - expect(groups['2'].events).toHaveLength(3); - expect( - groups['2'].events.some( - (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' - ) - ).toBe(false); + it('should handle scenario where scheduled event arrives after pending event update', async () => { + const { grouper, waitForProcessing } = setup(); + + // Step 1: Pending activity arrives first (buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); - it('should keep pending decision when decision group has exactly 2 events', async () => { - const { grouper, onChange } = createGrouper(); - - // Add scheduled event and pending decision - grouper.updateEvents([scheduleDecisionTaskEvent] as HistoryEvent[]); - await waitForProcessing(onChange); - - await grouper.updatePendingEvents({ - pendingStartActivities: [], - pendingStartDecision: pendingDecisionForScheduledEvent, - }); - - // Group should have 2 events (scheduled + pending) - const groups = grouper.getGroups(); - expect(groups['2'].events).toHaveLength(2); - expect( - groups['2'].events.some( - (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' - ) - ).toBe(true); + // Step 2: Scheduled event arrives + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Should have complete group with both events + const groups = grouper.getGroups(); + const activityGroup = groups['7']; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should not create incomplete groups when pending arrives before scheduled', async () => { + const { grouper } = setup(); + + // Only add pending activity (no scheduled event) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, }); + + // Group should NOT exist in the UI + const groups = grouper.getGroups(); + expect(groups['7']).toBeUndefined(); + expect(Object.keys(groups).length).toBe(0); }); - describe('groups shallow copy in onChange', () => { - it('should return shallow copy of groups object in onChange callback', async () => { - const { grouper, onChange } = createGrouper(); + it('should handle pending decision buffer clearing when decision changes', async () => { + const { grouper, waitForProcessing } = setup(); - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + // Buffer first decision for scheduleId: '2' + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); - // Get groups from onChange callback - const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; - const groupsFromCallback = lastCall[0].currentGroups; + // Update with different decision for scheduleId: '10' (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('10'), + }); - // Try to add a new group to the callback's groups object - groupsFromCallback['999'] = groupsFromCallback['7']; + // No groups should exist yet (still buffered) + expect(Object.keys(grouper.getGroups()).length).toBe(0); + + // Now add scheduled events for both decisions + grouper.updateEvents([ + createScheduleDecisionEvent('2'), // scheduleId: '2' + createScheduleDecisionEvent('10'), // scheduleId: '10' + ]); + await waitForProcessing(); + + const groups = grouper.getGroups(); + + // Group '2' should only have scheduled event (pending was cleared from buffer) + expect(groups['2']).toBeDefined(); + expect(groups['2'].events).toHaveLength(1); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + + // Group '10' should have both scheduled and pending events (current pending in buffer) + expect(groups['10']).toBeDefined(); + expect(groups['10'].events).toHaveLength(2); + expect( + groups['10'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(true); + }); + + it('should filter out pending decision when decision group has more than 2 events', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add scheduled event and pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); - // Internal groups should not have the new group (shallow copy protects object structure) - const internalGroups = grouper.getGroups(); - expect(internalGroups['999']).toBeUndefined(); + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), }); - it('should allow modification of group properties (shallow copy limitation)', async () => { - const { grouper, onChange } = createGrouper(); + // Group should have 2 events (scheduled + pending) + let groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + + // Now add started event (makes it 3 events total) + grouper.updateEvents([ + createScheduleDecisionEvent('2'), + startDecisionTaskEvent, + ]); + await waitForProcessing(); + + // Pending decision should be filtered out when there are more than 2 events + groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + + // even if pending event is updated again, it should not be added to the group + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); - grouper.updateEvents(completedActivityTaskEvents as HistoryEvent[]); - await waitForProcessing(onChange); + groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + }); - // Get groups from onChange callback - const lastCall = onChange.mock.calls[onChange.mock.calls.length - 1]; - const groupsFromCallback = lastCall[0].currentGroups; - const originalLabel = groupsFromCallback['7'].label; + it('should keep pending decision when decision group has exactly 2 events', async () => { + const { grouper, waitForProcessing } = setup(); - // Modify a group's property - this WILL affect internal state (shallow copy limitation) - groupsFromCallback['7'].label = 'Modified Label'; + // Add scheduled event and pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); - // Internal groups ARE modified since group objects are shared references - const internalGroups = grouper.getGroups(); - expect(internalGroups['7'].label).toBe('Modified Label'); - expect(internalGroups['7'].label).not.toBe(originalLabel); + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), }); + + // Group should have 2 events (scheduled + pending) + const groups = grouper.getGroups(); + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(true); }); }); + +function setup(options: Partial = {}) { + // Queue of promise resolvers/rejecters waiting for processing to complete + const pendingResolvers: Array<{ + resolve: () => void; + reject: (error: Error) => void; + timeoutId: NodeJS.Timeout; + }> = []; + + // Create onChange mock that resolves pending promises when processing completes + const onChange: jest.MockedFunction = jest.fn( + (state) => { + if (state.status === 'idle') { + // Resolve all pending promises at once + pendingResolvers.forEach(({ timeoutId, resolve }) => { + clearTimeout(timeoutId); + resolve(); + }); + pendingResolvers.length = 0; + } + } + ); + + // Create grouper with onChange and any additional options + const grouper = new WorkflowHistoryGrouper({ + onChange, + ...options, + }); + + // Helper function to wait for next processing cycle + const waitForProcessing = async (timeout = 1000): Promise => { + await new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + // Remove this resolver from queue if it times out + const index = pendingResolvers.findIndex( + (r) => r.timeoutId === timeoutId + ); + if (index !== -1) { + pendingResolvers.splice(index, 1); + } + reject(new Error('Timeout waiting for processing to complete')); + }, timeout); + + pendingResolvers.push({ resolve, reject, timeoutId }); + }); + }; + + // Cleanup function to clear any pending timeouts + const cleanup = () => { + pendingResolvers.forEach(({ timeoutId }) => clearTimeout(timeoutId)); + pendingResolvers.length = 0; + }; + + // Register cleanup automatically + allCleanups.push(cleanup); + + return { grouper, onChange, waitForProcessing }; +} diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 51cb2638b..4e542efcb 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -161,11 +161,8 @@ export default class WorkflowHistoryGrouper { * Schedules the next batch using the best available API. * Uses Scheduler API if available, otherwise falls back to Promise microtask. */ - private scheduleNextBatch(): void { - // Check if Scheduler API is available - const useScheduler = true; + private scheduleNextBatch() { if ( - useScheduler && typeof window !== 'undefined' && 'scheduler' in window && 'postTask' in (window.scheduler as any) @@ -187,7 +184,7 @@ export default class WorkflowHistoryGrouper { * Processes a single batch of events (or all remaining events if no batchSize). * This method handles the core grouping logic and schedules itself for the next batch. */ - private async processBatch(): Promise { + private processBatch(): void { // Check if there are events to process if (this.lastProcessedEventIndex >= this.allEvents.length - 1) { this.isProcessing = false; diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts index 1674624dd..8c075cd3d 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -19,7 +19,7 @@ export type ProcessEventsResult = { /** * Processing status for incremental grouping operations. */ -export type ProcessingStatus = 'idle' | 'processing' | 'error'; +export type ProcessingStatus = 'idle' | 'processing'; /** * State snapshot of the grouping process. From af7836654ffb02806a03cc1400689db0b26c37f1 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Thu, 6 Nov 2025 00:46:49 +0100 Subject: [PATCH 3/9] change the api of onChange and add destroy method Signed-off-by: Assem Hafez --- .../workflow-history-grouper.test.tsx | 47 ++++++++++++++----- .../helpers/workflow-history-grouper.ts | 33 ++++++++++--- .../helpers/workflow-history-grouper.types.ts | 7 --- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx index 231644afb..4eb20f616 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -587,6 +587,31 @@ describe(WorkflowHistoryGrouper.name, () => { ) ).toBe(true); }); + + it('should clean up all resources when destroy is called', async () => { + const { grouper, handleStateChange, waitForProcessing } = setup(); + + // Process some events and verify onChange is called + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + expect(handleStateChange).toHaveBeenCalled(); + expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); + + handleStateChange.mockClear(); + // Destroy the grouper + grouper.destroy(); + + // Verify state is reset + expect(grouper.getGroups()).toEqual({}); + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + + // Process new events - onChange should NOT be called anymore + grouper.updateEvents(completedActivityTaskEvents); + + // Verify onChange was NOT called after destroy + expect(handleStateChange).not.toHaveBeenCalled(); + }); }); function setup(options: Partial = {}) { @@ -597,9 +622,9 @@ function setup(options: Partial = {}) { timeoutId: NodeJS.Timeout; }> = []; - // Create onChange mock that resolves pending promises when processing completes - const onChange: jest.MockedFunction = jest.fn( - (state) => { + // Create state change handler that resolves pending promises when processing completes + const handleStateChange: jest.MockedFunction = + jest.fn((state) => { if (state.status === 'idle') { // Resolve all pending promises at once pendingResolvers.forEach(({ timeoutId, resolve }) => { @@ -608,14 +633,11 @@ function setup(options: Partial = {}) { }); pendingResolvers.length = 0; } - } - ); + }); - // Create grouper with onChange and any additional options - const grouper = new WorkflowHistoryGrouper({ - onChange, - ...options, - }); + // Create grouper and subscribe to state changes + const grouper = new WorkflowHistoryGrouper(options); + grouper.onChange(handleStateChange); // Helper function to wait for next processing cycle const waitForProcessing = async (timeout = 1000): Promise => { @@ -635,14 +657,15 @@ function setup(options: Partial = {}) { }); }; - // Cleanup function to clear any pending timeouts + // Cleanup function to clear any pending timeouts and unsubscribe const cleanup = () => { pendingResolvers.forEach(({ timeoutId }) => clearTimeout(timeoutId)); pendingResolvers.length = 0; + grouper.destroy(); }; // Register cleanup automatically allCleanups.push(cleanup); - return { grouper, onChange, waitForProcessing }; + return { grouper, handleStateChange, waitForProcessing }; } diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 4e542efcb..3dea5c3d6 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -46,7 +46,7 @@ export default class WorkflowHistoryGrouper { private groups: HistoryEventsGroups = {}; private currentPendingActivities: PendingActivityTaskStartEvent[] = []; private currentPendingDecision: PendingDecisionTaskStartEvent | null = null; - private onChange: (state: GroupingProcessState) => void; + private subscribers: Set<(state: GroupingProcessState) => void> = new Set(); private batchSize?: number; private isProcessing: boolean = false; @@ -54,11 +54,21 @@ export default class WorkflowHistoryGrouper { private bufferedPendingActivities: PendingActivityTaskStartEvent[] = []; private bufferedPendingDecision: PendingDecisionTaskStartEvent | null = null; - constructor({ onChange, batchSize }: Props) { - this.onChange = onChange; + constructor({ batchSize }: Props = {}) { this.batchSize = batchSize; } + /** + * Subscribe to state changes. + * Returns an unsubscribe function. + */ + public onChange(callback: (state: GroupingProcessState) => void): () => void { + this.subscribers.add(callback); + return () => { + this.subscribers.delete(callback); + }; + } + /** * Updates the events list and automatically starts processing. * The processor will continue batch by batch until all events are processed. @@ -118,6 +128,16 @@ export default class WorkflowHistoryGrouper { this.isProcessing = false; } + /** + * Destroys the grouper, cleaning up all resources. + * Clears all subscribers and resets internal state. + * Call this when the grouper is no longer needed. + */ + public destroy(): void { + this.subscribers.clear(); + this.reset(); + } + /** * Gets the current groups without processing new events. * @@ -212,13 +232,14 @@ export default class WorkflowHistoryGrouper { const processedEventsCount = this.lastProcessedEventIndex + 1; const remainingEventsCount = this.allEvents.length - processedEventsCount; - // Report progress - this.onChange({ + // Report progress to all subscribers + const state: GroupingProcessState = { currentGroups: { ...this.groups }, processedEventsCount, remainingEventsCount, status: remainingEventsCount > 0 ? 'processing' : 'idle', - }); + }; + this.subscribers.forEach((callback) => callback(state)); // Check if there are more events to process if (this.lastProcessedEventIndex < this.allEvents.length - 1) { diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts index 8c075cd3d..b4ba1492a 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -41,13 +41,6 @@ export type GroupingProcessState = { export type GroupingStateChangeCallback = (state: GroupingProcessState) => void; export type Props = { - /** - * Callback invoked when grouping state changes. - * Provides real-time updates on processing progress. - * Required to receive state updates. - */ - onChange: GroupingStateChangeCallback; - /** * Batch size for incremental processing. * If specified, events will be processed in batches to allow progress updates. From 0539afee8554a6c5dd7e7b9382339c99653fe885 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Thu, 6 Nov 2025 01:47:12 +0100 Subject: [PATCH 4/9] replace getGroups with getState Signed-off-by: Assem Hafez --- .../workflow-history-grouper.test.tsx | 108 ++++++++++++------ .../helpers/workflow-history-grouper.ts | 46 ++++---- .../helpers/workflow-history-grouper.types.ts | 2 +- 3 files changed, 97 insertions(+), 59 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx index 4eb20f616..4a226b65f 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -37,7 +37,7 @@ describe(WorkflowHistoryGrouper.name, () => { grouper.updateEvents(completedActivityTaskEvents); await waitForProcessing(); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; expect(groups).toBeDefined(); expect(groups['7']).toBeDefined(); expect(groups['7'].groupType).toBe('Activity'); @@ -77,7 +77,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: null, }); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup.events).toHaveLength(2); expect(activityGroup.events[1].attributes).toBe( @@ -98,7 +98,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: createPendingDecision('2'), }); - const decisionGroup = grouper.getGroups()['2']; + const decisionGroup = grouper.getState().groups['2']; expect(decisionGroup.groupType).toBe('Decision'); expect(decisionGroup.events).toHaveLength(2); }); @@ -115,7 +115,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: null, }); - const firstGroups = grouper.getGroups(); + const firstGroups = grouper.getState().groups; const firstActivityGroup = firstGroups['7']; expect(firstActivityGroup.events).toHaveLength(2); @@ -125,7 +125,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: null, }); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup.events).toHaveLength(1); expect(activityGroup.events[0].attributes).toBe( @@ -145,7 +145,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: createPendingDecision('2'), }); - const firstGroups = grouper.getGroups(); + const firstGroups = grouper.getState().groups; expect(firstGroups['2'].events).toHaveLength(2); // Second call without pending decision (it completed) @@ -154,7 +154,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: null, }); - const decisionGroup = grouper.getGroups()['2']; + const decisionGroup = grouper.getState().groups['2']; expect(decisionGroup.events).toHaveLength(1); }); @@ -183,7 +183,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: null, }); - const activityGroup = grouper.getGroups()['7']; + const activityGroup = grouper.getState().groups['7']; expect(activityGroup.events).toHaveLength(2); expect( activityGroup.events.some( @@ -195,7 +195,7 @@ describe(WorkflowHistoryGrouper.name, () => { it('should return current groups without processing', () => { const { grouper } = setup(); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; expect(groups).toEqual({}); }); @@ -208,13 +208,13 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); expect(grouper.getLastProcessedEventIndex()).toBe(2); - expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); + expect(Object.keys(grouper.getState().groups).length).toBeGreaterThan(0); // Reset grouper.reset(); expect(grouper.getLastProcessedEventIndex()).toBe(-1); - expect(grouper.getGroups()).toEqual({}); + expect(grouper.getState().groups).toEqual({}); }); it('should reprocess events after reset', async () => { @@ -224,16 +224,16 @@ describe(WorkflowHistoryGrouper.name, () => { grouper.updateEvents(completedActivityTaskEvents); await waitForProcessing(); - const firstGroups = grouper.getGroups(); + const firstGroups = grouper.getState().groups; // Reset and reprocess grouper.reset(); - expect(grouper.getGroups()).toEqual({}); + expect(grouper.getState().groups).toEqual({}); grouper.updateEvents(completedActivityTaskEvents); await waitForProcessing(); - expect(grouper.getGroups()).toEqual(firstGroups); + expect(grouper.getState().groups).toEqual(firstGroups); }); it('should buffer pending activity when group does not exist yet', async () => { @@ -246,7 +246,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should NOT exist yet (pending event is buffered) - let groups = grouper.getGroups(); + let groups = grouper.getState().groups; expect(groups['7']).toBeUndefined(); // Now add the scheduled event @@ -254,7 +254,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Group should now exist with both scheduled and pending events - groups = grouper.getGroups(); + groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup).toBeDefined(); expect(activityGroup.events).toHaveLength(2); @@ -276,7 +276,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should NOT exist yet (pending event is buffered) - let groups = grouper.getGroups(); + let groups = grouper.getState().groups; expect(groups['2']).toBeUndefined(); // Now add the scheduled event @@ -284,7 +284,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Group should now exist with both scheduled and pending events - groups = grouper.getGroups(); + groups = grouper.getState().groups; const decisionGroup = groups['2']; expect(decisionGroup).toBeDefined(); expect(decisionGroup.groupType).toBe('Decision'); @@ -304,14 +304,14 @@ describe(WorkflowHistoryGrouper.name, () => { }); // No groups should exist yet - expect(Object.keys(grouper.getGroups()).length).toBe(0); + expect(Object.keys(grouper.getState().groups).length).toBe(0); // Add first scheduled event grouper.updateEvents([createScheduleActivityEvent('7')]); await waitForProcessing(); // First group should now exist - let groups = grouper.getGroups(); + let groups = grouper.getState().groups; expect(groups['7']).toBeDefined(); expect(groups['10']).toBeUndefined(); @@ -323,7 +323,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Both groups should now exist - groups = grouper.getGroups(); + groups = grouper.getState().groups; expect(groups['7']).toBeDefined(); expect(groups['10']).toBeDefined(); expect(groups['7'].events).toHaveLength(2); @@ -348,7 +348,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // No groups should exist yet (still buffered) - expect(Object.keys(grouper.getGroups()).length).toBe(0); + expect(Object.keys(grouper.getState().groups).length).toBe(0); // Now add scheduled events for both activities grouper.updateEvents([ @@ -357,7 +357,7 @@ describe(WorkflowHistoryGrouper.name, () => { ]); await waitForProcessing(); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; // Group '7' should only have scheduled event (pending was cleared from buffer) expect(groups['7']).toBeDefined(); @@ -395,7 +395,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Group should only have scheduled event (buffered pending was cleared) - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup.events).toHaveLength(1); expect(activityGroup.events[0].attributes).toBe( @@ -413,7 +413,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // No group yet - expect(grouper.getGroups()['7']).toBeUndefined(); + expect(grouper.getState().groups['7']).toBeUndefined(); // Process scheduled event grouper.updateEvents([createScheduleActivityEvent('7')]); @@ -427,7 +427,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should now have both events - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup.events).toHaveLength(2); }); @@ -446,7 +446,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Should have complete group with both events - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; const activityGroup = groups['7']; expect(activityGroup).toBeDefined(); expect(activityGroup.events).toHaveLength(2); @@ -468,7 +468,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should NOT exist in the UI - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; expect(groups['7']).toBeUndefined(); expect(Object.keys(groups).length).toBe(0); }); @@ -489,7 +489,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // No groups should exist yet (still buffered) - expect(Object.keys(grouper.getGroups()).length).toBe(0); + expect(Object.keys(grouper.getState().groups).length).toBe(0); // Now add scheduled events for both decisions grouper.updateEvents([ @@ -498,7 +498,7 @@ describe(WorkflowHistoryGrouper.name, () => { ]); await waitForProcessing(); - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; // Group '2' should only have scheduled event (pending was cleared from buffer) expect(groups['2']).toBeDefined(); @@ -532,7 +532,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should have 2 events (scheduled + pending) - let groups = grouper.getGroups(); + let groups = grouper.getState().groups; expect(groups['2'].events).toHaveLength(2); // Now add started event (makes it 3 events total) @@ -543,7 +543,7 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); // Pending decision should be filtered out when there are more than 2 events - groups = grouper.getGroups(); + groups = grouper.getState().groups; expect(groups['2'].events).toHaveLength(2); expect( groups['2'].events.some( @@ -557,7 +557,7 @@ describe(WorkflowHistoryGrouper.name, () => { pendingStartDecision: createPendingDecision('2'), }); - groups = grouper.getGroups(); + groups = grouper.getState().groups; expect(groups['2'].events).toHaveLength(2); expect( groups['2'].events.some( @@ -579,7 +579,7 @@ describe(WorkflowHistoryGrouper.name, () => { }); // Group should have 2 events (scheduled + pending) - const groups = grouper.getGroups(); + const groups = grouper.getState().groups; expect(groups['2'].events).toHaveLength(2); expect( groups['2'].events.some( @@ -596,14 +596,14 @@ describe(WorkflowHistoryGrouper.name, () => { await waitForProcessing(); expect(handleStateChange).toHaveBeenCalled(); - expect(Object.keys(grouper.getGroups()).length).toBeGreaterThan(0); + expect(Object.keys(grouper.getState().groups).length).toBeGreaterThan(0); handleStateChange.mockClear(); // Destroy the grouper grouper.destroy(); // Verify state is reset - expect(grouper.getGroups()).toEqual({}); + expect(grouper.getState().groups).toEqual({}); expect(grouper.getLastProcessedEventIndex()).toBe(-1); // Process new events - onChange should NOT be called anymore @@ -612,6 +612,42 @@ describe(WorkflowHistoryGrouper.name, () => { // Verify onChange was NOT called after destroy expect(handleStateChange).not.toHaveBeenCalled(); }); + + it('should return current state via getState', async () => { + const { grouper, waitForProcessing } = setup(); + + // Initial state - no events processed + let state = grouper.getState(); + expect(state).toEqual({ + groups: {}, + processedEventsCount: 0, + remainingEventsCount: 0, + status: 'idle', + }); + + // Add events but don't wait - status should be processing + grouper.updateEvents(completedActivityTaskEvents); + state = grouper.getState(); + expect(state.status).toBe('processing'); + expect(state.remainingEventsCount).toBeGreaterThan(0); + + // Wait for processing to complete + await waitForProcessing(); + + // After processing - status should be idle + state = grouper.getState(); + expect(state).toEqual({ + groups: expect.any(Object), + processedEventsCount: completedActivityTaskEvents.length, + remainingEventsCount: 0, + status: 'idle', + }); + expect(Object.keys(state.groups).length).toBeGreaterThan(0); + + // Verify getState() returns consistent data + const anotherState = grouper.getState(); + expect(anotherState.groups).toEqual(state.groups); + }); }); function setup(options: Partial = {}) { diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 3dea5c3d6..7425bc6fd 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -139,19 +139,24 @@ export default class WorkflowHistoryGrouper { } /** - * Gets the current groups without processing new events. - * - * @returns Current state of groups + * Gets the index of the last processed event. */ - public getGroups(): HistoryEventsGroups { - return this.groups; + public getLastProcessedEventIndex(): number { + return this.lastProcessedEventIndex; } /** - * Gets the index of the last processed event. + * Gets the current state of the grouper. + * Returns current groups, processing status, and event counts. */ - public getLastProcessedEventIndex(): number { - return this.lastProcessedEventIndex; + public getState(): GroupingProcessState { + return { + groups: { ...this.groups }, + processedEventsCount: this.lastProcessedEventIndex + 1, + remainingEventsCount: + this.allEvents.length - this.lastProcessedEventIndex - 1, + status: this.isProcessing ? 'processing' : 'idle', + }; } // ============================================================================ @@ -228,25 +233,22 @@ export default class WorkflowHistoryGrouper { // Move pointer forward this.lastProcessedEventIndex = batchEnd - 1; - // Calculate progress - const processedEventsCount = this.lastProcessedEventIndex + 1; - const remainingEventsCount = this.allEvents.length - processedEventsCount; + // Check if there are more events to process + const hasMoreEvents = + this.lastProcessedEventIndex < this.allEvents.length - 1; + + // Update processing state before reporting to subscribers + if (!hasMoreEvents) { + this.isProcessing = false; + } // Report progress to all subscribers - const state: GroupingProcessState = { - currentGroups: { ...this.groups }, - processedEventsCount, - remainingEventsCount, - status: remainingEventsCount > 0 ? 'processing' : 'idle', - }; + const state = this.getState(); this.subscribers.forEach((callback) => callback(state)); - // Check if there are more events to process - if (this.lastProcessedEventIndex < this.allEvents.length - 1) { + // Schedule next batch if needed + if (hasMoreEvents) { this.scheduleNextBatch(); - } else { - // All done - this.isProcessing = false; } } diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts index b4ba1492a..45c4e44b9 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -26,7 +26,7 @@ export type ProcessingStatus = 'idle' | 'processing'; */ export type GroupingProcessState = { /** Current groups accumulated so far */ - currentGroups: HistoryEventsGroups; + groups: HistoryEventsGroups; /** Number of events that have been successfully processed since the grouper was created/reset */ processedEventsCount: number; /** Number of events that are still pending (not yet processed) */ From bb6905a0e662c6076af3da53ce8b8e085f235127 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Thu, 6 Nov 2025 02:09:04 +0100 Subject: [PATCH 5/9] call onchange after updating pending events Signed-off-by: Assem Hafez --- .../helpers/workflow-history-grouper.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 7425bc6fd..df1c0ef8c 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -92,8 +92,9 @@ export default class WorkflowHistoryGrouper { /** * Updates pending events (activities and decisions). * This should be called separately from updateEvents. + * Notifies subscribers after groups are updated. */ - public async updatePendingEvents(params: ProcessEventsParams) { + public updatePendingEvents(params: ProcessEventsParams) { // Update pending events (add new ones, remove stale ones) const currentPendingActivities = this.currentPendingActivities; @@ -111,6 +112,10 @@ export default class WorkflowHistoryGrouper { currentPendingDecision, params.pendingStartDecision ); + + // Notify subscribers after groups are updated + const state = this.getState(); + this.subscribers.forEach((callback) => callback(state)); } /** @@ -184,7 +189,7 @@ export default class WorkflowHistoryGrouper { /** * Schedules the next batch using the best available API. - * Uses Scheduler API if available, otherwise falls back to Promise microtask. + * Uses Scheduler API if available, otherwise falls back to setTimeout. */ private scheduleNextBatch() { if ( @@ -196,11 +201,11 @@ export default class WorkflowHistoryGrouper { (window.scheduler as any) .postTask(() => this.processBatch(), { priority: 'background' }) .catch(() => { - // Fallback if postTask fails + // Fallback to setTimeout if postTask fails setTimeout(() => this.processBatch(), 0); }); } else { - // Fallback to Promise microtask + // Fallback to setTimeout setTimeout(() => this.processBatch(), 0); } } From 1089623a9c82a272445094a27aba524f7ac93ec6 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Mon, 10 Nov 2025 14:28:20 +0100 Subject: [PATCH 6/9] use sync processBatch for first page Signed-off-by: Assem Hafez --- .../__tests__/workflow-history-grouper.test.tsx | 13 ++++++++++--- .../helpers/workflow-history-grouper.ts | 16 +++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx index 4a226b65f..0c642bd3e 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -625,11 +625,13 @@ describe(WorkflowHistoryGrouper.name, () => { status: 'idle', }); - // Add events but don't wait - status should be processing + // Add events - with batchSize=1, first batch will be processed synchronously but subsequent batches will be async grouper.updateEvents(completedActivityTaskEvents); + + // Check state after first batch (might be synchronous) state = grouper.getState(); - expect(state.status).toBe('processing'); - expect(state.remainingEventsCount).toBeGreaterThan(0); + // First batch is processed immediately, so processedEventsCount should be at least 1 + expect(state.processedEventsCount).toBeGreaterThan(0); // Wait for processing to complete await waitForProcessing(); @@ -677,6 +679,11 @@ function setup(options: Partial = {}) { // Helper function to wait for next processing cycle const waitForProcessing = async (timeout = 1000): Promise => { + // Check if already idle (processing completed synchronously) + if (grouper.getState().status === 'idle') { + return Promise.resolve(); + } + await new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { // Remove this resolver from queue if it times out diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index df1c0ef8c..927994041 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -92,9 +92,8 @@ export default class WorkflowHistoryGrouper { /** * Updates pending events (activities and decisions). * This should be called separately from updateEvents. - * Notifies subscribers after groups are updated. */ - public updatePendingEvents(params: ProcessEventsParams) { + public async updatePendingEvents(params: ProcessEventsParams) { // Update pending events (add new ones, remove stale ones) const currentPendingActivities = this.currentPendingActivities; @@ -112,10 +111,6 @@ export default class WorkflowHistoryGrouper { currentPendingDecision, params.pendingStartDecision ); - - // Notify subscribers after groups are updated - const state = this.getState(); - this.subscribers.forEach((callback) => callback(state)); } /** @@ -189,15 +184,18 @@ export default class WorkflowHistoryGrouper { /** * Schedules the next batch using the best available API. - * Uses Scheduler API if available, otherwise falls back to setTimeout. + * Uses Scheduler API if available, otherwise falls back to Promise microtask. */ private scheduleNextBatch() { - if ( + // if first batch process immediately, this helps avoiding UI delays + if (this.lastProcessedEventIndex === -1) { + this.processBatch(); + } else if ( typeof window !== 'undefined' && 'scheduler' in window && 'postTask' in (window.scheduler as any) ) { - // Use Scheduler API with background priority for non-urgent work + // Use Scheduler API with background priority if available (window.scheduler as any) .postTask(() => this.processBatch(), { priority: 'background' }) .catch(() => { From eb22eff3368660248fe5d9c7f6f07501c3b8018b Mon Sep 17 00:00:00 2001 From: Assem Hafez <137278762+Assem-Uber@users.noreply.github.com> Date: Thu, 13 Nov 2025 17:10:16 +0100 Subject: [PATCH 7/9] Update src/views/workflow-history/helpers/workflow-history-grouper.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/views/workflow-history/helpers/workflow-history-grouper.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 927994041..6b6dd2630 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -184,7 +184,7 @@ export default class WorkflowHistoryGrouper { /** * Schedules the next batch using the best available API. - * Uses Scheduler API if available, otherwise falls back to Promise microtask. + * Uses Scheduler API if available, otherwise falls back to setTimeout. */ private scheduleNextBatch() { // if first batch process immediately, this helps avoiding UI delays From 213673cfb046a79fa02f37bc66876c675f748338 Mon Sep 17 00:00:00 2001 From: Assem Hafez Date: Thu, 13 Nov 2025 17:23:14 +0100 Subject: [PATCH 8/9] Apply PR comments --- .../helpers/__tests__/workflow-history-grouper.test.tsx | 2 -- .../workflow-history/helpers/workflow-history-grouper.ts | 5 +++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx index 0c642bd3e..4529096d0 100644 --- a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -16,8 +16,6 @@ import type { Props, } from '../workflow-history-grouper.types'; -// Commonly used mocks - // Track all setups for cleanup const allCleanups: Array<() => void> = []; diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts index 6b6dd2630..e0917a0d8 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -93,7 +93,7 @@ export default class WorkflowHistoryGrouper { * Updates pending events (activities and decisions). * This should be called separately from updateEvents. */ - public async updatePendingEvents(params: ProcessEventsParams) { + public updatePendingEvents(params: ProcessEventsParams) { // Update pending events (add new ones, remove stale ones) const currentPendingActivities = this.currentPendingActivities; @@ -187,7 +187,7 @@ export default class WorkflowHistoryGrouper { * Uses Scheduler API if available, otherwise falls back to setTimeout. */ private scheduleNextBatch() { - // if first batch process immediately, this helps avoiding UI delays + // If first batch, process immediately; this helps avoid UI delays if (this.lastProcessedEventIndex === -1) { this.processBatch(); } else if ( @@ -200,6 +200,7 @@ export default class WorkflowHistoryGrouper { .postTask(() => this.processBatch(), { priority: 'background' }) .catch(() => { // Fallback to setTimeout if postTask fails + // setTimeout adds the processBatch to Macro Task Queue (lowest priority queue) to allow current microtasks (UI updates) to complete first setTimeout(() => this.processBatch(), 0); }); } else { From 5d8e74aebc25cd584b58635f59976b361ebc4657 Mon Sep 17 00:00:00 2001 From: Assem Hafez <137278762+Assem-Uber@users.noreply.github.com> Date: Thu, 13 Nov 2025 17:24:11 +0100 Subject: [PATCH 9/9] Update src/views/workflow-history/helpers/workflow-history-grouper.types.ts Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../helpers/workflow-history-grouper.types.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts index 45c4e44b9..782d5b473 100644 --- a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -11,11 +11,6 @@ export type ProcessEventsParams = { pendingStartDecision: PendingDecisionTaskStartEvent | null; }; -export type ProcessEventsResult = { - groups: HistoryEventsGroups; - lastProcessedEventIndex: number; -}; - /** * Processing status for incremental grouping operations. */