Skip to content

Commit 4acb7f4

Browse files
laz2dpkp
authored andcommitted
Add metadata update to pattern subscribing (#915)
1 parent 0e55d99 commit 4acb7f4

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

kafka/consumer/group.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
772772
if pattern is not None:
773773
self._client.cluster.need_all_topic_metadata = True
774774
self._client.set_topics([])
775+
self._client.cluster.request_update()
775776
log.debug("Subscribed to topic pattern: %s", pattern)
776777
else:
777778
self._client.cluster.need_all_topic_metadata = False

test/test_client_async.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from kafka.protocol.metadata import MetadataResponse, MetadataRequest
1818
from kafka.protocol.produce import ProduceRequest
1919
from kafka.structs import BrokerMetadata
20+
from kafka.cluster import ClusterMetadata
21+
from kafka.future import Future
2022

2123

2224
@pytest.fixture
@@ -285,8 +287,30 @@ def test_least_loaded_node():
285287
pass
286288

287289

288-
def test_set_topics():
289-
pass
290+
def test_set_topics(mocker):
291+
request_update = mocker.patch.object(ClusterMetadata, 'request_update')
292+
request_update.side_effect = lambda: Future()
293+
cli = KafkaClient(api_version=(0, 10))
294+
295+
# replace 'empty' with 'non empty'
296+
request_update.reset_mock()
297+
fut = cli.set_topics(['t1', 't2'])
298+
assert not fut.is_done
299+
request_update.assert_called_with()
300+
301+
# replace 'non empty' with 'same'
302+
request_update.reset_mock()
303+
fut = cli.set_topics(['t1', 't2'])
304+
assert fut.is_done
305+
assert fut.value == set(['t1', 't2'])
306+
request_update.assert_not_called()
307+
308+
# replace 'non empty' with 'empty'
309+
request_update.reset_mock()
310+
fut = cli.set_topics([])
311+
assert fut.is_done
312+
assert fut.value == set()
313+
request_update.assert_not_called()
290314

291315

292316
@pytest.fixture

0 commit comments

Comments
 (0)