1- import type { ZodSchema , z } from 'zod' ;
1+ import { ZodError , type ZodIssue , type ZodSchema , type z } from 'zod' ;
22import { ParseError } from '../errors.js' ;
3- import { KinesisFirehoseSchema } from '../schemas/index.js' ;
3+ import {
4+ type KinesisFirehoseRecordSchema ,
5+ KinesisFirehoseSchema ,
6+ } from '../schemas/index.js' ;
47import type { ParsedResult } from '../types/index.js' ;
5- import { Envelope , envelopeDiscriminator } from './envelope.js' ;
8+ import { envelopeDiscriminator } from './envelope.js' ;
69
710/**
811 * Kinesis Firehose Envelope to extract array of Records
@@ -23,10 +26,33 @@ export const KinesisFirehoseEnvelope = {
2326 */
2427 [ envelopeDiscriminator ] : 'array' as const ,
2528 parse < T extends ZodSchema > ( data : unknown , schema : T ) : z . infer < T > [ ] {
26- const parsedEnvelope = KinesisFirehoseSchema . parse ( data ) ;
29+ let parsedEnvelope : z . infer < typeof KinesisFirehoseSchema > ;
30+ try {
31+ parsedEnvelope = KinesisFirehoseSchema . parse ( data ) ;
32+ } catch ( error ) {
33+ throw new ParseError ( 'Failed to parse Kinesis Firehose envelope' , {
34+ cause : error as Error ,
35+ } ) ;
36+ }
2737
28- return parsedEnvelope . records . map ( ( record ) => {
29- return Envelope . parse ( record . data , schema ) ;
38+ return parsedEnvelope . records . map ( ( record , recordIndex ) => {
39+ let parsedRecord : z . infer < typeof KinesisFirehoseRecordSchema > ;
40+ try {
41+ parsedRecord = schema . parse ( record . data ) ;
42+ } catch ( error ) {
43+ throw new ParseError (
44+ `Failed to parse Kinesis Firehose record at index ${ recordIndex } ` ,
45+ {
46+ cause : new ZodError (
47+ ( error as ZodError ) . issues . map ( ( issue ) => ( {
48+ ...issue ,
49+ path : [ 'records' , recordIndex , 'data' , ...issue . path ] ,
50+ } ) )
51+ ) ,
52+ }
53+ ) ;
54+ }
55+ return parsedRecord ;
3056 } ) ;
3157 } ,
3258
@@ -35,7 +61,6 @@ export const KinesisFirehoseEnvelope = {
3561 schema : T
3662 ) : ParsedResult < unknown , z . infer < T > [ ] > {
3763 const parsedEnvelope = KinesisFirehoseSchema . safeParse ( data ) ;
38-
3964 if ( ! parsedEnvelope . success ) {
4065 return {
4166 success : false ,
@@ -45,25 +70,49 @@ export const KinesisFirehoseEnvelope = {
4570 originalEvent : data ,
4671 } ;
4772 }
48- const parsedRecords : z . infer < T > [ ] = [ ] ;
4973
50- for ( const record of parsedEnvelope . data . records ) {
51- const parsedData = Envelope . safeParse ( record . data , schema ) ;
52- if ( ! parsedData . success ) {
53- return {
54- success : false ,
55- error : new ParseError ( 'Failed to parse Kinesis Firehose record' , {
56- cause : parsedData . error ,
57- } ) ,
58- originalEvent : data ,
59- } ;
60- }
61- parsedRecords . push ( parsedData . data ) ;
74+ const result = parsedEnvelope . data . records . reduce < {
75+ success : boolean ;
76+ records : z . infer < T > [ ] ;
77+ errors : {
78+ [ key : number | string ] : { issues : ZodIssue [ ] } ;
79+ } ;
80+ } > (
81+ ( acc , record , index ) => {
82+ const parsedRecord = schema . safeParse ( record . data ) ;
83+
84+ if ( ! parsedRecord . success ) {
85+ const issues = parsedRecord . error . issues . map ( ( issue ) => ( {
86+ ...issue ,
87+ path : [ 'records' , index , 'data' , ...issue . path ] ,
88+ } ) ) ;
89+ acc . success = false ;
90+ acc . errors [ index ] = { issues } ;
91+ return acc ;
92+ }
93+
94+ acc . records . push ( parsedRecord . data ) ;
95+ return acc ;
96+ } ,
97+ { success : true , records : [ ] , errors : { } }
98+ ) ;
99+
100+ if ( result . success ) {
101+ return { success : true , data : result . records } ;
62102 }
63103
104+ const errorMessage =
105+ Object . keys ( result . errors ) . length > 1
106+ ? `Failed to parse Kinesis Firehose records at indexes ${ Object . keys ( result . errors ) . join ( ', ' ) } `
107+ : `Failed to parse Kinesis Firehose record at index ${ Object . keys ( result . errors ) [ 0 ] } ` ;
64108 return {
65- success : true ,
66- data : parsedRecords ,
109+ success : false ,
110+ error : new ParseError ( errorMessage , {
111+ cause : new ZodError (
112+ Object . values ( result . errors ) . flatMap ( ( error ) => error . issues )
113+ ) ,
114+ } ) ,
115+ originalEvent : data ,
67116 } ;
68117 } ,
69118} ;
0 commit comments