Skip to content

Commit 3df4d8e

Browse files
committed
feat: add Paralleler type.
1 parent c98b9a3 commit 3df4d8e

File tree

3 files changed

+149
-121
lines changed

3 files changed

+149
-121
lines changed

parallel.go

Lines changed: 5 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -48,86 +48,13 @@ func ParallelWithContext(
4848

4949
// parallel runs the functions asynchronously with the specified concurrency.
5050
func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) ([][]any, error) {
51-
// the number of concurrency should be 0 (no limitation) or greater than 0.
52-
if concurrency < 0 {
53-
panic(ErrInvalidConcurrency)
54-
}
55-
validateAsyncFuncs(funcs...)
56-
57-
out := make([][]any, len(funcs))
58-
if len(funcs) == 0 {
59-
return out, nil
60-
}
61-
62-
parent = getContext(parent)
63-
ctx, canFunc := context.WithCancel(parent)
64-
defer canFunc()
65-
66-
ch := make(chan executeResult, len(funcs)) // channel for result
67-
var conch chan empty // channel for concurrency limit
51+
paralleler := new(Paralleler).
52+
WithContext(parent).
53+
WithConcurrency(concurrency)
6854

69-
// no concurrency limitation if the value of the number is 0
70-
if concurrency > 0 {
71-
conch = make(chan empty, concurrency)
72-
}
55+
paralleler.Add(funcs...)
7356

74-
go func() {
75-
for i := 0; i < len(funcs); i++ {
76-
if conch != nil {
77-
conch <- empty{}
78-
}
79-
80-
go runTaskInParallel(ctx, i, funcs[i], conch, ch)
81-
}
82-
}()
83-
84-
finished := 0
85-
for finished < len(funcs) {
86-
select {
87-
case <-parent.Done():
88-
return out, ErrContextCanceled
89-
case ret := <-ch:
90-
out[ret.Index] = ret.Out
91-
if ret.Error != nil {
92-
return out, &executionError{
93-
index: ret.Index,
94-
err: ret.Error,
95-
}
96-
}
97-
finished++
98-
}
99-
}
100-
101-
return out, nil
102-
}
103-
104-
// runTaskInParallel runs the specified function for Parallel / ParallelWithContext.
105-
func runTaskInParallel(
106-
ctx context.Context,
107-
n int,
108-
fn AsyncFn,
109-
conch chan empty,
110-
ch chan executeResult,
111-
) {
112-
childCtx, childCanFunc := context.WithCancel(ctx)
113-
defer childCanFunc()
114-
115-
ret, err := invokeAsyncFn(fn, childCtx, nil)
116-
117-
if conch != nil {
118-
<-conch
119-
}
120-
121-
select {
122-
case <-ctx.Done():
123-
return
124-
default:
125-
ch <- executeResult{
126-
Index: n,
127-
Error: err,
128-
Out: ret,
129-
}
130-
}
57+
return paralleler.Run()
13158
}
13259

13360
// ParallelCompleted runs the functions asynchronously with the specified concurrency limitation.

paralleler.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
type Paralleler struct {
9+
concurrency int
10+
ctx context.Context
11+
locker sync.Mutex
12+
tasks []AsyncFn
13+
}
14+
15+
func (p *Paralleler) WithConcurrency(concurrency int) *Paralleler {
16+
if concurrency < 0 {
17+
panic(ErrInvalidConcurrency)
18+
}
19+
20+
p.concurrency = concurrency
21+
22+
return p
23+
}
24+
25+
func (p *Paralleler) WithContext(ctx context.Context) *Paralleler {
26+
p.ctx = ctx
27+
28+
return p
29+
}
30+
31+
func (p *Paralleler) Add(funcs ...AsyncFn) *Paralleler {
32+
validateAsyncFuncs(funcs...)
33+
34+
p.locker.Lock()
35+
defer p.locker.Unlock()
36+
37+
p.tasks = append(p.tasks, funcs...)
38+
39+
return p
40+
}
41+
42+
func (p *Paralleler) Run() ([][]any, error) {
43+
tasks := p.getTasks()
44+
out := make([][]any, len(tasks))
45+
if len(tasks) == 0 {
46+
return out, nil
47+
}
48+
49+
parent := getContext(p.ctx)
50+
ctx, canFunc := context.WithCancel(parent)
51+
defer canFunc()
52+
53+
ch := make(chan executeResult, len(tasks))
54+
55+
go p.runTasks(ctx, ch, tasks)
56+
57+
finished := 0
58+
for finished < len(tasks) {
59+
select {
60+
case <-parent.Done():
61+
return out, ErrContextCanceled
62+
case ret := <-ch:
63+
out[ret.Index] = ret.Out
64+
if ret.Error != nil {
65+
return out, &executionError{
66+
index: ret.Index,
67+
err: ret.Error,
68+
}
69+
}
70+
finished++
71+
}
72+
}
73+
74+
return out, nil
75+
}
76+
77+
func (p *Paralleler) getConcurrencyChan() chan empty {
78+
var conch chan empty
79+
80+
if p.concurrency > 0 {
81+
conch = make(chan empty, p.concurrency)
82+
}
83+
84+
return conch
85+
}
86+
87+
func (p *Paralleler) getTasks() []AsyncFn {
88+
p.locker.Lock()
89+
90+
tasks := p.tasks
91+
p.tasks = nil
92+
93+
p.locker.Unlock()
94+
95+
return tasks
96+
}
97+
98+
func (p *Paralleler) runTasks(ctx context.Context, resCh chan executeResult, tasks []AsyncFn) {
99+
conch := p.getConcurrencyChan()
100+
101+
for i := 0; i < len(tasks); i++ {
102+
if conch != nil {
103+
conch <- empty{}
104+
}
105+
106+
go p.runTask(ctx, i, tasks[i], conch, resCh)
107+
}
108+
}
109+
110+
func (p *Paralleler) runTask(
111+
ctx context.Context,
112+
n int,
113+
fn AsyncFn,
114+
conch chan empty,
115+
ch chan executeResult,
116+
) {
117+
childCtx, childCanFunc := context.WithCancel(ctx)
118+
defer childCanFunc()
119+
120+
ret, err := invokeAsyncFn(fn, childCtx, nil)
121+
122+
if conch != nil {
123+
<-conch
124+
}
125+
126+
select {
127+
case <-ctx.Done():
128+
return
129+
default:
130+
ch <- executeResult{
131+
Index: n,
132+
Error: err,
133+
Out: ret,
134+
}
135+
}
136+
}

times.go

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -57,51 +57,16 @@ func TimesSeriesWithContext(ctx context.Context, n int, fn AsyncFn) ([][]any, er
5757

5858
// times executes the function n times withe the specified concurrency.
5959
func times(parent context.Context, n, concurrency int, fn AsyncFn) ([][]any, error) {
60-
// the number of concurrency should be 0 (no limitation) or greater than 0.
61-
if concurrency < 0 {
62-
panic(ErrInvalidConcurrency)
63-
}
64-
validateAsyncFuncs(fn)
65-
66-
parent = getContext(parent)
67-
ctx, canFunc := context.WithCancel(parent)
68-
defer canFunc()
69-
70-
out := make([][]any, n)
71-
72-
ch := make(chan executeResult, n)
73-
var conch chan empty
60+
paralleler := new(Paralleler).
61+
WithConcurrency(concurrency).
62+
WithContext(parent)
7463

75-
if concurrency > 0 {
76-
conch = make(chan empty, concurrency)
64+
tasks := make([]AsyncFn, 0, n)
65+
for i := 0; i < n; i++ {
66+
tasks = append(tasks, fn)
7767
}
7868

79-
go func() {
80-
for i := 0; i < n; i++ {
81-
if conch != nil {
82-
conch <- empty{}
83-
}
84-
85-
go runTaskInParallel(ctx, i, fn, conch, ch)
86-
}
87-
}()
88-
89-
finished := 0
90-
for finished < n {
91-
select {
92-
case <-parent.Done():
93-
return out, ErrContextCanceled
94-
case ret := <-ch:
95-
out[ret.Index] = ret.Out
96-
if ret.Error != nil {
97-
return out, &executionError{
98-
index: ret.Index,
99-
err: ret.Error,
100-
}
101-
}
102-
finished++
103-
}
104-
}
69+
paralleler.Add(tasks...)
10570

106-
return out, nil
71+
return paralleler.Run()
10772
}

0 commit comments

Comments
 (0)