Skip to content

Commit cabfb0c

Browse files
authored
[parallelisation] New groups and Store options (#683)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description - new store option to be able to limit function to one single execution - new store option to be able to set the number of workers executing in parallel - A new group to execute contextual function - Some more helpers ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent d7ebb44 commit cabfb0c

File tree

11 files changed

+837
-271
lines changed

11 files changed

+837
-271
lines changed

changes/20250820002654.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added new groups (ContextualFunctionGroup) and new Store options to configure the execution (number of workers, single execution, etc.)

changes/20250820140853.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added new compound execution group to support nested execution groups

utils/parallelisation/cancel_functions.go

Lines changed: 13 additions & 236 deletions
Original file line numberDiff line numberDiff line change
@@ -5,245 +5,23 @@
55

66
package parallelisation
77

8-
import (
9-
"context"
8+
import "context"
109

11-
"github.com/sasha-s/go-deadlock"
12-
"golang.org/x/sync/errgroup"
13-
14-
"github.com/ARM-software/golang-utils/utils/commonerrors"
15-
"github.com/ARM-software/golang-utils/utils/reflection"
16-
)
17-
18-
type StoreOptions struct {
19-
clearOnExecution bool
20-
stopOnFirstError bool
21-
sequential bool
22-
reverse bool
23-
joinErrors bool
24-
}
25-
type StoreOption func(*StoreOptions) *StoreOptions
26-
27-
// StopOnFirstError stops store execution on first error.
28-
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
29-
if o == nil {
30-
return o
31-
}
32-
o.stopOnFirstError = true
33-
o.joinErrors = false
34-
return o
35-
}
36-
37-
// JoinErrors will collate any errors which happened when executing functions in store.
38-
// This option should not be used in combination to StopOnFirstError.
39-
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
40-
if o == nil {
41-
return o
42-
}
43-
o.stopOnFirstError = false
44-
o.joinErrors = true
45-
return o
46-
}
47-
48-
// ExecuteAll executes all functions in the store even if an error is raised. the first error raised is then returned.
49-
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
50-
if o == nil {
51-
return o
52-
}
53-
o.stopOnFirstError = false
54-
return o
55-
}
56-
57-
// ClearAfterExecution clears the store after execution.
58-
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
59-
if o == nil {
60-
return o
61-
}
62-
o.clearOnExecution = true
63-
return o
64-
}
65-
66-
// RetainAfterExecution keep the store intact after execution (no reset).
67-
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
68-
if o == nil {
69-
return o
70-
}
71-
o.clearOnExecution = false
72-
return o
73-
}
74-
75-
// Parallel ensures every function registered in the store is executed concurrently in the order they were registered.
76-
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
77-
if o == nil {
78-
return o
79-
}
80-
o.sequential = false
81-
return o
82-
}
83-
84-
// Sequential ensures every function registered in the store is executed sequentially in the order they were registered.
85-
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
86-
if o == nil {
87-
return o
88-
}
89-
o.sequential = true
90-
return o
91-
}
92-
93-
// SequentialInReverse ensures every function registered in the store is executed sequentially but in the reverse order they were registered.
94-
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
95-
if o == nil {
96-
return o
97-
}
98-
o.sequential = true
99-
o.reverse = true
100-
return o
101-
}
102-
103-
func newFunctionStore[T any](executeFunc func(context.Context, T) error, options ...StoreOption) *store[T] {
104-
105-
opts := &StoreOptions{}
106-
107-
for i := range options {
108-
opts = options[i](opts)
109-
}
110-
return &store[T]{
111-
mu: deadlock.RWMutex{},
112-
functions: make([]T, 0),
113-
executeFunc: executeFunc,
114-
options: *opts,
115-
}
116-
}
117-
118-
type store[T any] struct {
119-
mu deadlock.RWMutex
120-
functions []T
121-
executeFunc func(ctx context.Context, element T) error
122-
options StoreOptions
123-
}
124-
125-
func (s *store[T]) RegisterFunction(function ...T) {
126-
defer s.mu.Unlock()
127-
s.mu.Lock()
128-
s.functions = append(s.functions, function...)
129-
}
130-
131-
func (s *store[T]) Len() int {
132-
defer s.mu.RUnlock()
133-
s.mu.RLock()
134-
return len(s.functions)
135-
}
136-
137-
func (s *store[T]) Execute(ctx context.Context) (err error) {
138-
defer s.mu.Unlock()
139-
s.mu.Lock()
140-
if reflection.IsEmpty(s.executeFunc) {
141-
return commonerrors.New(commonerrors.ErrUndefined, "the store was not initialised correctly")
142-
}
143-
144-
if s.options.sequential {
145-
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse, s.options.joinErrors)
146-
} else {
147-
err = s.executeConcurrently(ctx, s.options.stopOnFirstError, s.options.joinErrors)
148-
}
149-
150-
if err == nil && s.options.clearOnExecution {
151-
s.functions = make([]T, 0, len(s.functions))
152-
}
153-
return
154-
}
155-
156-
func (s *store[T]) executeConcurrently(ctx context.Context, stopOnFirstError bool, collateErrors bool) error {
157-
g, gCtx := errgroup.WithContext(ctx)
158-
if !stopOnFirstError {
159-
gCtx = ctx
160-
}
161-
funcNum := len(s.functions)
162-
errCh := make(chan error, funcNum)
163-
g.SetLimit(funcNum)
164-
for i := range s.functions {
165-
g.Go(func() error {
166-
_, subErr := s.executeFunction(gCtx, s.functions[i])
167-
errCh <- subErr
168-
return subErr
169-
})
170-
}
171-
err := g.Wait()
172-
close(errCh)
173-
if collateErrors {
174-
collateErr := make([]error, funcNum)
175-
i := 0
176-
for subErr := range errCh {
177-
collateErr[i] = subErr
178-
i++
179-
}
180-
err = commonerrors.Join(collateErr...)
181-
}
182-
183-
return err
10+
type CancelFunctionStore struct {
11+
ExecutionGroup[context.CancelFunc]
18412
}
18513

186-
func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse, collateErrors bool) (err error) {
187-
err = DetermineContextError(ctx)
188-
if err != nil {
189-
return
190-
}
191-
funcNum := len(s.functions)
192-
collateErr := make([]error, funcNum)
193-
if reverse {
194-
for i := funcNum - 1; i >= 0; i-- {
195-
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
196-
collateErr[funcNum-i-1] = subErr
197-
if shouldBreak {
198-
err = subErr
199-
return
200-
}
201-
if subErr != nil && err == nil {
202-
err = subErr
203-
if stopOnFirstError {
204-
return
205-
}
206-
}
207-
}
208-
} else {
209-
for i := range s.functions {
210-
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
211-
collateErr[i] = subErr
212-
if shouldBreak {
213-
err = subErr
214-
return
215-
}
216-
if subErr != nil && err == nil {
217-
err = subErr
218-
if stopOnFirstError {
219-
return
220-
}
221-
}
222-
}
223-
}
224-
225-
if collateErrors {
226-
err = commonerrors.Join(collateErr...)
227-
}
228-
return
14+
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
15+
s.ExecutionGroup.RegisterFunction(cancel...)
22916
}
23017

231-
func (s *store[T]) executeFunction(ctx context.Context, element T) (mustBreak bool, err error) {
232-
err = DetermineContextError(ctx)
233-
if err != nil {
234-
mustBreak = true
18+
func (s *CancelFunctionStore) RegisterCancelStore(store *CancelFunctionStore) {
19+
if store == nil {
23520
return
23621
}
237-
err = s.executeFunc(ctx, element)
238-
return
239-
}
240-
241-
type CancelFunctionStore struct {
242-
store[context.CancelFunc]
243-
}
244-
245-
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
246-
s.store.RegisterFunction(cancel...)
22+
s.RegisterCancelFunction(func() {
23+
store.Cancel()
24+
})
24725
}
24826

24927
// Cancel will execute the cancel functions in the store. Any errors will be ignored and Execute() is recommended if you need to know if a cancellation failed
@@ -252,15 +30,14 @@ func (s *CancelFunctionStore) Cancel() {
25230
}
25331

25432
func (s *CancelFunctionStore) Len() int {
255-
return s.store.Len()
33+
return s.ExecutionGroup.Len()
25634
}
25735

25836
// NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store
25937
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore {
26038
return &CancelFunctionStore{
261-
store: *newFunctionStore[context.CancelFunc](func(_ context.Context, cancelFunc context.CancelFunc) error {
262-
cancelFunc()
263-
return nil
39+
ExecutionGroup: *NewExecutionGroup[context.CancelFunc](func(ctx context.Context, cancelFunc context.CancelFunc) error {
40+
return WrapCancelToContextualFunc(cancelFunc)(ctx)
26441
}, append(options, ClearAfterExecution, ExecuteAll)...),
26542
}
26643
}

utils/parallelisation/cancel_functions_test.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13+
"go.uber.org/atomic"
1314

1415
"github.com/ARM-software/golang-utils/utils/commonerrors"
1516
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
@@ -19,24 +20,35 @@ func testCancelStore(t *testing.T, store *CancelFunctionStore) {
1920
t.Helper()
2021
require.NotNil(t, store)
2122
// Set up some fake CancelFuncs to make sure they are called
22-
called1 := false
23-
called2 := false
23+
called1 := atomic.NewBool(false)
24+
called2 := atomic.NewBool(false)
25+
called3 := atomic.NewBool(false)
26+
2427
cancelFunc1 := func() {
25-
called1 = true
28+
called1.Store(true)
2629
}
2730
cancelFunc2 := func() {
28-
called2 = true
31+
called2.Store(true)
32+
}
33+
cancelFunc3 := func() {
34+
called3.Store(true)
2935
}
36+
subStore := NewCancelFunctionsStore()
37+
subStore.RegisterCancelFunction(cancelFunc3)
3038

3139
store.RegisterCancelFunction(cancelFunc1, cancelFunc2)
40+
store.RegisterCancelStore(subStore)
41+
store.RegisterCancelStore(nil)
3242

33-
assert.Equal(t, 2, store.Len())
34-
assert.False(t, called1)
35-
assert.False(t, called2)
43+
assert.Equal(t, 3, store.Len())
44+
assert.False(t, called1.Load())
45+
assert.False(t, called2.Load())
46+
assert.False(t, called3.Load())
3647
store.Cancel()
3748

38-
assert.True(t, called1)
39-
assert.True(t, called2)
49+
assert.True(t, called1.Load())
50+
assert.True(t, called2.Load())
51+
assert.True(t, called3.Load())
4052
}
4153

4254
// Given a CancelFunctionsStore
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package parallelisation
2+
3+
import (
4+
"context"
5+
6+
"github.com/ARM-software/golang-utils/utils/commonerrors"
7+
)
8+
9+
// DetermineContextError determines what the context error is if any.
10+
func DetermineContextError(ctx context.Context) error {
11+
return commonerrors.ConvertContextError(ctx.Err())
12+
}
13+
14+
type ContextualFunc func(ctx context.Context) error
15+
16+
type ContextualFunctionGroup struct {
17+
ExecutionGroup[ContextualFunc]
18+
}
19+
20+
// NewContextualGroup returns a group executing contextual functions.
21+
func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup {
22+
return &ContextualFunctionGroup{
23+
ExecutionGroup: *NewExecutionGroup[ContextualFunc](func(ctx context.Context, contextualF ContextualFunc) error {
24+
return contextualF(ctx)
25+
}, options...),
26+
}
27+
}
28+
29+
// ForEach executes all the contextual functions according to the store options and returns an error if one occurred.
30+
func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
31+
group := NewContextualGroup(ExecuteAll(executionOptions).Options()...)
32+
group.RegisterFunction(contextualFunc...)
33+
return group.Execute(ctx)
34+
}
35+
36+
// BreakOnError executes each functions in the group until an error is found or the context gets cancelled.
37+
func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
38+
group := NewContextualGroup(StopOnFirstError(executionOptions).Options()...)
39+
group.RegisterFunction(contextualFunc...)
40+
return group.Execute(ctx)
41+
}

0 commit comments

Comments
 (0)