@@ -47,19 +47,19 @@ public GenericDeserializerImpl(
4747 if ( config . UseLatestVersion != null ) { this . useLatestVersion = config . UseLatestVersion . Value ; }
4848 if ( config . UseLatestWithMetadata != null ) { this . useLatestWithMetadata = config . UseLatestWithMetadata ; }
4949 if ( config . SubjectNameStrategy != null ) { this . subjectNameStrategy = config . SubjectNameStrategy . Value . ToDelegate ( ) ; }
50- if ( config . SchemaIdStrategy != null ) { this . schemaIdDeserializer = config . SchemaIdStrategy . Value . ToDeserializer ( ) ; }
50+ if ( config . SchemaIdStrategy != null ) { this . schemaIdDecoder = config . SchemaIdStrategy . Value . ToDeserializer ( ) ; }
5151 }
5252
5353 public override async Task < GenericRecord > DeserializeAsync ( ReadOnlyMemory < byte > data , bool isNull ,
5454 SerializationContext context )
5555 {
5656 return isNull
5757 ? default
58- : await Deserialize ( context . Topic , context . Headers , data . ToArray ( ) ,
58+ : await Deserialize ( context . Topic , context . Headers , data ,
5959 context . Component == MessageComponentType . Key ) . ConfigureAwait ( false ) ;
6060 }
6161
62- public async Task < GenericRecord > Deserialize ( string topic , Headers headers , byte [ ] array , bool isKey )
62+ public async Task < GenericRecord > Deserialize ( string topic , Headers headers , ReadOnlyMemory < byte > array , bool isKey )
6363 {
6464 try
6565 {
@@ -83,68 +83,74 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, byte
8383 SerializationContext context = new SerializationContext (
8484 isKey ? MessageComponentType . Key : MessageComponentType . Value , topic , headers ) ;
8585 SchemaId writerId = new SchemaId ( SchemaType . Avro ) ;
86- using ( var stream = schemaIdDeserializer . Deserialize ( array , context , ref writerId ) )
86+ var payload = schemaIdDecoder . Decode ( array , context , ref writerId ) ;
87+
88+ ( writerSchemaJson , writerSchema ) = await GetWriterSchema ( subject , writerId ) . ConfigureAwait ( false ) ;
89+ if ( subject == null )
8790 {
88- ( writerSchemaJson , writerSchema ) = await GetWriterSchema ( subject , writerId ) . ConfigureAwait ( false ) ;
89- if ( subject = = null )
91+ subject = GetSubjectName ( topic , isKey , writerSchema . Fullname ) ;
92+ if ( subject ! = null )
9093 {
91- subject = GetSubjectName ( topic , isKey , writerSchema . Fullname ) ;
92- if ( subject != null )
93- {
94- latestSchema = await GetReaderSchema ( subject )
95- . ConfigureAwait ( continueOnCapturedContext : false ) ;
96- }
97- }
98-
99- if ( latestSchema != null )
100- {
101- migrations = await GetMigrations ( subject , writerSchemaJson , latestSchema )
94+ latestSchema = await GetReaderSchema ( subject )
10295 . ConfigureAwait ( continueOnCapturedContext : false ) ;
10396 }
97+ }
10498
105- DatumReader < GenericRecord > datumReader ;
106- if ( migrations . Count > 0 )
99+ if ( latestSchema != null )
100+ {
101+ migrations = await GetMigrations ( subject , writerSchemaJson , latestSchema )
102+ . ConfigureAwait ( continueOnCapturedContext : false ) ;
103+ }
104+
105+ DatumReader < GenericRecord > datumReader ;
106+ if ( migrations . Count > 0 )
107+ {
108+ using ( var stream = new MemoryStream ( payload . ToArray ( ) ) )
107109 {
108110 data = new GenericReader < GenericRecord > ( writerSchema , writerSchema )
109111 . Read ( default ( GenericRecord ) , new BinaryDecoder ( stream ) ) ;
112+ }
113+
114+ string jsonString ;
115+ using ( var jsonStream = new MemoryStream ( ) )
116+ {
117+ GenericRecord record = data ;
118+ DatumWriter < object > datumWriter = new GenericDatumWriter < object > ( writerSchema ) ;
110119
111- string jsonString ;
112- using ( var jsonStream = new MemoryStream ( ) )
113- {
114- GenericRecord record = data ;
115- DatumWriter < object > datumWriter = new GenericDatumWriter < object > ( writerSchema ) ;
120+ JsonEncoder encoder = new JsonEncoder ( writerSchema , jsonStream ) ;
121+ datumWriter . Write ( record , encoder ) ;
122+ encoder . Flush ( ) ;
116123
117- JsonEncoder encoder = new JsonEncoder ( writerSchema , jsonStream ) ;
118- datumWriter . Write ( record , encoder ) ;
119- encoder . Flush ( ) ;
124+ jsonString = Encoding . UTF8 . GetString ( jsonStream . ToArray ( ) ) ;
125+ }
120126
121- jsonString = Encoding . UTF8 . GetString ( jsonStream . ToArray ( ) ) ;
122- }
127+ JToken json = JToken . Parse ( jsonString ) ;
128+ json = await ExecuteMigrations ( migrations , isKey , subject , topic , headers , json )
129+ . ContinueWith ( t => ( JToken ) t . Result )
130+ . ConfigureAwait ( continueOnCapturedContext : false ) ;
131+ readerSchemaJson = latestSchema ;
132+ readerSchema = await GetParsedSchema ( readerSchemaJson ) . ConfigureAwait ( false ) ;
133+ Avro . IO . Decoder decoder = new JsonDecoder ( readerSchema , json . ToString ( Formatting . None ) ) ;
123134
124- JToken json = JToken . Parse ( jsonString ) ;
125- json = await ExecuteMigrations ( migrations , isKey , subject , topic , headers , json )
126- . ContinueWith ( t => ( JToken ) t . Result )
127- . ConfigureAwait ( continueOnCapturedContext : false ) ;
135+ datumReader = new GenericReader < GenericRecord > ( readerSchema , readerSchema ) ;
136+ data = datumReader . Read ( default ( GenericRecord ) , decoder ) ;
137+ }
138+ else
139+ {
140+ if ( latestSchema != null )
141+ {
128142 readerSchemaJson = latestSchema ;
129143 readerSchema = await GetParsedSchema ( readerSchemaJson ) . ConfigureAwait ( false ) ;
130- Avro . IO . Decoder decoder = new JsonDecoder ( readerSchema , json . ToString ( Formatting . None ) ) ;
131-
132- datumReader = new GenericReader < GenericRecord > ( readerSchema , readerSchema ) ;
133- data = datumReader . Read ( default ( GenericRecord ) , decoder ) ;
134144 }
135145 else
136146 {
137- if ( latestSchema != null )
138- {
139- readerSchemaJson = latestSchema ;
140- readerSchema = await GetParsedSchema ( readerSchemaJson ) . ConfigureAwait ( false ) ;
141- }
142- else
143- {
144- readerSchemaJson = writerSchemaJson ;
145- readerSchema = writerSchema ;
146- }
147- datumReader = await GetDatumReader ( writerSchema , readerSchema ) . ConfigureAwait ( false ) ;
147+ readerSchemaJson = writerSchemaJson ;
148+ readerSchema = writerSchema ;
149+ }
150+ datumReader = await GetDatumReader ( writerSchema , readerSchema ) . ConfigureAwait ( false ) ;
151+
152+ using ( var stream = new MemoryStream ( payload . ToArray ( ) ) )
153+ {
148154 data = datumReader . Read ( default ( GenericRecord ) , new BinaryDecoder ( stream ) ) ;
149155 }
150156 }
0 commit comments