Skip to content

Commit 568b011

Browse files
add onCheckResult
1 parent 6d26d24 commit 568b011

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

utils/parallelisation/parallelisation.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func RunActionWithTimeoutAndCancelStore(ctx context.Context, timeout time.Durati
211211
}
212212
}
213213

214-
func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) {
214+
func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) {
215215
cancelStore := NewCancelFunctionsStore()
216216
defer cancelStore.Cancel()
217217

@@ -228,6 +228,9 @@ func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action fun
228228
return
229229
default:
230230
res, ok = checkAction(ctx)
231+
232+
onCheckResult(res)
233+
231234
if !ok {
232235
store.Cancel()
233236
return
@@ -250,7 +253,7 @@ func runActionAndWait[T any](ctx context.Context, wg *sync.WaitGroup, action fun
250253
// The function performing the check should return true if the check was favourable; false otherwise.
251254
// For more context, a result can be returned. If the check did not have the expected result and the
252255
// whole function would be cancelled.
253-
func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), checkPeriod time.Duration) (res T, ok bool, err error) {
256+
func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func(ctx context.Context) error, checkAction func(ctx context.Context) (res T, ok bool), onCheckResult func(res T), checkPeriod time.Duration) (res T, ok bool, err error) {
254257
err = DetermineContextError(ctx)
255258
if err != nil {
256259
return
@@ -259,7 +262,7 @@ func RunActionWithParallelCheckAndResult[T any](ctx context.Context, action func
259262
var wg sync.WaitGroup
260263
defer wg.Wait()
261264

262-
res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, checkPeriod)
265+
res, ok, err = runActionAndWait(ctx, &wg, action, checkAction, onCheckResult, checkPeriod)
263266
return
264267
}
265268

@@ -273,6 +276,7 @@ func RunActionWithParallelCheck(ctx context.Context, action func(ctx context.Con
273276
ok = checkAction(ctx)
274277
return
275278
},
279+
func(_ struct{}) {},
276280
checkPeriod,
277281
)
278282

utils/parallelisation/parallelisation_test.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
428428
t.Run("Happy", func(t *testing.T) {
429429
defer goleak.VerifyNone(t)
430430

431-
counter := atomic.NewInt32(0)
431+
checkCounter := atomic.NewInt32(0)
432+
checkResultCounter := atomic.NewInt32(0)
432433

433434
res, ok, err := RunActionWithParallelCheckAndResult(
434435
context.Background(),
@@ -438,25 +439,30 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
438439
},
439440
func(ctx context.Context) (res parallelisationCheckResult, ok bool) {
440441
return parallelisationCheckResult{
441-
checks: counter.Inc(),
442+
checks: checkCounter.Inc(),
442443
status: "healthy",
443444
}, true
444445
},
446+
func(_ parallelisationCheckResult) {
447+
checkResultCounter.Inc()
448+
},
445449
10*time.Millisecond,
446450
)
447451

448452
require.NoError(t, err)
449453
require.True(t, ok)
450454

451455
assert.GreaterOrEqual(t, res.checks, int32(10))
452-
assert.Equal(t, res.checks, counter.Load())
456+
assert.Equal(t, res.checks, checkCounter.Load())
453457
assert.Equal(t, "healthy", res.status)
458+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
454459
})
455460

456461
t.Run("Check Fails With Reason", func(t *testing.T) {
457462
defer goleak.VerifyNone(t)
458463

459-
counter := atomic.NewInt32(0)
464+
checkCounter := atomic.NewInt32(0)
465+
checkResultCounter := atomic.NewInt32(0)
460466
actionStarted := atomic.NewBool(false)
461467

462468
status := "adrien"
@@ -469,7 +475,7 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
469475
return DetermineContextError(ctx)
470476
},
471477
func(ctx context.Context) (res parallelisationCheckResult, ok bool) {
472-
if n := counter.Inc(); n >= 5 {
478+
if n := checkCounter.Inc(); n >= 5 {
473479
return parallelisationCheckResult{
474480
checks: n,
475481
status: status,
@@ -481,6 +487,9 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
481487
}, true
482488
}
483489
},
490+
func(_ parallelisationCheckResult) {
491+
checkResultCounter.Inc()
492+
},
484493
5*time.Millisecond,
485494
)
486495

@@ -491,12 +500,14 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
491500
require.False(t, ok)
492501
assert.Equal(t, status, res.status)
493502
assert.Equal(t, int32(5), res.checks)
494-
assert.Equal(t, int32(5), counter.Load())
503+
assert.Equal(t, int32(5), checkCounter.Load())
504+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
495505
})
496506
t.Run("Action Error (no context cancel)", func(t *testing.T) {
497507
defer goleak.VerifyNone(t)
498508

499-
counter := atomic.NewInt32(0)
509+
checkCounter := atomic.NewInt32(0)
510+
checkResultCounter := atomic.NewInt32(0)
500511
status := "abdel"
501512

502513
res, ok, err := RunActionWithParallelCheckAndResult(
@@ -507,10 +518,13 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
507518
},
508519
func(ctx context.Context) (parallelisationCheckResult, bool) {
509520
return parallelisationCheckResult{
510-
checks: counter.Inc(),
521+
checks: checkCounter.Inc(),
511522
status: status,
512523
}, true
513524
},
525+
func(_ parallelisationCheckResult) {
526+
checkResultCounter.Inc()
527+
},
514528
5*time.Millisecond,
515529
)
516530

@@ -520,7 +534,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
520534

521535
assert.Equal(t, status, res.status)
522536
assert.GreaterOrEqual(t, res.checks, int32(1))
523-
assert.Equal(t, res.checks, counter.Load())
537+
assert.Equal(t, res.checks, checkCounter.Load())
538+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
524539
})
525540

526541
t.Run("Context cancel", func(t *testing.T) {
@@ -529,7 +544,8 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
529544
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
530545
defer cancel()
531546

532-
counter := atomic.NewInt32(0)
547+
checkCounter := atomic.NewInt32(0)
548+
checkResultCounter := atomic.NewInt32(0)
533549
status := "kem"
534550

535551
res, ok, err := RunActionWithParallelCheckAndResult(
@@ -540,17 +556,22 @@ func TestRunActionWithParallelCheckAndResult(t *testing.T) {
540556
},
541557
func(ctx context.Context) (parallelisationCheckResult, bool) {
542558
return parallelisationCheckResult{
543-
checks: counter.Inc(),
559+
checks: checkCounter.Inc(),
544560
status: status,
545561
}, true
546562
},
563+
func(_ parallelisationCheckResult) {
564+
checkResultCounter.Inc()
565+
},
547566
5*time.Millisecond,
548567
)
549568

550569
require.Error(t, err)
551570
errortest.AssertError(t, err, commonerrors.ErrTimeout)
552571
assert.True(t, ok)
553572
assert.GreaterOrEqual(t, res.checks, int32(1))
573+
assert.Equal(t, res.checks, checkCounter.Load())
574+
assert.Equal(t, checkCounter.Load(), checkResultCounter.Load())
554575
})
555576
}
556577

0 commit comments

Comments
 (0)