11from datetime import datetime
22from typing import Dict , List , Literal , Optional , Type , Union
33
4- from pydantic import BaseModel , field_validator
4+ from pydantic import BaseModel , Field , field_validator
55
66from aws_lambda_powertools .shared .functions import base64_decode , bytes_to_string , decode_header_bytes
77
88SERVERS_DELIMITER = ","
99
1010
class KafkaRecordSchemaMetadata(BaseModel):
    """Schema-registry metadata attached to a Kafka record's key or value.

    Present on a record only when the producer registered the payload's schema
    with a schema registry; see the optional ``keySchemaMetadata`` /
    ``valueSchemaMetadata`` fields on the record model.
    """

    # Serialization format of the registered schema.
    dataFormat: str = Field(
        examples=["AVRO", "JSON", "PROTOBUF"],
        description="The data format of the schema (e.g., AVRO, JSON).",
    )
    # Registry-assigned identifier used to look the schema up.
    schemaId: str = Field(
        examples=["1234", "5678", "schema-abc-123"],
        description="The unique identifier of the schema.",
    )
1420
1521
1622class KafkaRecordModel (BaseModel ):
17- topic : str
18- partition : int
19- offset : int
20- timestamp : datetime
21- timestampType : str
22- key : Optional [bytes ] = None
23- value : Union [str , Type [BaseModel ]]
24- headers : List [Dict [str , bytes ]]
25- keySchemaMetadata : Optional [KafkaRecordSchemaMetadata ] = None
26- valueSchemaMetadata : Optional [KafkaRecordSchemaMetadata ] = None
23+ topic : str = Field (
24+ description = "The Kafka topic name from which the record originated." ,
25+ examples = ["mytopic" , "user-events" , "order-processing" , "mymessage-with-unsigned" ],
26+ )
27+ partition : int = Field (
28+ description = "The partition number within the topic from which the record was consumed." ,
29+ examples = [0 , 1 , 5 , 10 ],
30+ )
31+ offset : int = Field (
32+ description = "The offset of the record within the partition." ,
33+ examples = [15 , 123 , 456789 , 1000000 ],
34+ )
35+ timestamp : datetime = Field (
36+ description = "The timestamp of the record." ,
37+ examples = [1545084650987 , 1640995200000 , 1672531200000 ],
38+ )
39+ timestampType : str = Field (
40+ description = "The type of timestamp (CREATE_TIME or LOG_APPEND_TIME)." ,
41+ examples = ["CREATE_TIME" , "LOG_APPEND_TIME" ],
42+ )
43+ key : Optional [bytes ] = Field (
44+ default = None ,
45+ description = "The message key, base64-encoded. Can be null for messages without keys." ,
46+ examples = ["cmVjb3JkS2V5" , "dXNlci0xMjM=" , "b3JkZXItNDU2" , None ],
47+ )
48+ value : Union [str , Type [BaseModel ]] = Field (
49+ description = "The message value, base64-encoded." ,
50+ examples = [
51+ "eyJrZXkiOiJ2YWx1ZSJ9" ,
52+ "eyJtZXNzYWdlIjogIkhlbGxvIEthZmthIn0=" ,
53+ "eyJ1c2VyX2lkIjogMTIzLCAiYWN0aW9uIjogImxvZ2luIn0=" ,
54+ ],
55+ )
56+ headers : List [Dict [str , bytes ]] = Field (
57+ description = "A list of message headers as key-value pairs with byte array values." ,
58+ examples = [
59+ [{"headerKey" : [104 , 101 , 97 , 100 , 101 , 114 , 86 , 97 , 108 , 117 , 101 ]}],
60+ [{"contentType" : [97 , 112 , 112 , 108 , 105 , 99 , 97 , 116 , 105 , 111 , 110 , 47 , 106 , 115 , 111 , 110 ]}],
61+ [],
62+ ],
63+ )
64+ keySchemaMetadata : Optional [KafkaRecordSchemaMetadata ] = Field (
65+ default = None ,
66+ description = "Schema metadata for the message key when using schema registry." ,
67+ examples = [{"dataFormat" : "AVRO" , "schemaId" : "1234" }, None ],
68+ )
69+ valueSchemaMetadata : Optional [KafkaRecordSchemaMetadata ] = Field (
70+ default = None ,
71+ description = "Schema metadata for the message value when using schema registry." ,
72+ examples = [{"dataFormat" : "AVRO" , "schemaId" : "1234" }, None ],
73+ )
2774
2875 # key is optional; only decode if not None
2976 @field_validator ("key" , mode = "before" )
@@ -44,8 +91,23 @@ def decode_headers_list(cls, value):
4491
4592
4693class KafkaBaseEventModel (BaseModel ):
47- bootstrapServers : List [str ]
48- records : Dict [str , List [KafkaRecordModel ]]
94+ bootstrapServers : List [str ] = Field (
95+ description = "A list of Kafka bootstrap servers (broker endpoints)." ,
96+ examples = [
97+ ["b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" ],
98+ [
99+ "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" ,
100+ "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092" ,
101+ ],
102+ ],
103+ )
104+ records : Dict [str , List [KafkaRecordModel ]] = Field (
105+ description = "A dictionary mapping topic-partition combinations to lists of Kafka records." ,
106+ examples = [
107+ {"mytopic-0" : [{"topic" : "mytopic" , "partition" : 0 , "offset" : 15 }]},
108+ {"user-events-1" : [{"topic" : "user-events" , "partition" : 1 , "offset" : 123 }]},
109+ ],
110+ )
49111
50112 @field_validator ("bootstrapServers" , mode = "before" )
51113 def split_servers (cls , value ):
@@ -59,7 +121,10 @@ class KafkaSelfManagedEventModel(KafkaBaseEventModel):
59121 - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
60122 """
61123
62- eventSource : Literal ["SelfManagedKafka" ]
124+ eventSource : Literal ["SelfManagedKafka" ] = Field (
125+ description = "The event source identifier for self-managed Kafka." ,
126+ examples = ["SelfManagedKafka" ],
127+ )
63128
64129
65130class KafkaMskEventModel (KafkaBaseEventModel ):
@@ -69,5 +134,14 @@ class KafkaMskEventModel(KafkaBaseEventModel):
69134 - https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
70135 """
71136
72- eventSource : Literal ["aws:kafka" ]
73- eventSourceArn : str
137+ eventSource : Literal ["aws:kafka" ] = Field (
138+ description = "The AWS service that invoked the function." ,
139+ examples = ["aws:kafka" ],
140+ )
141+ eventSourceArn : str = Field (
142+ description = "The Amazon Resource Name (ARN) of the MSK cluster." ,
143+ examples = [
144+ "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4" ,
145+ "arn:aws:kafka:eu-central-1:123456789012:cluster/MyCluster/xyz789-1234-5678-90ab-cdef12345678-2" ,
146+ ],
147+ )
0 commit comments