Skip to content

Commit 4f77aed

Browse files
Fix LastOffset bug in PQ (#28667)
2 parents 84e0ea6 + 8c23658 commit 4f77aed

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

ydb/core/persqueue/pqtablet/partition/partition_read.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,10 @@ TMaybe<TReadAnswer> TReadInfo::AddBlobsFromBody(const TVector<NPQ::TRequestedBlo
504504
if (res.IsLastPart()) {
505505
PartNo = 0;
506506
++Offset;
507+
if (LastOffset && Offset >= LastOffset) {
508+
needStop = true;
509+
break;
510+
}
507511
} else {
508512
++PartNo;
509513
}
@@ -640,6 +644,9 @@ TReadAnswer TReadInfo::FormAnswer(
640644
if (updateUsage(writeBlob)) {
641645
break;
642646
}
647+
if (LastOffset && Offset >= LastOffset) {
648+
break;
649+
}
643650
}
644651
}
645652

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,11 +815,13 @@ Y_UNIT_TEST(PartitionKeyCompaction) {
815815
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
816816
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
817817
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
818+
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
818819
group = group->GetSubgroup("consumer", "__ydb_compaction_consumer");
819820
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(200);
820821
group->GetNamedCounter("name", "topic.partition.end_to_end_lag_milliseconds_max", false)->Set(30000);
821822
group->GetNamedCounter("name", "topic.partition.read.throttled_microseconds_max", false)->Set(2000);
822823
group->GetNamedCounter("name", "topic.partition.read.idle_milliseconds_max", false)->Set(300);
824+
group->GetNamedCounter("name", "topic.partition.read.lag_milliseconds_max", false)->Set(300);
823825

824826
TStringStream countersStr;
825827
dbGroup->OutputHtml(countersStr);

ydb/core/persqueue/ut/resources/counters_topics_extended.html

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
name=topic.partition.producers_count_max: 1
2929
name=topic.partition.read.inflight_throttled_microseconds_max: 0
3030
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
31-
name=topic.partition.read.throttled_microseconds_max: 148
31+
name=topic.partition.read.throttled_microseconds_max: 2000
3232
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
3333
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
3434
name=topic.partition.storage_bytes_max: 7549747200
@@ -53,12 +53,12 @@
5353
name=topic.partition.committed_read_lag_milliseconds_max: 0
5454
name=topic.partition.end_to_end_lag_milliseconds_max: 30000
5555
name=topic.partition.read.idle_milliseconds_max: 300
56-
name=topic.partition.read.lag_messages_max: 0
57-
name=topic.partition.read.lag_milliseconds_max: 0
56+
name=topic.partition.read.lag_messages_max: 1
57+
name=topic.partition.read.lag_milliseconds_max: 300
5858
name=topic.partition.read.speed_limit_bytes_per_second: 4194304
5959
name=topic.partition.read.throttled_microseconds_max: 2000
6060
name=topic.partition.write.lag_milliseconds_max: 200
61-
name=topic.read.lag_messages: 0
61+
name=topic.read.lag_messages: 1
6262

6363
consumer=client:
6464
name=topic.partition.alive_count: 0

0 commit comments

Comments
 (0)