Skip to content

Commit 14fcae0

Browse files
parallisation Add support for more context when returning from RunActionWithParallelCheck (#757)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for more context when returning from RunActionWithParallelCheck ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 4c8c804 commit 14fcae0

File tree

3 files changed

+264
-20
lines changed

3 files changed

+264
-20
lines changed

changes/20251205145557.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `parallelisation` Add support for more context when returning from RunActionWithParallelCheck

utils/parallelisation/parallelisation.go

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"go.uber.org/atomic"
15+
"golang.org/x/sync/errgroup"
1516

1617
"github.com/ARM-software/golang-utils/utils/collection"
1718
"github.com/ARM-software/golang-utils/utils/commonerrors"
@@ -210,38 +211,85 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati
210211
}
211212
}
212213

214+
type (
215+
ActionFunc func(ctx context.Context) (err error)
216+
CheckFunc func(ctx context.Context) (ok bool)
217+
CheckWithResultFunc[T any] func(ctx context.Context) (res T, ok bool)
218+
ResultCheckFunc[T any] func(res T) (err error)
219+
)
220+
213221
// RunActionWithParallelCheck runs an action with a check in parallel
214-
// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled.
215-
func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) error {
216-
err := DetermineContextError(ctx)
222+
// The function performing the check should return true if the check should be repeated; false otherwise it should not.
223+
// For more context about how the check ended, a result can be returned. If the check did not have the expected result
224+
// then the whole function would be cancelled.
225+
func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action ActionFunc, checkAction CheckWithResultFunc[T], onCheckResult ResultCheckFunc[T], checkPeriod time.Duration) (res T, ok bool, err error) {
226+
err = DetermineContextError(ctx)
217227
if err != nil {
218-
return err
228+
return
219229
}
230+
231+
var errGroup errgroup.Group
232+
220233
cancelStore := NewCancelFunctionsStore()
221234
defer cancelStore.Cancel()
235+
222236
cancellableCtx, cancelFunc := context.WithCancel(ctx)
223237
cancelStore.RegisterCancelFunction(cancelFunc)
224-
go func(ctx context.Context, store *CancelFunctionStore) {
225-
for {
226-
select {
227-
case <-ctx.Done():
228-
store.Cancel()
229-
return
230-
default:
231-
if !checkAction(ctx) {
232-
store.Cancel()
238+
239+
errGroup.Go(func() error {
240+
return func(ctx context.Context, store *CancelFunctionStore) (err error) {
241+
defer store.Cancel()
242+
for {
243+
select {
244+
case <-ctx.Done():
233245
return
246+
default:
247+
res, ok = checkAction(ctx)
248+
249+
err = onCheckResult(res)
250+
if err != nil {
251+
return
252+
}
253+
254+
if !ok {
255+
return
256+
}
257+
258+
SleepWithContext(ctx, checkPeriod)
234259
}
235-
SleepWithContext(ctx, checkPeriod)
236260
}
237-
}
238-
}(cancellableCtx, cancelStore)
261+
}(cancellableCtx, cancelStore)
262+
})
263+
239264
err = action(cancellableCtx)
240-
err2 := DetermineContextError(cancellableCtx)
241-
if err2 != nil {
242-
return err2
265+
if errCtx := DetermineContextError(cancellableCtx); errCtx != nil {
266+
err = errCtx
243267
}
244-
return err
268+
cancelFunc()
269+
270+
if egErr := errGroup.Wait(); commonerrors.Ignore(egErr, commonerrors.ErrCancelled, commonerrors.ErrTimeout) != nil {
271+
err = egErr
272+
return
273+
}
274+
275+
return
276+
}
277+
278+
// RunActionWithParallelCheck runs an action with a check in parallel
279+
// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result then the whole function would be cancelled.
280+
func RunActionWithParallelCheck(ctx context.Context, action ActionFunc, checkAction CheckFunc, checkPeriod time.Duration) (err error) {
281+
_, _, err = RunActionWithParallelCheckAndResult(
282+
ctx,
283+
action,
284+
func(ctx context.Context) (_ struct{}, ok bool) {
285+
ok = checkAction(ctx)
286+
return
287+
},
288+
func(_ struct{}) error { return nil },
289+
checkPeriod,
290+
)
291+
292+
return
245293
}
246294

247295
// WaitUntil waits for a condition evaluated by evalCondition to be verified

utils/parallelisation/parallelisation_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"testing"
1515
"time"
1616

17+
"github.com/go-faker/faker/v4"
1718
"github.com/stretchr/testify/assert"
1819
"github.com/stretchr/testify/require"
1920
"go.uber.org/atomic"
@@ -418,6 +419,200 @@ func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) {
418419
assert.GreaterOrEqual(t, counter.Load(), int32(1))
419420
}
420421

422+
func TestRunActionWithParallelCheckAndResult(t *testing.T) {
423+
type parallelisationCheckResult struct {
424+
checks int32
425+
status string
426+
}
427+
428+
t.Run("Happy", func(t *testing.T) {
429+
defer goleak.VerifyNone(t)
430+
431+
checkCounter := atomic.NewInt32(0)
432+
checkResultCounter := atomic.NewInt32(0)
433+
434+
res, ok, err := RunActionWithParallelCheckAndResult(
435+
context.Background(),
436+
func(ctx context.Context) (err error) {
437+
time.Sleep(120 * time.Millisecond)
438+
return
439+
},
440+
func(ctx context.Context) (res parallelisationCheckResult, ok bool) {
441+
return parallelisationCheckResult{
442+
checks: checkCounter.Inc(),
443+
status: "healthy",
444+
}, true
445+
},
446+
func(_ parallelisationCheckResult) error {
447+
checkResultCounter.Inc()
448+
return nil
449+
},
450+
10*time.Millisecond,
451+
)
452+
453+
require.NoError(t, err)
454+
require.True(t, ok)
455+
456+
assert.GreaterOrEqual(t, res.checks, int32(10))
457+
assert.Equal(t, res.checks, checkCounter.Load())
458+
assert.Equal(t, "healthy", res.status)
459+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
460+
})
461+
462+
t.Run("Check Fails With Reason", func(t *testing.T) {
463+
defer goleak.VerifyNone(t)
464+
465+
checkCounter := atomic.NewInt32(0)
466+
checkResultCounter := atomic.NewInt32(0)
467+
actionStarted := atomic.NewBool(false)
468+
469+
status := "adrien"
470+
471+
res, ok, err := RunActionWithParallelCheckAndResult(
472+
context.Background(),
473+
func(ctx context.Context) error {
474+
actionStarted.Store(true)
475+
<-ctx.Done()
476+
return DetermineContextError(ctx)
477+
},
478+
func(ctx context.Context) (res parallelisationCheckResult, ok bool) {
479+
if n := checkCounter.Inc(); n >= 5 {
480+
return parallelisationCheckResult{
481+
checks: n,
482+
status: status,
483+
}, false
484+
} else {
485+
return parallelisationCheckResult{
486+
checks: n,
487+
status: "ok",
488+
}, true
489+
}
490+
},
491+
func(_ parallelisationCheckResult) error {
492+
checkResultCounter.Inc()
493+
return nil
494+
},
495+
5*time.Millisecond,
496+
)
497+
498+
require.True(t, actionStarted.Load())
499+
require.Error(t, err)
500+
errortest.AssertError(t, err, commonerrors.ErrCancelled)
501+
502+
require.False(t, ok)
503+
assert.Equal(t, status, res.status)
504+
assert.Equal(t, int32(5), res.checks)
505+
assert.Equal(t, int32(5), checkCounter.Load())
506+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
507+
})
508+
t.Run("Action Error (no context cancel)", func(t *testing.T) {
509+
defer goleak.VerifyNone(t)
510+
511+
checkCounter := atomic.NewInt32(0)
512+
checkResultCounter := atomic.NewInt32(0)
513+
status := "abdel"
514+
515+
res, ok, err := RunActionWithParallelCheckAndResult(
516+
context.Background(),
517+
func(ctx context.Context) error {
518+
time.Sleep(30 * time.Millisecond)
519+
return commonerrors.New(commonerrors.ErrForbidden, faker.Sentence())
520+
},
521+
func(ctx context.Context) (parallelisationCheckResult, bool) {
522+
return parallelisationCheckResult{
523+
checks: checkCounter.Inc(),
524+
status: status,
525+
}, true
526+
},
527+
func(_ parallelisationCheckResult) error {
528+
checkResultCounter.Inc()
529+
return nil
530+
},
531+
5*time.Millisecond,
532+
)
533+
534+
require.Error(t, err)
535+
errortest.AssertError(t, err, commonerrors.ErrForbidden)
536+
require.True(t, ok)
537+
538+
assert.Equal(t, status, res.status)
539+
assert.GreaterOrEqual(t, res.checks, int32(1))
540+
assert.Equal(t, res.checks, checkCounter.Load())
541+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
542+
})
543+
544+
t.Run("Context cancel", func(t *testing.T) {
545+
defer goleak.VerifyNone(t)
546+
547+
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
548+
defer cancel()
549+
550+
checkCounter := atomic.NewInt32(0)
551+
checkResultCounter := atomic.NewInt32(0)
552+
status := "kem"
553+
554+
res, ok, err := RunActionWithParallelCheckAndResult(
555+
ctx,
556+
func(ctx context.Context) error {
557+
<-ctx.Done()
558+
return DetermineContextError(ctx)
559+
},
560+
func(ctx context.Context) (parallelisationCheckResult, bool) {
561+
return parallelisationCheckResult{
562+
checks: checkCounter.Inc(),
563+
status: status,
564+
}, true
565+
},
566+
func(_ parallelisationCheckResult) error {
567+
checkResultCounter.Inc()
568+
return nil
569+
},
570+
5*time.Millisecond,
571+
)
572+
573+
require.Error(t, err)
574+
errortest.AssertError(t, err, commonerrors.ErrTimeout)
575+
assert.True(t, ok)
576+
assert.GreaterOrEqual(t, res.checks, int32(1))
577+
assert.Equal(t, res.checks, checkCounter.Load())
578+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
579+
})
580+
581+
t.Run("Check result error", func(t *testing.T) {
582+
defer goleak.VerifyNone(t)
583+
584+
checkCounter := atomic.NewInt32(0)
585+
checkResultCounter := atomic.NewInt32(0)
586+
status := "kem"
587+
588+
res, ok, err := RunActionWithParallelCheckAndResult(
589+
context.Background(),
590+
func(ctx context.Context) error {
591+
<-ctx.Done()
592+
return DetermineContextError(ctx)
593+
},
594+
func(ctx context.Context) (parallelisationCheckResult, bool) {
595+
return parallelisationCheckResult{
596+
checks: checkCounter.Inc(),
597+
status: status,
598+
}, true
599+
},
600+
func(_ parallelisationCheckResult) error {
601+
checkResultCounter.Inc()
602+
return commonerrors.ErrUnexpected
603+
},
604+
5*time.Millisecond,
605+
)
606+
607+
require.Error(t, err)
608+
errortest.AssertError(t, err, commonerrors.ErrUnexpected)
609+
assert.True(t, ok)
610+
assert.GreaterOrEqual(t, res.checks, int32(1))
611+
assert.Equal(t, res.checks, checkCounter.Load())
612+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
613+
})
614+
}
615+
421616
func TestWaitUntil(t *testing.T) {
422617
defer goleak.VerifyNone(t)
423618
verifiedCondition := func(ctx context.Context) (bool, error) {

0 commit comments

Comments
 (0)