Skip to content

Commit 413338a

Browse files
Fix tests and LastOffset bug in PQ
1 parent 47e091f commit 413338a

File tree

5 files changed

+28
-13
lines changed

5 files changed

+28
-13
lines changed

ydb/core/persqueue/pqtablet/partition/partition_read.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,10 @@ TMaybe<TReadAnswer> TReadInfo::AddBlobsFromBody(const TVector<NPQ::TRequestedBlo
522522
if (res.IsLastPart()) {
523523
PartNo = 0;
524524
++Offset;
525+
if (LastOffset && Offset >= LastOffset) {
526+
needStop = true;
527+
break;
528+
}
525529
} else {
526530
++PartNo;
527531
}
@@ -657,6 +661,9 @@ TReadAnswer TReadInfo::FormAnswer(
657661
if (updateUsage(writeBlob)) {
658662
break;
659663
}
664+
if (LastOffset && Offset >= LastOffset) {
665+
break;
666+
}
660667
}
661668
}
662669

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,11 +815,13 @@ Y_UNIT_TEST(PartitionKeyCompaction) {
815815
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
816816
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
817817
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
818+
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
818819
group = group->GetSubgroup("consumer", "__ydb_compaction_consumer");
819820
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(200);
820821
group->GetNamedCounter("name", "topic.partition.end_to_end_lag_milliseconds_max", false)->Set(30000);
821822
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
822823
group->GetNamedCounter("name", "topic.partition.read.idle_milliseconds_max", false)->Set(300);
824+
group->GetNamedCounter("name", "topic.partition.read.lag_milliseconds_max", false)->Set(300);
823825

824826
TStringStream countersStr;
825827
dbGroup->OutputHtml(countersStr);

ydb/core/persqueue/ut/resources/counters_topics_extended.html

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
name=topic.partition.producers_count_max: 1
2626
name=topic.partition.read.inflight_throttled_microseconds_max: 0
2727
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
28-
name=topic.partition.read.throttled_microseconds_max: 148
28+
name=topic.partition.read.throttled_microseconds_max: 2000
2929
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
3030
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
3131
name=topic.partition.storage_bytes_max: 7549747200
@@ -50,12 +50,12 @@
5050
name=topic.partition.committed_read_lag_milliseconds_max: 0
5151
name=topic.partition.end_to_end_lag_milliseconds_max: 30000
5252
name=topic.partition.read.idle_milliseconds_max: 300
53-
name=topic.partition.read.lag_messages_max: 0
54-
name=topic.partition.read.lag_milliseconds_max: 0
53+
name=topic.partition.read.lag_messages_max: 1
54+
name=topic.partition.read.lag_milliseconds_max: 300
5555
name=topic.partition.read.speed_limit_bytes_per_second: 4194304
5656
name=topic.partition.read.throttled_microseconds_max: 2000
5757
name=topic.partition.write.lag_milliseconds_max: 200
58-
name=topic.read.lag_messages: 0
58+
name=topic.read.lag_messages: 1
5959

6060
consumer=client:
6161
name=topic.partition.alive_count: 0

ydb/core/viewer/tests/test.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,11 +1025,17 @@ def test_topic_data_cdc(cls):
10251025
'query': "alter table table_test_topic_data_cdc add changefeed updates_feed WITH (FORMAT = 'JSON', MODE = 'UPDATES', INITIAL_SCAN = TRUE)"
10261026
})
10271027

1028-
insert_response = cls.call_viewer("/viewer/query", {
1029-
'database': cls.dedicated_db,
1030-
'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")',
1031-
'schema': 'multi'
1032-
})
1028+
insert_response = None
1029+
for i in range(3):
1030+
insert_response = cls.call_viewer("/viewer/query", {
1031+
'database': cls.dedicated_db,
1032+
'query': 'insert into table_test_topic_data_cdc(id, name) values(11, "elleven")',
1033+
'schema': 'multi'
1034+
})
1035+
if 'error' in insert_response and insert_response['error']['issue_code'] == 2034:
1036+
continue
1037+
else:
1038+
break
10331039

10341040
update_response = cls.call_viewer("/viewer/query", {
10351041
'database': cls.dedicated_db,

ydb/services/persqueue_v1/persqueue_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3266,9 +3266,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
32663266
Cerr << ">>>>> 2" << Endl << Flush;
32673267
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
32683268

3269-
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 2);
3270-
UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2);
3271-
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 1);
3269+
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 1);
3270+
UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 1);
3271+
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromDisk + info16.BlobsFromDisk, 3);
32723272

32733273
for (ui32 i = 0; i < 8; ++i)
32743274
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
@@ -3341,7 +3341,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
33413341

33423342
UNIT_ASSERT(info_half1.BlobsFromCache > 0);
33433343
UNIT_ASSERT(info_half2.BlobsFromCache > 0);
3344-
UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 0);
3344+
UNIT_ASSERT_VALUES_EQUAL(info_half1.BlobsFromDisk, 1); //Because of async compaction
33453345
UNIT_ASSERT_VALUES_EQUAL(info_half2.BlobsFromDisk, 0);
33463346
}
33473347
}

0 commit comments

Comments
 (0)