Skip to content

Commit 1cb75b7

Browse files
authored
(dsl): Support Extended stats aggregation (#363)
1 parent dbeb4b0 commit 1cb75b7

File tree

13 files changed

+408
-4
lines changed

13 files changed

+408
-4
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
---
2+
id: elastic_aggregation_extended_stats
3+
title: "Extended stats Aggregation"
4+
---
5+
6+
The `Extended stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max, average, sum od squares, variance and std deviation of a field) over numeric values extracted from the aggregated documents.
7+
The `Extended stats` aggregation is an extended version of the [`Stats`](https://lambdaworks.github.io/zio-elasticsearch/overview/aggregations/elastic_aggregation_stats) aggregation.
8+
9+
In order to use the `Extended stats` aggregation import the following:
10+
```scala
11+
import zio.elasticsearch.aggregation.ExtendedStatsAggregation
12+
import zio.elasticsearch.ElasticAggregation.extendedStatsAggregation
13+
```
14+
15+
You can create a `Extended stats` aggregation using the `extendedStatsAggregation` method this way:
16+
```scala
17+
val aggregation: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = "intField")
18+
```
19+
20+
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Extended stats` aggregation using the `extendedStatsAggregation` method this way:
21+
```scala
22+
// Document.intField must be number value, because of Stats aggregation
23+
val aggregation: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField)
24+
```
25+
26+
If you want to change the `missing` parameter, you can use `missing` method:
27+
```scala
28+
val aggregationWithMissing: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField).missing(10.0)
29+
```
30+
31+
If you want to change the `sigma` parameter, you can use `sigma` method:
32+
```scala
33+
val aggregationWithSigma: ExtendedStatsAggregation = extendedStatsAggregation(name = "extendedStatsAggregation", field = Document.intField).sigma(3.0)
34+
```
35+
36+
If you want to add aggregation (on the same level), you can use `withAgg` method:
37+
```scala
38+
val multipleAggregations: MultipleAggregations = extendedStatsAggregation(name = "extendedStatsAggregation1", field = Document.intField).withAgg(extendedStatsAggregation(name = "extendedStatsAggregation2", field = Document.doubleField))
39+
```
40+
41+
You can find more information about `Extended stats` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-extendedstats-aggregation.html#search-aggregations-metrics-extendedstats-aggregation).

docs/overview/aggregations/elastic_aggregation_stats.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ id: elastic_aggregation_stats
33
title: "Stats Aggregation"
44
---
55

6-
The `Stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max, and average of a field) over numeric values extracted from the aggregated documents.
6+
The `Stats` aggregation is a multi-value metrics aggregation that provides statistical information (count, sum, min, max and average of a field) over numeric values extracted from the aggregated documents.
77

88
In order to use the `Stats` aggregation import the following:
99
```scala

docs/overview/queries/elastic_query_term.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import zio.elasticsearch.ElasticQuery._
1313

1414
You can create a `Term` query using the `term` method this way:
1515
```scala
16-
val query: TermQuery = term(field = Document.name, value = "test")
16+
val query: TermQuery = term(field = "stringField", value = "test")
1717
```
1818

1919
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Term` query using the `term` method this way:

modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,48 @@ object HttpExecutorSpec extends IntegrationSpec {
104104
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
105105
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
106106
),
107+
test("aggregate using extended stats aggregation") {
108+
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
109+
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
110+
for {
111+
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
112+
_ <- Executor.execute(
113+
ElasticRequest
114+
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 100))
115+
)
116+
_ <- Executor.execute(
117+
ElasticRequest
118+
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 50))
119+
.refreshTrue
120+
)
121+
aggregation = extendedStatsAggregation(name = "aggregation", field = TestDocument.intField).sigma(3)
122+
aggsRes <-
123+
Executor
124+
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
125+
.asExtendedStatsAggregation("aggregation")
126+
} yield assert(aggsRes.head.count)(equalTo(2)) &&
127+
assert(aggsRes.head.min)(equalTo(50.0)) &&
128+
assert(aggsRes.head.max)(equalTo(100.0)) &&
129+
assert(aggsRes.head.avg)(equalTo(75.0)) &&
130+
assert(aggsRes.head.sum)(equalTo(150.0)) &&
131+
assert(aggsRes.head.sumOfSquares)(equalTo(12500.0)) &&
132+
assert(aggsRes.head.variance)(equalTo(625.0)) &&
133+
assert(aggsRes.head.variancePopulation)(equalTo(625.0)) &&
134+
assert(aggsRes.head.varianceSampling)(equalTo(1250.0)) &&
135+
assert(aggsRes.head.stdDeviation)(equalTo(25.0)) &&
136+
assert(aggsRes.head.stdDeviationPopulation)(equalTo(25.0)) &&
137+
assert(aggsRes.head.stdDeviationSampling)(equalTo(35.35533905932738)) &&
138+
assert(aggsRes.head.stdDeviationBoundsResult.upper)(equalTo(150.0)) &&
139+
assert(aggsRes.head.stdDeviationBoundsResult.lower)(equalTo(0.0)) &&
140+
assert(aggsRes.head.stdDeviationBoundsResult.upperPopulation)(equalTo(150.0)) &&
141+
assert(aggsRes.head.stdDeviationBoundsResult.lowerPopulation)(equalTo(0.0)) &&
142+
assert(aggsRes.head.stdDeviationBoundsResult.upperSampling)(equalTo(181.06601717798213)) &&
143+
assert(aggsRes.head.stdDeviationBoundsResult.lowerSampling)(equalTo(-31.066017177982133))
144+
}
145+
} @@ around(
146+
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
147+
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
148+
),
107149
test("aggregate using max aggregation") {
108150
val expectedResponse = ("aggregationInt", MaxAggregationResult(value = 20.0))
109151
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {

modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,38 @@ object ElasticAggregation {
113113
final def cardinalityAggregation(name: String, field: String): CardinalityAggregation =
114114
Cardinality(name = name, field = field, missing = None)
115115

116+
/**
117+
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] using the specified
118+
* parameters.
119+
*
120+
* @param name
121+
* aggregation name
122+
* @param field
123+
* the type-safe field for which extended stats aggregation will be executed
124+
* @tparam A
125+
* expected number type
126+
* @return
127+
* an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] that represents extended stats
128+
* aggregation to be performed.
129+
*/
130+
final def extendedStatsAggregation[A: Numeric](name: String, field: Field[_, A]): ExtendedStatsAggregation =
131+
ExtendedStats(name = name, field = field.toString, missing = None, sigma = None)
132+
133+
/**
134+
* Constructs an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] using the specified
135+
* parameters.
136+
*
137+
* @param name
138+
* aggregation name
139+
* @param field
140+
* the field for which extended stats aggregation will be executed
141+
* @return
142+
* an instance of [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] that represents extended stats
143+
* aggregation to be performed.
144+
*/
145+
final def extendedStatsAggregation(name: String, field: String): ExtendedStatsAggregation =
146+
ExtendedStats(name = name, field = field, missing = None, sigma = None)
147+
116148
/**
117149
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MaxAggregation]] using the specified parameters.
118150
*
@@ -246,6 +278,8 @@ object ElasticAggregation {
246278
* the name of the aggregation
247279
* @param field
248280
* the type-safe field for which stats aggregation will be executed
281+
* @tparam A
282+
* expected number type
249283
* @return
250284
* an instance of [[zio.elasticsearch.aggregation.StatsAggregation]] that represents stats aggregation to be
251285
* performed.

modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,49 @@ private[elasticsearch] final case class Cardinality(name: String, field: String,
143143
}
144144
}
145145

146+
sealed trait ExtendedStatsAggregation
147+
extends SingleElasticAggregation
148+
with HasMissing[ExtendedStatsAggregation]
149+
with WithAgg {
150+
151+
/**
152+
* Sets the `sigma` parameter for the [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]]. The`sigma` parameter
153+
* controls how many standard deviations plus/minus from the mean should std_deviation_bounds object display.
154+
*
155+
* @param value
156+
* the value to use for sigma parameter
157+
* @return
158+
* an instance of the [[zio.elasticsearch.aggregation.ExtendedStatsAggregation]] enriched with the `sigma`
159+
* parameter.
160+
*/
161+
def sigma(value: Double): ExtendedStatsAggregation
162+
}
163+
164+
private[elasticsearch] final case class ExtendedStats(
165+
name: String,
166+
field: String,
167+
missing: Option[Double],
168+
sigma: Option[Double]
169+
) extends ExtendedStatsAggregation { self =>
170+
171+
def missing(value: Double): ExtendedStatsAggregation =
172+
self.copy(missing = Some(value))
173+
174+
def sigma(value: Double): ExtendedStatsAggregation =
175+
self.copy(sigma = Some(value))
176+
177+
def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
178+
multipleAggregations.aggregations(self, agg)
179+
180+
private[elasticsearch] def toJson: Json = {
181+
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))
182+
183+
val sigmaJson: Json = sigma.fold(Obj())(m => Obj("sigma" -> m.toJson))
184+
185+
Obj(name -> Obj("extended_stats" -> (Obj("field" -> field.toJson) merge missingJson merge sigmaJson)))
186+
}
187+
}
188+
146189
sealed trait MaxAggregation extends SingleElasticAggregation with HasMissing[MaxAggregation] with WithAgg
147190

148191
private[elasticsearch] final case class Max(name: String, field: String, missing: Option[Double])

modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,43 @@ object AggregationResponse {
3131
AvgAggregationResult(value)
3232
case CardinalityAggregationResponse(value) =>
3333
CardinalityAggregationResult(value)
34+
case ExtendedStatsAggregationResponse(
35+
count,
36+
min,
37+
max,
38+
avg,
39+
sum,
40+
sumOfSquares,
41+
variance,
42+
variancePopulation,
43+
varianceSampling,
44+
stdDeviation,
45+
stdDeviationPopulation,
46+
stdDeviationSampling,
47+
stdDeviationBoundsResponse
48+
) =>
49+
ExtendedStatsAggregationResult(
50+
count = count,
51+
min = min,
52+
max = max,
53+
avg = avg,
54+
sum = sum,
55+
sumOfSquares = sumOfSquares,
56+
variance = variance,
57+
variancePopulation = variancePopulation,
58+
varianceSampling = varianceSampling,
59+
stdDeviation = stdDeviation,
60+
stdDeviationPopulation = stdDeviationPopulation,
61+
stdDeviationSampling = stdDeviationSampling,
62+
StdDeviationBoundsResult(
63+
upper = stdDeviationBoundsResponse.upper,
64+
lower = stdDeviationBoundsResponse.lower,
65+
upperPopulation = stdDeviationBoundsResponse.upperPopulation,
66+
lowerPopulation = stdDeviationBoundsResponse.lowerPopulation,
67+
upperSampling = stdDeviationBoundsResponse.upperSampling,
68+
lowerSampling = stdDeviationBoundsResponse.lowerSampling
69+
)
70+
)
3471
case MaxAggregationResponse(value) =>
3572
MaxAggregationResult(value)
3673
case MinAggregationResponse(value) =>
@@ -77,6 +114,34 @@ private[elasticsearch] object CardinalityAggregationResponse {
77114
DeriveJsonDecoder.gen[CardinalityAggregationResponse]
78115
}
79116

117+
private[elasticsearch] final case class ExtendedStatsAggregationResponse(
118+
count: Int,
119+
min: Double,
120+
max: Double,
121+
avg: Double,
122+
sum: Double,
123+
@jsonField("sum_of_squares")
124+
sumOfSquares: Double,
125+
variance: Double,
126+
@jsonField("variance_population")
127+
variancePopulation: Double,
128+
@jsonField("variance_sampling")
129+
varianceSampling: Double,
130+
@jsonField("std_deviation")
131+
stdDeviation: Double,
132+
@jsonField("std_deviation_population")
133+
stdDeviationPopulation: Double,
134+
@jsonField("std_deviation_sampling")
135+
stdDeviationSampling: Double,
136+
@jsonField("std_deviation_bounds")
137+
stdDeviationBoundsResponse: StdDeviationBoundsResponse
138+
) extends AggregationResponse
139+
140+
private[elasticsearch] object ExtendedStatsAggregationResponse {
141+
implicit val decoder: JsonDecoder[ExtendedStatsAggregationResponse] =
142+
DeriveJsonDecoder.gen[ExtendedStatsAggregationResponse]
143+
}
144+
80145
private[elasticsearch] final case class MaxAggregationResponse(value: Double) extends AggregationResponse
81146

82147
private[elasticsearch] object MaxAggregationResponse {
@@ -116,6 +181,24 @@ private[elasticsearch] object StatsAggregationResponse {
116181
implicit val decoder: JsonDecoder[StatsAggregationResponse] = DeriveJsonDecoder.gen[StatsAggregationResponse]
117182
}
118183

184+
private[elasticsearch] case class StdDeviationBoundsResponse(
185+
upper: Double,
186+
lower: Double,
187+
@jsonField("upper_population")
188+
upperPopulation: Double,
189+
@jsonField("lower_population")
190+
lowerPopulation: Double,
191+
@jsonField("upper_sampling")
192+
upperSampling: Double,
193+
@jsonField("lower_sampling")
194+
lowerSampling: Double
195+
) extends AggregationResponse
196+
197+
private[elasticsearch] object StdDeviationBoundsResponse {
198+
implicit val decoder: JsonDecoder[StdDeviationBoundsResponse] =
199+
DeriveJsonDecoder.gen[StdDeviationBoundsResponse]
200+
}
201+
119202
private[elasticsearch] final case class SumAggregationResponse(value: Double) extends AggregationResponse
120203

121204
private[elasticsearch] object SumAggregationResponse {
@@ -161,6 +244,26 @@ private[elasticsearch] object TermsAggregationBucket {
161244
Some(field -> AvgAggregationResponse(value = objFields("value").unsafeAs[Double]))
162245
case str if str.contains("cardinality#") =>
163246
Some(field -> CardinalityAggregationResponse(value = objFields("value").unsafeAs[Int]))
247+
case str if str.contains("extended_stats#") =>
248+
Some(
249+
field -> ExtendedStatsAggregationResponse(
250+
count = objFields("count").unsafeAs[Int],
251+
min = objFields("min").unsafeAs[Double],
252+
max = objFields("max").unsafeAs[Double],
253+
avg = objFields("avg").unsafeAs[Double],
254+
sum = objFields("sum").unsafeAs[Double],
255+
sumOfSquares = objFields("sum_of_squares").unsafeAs[Double],
256+
variance = objFields("variance").unsafeAs[Double],
257+
variancePopulation = objFields("variance_population").unsafeAs[Double],
258+
varianceSampling = objFields("variance_sampling").unsafeAs[Double],
259+
stdDeviation = objFields("std_deviation").unsafeAs[Double],
260+
stdDeviationPopulation = objFields("std_deviation_population").unsafeAs[Double],
261+
stdDeviationSampling = objFields("std_deviation_sampling").unsafeAs[Double],
262+
stdDeviationBoundsResponse = objFields("std_deviation_sampling").unsafeAs[StdDeviationBoundsResponse](
263+
StdDeviationBoundsResponse.decoder
264+
)
265+
)
266+
)
164267
case str if str.contains("max#") =>
165268
Some(field -> MaxAggregationResponse(value = objFields("value").unsafeAs[Double]))
166269
case str if str.contains("min#") =>
@@ -208,6 +311,8 @@ private[elasticsearch] object TermsAggregationBucket {
208311
(field.split("#")(1), data.asInstanceOf[AvgAggregationResponse])
209312
case str if str.contains("cardinality#") =>
210313
(field.split("#")(1), data.asInstanceOf[CardinalityAggregationResponse])
314+
case str if str.contains("extended_stats#") =>
315+
(field.split("#")(1), data.asInstanceOf[ExtendedStatsAggregationResponse])
211316
case str if str.contains("max#") =>
212317
(field.split("#")(1), data.asInstanceOf[MaxAggregationResponse])
213318
case str if str.contains("min#") =>

modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
7676
WeightedAvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
7777
case str if str.contains("avg#") =>
7878
AvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
79+
case str if str.contains("cardinality#") =>
80+
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
81+
case str if str.contains("extended_stats#") =>
82+
ExtendedStatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
7983
case str if str.contains("max#") =>
8084
MaxAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
8185
case str if str.contains("min#") =>
@@ -88,8 +92,6 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
8892
StatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
8993
case str if str.contains("sum#") =>
9094
SumAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
91-
case str if str.contains("cardinality#") =>
92-
CardinalityAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
9395
case str if str.contains("terms#") =>
9496
TermsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
9597
case str if str.contains("value_count#") =>

modules/library/src/main/scala/zio/elasticsearch/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
8484
def asCardinalityAggregation(name: String): RIO[R, Option[CardinalityAggregationResult]] =
8585
aggregationAs[CardinalityAggregationResult](name)
8686

87+
/**
88+
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
89+
*
90+
* @param name
91+
* the name of the aggregation to retrieve
92+
* @return
93+
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
94+
* [[result.ExtendedStatsAggregationResult]].
95+
*/
96+
def asExtendedStatsAggregation(name: String): RIO[R, Option[ExtendedStatsAggregationResult]] =
97+
aggregationAs[ExtendedStatsAggregationResult](name)
98+
8799
/**
88100
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
89101
*

0 commit comments

Comments
 (0)