Skip to content

Commit 62b90b0

Browse files
authored
stable-25-3-1: fix topic.partition.committed_end_to_end_lag_milliseconds_max metric LOGBROKER-10068 (#28632)
2 parents 5ff3931 + 0e8f5d4 commit 62b90b0

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,9 @@ TConsumerSnapshot TPartition::CreateSnapshot(TUserInfo& userInfo) const {
805805
if (userInfo.Offset >= static_cast<i64>(GetEndOffset())) {
806806
result.LastCommittedMessage.CreateTimestamp = now;
807807
result.LastCommittedMessage.WriteTimestamp = now;
808+
} else if (userInfo.ActualTimestamps) {
809+
result.LastCommittedMessage.CreateTimestamp = userInfo.CreateTimestamp;
810+
result.LastCommittedMessage.WriteTimestamp = userInfo.WriteTimestamp;
808811
} else {
809812
auto timestamp = GetWriteTimeEstimate(userInfo.Offset);
810813
result.LastCommittedMessage.CreateTimestamp = timestamp;
@@ -915,7 +918,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
915918
NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo();
916919
clientInfo->SetClientId(userInfo.User);
917920

918-
auto snapshot = CreateSnapshot(userInfo);
921+
const auto snapshot = CreateSnapshot(userInfo);
919922

920923
auto write = clientInfo->MutableWritePosition();
921924
write->SetOffset(userInfo.Offset);
@@ -945,7 +948,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
945948
}
946949

947950
if (ev->Get()->GetStatForAllConsumers) { //fill lags
948-
auto snapshot = CreateSnapshot(userInfo);
951+
const auto snapshot = CreateSnapshot(userInfo);
949952

950953
auto* clientInfo = result.AddConsumerResult();
951954
clientInfo->SetConsumer(userInfo.User);
@@ -1084,7 +1087,7 @@ void TPartition::Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActor
10841087
result.SetEndOffset(GetEndOffset());
10851088
result.SetResponseTimestamp(ctx.Now().MilliSeconds());
10861089
for (auto& pr : UsersInfoStorage->GetAll()) {
1087-
auto snapshot = CreateSnapshot(pr.second);
1090+
const auto snapshot = CreateSnapshot(pr.second);
10881091

10891092
TUserInfo& userInfo(pr.second);
10901093
NKikimrPQ::TClientInfo& clientInfo = *result.AddClientInfo();
@@ -1853,7 +1856,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
18531856
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
18541857
continue;
18551858
bool haveChanges = false;
1856-
auto snapshot = CreateSnapshot(userInfo);
1859+
const auto snapshot = CreateSnapshot(userInfo);
18571860

18581861
auto ts = snapshot.LastCommittedMessage.WriteTimestamp.MilliSeconds();
18591862
if (ts < MIN_TIMESTAMP_MS) {

ydb/core/persqueue/ut/ut_with_sdk/topic_ut.cpp

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
3838
return setup.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER);
3939
};
4040

41+
std::deque<TString> messagesTextQueue;
4142
auto write = [&](size_t seqNo) {
4243
TTopicClient client(setup.MakeDriver());
4344

@@ -49,9 +50,11 @@ Y_UNIT_TEST_SUITE(WithSDK) {
4950

5051
TString msgTxt = TStringBuilder() << "message_" << seqNo;
5152
TWriteMessage msg(msgTxt);
52-
msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(10 - seqNo));
53+
constexpr size_t maxSeqNo = 10;
54+
Y_ASSERT(seqNo <= maxSeqNo);
55+
msg.CreateTimestamp(TInstant::Now() - TDuration::Seconds(maxSeqNo - seqNo));
5356
UNIT_ASSERT(session->Write(std::move(msg)));
54-
57+
messagesTextQueue.push_back(msgTxt);
5558
session->Close(TDuration::Seconds(5));
5659
};
5760

@@ -78,7 +81,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
7881
Sleep(TDuration::Seconds(2));
7982
write(7);
8083
Sleep(TDuration::Seconds(2));
81-
write(11);
84+
write(10);
8285

8386
Cerr << ">>>>> Check describe for topic which contains messages, but consumer hasn`t read\n";
8487
{
@@ -100,6 +103,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
100103
}
101104

102105
UNIT_ASSERT(setup.Commit(TEST_TOPIC, TEST_CONSUMER, 0, 1).IsSuccess());
106+
messagesTextQueue.pop_front();
103107

104108
Cerr << ">>>>> Check describe for topic whis contains messages, has commited offset but hasn`t read (restart tablet for example)\n";
105109
{
@@ -115,7 +119,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
115119
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
116120
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
117121
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
118-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
122+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(2), c->GetMaxCommittedTimeLag());
119123
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3)); // why not zero?
120124
UNIT_ASSERT_VALUES_EQUAL(1, c->GetLastReadOffset());
121125
}
@@ -135,8 +139,15 @@ Y_UNIT_TEST_SUITE(WithSDK) {
135139
Cerr << ">>>>> Event = " << e->index() << Endl << Flush;
136140
}
137141
if (e && std::holds_alternative<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(e.value())) {
138-
// we must recive only one date event with second message
139-
break;
142+
for (const auto& message : std::get<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(e.value()).GetMessages()) {
143+
UNIT_ASSERT(!messagesTextQueue.empty());
144+
UNIT_ASSERT_VALUES_EQUAL(message.GetData(), messagesTextQueue.front());
145+
messagesTextQueue.pop_front();
146+
}
147+
if (messagesTextQueue.empty()) {
148+
// we must receive data events for all messages except the first one
149+
break;
150+
}
140151
} else if (e && std::holds_alternative<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(e.value())) {
141152
std::get<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(e.value()).Confirm();
142153
}
@@ -160,7 +171,7 @@ Y_UNIT_TEST_SUITE(WithSDK) {
160171
UNIT_ASSERT_VALUES_EQUAL(1, c->GetCommittedOffset());
161172
//UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(7), c->GetMaxWriteTimeLag());
162173
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(0), c->GetMaxReadTimeLag());
163-
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(4), c->GetMaxCommittedTimeLag());
174+
UNIT_ASSERT_VALUES_EQUAL(TDuration::Seconds(2), c->GetMaxCommittedTimeLag());
164175
UNIT_ASSERT_TIME_EQUAL(TInstant::Now(), c->GetLastReadTime(), TDuration::Seconds(3));
165176
UNIT_ASSERT_VALUES_EQUAL(3, c->GetLastReadOffset());
166177
}

0 commit comments

Comments
 (0)