Skip to content

Commit b0132d8

Browse files
2 parents a38e88b + d9f2d94 commit b0132d8

File tree

5 files changed

+149
-3
lines changed

5 files changed

+149
-3
lines changed

msk-lambda-schema-avro-java-sam/README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,44 @@ The Key and Value are base64 encoded and have to be decoded. A message can also
160160

161161
The code in this example prints out the fields in the Kafka message and also decodes the key and the value and logs them in CloudWatch logs.
162162

163+
### Message Filtering with Event Source Mapping
164+
165+
This sample application demonstrates how to use event source mapping filters with Amazon MSK and Lambda. The producer Lambda function generates contacts with zip codes that start with either "1000" or "2000" (with approximately 50% probability for each). However, the consumer Lambda function is configured to only process messages where the zip code starts with "1000".
166+
167+
#### Filter Configuration
168+
169+
The filter is configured in the SAM template using the `FilterCriteria` property of the MSK event source mapping:
170+
171+
```yaml
172+
FilterCriteria:
173+
Filters:
174+
- Pattern: '{ "value": { "zip": [ { "prefix": "1000" } ] }}'
175+
```
176+
177+
This filter pattern instructs the event source mapping to only send messages to the Lambda function if the message value contains a "zip" field that starts with "1000".
178+
179+
#### Verifying the Filter Behavior
180+
181+
To verify that the filter is working correctly, follow these steps:
182+
183+
1. **Invoke the producer Lambda function** using one of the methods described above.
184+
185+
2. **Check the producer function logs** in CloudWatch:
186+
- Navigate to the CloudWatch Logs console
187+
- Find the log group for the producer function (`/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKProducerJavaFunction-XXXXXXXXXXXX`)
188+
- Open the most recent log stream
189+
- Look for the "ZIP CODE DISTRIBUTION SUMMARY" section, which shows how many messages were generated with zip codes starting with "1000" and how many with "2000"
190+
- You should see that the producer generated a mix of both zip code types
191+
192+
3. **Check the consumer function logs** in CloudWatch:
193+
- Navigate to the CloudWatch Logs console
194+
- Find the log group for the consumer function (`/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKConsumerJavaFunction-XXXXXXXXXXXX`)
195+
- Open the most recent log stream
196+
- You should see that the consumer only processed messages with zip codes starting with "1000"
197+
- Messages with zip codes starting with "2000" were filtered out by the event source mapping and never reached the Lambda function
198+
199+
This demonstrates how event source mapping filters can be used to efficiently process only the messages that match specific criteria, reducing Lambda invocation costs and processing overhead.
200+
163201
## Cleanup
164202

165203
You can first clean-up the Lambda function by running the sam delete command

msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public String handleRequest(KafkaEvent event, Context context) {
4141

4242
String response = new String("200 OK");
4343
this.listOfMessages = new ArrayList<KafkaMessage>();
44+
45+
// Counters for zip code patterns
46+
int zip1000Count = 0;
47+
int zip2000Count = 0;
4448
//Incoming KafkaEvent object has a property called records that is a map
4549
//Each key in the map is a combination of a topic and a partition
4650
Map<String, List<KafkaEventRecord>> record=event.getRecords();
@@ -128,6 +132,33 @@ public String handleRequest(KafkaEvent event, Context context) {
128132
try {
129133
decodedValue = new String(decodedValueBytes);
130134
logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue));
135+
136+
// Add more detailed logging for AVRO messages
137+
logger.log("=== AVRO MESSAGE DETAILS ===");
138+
logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:");
139+
140+
// Try to extract some common fields from the AVRO binary data
141+
// This is a simple approach to show some readable content
142+
StringBuilder readableContent = new StringBuilder();
143+
for (int i = 0; i < decodedValueBytes.length; i++) {
144+
// Skip non-printable characters
145+
if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
146+
readableContent.append((char)decodedValueBytes[i]);
147+
}
148+
}
149+
150+
String readableString = readableContent.toString();
151+
logger.log("Readable content extracted from AVRO: " + readableString);
152+
153+
// Check for zip code patterns
154+
if (readableString.contains("1000")) {
155+
logger.log("FOUND ZIP CODE STARTING WITH 1000");
156+
}
157+
if (readableString.contains("2000")) {
158+
logger.log("FOUND ZIP CODE STARTING WITH 2000");
159+
}
160+
161+
logger.log("=== END AVRO MESSAGE DETAILS ===");
131162
} catch (Exception e) {
132163
logger.log("ERROR converting bytes to string: " + e.getMessage());
133164
decodedValue = "Error decoding: " + e.getMessage();
@@ -178,9 +209,39 @@ public String handleRequest(KafkaEvent event, Context context) {
178209
// as well as in Json format using gson.toJson function
179210
logger.log("Received this message from Kafka - " + thisMessage.toString());
180211
logger.log("Message in JSON format : " + gson.toJson(thisMessage));
212+
213+
// Add a more readable summary of the message
214+
logger.log("=== MESSAGE SUMMARY ===");
215+
logger.log("Topic: " + thisMessage.getTopic());
216+
logger.log("Partition: " + thisMessage.getPartition());
217+
logger.log("Offset: " + thisMessage.getOffset());
218+
logger.log("Key: " + thisMessage.getDecodedKey());
219+
220+
// Check for zip code patterns in the decoded value
221+
String decodedValueStr = thisMessage.getDecodedValue();
222+
if (decodedValueStr != null) {
223+
if (decodedValueStr.contains("1000")) {
224+
logger.log("ZIP CODE: Found 1000 pattern in message");
225+
zip1000Count++;
226+
}
227+
if (decodedValueStr.contains("2000")) {
228+
logger.log("ZIP CODE: Found 2000 pattern in message");
229+
zip2000Count++;
230+
}
231+
}
232+
233+
logger.log("=== END MESSAGE SUMMARY ===");
181234
}
182235
}
183236
logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
237+
238+
// Log summary of zip code distribution
239+
logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
240+
logger.log("Messages with zip code containing 1000: " + zip1000Count);
241+
logger.log("Messages with zip code containing 2000: " + zip2000Count);
242+
logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
243+
logger.log("====================================================");
244+
184245
logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
185246
return response;
186247
}

msk-lambda-schema-avro-java-sam/kafka_event_producer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/AvroProducerHandler.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public String handleRequest(Map<String, Object> event, Context context) {
2929
LambdaLogger logger = context.getLogger();
3030
logger.log("Received event: " + gson.toJson(event));
3131

32+
// Initialize counters for zip code distribution
33+
int messageCount = 10;
34+
int zip1000Count = 0;
35+
int zip2000Count = 0;
36+
3237
try {
3338
// Get environment variables
3439
String mskClusterArn = System.getenv("MSK_CLUSTER_ARN");
@@ -42,6 +47,9 @@ public String handleRequest(Map<String, Object> event, Context context) {
4247
throw new RuntimeException("Required environment variables not set: MSK_CLUSTER_ARN, KAFKA_TOPIC, CONTACT_SCHEMA_NAME");
4348
}
4449

50+
// Log that we're generating zip codes with different prefixes
51+
logger.log("Generating contacts with zip codes starting with 1000 (50% chance) or 2000 (50% chance)");
52+
4553
// Create a Contact object from the input event or use default values
4654
Contact contact = createContactFromEvent(event);
4755
logger.log("Created contact: " + gson.toJson(contact));
@@ -73,7 +81,6 @@ public String handleRequest(Map<String, Object> event, Context context) {
7381
logger.log("Schema definition: " + schemaDefinition);
7482

7583
// Send 10 messages
76-
int messageCount = 10;
7784
logger.log("Sending " + messageCount + " AVRO messages to topic: " + kafkaTopic);
7885

7986
for (int i = 0; i < messageCount; i++) {
@@ -90,13 +97,34 @@ public String handleRequest(Map<String, Object> event, Context context) {
9097
logger.log("Sending contact #" + (i+1) + ": " + gson.toJson(messageContact));
9198
logger.log("AVRO record #" + (i+1) + ": " + messageAvroRecord.toString());
9299

100+
// Log the zip code prefix for distribution tracking
101+
String zipCode = messageContact.getZip();
102+
if (zipCode != null && zipCode.length() >= 4) {
103+
String prefix = zipCode.substring(0, 4);
104+
logger.log("Contact #" + (i+1) + " zip code prefix: " + prefix);
105+
106+
// Count zip codes by prefix
107+
if ("1000".equals(prefix)) {
108+
zip1000Count++;
109+
} else if ("2000".equals(prefix)) {
110+
zip2000Count++;
111+
}
112+
}
113+
93114
// Send the message
94115
KafkaProducerHelper.sendAvroMessage(producer, kafkaTopic, messageKey, messageAvroRecord);
95116
logger.log("Successfully sent AVRO message #" + (i+1) + " to topic: " + kafkaTopic);
96117
}
118+
119+
// Log summary of zip code distribution
120+
logger.log("ZIP CODE DISTRIBUTION SUMMARY:");
121+
logger.log("Messages with zip code starting with 1000: " + zip1000Count);
122+
logger.log("Messages with zip code starting with 2000: " + zip2000Count);
123+
logger.log("Other zip code formats: " + (messageCount - zip1000Count - zip2000Count));
97124
}
98125

99-
return "Successfully sent 10 AVRO messages to Kafka topic: " + kafkaTopic;
126+
return "Successfully sent " + messageCount + " AVRO messages to Kafka topic: " + kafkaTopic +
127+
" (Zip codes: " + zip1000Count + " with prefix 1000, " + zip2000Count + " with prefix 2000)";
100128
} catch (Exception e) {
101129
logger.log("Error sending AVRO message: " + e.getMessage());
102130
e.printStackTrace();
@@ -121,7 +149,17 @@ private Contact createContactFromEvent(Map<String, Object> event) {
121149
contact.setCity(getStringValue(event, "city", "AnyCity"));
122150
contact.setCounty(getStringValue(event, "county", "AnyCounty"));
123151
contact.setState(getStringValue(event, "state", "AnyState"));
124-
contact.setZip(getStringValue(event, "zip", "1000" + randomDigit()));
152+
153+
// Generate zip code starting with 1000 50% of the time and 2000 the other 50%
154+
if (event.containsKey("zip") && event.get("zip") != null) {
155+
// If zip is provided in the event, use it as is
156+
contact.setZip(event.get("zip").toString());
157+
} else {
158+
// 50% chance for each prefix
159+
String prefix = Math.random() < 0.5 ? "1000" : "2000";
160+
contact.setZip(prefix + randomDigit());
161+
}
162+
125163
contact.setHomePhone(getStringValue(event, "homePhone", "555-123-" + randomDigits(4)));
126164
contact.setCellPhone(getStringValue(event, "cellPhone", "555-456-" + randomDigits(4)));
127165
contact.setEmail(getStringValue(event, "email", "user-" + randomSuffix() + "@example.com"));

msk-lambda-schema-avro-java-sam/template.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ Resources:
5050
DestinationConfig:
5151
OnFailure:
5252
Destination: !GetAtt ConsumerDLQ.Arn
53+
FilterCriteria:
54+
Filters:
55+
- Pattern: '{ "value": { "zip": [ { "prefix": "1000" } ] }}'
5356
Policies:
5457
- Statement:
5558
- Sid: KafkaClusterPermissionsPolicy
@@ -198,12 +201,15 @@ Parameters:
198201
VpcId:
199202
Type: String
200203
Description: Enter the VPC ID where the MSK cluster is deployed
204+
Default: VPC_ID
201205
SubnetIds:
202206
Type: CommaDelimitedList
203207
Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated)
208+
Default: SUBNET_IDS
204209
SecurityGroupIds:
205210
Type: CommaDelimitedList
206211
Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated)
212+
Default: SECURITY_GROUP_IDS
207213

208214
Outputs:
209215
MSKConsumerLambdaFunction:

msk-lambda-schema-avro-java-sam/template_original.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ Resources:
5050
DestinationConfig:
5151
OnFailure:
5252
Destination: !GetAtt ConsumerDLQ.Arn
53+
FilterCriteria:
54+
Filters:
55+
- Pattern: '{ "value": { "zip": [ { "prefix": "1000" } ] }}'
5356
Policies:
5457
- Statement:
5558
- Sid: KafkaClusterPermissionsPolicy

0 commit comments

Comments
 (0)