22import zlib
33from typing import Dict , List , Literal , Type , Union
44
5- from pydantic import BaseModel , field_validator
5+ from pydantic import BaseModel , Field , field_validator
66
77from aws_lambda_powertools .shared .functions import base64_decode
88from aws_lambda_powertools .utilities .parser .models .cloudwatch import (
1111
1212
1313class KinesisDataStreamRecordPayload (BaseModel ):
14- kinesisSchemaVersion : str
15- partitionKey : str
16- sequenceNumber : str
17- data : Union [bytes , Type [BaseModel ], BaseModel ] # base64 encoded str is parsed into bytes
18- approximateArrivalTimestamp : float
14+ kinesisSchemaVersion : str = Field (
15+ description = "The version of the Kinesis Data Streams record format." ,
16+ examples = ["1.0" ],
17+ )
18+ partitionKey : str = Field (
19+ description = "The partition key that was used to place the record in the stream." ,
20+ examples = ["user123" , "device-001" , "order-12345" ],
21+ )
22+ sequenceNumber : str = Field (
23+ description = "The unique sequence number for the record within the shard." ,
24+ examples = [
25+ "49590338271490256608559692538361571095921575989136588898" ,
26+ "49545115243490985018280067714973144582180062593244200961" ,
27+ ],
28+ )
29+ data : Union [bytes , Type [BaseModel ], BaseModel ] = Field ( # base64 encoded str is parsed into bytes
30+ description = "The data payload of the record. Base64 encoded string is automatically decoded to bytes." ,
31+ )
32+ approximateArrivalTimestamp : float = Field (
33+ description = "The approximate time that the record was inserted into the stream (Unix timestamp)." ,
34+ examples = [1428537600.0 , 1609459200.5 ],
35+ )
1936
2037 @field_validator ("data" , mode = "before" )
2138 def data_base64_decode (cls , value ):
2239 return base64_decode (value )
2340
2441
2542class KinesisDataStreamRecord (BaseModel ):
26- eventSource : Literal ["aws:kinesis" ]
27- eventVersion : str
28- eventID : str
29- eventName : Literal ["aws:kinesis:record" ]
30- invokeIdentityArn : str
31- awsRegion : str
32- eventSourceARN : str
33- kinesis : KinesisDataStreamRecordPayload
43+ eventSource : Literal ["aws:kinesis" ] = Field (
44+ description = "The AWS service that generated the event." ,
45+ examples = ["aws:kinesis" ],
46+ )
47+ eventVersion : str = Field (description = "The version of the event schema." , examples = ["1.0" ])
48+ eventID : str = Field (
49+ description = "A unique identifier for the event." ,
50+ examples = ["shardId-000000000006:49590338271490256608559692538361571095921575989136588898" ],
51+ )
52+ eventName : Literal ["aws:kinesis:record" ] = Field (
53+ description = "The name of the event type." ,
54+ examples = ["aws:kinesis:record" ],
55+ )
56+ invokeIdentityArn : str = Field (
57+ description = "The ARN of the IAM role used to invoke the Lambda function." ,
58+ examples = ["arn:aws:iam::123456789012:role/lambda-kinesis-role" ],
59+ )
60+ awsRegion : str = Field (
61+ description = "The AWS region where the Kinesis stream is located." ,
62+ examples = ["us-east-1" , "us-west-2" , "eu-west-1" ],
63+ )
64+ eventSourceARN : str = Field (
65+ description = "The ARN of the Kinesis stream that generated the event." ,
66+ examples = ["arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" ],
67+ )
68+ kinesis : KinesisDataStreamRecordPayload = Field (description = "The Kinesis-specific data for the record." )
3469
3570 def decompress_zlib_record_data_as_json (self ) -> Dict :
3671 """Decompress Kinesis Record bytes data zlib compressed to JSON"""
@@ -41,7 +76,10 @@ def decompress_zlib_record_data_as_json(self) -> Dict:
4176
4277
4378class KinesisDataStreamModel (BaseModel ):
44- Records : List [KinesisDataStreamRecord ]
79+ Records : List [KinesisDataStreamRecord ] = Field (
80+ description = "A list of Kinesis Data Stream records that triggered the Lambda function." ,
81+ examples = [[]],
82+ )
4583
4684
4785def extract_cloudwatch_logs_from_event (event : KinesisDataStreamModel ) -> List [CloudWatchLogsDecode ]:
0 commit comments