Skip to content

Commit e7c7232

Browse files
committed
Updated code based upon recommendations
1 parent 49e0acf commit e7c7232

File tree

5 files changed

+228
-201
lines changed

5 files changed

+228
-201
lines changed

serverless-message-processing/README.md

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,47 @@ An adaptable pattern for message processing using AWS serverless services, featu
1111
## Basic Flow
1212
1. Messages enter through API Gateway
1313
2. Main queue receives messages
14-
3. Processor Lambda handles messages
15-
4. Failed messages route to DLQs
16-
5. Decision maker attempts an automated recovery
14+
3. Lambda function polls the main queue using Event Source Mappings (ESMs) and handles the messages.
15+
Read more about how Lambda synchronously processes queue messages in this [documentation.](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html)
16+
4. Failed messages route to DLQs -In this case a failed message would be a malformed email, however this can be adapted to other use cases.
17+
5. Decision maker attempts an automated recovery -In this sample, we remediate common email malform issues including whitespace and typos in domain extensions.
1718

1819
## Deployment
1920
# Build the SAM application
21+
The ```sam build ``` command prepares an application for subsequent steps in the developer workflow, such as local testing or deploying to AWS.
22+
2023
```
2124
sam build
2225
```
2326
# Deploy the application
27+
The ```sam deploy``` command deploys an application to the AWS Cloud using AWS CloudFormation. The ```--guided``` option is to have the AWS SAM CLI use prompts to guide you through the deployment.
28+
2429
```
2530
sam deploy --guided
2631
```
2732

2833
## Key Features
2934
- Automatic retry mechanism
3035
- Segregation of recoverable/fatal errors
31-
- Extensible processing logic
36+
- Processing logic with the potential for points of adaptation
3237

3338
## API Reference
3439
# Send Message
35-
```
3640

37-
POST /message
38-
Content-Type: application/json
39-
```
41+
The following is an example API call that you can try with your own endpoint.
42+
4043
```
41-
{
42-
"messageType": "TYPE_A|TYPE_B|TYPE_C",
43-
"payload": {},
44-
"timestamp": "ISO8601_TIMESTAMP"
45-
}
44+
45+
curl -X POST \
46+
'https://\${endpoint}/prod/message' \
47+
-H 'Content-Type: application/json' \
48+
-d '{
49+
"messageType": "TYPE_A",
50+
"body": {
51+
"email": "user@@example.com"
52+
},
53+
"timestamp": "2023-11-22T10:30:00Z"
54+
}'
4655
```
4756

4857

@@ -54,5 +63,12 @@ Content-Type: application/json
5463
- Monitoring requirements
5564
- API Design
5665

57-
## Note
58-
This is a sample pattern. Adapt security, scaling, and processing logic according to your requirements.
66+
## Cleanup
67+
1. Delete the SAM template
68+
```
69+
sam delete
70+
```
71+
2. Confirm the stack has been deleted
72+
```
73+
aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus"
74+
```

serverless-message-processing/example-pattern.json

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
{
2-
"title": "Step Functions to Athena",
2+
"title": "Serverless Messaging Redrive",
33
"description": "Create a Step Functions workflow to query Amazon Athena.",
44
"language": "Python",
55
"level": "200",
66
"framework": "SAM",
77
"introBox": {
88
"headline": "How it works",
99
"text": [
10-
"This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions",
11-
"The system automatically handles message validation, applies fixes where possible, and routes messages to appropriate queues based on their fixability.",
12-
"It has built-in error handling and detailed logging, it provides a robust framework for message processing that can be easily extended for specific business needs.",
13-
"This pattern uses AWS Lambda for processing, multiple SQS queues for message routing, and includes 2 dead-letter queue (DLQ) for messages requiring human intervention or for auto-remediation."
14-
]
10+
"This sample project demonstrates how to use a serverless solution for processing and fixing malformed messages using SQS queues and Lambda functions" ]
1511
},
1612
"gitHub": {
1713
"template": {

serverless-message-processing/functions/decision_maker/app.py

Lines changed: 119 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -15,61 +15,72 @@
1515
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
1616
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']
1717

18-
# Email validation pattern
19-
EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
20-
2118
def fix_email(email):
2219
"""
2320
Attempt to fix common email format issues
24-
Can be amended to other scenarios e.g. Downstream issues
21+
Args:
22+
email: String containing malformed email
23+
Returns:
24+
str: Fixed email or original if unfixable
2525
"""
26-
# Remove multiple @ symbols, keep the last one
27-
if email.count('@') > 1:
28-
parts = email.split('@')
29-
email = f"{parts[0]}@{parts[-1]}"
30-
31-
# Remove spaces
32-
email = email.strip().replace(' ', '')
33-
34-
# Fix common typos in domain extensions
35-
common_fixes = {
36-
'.con': '.com',
37-
'.vom': '.com',
38-
'.comm': '.com',
39-
'.orgg': '.org',
40-
'.nett': '.net'
41-
}
42-
43-
for wrong, right in common_fixes.items():
44-
if email.endswith(wrong):
45-
email = email[:-len(wrong)] + right
46-
47-
return email
26+
try:
27+
logger.info(f"Starting email fix attempt for: {email}")
28+
29+
# Remove whitespace
30+
email = email.strip()
31+
32+
# Handle multiple @ symbols
33+
if email.count('@') > 1:
34+
parts = email.split('@')
35+
email = f"{parts[0]}@{parts[-1]}"
36+
logger.info(f"Fixed multiple @ symbols. Result: {email}")
37+
38+
# Common domain typo fixes
39+
domain_fixes = {
40+
'.con': '.com',
41+
'.vom': '.com',
42+
'.comm': '.com',
43+
'.orgg': '.org',
44+
'.nett': '.net',
45+
'.ckm': '.com',
46+
'.cm': '.com'
47+
}
48+
49+
original_email = email
50+
for wrong, right in domain_fixes.items():
51+
if email.endswith(wrong):
52+
email = email[:-len(wrong)] + right
53+
logger.info(f"Fixed domain from {wrong} to {right}. Before: {original_email}, After: {email}")
54+
break
55+
56+
return email
57+
except Exception as e:
58+
logger.error(f"Error fixing email: {str(e)}")
59+
return None
4860

49-
def can_fix_email(message):
61+
def validate_fixed_email(email):
5062
"""
51-
Check if the email in the message can be fixed
63+
Validate fixed email format.
64+
Args:
65+
email: String containing fixed email
66+
Returns:
67+
bool: True if valid email format, False otherwise
5268
"""
53-
if 'email' not in message:
54-
return False
55-
56-
email = message['email']
57-
fixed_email = fix_email(email)
58-
59-
return bool(re.match(EMAIL_PATTERN, fixed_email))
60-
69+
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
70+
return bool(re.match(email_pattern, email))
6171

6272
def lambda_handler(event, context):
6373
"""
64-
Processes messages from a DLQ that have already failed to be automatically processed,
65-
and attempts automated remediation and redelivery of the messages back to the main queue.
66-
If no suitable fixes can be applied, messages end up in a fatal DLQ where the typical
67-
approach of human intervention is required.
68-
74+
Processes messages from a DLQ that have failed validation,
75+
attempting to fix common email format issues.
76+
If fixed successfully, messages are sent back to the main queue.
77+
If unfixable, messages are sent to a fatal DLQ.
78+
6979
Flow:
70-
1. Attempt to fix message
71-
2. If fixable -> Main Queue
72-
3. If unfixable -> Fatal DLQ
80+
1. Extract email from failed message
81+
2. Attempt to fix common issues
82+
3. If fixed → Main Queue
83+
4. If unfixable → Fatal DLQ
7384
7485
Extension points:
7586
1. Add more sophisticated routing logic- including a delay queue
@@ -78,75 +89,104 @@ def lambda_handler(event, context):
7889
4. Implement retry mechanisms
7990
5. Add monitoring and metrics
8091
81-
"""
92+
Args:
93+
event: Lambda event object containing SQS messages
94+
context: Lambda context object
95+
Returns:
96+
dict: Processing summary with counts
97+
"""
8298
processed_count = 0
99+
fixed_count = 0
100+
fatal_count = 0
101+
102+
logger.info(f"Starting to process batch of {len(event['Records'])} messages")
83103

84104
for record in event['Records']:
85105
try:
86-
# Parse the message body
106+
# Parse the failed message
87107
message = json.loads(record['body'])
88-
original_message_id = record.get('messageId', 'unknown')
108+
original_message_id = message.get('messageId', 'unknown')
89109

90-
logger.info(f"Processing failed message: {original_message_id}")
110+
# Detailed message content logging
111+
logger.info(f"Processing message content: {json.dumps(message, indent=2)}")
91112

92-
93-
94-
95-
# Option A: Try to fix malformed email
113+
# Check if message has already been remediated
114+
if 'remediation' in message:
115+
logger.info("Message already remediated, skipping processing")
116+
continue
96117

97-
if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']):
98-
fixed_email = fix_email(message['email'])
99-
logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'")
118+
# Extract email from payload
119+
if 'payload' in message and 'email' in message['payload']:
120+
original_email = message['payload']['email']
100121

101-
# Update the message with fixed email
102-
message['email'] = fixed_email
103-
message['emailWasFixed'] = True
122+
# Attempt to fix email
123+
fixed_email = fix_email(original_email)
104124

105-
# Send back to main queue
106-
sqs.send_message(
107-
QueueUrl=MAIN_QUEUE_URL,
108-
MessageBody=json.dumps(message)
109-
)
110-
111-
logger.info(f"Sent fixed message back to main queue: {original_message_id}")
112-
113-
# Option B: Cannot fix - send to fatal DLQ
125+
if fixed_email and validate_fixed_email(fixed_email):
126+
# Update message with fixed email
127+
message['payload']['email'] = fixed_email
128+
message['remediation'] = {
129+
'original_email': original_email,
130+
'fixed_email': fixed_email,
131+
'timestamp': context.invoked_function_arn
132+
}
133+
134+
# Send back to main queue
135+
sqs.send_message(
136+
QueueUrl=MAIN_QUEUE_URL,
137+
MessageBody=json.dumps(message)
138+
)
139+
fixed_count += 1
140+
else:
141+
# Send to fatal DLQ if unfixable
142+
message['failureReason'] = 'Email could not be remediated'
143+
sqs.send_message(
144+
QueueUrl=FATAL_DLQ_URL,
145+
MessageBody=json.dumps(message)
146+
)
147+
fatal_count += 1
114148
else:
115-
logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}")
116-
117-
# Add failure reason if not present
118-
if 'failureReason' not in message:
119-
message['failureReason'] = 'Unrecoverable error - could not fix message'
120-
121-
# Send to fatal DLQ
149+
# Send to fatal DLQ if message structure is invalid
150+
message['failureReason'] = 'Invalid message structure - missing email in payload'
122151
sqs.send_message(
123152
QueueUrl=FATAL_DLQ_URL,
124153
MessageBody=json.dumps(message)
125154
)
126-
155+
fatal_count += 1
156+
127157
processed_count += 1
128158

129159
except Exception as e:
130160
logger.error(f"Error processing message {original_message_id}: {str(e)}")
131-
# If we can't process the decision, send to fatal DLQ
132161
try:
133162
error_message = {
134163
'originalMessage': record['body'],
135-
'failureReason': f"Decision maker error: {str(e)}",
164+
'failureReason': f"Remediation error: {str(e)}",
136165
'timestamp': context.invoked_function_arn
137166
}
138167
sqs.send_message(
139168
QueueUrl=FATAL_DLQ_URL,
140169
MessageBody=json.dumps(error_message)
141170
)
142-
171+
fatal_count += 1
143172
except Exception as fatal_e:
144173
logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}")
145174
raise
146175

176+
# Execution summary
177+
logger.info(f"""
178+
=== Execution Summary ===
179+
Messages Processed: {processed_count}
180+
Messages Fixed: {fixed_count}
181+
Messages Fatal: {fatal_count}
182+
========================
183+
""")
184+
147185
return {
148186
'statusCode': 200,
149187
'body': json.dumps({
150-
'processedMessages': processed_count
188+
'processed': processed_count,
189+
'fixed': fixed_count,
190+
'fatal': fatal_count
151191
})
152192
}

0 commit comments

Comments
 (0)