diff --git a/packages/api/src/tasks/checkAlerts/index.ts b/packages/api/src/tasks/checkAlerts/index.ts index 9d4af8687..a2f07d513 100644 --- a/packages/api/src/tasks/checkAlerts/index.ts +++ b/packages/api/src/tasks/checkAlerts/index.ts @@ -3,6 +3,7 @@ // -------------------------------------------------------- import PQueue from '@esm2cjs/p-queue'; import * as clickhouse from '@hyperdx/common-utils/dist/clickhouse'; +import { ResponseJSON } from '@hyperdx/common-utils/dist/clickhouse'; import { ClickhouseClient } from '@hyperdx/common-utils/dist/clickhouse/node'; import { getMetadata, @@ -188,6 +189,179 @@ const extractGroupKeyFromMapKey = (mapKey: string, alertId: string): string => { : ''; }; +/** Determine if we should skip the alert check based on how recently it was last evaluated. */ +const shouldSkipAlertCheck = ( + details: AlertDetails, + hasGroupBy: boolean, + nowInMinsRoundDown: Date, +) => { + const { alert, previousMap } = details; + const alertKeyPrefix = getAlertKeyPrefix(alert.id); + + // Skip if ANY previous history for this alert was created in the current window + return Array.from(previousMap.entries()).some(([key, history]) => { + // For grouped alerts, check any key that starts with alertId prefix + // For non-grouped alerts, check exact match with alertId + const isMatchingKey = hasGroupBy + ? 
key.startsWith(alertKeyPrefix) + : key === alert.id; + + return ( + isMatchingKey && + fns.getTime(history.createdAt) === fns.getTime(nowInMinsRoundDown) + ); + }); +}; + +/** Get the date range for evaluating the alert */ +const getAlertEvaluationDateRange = ( + { alert, previousMap }: AlertDetails, + hasGroupBy: boolean, + nowInMinsRoundDown: Date, + windowSizeInMins: number, +) => { + // Calculate date range for the query + // Find the latest createdAt among all histories for this alert + let previousCreatedAt: Date | undefined; + if (hasGroupBy) { + // For grouped alerts, find the latest createdAt among all groups + // Use the latest to avoid checking from old groups that might no longer exist + const alertKeyPrefix = getAlertKeyPrefix(alert.id); + for (const [key, history] of previousMap.entries()) { + if (key.startsWith(alertKeyPrefix)) { + if (!previousCreatedAt || history.createdAt > previousCreatedAt) { + previousCreatedAt = history.createdAt; + } + } + } + } else { + // For non-grouped alerts, get the single history + const previous = previousMap.get(alert.id); + previousCreatedAt = previous?.createdAt; + } + + return calcAlertDateRange( + previousCreatedAt + ? 
previousCreatedAt.getTime() + : fns.subMinutes(nowInMinsRoundDown, windowSizeInMins).getTime(), + nowInMinsRoundDown.getTime(), + windowSizeInMins, + ); +}; + +const getChartConfigFromAlert = ( + details: AlertDetails, + connection: string, + dateRange: [Date, Date], + windowSizeInMins: number, +): ChartConfigWithOptDateRange | undefined => { + const { alert, source } = details; + if (details.taskType === AlertTaskType.SAVED_SEARCH) { + const savedSearch = details.savedSearch; + return { + connection, + displayType: DisplayType.Line, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: false, + from: source.from, + granularity: `${windowSizeInMins} minute`, + select: [ + { + aggFn: 'count', + aggCondition: '', + valueExpression: '', + }, + ], + where: savedSearch.where, + whereLanguage: savedSearch.whereLanguage, + groupBy: alert.groupBy, + implicitColumnExpression: source.implicitColumnExpression, + timestampValueExpression: source.timestampValueExpression, + }; + } else if (details.taskType === AlertTaskType.TILE) { + const tile = details.tile; + // Doesn't work for metric alerts yet + if (tile.config.displayType === DisplayType.Line) { + return { + connection, + dateRange, + dateRangeStartInclusive: true, + dateRangeEndInclusive: false, + displayType: tile.config.displayType, + from: source.from, + granularity: `${windowSizeInMins} minute`, + groupBy: tile.config.groupBy, + implicitColumnExpression: source.implicitColumnExpression, + metricTables: source.metricTables, + select: tile.config.select, + timestampValueExpression: source.timestampValueExpression, + where: tile.config.where, + seriesReturnType: tile.config.seriesReturnType, + }; + } + } + + logger.error( + { + alertId: alert.id, + }, + `Unsupported alert source: ${alert.source}`, + ); + + return undefined; +}; + +const getResponseMetadata = ( + data: ResponseJSON<Record<string, string | number>>, +) => { + // attach JS type + const meta = + data.meta?.map(m => ({ + ...m, + jsType: 
clickhouse.convertCHDataTypeToJSType(m.type), + })) ?? []; + + const timestampColumnName = meta.find( + m => m.jsType === clickhouse.JSDataType.Date, + )?.name; + const valueColumnNames = new Set( + meta + .filter(m => m.jsType === clickhouse.JSDataType.Number) + .map(m => m.name), + ); + + if (timestampColumnName == null) { + logger.error({ meta }, 'Failed to find timestamp column'); + return undefined; + } + + if (valueColumnNames.size === 0) { + logger.error({ meta }, 'Failed to find value column'); + return undefined; + } + + return { timestampColumnName, valueColumnNames }; +}; + +const parseAlertData = ( + data: Record<string, string | number>, + meta: { timestampColumnName: string; valueColumnNames: Set<string> }, +) => { + let value: number | null = null; + const extraFields: string[] = []; + + for (const [k, v] of Object.entries(data)) { + if (meta.valueColumnNames.has(k)) { + value = isString(v) ? parseInt(v) : v; + } else if (k !== meta.timestampColumnName) { + extraFields.push(`${k}:${v}`); + } + } + + return { value, extraFields }; +}; + export const processAlert = async ( now: Date, details: AlertDetails, @@ -200,28 +374,10 @@ export const processAlert = async ( try { const windowSizeInMins = ms(alert.interval) / 60000; const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now); - - // Check if we should skip this alert run - // Skip if ANY previous history for this alert was created in the current window const hasGroupBy = alert.groupBy && alert.groupBy.length > 0; - const alertKeyPrefix = getAlertKeyPrefix(alert.id); - - const shouldSkip = Array.from(previousMap.entries()).some( - ([key, history]) => { - // For grouped alerts, check any key that starts with alertId prefix - // For non-grouped alerts, check exact match with alertId - const isMatchingKey = hasGroupBy - ? 
key.startsWith(alertKeyPrefix) - : key === alert.id; - - return ( - isMatchingKey && - fns.getTime(history.createdAt) === fns.getTime(nowInMinsRoundDown) - ); - }, - ); - if (shouldSkip) { + // Check if we should skip this alert check based on last evaluation time + if (shouldSkipAlertCheck(details, !!hasGroupBy, nowInMinsRoundDown)) { logger.info( { windowSizeInMins, @@ -235,90 +391,20 @@ export const processAlert = async ( return; } - // Calculate date range for the query - // Find the latest createdAt among all histories for this alert - let previousCreatedAt: Date | undefined; - if (hasGroupBy) { - // For grouped alerts, find the latest createdAt among all groups - // Use the latest to avoid checking from old groups that might no longer exist - const alertKeyPrefix = getAlertKeyPrefix(alert.id); - for (const [key, history] of previousMap.entries()) { - if (key.startsWith(alertKeyPrefix)) { - if (!previousCreatedAt || history.createdAt > previousCreatedAt) { - previousCreatedAt = history.createdAt; - } - } - } - } else { - // For non-grouped alerts, get the single history - const previous = previousMap.get(alert.id); - previousCreatedAt = previous?.createdAt; - } - - const dateRange = calcAlertDateRange( - previousCreatedAt - ? 
previousCreatedAt.getTime() - : fns.subMinutes(nowInMinsRoundDown, windowSizeInMins).getTime(), - nowInMinsRoundDown.getTime(), + const dateRange = getAlertEvaluationDateRange( + details, + !!hasGroupBy, + nowInMinsRoundDown, windowSizeInMins, ); - let chartConfig: ChartConfigWithOptDateRange | undefined; - if (details.taskType === AlertTaskType.SAVED_SEARCH) { - const savedSearch = details.savedSearch; - chartConfig = { - connection: connectionId, - displayType: DisplayType.Line, - dateRange, - dateRangeStartInclusive: true, - dateRangeEndInclusive: false, - from: source.from, - granularity: `${windowSizeInMins} minute`, - select: [ - { - aggFn: 'count', - aggCondition: '', - valueExpression: '', - }, - ], - where: savedSearch.where, - whereLanguage: savedSearch.whereLanguage, - groupBy: alert.groupBy, - implicitColumnExpression: source.implicitColumnExpression, - timestampValueExpression: source.timestampValueExpression, - }; - } else if (details.taskType === AlertTaskType.TILE) { - const tile = details.tile; - // Doesn't work for metric alerts yet - if (tile.config.displayType === DisplayType.Line) { - chartConfig = { - connection: connectionId, - dateRange, - dateRangeStartInclusive: true, - dateRangeEndInclusive: false, - displayType: tile.config.displayType, - from: source.from, - granularity: `${windowSizeInMins} minute`, - groupBy: tile.config.groupBy, - implicitColumnExpression: source.implicitColumnExpression, - metricTables: source.metricTables, - select: tile.config.select, - timestampValueExpression: source.timestampValueExpression, - where: tile.config.where, - seriesReturnType: tile.config.seriesReturnType, - }; - } - } else { - logger.error( - { - alertId: alert.id, - }, - `Unsupported alert source: ${alert.source}`, - ); - return; - } + const chartConfig = getChartConfigFromAlert( + details, + connectionId, + dateRange, + windowSizeInMins, + ); - // Fetch data if (chartConfig == null) { logger.error( { @@ -330,6 +416,7 @@ export const processAlert 
= async ( return; } + // Query for alert data const metadata = getMetadata(clickhouseClient); const checksData = await clickhouseClient.queryChartConfig({ config: chartConfig, @@ -347,7 +434,6 @@ export const processAlert = async ( `Received alert metric [${alert.source} source]`, ); - // TODO: support INSUFFICIENT_DATA state // Track state per group (or one history if no groupBy) const histories = new Map(); @@ -366,59 +452,68 @@ export const processAlert = async ( return histories.get(groupKey)!; }; - if (checksData?.data && checksData?.data.length > 0) { - // attach JS type - const meta = - checksData.meta?.map(m => ({ - ...m, - jsType: clickhouse.convertCHDataTypeToJSType(m.type), - })) ?? []; - - const timestampColumnName = meta.find( - m => m.jsType === clickhouse.JSDataType.Date, - )?.name; - const valueColumnNames = new Set( - meta - .filter(m => m.jsType === clickhouse.JSDataType.Number) - .map(m => m.name), + // Helper to send a notification, catching and logging any errors. + const trySendNotification = async ({ + group, + totalCount, + state, + startTime = nowInMinsRoundDown, + }: { + state: AlertState; + totalCount: number; + group: string; + startTime?: Date; + }) => { + logger.info( + { alertId: alert.id, group, totalCount }, + state === AlertState.ALERT + ? `Triggering ${alert.channel.type} alarm!` + : `Alert resolved for group "${group}", triggering ${alert.channel.type} notification`, ); - if (timestampColumnName == null) { + try { + // Casts to any here because this is where I stopped unraveling the + // alert logic requiring large, nested objects. We should look at + // cleaning this up next. fireChannelEvent guards against null values + // for these properties. + await fireChannelEvent({ + alert, + alertProvider, + attributes: {}, // FIXME: support attributes (logs + resources ?) 
+ clickhouseClient, + dashboard: (details as any).dashboard, + startTime, + endTime: fns.addMinutes(startTime, windowSizeInMins), + group, + metadata, + savedSearch: (details as any).savedSearch, + source, + state, + totalCount, + windowSizeInMins, + teamWebhooksById, + }); + } catch (e) { logger.error( - { - meta, - alertId: alert.id, - }, - 'Failed to find timestamp column', + { alertId: alert.id, group, error: serializeError(e) }, + 'Failed to fire channel event', ); - return; } - if (valueColumnNames.size === 0) { - logger.error( - { - meta, - alertId: alert.id, - }, - 'Failed to find value column', - ); + }; + + if (checksData?.data && checksData?.data.length > 0) { + const meta = getResponseMetadata(checksData); + if (!meta) { + logger.error({ alertId: alert.id }, 'Failed to get response metadata'); return; } for (const checkData of checksData.data) { - let _value: number | null = null; - const extraFields: string[] = []; - // TODO: other keys should be attributes ? (for alert message template) - for (const [k, v] of Object.entries(checkData)) { - if (valueColumnNames.has(k)) { - _value = isString(v) ? parseInt(v) : v; - } else if (k !== timestampColumnName) { - extraFields.push(`${k}:${v}`); - } - } + const { value, extraFields } = parseAlertData(checkData, meta); // TODO: we might want to fix the null value from the upstream (check if this is still needed) // this happens when the ratio is 0/0 - if (_value == null) { + if (value == null) { continue; } @@ -426,57 +521,21 @@ export const processAlert = async ( const groupKey = hasGroupBy ? 
extraFields.join(', ') : ''; const history = getOrCreateHistory(groupKey); - const bucketStart = new Date(checkData[timestampColumnName]); - if (doesExceedThreshold(alert.thresholdType, alert.threshold, _value)) { + const bucketStart = new Date(checkData[meta.timestampColumnName]); + if (doesExceedThreshold(alert.thresholdType, alert.threshold, value)) { history.state = AlertState.ALERT; - logger.info( - { - alertId: alert.id, - group: groupKey, - totalCount: _value, - checkData, - }, - `Triggering ${alert.channel.type} alarm!`, - ); - - try { - // Casts to any here because this is where I stopped unraveling the - // alert logic requiring large, nested objects. We should look at - // cleaning this up next. fireChannelEvent guards against null values - // for these properties. - await fireChannelEvent({ - alert, - alertProvider, - attributes: {}, // FIXME: support attributes (logs + resources ?) - clickhouseClient, - dashboard: (details as any).dashboard, - endTime: fns.addMinutes(bucketStart, windowSizeInMins), - group: groupKey, - metadata, - savedSearch: (details as any).savedSearch, - source, - startTime: bucketStart, - state: AlertState.ALERT, - totalCount: _value, - windowSizeInMins, - teamWebhooksById, - }); - } catch (e) { - logger.error( - { - alertId: alert.id, - group: groupKey, - error: serializeError(e), - }, - 'Failed to fire channel event', - ); - } + await trySendNotification({ + state: AlertState.ALERT, + group: groupKey, + totalCount: value, + startTime: bucketStart, + }); history.counts += 1; } else { - // TODO: if the alert was previously alerting (differnt bucket), should we set state to OK (plus auto-resolve)? + // TODO: if the alert was previously alerting (different bucket), should we set state to OK (plus auto-resolve)? 
} - history.lastValues.push({ count: _value, startTime: bucketStart }); + history.lastValues.push({ count: value, startTime: bucketStart }); } } @@ -517,46 +576,13 @@ export const processAlert = async ( groupPrevious?.state === AlertState.ALERT && history.state === AlertState.OK ) { - logger.info( - { - alertId: alert.id, - group: groupKey, - }, - `Alert resolved for group "${groupKey}", triggering ${alert.channel.type} notification`, - ); - - try { - const lastValue = history.lastValues[history.lastValues.length - 1]; - await fireChannelEvent({ - alert, - alertProvider, - attributes: {}, // FIXME: support attributes (logs + resources ?) - clickhouseClient, - dashboard: (details as any).dashboard, - endTime: fns.addMinutes( - lastValue?.startTime || nowInMinsRoundDown, - windowSizeInMins, - ), - group: groupKey, - metadata, - savedSearch: (details as any).savedSearch, - source, - startTime: lastValue?.startTime || nowInMinsRoundDown, - state: AlertState.OK, - totalCount: lastValue?.count || 0, - windowSizeInMins, - teamWebhooksById, - }); - } catch (e) { - logger.error( - { - alertId: alert.id, - group: groupKey, - error: serializeError(e), - }, - 'Failed to fire resolved channel event', - ); - } + const lastValue = history.lastValues[history.lastValues.length - 1]; + await trySendNotification({ + state: AlertState.OK, + group: groupKey, + totalCount: lastValue?.count || 0, + startTime: lastValue?.startTime || nowInMinsRoundDown, + }); } }