Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20250813063457.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` Extend the cancel/close store to have various ways of functioning (e.g. parallel, sequential, reverse order, etc.)
186 changes: 159 additions & 27 deletions utils/parallelisation/cancel_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,99 @@ import (
"github.com/ARM-software/golang-utils/utils/reflection"
)

func newFunctionStore[T any](clearOnExecution, stopOnFirstError bool, executeFunc func(context.Context, T) error) *store[T] {
type StoreOptions struct {
clearOnExecution bool
stopOnFirstError bool
sequential bool
reverse bool
}

type StoreOption func(*StoreOptions) *StoreOptions

// StopOnFirstError stops store execution on first error.
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.stopOnFirstError = true
return o
}

// ExecuteAll executes all functions in the store even if an error is raised. the first error raised is then returned.
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.stopOnFirstError = false
return o
}

// ClearAfterExecution clears the store after execution.
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.clearOnExecution = true
return o
}

// RetainAfterExecution keep the store intact after execution (no reset).
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.clearOnExecution = false
return o
}

// Parallel ensures every function registered in the store is executed concurrently in the order they were registered.
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = false
return o
}

// Sequential ensures every function registered in the store is executed sequentially in the order they were registered.
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = true
return o
}

// SequentialInReverse ensures every function registered in the store is executed sequentially but in the reverse order they were registered.
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
if o == nil {
return o
}
o.sequential = true
o.reverse = true
return o
}

func newFunctionStore[T any](executeFunc func(context.Context, T) error, options ...StoreOption) *store[T] {

opts := &StoreOptions{}

for i := range options {
opts = options[i](opts)
}
return &store[T]{
mu: deadlock.RWMutex{},
functions: make([]T, 0),
executeFunc: executeFunc,
clearOnExecution: clearOnExecution,
stopOnFirstError: stopOnFirstError,
mu: deadlock.RWMutex{},
functions: make([]T, 0),
executeFunc: executeFunc,
options: *opts,
}
}

type store[T any] struct {
mu deadlock.RWMutex
functions []T
executeFunc func(ctx context.Context, element T) error
clearOnExecution bool
stopOnFirstError bool
mu deadlock.RWMutex
functions []T
executeFunc func(ctx context.Context, element T) error
options StoreOptions
}

func (s *store[T]) RegisterFunction(function ...T) {
Expand All @@ -45,32 +122,87 @@ func (s *store[T]) Len() int {
return len(s.functions)
}

func (s *store[T]) Execute(ctx context.Context) error {
func (s *store[T]) Execute(ctx context.Context) (err error) {
defer s.mu.Unlock()
s.mu.Lock()
if reflection.IsEmpty(s.executeFunc) {
return commonerrors.New(commonerrors.ErrUndefined, "the cancel store was not initialised correctly")
return commonerrors.New(commonerrors.ErrUndefined, "the store was not initialised correctly")
}

if s.options.sequential {
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse)
} else {
err = s.executeConcurrently(ctx, s.options.stopOnFirstError)
}

if err == nil && s.options.clearOnExecution {
s.functions = make([]T, 0, len(s.functions))
}
return
}

func (s *store[T]) executeConcurrently(ctx context.Context, stopOnFirstError bool) error {
g, gCtx := errgroup.WithContext(ctx)
if !s.stopOnFirstError {
if !stopOnFirstError {
gCtx = ctx
}
g.SetLimit(len(s.functions))
for i := range s.functions {
g.Go(func() error {
err := DetermineContextError(gCtx)
if err != nil {
return err
}
return s.executeFunc(gCtx, s.functions[i])
_, subErr := s.executeFunction(gCtx, s.functions[i])
return subErr
})
}

err := g.Wait()
if err == nil && s.clearOnExecution {
s.functions = make([]T, 0, len(s.functions))
return g.Wait()
}

func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse bool) (err error) {
err = DetermineContextError(ctx)
if err != nil {
return
}
if reverse {
for i := len(s.functions) - 1; i >= 0; i-- {
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
if shouldBreak {
err = subErr
return
}
if subErr != nil && err == nil {
err = subErr
if stopOnFirstError {
return
}
}
}
} else {
for i := range s.functions {
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
if shouldBreak {
err = subErr
return
}
if subErr != nil && err == nil {
err = subErr
if stopOnFirstError {
return
}
}
}
}

return
}

func (s *store[T]) executeFunction(ctx context.Context, element T) (shouldBreak bool, err error) {
err = DetermineContextError(ctx)
if err != nil {
shouldBreak = true
return
}
return err
err = s.executeFunc(ctx, element)
return
}

type CancelFunctionStore struct {
Expand All @@ -90,12 +222,12 @@ func (s *CancelFunctionStore) Len() int {
return s.store.Len()
}

// NewCancelFunctionsStore creates a store for cancel functions.
func NewCancelFunctionsStore() *CancelFunctionStore {
// NewCancelFunctionsStore creates a store for cancel functions. Whatever the options passed, all cancel functions will be executed.
func NewCancelFunctionsStore(options ...StoreOption) *CancelFunctionStore {
return &CancelFunctionStore{
store: *newFunctionStore[context.CancelFunc](true, false, func(_ context.Context, cancelFunc context.CancelFunc) error {
store: *newFunctionStore[context.CancelFunc](func(_ context.Context, cancelFunc context.CancelFunc) error {
cancelFunc()
return nil
}),
}, append(options, ClearAfterExecution, ExecuteAll)...),
}
}
56 changes: 37 additions & 19 deletions utils/parallelisation/cancel_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,54 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
)

func testCancelStore(t *testing.T, store *CancelFunctionStore) {
t.Helper()
require.NotNil(t, store)
// Set up some fake CancelFuncs to make sure they are called
called1 := false
called2 := false
cancelFunc1 := func() {
called1 = true
}
cancelFunc2 := func() {
called2 = true
}

store.RegisterCancelFunction(cancelFunc1, cancelFunc2)

assert.Equal(t, 2, store.Len())
assert.False(t, called1)
assert.False(t, called2)
store.Cancel()

assert.True(t, called1)
assert.True(t, called2)
}

// Given a CancelFunctionsStore
// Functions can be registered
// and all functions will be called
func TestCancelFunctionStore(t *testing.T) {
t.Run("valid cancel store", func(t *testing.T) {
// Set up some fake CancelFuncs to make sure they are called
called1 := false
called2 := false
cancelFunc1 := func() {
called1 = true
}
cancelFunc2 := func() {
called2 = true
}

store := NewCancelFunctionsStore()

store.RegisterCancelFunction(cancelFunc1, cancelFunc2)

assert.Equal(t, 2, store.Len())

store.Cancel()

assert.True(t, called1)
assert.True(t, called2)
t.Run("parallel", func(t *testing.T) {
testCancelStore(t, NewCancelFunctionsStore())
})
t.Run("sequential", func(t *testing.T) {
testCancelStore(t, NewCancelFunctionsStore(Sequential))
})
t.Run("reverse", func(t *testing.T) {
testCancelStore(t, NewCancelFunctionsStore(SequentialInReverse))
})
t.Run("execute all", func(t *testing.T) {
testCancelStore(t, NewCancelFunctionsStore(StopOnFirstError))
})
})

t.Run("incorrectly initialised cancel store", func(t *testing.T) {
Expand Down
36 changes: 30 additions & 6 deletions utils/parallelisation/onclose.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ func (s *CloserStore) Len() int {

// NewCloserStore returns a store of io.Closer object which will all be closed concurrently on Close(). The first error received will be returned
func NewCloserStore(stopOnFirstError bool) *CloserStore {
option := ExecuteAll
if stopOnFirstError {
option = StopOnFirstError
}
Comment on lines +28 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having an option only represent a positive will help us avoid overwriting options which will simplify option interactions if things get complicated, what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the overwriting is because the stores cancel and close have fundamentally different behaviours

return NewCloserStoreWithOptions(option, Parallel)
}

// NewCloserStoreWithOptions returns a store of io.Closer object which will all be closed on Close(). The first error received if any will be returned
func NewCloserStoreWithOptions(opts ...StoreOption) *CloserStore {
return &CloserStore{
store: *newFunctionStore[io.Closer](false, stopOnFirstError, func(_ context.Context, closerObj io.Closer) error {
store: *newFunctionStore[io.Closer](func(_ context.Context, closerObj io.Closer) error {
if closerObj == nil {
return commonerrors.UndefinedVariable("closer object")
}
return closerObj.Close()
}),
}, append(opts, RetainAfterExecution)...),
}
}

Expand Down Expand Up @@ -90,11 +99,26 @@ func (s *CloseFunctionStore) Len() int {
return s.store.Len()
}

// NewCloseFunctionStoreStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
// NewCloseFunctionStore returns a store closing functions which will all be called on Close(). The first error received if any will be returned.
func NewCloseFunctionStore(options ...StoreOption) *CloseFunctionStore {
return &CloseFunctionStore{
store: *newFunctionStore[CloseFunc](false, stopOnFirstError, func(_ context.Context, closerObj CloseFunc) error {
store: *newFunctionStore[CloseFunc](func(_ context.Context, closerObj CloseFunc) error {
return closerObj()
}),
}, append(options, RetainAfterExecution)...),
}
}

// NewCloseFunctionStoreStore is exactly the same as NewConcurrentCloseFunctionStore but without a typo in the name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what typo ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StoreStore in the name

func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
return NewConcurrentCloseFunctionStore(stopOnFirstError)
}

// NewConcurrentCloseFunctionStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
// Prefer using NewCloseFunctionStore where possible
func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore {
option := ExecuteAll
if stopOnFirstError {
option = StopOnFirstError
}
return NewCloseFunctionStore(option, Parallel)
}
Loading
Loading