Commit 68096f4

2 parents a48c46b + b5e9624
File tree: 35 files changed, +6820 −0 lines

msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml

Lines changed: 1347 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 213 additions & 0 deletions
# msk-lambda-schema-avro-java-sam

# Java AWS Lambda Kafka consumer and AVRO producer with Schema Registry, using AWS SAM

This pattern is an example of Lambda functions that:

1. Consume messages from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic
2. Produce AVRO-formatted messages to an Amazon MSK topic using Schema Registry

Both functions use IAM authentication to connect to the MSK cluster and leverage AWS Glue Schema Registry for AVRO schema management.

This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders:

- kafka_event_consumer_function/src/main/java - Code for the consumer Lambda function.
- kafka_event_producer_function/src/main/java - Code for the AVRO producer Lambda function.
- events - Invocation events that you can use to invoke the functions.
- kafka_event_consumer_function/src/test/java - Unit tests for the consumer code.
- template.yaml - A template that defines the application's Lambda functions.
- template_original.yaml - The original template with placeholders that are replaced during deployment.
- MSKAndKafkaClientEC2.yaml - A CloudFormation template that deploys an MSK cluster along with an EC2 instance that has all prerequisites pre-installed, so you can directly build, deploy, and test the Lambda functions.

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.

## Run the CloudFormation template to create the MSK cluster and client EC2 instance

* [Run the CloudFormation template using the file MSKAndKafkaClientEC2.yaml] - In the AWS CloudFormation console, create a new stack by specifying the template file. You can keep the defaults for the input parameters or modify them as necessary. Wait for the stack to be created. The template creates an MSK cluster (Provisioned or Serverless, based on your selection) and an EC2 instance that you can use as a client.

* [Connect to the EC2 machine] - Once the CloudFormation stack is created, go to the EC2 console and log into the instance using either the "Connect using EC2 Instance Connect" or the "Connect using EC2 Instance Connect Endpoint" option under the "EC2 Instance Connect" tab.

Note: You may need to wait a while after the CloudFormation stack is created, as some UserData scripts continue running after the stack shows Created.

* [Check that the Kafka topic has been created] - Once you are inside the EC2 instance, you should be in the /home/ec2-user folder. Check the contents of the file kafka_topic_creator_output.txt by running the command cat kafka_topic_creator_output.txt. You should see output such as "Created topic MskIamJavaLambdaTopic."

If you cannot find the file kafka_topic_creator_output.txt, or if it is blank, or you see an error message, run ./kafka_topic_creator.sh. This script creates the Kafka topic that the consumer Lambda function subscribes to.

## Prerequisites to deploy the sample Lambda functions

The EC2 instance created by the CloudFormation template has all the software needed to deploy the Lambda functions.

The AWS SAM CLI is a serverless tool for building and testing Lambda applications. It uses Docker to locally test your functions in an Amazon Linux environment that resembles the Lambda execution environment. It can also emulate your application's build environment and API.

* Java - On the EC2 instance, we have installed the Amazon Corretto JDK of the version that you selected in the CloudFormation input parameters. At the time of publishing this pattern, AWS SAM supports only Java versions 11, 17, and 21.
* Maven - On the EC2 instance, we have installed Maven (https://maven.apache.org/install.html)
* AWS SAM CLI - We have installed the AWS SAM CLI (https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
* Docker - We have installed Docker Community Edition on the EC2 instance (https://hub.docker.com/search/?type=edition&offering=community)

We have also already cloned the GitHub repository for serverless-patterns on the EC2 instance by running the command below:
```
git clone https://github.com/aws-samples/serverless-patterns.git
```
Change directory to the pattern directory:
```
cd serverless-patterns/msk-lambda-schema-avro-java-sam
```

## Build the application

Build your application with the `sam build` command.

```bash
sam build
```

The SAM CLI installs dependencies defined in `kafka_event_consumer_function/pom.xml`, creates a deployment package, and saves it in the `.aws-sam/build` folder.

## Deploy the sample application

To deploy your application for the first time, run the following in your shell:

```bash
sam deploy --capabilities CAPABILITY_IAM --no-confirm-changeset --no-disable-rollback --region $AWS_REGION --stack-name msk-lambda-schema-avro-java-sam --guided
```

The sam deploy command packages and deploys your application to AWS, with a series of prompts:

* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region; a good starting point is something matching your project name.
* **AWS Region**: The AWS region you want to deploy your app to.
* **Parameter MSKClusterName**: The name of the MSK cluster. This will be `<stack-name>-cluster` from the CloudFormation template you deployed in the previous step.
* **Parameter MSKClusterId**: The unique ID of the MSK cluster. This can be found in the MSK console or extracted from the MSK ARN in the CloudFormation outputs.
* **Parameter MSKTopic**: The Kafka topic on which the Lambda functions will produce and consume messages. You can find this in the CloudFormation outputs as `KafkaTopicForLambda`.
* **Parameter ContactSchemaName**: The name of the schema to be used for AVRO serialization (default: ContactSchema).
* **Parameter VpcId**: The ID of the VPC where the MSK cluster is deployed. You can find this in the CloudFormation outputs as `VPCId`.
* **Parameter SubnetIds**: Comma-separated list of subnet IDs where the MSK cluster is deployed. You can find these in the CloudFormation outputs as `PrivateSubnetMSKOne`, `PrivateSubnetMSKTwo`, and `PrivateSubnetMSKThree`.
* **Parameter SecurityGroupIds**: Comma-separated list of security group IDs that allow access to the MSK cluster. You can find this in the CloudFormation outputs as `SecurityGroupId`.
* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review.
* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create the AWS IAM roles required for the included Lambda function(s) to access AWS services. By default, these are scoped down to the minimum required permissions.
* **Disable rollback**: Defaults to No; when enabled, it preserves the state of previously provisioned resources when an operation fails.
* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project.
* **SAM configuration file [samconfig.toml]**: Name of the configuration file used to store configuration information locally.
* **SAM configuration environment [default]**: Environment for storing deployment information locally.
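
As a side note, the MSK cluster ID is simply the final path segment of the cluster ARN, so it can be extracted with a plain string split. A minimal sketch (the ARN below is the sample value used elsewhere in this pattern, not your cluster's):

```java
public class MskArnParser {
    // The cluster ID is the final path segment of the MSK cluster ARN:
    // arn:aws:kafka:<region>:<account-id>:cluster/<cluster-name>/<cluster-id>
    static String clusterIdFromArn(String arn) {
        String[] parts = arn.split("/");
        return parts[parts.length - 1];
    }

    public static void main(String[] args) {
        String arn = "arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster"
                + "/a93759a9-c9d0-4952-984c-492c6bfa2be8-13";
        // prints: a93759a9-c9d0-4952-984c-492c6bfa2be8-13
        System.out.println(clusterIdFromArn(arn));
    }
}
```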

You should see the message "Successfully created/updated stack - <StackName> in <Region>" if all goes well.

**Note: You must retrieve the required parameters from the CloudFormation outputs in the AWS Console after deploying the MSKAndKafkaClientEC2.yaml template. These outputs contain all the necessary information for deploying the Lambda functions.**

## Test the sample application

Once the Lambda functions are deployed, you can test the application by invoking the producer Lambda function, which generates AVRO-formatted messages to the MSK topic. The consumer Lambda function then automatically processes these messages.

### Option 1: Invoke the producer Lambda function using the AWS CLI

You can invoke the producer Lambda function using the AWS CLI with the following command:

```bash
aws lambda invoke \
    --function-name msk-lambda-schema-avro-java-sam-LambdaMSKProducerJavaFunction-XXXXXXXXXXXX \
    --payload '{"message": "Test message using AVRO and Schema Registry"}' \
    --cli-binary-format raw-in-base64-out \
    response.json
```

You can find the exact function name in the AWS Lambda console or by running:

```bash
aws lambda list-functions --query "Functions[?contains(FunctionName, 'Producer')].FunctionName"
```

### Option 2: Invoke the producer Lambda function using the AWS Console

1. Open the [AWS Lambda Console](https://console.aws.amazon.com/lambda)
2. Find and select your producer Lambda function (it will be named something like `msk-lambda-schema-avro-java-sam-LambdaMSKProducerJavaFunction-XXXXXXXXXXXX`)
3. Click on the "Test" tab
4. Create a new test event with the following JSON payload:
```json
{
  "message": "Test message using AVRO and Schema Registry"
}
```
5. Click "Test" to invoke the function

### Verify the results

After invoking the producer function, check the CloudWatch logs for both Lambda functions:

1. Open the [CloudWatch Logs Console](https://console.aws.amazon.com/cloudwatch/home#logs:)
2. Find the log groups for both your producer and consumer Lambda functions:
   - Producer log group: `/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKProducerJavaFunction-XXXXXXXXXXXX`
   - Consumer log group: `/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKConsumerJavaFunction-XXXXXXXXXXXX`

   You can search for these log groups by typing "msk-lambda-schema-avro-java-sam" in the filter box.

3. Click on each log group and then select the most recent log stream (typically named with a timestamp and UUID)
4. In the producer logs, look for entries showing:
   - Successful serialization of the message using the AVRO format
   - Successful registration or retrieval of the schema from Schema Registry
   - Confirmation that the message was sent to the MSK topic
5. In the consumer logs, look for entries showing:
   - Receipt of the message batch from the MSK topic
   - Successful deserialization of the AVRO message
   - The decoded message content and any processing performed on it

The consumer Lambda function automatically processes messages from the MSK topic. It parses the Kafka messages and outputs their fields to CloudWatch logs.

Each key has a list of messages, and each Kafka message has the following properties: Topic, Partition, Offset, Timestamp, TimestampType, Key, and Value.

The Key and Value are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value.

The code in this example prints out the fields in the Kafka message, decodes the key and the value, and logs them in CloudWatch Logs.
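
As a rough illustration of that decoding step (not the function's actual code), the base64 values from this pattern's sample test event can be turned back into strings like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class KafkaValueDecoder {
    // Kafka record keys and values arrive base64 encoded in the Lambda event;
    // decode them back to UTF-8 strings before further processing.
    static String decode(String b64) {
        return new String(Base64.getDecoder().decode(b64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "Zg==" and "Zw==" are the sample values from this pattern's test event
        System.out.println(decode("Zg==")); // prints: f
        System.out.println(decode("Zw==")); // prints: g
    }
}
```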

### Message Filtering with Event Source Mapping

This sample application demonstrates how to use event source mapping filters with Amazon MSK and Lambda. The producer Lambda function generates contacts with zip codes that start with either "1000" or "2000" (with approximately 50% probability each). The consumer Lambda function, however, is configured to process only messages where the zip code starts with "1000".
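
For illustration only (the producer's actual implementation may differ), a zip code with a roughly 50/50 prefix split can be generated like this:

```java
import java.util.Random;

public class ZipGenerator {
    // Illustrative sketch: pick a "1000" or "2000" prefix with ~50% probability,
    // then append a final digit to form a five-digit zip such as "10007".
    static String randomZip(Random rnd) {
        String prefix = rnd.nextBoolean() ? "1000" : "2000";
        return prefix + rnd.nextInt(10);
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(randomZip(rnd));
        }
    }
}
```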

#### Filter Configuration

The filter is configured in the SAM template using the `FilterCriteria` property of the MSK event source mapping:

```yaml
FilterCriteria:
  Filters:
    - Pattern: '{ "value": { "zip": [ { "prefix": "1000" } ] }}'
```

This filter pattern instructs the event source mapping to send messages to the Lambda function only if the message value contains a "zip" field that starts with "1000".
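
The prefix filter is equivalent to a simple string check. A minimal sketch of the same rule (illustrative only; the real filtering happens in the event source mapping, before your function is ever invoked):

```java
public class ZipFilterDemo {
    // Equivalent string check to the event source mapping's prefix filter:
    // only records whose value's "zip" field starts with "1000" are delivered.
    static boolean passesFilter(String zip) {
        return zip != null && zip.startsWith("1000");
    }

    public static void main(String[] args) {
        System.out.println(passesFilter("10001")); // true  -> delivered to the consumer
        System.out.println(passesFilter("20001")); // false -> dropped before invocation
    }
}
```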

#### Verifying the Filter Behavior

To verify that the filter is working correctly, follow these steps:

1. **Invoke the producer Lambda function** using one of the methods described above.

2. **Check the producer function logs** in CloudWatch:
   - Navigate to the CloudWatch Logs console
   - Find the log group for the producer function (`/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKProducerJavaFunction-XXXXXXXXXXXX`)
   - Open the most recent log stream
   - Look for the "ZIP CODE DISTRIBUTION SUMMARY" section, which shows how many messages were generated with zip codes starting with "1000" and how many with "2000"
   - You should see that the producer generated a mix of both zip code types

3. **Check the consumer function logs** in CloudWatch:
   - Navigate to the CloudWatch Logs console
   - Find the log group for the consumer function (`/aws/lambda/msk-lambda-schema-avro-java-sam-LambdaMSKConsumerJavaFunction-XXXXXXXXXXXX`)
   - Open the most recent log stream
   - You should see that the consumer processed only messages with zip codes starting with "1000"
   - Messages with zip codes starting with "2000" were filtered out by the event source mapping and never reached the Lambda function

This demonstrates how event source mapping filters can be used to process only the messages that match specific criteria, reducing Lambda invocation costs and processing overhead.

## Cleanup

First clean up the Lambda functions by running the sam delete command:

```
cd /home/ec2-user/serverless-patterns/msk-lambda-schema-avro-java-sam
sam delete
```

Confirm by pressing y for both questions. You should see the Lambda functions being deleted and a final confirmation "Deleted successfully" on the command line.

Next, delete the CloudFormation stack that created the MSK cluster and the EC2 instance by going to the CloudFormation console, selecting the stack, and clicking the "Delete" button. It will run for some time, but eventually you should see the stack being cleaned up. If you get an error message saying the stack could not be deleted, retry and do a Force Delete. This can happen because ENIs created by the deployed Lambda function in your VPC may prevent the VPC from being deleted even after the Lambda function itself has been deleted.
Lines changed: 14 additions & 0 deletions

{
  "firstname": "John",
  "lastname": "Doe",
  "company": "Example Corp",
  "street": "123 Main St",
  "city": "Seattle",
  "county": "King",
  "state": "WA",
  "zip": "98101",
  "homePhone": "555-123-4567",
  "cellPhone": "555-987-6543",
  "email": "john.doe@example.com",
  "website": "https://www.johndoe.com"
}
Lines changed: 31 additions & 0 deletions

{
  "records": {
    "myTopic-0": [
      {
        "topic": "myTopic",
        "partition": 0,
        "offset": 250,
        "timestamp": 1678072110111,
        "timestampType": "CREATE_TIME",
        "value": "Zg==",
        "headers": []
      },
      {
        "topic": "myTopic",
        "partition": 0,
        "offset": 251,
        "timestamp": 1678072111086,
        "timestampType": "CREATE_TIME",
        "value": "Zw==",
        "headers": []
      }
    ]
  },
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13",
  "bootstrapServers": "b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098"
}
