Skip to content

Commit 01c4b44

Browse files
author
chinmaychandak
committed
2 parents 0e24366 + 719e1b3 commit 01c4b44

File tree

1 file changed

+36
-1
lines changed

1 file changed

+36
-1
lines changed

streamz/tests/test_kafka.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import requests
77
import shlex
88
import subprocess
9+
import time
910
from tornado import gen
1011

1112
from ..core import Stream
@@ -245,7 +246,41 @@ def test_kafka_batch_checkpointing():
245246
out3 = stream3.sink_to_list()
246247
stream3.start()
247248
wait_for(lambda: any(out3) and out3[-1][0] == b'value-10' and out3[-1][-1] == b'value-19', 10, period=0.2)
248-
stream3.upstream.stopped = True
249+
250+
for i in range(20, 25):
251+
kafka.produce(TOPIC, b'value-%d' % i)
252+
kafka.flush()
253+
time.sleep(5)
254+
checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC)
255+
assert len(checkpoints_list) == 3
256+
257+
for i in range(25, 30):
258+
kafka.produce(TOPIC, b'value-%d' % i)
259+
kafka.flush()
260+
time.sleep(5)
261+
checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC)
262+
assert len(checkpoints_list) == 4
263+
264+
for i in range(30, 35):
265+
kafka.produce(TOPIC, b'value-%d' % i)
266+
kafka.flush()
267+
time.sleep(5)
268+
checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC)
269+
assert len(checkpoints_list) == 5
270+
271+
for i in range(35, 40):
272+
kafka.produce(TOPIC, b'value-%d' % i)
273+
kafka.flush()
274+
time.sleep(5)
275+
checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC)
276+
assert len(checkpoints_list) == 5
277+
278+
for i in range(40, 45):
279+
kafka.produce(TOPIC, b'value-%d' % i)
280+
kafka.flush()
281+
time.sleep(5)
282+
checkpoints_list = os.listdir('custreamz_checkpoints/' + TOPIC)
283+
assert len(checkpoints_list) == 5
249284

250285

251286
@gen_cluster(client=True, timeout=60)

0 commit comments

Comments
 (0)