From 6ec26e20906be925be1d68a26b9afd6fd325e6d5 Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Sat, 8 Nov 2025 11:58:33 +0300 Subject: [PATCH] [Olap projection pushdown] Keep any expression for json_value in compute --- .../opt/physical/kqp_opt_phy_olap_filter.cpp | 90 +++++++++---------- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 41 +++++++-- 2 files changed, 78 insertions(+), 53 deletions(-) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp index 3f19116102b7..cb5fde8179bb 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp @@ -794,14 +794,14 @@ std::pair, TVector> SplitForPart return {pushable, remaining}; } -bool IsSuitableToCollectProjection(TExprBase node) { - // Currently support only `JsonDocument`. - if (auto maybeJsonValue = node.Maybe()) { - auto jsonMember = maybeJsonValue.Cast().Json().Maybe(); - auto jsonPath = maybeJsonValue.Cast().JsonPath().Maybe(); - return jsonMember && jsonPath; - } - return false; +TExprNode::TPtr IsSuitableToCollectProjection(TExprNode::TPtr node) { + // Currently support only `JsonValue`. + auto jsonValuePred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode(node); }; + if (auto jsonValues = FindNodes(node, jsonValuePred); jsonValues.size() == 1) { + auto jsonValue = TExprBase(jsonValues.front()).Cast(); + return jsonValue.Json().Maybe() && jsonValue.JsonPath().Maybe() ? jsonValue.Ptr() : nullptr; + } + return nullptr; } // Collects all operations for projections and returns a vector of pair - [columName, olap operation]. @@ -809,52 +809,49 @@ TVector> CollectOlapOperationsForProjections TNodeOnNodeOwnedMap& replaces, const THashSet& predicateMembers, TExprContext& ctx) { + TVector> olapOperationsForProjections; auto asStructPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode(node); }; - auto memberPred = [](const TExprNode::TPtr& node) { return !!TMaybeNode(node); }; + auto memberPred = [](const TExprNode::TPtr& node) -> bool { return !!TMaybeNode(node); }; THashSet projectionMembers; THashSet notSuitableToPushMembers; ui32 nextMemberId = 0; - TVector> olapOperationsForProjections; + TVector> projectionCandidates; // Expressions for projections are placed in `AsStruct` callable. if (auto asStruct = FindNode(node, asStructPred)) { // Process each child for `AsStruct` callable. for (auto child : TExprBase(asStruct).Cast()) { bool memberCollected = false; - if (IsSuitableToCollectProjection(child.Item(1))) { - // Search for the `TCoMember` in expression, we need expression with only one `TCoMember`. - if (auto originalMembers = FindNodes(child.Item(1).Ptr(), memberPred); originalMembers.size() == 1) { - // Convert YQL op to OLAP op. - if (auto olapOperations = ConvertComparisonNode(TExprBase(child.Item(1)), arg, ctx, node->Pos(), false); - olapOperations.size() == 1) { - auto originalMember = TExprBase(originalMembers.front()).Cast(); - auto originalMemberName = TString(originalMember.Name()); - - if (!predicateMembers.contains(originalMemberName)) { - if (projectionMembers.contains(originalMemberName)) { - originalMemberName = "__kqp_olap_projection_" + originalMemberName + ToString(nextMemberId++); - } else { - projectionMembers.insert(originalMemberName); - } - - // clang-format off - auto newMember = Build(ctx, node->Pos()) - .Struct(originalMember.Struct()) - .Name() - .Value(originalMemberName) - .Build() - .Done(); - // clang-format on - - auto olapOperation = olapOperations.front(); - // Replace full expression with only member. - replaces[child.Item(1).Raw()] = newMember.Ptr(); - olapOperationsForProjections.emplace_back(TString(newMember.Name()), olapOperation.Ptr()); - memberCollected = true; - - YQL_CLOG(TRACE, ProviderKqp) - << "[OLAP PROJECTION] Operation in olap dialect: " << KqpExprToPrettyString(olapOperation, ctx); + TExprNode::TPtr nodeToProcess = child.Item(1).Ptr(); + if (auto projection = IsSuitableToCollectProjection(nodeToProcess)) { + if (auto olapOperations = ConvertComparisonNode(TExprBase(projection), arg, ctx, node->Pos(), false); + olapOperations.size() == 1) { + + Y_ENSURE(TMaybeNode(projection->ChildPtr(0))); + auto originalMember = TExprBase(projection->ChildPtr(0)).Cast(); + auto originalMemberName = TString(originalMember.Name()); + + if (!predicateMembers.contains(originalMemberName)) { + if (projectionMembers.contains(originalMemberName)) { + originalMemberName = "__kqp_olap_projection_" + originalMemberName + ToString(nextMemberId++); + } else { + projectionMembers.insert(originalMemberName); } + + // clang-format off + auto replace = Build(ctx, node->Pos()) + .Struct(originalMember.Struct()) + .Name() + .Value(originalMemberName) + .Build() + .Done().Ptr(); + // clang-format on + + auto olapOperation = olapOperations.front(); + projectionCandidates.push_back({TString(originalMemberName), projection, replace, olapOperation.Ptr()}); + memberCollected = true; + YQL_CLOG(TRACE, ProviderKqp) + << "[OLAP PROJECTION] Operation in olap dialect: " << KqpExprToPrettyString(olapOperation, ctx); } } } @@ -867,9 +864,10 @@ TVector> CollectOlapOperationsForProjections } } - for (const auto& [colName, expr] : olapOperationsForProjections) { - if (notSuitableToPushMembers.count(colName)) { - return {}; + for (const auto& [colName, projection, replace, olapOperation] : projectionCandidates) { + if (!notSuitableToPushMembers.count(colName)) { + replaces[TExprBase(projection).Raw()] = replace; + olapOperationsForProjections.emplace_back(colName, olapOperation); } } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 4823355fc5ef..5824949707a0 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1771,6 +1771,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { b Int32, timestamp Timestamp, jsonDoc JsonDocument, + jsonDoc1 JsonDocument, primary key(a) ) PARTITION BY HASH(a) @@ -1780,12 +1781,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto insertRes = session2.ExecuteQuery(R"( - INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc) - VALUES (1, 1, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a1", "b.c.d" : "b1", "c.d.e" : "c1"}')); - INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc) - VALUES (2, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a2", "b.c.d" : "b2", "c.d.e" : "c2"}')); - INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc) - VALUES (3, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"b.c.a" : "a3", "b.c.d" : "b3", "c.d.e" : "c3"}')); + INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1) + VALUES (1, 1, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a1", "b.c.d" : "b1", "c.d.e" : "c1"}'), JsonDocument('{"a" : "1.1", "b" : "1.2", "c" : "1.3"}')); + INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1) + VALUES (2, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"a.b.c" : "a2", "b.c.d" : "b2", "c.d.e" : "c2"}'), JsonDocument('{"a" : "2.1", "b" : "2.2", "c" : "2.3"}')); + INSERT INTO `/Root/foo` (a, b, timestamp, jsonDoc, jsonDoc1) + VALUES (3, 11, Timestamp("1970-01-01T00:00:03.000001Z"), JsonDocument('{"b.c.a" : "a3", "b.c.d" : "b3", "c.d.e" : "c3"}'), JsonDocument('{"x" : "3.1", "y" : "1.2", "z" : "1.3"}')); )", NYdb::NQuery::TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT(insertRes.IsSuccess()); @@ -1830,6 +1831,29 @@ Y_UNIT_TEST_SUITE(KqpOlap) { SELECT a, JSON_VALUE(jsonDoc, "$.\"a.b.c\""), JSON_VALUE(jsonDoc, "$.\"b.c.d\""), JSON_VALUE(jsonDoc, "$.\"c.d.e\"") FROM `/Root/foo` ORDER BY a; + )", + R"( + PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true"; + + SELECT CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as result + FROM `/Root/foo` + ORDER BY result; + )", + R"( + PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true"; + + SELECT (JSON_VALUE(jsonDoc, "$.\"a.b.c\"") in ["a1", "a3", "a4"]) as col1, CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as col2 + FROM `/Root/foo` + ORDER BY col2; + )", + R"( + PRAGMA Kikimr.OptEnableOlapPushdownProjections = "true"; + + SELECT (JSON_VALUE(jsonDoc, "$.\"a.b.c\"") in ["a1", "a3", "a4"]) as col1, + CAST(JSON_VALUE(jsonDoc1, "$.\"a\"") as Double) as col2, + CAST(JSON_VALUE(jsonDoc1, "$.\"b\"") as Double) as col3 + FROM `/Root/foo` + ORDER BY col2; )" }; @@ -1838,7 +1862,10 @@ Y_UNIT_TEST_SUITE(KqpOlap) { R"([[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])", R"([[[3000001u];#;1u];[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])", R"([[[3000001u];["a1"];1u];[[3000001u];["a2"];1u]])", - R"([[1;["a1"];["b1"];["c1"]];[2;["a2"];["b2"];["c2"]];[3;#;["b3"];["c3"]]])" + R"([[1;["a1"];["b1"];["c1"]];[2;["a2"];["b2"];["c2"]];[3;#;["b3"];["c3"]]])", + R"([[#];[[1.1]];[[2.1]]])", + R"([[#;#];[[%true];[1.1]];[[%false];[2.1]]])", + R"([[#;#;#];[[%true];[1.1];[1.2]];[[%false];[2.1];[2.2]]])" }; for (ui32 i = 0; i < queries.size(); ++i) {