Skip to content

Commit f6bba88

Browse files
Fix LastOffset bug and some tests (#28415)
1 parent fbef964 commit f6bba88

File tree

6 files changed

+62
-28
lines changed

6 files changed

+62
-28
lines changed

ydb/core/persqueue/pqtablet/partition/partition_read.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,10 @@ TMaybe<TReadAnswer> TReadInfo::AddBlobsFromBody(const TVector<NPQ::TRequestedBlo
513513
if (res.IsLastPart()) {
514514
PartNo = 0;
515515
++Offset;
516+
if (LastOffset && Offset >= LastOffset) {
517+
needStop = true;
518+
break;
519+
}
516520
} else {
517521
++PartNo;
518522
}
@@ -651,6 +655,9 @@ TReadAnswer TReadInfo::FormAnswer(
651655
if (updateUsage(writeBlob)) {
652656
break;
653657
}
658+
if (LastOffset && Offset >= LastOffset) {
659+
break;
660+
}
654661
}
655662
}
656663

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,11 +815,13 @@ Y_UNIT_TEST(PartitionKeyCompaction) {
815815
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
816816
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
817817
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
818+
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
818819
group = group->GetSubgroup("consumer", "__ydb_compaction_consumer");
819820
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(200);
820821
group->GetNamedCounter("name", "topic.partition.end_to_end_lag_milliseconds_max", false)->Set(30000);
821822
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
822823
group->GetNamedCounter("name", "topic.partition.read.idle_milliseconds_max", false)->Set(300);
824+
group->GetNamedCounter("name", "topic.partition.read.lag_milliseconds_max", false)->Set(300);
823825

824826
TStringStream countersStr;
825827
dbGroup->OutputHtml(countersStr);

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
66
#include <ydb/core/security/ticket_parser.h>
77

8+
#include <ydb/core/protos/grpc_pq_old.pb.h>
9+
810
#include <ydb/core/testlib/fake_scheme_shard.h>
911
#include <ydb/core/testlib/tablet_helpers.h>
1012

@@ -1530,10 +1532,22 @@ Y_UNIT_TEST(TestTimeRetention) {
15301532
});
15311533
}
15321534

1533-
Y_UNIT_TEST(TestCompactifiedWithRetention) {
1534-
// TODO(abcdef): temporarily deleted
1535-
return;
1535+
TString GetSerializedData(ui64 seqNo, const TString& payload, const TString& key) {
1536+
NKikimrPQClient::TDataChunk proto;
1537+
proto.SetSeqNo(seqNo);
1538+
proto.SetData(payload);
1539+
if (!key.empty()) {
1540+
auto *msgMeta = proto.AddMessageMeta();
1541+
msgMeta->set_key("__key");
1542+
msgMeta->set_value(key);
1543+
}
1544+
TString dataChunkStr;
1545+
bool res = proto.SerializeToString(&dataChunkStr);
1546+
Y_ABORT_UNLESS(res);
1547+
return dataChunkStr;
1548+
}
15361549

1550+
Y_UNIT_TEST(TestCompactifiedWithRetention) {
15371551
TTestContext tc;
15381552
RunTestWithReboots(tc.TabletIds, [&]() {
15391553
return tc.InitialEventsFilter.Prepare();
@@ -1545,24 +1559,29 @@ Y_UNIT_TEST(TestCompactifiedWithRetention) {
15451559

15461560
tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsCount(0);
15471561

1548-
TVector<std::pair<ui64, TString>> data;
1549-
activeZone = PlainOrSoSlow(true, false);
1550-
1562+
ui64 key = 1;
15511563
TString s{32, 'c'};
15521564
ui32 pp = 8 + 4 + 2 + 9;
1553-
for (ui32 i = 0; i < 10; ++i) {
1554-
data.push_back({i + 1, s.substr(pp)});
1555-
}
1565+
auto getData = [&] () {
1566+
TVector<std::pair<ui64, TString>> data;
1567+
for (ui32 i = 0; i < 10; ++i) {
1568+
data.push_back({i + 1, GetSerializedData(i + 1, s.substr(pp), ToString(key++))});
1569+
}
1570+
return data;
1571+
};
1572+
activeZone = PlainOrSoSlow(true, false);
1573+
1574+
15561575
PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = true}, {}, tc);
1557-
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
1558-
CmdWrite(0, "sourceid1", data, tc, false);
1559-
CmdWrite(0, "sourceid2", data, tc, false);
1576+
CmdWrite(0, "sourceid0", getData(), tc, false, {}, true);
1577+
CmdWrite(0, "sourceid1", getData(), tc, false);
1578+
CmdWrite(0, "sourceid2", getData(), tc, false);
15601579
PQGetPartInfo(0, 30, tc);
15611580

15621581
PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100, .enableCompactificationByKey = false}, {}, tc);
1563-
CmdWrite(0, "sourceid3", data, tc, false);
1564-
CmdWrite(0, "sourceid4", data, tc, false);
1565-
CmdWrite(0, "sourceid5", data, tc, false);
1582+
CmdWrite(0, "sourceid3", getData(), tc, false);
1583+
CmdWrite(0, "sourceid4", getData(), tc, false);
1584+
CmdWrite(0, "sourceid5", getData(), tc, false);
15661585
Cerr << "Get part info with compactification disabled\n";
15671586
PQGetPartInfo(50, 60, tc);
15681587
});

ydb/core/persqueue/ut/resources/counters_topics_extended.html

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
name=topic.partition.producers_count_max: 1
2929
name=topic.partition.read.inflight_throttled_microseconds_max: 0
3030
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
31-
name=topic.partition.read.throttled_microseconds_max: 148
31+
name=topic.partition.read.throttled_microseconds_max: 2000
3232
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
3333
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
3434
name=topic.partition.storage_bytes_max: 7549747200
@@ -53,12 +53,12 @@
5353
name=topic.partition.committed_read_lag_milliseconds_max: 0
5454
name=topic.partition.end_to_end_lag_milliseconds_max: 30000
5555
name=topic.partition.read.idle_milliseconds_max: 300
56-
name=topic.partition.read.lag_messages_max: 0
57-
name=topic.partition.read.lag_milliseconds_max: 0
56+
name=topic.partition.read.lag_messages_max: 1
57+
name=topic.partition.read.lag_milliseconds_max: 300
5858
name=topic.partition.read.speed_limit_bytes_per_second: 4194304
5959
name=topic.partition.read.throttled_microseconds_max: 2000
6060
name=topic.partition.write.lag_milliseconds_max: 200
61-
name=topic.read.lag_messages: 0
61+
name=topic.read.lag_messages: 1
6262

6363
consumer=client:
6464
name=topic.partition.alive_count: 0

ydb/core/viewer/tests/test.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,11 +1025,17 @@ def test_topic_data_cdc(cls):
10251025
'query': "alter table table_test_topic_data_cdc add changefeed updates_feed WITH (FORMAT = 'JSON', MODE = 'UPDATES', INITIAL_SCAN = TRUE)"
10261026
})
10271027

1028-
insert_response = cls.call_viewer("/viewer/query", {
1029-
'database': cls.dedicated_db,
1030-
'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")',
1031-
'schema': 'multi'
1032-
})
1028+
insert_response = None
1029+
for i in range(3):
1030+
insert_response = cls.call_viewer("/viewer/query", {
1031+
'database': cls.dedicated_db,
1032+
'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")',
1033+
'schema': 'multi'
1034+
})
1035+
if 'error' in insert_response and insert_response['error']['issue_code'] == 2034:
1036+
continue
1037+
else:
1038+
break
10331039

10341040
update_response = cls.call_viewer("/viewer/query", {
10351041
'database': cls.dedicated_db,

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3266,9 +3266,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
32663266
Cerr << ">>>>> 2" << Endl << Flush;
32673267
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
32683268

3269-
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 2);
3270-
UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2);
3271-
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 1);
3269+
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 1);
3270+
UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 1);
3271+
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 3);
32723272

32733273
for (ui32 i = 0; i < 8; ++i)
32743274
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
@@ -3341,7 +3341,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
33413341

33423342
UNIT_ASSERT(info_half1.BlobsFromCache > 0);
33433343
UNIT_ASSERT(info_half2.BlobsFromCache > 0);
3344-
UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 0);
3344+
UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 1); //Because of async compaction
33453345
UNIT_ASSERT_VALUES_EQUAL(info_half2.BlobsFromDisk, 0);
33463346
}
33473347
}

0 commit comments

Comments
 (0)