
Commit 91428ef

YQ-4502 Streaming: type tests and compatibility (#28799)
1 parent 800fcbe commit 91428ef

File tree

6 files changed: +365 -10 lines changed

Lines changed: 208 additions & 0 deletions
@@ -0,0 +1,208 @@
# -*- coding: utf-8 -*-
import logging
import os
import pytest
import time

from ydb.tests.library.compatibility.fixtures import MixedClusterFixture, RestartToAnotherVersionFixture, RollingUpgradeAndDowngradeFixture
from ydb.tests.library.harness.util import LogLevels
from ydb.tests.library.test_meta import link_test_case
from ydb.tests.oss.ydb_sdk_import import ydb
from ydb.tests.tools.datastreams_helpers.data_plane import write_stream, read_stream

logger = logging.getLogger(__name__)


class StreamingTestBase:
    def setup_cluster(self):
        logger.debug(f"setup_cluster, versions {self.versions}")

        if min(self.versions) < (25, 4):
            logger.debug("skip test, only available since 25-4")
            pytest.skip("Only available since 25-4")

        os.environ["YDB_TEST_DEFAULT_CHECKPOINTING_PERIOD_MS"] = "200"
        os.environ["YDB_TEST_LEASE_DURATION_SEC"] = "15"
        yield from super().setup_cluster(
            extra_feature_flags={
                "enable_external_data_sources": True,
                "enable_streaming_queries": True
            },
            additional_log_configs={
                'KQP_COMPUTE': LogLevels.TRACE,
                'STREAMS_CHECKPOINT_COORDINATOR': LogLevels.TRACE,
                'STREAMS_STORAGE_SERVICE': LogLevels.TRACE,
                'FQ_ROW_DISPATCHER': LogLevels.TRACE,
                'KQP_PROXY': LogLevels.DEBUG,
                'KQP_EXECUTOR': LogLevels.DEBUG},
        )

    def create_topics(self):
        logger.debug("create_topics")
        self.input_topic = 'streaming_recipe/input_topic'
        self.output_topic = 'streaming_recipe/output_topic'
        self.consumer_name = 'consumer_name'
        with ydb.QuerySessionPool(self.driver) as session_pool:
            query = f"""
                CREATE TOPIC `{self.input_topic}`;
                CREATE TOPIC `{self.output_topic}` (CONSUMER {self.consumer_name});
            """
            session_pool.execute_with_retries(query)

    def create_external_data_source(self):
        logger.debug("create_external_data_source")
        endpoint = f"localhost:{self.cluster.nodes[1].port}"
        with ydb.QuerySessionPool(self.driver) as session_pool:
            query = f"""
                CREATE EXTERNAL DATA SOURCE source_name WITH (
                    SOURCE_TYPE="Ydb",
                    LOCATION="{endpoint}",
                    DATABASE_NAME="{self.database_path}",
                    SHARED_READING="false",
                    AUTH_METHOD="NONE");
            """
            session_pool.execute_with_retries(query)

    def create_streaming_query(self):
        logger.debug("create_streaming_query")
        with ydb.QuerySessionPool(self.driver) as session_pool:
            query = f"""
                CREATE STREAMING QUERY `my_queries/query_name` AS DO BEGIN
                    $input = (
                        SELECT * FROM
                            source_name.`{self.input_topic}` WITH (
                                FORMAT = 'json_each_row',
                                SCHEMA (time String NOT NULL, level String NOT NULL, host String NOT NULL)
                            )
                    );
                    $filtered = (SELECT * FROM $input WHERE level == 'error');

                    $number_errors = (
                        SELECT host, COUNT(*) AS error_count, CAST(HOP_START() AS String) AS ts
                        FROM $filtered
                        GROUP BY
                            HoppingWindow(CAST(time AS Timestamp), 'PT600S', 'PT600S'),
                            host
                    );

                    $json = (
                        SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
                        FROM $number_errors
                    );

                    INSERT INTO source_name.`{self.output_topic}`
                    SELECT * FROM $json;
                END DO;
            """
            session_pool.execute_with_retries(query)

    def create_simple_streaming_query(self):
        logger.debug("create_simple_streaming_query")
        with ydb.QuerySessionPool(self.driver) as session_pool:
            query = f"""
                CREATE STREAMING QUERY `my_queries/query_name` AS DO BEGIN
                    $input = (
                        SELECT *
                        FROM source_name.`{self.input_topic}` WITH (
                            FORMAT = 'json_each_row',
                            SCHEMA (time String NOT NULL, level String NOT NULL, host String NOT NULL)
                        )
                    );

                    $json = (
                        SELECT ToBytes(Unwrap(Yson::SerializeJson(Yson::From(TableRow()))))
                        FROM $input
                    );

                    INSERT INTO source_name.`{self.output_topic}`
                    SELECT * FROM $json;
                END DO;
            """
            session_pool.execute_with_retries(query)

    def do_write_read(self, input, expected_output):
        logger.debug("do_write_read")
        endpoint = f"localhost:{self.cluster.nodes[1].port}"
        time.sleep(2)
        logger.debug("write data to stream")
        write_stream(path=self.input_topic, data=input, database=self.database_path, endpoint=endpoint)
        logger.debug("read data from stream")
        assert sorted(read_stream(
            path=self.output_topic,
            messages_count=len(expected_output),
            consumer_name=self.consumer_name,
            database=self.database_path,
            endpoint=endpoint)) == sorted(expected_output)

    def do_test_part1(self):
        input = [
            '{"time": "2025-01-01T00:00:00.000000Z", "level": "error", "host": "host-1"}',
            '{"time": "2025-01-01T00:04:00.000000Z", "level": "error", "host": "host-2"}',
            '{"time": "2025-01-01T00:08:00.000000Z", "level": "error", "host": "host-1"}',
            '{"time": "2025-01-01T00:12:00.000000Z", "level": "error", "host": "host-2"}',
            '{"time": "2025-01-01T00:12:00.000000Z", "level": "error", "host": "host-1"}']
        expected_data = sorted([
            '{"error_count":1,"host":"host-2","ts":"2025-01-01T00:00:00Z"}',
            '{"error_count":2,"host":"host-1","ts":"2025-01-01T00:00:00Z"}'])
        self.do_write_read(input, expected_data)

    def do_test_part2(self):
        input = [
            '{"time": "2025-01-01T00:15:00.000000Z", "level": "error", "host": "host-2"}',
            '{"time": "2025-01-01T00:22:00.000000Z", "level": "error", "host": "host-1"}',
            '{"time": "2025-01-01T00:22:00.000000Z", "level": "error", "host": "host-2"}']
        expected_data = sorted([
            '{"error_count":2,"host":"host-2","ts":"2025-01-01T00:10:00Z"}',
            '{"error_count":1,"host":"host-1","ts":"2025-01-01T00:10:00Z"}'])
        self.do_write_read(input, expected_data)


class TestStreamingMixedCluster(StreamingTestBase, MixedClusterFixture):
    @pytest.fixture(autouse=True, scope="function")
    def setup(self):
        yield from self.setup_cluster()

    @link_test_case("#27924")
    def test_mixed_cluster(self):
        self.create_topics()
        self.create_external_data_source()
        self.create_streaming_query()
        self.do_test_part1()
        self.do_test_part2()


class TestStreamingRestartToAnotherVersion(StreamingTestBase, RestartToAnotherVersionFixture):
    @pytest.fixture(autouse=True, scope="function")
    def setup(self):
        yield from self.setup_cluster()

    @link_test_case("#27924")
    def test_restart_to_another_version(self):
        self.create_topics()
        self.create_external_data_source()
        self.create_streaming_query()
        self.do_test_part1()
        self.change_cluster_version()
        self.do_test_part2()

class TestStreamingRollingUpgradeAndDowngrade(StreamingTestBase, RollingUpgradeAndDowngradeFixture):
    @pytest.fixture(autouse=True, scope="function")
    def setup(self):
        yield from self.setup_cluster()

    @link_test_case("#27924")
    def test_rolling_upgrade(self):
        self.create_topics()
        self.create_external_data_source()
        self.create_simple_streaming_query()

        for _ in self.roll():  # every iteration is a step in the rolling upgrade/downgrade process
            # check that data written during this step is read back correctly
            input = ['{"time": "2025-01-01T00:15:00.000000Z", "level": "error", "host": "host-2"}']
            expected_data = ['{"host":"host-2","level":"error","time":"2025-01-01T00:15:00.000000Z"}']
            self.do_write_read(input, expected_data)
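
The expected_data literals above encode the HoppingWindow arithmetic: with hop and interval both 'PT600S', rows are grouped into ten-minute windows keyed by HOP_START(), and a window's counts are emitted only after later input moves past it. A minimal sketch of that bucketing, assuming epoch-aligned window starts (standard hopping-window semantics; this helper is illustrative and not part of the commit):

    # Illustrative only: reproduce the per-host, per-window counts asserted in do_test_part1/2.
    from collections import Counter
    from datetime import datetime, timezone

    events = [  # (time, host) pairs from do_test_part1's input
        ("2025-01-01T00:00:00.000000Z", "host-1"),
        ("2025-01-01T00:04:00.000000Z", "host-2"),
        ("2025-01-01T00:08:00.000000Z", "host-1"),
        ("2025-01-01T00:12:00.000000Z", "host-2"),
        ("2025-01-01T00:12:00.000000Z", "host-1"),
    ]

    def hop_start(ts, hop_seconds=600):
        # Assumed alignment: windows start at epoch multiples of the hop length.
        dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
        epoch = int(dt.timestamp())
        return datetime.fromtimestamp(epoch - epoch % hop_seconds, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    counts = Counter((host, hop_start(ts)) for ts, host in events)
    assert counts[("host-1", "2025-01-01T00:00:00Z")] == 2  # '{"error_count":2,"host":"host-1",...}'
    assert counts[("host-2", "2025-01-01T00:00:00Z")] == 1  # '{"error_count":1,"host":"host-2",...}'
    # The 00:12 rows fall in the 00:10 window, which is emitted only after
    # do_test_part2 writes later input that closes it.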
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
PY3TEST()
INCLUDE(${ARCADIA_ROOT}/ydb/tests/ydbd_dep.inc)

FORK_TEST_FILES()
FORK_TESTS()
FORK_SUBTESTS()
SPLIT_FACTOR(10)

TEST_SRCS(
    test_streaming.py
)

SIZE(LARGE)
REQUIREMENTS(cpu:16)
INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc)

DEPENDS(
    ydb/tests/library/compatibility/binaries
    ydb/tests/tools/pq_read
)

PEERDIR(
    contrib/python/boto3
    ydb/tests/library
    ydb/tests/library/compatibility
    ydb/tests/library/test_meta
    ydb/tests/tools/datastreams_helpers
)

END()

ydb/tests/compatibility/ya.make

Lines changed: 1 addition & 0 deletions
@@ -56,4 +56,5 @@ RECURSE(
     federated_queries
     s3_backups
     olap
+    streaming
 )

ydb/tests/fq/streaming/test_streaming.py

Lines changed: 106 additions & 0 deletions

@@ -414,3 +414,109 @@ def test_pragma(self, kikimr):
        assert self.read_stream(1, topic_path=self.output_topic) == ['lunch time']

        kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`")

    def test_types(self, kikimr):
        sourceName = "test_types"
        self.init_topics(sourceName, partitions_count=1)

        self.create_source(kikimr, sourceName)

        query_name = "test_types1"

        def test_type(self, kikimr, type, input, expected_output):
            sql = R'''
                CREATE STREAMING QUERY `{query_name}` AS
                DO BEGIN
                    $in = SELECT field_name FROM {source_name}.`{input_topic}`
                        WITH (
                            FORMAT="json_each_row",
                            SCHEMA=(field_name {type_name} NOT NULL));
                    INSERT INTO {source_name}.`{output_topic}` SELECT CAST(field_name as String) FROM $in;
                END DO;'''

            kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, type_name=type, input_topic=self.input_topic, output_topic=self.output_topic))
            self.write_stream([f"{{\"field_name\": {input}}}"])
            assert self.read_stream(1, topic_path=self.output_topic) == [expected_output]
            kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`")

        test_type(self, kikimr, type="String", input='"lunch time"', expected_output='lunch time')
        test_type(self, kikimr, type="Utf8", input='"Relativitätstheorie"', expected_output='Relativitätstheorie')
        test_type(self, kikimr, type="Int8", input='42', expected_output='42')
        test_type(self, kikimr, type="Uint64", input='777', expected_output='777')
        test_type(self, kikimr, type="Float", input='1024.1024', expected_output='1024.1024')
        test_type(self, kikimr, type="Double", input='-777.777', expected_output='-777.777')
        test_type(self, kikimr, type="Bool", input='true', expected_output='true')
        test_type(self, kikimr, type="Uuid", input='"3d6c7233-d082-4b25-83e2-10d271bbc911"', expected_output='3d6c7233-d082-4b25-83e2-10d271bbc911')
        # Unsupported:
        # test_type(self, kikimr, type="Timestamp", input='"2025-08-25 10:49:00"', expected_output='2025-08-25T10:49:00Z')
        # test_type(self, kikimr, type="Json", input='{"name": "value"}', expected_output='{"name": "value"}')
        # test_type(self, kikimr, type="JsonDocument", input='{"name": "value"}', expected_output='lunch time')
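
The f-string in test_type escapes literal braces, so each write is a one-line JSON document whose value is spliced in verbatim. A quick illustration of the payloads this produces for a few rows of the type matrix (plain Python, not part of the commit):

    # Illustrative only: what test_type actually writes to the input topic.
    for value in ['"lunch time"', '42', 'true']:
        print(f"{{\"field_name\": {value}}}")
    # {"field_name": "lunch time"}
    # {"field_name": 42}
    # {"field_name": true}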
    def test_raw_format(self, kikimr):
        sourceName = "test_restart_query" + ''.join(random.choices(string.ascii_letters + string.digits, k=8))
        self.init_topics(sourceName, partitions_count=10)
        self.create_source(kikimr, sourceName, False)

        query_name = "test_raw_format_string"
        sql = R'''
            CREATE STREAMING QUERY `{query_name}` AS
            DO BEGIN
                $input = SELECT CAST(data AS Json) AS json FROM {source_name}.`{input_topic}`
                    WITH (
                        FORMAT="raw",
                        SCHEMA=(data String));
                $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input;
                INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed;
            END DO;'''
        path = f"/Root/{query_name}"
        kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
        self.wait_completed_checkpoints(kikimr, path)

        data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}']
        expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}']
        self.write_stream(data)

        assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
        kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`")

        query_name = "test_raw_format_default"
        sql = R'''
            CREATE STREAMING QUERY `{query_name}` AS
            DO BEGIN
                $input = SELECT CAST(Data AS Json) AS json FROM {source_name}.`{input_topic}`;
                $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input;
                INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed;
            END DO;'''
        path = f"/Root/{query_name}"
        kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
        self.wait_completed_checkpoints(kikimr, path)

        data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}']
        expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}']
        self.write_stream(data)

        assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data
        kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`")

        query_name = "test_raw_format_json"
        sql = R'''
            CREATE STREAMING QUERY `{query_name}` AS
            DO BEGIN
                $input = SELECT data AS json FROM {source_name}.`{input_topic}`
                    WITH (
                        FORMAT="raw",
                        SCHEMA=(data Json));
                $parsed = SELECT JSON_VALUE(json, "$.time") as k, JSON_VALUE(json, "$.value") as v FROM $input;
                INSERT INTO {source_name}.`{output_topic}` SELECT ToBytes(Unwrap(Json::SerializeJson(Yson::From(TableRow())))) FROM $parsed;
            END DO;'''
        path = f"/Root/{query_name}"
        kikimr.YdbClient.query(sql.format(query_name=query_name, source_name=sourceName, input_topic=self.input_topic, output_topic=self.output_topic))
        self.wait_completed_checkpoints(kikimr, path)

        data = ['{"time": "2020-01-01T13:00:00.000000Z", "value": "lunch time"}']
        expected_data = ['{"k":"2020-01-01T13:00:00.000000Z","v":"lunch time"}']
        self.write_stream(data)

        assert self.read_stream(len(expected_data), topic_path=self.output_topic) == expected_data

        kikimr.YdbClient.query(f"DROP STREAMING QUERY `{query_name}`")
