Skip to content

Commit 8f9990c

Browse files
authored
(dsl): Support Weight Avg aggregation (#358)
1 parent 72d0a37 commit 8f9990c

File tree

10 files changed

+410
-1
lines changed

10 files changed

+410
-1
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
---
2+
id: elastic_aggregation_weighted_avg
3+
title: "Weighted Avg Aggregation"
4+
---
5+
6+
The `Weighted Avg` aggregation is a single-value metrics aggregation that computes the average while taking into account the varying degrees of importance of numeric values. As a formula, a weighted average is the `∑(value * weight) / ∑(weight)`
7+
8+
In order to use the `Weighted Avg` aggregation import the following:
9+
```scala
10+
import zio.elasticsearch.aggregation.WeightedAvgAggregation
11+
import zio.elasticsearch.ElasticAggregation.weightedAvgAggregation
12+
```
13+
14+
You can create a `Weighted Avg` aggregation using the `weightedAvgAggregation` method this way:
15+
```scala
16+
val aggregation: WeightedAvgAggregation = weightedAvgAggregation(name = "weightedAvgAggregation", valueField = "doubleField", weightField = "intField")
17+
```
18+
19+
You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Weighted Avg` aggregation using the `weightedAvgAggregation` method this way:
20+
```scala
21+
val aggregation: WeightedAvgAggregation = weightedAvgAggregation(name = "weightedAvgAggregation", valueField = Document.doubleField, weightField = Document.intField)
22+
```
23+
24+
If you want to add aggregation (on the same level), you can use `withAgg` method:
25+
```scala
26+
val multipleAggregations: MultipleAggregations = weightedAvgAggregation(name = "weightedAvgAggregation1", valueField = Document.intField, weightField = Document.doubleField).withAgg(weightedAvgAggregation(name = "weightedAvgAggregation2", valueField = Document.doubleField, weightField = Document.intField))
27+
```
28+
29+
If you want to change the `valueMissing`, you can use `valueMissing` method:
30+
```scala
31+
val aggregationWithValueMissing: WeightedAvgAggregation = weightedAvgAggregation(name = "weightedAvgAggregation", field = Document.intField).valueMissing(10.0)
32+
```
33+
34+
If you want to change the `weightMissing`, you can use `weightMissing` method:
35+
```scala
36+
val aggregationWithWeightMissing: WeightedAvgAggregation = weightedAvgAggregation(name = "weightedAvgAggregation", field = Document.intField).weightMissing(5.0)
37+
```
38+
39+
If you want to change the `weightMissing` and `valueMissing`, you can use `weightMissing` and `valueMissing` methods:
40+
```scala
41+
val aggregationWithValueAndWeightMissing: WeightedAvgAggregation = weightedAvgAggregation(name = "weightedAvgAggregation", field = Document.intField).valueMissing(5.0).weightMissing(10.0)
42+
```
43+
44+
You can find more information about `Weighted Avg` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-weight-avg-aggregation.html).

modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,45 @@ object HttpExecutorSpec extends IntegrationSpec {
496496
} @@ around(
497497
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
498498
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
499+
),
500+
test("aggregate using weighted avg aggregation") {
501+
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
502+
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
503+
for {
504+
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
505+
_ <-
506+
Executor.execute(
507+
ElasticRequest
508+
.upsert[TestDocument](
509+
firstSearchIndex,
510+
firstDocumentId,
511+
firstDocument.copy(doubleField = 5, intField = 2)
512+
)
513+
)
514+
_ <-
515+
Executor.execute(
516+
ElasticRequest
517+
.upsert[TestDocument](
518+
firstSearchIndex,
519+
secondDocumentId,
520+
secondDocument.copy(doubleField = 10, intField = 3)
521+
)
522+
.refreshTrue
523+
)
524+
aggregation = weightedAvgAggregation(
525+
name = "weightedAggregation",
526+
valueField = TestDocument.doubleField,
527+
weightField = TestDocument.intField
528+
)
529+
aggsRes <-
530+
Executor
531+
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
532+
.asWeightedAvgAggregation("weightedAggregation")
533+
} yield assert(aggsRes.head.value)(equalTo(8.0))
534+
}
535+
} @@ around(
536+
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
537+
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
499538
)
500539
),
501540
suite("search with aggregation")(

modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,4 +353,53 @@ object ElasticAggregation {
353353
final def valueCountAggregation(name: String, field: String): ValueCountAggregation =
354354
ValueCount(name = name, field = field)
355355

356+
/**
357+
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] using the specified
358+
* parameters.
359+
*
360+
* @param name
361+
* aggregation name
362+
* @param valueField
363+
* the type-safe field that represents value for which weighted avg aggregation will be executed
364+
* @param weightField
365+
* the type-safe field that represents weight for which weighted avg aggregation will be executed
366+
* @return
367+
* an instance of [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] that represents weighted avg aggregation
368+
* to be performed.
369+
*/
370+
final def weightedAvgAggregation(
371+
name: String,
372+
valueField: Field[_, Any],
373+
weightField: Field[_, Any]
374+
): WeightedAvgAggregation =
375+
WeightedAvg(
376+
name = name,
377+
valueField = valueField.toString,
378+
weightField = weightField.toString,
379+
valueMissing = None,
380+
weightMissing = None
381+
)
382+
383+
/**
384+
* Constructs an instance of [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] using the specified parameters.
385+
*
386+
* @param name
387+
* aggregation name
388+
* @param valueField
389+
* the field that represents value for which weighted avg aggregation will be executed
390+
* @param weightField
391+
* the field that represents weight for which weighted avg aggregation will be executed
392+
* @return
393+
* an instance of [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] that represents weighted avg aggregation
394+
* to be performed.
395+
*/
396+
final def weightedAvgAggregation(name: String, valueField: String, weightField: String): WeightedAvgAggregation =
397+
WeightedAvg(
398+
name = name,
399+
valueField = valueField,
400+
weightField = weightField,
401+
valueMissing = None,
402+
weightMissing = None
403+
)
404+
356405
}

modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,3 +356,63 @@ private[elasticsearch] final case class ValueCount(name: String, field: String)
356356
private[elasticsearch] def toJson: Json =
357357
Obj(name -> Obj("value_count" -> Obj("field" -> field.toJson)))
358358
}
359+
360+
sealed trait WeightedAvgAggregation extends SingleElasticAggregation with WithAgg {
361+
362+
/**
363+
* Sets the `valueMissing` parameter for the [[zio.elasticsearch.aggregation.WeightedAvgAggregation]].
364+
* The`valueMissing` parameter provides a value to use when a document is missing the value field that the aggregation
365+
* is running on.
366+
*
367+
* @param value
368+
* the value to use for missing documents
369+
* @return
370+
* an instance of the [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] enriched with the `valueMissing`
371+
* parameter.
372+
*/
373+
def valueMissing(value: Double): WeightedAvgAggregation
374+
375+
/**
376+
* Sets the `weightMissing` parameter for the [[zio.elasticsearch.aggregation.WeightedAvgAggregation]].
377+
* The`weightMissing` parameter provides a value to use when a document is missing the weight field that the
378+
* aggregation is running on.
379+
*
380+
* @param value
381+
* the value to use for missing documents
382+
* @return
383+
* an instance of the [[zio.elasticsearch.aggregation.WeightedAvgAggregation]] enriched with the `weightMissing`
384+
* parameter.
385+
*/
386+
def weightMissing(value: Double): WeightedAvgAggregation
387+
}
388+
389+
private[elasticsearch] final case class WeightedAvg(
390+
name: String,
391+
valueField: String,
392+
weightField: String,
393+
valueMissing: Option[Double],
394+
weightMissing: Option[Double]
395+
) extends WeightedAvgAggregation { self =>
396+
397+
def valueMissing(value: Double): WeightedAvgAggregation =
398+
self.copy(valueMissing = Some(value))
399+
400+
def weightMissing(value: Double): WeightedAvgAggregation =
401+
self.copy(weightMissing = Some(value))
402+
403+
def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
404+
multipleAggregations.aggregations(self, agg)
405+
406+
private[elasticsearch] def toJson: Json = {
407+
val valueMissingJson: Json = valueMissing.fold(Obj())(m => Obj("missing" -> m.toJson))
408+
val weightMissingJson: Json = weightMissing.fold(Obj())(m => Obj("missing" -> m.toJson))
409+
410+
Obj(
411+
name -> Obj(
412+
"weighted_avg" -> (Obj("value" -> (Obj("field" -> valueField.toJson) merge valueMissingJson)) merge (Obj(
413+
"weight" -> (Obj("field" -> weightField.toJson) merge weightMissingJson)
414+
)))
415+
)
416+
)
417+
}
418+
}

modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ object AggregationResponse {
5959
)
6060
case ValueCountAggregationResponse(value) =>
6161
ValueCountAggregationResult(value)
62+
case WeightedAvgAggregationResponse(value) =>
63+
WeightedAvgAggregationResult(value)
6264
}
6365
}
6466

@@ -118,7 +120,6 @@ private[elasticsearch] final case class SumAggregationResponse(value: Double) ex
118120

119121
private[elasticsearch] object SumAggregationResponse {
120122
implicit val decoder: JsonDecoder[SumAggregationResponse] = DeriveJsonDecoder.gen[SumAggregationResponse]
121-
122123
}
123124

124125
private[elasticsearch] final case class TermsAggregationResponse(
@@ -154,6 +155,8 @@ private[elasticsearch] object TermsAggregationBucket {
154155
val objFields = data.unsafeAs[Obj].fields.toMap
155156

156157
(field: @unchecked) match {
158+
case str if str.contains("weighted_avg#") =>
159+
Some(field -> WeightedAvgAggregationResponse(value = objFields("value").unsafeAs[Double]))
157160
case str if str.contains("avg#") =>
158161
Some(field -> AvgAggregationResponse(value = objFields("value").unsafeAs[Double]))
159162
case str if str.contains("cardinality#") =>
@@ -199,6 +202,8 @@ private[elasticsearch] object TermsAggregationBucket {
199202
val subAggs = allFields.collect {
200203
case (field, data) if field != "key" && field != "doc_count" =>
201204
(field: @unchecked) match {
205+
case str if str.contains("weighted_avg#") =>
206+
(field.split("#")(1), data.asInstanceOf[WeightedAvgAggregationResponse])
202207
case str if str.contains("avg#") =>
203208
(field.split("#")(1), data.asInstanceOf[AvgAggregationResponse])
204209
case str if str.contains("cardinality#") =>
@@ -239,3 +244,10 @@ private[elasticsearch] object ValueCountAggregationResponse {
239244
implicit val decoder: JsonDecoder[ValueCountAggregationResponse] =
240245
DeriveJsonDecoder.gen[ValueCountAggregationResponse]
241246
}
247+
248+
private[elasticsearch] final case class WeightedAvgAggregationResponse(value: Double) extends AggregationResponse
249+
250+
private[elasticsearch] object WeightedAvgAggregationResponse {
251+
implicit val decoder: JsonDecoder[WeightedAvgAggregationResponse] =
252+
DeriveJsonDecoder.gen[WeightedAvgAggregationResponse]
253+
}

modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
7272
res.fields.map { case (field, data) =>
7373
ZValidation.fromEither(
7474
(field: @unchecked) match {
75+
case str if str.contains("weighted_avg#") =>
76+
WeightedAvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
7577
case str if str.contains("avg#") =>
7678
AvgAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
7779
case str if str.contains("max#") =>

modules/library/src/main/scala/zio/elasticsearch/package.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,19 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
179179
*/
180180
def asValueCountAggregation(name: String): RIO[R, Option[ValueCountAggregationResult]] =
181181
aggregationAs[ValueCountAggregationResult](name)
182+
183+
/**
184+
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
185+
*
186+
* @param name
187+
* the name of the aggregation to retrieve
188+
* @return
189+
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
190+
* [[result.WeightedAvgAggregationResult]].
191+
*/
192+
def asWeightedAvgAggregation(name: String): RIO[R, Option[WeightedAvgAggregationResult]] =
193+
aggregationAs[WeightedAvgAggregationResult](name)
194+
182195
}
183196

184197
final implicit class ZIODocumentOps[R, F[_]](zio: RIO[R, DocumentResult[F]]) {

modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,5 @@ final case class TermsAggregationBucketResult private[elasticsearch] (
7070
}
7171

7272
final case class ValueCountAggregationResult private[elasticsearch] (value: Int) extends AggregationResult
73+
74+
final case class WeightedAvgAggregationResult private[elasticsearch] (value: Double) extends AggregationResult

modules/library/src/main/scala/zio/elasticsearch/result/ElasticResult.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ private[elasticsearch] sealed trait ResultWithAggregation {
6565

6666
def asValueCountAggregation(name: String): IO[DecodingException, Option[ValueCountAggregationResult]] =
6767
aggregationAs[ValueCountAggregationResult](name)
68+
69+
def asWeightedAvgAggregation(name: String): IO[DecodingException, Option[WeightedAvgAggregationResult]] =
70+
aggregationAs[WeightedAvgAggregationResult](name)
6871
}
6972

7073
private[elasticsearch] sealed trait DocumentResult[F[_]] {

0 commit comments

Comments
 (0)