1010from quixstreams .models .serializers import (
1111 DESERIALIZERS ,
1212 SERIALIZERS ,
13- BytesDeserializer ,
14- BytesSerializer ,
1513 Deserializer ,
16- DeserializerIsNotProvidedError ,
1714 DeserializerType ,
1815 IgnoreMessage ,
1916 MessageField ,
2017 SerializationContext ,
2118 Serializer ,
22- SerializerIsNotProvidedError ,
2319 SerializerType ,
2420)
2521from quixstreams .models .timestamps import TimestampType
@@ -57,29 +53,39 @@ def as_dict(self):
5753 return dataclasses .asdict (self )
5854
5955
60- def _get_serializer (serializer : Optional [ SerializerType ] ) -> Optional [ Serializer ] :
56+ def _resolve_serializer (serializer : SerializerType ) -> Serializer :
6157 if isinstance (serializer , str ):
6258 try :
6359 return SERIALIZERS [serializer ]()
6460 except KeyError :
6561 raise ValueError (
66- f" Unknown deserializer option ' { serializer } '; "
62+ f' Unknown serializer option " { serializer } "; '
6763 f"valid options are { list (SERIALIZERS .keys ())} "
6864 )
65+ elif not isinstance (serializer , Serializer ):
66+ raise ValueError (
67+ f"Serializer must be either one of { list (SERIALIZERS .keys ())} "
68+ f'or a subclass of Serializer; got "{ serializer } "'
69+ )
6970 return serializer
7071
7172
72- def _get_deserializer (
73- deserializer : Optional [ DeserializerType ] ,
74- ) -> Optional [ Deserializer ] :
73+ def _resolve_deserializer (
74+ deserializer : DeserializerType ,
75+ ) -> Deserializer :
7576 if isinstance (deserializer , str ):
7677 try :
7778 return DESERIALIZERS [deserializer ]()
7879 except KeyError :
7980 raise ValueError (
80- f" Unknown deserializer option ' { deserializer } '; "
81+ f' Unknown deserializer option " { deserializer } "; '
8182 f"valid options are { list (DESERIALIZERS .keys ())} "
8283 )
84+ elif not isinstance (deserializer , Deserializer ):
85+ raise ValueError (
86+ f"Deserializer must be either one of { list (DESERIALIZERS .keys ())} "
87+ f'or a subclass of Deserializer; got "{ deserializer } "'
88+ )
8389 return deserializer
8490
8591
@@ -103,10 +109,10 @@ def __init__(
103109 name : str ,
104110 topic_type : TopicType = TopicType .REGULAR ,
105111 create_config : Optional [TopicConfig ] = None ,
106- value_deserializer : Optional [ DeserializerType ] = None ,
107- key_deserializer : Optional [ DeserializerType ] = BytesDeserializer () ,
108- value_serializer : Optional [ SerializerType ] = None ,
109- key_serializer : Optional [ SerializerType ] = BytesSerializer () ,
112+ value_deserializer : DeserializerType = "json" ,
113+ key_deserializer : DeserializerType = "bytes" ,
114+ value_serializer : SerializerType = "json" ,
115+ key_serializer : SerializerType = "bytes" ,
110116 timestamp_extractor : Optional [TimestampExtractor ] = None ,
111117 quix_name : str = "" ,
112118 ):
@@ -133,10 +139,10 @@ def __init__(
133139 self .quix_name = quix_name or name
134140 self ._create_config = copy .deepcopy (create_config )
135141 self ._broker_config : Optional [TopicConfig ] = None
136- self ._value_deserializer = _get_deserializer (value_deserializer )
137- self ._key_deserializer = _get_deserializer (key_deserializer )
138- self ._value_serializer = _get_serializer (value_serializer )
139- self ._key_serializer = _get_serializer (key_serializer )
142+ self ._value_deserializer = _resolve_deserializer (value_deserializer )
143+ self ._key_deserializer = _resolve_deserializer (key_deserializer )
144+ self ._value_serializer = _resolve_serializer (value_serializer )
145+ self ._key_serializer = _resolve_serializer (key_serializer )
140146 self ._timestamp_extractor = timestamp_extractor
141147 self ._type = topic_type
142148
@@ -202,36 +208,27 @@ def row_serialize(self, row: Row, key: Any) -> KafkaMessage:
202208 :param key: message key to serialize
203209 :return: KafkaMessage object with serialized values
204210 """
205- if self ._key_serializer is None :
206- raise SerializerIsNotProvidedError (
207- f'Key serializer is not provided for topic "{ self .name } "'
208- )
209- if self ._value_serializer is None :
210- raise SerializerIsNotProvidedError (
211- f'Value serializer is not provided for topic "{ self .name } "'
212- )
213211
212+ serialization_ctx = SerializationContext (
213+ topic = self .name , field = MessageField .KEY , headers = row .headers
214+ )
214215 # Try to serialize the key only if it's not None
215216 # If key is None then pass it as is
216217 # Otherwise, different serializers may serialize None differently
217218 if key is None :
218219 key_serialized = None
219220 else :
220- key_ctx = SerializationContext (
221- topic = self .name , field = MessageField .KEY , headers = row .headers
222- )
223- key_serialized = self ._key_serializer (key , ctx = key_ctx )
221+ key_serialized = self ._key_serializer (key , ctx = serialization_ctx )
224222
225223 # Update message headers with headers supplied by the value serializer.
226224 extra_headers = self ._value_serializer .extra_headers
227225 headers = merge_headers (row .headers , extra_headers )
228- value_ctx = SerializationContext (
229- topic = self .name , field = MessageField .VALUE , headers = row .headers
230- )
226+ serialization_ctx .field = MessageField .VALUE
227+ value_serialized = self ._value_serializer (row .value , ctx = serialization_ctx )
231228
232229 return KafkaMessage (
233230 key = key_serialized ,
234- value = self . _value_serializer ( row . value , ctx = value_ctx ) ,
231+ value = value_serialized ,
235232 headers = headers ,
236233 )
237234
@@ -244,50 +241,46 @@ def row_deserialize(
244241 :param message: an object with interface of `confluent_kafka.Message`
245242 :return: Row, list of Rows or None if the message is ignored.
246243 """
247- if self ._key_deserializer is None :
248- raise DeserializerIsNotProvidedError (
249- f'Key deserializer is not provided for topic "{ self .name } "'
250- )
251- if self ._value_deserializer is None :
252- raise DeserializerIsNotProvidedError (
253- f'Value deserializer is not provided for topic "{ self .name } "'
254- )
255-
256244 headers = message .headers ()
245+ topic = message .topic ()
246+ partition = message .partition ()
247+ offset = message .offset ()
257248
249+ serialization_ctx = SerializationContext (
250+ topic = topic , field = MessageField .KEY , headers = headers
251+ )
258252 if (key_bytes := message .key ()) is None :
259253 key_deserialized = None
260254 else :
261- key_ctx = SerializationContext (
262- topic = message . topic (), field = MessageField . KEY , headers = headers
255+ key_deserialized = self . _key_deserializer (
256+ value = key_bytes , ctx = serialization_ctx
263257 )
264- key_deserialized = self ._key_deserializer (value = key_bytes , ctx = key_ctx )
265258
266259 if (value_bytes := message .value ()) is None :
267260 value_deserialized = None
268261 else :
269- value_ctx = SerializationContext (
270- topic = message . topic (), field = MessageField . VALUE , headers = headers
271- )
262+ # Reuse the SerializationContext object here to avoid creating a new
263+ # one with almost the same fields
264+ serialization_ctx . field = MessageField . VALUE
272265 try :
273266 value_deserialized = self ._value_deserializer (
274- value = value_bytes , ctx = value_ctx
267+ value = value_bytes , ctx = serialization_ctx
275268 )
276269 except IgnoreMessage :
277270 # Ignore message completely if deserializer raised IgnoreValueError.
278271 logger .debug (
279272 'Ignore incoming message: partition="%s[%s]" offset="%s"' ,
280- message . topic () ,
281- message . partition () ,
282- message . offset () ,
273+ topic ,
274+ partition ,
275+ offset ,
283276 )
284277 return None
285278
286279 timestamp_type , timestamp_ms = message .timestamp ()
287280 message_context = MessageContext (
288- topic = message . topic () ,
289- partition = message . partition () ,
290- offset = message . offset () ,
281+ topic = topic ,
282+ partition = partition ,
283+ offset = offset ,
291284 size = len (message ),
292285 leader_epoch = message .leader_epoch (),
293286 )
@@ -312,8 +305,8 @@ def row_deserialize(
312305 )
313306 return rows
314307
315- if self ._timestamp_extractor :
316- timestamp_ms = self . _timestamp_extractor (
308+ if ( timestamp_extractor := self ._timestamp_extractor ) is not None :
309+ timestamp_ms = timestamp_extractor (
317310 value_deserialized , headers , timestamp_ms , TimestampType (timestamp_type )
318311 )
319312
@@ -332,24 +325,13 @@ def serialize(
332325 headers : Optional [Headers ] = None ,
333326 timestamp_ms : Optional [int ] = None ,
334327 ) -> KafkaMessage :
335- if self ._key_serializer :
336- key_ctx = SerializationContext (
337- topic = self .name , field = MessageField .KEY , headers = headers
338- )
339- key = self ._key_serializer (key , ctx = key_ctx )
340- elif key is not None :
341- raise SerializerIsNotProvidedError (
342- f'Key serializer is not provided for topic "{ self .name } "'
343- )
344- if self ._value_serializer :
345- value_ctx = SerializationContext (
346- topic = self .name , field = MessageField .VALUE , headers = headers
347- )
348- value = self ._value_serializer (value , ctx = value_ctx )
349- elif value is not None :
350- raise SerializerIsNotProvidedError (
351- f'Value serializer is not provided for topic "{ self .name } "'
352- )
328+ serialization_ctx = SerializationContext (
329+ topic = self .name , field = MessageField .KEY , headers = headers
330+ )
331+ key = self ._key_serializer (key , ctx = serialization_ctx )
332+ serialization_ctx .field = MessageField .VALUE
333+ value = self ._value_serializer (value , ctx = serialization_ctx )
334+
353335 return KafkaMessage (
354336 key = key ,
355337 value = value ,
@@ -358,30 +340,18 @@ def serialize(
358340 )
359341
360342 def deserialize (self , message : SuccessfulConfluentKafkaMessageProto ):
343+ serialization_ctx = SerializationContext (
344+ topic = message .topic (),
345+ field = MessageField .KEY ,
346+ headers = message .headers (),
347+ )
361348 if (key := message .key ()) is not None :
362- if self ._key_deserializer :
363- key_ctx = SerializationContext (
364- topic = message .topic (),
365- field = MessageField .KEY ,
366- headers = message .headers (),
367- )
368- key = self ._key_deserializer (key , ctx = key_ctx )
369- else :
370- raise DeserializerIsNotProvidedError (
371- f'Key deserializer is not provided for topic "{ self .name } "'
372- )
349+ key = self ._key_deserializer (key , ctx = serialization_ctx )
350+
373351 if (value := message .value ()) is not None :
374- if self ._value_deserializer :
375- value_ctx = SerializationContext (
376- topic = message .topic (),
377- field = MessageField .VALUE ,
378- headers = message .headers (),
379- )
380- value = self ._value_deserializer (value , ctx = value_ctx )
381- else :
382- raise DeserializerIsNotProvidedError (
383- f'Value deserializer is not provided for topic "{ self .name } "'
384- )
352+ serialization_ctx .field = MessageField .VALUE
353+ value = self ._value_deserializer (value , ctx = serialization_ctx )
354+
385355 return KafkaMessage (
386356 key = key ,
387357 value = value ,
0 commit comments