Skip to content

Commit 89d0e24

Browse files
kardymondsCopilot
andauthored
YQ-4535 Streaming queries: add rescaling test (#28122)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent b48c432 commit 89d0e24

File tree

3 files changed

+111
-4
lines changed

3 files changed

+111
-4
lines changed

ydb/tests/fq/streaming/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def query_async(self, statement):
3232
return self.session_pool.execute_with_retries_async(statement)
3333

3434

35-
@pytest.fixture(scope="function")
35+
@pytest.fixture(scope="module")
3636
def kikimr(request):
3737

3838
class Kikimr:

ydb/tests/fq/streaming/test_streaming.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def test_restart_query(self, kikimr):
113113
self.init_topics(sourceName, partitions_count=10)
114114
self.create_source(kikimr, sourceName, False)
115115

116-
name = "query1"
116+
name = "test_restart_query"
117117
sql = R'''
118118
CREATE STREAMING QUERY `{query_name}` AS
119119
DO BEGIN
@@ -301,7 +301,7 @@ def test_json_errors(self, kikimr):
301301
self.init_topics(sourceName, partitions_count=10)
302302
self.create_source(kikimr, sourceName, True)
303303

304-
name = "query1"
304+
name = "test_json_errors"
305305
sql = R'''
306306
CREATE STREAMING QUERY `{query_name}` AS
307307
DO BEGIN
@@ -326,3 +326,61 @@ def test_json_errors(self, kikimr):
326326

327327
expected = ['hello1', 'hello2']
328328
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
329+
330+
def test_restart_query_by_rescaling(self, kikimr):
331+
sourceName = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
332+
self.init_topics(sourceName, partitions_count=10)
333+
self.create_source(kikimr, sourceName, True)
334+
335+
name = "test_restart_query_by_rescaling"
336+
sql = R'''
337+
CREATE STREAMING QUERY `{query_name}` AS
338+
DO BEGIN
339+
PRAGMA ydb.OverridePlanner = @@ [
340+
{{ "tx": 0, "stage": 0, "tasks": 2 }}
341+
] @@;
342+
$in = SELECT time FROM {source_name}.`{input_topic}`
343+
WITH (
344+
FORMAT="json_each_row",
345+
SCHEMA=(time String NOT NULL))
346+
WHERE time like "%time%";
347+
INSERT INTO `{source_name}`.`{output_topic}` SELECT time FROM $in;
348+
END DO;'''
349+
350+
query_id = "query_id" # TODO
351+
kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
352+
self.wait_completed_checkpoints(kikimr, query_id)
353+
354+
message_count = 20
355+
for i in range(message_count):
356+
self.write_stream(['{"time": "time to do it"}'], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8))))
357+
assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to do it" for i in range(message_count)]
358+
self.wait_completed_checkpoints(kikimr, query_id)
359+
360+
logging.debug(f"stopping query {name}")
361+
kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")
362+
363+
sql = R'''ALTER STREAMING QUERY `{query_name}` SET (
364+
RUN = TRUE,
365+
FORCE = TRUE
366+
) AS
367+
DO BEGIN
368+
PRAGMA ydb.OverridePlanner = @@ [
369+
{{ "tx": 0, "stage": 0, "tasks": 3 }}
370+
] @@;
371+
$in = SELECT time FROM {source_name}.`{input_topic}`
372+
WITH (
373+
FORMAT="json_each_row",
374+
SCHEMA=(time String NOT NULL))
375+
WHERE time like "%lunch%";
376+
INSERT INTO `{source_name}`.`{output_topic}` SELECT time FROM $in;
377+
END DO;'''
378+
379+
kikimr.YdbClient.query(sql.format(query_name=name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
380+
381+
message = '{"time": "time to lunch"}'
382+
for i in range(message_count):
383+
self.write_stream([message], topic_path=None, partition_key=(''.join(random.choices(string.digits, k=8))))
384+
assert self.read_stream(message_count, topic_path=self.output_topic) == ["time to lunch" for i in range(message_count)]
385+
386+
kikimr.YdbClient.query(f"ALTER STREAMING QUERY `{name}` SET (RUN = FALSE);")

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from ydb.tests.tools.fq_runner.kikimr_runner import TenantConfig
1616

1717
from ydb.tests.tools.datastreams_helpers.control_plane import list_read_rules
18-
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule
18+
from ydb.tests.tools.datastreams_helpers.control_plane import create_stream, create_read_rule, delete_stream
1919
from ydb.tests.tools.datastreams_helpers.data_plane import read_stream, write_stream
2020
from ydb.tests.tools.fq_runner.fq_client import StreamingDisposition
2121

@@ -1233,3 +1233,52 @@ def test_json_errors(self, kikimr, client, use_binding):
12331233
assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}"
12341234
time.sleep(1)
12351235
stop_yds_query(client, query_id)
1236+
1237+
@yq_v1
1238+
def test_delete_topic(self, kikimr, client):
1239+
self.init(client, "test_delete_topic")
1240+
1241+
sql = Rf'''
1242+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
1243+
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
1244+
WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));'''
1245+
1246+
query_id = start_yds_query(kikimr, client, sql)
1247+
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
1248+
1249+
data = [
1250+
'{"time": 101, "data": "hello1", "event": "event1"}',
1251+
'{"time": 102, "data": "hello2", "event": "event2"}',
1252+
'{"time": 103, "data": "hello3", "event": "event3"}',
1253+
]
1254+
1255+
self.write_stream(data)
1256+
expected = ['101', '102', '103']
1257+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
1258+
kikimr.compute_plane.wait_completed_checkpoints(
1259+
query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
1260+
)
1261+
stop_yds_query(client, query_id)
1262+
1263+
delete_stream(self.input_topic)
1264+
create_stream(self.input_topic)
1265+
1266+
client.modify_query(
1267+
query_id,
1268+
"simple",
1269+
sql,
1270+
type=fq.QueryContent.QueryType.STREAMING,
1271+
state_load_mode=fq.StateLoadMode.EMPTY,
1272+
streaming_disposition=StreamingDisposition.from_last_checkpoint(),
1273+
)
1274+
1275+
data = [
1276+
'{"time": 101, "data": "hello1", "event": "event1"}',
1277+
'{"time": 102, "data": "hello2", "event": "event2"}',
1278+
'{"time": 103, "data": "hello3", "event": "event3"}',
1279+
'{"time": 104, "data": "hello4", "event": "event4"}',
1280+
]
1281+
1282+
self.write_stream(data)
1283+
expected = ['104']
1284+
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

0 commit comments

Comments
 (0)