Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20250731140445.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: Add support for gracefully killing child processes
1 change: 1 addition & 0 deletions changes/20250804122854.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[collection]` added collection functional operations `Map`, `Filter`, `Reject`, `Reduce`
1 change: 1 addition & 0 deletions changes/20250804122923.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[parallelisation]` added parallelised collection functional operations `Map`, `Filter`, `Reject`
1 change: 1 addition & 0 deletions changes/20250804130842.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `[proc]` added a function to find processes based on name
2 changes: 1 addition & 1 deletion utils/collection/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *Conditions) Xor() bool {
return Xor(*c...)
}

// OneHot performs an `OnHot` operation on all conditions
// OneHot performs an `OneHot` operation on all conditions
func (c *Conditions) OneHot() bool {
if c == nil {
return false
Expand Down
38 changes: 38 additions & 0 deletions utils/collection/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ func AnyFunc[S ~[]E, E any](s S, f func(E) bool) bool {
return conditions.Any()
}

// Filter returns a new slice that contains elements from the input slice which return true when they’re passed as a parameter to the provided filtering function f.
func Filter[S ~[]E, E any](s S, f func(E) bool) (result S) {
result = make(S, 0, len(s))

for i := range s {
if f(s[i]) {
result = append(result, s[i])
}
}

return result
}

// Map creates a new slice and populates it with the results of calling the provided function on every element in input slice.
func Map[T1 any, T2 any](s []T1, f func(T1) T2) (result []T2) {
result = make([]T2, len(s))

for i := range s {
result[i] = f(s[i])
}

return result
}

// Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.
func Reject[S ~[]E, E any](s S, f func(E) bool) S {
return Filter(s, func(e E) bool { return !f(e) })
}

// Reduce runs a reducer function f over all elements in the array, in ascending-index order, and accumulates them into a single value.
func Reduce[T1, T2 any](s []T1, accumulator T2, f func(T2, T1) T2) (result T2) {
result = accumulator
for i := range s {
result = f(result, s[i])
}
return
}

// AnyEmpty returns whether there is one entry in the slice which is empty.
// If strict, then whitespaces are considered as empty strings
func AnyEmpty(strict bool, slice []string) bool {
Expand Down
43 changes: 43 additions & 0 deletions utils/collection/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package collection

import (
"fmt"
"strconv"
"testing"

"github.com/go-faker/faker/v4"
Expand Down Expand Up @@ -110,3 +112,44 @@ func TestAllNotEmpty(t *testing.T) {
assert.False(t, AllNotEmpty(false, []string{faker.Username(), "", faker.Name(), "", faker.Sentence()}))
assert.True(t, AllNotEmpty(false, []string{faker.Username(), faker.Name(), faker.Sentence()}))
}

func TestFilterReject(t *testing.T) {
nums := []int{1, 2, 3, 4, 5}
assert.ElementsMatch(t, []int{2, 4}, Filter(nums, func(n int) bool {
return n%2 == 0
}))
assert.ElementsMatch(t, []int{1, 3, 5}, Reject(nums, func(n int) bool {
return n%2 == 0
}))
assert.ElementsMatch(t, []int{4, 5}, Filter(nums, func(n int) bool {
return n > 3
}))
assert.ElementsMatch(t, []int{1, 2, 3}, Reject(nums, func(n int) bool {
return n > 3
}))
assert.ElementsMatch(t, []string{"foo", "bar"}, Filter([]string{"", "foo", "", "bar", ""}, func(x string) bool {
return len(x) > 0
}))
assert.ElementsMatch(t, []string{"", "", ""}, Reject([]string{"", "foo", "", "bar", ""}, func(x string) bool {
return len(x) > 0
}))
}

func TestMap(t *testing.T) {
mapped := Map([]int{1, 2}, func(i int) string {
return fmt.Sprintf("Hello world %v", i)
})
assert.ElementsMatch(t, []string{"Hello world 1", "Hello world 2"}, mapped)
mapped = Map([]int64{1, 2, 3, 4}, func(x int64) string {
return strconv.FormatInt(x, 10)
})
assert.ElementsMatch(t, []string{"1", "2", "3", "4"}, mapped)
}

func TestReduce(t *testing.T) {
nums := []int{1, 2, 3, 4, 5}
sumOfNums := Reduce(nums, 0, func(acc, n int) int {
return acc + n
})
assert.Equal(t, sumOfNums, 15)
}
52 changes: 46 additions & 6 deletions utils/parallelisation/parallelisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ func DetermineContextError(ctx context.Context) error {
}

type result struct {
Item interface{}
Item any
err error
}

// Parallelise parallelises an action over as many goroutines as specified by the argList and retrieves all the results when all the goroutines are done.
func Parallelise(argList interface{}, action func(arg interface{}) (interface{}, error), resultType reflect.Type) (results interface{}, err error) {
// To control the number of goroutines spawned, prefer WorkerPool
func Parallelise(argList any, action func(arg any) (any, error), resultType reflect.Type) (results any, err error) {
keepReturn := resultType != nil
argListValue := reflect.ValueOf(argList)
length := argListValue.Len()
channel := make(chan result, length)
for i := 0; i < length; i++ {
go func(args reflect.Value, actionFunc func(arg interface{}) (interface{}, error)) {
go func(args reflect.Value, actionFunc func(arg any) (any, error)) {
var r result
r.Item, r.err = func(v reflect.Value) (interface{}, error) {
r.Item, r.err = func(v reflect.Value) (any, error) {
return actionFunc(v.Interface())
}(args)
channel <- r
Expand Down Expand Up @@ -306,13 +307,19 @@ func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int,
for range numWorkers {
g.Go(func() error { return newWorker(gCtx, f, jobsChan, resultsChan) })
}
for _, job := range jobs {
jobsChan <- job
for i := range jobs {
if DetermineContextError(ctx) != nil {
break
}
jobsChan <- jobs[i]
}

close(jobsChan)
err = g.Wait()
close(resultsChan)
if err == nil {
err = DetermineContextError(ctx)
}
if err != nil {
return
}
Expand All @@ -323,3 +330,36 @@ func WorkerPool[InputType, ResultType any](ctx context.Context, numWorkers int,

return
}

// Filter is similar to collection.Filter but uses parallelisation.
func Filter[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) (result []T, err error) {
result, err = WorkerPool[T, T](ctx, numWorkers, s, func(fCtx context.Context, item T) (r T, ok bool, fErr error) {
fErr = DetermineContextError(fCtx)
if fErr != nil {
return
}
ok = f(item)
r = item
return
})
return
}

// Map is similar to collection.Map but uses parallelisation.
func Map[T1 any, T2 any](ctx context.Context, numWorkers int, s []T1, f func(T1) T2) (result []T2, err error) {
result, err = WorkerPool[T1, T2](ctx, numWorkers, s, func(fCtx context.Context, item T1) (r T2, ok bool, fErr error) {
fErr = DetermineContextError(fCtx)
if fErr != nil {
return
}
r = f(item)
ok = true
return
})
return
}

// Reject is the opposite of Filter and returns the elements of collection for which the filtering function f returns false.
func Reject[T any](ctx context.Context, numWorkers int, s []T, f func(T) bool) ([]T, error) {
return Filter[T](ctx, numWorkers, s, func(e T) bool { return !f(e) })
}
74 changes: 74 additions & 0 deletions utils/parallelisation/parallelisation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"math/rand"
"reflect"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -378,6 +379,7 @@ func runActionWithParallelCheckHappy(t *testing.T, ctx context.Context) {
}
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
require.NoError(t, err)
assert.Equal(t, int32(15), counter.Load())
}

func runActionWithParallelCheckFail(t *testing.T, ctx context.Context) {
Expand All @@ -394,6 +396,7 @@ func runActionWithParallelCheckFail(t *testing.T, ctx context.Context) {
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
require.Error(t, err)
errortest.AssertError(t, err, commonerrors.ErrCancelled)
assert.Equal(t, int32(1), counter.Load())
}

func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) {
Expand All @@ -410,9 +413,11 @@ func runActionWithParallelCheckFailAtRandom(t *testing.T, ctx context.Context) {
err := RunActionWithParallelCheck(ctx, action, checkAction, 10*time.Millisecond)
require.Error(t, err)
errortest.AssertError(t, err, commonerrors.ErrCancelled)
assert.GreaterOrEqual(t, counter.Load(), int32(1))
}

func TestWaitUntil(t *testing.T) {
defer goleak.VerifyNone(t)
verifiedCondition := func(ctx context.Context) (bool, error) {
SleepWithContext(ctx, 50*time.Millisecond)
return true, nil
Expand Down Expand Up @@ -465,6 +470,7 @@ func TestWaitUntil(t *testing.T) {
}

func TestWorkerPool(t *testing.T) {
defer goleak.VerifyNone(t)
for _, test := range []struct {
name string
numWorkers int
Expand Down Expand Up @@ -562,3 +568,71 @@ func TestWorkerPool(t *testing.T) {
errortest.AssertError(t, err, commonerrors.ErrCancelled)
})
}

func TestFilterReject(t *testing.T) {
defer goleak.VerifyNone(t)
nums := []int{1, 2, 3, 4, 5}
ctx := context.Background()
results, err := Filter(ctx, 3, nums, func(n int) bool {
return n%2 == 0
})
require.NoError(t, err)
assert.ElementsMatch(t, []int{2, 4}, results)
results, err = Reject(ctx, 3, nums, func(n int) bool {
return n%2 == 0
})
require.NoError(t, err)
assert.ElementsMatch(t, []int{1, 3, 5}, results)
results, err = Filter(ctx, 3, nums, func(n int) bool {
return n > 3
})
require.NoError(t, err)
assert.ElementsMatch(t, []int{4, 5}, results)
results, err = Reject(ctx, 3, nums, func(n int) bool {
return n > 3
})
require.NoError(t, err)
assert.ElementsMatch(t, []int{1, 2, 3}, results)
results2, err := Filter(ctx, 3, []string{"", "foo", "", "bar", ""}, func(x string) bool {
return len(x) > 0
})

require.NoError(t, err)
assert.ElementsMatch(t, []string{"foo", "bar"}, results2)
results3, err := Reject(ctx, 3, []string{"", "foo", "", "bar", ""}, func(x string) bool {
return len(x) > 0
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"", "", ""}, results3)
t.Run("cancelled context", func(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()
_, err := Filter(cancelledCtx, 3, nums, func(n int) bool {
return n%2 == 0
})
errortest.AssertError(t, err, commonerrors.ErrCancelled)
})
}

func TestMap(t *testing.T) {
defer goleak.VerifyNone(t)
ctx := context.Background()
mapped, err := Map(ctx, 3, []int{1, 2}, func(i int) string {
return fmt.Sprintf("Hello world %v", i)
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"Hello world 1", "Hello world 2"}, mapped)
mapped, err = Map(ctx, 3, []int64{1, 2, 3, 4}, func(x int64) string {
return strconv.FormatInt(x, 10)
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"1", "2", "3", "4"}, mapped)
t.Run("cancelled context", func(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(context.Background())
cancel()
_, err := Map(cancelledCtx, 3, []int{1, 2}, func(i int) string {
return fmt.Sprintf("Hello world %v", i)
})
errortest.AssertError(t, err, commonerrors.ErrCancelled)
})
}
26 changes: 26 additions & 0 deletions utils/proc/find/find.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package find

import (
"context"
"fmt"
"regexp"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/proc"
)

const numWorkers = 10

// FindProcessByRegex will search for the processes that match a specific regex
func FindProcessByRegex(ctx context.Context, re *regexp.Regexp) (processes []proc.IProcess, err error) {
if re == nil {
err = commonerrors.UndefinedVariable("regex to search")
return
}
return findProcessByRegex(ctx, re)
}

// FindProcessByName will search for the processes that match a specific name
func FindProcessByName(ctx context.Context, name string) (processes []proc.IProcess, err error) {
return FindProcessByRegex(ctx, regexp.MustCompile(fmt.Sprintf(".*%v.*", regexp.QuoteMeta(name))))
}
11 changes: 3 additions & 8 deletions utils/proc/find/find_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func FindProcessByRegexForFS(ctx context.Context, fs filesystem.FS, re *regexp.R
return
}

processes, err = parallelisation.WorkerPool(ctx, 10, procEntries, func(ctx context.Context, entry string) (p proc.IProcess, matches bool, err error) {
processes, err = parallelisation.WorkerPool(ctx, numWorkers, procEntries, func(ctx context.Context, entry string) (p proc.IProcess, matches bool, err error) {
matches, err = checkProcessMatch(ctx, fs, re, entry)
if err != nil || !matches {
return
Expand All @@ -100,12 +100,7 @@ func FindProcessByRegexForFS(ctx context.Context, fs filesystem.FS, re *regexp.R
return
}

// FindProcessByRegex will search for the processes that match a specific regex
func FindProcessByRegex(ctx context.Context, re *regexp.Regexp) (processes []proc.IProcess, err error) {
// findProcessByRegex will search for the processes that match a specific regex
func findProcessByRegex(ctx context.Context, re *regexp.Regexp) (processes []proc.IProcess, err error) {
return FindProcessByRegexForFS(ctx, filesystem.GetGlobalFileSystem(), re)
}

// FindProcessByName will search for the processes that match a specific name
func FindProcessByName(ctx context.Context, name string) (processes []proc.IProcess, err error) {
return FindProcessByRegex(ctx, regexp.MustCompile(fmt.Sprintf(".*%v.*", regexp.QuoteMeta(name))))
}
Loading
Loading