Skip to content

Commit 56f5415

Browse files
authored
Disable retention by time for MLP consumers (#28412)
1 parent 7665309 commit 56f5415

File tree

12 files changed

+156
-28
lines changed

12 files changed

+156
-28
lines changed

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ void TConsumerActor::CommitIfNeeded() {
331331
}
332332

333333
void TConsumerActor::UpdateStorageConfig() {
334+
LOG_D("Update config: RetentionPeriod: " << (RetentionPeriod.has_value() ? RetentionPeriod->ToString() : "infinity")
335+
<< " " << Config.ShortDebugString());
336+
334337
Storage->SetKeepMessageOrder(Config.GetKeepMessageOrder());
335338
Storage->SetMaxMessageProcessingCount(Config.GetMaxProcessingAttempts());
336339
Storage->SetRetentionPeriod(RetentionPeriod);
@@ -500,8 +503,6 @@ void TConsumerActor::ProcessEventQueue() {
500503

501504
ReadRequestsQueue = std::move(readRequestsQueue);
502505

503-
LOG_T("AfterQueueDump: " << Storage->DebugString());
504-
505506
Persist();
506507
}
507508

@@ -518,6 +519,8 @@ void TConsumerActor::Persist() {
518519

519520
Become(&TConsumerActor::StateWrite);
520521

522+
LOG_T("Dump befor persist: " << Storage->DebugString());
523+
521524
auto tryInlineChannel = [](auto& write) {
522525
if (write->GetValue().size() < 1000) {
523526
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);

ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ Y_UNIT_TEST(Reload) {
2929

3030
Cerr << ">>>>> BEGIN DESCRIBE" << Endl;
3131

32-
ui64 tabletId = GetTabletId(setup, "/Root", "/Root/topic1", 0);
33-
3432
Sleep(TDuration::Seconds(2));
3533

3634
Cerr << ">>>>> BEGIN READ" << Endl;
@@ -77,9 +75,8 @@ Y_UNIT_TEST(Reload) {
7775
UNIT_ASSERT_VALUES_EQUAL(result->Status, Ydb::StatusIds::SUCCESS);
7876
}
7977

80-
Cerr << ">>>>> BEGIN REBOOT " << tabletId << Endl;
81-
82-
ForwardToTablet(runtime, tabletId, runtime.AllocateEdgeActor(), new TEvents::TEvPoison());
78+
Cerr << ">>>>> BEGIN REBOOT " << Endl;
79+
ReloadPQTablet(setup, "/Root", "/Root/topic1", 0);
8380

8481
Sleep(TDuration::Seconds(2));
8582

@@ -149,6 +146,77 @@ Y_UNIT_TEST(AlterConsumer) {
149146
}
150147
}
151148

149+
Y_UNIT_TEST(RetentionStorage) {
150+
auto setup = CreateSetup();
151+
auto& runtime = setup->GetRuntime();
152+
153+
auto driver = TDriver(setup->MakeDriverConfig());
154+
auto client = TTopicClient(driver);
155+
156+
client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings()
157+
.RetentionStorageMb(8)
158+
.BeginAddSharedConsumer("mlp-consumer")
159+
.KeepMessagesOrder(false)
160+
.EndAddConsumer());
161+
162+
Sleep(TDuration::Seconds(1));
163+
164+
WriteMany(setup, "/Root/topic1", 0, 1_MB, 25);
165+
166+
Sleep(TDuration::Seconds(1));
167+
168+
{
169+
// check that message with offset 0 wasn`t removed by retention
170+
CreateReaderActor(runtime, TReaderSettings{
171+
.DatabasePath = "/Root",
172+
.TopicName = "/Root/topic1",
173+
.Consumer = "mlp-consumer",
174+
});
175+
auto response = GetReadResponse(runtime);
176+
UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription);
177+
UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1);
178+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0);
179+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0);
180+
}
181+
}
182+
183+
Y_UNIT_TEST(RetentionStorageAfterReload) {
184+
auto setup = CreateSetup();
185+
auto& runtime = setup->GetRuntime();
186+
187+
auto driver = TDriver(setup->MakeDriverConfig());
188+
auto client = TTopicClient(driver);
189+
190+
client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings()
191+
.RetentionStorageMb(8)
192+
.BeginAddSharedConsumer("mlp-consumer")
193+
.KeepMessagesOrder(false)
194+
.EndAddConsumer());
195+
196+
Sleep(TDuration::Seconds(1));
197+
198+
WriteMany(setup, "/Root/topic1", 0, 1_MB, 25);
199+
200+
Cerr << ">>>>> BEGIN REBOOT " << Endl;
201+
ReloadPQTablet(setup, "/Root", "/Root/topic1", 0);
202+
203+
Sleep(TDuration::Seconds(2));
204+
205+
{
206+
// check that message with offset 0 wasn`t removed by retention
207+
CreateReaderActor(runtime, TReaderSettings{
208+
.DatabasePath = "/Root",
209+
.TopicName = "/Root/topic1",
210+
.Consumer = "mlp-consumer",
211+
});
212+
auto response = GetReadResponse(runtime);
213+
UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription);
214+
UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1);
215+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0);
216+
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0);
217+
}
218+
}
219+
152220
}
153221

154222
} // namespace NKikimr::NPQ::NMLP

ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ void TMessageEnricherActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
6060
reply.Offsets.pop_front();
6161
}
6262
if (reply.Offsets.empty()) {
63-
Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie);
63+
if (PendingResponse->Record.MessageSize() > 0) {
64+
Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie);
65+
} else {
66+
auto r = std::make_unique<TEvPQ::TEvMLPErrorResponse>(Ydb::StatusIds::INTERNAL_ERROR, "Messages was not found");
67+
Send(reply.Sender, std::move(r), 0, reply.Cookie);
68+
}
6469
PendingResponse = std::make_unique<TEvPQ::TEvMLPReadResponse>();
6570
Queue.pop_front();
6671
continue;

ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,22 +38,33 @@ void TStorage::SetRetentionPeriod(std::optional<TDuration> retentionPeriod) {
3838
RetentionPeriod = retentionPeriod;
3939
}
4040

41-
std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
42-
auto dieDelta = Max<ui64>();
41+
std::optional<ui32> TStorage::GetRetentionDeadlineDelta() const {
4342
if (RetentionPeriod) {
44-
auto dieTime = TimeProvider->Now() - RetentionPeriod.value();
45-
dieDelta = dieTime > BaseWriteTimestamp ? (dieTime - BaseWriteTimestamp).Seconds() : 0;
43+
auto retentionDeadline = TrimToSeconds(TimeProvider->Now(), false) - RetentionPeriod.value();
44+
if (retentionDeadline >= BaseWriteTimestamp) {
45+
return (retentionDeadline - BaseWriteTimestamp).Seconds();
46+
}
4647
}
4748

49+
return std::nullopt;
50+
}
51+
52+
std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
53+
std::optional<ui64> retentionDeadlineDelta = GetRetentionDeadlineDelta();
54+
4855
if (!position.SlowPosition) {
4956
position.SlowPosition = SlowMessages.begin();
5057
}
5158

59+
auto retentionExpired = [&](const auto& message) {
60+
return retentionDeadlineDelta && message.WriteTimestampDelta <= retentionDeadlineDelta.value();
61+
};
62+
5263
for(; position.SlowPosition != SlowMessages.end(); ++position.SlowPosition.value()) {
5364
auto offset = position.SlowPosition.value()->first;
5465
auto& message = position.SlowPosition.value()->second;
5566
if (message.Status == EMessageStatus::Unprocessed) {
56-
if (message.WriteTimestampDelta < dieDelta) {
67+
if (retentionExpired(message)) {
5768
continue;
5869
}
5970

@@ -69,7 +80,7 @@ std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
6980
for (size_t i = std::max(position.FastPosition, FirstUnlockedOffset) - FirstOffset; i < Messages.size(); ++i) {
7081
auto& message = Messages[i];
7182
if (message.Status == EMessageStatus::Unprocessed) {
72-
if (message.WriteTimestampDelta < dieDelta) {
83+
if (retentionExpired(message)) {
7384
if (moveUnlockedOffset) {
7485
++FirstUnlockedOffset;
7586
}
@@ -86,7 +97,6 @@ std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
8697
}
8798

8899
ui64 offset = FirstOffset + i;
89-
90100
return DoLock(offset, message, deadline);
91101
} else if (moveUnlockedOffset) {
92102
++FirstUnlockedOffset;
@@ -157,18 +167,17 @@ size_t TStorage::Compact() {
157167
size_t removed = 0;
158168

159169
// Remove messages by retention
160-
if (RetentionPeriod && (TimeProvider->Now() - RetentionPeriod.value()) > BaseWriteTimestamp) {
161-
auto dieDelta = (TimeProvider->Now() - RetentionPeriod.value() - BaseWriteTimestamp).Seconds();
162-
auto dieProcessingDelta = dieDelta + 60;
170+
if (auto retentionDeadlineDelta = GetRetentionDeadlineDelta(); retentionDeadlineDelta.has_value()) {
171+
auto dieProcessingDelta = retentionDeadlineDelta.value() + 60;
163172

164173
auto canRemove = [&](auto& message) {
165174
switch (message.Status) {
166175
case EMessageStatus::Locked:
167-
return message.DeadlineDelta < dieProcessingDelta;
176+
return message.DeadlineDelta <= dieProcessingDelta;
168177
case EMessageStatus::Unprocessed:
169178
case EMessageStatus::Committed:
170179
case EMessageStatus::DLQ:
171-
return message.WriteTimestampDelta < dieDelta;
180+
return message.WriteTimestampDelta <= retentionDeadlineDelta.value();
172181
default:
173182
return false;
174183
}

ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ class TStorage {
210210

211211
void RemoveMessage(const TMessage& message);
212212

213+
std::optional<ui32> GetRetentionDeadlineDelta() const;
214+
213215
private:
214216
const TIntrusivePtr<ITimeProvider> TimeProvider;
215217

ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,30 @@ Y_UNIT_TEST(NextWithWriteRetentionPeriod) {
443443
UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0);
444444
}
445445

446+
Y_UNIT_TEST(NextWithInfinityRetentionPeriod) {
447+
auto timeProvider = TIntrusivePtr<MockTimeProvider>(new MockTimeProvider());
448+
449+
TStorage storage(timeProvider);
450+
storage.SetRetentionPeriod(std::nullopt);
451+
452+
storage.AddMessage(3, true, 5, timeProvider->Now());
453+
454+
// skip message by retention
455+
TStorage::TPosition position;
456+
auto result = storage.Next(timeProvider->Now() + TDuration::Seconds(1), position);
457+
UNIT_ASSERT(result.has_value());
458+
UNIT_ASSERT_VALUES_EQUAL(*result, 3);
459+
460+
auto& metrics = storage.GetMetrics();
461+
UNIT_ASSERT_VALUES_EQUAL(metrics.InflyMessageCount, 1);
462+
UNIT_ASSERT_VALUES_EQUAL(metrics.UnprocessedMessageCount, 0);
463+
UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageCount, 1);
464+
UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageGroupCount, 0);
465+
UNIT_ASSERT_VALUES_EQUAL(metrics.CommittedMessageCount, 0);
466+
UNIT_ASSERT_VALUES_EQUAL(metrics.DeadlineExpiredMessageCount, 0);
467+
UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0);
468+
}
469+
446470
Y_UNIT_TEST(SkipLockedMessage) {
447471
TStorage storage(CreateDefaultTimeProvider());
448472
{
@@ -1415,7 +1439,7 @@ Y_UNIT_TEST(CompactStorage_ByRetention) {
14151439
storage.AddMessage(4, true, 7, timeProvider->Now() + TDuration::Seconds(11));
14161440
storage.AddMessage(5, true, 11, writeTimestamp);
14171441

1418-
timeProvider->Tick(TDuration::Seconds(13));
1442+
timeProvider->Tick(TDuration::Seconds(12));
14191443

14201444
auto result = storage.Compact();
14211445
Cerr << storage.DebugString() << Endl;
@@ -1789,7 +1813,7 @@ Y_UNIT_TEST(SlowZone_Retention_1message) {
17891813
utils.AddMessage(8);
17901814
utils.Begin();
17911815

1792-
utils.TimeProvider->Tick(TDuration::Seconds(3));
1816+
utils.TimeProvider->Tick(TDuration::Seconds(2));
17931817
utils.Storage.Compact();
17941818

17951819
utils.End();
@@ -1806,7 +1830,7 @@ Y_UNIT_TEST(SlowZone_Retention_2message) {
18061830
utils.AddMessage(8);
18071831
utils.Begin();
18081832

1809-
utils.TimeProvider->Tick(TDuration::Seconds(4));
1833+
utils.TimeProvider->Tick(TDuration::Seconds(3));
18101834
utils.Storage.Compact();
18111835

18121836
utils.End();
@@ -1823,7 +1847,7 @@ Y_UNIT_TEST(SlowZone_Retention_3message) {
18231847
utils.AddMessage(8);
18241848
utils.Begin();
18251849

1826-
utils.TimeProvider->Tick(TDuration::Seconds(5));
1850+
utils.TimeProvider->Tick(TDuration::Seconds(4));
18271851
utils.Storage.Compact();
18281852

18291853
utils.End();

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@ bool TPartition::ImportantConsumersNeedToKeepCurrentKey(const TDataKey& currentK
451451
return true;
452452
}
453453
}
454+
454455
return false;
455456
}
456457

@@ -3165,7 +3166,7 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
31653166
ts += TDuration::MilliSeconds(1);
31663167
}
31673168
userInfo.ReadFromTimestamp = ts;
3168-
userInfo.Important = consumer.GetImportant();
3169+
userInfo.Important = IsImportant(consumer);
31693170
userInfo.AvailabilityPeriod = TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs());
31703171

31713172
ui64 rrGen = consumer.GetGeneration();
@@ -4543,4 +4544,8 @@ void TPartition::ResetDetailedMetrics() {
45434544
MessagesWrittenPerPartition.Reset();
45444545
}
45454546

4547+
bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer) {
4548+
return consumer.GetImportant() || consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP;
4549+
}
4550+
45464551
} // namespace NKikimr::NPQ

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,4 +1268,6 @@ inline ui64 TPartition::GetEndOffset() const {
12681268
return BlobEncoder.EndOffset;
12691269
}
12701270

1271+
bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer);
1272+
12711273
} // namespace NKikimr::NPQ

ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ void TPartition::InitializeMLPConsumers() {
8989
}
9090
if (consumer.HasAvailabilityPeriodMs()) {
9191
return TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs());
92+
} else if (Config.GetPartitionConfig().GetStorageLimitBytes() > 0) {
93+
// retention by storage is not supported yet
94+
return std::nullopt;
9295
} else {
9396
return TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds());
9497
}

ydb/core/persqueue/pqtablet/partition/partition_read.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&
242242
void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
243243
TSet<TString> important;
244244
for (const auto& consumer : Config.GetConsumers()) {
245-
if (!consumer.GetImportant() && !(consumer.GetAvailabilityPeriodMs() > 0)) {
245+
if (!IsImportant(consumer) && !(consumer.GetAvailabilityPeriodMs() > 0)) {
246246
continue;
247247
}
248248

@@ -253,12 +253,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
253253
if (!ImporantOrExtendedAvailabilityPeriod(*userInfo) && userInfo->LabeledCounters) {
254254
ctx.Send(TabletActorId, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
255255
}
256-
UsersInfoStorage->SetImportant(*userInfo, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()));
256+
UsersInfoStorage->SetImportant(*userInfo, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()));
257257
continue;
258258
}
259259
if (!userInfo) {
260260
userInfo = &UsersInfoStorage->Create(
261-
ctx, consumer.GetName(), 0, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
261+
ctx, consumer.GetName(), 0, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
262262
);
263263
}
264264
if (userInfo->Offset < (i64)GetStartOffset())

0 commit comments

Comments
 (0)