|
2 | 2 | from typing import Dict |
3 | 3 | from unittest import mock |
4 | 4 |
|
5 | | -from confluent_kafka import TopicPartition |
| 5 | +from confluent_kafka import TopicPartition, KafkaException, KafkaError |
6 | 6 | from streaming_data_types.fbschemas.forwarder_config_update_fc00.UpdateType import ( |
7 | 7 | UpdateType, |
8 | 8 | ) |
@@ -49,7 +49,20 @@ def save_configuration(self, update_handlers: Dict): |
49 | 49 | def retrieve_configuration(self): |
50 | 50 | """Retrieve last valid configuration buffer.""" |
51 | 51 | topic = TopicPartition(self._topic, partition=0) |
52 | | - low_offset, high_offset = self._consumer.get_watermark_offsets(topic) |
| 52 | + try: |
| 53 | + low_offset, high_offset = self._consumer.get_watermark_offsets(topic) |
| 54 | + except KafkaException as e: |
| 55 | + kafka_error = e.args[0] |
| 56 | + |
| 57 | + if kafka_error == KafkaError._UNKNOWN_PARTITION: |
| 58 | + # Topic doesn't exist yet - create it (assuming auto-create is enabled) |
| 59 | + message = serialise_fc00(UpdateType.REMOVEALL, []) |
| 60 | + self._producer.produce(self._topic, bytes(message), int(time.time() * 1000)) |
| 61 | + self._producer.flush() |
| 62 | + low_offset, high_offset = self._consumer.get_watermark_offsets(topic) |
| 63 | + else: |
| 64 | + raise |
| 65 | + |
53 | 66 | # Set offset to current_offset to start retrieving from last message |
54 | 67 | current_offset = high_offset - 1 |
55 | 68 | topic.offset = current_offset |
|
0 commit comments