Skip to content

Commit f29a32a

Browse files
authored
Fix potential segfault on query termination and add metric for # of queries per pool (#28293)
1 parent 7ed7baf commit f29a32a

File tree

4 files changed

+47
-44
lines changed

4 files changed

+47
-44
lines changed

ydb/core/kqp/runtime/scheduler/kqp_compute_scheduler_ut.cpp

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010

1111
namespace NKikimr::NKqp::NScheduler {
1212

13-
Y_UNIT_TEST_SUITE(TKqpScheduler) {
14-
1513
namespace {
1614
// hardcoded from ydb/core/protos/table_service_config.proto
1715
constexpr TDelayParams kDefaultDelayParams{
@@ -35,10 +33,13 @@ namespace {
3533
return tasks;
3634
}
3735

38-
void UpdateDemand(std::vector<TSchedulableTaskPtr>& tasks, ui64 demand) {
36+
void ShrinkDemand(std::vector<TSchedulableTaskPtr>& tasks, ui64 demand) {
37+
Y_ENSURE(demand * 2 < tasks.size());
3938
tasks.resize(2 * demand);
4039
}
41-
}
40+
} // namespace
41+
42+
Y_UNIT_TEST_SUITE(TKqpScheduler) {
4243

4344
Y_UNIT_TEST(SingleDatabasePoolQueryStructure) {
4445
/*
@@ -713,40 +714,29 @@ namespace {
713714
tasks.emplace_back(CreateDemandTasks(query, kQueryDemand));
714715
}
715716

716-
scheduler.UpdateFairShare();
717+
auto CheckFairShare = [&](const std::vector<ui64>& expectedFairShare) {
718+
scheduler.UpdateFairShare();
717719

718-
std::vector<ui64> fairShares = {5, 4, 1};
719-
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
720-
auto query = queries[queryId];
721-
auto querySnapshot = query->GetSnapshot();
722-
UNIT_ASSERT(querySnapshot);
723-
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
724-
}
720+
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
721+
auto querySnapshot = queries.at(queryId)->GetSnapshot();
722+
UNIT_ASSERT(querySnapshot);
723+
UNIT_ASSERT_VALUES_EQUAL_C(querySnapshot->FairShare, expectedFairShare.at(queryId),
724+
"Wrong fair-share for query " << queryId);
725+
}
726+
};
727+
728+
CheckFairShare({5, 4, 1});
725729

730+
// Add one more query
726731
NHdrf::NDynamic::TQueryPtr new_query = queries.emplace_back(scheduler.AddOrUpdateQuery(databaseId, poolId, 4, {}));
727732
tasks.emplace_back(CreateDemandTasks(new_query, kQueryDemand));
728733

729-
scheduler.UpdateFairShare();
734+
CheckFairShare({5, 3, 1, 1});
730735

731-
// distribution in FIFO ordering
732-
fairShares = {5, 3, 1, 1};
733-
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
734-
auto query = queries[queryId];
735-
auto querySnapshot = query->GetSnapshot();
736-
UNIT_ASSERT(querySnapshot);
737-
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
738-
}
736+
// Shrink demand of the first query
737+
ShrinkDemand(tasks[0], 2);
739738

740-
UpdateDemand(tasks[0], 2);
741-
scheduler.UpdateFairShare();
742-
743-
fairShares = {2, 5, 2, 1};
744-
for (size_t queryId = 0; queryId < queries.size(); ++queryId) {
745-
auto query = queries[queryId];
746-
auto querySnapshot = query->GetSnapshot();
747-
UNIT_ASSERT(querySnapshot);
748-
UNIT_ASSERT_VALUES_EQUAL(querySnapshot->FairShare, fairShares[queryId]);
749-
}
739+
CheckFairShare({2, 5, 2, 1});
750740

751741
auto* poolSnapshot = queries[0]->GetSnapshot()->GetParent();
752742
UNIT_ASSERT(poolSnapshot);

ydb/core/kqp/runtime/scheduler/kqp_schedulable_task.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,11 @@ void TSchedulableTask::DecreaseUsage(const TDuration& burstUsage, bool forcedRes
7575
}
7676

7777
size_t TSchedulableTask::GetSpareUsage() const {
78-
if (const auto snapshot = Query->GetSnapshot()) {
79-
auto usage = Query->GetParent()->Usage.load();
78+
const auto snapshot = Query->GetSnapshot();
79+
const auto* parent = Query->GetParent();
80+
if (snapshot && parent) {
81+
// TODO: check this code when the pool removal will be implemented, since the `parent` may be gone.
82+
auto usage = parent->Usage.load();
8083
auto fairShare = snapshot->GetParent()->FairShare;
8184
return fairShare >= usage ? (fairShare - usage) : 0;
8285
}

ydb/core/kqp/runtime/scheduler/tree/common.h

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <util/generic/hash.h>
88

99
#include <optional>
10+
#include <set>
1011

1112
template <typename Base, typename Derived>
1213
concept CStaticallyDowncastable = requires(Base* b) {
@@ -76,21 +77,14 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
7677

7778
void AddChild(const TPtr& element) {
7879
Y_ENSURE(Y_LIKELY(element));
79-
Children.push_back(element);
80+
Y_ENSURE(Children.insert(element).second);
8081
element->Parent = this;
8182
}
8283

8384
void RemoveChild(const TPtr& element) {
8485
Y_ENSURE(Y_LIKELY(element));
85-
for (auto it = Children.begin(); it != Children.end(); ++it) {
86-
if (*it == element) {
87-
element->Parent = nullptr;
88-
Children.erase(it);
89-
return;
90-
}
91-
}
92-
93-
// TODO: throw exception that child not found.
86+
Y_ENSURE(Children.erase(element));
87+
element->Parent = nullptr;
9488
}
9589

9690
inline size_t ChildrenSize() const {
@@ -139,9 +133,20 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
139133
TTreeElementBase* Parent = nullptr;
140134

141135
private:
142-
std::vector<TPtr> Children;
136+
struct TCompareChildren {
137+
bool operator() (const TPtr& left, const TPtr& right) const;
138+
};
139+
140+
std::set<TPtr, TCompareChildren> Children;
143141
};
144142

143+
template <ETreeType T>
144+
bool TTreeElementBase<T>::TCompareChildren::operator() (
145+
const TTreeElementBase<T>::TPtr& left, const TTreeElementBase<T>::TPtr& right
146+
) const {
147+
return left->GetId() < right->GetId();
148+
}
149+
145150
template <ETreeType T>
146151
struct TQuery : public virtual TTreeElementBase<T> {
147152
using TBase = TTreeElementBase<T>;
@@ -171,6 +176,7 @@ namespace NKikimr::NKqp::NScheduler::NHdrf {
171176
NMonitoring::TDynamicCounters::TCounterPtr FairShare;
172177
NMonitoring::TDynamicCounters::TCounterPtr InFlight;
173178
NMonitoring::TDynamicCounters::TCounterPtr Waiting;
179+
NMonitoring::TDynamicCounters::TCounterPtr Queries;
174180
NMonitoring::TDynamicCounters::TCounterPtr Satisfaction;
175181
NMonitoring::TDynamicCounters::TCounterPtr AdjustedSatisfaction;
176182
NMonitoring::TDynamicCounters::TCounterPtr InFlightExtra;

ydb/core/kqp/runtime/scheduler/tree/dynamic.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ TPool::TPool(const TPoolId& id, const TIntrusivePtr<TKqpCounters>& counters, con
112112
Counters->Demand = group->GetCounter("Demand", false); // snapshot
113113
Counters->InFlight = group->GetCounter("InFlight", false);
114114
Counters->Waiting = group->GetCounter("Waiting", false);
115+
Counters->Queries = group->GetCounter("Queries", false);
115116
Counters->Usage = group->GetCounter("Usage", true);
116117
Counters->UsageResume = group->GetCounter("UsageResume", true);
117118
Counters->Throttle = group->GetCounter("Throttle", true);
@@ -143,6 +144,9 @@ NSnapshot::TPool* TPool::TakeSnapshot() {
143144
}
144145

145146
if (IsLeaf()) {
147+
if (Counters) {
148+
Counters->Queries->Set(ChildrenSize());
149+
}
146150
ForEachChild<TQuery>([&](TQuery* query, size_t) {
147151
newPool->AddQuery(NSnapshot::TQueryPtr(query->TakeSnapshot()));
148152
});

0 commit comments

Comments
 (0)