Skip to content

Commit 2405628

Browse files
Aggregate blob compaction counters (#27629) (#27757)
2 parents c663e54 + 81f7c26 commit 2405628

File tree

11 files changed

+250
-176
lines changed

11 files changed

+250
-176
lines changed

ydb/core/persqueue/pqrb/read_balancer.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,9 +617,12 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
617617
THolder<TConsumerLabeledCounters> labeledConsumerCounters;
618618
using TPartitionKeyCompactionCounters = TProtobufTabletLabeledCounters<EPartitionKeyCompactionLabeledCounters_descriptor>;
619619
THolder<TPartitionKeyCompactionCounters> compactionCounters;
620+
using TPartitionExtendedLabeledCounters = TProtobufTabletLabeledCounters<EPartitionExtendedLabeledCounters_descriptor>;
621+
THolder<TPartitionExtendedLabeledCounters> extendedLabeledCounters;
620622
labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
621623
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));
622624
compactionCounters.Reset(new TPartitionKeyCompactionCounters("topic", 0, DatabasePath));
625+
extendedLabeledCounters.Reset(new TPartitionExtendedLabeledCounters("topic", 0, DatabasePath));
623626

624627
if (AggregatedCounters.empty()) {
625628
for (ui32 i = 0; i < labeledCounters->GetCounters().Size(); ++i) {
@@ -630,6 +633,13 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
630633
AggregatedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
631634
}
632635
}
636+
if (AggregatedExtendedCounters.empty()) {
637+
for (ui32 i = 0; i < extendedLabeledCounters->GetCounters().Size(); ++i) {
638+
TString name = extendedLabeledCounters->GetNames()[i];
639+
AggregatedExtendedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
640+
}
641+
}
642+
633643
if (TabletConfig.GetEnableCompactification()) {
634644
if (AggregatedCompactionCounters.empty()) {
635645
for (ui32 i = 0; i < compactionCounters->GetCounters().Size(); ++i) {
@@ -660,6 +670,7 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
660670
ui64 milliSeconds = TAppData::TimeProvider->Now().MilliSeconds();
661671

662672
THolder<TTabletLabeledCountersBase> aggr(new TTabletLabeledCountersBase);
673+
THolder<TTabletLabeledCountersBase> aggrExtended(new TTabletLabeledCountersBase);
663674
THolder<TTabletLabeledCountersBase> compactionAggr(new TTabletLabeledCountersBase);
664675

665676
for (auto it = AggregatedStats.Stats.begin(); it != AggregatedStats.Stats.end(); ++it) {
@@ -670,6 +681,13 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
670681
}
671682
aggr->AggregateWith(*labeledCounters);
672683

684+
for (ui32 i = 0; i < it->second.Counters.GetExtendedCounters().ValuesSize()
685+
&& i < extendedLabeledCounters->GetCounters().Size(); ++i
686+
) {
687+
extendedLabeledCounters->GetCounters()[i] = it->second.Counters.GetExtendedCounters().GetValues(i);
688+
}
689+
aggrExtended->AggregateWith(*extendedLabeledCounters);
690+
673691
if (TabletConfig.GetEnableCompactification()) {
674692
for (ui32 i = 0; i < it->second.Counters.GetCompactionCounters().ValuesSize() && i < compactionCounters->GetCounters().Size(); ++i) {
675693
compactionCounters->GetCounters()[i] = it->second.Counters.GetCompactionCounters().GetValues(i);
@@ -700,6 +718,17 @@ void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
700718
}
701719
AggregatedCounters[i]->Set(val);
702720
}
721+
for (ui32 i = 0; aggrExtended->HasCounters() && i < aggrExtended->GetCounters().Size(); ++i) {
722+
if (!AggregatedExtendedCounters[i])
723+
continue;
724+
const auto& type = aggrExtended->GetCounterType(i);
725+
auto val = aggrExtended->GetCounters()[i].Get();
726+
if (type == TLabeledCounterOptions::CT_TIMELAG) {
727+
val = val <= milliSeconds ? milliSeconds - val : 0;
728+
}
729+
AggregatedExtendedCounters[i]->Set(val);
730+
}
731+
703732

704733
for (ui32 i = 0; i < compactionAggr->GetCounters().Size() && i < AggregatedCompactionCounters.size(); ++i) {
705734
if (!AggregatedCompactionCounters[i]) {

ydb/core/persqueue/pqrb/read_balancer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>,
205205
std::unordered_set<ui64> PipesRequested;
206206

207207
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;
208+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedExtendedCounters;
208209
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCompactionCounters;
209210

210211
NMonitoring::TDynamicCounterPtr DynamicCounters;

ydb/core/persqueue/pqtablet/partition/partition.cpp

Lines changed: 85 additions & 151 deletions
Large diffs are not rendered by default.

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ static const ui32 MAX_USER_ACTS = 1000;
3434
static const ui32 BATCH_UNPACK_SIZE_BORDER = 500_KB;
3535
static const ui32 MAX_INLINE_SIZE = 1000;
3636

37-
using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
37+
using TPartitionLabeledCounters =
38+
TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
39+
using TPartitionExtendedLabeledCounters = TProtobufTabletLabeledCounters<EPartitionExtendedLabeledCounters_descriptor>;
3840

3941
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset);
4042
TMaybe<ui64> GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp);
@@ -921,6 +923,7 @@ class TPartition : public TBaseActor<TPartition> {
921923

922924
TTabletCountersBase TabletCounters;
923925
THolder<TPartitionLabeledCounters> PartitionCountersLabeled;
926+
THolder<TPartitionExtendedLabeledCounters> PartitionCountersExtended;
924927

925928
THolder<TPartitionKeyCompactionCounters> PartitionCompactionCounters;
926929

@@ -1138,7 +1141,6 @@ class TPartition : public TBaseActor<TPartition> {
11381141
size_t GetBodyKeysCountLimit() const;
11391142
ui64 GetCumulativeSizeLimit() const;
11401143

1141-
void UpdateCompactionCounters();
11421144
bool ThereIsUncompactedData() const;
11431145
TInstant GetFirstUncompactedBlobTimestamp() const;
11441146

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -758,28 +758,6 @@ bool TPartition::ThereIsUncompactedData() const
758758
(BlobEncoder.BodySize >= GetCumulativeSizeLimit());
759759
}
760760

761-
void TPartition::UpdateCompactionCounters()
762-
{
763-
if (!InitDone) {
764-
return;
765-
}
766-
767-
const auto& ctx = ActorContext();
768-
769-
if (ThereIsUncompactedData()) {
770-
auto now = ctx.Now();
771-
auto begin = GetFirstUncompactedBlobTimestamp();
772-
773-
CompactionTimeLag.Set((now - begin).MilliSeconds());
774-
775-
} else {
776-
CompactionTimeLag.Set(0);
777-
}
778-
779-
CompactionUnprocessedBytes.Set(BlobEncoder.BodySize);
780-
CompactionUnprocessedCount.Set(BlobEncoder.DataKeysBody.size());
781-
}
782-
783761
TInstant TPartition::GetFirstUncompactedBlobTimestamp() const
784762
{
785763
const auto& ctx = ActorContext();

ydb/core/persqueue/pqtablet/partition/partition_init.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,8 +1060,14 @@ void TPartition::Initialize(const TActorContext& ctx) {
10601060
PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(EscapeBadChars(TopicName()),
10611061
Partition.InternalPartitionId,
10621062
Config.GetYdbDatabasePath()));
1063+
1064+
PartitionCountersExtended.Reset(new TPartitionExtendedLabeledCounters(EscapeBadChars(TopicName()),
1065+
Partition.InternalPartitionId,
1066+
Config.GetYdbDatabasePath()));
10631067
} else {
10641068
PartitionCountersLabeled.Reset(new TPartitionLabeledCounters(TopicName(), Partition.InternalPartitionId));
1069+
PartitionCountersExtended.Reset(new TPartitionExtendedLabeledCounters(TopicName(),
1070+
Partition.InternalPartitionId));
10651071
}
10661072
}
10671073

ydb/core/persqueue/ut/counters_ut.cpp

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ Y_UNIT_TEST(PartitionKeyCompaction) {
718718
tc.Prepare(dispatchName, setup, activeZone, true, true, true);
719719

720720
tc.Runtime->GetAppData(0).FeatureFlags.SetEnableTopicCompactificationByKey(true);
721-
tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(1);
721+
tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(10_MB);
722722
tc.Runtime->SetScheduledLimit(10000);
723723

724724
tc.Runtime->SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
@@ -843,6 +843,110 @@ Y_UNIT_TEST(PartitionKeyCompaction) {
843843
});
844844
}
845845

846+
847+
Y_UNIT_TEST(PartitionBlobCompactionCounters) {
848+
SetEnv("FAST_UT", "1");
849+
TTestContext tc;
850+
RunTestWithReboots(tc.TabletIds, [&]() { return tc.InitialEventsFilter.Prepare(); }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
851+
TFinalizer finalizer(tc);
852+
activeZone = false;
853+
bool dbRegistered = false;
854+
855+
tc.EnableDetailedPQLog = true;
856+
tc.Prepare(dispatchName, setup, activeZone, true, true, true);
857+
858+
tc.Runtime->GetAppData(0).PQConfig.MutableCompactionConfig()->SetBlobsSize(10_MB);
859+
tc.Runtime->SetScheduledLimit(10000);
860+
861+
tc.Runtime->SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
862+
if (event->GetTypeRewrite() == NSysView::TEvSysView::EvRegisterDbCounters) {
863+
auto database = event.Get()->Get<NSysView::TEvSysView::TEvRegisterDbCounters>()->Database;
864+
UNIT_ASSERT_VALUES_EQUAL(database, "/Root/PQ");
865+
dbRegistered = true;
866+
} else if (event->GetTypeRewrite() == TEvPQ::EEv::EvRunCompaction) {
867+
Cerr << "===Dropped TEvRunCompaction with blobs count: " << event->Get<TEvPQ::TEvRunCompaction>()->BlobsCount << Endl;
868+
return TTestActorRuntime::EEventAction::DROP;
869+
}
870+
return TTestActorRuntime::DefaultObserverFunc(event);
871+
});
872+
PQTabletPrepare({.deleteTime = 3600, .writeSpeed = 2_MB, .enableCompactificationByKey = false}, {}, tc);
873+
874+
TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
875+
ui64 ssId = 325;
876+
BootFakeSchemeShard(*tc.Runtime, ssId, state);
877+
878+
auto balancerParams = TBalancerParams::FromContext("topic", {{0, {tc.TabletId, 1}}}, ssId, tc);
879+
balancerParams.EnableKeyCompaction = false;
880+
PQBalancerPrepare(balancerParams);
881+
882+
IActor* actor = CreateTabletCountersAggregator(false);
883+
auto aggregatorId = tc.Runtime->Register(actor);
884+
tc.Runtime->EnableScheduleForActor(aggregatorId);
885+
TString s{5_MB, 'c'};
886+
ui64 currentOffset = 0;
887+
auto writeData = [&](ui32 count) {
888+
TVector<std::pair<ui64, TString>> data;
889+
for (auto i = 0u; i < count; ++i) {
890+
data.push_back({i + 1, s});
891+
}
892+
CmdWrite(0, "sourceid0", std::move(data), tc, false, {}, false, "", -1, currentOffset, false, false, true);
893+
currentOffset += count;
894+
};
895+
writeData(3);
896+
writeData(3);
897+
898+
{
899+
NSchemeCache::TDescribeResult::TPtr result = new NSchemeCache::TDescribeResult{};
900+
result->SetPath("/Root");
901+
TVector<TString> attrs = {"folder_id", "cloud_id", "database_id"};
902+
for (auto& attr : attrs) {
903+
auto ua = result->MutablePathDescription()->AddUserAttributes();
904+
ua->SetKey(attr);
905+
ua->SetValue(attr);
906+
}
907+
NSchemeCache::TDescribeResult::TCPtr cres = result;
908+
auto event = MakeHolder<TEvTxProxySchemeCache::TEvWatchNotifyUpdated>(0, "/Root", TPathId{}, cres);
909+
TActorId pipeClient = tc.Runtime->ConnectToPipe(tc.BalancerTabletId, tc.Edge, 0, GetPipeConfigWithRetries());
910+
tc.Runtime->SendToPipe(tc.BalancerTabletId, tc.Edge, event.Release(), 0, GetPipeConfigWithRetries(), pipeClient);
911+
912+
TDispatchOptions options;
913+
options.FinalEvents.emplace_back(TEvTxProxySchemeCache::EvWatchNotifyUpdated);
914+
options.FinalEvents.emplace_back(TEvPQ::EEv::EvRunCompaction);
915+
auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
916+
UNIT_ASSERT(processedCountersEvent);
917+
}
918+
{
919+
TDispatchOptions options;
920+
options.FinalEvents.emplace_back(TEvPersQueue::EvPeriodicTopicStats);
921+
auto processedCountersEvent = tc.Runtime->DispatchEvents(options);
922+
UNIT_ASSERT_VALUES_EQUAL(processedCountersEvent, true);
923+
}
924+
925+
auto counters = tc.Runtime->GetAppData(0).Counters;
926+
{
927+
auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);
928+
auto group = dbGroup->GetSubgroup("host", "")
929+
->GetSubgroup("database", "/Root")
930+
->GetSubgroup("cloud_id", "cloud_id")
931+
->GetSubgroup("folder_id", "folder_id")
932+
->GetSubgroup("database_id", "database_id")
933+
->GetSubgroup("topic", "topic");
934+
935+
936+
TStringStream countersStr;
937+
dbGroup->OutputHtml(countersStr);
938+
Cerr << "COUNTERS: " << countersStr.Str() << "\n";
939+
940+
UNIT_ASSERT_VALUES_EQUAL(
941+
group->FindNamedCounter("name", "topic.partition.blobs.uncompacted_count_max")->Val(), 4);
942+
UNIT_ASSERT_VALUES_EQUAL(
943+
group->FindNamedCounter("name", "topic.partition.blobs.uncompacted_bytes_max")->Val(), 31461348);
944+
UNIT_ASSERT_GE(group->FindNamedCounter("name", "topic.partition.blobs.compaction_lag_milliseconds_max")->Val(), 1000);
945+
}
946+
});
947+
}
948+
949+
846950
Y_UNIT_TEST(NewConsumersCountersAppear) {
847951
TTestContext tc;
848952
tc.InitialEventsFilter.Prepare();

ydb/core/persqueue/ut/resources/counters_topics.html

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
topic=topic:
1313
name=topic.partition.active_count: 1
1414
name=topic.partition.alive_count: 1
15+
name=topic.partition.blobs.compaction_lag_milliseconds_max: 0
16+
name=topic.partition.blobs.uncompacted_bytes_max: 747
17+
name=topic.partition.blobs.uncompacted_count_max: 3
1518
name=topic.partition.inactive_count: 0
1619
name=topic.partition.init_duration_milliseconds_max: 0
1720
name=topic.partition.producers_count_max: 3

ydb/core/persqueue/ut/resources/counters_topics_extended.html

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
name=topic.keys.uncompacted_bytes: 8315589
1515
name=topic.partition.active_count: 1
1616
name=topic.partition.alive_count: 1
17+
name=topic.partition.blobs.compaction_lag_milliseconds_max: 0
18+
name=topic.partition.blobs.uncompacted_bytes_max: 0
19+
name=topic.partition.blobs.uncompacted_count_max: 0
1720
name=topic.partition.inactive_count: 0
1821
name=topic.partition.init_duration_milliseconds_max: 0
1922
name=topic.partition.keys.compacted_bytes_max: 34531409

ydb/core/protos/counters_pq.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,12 @@ enum EPartitionKeyCompactionLabeledCounters {
257257
METRIC_CURR_CYCLE_DURATION = 6 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.keys.time_since_compaction_iteration_start_milliseconds_max"}];
258258
METRIC_CURR_READ_CYCLE_KEYS = 7 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.keys.compaction_iteration_keys_count_max"}];
259259
}
260+
261+
enum EPartitionExtendedLabeledCounters {
262+
option (GlobalGroupNamesOpts) = {
263+
Names: "topic"
264+
};
265+
METRIC_BLOB_UNCOMPACTED_COUNT_MAX = 0 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.blobs.uncompacted_count_max"}];
266+
METRIC_BLOB_UNCOMPACTED_SIZE_MAX = 1 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.blobs.uncompacted_bytes_max"}];
267+
METRIC_BLOB_UNCOMPACTED_LAG_MAX = 2 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.blobs.compaction_lag_milliseconds_max"}];
268+
};

0 commit comments

Comments
 (0)