Skip to content

Commit 5b0dd30

Browse files
authored
(dsl): Support Percentile ranks aggregation (#366)
1 parent 4cf07e7 commit 5b0dd30

File tree

11 files changed

+275
-1
lines changed

11 files changed

+275
-1
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
---
2+
id: elastic_aggregation_percentile_ranks
3+
title: "Percentiles Aggregation"
4+
---
5+
6+
The `Percentile ranks` aggregation is a multi-value metrics aggregation that calculates percentile of values at or below a threshold grouped by a specified value.
7+
8+
In order to use the `Percentile ranks` aggregation import the following:
9+
```scala
10+
import zio.elasticsearch.aggregation.PercentileRanksAggregation
11+
import zio.elasticsearch.ElasticAggregation.percentileRanksAggregation
12+
```
13+
14+
You can create a `Percentile ranks` aggregation using the `percentileRanksAggregation` method this way:
15+
```scala
16+
val aggregation: PercentileRanksAggregation = percentileRanksAggregation(field = "intField", name = "percentileRanksAggregation", values = 500, 600)
17+
```
18+
19+
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Percentile ranks` aggregation using the `percentileRanksAggregation` method this way:
20+
```scala
21+
// Document.intField must be number value
22+
val aggregation: PercentileRanksAggregation = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation", values = 500, 600)
23+
```
24+
25+
If you want to change the `missing`, you can use `missing` method:
26+
```scala
27+
val aggregationWithMissing: PercentileRanksAggregation = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation", values = 500, 600).missing(10.0)
28+
```
29+
30+
If you want to add aggregation (on the same level), you can use `withAgg` method:
31+
```scala
32+
val multipleAggregations: MultipleAggregations = percentileRanksAggregation(field = Document.intField, name = "percentileRanksAggregation1", values = 500, 600).withAgg(percentileRanksAggregation(field = Document.doubleField, name = "percentileRanksAggregation2", values = 500, 600))
33+
```
34+
35+
You can find more information about `Percentile ranks` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-percentile-rank-aggregation.html).

modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,38 @@ object HttpExecutorSpec extends IntegrationSpec {
256256
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
257257
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
258258
),
259+
test("aggregate using percentile ranks aggregation") {
260+
val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0)
261+
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
262+
(firstDocumentId, firstDocument, secondDocumentId, secondDocument, thirdDocumentId, thirdDocument) =>
263+
for {
264+
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
265+
_ <- Executor.execute(
266+
ElasticRequest
267+
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 400))
268+
)
269+
_ <-
270+
Executor.execute(
271+
ElasticRequest
272+
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 500))
273+
)
274+
_ <- Executor.execute(
275+
ElasticRequest
276+
.upsert[TestDocument](firstSearchIndex, thirdDocumentId, thirdDocument.copy(intField = 550))
277+
.refreshTrue
278+
)
279+
aggregation =
280+
percentileRanksAggregation(name = "aggregation", field = "intField", value = 500.0, values = 600.0)
281+
aggsRes <-
282+
Executor
283+
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
284+
.asPercentileRanksAggregation("aggregation")
285+
} yield assert(aggsRes.head.values)(equalTo(expectedResult))
286+
}
287+
} @@ around(
288+
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
289+
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
290+
),
259291
test("aggregate using percentiles aggregation") {
260292
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
261293
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>

modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,66 @@ object ElasticAggregation {
254254
final def multipleAggregations: MultipleAggregations =
255255
Multiple(aggregations = Chunk.empty)
256256

257+
/**
258+
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] using the specified
259+
* parameters.
260+
*
261+
* @param name
262+
* the name of the aggregation
263+
* @param field
264+
* the type-safe field for which percentile ranks aggregation will be executed
265+
* @param value
266+
* the first value to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
267+
* @param values
268+
* an array of values to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
269+
* @tparam A
270+
* expected number type
271+
* @return
272+
* an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] that represents percentile ranks
273+
* aggregation to be performed.
274+
*/
275+
final def percentileRanksAggregation[A: Numeric](
276+
name: String,
277+
field: Field[_, A],
278+
value: BigDecimal,
279+
values: BigDecimal*
280+
): PercentileRanksAggregation =
281+
PercentileRanks(
282+
name = name,
283+
field = field.toString,
284+
values = value +: Chunk.fromIterable(values),
285+
missing = None
286+
)
287+
288+
/**
289+
* Constructs an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] using the specified
290+
* parameters.
291+
*
292+
* @param name
293+
* the name of the aggregation
294+
* @param field
295+
* the field for which percentile ranks aggregation will be executed
296+
* @param value
297+
* the first value to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
298+
* @param values
299+
* an array of values to be calculated for [[zio.elasticsearch.aggregation.PercentileRanksAggregation]]
300+
* @return
301+
* an instance of [[zio.elasticsearch.aggregation.PercentileRanksAggregation]] that represents percentile ranks
302+
* aggregation to be performed.
303+
*/
304+
final def percentileRanksAggregation(
305+
name: String,
306+
field: String,
307+
value: BigDecimal,
308+
values: BigDecimal*
309+
): PercentileRanksAggregation =
310+
PercentileRanks(
311+
name = name,
312+
field = field,
313+
values = value +: Chunk.fromIterable(values),
314+
missing = None
315+
)
316+
257317
/**
258318
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.PercentilesAggregation]] using the specified
259319
* parameters.

modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,37 @@ private[elasticsearch] final case class Multiple(aggregations: Chunk[SingleElast
285285
aggregations.map(_.toJson).reduce(_ merge _)
286286
}
287287

288+
sealed trait PercentileRanksAggregation
289+
extends SingleElasticAggregation
290+
with HasMissing[PercentileRanksAggregation]
291+
with WithAgg
292+
293+
private[elasticsearch] final case class PercentileRanks(
294+
name: String,
295+
field: String,
296+
values: Chunk[BigDecimal],
297+
missing: Option[Double]
298+
) extends PercentileRanksAggregation { self =>
299+
300+
def missing(value: Double): PercentileRanksAggregation =
301+
self.copy(missing = Some(value))
302+
303+
def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
304+
multipleAggregations.aggregations(self, agg)
305+
306+
private[elasticsearch] def toJson: Json = {
307+
val missingJson: Json = missing.fold(Obj())(m => Obj("missing" -> m.toJson))
308+
309+
Obj(
310+
name -> Obj(
311+
"percentile_ranks" -> ((Obj("field" -> field.toJson) merge Obj(
312+
"values" -> Arr(values.map(_.toJson))
313+
)) merge missingJson)
314+
)
315+
)
316+
}
317+
}
318+
288319
sealed trait PercentilesAggregation
289320
extends SingleElasticAggregation
290321
with HasMissing[PercentilesAggregation]
@@ -294,7 +325,7 @@ sealed trait PercentilesAggregation
294325
* Sets the `percents` parameter for the [[zio.elasticsearch.aggregation.PercentilesAggregation]].
295326
*
296327
* @param percents
297-
* a array of percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
328+
* an array of percentiles to be calculated for [[zio.elasticsearch.aggregation.PercentilesAggregation]]
298329
* @return
299330
* an instance of the [[zio.elasticsearch.aggregation.PercentilesAggregation]] enriched with the `percents`
300331
* parameter.

modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ object AggregationResponse {
8383
MinAggregationResult(value)
8484
case MissingAggregationResponse(value) =>
8585
MissingAggregationResult(value)
86+
case PercentileRanksAggregationResponse(values) =>
87+
PercentileRanksAggregationResult(values)
8688
case PercentilesAggregationResponse(values) =>
8789
PercentilesAggregationResult(values)
8890
case StatsAggregationResponse(count, min, max, avg, sum) =>
@@ -201,6 +203,10 @@ private[elasticsearch] object FilterAggregationResponse extends JsonDecoderOps {
201203
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
202204
case str if str.contains("missing#") =>
203205
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
206+
case str if str.contains("percentile_ranks#") =>
207+
Some(
208+
field -> PercentileRanksAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]])
209+
)
204210
case str if str.contains("percentiles#") =>
205211
Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
206212
case str if str.contains("stats#") =>
@@ -243,6 +249,8 @@ private[elasticsearch] object FilterAggregationResponse extends JsonDecoderOps {
243249
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
244250
case str if str.contains("missing#") =>
245251
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
252+
case str if str.contains("percentile_ranks#") =>
253+
(field.split("#")(1), data.asInstanceOf[PercentileRanksAggregationResponse])
246254
case str if str.contains("percentiles#") =>
247255
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
248256
case str if str.contains("stats#") =>
@@ -287,6 +295,14 @@ private[elasticsearch] object MissingAggregationResponse {
287295
implicit val decoder: JsonDecoder[MissingAggregationResponse] = DeriveJsonDecoder.gen[MissingAggregationResponse]
288296
}
289297

298+
private[elasticsearch] final case class PercentileRanksAggregationResponse(values: Map[String, Double])
299+
extends AggregationResponse
300+
301+
private[elasticsearch] object PercentileRanksAggregationResponse {
302+
implicit val decoder: JsonDecoder[PercentileRanksAggregationResponse] =
303+
DeriveJsonDecoder.gen[PercentileRanksAggregationResponse]
304+
}
305+
290306
private[elasticsearch] final case class PercentilesAggregationResponse(values: Map[String, Double])
291307
extends AggregationResponse
292308

@@ -396,6 +412,10 @@ private[elasticsearch] object TermsAggregationBucket extends JsonDecoderOps {
396412
Some(field -> MinAggregationResponse(value = objFields("value").unsafeAs[Double]))
397413
case str if str.contains("missing#") =>
398414
Some(field -> MissingAggregationResponse(docCount = objFields("doc_count").unsafeAs[Int]))
415+
case str if str.contains("percentile_ranks#") =>
416+
Some(
417+
field -> PercentileRanksAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]])
418+
)
399419
case str if str.contains("percentiles#") =>
400420
Some(field -> PercentilesAggregationResponse(values = objFields("values").unsafeAs[Map[String, Double]]))
401421
case str if str.contains("stats#") =>
@@ -439,6 +459,8 @@ private[elasticsearch] object TermsAggregationBucket extends JsonDecoderOps {
439459
(field.split("#")(1), data.asInstanceOf[MinAggregationResponse])
440460
case str if str.contains("missing#") =>
441461
(field.split("#")(1), data.asInstanceOf[MissingAggregationResponse])
462+
case str if str.contains("percentile_ranks#") =>
463+
(field.split("#")(1), data.asInstanceOf[PercentileRanksAggregationResponse])
442464
case str if str.contains("percentiles#") =>
443465
(field.split("#")(1), data.asInstanceOf[PercentilesAggregationResponse])
444466
case str if str.contains("stats#") =>

modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
8888
MinAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
8989
case str if str.contains("missing#") =>
9090
MissingAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
91+
case str if str.contains("percentile_ranks#") =>
92+
PercentileRanksAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
9193
case str if str.contains("percentiles#") =>
9294
PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
9395
case str if str.contains("stats#") =>

modules/library/src/main/scala/zio/elasticsearch/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
108108
def asMaxAggregation(name: String): RIO[R, Option[MaxAggregationResult]] =
109109
aggregationAs[MaxAggregationResult](name)
110110

111+
/**
112+
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
113+
*
114+
* @param name
115+
* the name of the aggregation to retrieve
116+
* @return
117+
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
118+
* [[result.PercentileRanksAggregationResult]].
119+
*/
120+
def asPercentileRanksAggregation(name: String): RIO[R, Option[PercentileRanksAggregationResult]] =
121+
aggregationAs[PercentileRanksAggregationResult](name)
122+
111123
/**
112124
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
113125
*

modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ final case class MinAggregationResult private[elasticsearch] (value: Double) ext
6565

6666
final case class MissingAggregationResult private[elasticsearch] (docCount: Int) extends AggregationResult
6767

68+
final case class PercentileRanksAggregationResult private[elasticsearch] (values: Map[String, Double])
69+
extends AggregationResult
70+
6871
final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double])
6972
extends AggregationResult
7073

modules/library/src/main/scala/zio/elasticsearch/result/ElasticResult.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
5757
def asMinAggregation(name: String): IO[DecodingException, Option[MinAggregationResult]] =
5858
aggregationAs[MinAggregationResult](name)
5959

60+
def asPercentileRanksAggregation(name: String): IO[DecodingException, Option[PercentileRanksAggregationResult]] =
61+
aggregationAs[PercentileRanksAggregationResult](name)
62+
6063
def asPercentilesAggregation(name: String): IO[DecodingException, Option[PercentilesAggregationResult]] =
6164
aggregationAs[PercentilesAggregationResult](name)
6265

modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,32 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
285285
)
286286
)
287287
},
288+
test("percentileRanks") {
289+
val aggregation = percentileRanksAggregation("aggregation", "testField", 5, 6)
290+
val aggregationTs = percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6)
291+
val aggregationTsRaw = percentileRanksAggregation("aggregation", TestSubDocument.intField.raw, 5, 6)
292+
val aggregationWithMissing =
293+
percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6).missing(20.0)
294+
295+
assert(aggregation)(
296+
equalTo(
297+
PercentileRanks(name = "aggregation", field = "testField", values = Chunk(5.0, 6.0), missing = None)
298+
)
299+
) &&
300+
assert(aggregationTs)(
301+
equalTo(PercentileRanks(name = "aggregation", field = "intField", values = Chunk(5.0, 6.0), missing = None))
302+
) &&
303+
assert(aggregationTsRaw)(
304+
equalTo(
305+
PercentileRanks(name = "aggregation", field = "intField.raw", values = Chunk(5.0, 6.0), missing = None)
306+
)
307+
) &&
308+
assert(aggregationWithMissing)(
309+
equalTo(
310+
PercentileRanks(name = "aggregation", field = "intField", values = Chunk(5.0, 6.0), missing = Some(20.0))
311+
)
312+
)
313+
},
288314
test("percentiles") {
289315
val aggregation = percentilesAggregation("aggregation", "testField")
290316
val aggregationTs = percentilesAggregation("aggregation", TestSubDocument.intField)
@@ -1121,6 +1147,53 @@ object ElasticAggregationSpec extends ZIOSpecDefault {
11211147
assert(aggregation.toJson)(equalTo(expected.toJson)) &&
11221148
assert(aggregationWithSubAggregation.toJson)(equalTo(expectedWithSubAggregation.toJson))
11231149
},
1150+
test("percentileRanks") {
1151+
val aggregation = percentileRanksAggregation("aggregation", "testField", 5, 6)
1152+
val aggregationTs = percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6)
1153+
val aggregationWithMissing =
1154+
percentileRanksAggregation("aggregation", TestSubDocument.intField, 5, 6).missing(20.0)
1155+
1156+
val expected =
1157+
"""
1158+
|{
1159+
| "aggregation": {
1160+
| "percentile_ranks": {
1161+
| "field": "testField",
1162+
| "values": [5, 6]
1163+
| }
1164+
| }
1165+
|}
1166+
|""".stripMargin
1167+
1168+
val expectedTs =
1169+
"""
1170+
|{
1171+
| "aggregation": {
1172+
| "percentile_ranks": {
1173+
| "field": "intField",
1174+
| "values": [5, 6]
1175+
| }
1176+
| }
1177+
|}
1178+
|""".stripMargin
1179+
1180+
val expectedWithMissing =
1181+
"""
1182+
|{
1183+
| "aggregation": {
1184+
| "percentile_ranks": {
1185+
| "field": "intField",
1186+
| "values": [5, 6],
1187+
| "missing": 20.0
1188+
| }
1189+
| }
1190+
|}
1191+
|""".stripMargin
1192+
1193+
assert(aggregation.toJson)(equalTo(expected.toJson)) &&
1194+
assert(aggregationTs.toJson)(equalTo(expectedTs.toJson)) &&
1195+
assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson))
1196+
},
11241197
test("percentiles") {
11251198
val aggregation = percentilesAggregation("aggregation", "testField")
11261199
val aggregationTs = percentilesAggregation("aggregation", TestDocument.intField)

0 commit comments

Comments
 (0)