Commit 12e9be2

dangusev authored and gwaramadze committed
Recovery: remove the "processed_offsets" mechanism
1 parent b7cccaf commit 12e9be2

19 files changed: 77 additions, 396 deletions

quixstreams/app.py

Lines changed: 1 addition & 16 deletions
@@ -6,7 +6,6 @@
 import time
 import uuid
 import warnings
-from collections import defaultdict
 from pathlib import Path
 from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast

@@ -1036,18 +1035,6 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
         non_changelog_tps = [
             tp for tp in topic_partitions if tp.topic in non_changelog_topics
         ]
-        committed_tps = self._consumer.committed(
-            partitions=non_changelog_tps, timeout=30
-        )
-        committed_offsets: dict[int, dict[str, int]] = defaultdict(dict)
-        for tp in committed_tps:
-            if tp.error:
-                raise RuntimeError(
-                    f"Failed to get committed offsets for "
-                    f'"{tp.topic}[{tp.partition}]" from the broker: {tp.error}'
-                )
-            committed_offsets[tp.partition][tp.topic] = tp.offset
-
         # Match the assigned TP with a stream ID via DataFrameRegistry
         for tp in non_changelog_tps:
             stream_ids = self._dataframe_registry.get_stream_ids(
@@ -1056,9 +1043,7 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
             # Assign store partitions for the given stream ids
             for stream_id in stream_ids:
                 self._state_manager.on_partition_assign(
-                    stream_id=stream_id,
-                    partition=tp.partition,
-                    committed_offsets=committed_offsets[tp.partition],
+                    stream_id=stream_id, partition=tp.partition
                 )
         self._run_tracker.timeout_refresh()
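
For reference, the deleted block's output shape can be restated with stand-in data (a sketch: the `TP` dataclass mimics `confluent_kafka.TopicPartition`, and the topic names are invented). It built the latest broker-committed offset for every assigned source topic, grouped by partition number; collecting it cost a broker round-trip on every rebalance, which is no longer needed:

    from collections import defaultdict
    from dataclasses import dataclass

    @dataclass
    class TP:  # stand-in for confluent_kafka.TopicPartition
        topic: str
        partition: int
        offset: int
        error: object = None

    committed_tps = [TP("orders", 0, 42), TP("payments", 0, 7), TP("orders", 1, 100)]

    committed_offsets: dict[int, dict[str, int]] = defaultdict(dict)
    for tp in committed_tps:
        if tp.error:
            raise RuntimeError(f"Failed to get committed offsets: {tp.error}")
        committed_offsets[tp.partition][tp.topic] = tp.offset

    assert committed_offsets == {0: {"orders": 42, "payments": 7}, 1: {"orders": 100}}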

quixstreams/checkpointing/checkpoint.py

Lines changed: 1 addition & 9 deletions
@@ -228,20 +228,12 @@ def commit(self):
             partition,
             store_name,
         ), transaction in self._store_transactions.items():
-            topics = self._dataframe_registry.get_topics_for_stream_id(
-                stream_id=stream_id
-            )
-            processed_offsets = {
-                topic: offset
-                for (topic, partition_), offset in self._tp_offsets.items()
-                if topic in topics and partition_ == partition
-            }
             if transaction.failed:
                 raise StoreTransactionFailed(
                     f'Detected a failed transaction for store "{store_name}", '
                     f"the checkpoint is aborted"
                 )
-            transaction.prepare(processed_offsets=processed_offsets)
+            transaction.prepare()

         # Step 3. Flush producer to trigger all delivery callbacks and ensure that
         # all messages are produced
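
For context, the removed per-transaction bookkeeping restated as a standalone snippet with invented example values (a sketch: `tp_offsets` plays the role of `self._tp_offsets`, and `topics` stands in for the `get_topics_for_stream_id()` result). Each store transaction used to receive only the offsets of its own stream's topics on its own partition:

    tp_offsets = {("orders", 0): 42, ("payments", 0): 7, ("orders", 1): 100}
    topics = ["orders"]  # topics behind this transaction's stream_id
    partition = 0        # this transaction's partition

    processed_offsets = {
        topic: offset
        for (topic, partition_), offset in tp_offsets.items()
        if topic in topics and partition_ == partition
    }
    assert processed_offsets == {"orders": 42}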

quixstreams/sources/base/manager.py

Lines changed: 1 addition & 3 deletions
@@ -156,9 +156,7 @@ def _recover_state(self, source: StatefulSource) -> StorePartition:
         self._consumer.assign([changelog_tp])

         store_partitions = state_manager.on_partition_assign(
-            stream_id=None,
-            partition=source.assigned_store_partition,
-            committed_offsets={},
+            stream_id=None, partition=source.assigned_store_partition
         )

         if state_manager.recovery_required:

quixstreams/state/base/transaction.py

Lines changed: 3 additions & 9 deletions
@@ -25,13 +25,11 @@
 )
 from quixstreams.state.metadata import (
     CHANGELOG_CF_MESSAGE_HEADER,
-    CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER,
     DEFAULT_PREFIX,
     SEPARATOR,
     Marker,
 )
 from quixstreams.state.serialization import DumpsFunc, LoadsFunc, deserialize, serialize
-from quixstreams.utils.json import dumps as json_dumps

 from .state import State, TransactionState

@@ -477,7 +475,7 @@ def exists(self, key: K, prefix: bytes, cf_name: str = "default") -> bool:
         return self._partition.exists(key_serialized, cf_name=cf_name)

     @validate_transaction_status(PartitionTransactionStatus.STARTED)
-    def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
+    def prepare(self) -> None:
         """
         Produce changelog messages to the changelog topic for all changes accumulated
         in this transaction and prepare transaction to flush its state to the state
@@ -488,18 +486,16 @@ def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:

         If changelog is disabled for this application, no updates will be produced
         to the changelog topic.
-
-        :param processed_offsets: the dict with <topic: offset> of the latest processed message
         """

         try:
-            self._prepare(processed_offsets=processed_offsets)
+            self._prepare()
             self._status = PartitionTransactionStatus.PREPARED
         except Exception:
             self._status = PartitionTransactionStatus.FAILED
             raise

-    def _prepare(self, processed_offsets: Optional[dict[str, int]]):
+    def _prepare(self):
         if self._changelog_producer is None:
             return

@@ -508,13 +504,11 @@ def _prepare(self, processed_offsets: Optional[dict[str, int]]):
             f'topic_name="{self._changelog_producer.changelog_name}" '
             f"partition={self._changelog_producer.partition}"
         )
-        source_tp_offset_header = json_dumps(processed_offsets)
         column_families = self._update_cache.get_column_families()

         for cf_name in column_families:
             headers: Headers = {
                 CHANGELOG_CF_MESSAGE_HEADER: cf_name,
-                CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER: source_tp_offset_header,
             }

             updates = self._update_cache.get_updates(cf_name=cf_name)
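
The visible effect on the wire format: every changelog record used to carry a second header with a JSON-encoded map of processed source offsets, and now carries only the column-family header. A before/after sketch with illustrative values (the real code serialized the header with `quixstreams.utils.json.dumps`; stdlib `json` is used here for self-containment):

    import json

    # Before: each changelog record also carried the processed-offsets header.
    headers_before = {
        "__column_family__": "default",
        "__processed_tp_offsets__": json.dumps({"orders": 42}).encode(),
    }

    # After: only the column-family header remains.
    headers_after = {
        "__column_family__": "default",
    }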

quixstreams/state/manager.py

Lines changed: 0 additions & 4 deletions
@@ -295,16 +295,13 @@ def on_partition_assign(
         self,
         stream_id: Optional[str],
         partition: int,
-        committed_offsets: dict[str, int],
     ) -> Dict[str, StorePartition]:
         """
         Assign store partitions for each registered store for the given stream_id
         and partition number, and return a list of assigned `StorePartition` objects.

         :param stream_id: stream id
         :param partition: Kafka topic partition number
-        :param committed_offsets: a dict with latest committed offsets
-            of all assigned topics for this partition number.
         :return: list of assigned `StorePartition`
         """
         store_partitions = {}
@@ -315,7 +312,6 @@ def on_partition_assign(
             self._recovery_manager.assign_partition(
                 topic=stream_id,
                 partition=partition,
-                committed_offsets=committed_offsets,
                 store_partitions=store_partitions,
             )
         return store_partitions

quixstreams/state/metadata.py

Lines changed: 0 additions & 1 deletion
@@ -4,7 +4,6 @@
 SEPARATOR_LENGTH = len(SEPARATOR)

 CHANGELOG_CF_MESSAGE_HEADER = "__column_family__"
-CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER = "__processed_tp_offsets__"
 METADATA_CF_NAME = "__metadata__"

 DEFAULT_PREFIX = b""

quixstreams/state/recovery.py

Lines changed: 15 additions & 62 deletions
@@ -13,17 +13,13 @@
 from quixstreams.models.types import Headers
 from quixstreams.state.base import StorePartition
 from quixstreams.utils.dicts import dict_values
-from quixstreams.utils.json import loads as json_loads

 from .exceptions import (
     ChangelogTopicPartitionNotAssigned,
     ColumnFamilyHeaderMissing,
     InvalidStoreChangelogOffset,
 )
-from .metadata import (
-    CHANGELOG_CF_MESSAGE_HEADER,
-    CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER,
-)
+from .metadata import CHANGELOG_CF_MESSAGE_HEADER

 logger = logging.getLogger(__name__)

@@ -50,7 +46,6 @@ def __init__(
         changelog_name: str,
         partition_num: int,
         store_partition: StorePartition,
-        committed_offsets: dict[str, int],
         lowwater: int,
         highwater: int,
     ):
@@ -59,7 +54,6 @@ def __init__(
         self._store_partition = store_partition
         self._changelog_lowwater = lowwater
         self._changelog_highwater = highwater
-        self._committed_offsets = committed_offsets
         self._recovery_consume_position: Optional[int] = None
         self._initial_offset: Optional[int] = None

@@ -154,40 +148,23 @@ def recover_from_changelog_message(
                 f"Header '{CHANGELOG_CF_MESSAGE_HEADER}' missing from changelog message"
             )

-        # Parse the processed topic-partition-offset info from the changelog message
-        # headers to determine whether the update should be applied or skipped.
-        # It can be empty if the message was produced by the older version of the lib.
-        processed_offsets = json_loads(
-            headers.get(CHANGELOG_PROCESSED_OFFSETS_MESSAGE_HEADER, b"null")
-        )
-        if processed_offsets is None or self._should_apply_changelog(
-            processed_offsets=processed_offsets
-        ):
-            key = changelog_message.key()
-            if not isinstance(key, bytes):
-                raise TypeError(
-                    f'Invalid changelog key type {type(key)}, expected "bytes"'
-                )
-
-            value = changelog_message.value()
-            if not isinstance(value, (bytes, _NoneType)):
-                raise TypeError(
-                    f'Invalid changelog value type {type(value)}, expected "bytes"'
-                )
+        key = changelog_message.key()
+        if not isinstance(key, bytes):
+            raise TypeError(f'Invalid changelog key type {type(key)}, expected "bytes"')

-            self._store_partition.recover_from_changelog_message(
-                cf_name=cf_name,
-                key=key,
-                value=value,
-                offset=changelog_message.offset(),
-            )
-        else:
-            # Even if the changelog update is skipped, roll the changelog offset
-            # to move forward within the changelog topic
-            self._store_partition.write_changelog_offset(
-                offset=changelog_message.offset(),
+        value = changelog_message.value()
+        if not isinstance(value, (bytes, _NoneType)):
+            raise TypeError(
+                f'Invalid changelog value type {type(value)}, expected "bytes"'
             )

+        self._store_partition.recover_from_changelog_message(
+            cf_name=cf_name,
+            key=key,
+            value=value,
+            offset=changelog_message.offset(),
+        )
+
     def set_recovery_consume_position(self, offset: int):
         """
         Update the recovery partition with the consumer's position (whenever
@@ -199,26 +176,6 @@ def set_recovery_consume_position(self, offset: int):
         """
         self._recovery_consume_position = offset

-    def _should_apply_changelog(self, processed_offsets: dict[str, int]) -> bool:
-        """
-        Determine whether the changelog update should be skipped.
-
-        :param processed_offsets: a dict with processed offsets
-            from the changelog message header processed offset.
-
-        :return: True if update should be applied, else False.
-        """
-        committed_offsets = self._committed_offsets
-        for topic, processed_offset in processed_offsets.items():
-            # Skip recovering from the message if its processed offset is ahead of the
-            # current committed offset.
-            # This is a best-effort to recover to a consistent state
-            # if the checkpointing code produced the changelog messages
-            # but failed to commit the source topic offset.
-            if processed_offset >= committed_offsets[topic]:
-                return False
-        return True
-

 class ChangelogProducerFactory:
     """
@@ -411,7 +368,6 @@ def _generate_recovery_partitions(
         topic_name: Optional[str],
         partition_num: int,
         store_partitions: Dict[str, StorePartition],
-        committed_offsets: dict[str, int],
     ) -> List[RecoveryPartition]:
         partitions = []
         for store_name, store_partition in store_partitions.items():
@@ -432,7 +388,6 @@
                 changelog_name=changelog_topic.name,
                 partition_num=partition_num,
                 store_partition=store_partition,
-                committed_offsets=committed_offsets,
                 lowwater=lowwater,
                 highwater=highwater,
             )
@@ -443,7 +398,6 @@ def assign_partition(
         self,
         topic: Optional[str],
         partition: int,
-        committed_offsets: dict[str, int],
         store_partitions: Dict[str, StorePartition],
     ):
         """
@@ -455,7 +409,6 @@
             topic_name=topic,
             partition_num=partition,
             store_partitions=store_partitions,
-            committed_offsets=committed_offsets,
         )

         assigned_tps = set(
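
To make the dropped behavior concrete, the deleted `_should_apply_changelog` check can be restated as a pure function (a reference sketch, not code that remains anywhere after this commit). A Kafka committed offset is the next offset to be consumed, so a changelog record whose processed offset is at or past it reflects work from a checkpoint that never committed:

    def should_apply_changelog(
        processed_offsets: dict[str, int], committed_offsets: dict[str, int]
    ) -> bool:
        """Return True if a changelog record should be applied during recovery."""
        for topic, processed_offset in processed_offsets.items():
            # A record produced at or past the committed offset belongs to an
            # uncommitted checkpoint; best effort is to skip it during recovery.
            if processed_offset >= committed_offsets[topic]:
                return False
        return True

    # Committed offset 10 means offsets 0..9 were processed and committed:
    assert should_apply_changelog({"orders": 9}, {"orders": 10}) is True
    assert should_apply_changelog({"orders": 10}, {"orders": 10}) is False

After this commit, recovery applies every changelog record unconditionally, trading this best-effort consistency guard for a simpler recovery path.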

quixstreams/state/rocksdb/timestamped.py

Lines changed: 2 additions & 4 deletions
@@ -171,16 +171,14 @@ def set_for_timestamp(self, timestamp: int, value: Any, prefix: Any) -> None:
         self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)

     @validate_transaction_status(PartitionTransactionStatus.STARTED)
-    def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
+    def prepare(self) -> None:
         """
         This method first calls `_expire()` to remove outdated entries based on
         their timestamps and grace periods, then calls the parent class's
         `prepare()` to prepare the transaction for flush.
-
-        :param processed_offsets: the dict with <topic: offset> of the latest processed message
         """
         self._expire()
-        super().prepare(processed_offsets=processed_offsets)
+        super().prepare()

     def _expire(self) -> None:
         """

quixstreams/state/rocksdb/transaction.py

Lines changed: 2 additions & 4 deletions
@@ -95,15 +95,13 @@ def _get_items(
         return sorted(merged_items.items(), key=lambda kv: kv[0], reverse=backwards)

     @validate_transaction_status(PartitionTransactionStatus.STARTED)
-    def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
+    def prepare(self) -> None:
         """
         This method first persists the counter and then calls the parent class's
         `prepare()` to prepare the transaction for flush.
-
-        :param processed_offsets: the dict with <topic: offset> of the latest processed message
         """
         self._persist_counter()
-        super().prepare(processed_offsets=processed_offsets)
+        super().prepare()

     def _increment_counter(self) -> int:
         """

quixstreams/state/types.py

Lines changed: 1 addition & 4 deletions
@@ -232,7 +232,7 @@ def prepared(self) -> bool:
         """
         ...

-    def prepare(self, processed_offsets: Optional[dict[str, int]]):
+    def prepare(self):
         """
         Produce changelog messages to the changelog topic for all changes accumulated
         in this transaction and prepare transcation to flush its state to the state
@@ -243,9 +243,6 @@ def prepare(self, processed_offsets: Optional[dict[str, int]]):

         If changelog is disabled for this application, no updates will be produced
         to the changelog topic.
-
-        :param processed_offsets: the dict with <topic: offset> of
-            the latest processed message in the current partition
         """

     def as_state(self, prefix: Any) -> WindowedState[K, V]: ...
