Skip to content

Commit 3dd4c3f

Browse files
committed
feat: add Parallel.
1 parent 8c15c74 commit 3dd4c3f

File tree

3 files changed

+249
-0
lines changed

3 files changed

+249
-0
lines changed

error.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,7 @@ import "errors"
55
var (
66
// ErrContextCanceled to indicate the context was canceled or timed out.
77
ErrContextCanceled error = errors.New("context canceled")
8+
// ErrInvalidConcurrency to indicate the number of the concurrency limitation is an invalid
9+
// value.
10+
ErrInvalidConcurrency error = errors.New("invalid concurrency")
811
)

parallel.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package async
2+
3+
import (
4+
"context"
5+
6+
"github.com/ghosind/utils"
7+
)
8+
9+
// Parallel runs the functions asynchronously with the specified concurrency limitation. It will
10+
// send a cancel sign to context and terminate immediately if any function returns an error or
11+
// panic, and also returns the index of the error function in the parameters list.
12+
//
13+
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
14+
// limitation if the number is 0.
15+
func Parallel(concurrency int, funcs ...AsyncFn) (int, error) {
16+
return parallel(context.Background(), concurrency, funcs...)
17+
}
18+
19+
// ParallelWithContext runs the functions asynchronously with the specified concurrency limitation.
20+
// It will send a cancel sign to context and terminate immediately if any function returns an error
21+
// or panic, and also returns the index of the error function in the parameters list. If the
22+
// context was canceled or timed out before all functions finished executing, it will send a cancel
23+
// sign to all uncompleted functions, and return the value of the index as -1 with a `context
24+
// canceled` error.
25+
//
26+
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
27+
// limitation if the number is 0.
28+
func ParallelWithContext(ctx context.Context, concurrency int, funcs ...AsyncFn) (int, error) {
29+
return parallel(ctx, concurrency, funcs...)
30+
}
31+
32+
// parallel runs the functions asynchronously with the specified concurrency.
33+
func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) (int, error) {
34+
// the number of concurrency should be 0 (no limitation) or greater than 0.
35+
if concurrency < 0 {
36+
panic(ErrInvalidConcurrency)
37+
}
38+
39+
if len(funcs) == 0 {
40+
return -1, nil
41+
}
42+
43+
parent = getContext(parent)
44+
ctx, canFunc := context.WithCancel(parent)
45+
defer canFunc()
46+
47+
ch := make(chan executeResult) // channel for result
48+
var conch chan struct{} // channel for concurrency limit
49+
50+
// no concurrency limitation if the value of the number is 0
51+
if concurrency > 0 {
52+
conch = make(chan struct{}, concurrency)
53+
}
54+
55+
for i := 0; i < len(funcs); i++ {
56+
go runTaskInParallel(ctx, i, funcs[i], conch, ch)
57+
}
58+
59+
finished := 0
60+
for finished < len(funcs) {
61+
select {
62+
case <-parent.Done():
63+
return -1, ErrContextCanceled
64+
case ret := <-ch:
65+
if ret.Error != nil {
66+
return ret.Index, ret.Error
67+
}
68+
finished++
69+
}
70+
}
71+
72+
return -1, nil
73+
}
74+
75+
// runTaskInParallel runs the specified function for Parallel / ParallelWithContext.
76+
func runTaskInParallel(
77+
ctx context.Context,
78+
n int,
79+
fn AsyncFn,
80+
conch chan struct{},
81+
ch chan executeResult,
82+
) {
83+
if conch != nil {
84+
conch <- struct{}{}
85+
}
86+
87+
childCtx, childCanFunc := context.WithCancel(ctx)
88+
defer childCanFunc()
89+
90+
err := utils.Try(func() error {
91+
return fn(childCtx)
92+
})
93+
94+
if conch != nil {
95+
<-conch
96+
}
97+
98+
select {
99+
case <-ctx.Done():
100+
return
101+
default:
102+
ch <- executeResult{
103+
Index: n,
104+
Error: err,
105+
}
106+
}
107+
}

parallel_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/ghosind/go-assert"
10+
)
11+
12+
func TestParallel(t *testing.T) {
13+
a := assert.New(t)
14+
15+
index, err := Parallel(0)
16+
a.NilNow(err)
17+
a.EqualNow(index, -1)
18+
19+
a.PanicNow(func() {
20+
Parallel(-1)
21+
})
22+
}
23+
24+
func TestParallelWithoutConcurrencyLimit(t *testing.T) {
25+
a := assert.New(t)
26+
27+
funcs := make([]AsyncFn, 0, 5)
28+
for i := 0; i < 5; i++ {
29+
funcs = append(funcs, func(ctx context.Context) error {
30+
time.Sleep(100 * time.Millisecond)
31+
return nil
32+
})
33+
}
34+
35+
start := time.Now()
36+
index, err := Parallel(0, funcs...)
37+
dur := time.Since(start)
38+
a.NilNow(err)
39+
a.EqualNow(index, -1)
40+
a.TrueNow(dur-100*time.Millisecond < 5*time.Millisecond) // allow 5ms deviation
41+
}
42+
43+
func TestParallelWithConcurrencyLimit(t *testing.T) {
44+
a := assert.New(t)
45+
46+
funcs := make([]AsyncFn, 0, 5)
47+
for i := 0; i < 5; i++ {
48+
funcs = append(funcs, func(ctx context.Context) error {
49+
time.Sleep(100 * time.Millisecond)
50+
return nil
51+
})
52+
}
53+
54+
start := time.Now()
55+
index, err := Parallel(2, funcs...)
56+
dur := time.Since(start)
57+
a.NilNow(err)
58+
a.EqualNow(index, -1)
59+
a.TrueNow(dur-300*time.Millisecond < 10*time.Millisecond) // allow 10ms deviation
60+
}
61+
62+
func TestParallelWithFailedTask(t *testing.T) {
63+
a := assert.New(t)
64+
65+
expectedErr := errors.New("expected error")
66+
67+
funcs := make([]AsyncFn, 0, 5)
68+
for i := 0; i < 5; i++ {
69+
n := i
70+
funcs = append(funcs, func(ctx context.Context) error {
71+
time.Sleep(100 * time.Millisecond)
72+
if n == 2 {
73+
return expectedErr
74+
}
75+
return nil
76+
})
77+
}
78+
79+
index, err := Parallel(2, funcs...)
80+
a.EqualNow(err, expectedErr)
81+
a.EqualNow(index, 2)
82+
}
83+
84+
func TestParallelWithContext(t *testing.T) {
85+
a := assert.New(t)
86+
87+
funcs := make([]AsyncFn, 0, 5)
88+
res := make([]bool, 5)
89+
for i := 0; i < 5; i++ {
90+
n := i
91+
funcs = append(funcs, func(ctx context.Context) error {
92+
time.Sleep(100 * time.Millisecond)
93+
res[n] = true
94+
return nil
95+
})
96+
}
97+
98+
index, err := ParallelWithContext(context.Background(), 2, funcs...)
99+
a.NilNow(err)
100+
a.EqualNow(index, -1)
101+
102+
finishedNum := 0
103+
for _, v := range res {
104+
if v {
105+
finishedNum++
106+
}
107+
}
108+
a.EqualNow(finishedNum, 5)
109+
}
110+
111+
func TestParallelWithTimedOutContext(t *testing.T) {
112+
a := assert.New(t)
113+
114+
funcs := make([]AsyncFn, 0, 5)
115+
res := make([]bool, 5)
116+
for i := 0; i < 5; i++ {
117+
n := i
118+
funcs = append(funcs, func(ctx context.Context) error {
119+
time.Sleep(100 * time.Millisecond)
120+
res[n] = true
121+
return nil
122+
})
123+
}
124+
125+
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
126+
defer canFunc()
127+
128+
index, err := ParallelWithContext(ctx, 2, funcs...)
129+
a.TrueNow(errors.Is(err, ErrContextCanceled))
130+
a.EqualNow(index, -1)
131+
132+
finishedNum := 0
133+
for _, v := range res {
134+
if v {
135+
finishedNum++
136+
}
137+
}
138+
a.EqualNow(finishedNum, 2)
139+
}

0 commit comments

Comments
 (0)