Skip to content

Commit 08fea4b

Browse files
committed
feat: add ParallelComplete function.
1 parent 46170de commit 08fea4b

File tree

2 files changed

+230
-0
lines changed

2 files changed

+230
-0
lines changed

parallel.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package async
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/ghosind/utils"
78
)
@@ -105,3 +106,86 @@ func runTaskInParallel(
105106
}
106107
}
107108
}
109+
110+
// ParallelComplete runs the functions asynchronously with the specified concurrency limitation. It
111+
// returns an error array and a boolean value to indicate whether any function panics or returns an
112+
// error, and you can get the error details from the error array by the indices of the functions in
113+
// the parameter list. It will return until all of the functions are finished.
114+
//
115+
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
116+
// limitation if the number is 0.
117+
func ParallelComplete(concurrency int, funcs ...AsyncFn) ([]error, bool) {
118+
return parallelComplete(context.Background(), concurrency, funcs...)
119+
}
120+
121+
// ParallelCompleteWithContext runs the functions asynchronously with the specified concurrency
122+
// limitation and the context. It returns an error array and a boolean value to indicate whether
123+
// any function panics or returns an error, and you can get the error details from the error array
124+
// by the indices of the functions in the parameter list. It will return until all of the functions
125+
// are finished.
126+
//
127+
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
128+
// limitation if the number is 0.
129+
func ParallelCompleteWithContext(
130+
ctx context.Context,
131+
concurrency int,
132+
funcs ...AsyncFn,
133+
) ([]error, bool) {
134+
return parallelComplete(ctx, concurrency, funcs...)
135+
}
136+
137+
// parallelComplete runs the functions asynchronously with the specified concurrency until all of
138+
// the functions are finished.
139+
func parallelComplete(parent context.Context, concurrency int, funcs ...AsyncFn) ([]error, bool) {
140+
// the number of concurrency should be 0 (no limitation) or greater than 0.
141+
if concurrency < 0 {
142+
panic(ErrInvalidConcurrency)
143+
}
144+
145+
errs := make([]error, len(funcs))
146+
hasError := false
147+
148+
if len(funcs) == 0 {
149+
return errs, hasError
150+
}
151+
152+
ctx := getContext(parent)
153+
154+
wg := sync.WaitGroup{}
155+
wg.Add(len(funcs))
156+
157+
var conch chan struct{} // channel for concurrency limit
158+
// no concurrency limitation if the value of the number is 0
159+
if concurrency > 0 {
160+
conch = make(chan struct{}, concurrency)
161+
}
162+
163+
for i := 0; i < len(funcs); i++ {
164+
go func(n int) {
165+
defer wg.Done()
166+
167+
if conch != nil {
168+
conch <- struct{}{}
169+
}
170+
171+
fn := funcs[n]
172+
childCtx, childCanFunc := context.WithCancel(ctx)
173+
defer childCanFunc()
174+
175+
if err := utils.Try(func() error {
176+
return fn(childCtx)
177+
}); err != nil {
178+
errs[n] = err
179+
hasError = true
180+
}
181+
182+
if conch != nil {
183+
<-conch
184+
}
185+
}(i)
186+
}
187+
188+
wg.Wait()
189+
190+
return errs, hasError
191+
}

parallel_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,149 @@ func TestParallelWithTimedOutContext(t *testing.T) {
137137
}
138138
a.EqualNow(finishedNum, 2)
139139
}
140+
141+
func TestParallelComplete(t *testing.T) {
142+
a := assert.New(t)
143+
144+
errs, hasError := ParallelComplete(0)
145+
a.NotTrueNow(hasError)
146+
a.EqualNow(errs, []error{})
147+
148+
a.PanicNow(func() {
149+
ParallelComplete(-1)
150+
})
151+
}
152+
153+
func TestParallelCompleteWithoutConcurrencyLimit(t *testing.T) {
154+
a := assert.New(t)
155+
156+
funcs := make([]AsyncFn, 0, 5)
157+
for i := 0; i < 5; i++ {
158+
funcs = append(funcs, func(ctx context.Context) error {
159+
time.Sleep(100 * time.Millisecond)
160+
return nil
161+
})
162+
}
163+
164+
start := time.Now()
165+
errs, hasError := ParallelComplete(0, funcs...)
166+
dur := time.Since(start)
167+
a.NotTrueNow(hasError)
168+
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
169+
a.TrueNow(dur-100*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
170+
}
171+
172+
func TestParallelCompleteWithConcurrencyLimit(t *testing.T) {
173+
a := assert.New(t)
174+
175+
funcs := make([]AsyncFn, 0, 5)
176+
for i := 0; i < 5; i++ {
177+
funcs = append(funcs, func(ctx context.Context) error {
178+
time.Sleep(100 * time.Millisecond)
179+
return nil
180+
})
181+
}
182+
183+
start := time.Now()
184+
errs, hasError := ParallelComplete(2, funcs...)
185+
dur := time.Since(start)
186+
a.NotTrueNow(hasError)
187+
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
188+
a.TrueNow(dur-300*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
189+
}
190+
191+
func TestParallelCompleteWithFailedTask(t *testing.T) {
192+
a := assert.New(t)
193+
194+
expectedErr := errors.New("expected error")
195+
196+
funcs := make([]AsyncFn, 0, 5)
197+
for i := 0; i < 5; i++ {
198+
n := i
199+
funcs = append(funcs, func(ctx context.Context) error {
200+
if n == 2 {
201+
time.Sleep(50 * time.Millisecond)
202+
return expectedErr
203+
} else {
204+
time.Sleep(100 * time.Millisecond)
205+
}
206+
return nil
207+
})
208+
}
209+
210+
errs, hasError := ParallelComplete(0, funcs...)
211+
a.TrueNow(hasError)
212+
a.EqualNow(errs, []error{nil, nil, expectedErr, nil, nil})
213+
}
214+
215+
func TestParallelCompleteWithContext(t *testing.T) {
216+
a := assert.New(t)
217+
218+
funcs := make([]AsyncFn, 0, 5)
219+
res := make([]bool, 5)
220+
for i := 0; i < 5; i++ {
221+
n := i
222+
funcs = append(funcs, func(ctx context.Context) error {
223+
time.Sleep(100 * time.Millisecond)
224+
res[n] = true
225+
return nil
226+
})
227+
}
228+
229+
errs, hasError := ParallelCompleteWithContext(context.Background(), 2, funcs...)
230+
a.NotTrueNow(hasError)
231+
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
232+
233+
finishedNum := 0
234+
for _, v := range res {
235+
if v {
236+
finishedNum++
237+
}
238+
}
239+
a.EqualNow(finishedNum, 5)
240+
}
241+
242+
func TestParallelCompleteWithTimedOutContext(t *testing.T) {
243+
a := assert.New(t)
244+
245+
timeoutErr := errors.New("timed out")
246+
247+
funcs := make([]AsyncFn, 0, 5)
248+
res := make([]bool, 5)
249+
for i := 0; i < 5; i++ {
250+
n := i
251+
funcs = append(funcs, func(ctx context.Context) error {
252+
timer := time.NewTimer(100 * time.Millisecond)
253+
254+
select {
255+
case <-ctx.Done():
256+
return timeoutErr
257+
case <-timer.C:
258+
res[n] = true
259+
return nil
260+
}
261+
})
262+
}
263+
264+
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
265+
defer canFunc()
266+
267+
errs, hasError := ParallelCompleteWithContext(ctx, 2, funcs...)
268+
a.TrueNow(hasError)
269+
270+
numErrors := 0
271+
for _, e := range errs {
272+
if errors.Is(e, timeoutErr) {
273+
numErrors++
274+
}
275+
}
276+
a.EqualNow(numErrors, 3)
277+
278+
finishedNum := 0
279+
for _, v := range res {
280+
if v {
281+
finishedNum++
282+
}
283+
}
284+
a.EqualNow(finishedNum, 2)
285+
}

0 commit comments

Comments
 (0)