@@ -512,7 +512,10 @@ private void InitializeSerializers(
512512 {
513513 this . keySerializer = ( ISerializer < TKey > ) serializer ;
514514 }
515- else if ( typeof ( TValue ) == typeof ( Memory < byte > ) || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > ) )
515+ else if ( typeof ( TKey ) == typeof ( Memory < byte > )
516+ || typeof ( TKey ) == typeof ( ReadOnlyMemory < byte > )
517+ || typeof ( TKey ) == typeof ( Memory < byte > ? )
518+ || typeof ( TKey ) == typeof ( ReadOnlyMemory < byte > ? ) )
516519 {
517520 // Serializers are not used for Memory<byte>.
518521 }
@@ -542,7 +545,10 @@ private void InitializeSerializers(
542545 {
543546 this . valueSerializer = ( ISerializer < TValue > ) serializer ;
544547 }
545- else if ( typeof ( TValue ) == typeof ( Memory < byte > ) || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > ) )
548+ else if ( typeof ( TValue ) == typeof ( Memory < byte > )
549+ || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > )
550+ || typeof ( TValue ) == typeof ( Memory < byte > ? )
551+ || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > ? ) )
546552 {
547553 // Serializers are not used for Memory<byte>.
548554 }
@@ -765,24 +771,27 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
765771 {
766772 Headers headers = message . Headers ?? new Headers ( ) ;
767773
768- ReadOnlyMemory < byte > ? keyBytes ;
774+ ReadOnlyMemory < byte > ? keyBytes = null ;
769775 try
770776 {
771- if ( message . Key is Memory < byte > memory )
777+ if ( keySerializer != null )
778+ {
779+ byte [ ] keyBytesArray = keySerializer . Serialize ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) ) ;
780+ keyBytes = keyBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : keyBytesArray ;
781+ }
782+ else if ( asyncKeySerializer != null )
783+ {
784+ byte [ ] keyBytesArray = await asyncKeySerializer . SerializeAsync ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
785+ keyBytes = keyBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : keyBytesArray ;
786+ }
787+ else if ( message . Key is Memory < byte > memory )
772788 {
773789 keyBytes = memory ;
774790 }
775791 else if ( message . Key is ReadOnlyMemory < byte > readOnlyMemory )
776792 {
777793 keyBytes = readOnlyMemory ;
778794 }
779- else
780- {
781- byte [ ] keyBytesArray = keySerializer != null
782- ? keySerializer . Serialize ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) )
783- : await asyncKeySerializer . SerializeAsync ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
784- keyBytes = keyBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : keyBytesArray ;
785- }
786795 }
787796 catch ( Exception ex )
788797 {
@@ -796,24 +805,27 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
796805 ex ) ;
797806 }
798807
799- ReadOnlyMemory < byte > ? valBytes ;
808+ ReadOnlyMemory < byte > ? valBytes = null ;
800809 try
801810 {
802- if ( message . Value is Memory < byte > memory )
811+ if ( valueSerializer != null )
812+ {
813+ byte [ ] valueBytesArray = valueSerializer . Serialize ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) ) ;
814+ valBytes = valueBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : valueBytesArray ;
815+ }
816+ else if ( asyncValueSerializer != null )
817+ {
818+ byte [ ] valueBytesArray = await asyncValueSerializer . SerializeAsync ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
819+ valBytes = valueBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : valueBytesArray ;
820+ }
821+ else if ( message . Value is Memory < byte > memory )
803822 {
804823 valBytes = memory ;
805824 }
806825 else if ( message . Value is ReadOnlyMemory < byte > readOnlyMemory )
807826 {
808827 valBytes = readOnlyMemory ;
809828 }
810- else
811- {
812- byte [ ] valBytesArray = valueSerializer != null
813- ? valueSerializer . Serialize ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) )
814- : await asyncValueSerializer . SerializeAsync ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
815- valBytes = valBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : valBytesArray ;
816- }
817829 }
818830 catch ( Exception ex )
819831 {
@@ -912,24 +924,26 @@ public void Produce(
912924
913925 Headers headers = message . Headers ?? new Headers ( ) ;
914926
915- ReadOnlyMemory < byte > ? keyBytes ;
927+ ReadOnlyMemory < byte > ? keyBytes = null ;
916928 try
917929 {
918- if ( message . Key is Memory < byte > memory )
930+ if ( keySerializer != null )
931+ {
932+ byte [ ] keyBytesArray = keySerializer . Serialize ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) ) ;
933+ keyBytes = keyBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : keyBytesArray ;
934+ }
935+ else if ( asyncKeySerializer != null )
936+ {
937+ throw new InvalidOperationException ( "Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required." ) ;
938+ }
939+ else if ( message . Key is Memory < byte > memory )
919940 {
920941 keyBytes = memory ;
921942 }
922943 else if ( message . Key is ReadOnlyMemory < byte > readOnlyMemory )
923944 {
924945 keyBytes = readOnlyMemory ;
925946 }
926- else
927- {
928- byte [ ] keyBytesArray = keySerializer != null
929- ? keySerializer . Serialize ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) )
930- : throw new InvalidOperationException ( "Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required." ) ;
931- keyBytes = keyBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : keyBytesArray ;
932- }
933947 }
934948 catch ( Exception ex )
935949 {
@@ -943,24 +957,26 @@ public void Produce(
943957 ex ) ;
944958 }
945959
946- ReadOnlyMemory < byte > ? valBytes ;
960+ ReadOnlyMemory < byte > ? valBytes = null ;
947961 try
948962 {
949- if ( message . Value is Memory < byte > memory )
963+ if ( valueSerializer != null )
964+ {
965+ byte [ ] valueBytesArray = valueSerializer . Serialize ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) ) ;
966+ valBytes = valueBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : valueBytesArray ;
967+ }
968+ else if ( asyncValueSerializer != null )
969+ {
970+ throw new InvalidOperationException ( "Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required." ) ;
971+ }
972+ else if ( message . Value is Memory < byte > memory )
950973 {
951974 valBytes = memory ;
952975 }
953976 else if ( message . Value is ReadOnlyMemory < byte > readOnlyMemory )
954977 {
955978 valBytes = readOnlyMemory ;
956979 }
957- else
958- {
959- byte [ ] valBytesArray = valueSerializer != null
960- ? valueSerializer . Serialize ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) )
961- : throw new InvalidOperationException ( "Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required." ) ;
962- valBytes = valBytesArray == null ? ( ReadOnlyMemory < byte > ? ) null : valBytesArray ;
963- }
964980 }
965981 catch ( Exception ex )
966982 {
0 commit comments