
Commit 5048d72

New serverless pattern - Serverless Messaging Redrive
1 parent 800cc9a commit 5048d72

5 files changed: +427 -0 lines changed
Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
import json
import boto3
import os
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
sqs = boto3.client('sqs')

# Environment variables
MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL']
FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL']

def can_fix_message(message):
    """
    Determine if a message can be fixed automatically.

    Extension points:
    1. Add validation for specific message formats
    2. Implement business-specific fix rules
    3. Add data transformation logic
    4. Implement retry strategies
    5. Add validation against external systems
    """
    try:
        # Basic message validation
        # Add your validation logic here
        return True
    except Exception as e:
        logger.error(f"Validation error: {str(e)}")
        return False

def fix_message(message):
    """
    Apply fixes to the message.

    Extension points:
    1. Add data normalization
    2. Implement field-specific fixes
    3. Add data enrichment
    4. Implement format conversion
    5. Add validation rules
    """
    try:
        fixed_message = message.copy()
        # Add your fix logic here
        fixed_message['wasFixed'] = True
        return fixed_message
    except Exception as e:
        logger.error(f"Fix error: {str(e)}")
        return None

def lambda_handler(event, context):
    """
    Process messages and route them based on fixability.

    Flow:
    1. Attempt to fix each message
    2. If fixable -> main queue
    3. If unfixable -> fatal DLQ

    Extension points:
    1. Add more sophisticated routing logic
    2. Implement custom error handling
    3. Add message transformation
    4. Implement retry mechanisms
    5. Add monitoring and metrics
    """
    processed_count = 0

    for record in event['Records']:
        message_id = 'unknown'  # Default in case parsing fails before the ID is read

        try:
            message = json.loads(record['body'])
            message_id = record.get('messageId', 'unknown')

            logger.info(f"Processing message: {message_id}")

            if can_fix_message(message):
                fixed_message = fix_message(message)
                if fixed_message:
                    # Send the repaired message back to the main queue
                    sqs.send_message(
                        QueueUrl=MAIN_QUEUE_URL,
                        MessageBody=json.dumps(fixed_message)
                    )
                    logger.info(f"Fixed message sent to main queue: {message_id}")
                else:
                    raise ValueError("Message fix failed")
            else:
                # Unfixable: annotate the message and park it in the fatal DLQ
                message['failureReason'] = 'Message cannot be automatically fixed'
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(message)
                )
                logger.warning(f"Message sent to fatal DLQ: {message_id}")

            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing message {message_id}: {str(e)}")
            try:
                # Wrap the raw body with failure context before parking it
                error_message = {
                    'originalMessage': record['body'],
                    'failureReason': str(e),
                    'functionArn': context.invoked_function_arn
                }
                sqs.send_message(
                    QueueUrl=FATAL_DLQ_URL,
                    MessageBody=json.dumps(error_message)
                )
                logger.error(f"Error message sent to fatal DLQ: {message_id}")
            except Exception as fatal_e:
                logger.critical(f"Fatal DLQ error: {str(fatal_e)}")
                raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processedMessages': processed_count
        })
    }
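For a quick local check of the routing logic, a minimal sketch like the following can feed the handler a fake SQS batch with the SQS client stubbed out. The module name redrive_function, the queue URLs, and the ARN are illustrative assumptions, not part of this commit.

# Local harness sketch (assumes the file above is saved as redrive_function.py;
# queue URLs and the ARN below are placeholders).
import json
import os
from unittest import mock

os.environ['MAIN_QUEUE_URL'] = 'https://sqs.us-east-1.amazonaws.com/123456789012/main-queue'
os.environ['FATAL_DLQ_URL'] = 'https://sqs.us-east-1.amazonaws.com/123456789012/fatal-dlq'

with mock.patch('boto3.client') as mock_client:
    import redrive_function  # hypothetical module name for the file above

    event = {
        'Records': [
            {'messageId': 'msg-1', 'body': json.dumps({'orderId': 42})}
        ]
    }
    context = mock.Mock(
        invoked_function_arn='arn:aws:lambda:us-east-1:123456789012:function:redrive'
    )

    result = redrive_function.lambda_handler(event, context)
    # With the default pass-through fix logic, the message is "fixed" and
    # re-sent to the main queue:
    print(result)  # {'statusCode': 200, 'body': '{"processedMessages": 1}'}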
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
boto3==1.26.137
jsonschema==4.17.3
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
import json
import logging

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def validate_message_structure(message):
    """
    Validate message structure and required fields.
    Args:
        message: Dictionary containing message data
    Returns:
        bool: True if the message structure is valid, False otherwise
    """
    required_fields = ['messageType', 'payload', 'timestamp']
    return all(field in message for field in required_fields)

def process_message(message):
    """
    Process the message content.
    Args:
        message: Dictionary containing message data
    Returns:
        bool: True if processing succeeded
    Raises:
        ValueError: If the message is invalid or the downstream system is unavailable
    """
    try:
        # Validate message structure
        if not validate_message_structure(message):
            logger.error("Message missing required fields")
            raise ValueError("Invalid message structure")

        message_type = message['messageType']
        payload = message['payload']

        # Validate message type
        valid_types = ['TYPE_A', 'TYPE_B', 'TYPE_C']
        if message_type not in valid_types:
            logger.error(f"Invalid message type: {message_type}")
            raise ValueError(f"Invalid message type: {message_type}")

        # Check for downstream system status
        if 'systemStatus' in message and message['systemStatus'].lower() == 'unavailable':
            logger.error("Target system is unavailable")
            raise ValueError("DOWNSTREAM_ERROR: Target system unavailable")

        # Process the message based on type
        logger.info(f"Processing message type: {message_type}")

        # Add type-specific processing logic here
        if message_type == 'TYPE_A':
            # Process TYPE_A messages
            pass
        elif message_type == 'TYPE_B':
            # Process TYPE_B messages
            pass
        elif message_type == 'TYPE_C':
            # Process TYPE_C messages
            pass

        return True

    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        raise

def lambda_handler(event, context):
    """
    Main Lambda handler function.
    Args:
        event: Lambda event object
        context: Lambda context object
    Returns:
        dict: Response object
    """
    logger.info(f"Processing {len(event['Records'])} messages")

    processed_count = 0
    failed_count = 0
    downstream_errors = 0

    for record in event['Records']:
        try:
            # Parse the message body
            message = json.loads(record['body'])

            # Process the message
            if process_message(message):
                processed_count += 1
                logger.info(f"Successfully processed message: {message.get('messageId', 'unknown')}")
            else:
                failed_count += 1
                logger.warning(f"Message processing returned False: {message.get('messageId', 'unknown')}")

        except json.JSONDecodeError as e:
            failed_count += 1
            logger.error(f"Invalid JSON in message: {str(e)}")
            # Re-raise so SQS retries the batch and redrive policy can take over
            raise

        except ValueError as e:
            if "DOWNSTREAM_ERROR" in str(e):
                downstream_errors += 1
                logger.error("Downstream error detected")
                raise
            failed_count += 1
            logger.error(f"Validation error: {str(e)}")
            raise

        except Exception as e:
            failed_count += 1
            logger.error(f"Unexpected error processing message: {str(e)}")
            raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': processed_count,
            'failed': failed_count,
            'downstream_errors': downstream_errors
        })
    }
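To see what this consumer accepts and rejects, the payloads below sketch the implied message contract; the field values are illustrative assumptions.

# Illustrative message bodies for the consumer above (values are assumed).
import json

# Passes validate_message_structure and processes normally:
valid_body = json.dumps({
    'messageType': 'TYPE_A',
    'payload': {'orderId': 42},
    'timestamp': '2023-05-17T12:00:00Z'
})

# Same structure, but flags the downstream system as unavailable, so
# process_message raises "DOWNSTREAM_ERROR: ..." and the handler re-raises,
# leaving the message for SQS to retry and eventually dead-letter:
downstream_body = json.dumps({
    'messageType': 'TYPE_B',
    'payload': {'orderId': 43},
    'timestamp': '2023-05-17T12:00:05Z',
    'systemStatus': 'unavailable'
})

event = {'Records': [{'messageId': 'msg-1', 'body': valid_body}]}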
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
boto3==1.26.137
jsonschema==4.17.3
