Skip to content

Commit 4eb2998

Browse files
acabarbayeaorabdel
andauthored
[parallelisation] Extend the cancel/close store to have various ways of functioning (e.g. parallel, sequential, reverse order, etc.) (#676)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description Extend the stores so it can be executed differently : i.e. sequentially vs concurrently ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update). --------- Co-authored-by: Abdelrahman Abdelraouf <abdelrahman.abdelraouf@arm.com>
1 parent 3169df4 commit 4eb2998

File tree

5 files changed

+316
-66
lines changed

5 files changed

+316
-66
lines changed

changes/20250813063457.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Extend the cancel/close store to have various ways of functioning (e.g. parallel, sequential, reverse order, etc.)

utils/parallelisation/cancel_functions.go

Lines changed: 159 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,99 @@ import (
1515
"github.com/ARM-software/golang-utils/utils/reflection"
1616
)
1717

18-
func newFunctionStore[T any](clearOnExecution, stopOnFirstError bool, executeFunc func(context.Context, T) error) *store[T] {
18+
type StoreOptions struct {
19+
clearOnExecution bool
20+
stopOnFirstError bool
21+
sequential bool
22+
reverse bool
23+
}
24+
25+
type StoreOption func(*StoreOptions) *StoreOptions
26+
27+
// StopOnFirstError stops store execution on first error.
28+
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
29+
if o == nil {
30+
return o
31+
}
32+
o.stopOnFirstError = true
33+
return o
34+
}
35+
36+
// ExecuteAll executes all functions in the store even if an error is raised. the first error raised is then returned.
37+
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
38+
if o == nil {
39+
return o
40+
}
41+
o.stopOnFirstError = false
42+
return o
43+
}
44+
45+
// ClearAfterExecution clears the store after execution.
46+
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
47+
if o == nil {
48+
return o
49+
}
50+
o.clearOnExecution = true
51+
return o
52+
}
53+
54+
// RetainAfterExecution keep the store intact after execution (no reset).
55+
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
56+
if o == nil {
57+
return o
58+
}
59+
o.clearOnExecution = false
60+
return o
61+
}
62+
63+
// Parallel ensures every function registered in the store is executed concurrently in the order they were registered.
64+
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
65+
if o == nil {
66+
return o
67+
}
68+
o.sequential = false
69+
return o
70+
}
71+
72+
// Sequential ensures every function registered in the store is executed sequentially in the order they were registered.
73+
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
74+
if o == nil {
75+
return o
76+
}
77+
o.sequential = true
78+
return o
79+
}
80+
81+
// SequentialInReverse ensures every function registered in the store is executed sequentially but in the reverse order they were registered.
82+
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
83+
if o == nil {
84+
return o
85+
}
86+
o.sequential = true
87+
o.reverse = true
88+
return o
89+
}
90+
91+
func newFunctionStore[T any](executeFunc func(context.Context, T) error, options ...StoreOption) *store[T] {
92+
93+
opts := &StoreOptions{}
94+
95+
for i := range options {
96+
opts = options[i](opts)
97+
}
1998
return &store[T]{
20-
mu: deadlock.RWMutex{},
21-
functions: make([]T, 0),
22-
executeFunc: executeFunc,
23-
clearOnExecution: clearOnExecution,
24-
stopOnFirstError: stopOnFirstError,
99+
mu: deadlock.RWMutex{},
100+
functions: make([]T, 0),
101+
executeFunc: executeFunc,
102+
options: *opts,
25103
}
26104
}
27105

28106
type store[T any] struct {
29-
mu deadlock.RWMutex
30-
functions []T
31-
executeFunc func(ctx context.Context, element T) error
32-
clearOnExecution bool
33-
stopOnFirstError bool
107+
mu deadlock.RWMutex
108+
functions []T
109+
executeFunc func(ctx context.Context, element T) error
110+
options StoreOptions
34111
}
35112

36113
func (s *store[T]) RegisterFunction(function ...T) {
@@ -45,32 +122,87 @@ func (s *store[T]) Len() int {
45122
return len(s.functions)
46123
}
47124

48-
func (s *store[T]) Execute(ctx context.Context) error {
125+
func (s *store[T]) Execute(ctx context.Context) (err error) {
49126
defer s.mu.Unlock()
50127
s.mu.Lock()
51128
if reflection.IsEmpty(s.executeFunc) {
52-
return commonerrors.New(commonerrors.ErrUndefined, "the cancel store was not initialised correctly")
129+
return commonerrors.New(commonerrors.ErrUndefined, "the store was not initialised correctly")
130+
}
131+
132+
if s.options.sequential {
133+
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse)
134+
} else {
135+
err = s.executeInParallel(ctx, s.options.stopOnFirstError)
53136
}
137+
138+
if err == nil && s.options.clearOnExecution {
139+
s.functions = make([]T, 0, len(s.functions))
140+
}
141+
return
142+
}
143+
144+
func (s *store[T]) executeInParallel(ctx context.Context, stopOnFirstError bool) error {
54145
g, gCtx := errgroup.WithContext(ctx)
55-
if !s.stopOnFirstError {
146+
if !stopOnFirstError {
56147
gCtx = ctx
57148
}
58149
g.SetLimit(len(s.functions))
59150
for i := range s.functions {
60151
g.Go(func() error {
61-
err := DetermineContextError(gCtx)
62-
if err != nil {
63-
return err
64-
}
65-
return s.executeFunc(gCtx, s.functions[i])
152+
_, subErr := s.executeFunction(gCtx, s.functions[i])
153+
return subErr
66154
})
67155
}
68156

69-
err := g.Wait()
70-
if err == nil && s.clearOnExecution {
71-
s.functions = make([]T, 0, len(s.functions))
157+
return g.Wait()
158+
}
159+
160+
func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse bool) (err error) {
161+
err = DetermineContextError(ctx)
162+
if err != nil {
163+
return
164+
}
165+
if reverse {
166+
for i := len(s.functions) - 1; i >= 0; i-- {
167+
mustBreak, subErr := s.executeFunction(ctx, s.functions[i])
168+
if mustBreak {
169+
err = subErr
170+
return
171+
}
172+
if subErr != nil && err == nil {
173+
err = subErr
174+
if stopOnFirstError {
175+
return
176+
}
177+
}
178+
}
179+
} else {
180+
for i := range s.functions {
181+
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
182+
if shouldBreak {
183+
err = subErr
184+
return
185+
}
186+
if subErr != nil && err == nil {
187+
err = subErr
188+
if stopOnFirstError {
189+
return
190+
}
191+
}
192+
}
193+
}
194+
195+
return
196+
}
197+
198+
func (s *store[T]) executeFunction(ctx context.Context, element T) (mustBreak bool, err error) {
199+
err = DetermineContextError(ctx)
200+
if err != nil {
201+
mustBreak = true
202+
return
72203
}
73-
return err
204+
err = s.executeFunc(ctx, element)
205+
return
74206
}
75207

76208
type CancelFunctionStore struct {
@@ -90,12 +222,12 @@ func (s *CancelFunctionStore) Len() int {
90222
return s.store.Len()
91223
}
92224

93-
// NewCancelFunctionsStore creates a store for cancel functions.
94-
func NewCancelFunctionsStore() *CancelFunctionStore {
225+
// NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed and cleared. In other words, options `RetainAfterExecution` and `StopOnFirstError` would be discarded if selected to create the Cancel store
226+
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore {
95227
return &CancelFunctionStore{
96-
store: *newFunctionStore[context.CancelFunc](true, false, func(_ context.Context, cancelFunc context.CancelFunc) error {
228+
store: *newFunctionStore[context.CancelFunc](func(_ context.Context, cancelFunc context.CancelFunc) error {
97229
cancelFunc()
98230
return nil
99-
}),
231+
}, append(options, ClearAfterExecution, ExecuteAll)...),
100232
}
101233
}

utils/parallelisation/cancel_functions_test.go

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,54 @@ import (
99
"testing"
1010

1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1213

1314
"github.com/ARM-software/golang-utils/utils/commonerrors"
1415
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
1516
)
1617

18+
func testCancelStore(t *testing.T, store *CancelFunctionStore) {
19+
t.Helper()
20+
require.NotNil(t, store)
21+
// Set up some fake CancelFuncs to make sure they are called
22+
called1 := false
23+
called2 := false
24+
cancelFunc1 := func() {
25+
called1 = true
26+
}
27+
cancelFunc2 := func() {
28+
called2 = true
29+
}
30+
31+
store.RegisterCancelFunction(cancelFunc1, cancelFunc2)
32+
33+
assert.Equal(t, 2, store.Len())
34+
assert.False(t, called1)
35+
assert.False(t, called2)
36+
store.Cancel()
37+
38+
assert.True(t, called1)
39+
assert.True(t, called2)
40+
}
41+
1742
// Given a CancelFunctionsStore
1843
// Functions can be registered
1944
// and all functions will be called
2045
func TestCancelFunctionStore(t *testing.T) {
2146
t.Run("valid cancel store", func(t *testing.T) {
22-
// Set up some fake CancelFuncs to make sure they are called
23-
called1 := false
24-
called2 := false
25-
cancelFunc1 := func() {
26-
called1 = true
27-
}
28-
cancelFunc2 := func() {
29-
called2 = true
30-
}
31-
32-
store := NewCancelFunctionsStore()
33-
34-
store.RegisterCancelFunction(cancelFunc1, cancelFunc2)
35-
36-
assert.Equal(t, 2, store.Len())
37-
38-
store.Cancel()
3947

40-
assert.True(t, called1)
41-
assert.True(t, called2)
48+
t.Run("parallel", func(t *testing.T) {
49+
testCancelStore(t, NewCancelFunctionsStore())
50+
})
51+
t.Run("sequential", func(t *testing.T) {
52+
testCancelStore(t, NewCancelFunctionsStore(Sequential))
53+
})
54+
t.Run("reverse", func(t *testing.T) {
55+
testCancelStore(t, NewCancelFunctionsStore(SequentialInReverse))
56+
})
57+
t.Run("execute all", func(t *testing.T) {
58+
testCancelStore(t, NewCancelFunctionsStore(StopOnFirstError))
59+
})
4260
})
4361

4462
t.Run("incorrectly initialised cancel store", func(t *testing.T) {

utils/parallelisation/onclose.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,22 @@ func (s *CloserStore) Len() int {
2525

2626
// NewCloserStore returns a store of io.Closer object which will all be closed concurrently on Close(). The first error received will be returned
2727
func NewCloserStore(stopOnFirstError bool) *CloserStore {
28+
option := ExecuteAll
29+
if stopOnFirstError {
30+
option = StopOnFirstError
31+
}
32+
return NewCloserStoreWithOptions(option, Parallel)
33+
}
34+
35+
// NewCloserStoreWithOptions returns a store of io.Closer object which will all be closed on Close(). The first error received if any will be returned
36+
func NewCloserStoreWithOptions(opts ...StoreOption) *CloserStore {
2837
return &CloserStore{
29-
store: *newFunctionStore[io.Closer](false, stopOnFirstError, func(_ context.Context, closerObj io.Closer) error {
38+
store: *newFunctionStore[io.Closer](func(_ context.Context, closerObj io.Closer) error {
3039
if closerObj == nil {
3140
return commonerrors.UndefinedVariable("closer object")
3241
}
3342
return closerObj.Close()
34-
}),
43+
}, append(opts, RetainAfterExecution)...),
3544
}
3645
}
3746

@@ -90,11 +99,26 @@ func (s *CloseFunctionStore) Len() int {
9099
return s.store.Len()
91100
}
92101

93-
// NewCloseFunctionStoreStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
94-
func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
102+
// NewCloseFunctionStore returns a store closing functions which will all be called on Close(). The first error received if any will be returned.
103+
func NewCloseFunctionStore(options ...StoreOption) *CloseFunctionStore {
95104
return &CloseFunctionStore{
96-
store: *newFunctionStore[CloseFunc](false, stopOnFirstError, func(_ context.Context, closerObj CloseFunc) error {
105+
store: *newFunctionStore[CloseFunc](func(_ context.Context, closerObj CloseFunc) error {
97106
return closerObj()
98-
}),
107+
}, append(options, RetainAfterExecution)...),
108+
}
109+
}
110+
111+
// NewCloseFunctionStoreStore is exactly the same as NewConcurrentCloseFunctionStore but without a typo in the name.
112+
func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
113+
return NewConcurrentCloseFunctionStore(stopOnFirstError)
114+
}
115+
116+
// NewConcurrentCloseFunctionStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
117+
// Prefer using NewCloseFunctionStore where possible
118+
func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore {
119+
option := ExecuteAll
120+
if stopOnFirstError {
121+
option = StopOnFirstError
99122
}
123+
return NewCloseFunctionStore(option, Parallel)
100124
}

0 commit comments

Comments
 (0)