Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 44 additions & 46 deletions ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,67 +794,64 @@ std::pair<TVector<TOLAPPredicateNode>, TVector<TOLAPPredicateNode>> SplitForPart
return {pushable, remaining};
}

bool IsSuitableToCollectProjection(TExprBase node) {
// Currently support only `JsonDocument`.
if (auto maybeJsonValue = node.Maybe<TCoJsonValue>()) {
auto jsonMember = maybeJsonValue.Cast().Json().Maybe<TCoMember>();
auto jsonPath = maybeJsonValue.Cast().JsonPath().Maybe<TCoUtf8>();
return jsonMember && jsonPath;
}
return false;
// Looks for a projection candidate inside `node`.
//
// Only `JsonValue` is supported at the moment. The expression qualifies when it
// contains exactly one `TCoJsonValue` callable, whose `Json` argument is a
// `TCoMember` and whose `JsonPath` argument is a `TCoUtf8`.
//
// Returns the pointer to that `TCoJsonValue` node, or nullptr when the
// expression does not qualify.
TExprNode::TPtr IsSuitableToCollectProjection(TExprNode::TPtr node) {
    auto isJsonValue = [](const TExprNode::TPtr& candidate) -> bool {
        return !!TMaybeNode<TCoJsonValue>(candidate);
    };

    auto found = FindNodes(node, isJsonValue);
    // Zero or several `JsonValue` callables in one expression are not handled.
    if (found.size() != 1) {
        return nullptr;
    }

    auto jsonValue = TExprBase(found.front()).Cast<TCoJsonValue>();
    if (!jsonValue.Json().Maybe<TCoMember>()) {
        return nullptr;
    }
    if (!jsonValue.JsonPath().Maybe<TCoUtf8>()) {
        return nullptr;
    }
    return jsonValue.Ptr();
}

// Collects all operations for projections and returns a vector of pairs - [columnName, olap operation].
TVector<std::pair<TString, TExprNode::TPtr>> CollectOlapOperationsForProjections(const TExprNode::TPtr& node, const TExprNode& arg,
TNodeOnNodeOwnedMap& replaces,
const THashSet<TString>& predicateMembers,
TExprContext& ctx) {
TVector<std::pair<TString, TExprNode::TPtr>> olapOperationsForProjections;
auto asStructPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode<TCoAsStruct>(node); };
auto memberPred = [](const TExprNode::TPtr& node) { return !!TMaybeNode<TCoMember>(node); };
auto memberPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode<TCoMember>(node); };
THashSet<TString> projectionMembers;
THashSet<TString> notSuitableToPushMembers;
ui32 nextMemberId = 0;

TVector<std::pair<TString, TExprNode::TPtr>> olapOperationsForProjections;
TVector<std::tuple<TString, TExprNode::TPtr, TExprNode::TPtr, TExprNode::TPtr>> projectionCandidates;
// Expressions for projections are placed in `AsStruct` callable.
if (auto asStruct = FindNode(node, asStructPred)) {
// Process each child for `AsStruct` callable.
for (auto child : TExprBase(asStruct).Cast<TCoAsStruct>()) {
bool memberCollected = false;
if (IsSuitableToCollectProjection(child.Item(1))) {
// Search for the `TCoMember` in expression, we need expression with only one `TCoMember`.
if (auto originalMembers = FindNodes(child.Item(1).Ptr(), memberPred); originalMembers.size() == 1) {
// Convert YQL op to OLAP op.
if (auto olapOperations = ConvertComparisonNode(TExprBase(child.Item(1)), arg, ctx, node->Pos(), false);
olapOperations.size() == 1) {
auto originalMember = TExprBase(originalMembers.front()).Cast<TCoMember>();
auto originalMemberName = TString(originalMember.Name());

if (!predicateMembers.contains(originalMemberName)) {
if (projectionMembers.contains(originalMemberName)) {
originalMemberName = "__kqp_olap_projection_" + originalMemberName + ToString(nextMemberId++);
} else {
projectionMembers.insert(originalMemberName);
}

// clang-format off
auto newMember = Build<TCoMember>(ctx, node->Pos())
.Struct(originalMember.Struct())
.Name<TCoAtom>()
.Value(originalMemberName)
.Build()
.Done();
// clang-format on

auto olapOperation = olapOperations.front();
// Replace full expression with only member.
replaces[child.Item(1).Raw()] = newMember.Ptr();
olapOperationsForProjections.emplace_back(TString(newMember.Name()), olapOperation.Ptr());
memberCollected = true;

YQL_CLOG(TRACE, ProviderKqp)
<< "[OLAP PROJECTION] Operation in olap dialect: " << KqpExprToPrettyString(olapOperation, ctx);
TExprNode::TPtr nodeToProcess = child.Item(1).Ptr();
if (auto projection = IsSuitableToCollectProjection(nodeToProcess)) {
if (auto olapOperations = ConvertComparisonNode(TExprBase(projection), arg, ctx, node->Pos(), false);
olapOperations.size() == 1) {

Y_ENSURE(TMaybeNode<TCoMember>(projection->ChildPtr(0)));
auto originalMember = TExprBase(projection->ChildPtr(0)).Cast<TCoMember>();
auto originalMemberName = TString(originalMember.Name());

if (!predicateMembers.contains(originalMemberName)) {
if (projectionMembers.contains(originalMemberName)) {
originalMemberName = "__kqp_olap_projection_" + originalMemberName + ToString(nextMemberId++);
} else {
projectionMembers.insert(originalMemberName);
}

// clang-format off
auto replace = Build<TCoMember>(ctx, node->Pos())
.Struct(originalMember.Struct())
.Name<TCoAtom>()
.Value(originalMemberName)
.Build()
.Done().Ptr();
// clang-format on

auto olapOperation = olapOperations.front();
projectionCandidates.push_back({TString(originalMemberName), projection, replace, olapOperation.Ptr()});
memberCollected = true;
YQL_CLOG(TRACE, ProviderKqp)
<< "[OLAP PROJECTION] Operation in olap dialect: " << KqpExprToPrettyString(olapOperation, ctx);
}
}
}
Expand All @@ -867,9 +864,10 @@ TVector<std::pair<TString, TExprNode::TPtr>> CollectOlapOperationsForProjections
}
}

for (const auto& [colName, expr] : olapOperationsForProjections) {
if (notSuitableToPushMembers.count(colName)) {
return {};
for (const auto& [colName, projection, replace, olapOperation] : projectionCandidates) {
if (!notSuitableToPushMembers.count(colName)) {
replaces[TExprBase(projection).Raw()] = replace;
olapOperationsForProjections.emplace_back(colName, olapOperation);
}
}

Expand Down
41 changes: 34 additions & 7 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
b Int32,
timestamp Timestamp,
jsonDoc JsonDocument,
jsonDoc1 JsonDocument,
primary key(a)
)
PARTITION BY HASH(a)
Expand All @@ -1780,12 +1781,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {


auto insertRes = session2.ExecuteQuery(R"(
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc)
VALUES (1, 1, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a1", "b.c.d" : "b1", "c.d.e" : "c1"}'));
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc)
VALUES (2, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a2", "b.c.d" : "b2", "c.d.e" : "c2"}'));
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc)
VALUES (3, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"b.c.a" : "a3", "b.c.d" : "b3", "c.d.e" : "c3"}'));
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1)
VALUES (1, 1, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a1", "b.c.d" : "b1", "c.d.e" : "c1"}'), JsonDocument('{"a" : "1.1", "b" : "1.2", "c" : "1.3"}'));
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1)
VALUES (2, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a2", "b.c.d" : "b2", "c.d.e" : "c2"}'), JsonDocument('{"a" : "2.1", "b" : "2.2", "c" : "2.3"}'));
INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1)
VALUES (3, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"b.c.a" : "a3", "b.c.d" : "b3", "c.d.e" : "c3"}'), JsonDocument('{"x" : "3.1", "y" : "1.2", "z" : "1.3"}'));
)", NYdb::NQuery::TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT(insertRes.IsSuccess());

Expand Down Expand Up @@ -1830,6 +1831,29 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
SELECT a, JSON_VALUE(jsonDoc, "$.\"a.b.c\""), JSON_VALUE(jsonDoc, "$.\"b.c.d\""), JSON_VALUE(jsonDoc, "$.\"c.d.e\"")
FROM `/Root/foo`
ORDER BY a;
)",
R"(
PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true";

SELECT CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as result
FROM `/Root/foo`
ORDER BY result;
)",
R"(
PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true";

SELECT (JSON_VALUE(jsonDoc, "$.\"a.b.c\"") in ["a1", "a3", "a4"]) as col1, CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as col2
FROM `/Root/foo`
ORDER BY col2;
)",
R"(
PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true";

SELECT (JSON_VALUE(jsonDoc, "$.\"a.b.c\"") in ["a1", "a3", "a4"]) as col1,
CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as col2,
CAST(JSON_VALUE(jsonDoc1, "$.\"b\"") as Double) as col3
FROM `/Root/foo`
ORDER BY col2;
)"
};

Expand All @@ -1838,7 +1862,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
R"([[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])",
R"([[[3000001u];#;1u];[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])",
R"([[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])",
R"([[1;["a1"];["b1"];["c1"]];[2;["a2"];["b2"];["c2"]];[3;#;["b3"];["c3"]]])"
R"([[1;["a1"];["b1"];["c1"]];[2;["a2"];["b2"];["c2"]];[3;#;["b3"];["c3"]]])",
R"([[#];[[1.1]];[[2.1]]])",
R"([[#;#];[[%true];[1.1]];[[%false];[2.1]]])",
R"([[#;#;#];[[%true];[1.1];[1.2]];[[%false];[2.1];[2.2]]])"
};

for (ui32 i = 0; i < queries.size(); ++i) {
Expand Down
Loading