From 81b27e1c99c26de7ed766f92ecb0b6a0b114fcb6 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Thu, 30 Oct 2025 14:28:39 +0100 Subject: [PATCH 1/2] Add watermarks bootstrapping --- quixstreams/app.py | 8 +- quixstreams/processing/watermarking.py | 180 ++++++++++++++++- .../test_processing/test_watermarking.py | 181 ++++++++++++++++++ 3 files changed, 364 insertions(+), 5 deletions(-) create mode 100644 tests/test_quixstreams/test_processing/test_watermarking.py diff --git a/quixstreams/app.py b/quixstreams/app.py index 0a6f14706..7ffe254c2 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -1111,12 +1111,12 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]): ] # TODO: The set is used because the watermark tp can already be present in the "topic_partitions" # because we use `subscribe()` earlier. Fix the mess later. - # TODO: Also, how to avoid reading the whole WM topic on each restart? - # We really need only the most recent data - # Is it fine to read it from the end? The active partitions must still publish something. - # Or should we commit it? self._consumer.assign(list(set(topic_partitions + watermarks_partitions))) + # Bootstrap watermarks by progressively reading the watermarks topic + # This uses an exponential backoff strategy to minimize startup time + self._watermark_manager.bootstrap_watermarks(self._consumer) + # Pause changelog topic+partitions immediately after assignment changelog_topics = {t.name for t in self._topic_manager.changelog_topics_list} changelog_tps = [tp for tp in topic_partitions if tp.topic in changelog_topics] diff --git a/quixstreams/processing/watermarking.py b/quixstreams/processing/watermarking.py index b777af432..b7ea50604 100644 --- a/quixstreams/processing/watermarking.py +++ b/quixstreams/processing/watermarking.py @@ -1,13 +1,19 @@ import logging from time import monotonic -from typing import Optional, TypedDict +from typing import TYPE_CHECKING, Optional, TypedDict + +from confluent_kafka import TopicPartition from quixstreams.internal_producer import InternalProducer +from quixstreams.kafka.consumer import raise_for_msg_error from quixstreams.models import Topic from quixstreams.models.topics.manager import TopicManager from quixstreams.utils.format import format_timestamp from quixstreams.utils.json import dumps +if TYPE_CHECKING: + from quixstreams.kafka import BaseConsumer + logger = logging.getLogger(__name__) __all__ = ("WatermarkManager", "WatermarkMessage") @@ -155,3 +161,175 @@ def _get_watermark(self) -> int: if watermarks := self._watermarks.values(): watermark = min(watermarks) return watermark + + def bootstrap_watermarks(self, consumer: "BaseConsumer") -> None: + """ + Bootstrap watermarks by reading the watermarks topic progressively. + + This method uses an exponential backoff strategy: + 1. Try to read N messages from the end of the topic + 2. If not all topic-partitions are found, seek further back exponentially + 3. Continue until all TPs have watermarks or the beginning is reached + + :param consumer: The Kafka consumer to use for reading watermarks + """ + watermarks_topic_name = self.watermarks_topic.name + watermarks_partition = 0 # Watermarks topic always has 1 partition + + # Get the expected topic-partitions that need watermarks + expected_tps = set(self._watermarks.keys()) + if not expected_tps: + logger.info("No topic-partitions to bootstrap watermarks for") + return + + logger.info( + f"Bootstrapping watermarks for {len(expected_tps)} topic-partitions " + f"from topic '{watermarks_topic_name}'. Expected TPs: {expected_tps}" + ) + + # Get the high watermark (end offset) of the watermarks topic + tp = TopicPartition(watermarks_topic_name, watermarks_partition) + logger.debug(f"Getting watermark offsets for {watermarks_topic_name}...") + try: + _, high_offset = consumer.get_watermark_offsets(tp, timeout=5.0) + logger.debug(f"Watermarks topic high offset: {high_offset}") + except Exception as e: + # If we can't get watermark offsets, the topic might not be ready yet + # Log a warning but allow the application to start with -1 watermarks + logger.warning( + f"Failed to get watermark offsets for topic {watermarks_topic_name}: {e}. " + f"Watermarks will start at -1 and be updated as messages arrive." + ) + return + + if high_offset == 0: + logger.info("Watermarks topic is empty, no bootstrapping needed") + return + + # Progressive search parameters + initial_lookback = 100 # Start by looking at last 100 messages + lookback_step = min(initial_lookback, high_offset) + found_tps: set[tuple[str, int]] = set() + seek_offset = max(0, high_offset - lookback_step) + + iteration_count = 0 + max_iterations = 20 # Safety limit to prevent infinite loops + while found_tps != expected_tps: + iteration_count += 1 + if iteration_count > max_iterations: + missing_tps = expected_tps - found_tps + raise RuntimeError( + f"Bootstrap failed: exceeded {max_iterations} iterations. " + f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. " + f"Missing: {missing_tps}. This suggests a bug in the bootstrap logic." + ) + logger.info( + f"Bootstrap iteration {iteration_count}: seeking to offset {seek_offset} " + f"(lookback_step={lookback_step}, found {len(found_tps)}/{len(expected_tps)} TPs)" + ) + + # Seek to the calculated position + tp_with_offset = TopicPartition( + watermarks_topic_name, watermarks_partition, seek_offset + ) + try: + consumer.seek(tp_with_offset) + logger.debug(f"Seeked to offset {seek_offset}") + except Exception as e: + logger.error(f"Failed to seek to offset {seek_offset}: {e}") + raise + + # Read messages from seek_offset towards previous seek_offset + # or until all TPs are found + messages_read = 0 + max_messages_to_read = lookback_step + + # Timeout for this specific seek iteration (30 seconds) + iteration_timeout = 30.0 + iteration_start_time = monotonic() + consecutive_poll_timeouts = 0 + max_consecutive_poll_timeouts = 5 # Stop after 5 consecutive empty polls + + while messages_read < max_messages_to_read: + # Check if this iteration has timed out + if monotonic() - iteration_start_time > iteration_timeout: + missing_tps = expected_tps - found_tps + raise TimeoutError( + f"Bootstrap failed: polling timeout after {iteration_timeout}s for seek offset {seek_offset}. " + f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. " + f"Missing: {missing_tps}. Cannot start application without complete watermark state." + ) + + msg = consumer.poll(timeout=1.0) + if msg is None: + consecutive_poll_timeouts += 1 + # If we've had many consecutive timeouts, assume we've read all available messages + # in this range and move to the next iteration + if consecutive_poll_timeouts >= max_consecutive_poll_timeouts: + logger.info( + f"No more messages available after {consecutive_poll_timeouts} empty polls at offset {seek_offset}, " + f"moving to next iteration (read {messages_read}/{max_messages_to_read} messages)" + ) + break + continue + + # Reset consecutive timeout counter when we get a message + consecutive_poll_timeouts = 0 + + # Skip messages from other topics (shouldn't happen but be safe) + if msg.topic() != watermarks_topic_name: + continue + + messages_read += 1 + + # Deserialize and process the watermark message + try: + # Raise if message has an error + msg = raise_for_msg_error(msg) + watermark_msg = self.watermarks_topic.deserialize(msg).value + tp_key = (watermark_msg["topic"], watermark_msg["partition"]) + + # Only track if it's an expected TP + if tp_key in expected_tps: + timestamp = watermark_msg["timestamp"] + # Update the watermark (use max to handle out-of-order reads) + current = self._watermarks.get(tp_key, -1) + self._watermarks[tp_key] = max(current, timestamp) + found_tps.add(tp_key) + + logger.debug( + f"Bootstrapped watermark for {watermark_msg['topic']}[{watermark_msg['partition']}]: " + f"{format_timestamp(timestamp)}" + ) + + # Stop if we've found all TPs + if found_tps == expected_tps: + logger.info( + f"Successfully bootstrapped all {len(expected_tps)} topic-partitions " + f"after reading {messages_read} messages" + ) + return + + except Exception as e: + logger.warning(f"Failed to deserialize watermark message: {e}") + continue + + # If we've read everything and still missing TPs, expand lookback exponentially + if found_tps != expected_tps: + if seek_offset == 0: + # We've read the entire topic from the beginning + missing_tps = expected_tps - found_tps + logger.warning( + f"Reached beginning of watermarks topic but {len(missing_tps)} " + f"topic-partitions still have no watermarks: {missing_tps}. " + f"They will remain at -1 until new watermarks arrive." + ) + return + + # Double the step and seek further back from current position + lookback_step = min(lookback_step * 2, seek_offset) + seek_offset = max(0, seek_offset - lookback_step) + + logger.info( + f"Finished bootstrapping watermarks: found {len(found_tps)}/{len(expected_tps)} topic-partitions" + ) diff --git a/tests/test_quixstreams/test_processing/test_watermarking.py b/tests/test_quixstreams/test_processing/test_watermarking.py new file mode 100644 index 000000000..0e123466c --- /dev/null +++ b/tests/test_quixstreams/test_processing/test_watermarking.py @@ -0,0 +1,181 @@ +from unittest.mock import Mock + +from quixstreams.models import Topic, TopicConfig +from quixstreams.processing.watermarking import WatermarkManager + + +class TestWatermarkBootstrap: + """ + Basic tests for watermark bootstrapping. + + Note: Full testing of the progressive search algorithm requires integration + tests with a real Kafka broker, as mocking the complex message polling and + deserialization flow is error-prone and doesn't provide meaningful coverage. + """ + + def test_bootstrap_watermarks_empty_topic(self, topic_manager_factory): + """ + Test that bootstrap handles an empty watermarks topic gracefully. + """ + topic_manager = topic_manager_factory() + producer = Mock() + wm_manager = WatermarkManager( + producer=producer, topic_manager=topic_manager, interval=1.0 + ) + + test_topic = Topic( + name="topic1", + value_deserializer="json", + create_config=TopicConfig(num_partitions=1, replication_factor=1), + ) + test_topic.broker_config = test_topic.create_config + wm_manager.set_topics([test_topic]) + + consumer = Mock() + consumer.get_watermark_offsets.return_value = (0, 0) # Empty topic + + wm_manager.bootstrap_watermarks(consumer) + + # Watermark should remain at -1 + assert wm_manager._watermarks[("topic1", 0)] == -1 + + # No seek should be called for empty topic + consumer.seek.assert_not_called() + + def test_bootstrap_watermarks_no_expected_tps(self, topic_manager_factory): + """ + Test that bootstrap handles the case where no topic-partitions are expected. + """ + topic_manager = topic_manager_factory() + producer = Mock() + wm_manager = WatermarkManager( + producer=producer, topic_manager=topic_manager, interval=1.0 + ) + + # Don't set any topics - no expected TPs + consumer = Mock() + + wm_manager.bootstrap_watermarks(consumer) + + # Should exit early without calling get_watermark_offsets + consumer.get_watermark_offsets.assert_not_called() + + def test_bootstrap_watermarks_exponential_backoff(self, topic_manager_factory): + """ + Test that bootstrap uses exponential backoff when not all TPs are found. + + This test verifies true exponential backoff WITHOUT re-reading: + 1. Initial seek: offset 900 (1000 - 100), read 100 messages to offset 1000 + 2. If not all TPs found, seek back: offset 700 (900 - 200), read 200 messages to offset 900 + 3. If still not found, seek back: offset 300 (700 - 400), read 400 messages to offset 700 + 4. Continues until all TPs found or offset 0 is reached + + Key: Each iteration seeks BACK from the previous position, not from high_offset. + This avoids re-reading the same messages multiple times. + """ + topic_manager = topic_manager_factory() + producer = Mock() + wm_manager = WatermarkManager( + producer=producer, topic_manager=topic_manager, interval=1.0 + ) + + # Set up 3 topic-partitions + test_topic = Topic( + name="topic1", + value_deserializer="json", + create_config=TopicConfig(num_partitions=3, replication_factor=1), + ) + test_topic.broker_config = test_topic.create_config + wm_manager.set_topics([test_topic]) + + consumer = Mock() + consumer.get_watermark_offsets.return_value = (0, 1000) + + # Track which iteration we're in + poll_count = [0] + found_partitions = set() + + def mock_poll(timeout): + poll_count[0] += 1 + + # First iteration: seeking to 900, reading toward 1000 + # Return messages for partitions 0 and 1 only + if consumer.seek.call_count == 1: + if poll_count[0] <= 100: + # Return watermarks for partitions 0 and 1 + partition = 0 if poll_count[0] % 2 == 0 else 1 + if partition not in found_partitions: + found_partitions.add(partition) + return create_mock_watermark_message( + wm_manager.watermarks_topic, + "topic1", + partition, + 1000 + poll_count[0], + ) + else: + return None # Timeout to trigger next iteration + + # Second iteration: seeking to 700 (900 - 200), reading toward 900 + # Now include partition 2 + elif consumer.seek.call_count == 2: + # Return watermark for partition 2 + if poll_count[0] % 3 == 0: + found_partitions.add(2) + return create_mock_watermark_message( + wm_manager.watermarks_topic, "topic1", 2, 3000 + ) + return None # Return None to speed up the loop + + return None + + consumer.poll.side_effect = mock_poll + + # Run bootstrap + wm_manager.bootstrap_watermarks(consumer) + + # Verify exponential backoff happened + assert consumer.seek.call_count >= 2, "Should have seeked at least twice" + + # Verify seek offsets show TRUE exponential backoff (seeking backwards, not from high_offset) + seek_calls = consumer.seek.call_args_list + first_seek = seek_calls[0][0][0] + second_seek = seek_calls[1][0][0] + + # First seek: offset = 1000 - 100 = 900 + assert ( + first_seek.offset == 900 + ), f"First seek should be at 900, got {first_seek.offset}" + + # Second seek: offset = 900 - 200 = 700 (seeking back from previous position) + assert ( + second_seek.offset == 700 + ), f"Second seek should be at 700 (900 - 200), got {second_seek.offset}" + + # All watermarks should be set + assert wm_manager._watermarks[("topic1", 0)] > -1 + assert wm_manager._watermarks[("topic1", 1)] > -1 + assert wm_manager._watermarks[("topic1", 2)] > -1 + + +def create_mock_watermark_message(watermarks_topic, topic, partition, timestamp): + """ + Helper to create a properly mocked Kafka message. + """ + msg = Mock() + msg.error.return_value = None + msg.topic.return_value = watermarks_topic.name + msg.partition.return_value = 0 + msg.offset.return_value = 1 + + # Create a mock Row that will be returned by deserialize + mock_row = Mock() + mock_row.value = { + "topic": topic, + "partition": partition, + "timestamp": timestamp, + } + + # Mock the deserialize method to return our row + watermarks_topic.deserialize = Mock(return_value=mock_row) + + return msg From 28b410035fa427759096a7b7830921fb6aa5d906 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 31 Oct 2025 12:22:40 +0100 Subject: [PATCH 2/2] Release v4.0.0a3 --- quixstreams/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quixstreams/__init__.py b/quixstreams/__init__.py index f5e4f1891..414600a0c 100644 --- a/quixstreams/__init__.py +++ b/quixstreams/__init__.py @@ -5,4 +5,4 @@ __all__ = ["Application", "message_context", "MessageContext", "State"] -__version__ = "4.0.0a2" +__version__ = "4.0.0a3"