Skip to content

Commit 0c50cdd

Browse files
committed
feat(inbox): Queue depth lambda fully working!
1 parent 1a1fbdd commit 0c50cdd

File tree

5 files changed

+131
-85
lines changed

5 files changed

+131
-85
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/construct.handler.ts

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import * as awsLambda from 'aws-lambda';
2-
31
import { getSecrets, getRedisConnection, getQueueDepth, publishMetrics } from './methods';
42

53
export interface Result {
@@ -24,18 +22,13 @@ export const handler = async () => {
2422
throw new Error('REDIS_PORT environment variable does not parseInt');
2523
}
2624

27-
// parse queue names
2825
const queuesBlob = process.env.QUEUE_NAMES;
2926
if (!queuesBlob) {
3027
throw new Error('QUEUE_NAMES environment variable not set');
3128
}
3229
const queueNames: string[] = JSON.parse(queuesBlob);
3330

34-
// get username/password
3531
const redisSecretArn = process.env.REDIS_SECRET_ARN;
36-
if (!redisSecretArn) {
37-
throw new Error('REDIS_SECRET_ARN environment variable not set');
38-
}
3932

4033
const redisSecretPasswordPath = process.env.REDIS_SECRET_PASSWORD_PATH;
4134
if (!redisSecretPasswordPath) {
@@ -47,40 +40,58 @@ export const handler = async () => {
4740
throw new Error('REDIS_SECRET_USERNAME_PATH environment variable not set');
4841
}
4942

50-
const { username, password } = await getSecrets({
51-
redisSecretArn,
52-
redisSecretPasswordPath,
53-
redisSecretUsernamePath,
54-
});
43+
const cwService = process.env.CW_SERVICE;
44+
if (!cwService) {
45+
throw new Error('CW_SERVICE environment variable not set');
46+
}
5547

56-
// connect to redis instance, note this waits until the connection is 'ready'.
57-
const redisConnection = await getRedisConnection({
48+
const cwNamespace = process.env.CW_NAMESPACE;
49+
if (!cwNamespace) {
50+
throw new Error('CW_NAMESPACE environment variable not set');
51+
}
52+
53+
console.debug(`Connecting to redis at ${host}:${port}`);
54+
55+
const connectionOptions = {
5856
connectionName: 'queueDepthMetricPublisher',
5957
host,
6058
port,
61-
username,
62-
password,
63-
tls: {}, // always use tls, but be easy about it.
64-
});
59+
} as any;
60+
61+
if (redisSecretArn) {
62+
const { username, password } = await getSecrets({
63+
redisSecretArn,
64+
redisSecretPasswordPath,
65+
redisSecretUsernamePath,
66+
});
67+
connectionOptions.username = username;
68+
connectionOptions.password = password;
69+
}
6570

66-
// capture the values for the queues
71+
// connect to redis instance, note this waits until the connection is 'ready'.
72+
const redisConnection = await getRedisConnection(connectionOptions);
73+
74+
// capture the depth of all queues
6775
const results = await Promise.all(
6876
queueNames.map(async (queueName) => {
69-
let depth = await getQueueDepth({ redisConnection, queueName });
70-
if (!depth) {
71-
console.error(`Error fetching queue depth for ${queueName}`);
72-
depth = -1;
77+
try {
78+
let depth = await getQueueDepth({ redisConnection, queueName });
79+
return { queueName, depth };
80+
} catch (error) {
81+
console.error(`Error fetching queue depth for ${queueName}`, error);
7382
}
74-
return { queueName, depth };
83+
return { queueName, depth: -1 };
7584
}),
7685
);
7786

7887
if (results === undefined) {
7988
throw new Error('Error fetching ALL queue depths');
8089
}
8190

82-
// publish the metrics
83-
publishMetrics(results);
91+
publishMetrics(results, {
92+
namespace: cwNamespace,
93+
serviceName: cwService,
94+
});
8495
} catch (error) {
8596
console.error('Error publishing metric data', error);
8697
throw error;

src/construct.ts

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Duration } from 'aws-cdk-lib';
1+
import { Duration, Stack, aws_ec2 } from 'aws-cdk-lib';
22
import { PolicyStatement } from 'aws-cdk-lib/aws-iam';
33
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
44
import { Runtime } from 'aws-cdk-lib/aws-lambda';
@@ -19,6 +19,11 @@ export interface RedisQueueDepthMetricPublisherProps {
1919
* @default 'RedisQueueDepth'
2020
*/
2121
readonly cwNamespace?: string;
22+
/**
23+
* The CloudWatch service to publish metrics to.
24+
* @default 'RedisQueueDepthMetricPublisher'
25+
*/
26+
readonly cwService?: string;
2227
/**
2328
* Time intervals that Lambda will be triggered to publish metric in CloudWatch.
2429
* @default Duration.minutes(1)
@@ -54,7 +59,7 @@ export interface RedisQueueDepthMetricPublisherProps {
5459
* You can override these paths using {@link redisSecretPasswordPath}, and
5560
* {@link redisSecretUsernamePath} respectively.
5661
*/
57-
readonly redisSecretArn: string;
62+
readonly redisSecretArn?: string;
5863
/**
5964
* In the best possible world, we would be using ABAC to allow decryption of SecretsManager payload.
6065
* If that is an option, leave this undefined.
@@ -68,13 +73,26 @@ export interface RedisQueueDepthMetricPublisherProps {
6873
*
6974
* @default 'password'
7075
*/
71-
readonly redisSecretPasswordPath: string;
76+
readonly redisSecretPasswordPath?: string;
7277
/**
7378
* Override the JSON path for the username in the {@link redisSecretArn}
7479
*
7580
* @default 'username'
7681
*/
77-
readonly redisSecretUsernamePath: string;
82+
readonly redisSecretUsernamePath?: string;
83+
/**
84+
* The regions to publish metrics to/observe metrics from.
85+
* @default ['us-west-2']
86+
*/
87+
readonly regions?: string[];
88+
/**
89+
* The VPC to run the Lambda in.
90+
*/
91+
readonly vpc?: any;
92+
/**
93+
* The SecurityGroupId to grant the lambda to access redis clusters
94+
*/
95+
readonly securityGroupId?: string;
7896
}
7997

8098
/**
@@ -86,6 +104,7 @@ export class RedisQueueDepthMetricPublisher extends Construct {
86104
readonly handler: NodejsFunction;
87105
readonly rule: Rule;
88106
readonly cwNamespace: string;
107+
readonly cwService: string;
89108

90109
/**
91110
* Creates a new instance of RedisQueueDepthMetricPublisher.
@@ -96,8 +115,15 @@ export class RedisQueueDepthMetricPublisher extends Construct {
96115

97116
constructor(scope: Construct, id: Namer, props: RedisQueueDepthMetricPublisherProps) {
98117
super(scope, id.pascal);
118+
119+
if (!props.redisSecretArn && (!props.vpc || !props.securityGroupId)) {
120+
throw new Error('Either a secretArn or a vpc/securityGroupId must be provided');
121+
}
122+
99123
this.publishFrequency = props.publishFrequency ?? Duration.minutes(1);
100124
this.cwNamespace = props.cwNamespace ?? 'RedisQueueDepth';
125+
this.cwService = props.cwService ?? 'RedisQueueDepthMetricPublisher';
126+
this.regions = props.regions ?? ['us-west-2'];
101127
const myConstruct = this;
102128

103129
// Note the name implies where we fetch the code from
@@ -108,16 +134,12 @@ export class RedisQueueDepthMetricPublisher extends Construct {
108134
},
109135
logRetention: props.cloudwatchLogsRetention ?? RetentionDays.THREE_MONTHS,
110136
memorySize: 512,
111-
runtime: Runtime.NODEJS_LATEST, // Should be at least node20, but let's be aggressive about this.
137+
runtime: Runtime.NODEJS_18_X, // Should be at least node20, but let's be aggressive about this.
112138
timeout: Duration.seconds(45),
139+
...(props.vpc ? { vpc: props.vpc } : {}),
113140
});
114141

115-
[
116-
new PolicyStatement({
117-
sid: 'fetchRedisSecret',
118-
actions: ['secretsmanager:GetSecretValue'],
119-
resources: [props.redisSecretArn],
120-
}),
142+
const policies = [
121143
new PolicyStatement({
122144
sid: 'putRedisQueueDepth',
123145
actions: ['cloudwatch:PutMetricData'],
@@ -128,29 +150,53 @@ export class RedisQueueDepthMetricPublisher extends Construct {
128150
},
129151
},
130152
}),
131-
].forEach((policy) => this.handler.addToRolePolicy(policy));
153+
];
132154

155+
if (props.redisSecretArn) {
156+
policies.push(
157+
new PolicyStatement({
158+
sid: 'fetchRedisSecret',
159+
actions: ['secretsmanager:GetSecretValue'],
160+
resources: [props.redisSecretArn],
161+
}),
162+
);
163+
}
133164
if (props.redisSecretKeyArn) {
134-
this.handler.addToRolePolicy(
165+
policies.push(
135166
new PolicyStatement({
136167
sid: 'decryptRedisSecret',
137168
actions: ['kms:Decrypt'],
138169
resources: [props.redisSecretKeyArn],
139170
}),
140171
);
141172
}
173+
policies.forEach((policy) => this.handler.addToRolePolicy(policy));
142174

143175
this.handler
144176
.addEnvironment('QUEUE_NAMES', JSON.stringify(props.queueNames))
145177
.addEnvironment('REDIS_ADDR', props.redisAddr)
146178
.addEnvironment('REDIS_PORT', props.redisPort ?? '6379')
147-
.addEnvironment('REDIS_SECRET_ARN', props.redisSecretArn)
179+
.addEnvironment('REDIS_SECRET_ARN', props.redisSecretArn! || '')
148180
.addEnvironment('REDIS_SECRET_PASSWORD_PATH', props.redisSecretPasswordPath ?? 'password')
149-
.addEnvironment('REDIS_SECRET_USERNAME_PATH', props.redisSecretUsernamePath ?? 'username');
181+
.addEnvironment('REDIS_SECRET_USERNAME_PATH', props.redisSecretUsernamePath ?? 'username')
182+
.addEnvironment('CW_SERVICE', props.cwService ?? this.cwService)
183+
.addEnvironment('CW_NAMESPACE', props.cwNamespace ?? this.cwNamespace);
184+
185+
if (props.securityGroupId) {
186+
const securityGroup = aws_ec2.SecurityGroup.fromSecurityGroupId(this, 'securityGroup', props.securityGroupId);
187+
this.handler.connections.allowTo(securityGroup, aws_ec2.Port.tcp(6379));
188+
}
150189

151190
this.rule = new Rule(this, 'rule', {
152191
schedule: Schedule.rate(this.publishFrequency),
153192
});
154193
this.rule.addTarget(new LambdaFunction(this.handler));
155194
}
156195
}
196+
197+
export class RedisQueueDepthMetricPublisherStack extends Stack {
198+
constructor(scope: Construct, id: string, props: RedisQueueDepthMetricPublisherProps) {
199+
super(scope, id);
200+
new RedisQueueDepthMetricPublisher(this, new Namer([id, 'lambda']), props);
201+
}
202+
}

src/methods.ts

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,8 @@ export interface GetQueueDepthProps {
7373
}
7474

7575
export async function getQueueDepth(props: GetQueueDepthProps): Promise<number> {
76-
let count = 0;
77-
let cursor = '0';
78-
79-
do {
80-
const result = await props.redisConnection.scan(cursor, 'MATCH', `${props.queueName}*`, 'COUNT', '1000');
81-
cursor = result[0];
82-
count += result[1].length;
83-
} while (cursor !== '0');
84-
85-
return count;
76+
console.debug(`Getting queue depth for ${props.queueName}`);
77+
return props.redisConnection.llen(props.queueName);
8678
}
8779

8880
export interface PublishMetricsProps {}
@@ -93,24 +85,21 @@ export interface PublishMetricsPayload {
9385
export async function publishMetrics(payload: PublishMetricsPayload[], options?: MetricsOptions): Promise<void> {
9486
const metrics = new Metrics(options);
9587
payload.forEach((p) => {
96-
if (p.depth >= 0) {
97-
metrics.addMetric(p.queueName, MetricUnits.Count, p.depth);
98-
}
88+
metrics.addMetric(`queue_depth:${p.queueName}`, MetricUnits.Count, p.depth);
9989
});
10090
metrics.publishStoredMetrics();
101-
return;
10291
}
10392

104-
async function localTest() {
105-
const conn = await getRedisConnection({
106-
host: 'localhost',
107-
});
108-
const depth = await getQueueDepth({
109-
redisConnection: conn,
110-
queueName: 'NOTIFQUEUE:NOTIFQUEUE',
111-
});
112-
113-
console.log({ depth });
114-
}
93+
// async function localTest() {
94+
// const conn = await getRedisConnection({
95+
// host: 'localhost',
96+
// });
97+
// const depth = await getQueueDepth({
98+
// redisConnection: conn,
99+
// queueName: 'NOTIFQUEUE:NOTIFQUEUE',
100+
// });
101+
102+
// console.log({ depth });
103+
// }
115104

116105
// localTest();

0 commit comments

Comments
 (0)