Skip to content

Commit f2dbe31

Browse files
committed
feat: add SeqGroups.
1 parent aab985f commit f2dbe31

File tree

2 files changed

+128
-0
lines changed

2 files changed

+128
-0
lines changed

seq.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,35 @@ func validateSeqFuncs(funcs ...AsyncFn) error {
7474

7575
return nil
7676
}
77+
78+
// SeGroups runs the functions group in order, and it will be terminated if any function returns error.
79+
func SeqGroups(groups ...[]AsyncFn) error {
80+
return seqGroups(context.Background(), groups...)
81+
}
82+
83+
func SeqGroupsWithContext(ctx context.Context, groups ...[]AsyncFn) error {
84+
return seqGroups(ctx, groups...)
85+
}
86+
87+
func seqGroups(ctx context.Context, groups ...[]AsyncFn) error {
88+
if len(groups) == 0 {
89+
return nil
90+
}
91+
92+
tasks := make([]AsyncFn, 0, len(groups))
93+
for _, group := range groups {
94+
validateAsyncFuncs(group...)
95+
task := func(funcs ...AsyncFn) AsyncFn {
96+
return func(ctx context.Context) error {
97+
_, err := all(ctx, funcs...)
98+
return err
99+
}
100+
}(group...)
101+
tasks = append(tasks, task)
102+
}
103+
104+
ctx = getContext(ctx)
105+
106+
_, err := seq(ctx, tasks...)
107+
return err
108+
}

seq_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync/atomic"
78
"testing"
9+
"time"
810

911
"github.com/ghosind/go-assert"
1012
"github.com/ghosind/go-async"
@@ -98,3 +100,97 @@ func ExampleSeq() {
98100
// [2]
99101
// <nil>
100102
}
103+
104+
func TestSeqGroups(t *testing.T) {
105+
a := assert.New(t)
106+
cnts := make([]atomic.Int32, 3)
107+
groups := make([][]async.AsyncFn, 0, 3)
108+
expectedCnts := []int{2, 3, 4}
109+
110+
for i := 0; i < 3; i++ {
111+
tasks := make([]async.AsyncFn, 0)
112+
idx := i
113+
for j := 0; j < i+2; j++ {
114+
tasks = append(tasks, func() {
115+
cnts[idx].Add(1)
116+
})
117+
}
118+
groups = append(groups, tasks)
119+
}
120+
121+
err := async.SeqGroups(groups...)
122+
a.NilNow(err)
123+
for i := 0; i < 3; i++ {
124+
a.EqualNow(cnts[i].Load(), expectedCnts[i])
125+
}
126+
}
127+
128+
func TestSeqGroupsWithoutTasks(t *testing.T) {
129+
a := assert.New(t)
130+
131+
err := async.SeqGroups()
132+
a.NilNow(err)
133+
}
134+
135+
func TestSeqGroupsWithFailure(t *testing.T) {
136+
a := assert.New(t)
137+
cnts := make([]atomic.Int32, 3)
138+
groups := make([][]async.AsyncFn, 0, 3)
139+
expectedErr := errors.New("expected error")
140+
expectedCnts := []int{2, 0, 0}
141+
142+
for i := 0; i < 3; i++ {
143+
tasks := make([]async.AsyncFn, 0)
144+
idx := i
145+
for j := 0; j < i+2; j++ {
146+
tasks = append(tasks, func() error {
147+
v := cnts[idx].Add(1)
148+
149+
if idx == 1 && v == 2 {
150+
return expectedErr
151+
}
152+
153+
return nil
154+
})
155+
}
156+
groups = append(groups, tasks)
157+
}
158+
159+
err := async.SeqGroups(groups...)
160+
a.NotNilNow(err)
161+
a.ContainsStringNow(err.Error(), expectedErr.Error())
162+
163+
for i := 0; i < 3; i++ {
164+
if i == 1 {
165+
continue
166+
}
167+
a.EqualNow(cnts[i].Load(), expectedCnts[i])
168+
}
169+
}
170+
171+
func TestSeqGroupsWithContext(t *testing.T) {
172+
a := assert.New(t)
173+
cnts := make([]atomic.Int32, 3)
174+
groups := make([][]async.AsyncFn, 0, 3)
175+
expectedCnts := []int{2, 3, 4}
176+
177+
for i := 0; i < 3; i++ {
178+
tasks := make([]async.AsyncFn, 0)
179+
idx := i
180+
for j := 0; j < i+2; j++ {
181+
tasks = append(tasks, func() {
182+
cnts[idx].Add(1)
183+
})
184+
}
185+
groups = append(groups, tasks)
186+
}
187+
188+
ctx, canFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
189+
defer canFunc()
190+
191+
err := async.SeqGroupsWithContext(ctx, groups...)
192+
a.NilNow(err)
193+
for i := 0; i < 3; i++ {
194+
a.EqualNow(cnts[i].Load(), expectedCnts[i])
195+
}
196+
}

0 commit comments

Comments
 (0)