|
1 | | -import datetime |
2 | | -import sys |
3 | | -import uuid |
4 | | -from dataclasses import dataclass |
5 | | - |
6 | | -import boto3 |
7 | | - |
8 | | -from tests.e2e.utils import data_fetcher, infrastructure |
9 | | - |
10 | | -# We only need typing_extensions for python versions <3.8 |
11 | | -if sys.version_info >= (3, 8): |
12 | | - from typing import TypedDict |
13 | | -else: |
14 | | - from typing_extensions import TypedDict |
15 | | - |
16 | | -from typing import Dict, Generator, Optional |
17 | | - |
18 | 1 | import pytest |
19 | 2 |
|
20 | | - |
21 | | -class LambdaConfig(TypedDict): |
22 | | - parameters: dict |
23 | | - environment_variables: Dict[str, str] |
24 | | - |
25 | | - |
26 | | -@dataclass |
27 | | -class InfrastructureOutput: |
28 | | - arns: Dict[str, str] |
29 | | - execution_time: datetime.datetime |
30 | | - |
31 | | - def get_lambda_arns(self) -> Dict[str, str]: |
32 | | - return self.arns |
33 | | - |
34 | | - def get_lambda_function_arn(self, cf_output_name: str) -> Optional[str]: |
35 | | - return self.arns.get(cf_output_name) |
36 | | - |
37 | | - def get_lambda_function_name(self, cf_output_name: str) -> Optional[str]: |
38 | | - lambda_arn = self.get_lambda_function_arn(cf_output_name=cf_output_name) |
39 | | - return lambda_arn.split(":")[-1] if lambda_arn else None |
40 | | - |
41 | | - def get_lambda_execution_time(self) -> datetime.datetime: |
42 | | - return self.execution_time |
43 | | - |
44 | | - def get_lambda_execution_time_timestamp(self) -> int: |
45 | | - return int(self.execution_time.timestamp() * 1000) |
46 | | - |
47 | | - |
48 | | -@pytest.fixture(scope="module") |
49 | | -def create_infrastructure(config, request) -> Generator[Dict[str, str], None, None]: |
50 | | - stack_name = f"test-lambda-{uuid.uuid4()}" |
51 | | - test_dir = request.fspath.dirname |
52 | | - handlers_dir = f"{test_dir}/handlers/" |
53 | | - |
54 | | - infra = infrastructure.Infrastructure(stack_name=stack_name, handlers_dir=handlers_dir, config=config) |
55 | | - yield infra.deploy(Stack=infrastructure.InfrastructureStack) |
56 | | - infra.delete() |
57 | | - |
58 | | - |
59 | | -@pytest.fixture(scope="module") |
60 | | -def execute_lambda(create_infrastructure) -> InfrastructureOutput: |
61 | | - execution_time = datetime.datetime.utcnow() |
62 | | - session = boto3.Session() |
63 | | - client = session.client("lambda") |
64 | | - for _, arn in create_infrastructure.items(): |
65 | | - data_fetcher.get_lambda_response(lambda_arn=arn, client=client) |
66 | | - return InfrastructureOutput(arns=create_infrastructure, execution_time=execution_time) |
| 3 | +from tests.e2e.utils.infrastructure import LambdaLayerStack, deploy_once |
| 4 | + |
| 5 | + |
| 6 | +@pytest.fixture(scope="session") |
| 7 | +def lambda_layer_arn(lambda_layer_deployment): |
| 8 | + yield lambda_layer_deployment.get("LayerArn") |
| 9 | + |
| 10 | + |
| 11 | +@pytest.fixture(scope="session") |
| 12 | +def lambda_layer_deployment(request: pytest.FixtureRequest, tmp_path_factory: pytest.TempPathFactory, worker_id: str): |
| 13 | + """Setup and teardown logic for E2E test infrastructure |
| 14 | +
|
| 15 | + Parameters |
| 16 | + ---------- |
| 17 | + request : pytest.FixtureRequest |
| 18 | + pytest request fixture to introspect absolute path to test being executed |
| 19 | + tmp_path_factory : pytest.TempPathFactory |
| 20 | + pytest temporary path factory to discover shared tmp when multiple CPU processes are spun up |
| 21 | + worker_id : str |
| 22 | + pytest-xdist worker identification to detect whether parallelization is enabled |
| 23 | +
|
| 24 | + Yields |
| 25 | + ------ |
| 26 | + Dict[str, str] |
| 27 | + CloudFormation Outputs from deployed infrastructure |
| 28 | + """ |
| 29 | + yield from deploy_once( |
| 30 | + stack=LambdaLayerStack, |
| 31 | + request=request, |
| 32 | + tmp_path_factory=tmp_path_factory, |
| 33 | + worker_id=worker_id, |
| 34 | + layer_arn="", |
| 35 | + ) |
0 commit comments