From 27affb4340c806967cdca72971c3f497d90a5541 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 25 Oct 2018 13:03:42 +0200 Subject: [PATCH 01/17] Add facebook folly TDigest --- src/mongo/db/pipeline/TDigest.cpp | 376 ++++++++++++++++++++++++++++++ src/mongo/db/pipeline/TDigest.h | 151 ++++++++++++ 2 files changed, 527 insertions(+) create mode 100644 src/mongo/db/pipeline/TDigest.cpp create mode 100644 src/mongo/db/pipeline/TDigest.h diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/mongo/db/pipeline/TDigest.cpp new file mode 100644 index 0000000000000..b9f3e10fa5e96 --- /dev/null +++ b/src/mongo/db/pipeline/TDigest.cpp @@ -0,0 +1,376 @@ +/* + * Copyright 2012-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +namespace folly { + +/* + * A good biased scaling function has the following properties: + * - The value of the function k(0, delta) = 0, and k(1, delta) = delta. + * This is a requirement for any t-digest function. + * - The limit of the derivative of the function dk/dq at 0 is inf, and at + * 1 is inf. This provides bias to improve accuracy at the tails. + * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy + * of upper and lower quantiles are equivalent. + * + * The scaling function used here is... + * k(q, d) = (IF q >= 0.5, d - d * sqrt(2 - 2q) / 2, d * sqrt(2q) / 2) + * + * k(0, d) = 0 + * k(1, d) = d + * + * dk/dq = (IF q >= 0.5, d / sqrt(2-2q), d / sqrt(2q)) + * limit q->1 dk/dq = inf + * limit q->0 dk/dq = inf + * + * When plotted, the derivative function is symmetric, centered at q=0.5. + * + * Note that FMA has been tested here, but benchmarks have not shown it to be a + * performance improvement. + */ + +/* + * q_to_k is unused but left here as a comment for completeness. + * double q_to_k(double q, double d) { + * if (q >= 0.5) { + * return d - d * std::sqrt(0.5 - 0.5 * q); + * } + * return d * std::sqrt(0.5 * q); + * } + */ + +static double k_to_q(double k, double d) { + double k_div_d = k / d; + if (k_div_d >= 0.5) { + double base = 1 - k_div_d; + return 1 - 2 * base * base; + } else { + return 2 * k_div_d * k_div_d; + } +} + +static double clamp(double v, double lo, double hi) { + if (v > hi) { + return hi; + } else if (v < lo) { + return lo; + } + return v; +} + +TDigest::TDigest( + std::vector centroids, + double sum, + double count, + double max_val, + double min_val, + size_t maxSize) + : maxSize_(maxSize), + sum_(sum), + count_(count), + max_(max_val), + min_(min_val) { + if (centroids.size() <= maxSize_) { + centroids_ = std::move(centroids); + } else { + // Number of centroids is greater than maxSize, we need to compress them + // When merging, resulting digest takes the maxSize of the first digest + auto sz = centroids.size(); + std::array digests{{ + TDigest(maxSize_), + TDigest(std::move(centroids), sum_, count_, max_, min_, sz), + }}; + *this = this->merge(digests); + } +} + +// Merge unsorted values by first sorting them. Use radix sort if +// possible. This implementation puts all additional memory in the +// heap, so that if called from fiber context we do not smash the +// stack. Otherwise it is very similar to boost::spreadsort. +TDigest TDigest::merge(Range unsortedValues) const { + auto n = unsortedValues.size(); + + // We require 256 buckets per byte level, plus one count array we can reuse. + std::unique_ptr buckets{new uint64_t[256 * 9]}; + // Allocate input and tmp array + std::unique_ptr tmp{new double[n * 2]}; + auto out = tmp.get() + n; + auto in = tmp.get(); + std::copy(unsortedValues.begin(), unsortedValues.end(), in); + + detail::double_radix_sort(n, buckets.get(), in, out); + DCHECK(std::is_sorted(in, in + n)); + + return merge(presorted, Range(in, in + n)); +} + +TDigest TDigest::merge(presorted_t, Range sortedValues) const { + if (sortedValues.empty()) { + return *this; + } + + TDigest result(maxSize_); + + result.count_ = count_ + sortedValues.size(); + + double maybeMin = *sortedValues.begin(); + double maybeMax = *(sortedValues.end() - 1); + if (count_ > 0) { + // We know that min_ and max_ are numbers + result.min_ = std::min(min_, maybeMin); + result.max_ = std::max(max_, maybeMax); + } else { + // We know that min_ and max_ are NaN. + result.min_ = maybeMin; + result.max_ = maybeMax; + } + + std::vector compressed; + compressed.reserve(maxSize_); + + double k_limit = 1; + double q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_; + + auto it_centroids = centroids_.begin(); + auto it_sortedValues = sortedValues.begin(); + + Centroid cur; + if (it_centroids != centroids_.end() && + it_centroids->mean() < *it_sortedValues) { + cur = *it_centroids++; + } else { + cur = Centroid(*it_sortedValues++, 1.0); + } + + double weightSoFar = cur.weight(); + + // Keep track of sums along the way to reduce expensive floating points + double sumsToMerge = 0; + double weightsToMerge = 0; + + while (it_centroids != centroids_.end() || + it_sortedValues != sortedValues.end()) { + Centroid next; + + if (it_centroids != centroids_.end() && + (it_sortedValues == sortedValues.end() || + it_centroids->mean() < *it_sortedValues)) { + next = *it_centroids++; + } else { + next = Centroid(*it_sortedValues++, 1.0); + } + + double nextSum = next.mean() * next.weight(); + weightSoFar += next.weight(); + + if (weightSoFar <= q_limit_times_count) { + sumsToMerge += nextSum; + weightsToMerge += next.weight(); + } else { + result.sum_ += cur.add(sumsToMerge, weightsToMerge); + sumsToMerge = 0; + weightsToMerge = 0; + compressed.push_back(cur); + q_limit_times_count = k_to_q(k_limit++, maxSize_) * result.count_; + cur = next; + } + } + result.sum_ += cur.add(sumsToMerge, weightsToMerge); + compressed.push_back(cur); + compressed.shrink_to_fit(); + + // Deal with floating point precision + std::sort(compressed.begin(), compressed.end()); + + result.centroids_ = std::move(compressed); + return result; +} + +TDigest TDigest::merge(Range digests) { + size_t nCentroids = 0; + for (auto it = digests.begin(); it != digests.end(); it++) { + nCentroids += it->centroids_.size(); + } + + if (nCentroids == 0) { + return TDigest(); + } + + std::vector centroids; + centroids.reserve(nCentroids); + + std::vector::iterator> starts; + starts.reserve(digests.size()); + + double count = 0; + + // We can safely use these limits to avoid isnan checks below because we know + // nCentroids > 0, so at least one TDigest has a min and max. + double min = std::numeric_limits::infinity(); + double max = -std::numeric_limits::infinity(); + + for (auto it = digests.begin(); it != digests.end(); it++) { + starts.push_back(centroids.end()); + double curCount = it->count(); + if (curCount > 0) { + DCHECK(!std::isnan(it->min_)); + DCHECK(!std::isnan(it->max_)); + min = std::min(min, it->min_); + max = std::max(max, it->max_); + count += curCount; + for (const auto& centroid : it->centroids_) { + centroids.push_back(centroid); + } + } + } + + for (size_t digestsPerBlock = 1; digestsPerBlock < starts.size(); + digestsPerBlock *= 2) { + // Each sorted block is digestPerBlock digests big. For each step, try to + // merge two blocks together. + for (size_t i = 0; i < starts.size(); i += (digestsPerBlock * 2)) { + // It is possible that this block is incomplete (less than digestsPerBlock + // big). In that case, the rest of the block is sorted and leave it alone + if (i + digestsPerBlock < starts.size()) { + auto first = starts[i]; + auto middle = starts[i + digestsPerBlock]; + + // It is possible that the next block is incomplete (less than + // digestsPerBlock big). In that case, merge to end. Otherwise, merge to + // the end of that block. + std::vector::iterator last = + (i + (digestsPerBlock * 2) < starts.size()) + ? *(starts.begin() + i + 2 * digestsPerBlock) + : centroids.end(); + std::inplace_merge(first, middle, last); + } + } + } + + DCHECK(std::is_sorted(centroids.begin(), centroids.end())); + + size_t maxSize = digests.begin()->maxSize_; + TDigest result(maxSize); + + std::vector compressed; + compressed.reserve(maxSize); + + double k_limit = 1; + double q_limit_times_count = k_to_q(k_limit, maxSize) * count; + + Centroid cur = centroids.front(); + double weightSoFar = cur.weight(); + double sumsToMerge = 0; + double weightsToMerge = 0; + for (auto it = centroids.begin() + 1; it != centroids.end(); ++it) { + weightSoFar += it->weight(); + if (weightSoFar <= q_limit_times_count) { + sumsToMerge += it->mean() * it->weight(); + weightsToMerge += it->weight(); + } else { + result.sum_ += cur.add(sumsToMerge, weightsToMerge); + sumsToMerge = 0; + weightsToMerge = 0; + compressed.push_back(cur); + q_limit_times_count = k_to_q(k_limit++, maxSize) * count; + cur = *it; + } + } + result.sum_ += cur.add(sumsToMerge, weightsToMerge); + compressed.push_back(cur); + compressed.shrink_to_fit(); + + // Deal with floating point precision + std::sort(compressed.begin(), compressed.end()); + + result.count_ = count; + result.min_ = min; + result.max_ = max; + result.centroids_ = std::move(compressed); + return result; +} + +double TDigest::estimateQuantile(double q) const { + if (centroids_.empty()) { + return 0.0; + } + double rank = q * count_; + + size_t pos; + double t; + if (q > 0.5) { + if (q >= 1.0) { + return max_; + } + pos = 0; + t = count_; + for (auto rit = centroids_.rbegin(); rit != centroids_.rend(); ++rit) { + t -= rit->weight(); + if (rank >= t) { + pos = std::distance(rit, centroids_.rend()) - 1; + break; + } + } + } else { + if (q <= 0.0) { + return min_; + } + pos = centroids_.size() - 1; + t = 0; + for (auto it = centroids_.begin(); it != centroids_.end(); ++it) { + if (rank < t + it->weight()) { + pos = std::distance(centroids_.begin(), it); + break; + } + t += it->weight(); + } + } + + double delta = 0; + double min = min_; + double max = max_; + if (centroids_.size() > 1) { + if (pos == 0) { + delta = centroids_[pos + 1].mean() - centroids_[pos].mean(); + max = centroids_[pos + 1].mean(); + } else if (pos == centroids_.size() - 1) { + delta = centroids_[pos].mean() - centroids_[pos - 1].mean(); + min = centroids_[pos - 1].mean(); + } else { + delta = (centroids_[pos + 1].mean() - centroids_[pos - 1].mean()) / 2; + min = centroids_[pos - 1].mean(); + max = centroids_[pos + 1].mean(); + } + } + auto value = centroids_[pos].mean() + + ((rank - t) / centroids_[pos].weight() - 0.5) * delta; + return clamp(value, min, max); +} + +double TDigest::Centroid::add(double sum, double weight) { + sum += (mean_ * weight_); + weight_ += weight; + mean_ = sum / weight_; + return sum; +} + +} // namespace folly diff --git a/src/mongo/db/pipeline/TDigest.h b/src/mongo/db/pipeline/TDigest.h new file mode 100644 index 0000000000000..12d26513c3d83 --- /dev/null +++ b/src/mongo/db/pipeline/TDigest.h @@ -0,0 +1,151 @@ +/* + * Copyright 2012-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +namespace folly { + +/* + * TDigests are a biased quantile estimator designed to estimate the values of + * the quantiles of streaming data with high accuracy and low memory, + * particularly for quantiles at the tails (p0.1, p1, p99, p99.9). See + * https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf + * for an explanation of what the purpose of TDigests is, and how they work. + * + * There is a notable difference between the implementation here and the + * implementation in the paper. In the paper, the recommended scaling function + * for bucketing centroids is an arcsin function. The arcsin function provides + * high accuracy for low memory, but comes at a relatively high compute cost. + * A good choice algorithm has the following properties: + * - The value of the function k(0, delta) = 0, and k(1, delta) = delta. + * This is a requirement for any t-digest function. + * - The limit of the derivative of the function dk/dq at 0 is inf, and at + * 1 is inf. This provides bias to improve accuracy at the tails. + * - For any q <= 0.5, dk/dq(q) = dk/dq(1-q). This ensures that the accuracy + * of upper and lower quantiles are equivalent. + * As such, TDigest uses a sqrt function with these properties, which is faster + * than arcsin. There is a small, but relatively negligible impact to accuracy + * at the tail. In empirical tests, accuracy of the sqrt approach has been + * adequate. + */ +class TDigest { + public: + class Centroid { + public: + explicit Centroid(double mean = 0.0, double weight = 1.0) + : mean_(mean), weight_(weight) { + DCHECK_GT(weight, 0); + } + + inline double mean() const { + return mean_; + } + + inline double weight() const { + return weight_; + } + + /* + * Adds the sum/weight to this centroid, and returns the new sum. + */ + inline double add(double sum, double weight); + + inline bool operator<(const Centroid& other) const { + return mean() < other.mean(); + } + + private: + double mean_; + double weight_; + }; + + explicit TDigest(size_t maxSize = 100) + : maxSize_(maxSize), sum_(0.0), count_(0.0), max_(NAN), min_(NAN) {} + + explicit TDigest( + std::vector centroids, + double sum, + double count, + double max_val, + double min_val, + size_t maxSize = 100); + + /* + * Returns a new TDigest constructed with values merged from the current + * digest and the given sortedValues. + */ + TDigest merge(presorted_t, Range sortedValues) const; + TDigest merge(Range unsortedValues) const; + + /* + * Returns a new TDigest constructed with values merged from the given + * digests. + */ + static TDigest merge(Range digests); + + /* + * Estimates the value of the given quantile. + */ + double estimateQuantile(double q) const; + + double mean() const { + return count_ ? sum_ / count_ : 0; + } + + double sum() const { + return sum_; + } + + double count() const { + return count_; + } + + double min() const { + return min_; + } + + double max() const { + return max_; + } + + bool empty() const { + return centroids_.empty(); + } + + const std::vector& getCentroids() const { + return centroids_; + } + + size_t maxSize() const { + return maxSize_; + } + + private: + std::vector centroids_; + size_t maxSize_; + double sum_; + double count_; + double max_; + double min_; +}; + +} // namespace folly From b228ac7921843e2de93e71c95b9d61112b9aa0a6 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 25 Oct 2018 13:40:47 +0200 Subject: [PATCH 02/17] To remove Range from TDigest body --- src/mongo/db/pipeline/TDigest.cpp | 28 +++++++++++++++------------- src/mongo/db/pipeline/TDigest.h | 14 +++++++------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/mongo/db/pipeline/TDigest.cpp index b9f3e10fa5e96..0731794732756 100644 --- a/src/mongo/db/pipeline/TDigest.cpp +++ b/src/mongo/db/pipeline/TDigest.cpp @@ -14,13 +14,15 @@ * limitations under the License. */ -#include -#include +#include "TDigest.h" +#include "DoubleRadixSort.h" #include #include +#include +#include -namespace folly { +namespace mongo { /* * A good biased scaling function has the following properties: @@ -94,7 +96,7 @@ TDigest::TDigest( // Number of centroids is greater than maxSize, we need to compress them // When merging, resulting digest takes the maxSize of the first digest auto sz = centroids.size(); - std::array digests{{ + std::vector digests{{ TDigest(maxSize_), TDigest(std::move(centroids), sum_, count_, max_, min_, sz), }}; @@ -106,7 +108,7 @@ TDigest::TDigest( // possible. This implementation puts all additional memory in the // heap, so that if called from fiber context we do not smash the // stack. Otherwise it is very similar to boost::spreadsort. -TDigest TDigest::merge(Range unsortedValues) const { +TDigest TDigest::merge(const std::vector & unsortedValues) const { auto n = unsortedValues.size(); // We require 256 buckets per byte level, plus one count array we can reuse. @@ -118,12 +120,12 @@ TDigest TDigest::merge(Range unsortedValues) const { std::copy(unsortedValues.begin(), unsortedValues.end(), in); detail::double_radix_sort(n, buckets.get(), in, out); - DCHECK(std::is_sorted(in, in + n)); + //DCHECK(std::is_sorted(in, in + n)); - return merge(presorted, Range(in, in + n)); + return merge(presorted, std::vector(in, in + n)); } -TDigest TDigest::merge(presorted_t, Range sortedValues) const { +TDigest TDigest::merge(presorted_t, const std::vector & sortedValues) const { if (sortedValues.empty()) { return *this; } @@ -205,7 +207,7 @@ TDigest TDigest::merge(presorted_t, Range sortedValues) const { return result; } -TDigest TDigest::merge(Range digests) { +TDigest TDigest::merge(std::vector& digests) { size_t nCentroids = 0; for (auto it = digests.begin(); it != digests.end(); it++) { nCentroids += it->centroids_.size(); @@ -232,8 +234,8 @@ TDigest TDigest::merge(Range digests) { starts.push_back(centroids.end()); double curCount = it->count(); if (curCount > 0) { - DCHECK(!std::isnan(it->min_)); - DCHECK(!std::isnan(it->max_)); + //DCHECK(!std::isnan(it->min_)); + //DCHECK(!std::isnan(it->max_)); min = std::min(min, it->min_); max = std::max(max, it->max_); count += curCount; @@ -266,7 +268,7 @@ TDigest TDigest::merge(Range digests) { } } - DCHECK(std::is_sorted(centroids.begin(), centroids.end())); + //DCHECK(std::is_sorted(centroids.begin(), centroids.end())); size_t maxSize = digests.begin()->maxSize_; TDigest result(maxSize); @@ -373,4 +375,4 @@ double TDigest::Centroid::add(double sum, double weight) { return sum; } -} // namespace folly +} // namespace folly => merged with existing mongo namespace diff --git a/src/mongo/db/pipeline/TDigest.h b/src/mongo/db/pipeline/TDigest.h index 12d26513c3d83..fb84f4f76a056 100644 --- a/src/mongo/db/pipeline/TDigest.h +++ b/src/mongo/db/pipeline/TDigest.h @@ -19,10 +19,10 @@ #include #include -#include -#include +struct presorted_t {}; +constexpr presorted_t presorted{}; -namespace folly { +namespace mongo { /* * TDigests are a biased quantile estimator designed to estimate the values of @@ -53,7 +53,7 @@ class TDigest { public: explicit Centroid(double mean = 0.0, double weight = 1.0) : mean_(mean), weight_(weight) { - DCHECK_GT(weight, 0); + //DCHECK_GT(weight, 0); } inline double mean() const { @@ -93,14 +93,14 @@ class TDigest { * Returns a new TDigest constructed with values merged from the current * digest and the given sortedValues. */ - TDigest merge(presorted_t, Range sortedValues) const; - TDigest merge(Range unsortedValues) const; + TDigest merge(presorted_t, const std::vector & sortedValues) const; + TDigest merge(const std::vector & unsortedValues) const; /* * Returns a new TDigest constructed with values merged from the given * digests. */ - static TDigest merge(Range digests); + static TDigest merge(std::vector& digests); /* * Estimates the value of the given quantile. From ee9aa7f2c4baf5550e8cdf5f6b7e07c2ec219d75 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 25 Oct 2018 13:58:26 +0200 Subject: [PATCH 03/17] Update accumulators and SConscript --- src/mongo/db/pipeline/SConscript | 4 +++- src/mongo/db/pipeline/accumulator.h | 31 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index d8cd37fe5a3e6..a719a4bffe4ee 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -160,7 +160,9 @@ env.Library( 'accumulator_push.cpp', 'accumulator_std_dev.cpp', 'accumulator_sum.cpp', - 'accumulator_merge_objects.cpp' + 'accumulator_merge_objects.cpp', + 'accumulator_percentile.cpp', + 'TDigest.cpp' ], LIBDEPS=[ 'document_value', diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 801c356020f2b..88680b7ea0e11 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -44,6 +44,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/summation.h" +#include "mongo/db/pipeline/TDigest.h" namespace mongo { @@ -304,6 +305,36 @@ class AccumulatorAvg final : public Accumulator { }; +// Adding a new accumulator as 'percentile' +class AccumulatorPercentile final : public Accumulator { +public: + explicit AccumulatorPercentile(const boost::intrusive_ptr& expCtx); + + void processInternal(const Value& input, bool merging) final; + Value getValue(bool toBeMerged) final; + const char* getOpName() const final; + void reset() final; + + static boost::intrusive_ptr create( + const boost::intrusive_ptr& expCtx); + +private: + /** + * The total of all values is partitioned between those that are decimals, and those that are + * not decimals, so the decimal total needs to add the non-decimal. + */ + Decimal128 _getDecimalTotal() const; + + // to feed TDigest + std::vector values; + + bool _isDecimal; + DoubleDoubleSummation _nonDecimalTotal; + Decimal128 _decimalTotal; + long long _count; +}; + + class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr& expCtx, bool isSamp); From 73ee2a84817f089dee25679fb391f71ea2448e2d Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 22 Nov 2018 14:11:53 +0100 Subject: [PATCH 04/17] adjust TDigest sortedvalues and create new accumulator --- src/mongo/db/pipeline/TDigest.cpp | 6 +- src/mongo/db/pipeline/TDigest.h | 6 +- .../db/pipeline/accumulator_percentile.cpp | 136 ++++++++++++++++++ 3 files changed, 140 insertions(+), 8 deletions(-) create mode 100644 src/mongo/db/pipeline/accumulator_percentile.cpp diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/mongo/db/pipeline/TDigest.cpp index 0731794732756..013583e1d6104 100644 --- a/src/mongo/db/pipeline/TDigest.cpp +++ b/src/mongo/db/pipeline/TDigest.cpp @@ -15,7 +15,6 @@ */ #include "TDigest.h" -#include "DoubleRadixSort.h" #include #include @@ -103,7 +102,7 @@ TDigest::TDigest( *this = this->merge(digests); } } - +/* // Merge unsorted values by first sorting them. Use radix sort if // possible. This implementation puts all additional memory in the // heap, so that if called from fiber context we do not smash the @@ -124,8 +123,9 @@ TDigest TDigest::merge(const std::vector & unsortedValues) const { return merge(presorted, std::vector(in, in + n)); } +*/ -TDigest TDigest::merge(presorted_t, const std::vector & sortedValues) const { +TDigest TDigest::merge(const std::vector & sortedValues) const { if (sortedValues.empty()) { return *this; } diff --git a/src/mongo/db/pipeline/TDigest.h b/src/mongo/db/pipeline/TDigest.h index fb84f4f76a056..70865f466800d 100644 --- a/src/mongo/db/pipeline/TDigest.h +++ b/src/mongo/db/pipeline/TDigest.h @@ -19,9 +19,6 @@ #include #include -struct presorted_t {}; -constexpr presorted_t presorted{}; - namespace mongo { /* @@ -93,8 +90,7 @@ class TDigest { * Returns a new TDigest constructed with values merged from the current * digest and the given sortedValues. */ - TDigest merge(presorted_t, const std::vector & sortedValues) const; - TDigest merge(const std::vector & unsortedValues) const; + TDigest merge(const std::vector & sortedValues) const; /* * Returns a new TDigest constructed with values merged from the given diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp new file mode 100644 index 0000000000000..6f0e8e0481324 --- /dev/null +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -0,0 +1,136 @@ +/** + * Copyright (c) 2011 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/accumulator.h" + +#include "mongo/db/pipeline/accumulation_statement.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/expression.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/value.h" +#include "mongo/platform/decimal128.h" + +namespace mongo { + +using boost::intrusive_ptr; + +REGISTER_ACCUMULATOR(avg, AccumulatorAvg::create); +REGISTER_EXPRESSION(avg, ExpressionFromAccumulator::parse); + +const char* AccumulatorAvg::getOpName() const { + return "$avg"; +} + +namespace { +const char subTotalName[] = "subTotal"; +const char subTotalErrorName[] = "subTotalError"; // Used for extra precision +const char countName[] = "count"; +} // namespace + +void AccumulatorAvg::processInternal(const Value& input, bool merging) { + if (merging) { + // We expect an object that contains both a subtotal and a count. Additionally there may + // be an error value, that allows for additional precision. + // 'input' is what getValue(true) produced below. + verify(input.getType() == Object); + // We're recursively adding the subtotal to get the proper type treatment, but this only + // increments the count by one, so adjust the count afterwards. Similarly for 'error'. + processInternal(input[subTotalName], false); + _count += input[countName].getLong() - 1; + Value error = input[subTotalErrorName]; + if (!error.missing()) { + processInternal(error, false); + _count--; // The error correction only adjusts the total, not the number of items. + } + return; + } + + switch (input.getType()) { + case NumberDecimal: + _decimalTotal = _decimalTotal.add(input.getDecimal()); + _isDecimal = true; + break; + case NumberLong: + // Avoid summation using double as that loses precision. + _nonDecimalTotal.addLong(input.getLong()); + break; + case NumberInt: + case NumberDouble: + _nonDecimalTotal.addDouble(input.getDouble()); + break; + default: + dassert(!input.numeric()); + return; + } + _count++; +} + +intrusive_ptr AccumulatorAvg::create( + const boost::intrusive_ptr& expCtx) { + return new AccumulatorAvg(expCtx); +} + +Decimal128 AccumulatorAvg::_getDecimalTotal() const { + return _decimalTotal.add(_nonDecimalTotal.getDecimal()); +} + +Value AccumulatorAvg::getValue(bool toBeMerged) { + if (toBeMerged) { + if (_isDecimal) + return Value(Document{{subTotalName, _getDecimalTotal()}, {countName, _count}}); + + double total, error; + std::tie(total, error) = _nonDecimalTotal.getDoubleDouble(); + return Value( + Document{{subTotalName, total}, {countName, _count}, {subTotalErrorName, error}}); + } + + if (_count == 0) + return Value(BSONNULL); + + if (_isDecimal) + return Value(_getDecimalTotal().divide(Decimal128(static_cast(_count)))); + + return Value(_nonDecimalTotal.getDouble() / static_cast(_count)); +} + +AccumulatorAvg::AccumulatorAvg(const boost::intrusive_ptr& expCtx) + : Accumulator(expCtx), _isDecimal(false), _count(0) { + // This is a fixed size Accumulator so we never need to update this + _memUsageBytes = sizeof(*this); +} + +void AccumulatorAvg::reset() { + _isDecimal = false; + _nonDecimalTotal = {}; + _decimalTotal = {}; + _count = 0; +} +} From 432759dbf827428e8513d1cded5bd02f0741d8ab Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 22 Nov 2018 14:56:28 +0100 Subject: [PATCH 05/17] Connect the new accumulator to TDigest --- src/mongo/db/pipeline/accumulator.h | 13 ++- .../db/pipeline/accumulator_percentile.cpp | 110 +++++++++++++----- 2 files changed, 95 insertions(+), 28 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 88680b7ea0e11..6189a5c6e8572 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -287,6 +287,10 @@ class AccumulatorAvg final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; + double perc_val; + double digest_size; + double chunk_size; + void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -305,6 +309,7 @@ class AccumulatorAvg final : public Accumulator { }; + // Adding a new accumulator as 'percentile' class AccumulatorPercentile final : public Accumulator { public: @@ -325,9 +330,13 @@ class AccumulatorPercentile final : public Accumulator { */ Decimal128 _getDecimalTotal() const; - // to feed TDigest + // to be digested by TDigest algorithm std::vector values; + // process inputs will be sorted and added to this object + mongo::TDigest digest; + + short percentile; // Integer between 0 to 100 bool _isDecimal; DoubleDoubleSummation _nonDecimalTotal; Decimal128 _decimalTotal; @@ -335,6 +344,8 @@ class AccumulatorPercentile final : public Accumulator { }; + + class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr& expCtx, bool isSamp); diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 6f0e8e0481324..44e2d0db1a3a9 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -1,4 +1,4 @@ -/** +/* * Copyright (c) 2011 10gen Inc. * * This program is free software: you can redistribute it and/or modify @@ -37,15 +37,18 @@ #include "mongo/db/pipeline/value.h" #include "mongo/platform/decimal128.h" +#include "mongo/db/pipeline/TDigest.h" + namespace mongo { using boost::intrusive_ptr; -REGISTER_ACCUMULATOR(avg, AccumulatorAvg::create); -REGISTER_EXPRESSION(avg, ExpressionFromAccumulator::parse); +REGISTER_ACCUMULATOR(percentile, AccumulatorPercentile::create); +REGISTER_EXPRESSION(percentile, ExpressionFromAccumulator::parse); + -const char* AccumulatorAvg::getOpName() const { - return "$avg"; +const char* AccumulatorPercentile::getOpName() const { + return "$percentile"; } namespace { @@ -54,17 +57,50 @@ const char subTotalErrorName[] = "subTotalError"; // Used for extra precision const char countName[] = "count"; } // namespace -void AccumulatorAvg::processInternal(const Value& input, bool merging) { +void AccumulatorPercentile::processInternal(const Value& input, bool merging) { + + // Determining 'digest_size' + // ToDo: Try to find a better evaluation + // ToDo: Check how to pass the default value. From configurations? + if (input.getDocument()["digest_size"].missing()) + this->digest_size = 1000; + else + this->digest_size = input.getDocument()["digest_size"].getDouble(); + + + // // Determining 'chunk_size' used in TDigest + // ToDo: Try to find a better evaluation + // ToDo: Check how to pass the default value. From configurations? + if (input.getDocument()["chunk_size"].missing()) + this->chunk_size = 1000; + else + this->digest_size = input.getDocument()["digest_size"].getDouble(); + + // ToDo: error codes are not accurate. Set better numbers later + // ToDo: It might be better evaluations for this part. + uassert(6677, "The 'perc' should be present in the input document.", + !input.getDocument()["perc"].missing()); + + uassert(6678, "The 'value' should be present in the input document.", + !input.getDocument()["value"].missing()); + + this->perc_val = input.getDocument()["perc"].getDouble() / 100; // Converting Percentile to Quantile - [0:100] to [0:1] + this->digest_size = input.getDocument()["digest_size"].getDouble(); + + // ToDo: Choose a better name for perc_input and refactor later + Value perc_input = input.getDocument()["value"]; + + if (merging) { // We expect an object that contains both a subtotal and a count. Additionally there may // be an error value, that allows for additional precision. // 'input' is what getValue(true) produced below. - verify(input.getType() == Object); + verify(perc_input.getType() == Object); // We're recursively adding the subtotal to get the proper type treatment, but this only // increments the count by one, so adjust the count afterwards. Similarly for 'error'. - processInternal(input[subTotalName], false); - _count += input[countName].getLong() - 1; - Value error = input[subTotalErrorName]; + processInternal(perc_input[subTotalName], false); + _count += perc_input[countName].getLong() - 1; + Value error = perc_input[subTotalErrorName]; if (!error.missing()) { processInternal(error, false); _count--; // The error correction only adjusts the total, not the number of items. @@ -72,36 +108,47 @@ void AccumulatorAvg::processInternal(const Value& input, bool merging) { return; } - switch (input.getType()) { + // ToDo: Not sure 1) Is it important for TDigest to distinguish? 2) Is it important for MongoDB to distinguish? + // ToDo: Going to cover all Decimal, Long and Double as a temporary, need to decide on this. + switch (perc_input.getType()) { case NumberDecimal: - _decimalTotal = _decimalTotal.add(input.getDecimal()); - _isDecimal = true; + values.push_back(perc_input.getDouble()); // ToDo: need to discuss break; case NumberLong: - // Avoid summation using double as that loses precision. - _nonDecimalTotal.addLong(input.getLong()); + values.push_back(perc_input.getDouble()); // ToDo: need to discuss break; case NumberInt: case NumberDouble: - _nonDecimalTotal.addDouble(input.getDouble()); + values.push_back(perc_input.getDouble()); break; default: - dassert(!input.numeric()); + dassert(!perc_input.numeric()); return; } _count++; + + if (values.size() == this->chunk_size) + _add_to_tdigest(values); + else + return; } -intrusive_ptr AccumulatorAvg::create( +intrusive_ptr AccumulatorPercentile::create( const boost::intrusive_ptr& expCtx) { - return new AccumulatorAvg(expCtx); + return new AccumulatorPercentile(expCtx); } -Decimal128 AccumulatorAvg::_getDecimalTotal() const { +Decimal128 AccumulatorPercentile::_getDecimalTotal() const { return _decimalTotal.add(_nonDecimalTotal.getDecimal()); } -Value AccumulatorAvg::getValue(bool toBeMerged) { +Value AccumulatorPercentile::getValue(bool toBeMerged) { + + // To add remainders left over a chunk + if (values.size() > 0) + _add_to_tdigest(values); + + // ToDo: Unchanged copy from 'avg' module, need to change this for Percentile if (toBeMerged) { if (_isDecimal) return Value(Document{{subTotalName, _getDecimalTotal()}, {countName, _count}}); @@ -115,19 +162,28 @@ Value AccumulatorAvg::getValue(bool toBeMerged) { if (_count == 0) return Value(BSONNULL); - if (_isDecimal) - return Value(_getDecimalTotal().divide(Decimal128(static_cast(_count)))); - - return Value(_nonDecimalTotal.getDouble() / static_cast(_count)); + return Value(this->digest.estimateQuantile(this->perc_val)); } -AccumulatorAvg::AccumulatorAvg(const boost::intrusive_ptr& expCtx) +AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) : Accumulator(expCtx), _isDecimal(false), _count(0) { + + // Higher 'digest_size' results in higher memory consumption + mongo::TDigest digest(this->digest_size); + // This is a fixed size Accumulator so we never need to update this _memUsageBytes = sizeof(*this); } -void AccumulatorAvg::reset() { +void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ + + // Sort, Push and Clear the "values" vector in each chunk + std::sort(values.begin(), values.end()); + digest = digest.merge(values); + values.clear(); +} + +void AccumulatorPercentile::reset() { _isDecimal = false; _nonDecimalTotal = {}; _decimalTotal = {}; From 4974334d570df85163f7328f4e8eebfebf010c7b Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 4 Apr 2019 10:47:27 +0200 Subject: [PATCH 06/17] percentile unittests added --- .../sources/group/accumulator_percentile.js | 63 +++++++++++++++++++ src/mongo/db/pipeline/TDigest.cpp | 22 ------- src/mongo/db/pipeline/accumulator.h | 8 +++ .../db/pipeline/accumulator_percentile.cpp | 42 ++++++------- src/mongo/db/pipeline/accumulator_test.cpp | 15 +++++ 5 files changed, 104 insertions(+), 46 deletions(-) create mode 100644 jstests/aggregation/sources/group/accumulator_percentile.js diff --git a/jstests/aggregation/sources/group/accumulator_percentile.js b/jstests/aggregation/sources/group/accumulator_percentile.js new file mode 100644 index 0000000000000..dc4006fb10c1a --- /dev/null +++ b/jstests/aggregation/sources/group/accumulator_percentile.js @@ -0,0 +1,63 @@ +/** + * Tests that numbers that are equivalent but have different types are grouped together. + */ +(function() { + "use strict"; + const coll = db.coll; + + coll.drop(); + + assert.writeOK(coll.insert({key: new NumberInt(24), value: new NumberInt(75)})); + assert.writeOK(coll.insert({key: new NumberLong(24), value: new NumberLong(100)})); + assert.writeOK(coll.insert({key: 24, value: 36})); + + assert.writeOK(coll.insert({key: new NumberInt(42), value: new NumberInt(75)})); + assert.writeOK(coll.insert({key: new NumberLong(42), value: new NumberLong(100)})); + assert.writeOK(coll.insert({key: 42, value: 36})); + + const result1 = coll.aggregate({$group: {_id: "$key", perc_result: {$percentile: {"value":"$value","perc":20}}}}).toArray(); + + assert.eq(result1.length, 2, tojson(result1)); + + assert.eq(result1[0].perc_result, 39.900000000000006, tojson(result1)); + assert.eq(result1[1].perc_result, 39.900000000000006, tojson(result1)); + coll.drop(); + + assert.writeOK(coll.insert({temperature: 18,switch:1})); + assert.writeOK(coll.insert({temperature: 10,switch:0})); + assert.writeOK(coll.insert({temperature: 10,switch:0})); + assert.writeOK(coll.insert({temperature: 10,switch:0})); + assert.writeOK(coll.insert({temperature: 10,switch:1})); + assert.writeOK(coll.insert({temperature: 20,switch:1})); + assert.writeOK(coll.insert({temperature: 25,switch:1})); + assert.writeOK(coll.insert({temperature: 30,switch:1})); + assert.writeOK(coll.insert({temperature: 35,switch:1})); + + const result2 = db.coll.aggregate( + [ + { + '$project': { + 'valid_temp': { + '$cond': { + if: {'$eq': ['$switch', 1]}, + then: '$temperature', + else: null + } + }, + } + }, + { + "$group": { + _id: null, + perc_result: { + $percentile: { + "value":"$valid_temp", + "perc":70} + } + } + } + ]).toArray(); + + assert.eq(result2[0]['perc_result'], 28.499999999999996, tojson(result2)); + +}()); diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/mongo/db/pipeline/TDigest.cpp index 013583e1d6104..b827de9adfee8 100644 --- a/src/mongo/db/pipeline/TDigest.cpp +++ b/src/mongo/db/pipeline/TDigest.cpp @@ -102,28 +102,6 @@ TDigest::TDigest( *this = this->merge(digests); } } -/* -// Merge unsorted values by first sorting them. Use radix sort if -// possible. This implementation puts all additional memory in the -// heap, so that if called from fiber context we do not smash the -// stack. Otherwise it is very similar to boost::spreadsort. -TDigest TDigest::merge(const std::vector & unsortedValues) const { - auto n = unsortedValues.size(); - - // We require 256 buckets per byte level, plus one count array we can reuse. - std::unique_ptr buckets{new uint64_t[256 * 9]}; - // Allocate input and tmp array - std::unique_ptr tmp{new double[n * 2]}; - auto out = tmp.get() + n; - auto in = tmp.get(); - std::copy(unsortedValues.begin(), unsortedValues.end(), in); - - detail::double_radix_sort(n, buckets.get(), in, out); - //DCHECK(std::is_sorted(in, in + n)); - - return merge(presorted, std::vector(in, in + n)); -} -*/ TDigest TDigest::merge(const std::vector & sortedValues) const { if (sortedValues.empty()) { diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 6189a5c6e8572..c15b61b35dc53 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -270,6 +270,10 @@ class AccumulatorPush final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; + double perc_val; + double digest_size; + double chunk_size; + void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -386,6 +390,10 @@ class AccumulatorMergeObjects : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; + double perc_val; + double digest_size = 0; + double chunk_size; + void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 44e2d0db1a3a9..e5f8cd9ef7e89 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -60,21 +60,16 @@ const char countName[] = "count"; void AccumulatorPercentile::processInternal(const Value& input, bool merging) { // Determining 'digest_size' - // ToDo: Try to find a better evaluation - // ToDo: Check how to pass the default value. From configurations? - if (input.getDocument()["digest_size"].missing()) - this->digest_size = 1000; - else - this->digest_size = input.getDocument()["digest_size"].getDouble(); - - - // // Determining 'chunk_size' used in TDigest - // ToDo: Try to find a better evaluation - // ToDo: Check how to pass the default value. From configurations? - if (input.getDocument()["chunk_size"].missing()) - this->chunk_size = 1000; - else - this->digest_size = input.getDocument()["digest_size"].getDouble(); + if (this->digest_size == 0){ + + if (input.getDocument()["digest_size"].missing()){ + this->digest_size = 1000; + } + else + { + this->digest_size = input.getDocument()["digest_size"].getDouble(); + } + } // ToDo: error codes are not accurate. Set better numbers later // ToDo: It might be better evaluations for this part. @@ -85,7 +80,6 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { !input.getDocument()["value"].missing()); this->perc_val = input.getDocument()["perc"].getDouble() / 100; // Converting Percentile to Quantile - [0:100] to [0:1] - this->digest_size = input.getDocument()["digest_size"].getDouble(); // ToDo: Choose a better name for perc_input and refactor later Value perc_input = input.getDocument()["value"]; @@ -111,12 +105,9 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { // ToDo: Not sure 1) Is it important for TDigest to distinguish? 2) Is it important for MongoDB to distinguish? // ToDo: Going to cover all Decimal, Long and Double as a temporary, need to decide on this. switch (perc_input.getType()) { + case NumberDecimal: - values.push_back(perc_input.getDouble()); // ToDo: need to discuss - break; case NumberLong: - values.push_back(perc_input.getDouble()); // ToDo: need to discuss - break; case NumberInt: case NumberDouble: values.push_back(perc_input.getDouble()); @@ -127,8 +118,9 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { } _count++; - if (values.size() == this->chunk_size) + if (values.size() == this->chunk_size){ _add_to_tdigest(values); + } else return; } @@ -145,8 +137,9 @@ Decimal128 AccumulatorPercentile::_getDecimalTotal() const { Value AccumulatorPercentile::getValue(bool toBeMerged) { // To add remainders left over a chunk - if (values.size() > 0) + if (values.size() > 0){ _add_to_tdigest(values); + } // ToDo: Unchanged copy from 'avg' module, need to change this for Percentile if (toBeMerged) { @@ -159,8 +152,9 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { Document{{subTotalName, total}, {countName, _count}, {subTotalErrorName, error}}); } - if (_count == 0) + if (_count == 0){ return Value(BSONNULL); + } return Value(this->digest.estimateQuantile(this->perc_val)); } @@ -168,7 +162,7 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) : Accumulator(expCtx), _isDecimal(false), _count(0) { - // Higher 'digest_size' results in higher memory consumption + // Higher 'digest_size' results in higher memory consumption and better precision mongo::TDigest digest(this->digest_size); // This is a fixed size Accumulator so we never need to update this diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index af727e21df7ea..46455580d6673 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -49,6 +49,7 @@ using std::string; * expected results as its second argument, and asserts that for the given Accumulator the arguments * evaluate to the expected results. */ + static void assertExpectedResults( std::string accumulatorName, const intrusive_ptr& expCtx, @@ -99,6 +100,20 @@ static void assertExpectedResults( } } +TEST(Accumulators, Percentile) { + intrusive_ptr expCtx(new ExpressionContextForTest()); + + assertExpectedResults( + "$percentile", + expCtx, + { + {{Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 10}})), + Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 20}})), + Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 30}})), + Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 40}})), + }, Value(13.00)}}); +} + TEST(Accumulators, Avg) { intrusive_ptr expCtx(new ExpressionContextForTest()); assertExpectedResults( From bf9abdc0c0adf63d15ef652a3c413aaa409f58a3 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Wed, 10 Apr 2019 15:20:05 +0200 Subject: [PATCH 07/17] fix definition bug in header --- src/mongo/db/pipeline/accumulator.h | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index c15b61b35dc53..88e1f356bde93 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -270,10 +270,6 @@ class AccumulatorPush final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; - double perc_val; - double digest_size; - double chunk_size; - void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -291,10 +287,7 @@ class AccumulatorAvg final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; - double perc_val; - double digest_size; - double chunk_size; - void _add_to_tdigest(std::vector&); + static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -313,7 +306,6 @@ class AccumulatorAvg final : public Accumulator { }; - // Adding a new accumulator as 'percentile' class AccumulatorPercentile final : public Accumulator { public: @@ -323,6 +315,11 @@ class AccumulatorPercentile final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; + + double perc_val; + double digest_size = 0; + double chunk_size; + void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -349,7 +346,6 @@ class AccumulatorPercentile final : public Accumulator { - class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr& expCtx, bool isSamp); @@ -390,10 +386,6 @@ class AccumulatorMergeObjects : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; - double perc_val; - double digest_size = 0; - double chunk_size; - void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); From e7a78de55a72aa951cc2a9fd13a1b91ee554a174 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Wed, 10 Apr 2019 15:34:28 +0200 Subject: [PATCH 08/17] enable python3 --- buildscripts/scons.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildscripts/scons.py b/buildscripts/scons.py index fec884ce91f42..14802f1d7aed1 100755 --- a/buildscripts/scons.py +++ b/buildscripts/scons.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """Scons module.""" import os From f1aceffbf0a3ef25f54d1c2fc4d83845babe76b1 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Tue, 30 Apr 2019 11:07:28 +0200 Subject: [PATCH 09/17] Enable sharding for percentile --- src/mongo/db/pipeline/accumulator.h | 29 ++-- .../db/pipeline/accumulator_percentile.cpp | 138 +++++++++++------- src/mongo/db/pipeline/accumulator_test.cpp | 27 +++- 3 files changed, 112 insertions(+), 82 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 88e1f356bde93..237dde182e08b 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -305,7 +305,6 @@ class AccumulatorAvg final : public Accumulator { long long _count; }; - // Adding a new accumulator as 'percentile' class AccumulatorPercentile final : public Accumulator { public: @@ -316,36 +315,28 @@ class AccumulatorPercentile final : public Accumulator { const char* getOpName() const final; void reset() final; - double perc_val; - double digest_size = 0; - double chunk_size; + double percentile; + double digest_size; + double chunk_size = 100; + + mongo::TDigest digest; + + // push the vector of values to create tdigest object void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: - /** - * The total of all values is partitioned between those that are decimals, and those that are - * not decimals, so the decimal total needs to add the non-decimal. - */ - Decimal128 _getDecimalTotal() const; - // to be digested by TDigest algorithm std::vector values; + + std::vector digest_vector; - // process inputs will be sorted and added to this object - mongo::TDigest digest; - - short percentile; // Integer between 0 to 100 - bool _isDecimal; - DoubleDoubleSummation _nonDecimalTotal; - Decimal128 _decimalTotal; long long _count; + double test_var; }; - - class AccumulatorStdDev : public Accumulator { public: AccumulatorStdDev(const boost::intrusive_ptr& expCtx, bool isSamp); diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index e5f8cd9ef7e89..826d85784753b 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -54,15 +54,51 @@ const char* AccumulatorPercentile::getOpName() const { namespace { const char subTotalName[] = "subTotal"; const char subTotalErrorName[] = "subTotalError"; // Used for extra precision + +// for sharding tests - temp +const char sumName[] = "sum"; const char countName[] = "count"; +const char maxName[] = "max"; +const char minName[] = "min"; +const char percentileName[] = "percentile"; +const char digestSizeName[] = "digest_size"; +const char centroidsName[] = "centroids"; + +const char meanName[] = "mean"; +const char weightName[] = "weight"; } // namespace void AccumulatorPercentile::processInternal(const Value& input, bool merging) { + if (merging) { + verify(input.getType() == Object); + + Value digest_centroids = input[centroidsName]; + double digest_sum = input[sumName].getDouble(); + double digest_count = input[countName].getDouble(); + double digest_max = input[maxName].getDouble(); + double digest_min = input[minName].getDouble(); + double digest_size = input[digestSizeName].getDouble(); + + std::vector centroids; + for (const auto& centroid: digest_centroids.getArray()) { + centroids.push_back(mongo::TDigest::Centroid(centroid[meanName].getDouble(), centroid[weightName].getDouble())); + }; + + + // ToDo: might need to destroy it after filling to the vector + mongo::TDigest digest_temp(centroids, digest_sum, digest_count, digest_max, digest_min, digest_size); + + this->digest_vector.push_back(digest_temp); + this->percentile = input[percentileName].getDouble(); + + return; + } + // Determining 'digest_size' if (this->digest_size == 0){ - - if (input.getDocument()["digest_size"].missing()){ + if (input.getDocument()["digest_size"].missing()) + { this->digest_size = 1000; } else @@ -73,56 +109,41 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { // ToDo: error codes are not accurate. Set better numbers later // ToDo: It might be better evaluations for this part. - uassert(6677, "The 'perc' should be present in the input document.", - !input.getDocument()["perc"].missing()); + uassert(6677, "The 'percentile' should be present in the input document.", + !input.getDocument()["percentile"].missing()); uassert(6678, "The 'value' should be present in the input document.", !input.getDocument()["value"].missing()); - this->perc_val = input.getDocument()["perc"].getDouble() / 100; // Converting Percentile to Quantile - [0:100] to [0:1] + this->percentile = input.getDocument()["percentile"].getDouble(); - // ToDo: Choose a better name for perc_input and refactor later - Value perc_input = input.getDocument()["value"]; + // ToDo: Choose a better name for input_value and refactor later + Value input_value = input.getDocument()["value"]; - - if (merging) { - // We expect an object that contains both a subtotal and a count. Additionally there may - // be an error value, that allows for additional precision. - // 'input' is what getValue(true) produced below. - verify(perc_input.getType() == Object); - // We're recursively adding the subtotal to get the proper type treatment, but this only - // increments the count by one, so adjust the count afterwards. Similarly for 'error'. - processInternal(perc_input[subTotalName], false); - _count += perc_input[countName].getLong() - 1; - Value error = perc_input[subTotalErrorName]; - if (!error.missing()) { - processInternal(error, false); - _count--; // The error correction only adjusts the total, not the number of items. - } - return; - } - - // ToDo: Not sure 1) Is it important for TDigest to distinguish? 2) Is it important for MongoDB to distinguish? // ToDo: Going to cover all Decimal, Long and Double as a temporary, need to decide on this. - switch (perc_input.getType()) { - + switch (input_value.getType()) { case NumberDecimal: case NumberLong: case NumberInt: case NumberDouble: - values.push_back(perc_input.getDouble()); + values.push_back(input_value.getDouble()); break; default: - dassert(!perc_input.numeric()); + dassert(!input_value.numeric()); return; } + + // ToDo: I am thinking to replace this with other checks and get rid of "_count" variable + if (_count == 0) + { + digest = mongo::TDigest(this->digest_size); + } + _count++; if (values.size() == this->chunk_size){ _add_to_tdigest(values); } - else - return; } intrusive_ptr AccumulatorPercentile::create( @@ -130,47 +151,55 @@ intrusive_ptr AccumulatorPercentile::create( return new AccumulatorPercentile(expCtx); } -Decimal128 AccumulatorPercentile::_getDecimalTotal() const { - return _decimalTotal.add(_nonDecimalTotal.getDecimal()); -} - Value AccumulatorPercentile::getValue(bool toBeMerged) { // To add remainders left over a chunk - if (values.size() > 0){ + if (not values.empty()){ _add_to_tdigest(values); } - // ToDo: Unchanged copy from 'avg' module, need to change this for Percentile if (toBeMerged) { - if (_isDecimal) - return Value(Document{{subTotalName, _getDecimalTotal()}, {countName, _count}}); + std::vector centroids; + + for (const auto& centroid: this->digest.getCentroids()) { + centroids.push_back(Document{ + {"mean", centroid.mean()}, + {"weight", centroid.weight()} + }); + }; - double total, error; - std::tie(total, error) = _nonDecimalTotal.getDoubleDouble(); return Value( - Document{{subTotalName, total}, {countName, _count}, {subTotalErrorName, error}}); + Document{ + {"centroids", Value(centroids)}, + {"sum", digest.sum()}, + {"count", digest.count()}, + {"max", digest.max()}, + {"min", digest.min()}, + {"percentile", this->percentile}, + {"digest_size", this->digest_size} + } + ); } - if (_count == 0){ - return Value(BSONNULL); - } + // getValue(False) reaches here + // This line helps to still keep the tdigest values when there is no sharding. In case of Sharding, 'this->digest' is empty. + this->digest_vector.push_back(this->digest); + + // Regardless of using sharding, the percentile is calculated on a vector of tdigest objects + mongo::TDigest new_digest; + new_digest = mongo::TDigest::merge(this->digest_vector); - return Value(this->digest.estimateQuantile(this->perc_val)); + return Value(new_digest.estimateQuantile(this->percentile)); } AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) - : Accumulator(expCtx), _isDecimal(false), _count(0) { - - // Higher 'digest_size' results in higher memory consumption and better precision - mongo::TDigest digest(this->digest_size); - + : Accumulator(expCtx), _count(0) { + // This is a fixed size Accumulator so we never need to update this _memUsageBytes = sizeof(*this); } void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ - // Sort, Push and Clear the "values" vector in each chunk std::sort(values.begin(), values.end()); digest = digest.merge(values); @@ -178,9 +207,6 @@ void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ } void AccumulatorPercentile::reset() { - _isDecimal = false; - _nonDecimalTotal = {}; - _decimalTotal = {}; _count = 0; } } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 46455580d6673..a1d903c097149 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -64,6 +64,7 @@ static void assertExpectedResults( accum->process(val, false); } Value result = accum->getValue(false); + ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); } @@ -80,7 +81,7 @@ static void assertExpectedResults( ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); } - + // Asserts that result equals expected result when each input is on a separate shard. { boost::intrusive_ptr accum(factory(expCtx)); @@ -92,7 +93,7 @@ static void assertExpectedResults( Value result = accum->getValue(false); ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); - } + } } catch (...) { log() << "failed with arguments: " << Value(op.first); throw; @@ -107,11 +108,23 @@ TEST(Accumulators, Percentile) { "$percentile", expCtx, { - {{Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 10}})), - Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 20}})), - Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 30}})), - Value(Document({{"perc", 20.00}, {"digest_size", 100},{"value", 40}})), - }, Value(13.00)}}); + {{Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 10}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 20}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 30}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 40}})), + }, Value(13.00)}, + + {{ + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 10}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 20}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 30}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 40}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 42}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 43}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 44}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 45}})), + }, Value(41.25)}, + }); } TEST(Accumulators, Avg) { From 8e094f5722f597a07659fac61865202799cb5cdb Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Fri, 3 May 2019 14:24:10 +0200 Subject: [PATCH 10/17] Switch to gentle merge --- src/mongo/db/pipeline/TDigest.cpp | 2 +- src/mongo/db/pipeline/TDigest.h | 2 +- src/mongo/db/pipeline/accumulator.h | 4 +- .../db/pipeline/accumulator_percentile.cpp | 38 +++--------- src/mongo/db/pipeline/accumulator_test.cpp | 62 ++++++++++++++++++- 5 files changed, 72 insertions(+), 36 deletions(-) diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/mongo/db/pipeline/TDigest.cpp index b827de9adfee8..da25790b506a5 100644 --- a/src/mongo/db/pipeline/TDigest.cpp +++ b/src/mongo/db/pipeline/TDigest.cpp @@ -185,7 +185,7 @@ TDigest TDigest::merge(const std::vector & sortedValues) const { return result; } -TDigest TDigest::merge(std::vector& digests) { +TDigest TDigest::merge(const std::vector& digests) { size_t nCentroids = 0; for (auto it = digests.begin(); it != digests.end(); it++) { nCentroids += it->centroids_.size(); diff --git a/src/mongo/db/pipeline/TDigest.h b/src/mongo/db/pipeline/TDigest.h index 70865f466800d..b4baaa40a7c9d 100644 --- a/src/mongo/db/pipeline/TDigest.h +++ b/src/mongo/db/pipeline/TDigest.h @@ -96,7 +96,7 @@ class TDigest { * Returns a new TDigest constructed with values merged from the given * digests. */ - static TDigest merge(std::vector& digests); + static TDigest merge(const std::vector& digests); /* * Estimates the value of the given quantile. diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 237dde182e08b..616a2b183ed19 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -330,10 +330,8 @@ class AccumulatorPercentile final : public Accumulator { private: // to be digested by TDigest algorithm std::vector values; - - std::vector digest_vector; - long long _count; + bool any_input{false}; double test_var; }; diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 826d85784753b..7da4d8e636219 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -29,7 +29,6 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/accumulator.h" - #include "mongo/db/pipeline/accumulation_statement.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression.h" @@ -46,7 +45,6 @@ using boost::intrusive_ptr; REGISTER_ACCUMULATOR(percentile, AccumulatorPercentile::create); REGISTER_EXPRESSION(percentile, ExpressionFromAccumulator::parse); - const char* AccumulatorPercentile::getOpName() const { return "$percentile"; } @@ -55,7 +53,6 @@ namespace { const char subTotalName[] = "subTotal"; const char subTotalErrorName[] = "subTotalError"; // Used for extra precision -// for sharding tests - temp const char sumName[] = "sum"; const char countName[] = "count"; const char maxName[] = "max"; @@ -63,7 +60,6 @@ const char minName[] = "min"; const char percentileName[] = "percentile"; const char digestSizeName[] = "digest_size"; const char centroidsName[] = "centroids"; - const char meanName[] = "mean"; const char weightName[] = "weight"; } // namespace @@ -85,13 +81,8 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { centroids.push_back(mongo::TDigest::Centroid(centroid[meanName].getDouble(), centroid[weightName].getDouble())); }; - - // ToDo: might need to destroy it after filling to the vector - mongo::TDigest digest_temp(centroids, digest_sum, digest_count, digest_max, digest_min, digest_size); - - this->digest_vector.push_back(digest_temp); + digest = digest.merge({mongo::TDigest(centroids, digest_sum, digest_count, digest_max, digest_min, digest_size), digest}); this->percentile = input[percentileName].getDouble(); - return; } @@ -107,7 +98,7 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { } } - // ToDo: error codes are not accurate. Set better numbers later + // ToDo: Error codes are not accurate. Set better numbers later // ToDo: It might be better evaluations for this part. uassert(6677, "The 'percentile' should be present in the input document.", !input.getDocument()["percentile"].missing()); @@ -120,7 +111,6 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { // ToDo: Choose a better name for input_value and refactor later Value input_value = input.getDocument()["value"]; - // ToDo: Going to cover all Decimal, Long and Double as a temporary, need to decide on this. switch (input_value.getType()) { case NumberDecimal: case NumberLong: @@ -133,14 +123,12 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { return; } - // ToDo: I am thinking to replace this with other checks and get rid of "_count" variable - if (_count == 0) + if (any_input == false) { digest = mongo::TDigest(this->digest_size); + any_input = true; } - _count++; - if (values.size() == this->chunk_size){ _add_to_tdigest(values); } @@ -153,7 +141,7 @@ intrusive_ptr AccumulatorPercentile::create( Value AccumulatorPercentile::getValue(bool toBeMerged) { - // To add remainders left over a chunk + // To add the remainders if (not values.empty()){ _add_to_tdigest(values); } @@ -180,21 +168,11 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { } ); } - - // getValue(False) reaches here - // This line helps to still keep the tdigest values when there is no sharding. In case of Sharding, 'this->digest' is empty. - this->digest_vector.push_back(this->digest); - - // Regardless of using sharding, the percentile is calculated on a vector of tdigest objects - mongo::TDigest new_digest; - new_digest = mongo::TDigest::merge(this->digest_vector); - - return Value(new_digest.estimateQuantile(this->percentile)); + return Value(digest.estimateQuantile(this->percentile)); } AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) - : Accumulator(expCtx), _count(0) { - + : Accumulator(expCtx) { // This is a fixed size Accumulator so we never need to update this _memUsageBytes = sizeof(*this); } @@ -207,6 +185,6 @@ void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ } void AccumulatorPercentile::reset() { - _count = 0; + return; } } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index a1d903c097149..121613fb38d2e 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -93,7 +93,7 @@ static void assertExpectedResults( Value result = accum->getValue(false); ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); - } + } } catch (...) { log() << "failed with arguments: " << Value(op.first); throw; @@ -124,6 +124,66 @@ TEST(Accumulators, Percentile) { Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 44}})), Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 45}})), }, Value(41.25)}, + + {{ + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 10}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 20}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 30}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 40}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 50}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 60}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 70}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 80}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 90}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 91}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 92}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 93}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 94}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 95}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 96}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 97}})), + }, Value(13.00)}, + + {{ + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 10}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 20}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 30}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 40}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 50}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 60}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 70}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 80}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 90}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 91}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 92}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 93}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 94}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 95}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 96}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 97}})), + }, Value(87.25)}, + + {{ + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 10}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 20}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 30}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 40}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 50}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 60}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 70}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 80}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 90}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 91}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 92}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 93}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 94}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 95}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 96}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 97}})), + }, Value(96.7)}, + + + }); } From 84ad697369ce39d0278efa79055d868baa654cd5 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Tue, 14 May 2019 13:30:30 +0200 Subject: [PATCH 11/17] Fix digest_size initialiser bug --- src/mongo/db/pipeline/SConscript | 2 +- src/mongo/db/pipeline/accumulator.h | 21 ++- .../db/pipeline/accumulator_percentile.cpp | 27 ++-- src/mongo/db/pipeline/accumulator_test.cpp | 127 +++++++++--------- .../tdigest}/TDigest.cpp | 1 + .../tdigest}/TDigest.h | 0 6 files changed, 89 insertions(+), 89 deletions(-) rename src/{mongo/db/pipeline => third_party/tdigest}/TDigest.cpp (99%) rename src/{mongo/db/pipeline => third_party/tdigest}/TDigest.h (100%) diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index a719a4bffe4ee..22ce264cf4c03 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -162,7 +162,7 @@ env.Library( 'accumulator_sum.cpp', 'accumulator_merge_objects.cpp', 'accumulator_percentile.cpp', - 'TDigest.cpp' + '$BUILD_DIR/third_party/tdigest/TDigest.cpp' ], LIBDEPS=[ 'document_value', diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 616a2b183ed19..02b1524d569e5 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -44,7 +44,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/summation.h" -#include "mongo/db/pipeline/TDigest.h" +#include "third_party/tdigest/TDigest.h" namespace mongo { @@ -288,7 +288,6 @@ class AccumulatorAvg final : public Accumulator { const char* getOpName() const final; void reset() final; - static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); @@ -314,25 +313,23 @@ class AccumulatorPercentile final : public Accumulator { Value getValue(bool toBeMerged) final; const char* getOpName() const final; void reset() final; - - double percentile; - double digest_size; - double chunk_size = 100; - - mongo::TDigest digest; - - // push the vector of values to create tdigest object - void _add_to_tdigest(std::vector&); static boost::intrusive_ptr create( const boost::intrusive_ptr& expCtx); private: + double percentile; + double digest_size=0; + double chunk_size = 100; + mongo::TDigest digest; + // to be digested by TDigest algorithm std::vector values; + // push the vector of values to create tdigest object + void _add_to_tdigest(std::vector&); + bool any_input{false}; - double test_var; }; class AccumulatorStdDev : public Accumulator { diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 7da4d8e636219..8ba6d069ce52c 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -36,7 +36,7 @@ #include "mongo/db/pipeline/value.h" #include "mongo/platform/decimal128.h" -#include "mongo/db/pipeline/TDigest.h" +#include "third_party/tdigest/TDigest.h" namespace mongo { @@ -98,17 +98,14 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { } } - // ToDo: Error codes are not accurate. Set better numbers later - // ToDo: It might be better evaluations for this part. - uassert(6677, "The 'percentile' should be present in the input document.", + uassert(51300, "The 'percentile' should be present in the input document.", !input.getDocument()["percentile"].missing()); - uassert(6678, "The 'value' should be present in the input document.", + uassert(51301, "The 'value' should be present in the input document.", !input.getDocument()["value"].missing()); this->percentile = input.getDocument()["percentile"].getDouble(); - // ToDo: Choose a better name for input_value and refactor later Value input_value = input.getDocument()["value"]; switch (input_value.getType()) { @@ -155,8 +152,8 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { {"weight", centroid.weight()} }); }; - - return Value( + + Value res = Value( Document{ {"centroids", Value(centroids)}, {"sum", digest.sum()}, @@ -164,11 +161,16 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { {"max", digest.max()}, {"min", digest.min()}, {"percentile", this->percentile}, - {"digest_size", this->digest_size} + {"digest_size", this->digest_size} } ); + reset(); + return res; } - return Value(digest.estimateQuantile(this->percentile)); + + Value res = Value(digest.estimateQuantile(this->percentile)); + reset(); + return res; } AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) @@ -185,6 +187,9 @@ void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ } void AccumulatorPercentile::reset() { - return; + this->digest_size = 0; + values.clear(); + digest = mongo::TDigest(this->digest_size); + any_input = false; } } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 121613fb38d2e..7a975f0cd8834 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -49,7 +49,6 @@ using std::string; * expected results as its second argument, and asserts that for the given Accumulator the arguments * evaluate to the expected results. */ - static void assertExpectedResults( std::string accumulatorName, const intrusive_ptr& expCtx, @@ -103,87 +102,85 @@ static void assertExpectedResults( TEST(Accumulators, Percentile) { intrusive_ptr expCtx(new ExpressionContextForTest()); - assertExpectedResults( "$percentile", expCtx, { - {{Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 10}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 20}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 30}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 150},{"value", 40}})), + {{ + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 10}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 20}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 30}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 40}})), }, Value(13.00)}, {{ - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 10}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 20}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 30}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 40}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 42}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 43}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 44}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 300},{"value", 45}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 10}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 20}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 30}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 40}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 42}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 43}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 44}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 45}})), }, Value(41.25)}, {{ - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 10}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 20}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 30}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 40}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 50}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 60}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 70}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 80}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 90}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 91}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 92}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 93}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 94}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 95}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 96}})), - Value(Document({{"percentile", 0.05}, {"digest_size", 3},{"value", 97}})), - }, Value(13.00)}, + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 10}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 20}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 30}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 40}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 50}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 60}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 70}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 80}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 90}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 91}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 92}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 93}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 94}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 95}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 96}})), + Value(Document({{"percentile", 0.05}, {"digest_size", 300},{"value", 97}})), + }, Value(13.00)}, {{ - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 10}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 20}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 30}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 40}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 50}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 60}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 70}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 80}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 90}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 91}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 92}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 93}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 94}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 95}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 96}})), - Value(Document({{"percentile", 0.5}, {"digest_size", 3},{"value", 97}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 10}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 20}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 30}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 40}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 50}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 60}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 70}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 80}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 90}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 91}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 92}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 93}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 94}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 95}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 96}})), + Value(Document({{"percentile", 0.5}, {"digest_size", 300},{"value", 97}})), }, Value(87.25)}, {{ - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 10}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 20}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 30}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 40}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 50}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 60}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 70}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 80}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 90}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 91}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 92}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 93}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 94}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 95}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 96}})), - Value(Document({{"percentile", 0.95}, {"digest_size", 3},{"value", 97}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 10}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 20}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 30}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 40}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 50}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 60}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 70}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 80}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 90}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 91}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 92}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 93}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 94}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 95}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 96}})), + Value(Document({{"percentile", 0.95}, {"digest_size", 300},{"value", 97}})), }, Value(96.7)}, - - }); } diff --git a/src/mongo/db/pipeline/TDigest.cpp b/src/third_party/tdigest/TDigest.cpp similarity index 99% rename from src/mongo/db/pipeline/TDigest.cpp rename to src/third_party/tdigest/TDigest.cpp index da25790b506a5..75962377d34c8 100644 --- a/src/mongo/db/pipeline/TDigest.cpp +++ b/src/third_party/tdigest/TDigest.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace mongo { diff --git a/src/mongo/db/pipeline/TDigest.h b/src/third_party/tdigest/TDigest.h similarity index 100% rename from src/mongo/db/pipeline/TDigest.h rename to src/third_party/tdigest/TDigest.h From d4894c68e8a025fa00617a141271c18532291b0c Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Tue, 14 May 2019 14:27:44 +0200 Subject: [PATCH 12/17] Get rid of reset() --- src/mongo/db/pipeline/SConscript | 2 +- src/mongo/db/pipeline/accumulator.h | 2 +- src/mongo/db/pipeline/accumulator_percentile.cpp | 8 ++------ src/third_party/{tdigest => folly}/TDigest.cpp | 1 - src/third_party/{tdigest => folly}/TDigest.h | 0 5 files changed, 4 insertions(+), 9 deletions(-) rename src/third_party/{tdigest => folly}/TDigest.cpp (99%) rename src/third_party/{tdigest => folly}/TDigest.h (100%) diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 22ce264cf4c03..89216aa17702b 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -162,7 +162,7 @@ env.Library( 'accumulator_sum.cpp', 'accumulator_merge_objects.cpp', 'accumulator_percentile.cpp', - '$BUILD_DIR/third_party/tdigest/TDigest.cpp' + '$BUILD_DIR/third_party/folly/TDigest.cpp' ], LIBDEPS=[ 'document_value', diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 02b1524d569e5..1162864dc94cd 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -44,7 +44,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/unordered_set.h" #include "mongo/util/summation.h" -#include "third_party/tdigest/TDigest.h" +#include "third_party/folly/TDigest.h" namespace mongo { diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 8ba6d069ce52c..50022cc6db7df 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -36,7 +36,7 @@ #include "mongo/db/pipeline/value.h" #include "mongo/platform/decimal128.h" -#include "third_party/tdigest/TDigest.h" +#include "third_party/folly/TDigest.h" namespace mongo { @@ -164,13 +164,9 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { {"digest_size", this->digest_size} } ); - reset(); return res; } - - Value res = Value(digest.estimateQuantile(this->percentile)); - reset(); - return res; + return Value(digest.estimateQuantile(this->percentile)); } AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) diff --git a/src/third_party/tdigest/TDigest.cpp b/src/third_party/folly/TDigest.cpp similarity index 99% rename from src/third_party/tdigest/TDigest.cpp rename to src/third_party/folly/TDigest.cpp index 75962377d34c8..da25790b506a5 100644 --- a/src/third_party/tdigest/TDigest.cpp +++ b/src/third_party/folly/TDigest.cpp @@ -20,7 +20,6 @@ #include #include #include -#include namespace mongo { diff --git a/src/third_party/tdigest/TDigest.h b/src/third_party/folly/TDigest.h similarity index 100% rename from src/third_party/tdigest/TDigest.h rename to src/third_party/folly/TDigest.h From a3f0367931e89938561e8476d0ea3fd6b648a789 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 16 May 2019 11:44:46 +0200 Subject: [PATCH 13/17] Extend for Null/Empty inputs --- src/mongo/db/pipeline/accumulator.h | 6 ++-- .../db/pipeline/accumulator_percentile.cpp | 32 +++++++++++-------- src/mongo/db/pipeline/accumulator_test.cpp | 17 ++++++++-- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator.h b/src/mongo/db/pipeline/accumulator.h index 1162864dc94cd..851d21ccca5f8 100644 --- a/src/mongo/db/pipeline/accumulator.h +++ b/src/mongo/db/pipeline/accumulator.h @@ -319,7 +319,7 @@ class AccumulatorPercentile final : public Accumulator { private: double percentile; - double digest_size=0; + double digest_size = 0; double chunk_size = 100; mongo::TDigest digest; @@ -327,9 +327,9 @@ class AccumulatorPercentile final : public Accumulator { std::vector values; // push the vector of values to create tdigest object - void _add_to_tdigest(std::vector&); + void _add_to_tdigest(); - bool any_input{false}; + bool any_input = false; }; class AccumulatorStdDev : public Accumulator { diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 50022cc6db7df..29cc9f91622d1 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -122,12 +122,12 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { if (any_input == false) { - digest = mongo::TDigest(this->digest_size); + digest = mongo::TDigest(digest_size); any_input = true; } - if (values.size() == this->chunk_size){ - _add_to_tdigest(values); + if (values.size() == chunk_size){ + _add_to_tdigest(); } } @@ -140,42 +140,46 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { // To add the remainders if (not values.empty()){ - _add_to_tdigest(values); + _add_to_tdigest(); } if (toBeMerged) { std::vector centroids; - for (const auto& centroid: this->digest.getCentroids()) { + for (const auto& centroid:digest.getCentroids()) { centroids.push_back(Document{ {"mean", centroid.mean()}, {"weight", centroid.weight()} }); }; - Value res = Value( + return Value( Document{ {"centroids", Value(centroids)}, {"sum", digest.sum()}, {"count", digest.count()}, {"max", digest.max()}, {"min", digest.min()}, - {"percentile", this->percentile}, - {"digest_size", this->digest_size} + {"percentile", percentile}, + {"digest_size", digest_size} } ); - return res; } - return Value(digest.estimateQuantile(this->percentile)); + + if (digest.count() == 0) + { + return Value(BSONNULL); + } + + return Value(digest.estimateQuantile(percentile)); } AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) : Accumulator(expCtx) { // This is a fixed size Accumulator so we never need to update this - _memUsageBytes = sizeof(*this); } -void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ +void AccumulatorPercentile::_add_to_tdigest(){ // Sort, Push and Clear the "values" vector in each chunk std::sort(values.begin(), values.end()); digest = digest.merge(values); @@ -183,9 +187,9 @@ void AccumulatorPercentile::_add_to_tdigest(std::vector & values){ } void AccumulatorPercentile::reset() { - this->digest_size = 0; + digest_size = 0; values.clear(); - digest = mongo::TDigest(this->digest_size); + digest = mongo::TDigest(digest_size); any_input = false; } } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index 7a975f0cd8834..aa43d72deed73 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -80,7 +80,7 @@ static void assertExpectedResults( ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); } - + // Asserts that result equals expected result when each input is on a separate shard. { boost::intrusive_ptr accum(factory(expCtx)); @@ -105,7 +105,20 @@ TEST(Accumulators, Percentile) { assertExpectedResults( "$percentile", expCtx, - { + { + {{}, Value(BSONNULL)}, + + {{ + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", Value(BSONNULL)}})), + }, Value(BSONNULL)}, + + {{ + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 10}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 20}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 30}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", Value(BSONNULL)}})), + }, Value(11.00)}, + {{ Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 10}})), Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 20}})), From 07d126989c7dcc886b9ecb9d88ef10dc78639c13 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Thu, 16 May 2019 16:52:44 +0200 Subject: [PATCH 14/17] Calculate _memUsageBytes in new accumulator --- .../db/pipeline/accumulator_percentile.cpp | 56 +++++++++++++------ src/mongo/db/pipeline/accumulator_test.cpp | 35 ++++++------ 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 29cc9f91622d1..de8461cbe72d1 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -50,8 +50,6 @@ const char* AccumulatorPercentile::getOpName() const { } namespace { -const char subTotalName[] = "subTotal"; -const char subTotalErrorName[] = "subTotalError"; // Used for extra precision const char sumName[] = "sum"; const char countName[] = "count"; @@ -76,26 +74,44 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { double digest_min = input[minName].getDouble(); double digest_size = input[digestSizeName].getDouble(); + if (any_input == false){ + digest = mongo::TDigest(digest_size); + any_input = true; + } + std::vector centroids; for (const auto& centroid: digest_centroids.getArray()) { centroids.push_back(mongo::TDigest::Centroid(centroid[meanName].getDouble(), centroid[weightName].getDouble())); }; + + // ToReview: This is the vector created to receive the centroids from Shards and needs to be counted in memory usage. + _memUsageBytes += centroids.size() * sizeof(centroids[0]); + + digest = digest.merge({ + mongo::TDigest( + centroids, + digest_sum, + digest_count, + digest_max, + digest_min, + digest_size), + digest + }); + + _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; - digest = digest.merge({mongo::TDigest(centroids, digest_sum, digest_count, digest_max, digest_min, digest_size), digest}); this->percentile = input[percentileName].getDouble(); return; } // Determining 'digest_size' if (this->digest_size == 0){ - if (input.getDocument()["digest_size"].missing()) - { + if (input.getDocument()["digest_size"].missing()){ this->digest_size = 1000; - } - else - { + } + else{ this->digest_size = input.getDocument()["digest_size"].getDouble(); - } + } } uassert(51300, "The 'percentile' should be present in the input document.", @@ -120,15 +136,20 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { return; } - if (any_input == false) - { + if (any_input == false){ digest = mongo::TDigest(digest_size); any_input = true; + + // To add the memory used by 'values' vector. + _memUsageBytes += sizeof(double) * chunk_size; + + // To add the memory used by new digest with custom digest_size + _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; } if (values.size() == chunk_size){ _add_to_tdigest(); - } + } } intrusive_ptr AccumulatorPercentile::create( @@ -141,7 +162,7 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { // To add the remainders if (not values.empty()){ _add_to_tdigest(); - } + } if (toBeMerged) { std::vector centroids; @@ -153,6 +174,9 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { }); }; + // ToReview: This is the vector created to pass the centroids to MongoS and needs to be counted in memory usage. + _memUsageBytes += centroids.size() * sizeof(centroids[0]); + return Value( Document{ {"centroids", Value(centroids)}, @@ -166,8 +190,7 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { ); } - if (digest.count() == 0) - { + if (digest.empty()){ return Value(BSONNULL); } @@ -176,7 +199,7 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { AccumulatorPercentile::AccumulatorPercentile(const boost::intrusive_ptr& expCtx) : Accumulator(expCtx) { - // This is a fixed size Accumulator so we never need to update this + _memUsageBytes = sizeof(*this); } void AccumulatorPercentile::_add_to_tdigest(){ @@ -191,5 +214,6 @@ void AccumulatorPercentile::reset() { values.clear(); digest = mongo::TDigest(digest_size); any_input = false; + _memUsageBytes = sizeof(*this); } } diff --git a/src/mongo/db/pipeline/accumulator_test.cpp b/src/mongo/db/pipeline/accumulator_test.cpp index aa43d72deed73..3d037d26903ef 100644 --- a/src/mongo/db/pipeline/accumulator_test.cpp +++ b/src/mongo/db/pipeline/accumulator_test.cpp @@ -63,7 +63,6 @@ static void assertExpectedResults( accum->process(val, false); } Value result = accum->getValue(false); - ASSERT_VALUE_EQ(op.second, result); ASSERT_EQUALS(op.second.getType(), result.getType()); } @@ -109,32 +108,32 @@ TEST(Accumulators, Percentile) { {{}, Value(BSONNULL)}, {{ - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", Value(BSONNULL)}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 110},{"value", Value(BSONNULL)}})), }, Value(BSONNULL)}, {{ - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 10}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 20}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 30}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", Value(BSONNULL)}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 120},{"value", 10}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 120},{"value", 20}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 120},{"value", 30}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 120},{"value", Value(BSONNULL)}})), }, Value(11.00)}, {{ - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 10}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 20}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 30}})), - Value(Document({{"percentile", 0.20}, {"digest_size", 100},{"value", 40}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 130},{"value", 10}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 130},{"value", 20}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 130},{"value", 30}})), + Value(Document({{"percentile", 0.20}, {"digest_size", 130},{"value", 40}})), }, Value(13.00)}, {{ - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 10}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 20}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 30}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 40}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 42}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 43}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 44}})), - Value(Document({{"percentile", 0.50}, {"digest_size", 200},{"value", 45}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 10}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 20}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 30}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 40}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 42}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 43}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 44}})), + Value(Document({{"percentile", 0.50}, {"digest_size", 140},{"value", 45}})), }, Value(41.25)}, {{ From ff2965cf4837411008d5cfcefbfb219f6e2c38e1 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Fri, 17 May 2019 10:09:24 +0200 Subject: [PATCH 15/17] Add more to _memUsageBytes --- src/mongo/db/pipeline/accumulator_percentile.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index de8461cbe72d1..c787cd935b229 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -1,3 +1,4 @@ + /* * Copyright (c) 2011 10gen Inc. * @@ -76,6 +77,12 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { if (any_input == false){ digest = mongo::TDigest(digest_size); + + // "digest" will be extended by digest_size. Similar to "centroids", it is also 16 * digest_size + _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; + + // "centroids" will be the vector of two doubles (mean, weight) + _memUsageBytes += (16 * digest_size); any_input = true; } @@ -84,9 +91,6 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { centroids.push_back(mongo::TDigest::Centroid(centroid[meanName].getDouble(), centroid[weightName].getDouble())); }; - // ToReview: This is the vector created to receive the centroids from Shards and needs to be counted in memory usage. - _memUsageBytes += centroids.size() * sizeof(centroids[0]); - digest = digest.merge({ mongo::TDigest( centroids, @@ -98,8 +102,6 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { digest }); - _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; - this->percentile = input[percentileName].getDouble(); return; } From 3073f0b3ff0087e2c15380567495c383b03088c0 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Fri, 17 May 2019 15:37:55 +0200 Subject: [PATCH 16/17] Changes on _memUsageBytes calculation --- src/mongo/db/pipeline/accumulator_percentile.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index c787cd935b229..919cf3b2b1529 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -77,12 +77,7 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { if (any_input == false){ digest = mongo::TDigest(digest_size); - - // "digest" will be extended by digest_size. Similar to "centroids", it is also 16 * digest_size - _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; - - // "centroids" will be the vector of two doubles (mean, weight) - _memUsageBytes += (16 * digest_size); + _memUsageBytes += sizeof(mongo::TDigest::Centroid) * digest_size; any_input = true; } @@ -142,11 +137,11 @@ void AccumulatorPercentile::processInternal(const Value& input, bool merging) { digest = mongo::TDigest(digest_size); any_input = true; - // To add the memory used by 'values' vector. + // To add the memory used by 'values' vector. _memUsageBytes += sizeof(double) * chunk_size; - // To add the memory used by new digest with custom digest_size - _memUsageBytes += sizeof(digest.getCentroids()[0]) * digest_size; + // To add the memory used by digest with custom size + _memUsageBytes += sizeof(mongo::TDigest::Centroid) * digest_size; } if (values.size() == chunk_size){ From 057c25fc262ea35d619cc2eb6340f56718d03fa2 Mon Sep 17 00:00:00 2001 From: Mehdi Abolfathi Date: Tue, 21 May 2019 13:26:43 +0200 Subject: [PATCH 17/17] fix python version and memory calculation bug --- buildscripts/scons.py | 2 +- src/mongo/db/pipeline/accumulator_percentile.cpp | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/buildscripts/scons.py b/buildscripts/scons.py index 14802f1d7aed1..fec884ce91f42 100755 --- a/buildscripts/scons.py +++ b/buildscripts/scons.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python """Scons module.""" import os diff --git a/src/mongo/db/pipeline/accumulator_percentile.cpp b/src/mongo/db/pipeline/accumulator_percentile.cpp index 919cf3b2b1529..cb1155f66d52d 100644 --- a/src/mongo/db/pipeline/accumulator_percentile.cpp +++ b/src/mongo/db/pipeline/accumulator_percentile.cpp @@ -170,9 +170,6 @@ Value AccumulatorPercentile::getValue(bool toBeMerged) { {"weight", centroid.weight()} }); }; - - // ToReview: This is the vector created to pass the centroids to MongoS and needs to be counted in memory usage. - _memUsageBytes += centroids.size() * sizeof(centroids[0]); return Value( Document{