Skip to content

Commit 7cc46ab

Browse files
authored
✨ [parallelisation] Add an option to collate all the errors found during a function store execution (#677)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description Added an option so that it is possible to join all errors during the execution of functions of a store ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent abc00e7 commit 7cc46ab

File tree

4 files changed

+152
-45
lines changed

4 files changed

+152
-45
lines changed

changes/20250813180357.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: [parallelisation] Add an option to collate all the errors found during a function store execution

utils/parallelisation/cancel_functions.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type StoreOptions struct {
2020
stopOnFirstError bool
2121
sequential bool
2222
reverse bool
23+
joinErrors bool
2324
}
24-
2525
type StoreOption func(*StoreOptions) *StoreOptions
2626

2727
// StopOnFirstError stops store execution on first error.
@@ -30,6 +30,18 @@ var StopOnFirstError StoreOption = func(o *StoreOptions) *StoreOptions {
3030
return o
3131
}
3232
o.stopOnFirstError = true
33+
o.joinErrors = false
34+
return o
35+
}
36+
37+
// JoinErrors will collate any errors which happened when executing functions in store.
38+
// This option should not be used in combination to StopOnFirstError.
39+
var JoinErrors StoreOption = func(o *StoreOptions) *StoreOptions {
40+
if o == nil {
41+
return o
42+
}
43+
o.stopOnFirstError = false
44+
o.joinErrors = true
3345
return o
3446
}
3547

@@ -130,9 +142,9 @@ func (s *store[T]) Execute(ctx context.Context) (err error) {
130142
}
131143

132144
if s.options.sequential {
133-
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse)
145+
err = s.executeSequentially(ctx, s.options.stopOnFirstError, s.options.reverse, s.options.joinErrors)
134146
} else {
135-
err = s.executeInParallel(ctx, s.options.stopOnFirstError)
147+
err = s.executeConcurrently(ctx, s.options.stopOnFirstError, s.options.joinErrors)
136148
}
137149

138150
if err == nil && s.options.clearOnExecution {
@@ -141,31 +153,48 @@ func (s *store[T]) Execute(ctx context.Context) (err error) {
141153
return
142154
}
143155

144-
func (s *store[T]) executeInParallel(ctx context.Context, stopOnFirstError bool) error {
156+
func (s *store[T]) executeConcurrently(ctx context.Context, stopOnFirstError bool, collateErrors bool) error {
145157
g, gCtx := errgroup.WithContext(ctx)
146158
if !stopOnFirstError {
147159
gCtx = ctx
148160
}
149-
g.SetLimit(len(s.functions))
161+
funcNum := len(s.functions)
162+
errCh := make(chan error, funcNum)
163+
g.SetLimit(funcNum)
150164
for i := range s.functions {
151165
g.Go(func() error {
152166
_, subErr := s.executeFunction(gCtx, s.functions[i])
167+
errCh <- subErr
153168
return subErr
154169
})
155170
}
171+
err := g.Wait()
172+
close(errCh)
173+
if collateErrors {
174+
collateErr := make([]error, funcNum)
175+
i := 0
176+
for subErr := range errCh {
177+
collateErr[i] = subErr
178+
i++
179+
}
180+
err = commonerrors.Join(collateErr...)
181+
}
156182

157-
return g.Wait()
183+
return err
158184
}
159185

160-
func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse bool) (err error) {
186+
func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, reverse, collateErrors bool) (err error) {
161187
err = DetermineContextError(ctx)
162188
if err != nil {
163189
return
164190
}
191+
funcNum := len(s.functions)
192+
collateErr := make([]error, funcNum)
165193
if reverse {
166-
for i := len(s.functions) - 1; i >= 0; i-- {
167-
mustBreak, subErr := s.executeFunction(ctx, s.functions[i])
168-
if mustBreak {
194+
for i := funcNum - 1; i >= 0; i-- {
195+
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
196+
collateErr[funcNum-i-1] = subErr
197+
if shouldBreak {
169198
err = subErr
170199
return
171200
}
@@ -179,6 +208,7 @@ func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, re
179208
} else {
180209
for i := range s.functions {
181210
shouldBreak, subErr := s.executeFunction(ctx, s.functions[i])
211+
collateErr[i] = subErr
182212
if shouldBreak {
183213
err = subErr
184214
return
@@ -192,6 +222,9 @@ func (s *store[T]) executeSequentially(ctx context.Context, stopOnFirstError, re
192222
}
193223
}
194224

225+
if collateErrors {
226+
err = commonerrors.Join(collateErr...)
227+
}
195228
return
196229
}
197230

utils/parallelisation/onclose.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,41 @@ func CloseAll(cs ...io.Closer) error {
5151
return group.Close()
5252
}
5353

54+
// CloseAllAndCollateErrors calls concurrently Close on all io.Closer implementations passed as arguments and returns the errors encountered
55+
func CloseAllAndCollateErrors(cs ...io.Closer) error {
56+
group := NewCloserStoreWithOptions(ExecuteAll, Parallel, JoinErrors)
57+
group.RegisterFunction(cs...)
58+
return group.Close()
59+
}
60+
5461
// CloseAllWithContext is similar to CloseAll but can be controlled using a context.
5562
func CloseAllWithContext(ctx context.Context, cs ...io.Closer) error {
5663
group := NewCloserStore(false)
5764
group.RegisterFunction(cs...)
5865
return group.Execute(ctx)
5966
}
6067

68+
// CloseAllWithContextAndCollateErrors is similar to CloseAllAndCollateErrors but can be controlled using a context.
69+
func CloseAllWithContextAndCollateErrors(ctx context.Context, cs ...io.Closer) error {
70+
group := NewCloserStoreWithOptions(ExecuteAll, Parallel, JoinErrors)
71+
group.RegisterFunction(cs...)
72+
return group.Execute(ctx)
73+
}
74+
6175
// CloseAllFunc calls concurrently all Close functions passed as arguments and returns the first error encountered
6276
func CloseAllFunc(cs ...CloseFunc) error {
6377
group := NewCloseFunctionStoreStore(false)
6478
group.RegisterFunction(cs...)
6579
return group.Close()
6680
}
6781

82+
// CloseAllFuncAndCollateErrors calls concurrently all Close functions passed as arguments and returns the errors encountered
83+
func CloseAllFuncAndCollateErrors(cs ...CloseFunc) error {
84+
group := NewCloseFunctionStore(ExecuteAll, Parallel, JoinErrors)
85+
group.RegisterFunction(cs...)
86+
return group.Close()
87+
}
88+
6889
type CloseFunc func() error
6990

7091
type CloseFunctionStore struct {

utils/parallelisation/onclose_test.go

Lines changed: 87 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package parallelisation
22

33
import (
44
"context"
5+
"fmt"
56
"testing"
67

78
"github.com/stretchr/testify/assert"
@@ -25,6 +26,16 @@ func TestCloseAll(t *testing.T) {
2526
require.NoError(t, CloseAll(closerMock, closerMock, closerMock))
2627
})
2728

29+
t.Run("close and join errors", func(t *testing.T) {
30+
ctlr := gomock.NewController(t)
31+
defer ctlr.Finish()
32+
33+
closerMock := mocks.NewMockCloser(ctlr)
34+
closerMock.EXPECT().Close().Return(nil).MinTimes(1)
35+
36+
require.NoError(t, CloseAllAndCollateErrors(closerMock, closerMock, closerMock))
37+
})
38+
2839
t.Run("close with error", func(t *testing.T) {
2940
ctlr := gomock.NewController(t)
3041
defer ctlr.Finish()
@@ -36,12 +47,29 @@ func TestCloseAll(t *testing.T) {
3647
errortest.AssertError(t, CloseAll(closerMock, closerMock, closerMock), closeError)
3748
})
3849

50+
t.Run("close with errors", func(t *testing.T) {
51+
ctlr := gomock.NewController(t)
52+
defer ctlr.Finish()
53+
closeError := commonerrors.ErrUnexpected
54+
55+
closerMock := mocks.NewMockCloser(ctlr)
56+
closerMock.EXPECT().Close().Return(closeError).MinTimes(1)
57+
58+
errortest.AssertError(t, CloseAllAndCollateErrors(closerMock, closerMock, closerMock), closeError)
59+
})
60+
3961
t.Run("close with 1 error", func(t *testing.T) {
4062
closeError := commonerrors.ErrUnexpected
4163

4264
errortest.AssertError(t, CloseAllFunc(func() error { return nil }, func() error { return nil }, func() error { return closeError }, func() error { return nil }), closeError)
4365
})
4466

67+
t.Run("close with 1 error but error collection", func(t *testing.T) {
68+
closeError := commonerrors.ErrUnexpected
69+
70+
errortest.AssertError(t, CloseAllFuncAndCollateErrors(func() error { return nil }, func() error { return nil }, func() error { return closeError }, func() error { return nil }), closeError)
71+
})
72+
4573
}
4674

4775
func TestCancelOnClose(t *testing.T) {
@@ -98,39 +126,63 @@ func TestCancelOnClose(t *testing.T) {
98126
})
99127
}
100128

101-
func TestStopOnFirstError(t *testing.T) {
102-
t.Run("sequentially", func(t *testing.T) {
103-
closeStore := NewCloseFunctionStore(StopOnFirstError, Sequential)
104-
ctx1, cancel1 := context.WithCancel(context.Background())
105-
closeStore.RegisterCloseFunction(func() error { cancel1(); return DetermineContextError(ctx1) })
106-
ctx2, cancel2 := context.WithCancel(context.Background())
107-
closeStore.RegisterCloseFunction(func() error { cancel2(); return DetermineContextError(ctx2) })
108-
ctx3, cancel3 := context.WithCancel(context.Background())
109-
closeStore.RegisterCloseFunction(func() error { cancel3(); return DetermineContextError(ctx3) })
110-
assert.Equal(t, 3, closeStore.Len())
111-
require.NoError(t, DetermineContextError(ctx1))
112-
require.NoError(t, DetermineContextError(ctx2))
113-
require.NoError(t, DetermineContextError(ctx3))
114-
errortest.AssertError(t, closeStore.Close(), commonerrors.ErrCancelled)
115-
errortest.AssertError(t, DetermineContextError(ctx1), commonerrors.ErrCancelled)
116-
assert.NoError(t, DetermineContextError(ctx2))
117-
assert.NoError(t, DetermineContextError(ctx3))
118-
})
119-
t.Run("reverse", func(t *testing.T) {
120-
closeStore := NewCloseFunctionStore(StopOnFirstError, SequentialInReverse)
121-
ctx1, cancel1 := context.WithCancel(context.Background())
122-
closeStore.RegisterCloseFunction(func() error { cancel1(); return DetermineContextError(ctx1) })
123-
ctx2, cancel2 := context.WithCancel(context.Background())
124-
closeStore.RegisterCloseFunction(func() error { cancel2(); return DetermineContextError(ctx2) })
125-
ctx3, cancel3 := context.WithCancel(context.Background())
126-
closeStore.RegisterCloseFunction(func() error { cancel3(); return DetermineContextError(ctx3) })
127-
assert.Equal(t, 3, closeStore.Len())
128-
require.NoError(t, DetermineContextError(ctx1))
129-
require.NoError(t, DetermineContextError(ctx2))
130-
require.NoError(t, DetermineContextError(ctx3))
131-
errortest.AssertError(t, closeStore.Close(), commonerrors.ErrCancelled)
132-
assert.NoError(t, DetermineContextError(ctx1))
133-
assert.NoError(t, DetermineContextError(ctx2))
134-
errortest.AssertError(t, DetermineContextError(ctx3), commonerrors.ErrCancelled)
135-
})
129+
func TestSequentialExecution(t *testing.T) {
130+
tests := []struct {
131+
option StoreOption
132+
}{
133+
{StopOnFirstError},
134+
{JoinErrors},
135+
}
136+
for i := range tests {
137+
test := tests[i]
138+
t.Run(fmt.Sprintf("%v-%#v", i, test.option), func(t *testing.T) {
139+
opt := test.option(&StoreOptions{})
140+
t.Run("sequentially", func(t *testing.T) {
141+
closeStore := NewCloseFunctionStore(test.option, Sequential)
142+
ctx1, cancel1 := context.WithCancel(context.Background())
143+
closeStore.RegisterCloseFunction(func() error { cancel1(); return DetermineContextError(ctx1) })
144+
ctx2, cancel2 := context.WithCancel(context.Background())
145+
closeStore.RegisterCloseFunction(func() error { cancel2(); return DetermineContextError(ctx2) })
146+
ctx3, cancel3 := context.WithCancel(context.Background())
147+
closeStore.RegisterCloseFunction(func() error { cancel3(); return DetermineContextError(ctx3) })
148+
assert.Equal(t, 3, closeStore.Len())
149+
require.NoError(t, DetermineContextError(ctx1))
150+
require.NoError(t, DetermineContextError(ctx2))
151+
require.NoError(t, DetermineContextError(ctx3))
152+
153+
errortest.AssertError(t, closeStore.Close(), commonerrors.ErrCancelled)
154+
errortest.AssertError(t, DetermineContextError(ctx1), commonerrors.ErrCancelled)
155+
if opt.stopOnFirstError {
156+
assert.NoError(t, DetermineContextError(ctx2))
157+
assert.NoError(t, DetermineContextError(ctx3))
158+
} else {
159+
errortest.AssertError(t, DetermineContextError(ctx2), commonerrors.ErrCancelled)
160+
errortest.AssertError(t, DetermineContextError(ctx3), commonerrors.ErrCancelled)
161+
}
162+
163+
})
164+
t.Run("reverse", func(t *testing.T) {
165+
closeStore := NewCloseFunctionStore(test.option, SequentialInReverse)
166+
ctx1, cancel1 := context.WithCancel(context.Background())
167+
closeStore.RegisterCloseFunction(func() error { cancel1(); return DetermineContextError(ctx1) })
168+
ctx2, cancel2 := context.WithCancel(context.Background())
169+
closeStore.RegisterCloseFunction(func() error { cancel2(); return DetermineContextError(ctx2) })
170+
ctx3, cancel3 := context.WithCancel(context.Background())
171+
closeStore.RegisterCloseFunction(func() error { cancel3(); return DetermineContextError(ctx3) })
172+
assert.Equal(t, 3, closeStore.Len())
173+
require.NoError(t, DetermineContextError(ctx1))
174+
require.NoError(t, DetermineContextError(ctx2))
175+
require.NoError(t, DetermineContextError(ctx3))
176+
errortest.AssertError(t, closeStore.Close(), commonerrors.ErrCancelled)
177+
if opt.stopOnFirstError {
178+
assert.NoError(t, DetermineContextError(ctx1))
179+
assert.NoError(t, DetermineContextError(ctx2))
180+
} else {
181+
errortest.AssertError(t, DetermineContextError(ctx1), commonerrors.ErrCancelled)
182+
errortest.AssertError(t, DetermineContextError(ctx2), commonerrors.ErrCancelled)
183+
}
184+
errortest.AssertError(t, DetermineContextError(ctx3), commonerrors.ErrCancelled)
185+
})
186+
})
187+
}
136188
}

0 commit comments

Comments
 (0)