Skip to content

Commit 6f228c6

Browse files
committed
refactor: apply execution error to All,, Parallel, and Race.
1 parent aef794a commit 6f228c6

File tree

6 files changed

+81
-48
lines changed

6 files changed

+81
-48
lines changed

all.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,31 @@ import (
66
)
77

88
// All executes the functions asynchronously until all functions have been finished. If some
9-
// function returns an error or panic, it will immediately return the index of the function and the
10-
// error, and send a cancel signal to all other functions by context.
9+
// function returns an error or panic, it will immediately return an execution error, and send a
10+
// cancel signal to all other functions by context.
1111
//
1212
// The index of the function will be -1 if all functions have been completed without error or
1313
// panic.
14-
func All(funcs ...AsyncFn) ([][]any, int, error) {
14+
func All(funcs ...AsyncFn) ([][]any, error) {
1515
return all(context.Background(), funcs...)
1616
}
1717

1818
// AllWithContext executes the functions asynchronously until all functions have been finished, or
1919
// the context is done (canceled or timeout). If some function returns an error or panic, it will
20-
// immediately return the index of the index and the error and send a cancel signal to all other
21-
// functions by context.
20+
// immediately return an execution error and send a cancel signal to all other functions by
21+
// context.
2222
//
2323
// The index of the function will be -1 if all functions have been completed without error or
2424
// panic, or the context has been canceled (or timeout) before all functions finished.
25-
func AllWithContext(ctx context.Context, funcs ...AsyncFn) ([][]any, int, error) {
25+
func AllWithContext(ctx context.Context, funcs ...AsyncFn) ([][]any, error) {
2626
return all(ctx, funcs...)
2727
}
2828

2929
// all executes the functions asynchronously until all functions have been finished, or the context
3030
// is done (canceled or timeout).
31-
func all(parent context.Context, funcs ...AsyncFn) ([][]any, int, error) {
31+
func all(parent context.Context, funcs ...AsyncFn) ([][]any, error) {
3232
if len(funcs) == 0 {
33-
return nil, -1, nil
33+
return nil, nil
3434
}
3535
validateAsyncFuncs(funcs...)
3636

@@ -51,17 +51,20 @@ func all(parent context.Context, funcs ...AsyncFn) ([][]any, int, error) {
5151
for finished < len(funcs) {
5252
select {
5353
case <-parent.Done():
54-
return out, -1, ErrContextCanceled
54+
return out, ErrContextCanceled
5555
case ret := <-ch:
5656
out[ret.Index] = ret.Out
5757
if ret.Error != nil {
58-
return out, ret.Index, ret.Error
58+
return out, &executionError{
59+
index: ret.Index,
60+
err: ret.Error,
61+
}
5962
}
6063
finished++
6164
}
6265
}
6366

64-
return out, -1, nil
67+
return out, nil
6568
}
6669

6770
// runTaskInAll runs the specified function for All / AllWithContext.

all_test.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ import (
1212
func TestAllWithoutFuncs(t *testing.T) {
1313
a := assert.New(t)
1414

15-
out, index, err := All()
15+
out, err := All()
1616
a.NilNow(err)
17-
a.EqualNow(index, -1)
1817
a.NilNow(out)
1918
}
2019

@@ -32,9 +31,8 @@ func TestAllSuccess(t *testing.T) {
3231
})
3332
}
3433

35-
out, index, err := All(funcs...)
34+
out, err := All(funcs...)
3635
a.NilNow(err)
37-
a.EqualNow(index, -1)
3836
a.EqualNow(data, []bool{true, true, true, true, true})
3937
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
4038
}
@@ -57,10 +55,9 @@ func TestAllFailure(t *testing.T) {
5755
})
5856
}
5957

60-
out, index, err := All(funcs...)
58+
out, err := All(funcs...)
6159
a.NotNilNow(err)
62-
a.EqualNow(index, 2)
63-
a.EqualNow(err, expectedErr)
60+
a.EqualNow(err.Error(), "function 2 error: n = 2")
6461
a.EqualNow(data, []bool{true, true, false, false, false})
6562
a.EqualNow(out, [][]any{{nil}, {nil}, {expectedErr}, nil, nil})
6663
}
@@ -69,12 +66,11 @@ func TestAllWithNilContext(t *testing.T) {
6966
a := assert.New(t)
7067

7168
//lint:ignore SA1012 for test case only
72-
out, index, err := AllWithContext(nil, func(ctx context.Context) error {
69+
out, err := AllWithContext(nil, func(ctx context.Context) error {
7370
time.Sleep(100 * time.Millisecond)
7471
return nil
7572
})
7673
a.NilNow(err)
77-
a.EqualNow(index, -1)
7874
a.EqualNow(out, [][]any{{nil}})
7975
}
8076

@@ -95,9 +91,8 @@ func TestAllWithTimeoutContext(t *testing.T) {
9591
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
9692
defer canFunc()
9793

98-
out, index, err := AllWithContext(ctx, funcs...)
94+
out, err := AllWithContext(ctx, funcs...)
9995
a.NotNilNow(err)
100-
a.EqualNow(index, -1)
10196
a.TrueNow(errors.Is(err, ErrContextCanceled))
10297
a.EqualNow(data, []bool{true, true, false, false, false})
10398
a.EqualNow(out, [][]any{{nil}, {nil}, nil, nil, nil})

parallel.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,32 @@ import (
77

88
// Parallel runs the functions asynchronously with the specified concurrency limitation. It will
99
// send a cancel sign to context and terminate immediately if any function returns an error or
10-
// panic, and also returns the index of the error function in the parameters list.
10+
// panic, and also returns an execution error to indicate the error.
1111
//
1212
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
1313
// limitation if the number is 0.
14-
func Parallel(concurrency int, funcs ...AsyncFn) ([][]any, int, error) {
14+
func Parallel(concurrency int, funcs ...AsyncFn) ([][]any, error) {
1515
return parallel(context.Background(), concurrency, funcs...)
1616
}
1717

1818
// ParallelWithContext runs the functions asynchronously with the specified concurrency limitation.
1919
// It will send a cancel sign to context and terminate immediately if any function returns an error
20-
// or panic, and also returns the index of the error function in the parameters list. If the
21-
// context was canceled or timed out before all functions finished executing, it will send a cancel
22-
// sign to all uncompleted functions, and return the value of the index as -1 with a `context
23-
// canceled` error.
20+
// or panic, and also returns an execution error to indicate the error. If the context was canceled
21+
// or timed out before all functions finished executing, it will send a cancel sign to all
22+
// uncompleted functions, and return a context canceled error.
2423
//
2524
// The number of concurrency must be greater than or equal to 0, and it means no concurrency
2625
// limitation if the number is 0.
2726
func ParallelWithContext(
2827
ctx context.Context,
2928
concurrency int,
3029
funcs ...AsyncFn,
31-
) ([][]any, int, error) {
30+
) ([][]any, error) {
3231
return parallel(ctx, concurrency, funcs...)
3332
}
3433

3534
// parallel runs the functions asynchronously with the specified concurrency.
36-
func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) ([][]any, int, error) {
35+
func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) ([][]any, error) {
3736
// the number of concurrency should be 0 (no limitation) or greater than 0.
3837
if concurrency < 0 {
3938
panic(ErrInvalidConcurrency)
@@ -42,7 +41,7 @@ func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) ([][]an
4241

4342
out := make([][]any, len(funcs))
4443
if len(funcs) == 0 {
45-
return out, -1, nil
44+
return out, nil
4645
}
4746

4847
parent = getContext(parent)
@@ -71,17 +70,20 @@ func parallel(parent context.Context, concurrency int, funcs ...AsyncFn) ([][]an
7170
for finished < len(funcs) {
7271
select {
7372
case <-parent.Done():
74-
return out, -1, ErrContextCanceled
73+
return out, ErrContextCanceled
7574
case ret := <-ch:
7675
out[ret.Index] = ret.Out
7776
if ret.Error != nil {
78-
return out, ret.Index, ret.Error
77+
return out, &executionError{
78+
index: ret.Index,
79+
err: ret.Error,
80+
}
7981
}
8082
finished++
8183
}
8284
}
8385

84-
return out, -1, nil
86+
return out, nil
8587
}
8688

8789
// runTaskInParallel runs the specified function for Parallel / ParallelWithContext.

parallel_test.go

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ import (
1212
func TestParallel(t *testing.T) {
1313
a := assert.New(t)
1414

15-
out, index, err := Parallel(0)
15+
out, err := Parallel(0)
1616
a.NilNow(err)
17-
a.EqualNow(index, -1)
1817
a.EqualNow(out, [][]any{})
1918

2019
a.PanicNow(func() {
@@ -35,10 +34,9 @@ func TestParallelWithoutConcurrencyLimit(t *testing.T) {
3534
}
3635

3736
start := time.Now()
38-
out, index, err := Parallel(0, funcs...)
37+
out, err := Parallel(0, funcs...)
3938
dur := time.Since(start)
4039
a.NilNow(err)
41-
a.EqualNow(index, -1)
4240
a.TrueNow(dur-100*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
4341
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
4442
}
@@ -56,10 +54,9 @@ func TestParallelWithConcurrencyLimit(t *testing.T) {
5654
}
5755

5856
start := time.Now()
59-
out, index, err := Parallel(2, funcs...)
57+
out, err := Parallel(2, funcs...)
6058
dur := time.Since(start)
6159
a.NilNow(err)
62-
a.EqualNow(index, -1)
6360
a.TrueNow(dur-300*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
6461
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
6562
}
@@ -84,10 +81,10 @@ func TestParallelWithFailedTask(t *testing.T) {
8481
}
8582

8683
start := time.Now()
87-
out, index, err := Parallel(2, funcs...)
84+
out, err := Parallel(2, funcs...)
8885
dur := time.Since(start)
89-
a.EqualNow(err, expectedErr)
90-
a.EqualNow(index, 2)
86+
a.NotNilNow(err)
87+
a.EqualNow(err.Error(), "function 2 error: expected error")
9188
a.TrueNow(dur-150*time.Millisecond < 30*time.Millisecond) // allow 30ms deviation
9289
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, expectedErr}, nil, nil})
9390
}
@@ -106,9 +103,8 @@ func TestParallelWithContext(t *testing.T) {
106103
})
107104
}
108105

109-
out, index, err := ParallelWithContext(context.Background(), 2, funcs...)
106+
out, err := ParallelWithContext(context.Background(), 2, funcs...)
110107
a.NilNow(err)
111-
a.EqualNow(index, -1)
112108
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, {2, nil}, {3, nil}, {4, nil}})
113109

114110
finishedNum := 0
@@ -137,9 +133,9 @@ func TestParallelWithTimedOutContext(t *testing.T) {
137133
ctx, canFunc := context.WithTimeout(context.Background(), 150*time.Millisecond)
138134
defer canFunc()
139135

140-
out, index, err := ParallelWithContext(ctx, 2, funcs...)
136+
out, err := ParallelWithContext(ctx, 2, funcs...)
137+
a.NotNilNow(err)
141138
a.TrueNow(errors.Is(err, ErrContextCanceled))
142-
a.EqualNow(index, -1)
143139
a.EqualNow(res, []bool{true, true, false, false, false})
144140
a.EqualNow(out, [][]any{{0, nil}, {1, nil}, nil, nil, nil})
145141
}

race.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ func race(ctx context.Context, funcs ...AsyncFn) ([]any, int, error) {
4949
}
5050

5151
ret := <-ch
52+
if ret.Error != nil {
53+
return ret.Out, ret.Index, &executionError{
54+
index: ret.Index,
55+
err: ret.Error,
56+
}
57+
}
5258

53-
return ret.Out, ret.Index, ret.Error
59+
return ret.Out, ret.Index, nil
5460
}

race_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,37 @@ func TestRace(t *testing.T) {
4242
a.EqualNow(data, []bool{true, true, true, true, true})
4343
}
4444

45+
func TestRaceWithFailed(t *testing.T) {
46+
a := assert.New(t)
47+
48+
data := make([]bool, 5)
49+
funcs := make([]AsyncFn, 0, 5)
50+
expectedErr := errors.New("expected error")
51+
for i := 0; i < 5; i++ {
52+
n := i
53+
funcs = append(funcs, func(ctx context.Context) (int, error) {
54+
if n == 2 {
55+
time.Sleep(25 * time.Millisecond)
56+
return n, expectedErr
57+
} else {
58+
time.Sleep(time.Duration((n+1)*50) * time.Millisecond)
59+
data[n] = true
60+
return n, nil
61+
}
62+
})
63+
}
64+
65+
out, index, err := Race(funcs...)
66+
a.NotNilNow(err)
67+
a.EqualNow(err.Error(), "function 2 error: expected error")
68+
a.EqualNow(index, 2)
69+
a.EqualNow(data, []bool{false, false, false, false, false})
70+
a.EqualNow(out, []any{2, expectedErr})
71+
72+
time.Sleep(300 * time.Millisecond)
73+
a.EqualNow(data, []bool{true, true, false, true, true})
74+
}
75+
4576
func TestRaceWithNilContext(t *testing.T) {
4677
a := assert.New(t)
4778

0 commit comments

Comments
 (0)