From 47e091ff61f8b4a7adc4e1fc2603284cf9bc4461 Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Mon, 27 Oct 2025 16:52:12 +0000 Subject: [PATCH 1/2] Fix key-comp+retention test --- ydb/core/persqueue/ut/pq_ut.cpp | 49 +++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index 80b8230fa65b..1ac76113e0d4 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -5,6 +5,8 @@ #include #include +#include + #include #include @@ -1470,10 +1472,22 @@ Y_UNIT_TEST(TestTimeRetention) { }); } -Y_UNIT_TEST(TestCompactifiedWithRetention) { - // TODO(abcdef): temporarily deleted - return; +TString GetSerializedData(ui64 seqNo, const TString& payload, const TString& key) { + NKikimrPQClient::TDataChunk proto; + proto.SetSeqNo(seqNo); + proto.SetData(payload); + if (!key.empty()) { + auto *msgMeta = proto.AddMessageMeta(); + msgMeta->set_key("__key"); + msgMeta->set_value(key); + } + TString dataChunkStr; + bool res = proto.SerializeToString(&dataChunkStr); + Y_ABORT_UNLESS(res); + return dataChunkStr; +} +Y_UNIT_TEST(TestCompactifiedWithRetention) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { return tc.InitialEventsFilter.Prepare(); @@ -1485,24 +1499,29 @@ Y_UNIT_TEST(TestCompactifiedWithRetention) { tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(0); - TVector> data; - activeZone = PlainOrSoSlow(true, false); - + ui64 key = 1; TString s{32, 'c'}; ui32 pp = 8 + 4 + 2 + 9; - for (ui32 i = 0; i < 10; ++i) { - data.push_back({i + 1, s.substr(pp)}); - } + auto getData = [&] () { + TVector> data; + for (ui32 i = 0; i < 10; ++i) { + data.push_back({i + 1, GetSerializedData(i + 1, s.substr(pp), ToString(key++))}); + } + return data; + }; + activeZone = PlainOrSoSlow(true, false); + + PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = true}, {}, tc); - CmdWrite(0, "sourceid0", data, tc, false, {}, true); - CmdWrite(0, "sourceid1", data, tc, false); - CmdWrite(0, "sourceid2", data, tc, false); + CmdWrite(0, "sourceid0", getData(), tc, false, {}, true); + CmdWrite(0, "sourceid1", getData(), tc, false); + CmdWrite(0, "sourceid2", getData(), tc, false); PQGetPartInfo(0, 30, tc); PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = false}, {}, tc); - CmdWrite(0, "sourceid3", data, tc, false); - CmdWrite(0, "sourceid4", data, tc, false); - CmdWrite(0, "sourceid5", data, tc, false); + CmdWrite(0, "sourceid3", getData(), tc, false); + CmdWrite(0, "sourceid4", getData(), tc, false); + CmdWrite(0, "sourceid5", getData(), tc, false); Cerr << "Get part info with compactification disabled\n"; PQGetPartInfo(50, 60, tc); }); From 413338ad67f80666602245578c70c946d0f83a1c Mon Sep 17 00:00:00 2001 From: Konstantin Melekhov Date: Fri, 7 Nov 2025 20:39:10 +0000 Subject: [PATCH 2/2] Fix tests and LastOffset bug in PQ --- .../pqtablet/partition/partition_read.cpp | 7 +++++++ ydb/core/persqueue/ut/counters_ut.cpp | 2 ++ .../ut/resources/counters_topics_extended.html | 8 ++++---- ydb/core/viewer/tests/test.py | 16 +++++++++++----- ydb/services/persqueue_v1/persqueue_ut.cpp | 8 ++++---- 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index 4d468f4d6714..ca4bd54dda53 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -522,6 +522,10 @@ TMaybe TReadInfo::AddBlobsFromBody(const TVector= LastOffset) { + needStop = true; + break; + } } else { ++PartNo; } @@ -657,6 +661,9 @@ TReadAnswer TReadInfo::FormAnswer( if (updateUsage(writeBlob)) { break; } + if (LastOffset && Offset >= LastOffset) { + break; + } } } diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index 024892d76dbc..dfa763d546e8 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -815,11 +815,13 @@ Y_UNIT_TEST(PartitionKeyCompaction) { group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600); group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000); group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600); + group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000); group = group->GetSubgroup("consumer", "__ydb_compaction_consumer"); group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(200); group->GetNamedCounter("name", "topic.partition.end_to_end_lag_milliseconds_max", false)->Set(30000); group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000); group->GetNamedCounter("name", "topic.partition.read.idle_milliseconds_max", false)->Set(300); + group->GetNamedCounter("name", "topic.partition.read.lag_milliseconds_max", false)->Set(300); TStringStream countersStr; dbGroup->OutputHtml(countersStr); diff --git a/ydb/core/persqueue/ut/resources/counters_topics_extended.html b/ydb/core/persqueue/ut/resources/counters_topics_extended.html index 4a5c11910a3b..71f154e6b903 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics_extended.html +++ b/ydb/core/persqueue/ut/resources/counters_topics_extended.html @@ -25,7 +25,7 @@ name=topic.partition.producers_count_max: 1 name=topic.partition.read.inflight_throttled_microseconds_max: 0 name=topic.partition.read.speed_limit_bytes_per_second: 20000000000 - name=topic.partition.read.throttled_microseconds_max: 148 + name=topic.partition.read.throttled_microseconds_max: 2000 name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0 name=topic.partition.read_without_consumer.throttled_microseconds_max: 0 name=topic.partition.storage_bytes_max: 7549747200 @@ -50,12 +50,12 @@ name=topic.partition.committed_read_lag_milliseconds_max: 0 name=topic.partition.end_to_end_lag_milliseconds_max: 30000 name=topic.partition.read.idle_milliseconds_max: 300 - name=topic.partition.read.lag_messages_max: 0 - name=topic.partition.read.lag_milliseconds_max: 0 + name=topic.partition.read.lag_messages_max: 1 + name=topic.partition.read.lag_milliseconds_max: 300 name=topic.partition.read.speed_limit_bytes_per_second: 4194304 name=topic.partition.read.throttled_microseconds_max: 2000 name=topic.partition.write.lag_milliseconds_max: 200 - name=topic.read.lag_messages: 0 + name=topic.read.lag_messages: 1 consumer=client: name=topic.partition.alive_count: 0 diff --git a/ydb/core/viewer/tests/test.py b/ydb/core/viewer/tests/test.py index bda59d605c3a..b75f51182029 100644 --- a/ydb/core/viewer/tests/test.py +++ b/ydb/core/viewer/tests/test.py @@ -1025,11 +1025,17 @@ def test_topic_data_cdc(cls): 'query': "alter table table_test_topic_data_cdc add changefeed updates_feed WITH (FORMAT = 'JSON', MODE = 'UPDATES', INITIAL_SCAN = TRUE)" }) - insert_response = cls.call_viewer("/viewer/query", { - 'database': cls.dedicated_db, - 'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")', - 'schema': 'multi' - }) + insert_response = None + for i in range(3): + insert_response = cls.call_viewer("/viewer/query", { + 'database': cls.dedicated_db, + 'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")', + 'schema': 'multi' + }) + if 'error' in insert_response and insert_response['error']['issue_code'] == 2034: + continue + else: + break update_response = cls.call_viewer("/viewer/query", { 'database': cls.dedicated_db, diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 8a6552b1d2ac..5b1d3c87e0ca 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3266,9 +3266,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << ">>>>> 2" << Endl << Flush; auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16); - UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 2); - UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2); - UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 1); + UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 1); + UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 1); + UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 3); for (ui32 i = 0; i < 8; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value); @@ -3341,7 +3341,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(info_half1.BlobsFromCache > 0); UNIT_ASSERT(info_half2.BlobsFromCache > 0); - UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 0); + UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 1); //Because of async compaction UNIT_ASSERT_VALUES_EQUAL(info_half2.BlobsFromDisk, 0); } }