Skip to content

Commit 7b3990c

Browse files
author
kfc-manager
committed
enha: added trigger to define DynamoDB stream or SQS queue as event source
1 parent 79762a1 commit 7b3990c

File tree

4 files changed

+245
-6
lines changed

4 files changed

+245
-6
lines changed

README.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ This module provides a Lambda function which logs to CloudWatch. If no image URI
2525
| ------------- | --------------------------------------------------------------------------------------------- | -------------- | -------- | :------: |
2626
| identifier | Unique identifier to differentiate global resources. | `string` | n/a | yes |
2727
| policies | List of IAM policy ARNs for the Lambda's IAM role. | `list(string)` | [] | no |
28+
| trigger | Object to define trigger of the Lambda function. | `object` | null | no |
2829
| vpc_config | Object to define the subnets and security groups for the Lambda function. | `object` | null | no |
2930
| log_config | Object to define logging configuration of the Lambda function to CloudWatch. | `object` | null | no |
3031
| image | Object of the image which will be pulled by the Lambda function to execute. | `object` | null | no |
@@ -34,12 +35,23 @@ This module provides a Lambda function which logs to CloudWatch. If no image URI
3435
| env_variables | A map of environment variables for the Lambda function at runtime. | `map(string)` | {} | no |
3536
| tags | A map of tags to add to all resources. | `map(string)` | {} | no |
3637

38+
### `trigger`
39+
40+
| Name | Description | Type | Default | Required |
41+
| --------------- | -------------------------------------------------------------------------------------------------------------------------- | -------- | ------- | :------: |
42+
| queue_arn | The ARN of the SQS queue, which triggers the Lambda function. Must be defined if 'stream_arn' is not defined. | `string` | null | no |
43+
| stream_arn | The ARN of the DynamoDB stream, which triggers the Lambda function. Must be defined if 'queue_arn' is not defined. | `string` | null | no |
44+
| batch_size | Amount of items a single Lambda invocation processes from the source. | `number` | 1 | no |
45+
| max_concurrency | Maximum amount of Lambda functions the SQS queue invokes concurrently. | `number` | 1000 | no |
46+
| max_retries | Maximum retry attempts the Lambda function makes to process the DynamoDB stream. The value '-1' means it tries infinitely. | `number` | -1 | no |
47+
| filter | A filter pattern of which messages the Lambda function processes. Must be in JSON format. | `string` | null | no |
48+
3749
### `vpc_config`
3850

3951
| Name | Description | Type | Default | Required |
4052
| --------------- | ------------------------------------------------------------ | -------------- | ------- | :------: |
4153
| subnets | List of subnet IDs in which the Lambda function will run in. | `list(string)` | n/a | yes |
42-
| security_groups | List of security group IDs the Lambda function will hold. | `list(string)` | n/a | yes |
54+
| security_groups | List of security group IDs the Lambda function will hold. | `list(string)` | [] | no |
4355

4456
### `log_config`
4557

@@ -78,6 +90,18 @@ module "function" {
7890
"arn:aws:iam::aws:policy/AdministratorAccess-Amplify"
7991
]
8092
93+
trigger = {
94+
queue_arn = "arn:aws:sqs:eu-central-1:444455556666:queue1"
95+
batch_size = 10
96+
max_concurrency = 100
97+
filter = jsonencode({
98+
body = {
99+
Temperature : [{ numeric : [">", 0, "<=", 100] }]
100+
Location : ["New York"]
101+
}
102+
})
103+
}
104+
81105
log_config = {
82106
retention_in_days = 7
83107
}

main.tf

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
resource "aws_cloudwatch_log_group" "main" {
66
count = var.log_config != null ? 1 : 0
77
name = "${var.identifier}-lambda"
8-
retention_in_days = try(var.log_config["retention_in_days"], null)
8+
retention_in_days = var.log_config["retention_in_days"]
99

1010
tags = var.tags
1111
}
@@ -116,7 +116,7 @@ resource "aws_lambda_function" "main" {
116116
function_name = var.identifier
117117
package_type = "Image"
118118
role = aws_iam_role.main.arn
119-
image_uri = var.image == null ? "${aws_ecr_repository.main[0].repository_url}:latest" : try(var.image["uri"], null)
119+
image_uri = var.image == null ? "${aws_ecr_repository.main[0].repository_url}:latest" : var.image["uri"]
120120
memory_size = var.memory_size
121121
timeout = var.timeout
122122
architectures = [var.architecture]
@@ -128,8 +128,8 @@ resource "aws_lambda_function" "main" {
128128
dynamic "vpc_config" {
129129
for_each = var.vpc_config != null ? [1] : []
130130
content {
131-
subnet_ids = try(var.vpc_config["subnets"], null)
132-
security_group_ids = try(var.vpc_config["security_groups"], null)
131+
subnet_ids = var.vpc_config["subnets"]
132+
security_group_ids = var.vpc_config["security_groups"]
133133
}
134134
}
135135

@@ -143,3 +143,37 @@ resource "aws_lambda_function" "main" {
143143

144144
tags = var.tags
145145
}
146+
147+
resource "aws_lambda_event_source_mapping" "queue" {
148+
count = try(var.trigger["queue_arn"], null) != null ? 1 : 0
149+
event_source_arn = var.trigger["queue_arn"]
150+
enabled = true
151+
function_name = aws_lambda_function.main.arn
152+
batch_size = var.trigger["batch_size"]
153+
154+
dynamic "scaling_config" {
155+
for_each = try(var.trigger["max_concurrency"], null) != null ? [1] : []
156+
content {
157+
maximum_concurrency = var.trigger["max_concurrency"]
158+
}
159+
}
160+
161+
dynamic "filter_criteria" {
162+
for_each = try(var.trigger["filter"], null) != null ? [1] : []
163+
content {
164+
filter {
165+
pattern = var.trigger["filter"]
166+
}
167+
}
168+
}
169+
}
170+
171+
resource "aws_lambda_event_source_mapping" "stream" {
172+
count = try(var.trigger["stream_arn"], null) != null ? 1 : 0
173+
event_source_arn = var.trigger["stream_arn"]
174+
enabled = true
175+
function_name = aws_lambda_function.main.arn
176+
batch_size = var.trigger["batch_size"]
177+
maximum_retry_attempts = var.trigger["max_retries"]
178+
starting_position = "LATEST"
179+
}

tests/trigger.tftest.hcl

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
provider "aws" {
2+
region = "eu-central-1"
3+
default_tags {
4+
tags = {
5+
Environment = "Test"
6+
}
7+
}
8+
}
9+
10+
run "queue_and_stream_arn_defined" {
11+
command = plan
12+
13+
variables {
14+
identifier = "abc"
15+
trigger = {
16+
queue_arn = "test-queue"
17+
stream_arn = "test-stream"
18+
}
19+
}
20+
21+
expect_failures = [var.trigger]
22+
}
23+
24+
run "queue_and_stream_arn_undefined" {
25+
command = plan
26+
27+
variables {
28+
identifier = "abc"
29+
trigger = {}
30+
}
31+
32+
expect_failures = [var.trigger]
33+
}
34+
35+
run "filter_without_queue" {
36+
command = plan
37+
38+
variables {
39+
identifier = "abc"
40+
trigger = {
41+
stream_arn = "test-stream"
42+
filter = "test"
43+
}
44+
}
45+
46+
expect_failures = [var.trigger]
47+
}
48+
49+
run "filter_with_queue" {
50+
command = plan
51+
52+
variables {
53+
identifier = "abc"
54+
trigger = {
55+
queue_arn = "test-queue"
56+
filter = "test"
57+
}
58+
}
59+
}
60+
61+
run "max_concurrency_upper_limit" {
62+
command = plan
63+
64+
variables {
65+
identifier = "abc"
66+
trigger = {
67+
queue_arn = "test-queue"
68+
max_concurrency = 1001
69+
}
70+
}
71+
72+
expect_failures = [var.trigger]
73+
}
74+
75+
run "max_concurrency_lower_limit" {
76+
command = plan
77+
78+
variables {
79+
identifier = "abc"
80+
trigger = {
81+
queue_arn = "test-queue"
82+
max_concurrency = 1
83+
}
84+
}
85+
86+
expect_failures = [var.trigger]
87+
}
88+
89+
run "queue_trigger" {
90+
command = plan
91+
92+
variables {
93+
identifier = "abc"
94+
trigger = {
95+
queue_arn = "test-queue"
96+
}
97+
}
98+
99+
assert {
100+
condition = length(aws_lambda_event_source_mapping.queue) == 1
101+
error_message = "Queue source mapping was not created"
102+
}
103+
104+
assert {
105+
condition = length(aws_lambda_event_source_mapping.stream) == 0
106+
error_message = "Stream source mapping was created unexpectedly"
107+
}
108+
}
109+
110+
run "stream_trigger" {
111+
command = plan
112+
113+
variables {
114+
identifier = "abc"
115+
trigger = {
116+
stream_arn = "test-stream"
117+
}
118+
}
119+
120+
assert {
121+
condition = length(aws_lambda_event_source_mapping.queue) == 0
122+
error_message = "Queue source mapping was created unexpectedly"
123+
}
124+
125+
assert {
126+
condition = length(aws_lambda_event_source_mapping.stream) == 1
127+
error_message = "Stream source mapping was not created"
128+
}
129+
}
130+
131+
run "without_trigger" {
132+
command = plan
133+
134+
variables {
135+
identifier = "abc"
136+
trigger = null
137+
}
138+
139+
assert {
140+
condition = length(aws_lambda_event_source_mapping.queue) == 0
141+
error_message = "Queue source mapping was created unexpectedly"
142+
}
143+
144+
assert {
145+
condition = length(aws_lambda_event_source_mapping.stream) == 0
146+
error_message = "Stream source mapping was created unexpectedly"
147+
}
148+
}

variables.tf

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,44 @@ variable "policies" {
1313
default = []
1414
}
1515

16+
variable "trigger" {
17+
description = "Object to define trigger of the Lambda function."
18+
type = object({
19+
queue_arn = optional(string, null)
20+
stream_arn = optional(string, null)
21+
batch_size = optional(number, 1)
22+
max_concurrency = optional(number, 1000)
23+
max_retries = optional(number, -1)
24+
filter = optional(string, null)
25+
})
26+
default = null
27+
validation {
28+
condition = var.trigger == null || (
29+
try(var.trigger["queue_arn"], null) != null || try(var.trigger["stream_arn"], null) != null)
30+
error_message = "Either 'queue_arn' or 'stream_arn' must be defined"
31+
}
32+
validation {
33+
condition = var.trigger == null || (
34+
try(var.trigger["queue_arn"], null) == null || try(var.trigger["stream_arn"], null) == null)
35+
error_message = "Values 'queue_arn' and 'stream_arn' can not be both defined"
36+
}
37+
validation {
38+
condition = !(
39+
try(var.trigger["queue_arn"], null) == null && try(var.trigger["filter"], null) != null)
40+
error_message = "Value 'queue_arn' must be defined if 'filter' is defined"
41+
}
42+
validation {
43+
condition = try(var.trigger["max_concurrency"], 1000) <= 1000 && (
44+
try(var.trigger["max_concurrency"], 2) >= 2)
45+
error_message = "Value 'max_concurrency' must be between '2' and '1000'"
46+
}
47+
}
48+
1649
variable "vpc_config" {
1750
description = "Object to define the subnets and security groups for the Lambda function."
1851
type = object({
1952
subnets = list(string)
20-
security_groups = list(string)
53+
security_groups = optional(list(string), [])
2154
})
2255
default = null
2356
validation {

0 commit comments

Comments
 (0)