|
1 | 1 | import json |
| 2 | +import re |
2 | 3 | import boto3 |
3 | 4 | import os |
4 | 5 | import logging |
|
14 | 15 | MAIN_QUEUE_URL = os.environ['MAIN_QUEUE_URL'] |
15 | 16 | FATAL_DLQ_URL = os.environ['FATAL_DLQ_URL'] |
16 | 17 |
|
17 | | -def can_fix_message(message): |
| 18 | +# Email validation pattern |
| 19 | +EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' |
| 20 | + |
| 21 | +def fix_email(email): |
18 | 22 | """ |
19 | | - Determine if a message can be fixed automatically. |
20 | | - |
21 | | - Extension points: |
22 | | - 1. Add validation for specific message formats |
23 | | - 2. Implement business-specific fix rules |
24 | | - 3. Add data transformation logic |
25 | | - 4. Implement retry strategies |
26 | | - 5. Add validation against external systems |
| 23 | + Attempt to fix common email format issues. |
| 24 | + Can be extended to other scenarios, e.g. downstream issues. |
27 | 25 | """ |
28 | | - try: |
29 | | - # Basic message validation |
30 | | - # Add your validation logic here |
31 | | - return True |
32 | | - except Exception as e: |
33 | | - logger.error(f"Validation error: {str(e)}") |
34 | | - return False |
| 26 | + # Collapse multiple @ symbols: keep the text before the first and after the last |
| 27 | + if email.count('@') > 1: |
| 28 | + parts = email.split('@') |
| 29 | + email = f"{parts[0]}@{parts[-1]}" |
| 30 | + |
| 31 | + # Remove spaces |
| 32 | + email = email.strip().replace(' ', '') |
| 33 | + |
| 34 | + # Fix common typos in domain extensions |
| 35 | + common_fixes = { |
| 36 | + '.con': '.com', |
| 37 | + '.vom': '.com', |
| 38 | + '.comm': '.com', |
| 39 | + '.orgg': '.org', |
| 40 | + '.nett': '.net' |
| 41 | + } |
| 42 | + |
| 43 | + for wrong, right in common_fixes.items(): |
| 44 | + if email.endswith(wrong): |
| 45 | + email = email[:-len(wrong)] + right |
| 46 | + |
| 47 | + return email |
35 | 48 |
|
36 | | -def fix_message(message): |
| 49 | +def can_fix_email(message): |
37 | 50 | """ |
38 | | - Apply fixes to the message. |
39 | | - |
40 | | - Extension points: |
41 | | - 1. Add data normalization |
42 | | - 2. Implement field-specific fixes |
43 | | - 3. Add data enrichment |
44 | | - 4. Implement format conversion |
45 | | - 5. Add validation rules |
| 51 | + Check whether the email in the message is valid after applying fix_email |
46 | 52 | """ |
47 | | - try: |
48 | | - fixed_message = message.copy() |
49 | | - # Add your fix logic here |
50 | | - fixed_message['wasFixed'] = True |
51 | | - return fixed_message |
52 | | - except Exception as e: |
53 | | - logger.error(f"Fix error: {str(e)}") |
54 | | - return None |
| 53 | + if 'email' not in message: |
| 54 | + return False |
| 55 | + |
| 56 | + email = message['email'] |
| 57 | + fixed_email = fix_email(email) |
| 58 | + |
| 59 | + return bool(re.match(EMAIL_PATTERN, fixed_email)) |
| 60 | + |
55 | 61 |
|
56 | 62 | def lambda_handler(event, context): |
57 | 63 | """ |
58 | | - Process messages and route them based on fixability. |
59 | | - |
| 64 | + Processes messages from a DLQ that have already failed to be automatically processed, |
| 65 | + and attempts automated remediation and redelivery of the messages back to the main queue. |
| 66 | + If no suitable fixes can be applied, messages end up in a fatal DLQ where the typical |
| 67 | + approach of human intervention is required. |
| 68 | +
|
60 | 69 | Flow: |
61 | 70 | 1. Attempt to fix message |
62 | 71 | 2. If fixable -> Main Queue |
63 | 72 | 3. If unfixable -> Fatal DLQ |
64 | 73 | |
65 | 74 | Extension points: |
66 | | - 1. Add more sophisticated routing logic |
| 75 | + 1. Add more sophisticated routing logic, including a delay queue |
67 | 76 | 2. Implement custom error handling |
68 | 77 | 3. Add message transformation |
69 | 78 | 4. Implement retry mechanisms |
70 | 79 | 5. Add monitoring and metrics |
71 | | - """ |
| 80 | +
|
| 81 | + """ |
72 | 82 | processed_count = 0 |
73 | 83 |
|
74 | 84 | for record in event['Records']: |
75 | | - message_id = 'unknown' # Initialize message_id with default value |
76 | | - |
77 | 85 | try: |
| 86 | + # Parse the message body |
78 | 87 | message = json.loads(record['body']) |
79 | | - message_id = record.get('messageId', 'unknown') |
| 88 | + original_message_id = record.get('messageId', 'unknown') |
80 | 89 |
|
81 | | - logger.info(f"Processing message: {message_id}") |
| 90 | + logger.info(f"Processing failed message: {original_message_id}") |
82 | 91 |
|
83 | | - if can_fix_message(message): |
84 | | - fixed_message = fix_message(message) |
85 | | - if fixed_message: |
86 | | - # Send to main queue |
87 | | - sqs.send_message( |
88 | | - QueueUrl=MAIN_QUEUE_URL, |
89 | | - MessageBody=json.dumps(fixed_message) |
90 | | - ) |
91 | | - logger.info(f"Fixed message sent to main queue: {message_id}") |
92 | | - else: |
93 | | - raise ValueError("Message fix failed") |
| 92 | + |
| 93 | + |
| 94 | + |
| 95 | + # Option A: Try to fix malformed email |
| 96 | + |
| 97 | + if can_fix_email(message) and not re.match(EMAIL_PATTERN, message['email']): |
| 98 | + fixed_email = fix_email(message['email']) |
| 99 | + logger.info(f"Fixed email from '{message['email']}' to '{fixed_email}'") |
| 100 | + |
| 101 | + # Update the message with fixed email |
| 102 | + message['email'] = fixed_email |
| 103 | + message['emailWasFixed'] = True |
| 104 | + |
| 105 | + # Send back to main queue |
| 106 | + sqs.send_message( |
| 107 | + QueueUrl=MAIN_QUEUE_URL, |
| 108 | + MessageBody=json.dumps(message) |
| 109 | + ) |
| 110 | + |
| 111 | + logger.info(f"Sent fixed message back to main queue: {original_message_id}") |
| 112 | + |
| 113 | + # Option B: Cannot fix - send to fatal DLQ |
94 | 114 | else: |
| 115 | + logger.warning(f"Message cannot be fixed, sending to fatal DLQ: {original_message_id}") |
| 116 | + |
| 117 | + # Add failure reason if not present |
| 118 | + if 'failureReason' not in message: |
| 119 | + message['failureReason'] = 'Unrecoverable error - could not fix message' |
| 120 | + |
95 | 121 | # Send to fatal DLQ |
96 | | - message['failureReason'] = 'Message cannot be automatically fixed' |
97 | 122 | sqs.send_message( |
98 | 123 | QueueUrl=FATAL_DLQ_URL, |
99 | 124 | MessageBody=json.dumps(message) |
100 | 125 | ) |
101 | | - logger.warning(f"Message sent to fatal DLQ: {message_id}") |
102 | | - |
| 126 | + |
103 | 127 | processed_count += 1 |
104 | 128 |
|
105 | 129 | except Exception as e: |
106 | | - logger.error(f"Error processing message {message_id}: {str(e)}") |
| 130 | + logger.error(f"Error processing message {original_message_id}: {str(e)}") |
| 131 | + # If we can't process the decision, send to fatal DLQ |
107 | 132 | try: |
108 | 133 | error_message = { |
109 | 134 | 'originalMessage': record['body'], |
110 | | - 'failureReason': str(e), |
| 135 | + 'failureReason': f"Decision maker error: {str(e)}", |
111 | 136 | 'timestamp': context.invoked_function_arn |
112 | 137 | } |
113 | 138 | sqs.send_message( |
114 | 139 | QueueUrl=FATAL_DLQ_URL, |
115 | 140 | MessageBody=json.dumps(error_message) |
116 | 141 | ) |
117 | | - logger.error(f"Error message sent to fatal DLQ: {message_id}") |
| 142 | + |
118 | 143 | except Exception as fatal_e: |
119 | | - logger.critical(f"Fatal DLQ error: {str(fatal_e)}") |
| 144 | + logger.critical(f"Could not send to fatal DLQ: {str(fatal_e)}") |
120 | 145 | raise |
121 | 146 |
|
122 | 147 | return { |
|
0 commit comments