Skip to content

Commit 1562cbf

Browse files
committed
refactor: apply ExecutionErrors to AllCompleted and ParallelCompleted.
1 parent 35a82e5 commit 1562cbf

File tree

4 files changed

+60
-50
lines changed

4 files changed

+60
-50
lines changed

all.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package async
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67
)
78

89
// All executes the functions asynchronously until all functions have been finished. If some
@@ -89,7 +90,7 @@ func runTaskInAll(ctx context.Context, n int, fn AsyncFn, ch chan<- executeResul
8990
// AllCompleted executes the functions asynchronously until all functions have been finished. It
9091
// will return an error slice that is ordered by the functions order, and a boolean value to
9192
// indicate whether any functions return an error or panic.
92-
func AllCompleted(funcs ...AsyncFn) ([][]any, []error, bool) {
93+
func AllCompleted(funcs ...AsyncFn) ([][]any, error) {
9394
return allCompleted(context.Background(), funcs...)
9495
}
9596

@@ -100,7 +101,7 @@ func AllCompleted(funcs ...AsyncFn) ([][]any, []error, bool) {
100101
func AllCompletedWithContext(
101102
ctx context.Context,
102103
funcs ...AsyncFn,
103-
) ([][]any, []error, bool) {
104+
) ([][]any, error) {
104105
return allCompleted(ctx, funcs...)
105106
}
106107

@@ -109,18 +110,19 @@ func AllCompletedWithContext(
109110
func allCompleted(
110111
parent context.Context,
111112
funcs ...AsyncFn,
112-
) (out [][]any, errs []error, hasError bool) {
113+
) ([][]any, error) {
113114
validateAsyncFuncs(funcs...)
114115

115-
hasError = false
116-
errs = make([]error, len(funcs))
117-
out = make([][]any, len(funcs))
116+
out := make([][]any, len(funcs))
118117
if len(funcs) == 0 {
119-
return
118+
return out, nil
120119
}
121120

122121
parent = getContext(parent)
123122

123+
errs := make([]error, len(funcs))
124+
errNum := atomic.Int32{}
125+
124126
wg := sync.WaitGroup{}
125127
wg.Add(len(funcs))
126128

@@ -134,14 +136,19 @@ func allCompleted(
134136

135137
ret, err := invokeAsyncFn(fn, childCtx, nil)
136138
if err != nil {
137-
hasError = true
139+
errNum.Add(1)
138140
errs[n] = err
139141
}
140142
out[n] = ret
141143
}(i)
142144
}
143145

144146
wg.Wait()
147+
if errNum.Load() == 0 {
148+
return out, nil
149+
}
150+
151+
err := convertErrorListToExecutionErrors(errs, int(errNum.Load()))
145152

146-
return
153+
return out, err
147154
}

all_test.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,8 @@ func BenchmarkAll(b *testing.B) {
112112
func TestAllCompletedWithoutFuncs(t *testing.T) {
113113
a := assert.New(t)
114114

115-
out, errs, hasError := AllCompleted()
116-
a.NotTrueNow(hasError)
117-
a.EqualNow(errs, []error{})
115+
out, err := AllCompleted()
116+
a.NilNow(err)
118117
a.EqualNow(out, [][]any{})
119118
}
120119

@@ -132,10 +131,9 @@ func TestAllCompletedSuccess(t *testing.T) {
132131
})
133132
}
134133

135-
out, errs, hasError := AllCompleted(funcs...)
136-
a.NotTrueNow(hasError)
134+
out, err := AllCompleted(funcs...)
135+
a.NilNow(err)
137136
a.EqualNow(data, []bool{true, true, true, true, true})
138-
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
139137
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
140138
}
141139

@@ -158,23 +156,22 @@ func TestAllCompletedPartialFailure(t *testing.T) {
158156
})
159157
}
160158

161-
out, errs, hasError := AllCompleted(funcs...)
162-
a.TrueNow(hasError)
159+
out, err := AllCompleted(funcs...)
160+
a.NotNilNow(err)
161+
a.EqualNow(err.Error(), "function 2 error: n = 2")
163162
a.EqualNow(data, []bool{true, true, false, true, true})
164-
a.EqualNow(errs, []error{nil, nil, errNIs2, nil, nil})
165163
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, errNIs2}, {3, nil}, {4, nil}})
166164
}
167165

168166
func TestAllCompletedWithNilContext(t *testing.T) {
169167
a := assert.New(t)
170168

171169
//lint:ignore SA1012 for test case only
172-
out, errs, hasError := AllCompletedWithContext(nil, func(ctx context.Context) error {
170+
out, err := AllCompletedWithContext(nil, func(ctx context.Context) error {
173171
time.Sleep(100 * time.Millisecond)
174172
return nil
175173
})
176-
a.NotTrueNow(hasError)
177-
a.EqualNow(errs, []error{nil})
174+
a.NilNow(err)
178175
a.EqualNow(out, [][]any{{nil}})
179176
}
180177

@@ -202,10 +199,12 @@ func TestAllCompletedWithTimeoutContext(t *testing.T) {
202199
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
203200
defer canFunc()
204201

205-
out, errs, hasError := AllCompletedWithContext(ctx, funcs...)
206-
a.TrueNow(hasError)
202+
out, err := AllCompletedWithContext(ctx, funcs...)
203+
a.NotNilNow(err)
204+
a.EqualNow(err.Error(), `function 2 error: timeout
205+
function 3 error: timeout
206+
function 4 error: timeout`)
207207
a.EqualNow(data, []bool{true, true, false, false, false})
208-
a.EqualNow(errs, []error{nil, nil, errTimeout, errTimeout, errTimeout})
209208
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, errTimeout}, {3, errTimeout}, {4, errTimeout}})
210209
}
211210

parallel.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package async
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67
)
78

89
// Parallel runs the functions asynchronously with the specified concurrency limitation. It will
@@ -122,7 +123,7 @@ func runTaskInParallel(
122123
//
123124
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
124125
// limitation if the number is 0.
125-
func ParallelCompleted(concurrency int, funcs ...AsyncFn) ([][]any, []error, bool) {
126+
func ParallelCompleted(concurrency int, funcs ...AsyncFn) ([][]any, error) {
126127
return parallelCompleted(context.Background(), concurrency, funcs...)
127128
}
128129

@@ -138,7 +139,7 @@ func ParallelCompletedWithContext(
138139
ctx context.Context,
139140
concurrency int,
140141
funcs ...AsyncFn,
141-
) ([][]any, []error, bool) {
142+
) ([][]any, error) {
142143
return parallelCompleted(ctx, concurrency, funcs...)
143144
}
144145

@@ -148,22 +149,22 @@ func parallelCompleted(
148149
parent context.Context,
149150
concurrency int,
150151
funcs ...AsyncFn,
151-
) ([][]any, []error, bool) {
152+
) ([][]any, error) {
152153
// the number of concurrency should be 0 (no limitation) or greater than 0.
153154
if concurrency < 0 {
154155
panic(ErrInvalidConcurrency)
155156
}
156157
validateAsyncFuncs(funcs...)
157158

158159
out := make([][]any, len(funcs))
159-
errs := make([]error, len(funcs))
160-
hasError := false
161160

162161
if len(funcs) == 0 {
163-
return out, errs, hasError
162+
return out, nil
164163
}
165164

166165
ctx := getContext(parent)
166+
errs := make([]error, len(funcs))
167+
errNum := atomic.Int32{}
167168

168169
wg := sync.WaitGroup{}
169170
wg.Add(len(funcs))
@@ -189,7 +190,7 @@ func parallelCompleted(
189190
ret, err := invokeAsyncFn(fn, childCtx, nil)
190191
if err != nil {
191192
errs[n] = err
192-
hasError = true
193+
errNum.Add(1)
193194
}
194195
out[n] = ret
195196

@@ -200,6 +201,11 @@ func parallelCompleted(
200201
}
201202

202203
wg.Wait()
204+
if errNum.Load() == 0 {
205+
return out, nil
206+
}
207+
208+
err := convertErrorListToExecutionErrors(errs, int(errNum.Load()))
203209

204-
return out, errs, hasError
210+
return out, err
205211
}

parallel_test.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,8 @@ func BenchmarkParallel(b *testing.B) {
154154
func TestParallelCompleted(t *testing.T) {
155155
a := assert.New(t)
156156

157-
out, errs, hasError := ParallelCompleted(0)
158-
a.NotTrueNow(hasError)
159-
a.EqualNow(errs, []error{})
157+
out, err := ParallelCompleted(0)
158+
a.NilNow(err)
160159
a.EqualNow(out, [][]any{})
161160

162161
a.PanicNow(func() {
@@ -177,10 +176,9 @@ func TestParallelCompletedWithoutConcurrencyLimit(t *testing.T) {
177176
}
178177

179178
start := time.Now()
180-
out, errs, hasError := ParallelCompleted(0, funcs...)
179+
out, err := ParallelCompleted(0, funcs...)
181180
dur := time.Since(start)
182-
a.NotTrueNow(hasError)
183-
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
181+
a.NilNow(err)
184182
a.TrueNow(dur-100*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
185183
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
186184
}
@@ -198,10 +196,9 @@ func TestParallelCompletedWithConcurrencyLimit(t *testing.T) {
198196
}
199197

200198
start := time.Now()
201-
out, errs, hasError := ParallelCompleted(2, funcs...)
199+
out, err := ParallelCompleted(2, funcs...)
202200
dur := time.Since(start)
203-
a.NotTrueNow(hasError)
204-
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
201+
a.NilNow(err)
205202
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
206203
a.TrueNow(dur-300*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
207204
}
@@ -225,9 +222,9 @@ func TestParallelCompletedWithFailedTask(t *testing.T) {
225222
})
226223
}
227224

228-
out, errs, hasError := ParallelCompleted(0, funcs...)
229-
a.TrueNow(hasError)
230-
a.EqualNow(errs, []error{nil, nil, expectedErr, nil, nil})
225+
out, err := ParallelCompleted(0, funcs...)
226+
a.NotNilNow(err)
227+
a.EqualNow(err.Error(), "function 2 error: expected error")
231228
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, expectedErr}, {3, nil}, {4, nil}})
232229
}
233230

@@ -245,9 +242,8 @@ func TestParallelCompletedWithContext(t *testing.T) {
245242
})
246243
}
247244

248-
out, errs, hasError := ParallelCompletedWithContext(context.Background(), 2, funcs...)
249-
a.NotTrueNow(hasError)
250-
a.EqualNow(errs, []error{nil, nil, nil, nil, nil})
245+
out, err := ParallelCompletedWithContext(context.Background(), 2, funcs...)
246+
a.NilNow(err)
251247
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
252248

253249
finishedNum := 0
@@ -284,9 +280,11 @@ func TestParallelCompletedWithTimedOutContext(t *testing.T) {
284280
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
285281
defer canFunc()
286282

287-
out, errs, hasError := ParallelCompletedWithContext(ctx, 2, funcs...)
288-
a.TrueNow(hasError)
289-
a.EqualNow(errs, []error{nil, nil, timeoutErr, timeoutErr, timeoutErr})
283+
out, err := ParallelCompletedWithContext(ctx, 2, funcs...)
284+
a.NotNilNow(err)
285+
a.EqualNow(err.Error(), `function 2 error: timed out
286+
function 3 error: timed out
287+
function 4 error: timed out`)
290288
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, timeoutErr}, {3, timeoutErr}, {4, timeoutErr}})
291289

292290
finishedNum := 0

0 commit comments

Comments
 (0)