diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp index 33ddc9e2c433..2cc53048478a 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp @@ -331,6 +331,9 @@ void TConsumerActor::CommitIfNeeded() { } void TConsumerActor::UpdateStorageConfig() { + LOG_D("Update config: RetentionPeriod: " << (RetentionPeriod.has_value() ? RetentionPeriod->ToString() : "infinity") + << " " << Config.ShortDebugString()); + Storage->SetKeepMessageOrder(Config.GetKeepMessageOrder()); Storage->SetMaxMessageProcessingCount(Config.GetMaxProcessingAttempts()); Storage->SetRetentionPeriod(RetentionPeriod); @@ -500,8 +503,6 @@ void TConsumerActor::ProcessEventQueue() { ReadRequestsQueue = std::move(readRequestsQueue); - LOG_T("AfterQueueDump: " << Storage->DebugString()); - Persist(); } @@ -518,6 +519,8 @@ void TConsumerActor::Persist() { Become(&TConsumerActor::StateWrite); + LOG_T("Dump befor persist: " << Storage->DebugString()); + auto tryInlineChannel = [](auto& write) { if (write->GetValue().size() < 1000) { write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp index 4ec4a0fc90cc..705fa037f22a 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp @@ -29,8 +29,6 @@ Y_UNIT_TEST(Reload) { Cerr << ">>>>> BEGIN DESCRIBE" << Endl; - ui64 tabletId = GetTabletId(setup, "/Root", "/Root/topic1", 0); - Sleep(TDuration::Seconds(2)); Cerr << ">>>>> BEGIN READ" << Endl; @@ -77,9 +75,8 @@ Y_UNIT_TEST(Reload) { UNIT_ASSERT_VALUES_EQUAL(result->Status, Ydb::StatusIds::SUCCESS); } - Cerr << ">>>>> BEGIN REBOOT " << tabletId << Endl; - - ForwardToTablet(runtime, tabletId, runtime.AllocateEdgeActor(), new TEvents::TEvPoison()); + Cerr << ">>>>> BEGIN REBOOT " << Endl; + ReloadPQTablet(setup, "/Root", "/Root/topic1", 0); Sleep(TDuration::Seconds(2)); @@ -149,6 +146,77 @@ Y_UNIT_TEST(AlterConsumer) { } } +Y_UNIT_TEST(RetentionStorage) { + auto setup = CreateSetup(); + auto& runtime = setup->GetRuntime(); + + auto driver = TDriver(setup->MakeDriverConfig()); + auto client = TTopicClient(driver); + + client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings() + .RetentionStorageMb(8) + .BeginAddSharedConsumer("mlp-consumer") + .KeepMessagesOrder(false) + .EndAddConsumer()); + + Sleep(TDuration::Seconds(1)); + + WriteMany(setup, "/Root/topic1", 0, 1_MB, 25); + + Sleep(TDuration::Seconds(1)); + + { + // check that message with offset 0 wasn`t removed by retention + CreateReaderActor(runtime, TReaderSettings{ + .DatabasePath = "/Root", + .TopicName = "/Root/topic1", + .Consumer = "mlp-consumer", + }); + auto response = GetReadResponse(runtime); + UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription); + UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0); + UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0); + } +} + +Y_UNIT_TEST(RetentionStorageAfterReload) { + auto setup = CreateSetup(); + auto& runtime = setup->GetRuntime(); + + auto driver = TDriver(setup->MakeDriverConfig()); + auto client = TTopicClient(driver); + + client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings() + .RetentionStorageMb(8) + .BeginAddSharedConsumer("mlp-consumer") + .KeepMessagesOrder(false) + .EndAddConsumer()); + + Sleep(TDuration::Seconds(1)); + + WriteMany(setup, "/Root/topic1", 0, 1_MB, 25); + + Cerr << ">>>>> BEGIN REBOOT " << Endl; + ReloadPQTablet(setup, "/Root", "/Root/topic1", 0); + + Sleep(TDuration::Seconds(2)); + + { + // check that message with offset 0 wasn`t removed by retention + CreateReaderActor(runtime, TReaderSettings{ + .DatabasePath = "/Root", + .TopicName = "/Root/topic1", + .Consumer = "mlp-consumer", + }); + auto response = GetReadResponse(runtime); + UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription); + UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0); + UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0); + } +} + } } // namespace NKikimr::NPQ::NMLP diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp index 450db4fac839..015fd7d65c5c 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_message_enricher.cpp @@ -60,7 +60,12 @@ void TMessageEnricherActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev) { reply.Offsets.pop_front(); } if (reply.Offsets.empty()) { - Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie); + if (PendingResponse->Record.MessageSize() > 0) { + Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie); + } else { + auto r = std::make_unique(Ydb::StatusIds::INTERNAL_ERROR, "Messages was not found"); + Send(reply.Sender, std::move(r), 0, reply.Cookie); + } PendingResponse = std::make_unique(); Queue.pop_front(); continue; diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp index e080086837a3..bfa33ecb577b 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp @@ -38,22 +38,33 @@ void TStorage::SetRetentionPeriod(std::optional retentionPeriod) { RetentionPeriod = retentionPeriod; } -std::optional TStorage::Next(TInstant deadline, TPosition& position) { - auto dieDelta = Max(); +std::optional TStorage::GetRetentionDeadlineDelta() const { if (RetentionPeriod) { - auto dieTime = TimeProvider->Now() - RetentionPeriod.value(); - dieDelta = dieTime > BaseWriteTimestamp ? (dieTime - BaseWriteTimestamp).Seconds() : 0; + auto retentionDeadline = TrimToSeconds(TimeProvider->Now(), false) - RetentionPeriod.value(); + if (retentionDeadline >= BaseWriteTimestamp) { + return (retentionDeadline - BaseWriteTimestamp).Seconds(); + } } + return std::nullopt; +} + +std::optional TStorage::Next(TInstant deadline, TPosition& position) { + std::optional retentionDeadlineDelta = GetRetentionDeadlineDelta(); + if (!position.SlowPosition) { position.SlowPosition = SlowMessages.begin(); } + auto retentionExpired = [&](const auto& message) { + return retentionDeadlineDelta && message.WriteTimestampDelta <= retentionDeadlineDelta.value(); + }; + for(; position.SlowPosition != SlowMessages.end(); ++position.SlowPosition.value()) { auto offset = position.SlowPosition.value()->first; auto& message = position.SlowPosition.value()->second; if (message.Status == EMessageStatus::Unprocessed) { - if (message.WriteTimestampDelta < dieDelta) { + if (retentionExpired(message)) { continue; } @@ -69,7 +80,7 @@ std::optional TStorage::Next(TInstant deadline, TPosition& position) { for (size_t i = std::max(position.FastPosition, FirstUnlockedOffset) - FirstOffset; i < Messages.size(); ++i) { auto& message = Messages[i]; if (message.Status == EMessageStatus::Unprocessed) { - if (message.WriteTimestampDelta < dieDelta) { + if (retentionExpired(message)) { if (moveUnlockedOffset) { ++FirstUnlockedOffset; } @@ -86,7 +97,6 @@ std::optional TStorage::Next(TInstant deadline, TPosition& position) { } ui64 offset = FirstOffset + i; - return DoLock(offset, message, deadline); } else if (moveUnlockedOffset) { ++FirstUnlockedOffset; @@ -157,18 +167,17 @@ size_t TStorage::Compact() { size_t removed = 0; // Remove messages by retention - if (RetentionPeriod && (TimeProvider->Now() - RetentionPeriod.value()) > BaseWriteTimestamp) { - auto dieDelta = (TimeProvider->Now() - RetentionPeriod.value() - BaseWriteTimestamp).Seconds(); - auto dieProcessingDelta = dieDelta + 60; + if (auto retentionDeadlineDelta = GetRetentionDeadlineDelta(); retentionDeadlineDelta.has_value()) { + auto dieProcessingDelta = retentionDeadlineDelta.value() + 60; auto canRemove = [&](auto& message) { switch (message.Status) { case EMessageStatus::Locked: - return message.DeadlineDelta < dieProcessingDelta; + return message.DeadlineDelta <= dieProcessingDelta; case EMessageStatus::Unprocessed: case EMessageStatus::Committed: case EMessageStatus::DLQ: - return message.WriteTimestampDelta < dieDelta; + return message.WriteTimestampDelta <= retentionDeadlineDelta.value(); default: return false; } diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h index d0422b70488b..58bb74b5be04 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h @@ -210,6 +210,8 @@ class TStorage { void RemoveMessage(const TMessage& message); + std::optional GetRetentionDeadlineDelta() const; + private: const TIntrusivePtr TimeProvider; diff --git a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp index 5a60ad72ccf1..6be8abc2a012 100644 --- a/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp +++ b/ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp @@ -443,6 +443,30 @@ Y_UNIT_TEST(NextWithWriteRetentionPeriod) { UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0); } +Y_UNIT_TEST(NextWithInfinityRetentionPeriod) { + auto timeProvider = TIntrusivePtr(new MockTimeProvider()); + + TStorage storage(timeProvider); + storage.SetRetentionPeriod(std::nullopt); + + storage.AddMessage(3, true, 5, timeProvider->Now()); + + // skip message by retention + TStorage::TPosition position; + auto result = storage.Next(timeProvider->Now() + TDuration::Seconds(1), position); + UNIT_ASSERT(result.has_value()); + UNIT_ASSERT_VALUES_EQUAL(*result, 3); + + auto& metrics = storage.GetMetrics(); + UNIT_ASSERT_VALUES_EQUAL(metrics.InflyMessageCount, 1); + UNIT_ASSERT_VALUES_EQUAL(metrics.UnprocessedMessageCount, 0); + UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageCount, 1); + UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageGroupCount, 0); + UNIT_ASSERT_VALUES_EQUAL(metrics.CommittedMessageCount, 0); + UNIT_ASSERT_VALUES_EQUAL(metrics.DeadlineExpiredMessageCount, 0); + UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0); +} + Y_UNIT_TEST(SkipLockedMessage) { TStorage storage(CreateDefaultTimeProvider()); { @@ -1415,7 +1439,7 @@ Y_UNIT_TEST(CompactStorage_ByRetention) { storage.AddMessage(4, true, 7, timeProvider->Now() + TDuration::Seconds(11)); storage.AddMessage(5, true, 11, writeTimestamp); - timeProvider->Tick(TDuration::Seconds(13)); + timeProvider->Tick(TDuration::Seconds(12)); auto result = storage.Compact(); Cerr << storage.DebugString() << Endl; @@ -1789,7 +1813,7 @@ Y_UNIT_TEST(SlowZone_Retention_1message) { utils.AddMessage(8); utils.Begin(); - utils.TimeProvider->Tick(TDuration::Seconds(3)); + utils.TimeProvider->Tick(TDuration::Seconds(2)); utils.Storage.Compact(); utils.End(); @@ -1806,7 +1830,7 @@ Y_UNIT_TEST(SlowZone_Retention_2message) { utils.AddMessage(8); utils.Begin(); - utils.TimeProvider->Tick(TDuration::Seconds(4)); + utils.TimeProvider->Tick(TDuration::Seconds(3)); utils.Storage.Compact(); utils.End(); @@ -1823,7 +1847,7 @@ Y_UNIT_TEST(SlowZone_Retention_3message) { utils.AddMessage(8); utils.Begin(); - utils.TimeProvider->Tick(TDuration::Seconds(5)); + utils.TimeProvider->Tick(TDuration::Seconds(4)); utils.Storage.Compact(); utils.End(); diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index c9e50cefd075..f6458a8740a2 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -451,6 +451,7 @@ bool TPartition::ImportantConsumersNeedToKeepCurrentKey(const TDataKey& currentK return true; } } + return false; } @@ -3165,7 +3166,7 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co ts += TDuration::MilliSeconds(1); } userInfo.ReadFromTimestamp = ts; - userInfo.Important = consumer.GetImportant(); + userInfo.Important = IsImportant(consumer); userInfo.AvailabilityPeriod = TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()); ui64 rrGen = consumer.GetGeneration(); @@ -4543,4 +4544,8 @@ void TPartition::ResetDetailedMetrics() { MessagesWrittenPerPartition.Reset(); } +bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer) { + return consumer.GetImportant() || consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP; +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index 7bc679a681aa..7d17740f5cde 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -1268,4 +1268,6 @@ inline ui64 TPartition::GetEndOffset() const { return BlobEncoder.EndOffset; } +bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer); + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp b/ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp index b68d84f436ff..ca71132546aa 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp @@ -89,6 +89,9 @@ void TPartition::InitializeMLPConsumers() { } if (consumer.HasAvailabilityPeriodMs()) { return TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()); + } else if (Config.GetPartitionConfig().GetStorageLimitBytes() > 0) { + // retention by storage is not supported yet + return std::nullopt; } else { return TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()); } diff --git a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp index a91996652fc7..71ab6d7ee50e 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_read.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_read.cpp @@ -242,7 +242,7 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet important; for (const auto& consumer : Config.GetConsumers()) { - if (!consumer.GetImportant() && !(consumer.GetAvailabilityPeriodMs() > 0)) { + if (!IsImportant(consumer) && !(consumer.GetAvailabilityPeriodMs() > 0)) { continue; } @@ -253,12 +253,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { if (!ImporantOrExtendedAvailabilityPeriod(*userInfo) && userInfo->LabeledCounters) { ctx.Send(TabletActorId, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup())); } - UsersInfoStorage->SetImportant(*userInfo, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs())); + UsersInfoStorage->SetImportant(*userInfo, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs())); continue; } if (!userInfo) { userInfo = &UsersInfoStorage->Create( - ctx, consumer.GetName(), 0, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false + ctx, consumer.GetName(), 0, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false ); } if (userInfo->Offset < (i64)GetStartOffset()) diff --git a/ydb/core/persqueue/public/mlp/ut/common/common.cpp b/ydb/core/persqueue/public/mlp/ut/common/common.cpp index 05110abf4ddd..eaf564c60347 100644 --- a/ydb/core/persqueue/public/mlp/ut/common/common.cpp +++ b/ydb/core/persqueue/public/mlp/ut/common/common.cpp @@ -156,4 +156,10 @@ THolder GetConsumerState(std::sh return setup->GetRuntime().GrabEdgeEvent(); } +void ReloadPQTablet(std::shared_ptr& setup, const TString& database, const TString& topic, ui32 partitionId) { + auto& runtime = setup->GetRuntime(); + auto tabletId = GetTabletId(setup, database, topic, partitionId); + ForwardToTablet(runtime, tabletId, runtime.AllocateEdgeActor(), new TEvents::TEvPoison()); +} + } diff --git a/ydb/core/persqueue/public/mlp/ut/common/common.h b/ydb/core/persqueue/public/mlp/ut/common/common.h index d34e814a91e8..bb05dcad8176 100644 --- a/ydb/core/persqueue/public/mlp/ut/common/common.h +++ b/ydb/core/persqueue/public/mlp/ut/common/common.h @@ -40,5 +40,6 @@ void WriteMany(std::shared_ptr setup, const std::string& top ui64 GetTabletId(std::shared_ptr& setup, const TString& database, const TString& topic, ui32 partitionId = 0); THolder GetConsumerState(std::shared_ptr& setup, const TString& database, const TString& topic, const TString& consumer, ui32 partitionId = 0); +void ReloadPQTablet(std::shared_ptr& setup, const TString& database, const TString& topic, ui32 partitionId = 0); }