Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,13 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
managed_policy_map = kwargs.get("managed_policy_map", {})
get_managed_policy_map = kwargs.get("get_managed_policy_map")

execution_role = None
if lambda_function.Role is None:
execution_role = self._construct_role(
managed_policy_map,
event_invoke_policies,
intrinsics_resolver,
get_managed_policy_map,
)
lambda_function.Role = execution_role.get_runtime_attr("arn")
resources.append(execution_role)
execution_role = self._construct_role(
managed_policy_map,
event_invoke_policies,
intrinsics_resolver,
get_managed_policy_map,
)
self._make_lambda_role(lambda_function, intrinsics_resolver, execution_role, resources)

try:
resources += self._generate_event_resources(
Expand All @@ -374,6 +371,42 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P

return resources

def _make_lambda_role(
self,
lambda_function: LambdaFunction,
intrinsics_resolver: IntrinsicsResolver,
execution_role: IAMRole,
resources: List[Any],
) -> None:
lambda_role = lambda_function.Role

if lambda_role is None:
resources.append(execution_role)
lambda_function.Role = execution_role.get_runtime_attr("arn")

if is_intrinsic_if(lambda_role):
resources.append(execution_role)

# We need to create and if else condition here
role_resolved_value = intrinsics_resolver.resolve_parameter_refs(self.Role)
role_list = role_resolved_value.get("Fn::If")

# both are none values then we need to create a role
if is_intrinsic_no_value(role_list[1]) and is_intrinsic_no_value(role_list[2]):
lambda_function.Role = execution_role.get_runtime_attr("arn")

# first value is none so we should create condition ? create : [2]
elif is_intrinsic_no_value(role_list[1]):
lambda_function.Role = make_conditional(
role_list[0], execution_role.get_runtime_attr("arn"), role_list[2]
)

# second value is none so we should create condition ? [1] : create
elif is_intrinsic_no_value(role_list[2]):
lambda_function.Role = make_conditional(
role_list[0], role_list[1], execution_role.get_runtime_attr("arn")
)

def _construct_event_invoke_config( # noqa: PLR0913
self,
function_name: str,
Expand Down
112 changes: 112 additions & 0 deletions tests/model/test_sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,3 +738,115 @@ def test_function_datasource_set_with_none():
api = SamGraphQLApi("MyApi")
none_datasource = api._construct_none_datasource("foo")
assert none_datasource


class TestSamFunctionRoleResolver(TestCase):
"""
Tests for resolving IAM role property values in SamFunction
"""

def setUp(self):
self.function = SamFunction("foo")
self.function.CodeUri = "s3://foobar/foo.zip"
self.function.Runtime = "foo"
self.function.Handler = "bar"
self.template = {"Conditions": {}}

self.kwargs = {
"intrinsics_resolver": IntrinsicsResolver({}),
"event_resources": [],
"managed_policy_map": {},
"resource_resolver": ResourceResolver({}),
"conditions": self.template.get("Conditions", {}),
}

def test_role_none_creates_execution_role(self):
self.function.Role = None
cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]

self.assertEqual(len(generated_roles), 1) # Should create execution role

def test_role_explicit_arn_no_execution_role(self):
test_role = "arn:aws:iam::123456789012:role/existing-role"
self.function.Role = test_role

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 0) # Should not create execution role
self.assertEqual(lambda_function.Role, test_role)

def test_role_fn_if_no_aws_no_value_keeps_original(self):
role_conditional = {
"Fn::If": ["Condition", "arn:aws:iam::123456789012:role/existing-role", {"Ref": "iamRoleArn"}]
}
self.function.Role = role_conditional

template = {"Conditions": {"Condition": True}}
kwargs = dict(self.kwargs)
kwargs["conditions"] = template.get("Conditions", {})

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 1)
self.assertEqual(lambda_function.Role, role_conditional)

def test_role_fn_if_both_no_value_creates_execution_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "AWS::NoValue"}]}
self.function.Role = role_conditional

template = {"Conditions": {"Condition": True}}
kwargs = dict(self.kwargs)
kwargs["conditions"] = template.get("Conditions", {})

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]

self.assertEqual(len(generated_roles), 1)

def test_role_fn_if_first_no_value_creates_conditional_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "AWS::NoValue"}, {"Ref": "iamRoleArn"}]}
self.function.Role = role_conditional

template = {"Conditions": {"Condition": True}}
kwargs = dict(self.kwargs)
kwargs["conditions"] = template.get("Conditions", {})

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 1)
self.assertEqual(
lambda_function.Role, {"Fn::If": ["Condition", {"Fn::GetAtt": ["fooRole", "Arn"]}, {"Ref": "iamRoleArn"}]}
)

def test_role_fn_if_second_no_value_creates_conditional_role(self):
role_conditional = {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Ref": "AWS::NoValue"}]}
self.function.Role = role_conditional

template = {"Conditions": {"Condition": True}}
kwargs = dict(self.kwargs)
kwargs["conditions"] = template.get("Conditions", {})

cfn_resources = self.function.to_cloudformation(**self.kwargs)
generated_roles = [x for x in cfn_resources if isinstance(x, IAMRole)]
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(len(generated_roles), 1)
self.assertEqual(
lambda_function.Role, {"Fn::If": ["Condition", {"Ref": "iamRoleArn"}, {"Fn::GetAtt": ["fooRole", "Arn"]}]}
)

def test_role_get_att_no_execution_role(self):
role_get_att = {"Fn::GetAtt": ["MyCustomRole", "Arn"]}
self.function.Role = role_get_att

cfn_resources = self.function.to_cloudformation(**self.kwargs)
lambda_function = next(r for r in cfn_resources if r.resource_type == "AWS::Lambda::Function")

self.assertEqual(lambda_function.Role, role_get_att)