@@ -2,14 +2,19 @@ import type { AsyncHandler } from '@aws-lambda-powertools/commons/types';
22import { isNull , isRecord } from '@aws-lambda-powertools/commons/typeutils' ;
33import type { StandardSchemaV1 } from '@standard-schema/spec' ;
44import type { Context , Handler } from 'aws-lambda' ;
5+ import { deserialize as deserializeJson } from './deserializer/json.js' ;
6+ import { deserialize as deserializePrimitive } from './deserializer/primitive.js' ;
57import {
68 KafkaConsumerAvroMissingSchemaError ,
9+ KafkaConsumerDeserializationError ,
10+ KafkaConsumerError ,
711 KafkaConsumerParserError ,
812 KafkaConsumerProtobufMissingSchemaError ,
913} from './errors.js' ;
1014import type {
1115 ConsumerRecord ,
1216 ConsumerRecords ,
17+ Deserializer ,
1318 Record as KafkaRecord ,
1419 MSKEvent ,
1520 SchemaConfig ,
@@ -27,7 +32,7 @@ const assertIsMSKEvent = (event: unknown): event is MSKEvent => {
2732 ! isRecord ( event . records ) ||
2833 ! Object . values ( event . records ) . every ( ( arr ) => Array . isArray ( arr ) )
2934 ) {
30- throw new Error (
35+ throw new KafkaConsumerError (
3136 'Event is not a valid MSKEvent. Expected an object with a "records" property.'
3237 ) ;
3338 }
@@ -69,69 +74,80 @@ const deserializeHeaders = (headers: Record<string, number[]>[] | null) => {
6974 * @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}.
7075 * If not provided, the value is decoded as a UTF-8 string.
7176 */
72- const deserialize = async ( value : string , config ?: SchemaConfigValue ) => {
73- // no config -> default to base64 decoding
77+ const deserialize = (
78+ value : string ,
79+ deserializer : Deserializer ,
80+ config ?: SchemaConfigValue
81+ ) => {
7482 if ( config === undefined ) {
75- return Buffer . from ( value , 'base64' ) . toString ( ) ;
83+ return deserializer ( value ) ;
7684 }
77-
78- // if config is provided, we expect it to have a specific type
79- if ( ! [ 'json' , 'avro' , 'protobuf' ] . includes ( config . type ) ) {
80- throw new Error (
81- `Unsupported deserialization type: ${ config . type } . Supported types are: json, avro, protobuf.`
82- ) ;
83- }
84-
8585 if ( config . type === 'json' ) {
86- const deserializer = await import ( './deserializer/json.js' ) ;
87- return deserializer . deserialize ( value ) ;
86+ return deserializer ( value ) ;
8887 }
8988
9089 if ( config . type === 'avro' ) {
9190 if ( ! config . schema ) {
9291 throw new KafkaConsumerAvroMissingSchemaError (
93- 'Schema string is required for Avro deserialization'
92+ 'Schema string is required for avro deserialization'
9493 ) ;
9594 }
96- const deserializer = await import ( './deserializer/avro.js' ) ;
97- return deserializer . deserialize ( value , config . schema ) ;
95+ return deserializer ( value , config . schema ) ;
9896 }
9997 if ( config . type === 'protobuf' ) {
10098 if ( ! config . schema ) {
10199 throw new KafkaConsumerProtobufMissingSchemaError (
102- 'Schema string is required for Protobuf deserialization'
100+ 'Schema string is required for protobuf deserialization'
103101 ) ;
104102 }
105- const deserializer = await import ( './deserializer/protobuf.js' ) ;
106- return deserializer . deserialize ( value , config . schema ) ;
103+ return deserializer ( value , config . schema ) ;
107104 }
108105} ;
109106
110107/**
111- * Deserialize the key of a Kafka record .
108+ * Get the deserializer function based on the provided type .
112109 *
113- * If the key is `undefined`, it returns `undefined`.
114- *
115- * @param key - The base64-encoded key to deserialize.
116- * @param config - The schema configuration for deserializing the key. See {@link SchemaConfigValue | `SchemaConfigValue`}.
110+ * @param type - The type of deserializer to use. Supported types are: `json`, `avro`, `protobuf`, or `undefined`.
111+ * If `undefined`, it defaults to deserializing as a primitive string.
117112 */
118- const deserializeKey = async ( key ?: string , config ?: SchemaConfigValue ) => {
119- if ( key === undefined || key === '' ) {
120- return undefined ;
113+ const getDeserializer = async ( type ?: string ) => {
114+ if ( ! type ) {
115+ return deserializePrimitive as Deserializer ;
116+ }
117+ if ( type === 'json' ) {
118+ return deserializeJson as Deserializer ;
119+ }
120+ if ( type === 'protobuf' ) {
121+ const deserializer = await import ( './deserializer/protobuf.js' ) ;
122+ return deserializer . deserialize as Deserializer ;
123+ }
124+ if ( type === 'avro' ) {
125+ const deserializer = await import ( './deserializer/avro.js' ) ;
126+ return deserializer . deserialize as Deserializer ;
121127 }
122- if ( isNull ( key ) ) return null ;
123- return await deserialize ( key , config ) ;
128+ throw new KafkaConsumerDeserializationError (
129+ `Unsupported deserialization type: ${ type } . Supported types are: json, avro, protobuf.`
130+ ) ;
124131} ;
125132
126- const parseSchema = async ( value : unknown , schema : StandardSchemaV1 ) => {
127- let result = schema [ '~standard' ] . validate ( value ) ;
133+ /**
134+ * Parse a value against a provided schema using the `~standard` property for validation.
135+ *
136+ * @param value - The value to parse against the schema.
137+ * @param schema - The schema to validate against, which should be a {@link StandardSchemaV1 | `Standard Schema V1`} object.
138+ */
139+ const parseSchema = ( value : unknown , schema : StandardSchemaV1 ) => {
140+ const result = schema [ '~standard' ] . validate ( value ) ;
128141 /* v8 ignore start */
129- if ( result instanceof Promise ) result = await result ;
130- /* v8 ignore stop */
131- if ( result . issues ) {
142+ if ( result instanceof Promise )
132143 throw new KafkaConsumerParserError (
133- ` Schema validation failed ${ result . issues } `
144+ ' Schema parsing supports only synchronous validation'
134145 ) ;
146+ /* v8 ignore stop */
147+ if ( result . issues ) {
148+ throw new KafkaConsumerParserError ( 'Schema validation failed' , {
149+ cause : result . issues ,
150+ } ) ;
135151 }
136152 return result . value ;
137153} ;
@@ -142,24 +158,45 @@ const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
142158 * @param record - A single record from the MSK event.
143159 * @param config - The schema configuration for deserializing the record's key and value.
144160 */
145- const deserializeRecord = async ( record : KafkaRecord , config : SchemaConfig ) => {
161+ const deserializeRecord = async (
162+ record : KafkaRecord ,
163+ config ?: SchemaConfig
164+ ) => {
146165 const { key, value, headers, ...rest } = record ;
147- const { key : keyConfig , value : valueConfig } = config ;
166+ const { key : keyConfig , value : valueConfig } = config || { } ;
148167
149- const deserializedKey = await deserializeKey ( key , keyConfig ) ;
150- const deserializedValue = await deserialize ( value , valueConfig ) ;
168+ const deserializerKey = await getDeserializer ( keyConfig ?. type ) ;
169+ const deserializerValue = await getDeserializer ( valueConfig ?. type ) ;
151170
152171 return {
153172 ...rest ,
154- key : keyConfig ?. parserSchema
155- ? await parseSchema ( deserializedKey , keyConfig . parserSchema )
156- : deserializedKey ,
157- value : valueConfig ?. parserSchema
158- ? await parseSchema ( deserializedValue , valueConfig . parserSchema )
159- : deserializedValue ,
173+ get key ( ) {
174+ if ( key === undefined || key === '' ) {
175+ return undefined ;
176+ }
177+ if ( isNull ( key ) ) return null ;
178+ const deserializedKey = deserialize ( key , deserializerKey , keyConfig ) ;
179+
180+ return keyConfig ?. parserSchema
181+ ? parseSchema ( deserializedKey , keyConfig . parserSchema )
182+ : deserializedKey ;
183+ } ,
160184 originalKey : key ,
185+ get value ( ) {
186+ const deserializedValue = deserialize (
187+ value ,
188+ deserializerValue ,
189+ valueConfig
190+ ) ;
191+
192+ return valueConfig ?. parserSchema
193+ ? parseSchema ( deserializedValue , valueConfig . parserSchema )
194+ : deserializedValue ;
195+ } ,
161196 originalValue : value ,
162- headers : deserializeHeaders ( headers ) ,
197+ get headers ( ) {
198+ return deserializeHeaders ( headers ) ;
199+ } ,
163200 originalHeaders : headers ,
164201 } ;
165202} ;
@@ -202,15 +239,20 @@ const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
202239 */
203240const kafkaConsumer = < K , V > (
204241 handler : AsyncHandler < Handler < ConsumerRecords < K , V > > > ,
205- config : SchemaConfig
242+ config ? : SchemaConfig
206243) : ( ( event : MSKEvent , context : Context ) => Promise < unknown > ) => {
207244 return async ( event : MSKEvent , context : Context ) : Promise < unknown > => {
208245 assertIsMSKEvent ( event ) ;
209246
210247 const consumerRecords : ConsumerRecord < K , V > [ ] = [ ] ;
211248 for ( const recordsArray of Object . values ( event . records ) ) {
212249 for ( const record of recordsArray ) {
213- consumerRecords . push ( await deserializeRecord ( record , config ) ) ;
250+ consumerRecords . push (
251+ ( await deserializeRecord (
252+ record ,
253+ config
254+ ) ) as unknown as ConsumerRecord < K , V >
255+ ) ;
214256 }
215257 }
216258
0 commit comments