Skip to content

Commit 81abfd7

Browse files
committed
✨ Address review comments
1 parent 999a6ab commit 81abfd7

File tree

7 files changed

+179
-33
lines changed

7 files changed

+179
-33
lines changed

changes/20250820140853.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Added new compound execution group to support nested execution groups

utils/parallelisation/contextual.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ func DetermineContextError(ctx context.Context) error {
1111
return commonerrors.ConvertContextError(ctx.Err())
1212
}
1313

14+
type ContextualFunc func(ctx context.Context) error
15+
1416
type ContextualFunctionGroup struct {
1517
ExecutionGroup[ContextualFunc]
1618
}

utils/parallelisation/contextual_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ func TestForEach(t *testing.T) {
2020

2121
t.Run("close with 1 error but error collection", func(t *testing.T) {
2222
closeError := commonerrors.ErrUnexpected
23-
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Parallel, JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
23+
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Parallel), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
2424
})
2525

26-
t.Run("close with 1 error but error collection", func(t *testing.T) {
26+
t.Run("close with 1 error and limited number of parallel workers", func(t *testing.T) {
2727
closeError := commonerrors.ErrUnexpected
2828
errortest.AssertError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCloseToContextualFunc(func() error { return closeError }), WrapCancelToContextualFunc(cancelFunc)), closeError)
2929
})
@@ -45,4 +45,7 @@ func TestForEach(t *testing.T) {
4545
t.Run("break on error with no error", func(t *testing.T) {
4646
require.NoError(t, BreakOnError(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
4747
})
48+
t.Run("for each with no error", func(t *testing.T) {
49+
require.NoError(t, ForEach(context.Background(), WithOptions(Workers(5), JoinErrors), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc), WrapCancelToContextualFunc(cancelFunc)))
50+
})
4851
}

utils/parallelisation/group.go

Lines changed: 89 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,49 @@ type StoreOptions struct {
2323
workers int
2424
}
2525

26+
func (o *StoreOptions) Default() *StoreOptions {
27+
o.clearOnExecution = false
28+
o.stopOnFirstError = false
29+
o.sequential = false
30+
o.reverse = false
31+
o.joinErrors = false
32+
o.onlyOnce = false
33+
o.workers = 0
34+
return o
35+
}
36+
2637
func (o *StoreOptions) Merge(opts *StoreOptions) *StoreOptions {
2738
if opts == nil {
2839
return o
2940
}
30-
return &StoreOptions{
31-
clearOnExecution: opts.clearOnExecution || o.clearOnExecution,
32-
stopOnFirstError: opts.stopOnFirstError || o.stopOnFirstError,
33-
sequential: opts.sequential || o.sequential,
34-
reverse: opts.reverse || o.reverse,
35-
joinErrors: opts.joinErrors || o.joinErrors,
36-
onlyOnce: opts.onlyOnce || o.onlyOnce,
37-
workers: safecast.ToInt(math.Max(float64(opts.workers), float64(o.workers))),
38-
}
41+
o.clearOnExecution = opts.clearOnExecution || o.clearOnExecution
42+
o.stopOnFirstError = opts.stopOnFirstError || o.stopOnFirstError
43+
o.sequential = opts.sequential || o.sequential
44+
o.reverse = opts.reverse || o.reverse
45+
o.joinErrors = opts.joinErrors || o.joinErrors
46+
o.onlyOnce = opts.onlyOnce || o.onlyOnce
47+
o.workers = safecast.ToInt(math.Max(float64(opts.workers), float64(o.workers)))
48+
return o
49+
}
50+
51+
func (o *StoreOptions) MergeWithOptions(opt ...StoreOption) *StoreOptions {
52+
return o.Merge(WithOptions(opt...))
53+
}
54+
55+
func (o *StoreOptions) Overwrite(opts *StoreOptions) *StoreOptions {
56+
return o.Default().Merge(opts)
57+
}
58+
59+
func (o *StoreOptions) WithOptions(opts ...StoreOption) *StoreOptions {
60+
return o.Overwrite(WithOptions(opts...))
3961
}
4062

4163
func (o *StoreOptions) Options() []StoreOption {
4264
return []StoreOption{
4365
func(opts *StoreOptions) *StoreOptions {
4466
op := o
4567
if op == nil {
46-
op = &StoreOptions{}
68+
op = DefaultOptions()
4769
}
4870
return op.Merge(opts)
4971
},
@@ -55,7 +77,7 @@ type StoreOption func(*StoreOptions) *StoreOptions
5577
// StopOnFirstError stops ExecutionGroup execution on first error.
5678
var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
5779
if o == nil {
58-
o = &StoreOptions{}
80+
o = DefaultOptions()
5981
}
6082
o.stopOnFirstError = true
6183
o.joinErrors = false
@@ -66,7 +88,7 @@ var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
6688
// This option should not be used in combination to StopOnFirstError.
6789
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
6890
if o == nil {
69-
o = &StoreOptions{}
91+
o = DefaultOptions()
7092
}
7193
o.stopOnFirstError = false
7294
o.joinErrors = true
@@ -76,7 +98,7 @@ var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
7698
// OnlyOnce will ensure the function are executed only once if they do.
7799
var OnlyOnce StoreOption = func(o *StoreOptions) *StoreOptions {
78100
if o == nil {
79-
o = &StoreOptions{}
101+
o = DefaultOptions()
80102
}
81103
o.onlyOnce = true
82104
return o
@@ -85,7 +107,7 @@ var OnlyOnce StoreOption = func(o *StoreOptions) *StoreOptions {
85107
// AnyTimes will allow the functions to be executed as often that they might be.
86108
var AnyTimes StoreOption = func(o *StoreOptions) *StoreOptions {
87109
if o == nil {
88-
o = &StoreOptions{}
110+
o = DefaultOptions()
89111
}
90112
o.onlyOnce = false
91113
return o
@@ -94,7 +116,7 @@ var AnyTimes StoreOption = func(o *StoreOptions) *StoreOptions {
94116
// ExecuteAll executes all functions in the ExecutionGroup even if an error is raised. the first error raised is then returned.
95117
var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
96118
if o == nil {
97-
o = &StoreOptions{}
119+
o = DefaultOptions()
98120
}
99121
o.stopOnFirstError = false
100122
return o
@@ -103,7 +125,7 @@ var ExecuteAll StoreOption = func(o *StoreOptions) *StoreOptions {
103125
// ClearAfterExecution clears the ExecutionGroup after execution.
104126
var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
105127
if o == nil {
106-
o = &StoreOptions{}
128+
o = DefaultOptions()
107129
}
108130
o.clearOnExecution = true
109131
return o
@@ -112,7 +134,7 @@ var ClearAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
112134
// RetainAfterExecution keep the ExecutionGroup intact after execution (no reset).
113135
var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
114136
if o == nil {
115-
o = &StoreOptions{}
137+
o = DefaultOptions()
116138
}
117139
o.clearOnExecution = false
118140
return o
@@ -121,7 +143,7 @@ var RetainAfterExecution StoreOption = func(o *StoreOptions) *StoreOptions {
121143
// Parallel ensures every function registered in the ExecutionGroup is executed concurrently in the order they were registered.
122144
var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
123145
if o == nil {
124-
o = &StoreOptions{}
146+
o = DefaultOptions()
125147
}
126148
o.sequential = false
127149
return o
@@ -131,7 +153,7 @@ var Parallel StoreOption = func(o *StoreOptions) *StoreOptions {
131153
func Workers(workers int) StoreOption {
132154
return func(o *StoreOptions) *StoreOptions {
133155
if o == nil {
134-
o = &StoreOptions{}
156+
o = DefaultOptions()
135157
}
136158
o.workers = workers
137159
o.sequential = false
@@ -142,7 +164,7 @@ func Workers(workers int) StoreOption {
142164
// Sequential ensures every function registered in the ExecutionGroup is executed sequentially in the order they were registered.
143165
var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
144166
if o == nil {
145-
o = &StoreOptions{}
167+
o = DefaultOptions()
146168
}
147169
o.sequential = true
148170
return o
@@ -151,7 +173,7 @@ var Sequential StoreOption = func(o *StoreOptions) *StoreOptions {
151173
// SequentialInReverse ensures every function registered in the ExecutionGroup is executed sequentially but in the reverse order they were registered.
152174
var SequentialInReverse StoreOption = func(o *StoreOptions) *StoreOptions {
153175
if o == nil {
154-
o = &StoreOptions{}
176+
o = DefaultOptions()
155177
}
156178
o.sequential = true
157179
o.reverse = true
@@ -164,11 +186,34 @@ func WithOptions(option ...StoreOption) (opts *StoreOptions) {
164186
opts = option[i](opts)
165187
}
166188
if opts == nil {
167-
opts = &StoreOptions{}
189+
opts = DefaultOptions()
168190
}
169191
return
170192
}
171193

194+
// DefaultOptions returns the default store configuration
195+
func DefaultOptions() *StoreOptions {
196+
opts := &StoreOptions{}
197+
return opts.Default()
198+
}
199+
200+
type IExecutor interface {
201+
// Execute executes all the functions in the group.
202+
Execute(ctx context.Context) error
203+
}
204+
205+
type IExecutionGroup[T any] interface {
206+
IExecutor
207+
RegisterFunction(function ...T)
208+
Len() int
209+
}
210+
211+
type ICompoundExecutionGroup[T any] interface {
212+
IExecutionGroup[T]
213+
// RegisterExecutor registers executors of any kind to the group: they could be functions or sub-groups.
214+
RegisterExecutor(executor ...IExecutor)
215+
}
216+
172217
// NewExecutionGroup returns an execution group which executes functions according to store options.
173218
func NewExecutionGroup[T any](executeFunc ExecuteFunc[T], options ...StoreOption) *ExecutionGroup[T] {
174219

@@ -366,3 +411,25 @@ func newWrapped[T any](e T, once bool) wrappedElement[T] {
366411
return newBasicWrap[T](e)
367412
}
368413
}
414+
415+
var _ ICompoundExecutionGroup[ContextualFunc] = &CompoundExecutionGroup{}
416+
417+
// NewCompoundExecutionGroup returns an execution group made of executors
418+
func NewCompoundExecutionGroup(options ...StoreOption) *CompoundExecutionGroup {
419+
return &CompoundExecutionGroup{
420+
ContextualFunctionGroup: *NewContextualGroup(options...),
421+
}
422+
}
423+
424+
type CompoundExecutionGroup struct {
425+
ContextualFunctionGroup
426+
}
427+
428+
// RegisterExecutor registers executors
429+
func (g *CompoundExecutionGroup) RegisterExecutor(group ...IExecutor) {
430+
for i := range group {
431+
g.RegisterFunction(func(ctx context.Context) error {
432+
return group[i].Execute(ctx)
433+
})
434+
}
435+
}

utils/parallelisation/group_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package parallelisation
22

33
import (
4+
"context"
45
"testing"
56

7+
"github.com/stretchr/testify/assert"
68
"github.com/stretchr/testify/require"
79
"go.uber.org/mock/gomock"
810

11+
"github.com/ARM-software/golang-utils/utils/commonerrors"
12+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
913
"github.com/ARM-software/golang-utils/utils/parallelisation/mocks"
1014
)
1115

@@ -146,3 +150,43 @@ func TestExecutionTimes(t *testing.T) {
146150
require.NoError(t, group.Close())
147151
})
148152
}
153+
154+
func TestCompoundGroup(t *testing.T) {
155+
ctlr := gomock.NewController(t)
156+
defer ctlr.Finish()
157+
158+
closerMock := mocks.NewMockCloser(ctlr)
159+
closerMock.EXPECT().Close().Return(nil).Times(17)
160+
group := NewCloserStoreWithOptions(ExecuteAll, OnlyOnce, Sequential)
161+
group.RegisterFunction(closerMock, closerMock, closerMock)
162+
163+
compoundGroup := NewCompoundExecutionGroup(Parallel, RetainAfterExecution)
164+
compoundGroup.RegisterFunction(WrapCloseToContextualFunc(WrapCloserIntoCloseFunc(closerMock)))
165+
compoundGroup.RegisterExecutor(group)
166+
compoundGroup.RegisterFunction(WrapCancelToContextualFunc(WrapContextualToCancelFunc(WrapCloseToContextualFunc(WrapCloserIntoCloseFunc(closerMock)))))
167+
168+
assert.Equal(t, 3, compoundGroup.Len())
169+
170+
require.NoError(t, compoundGroup.Execute(context.Background()))
171+
require.NoError(t, compoundGroup.Execute(context.Background()))
172+
require.NoError(t, compoundGroup.Execute(context.Background()))
173+
require.NoError(t, compoundGroup.Execute(context.Background()))
174+
require.NoError(t, compoundGroup.Execute(context.Background()))
175+
require.NoError(t, compoundGroup.Execute(context.Background()))
176+
require.NoError(t, compoundGroup.Execute(context.Background()))
177+
178+
t.Run("With cancelled context", func(t *testing.T) {
179+
ctx, cancel := context.WithCancel(context.Background())
180+
cancel()
181+
errortest.AssertError(t, compoundGroup.Execute(ctx), commonerrors.ErrCancelled)
182+
})
183+
184+
}
185+
186+
func TestStoreOptions_MergeWithOptions(t *testing.T) {
187+
opts := WithOptions(Parallel).MergeWithOptions(OnlyOnce, ExecuteAll, Workers(5), Sequential)
188+
assert.True(t, opts.onlyOnce)
189+
assert.False(t, opts.stopOnFirstError)
190+
assert.True(t, opts.sequential)
191+
assert.Equal(t, 5, opts.workers)
192+
}

utils/parallelisation/onclose.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,14 @@ func NewCloserStore(stopOnFirstError bool) *CloserStore {
2929
if stopOnFirstError {
3030
option = StopOnFirstError
3131
}
32-
return NewCloserStoreWithOptions(option, Parallel, RetainAfterExecution)
32+
return NewCloserStoreWithOptions(option, Parallel, OnlyOnce, RetainAfterExecution)
3333
}
3434

3535
// NewCloserStoreWithOptions returns a store of io.Closer object which will all be closed on Close(). The first error received if any will be returned
3636
func NewCloserStoreWithOptions(opts ...StoreOption) *CloserStore {
3737
return &CloserStore{
38-
ExecutionGroup: *NewExecutionGroup[io.Closer](func(_ context.Context, closerObj io.Closer) error {
39-
if closerObj == nil {
40-
return commonerrors.UndefinedVariable("closer object")
41-
}
42-
return closerObj.Close()
38+
ExecutionGroup: *NewExecutionGroup[io.Closer](func(ctx context.Context, closerObj io.Closer) error {
39+
return WrapCloseToContextualFunc(WrapCloserIntoCloseFunc(closerObj))(ctx)
4340
}, opts...),
4441
}
4542
}
@@ -86,9 +83,21 @@ func CloseAllFuncAndCollateErrors(cs ...CloseFunc) error {
8683
return group.Close()
8784
}
8885

89-
type ContextualFunc func(ctx context.Context) error
9086
type CloseFunc func() error
9187

88+
func (c CloseFunc) Close() error {
89+
return c()
90+
}
91+
92+
func WrapCloserIntoCloseFunc(closer io.Closer) CloseFunc {
93+
return func() error {
94+
if closer == nil {
95+
return commonerrors.UndefinedVariable("closer object")
96+
}
97+
return closer.Close()
98+
}
99+
}
100+
92101
func WrapCancelToCloseFunc(f context.CancelFunc) CloseFunc {
93102
return func() error {
94103
f()
@@ -172,5 +181,10 @@ func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore
172181
if stopOnFirstError {
173182
option = StopOnFirstError
174183
}
175-
return NewCloseFunctionStore(option, Parallel, RetainAfterExecution)
184+
return NewCloseFunctionStore(option, Parallel, RetainAfterExecution, OnlyOnce)
185+
}
186+
187+
// NewCloseOnceGroup is the same as NewCloseFunctionStore but ensures any closing functions are only executed once.
188+
func NewCloseOnceGroup(options ...StoreOption) *CloseFunctionStore {
189+
return NewCloseFunctionStore(OnlyOnce(WithOptions(options...)).Options()...)
176190
}

utils/parallelisation/onclose_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,21 @@ func TestCloseAll(t *testing.T) {
7272

7373
}
7474

75+
func TestCloseOnce(t *testing.T) {
76+
t.Run("close every function once", func(t *testing.T) {
77+
ctlr := gomock.NewController(t)
78+
defer ctlr.Finish()
79+
closeError := commonerrors.ErrUnexpected
80+
81+
closerMock := mocks.NewMockCloser(ctlr)
82+
closerMock.EXPECT().Close().Return(closeError).Times(3)
83+
84+
group := NewCloseOnceGroup(Parallel, RetainAfterExecution)
85+
group.RegisterCloseFunction(WrapCloserIntoCloseFunc(closerMock), WrapCloserIntoCloseFunc(closerMock), WrapCloserIntoCloseFunc(closerMock))
86+
errortest.AssertError(t, group.Close(), closeError)
87+
})
88+
}
89+
7590
func TestCancelOnClose(t *testing.T) {
7691
t.Run("parallel", func(t *testing.T) {
7792
closeStore := NewCloseFunctionStoreStore(true)
@@ -136,7 +151,7 @@ func TestSequentialExecution(t *testing.T) {
136151
for i := range tests {
137152
test := tests[i]
138153
t.Run(fmt.Sprintf("%v-%#v", i, test.option), func(t *testing.T) {
139-
opt := test.option(&StoreOptions{})
154+
opt := test.option(DefaultOptions())
140155
t.Run("sequentially", func(t *testing.T) {
141156
closeStore := NewCloseFunctionStore(test.option, Sequential)
142157
ctx1, cancel1 := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)