@@ -42,8 +42,8 @@ internal class Config
4242 public PartitionerDelegate defaultPartitioner ;
4343 }
4444
45- private ISerializer < TKey > keySerializer ;
46- private ISerializer < TValue > valueSerializer ;
45+ private Func < TKey , SerializationContext , ReadOnlyMemory < byte > ? > serializeKey ;
46+ private Func < TValue , SerializationContext , ReadOnlyMemory < byte > ? > serializeValue ;
4747 private IAsyncSerializer < TKey > asyncKeySerializer ;
4848 private IAsyncSerializer < TValue > asyncValueSerializer ;
4949
@@ -58,6 +58,14 @@ internal class Config
5858 { typeof ( byte [ ] ) , Serializers . ByteArray }
5959 } ;
6060
61+ private static readonly Dictionary < Type , object > memorySerializeFuncs = new Dictionary < Type , object >
62+ {
63+ [ typeof ( Memory < byte > ) ] = ( Memory < byte > x , SerializationContext _ ) => ( ReadOnlyMemory < byte > ? ) x ,
64+ [ typeof ( Memory < byte > ? ) ] = ( Memory < byte > ? x , SerializationContext _ ) => ( ReadOnlyMemory < byte > ? ) x ,
65+ [ typeof ( ReadOnlyMemory < byte > ) ] = ( ReadOnlyMemory < byte > x , SerializationContext _ ) => ( ReadOnlyMemory < byte > ? ) x ,
66+ [ typeof ( ReadOnlyMemory < byte > ? ) ] = ( ReadOnlyMemory < byte > ? x , SerializationContext _ ) => x ,
67+ } ;
68+
6169 private int cancellationDelayMaxMs ;
6270 private bool disposeHasBeenCalled = false ;
6371 private object disposeHasBeenCalledLockObj = new object ( ) ;
@@ -510,14 +518,12 @@ private void InitializeSerializers(
510518 {
511519 if ( defaultSerializers . TryGetValue ( typeof ( TKey ) , out object serializer ) )
512520 {
513- this . keySerializer = ( ISerializer < TKey > ) serializer ;
521+ keySerializer = ( ISerializer < TKey > ) serializer ;
522+ this . serializeKey = ( k , ctx ) => keySerializer . Serialize ( k , ctx ) ? . AsMemory ( ) ;
514523 }
515- else if ( typeof ( TKey ) == typeof ( Memory < byte > )
516- || typeof ( TKey ) == typeof ( ReadOnlyMemory < byte > )
517- || typeof ( TKey ) == typeof ( Memory < byte > ? )
518- || typeof ( TKey ) == typeof ( ReadOnlyMemory < byte > ? ) )
524+ else if ( memorySerializeFuncs . TryGetValue ( typeof ( TKey ) , out object serialize ) )
519525 {
520- // Serializers are not used for Memory <byte>.
526+ this . serializeKey = ( Func < TKey , SerializationContext , ReadOnlyMemory < byte > ? > ) serialize ;
521527 }
522528 else
523529 {
@@ -531,7 +537,7 @@ private void InitializeSerializers(
531537 }
532538 else if ( keySerializer != null && asyncKeySerializer == null )
533539 {
534- this . keySerializer = keySerializer ;
540+ this . serializeKey = ( k , ctx ) => keySerializer . Serialize ( k , ctx ) ? . AsMemory ( ) ;
535541 }
536542 else
537543 {
@@ -543,14 +549,12 @@ private void InitializeSerializers(
543549 {
544550 if ( defaultSerializers . TryGetValue ( typeof ( TValue ) , out object serializer ) )
545551 {
546- this . valueSerializer = ( ISerializer < TValue > ) serializer ;
552+ valueSerializer = ( ISerializer < TValue > ) serializer ;
553+ this . serializeValue = ( k , ctx ) => valueSerializer . Serialize ( k , ctx ) ? . AsMemory ( ) ;
547554 }
548- else if ( typeof ( TValue ) == typeof ( Memory < byte > )
549- || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > )
550- || typeof ( TValue ) == typeof ( Memory < byte > ? )
551- || typeof ( TValue ) == typeof ( ReadOnlyMemory < byte > ? ) )
555+ else if ( memorySerializeFuncs . TryGetValue ( typeof ( TKey ) , out object serialize ) )
552556 {
553- // Serializers are not used for Memory <byte>.
557+ this . serializeValue = ( Func < TValue , SerializationContext , ReadOnlyMemory < byte > ? > ) serialize ;
554558 }
555559 else
556560 {
@@ -564,7 +568,7 @@ private void InitializeSerializers(
564568 }
565569 else if ( valueSerializer != null && asyncValueSerializer == null )
566570 {
567- this . valueSerializer = valueSerializer ;
571+ this . serializeValue = ( k , ctx ) => valueSerializer . Serialize ( k , ctx ) ? . AsMemory ( ) ;
568572 }
569573 else
570574 {
@@ -773,28 +777,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
773777 ReadOnlyMemory < byte > ? keyBytes ;
774778 try
775779 {
776- if ( keySerializer != null )
777- {
778- SerializationContext ctx = new ( MessageComponentType . Key , topicPartition . Topic , headers ) ;
779- keyBytes = keySerializer . Serialize ( message . Key , ctx ) ? . AsMemory ( ) ;
780- }
781- else if ( asyncKeySerializer != null )
782- {
783- SerializationContext ctx = new ( MessageComponentType . Key , topicPartition . Topic , headers ) ;
784- keyBytes = ( await asyncKeySerializer . SerializeAsync ( message . Key , ctx ) . ConfigureAwait ( false ) ) ? . AsMemory ( ) ;
785- }
786- else if ( message . Key is Memory < byte > memory )
787- {
788- keyBytes = memory ;
789- }
790- else if ( message . Key is ReadOnlyMemory < byte > readOnlyMemory )
791- {
792- keyBytes = readOnlyMemory ;
793- }
794- else // Null Memory<byte>?
795- {
796- keyBytes = null ;
797- }
780+ keyBytes = ( serializeKey != null )
781+ ? serializeKey ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) )
782+ : await asyncKeySerializer . SerializeAsync ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
798783 }
799784 catch ( Exception ex )
800785 {
@@ -811,28 +796,9 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
811796 ReadOnlyMemory < byte > ? valBytes ;
812797 try
813798 {
814- if ( valueSerializer != null )
815- {
816- SerializationContext ctx = new ( MessageComponentType . Value , topicPartition . Topic , headers ) ;
817- valBytes = valueSerializer . Serialize ( message . Value , ctx ) ? . AsMemory ( ) ;
818- }
819- else if ( asyncValueSerializer != null )
820- {
821- SerializationContext ctx = new ( MessageComponentType . Value , topicPartition . Topic , headers ) ;
822- valBytes = ( await asyncValueSerializer . SerializeAsync ( message . Value , ctx ) . ConfigureAwait ( false ) ) ? . AsMemory ( ) ;
823- }
824- else if ( message . Value is Memory < byte > memory )
825- {
826- valBytes = memory ;
827- }
828- else if ( message . Value is ReadOnlyMemory < byte > readOnlyMemory )
829- {
830- valBytes = readOnlyMemory ;
831- }
832- else // Null Memory<byte>?
833- {
834- valBytes = null ;
835- }
799+ valBytes = ( serializeValue != null )
800+ ? serializeValue ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) )
801+ : await asyncValueSerializer . SerializeAsync ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) ) . ConfigureAwait ( false ) ;
836802 }
837803 catch ( Exception ex )
838804 {
@@ -934,27 +900,9 @@ public void Produce(
934900 ReadOnlyMemory < byte > ? keyBytes ;
935901 try
936902 {
937- if ( keySerializer != null )
938- {
939- SerializationContext ctx = new ( MessageComponentType . Key , topicPartition . Topic , headers ) ;
940- keyBytes = keySerializer . Serialize ( message . Key , ctx ) ? . AsMemory ( ) ;
941- }
942- else if ( asyncKeySerializer != null )
943- {
944- throw new InvalidOperationException ( "Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required." ) ;
945- }
946- else if ( message . Key is Memory < byte > memory )
947- {
948- keyBytes = memory ;
949- }
950- else if ( message . Key is ReadOnlyMemory < byte > readOnlyMemory )
951- {
952- keyBytes = readOnlyMemory ;
953- }
954- else // Null Memory<byte>?
955- {
956- keyBytes = null ;
957- }
903+ keyBytes = ( serializeKey != null )
904+ ? serializeKey ( message . Key , new SerializationContext ( MessageComponentType . Key , topicPartition . Topic , headers ) )
905+ : throw new InvalidOperationException ( "Produce called with an IAsyncSerializer key serializer configured but an ISerializer is required." ) ;
958906 }
959907 catch ( Exception ex )
960908 {
@@ -971,27 +919,9 @@ public void Produce(
971919 ReadOnlyMemory < byte > ? valBytes ;
972920 try
973921 {
974- if ( valueSerializer != null )
975- {
976- SerializationContext ctx = new ( MessageComponentType . Value , topicPartition . Topic , headers ) ;
977- valBytes = valueSerializer . Serialize ( message . Value , ctx ) ? . AsMemory ( ) ;
978- }
979- else if ( asyncValueSerializer != null )
980- {
981- throw new InvalidOperationException ( "Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required." ) ;
982- }
983- else if ( message . Value is Memory < byte > memory )
984- {
985- valBytes = memory ;
986- }
987- else if ( message . Value is ReadOnlyMemory < byte > readOnlyMemory )
988- {
989- valBytes = readOnlyMemory ;
990- }
991- else // Null Memory<byte>?
992- {
993- valBytes = null ;
994- }
922+ valBytes = ( serializeValue != null )
923+ ? serializeValue ( message . Value , new SerializationContext ( MessageComponentType . Value , topicPartition . Topic , headers ) )
924+ : throw new InvalidOperationException ( "Produce called with an IAsyncSerializer value serializer configured but an ISerializer is required." ) ;
995925 }
996926 catch ( Exception ex )
997927 {
0 commit comments