diff --git a/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts b/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts index d08e326de..646bad461 100644 --- a/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts +++ b/src/views/workflow-history/__fixtures__/workflow-history-pending-events.ts @@ -3,6 +3,9 @@ import type { PendingDecisionTaskStartEvent, } from '../workflow-history.types'; +import { scheduleActivityTaskEvent } from './workflow-history-activity-events'; +import { scheduleDecisionTaskEvent } from './workflow-history-decision-events'; + export const pendingActivityTaskStartEvent = { eventId: null, computedEventId: 'pending-7', @@ -94,3 +97,54 @@ export const pendingDecisionTaskStartEventWithStartedState = { }, }, } as const satisfies PendingDecisionTaskStartEvent; + +// Factory functions for creating test data dynamically + +export function createPendingActivity( + scheduleId: string, + options?: { activityId?: string } +): PendingActivityTaskStartEvent { + return { + ...pendingActivityTaskStartEvent, + computedEventId: `pending-${scheduleId}`, + pendingActivityTaskStartEventAttributes: { + ...pendingActivityTaskStartEvent.pendingActivityTaskStartEventAttributes, + scheduleId, + ...(options?.activityId && { activityId: options.activityId }), + }, + } as PendingActivityTaskStartEvent; +} + +export function createPendingDecision( + scheduleId: string +): PendingDecisionTaskStartEvent { + return { + ...pendingDecisionTaskStartEvent, + computedEventId: `pending-${scheduleId}`, + pendingDecisionTaskStartEventAttributes: { + ...pendingDecisionTaskStartEvent.pendingDecisionTaskStartEventAttributes, + scheduleId, + }, + } as PendingDecisionTaskStartEvent; +} + +export function createScheduleActivityEvent( + eventId: string, + options?: { activityId?: string } +) { + return { + ...scheduleActivityTaskEvent, + eventId, + activityTaskScheduledEventAttributes: { + ...scheduleActivityTaskEvent.activityTaskScheduledEventAttributes, + ...(options?.activityId && { activityId: options.activityId }), + }, + }; +} + +export function createScheduleDecisionEvent(eventId: string) { + return { + ...scheduleDecisionTaskEvent, + eventId, + }; +} diff --git a/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx new file mode 100644 index 000000000..4529096d0 --- /dev/null +++ b/src/views/workflow-history/helpers/__tests__/workflow-history-grouper.test.tsx @@ -0,0 +1,712 @@ +import { + completedActivityTaskEvents, + startActivityTaskEvent, +} from '../../__fixtures__/workflow-history-activity-events'; +import { startDecisionTaskEvent } from '../../__fixtures__/workflow-history-decision-events'; +import { + createPendingActivity, + createPendingDecision, + createScheduleActivityEvent, + createScheduleDecisionEvent, + pendingActivityTaskStartEvent, +} from '../../__fixtures__/workflow-history-pending-events'; +import WorkflowHistoryGrouper from '../workflow-history-grouper'; +import type { + GroupingStateChangeCallback, + Props, +} from '../workflow-history-grouper.types'; + +// Track all setups for cleanup +const allCleanups: Array<() => void> = []; + +describe(WorkflowHistoryGrouper.name, () => { + afterEach(async () => { + // Clean up any pending timeouts from all tests + allCleanups.forEach((cleanup) => cleanup()); + allCleanups.length = 0; + + // Give time for any pending async operations to complete + await new Promise((resolve) => setTimeout(resolve, 10)); + }); + + it('should process events and create groups', async () => { + const { grouper, waitForProcessing } = setup(); + + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + const groups = grouper.getState().groups; + expect(groups).toBeDefined(); + expect(groups['7']).toBeDefined(); + expect(groups['7'].groupType).toBe('Activity'); + expect(grouper.getLastProcessedEventIndex()).toBe(2); + }); + + it('should have getLastProcessedEventIndex pointing to the last processed event', async () => { + const { grouper, waitForProcessing } = setup(); + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + + // First call with partial events + grouper.updateEvents([ + completedActivityTaskEvents[0], + completedActivityTaskEvents[1], + ]); + await waitForProcessing(); + + expect(grouper.getLastProcessedEventIndex()).toBe(1); + + // Second call with all events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + expect(grouper.getLastProcessedEventIndex()).toBe(2); + }); + + it('should add new pending activities to groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with scheduled event only + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Update with pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + const groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should add new pending decision to groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with scheduled event only + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + // Add pending decision + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + const decisionGroup = grouper.getState().groups['2']; + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should remove stale pending activities from groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with pending activity + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + const firstGroups = grouper.getState().groups; + const firstActivityGroup = firstGroups['7']; + expect(firstActivityGroup.events).toHaveLength(2); + + // Second call without pending activity (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + }); + + it('should remove stale pending decision from groups', async () => { + const { grouper, waitForProcessing } = setup(); + + // First call with pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + const firstGroups = grouper.getState().groups; + expect(firstGroups['2'].events).toHaveLength(2); + + // Second call without pending decision (it completed) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const decisionGroup = grouper.getState().groups['2']; + expect(decisionGroup.events).toHaveLength(1); + }); + + it('should handle multiple pending activity state transitions', async () => { + const { grouper, waitForProcessing } = setup(); + + // Initial state + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Add pending activity + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Remove pending activity (it started) + grouper.updateEvents([ + createScheduleActivityEvent('7'), + startActivityTaskEvent, + ]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: null, + }); + + const activityGroup = grouper.getState().groups['7']; + expect(activityGroup.events).toHaveLength(2); + expect( + activityGroup.events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(false); + }); + + it('should return current groups without processing', () => { + const { grouper } = setup(); + + const groups = grouper.getState().groups; + + expect(groups).toEqual({}); + }); + + it('should reset grouper state', async () => { + const { grouper, waitForProcessing } = setup(); + + // Process some events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + expect(grouper.getLastProcessedEventIndex()).toBe(2); + expect(Object.keys(grouper.getState().groups).length).toBeGreaterThan(0); + + // Reset + grouper.reset(); + + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + expect(grouper.getState().groups).toEqual({}); + }); + + it('should reprocess events after reset', async () => { + const { grouper, waitForProcessing } = setup(); + + // Process events + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + const firstGroups = grouper.getState().groups; + + // Reset and reprocess + grouper.reset(); + expect(grouper.getState().groups).toEqual({}); + + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + expect(grouper.getState().groups).toEqual(firstGroups); + }); + + it('should buffer pending activity when group does not exist yet', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending activity BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getState().groups; + expect(groups['7']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Group should now exist with both scheduled and pending events + groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should buffer pending decision when group does not exist yet', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending decision BEFORE scheduled event exists + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + // Group should NOT exist yet (pending event is buffered) + let groups = grouper.getState().groups; + expect(groups['2']).toBeUndefined(); + + // Now add the scheduled event + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + // Group should now exist with both scheduled and pending events + groups = grouper.getState().groups; + const decisionGroup = groups['2']; + expect(decisionGroup).toBeDefined(); + expect(decisionGroup.groupType).toBe('Decision'); + expect(decisionGroup.events).toHaveLength(2); + }); + + it('should handle multiple buffered pending activities', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add multiple pending activities BEFORE their scheduled events + await grouper.updatePendingEvents({ + pendingStartActivities: [ + createPendingActivity('7'), + createPendingActivity('10', { activityId: '1' }), + ], + pendingStartDecision: null, + }); + + // No groups should exist yet + expect(Object.keys(grouper.getState().groups).length).toBe(0); + + // Add first scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // First group should now exist + let groups = grouper.getState().groups; + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeUndefined(); + + // Add second scheduled event + grouper.updateEvents([ + createScheduleActivityEvent('7'), + createScheduleActivityEvent('10', { activityId: '1' }), + ]); + await waitForProcessing(); + + // Both groups should now exist + groups = grouper.getState().groups; + expect(groups['7']).toBeDefined(); + expect(groups['10']).toBeDefined(); + expect(groups['7'].events).toHaveLength(2); + expect(groups['10'].events).toHaveLength(2); + }); + + it('should clear pending activities buffer when pending events are updated', async () => { + const { grouper, waitForProcessing } = setup(); + + // Buffer first pending activity for scheduleId: '7' + await grouper.updatePendingEvents({ + pendingStartActivities: [createPendingActivity('7')], + pendingStartDecision: null, + }); + + // Update with different pending activity for scheduleId: '10' (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [ + createPendingActivity('10', { activityId: '1' }), + ], + pendingStartDecision: null, + }); + + // No groups should exist yet (still buffered) + expect(Object.keys(grouper.getState().groups).length).toBe(0); + + // Now add scheduled events for both activities + grouper.updateEvents([ + createScheduleActivityEvent('7'), // scheduleId: '7' + createScheduleActivityEvent('10', { activityId: '1' }), // scheduleId: '10' + ]); + await waitForProcessing(); + + const groups = grouper.getState().groups; + + // Group '7' should only have scheduled event (pending was cleared from buffer) + expect(groups['7']).toBeDefined(); + expect(groups['7'].events).toHaveLength(1); + expect( + groups['7'].events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(false); + + // Group '10' should have both scheduled and pending events (current pending in buffer) + expect(groups['10']).toBeDefined(); + expect(groups['10'].events).toHaveLength(2); + expect( + groups['10'].events.some( + (e) => e.attributes === 'pendingActivityTaskStartEventAttributes' + ) + ).toBe(true); + }); + + it('should clear buffer on reset', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending activity without scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Reset the grouper + grouper.reset(); + + // Add scheduled event after reset + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Group should only have scheduled event (buffered pending was cleared) + const groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(1); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + }); + + it('should apply buffered pending events after updatePendingEvents if groups now exist', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add pending activity BEFORE scheduled event (will be buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // No group yet + expect(grouper.getState().groups['7']).toBeUndefined(); + + // Process scheduled event + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Call updatePendingEvents again with same pending activity + // This should trigger applyBufferedPendingEvents and merge the buffered event + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should now have both events + const groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup.events).toHaveLength(2); + }); + + it('should handle scenario where scheduled event arrives after pending event update', async () => { + const { grouper, waitForProcessing } = setup(); + + // Step 1: Pending activity arrives first (buffered) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Step 2: Scheduled event arrives + grouper.updateEvents([createScheduleActivityEvent('7')]); + await waitForProcessing(); + + // Should have complete group with both events + const groups = grouper.getState().groups; + const activityGroup = groups['7']; + expect(activityGroup).toBeDefined(); + expect(activityGroup.events).toHaveLength(2); + expect(activityGroup.events[0].attributes).toBe( + 'activityTaskScheduledEventAttributes' + ); + expect(activityGroup.events[1].attributes).toBe( + 'pendingActivityTaskStartEventAttributes' + ); + }); + + it('should not create incomplete groups when pending arrives before scheduled', async () => { + const { grouper } = setup(); + + // Only add pending activity (no scheduled event) + await grouper.updatePendingEvents({ + pendingStartActivities: [pendingActivityTaskStartEvent], + pendingStartDecision: null, + }); + + // Group should NOT exist in the UI + const groups = grouper.getState().groups; + expect(groups['7']).toBeUndefined(); + expect(Object.keys(groups).length).toBe(0); + }); + + it('should handle pending decision buffer clearing when decision changes', async () => { + const { grouper, waitForProcessing } = setup(); + + // Buffer first decision for scheduleId: '2' + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + // Update with different decision for scheduleId: '10' (old one should be removed from buffer) + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('10'), + }); + + // No groups should exist yet (still buffered) + expect(Object.keys(grouper.getState().groups).length).toBe(0); + + // Now add scheduled events for both decisions + grouper.updateEvents([ + createScheduleDecisionEvent('2'), // scheduleId: '2' + createScheduleDecisionEvent('10'), // scheduleId: '10' + ]); + await waitForProcessing(); + + const groups = grouper.getState().groups; + + // Group '2' should only have scheduled event (pending was cleared from buffer) + expect(groups['2']).toBeDefined(); + expect(groups['2'].events).toHaveLength(1); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + + // Group '10' should have both scheduled and pending events (current pending in buffer) + expect(groups['10']).toBeDefined(); + expect(groups['10'].events).toHaveLength(2); + expect( + groups['10'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(true); + }); + + it('should filter out pending decision when decision group has more than 2 events', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add scheduled event and pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + // Group should have 2 events (scheduled + pending) + let groups = grouper.getState().groups; + expect(groups['2'].events).toHaveLength(2); + + // Now add started event (makes it 3 events total) + grouper.updateEvents([ + createScheduleDecisionEvent('2'), + startDecisionTaskEvent, + ]); + await waitForProcessing(); + + // Pending decision should be filtered out when there are more than 2 events + groups = grouper.getState().groups; + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + + // even if pending event is updated again, it should not be added to the group + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + groups = grouper.getState().groups; + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(false); + }); + + it('should keep pending decision when decision group has exactly 2 events', async () => { + const { grouper, waitForProcessing } = setup(); + + // Add scheduled event and pending decision + grouper.updateEvents([createScheduleDecisionEvent('2')]); + await waitForProcessing(); + + await grouper.updatePendingEvents({ + pendingStartActivities: [], + pendingStartDecision: createPendingDecision('2'), + }); + + // Group should have 2 events (scheduled + pending) + const groups = grouper.getState().groups; + expect(groups['2'].events).toHaveLength(2); + expect( + groups['2'].events.some( + (e) => e.attributes === 'pendingDecisionTaskStartEventAttributes' + ) + ).toBe(true); + }); + + it('should clean up all resources when destroy is called', async () => { + const { grouper, handleStateChange, waitForProcessing } = setup(); + + // Process some events and verify onChange is called + grouper.updateEvents(completedActivityTaskEvents); + await waitForProcessing(); + + expect(handleStateChange).toHaveBeenCalled(); + expect(Object.keys(grouper.getState().groups).length).toBeGreaterThan(0); + + handleStateChange.mockClear(); + // Destroy the grouper + grouper.destroy(); + + // Verify state is reset + expect(grouper.getState().groups).toEqual({}); + expect(grouper.getLastProcessedEventIndex()).toBe(-1); + + // Process new events - onChange should NOT be called anymore + grouper.updateEvents(completedActivityTaskEvents); + + // Verify onChange was NOT called after destroy + expect(handleStateChange).not.toHaveBeenCalled(); + }); + + it('should return current state via getState', async () => { + const { grouper, waitForProcessing } = setup(); + + // Initial state - no events processed + let state = grouper.getState(); + expect(state).toEqual({ + groups: {}, + processedEventsCount: 0, + remainingEventsCount: 0, + status: 'idle', + }); + + // Add events - with batchSize=1, first batch will be processed synchronously but subsequent batches will be async + grouper.updateEvents(completedActivityTaskEvents); + + // Check state after first batch (might be synchronous) + state = grouper.getState(); + // First batch is processed immediately, so processedEventsCount should be at least 1 + expect(state.processedEventsCount).toBeGreaterThan(0); + + // Wait for processing to complete + await waitForProcessing(); + + // After processing - status should be idle + state = grouper.getState(); + expect(state).toEqual({ + groups: expect.any(Object), + processedEventsCount: completedActivityTaskEvents.length, + remainingEventsCount: 0, + status: 'idle', + }); + expect(Object.keys(state.groups).length).toBeGreaterThan(0); + + // Verify getState() returns consistent data + const anotherState = grouper.getState(); + expect(anotherState.groups).toEqual(state.groups); + }); +}); + +function setup(options: Partial = {}) { + // Queue of promise resolvers/rejecters waiting for processing to complete + const pendingResolvers: Array<{ + resolve: () => void; + reject: (error: Error) => void; + timeoutId: NodeJS.Timeout; + }> = []; + + // Create state change handler that resolves pending promises when processing completes + const handleStateChange: jest.MockedFunction = + jest.fn((state) => { + if (state.status === 'idle') { + // Resolve all pending promises at once + pendingResolvers.forEach(({ timeoutId, resolve }) => { + clearTimeout(timeoutId); + resolve(); + }); + pendingResolvers.length = 0; + } + }); + + // Create grouper and subscribe to state changes + const grouper = new WorkflowHistoryGrouper(options); + grouper.onChange(handleStateChange); + + // Helper function to wait for next processing cycle + const waitForProcessing = async (timeout = 1000): Promise => { + // Check if already idle (processing completed synchronously) + if (grouper.getState().status === 'idle') { + return Promise.resolve(); + } + + await new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + // Remove this resolver from queue if it times out + const index = pendingResolvers.findIndex( + (r) => r.timeoutId === timeoutId + ); + if (index !== -1) { + pendingResolvers.splice(index, 1); + } + reject(new Error('Timeout waiting for processing to complete')); + }, timeout); + + pendingResolvers.push({ resolve, reject, timeoutId }); + }); + }; + + // Cleanup function to clear any pending timeouts and unsubscribe + const cleanup = () => { + pendingResolvers.forEach(({ timeoutId }) => clearTimeout(timeoutId)); + pendingResolvers.length = 0; + grouper.destroy(); + }; + + // Register cleanup automatically + allCleanups.push(cleanup); + + return { grouper, handleStateChange, waitForProcessing }; +} diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.ts b/src/views/workflow-history/helpers/workflow-history-grouper.ts new file mode 100644 index 000000000..e0917a0d8 --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-grouper.ts @@ -0,0 +1,546 @@ +import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; +import logger from '@/utils/logger'; + +import type { + ExtendedActivityHistoryEvent, + ExtendedDecisionHistoryEvent, + HistoryEventsGroup, + HistoryEventsGroups, + PendingActivityTaskStartEvent, + PendingDecisionTaskStartEvent, +} from '../workflow-history.types'; + +import isChildWorkflowExecutionEvent from './check-history-event-group/is-child-workflow-execution-event'; +import isExtendedActivityEvent from './check-history-event-group/is-extended-activity-event'; +import isExtendedDecisionEvent from './check-history-event-group/is-extended-decision-event'; +import isRequestCancelExternalWorkflowExecutionEvent from './check-history-event-group/is-request-cancel-external-workflow-execution-event'; +import isSignalExternalWorkflowExecutionEvent from './check-history-event-group/is-signal-external-workflow-execution-event'; +import isSingleEvent from './check-history-event-group/is-single-event'; +import isTimerEvent from './check-history-event-group/is-timer-event'; +import getHistoryEventGroupId from './get-history-event-group-id'; +import getActivityGroupFromEvents from './get-history-group-from-events/get-activity-group-from-events'; +import getChildWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-child-workflow-execution-group-from-events'; +import getDecisionGroupFromEvents from './get-history-group-from-events/get-decision-group-from-events'; +import getRequestCancelExternalWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-request-cancel-external-workflow-execution-group-from-events'; +import getSignalExternalWorkflowExecutionGroupFromEvents from './get-history-group-from-events/get-signal-external-workflow-execution-group-from-events'; +import getSingleEventGroupFromEvents from './get-history-group-from-events/get-single-event-group-from-events'; +import getTimerGroupFromEvents from './get-history-group-from-events/get-timer-group-from-events'; +import placeEventInGroupEvents from './place-event-in-group-events'; +import { + type GroupingProcessState, + type ProcessEventsParams, + type Props, +} from './workflow-history-grouper.types'; + +/** + * Stateful history events grouper that processes events incrementally. + * + * This class maintains the state of processed events and groups, allowing + * efficient incremental updates as new events arrive. It tracks pending + * activities and decisions, automatically adding new ones and removing + * stale ones from groups. + */ +export default class WorkflowHistoryGrouper { + private allEvents: HistoryEvent[] = []; + private lastProcessedEventIndex: number = -1; + private groups: HistoryEventsGroups = {}; + private currentPendingActivities: PendingActivityTaskStartEvent[] = []; + private currentPendingDecision: PendingDecisionTaskStartEvent | null = null; + private subscribers: Set<(state: GroupingProcessState) => void> = new Set(); + private batchSize?: number; + private isProcessing: boolean = false; + + // Buffer for pending events that arrived before their group exists + private bufferedPendingActivities: PendingActivityTaskStartEvent[] = []; + private bufferedPendingDecision: PendingDecisionTaskStartEvent | null = null; + + constructor({ batchSize }: Props = {}) { + this.batchSize = batchSize; + } + + /** + * Subscribe to state changes. + * Returns an unsubscribe function. + */ + public onChange(callback: (state: GroupingProcessState) => void): () => void { + this.subscribers.add(callback); + return () => { + this.subscribers.delete(callback); + }; + } + + /** + * Updates the events list and automatically starts processing. + * The processor will continue batch by batch until all events are processed. + * If already processing, the new events will be queued and processed after current batch completes. + * Listen to onChange for progress updates. + * + */ + public updateEvents(events: HistoryEvent[]): void { + // Update allEvents with the latest complete list + this.allEvents = events; + + // If already processing, the loop will automatically pick up the new events + // No need to do anything - the pointer-based approach handles this + if (this.isProcessing) { + return; + } + + this.startProcessing(); + } + + /** + * Updates pending events (activities and decisions). + * This should be called separately from updateEvents. + */ + public updatePendingEvents(params: ProcessEventsParams) { + // Update pending events (add new ones, remove stale ones) + + const currentPendingActivities = this.currentPendingActivities; + const currentPendingDecision = this.currentPendingDecision; + + this.currentPendingActivities = params.pendingStartActivities; + this.currentPendingDecision = params.pendingStartDecision; + + this.bufferedPendingActivities = []; + this.bufferedPendingDecision = null; + + this.processPendingEvents( + currentPendingActivities, + params.pendingStartActivities, + currentPendingDecision, + params.pendingStartDecision + ); + } + + /** + * Resets the grouper state, clearing all processed events and groups. + * Useful for reprocessing events from scratch. + */ + public reset(): void { + this.allEvents = []; + this.lastProcessedEventIndex = -1; + this.groups = {}; + this.currentPendingActivities = []; + this.currentPendingDecision = null; + this.bufferedPendingActivities = []; + this.bufferedPendingDecision = null; + this.isProcessing = false; + } + + /** + * Destroys the grouper, cleaning up all resources. + * Clears all subscribers and resets internal state. + * Call this when the grouper is no longer needed. + */ + public destroy(): void { + this.subscribers.clear(); + this.reset(); + } + + /** + * Gets the index of the last processed event. + */ + public getLastProcessedEventIndex(): number { + return this.lastProcessedEventIndex; + } + + /** + * Gets the current state of the grouper. + * Returns current groups, processing status, and event counts. + */ + public getState(): GroupingProcessState { + return { + groups: { ...this.groups }, + processedEventsCount: this.lastProcessedEventIndex + 1, + remainingEventsCount: + this.allEvents.length - this.lastProcessedEventIndex - 1, + status: this.isProcessing ? 'processing' : 'idle', + }; + } + + // ============================================================================ + // Private Implementation + // ============================================================================ + + /** + * Starts the processing cycle. + * Schedules the first batch - all batches go through the scheduler. + */ + private startProcessing(): void { + // Check if there are events to process + if ( + this.isProcessing || + this.lastProcessedEventIndex >= this.allEvents.length - 1 + ) { + return; + } + + this.isProcessing = true; + + // Schedule the first batch (and all subsequent batches will be scheduled too) + this.scheduleNextBatch(); + } + + /** + * Schedules the next batch using the best available API. + * Uses Scheduler API if available, otherwise falls back to setTimeout. + */ + private scheduleNextBatch() { + // If first batch, process immediately; this helps avoid UI delays + if (this.lastProcessedEventIndex === -1) { + this.processBatch(); + } else if ( + typeof window !== 'undefined' && + 'scheduler' in window && + 'postTask' in (window.scheduler as any) + ) { + // Use Scheduler API with background priority if available + (window.scheduler as any) + .postTask(() => this.processBatch(), { priority: 'background' }) + .catch(() => { + // Fallback to setTimeout if postTask fails + // setTimeout adds the processBatch to Macro Task Queue (lowest priority queue) to allow current microtasks (UI updates) to complete first + setTimeout(() => this.processBatch(), 0); + }); + } else { + // Fallback to setTimeout + setTimeout(() => this.processBatch(), 0); + } + } + + /** + * Processes a single batch of events (or all remaining events if no batchSize). + * This method handles the core grouping logic and schedules itself for the next batch. + */ + private processBatch(): void { + // Check if there are events to process + if (this.lastProcessedEventIndex >= this.allEvents.length - 1) { + this.isProcessing = false; + return; + } + + // Calculate batch boundaries + const batchStart = this.lastProcessedEventIndex + 1; + const batchEnd = + this.batchSize !== undefined && this.batchSize > 0 + ? Math.min(batchStart + this.batchSize, this.allEvents.length) + : this.allEvents.length; + + // Process this batch synchronously using indices (avoids array slicing) + this.groups = this.groupEvents(batchStart, batchEnd, this.groups); + + // After processing new events, try to apply any buffered pending events + // whose groups may now exist + this.applyBufferedPendingEvents(); + + // Move pointer forward + this.lastProcessedEventIndex = batchEnd - 1; + + // Check if there are more events to process + const hasMoreEvents = + this.lastProcessedEventIndex < this.allEvents.length - 1; + + // Update processing state before reporting to subscribers + if (!hasMoreEvents) { + this.isProcessing = false; + } + + // Report progress to all subscribers + const state = this.getState(); + this.subscribers.forEach((callback) => callback(state)); + + // Schedule next batch if needed + if (hasMoreEvents) { + this.scheduleNextBatch(); + } + } + + /** + * Groups a batch of new events and updates existing groups. + * Synchronous implementation that processes events immediately. + */ + private groupEvents( + startIndex: number, + endIndex: number, + existingGroups: HistoryEventsGroups + ): HistoryEventsGroups { + const groups = { ...existingGroups }; + + // Process new history events using indices (avoids array slicing) + for (let i = startIndex; i < endIndex; i++) { + const event = this.allEvents[i]; + const groupId = getHistoryEventGroupId(event); + if (!groupId) { + logger.warn( + { + eventId: event.eventId, + eventTime: event.eventTime, + }, + "Couldn't extract groupId from event, check event payload and extraction logic" + ); + continue; + } + + const defaultGroupDetails: Partial = { + events: [], + hasMissingEvents: false, + label: '', + }; + const currentGroup = groups[groupId] || defaultGroupDetails; + const updatedEventsArr = placeEventInGroupEvents( + event, + currentGroup.events + ); + + if (updatedEventsArr.every(isExtendedActivityEvent)) { + groups[groupId] = getActivityGroupFromEvents(updatedEventsArr); + } else if (updatedEventsArr.every(isExtendedDecisionEvent)) { + // If there are more than 2 decision events, filter out the pending decision task start event + // Pending decision task start event is only added to the group when the scheduled decision task event is added + // This logic can be moved later to getDecisionGroupFromEvents + const filteredDecisionEvents = + updatedEventsArr.length > 2 + ? updatedEventsArr.filter( + (e) => + e.attributes !== 'pendingDecisionTaskStartEventAttributes' + ) + : updatedEventsArr; + groups[groupId] = getDecisionGroupFromEvents(filteredDecisionEvents); + } else if (updatedEventsArr.every(isTimerEvent)) { + groups[groupId] = getTimerGroupFromEvents(updatedEventsArr); + } else if (updatedEventsArr.every(isChildWorkflowExecutionEvent)) { + groups[groupId] = + getChildWorkflowExecutionGroupFromEvents(updatedEventsArr); + } else if ( + updatedEventsArr.every(isSignalExternalWorkflowExecutionEvent) + ) { + groups[groupId] = + getSignalExternalWorkflowExecutionGroupFromEvents(updatedEventsArr); + } else if ( + updatedEventsArr.every(isRequestCancelExternalWorkflowExecutionEvent) + ) { + groups[groupId] = + getRequestCancelExternalWorkflowExecutionGroupFromEvents( + updatedEventsArr + ); + } else if (updatedEventsArr.every(isSingleEvent)) { + groups[groupId] = getSingleEventGroupFromEvents(updatedEventsArr); + } else { + logger.warn( + { + eventId: event.eventId, + eventTime: event.eventTime, + events: updatedEventsArr.map(({ eventId, eventTime }) => ({ + eventId, + eventTime, + })), + }, + 'No handler for grouping this event' + ); + } + } + + return groups; + } + + /** + * Adds a pending activity to a group, removing any existing pending activities first. + * Only adds the new pending activity if it has an eventTime. + */ + private addPendingActivityToGroup( + groupId: string, + pendingActivity: PendingActivityTaskStartEvent + ) { + const currentGroup = this.groups[groupId]; + if (currentGroup && currentGroup.events.every(isExtendedActivityEvent)) { + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingActivityTaskStartEventAttributes' + ) as ExtendedActivityHistoryEvent[]; + + this.groups[groupId] = getActivityGroupFromEvents([ + ...filteredEvents, + pendingActivity as ExtendedActivityHistoryEvent, + ]); + } + } + + /** + * Adds a pending decision to a group, removing any existing pending decision first. + * Only adds if the group has exactly one scheduled event. + */ + private updatePendingDecisionInGroup( + groupId: string, + pendingDecision: PendingDecisionTaskStartEvent | null + ) { + const currentGroup = this.groups[groupId]; + if (currentGroup && currentGroup.events.every(isExtendedDecisionEvent)) { + // Remove any existing pending decision + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingDecisionTaskStartEventAttributes' + ) as ExtendedDecisionHistoryEvent[]; + + // Only add pending decision if group has exactly one scheduled event + if ( + pendingDecision && + filteredEvents.length === 1 && + filteredEvents[0].attributes === 'decisionTaskScheduledEventAttributes' + ) { + const updatedEventsArr: ExtendedDecisionHistoryEvent[] = [ + ...filteredEvents, + pendingDecision, + ]; + this.groups[groupId] = getDecisionGroupFromEvents(updatedEventsArr); + } else { + // Just update without pending decision + this.groups[groupId] = getDecisionGroupFromEvents(filteredEvents); + } + } + } + + /** + * Updates pending activities and decisions. + */ + private processPendingEvents( + currentPendingActivities: PendingActivityTaskStartEvent[], + newPendingActivities: PendingActivityTaskStartEvent[], + currentPendingDecision: PendingDecisionTaskStartEvent | null, + newPendingDecision: PendingDecisionTaskStartEvent | null + ) { + this.updatePendingActivities( + currentPendingActivities, + newPendingActivities + ); + + this.updatePendingDecision(currentPendingDecision, newPendingDecision); + } + + /** + * Updates pending activities in groups by removing old ones and adding new ones. + * If a group doesn't exist yet, buffers the pending activity until the + * scheduled event arrives. + * Buffer is already cleared before this is called, so we're rebuilding from scratch. + */ + private updatePendingActivities( + currentPendingActivities: PendingActivityTaskStartEvent[], + newPendingActivities: PendingActivityTaskStartEvent[] + ): void { + const existingPendingGroups = new Set( + currentPendingActivities.map((pa) => getHistoryEventGroupId(pa)) + ); + // First, remove all current pending activities from their groups + currentPendingActivities.forEach((pa) => { + const groupId = getHistoryEventGroupId(pa); + if (groupId && existingPendingGroups.has(groupId)) { + const currentGroup = this.groups[groupId]; + if ( + currentGroup && + currentGroup.events.every(isExtendedActivityEvent) + ) { + const filteredEvents = currentGroup.events.filter( + (e) => e.attributes !== 'pendingActivityTaskStartEventAttributes' + ); + + this.groups[groupId] = getActivityGroupFromEvents(filteredEvents); + } + } + }); + + // Then, add all new pending activities to their groups (or buffer them) + newPendingActivities.forEach((pa) => { + const groupId = getHistoryEventGroupId(pa); + if (!groupId) { + logger.warn( + { + computedEventId: pa.computedEventId, + eventTime: pa.eventTime, + }, + "Couldn't extract groupId from pending activity event" + ); + return; + } + + if (this.groups[groupId]) { + this.addPendingActivityToGroup(groupId, pa); + } else { + this.bufferedPendingActivities.push(pa); + } + }); + } + + /** + * Adds the current pending decision to groups. + * If the group doesn't exist yet, buffers the pending decision until the + * scheduled event arrives. + * Buffer was cleared before this is called, so we're rebuilding from scratch. + */ + private updatePendingDecision( + currentPendingDecision: PendingDecisionTaskStartEvent | null, + newPendingDecision: PendingDecisionTaskStartEvent | null + ): void { + // Remove old pending decision from its group (if exists) + if (currentPendingDecision) { + const groupId = getHistoryEventGroupId(currentPendingDecision); + if (groupId) { + this.updatePendingDecisionInGroup(groupId, null); + } + } + + // Add new pending decision (to group or buffer) + if (newPendingDecision) { + const groupId = getHistoryEventGroupId(newPendingDecision); + if (!groupId) { + logger.warn( + { + computedEventId: newPendingDecision.computedEventId, + eventTime: newPendingDecision.eventTime, + }, + "Couldn't extract groupId from pending decision event" + ); + return; + } + + if (this.groups[groupId]) { + this.updatePendingDecisionInGroup(groupId, newPendingDecision); + } else { + this.bufferedPendingDecision = newPendingDecision; + } + } + } + + /** + * Applies buffered pending events to groups when their scheduled events arrive. + * This is called after processing new events to merge any pending events + * that were waiting for their groups to be created. + */ + private applyBufferedPendingEvents(): void { + // Apply buffered pending activities + const activitiesToKeepBuffered: PendingActivityTaskStartEvent[] = []; + + this.bufferedPendingActivities.forEach((activity) => { + const groupId = getHistoryEventGroupId(activity); + if (groupId && this.groups[groupId]) { + this.addPendingActivityToGroup(groupId, activity); + } else { + activitiesToKeepBuffered.push(activity); + } + }); + + this.bufferedPendingActivities = activitiesToKeepBuffered; + + // Apply buffered pending decision + if (this.bufferedPendingDecision) { + const groupId = getHistoryEventGroupId(this.bufferedPendingDecision); + if (groupId) { + // Try to add to existing group using helper + if (this.groups[groupId]) { + this.updatePendingDecisionInGroup( + groupId, + this.bufferedPendingDecision + ); + this.bufferedPendingDecision = null; + } + } + } + } +} diff --git a/src/views/workflow-history/helpers/workflow-history-grouper.types.ts b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts new file mode 100644 index 000000000..782d5b473 --- /dev/null +++ b/src/views/workflow-history/helpers/workflow-history-grouper.types.ts @@ -0,0 +1,45 @@ +import { type HistoryEvent } from '@/__generated__/proto-ts/uber/cadence/api/v1/HistoryEvent'; + +import type { + HistoryEventsGroups, + PendingActivityTaskStartEvent, + PendingDecisionTaskStartEvent, +} from '../workflow-history.types'; + +export type ProcessEventsParams = { + pendingStartActivities: PendingActivityTaskStartEvent[]; + pendingStartDecision: PendingDecisionTaskStartEvent | null; +}; + +/** + * Processing status for incremental grouping operations. + */ +export type ProcessingStatus = 'idle' | 'processing'; + +/** + * State snapshot of the grouping process. + */ +export type GroupingProcessState = { + /** Current groups accumulated so far */ + groups: HistoryEventsGroups; + /** Number of events that have been successfully processed since the grouper was created/reset */ + processedEventsCount: number; + /** Number of events that are still pending (not yet processed) */ + remainingEventsCount: number; + /** Current processing status */ + status: ProcessingStatus; +}; + +/** + * Callback invoked when grouping state changes. + */ +export type GroupingStateChangeCallback = (state: GroupingProcessState) => void; + +export type Props = { + /** + * Batch size for incremental processing. + * If specified, events will be processed in batches to allow progress updates. + * If not specified, all events are processed at once. + */ + batchSize?: number; +};