Skip to content

Commit d594b64

Browse files
✨ Add support for gracefully killing child processes (#662)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for gracefully killing child processes ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update). --------- Co-authored-by: Adrien CABARBAYE <adrien.cabarbaye@arm.com>
1 parent 6eeb721 commit d594b64

19 files changed

+572
-112
lines changed

changes/20250731140445.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: Add support for gracefully killing child processes

changes/20250804122854.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[collection]` added collection functional operations `Map`, `Filter`, `Reject`, `Reduce`

changes/20250804122923.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[parallelisation]` added parallelised collection functional operations `Map`, `Filter`, `Reject`

changes/20250804130842.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `[proc]` added a function to find processes based on name

utils/collection/conditions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (c *Conditions) Xor() bool {
110110
return Xor(*c...)
111111
}
112112

113-
// OneHot performs an `OnHot` operation on all conditions
113+
// OneHot performs an `OneHot` operation on all conditions
114114
func (c *Conditions) OneHot() bool {
115115
if c == nil {
116116
return false

utils/collection/search.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,45 @@ func AnyFunc[S ~[]E, E any](s S, f func(E) bool) bool {
7171
return conditions.Any()
7272
}
7373

74+
// Filter returns a new slice that contains elements from the input slice which return true when they’re passed as a parameter to the provided filtering function f.
75+
func Filter[S ~[]E, E any](s S, f func(E) bool) (result S) {
76+
result = make(S, 0, len(s))
77+
78+
for i := range s {
79+
if f(s[i]) {
80+
result = append(result, s[i])
81+
}
82+
}
83+
84+
return result
85+
}
86+
87+
// Map creates a new slice and populates it with the results of calling the provided function on every element in input slice.
88+
func Map[T1 any, T2 any](s []T1, f func(T1) T2) (result []T2) {
89+
result = make([]T2, len(s))
90+
91+
for i := range s {
92+
result[i] = f(s[i])
93+
}
94+
95+
return result
96+
}
97+
98+
// Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.
99+
// This is functionally equivalent to slices.DeleteFunc but it returns a new slice.
100+
func Reject[S ~[]E, E any](s S, f func(E) bool) S {
101+
return Filter(s, func(e E) bool { return !f(e) })
102+
}
103+
104+
// Reduce runs a reducer function f over all elements in the array, in ascending-index order, and accumulates them into a single value.
105+
func Reduce[T1, T2 any](s []T1, accumulator T2, f func(T2, T1) T2) (result T2) {
106+
result = accumulator
107+
for i := range s {
108+
result = f(result, s[i])
109+
}
110+
return
111+
}
112+
74113
// AnyEmpty returns whether there is one entry in the slice which is empty.
75114
// If strict, then whitespaces are considered as empty strings
76115
func AnyEmpty(strict bool, slice []string) bool {

utils/collection/search_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
package collection
66

77
import (
8+
"fmt"
9+
"strconv"
810
"testing"
911

1012
"github.com/go-faker/faker/v4"
@@ -110,3 +112,44 @@ func TestAllNotEmpty(t *testing.T) {
110112
assert.False(t, AllNotEmpty(false, []string{faker.Username(), "", faker.Name(), "", faker.Sentence()}))
111113
assert.True(t, AllNotEmpty(false, []string{faker.Username(), faker.Name(), faker.Sentence()}))
112114
}
115+
116+
func TestFilterReject(t *testing.T) {
117+
nums := []int{1, 2, 3, 4, 5}
118+
assert.ElementsMatch(t, []int{2, 4}, Filter(nums, func(n int) bool {
119+
return n%2 == 0
120+
}))
121+
assert.ElementsMatch(t, []int{1, 3, 5}, Reject(nums, func(n int) bool {
122+
return n%2 == 0
123+
}))
124+
assert.ElementsMatch(t, []int{4, 5}, Filter(nums, func(n int) bool {
125+
return n > 3
126+
}))
127+
assert.ElementsMatch(t, []int{1, 2, 3}, Reject(nums, func(n int) bool {
128+
return n > 3
129+
}))
130+
assert.ElementsMatch(t, []string{"foo", "bar"}, Filter([]string{"", "foo", "", "bar", ""}, func(x string) bool {
131+
return len(x) > 0
132+
}))
133+
assert.ElementsMatch(t, []string{"", "", ""}, Reject([]string{"", "foo", "", "bar", ""}, func(x string) bool {
134+
return len(x) > 0
135+
}))
136+
}
137+
138+
func TestMap(t *testing.T) {
139+
mapped := Map([]int{1, 2}, func(i int) string {
140+
return fmt.Sprintf("Hello world %v", i)
141+
})
142+
assert.ElementsMatch(t, []string{"Hello world 1", "Hello world 2"}, mapped)
143+
mapped = Map([]int64{1, 2, 3, 4}, func(x int64) string {
144+
return strconv.FormatInt(x, 10)
145+
})
146+
assert.ElementsMatch(t, []string{"1", "2", "3", "4"}, mapped)
147+
}
148+
149+
func TestReduce(t *testing.T) {
150+
nums := []int{1, 2, 3, 4, 5}
151+
sumOfNums := Reduce(nums, 0, func(acc, n int) int {
152+
return acc + n
153+
})
154+
assert.Equal(t, sumOfNums, 15)
155+
}

utils/parallelisation/parallelisation.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,21 @@ func DetermineContextError(ctx context.Context) error {
2323
}
2424

2525
type result struct {
26-
Item interface{}
26+
Item any
2727
err error
2828
}
2929

3030
// Parallelise parallelises an action over as many goroutines as specified by the argList and retrieves all the results when all the goroutines are done.
31-
func Parallelise(argList interface{}, action func(arg interface{}) (interface{}, error), resultType reflect.Type) (results interface{}, err error) {
31+
// To control the number of goroutines spawned, prefer WorkerPool
32+
func Parallelise(argList any, action func(arg any) (any, error), resultType reflect.Type) (results any, err error) {
3233
keepReturn := resultType != nil
3334
argListValue := reflect.ValueOf(argList)
3435
length := argListValue.Len()
3536
channel := make(chan result, length)
3637
for i := 0; i < length; i++ {
37-
go func(args reflect.Value, actionFunc func(arg interface{}) (interface{}, error)) {
38+
go func(args reflect.Value, actionFunc func(arg any) (any, error)) {
3839
var r result
39-
r.Item, r.err = func(v reflect.Value) (interface{}, error) {
40+
r.Item, r.err = func(v reflect.Value) (any, error) {
4041
return actionFunc(v.Interface())
4142
}(args)
4243
channel <- r
@@ -306,13 +307,19 @@ func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int,
306307
for range numWorkers {
307308
g.Go(func() error { return newWorker(gCtx, f, jobsChan, resultsChan) })
308309
}
309-
for _, job := range jobs {
310-
jobsChan <- job
310+
for i := range jobs {
311+
if DetermineContextError(ctx) != nil {
312+
break
313+
}
314+
jobsChan <- jobs[i]
311315
}
312316

313317
close(jobsChan)
314318
err = g.Wait()
315319
close(resultsChan)
320+
if err == nil {
321+
err = DetermineContextError(ctx)
322+
}
316323
if err != nil {
317324
return
318325
}
@@ -323,3 +330,36 @@ func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int,
323330

324331
return
325332
}
333+
334+
// Filter is similar to collection.Filter but uses parallelisation.
335+
func Filter[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) (result []T, err error) {
336+
result, err = WorkerPool[T, T](ctx, numWorkers, s, func(fCtx context.Context, item T) (r T, ok bool, fErr error) {
337+
fErr = DetermineContextError(fCtx)
338+
if fErr != nil {
339+
return
340+
}
341+
ok = f(item)
342+
r = item
343+
return
344+
})
345+
return
346+
}
347+
348+
// Map is similar to collection.Map but uses parallelisation.
349+
func Map[T1 any, T2 any](ctx context.Context, numWorkers int, s []T1, f func(T1) T2) (result []T2, err error) {
350+
result, err = WorkerPool[T1, T2](ctx, numWorkers, s, func(fCtx context.Context, item T1) (r T2, ok bool, fErr error) {
351+
fErr = DetermineContextError(fCtx)
352+
if fErr != nil {
353+
return
354+
}
355+
r = f(item)
356+
ok = true
357+
return
358+
})
359+
return
360+
}
361+
362+
// Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.
363+
func Reject[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) ([]T, error) {
364+
return Filter[T](ctx, numWorkers, s, func(e T) bool { return !f(e) })
365+
}

utils/parallelisation/parallelisation_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"math/rand"
1212
"reflect"
13+
"strconv"
1314
"testing"
1415
"time"
1516

@@ -378,6 +379,7 @@ func runActionWithParallelCheckHappy(t *testing.T, ctx context.Context) {
378379
}
379380
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
380381
require.NoError(t, err)
382+
assert.Equal(t, int32(15), counter.Load())
381383
}
382384

383385
func runActionWithParallelCheckFail(t *testing.T, ctx context.Context) {
@@ -394,6 +396,7 @@ func runActionWithParallelCheckFail(t *testing.T, ctx context.Context) {
394396
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
395397
require.Error(t, err)
396398
errortest.AssertError(t, err, commonerrors.ErrCancelled)
399+
assert.Equal(t, int32(1), counter.Load())
397400
}
398401

399402
func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) {
@@ -410,9 +413,11 @@ func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) {
410413
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
411414
require.Error(t, err)
412415
errortest.AssertError(t, err, commonerrors.ErrCancelled)
416+
assert.GreaterOrEqual(t, counter.Load(), int32(1))
413417
}
414418

415419
func TestWaitUntil(t *testing.T) {
420+
defer goleak.VerifyNone(t)
416421
verifiedCondition := func(ctx context.Context) (bool, error) {
417422
SleepWithContext(ctx, 50*time.Millisecond)
418423
return true, nil
@@ -465,6 +470,7 @@ func TestWaitUntil(t *testing.T) {
465470
}
466471

467472
func TestWorkerPool(t *testing.T) {
473+
defer goleak.VerifyNone(t)
468474
for _, test := range []struct {
469475
name string
470476
numWorkers int
@@ -562,3 +568,71 @@ func TestWorkerPool(t *testing.T) {
562568
errortest.AssertError(t, err, commonerrors.ErrCancelled)
563569
})
564570
}
571+
572+
func TestFilterReject(t *testing.T) {
573+
defer goleak.VerifyNone(t)
574+
nums := []int{1, 2, 3, 4, 5}
575+
ctx := context.Background()
576+
results, err := Filter(ctx, 3, nums, func(n int) bool {
577+
return n%2 == 0
578+
})
579+
require.NoError(t, err)
580+
assert.ElementsMatch(t, []int{2, 4}, results)
581+
results, err = Reject(ctx, 3, nums, func(n int) bool {
582+
return n%2 == 0
583+
})
584+
require.NoError(t, err)
585+
assert.ElementsMatch(t, []int{1, 3, 5}, results)
586+
results, err = Filter(ctx, 3, nums, func(n int) bool {
587+
return n > 3
588+
})
589+
require.NoError(t, err)
590+
assert.ElementsMatch(t, []int{4, 5}, results)
591+
results, err = Reject(ctx, 3, nums, func(n int) bool {
592+
return n > 3
593+
})
594+
require.NoError(t, err)
595+
assert.ElementsMatch(t, []int{1, 2, 3}, results)
596+
results2, err := Filter(ctx, 3, []string{"", "foo", "", "bar", ""}, func(x string) bool {
597+
return len(x) > 0
598+
})
599+
600+
require.NoError(t, err)
601+
assert.ElementsMatch(t, []string{"foo", "bar"}, results2)
602+
results3, err := Reject(ctx, 3, []string{"", "foo", "", "bar", ""}, func(x string) bool {
603+
return len(x) > 0
604+
})
605+
require.NoError(t, err)
606+
assert.ElementsMatch(t, []string{"", "", ""}, results3)
607+
t.Run("cancelled context", func(t *testing.T) {
608+
cancelledCtx, cancel := context.WithCancel(context.Background())
609+
cancel()
610+
_, err := Filter(cancelledCtx, 3, nums, func(n int) bool {
611+
return n%2 == 0
612+
})
613+
errortest.AssertError(t, err, commonerrors.ErrCancelled)
614+
})
615+
}
616+
617+
func TestMap(t *testing.T) {
618+
defer goleak.VerifyNone(t)
619+
ctx := context.Background()
620+
mapped, err := Map(ctx, 3, []int{1, 2}, func(i int) string {
621+
return fmt.Sprintf("Hello world %v", i)
622+
})
623+
require.NoError(t, err)
624+
assert.ElementsMatch(t, []string{"Hello world 1", "Hello world 2"}, mapped)
625+
mapped, err = Map(ctx, 3, []int64{1, 2, 3, 4}, func(x int64) string {
626+
return strconv.FormatInt(x, 10)
627+
})
628+
require.NoError(t, err)
629+
assert.ElementsMatch(t, []string{"1", "2", "3", "4"}, mapped)
630+
t.Run("cancelled context", func(t *testing.T) {
631+
cancelledCtx, cancel := context.WithCancel(context.Background())
632+
cancel()
633+
_, err := Map(cancelledCtx, 3, []int{1, 2}, func(i int) string {
634+
return fmt.Sprintf("Hello world %v", i)
635+
})
636+
errortest.AssertError(t, err, commonerrors.ErrCancelled)
637+
})
638+
}

utils/proc/find/find.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package find
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"regexp"
7+
8+
"github.com/ARM-software/golang-utils/utils/commonerrors"
9+
"github.com/ARM-software/golang-utils/utils/proc"
10+
)
11+
12+
const numWorkers = 10
13+
14+
// FindProcessByRegex will search for the processes that match a specific regex
15+
func FindProcessByRegex(ctx context.Context, re *regexp.Regexp) (processes []proc.IProcess, err error) {
16+
if re == nil {
17+
err = commonerrors.UndefinedVariable("regex to search")
18+
return
19+
}
20+
return findProcessByRegex(ctx, re)
21+
}
22+
23+
// FindProcessByName will search for the processes that match a specific name
24+
func FindProcessByName(ctx context.Context, name string) (processes []proc.IProcess, err error) {
25+
return FindProcessByRegex(ctx, regexp.MustCompile(fmt.Sprintf(".*%v.*", regexp.QuoteMeta(name))))
26+
}

0 commit comments

Comments
 (0)