Skip to content

Commit 186d480

Browse files
committed
KAFKA-5429 - Ignore produce response if batch was previously aborted
1 parent 92037ce commit 186d480

File tree

2 files changed

+90
-6
lines changed

2 files changed

+90
-6
lines changed

kafka/producer/record_accumulator.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66
import threading
77
import time
88

9+
try:
10+
# enum in stdlib as of py3.4
11+
from enum import IntEnum # pylint: disable=import-error
12+
except ImportError:
13+
# vendored backport module
14+
from kafka.vendor.enum34 import IntEnum
15+
916
import kafka.errors as Errors
1017
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
1118
from kafka.record.memory_records import MemoryRecordsBuilder
@@ -34,6 +41,12 @@ def get(self):
3441
return self._val
3542

3643

44+
class FinalState(IntEnum):
    """Terminal state of a ProducerBatch.

    A batch reaches exactly one final state; any attempt to complete it a
    second time (other than a produce response after an abort) is an error.
    """
    ABORTED = 0    # batch was aborted before (or while) in flight
    FAILED = 1     # batch completed with an exception
    SUCCEEDED = 2  # batch was produced successfully
48+
49+
3750
class ProducerBatch(object):
3851
def __init__(self, tp, records, now=None):
3952
self.max_record_size = 0
@@ -47,6 +60,7 @@ def __init__(self, tp, records, now=None):
4760
self.topic_partition = tp
4861
self.produce_future = FutureProduceResult(tp)
4962
self._retry = False
63+
self._final_state = None
5064

5165
@property
5266
def record_count(self):
@@ -79,10 +93,29 @@ def try_append(self, timestamp_ms, key, value, headers, now=None):
7993
sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
8094
return future
8195

82-
def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_offset=None):
83-
if self.produce_future.is_done:
84-
log.warning('Batch is already closed -- ignoring batch.done()')
96+
def abort(self, exception):
    """Abort this batch: mark it ABORTED and fail its future/callbacks.

    Raises IllegalStateError if the batch has already reached a final state.
    """
    if self._final_state is None:
        self._final_state = FinalState.ABORTED
    else:
        raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state)

    log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception)
    # -1/-1 signal "no offset / no timestamp" for an aborted batch
    self._complete_future(-1, -1, exception)
104+
105+
def done(self, base_offset=None, timestamp_ms=None, exception=None):
    """Record the batch's final state and complete its future.

    A produce response arriving after the batch was aborted is silently
    ignored; any other second completion raises IllegalStateError.
    """
    state = self._final_state
    if state is FinalState.ABORTED:
        # The batch was aborted locally while the request was in flight.
        log.debug("ProduceResponse returned for %s after batch had already been aborted.", self.topic_partition)
        return
    if state is not None:
        raise Errors.IllegalStateError("Batch has already been completed in final state %s" % state)

    if exception is None:
        self._final_state = FinalState.SUCCEEDED
    else:
        self._final_state = FinalState.FAILED
    self._complete_future(base_offset, timestamp_ms, exception)
115+
116+
def _complete_future(self, base_offset, timestamp_ms, exception):
117+
if self.produce_future.is_done:
118+
raise Errors.IllegalStateError('Batch is already closed!')
86119
elif exception is None:
87120
log.debug("Produced messages to topic-partition %s with base offset %s", self.topic_partition, base_offset)
88121
self.produce_future.success((base_offset, timestamp_ms))
@@ -588,7 +621,7 @@ def _abort_batches(self, error):
588621
with self._tp_locks[tp]:
589622
batch.records.close()
590623
self._batches[tp].remove(batch)
591-
batch.done(exception=error)
624+
batch.abort(error)
592625
self.deallocate(batch)
593626

594627
def abort_undrained_batches(self, error):
@@ -601,7 +634,7 @@ def abort_undrained_batches(self, error):
601634
batch.records.close()
602635
self._batches[tp].remove(batch)
603636
if aborted:
604-
batch.done(exception=error)
637+
batch.abort(error)
605638
self.deallocate(batch)
606639

607640
def close(self):

test/test_record_accumulator.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pytest
55
import io
66

7-
from kafka.errors import KafkaTimeoutError
7+
from kafka.errors import IllegalStateError, KafkaError, KafkaTimeoutError
88
from kafka.producer.future import FutureRecordMetadata, RecordMetadata
99
from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
1010
from kafka.record.memory_records import MemoryRecordsBuilder
@@ -72,3 +72,54 @@ def test_producer_batch_maybe_expire():
7272
assert future.is_done
7373
assert future.failed()
7474
assert isinstance(future.exception, KafkaTimeoutError)
75+
76+
def test_batch_abort():
    """After abort(), the future is failed and later done() calls are ignored."""
    partition = TopicPartition('foo', 0)
    builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000)
    batch = ProducerBatch(partition, builder)
    future = batch.try_append(123, None, b'msg', [])

    batch.abort(KafkaError())
    assert future.is_done

    # A produce response (success or failure) arriving post-abort is a no-op.
    batch.done(500, 2342342341)
    batch.done(exception=KafkaError())

    assert future.is_done
    with pytest.raises(KafkaError):
        future.get()
93+
94+
def test_batch_cannot_abort_twice():
    """A second abort() must raise; the first abort's failure sticks."""
    partition = TopicPartition('foo', 0)
    builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000)
    batch = ProducerBatch(partition, builder)
    future = batch.try_append(123, None, b'msg', [])

    batch.abort(KafkaError())
    with pytest.raises(IllegalStateError):
        batch.abort(KafkaError())

    assert future.is_done
    with pytest.raises(KafkaError):
        future.get()
109+
110+
def test_batch_cannot_complete_twice():
    """A second done() must raise; metadata from the first done() is kept."""
    partition = TopicPartition('foo', 0)
    builder = MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000)
    batch = ProducerBatch(partition, builder)
    future = batch.try_append(123, None, b'msg', [])

    batch.done(500, 10, None)
    with pytest.raises(IllegalStateError):
        batch.done(1000, 20, None)

    metadata = future.get()
    assert metadata.offset == 500
    assert metadata.timestamp == 10

0 commit comments

Comments
 (0)