From 6d26d2458199bb2cfca6bd0e82c4dd6232524758 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 8 Dec 2025 10:17:41 +0000 Subject: [PATCH 1/6] :sparkles: `parallisation` Add support for more context when returning from RunActionWithParallelCheck --- changes/20251205145557.feature | 1 + utils/parallelisation/parallelisation.go | 59 ++++++-- utils/parallelisation/parallelisation_test.go | 136 ++++++++++++++++++ 3 files changed, 184 insertions(+), 12 deletions(-) create mode 100644 changes/20251205145557.feature diff --git a/changes/20251205145557.feature b/changes/20251205145557.feature new file mode 100644 index 0000000000..bc1aae5af2 --- /dev/null +++ b/changes/20251205145557.feature @@ -0,0 +1 @@ +:sparkles: `parallisation` Add support for more context when returning from RunActionWithParallelCheck diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index 5852b253bd..8ed914cf20 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -9,6 +9,7 @@ package parallelisation import ( "context" "reflect" + "sync" "time" "go.uber.org/atomic" @@ -210,38 +211,72 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati } } -// RunActionWithParallelCheck runs an action with a check in parallel -// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled. -func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) error { - err := DetermineContextError(ctx) - if err != nil { - return err - } +func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) { cancelStore := NewCancelFunctionsStore() defer cancelStore.Cancel() + cancellableCtx, cancelFunc := context.WithCancel(ctx) cancelStore.RegisterCancelFunction(cancelFunc) + + wg.Add(1) go func(ctx context.Context, store *CancelFunctionStore) { + defer wg.Done() for { select { case <-ctx.Done(): store.Cancel() return default: - if !checkAction(ctx) { + res, ok = checkAction(ctx) + if !ok { store.Cancel() return } + SleepWithContext(ctx, checkPeriod) } } }(cancellableCtx, cancelStore) + err = action(cancellableCtx) - err2 := DetermineContextError(cancellableCtx) - if err2 != nil { - return err2 + if errCtx := DetermineContextError(cancellableCtx); errCtx != nil { + err = errCtx + } + + return +} + +// RunActionWithParallelCheck runs an action with a check in parallel +// The function performing the check should return true if the check was favourable; false otherwise. +// For more context, a result can be returned. If the check did not have the expected result and the +// whole function would be cancelled. +func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) { + err = DetermineContextError(ctx) + if err != nil { + return } - return err + + var wg sync.WaitGroup + defer wg.Wait() + + res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, checkPeriod) + return +} + +// RunActionWithParallelCheck runs an action with a check in parallel +// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled. +func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) (err error) { + _, _, err = RunActionWithParallelCheckAndResult( + ctx, + action, + func(ctx context.Context) (_ struct{}, ok bool) { + ok = checkAction(ctx) + return + }, + checkPeriod, + ) + + return } // WaitUntil waits for a condition evaluated by evalCondition to be verified diff --git a/utils/parallelisation/parallelisation_test.go b/utils/parallelisation/parallelisation_test.go index a0e63f17b8..5c1c93f9fe 100644 --- a/utils/parallelisation/parallelisation_test.go +++ b/utils/parallelisation/parallelisation_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/go-faker/faker/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -418,6 +419,141 @@ func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) { assert.GreaterOrEqual(t, counter.Load(), int32(1)) } +func TestRunActionWithParallelCheckAndResult(t *testing.T) { + type parallelisationCheckResult struct { + checks int32 + status string + } + + t.Run("Happy", func(t *testing.T) { + defer goleak.VerifyNone(t) + + counter := atomic.NewInt32(0) + + res, ok, err := RunActionWithParallelCheckAndResult( + context.Background(), + func(ctx context.Context) (err error) { + time.Sleep(120 * time.Millisecond) + return + }, + func(ctx context.Context) (res parallelisationCheckResult, ok bool) { + return parallelisationCheckResult{ + checks: counter.Inc(), + status: "healthy", + }, true + }, + 10*time.Millisecond, + ) + + require.NoError(t, err) + require.True(t, ok) + + assert.GreaterOrEqual(t, res.checks, int32(10)) + assert.Equal(t, res.checks, counter.Load()) + assert.Equal(t, "healthy", res.status) + }) + + t.Run("Check Fails With Reason", func(t *testing.T) { + defer goleak.VerifyNone(t) + + counter := atomic.NewInt32(0) + actionStarted := atomic.NewBool(false) + + status := "adrien" + + res, ok, err := RunActionWithParallelCheckAndResult( + context.Background(), + func(ctx context.Context) error { + actionStarted.Store(true) + <-ctx.Done() + return DetermineContextError(ctx) + }, + func(ctx context.Context) (res parallelisationCheckResult, ok bool) { + if n := counter.Inc(); n >= 5 { + return parallelisationCheckResult{ + checks: n, + status: status, + }, false + } else { + return parallelisationCheckResult{ + checks: n, + status: "ok", + }, true + } + }, + 5*time.Millisecond, + ) + + require.True(t, actionStarted.Load()) + require.Error(t, err) + errortest.AssertError(t, err, commonerrors.ErrCancelled) + + require.False(t, ok) + assert.Equal(t, status, res.status) + assert.Equal(t, int32(5), res.checks) + assert.Equal(t, int32(5), counter.Load()) + }) + t.Run("Action Error (no context cancel)", func(t *testing.T) { + defer goleak.VerifyNone(t) + + counter := atomic.NewInt32(0) + status := "abdel" + + res, ok, err := RunActionWithParallelCheckAndResult( + context.Background(), + func(ctx context.Context) error { + time.Sleep(30 * time.Millisecond) + return commonerrors.New(commonerrors.ErrForbidden, faker.Sentence()) + }, + func(ctx context.Context) (parallelisationCheckResult, bool) { + return parallelisationCheckResult{ + checks: counter.Inc(), + status: status, + }, true + }, + 5*time.Millisecond, + ) + + require.Error(t, err) + errortest.AssertError(t, err, commonerrors.ErrForbidden) + require.True(t, ok) + + assert.Equal(t, status, res.status) + assert.GreaterOrEqual(t, res.checks, int32(1)) + assert.Equal(t, res.checks, counter.Load()) + }) + + t.Run("Context cancel", func(t *testing.T) { + defer goleak.VerifyNone(t) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + counter := atomic.NewInt32(0) + status := "kem" + + res, ok, err := RunActionWithParallelCheckAndResult( + ctx, + func(ctx context.Context) error { + <-ctx.Done() + return DetermineContextError(ctx) + }, + func(ctx context.Context) (parallelisationCheckResult, bool) { + return parallelisationCheckResult{ + checks: counter.Inc(), + status: status, + }, true + }, + 5*time.Millisecond, + ) + + require.Error(t, err) + errortest.AssertError(t, err, commonerrors.ErrTimeout) + assert.True(t, ok) + assert.GreaterOrEqual(t, res.checks, int32(1)) + }) +} + func TestWaitUntil(t *testing.T) { defer goleak.VerifyNone(t) verifiedCondition := func(ctx context.Context) (bool, error) { From 568b01166a12b3fb5ad75d20546bf5dd6905cff2 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 8 Dec 2025 13:50:19 +0000 Subject: [PATCH 2/6] add onCheckResult --- utils/parallelisation/parallelisation.go | 10 +++-- utils/parallelisation/parallelisation_test.go | 43 ++++++++++++++----- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index 8ed914cf20..73ff2ebfc4 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -211,7 +211,7 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati } } -func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) { +func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) { cancelStore := NewCancelFunctionsStore() defer cancelStore.Cancel() @@ -228,6 +228,9 @@ func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action fun return default: res, ok = checkAction(ctx) + + onCheckResult(res) + if !ok { store.Cancel() return @@ -250,7 +253,7 @@ func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action fun // The function performing the check should return true if the check was favourable; false otherwise. // For more context, a result can be returned. If the check did not have the expected result and the // whole function would be cancelled. -func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) { +func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) { err = DetermineContextError(ctx) if err != nil { return @@ -259,7 +262,7 @@ func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func var wg sync.WaitGroup defer wg.Wait() - res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, checkPeriod) + res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, onCheckResult, checkPeriod) return } @@ -273,6 +276,7 @@ func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Con ok = checkAction(ctx) return }, + func(_ struct{}) {}, checkPeriod, ) diff --git a/utils/parallelisation/parallelisation_test.go b/utils/parallelisation/parallelisation_test.go index 5c1c93f9fe..8ffed9952b 100644 --- a/utils/parallelisation/parallelisation_test.go +++ b/utils/parallelisation/parallelisation_test.go @@ -428,7 +428,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { t.Run("Happy", func(t *testing.T) { defer goleak.VerifyNone(t) - counter := atomic.NewInt32(0) + checkCounter := atomic.NewInt32(0) + checkResultCounter := atomic.NewInt32(0) res, ok, err := RunActionWithParallelCheckAndResult( context.Background(), @@ -438,10 +439,13 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { }, func(ctx context.Context) (res parallelisationCheckResult, ok bool) { return parallelisationCheckResult{ - checks: counter.Inc(), + checks: checkCounter.Inc(), status: "healthy", }, true }, + func(_ parallelisationCheckResult) { + checkResultCounter.Inc() + }, 10*time.Millisecond, ) @@ -449,14 +453,16 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { require.True(t, ok) assert.GreaterOrEqual(t, res.checks, int32(10)) - assert.Equal(t, res.checks, counter.Load()) + assert.Equal(t, res.checks, checkCounter.Load()) assert.Equal(t, "healthy", res.status) + assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) }) t.Run("Check Fails With Reason", func(t *testing.T) { defer goleak.VerifyNone(t) - counter := atomic.NewInt32(0) + checkCounter := atomic.NewInt32(0) + checkResultCounter := atomic.NewInt32(0) actionStarted := atomic.NewBool(false) status := "adrien" @@ -469,7 +475,7 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { return DetermineContextError(ctx) }, func(ctx context.Context) (res parallelisationCheckResult, ok bool) { - if n := counter.Inc(); n >= 5 { + if n := checkCounter.Inc(); n >= 5 { return parallelisationCheckResult{ checks: n, status: status, @@ -481,6 +487,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { }, true } }, + func(_ parallelisationCheckResult) { + checkResultCounter.Inc() + }, 5*time.Millisecond, ) @@ -491,12 +500,14 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { require.False(t, ok) assert.Equal(t, status, res.status) assert.Equal(t, int32(5), res.checks) - assert.Equal(t, int32(5), counter.Load()) + assert.Equal(t, int32(5), checkCounter.Load()) + assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) }) t.Run("Action Error (no context cancel)", func(t *testing.T) { defer goleak.VerifyNone(t) - counter := atomic.NewInt32(0) + checkCounter := atomic.NewInt32(0) + checkResultCounter := atomic.NewInt32(0) status := "abdel" res, ok, err := RunActionWithParallelCheckAndResult( @@ -507,10 +518,13 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { }, func(ctx context.Context) (parallelisationCheckResult, bool) { return parallelisationCheckResult{ - checks: counter.Inc(), + checks: checkCounter.Inc(), status: status, }, true }, + func(_ parallelisationCheckResult) { + checkResultCounter.Inc() + }, 5*time.Millisecond, ) @@ -520,7 +534,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { assert.Equal(t, status, res.status) assert.GreaterOrEqual(t, res.checks, int32(1)) - assert.Equal(t, res.checks, counter.Load()) + assert.Equal(t, res.checks, checkCounter.Load()) + assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) }) t.Run("Context cancel", func(t *testing.T) { @@ -529,7 +544,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - counter := atomic.NewInt32(0) + checkCounter := atomic.NewInt32(0) + checkResultCounter := atomic.NewInt32(0) status := "kem" res, ok, err := RunActionWithParallelCheckAndResult( @@ -540,10 +556,13 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { }, func(ctx context.Context) (parallelisationCheckResult, bool) { return parallelisationCheckResult{ - checks: counter.Inc(), + checks: checkCounter.Inc(), status: status, }, true }, + func(_ parallelisationCheckResult) { + checkResultCounter.Inc() + }, 5*time.Millisecond, ) @@ -551,6 +570,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { errortest.AssertError(t, err, commonerrors.ErrTimeout) assert.True(t, ok) assert.GreaterOrEqual(t, res.checks, int32(1)) + assert.Equal(t, res.checks, checkCounter.Load()) + assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) }) } From fd791eb023bb1c8d5e8337c683cd4d7c56267d83 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Mon, 8 Dec 2025 14:43:09 +0000 Subject: [PATCH 3/6] add onCheckResult error return --- utils/parallelisation/parallelisation.go | 58 ++++++++++--------- utils/parallelisation/parallelisation_test.go | 46 +++++++++++++-- 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index 73ff2ebfc4..5b19d896d8 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -9,10 +9,10 @@ package parallelisation import ( "context" "reflect" - "sync" "time" "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "github.com/ARM-software/golang-utils/utils/collection" "github.com/ARM-software/golang-utils/utils/commonerrors" @@ -211,40 +211,48 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati } } -func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) { +func runActionAndWait[T any](ctx context.Context, errGroup *errgroup.Group, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T) error, checkPeriod time.Duration) (res T, ok bool, err error) { cancelStore := NewCancelFunctionsStore() defer cancelStore.Cancel() cancellableCtx, cancelFunc := context.WithCancel(ctx) cancelStore.RegisterCancelFunction(cancelFunc) - wg.Add(1) - go func(ctx context.Context, store *CancelFunctionStore) { - defer wg.Done() - for { - select { - case <-ctx.Done(): - store.Cancel() - return - default: - res, ok = checkAction(ctx) + errGroup.Go(func() error { + return func(ctx context.Context, store *CancelFunctionStore) (err error) { + defer store.Cancel() + for { + select { + case <-ctx.Done(): + return + default: + res, ok = checkAction(ctx) - onCheckResult(res) + err = onCheckResult(res) + if err != nil { + return + } - if !ok { - store.Cancel() - return - } + if !ok { + return + } - SleepWithContext(ctx, checkPeriod) + SleepWithContext(ctx, checkPeriod) + } } - } - }(cancellableCtx, cancelStore) + }(cancellableCtx, cancelStore) + }) err = action(cancellableCtx) if errCtx := DetermineContextError(cancellableCtx); errCtx != nil { err = errCtx } + cancelFunc() + + if egErr := errGroup.Wait(); commonerrors.Ignore(egErr, commonerrors.ErrCancelled, commonerrors.ErrTimeout) != nil { + err = egErr + return + } return } @@ -253,16 +261,14 @@ func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action fun // The function performing the check should return true if the check was favourable; false otherwise. // For more context, a result can be returned. If the check did not have the expected result and the // whole function would be cancelled. -func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) { +func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T) error, checkPeriod time.Duration) (res T, ok bool, err error) { err = DetermineContextError(ctx) if err != nil { return } - var wg sync.WaitGroup - defer wg.Wait() - - res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, onCheckResult, checkPeriod) + var errGroup errgroup.Group + res, ok, err = runActionAndWait(ctx, &errGroup, action, checkAction, onCheckResult, checkPeriod) return } @@ -276,7 +282,7 @@ func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Con ok = checkAction(ctx) return }, - func(_ struct{}) {}, + func(_ struct{}) error { return nil }, checkPeriod, ) diff --git a/utils/parallelisation/parallelisation_test.go b/utils/parallelisation/parallelisation_test.go index 8ffed9952b..2ec31007aa 100644 --- a/utils/parallelisation/parallelisation_test.go +++ b/utils/parallelisation/parallelisation_test.go @@ -443,8 +443,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { status: "healthy", }, true }, - func(_ parallelisationCheckResult) { + func(_ parallelisationCheckResult) error { checkResultCounter.Inc() + return nil }, 10*time.Millisecond, ) @@ -487,8 +488,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { }, true } }, - func(_ parallelisationCheckResult) { + func(_ parallelisationCheckResult) error { checkResultCounter.Inc() + return nil }, 5*time.Millisecond, ) @@ -522,8 +524,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { status: status, }, true }, - func(_ parallelisationCheckResult) { + func(_ parallelisationCheckResult) error { checkResultCounter.Inc() + return nil }, 5*time.Millisecond, ) @@ -560,8 +563,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { status: status, }, true }, - func(_ parallelisationCheckResult) { + func(_ parallelisationCheckResult) error { checkResultCounter.Inc() + return nil }, 5*time.Millisecond, ) @@ -573,6 +577,40 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) { assert.Equal(t, res.checks, checkCounter.Load()) assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) }) + + t.Run("Check result error", func(t *testing.T) { + defer goleak.VerifyNone(t) + + checkCounter := atomic.NewInt32(0) + checkResultCounter := atomic.NewInt32(0) + status := "kem" + + res, ok, err := RunActionWithParallelCheckAndResult( + context.Background(), + func(ctx context.Context) error { + <-ctx.Done() + return DetermineContextError(ctx) + }, + func(ctx context.Context) (parallelisationCheckResult, bool) { + return parallelisationCheckResult{ + checks: checkCounter.Inc(), + status: status, + }, true + }, + func(_ parallelisationCheckResult) error { + checkResultCounter.Inc() + return commonerrors.ErrUnexpected + }, + 5*time.Millisecond, + ) + + require.Error(t, err) + errortest.AssertError(t, err, commonerrors.ErrUnexpected) + assert.True(t, ok) + assert.GreaterOrEqual(t, res.checks, int32(1)) + assert.Equal(t, res.checks, checkCounter.Load()) + assert.Equal(t, checkCounter.Load(), checkResultCounter.Load()) + }) } func TestWaitUntil(t *testing.T) { From b842943fe09cccd7c72d5e3cd463cea0a5c38401 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Tue, 9 Dec 2025 09:05:15 +0000 Subject: [PATCH 4/6] review --- changes/20251205145557.feature | 2 +- utils/parallelisation/parallelisation.go | 37 +++++++++++++----------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/changes/20251205145557.feature b/changes/20251205145557.feature index bc1aae5af2..1710378d56 100644 --- a/changes/20251205145557.feature +++ b/changes/20251205145557.feature @@ -1 +1 @@ -:sparkles: `parallisation` Add support for more context when returning from RunActionWithParallelCheck +:sparkles: `parallelisation` Add support for more context when returning from RunActionWithParallelCheck diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index 5b19d896d8..d75ff52479 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -211,7 +211,25 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati } } -func runActionAndWait[T any](ctx context.Context, errGroup *errgroup.Group, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T) error, checkPeriod time.Duration) (res T, ok bool, err error) { +type ( + ActionFunc func(ctx context.Context) (err error) + CheckFunc func(ctx context.Context) (ok bool) + CheckWithResultFunc[T any] func(ctx context.Context) (res T, ok bool) + ResultCheckFunc[T any] func(res T) (err error) +) + +// RunActionWithParallelCheck runs an action with a check in parallel +// The function performing the check should return true if the check was successful, but not met; false otherwise. +// For more context, a result can be returned. If the check did not have the expected result and the +// whole function would be cancelled. +func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action ActionFunc, checkAction CheckWithResultFunc[T], onCheckResult ResultCheckFunc[T], checkPeriod time.Duration) (res T, ok bool, err error) { + err = DetermineContextError(ctx) + if err != nil { + return + } + + var errGroup errgroup.Group + cancelStore := NewCancelFunctionsStore() defer cancelStore.Cancel() @@ -257,24 +275,9 @@ func runActionAndWait[T any](ctx context.Context, errGroup *errgroup.Group, acti return } -// RunActionWithParallelCheck runs an action with a check in parallel -// The function performing the check should return true if the check was favourable; false otherwise. -// For more context, a result can be returned. If the check did not have the expected result and the -// whole function would be cancelled. -func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T) error, checkPeriod time.Duration) (res T, ok bool, err error) { - err = DetermineContextError(ctx) - if err != nil { - return - } - - var errGroup errgroup.Group - res, ok, err = runActionAndWait(ctx, &errGroup, action, checkAction, onCheckResult, checkPeriod) - return -} - // RunActionWithParallelCheck runs an action with a check in parallel // The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled. -func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) bool, checkPeriod time.Duration) (err error) { +func RunActionWithParallelCheck(ctx context.Context, action ActionFunc, checkAction CheckFunc, checkPeriod time.Duration) (err error) { _, _, err = RunActionWithParallelCheckAndResult( ctx, action, From be196379eea2f4d347e2c63d6856fc4fb6c05df3 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Tue, 9 Dec 2025 11:38:15 +0000 Subject: [PATCH 5/6] review2 --- utils/parallelisation/parallelisation.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index d75ff52479..e3a38420f1 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -219,9 +219,9 @@ type ( ) // RunActionWithParallelCheck runs an action with a check in parallel -// The function performing the check should return true if the check was successful, but not met; false otherwise. -// For more context, a result can be returned. If the check did not have the expected result and the -// whole function would be cancelled. +// The function performing the check should return true if the check should be repeated; false otherwise it should not. +// For more context about how the check ended, a result can be returned. If the check did not have the expected result +// then the whole function would be cancelled. func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action ActionFunc, checkAction CheckWithResultFunc[T], onCheckResult ResultCheckFunc[T], checkPeriod time.Duration) (res T, ok bool, err error) { err = DetermineContextError(ctx) if err != nil { From a56098649a0235a13473b5f72b6e6b0c1b1f9ae6 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Tue, 9 Dec 2025 13:19:28 +0000 Subject: [PATCH 6/6] spelling --- utils/parallelisation/parallelisation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/parallelisation/parallelisation.go b/utils/parallelisation/parallelisation.go index e3a38420f1..f57d808adb 100644 --- a/utils/parallelisation/parallelisation.go +++ b/utils/parallelisation/parallelisation.go @@ -276,7 +276,7 @@ func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action Acti } // RunActionWithParallelCheck runs an action with a check in parallel -// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result and the whole function would be cancelled. +// The function performing the check should return true if the check was favourable; false otherwise. If the check did not have the expected result then the whole function would be cancelled. func RunActionWithParallelCheck(ctx context.Context, action ActionFunc, checkAction CheckFunc, checkPeriod time.Duration) (err error) { _, _, err = RunActionWithParallelCheckAndResult( ctx,