@@ -22,6 +22,78 @@ bool isCompoundAggregate(const AggExpr* agg) {
2222 return agg->aggType () == AggType::kStdDevSamp || agg->aggType () == AggType::kCorr ;
2323}
2424
25+ void removeFromValidSortUsers (Node* node, std::map<NodePtr, std::set<Node*>>& users) {
26+ for (size_t input_idx = 0 ; input_idx < node->inputCount (); ++input_idx) {
27+ auto input = node->getAndOwnInput (input_idx);
28+ if (input->is <Sort>()) {
29+ users.at (input).erase (node);
30+ } else if (!input->is <Aggregate>()) {
31+ removeFromValidSortUsers (input.get (), users);
32+ }
33+ }
34+ }
35+
36+ /* *
37+ * Find sort nodes whose result's order is going to be ignored later. It happens if the
38+ * result is later aggregated or sorted again.
39+ */
40+ void dropDeadSorts (QueryDag& dag) {
41+ auto nodes = dag.getNodes ();
42+ std::list<NodePtr> node_list (nodes.begin (), nodes.end ());
43+ std::map<NodePtr, std::list<NodePtr>::iterator> sorts;
44+ std::map<NodePtr, std::set<Node*>> valid_sort_users;
45+ std::map<NodePtr, std::set<Node*>> all_sort_users;
46+
47+ // Collect sort nodes and their users.
48+ for (auto node_itr = node_list.begin (); node_itr != node_list.end (); ++node_itr) {
49+ const auto node = *node_itr;
50+
51+ // Store positions of all sort nodes to be able to remove them later from the nodes
52+ // list.
53+ if (node->is <Sort>()) {
54+ sorts[node] = node_itr;
55+ // Root node is always considered to have a valid user. We also cannot drop nodes
56+ // with limit and/or offset specified.
57+ if (dag.getRootNode () == node.get () || node->as <Sort>()->getLimit () ||
58+ node->as <Sort>()->getOffset ()) {
59+ valid_sort_users[node].insert (nullptr );
60+ }
61+ }
62+
63+ for (size_t input_idx = 0 ; input_idx < node->inputCount (); ++input_idx) {
64+ auto input = node->getAndOwnInput (input_idx);
65+ if (input->is <Sort>()) {
66+ all_sort_users[input].insert (node.get ());
67+ valid_sort_users[input].insert (node.get ());
68+ }
69+ }
70+ }
71+
72+ // Find sort and aggregate nodes and remove their inputs from valid sort users.
73+ for (auto node : node_list) {
74+ if (node->is <Aggregate>() || node->is <Sort>()) {
75+ removeFromValidSortUsers (node.get (), valid_sort_users);
76+ }
77+ }
78+
79+ // Remove sorts with no valid users.
80+ for (auto & pr : valid_sort_users) {
81+ if (pr.second .empty ()) {
82+ auto sort = pr.first ;
83+ for (auto user : all_sort_users.at (sort)) {
84+ user->replaceInput (sort, sort->getAndOwnInput (0 ));
85+ }
86+ node_list.erase (sorts.at (sort));
87+ }
88+ }
89+
90+ // Any applied transformation always decreases the number of nodes.
91+ if (node_list.size () != nodes.size ()) {
92+ nodes.assign (node_list.begin (), node_list.end ());
93+ dag.setNodes (std::move (nodes));
94+ }
95+ }
96+
2597/* *
2698 * Base class holding interface for compound aggregate expansion.
2799 * Compound aggregate is expanded in three steps.
@@ -470,6 +542,7 @@ void addWindowFunctionPreProject(
470542} // namespace
471543
472544void canonicalizeQuery (QueryDag& dag) {
545+ dropDeadSorts (dag);
473546 expandCompoundAggregates (dag);
474547 addWindowFunctionPreProject (dag);
475548}
0 commit comments