Skip to content

Commit 655953f

Browse files
authored
Add kafka.serializer interfaces (#912)
1 parent f6291e6 commit 655953f

File tree

5 files changed

+67
-24
lines changed

5 files changed

+67
-24
lines changed

kafka/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def emit(self, record):
2626
create_message, create_gzip_message, create_snappy_message)
2727
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
2828
from kafka.structs import TopicPartition, OffsetAndMetadata
29+
from kafka.serializer import Serializer, Deserializer
2930

3031
# To be deprecated when KafkaProducer interface is released
3132
from kafka.client import SimpleClient

kafka/consumer/fetcher.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from kafka.protocol.fetch import FetchRequest
1616
from kafka.protocol.message import PartialMessage
1717
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
18+
from kafka.serializer import Deserializer
1819
from kafka.structs import TopicPartition
1920

2021
log = logging.getLogger(__name__)
@@ -507,15 +508,25 @@ def _unpack_message_set(self, tp, messages):
507508
if absolute_base_offset >= 0:
508509
inner_offset += absolute_base_offset
509510

510-
key, value = self._deserialize(inner_msg)
511+
key = self._deserialize(
512+
self.config['key_deserializer'],
513+
tp.topic, inner_msg.key)
514+
value = self._deserialize(
515+
self.config['value_deserializer'],
516+
tp.topic, inner_msg.value)
511517
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
512518
inner_timestamp, msg.timestamp_type,
513519
key, value, inner_msg.crc,
514520
len(inner_msg.key) if inner_msg.key is not None else -1,
515521
len(inner_msg.value) if inner_msg.value is not None else -1)
516522

517523
else:
518-
key, value = self._deserialize(msg)
524+
key = self._deserialize(
525+
self.config['key_deserializer'],
526+
tp.topic, msg.key)
527+
value = self._deserialize(
528+
self.config['value_deserializer'],
529+
tp.topic, msg.value)
519530
yield ConsumerRecord(tp.topic, tp.partition, offset,
520531
msg.timestamp, msg.timestamp_type,
521532
key, value, msg.crc,
@@ -541,16 +552,12 @@ def __next__(self):
541552
self._iterator = None
542553
raise
543554

544-
def _deserialize(self, msg):
545-
if self.config['key_deserializer']:
546-
key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
547-
else:
548-
key = msg.key
549-
if self.config['value_deserializer']:
550-
value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable
551-
else:
552-
value = msg.value
553-
return key, value
555+
def _deserialize(self, f, topic, bytes_):
    """Decode raw message bytes with the configured deserializer.

    Arguments:
        f: the configured deserializer. May be falsy (no deserializer),
            a ``Deserializer`` instance, or a plain callable taking the
            raw bytes.
        topic: topic name, forwarded only to ``Deserializer`` instances.
        bytes_: raw key or value payload taken from the fetched message.

    Returns:
        ``bytes_`` unchanged when ``f`` is falsy; otherwise whatever the
        deserializer returns.
    """
    if f:
        # Interface-style deserializers receive the topic as well;
        # legacy callables only ever see the payload.
        if isinstance(f, Deserializer):
            return f.deserialize(topic, bytes_)
        return f(bytes_)
    # No deserializer configured: hand the payload back untouched.
    return bytes_
554561

555562
def _send_offset_request(self, partition, timestamp):
556563
"""Fetch a single offset before the given timestamp for the partition.

kafka/producer/kafka.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ..metrics import MetricConfig, Metrics
1414
from ..partitioner.default import DefaultPartitioner
1515
from ..protocol.message import Message, MessageSet
16+
from ..serializer import Serializer
1617
from ..structs import TopicPartition
1718
from .future import FutureRecordMetadata, FutureProduceResult
1819
from .record_accumulator import AtomicInteger, RecordAccumulator
@@ -485,7 +486,12 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
485486
# available
486487
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
487488

488-
key_bytes, value_bytes = self._serialize(topic, key, value)
489+
key_bytes = self._serialize(
490+
self.config['key_serializer'],
491+
topic, key)
492+
value_bytes = self._serialize(
493+
self.config['value_serializer'],
494+
topic, value)
489495
partition = self._partition(topic, partition, key, value,
490496
key_bytes, value_bytes)
491497

@@ -606,17 +612,12 @@ def _wait_on_metadata(self, topic, max_wait):
606612
else:
607613
log.debug("_wait_on_metadata woke after %s secs.", elapsed)
608614

609-
def _serialize(self, topic, key, value):
610-
# pylint: disable-msg=not-callable
611-
if self.config['key_serializer']:
612-
serialized_key = self.config['key_serializer'](key)
613-
else:
614-
serialized_key = key
615-
if self.config['value_serializer']:
616-
serialized_value = self.config['value_serializer'](value)
617-
else:
618-
serialized_value = value
619-
return serialized_key, serialized_value
615+
def _serialize(self, f, topic, data):
    """Encode a key or value with the configured serializer.

    Arguments:
        f: the configured serializer. May be falsy (no serializer),
            a ``Serializer`` instance, or a plain callable taking the
            object to encode.
        topic: topic name, forwarded only to ``Serializer`` instances.
        data: the key or value passed to ``send``.

    Returns:
        ``data`` unchanged when ``f`` is falsy; otherwise whatever the
        serializer returns.
    """
    if f:
        # Interface-style serializers receive the topic as well;
        # legacy callables only ever see the data.
        if isinstance(f, Serializer):
            return f.serialize(topic, data)
        return f(data)
    # No serializer configured: pass the data through as-is.
    return data
620621

621622
def _partition(self, topic, partition, key, value,
622623
serialized_key, serialized_value):

kafka/serializer/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from __future__ import absolute_import
2+
3+
from .abstract import Serializer, Deserializer

kafka/serializer/abstract.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import absolute_import
2+
3+
import abc
4+
5+
6+
class Serializer(abc.ABCMeta('_SerializerBase', (object,), {})):
    """Abstract interface for objects that serialize keys or values.

    The base is created by calling ``abc.ABCMeta`` directly so the class
    really is an ABC on both Python 2 and Python 3. The previous
    ``__meta__ = abc.ABCMeta`` attribute was ignored by the interpreter,
    so ``@abc.abstractmethod`` was never enforced.

    Subclasses must implement ``serialize``; a subclass that does not
    cannot be instantiated (``TypeError``).
    """

    def __init__(self, **config):
        """Accept arbitrary keyword configuration; the base ignores it."""
        pass

    @abc.abstractmethod
    def serialize(self, topic, value):
        """Return the serialized form of ``value`` for ``topic``.

        Arguments:
            topic: name of the topic the data is associated with.
            value: the object to serialize.
        """
        pass

    def close(self):
        """Release any resources held by the serializer (no-op here)."""
        pass
19+
20+
class Deserializer(abc.ABCMeta('_DeserializerBase', (object,), {})):
    """Abstract interface for objects that deserialize keys or values.

    The base is created by calling ``abc.ABCMeta`` directly so the class
    really is an ABC on both Python 2 and Python 3. The previous
    ``__meta__ = abc.ABCMeta`` attribute was ignored by the interpreter,
    so ``@abc.abstractmethod`` was never enforced.

    Subclasses must implement ``deserialize``; a subclass that does not
    cannot be instantiated (``TypeError``).
    """

    def __init__(self, **config):
        """Accept arbitrary keyword configuration; the base ignores it."""
        pass

    @abc.abstractmethod
    def deserialize(self, topic, bytes_):
        """Return the object decoded from ``bytes_`` for ``topic``.

        Arguments:
            topic: name of the topic the data was read from.
            bytes_: the raw payload to decode.
        """
        pass

    def close(self):
        """Release any resources held by the deserializer (no-op here)."""
        pass

0 commit comments

Comments
 (0)