Commit 0f131aa

Merge pull request #2300 from interludic2000/shaunguo-pattern-delay-fifo-sqs-queue
Create new pattern - Amazon SQS FIFO Queue with delay using AWS Lambda and Amazon DynamoDB
2 parents 9042dd2 + 2cfeafb commit 0f131aa

19 files changed: +544 −0 lines changed
sqs-fifo-delayed-queue-dynamodb/.gitignore
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
*.swp
package-lock.json
__pycache__
.pytest_cache
.venv
*.egg-info
.env

# CDK asset staging directory
.cdk.staging
cdk.out

delay_fifo_queue_test/test_stack.py
sqs-fifo-delayed-queue-dynamodb/README.md
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB

This pattern shows how to introduce a delay between processing messages while maintaining the order of messages from an individual client. Each message is sent sequentially to the downstream service for processing, minimizing the consequences of out-of-order events.

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/sqs-fifo-delayed-queue-dynamodb

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Cloud Development Kit](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (AWS CDK) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
    ```
    git clone https://github.com/aws-samples/serverless-patterns
    ```
1. Change directory to the pattern directory:
    ```
    cd sqs-fifo-delayed-queue-dynamodb
    ```
1. From the command line, use AWS CDK to deploy the AWS resources for the pattern as specified in the `delay_fifo_queue_test/delay_fifo_queue_test_stack.py` file:
    ```
    python3 -m pip install -r requirements.txt
    cdk synth
    cdk deploy
    ```

## How it works

This pattern deploys an Amazon SQS FIFO queue called `primary_queue`, an AWS Lambda function `process_queue_function`, a DynamoDB table `customer_table`, and a second SQS FIFO queue `downstream_queue`.

When a message from `primary_queue` is processed by `process_queue_function`, it is checked against `customer_table` to see whether another message from the same message sender has been processed within a specified time frame.
If so, the message is not processed and will be retried once the visibility timeout on `primary_queue` expires.
If not, the message is sent to `downstream_queue` for processing, and an entry with a TTL is written to `customer_table`.
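The delay decision is keyed on the SQS `MessageGroupId`, so each client (for example, a customer ID) must publish with its own group ID. As an illustration of a producer (a minimal sketch; the queue URL and message bodies below are placeholders, not part of the pattern):

```
import boto3

sqs = boto3.client("sqs")

# Placeholder: use the DelayFifoQueueURL value printed by `cdk deploy`.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/DelayFifoQueue.fifo"

# Messages sharing a MessageGroupId are delivered in order, and the pattern
# forwards at most one message per group to the downstream queue every
# ~60 seconds (message_delay_seconds in the Lambda function).
for body in ["test2-first-message", "test2-delayed-message-1"]:
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=body,
        MessageGroupId="customer-2",  # one group per client
    )
```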
## Testing

1. Edit lines 3 and 7 of `send_messages.sh` with the `DelayFifoQueue` URL from the output of `cdk deploy`, then run the script to send test messages to the queue.
2. In the AWS console, navigate to the Amazon SQS service. Click on Queues and select the queue whose name contains `DelayFifoQueueDownstream`.
3. Click `Send and receive messages`, then `Poll for messages` to see the current messages in the queue (a scripted alternative to console polling is sketched after this list).
4. You should observe messages with `test1`, `test2-first-message`, `test3` and `test4` in the `downstream_queue`.
5. After around 60 seconds, poll again: there should be another message in the `downstream_queue` with `test2-delayed-message-1` as the MessageBody.
6. After another 60 seconds, poll again: there should be a further message in the `downstream_queue` with `test2-delayed-message-2` as the MessageBody.
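If you prefer to verify from a script rather than the console, a minimal polling sketch follows (the downstream queue URL is a placeholder; find the real one in the SQS console or via `aws sqs list-queues`):

```
import time

import boto3

sqs = boto3.client("sqs")

# Placeholder: URL of the queue whose name contains DelayFifoQueueDownstream.
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/DelayFifoQueueDownstream.fifo"

# Poll for ~3 minutes so the delayed test2 messages have time to arrive.
deadline = time.time() + 180
while time.time() < deadline:
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # long polling
    )
    for msg in resp.get("Messages", []):
        print(msg["Body"])
        # Delete the message so it is not received again after the visibility timeout.
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
```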
## Cleanup

1. Delete the stack
    ```
    cdk destroy
    ```

----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
sqs-fifo-delayed-queue-dynamodb/app.py
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
#!/usr/bin/env python3
import os

import aws_cdk as cdk

from delay_fifo_queue_test.delay_fifo_queue_test_stack import DelayFifoQueueTestStack


app = cdk.App()
DelayFifoQueueTestStack(app, "DelayFifoQueueTestStack",
    # If you don't specify 'env', this stack will be environment-agnostic.
    # Account/Region-dependent features and context lookups will not work,
    # but a single synthesized template can be deployed anywhere.

    # Uncomment the next line to specialize this stack for the AWS Account
    # and Region that are implied by the current CLI configuration.
    # env=cdk.Environment(account=os.getenv('CDK_DEFAULT_ACCOUNT'), region=os.getenv('CDK_DEFAULT_REGION')),

    # Uncomment the next line if you know exactly what Account and Region you
    # want to deploy the stack to.
    # env=cdk.Environment(account='123456789012', region='us-east-1'),

    # For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html
)

app.synth()
sqs-fifo-delayed-queue-dynamodb/cdk.json
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
{
  "app": "python3 app.py",
  "watch": {
    "include": [
      "**"
    ],
    "exclude": [
      "README.md",
      "cdk*.json",
      "requirements*.txt",
      "source.bat",
      "**/__init__.py",
      "python/__pycache__",
      "tests"
    ]
  },
  "context": {
    "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
    "@aws-cdk/core:checkSecretUsage": true,
    "@aws-cdk/core:target-partitions": [
      "aws",
      "aws-cn"
    ],
    "@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
    "@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
    "@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
    "@aws-cdk/aws-iam:minimizePolicies": true,
    "@aws-cdk/core:validateSnapshotRemovalPolicy": true,
    "@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
    "@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
    "@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
    "@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
    "@aws-cdk/core:enablePartitionLiterals": true,
    "@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
    "@aws-cdk/aws-iam:standardizedServicePrincipals": true,
    "@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
    "@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
    "@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
    "@aws-cdk/aws-route53-patters:useCertificate": true,
    "@aws-cdk/customresources:installLatestAwsSdkDefault": false,
    "@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
    "@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
    "@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
    "@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
    "@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
    "@aws-cdk/aws-redshift:columnId": true,
    "@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
    "@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
    "@aws-cdk/aws-apigateway:requestValidatorUniqueId": true
  }
}

sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/__init__.py

Whitespace-only changes.
sqs-fifo-delayed-queue-dynamodb/delay_fifo_queue_test/delay_fifo_queue_test_stack.py
Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
from aws_cdk import (
    Duration,
    Stack,
    aws_sqs as sqs,
    aws_lambda as lambda_,
    aws_iam as iam,
    aws_cloudwatch as cloudwatch,
    aws_cloudwatch_actions as cloudwatch_actions,
    aws_dynamodb as dynamodb,
    aws_sns as sns,
    aws_sns_subscriptions as subscriptions,
    CfnOutput as cfnoutput
)
from constructs import Construct

class DelayFifoQueueTestStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # create a dead-letter queue called primary_queue_dlq
        primary_queue_dlq = sqs.Queue(self, "DelayFifoQueueDlq",
            visibility_timeout=Duration.seconds(60),
            fifo=True,
            content_based_deduplication=True
        )

        # create the primary SQS FIFO queue with a visibility timeout of 60 seconds;
        # messages that fail processing 5 times move to the dead-letter queue
        primary_queue = sqs.Queue(self, "DelayFifoQueue",
            visibility_timeout=Duration.seconds(60),
            fifo=True,
            content_based_deduplication=True,
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=5,
                queue=primary_queue_dlq
            )
        )

        # create a downstream SQS FIFO queue with a visibility timeout of 60 seconds
        downstream_queue = sqs.Queue(self, "DelayFifoQueueDownstream",
            visibility_timeout=Duration.seconds(60),
            fifo=True,
            content_based_deduplication=True
        )

        # create a DynamoDB table to store a customer id and an expiry (TTL) timestamp
        customer_table = dynamodb.Table(self, "CustomerTable",
            table_name="DelayFifoQueueCustomerTable",
            partition_key=dynamodb.Attribute(name="customer_id", type=dynamodb.AttributeType.STRING),
            time_to_live_attribute="ttl"
        )

        # create a Lambda function to process messages from the queue
        process_queue_function = lambda_.Function(self, "ProcessMessageLambda",
            runtime=lambda_.Runtime.PYTHON_3_9,
            code=lambda_.Code.from_asset("lambda"),
            handler="process_message.handler",
            environment={
                "QUEUE_URL": downstream_queue.queue_url,
                "TABLE_NAME": customer_table.table_name
            })

        # create an SNS topic to send notifications when primary_queue_dlq is not empty
        dlq_size_sns_topic = sns.Topic(self, "PrimaryQueueDlqSizeAlertTopic")
        dlq_size_sns_topic.add_subscription(subscriptions.EmailSubscription("notification_address@email.com"))

        # create a CloudWatch alarm that fires when primary_queue_dlq is not empty
        # (more than 0 visible messages within a 60-second period)
        dlq_size_alarm = cloudwatch.Alarm(self, "PrimaryQueueDlqSizeAlert",
            metric=cloudwatch.Metric(metric_name="ApproximateNumberOfMessagesVisible",
                namespace="AWS/SQS",
                dimensions_map={
                    "QueueName": primary_queue_dlq.queue_name
                },
                statistic="Sum",
                period=Duration.seconds(60)
            ),
            evaluation_periods=1,
            threshold=0,
            comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD,
            treat_missing_data=cloudwatch.TreatMissingData.NOT_BREACHING
        )
        dlq_size_alarm.add_alarm_action(
            cloudwatch_actions.SnsAction(
                topic=dlq_size_sns_topic
            )
        )

        # allow the Lambda execution role to receive messages from primary_queue
        process_queue_function.add_to_role_policy(iam.PolicyStatement(
            actions=["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl"],
            resources=[primary_queue.queue_arn]
        ))

        # allow the Lambda execution role to send messages to downstream_queue
        process_queue_function.add_to_role_policy(iam.PolicyStatement(
            actions=["sqs:SendMessage"],
            resources=[downstream_queue.queue_arn]
        ))

        # trigger the Lambda function from primary_queue; report_batch_item_failures
        # lets the handler return only the failed messages for redelivery
        lambda_.EventSourceMapping(self, "ProcessMessageLambdaEventSourceMapping",
            event_source_arn=primary_queue.queue_arn,
            target=process_queue_function,
            batch_size=10,
            report_batch_item_failures=True
        )

        # give the function permission to read and write the DynamoDB table
        customer_table.grant_read_write_data(process_queue_function)

        cfnoutput(self, "DelayFifoQueueURL", value=primary_queue.queue_url)
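The changed-files list also includes `delay_fifo_queue_test/test_stack.py`, whose diff is not shown on this page. A minimal sketch of what such a test could look like with the CDK assertions module (the test body here is an assumption, not the committed file):

```
import aws_cdk as cdk
from aws_cdk import assertions

from delay_fifo_queue_test.delay_fifo_queue_test_stack import DelayFifoQueueTestStack


def test_stack_resources():
    # Synthesize the stack into a CloudFormation template and assert on it.
    app = cdk.App()
    stack = DelayFifoQueueTestStack(app, "TestStack")
    template = assertions.Template.from_stack(stack)

    # Three FIFO queues: primary, its dead-letter queue, and the downstream queue.
    template.resource_count_is("AWS::SQS::Queue", 3)

    # The customer table stores its delay window in a TTL attribute named "ttl".
    template.has_resource_properties("AWS::DynamoDB::Table", {
        "TimeToLiveSpecification": {"AttributeName": "ttl", "Enabled": True}
    })
```

A test like this would run with `python3 -m pytest` from the pattern root (pytest is pinned in the one-line requirements file at the end of this diff).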
sqs-fifo-delayed-queue-dynamodb/example-pattern.json
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
{
  "title": "Amazon SQS FIFO queue with controlled delay",
  "description": "Amazon SQS FIFO queue with delay using AWS Lambda and Amazon DynamoDB",
  "language": "Python",
  "level": "200",
  "framework": "CDK",
  "introBox": {
    "headline": "How it works",
    "text": [
      "This pattern shows how to introduce a delay between processing messages while maintaining order from an individual client. The message is sent sequentially to the downstream service for processing to minimize the consequences of unordered events."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sqs-fifo-delayed-queue-dynamodb",
      "templateURL": "serverless-patterns/sqs-fifo-delayed-queue-dynamodb",
      "projectFolder": "sqs-fifo-delayed-queue-dynamodb",
      "templateFile": "delay_fifo_queue_test/delay_fifo_queue_test_stack.py"
    }
  },
  "deploy": {
    "text": [
      "cdk deploy"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: <code>cdk destroy</code>."
    ]
  },
  "authors": [
    {
      "name": "Shaun Guo",
      "image": "https://media.licdn.com/dms/image/C5103AQG3KMyMdEIKpA/profile-displayphoto-shrink_800_800/0/1517283953925?e=1692835200&v=beta&t=AxJ9ST_8K_bw8nqTPDaJB2F5dnQspES9FuJ64DBScC8",
      "bio": "Shaun is a Senior Technical Account Manager at Amazon Web Services based in Australia",
      "linkedin": "shaun-guo"
    }
  ]
}
sqs-fifo-delayed-queue-dynamodb/lambda/process_message.py
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
# Lambda function that enforces a per-sender delay before forwarding messages
import boto3
import os
import time

dynamodb = boto3.resource('dynamodb')
sqs = boto3.client('sqs')
table = dynamodb.Table(os.environ['TABLE_NAME'])

# Message delay and TTL for the DynamoDB item is 60 seconds
message_delay_seconds = 60

def handler(event, context):
    batch_item_failures = []
    sqs_batch_response = {}

    # iterate over the SQS messages in the batch
    for record in event['Records']:
        # get the message group id (used as the customer id)
        message_group_id = record['attributes']['MessageGroupId']

        # get the message body
        message_body = record['body']

        # look up the record whose customer_id is the message group id
        try:
            response = table.get_item(
                Key={
                    'customer_id': message_group_id
                })
        except Exception as e:
            print(f"An error occurred while fetching a record from the CustomerTable table: {e}")
            batch_item_failures.append({"itemIdentifier": record['messageId']})
            continue

        # a record exists for this message group
        if ('Item' in response):
            # get the item
            response_item = response['Item']

            # get the expiry time of the item
            item_ttl_epoch_seconds = response_item['ttl']

            # if the TTL has expired, send the message body to the downstream SQS
            # queue and update the DynamoDB table with a new TTL; otherwise fail the
            # message so it returns to the delay queue
            if (item_ttl_epoch_seconds - int(time.time()) < 0):
                try:
                    process_message(message_body, message_group_id)
                except Exception:
                    batch_item_failures.append({"itemIdentifier": record['messageId']})
            else:
                batch_item_failures.append({"itemIdentifier": record['messageId']})
        else:
            # no record found: send the message to the downstream SQS queue and
            # insert an item with a TTL into the DynamoDB table
            try:
                process_message(message_body, message_group_id)
            except Exception:
                batch_item_failures.append({"itemIdentifier": record['messageId']})

    sqs_batch_response["batchItemFailures"] = batch_item_failures

    return sqs_batch_response

def process_message(message_body, message_group_id):
    # send the message to the downstream SQS queue
    expiry_epoch_time = int(time.time()) + message_delay_seconds
    try:
        sqs.send_message(
            QueueUrl=os.environ['QUEUE_URL'],
            MessageBody=message_body,
            MessageGroupId=message_group_id
        )
    except Exception as e:
        raise Exception("An error occurred sending the message to the downstream queue.") from e

    # upsert the CustomerTable item for this customer_id with the new expiry (TTL) timestamp
    try:
        table.put_item(
            Item={
                'customer_id': message_group_id,
                'ttl': expiry_epoch_time
            }
        )
    except Exception as e:
        raise Exception("An error occurred inserting the record into the CustomerTable table.") from e
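Because the handler implements SQS partial batch responses (paired with `report_batch_item_failures` in the stack), the delay branch can be sanity-checked locally by stubbing the module's boto3 objects. A minimal sketch, assuming `process_message.py` is on the import path and boto3 is installed; the identifiers and URLs are dummies and no real AWS calls are made:

```
import os
import time
from unittest import mock

# The handler module creates boto3 clients and reads these environment
# variables at import time, so set them before importing (values are dummies).
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
os.environ["TABLE_NAME"] = "DelayFifoQueueCustomerTable"
os.environ["QUEUE_URL"] = "https://sqs.us-east-1.amazonaws.com/123456789012/dummy.fifo"

import process_message

# Simulate "this customer was seen recently": get_item returns an item whose
# TTL is still 30 seconds in the future.
with mock.patch.object(process_message, "table") as fake_table, \
     mock.patch.object(process_message, "sqs") as fake_sqs:
    fake_table.get_item.return_value = {
        "Item": {"customer_id": "customer-2", "ttl": int(time.time()) + 30}
    }

    event = {"Records": [{
        "messageId": "msg-1",
        "body": "test2-delayed-message-1",
        "attributes": {"MessageGroupId": "customer-2"},
    }]}
    result = process_message.handler(event, None)

    # The message is reported as a batch item failure, so SQS redelivers it
    # after the visibility timeout instead of it being sent downstream.
    assert result == {"batchItemFailures": [{"itemIdentifier": "msg-1"}]}
    assert not fake_sqs.send_message.called
```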
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
pytest==6.2.5
