Skip to content

Commit 3d3da17

Browse files
authored
feat: Support for self managed kafka as an event source (#2091)
1 parent fb4d963 commit 3d3da17

File tree

38 files changed

+2367
-14
lines changed

38 files changed

+2367
-14
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from integration.helpers.base_test import BaseTest
2+
from parameterized import parameterized
3+
4+
5+
class TestFunctionWithSelfManagedKafka(BaseTest):
6+
@parameterized.expand(
7+
[
8+
"combination/function_with_self_managed_kafka",
9+
"combination/function_with_self_managed_kafka_intrinsics",
10+
]
11+
)
12+
def test_function_with_self_managed_kafka(self, file_name):
13+
self.create_and_verify_stack(file_name)
14+
# Get the notification configuration and make sure Lambda Function connection is added
15+
lambda_client = self.client_provider.lambda_client
16+
function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
17+
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
18+
event_source_mapping_id = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
19+
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
20+
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
21+
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[
2+
{
3+
"LogicalResourceId": "KafkaFunction",
4+
"ResourceType": "AWS::Lambda::Function"
5+
},
6+
{
7+
"LogicalResourceId": "KafkaFunctionMyKafkaCluster",
8+
"ResourceType": "AWS::Lambda::EventSourceMapping"
9+
},
10+
{
11+
"LogicalResourceId": "KafkaFunctionRole",
12+
"ResourceType": "AWS::IAM::Role"
13+
},
14+
{
15+
"LogicalResourceId": "KafkaUserSecret",
16+
"ResourceType": "AWS::SecretsManager::Secret"
17+
}
18+
]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[
2+
{
3+
"LogicalResourceId": "KafkaFunctionWithIntrinsics",
4+
"ResourceType": "AWS::Lambda::Function"
5+
},
6+
{
7+
"LogicalResourceId": "KafkaFunctionWithIntrinsicsMyKafkaClusterWithIntrinsics",
8+
"ResourceType": "AWS::Lambda::EventSourceMapping"
9+
},
10+
{
11+
"LogicalResourceId": "KafkaFunctionWithIntrinsicsRole",
12+
"ResourceType": "AWS::IAM::Role"
13+
},
14+
{
15+
"LogicalResourceId": "KafkaUserSecret",
16+
"ResourceType": "AWS::SecretsManager::Secret"
17+
}
18+
]
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Resources:
2+
KafkaFunction:
3+
Type: AWS::Serverless::Function
4+
Properties:
5+
Handler: index.handler
6+
Runtime: nodejs12.x
7+
CodeUri: ${codeuri}
8+
MemorySize: 128
9+
Events:
10+
MyKafkaCluster:
11+
Type: SelfManagedKafka
12+
Properties:
13+
KafkaBootstrapServers:
14+
- abc.xyz.com:9092
15+
- 123.45.67.89:9096
16+
Topics:
17+
- Topic1
18+
SourceAccessConfigurations:
19+
- Type: BASIC_AUTH
20+
URI:
21+
Ref: KafkaUserSecret
22+
23+
KafkaUserSecret:
24+
Type: AWS::SecretsManager::Secret
25+
Properties:
26+
Name: KafkaUserPassword
27+
SecretString:
28+
Fn::Sub: '{"username":"testBrokerUser","password":"testBrokerPassword"}'
29+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
Parameters:
2+
TopicsValue:
3+
Type: CommaDelimitedList
4+
Default: Topic
5+
6+
KafkaBootstrapServersValue:
7+
Type: String
8+
Default: abc.xyz.com:9092
9+
10+
Resources:
11+
KafkaFunctionWithIntrinsics:
12+
Type: AWS::Serverless::Function
13+
Properties:
14+
Handler: index.handler
15+
Runtime: nodejs12.x
16+
CodeUri: ${codeuri}
17+
MemorySize: 128
18+
Events:
19+
MyKafkaClusterWithIntrinsics:
20+
Type: SelfManagedKafka
21+
Properties:
22+
KafkaBootstrapServers:
23+
- Ref: KafkaBootstrapServersValue
24+
Topics:
25+
Ref: TopicsValue
26+
SourceAccessConfigurations:
27+
- Type: BASIC_AUTH
28+
URI:
29+
Ref: KafkaUserSecret
30+
31+
KafkaUserSecret:
32+
Type: AWS::SecretsManager::Secret
33+
Properties:
34+
Name: KafkaUserPassword
35+
SecretString:
36+
Fn::Sub: '{"username":"testBrokerUserWithInstrinsic","password":"testBrokerPasswordWithInstrinsic"}'
37+

samtranslator/model/eventsources/pull.py

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
from six import string_types
12
from samtranslator.metrics.method_decorator import cw_timer
23
from samtranslator.model import ResourceMacro, PropertyType
34
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
4-
from samtranslator.model.types import is_type, is_str
5+
from samtranslator.model.types import is_type, is_str, list_of
6+
from samtranslator.model.intrinsics import is_intrinsic
57

68
from samtranslator.model.lambda_ import LambdaEventSourceMapping
79
from samtranslator.translator.arn_generator import ArnGenerator
@@ -20,6 +22,7 @@ class PullEventSource(ResourceMacro):
2022
"""
2123

2224
resource_type = None
25+
requires_stream_queue_broker = True
2326
property_types = {
2427
"Stream": PropertyType(False, is_str()),
2528
"Queue": PropertyType(False, is_str()),
@@ -39,6 +42,7 @@ class PullEventSource(ResourceMacro):
3942
"SecretsManagerKmsKeyId": PropertyType(False, is_str()),
4043
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
4144
"FunctionResponseTypes": PropertyType(False, is_type(list)),
45+
"KafkaBootstrapServers": PropertyType(False, is_type(list)),
4246
}
4347

4448
def get_policy_arn(self):
@@ -74,7 +78,7 @@ def to_cloudformation(self, **kwargs):
7478
except NotImplementedError:
7579
function_name_or_arn = function.get_runtime_attr("arn")
7680

77-
if not self.Stream and not self.Queue and not self.Broker:
81+
if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker:
7882
raise InvalidEventException(
7983
self.relative_id,
8084
"No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
@@ -99,6 +103,11 @@ def to_cloudformation(self, **kwargs):
99103
lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
100104
lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes
101105

106+
if self.KafkaBootstrapServers:
107+
lambda_eventsourcemapping.SelfManagedEventSource = {
108+
"Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers}
109+
}
110+
102111
destination_config_policy = None
103112
if self.DestinationConfig:
104113
# `Type` property is for sam to attach the right policies
@@ -286,3 +295,149 @@ def get_policy_statements(self):
286295
}
287296
document["PolicyDocument"]["Statement"].append(kms_policy)
288297
return [document]
298+
299+
300+
class SelfManagedKafka(PullEventSource):
301+
"""
302+
SelfManagedKafka event source
303+
"""
304+
305+
resource_type = "SelfManagedKafka"
306+
requires_stream_queue_broker = False
307+
AUTH_MECHANISM = ["SASL_SCRAM_256_AUTH", "SASL_SCRAM_512_AUTH", "BASIC_AUTH"]
308+
309+
def get_policy_arn(self):
310+
return None
311+
312+
def get_policy_statements(self):
313+
if not self.KafkaBootstrapServers:
314+
raise InvalidEventException(
315+
self.relative_id,
316+
"No KafkaBootstrapServers provided for self managed kafka as an event source",
317+
)
318+
319+
if not self.Topics:
320+
raise InvalidEventException(
321+
self.relative_id,
322+
"No Topics provided for self managed kafka as an event source",
323+
)
324+
325+
if len(self.Topics) != 1:
326+
raise InvalidEventException(
327+
self.relative_id,
328+
"Topics for self managed kafka only supports single configuration entry.",
329+
)
330+
331+
if not self.SourceAccessConfigurations:
332+
raise InvalidEventException(
333+
self.relative_id,
334+
"No SourceAccessConfigurations for self managed kafka event provided.",
335+
)
336+
document = self.generate_policy_document()
337+
return [document]
338+
339+
def generate_policy_document(self):
340+
statements = []
341+
authentication_uri, has_vpc_config = self.get_secret_key()
342+
if authentication_uri:
343+
secret_manager = self.get_secret_manager_secret(authentication_uri)
344+
statements.append(secret_manager)
345+
346+
if has_vpc_config:
347+
vpc_permissions = self.get_vpc_permission()
348+
statements.append(vpc_permissions)
349+
350+
if self.SecretsManagerKmsKeyId:
351+
kms_policy = self.get_kms_policy()
352+
statements.append(kms_policy)
353+
354+
document = {
355+
"PolicyDocument": {
356+
"Statement": statements,
357+
"Version": "2012-10-17",
358+
},
359+
"PolicyName": "SelfManagedKafkaExecutionRolePolicy",
360+
}
361+
362+
return document
363+
364+
def get_secret_key(self):
365+
authentication_uri = None
366+
has_vpc_subnet = False
367+
has_vpc_security_group = False
368+
for config in self.SourceAccessConfigurations:
369+
if config.get("Type") == "VPC_SUBNET":
370+
self.validate_uri(config, "VPC_SUBNET")
371+
has_vpc_subnet = True
372+
373+
elif config.get("Type") == "VPC_SECURITY_GROUP":
374+
self.validate_uri(config, "VPC_SECURITY_GROUP")
375+
has_vpc_security_group = True
376+
377+
elif config.get("Type") in self.AUTH_MECHANISM:
378+
if authentication_uri:
379+
raise InvalidEventException(
380+
self.relative_id,
381+
"Multiple auth mechanism properties specified in SourceAccessConfigurations for self managed kafka event.",
382+
)
383+
self.validate_uri(config, "auth mechanism")
384+
authentication_uri = config.get("URI")
385+
386+
else:
387+
raise InvalidEventException(
388+
self.relative_id,
389+
"Invalid SourceAccessConfigurations Type provided for self managed kafka event.",
390+
)
391+
392+
if (not has_vpc_subnet and has_vpc_security_group) or (has_vpc_subnet and not has_vpc_security_group):
393+
raise InvalidEventException(
394+
self.relative_id,
395+
"VPC_SUBNET and VPC_SECURITY_GROUP in SourceAccessConfigurations for SelfManagedKafka must be both provided.",
396+
)
397+
return authentication_uri, (has_vpc_subnet and has_vpc_security_group)
398+
399+
def validate_uri(self, config, msg):
400+
if not config.get("URI"):
401+
raise InvalidEventException(
402+
self.relative_id,
403+
"No {} URI property specified in SourceAccessConfigurations for self managed kafka event.".format(msg),
404+
)
405+
406+
if not isinstance(config.get("URI"), string_types) and not is_intrinsic(config.get("URI")):
407+
raise InvalidEventException(
408+
self.relative_id,
409+
"Wrong Type for {} URI property specified in SourceAccessConfigurations for self managed kafka event.".format(
410+
msg
411+
),
412+
)
413+
414+
def get_secret_manager_secret(self, authentication_uri):
415+
return {
416+
"Action": ["secretsmanager:GetSecretValue"],
417+
"Effect": "Allow",
418+
"Resource": authentication_uri,
419+
}
420+
421+
def get_vpc_permission(self):
422+
return {
423+
"Action": [
424+
"ec2:CreateNetworkInterface",
425+
"ec2:DescribeNetworkInterfaces",
426+
"ec2:DeleteNetworkInterface",
427+
"ec2:DescribeVpcs",
428+
"ec2:DescribeSubnets",
429+
"ec2:DescribeSecurityGroups",
430+
],
431+
"Effect": "Allow",
432+
"Resource": "*",
433+
}
434+
435+
def get_kms_policy(self):
436+
return {
437+
"Action": ["kms:Decrypt"],
438+
"Effect": "Allow",
439+
"Resource": {
440+
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
441+
+ self.SecretsManagerKmsKeyId
442+
},
443+
}

samtranslator/model/lambda_.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class LambdaEventSourceMapping(Resource):
6464
property_types = {
6565
"BatchSize": PropertyType(False, is_type(int)),
6666
"Enabled": PropertyType(False, is_type(bool)),
67-
"EventSourceArn": PropertyType(True, is_str()),
67+
"EventSourceArn": PropertyType(False, is_str()),
6868
"FunctionName": PropertyType(True, is_str()),
6969
"MaximumBatchingWindowInSeconds": PropertyType(False, is_type(int)),
7070
"MaximumRetryAttempts": PropertyType(False, is_type(int)),
@@ -78,6 +78,7 @@ class LambdaEventSourceMapping(Resource):
7878
"SourceAccessConfigurations": PropertyType(False, is_type(list)),
7979
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
8080
"FunctionResponseTypes": PropertyType(False, is_type(list)),
81+
"SelfManagedEventSource": PropertyType(False, is_type(dict)),
8182
}
8283

8384
runtime_attrs = {"name": lambda self: ref(self.logical_id)}

0 commit comments

Comments
 (0)