Skip to content

Commit a823e17

Browse files
committed
Added filter scenario
1 parent c3759a8 commit a823e17

File tree

4 files changed

+111
-3
lines changed

4 files changed

+111
-3
lines changed

msk-lambda-schema-avro-java-sam/kafka_event_consumer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/HandlerMSK.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public String handleRequest(KafkaEvent event, Context context) {
4141

4242
String response = new String("200 OK");
4343
this.listOfMessages = new ArrayList<KafkaMessage>();
44+
45+
// Counters for zip code patterns
46+
int zip1000Count = 0;
47+
int zip2000Count = 0;
4448
//Incoming KafkaEvent object has a property called records that is a map
4549
//Each key in the map is a combination of a topic and a partition
4650
Map<String, List<KafkaEventRecord>> record=event.getRecords();
@@ -128,6 +132,33 @@ public String handleRequest(KafkaEvent event, Context context) {
128132
try {
129133
decodedValue = new String(decodedValueBytes);
130134
logger.log("Decoded value as string: " + (decodedValue.length() > 100 ? decodedValue.substring(0, 100) + "..." : decodedValue));
135+
136+
// Add more detailed logging for AVRO messages
137+
logger.log("=== AVRO MESSAGE DETAILS ===");
138+
logger.log("Message appears to be AVRO-formatted. Attempting to extract fields:");
139+
140+
// Try to extract some common fields from the AVRO binary data
141+
// This is a simple approach to show some readable content
142+
StringBuilder readableContent = new StringBuilder();
143+
for (int i = 0; i < decodedValueBytes.length; i++) {
144+
// Skip non-printable characters
145+
if (decodedValueBytes[i] >= 32 && decodedValueBytes[i] < 127) {
146+
readableContent.append((char)decodedValueBytes[i]);
147+
}
148+
}
149+
150+
String readableString = readableContent.toString();
151+
logger.log("Readable content extracted from AVRO: " + readableString);
152+
153+
// Check for zip code patterns
154+
if (readableString.contains("1000")) {
155+
logger.log("FOUND ZIP CODE STARTING WITH 1000");
156+
}
157+
if (readableString.contains("2000")) {
158+
logger.log("FOUND ZIP CODE STARTING WITH 2000");
159+
}
160+
161+
logger.log("=== END AVRO MESSAGE DETAILS ===");
131162
} catch (Exception e) {
132163
logger.log("ERROR converting bytes to string: " + e.getMessage());
133164
decodedValue = "Error decoding: " + e.getMessage();
@@ -178,9 +209,39 @@ public String handleRequest(KafkaEvent event, Context context) {
178209
// as well as in Json format using gson.toJson function
179210
logger.log("Received this message from Kafka - " + thisMessage.toString());
180211
logger.log("Message in JSON format : " + gson.toJson(thisMessage));
212+
213+
// Add a more readable summary of the message
214+
logger.log("=== MESSAGE SUMMARY ===");
215+
logger.log("Topic: " + thisMessage.getTopic());
216+
logger.log("Partition: " + thisMessage.getPartition());
217+
logger.log("Offset: " + thisMessage.getOffset());
218+
logger.log("Key: " + thisMessage.getDecodedKey());
219+
220+
// Check for zip code patterns in the decoded value
221+
String decodedValueStr = thisMessage.getDecodedValue();
222+
if (decodedValueStr != null) {
223+
if (decodedValueStr.contains("1000")) {
224+
logger.log("ZIP CODE: Found 1000 pattern in message");
225+
zip1000Count++;
226+
}
227+
if (decodedValueStr.contains("2000")) {
228+
logger.log("ZIP CODE: Found 2000 pattern in message");
229+
zip2000Count++;
230+
}
231+
}
232+
233+
logger.log("=== END MESSAGE SUMMARY ===");
181234
}
182235
}
183236
logger.log("All Messages in this batch = " + gson.toJson(listOfMessages));
237+
238+
// Log summary of zip code distribution
239+
logger.log("========== ZIP CODE DISTRIBUTION SUMMARY ==========");
240+
logger.log("Messages with zip code containing 1000: " + zip1000Count);
241+
logger.log("Messages with zip code containing 2000: " + zip2000Count);
242+
logger.log("Other messages: " + (listOfMessages.size() - zip1000Count - zip2000Count));
243+
logger.log("====================================================");
244+
184245
logger.log("========== LAMBDA FUNCTION COMPLETED ==========");
185246
return response;
186247
}

msk-lambda-schema-avro-java-sam/kafka_event_producer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/AvroProducerHandler.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ public String handleRequest(Map<String, Object> event, Context context) {
2929
LambdaLogger logger = context.getLogger();
3030
logger.log("Received event: " + gson.toJson(event));
3131

32+
// Initialize counters for zip code distribution
33+
int messageCount = 10;
34+
int zip1000Count = 0;
35+
int zip2000Count = 0;
36+
3237
try {
3338
// Get environment variables
3439
String mskClusterArn = System.getenv("MSK_CLUSTER_ARN");
@@ -42,6 +47,9 @@ public String handleRequest(Map<String, Object> event, Context context) {
4247
throw new RuntimeException("Required environment variables not set: MSK_CLUSTER_ARN, KAFKA_TOPIC, CONTACT_SCHEMA_NAME");
4348
}
4449

50+
// Log that we're generating zip codes with different prefixes
51+
logger.log("Generating contacts with zip codes starting with 1000 (50% chance) or 2000 (50% chance)");
52+
4553
// Create a Contact object from the input event or use default values
4654
Contact contact = createContactFromEvent(event);
4755
logger.log("Created contact: " + gson.toJson(contact));
@@ -73,7 +81,6 @@ public String handleRequest(Map<String, Object> event, Context context) {
7381
logger.log("Schema definition: " + schemaDefinition);
7482

7583
// Send 10 messages
76-
int messageCount = 10;
7784
logger.log("Sending " + messageCount + " AVRO messages to topic: " + kafkaTopic);
7885

7986
for (int i = 0; i < messageCount; i++) {
@@ -90,13 +97,34 @@ public String handleRequest(Map<String, Object> event, Context context) {
9097
logger.log("Sending contact #" + (i+1) + ": " + gson.toJson(messageContact));
9198
logger.log("AVRO record #" + (i+1) + ": " + messageAvroRecord.toString());
9299

100+
// Log the zip code prefix for distribution tracking
101+
String zipCode = messageContact.getZip();
102+
if (zipCode != null && zipCode.length() >= 4) {
103+
String prefix = zipCode.substring(0, 4);
104+
logger.log("Contact #" + (i+1) + " zip code prefix: " + prefix);
105+
106+
// Count zip codes by prefix
107+
if ("1000".equals(prefix)) {
108+
zip1000Count++;
109+
} else if ("2000".equals(prefix)) {
110+
zip2000Count++;
111+
}
112+
}
113+
93114
// Send the message
94115
KafkaProducerHelper.sendAvroMessage(producer, kafkaTopic, messageKey, messageAvroRecord);
95116
logger.log("Successfully sent AVRO message #" + (i+1) + " to topic: " + kafkaTopic);
96117
}
118+
119+
// Log summary of zip code distribution
120+
logger.log("ZIP CODE DISTRIBUTION SUMMARY:");
121+
logger.log("Messages with zip code starting with 1000: " + zip1000Count);
122+
logger.log("Messages with zip code starting with 2000: " + zip2000Count);
123+
logger.log("Other zip code formats: " + (messageCount - zip1000Count - zip2000Count));
97124
}
98125

99-
return "Successfully sent 10 AVRO messages to Kafka topic: " + kafkaTopic;
126+
return "Successfully sent " + messageCount + " AVRO messages to Kafka topic: " + kafkaTopic +
127+
" (Zip codes: " + zip1000Count + " with prefix 1000, " + zip2000Count + " with prefix 2000)";
100128
} catch (Exception e) {
101129
logger.log("Error sending AVRO message: " + e.getMessage());
102130
e.printStackTrace();
@@ -121,7 +149,17 @@ private Contact createContactFromEvent(Map<String, Object> event) {
121149
contact.setCity(getStringValue(event, "city", "AnyCity"));
122150
contact.setCounty(getStringValue(event, "county", "AnyCounty"));
123151
contact.setState(getStringValue(event, "state", "AnyState"));
124-
contact.setZip(getStringValue(event, "zip", "1000" + randomDigit()));
152+
153+
// Generate zip code starting with 1000 50% of the time and 2000 the other 50%
154+
if (event.containsKey("zip") && event.get("zip") != null) {
155+
// If zip is provided in the event, use it as is
156+
contact.setZip(event.get("zip").toString());
157+
} else {
158+
// 50% chance for each prefix
159+
String prefix = Math.random() < 0.5 ? "1000" : "2000";
160+
contact.setZip(prefix + randomDigit());
161+
}
162+
125163
contact.setHomePhone(getStringValue(event, "homePhone", "555-123-" + randomDigits(4)));
126164
contact.setCellPhone(getStringValue(event, "cellPhone", "555-456-" + randomDigits(4)));
127165
contact.setEmail(getStringValue(event, "email", "user-" + randomSuffix() + "@example.com"));

msk-lambda-schema-avro-java-sam/template.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ Resources:
5050
DestinationConfig:
5151
OnFailure:
5252
Destination: !GetAtt ConsumerDLQ.Arn
53+
FilterCriteria:
54+
Filters:
55+
- Pattern: '{ "value": { "zip": [ { "prefix": "2000" } ] }}'
5356
Policies:
5457
- Statement:
5558
- Sid: KafkaClusterPermissionsPolicy
@@ -198,12 +201,15 @@ Parameters:
198201
VpcId:
199202
Type: String
200203
Description: Enter the VPC ID where the MSK cluster is deployed
204+
Default: VPC_ID
201205
SubnetIds:
202206
Type: CommaDelimitedList
203207
Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated)
208+
Default: SUBNET_IDS
204209
SecurityGroupIds:
205210
Type: CommaDelimitedList
206211
Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated)
212+
Default: SECURITY_GROUP_IDS
207213

208214
Outputs:
209215
MSKConsumerLambdaFunction:

msk-lambda-schema-avro-java-sam/template_original.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ Resources:
5050
DestinationConfig:
5151
OnFailure:
5252
Destination: !GetAtt ConsumerDLQ.Arn
53+
FilterCriteria:
54+
Filters:
55+
- Pattern: '{ "value": { "zip": [ { "prefix": "2000" } ] }}'
5356
Policies:
5457
- Statement:
5558
- Sid: KafkaClusterPermissionsPolicy

0 commit comments

Comments
 (0)