Skip to content

Commit 999a6ab

Browse files
committed
[parallelisation] New groups and Store options
1 parent 12fe18a commit 999a6ab

File tree

9 files changed

+655
-257
lines changed

9 files changed

+655
-257
lines changed

changes/20250820002654.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added new groups (ContextualFunctionGroup) and new Store options to configure the execution (number of workers, single execution, etc.)

utils/parallelisation/cancel_functions.go

Lines changed: 6 additions & 238 deletions
Original file line numberDiff line numberDiff line change
@@ -5,245 +5,14 @@
55

66
package parallelisation
77

8-
import (
9-
"context"
10-
11-
"github.com/sasha-s/go-deadlock"
12-
"golang.org/x/sync/errgroup"
13-
14-
"github.com/ARM-software/golang-utils/utils/commonerrors"
15-
"github.com/ARM-software/golang-utils/utils/reflection"
16-
)
17-
18-
type StoreOptions struct {
19-
clearOnExecution bool
20-
stopOnFirstError bool
21-
sequential bool
22-
reverse bool
23-
joinErrors bool
24-
}
25-
type StoreOption func(*StoreOptions) *StoreOptions
26-
27-
// StopOnFirstError stops store execution on first error.
28-
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
29-
if o == nil {
30-
return o
31-
}
32-
o.stopOnFirstError = true
33-
o.joinErrors = false
34-
return o
35-
}
36-
37-
// JoinErrors will collate any errors which happened when executing functions in store.
38-
// This option should not be used in combination to StopOnFirstError.
39-
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
40-
if o == nil {
41-
return o
42-
}
43-
o.stopOnFirstError = false
44-
o.joinErrors = true
45-
return o
46-
}
47-
48-
// ExecuteAll executes all functions in the store even if an error is raised. the first error raised is then returned.
49-
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
50-
if o == nil {
51-
return o
52-
}
53-
o.stopOnFirstError = false
54-
return o
55-
}
56-
57-
// ClearAfterExecution clears the store after execution.
58-
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
59-
if o == nil {
60-
return o
61-
}
62-
o.clearOnExecution = true
63-
return o
64-
}
65-
66-
// RetainAfterExecution keep the store intact after execution (no reset).
67-
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
68-
if o == nil {
69-
return o
70-
}
71-
o.clearOnExecution = false
72-
return o
73-
}
74-
75-
// Parallel ensures every function registered in the store is executed concurrently in the order they were registered.
76-
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
77-
if o == nil {
78-
return o
79-
}
80-
o.sequential = false
81-
return o
82-
}
83-
84-
// Sequential ensures every function registered in the store is executed sequentially in the order they were registered.
85-
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
86-
if o == nil {
87-
return o
88-
}
89-
o.sequential = true
90-
return o
91-
}
92-
93-
// SequentialInReverse ensures every function registered in the store is executed sequentially but in the reverse order they were registered.
94-
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
95-
if o == nil {
96-
return o
97-
}
98-
o.sequential = true
99-
o.reverse = true
100-
return o
101-
}
102-
103-
func newFunctionStore[T any](executeFunc func(context.Context, T) error, options ...StoreOption) *store[T] {
104-
105-
opts := &StoreOptions{}
106-
107-
for i := range options {
108-
opts = options[i](opts)
109-
}
110-
return &store[T]{
111-
mu: deadlock.RWMutex{},
112-
functions: make([]T, 0),
113-
executeFunc: executeFunc,
114-
options: *opts,
115-
}
116-
}
117-
118-
type store[T any] struct {
119-
mu deadlock.RWMutex
120-
functions []T
121-
executeFunc func(ctx context.Context, element T) error
122-
options StoreOptions
123-
}
124-
125-
func (s *store[T]) RegisterFunction(function ...T) {
126-
defer s.mu.Unlock()
127-
s.mu.Lock()
128-
s.functions = append(s.functions, function...)
129-
}
130-
131-
func (s *store[T]) Len() int {
132-
defer s.mu.RUnlock()
133-
s.mu.RLock()
134-
return len(s.functions)
135-
}
136-
137-
func (s *store[T]) Execute(ctx context.Context) (err error) {
138-
defer s.mu.Unlock()
139-
s.mu.Lock()
140-
if reflection.IsEmpty(s.executeFunc) {
141-
return commonerrors.New(commonerrors.ErrUndefined, "the store was not initialised correctly")
142-
}
143-
144-
if s.options.sequential {
145-
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse, s.options.joinErrors)
146-
} else {
147-
err = s.executeConcurrently(ctx, s.options.stopOnFirstError, s.options.joinErrors)
148-
}
149-
150-
if err == nil && s.options.clearOnExecution {
151-
s.functions = make([]T, 0, len(s.functions))
152-
}
153-
return
154-
}
155-
156-
func (s *store[T]) executeConcurrently(ctx context.Context, stopOnFirstError bool, collateErrors bool) error {
157-
g, gCtx := errgroup.WithContext(ctx)
158-
if !stopOnFirstError {
159-
gCtx = ctx
160-
}
161-
funcNum := len(s.functions)
162-
errCh := make(chan error, funcNum)
163-
g.SetLimit(funcNum)
164-
for i := range s.functions {
165-
g.Go(func() error {
166-
_, subErr := s.executeFunction(gCtx, s.functions[i])
167-
errCh <- subErr
168-
return subErr
169-
})
170-
}
171-
err := g.Wait()
172-
close(errCh)
173-
if collateErrors {
174-
collateErr := make([]error, funcNum)
175-
i := 0
176-
for subErr := range errCh {
177-
collateErr[i] = subErr
178-
i++
179-
}
180-
err = commonerrors.Join(collateErr...)
181-
}
182-
183-
return err
184-
}
185-
186-
func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse, collateErrors bool) (err error) {
187-
err = DetermineContextError(ctx)
188-
if err != nil {
189-
return
190-
}
191-
funcNum := len(s.functions)
192-
collateErr := make([]error, funcNum)
193-
if reverse {
194-
for i := funcNum - 1; i >= 0; i-- {
195-
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
196-
collateErr[funcNum-i-1] = subErr
197-
if shouldBreak {
198-
err = subErr
199-
return
200-
}
201-
if subErr != nil && err == nil {
202-
err = subErr
203-
if stopOnFirstError {
204-
return
205-
}
206-
}
207-
}
208-
} else {
209-
for i := range s.functions {
210-
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
211-
collateErr[i] = subErr
212-
if shouldBreak {
213-
err = subErr
214-
return
215-
}
216-
if subErr != nil && err == nil {
217-
err = subErr
218-
if stopOnFirstError {
219-
return
220-
}
221-
}
222-
}
223-
}
224-
225-
if collateErrors {
226-
err = commonerrors.Join(collateErr...)
227-
}
228-
return
229-
}
230-
231-
func (s *store[T]) executeFunction(ctx context.Context, element T) (mustBreak bool, err error) {
232-
err = DetermineContextError(ctx)
233-
if err != nil {
234-
mustBreak = true
235-
return
236-
}
237-
err = s.executeFunc(ctx, element)
238-
return
239-
}
8+
import "context"
2409

24110
type CancelFunctionStore struct {
242-
store[context.CancelFunc]
11+
ExecutionGroup[context.CancelFunc]
24312
}
24413

24514
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
246-
s.store.RegisterFunction(cancel...)
15+
s.ExecutionGroup.RegisterFunction(cancel...)
24716
}
24817

24918
// Cancel will execute the cancel functions in the store. Any errors will be ignored and Execute() is recommended if you need to know if a cancellation failed
@@ -252,15 +21,14 @@ func (s *CancelFunctionStore) Cancel() {
25221
}
25322

25423
func (s *CancelFunctionStore) Len() int {
255-
return s.store.Len()
24+
return s.ExecutionGroup.Len()
25625
}
25726

25827
// NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store
25928
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore {
26029
return &CancelFunctionStore{
261-
store: *newFunctionStore[context.CancelFunc](func(_ context.Context, cancelFunc context.CancelFunc) error {
262-
cancelFunc()
263-
return nil
30+
ExecutionGroup: *NewExecutionGroup[context.CancelFunc](func(ctx context.Context, cancelFunc context.CancelFunc) error {
31+
return WrapCancelToContextualFunc(cancelFunc)(ctx)
26432
}, append(options, ClearAfterExecution, ExecuteAll)...),
26533
}
26634
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package parallelisation
2+
3+
import (
4+
"context"
5+
6+
"github.com/ARM-software/golang-utils/utils/commonerrors"
7+
)
8+
9+
// DetermineContextError determines what the context error is if any.
10+
func DetermineContextError(ctx context.Context) error {
11+
return commonerrors.ConvertContextError(ctx.Err())
12+
}
13+
14+
type ContextualFunctionGroup struct {
15+
ExecutionGroup[ContextualFunc]
16+
}
17+
18+
// NewContextualGroup returns a group executing contextual functions.
19+
func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup {
20+
return &ContextualFunctionGroup{
21+
ExecutionGroup: *NewExecutionGroup[ContextualFunc](func(ctx context.Context, contextualF ContextualFunc) error {
22+
return contextualF(ctx)
23+
}, options...),
24+
}
25+
}
26+
27+
// ForEach executes all the contextual functions according to the store options and returns an error if one occurred.
28+
func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
29+
group := NewContextualGroup(ExecuteAll(executionOptions).Options()...)
30+
group.RegisterFunction(contextualFunc...)
31+
return group.Execute(ctx)
32+
}
33+
34+
// BreakOnError executes each functions in the group until an error is found or the context gets cancelled.
35+
func BreakOnError(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
36+
group := NewContextualGroup(StopOnFirstError(executionOptions).Options()...)
37+
group.RegisterFunction(contextualFunc...)
38+
return group.Execute(ctx)
39+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package parallelisation
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/ARM-software/golang-utils/utils/commonerrors"
10+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
11+
)
12+
13+
func TestForEach(t *testing.T) {
14+
cancelFunc := func() {}
15+
t.Run("close with 1 error", func(t *testing.T) {
16+
closeError := commonerrors.ErrUnexpected
17+
18+
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Parallel), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
19+
})
20+
21+
t.Run("close with 1 error but error collection", func(t *testing.T) {
22+
closeError := commonerrors.ErrUnexpected
23+
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Parallel, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
24+
})
25+
26+
t.Run("close with 1 error but error collection", func(t *testing.T) {
27+
closeError := commonerrors.ErrUnexpected
28+
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
29+
})
30+
31+
t.Run("close with 1 error but sequential", func(t *testing.T) {
32+
closeError := commonerrors.ErrUnexpected
33+
errortest.AssertError(t, ForEach(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
34+
errortest.AssertError(t, BreakOnError(context.Background(), WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
35+
})
36+
37+
t.Run("close with cancellation", func(t *testing.T) {
38+
closeError := commonerrors.ErrUnexpected
39+
cancelCtx, cancel := context.WithCancel(context.Background())
40+
cancel()
41+
errortest.AssertError(t, ForEach(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
42+
errortest.AssertError(t, BreakOnError(cancelCtx, WithOptions(SequentialInReverse, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)), commonerrors.ErrCancelled)
43+
})
44+
45+
t.Run("break on error with no error", func(t *testing.T) {
46+
require.NoError(t, BreakOnError(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
47+
})
48+
}

0 commit comments

Comments
 (0)