Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ void TConsumerActor::CommitIfNeeded() {
}

void TConsumerActor::UpdateStorageConfig() {
LOG_D("Update config: RetentionPeriod: " << (RetentionPeriod.has_value() ? RetentionPeriod->ToString() : "infinity")
<< " " << Config.ShortDebugString());

Storage->SetKeepMessageOrder(Config.GetKeepMessageOrder());
Storage->SetMaxMessageProcessingCount(Config.GetMaxProcessingAttempts());
Storage->SetRetentionPeriod(RetentionPeriod);
Expand Down Expand Up @@ -500,8 +503,6 @@ void TConsumerActor::ProcessEventQueue() {

ReadRequestsQueue = std::move(readRequestsQueue);

LOG_T("AfterQueueDump: " << Storage->DebugString());

Persist();
}

Expand All @@ -518,6 +519,8 @@ void TConsumerActor::Persist() {

Become(&TConsumerActor::StateWrite);

LOG_T("Dump befor persist: " << Storage->DebugString());

auto tryInlineChannel = [](auto& write) {
if (write->GetValue().size() < 1000) {
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
Expand Down
78 changes: 73 additions & 5 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_consumer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ Y_UNIT_TEST(Reload) {

Cerr << ">>>>> BEGIN DESCRIBE" << Endl;

ui64 tabletId = GetTabletId(setup, "/Root", "/Root/topic1", 0);

Sleep(TDuration::Seconds(2));

Cerr << ">>>>> BEGIN READ" << Endl;
Expand Down Expand Up @@ -77,9 +75,8 @@ Y_UNIT_TEST(Reload) {
UNIT_ASSERT_VALUES_EQUAL(result->Status, Ydb::StatusIds::SUCCESS);
}

Cerr << ">>>>> BEGIN REBOOT " << tabletId << Endl;

ForwardToTablet(runtime, tabletId, runtime.AllocateEdgeActor(), new TEvents::TEvPoison());
Cerr << ">>>>> BEGIN REBOOT " << Endl;
ReloadPQTablet(setup, "/Root", "/Root/topic1", 0);

Sleep(TDuration::Seconds(2));

Expand Down Expand Up @@ -149,6 +146,77 @@ Y_UNIT_TEST(AlterConsumer) {
}
}

Y_UNIT_TEST(RetentionStorage) {
auto setup = CreateSetup();
auto& runtime = setup->GetRuntime();

auto driver = TDriver(setup->MakeDriverConfig());
auto client = TTopicClient(driver);

client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings()
.RetentionStorageMb(8)
.BeginAddSharedConsumer("mlp-consumer")
.KeepMessagesOrder(false)
.EndAddConsumer());

Sleep(TDuration::Seconds(1));

WriteMany(setup, "/Root/topic1", 0, 1_MB, 25);

Sleep(TDuration::Seconds(1));

{
// check that message with offset 0 wasn`t removed by retention
CreateReaderActor(runtime, TReaderSettings{
.DatabasePath = "/Root",
.TopicName = "/Root/topic1",
.Consumer = "mlp-consumer",
});
auto response = GetReadResponse(runtime);
UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription);
UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0);
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0);
}
}

Y_UNIT_TEST(RetentionStorageAfterReload) {
auto setup = CreateSetup();
auto& runtime = setup->GetRuntime();

auto driver = TDriver(setup->MakeDriverConfig());
auto client = TTopicClient(driver);

client.CreateTopic("/Root/topic1", NYdb::NTopic::TCreateTopicSettings()
.RetentionStorageMb(8)
.BeginAddSharedConsumer("mlp-consumer")
.KeepMessagesOrder(false)
.EndAddConsumer());

Sleep(TDuration::Seconds(1));

WriteMany(setup, "/Root/topic1", 0, 1_MB, 25);

Cerr << ">>>>> BEGIN REBOOT " << Endl;
ReloadPQTablet(setup, "/Root", "/Root/topic1", 0);

Sleep(TDuration::Seconds(2));

{
// check that message with offset 0 wasn`t removed by retention
CreateReaderActor(runtime, TReaderSettings{
.DatabasePath = "/Root",
.TopicName = "/Root/topic1",
.Consumer = "mlp-consumer",
});
auto response = GetReadResponse(runtime);
UNIT_ASSERT_VALUES_EQUAL_C(response->Status, Ydb::StatusIds::SUCCESS, response->ErrorDescription);
UNIT_ASSERT_VALUES_EQUAL(response->Messages.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.PartitionId, 0);
UNIT_ASSERT_VALUES_EQUAL(response->Messages[0].MessageId.Offset, 0);
}
}

}

} // namespace NKikimr::NPQ::NMLP
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ void TMessageEnricherActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
reply.Offsets.pop_front();
}
if (reply.Offsets.empty()) {
Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie);
if (PendingResponse->Record.MessageSize() > 0) {
Send(reply.Sender, PendingResponse.release(), 0, reply.Cookie);
} else {
auto r = std::make_unique<TEvPQ::TEvMLPErrorResponse>(Ydb::StatusIds::INTERNAL_ERROR, "Messages was not found");
Send(reply.Sender, std::move(r), 0, reply.Cookie);
}
PendingResponse = std::make_unique<TEvPQ::TEvMLPReadResponse>();
Queue.pop_front();
continue;
Expand Down
33 changes: 21 additions & 12 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,33 @@ void TStorage::SetRetentionPeriod(std::optional<TDuration> retentionPeriod) {
RetentionPeriod = retentionPeriod;
}

std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
auto dieDelta = Max<ui64>();
std::optional<ui32> TStorage::GetRetentionDeadlineDelta() const {
if (RetentionPeriod) {
auto dieTime = TimeProvider->Now() - RetentionPeriod.value();
dieDelta = dieTime > BaseWriteTimestamp ? (dieTime - BaseWriteTimestamp).Seconds() : 0;
auto retentionDeadline = TrimToSeconds(TimeProvider->Now(), false) - RetentionPeriod.value();
if (retentionDeadline >= BaseWriteTimestamp) {
return (retentionDeadline - BaseWriteTimestamp).Seconds();
}
}

return std::nullopt;
}

std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
std::optional<ui64> retentionDeadlineDelta = GetRetentionDeadlineDelta();

if (!position.SlowPosition) {
position.SlowPosition = SlowMessages.begin();
}

auto retentionExpired = [&](const auto& message) {
return retentionDeadlineDelta && message.WriteTimestampDelta <= retentionDeadlineDelta.value();
};

for(; position.SlowPosition != SlowMessages.end(); ++position.SlowPosition.value()) {
auto offset = position.SlowPosition.value()->first;
auto& message = position.SlowPosition.value()->second;
if (message.Status == EMessageStatus::Unprocessed) {
if (message.WriteTimestampDelta < dieDelta) {
if (retentionExpired(message)) {
continue;
}

Expand All @@ -69,7 +80,7 @@ std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
for (size_t i = std::max(position.FastPosition, FirstUnlockedOffset) - FirstOffset; i < Messages.size(); ++i) {
auto& message = Messages[i];
if (message.Status == EMessageStatus::Unprocessed) {
if (message.WriteTimestampDelta < dieDelta) {
if (retentionExpired(message)) {
if (moveUnlockedOffset) {
++FirstUnlockedOffset;
}
Expand All @@ -86,7 +97,6 @@ std::optional<ui64> TStorage::Next(TInstant deadline, TPosition& position) {
}

ui64 offset = FirstOffset + i;

return DoLock(offset, message, deadline);
} else if (moveUnlockedOffset) {
++FirstUnlockedOffset;
Expand Down Expand Up @@ -157,18 +167,17 @@ size_t TStorage::Compact() {
size_t removed = 0;

// Remove messages by retention
if (RetentionPeriod && (TimeProvider->Now() - RetentionPeriod.value()) > BaseWriteTimestamp) {
auto dieDelta = (TimeProvider->Now() - RetentionPeriod.value() - BaseWriteTimestamp).Seconds();
auto dieProcessingDelta = dieDelta + 60;
if (auto retentionDeadlineDelta = GetRetentionDeadlineDelta(); retentionDeadlineDelta.has_value()) {
auto dieProcessingDelta = retentionDeadlineDelta.value() + 60;

auto canRemove = [&](auto& message) {
switch (message.Status) {
case EMessageStatus::Locked:
return message.DeadlineDelta < dieProcessingDelta;
return message.DeadlineDelta <= dieProcessingDelta;
case EMessageStatus::Unprocessed:
case EMessageStatus::Committed:
case EMessageStatus::DLQ:
return message.WriteTimestampDelta < dieDelta;
return message.WriteTimestampDelta <= retentionDeadlineDelta.value();
default:
return false;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class TStorage {

void RemoveMessage(const TMessage& message);

std::optional<ui32> GetRetentionDeadlineDelta() const;

private:
const TIntrusivePtr<ITimeProvider> TimeProvider;

Expand Down
32 changes: 28 additions & 4 deletions ydb/core/persqueue/pqtablet/partition/mlp/mlp_storage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,30 @@ Y_UNIT_TEST(NextWithWriteRetentionPeriod) {
UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0);
}

Y_UNIT_TEST(NextWithInfinityRetentionPeriod) {
auto timeProvider = TIntrusivePtr<MockTimeProvider>(new MockTimeProvider());

TStorage storage(timeProvider);
storage.SetRetentionPeriod(std::nullopt);

storage.AddMessage(3, true, 5, timeProvider->Now());

// skip message by retention
TStorage::TPosition position;
auto result = storage.Next(timeProvider->Now() + TDuration::Seconds(1), position);
UNIT_ASSERT(result.has_value());
UNIT_ASSERT_VALUES_EQUAL(*result, 3);

auto& metrics = storage.GetMetrics();
UNIT_ASSERT_VALUES_EQUAL(metrics.InflyMessageCount, 1);
UNIT_ASSERT_VALUES_EQUAL(metrics.UnprocessedMessageCount, 0);
UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageCount, 1);
UNIT_ASSERT_VALUES_EQUAL(metrics.LockedMessageGroupCount, 0);
UNIT_ASSERT_VALUES_EQUAL(metrics.CommittedMessageCount, 0);
UNIT_ASSERT_VALUES_EQUAL(metrics.DeadlineExpiredMessageCount, 0);
UNIT_ASSERT_VALUES_EQUAL(metrics.DLQMessageCount, 0);
}

Y_UNIT_TEST(SkipLockedMessage) {
TStorage storage(CreateDefaultTimeProvider());
{
Expand Down Expand Up @@ -1415,7 +1439,7 @@ Y_UNIT_TEST(CompactStorage_ByRetention) {
storage.AddMessage(4, true, 7, timeProvider->Now() + TDuration::Seconds(11));
storage.AddMessage(5, true, 11, writeTimestamp);

timeProvider->Tick(TDuration::Seconds(13));
timeProvider->Tick(TDuration::Seconds(12));

auto result = storage.Compact();
Cerr << storage.DebugString() << Endl;
Expand Down Expand Up @@ -1789,7 +1813,7 @@ Y_UNIT_TEST(SlowZone_Retention_1message) {
utils.AddMessage(8);
utils.Begin();

utils.TimeProvider->Tick(TDuration::Seconds(3));
utils.TimeProvider->Tick(TDuration::Seconds(2));
utils.Storage.Compact();

utils.End();
Expand All @@ -1806,7 +1830,7 @@ Y_UNIT_TEST(SlowZone_Retention_2message) {
utils.AddMessage(8);
utils.Begin();

utils.TimeProvider->Tick(TDuration::Seconds(4));
utils.TimeProvider->Tick(TDuration::Seconds(3));
utils.Storage.Compact();

utils.End();
Expand All @@ -1823,7 +1847,7 @@ Y_UNIT_TEST(SlowZone_Retention_3message) {
utils.AddMessage(8);
utils.Begin();

utils.TimeProvider->Tick(TDuration::Seconds(5));
utils.TimeProvider->Tick(TDuration::Seconds(4));
utils.Storage.Compact();

utils.End();
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/persqueue/pqtablet/partition/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ bool TPartition::ImportantConsumersNeedToKeepCurrentKey(const TDataKey& currentK
return true;
}
}

return false;
}

Expand Down Expand Up @@ -3165,7 +3166,7 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
ts += TDuration::MilliSeconds(1);
}
userInfo.ReadFromTimestamp = ts;
userInfo.Important = consumer.GetImportant();
userInfo.Important = IsImportant(consumer);
userInfo.AvailabilityPeriod = TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs());

ui64 rrGen = consumer.GetGeneration();
Expand Down Expand Up @@ -4543,4 +4544,8 @@ void TPartition::ResetDetailedMetrics() {
MessagesWrittenPerPartition.Reset();
}

bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer) {
return consumer.GetImportant() || consumer.GetType() == NKikimrPQ::TPQTabletConfig::CONSUMER_TYPE_MLP;
}

} // namespace NKikimr::NPQ
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pqtablet/partition/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -1268,4 +1268,6 @@ inline ui64 TPartition::GetEndOffset() const {
return BlobEncoder.EndOffset;
}

bool IsImportant(const NKikimrPQ::TPQTabletConfig::TConsumer& consumer);

} // namespace NKikimr::NPQ
3 changes: 3 additions & 0 deletions ydb/core/persqueue/pqtablet/partition/partition_mlp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ void TPartition::InitializeMLPConsumers() {
}
if (consumer.HasAvailabilityPeriodMs()) {
return TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs());
} else if (Config.GetPartitionConfig().GetStorageLimitBytes() > 0) {
// retention by storage is not supported yet
return std::nullopt;
} else {
return TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds());
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/pqtablet/partition/partition_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void TPartition::Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr&
void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& consumer : Config.GetConsumers()) {
if (!consumer.GetImportant() && !(consumer.GetAvailabilityPeriodMs() > 0)) {
if (!IsImportant(consumer) && !(consumer.GetAvailabilityPeriodMs() > 0)) {
continue;
}

Expand All @@ -253,12 +253,12 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
if (!ImporantOrExtendedAvailabilityPeriod(*userInfo) && userInfo->LabeledCounters) {
ctx.Send(TabletActorId, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo->LabeledCounters->GetGroup()));
}
UsersInfoStorage->SetImportant(*userInfo, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()));
UsersInfoStorage->SetImportant(*userInfo, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()));
continue;
}
if (!userInfo) {
userInfo = &UsersInfoStorage->Create(
ctx, consumer.GetName(), 0, consumer.GetImportant(), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
ctx, consumer.GetName(), 0, IsImportant(consumer), TDuration::MilliSeconds(consumer.GetAvailabilityPeriodMs()), "", 0, 0, 0, 0, 0, TInstant::Zero(), {}, false
);
}
if (userInfo->Offset < (i64)GetStartOffset())
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/persqueue/public/mlp/ut/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,10 @@ THolder<NKikimr::TEvPQ::TEvGetMLPConsumerStateResponse> GetConsumerState(std::sh
return setup->GetRuntime().GrabEdgeEvent<NKikimr::TEvPQ::TEvGetMLPConsumerStateResponse>();
}

void ReloadPQTablet(std::shared_ptr<TTopicSdkTestSetup>& setup, const TString& database, const TString& topic, ui32 partitionId) {
auto& runtime = setup->GetRuntime();
auto tabletId = GetTabletId(setup, database, topic, partitionId);
ForwardToTablet(runtime, tabletId, runtime.AllocateEdgeActor(), new TEvents::TEvPoison());
}

}
1 change: 1 addition & 0 deletions ydb/core/persqueue/public/mlp/ut/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ void WriteMany(std::shared_ptr<TTopicSdkTestSetup> setup, const std::string& top
ui64 GetTabletId(std::shared_ptr<TTopicSdkTestSetup>& setup, const TString& database, const TString& topic, ui32 partitionId = 0);
THolder<NKikimr::TEvPQ::TEvGetMLPConsumerStateResponse> GetConsumerState(std::shared_ptr<TTopicSdkTestSetup>& setup,
const TString& database, const TString& topic, const TString& consumer, ui32 partitionId = 0);
void ReloadPQTablet(std::shared_ptr<TTopicSdkTestSetup>& setup, const TString& database, const TString& topic, ui32 partitionId = 0);

}
Loading