Skip to content

Commit ef3e8f4

Browse files
authored
New Aggregate function: return map[string]interface{} (#157)
1 parent cc7ddce commit ef3e8f4

File tree

4 files changed

+150
-27
lines changed

4 files changed

+150
-27
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ jobs:
5050
TLS_CACERT=redis/tests/tls/ca.crt
5151
5252
53-
build: # test with redisearch:latest
53+
build:
5454
docker:
5555
- image: circleci/golang:1.16
5656
- image: redislabs/redisearch:edge

redisearch/aggregate.go

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func (q AggregateQuery) Serialize() redis.Args {
281281

282282
// Deprecated: Please use processAggReply() instead
283283
func ProcessAggResponse(res []interface{}) [][]string {
284-
aggregateReply := make([][]string, len(res), len(res))
284+
aggregateReply := make([][]string, len(res))
285285
for i := 0; i < len(res); i++ {
286286
if d, e := redis.Strings(res[i], nil); e == nil {
287287
aggregateReply[i] = d
@@ -293,14 +293,15 @@ func ProcessAggResponse(res []interface{}) [][]string {
293293
return aggregateReply
294294
}
295295

296+
// Deprecated: Please use processAggQueryReply() instead
296297
func processAggReply(res []interface{}) (total int, aggregateReply [][]string, err error) {
297298
aggregateReply = [][]string{}
298299
total = 0
299-
aggregate_results := len(res) - 1
300-
if aggregate_results > 0 {
301-
total = aggregate_results
302-
aggregateReply = make([][]string, aggregate_results, aggregate_results)
303-
for i := 0; i < aggregate_results; i++ {
300+
aggregateResults := len(res) - 1
301+
if aggregateResults > 0 {
302+
total = aggregateResults
303+
aggregateReply = make([][]string, aggregateResults)
304+
for i := 0; i < aggregateResults; i++ {
304305
if d, e := redis.Strings(res[i+1], nil); e == nil {
305306
aggregateReply[i] = d
306307
} else {
@@ -312,13 +313,33 @@ func processAggReply(res []interface{}) (total int, aggregateReply [][]string, e
312313
return
313314
}
314315

316+
// New Aggregate reply processor
317+
func processAggQueryReply(res []interface{}) (total int, aggregateReply []map[string]interface{}, err error) {
318+
aggregateReply = []map[string]interface{}{}
319+
total = 0
320+
aggregateResults := len(res) - 1
321+
if aggregateResults > 0 {
322+
total = aggregateResults
323+
aggregateReply = make([]map[string]interface{}, aggregateResults)
324+
for i := 0; i < aggregateResults; i++ {
325+
if d, e := mapToStrings(res[i+1], nil); e == nil {
326+
aggregateReply[i] = d
327+
} else {
328+
err = fmt.Errorf("Error parsing Aggregate Reply: %v on reply position %d", e, i)
329+
aggregateReply[i] = nil
330+
}
331+
}
332+
}
333+
return
334+
}
335+
315336
func ProcessAggResponseSS(res []interface{}) [][]string {
316337
var lout = len(res)
317-
aggregateReply := make([][]string, lout, lout)
338+
aggregateReply := make([][]string, lout)
318339
for i := 0; i < lout; i++ {
319340
reply := res[i].([]interface{})
320341
linner := len(reply)
321-
aggregateReply[i] = make([]string, linner, linner)
342+
aggregateReply[i] = make([]string, linner)
322343
for j := 0; j < linner; j++ {
323344
if reply[j] == nil {
324345
log.Print(fmt.Sprintf("Error parsing Aggregate Reply on position (%d,%d)", i, j))
@@ -330,3 +351,35 @@ func ProcessAggResponseSS(res []interface{}) [][]string {
330351
}
331352
return aggregateReply
332353
}
354+
355+
// mapToStrings is a helper that converts an array (alternating key, value) into a map[string]interface{}.
356+
// The value can be string or []string. Numbers will be treated as strings. Requires an even number of
357+
// values in result.
358+
func mapToStrings(result interface{}, err error) (map[string]interface{}, error) {
359+
values, err := redis.Values(result, err)
360+
if err != nil {
361+
return nil, err
362+
}
363+
if len(values)%2 != 0 {
364+
return nil, fmt.Errorf("redigo: mapToStrings expects even number of values result")
365+
}
366+
m := make(map[string]interface{}, len(values)/2)
367+
for i := 0; i < len(values); i += 2 {
368+
key, okKey := redis.String(values[i], err)
369+
if okKey != nil {
370+
return nil, fmt.Errorf("mapToStrings key not a bulk string value")
371+
}
372+
373+
var value interface{}
374+
value, okValue := redis.String(values[i+1], err)
375+
if okValue != nil {
376+
value, okValue = redis.Strings(values[i+1], err)
377+
}
378+
if okValue != nil && okValue != redis.ErrNil {
379+
return nil, fmt.Errorf("mapToStrings value got unexpected element type: %T", values[i+1])
380+
}
381+
382+
m[string(key)] = value
383+
}
384+
return m, nil
385+
}

redisearch/aggregate_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func Init() {
148148

149149
AddValues(c)
150150
}
151+
151152
func TestAggregateGroupBy(t *testing.T) {
152153
Init()
153154
c := createClient("docs-games-idx1")
@@ -161,6 +162,10 @@ func TestAggregateGroupBy(t *testing.T) {
161162
_, count, err := c.Aggregate(q1)
162163
assert.Nil(t, err)
163164
assert.Equal(t, 5, count)
165+
166+
count, _, err = c.AggregateQuery(q1)
167+
assert.Nil(t, err)
168+
assert.Equal(t, 5, count)
164169
}
165170

166171
func TestAggregateMinMax(t *testing.T) {
@@ -181,6 +186,13 @@ func TestAggregateMinMax(t *testing.T) {
181186
assert.GreaterOrEqual(t, f, 88.0)
182187
assert.Less(t, f, 89.0)
183188

189+
_, rep, err := c.AggregateQuery(q1)
190+
assert.Nil(t, err)
191+
fmt.Println(rep[0])
192+
f, _ = strconv.ParseFloat(rep[0]["minPrice"].(string), 64)
193+
assert.GreaterOrEqual(t, f, 88.0)
194+
assert.Less(t, f, 89.0)
195+
184196
q2 := NewAggregateQuery().SetQuery(NewQuery("sony")).
185197
GroupBy(*NewGroupBy().AddFields("@brand").
186198
Reduce(*NewReducer(GroupByReducerCount, []string{})).
@@ -193,6 +205,12 @@ func TestAggregateMinMax(t *testing.T) {
193205
f, _ = strconv.ParseFloat(row[5], 64)
194206
assert.GreaterOrEqual(t, f, 695.0)
195207
assert.Less(t, f, 696.0)
208+
209+
_, rep, err = c.AggregateQuery(q2)
210+
assert.Nil(t, err)
211+
f, _ = strconv.ParseFloat(rep[0]["maxPrice"].(string), 64)
212+
assert.GreaterOrEqual(t, f, 695.0)
213+
assert.Less(t, f, 696.0)
196214
}
197215

198216
func TestAggregateCountDistinct(t *testing.T) {
@@ -208,6 +226,27 @@ func TestAggregateCountDistinct(t *testing.T) {
208226
assert.Nil(t, err)
209227
row := res[0]
210228
assert.Equal(t, "1484", row[3])
229+
230+
_, rep, err := c.AggregateQuery(q1)
231+
assert.Nil(t, err)
232+
assert.Equal(t, "1484", rep[0]["count_distinct(title)"])
233+
}
234+
235+
func TestAggregateToList(t *testing.T) {
236+
Init()
237+
c := createClient("docs-games-idx1")
238+
239+
q1 := NewAggregateQuery().
240+
GroupBy(*NewGroupBy().AddFields("@brand").
241+
Reduce(*NewReducer(GroupByReducerToList, []string{"@brand"})))
242+
243+
total, reply, err := c.AggregateQuery(q1) // Can't be used with Aggregate when using ToList!
244+
assert.Nil(t, err)
245+
assert.Equal(t, 292, total)
246+
_, ok := reply[0]["brand"].(string)
247+
assert.True(t, ok)
248+
_, ok = reply[0]["__generated_aliastolistbrand"].([]string)
249+
assert.True(t, ok)
211250
}
212251

213252
func TestAggregateFilter(t *testing.T) {
@@ -226,6 +265,12 @@ func TestAggregateFilter(t *testing.T) {
226265
assert.Greater(t, f, 5.0)
227266
}
228267

268+
_, rep, err := c.AggregateQuery(q1)
269+
assert.Nil(t, err)
270+
for _, row := range rep {
271+
f, _ := strconv.ParseFloat(row["count"].(string), 64)
272+
assert.Greater(t, f, 5.0)
273+
}
229274
}
230275

231276
func makeAggResponseInterface(seed int64, nElements int, responseSizes []int) (res []interface{}) {

redisearch/client.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -245,26 +245,12 @@ func (i *Client) SpellCheck(q *Query, s *SpellCheckOptions) (suggs []MisspelledT
245245
return
246246
}
247247

248-
// Aggregate
248+
// Deprecated: Use AggregateQuery() instead.
249249
func (i *Client) Aggregate(q *AggregateQuery) (aggregateReply [][]string, total int, err error) {
250-
conn := i.pool.Get()
251-
defer conn.Close()
252-
hasCursor := q.WithCursor
253-
validCursor := q.CursorHasResults()
254-
var res []interface{} = nil
255-
if !validCursor {
256-
args := redis.Args{i.name}
257-
args = append(args, q.Serialize()...)
258-
res, err = redis.Values(conn.Do("FT.AGGREGATE", args...))
259-
} else {
260-
args := redis.Args{"READ", i.name, q.Cursor.Id}
261-
res, err = redis.Values(conn.Do("FT.CURSOR", args...))
262-
}
263-
if err != nil {
264-
return
265-
}
250+
res, err := i.aggregate(q)
251+
266252
// has no cursor
267-
if !hasCursor {
253+
if !q.WithCursor {
268254
total, aggregateReply, err = processAggReply(res)
269255
// has cursor
270256
} else {
@@ -278,7 +264,46 @@ func (i *Client) Aggregate(q *AggregateQuery) (aggregateReply [][]string, total
278264
}
279265
total, aggregateReply, err = processAggReply(partialResults)
280266
}
267+
return
268+
}
269+
270+
// AggregateQuery - New version to Aggregate() function. The values in each map can be string or []string.
271+
func (i *Client) AggregateQuery(q *AggregateQuery) (total int, aggregateReply []map[string]interface{}, err error) {
272+
res, err := i.aggregate(q)
281273

274+
// has no cursor
275+
if !q.WithCursor {
276+
total, aggregateReply, err = processAggQueryReply(res)
277+
// has cursor
278+
} else {
279+
var partialResults, err = redis.Values(res[0], nil)
280+
if err != nil {
281+
return total, aggregateReply, err
282+
}
283+
q.Cursor.Id, err = redis.Int(res[1], nil)
284+
if err != nil {
285+
return total, aggregateReply, err
286+
}
287+
total, aggregateReply, err = processAggQueryReply(partialResults)
288+
}
289+
return
290+
}
291+
292+
func (i *Client) aggregate(q *AggregateQuery) (res []interface{}, err error) {
293+
conn := i.pool.Get()
294+
defer conn.Close()
295+
validCursor := q.CursorHasResults()
296+
if !validCursor {
297+
args := redis.Args{i.name}
298+
args = append(args, q.Serialize()...)
299+
res, err = redis.Values(conn.Do("FT.AGGREGATE", args...))
300+
} else {
301+
args := redis.Args{"READ", i.name, q.Cursor.Id}
302+
res, err = redis.Values(conn.Do("FT.CURSOR", args...))
303+
}
304+
if err != nil {
305+
return
306+
}
282307
return
283308
}
284309

0 commit comments

Comments
 (0)