From 85e333f2fd80b11c859c3cff846be01703105518 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 3 Oct 2025 12:14:05 +0100 Subject: [PATCH 1/4] :sparkles: `parallisation` Add support for execution groups with priorities --- changes/20251003121340.feature | 1 + utils/parallelisation/contextual.go | 12 ++++ utils/parallelisation/group.go | 99 +++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+) create mode 100644 changes/20251003121340.feature diff --git a/changes/20251003121340.feature b/changes/20251003121340.feature new file mode 100644 index 0000000000..ade4022d37 --- /dev/null +++ b/changes/20251003121340.feature @@ -0,0 +1 @@ +:sparkles: `parallisation` Add support for execution groups with priorities diff --git a/utils/parallelisation/contextual.go b/utils/parallelisation/contextual.go index dbd3d99c0a..8e64186713 100644 --- a/utils/parallelisation/contextual.go +++ b/utils/parallelisation/contextual.go @@ -26,6 +26,18 @@ func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup { } } +// NewPriorityContextualGroup returns a group executing contextual functions that will be run in priority order. +func NewPriorityContextualGroup(options ...StoreOption) *PriorityExecutionGroup[ContextualFunc] { + return newPriorityExecutionGroup[ContextualFunc]( + func(options ...StoreOption) IExecutionGroup[ContextualFunc] { + return NewExecutionGroup[ContextualFunc](func(ctx context.Context, f ContextualFunc) error { + return f(ctx) + }, options...) + }, + options..., + ) +} + // ForEach executes all the contextual functions according to the store options and returns an error if one occurred. func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error { group := NewContextualGroup(ExecuteAll(executionOptions).Options()...) 
diff --git a/utils/parallelisation/group.go b/utils/parallelisation/group.go index de91d757e4..a7aa21a0ae 100644 --- a/utils/parallelisation/group.go +++ b/utils/parallelisation/group.go @@ -2,7 +2,9 @@ package parallelisation import ( "context" + "maps" "math" + "slices" "github.com/sasha-s/go-deadlock" "go.uber.org/atomic" @@ -452,3 +454,100 @@ func (g *CompoundExecutionGroup) RegisterExecutor(group ...IExecutor) { }) } } + +var _ IExecutionGroup[IExecutor] = &PriorityExecutionGroup[IExecutor]{} + +type PriorityExecutionGroup[T any] struct { + mu deadlock.RWMutex + groups map[uint]IExecutionGroup[T] + options []StoreOption + newGroup func(...StoreOption) IExecutionGroup[T] +} + +func newPriorityExecutionGroup[T any](newGroup func(...StoreOption) IExecutionGroup[T], options ...StoreOption) *PriorityExecutionGroup[T] { + return &PriorityExecutionGroup[T]{ + mu: deadlock.RWMutex{}, + groups: make(map[uint]IExecutionGroup[T]), + options: options, + newGroup: newGroup, + } +} + +// NewPriorityExecutionGroup returns an execution group that can execute functions in order according to priority rules. +// Parallel commands with differing priorities will be executed in groups according to their priority. +// Sequential commands will be executed in order of their priority, no guarantees are made about the order of when +// the priority is the same as another command. +func NewPriorityExecutionGroup(options ...StoreOption) *PriorityExecutionGroup[IExecutor] { + return newPriorityExecutionGroup[IExecutor]( + func(options ...StoreOption) IExecutionGroup[IExecutor] { + return NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, options...) 
+ }, + options..., + ) +} + +func (g *PriorityExecutionGroup[T]) check() { + g.mu.Lock() + defer g.mu.Unlock() + + if g.groups == nil { + g.groups = make(map[uint]IExecutionGroup[T]) + } + if g.options == nil { + g.options = DefaultOptions().Options() + } + if g.newGroup == nil { + g.newGroup = func(options ...StoreOption) IExecutionGroup[T] { + return NewExecutionGroup[T](func(context.Context, T) error { + return commonerrors.UndefinedVariableWithMessage("g.newGroup", "priority execution group has not been initialised correctly") + }) + } + } +} + +// RegisterExecutor registers executors with a specific priority (lower values indidcate higher priority) +func (g *PriorityExecutionGroup[T]) RegisterFunctionWithPriority(priority uint, function ...T) { + g.check() + + g.mu.Lock() + defer g.mu.Unlock() + + if g.groups[priority] == nil { + g.groups[priority] = g.newGroup(g.options...) + } + g.groups[priority].RegisterFunction(function...) +} + +// RegisterExecutor registers executors with a priority of zero (highest priority) +func (g *PriorityExecutionGroup[T]) RegisterFunction(function ...T) { + g.RegisterFunctionWithPriority(0, function...) +} + +func (g *PriorityExecutionGroup[T]) Len() (n int) { + g.mu.RLock() + defer g.mu.RUnlock() + + for _, group := range g.groups { + n += group.Len() + } + return +} + +func (g *PriorityExecutionGroup[T]) executors() (executor *CompoundExecutionGroup) { + g.mu.RLock() + defer g.mu.RUnlock() + + executor = NewCompoundExecutionGroup(g.options...) 
+ for _, key := range slices.Sorted(maps.Keys(g.groups)) { + executor.RegisterExecutor(g.groups[key]) + } + + return +} + +// Execute will execute all the groups according to the priorities of the functions +func (g *PriorityExecutionGroup[T]) Execute(ctx context.Context) error { + return g.executors().Execute(ctx) +} From 1288fe15319851af4dc175bcb4b3583ab83d57a6 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 3 Oct 2025 15:42:57 +0100 Subject: [PATCH 2/4] add tests --- utils/parallelisation/group.go | 99 ----- utils/parallelisation/priority_group.go | 107 +++++ utils/parallelisation/priority_group_test.go | 399 +++++++++++++++++++ 3 files changed, 506 insertions(+), 99 deletions(-) create mode 100644 utils/parallelisation/priority_group.go create mode 100644 utils/parallelisation/priority_group_test.go diff --git a/utils/parallelisation/group.go b/utils/parallelisation/group.go index a7aa21a0ae..de91d757e4 100644 --- a/utils/parallelisation/group.go +++ b/utils/parallelisation/group.go @@ -2,9 +2,7 @@ package parallelisation import ( "context" - "maps" "math" - "slices" "github.com/sasha-s/go-deadlock" "go.uber.org/atomic" @@ -454,100 +452,3 @@ func (g *CompoundExecutionGroup) RegisterExecutor(group ...IExecutor) { }) } } - -var _ IExecutionGroup[IExecutor] = &PriorityExecutionGroup[IExecutor]{} - -type PriorityExecutionGroup[T any] struct { - mu deadlock.RWMutex - groups map[uint]IExecutionGroup[T] - options []StoreOption - newGroup func(...StoreOption) IExecutionGroup[T] -} - -func newPriorityExecutionGroup[T any](newGroup func(...StoreOption) IExecutionGroup[T], options ...StoreOption) *PriorityExecutionGroup[T] { - return &PriorityExecutionGroup[T]{ - mu: deadlock.RWMutex{}, - groups: make(map[uint]IExecutionGroup[T]), - options: options, - newGroup: newGroup, - } -} - -// NewPriorityExecutionGroup returns an execution group that can execute functions in order according to priority rules. 
-// Parallel commands with differing priorities will be executed in groups according to their priority. -// Sequential commands will be executed in order of their priority, no guarantees are made about the order of when -// the priority is the same as another command. -func NewPriorityExecutionGroup(options ...StoreOption) *PriorityExecutionGroup[IExecutor] { - return newPriorityExecutionGroup[IExecutor]( - func(options ...StoreOption) IExecutionGroup[IExecutor] { - return NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { - return e.Execute(ctx) - }, options...) - }, - options..., - ) -} - -func (g *PriorityExecutionGroup[T]) check() { - g.mu.Lock() - defer g.mu.Unlock() - - if g.groups == nil { - g.groups = make(map[uint]IExecutionGroup[T]) - } - if g.options == nil { - g.options = DefaultOptions().Options() - } - if g.newGroup == nil { - g.newGroup = func(options ...StoreOption) IExecutionGroup[T] { - return NewExecutionGroup[T](func(context.Context, T) error { - return commonerrors.UndefinedVariableWithMessage("g.newGroup", "priority execution group has not been initialised correctly") - }) - } - } -} - -// RegisterExecutor registers executors with a specific priority (lower values indidcate higher priority) -func (g *PriorityExecutionGroup[T]) RegisterFunctionWithPriority(priority uint, function ...T) { - g.check() - - g.mu.Lock() - defer g.mu.Unlock() - - if g.groups[priority] == nil { - g.groups[priority] = g.newGroup(g.options...) - } - g.groups[priority].RegisterFunction(function...) -} - -// RegisterExecutor registers executors with a priority of zero (highest priority) -func (g *PriorityExecutionGroup[T]) RegisterFunction(function ...T) { - g.RegisterFunctionWithPriority(0, function...) 
-} - -func (g *PriorityExecutionGroup[T]) Len() (n int) { - g.mu.RLock() - defer g.mu.RUnlock() - - for _, group := range g.groups { - n += group.Len() - } - return -} - -func (g *PriorityExecutionGroup[T]) executors() (executor *CompoundExecutionGroup) { - g.mu.RLock() - defer g.mu.RUnlock() - - executor = NewCompoundExecutionGroup(g.options...) - for _, key := range slices.Sorted(maps.Keys(g.groups)) { - executor.RegisterExecutor(g.groups[key]) - } - - return -} - -// Execute will execute all the groups according to the priorities of the functions -func (g *PriorityExecutionGroup[T]) Execute(ctx context.Context) error { - return g.executors().Execute(ctx) -} diff --git a/utils/parallelisation/priority_group.go b/utils/parallelisation/priority_group.go new file mode 100644 index 0000000000..5476a0d470 --- /dev/null +++ b/utils/parallelisation/priority_group.go @@ -0,0 +1,107 @@ +package parallelisation + +import ( + "context" + "maps" + "slices" + + "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/sasha-s/go-deadlock" +) + +var _ IExecutionGroup[IExecutor] = &PriorityExecutionGroup[IExecutor]{} + +type PriorityExecutionGroup[T any] struct { + mu deadlock.RWMutex + groups map[uint]IExecutionGroup[T] + options []StoreOption + newGroup func(...StoreOption) IExecutionGroup[T] +} + +func newPriorityExecutionGroup[T any](newGroup func(...StoreOption) IExecutionGroup[T], options ...StoreOption) *PriorityExecutionGroup[T] { + return &PriorityExecutionGroup[T]{ + mu: deadlock.RWMutex{}, + groups: make(map[uint]IExecutionGroup[T]), + options: options, + newGroup: newGroup, + } +} + +// NewPriorityExecutionGroup returns an execution group that can execute functions in order according to priority rules. +// Parallel commands with differing priorities will be executed in groups according to their priority. 
+// Sequential commands will be executed in order of their priority, no guarantees are made about the order when +// the priority is the same as another command. +func NewPriorityExecutionGroup(options ...StoreOption) *PriorityExecutionGroup[IExecutor] { + return newPriorityExecutionGroup[IExecutor]( + func(options ...StoreOption) IExecutionGroup[IExecutor] { + return NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, options...) + }, + options..., + ) +} + +func (g *PriorityExecutionGroup[T]) check() { + g.mu.Lock() + defer g.mu.Unlock() + + if g.groups == nil { + g.groups = make(map[uint]IExecutionGroup[T]) + } + if g.options == nil { + g.options = DefaultOptions().Options() + } + if g.newGroup == nil { + g.newGroup = func(options ...StoreOption) IExecutionGroup[T] { + // since none of the methods return errors directly we inject executors that will force the error and reveal it to the consumer + return NewExecutionGroup[T](func(context.Context, T) error { + return commonerrors.UndefinedVariableWithMessage("g.newGroup", "priority execution group has not been initialised correctly") + }) + } + } +} + +// RegisterFunctionWithPriority registers functions with a specific priority (lower values indicate higher priority) +func (g *PriorityExecutionGroup[T]) RegisterFunctionWithPriority(priority uint, function ...T) { + g.check() + + g.mu.Lock() + defer g.mu.Unlock() + + if g.groups[priority] == nil { + g.groups[priority] = g.newGroup(g.options...) + } + g.groups[priority].RegisterFunction(function...) +} + +// RegisterFunction registers functions with a priority of zero (highest priority) +func (g *PriorityExecutionGroup[T]) RegisterFunction(function ...T) { + g.RegisterFunctionWithPriority(0, function...) 
+} + +func (g *PriorityExecutionGroup[T]) Len() (n int) { + g.mu.RLock() + defer g.mu.RUnlock() + + for _, group := range g.groups { + n += group.Len() + } + return +} + +func (g *PriorityExecutionGroup[T]) executors() (executor *CompoundExecutionGroup) { + g.mu.RLock() + defer g.mu.RUnlock() + + executor = NewCompoundExecutionGroup(DefaultOptions().MergeWithOptions(Sequential).Options()...) + for _, key := range slices.Sorted(maps.Keys(g.groups)) { + executor.RegisterExecutor(g.groups[key]) + } + return +} + +// Execute will execute all the groups according to the priorities of the functions +func (g *PriorityExecutionGroup[T]) Execute(ctx context.Context) error { + return g.executors().Execute(ctx) +} diff --git a/utils/parallelisation/priority_group_test.go b/utils/parallelisation/priority_group_test.go new file mode 100644 index 0000000000..9ec20e387e --- /dev/null +++ b/utils/parallelisation/priority_group_test.go @@ -0,0 +1,399 @@ +package parallelisation + +import ( + "context" + "slices" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + "go.uber.org/goleak" + + "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/ARM-software/golang-utils/utils/commonerrors/errortest" +) + +type testExecutorFunc func(context.Context) error + +func (f testExecutorFunc) Execute(ctx context.Context) error { return f(ctx) } + +var _ IExecutor = (testExecutorFunc)(nil) + +// testRecordingExecutor will emit supplied values into a shared slice for order comparison +type testRecordingExecutor struct { + valueToEmit uint + executionOrder *atomicSlice + duration time.Duration +} + +func newRecordingExec(valueToEmit uint, executionOrder *atomicSlice) *testRecordingExecutor { + return newRecordingExecOfDuration(valueToEmit, executionOrder, 10*time.Millisecond) +} + +func newRecordingExecOfDuration(valueToEmit uint, executionOrder *atomicSlice, duration time.Duration) 
*testRecordingExecutor { + return &testRecordingExecutor{ + valueToEmit: valueToEmit, + executionOrder: executionOrder, + duration: duration, + } +} + +func (r *testRecordingExecutor) Execute(ctx context.Context) (err error) { + r.executionOrder.Append(r.valueToEmit) + SleepWithContext(ctx, r.duration) + return +} + +type atomicSlice struct { + mu sync.RWMutex + data []uint +} + +func newAtomicSlice(t *testing.T) *atomicSlice { + t.Helper() + return &atomicSlice{ + mu: sync.RWMutex{}, + data: make([]uint, 0), + } +} + +func (a *atomicSlice) Append(v ...uint) { + a.mu.Lock() + defer a.mu.Unlock() + a.data = append(a.data, v...) +} + +func (a *atomicSlice) IsSorted() bool { + a.mu.RLock() + defer a.mu.RUnlock() + return slices.IsSorted(a.data) +} + +func (a *atomicSlice) Len() int { + a.mu.RLock() + defer a.mu.RUnlock() + return len(a.data) +} + +func (a *atomicSlice) Data() []uint { + a.mu.RLock() + defer a.mu.RUnlock() + return a.data +} + +func TestPriority(t *testing.T) { + t.Run("single executor group", func(t *testing.T) { + t.Run("all sequential", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + priorities := []uint{3, 1, 2, 2, 0} + require.False(t, slices.IsSorted(priorities)) + + priorityGroup := NewPriorityExecutionGroup(Sequential, RetainAfterExecution) + + priorityGroup.RegisterFunctionWithPriority(priorities[0], newRecordingExec(priorities[0], executionOrder)) + priorityGroup.RegisterFunctionWithPriority(priorities[1], newRecordingExec(priorities[1], executionOrder)) + priorityGroup.RegisterFunctionWithPriority(priorities[2], newRecordingExec(priorities[2], executionOrder)) + priorityGroup.RegisterFunctionWithPriority(priorities[3], newRecordingExec(priorities[3], executionOrder)) + priorityGroup.RegisterFunctionWithPriority(priorities[4], newRecordingExec(priorities[4], executionOrder)) + + require.NoError(t, priorityGroup.Execute(context.Background())) + + assert.True(t, executionOrder.IsSorted()) + 
assert.EqualValues(t, executionOrder.Len(), len(priorities)) + }) + + t.Run("all parallel", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + priorities := []uint{2, 3, 2} + require.False(t, slices.IsSorted(priorities)) + + priorityGroup := NewPriorityExecutionGroup(Parallel, Workers(4), RetainAfterExecution) + + eachRunDuration := 100 * time.Millisecond + + priorityGroup.RegisterFunctionWithPriority(priorities[0], newRecordingExecOfDuration(priorities[0], executionOrder, eachRunDuration)) + priorityGroup.RegisterFunctionWithPriority(priorities[1], newRecordingExecOfDuration(priorities[1], executionOrder, eachRunDuration)) + priorityGroup.RegisterFunctionWithPriority(priorities[2], newRecordingExecOfDuration(priorities[2], executionOrder, eachRunDuration)) + + start := time.Now() + require.NoError(t, priorityGroup.Execute(context.Background())) + actualDuration := time.Since(start) + + assert.True(t, executionOrder.IsSorted()) + + // total duration for parallel executions should be pretty much the same as the number of priorities * eachRunDuration. 
This will indicate that they ran concurrently + prioritiesSorted := slices.Clone(priorities) + slices.Sort(prioritiesSorted) + expectedTotalDuration := eachRunDuration * time.Duration(len(slices.Compact(prioritiesSorted))) // account for different priorities + diff := expectedTotalDuration - actualDuration + assert.LessOrEqual(t, diff.Abs(), eachRunDuration/5) + }) + + }) + + t.Run("newGroup not set", func(t *testing.T) { + defer goleak.VerifyNone(t) + + var priorityGroup PriorityExecutionGroup[IExecutor] // no constructor used + + var called atomic.Bool + priorityGroup.RegisterFunction(testExecutorFunc(func(ctx context.Context) (err error) { + called.Store(true) + return + })) + + err := priorityGroup.Execute(context.Background()) + assert.Error(t, err) + errortest.AssertErrorDescription(t, err, "priority execution group has not been initialised correctly") + assert.False(t, called.Load()) + }) + + t.Run("multiple groups", func(t *testing.T) { + t.Run("all sequential", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + + priorities := []uint{2, 3, 2} + require.False(t, slices.IsSorted(priorities)) + + group1 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Sequential) + group1.RegisterFunction( + newRecordingExec(priorities[0], executionOrder), + newRecordingExec(priorities[1], executionOrder), + ) + + group2 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Sequential) + group2.RegisterFunction( + newRecordingExec(priorities[2], executionOrder), + ) + + priorityGroup := NewPriorityExecutionGroup(Sequential) + priorityGroup.RegisterFunctionWithPriority(5, group1) + priorityGroup.RegisterFunctionWithPriority(1, group2) + + require.NoError(t, priorityGroup.Execute(context.Background())) + + expected := []uint{priorities[2], priorities[0], priorities[1]} // 2 then 2,3 + assert.Equal(t, expected, 
executionOrder.Data()) + }) + + t.Run("two parallel groups (outer sequential)", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + priorities := []uint{20, 20, 10, 10} + + testDuration := 100 * time.Millisecond + + group1 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel, Workers(4)) + group1.RegisterFunction( + newRecordingExecOfDuration(priorities[0], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[1], executionOrder, testDuration), + ) + + group2 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel, Workers(4)) + group2.RegisterFunction( + newRecordingExecOfDuration(priorities[2], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[3], executionOrder, testDuration), + ) + + priorityGroup := NewPriorityExecutionGroup(Sequential) + priorityGroup.RegisterFunctionWithPriority(5, group1) + priorityGroup.RegisterFunctionWithPriority(1, group2) + + start := time.Now() + require.NoError(t, priorityGroup.Execute(context.Background())) + actualDuration := time.Since(start) + + require.EqualValues(t, executionOrder.Len(), 4) + assert.IsNonDecreasing(t, executionOrder.Data()) + + prioritiesSorted := slices.Clone(priorities) + slices.Sort(prioritiesSorted) + expectedTotalDuration := testDuration * 2 // two parallel tests in order so should take 2*testDuration + diff := expectedTotalDuration - actualDuration + assert.LessOrEqual(t, diff.Abs(), testDuration/5) + }) + + t.Run("mixed (group2 sequential and group1 parallel)", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + + priorities := []uint{20, 21, 10, 11} + + group1 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel, Workers(4)) + group1.RegisterFunction( + newRecordingExec(priorities[0], executionOrder), 
+ newRecordingExec(priorities[1], executionOrder), + ) + + group2 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Sequential) + group2.RegisterFunction( + newRecordingExec(priorities[2], executionOrder), + newRecordingExec(priorities[3], executionOrder), + ) + + priorityGroup := NewPriorityExecutionGroup(Sequential) + priorityGroup.RegisterFunctionWithPriority(5, group1) + priorityGroup.RegisterFunctionWithPriority(1, group2) + + require.NoError(t, priorityGroup.Execute(context.Background())) + + require.EqualValues(t, executionOrder.Len(), 4) + assert.Equal(t, priorities[2:], executionOrder.Data()[:2]) // 10, 11 in order + assert.ElementsMatch(t, priorities[:2], executionOrder.Data()[2:]) // 20 & 21 any order + }) + + t.Run("two parallel groups in outer parallel (outer parallel same priority)", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + + priorities := []uint{20, 20, 10, 10} // each group will have all members run in parallel + + testDuration := 100 * time.Millisecond + + group1 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel, Workers(4)) + group1.RegisterFunction( + newRecordingExecOfDuration(priorities[0], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[1], executionOrder, testDuration), + ) + + group2 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel) + group2.RegisterFunction( + newRecordingExecOfDuration(priorities[2], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[3], executionOrder, testDuration), + ) + + priorityGroup := NewPriorityExecutionGroup(Parallel, Workers(4)) + priorityGroup.RegisterFunctionWithPriority(1, group1) + priorityGroup.RegisterFunctionWithPriority(1, group2) + + start := time.Now() + require.NoError(t, priorityGroup.Execute(context.Background())) + 
actualDuration := time.Since(start) + + prioritiesSorted := slices.Clone(priorities) + slices.Sort(prioritiesSorted) + expectedTotalDuration := testDuration // all should run at once since the different priorities are in different groups + diff := expectedTotalDuration - actualDuration + assert.LessOrEqual(t, diff.Abs(), testDuration/5) + }) + + t.Run("two parallel groups in outer parallel (outer parallel different priorities)", func(t *testing.T) { + defer goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + + priorities := []uint{20, 20, 10, 10} // each group will have all members run in parallel + + testDuration := 100 * time.Millisecond + + group1 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel, Workers(4)) + group1.RegisterFunction( + newRecordingExecOfDuration(priorities[0], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[1], executionOrder, testDuration), + ) + + group2 := NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error { + return e.Execute(ctx) + }, Parallel) + group2.RegisterFunction( + newRecordingExecOfDuration(priorities[2], executionOrder, testDuration), + newRecordingExecOfDuration(priorities[3], executionOrder, testDuration), + ) + + priorityGroup := NewPriorityExecutionGroup(Parallel, Workers(4)) + priorityGroup.RegisterFunctionWithPriority(5, group1) + priorityGroup.RegisterFunctionWithPriority(1, group2) + + start := time.Now() + require.NoError(t, priorityGroup.Execute(context.Background())) + actualDuration := time.Since(start) + + prioritiesSorted := slices.Clone(priorities) + slices.Sort(prioritiesSorted) + expectedTotalDuration := 2 * testDuration // parallel groups have different priorities so act in sequential manner + diff := expectedTotalDuration - actualDuration + assert.LessOrEqual(t, diff.Abs(), testDuration/5) + }) + + }) + + t.Run("default priority (zero) is highest", func(t *testing.T) { + defer 
goleak.VerifyNone(t) + + executionOrder := newAtomicSlice(t) + + priorityGroup := NewPriorityExecutionGroup(Sequential) + + priorityGroup.RegisterFunctionWithPriority(1, newRecordingExec(1, executionOrder)) + + priorityGroup.RegisterFunction(newRecordingExec(0, executionOrder)) + + require.NoError(t, priorityGroup.Execute(context.Background())) + assert.Equal(t, []uint{0, 1}, executionOrder.Data()) + assert.Equal(t, 2, priorityGroup.Len()) + }) + + t.Run("cancel", func(t *testing.T) { + defer goleak.VerifyNone(t) + + priorityGroup := NewPriorityExecutionGroup(Parallel) + + priorityGroup.RegisterFunction(testExecutorFunc(func(ctx context.Context) error { + return DetermineContextError(ctx) + })) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := priorityGroup.Execute(ctx) + errortest.AssertError(t, err, commonerrors.ErrCancelled) + }) + + t.Run("timeout", func(t *testing.T) { + defer goleak.VerifyNone(t) + + priorityGroup := NewPriorityExecutionGroup(Parallel) + + priorityGroup.RegisterFunction(testExecutorFunc(func(ctx context.Context) error { + return DetermineContextError(ctx) + })) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + time.Sleep(100 * time.Millisecond) + + err := priorityGroup.Execute(ctx) + errortest.AssertError(t, err, commonerrors.ErrTimeout) + }) +} From cad467e3969fbb7661fafb96432ffc800804a168 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 3 Oct 2025 15:43:51 +0100 Subject: [PATCH 3/4] spelling --- changes/20251003121340.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/20251003121340.feature b/changes/20251003121340.feature index ade4022d37..d585a51421 100644 --- a/changes/20251003121340.feature +++ b/changes/20251003121340.feature @@ -1 +1 @@ -:sparkles: `parallisation` Add support for execution groups with priorities +:sparkles: `parallelisation` Add support for execution groups with priorities From 
eabe7ea8188b8ad2324a5ffdb96921c662bc0a32 Mon Sep 17 00:00:00 2001 From: joshjennings98 Date: Fri, 3 Oct 2025 15:47:24 +0100 Subject: [PATCH 4/4] linting --- utils/parallelisation/priority_group.go | 3 ++- utils/parallelisation/priority_group_test.go | 8 ++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/utils/parallelisation/priority_group.go b/utils/parallelisation/priority_group.go index 5476a0d470..6c8049d562 100644 --- a/utils/parallelisation/priority_group.go +++ b/utils/parallelisation/priority_group.go @@ -5,8 +5,9 @@ import ( "maps" "slices" - "github.com/ARM-software/golang-utils/utils/commonerrors" "github.com/sasha-s/go-deadlock" + + "github.com/ARM-software/golang-utils/utils/commonerrors" ) var _ IExecutionGroup[IExecutor] = &PriorityExecutionGroup[IExecutor]{} diff --git a/utils/parallelisation/priority_group_test.go b/utils/parallelisation/priority_group_test.go index 9ec20e387e..5d5968d5e2 100644 --- a/utils/parallelisation/priority_group_test.go +++ b/utils/parallelisation/priority_group_test.go @@ -368,9 +368,7 @@ func TestPriority(t *testing.T) { priorityGroup := NewPriorityExecutionGroup(Parallel) - priorityGroup.RegisterFunction(testExecutorFunc(func(ctx context.Context) error { - return DetermineContextError(ctx) - })) + priorityGroup.RegisterFunction(testExecutorFunc(DetermineContextError)) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -384,9 +382,7 @@ func TestPriority(t *testing.T) { priorityGroup := NewPriorityExecutionGroup(Parallel) - priorityGroup.RegisterFunction(testExecutorFunc(func(ctx context.Context) error { - return DetermineContextError(ctx) - })) + priorityGroup.RegisterFunction(testExecutorFunc(DetermineContextError)) ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel()