Skip to content

Commit a38e88b

Browse files
Made updates to CloudFormation template, SAM template file and AvroProducerHandler.java file to take care of correctly passing environment variables and also not have the user type in VPC, Subnets and Security Group
1 parent c3759a8 commit a38e88b

File tree

3 files changed

+74
-6
lines changed

3 files changed

+74
-6
lines changed

msk-lambda-schema-avro-java-sam/MSKAndKafkaClientEC2.yaml

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ Parameters:
3333
Default: https://github.com/aws-samples/serverless-patterns/
3434
ContactSchemaName:
3535
Type: String
36-
Default: ContactSchema
36+
Default: ContactSchema
37+
GlueSchemaRegistryForMSKName:
38+
Type: String
39+
Default: GlueSchemaRegistryForMSK
40+
3741
Conditions:
3842
CreateProvisionedCluster: !Equals
3943
- !Ref EnvType
@@ -271,13 +275,21 @@ Resources:
271275
FromPort: 22
272276
ToPort: 22
273277
SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
274-
278+
279+
MSKGlueRegistry:
280+
Type: AWS::Glue::Registry
281+
Properties:
282+
Name: !Ref GlueSchemaRegistryForMSKName
283+
Description: "Registry for storing schemas related to MSK"
284+
275285
ContactSchema:
276286
Type: AWS::Glue::Schema
277287
Properties:
278288
Name: !Ref ContactSchemaName
279289
Compatibility: BACKWARD
280290
DataFormat: AVRO
291+
Registry:
292+
Arn: !GetAtt MSKGlueRegistry.Arn
281293
SchemaDefinition: |
282294
{
283295
"type": "record",
@@ -543,11 +555,26 @@ Resources:
543555
cd ./serverless-patterns/msk-lambda-schema-avro-java-sam
544556
cp template_original.yaml template.yaml
545557
sudo chown -R ec2-user .
558+
559+
GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name}
560+
CONTACT_SCHEMA=${contact_schema_name}
561+
VPC_ID=${vpcid}
562+
LAMBDA_SECURITY_GROUP_ID={securitygroup}
563+
PRIVATE_SUBNET_1={privatesubnetone}
564+
PRIVATE_SUBNET_2={privatesubnettwo}
565+
PRIVATE_SUBNET_3={privatesubnetthree}
566+
SUBNET_IDS="$PRIVATE_SUBNET_1,$PRIVATE_SUBNET_2,$PRIVATE_SUBNET_3"
567+
546568
source /home/ec2-user/.bash_profile
547569
sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml
548570
sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml
549571
sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml
550572
sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml
573+
sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml
574+
sed -i "s/CONTACT_SCHEMA/$CONTACT_SCHEMA/g" template.yaml
575+
sed -i "s/VPC_ID/$VPC_ID/g" template.yaml
576+
sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml
577+
sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml
551578
552579
# Get IP CIDR range for EC2 Instance Connect
553580
cd /home/ec2-user
@@ -570,6 +597,14 @@ Resources:
570597
serverless_land_github_location: !Ref ServerlessLandGithubLocation
571598
aws_region: !Ref 'AWS::Region'
572599
java_version: !Ref JavaVersion
600+
vpcid: !Ref VPCId
601+
privatesubnetone: !Ref PrivateSubnetMSKOne
602+
privatesubnettwo: !Ref PrivateSubnetMSKTwo
603+
privatesubnetthree: !Ref PrivateSubnetMSKThree
604+
securitygroup: !Ref SecurityGroupId
605+
glue_registry_name: !Ref GlueSchemaRegistryForMSKName
606+
contact_schema_name: !Ref ContactSchemaName
607+
573608

574609
KafkaClientEC2InstanceServerless:
575610
Condition: CreateServerlessCluster
@@ -816,11 +851,27 @@ Resources:
816851
cd ./serverless-patterns/msk-lambda-schema-avro-java-sam
817852
cp template_original.yaml template.yaml
818853
sudo chown -R ec2-user .
854+
855+
GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name}
856+
CONTACT_SCHEMA=${contact_schema_name}
857+
VPC_ID=${vpcid}
858+
LAMBDA_SECURITY_GROUP_ID={securitygroup}
859+
PRIVATE_SUBNET_1={privatesubnetone}
860+
PRIVATE_SUBNET_2={privatesubnettwo}
861+
PRIVATE_SUBNET_3={privatesubnetthree}
862+
SUBNET_IDS="$PRIVATE_SUBNET_1,$PRIVATE_SUBNET_2,$PRIVATE_SUBNET_3"
863+
864+
819865
source /home/ec2-user/.bash_profile
820866
sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml
821867
sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml
822868
sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml
823869
sed -i "s/JAVA_VERSION/$JAVA_VERSION/g" template.yaml
870+
sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml
871+
sed -i "s/CONTACT_SCHEMA/$CONTACT_SCHEMA/g" template.yaml
872+
sed -i "s/VPC_ID/$VPC_ID/g" template.yaml
873+
sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml
874+
sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml
824875
825876
# Get IP CIDR range for EC2 Instance Connect
826877
cd /home/ec2-user
@@ -843,6 +894,15 @@ Resources:
843894
serverless_land_github_location: !Ref ServerlessLandGithubLocation
844895
aws_region: !Ref 'AWS::Region'
845896
java_version: !Ref JavaVersion
897+
vpcid: !Ref VPCId
898+
privatesubnetone: !Ref PrivateSubnetMSKOne
899+
privatesubnettwo: !Ref PrivateSubnetMSKTwo
900+
privatesubnetthree: !Ref PrivateSubnetMSKThree
901+
securitygroup: !Ref SecurityGroupId
902+
glue_registry_name: !Ref GlueSchemaRegistryForMSKName
903+
contact_schema_name: !Ref ContactSchemaName
904+
905+
846906

847907
EC2InstanceEndpoint:
848908
Type: AWS::EC2::InstanceConnectEndpoint
@@ -1215,6 +1275,9 @@ Resources:
12151275
SecurityGroups:
12161276
- !GetAtt MSKSecurityGroup.GroupId
12171277

1278+
1279+
1280+
12181281
Outputs:
12191282
VPCId:
12201283
Description: The ID of the VPC created

msk-lambda-schema-avro-java-sam/kafka_event_producer_function/src/main/java/com/amazonaws/services/lambda/samples/events/msk/AvroProducerHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public String handleRequest(Map<String, Object> event, Context context) {
3232
try {
3333
// Get environment variables
3434
String mskClusterArn = System.getenv("MSK_CLUSTER_ARN");
35-
String kafkaTopic = System.getenv("KAFKA_TOPIC");
35+
String kafkaTopic = System.getenv("MSK_TOPIC");
3636
String schemaName = System.getenv("CONTACT_SCHEMA_NAME");
3737
String region = System.getenv("AWS_REGION");
3838
String registryName = System.getenv("REGISTRY_NAME") != null ?

msk-lambda-schema-avro-java-sam/template_original.yaml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ Resources:
125125
Environment:
126126
Variables:
127127
MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ]
128-
KAFKA_TOPIC: !Ref MSKTopic
128+
MSK_TOPIC: !Ref MSKTopic
129+
REGISTRY_NAME: !Ref GlueSchemaRegistryName
129130
CONTACT_SCHEMA_NAME: !Ref ContactSchemaName
130131
JAVA_TOOL_OPTIONS: -XX:+TieredCompilation -XX:TieredStopAtLevel=1
131132
Policies:
@@ -191,10 +192,14 @@ Parameters:
191192
Type: String
192193
Description: Enter the name of the MSK Topic
193194
Default: KAFKA_TOPIC
195+
GlueSchemaRegistryName:
196+
Type: String
197+
Description: Enter the name of the Glue Schema Registry
198+
Default: GLUE_SCHEMA_REGISTRY_NAME
194199
ContactSchemaName:
195200
Type: String
196201
Description: Enter the name of the Contact Schema
197-
Default: ContactSchema
202+
Default: CONTACT_SCHEMA
198203
VpcId:
199204
Type: String
200205
Description: Enter the VPC ID where the MSK cluster is deployed
@@ -206,7 +211,7 @@ Parameters:
206211
SecurityGroupIds:
207212
Type: CommaDelimitedList
208213
Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated)
209-
Default: SECURITY_GROUP_IDS
214+
Default: LAMBDA_SECURITY_GROUP_ID
210215

211216
Outputs:
212217
MSKConsumerLambdaFunction:

0 commit comments

Comments
 (0)