Skip to content

Commit 6c0346f

Browse files
authored
[parallelisation] Create a store for closing functions so they can be done concurrently (#653)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description - Close functions concurrently ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 40294a9 commit 6c0346f

File tree

6 files changed

+246
-14
lines changed

6 files changed

+246
-14
lines changed

changes/20250715185008.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` Create a store for closing functions so they can be done concurrently

utils/parallelisation/cancel_functions.go

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,86 @@ import (
99
"context"
1010

1111
"github.com/sasha-s/go-deadlock"
12+
"golang.org/x/sync/errgroup"
1213
)
1314

14-
type CancelFunctionStore struct {
15-
mu deadlock.RWMutex
16-
cancelFunctions []context.CancelFunc
15+
func newFunctionStore[T any](clearOnExecution, stopOnFirstError bool, executeFunc func(context.Context, T) error) *store[T] {
16+
return &store[T]{
17+
mu: deadlock.RWMutex{},
18+
functions: make([]T, 0),
19+
executeFunc: executeFunc,
20+
clearOnExecution: clearOnExecution,
21+
stopOnFirstError: stopOnFirstError,
22+
}
1723
}
1824

19-
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
25+
type store[T any] struct {
26+
mu deadlock.RWMutex
27+
functions []T
28+
executeFunc func(ctx context.Context, element T) error
29+
clearOnExecution bool
30+
stopOnFirstError bool
31+
}
32+
33+
func (s *store[T]) RegisterFunction(function ...T) {
2034
defer s.mu.Unlock()
2135
s.mu.Lock()
22-
s.cancelFunctions = append(s.cancelFunctions, cancel...)
36+
s.functions = append(s.functions, function...)
2337
}
2438

25-
func (s *CancelFunctionStore) Cancel() {
39+
func (s *store[T]) Len() int {
40+
defer s.mu.RUnlock()
41+
s.mu.RLock()
42+
return len(s.functions)
43+
}
44+
45+
func (s *store[T]) Execute(ctx context.Context) error {
2646
defer s.mu.Unlock()
2747
s.mu.Lock()
28-
for _, c := range s.cancelFunctions {
29-
c()
48+
g, gCtx := errgroup.WithContext(ctx)
49+
if !s.stopOnFirstError {
50+
gCtx = ctx
51+
}
52+
g.SetLimit(len(s.functions))
53+
for i := range s.functions {
54+
g.Go(func() error {
55+
err := DetermineContextError(gCtx)
56+
if err != nil {
57+
return err
58+
}
59+
return s.executeFunc(gCtx, s.functions[i])
60+
})
61+
}
62+
63+
err := g.Wait()
64+
if err == nil && s.clearOnExecution {
65+
s.functions = make([]T, 0, len(s.functions))
3066
}
31-
s.cancelFunctions = []context.CancelFunc{}
67+
return err
68+
}
69+
70+
type CancelFunctionStore struct {
71+
store[context.CancelFunc]
72+
}
73+
74+
func (s *CancelFunctionStore) RegisterCancelFunction(cancel ...context.CancelFunc) {
75+
s.store.RegisterFunction(cancel...)
76+
}
77+
78+
func (s *CancelFunctionStore) Cancel() {
79+
_ = s.Execute(context.Background())
3280
}
3381

3482
func (s *CancelFunctionStore) Len() int {
35-
defer s.mu.RUnlock()
36-
s.mu.RLock()
37-
return len(s.cancelFunctions)
83+
return s.store.Len()
3884
}
3985

86+
// NewCancelFunctionsStore creates a store for cancel functions.
4087
func NewCancelFunctionsStore() *CancelFunctionStore {
4188
return &CancelFunctionStore{
42-
cancelFunctions: []context.CancelFunc{},
89+
store: *newFunctionStore[context.CancelFunc](true, false, func(_ context.Context, cancelFunc context.CancelFunc) error {
90+
cancelFunc()
91+
return nil
92+
}),
4393
}
4494
}

utils/parallelisation/mocks/mock_parallelisation.go

Lines changed: 54 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

utils/parallelisation/onclose.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package parallelisation
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/ARM-software/golang-utils/utils/commonerrors"
8+
)
9+
10+
type CloserStore struct {
11+
store[io.Closer]
12+
}
13+
14+
func (s *CloserStore) RegisterCloser(closerObj ...io.Closer) {
15+
s.store.RegisterFunction(closerObj...)
16+
}
17+
18+
func (s *CloserStore) Close() error {
19+
return s.Execute(context.Background())
20+
}
21+
22+
func (s *CloserStore) Len() int {
23+
return s.store.Len()
24+
}
25+
26+
// NewCloserStore returns a store of io.Closer object which will all be closed concurrently on Close(). The first error received will be returned
27+
func NewCloserStore(stopOnFirstError bool) *CloserStore {
28+
return &CloserStore{
29+
store: *newFunctionStore[io.Closer](false, stopOnFirstError, func(_ context.Context, closerObj io.Closer) error {
30+
if closerObj == nil {
31+
return commonerrors.UndefinedVariable("closer object")
32+
}
33+
return closerObj.Close()
34+
}),
35+
}
36+
}
37+
38+
// CloseAll calls concurrently Close on all io.Closer implementations passed as arguments and returns the first error encountered
39+
func CloseAll(cs ...io.Closer) error {
40+
group := NewCloserStore(false)
41+
group.RegisterFunction(cs...)
42+
return group.Close()
43+
}
44+
45+
// CloseAllWithContext is similar to CloseAll but can be controlled using a context.
46+
func CloseAllWithContext(ctx context.Context, cs ...io.Closer) error {
47+
group := NewCloserStore(false)
48+
group.RegisterFunction(cs...)
49+
return group.Execute(ctx)
50+
}
51+
52+
// CloseAllFunc calls concurrently all Close functions passed as arguments and returns the first error encountered
53+
func CloseAllFunc(cs ...CloseFunc) error {
54+
group := NewCloseFunctionStoreStore(false)
55+
group.RegisterFunction(cs...)
56+
return group.Close()
57+
}
58+
59+
type CloseFunc func() error
60+
61+
type CloseFunctionStore struct {
62+
store[CloseFunc]
63+
}
64+
65+
func (s *CloseFunctionStore) RegisterCloseFunction(closerObj ...CloseFunc) {
66+
s.store.RegisterFunction(closerObj...)
67+
}
68+
69+
func (s *CloseFunctionStore) Close() error {
70+
return s.Execute(context.Background())
71+
}
72+
73+
func (s *CloseFunctionStore) Len() int {
74+
return s.store.Len()
75+
}
76+
77+
// NewCloseFunctionStoreStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
78+
func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
79+
return &CloseFunctionStore{
80+
store: *newFunctionStore[CloseFunc](false, stopOnFirstError, func(_ context.Context, closerObj CloseFunc) error {
81+
return closerObj()
82+
}),
83+
}
84+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package parallelisation
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"go.uber.org/mock/gomock"
8+
9+
"github.com/ARM-software/golang-utils/utils/commonerrors"
10+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
11+
"github.com/ARM-software/golang-utils/utils/parallelisation/mocks"
12+
)
13+
14+
//go:generate go tool mockgen -destination=./mocks/mock_$GOPACKAGE.go -package=mocks io Closer
15+
func TestCloseAll(t *testing.T) {
16+
t.Run("close", func(t *testing.T) {
17+
ctlr := gomock.NewController(t)
18+
defer ctlr.Finish()
19+
20+
closerMock := mocks.NewMockCloser(ctlr)
21+
closerMock.EXPECT().Close().Return(nil).MinTimes(1)
22+
23+
require.NoError(t, CloseAll(closerMock, closerMock, closerMock))
24+
})
25+
26+
t.Run("close with error", func(t *testing.T) {
27+
ctlr := gomock.NewController(t)
28+
defer ctlr.Finish()
29+
closeError := commonerrors.ErrUnexpected
30+
31+
closerMock := mocks.NewMockCloser(ctlr)
32+
closerMock.EXPECT().Close().Return(closeError).MinTimes(1)
33+
34+
errortest.AssertError(t, CloseAll(closerMock, closerMock, closerMock), closeError)
35+
})
36+
37+
t.Run("close with 1 error", func(t *testing.T) {
38+
closeError := commonerrors.ErrUnexpected
39+
40+
errortest.AssertError(t, CloseAllFunc(func() error { return nil }, func() error { return nil }, func() error { return closeError }, func() error { return nil }), closeError)
41+
})
42+
43+
}

utils/parallelisation/wait.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func WaitWithContextAndError(ctx context.Context, wg IErrorWaiter) (err error) {
2828
}
2929
}
3030

31-
// IError can be used to wait on sync WaitGroups and similar types where Wait() does not return an error
31+
// IWaiter can be used to wait on sync WaitGroups and similar types where Wait() does not return an error
3232
// This is used to support use in the WaitWithContext function to wait but listen to contexts
3333
type IWaiter interface {
3434
Wait()

0 commit comments

Comments
 (0)