Skip to content

Commit bdb10ca

Browse files
committed
test: add test cases for Paralleler.
1 parent 63ffec8 commit bdb10ca

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed

paralleler.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@ import (
55
"sync"
66
)
77

8+
// Paralleler is a tool to run the tasks with the specific concurrency, default no concurrency
9+
// limitation.
810
type Paralleler struct {
911
concurrency int
1012
ctx context.Context
1113
locker sync.Mutex
1214
tasks []AsyncFn
1315
}
1416

17+
// WithConcurrency sets the number of concurrency limitation.
1518
func (p *Paralleler) WithConcurrency(concurrency int) *Paralleler {
1619
if concurrency < 0 {
1720
panic(ErrInvalidConcurrency)
@@ -22,12 +25,14 @@ func (p *Paralleler) WithConcurrency(concurrency int) *Paralleler {
2225
return p
2326
}
2427

28+
// WithContext sets the context that passes to the tasks.
2529
func (p *Paralleler) WithContext(ctx context.Context) *Paralleler {
2630
p.ctx = ctx
2731

2832
return p
2933
}
3034

35+
// Add adds the functions into the pending tasks list.
3136
func (p *Paralleler) Add(funcs ...AsyncFn) *Paralleler {
3237
validateAsyncFuncs(funcs...)
3338

@@ -39,6 +44,18 @@ func (p *Paralleler) Add(funcs ...AsyncFn) *Paralleler {
3944
return p
4045
}
4146

47+
// Clear clears the paralleler's pending tasks list.
48+
func (p *Paralleler) Clear() *Paralleler {
49+
p.locker.Lock()
50+
defer p.locker.Unlock()
51+
52+
p.tasks = nil
53+
54+
return p
55+
}
56+
57+
// Run runs the tasks in the paralleler's pending list, it'll clear the pending list and return
58+
// the results of the tasks.
4259
func (p *Paralleler) Run() ([][]any, error) {
4360
tasks := p.getTasks()
4461
out := make([][]any, len(tasks))
@@ -74,6 +91,8 @@ func (p *Paralleler) Run() ([][]any, error) {
7491
return out, nil
7592
}
7693

94+
// getConcurrencyChan creates and returns a concurrency controlling channel by the specific number
95+
// of the concurrency limitation.
7796
func (p *Paralleler) getConcurrencyChan() chan empty {
7897
var conch chan empty
7998

@@ -84,6 +103,8 @@ func (p *Paralleler) getConcurrencyChan() chan empty {
84103
return conch
85104
}
86105

106+
// getTasks returns the tasks from the pending list, and clear the pending list to receiving new
107+
// tasks.
87108
func (p *Paralleler) getTasks() []AsyncFn {
88109
p.locker.Lock()
89110

@@ -95,6 +116,7 @@ func (p *Paralleler) getTasks() []AsyncFn {
95116
return tasks
96117
}
97118

119+
// runTasks runs the tasks with the concurrency limitation.
98120
func (p *Paralleler) runTasks(ctx context.Context, resCh chan executeResult, tasks []AsyncFn) {
99121
conch := p.getConcurrencyChan()
100122

@@ -107,6 +129,7 @@ func (p *Paralleler) runTasks(ctx context.Context, resCh chan executeResult, tas
107129
}
108130
}
109131

132+
// runTask runs the task function, and
110133
func (p *Paralleler) runTask(
111134
ctx context.Context,
112135
n int,

paralleler_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package async_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/ghosind/go-assert"
11+
"github.com/ghosind/go-async"
12+
)
13+
14+
func TestParalleler(t *testing.T) {
15+
a := assert.New(t)
16+
cnt := atomic.Int32{}
17+
18+
p := new(async.Paralleler)
19+
for i := 0; i < 5; i++ {
20+
p.Add(func() {
21+
cnt.Add(1)
22+
})
23+
}
24+
25+
a.EqualNow(cnt.Load(), 0)
26+
_, err := p.Run()
27+
a.Nil(err)
28+
a.EqualNow(cnt.Load(), 5)
29+
}
30+
31+
func TestParallelerAddTasks(t *testing.T) {
32+
a := assert.New(t)
33+
cnt := atomic.Int32{}
34+
35+
p := new(async.Paralleler)
36+
for i := 0; i < 5; i++ {
37+
p.Add(func() {
38+
cnt.Add(1)
39+
})
40+
}
41+
42+
_, err := p.Run()
43+
a.Nil(err)
44+
a.EqualNow(cnt.Load(), 5)
45+
46+
for i := 0; i < 3; i++ {
47+
p.Add(func() {
48+
cnt.Add(1)
49+
})
50+
}
51+
52+
_, err = p.Run()
53+
a.Nil(err)
54+
a.EqualNow(cnt.Load(), 8)
55+
}
56+
57+
func TestParallelerClear(t *testing.T) {
58+
a := assert.New(t)
59+
cnt := atomic.Int32{}
60+
61+
p := new(async.Paralleler)
62+
for i := 0; i < 5; i++ {
63+
p.Add(func() {
64+
cnt.Add(1)
65+
})
66+
}
67+
68+
p.Clear()
69+
70+
for i := 0; i < 3; i++ {
71+
p.Add(func() {
72+
cnt.Add(1)
73+
})
74+
}
75+
76+
_, err := p.Run()
77+
a.Nil(err)
78+
a.EqualNow(cnt.Load(), 3)
79+
}
80+
81+
func TestParallelerWithConcurrency(t *testing.T) {
82+
a := assert.New(t)
83+
cnt := atomic.Int32{}
84+
85+
p := new(async.Paralleler).WithConcurrency(2)
86+
for i := 0; i < 5; i++ {
87+
p.Add(func() {
88+
time.Sleep(50 * time.Millisecond)
89+
cnt.Add(1)
90+
})
91+
}
92+
93+
start := time.Now()
94+
_, err := p.Run()
95+
a.Nil(err)
96+
a.EqualNow(cnt.Load(), 5)
97+
98+
dur := time.Since(start)
99+
a.GtNow(dur, 150*time.Millisecond)
100+
a.LtNow(dur, 200*time.Millisecond)
101+
}
102+
103+
func TestParallelerWithContext(t *testing.T) {
104+
a := assert.New(t)
105+
cnt := atomic.Int32{}
106+
107+
ctx, canFunc := context.WithTimeout(context.Background(), 80*time.Millisecond)
108+
defer canFunc()
109+
110+
p := new(async.Paralleler).WithConcurrency(2).WithContext(ctx)
111+
for i := 0; i < 5; i++ {
112+
p.Add(func(ctx context.Context) {
113+
select {
114+
case <-ctx.Done():
115+
return
116+
default:
117+
time.Sleep(50 * time.Millisecond)
118+
cnt.Add(1)
119+
}
120+
})
121+
}
122+
123+
_, err := p.Run()
124+
a.EqualNow(err, async.ErrContextCanceled)
125+
a.EqualNow(cnt.Load(), 2)
126+
}
127+
128+
func ExampleParalleler() {
129+
p := new(async.Paralleler)
130+
131+
p.Add(func() int {
132+
return 1
133+
}).Add(func() int {
134+
return 2
135+
}).Add(func() string {
136+
return "Hello"
137+
})
138+
139+
ret, err := p.Run()
140+
fmt.Println(ret)
141+
fmt.Println(err)
142+
// Output:
143+
// [[1] [2] [Hello]]
144+
// <nil>
145+
}

0 commit comments

Comments
 (0)