Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20250828111444.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:bug: [parallelisation] Revert the behaviour change of close stores introduced in recent release
7 changes: 7 additions & 0 deletions utils/parallelisation/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func (o *StoreOptions) MergeWithOptions(opt ...StoreOption) *StoreOptions {
return o.Merge(WithOptions(opt...))
}

func (o *StoreOptions) Apply(opt StoreOption) *StoreOptions {
if opt == nil {
return o
}
return opt(o)
}

func (o *StoreOptions) Overwrite(opts *StoreOptions) *StoreOptions {
return o.Default().Merge(opts)
}
Expand Down
28 changes: 20 additions & 8 deletions utils/parallelisation/onclose.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ func (s *CloserStore) Len() int {

// NewCloserStore returns a store of io.Closer object which will all be closed concurrently on Close(). The first error received will be returned
func NewCloserStore(stopOnFirstError bool) *CloserStore {
option := ExecuteAll
if stopOnFirstError {
option = StopOnFirstError
}
return NewCloserStoreWithOptions(option, Parallel, OnlyOnce, RetainAfterExecution)
return NewCloserStoreWithOptions(closeDefaultOptions(stopOnFirstError).Options()...)
}

// NewCloserOnceStore is similar to NewCloserStore but only close the closers once.
func NewCloserOnceStore(stopOnFirstError bool) *CloserStore {
return NewCloserStoreWithOptions(closeDefaultOptions(stopOnFirstError).Apply(OnlyOnce).Options()...)
}

// NewCloserStoreWithOptions returns a store of io.Closer object which will all be closed on Close(). The first error received if any will be returned
Expand Down Expand Up @@ -177,11 +178,22 @@ func NewCloseFunctionStoreStore(stopOnFirstError bool) *CloseFunctionStore {
// NewConcurrentCloseFunctionStore returns a store closing functions which will all be called concurrently on Close(). The first error received will be returned.
// Prefer using NewCloseFunctionStore where possible
func NewConcurrentCloseFunctionStore(stopOnFirstError bool) *CloseFunctionStore {
option := ExecuteAll
return NewCloseFunctionStore(closeDefaultOptions(stopOnFirstError).Options()...)
}

// NewConcurrentCloseOnceFunctionStore returns a store closing functions once on Close(). It is similar to NewConcurrentCloseFunctionStore otherwise.
func NewConcurrentCloseOnceFunctionStore(stopOnFirstError bool) *CloseFunctionStore {
return NewCloseFunctionStore(closeDefaultOptions(stopOnFirstError).Apply(OnlyOnce).Options()...)
}

func closeDefaultOptions(stopOnFirstError bool) *StoreOptions {
opts := WithOptions(Parallel, RetainAfterExecution)
if stopOnFirstError {
option = StopOnFirstError
opts = opts.Apply(StopOnFirstError)
} else {
opts = opts.Apply(ExecuteAll)
}
return NewCloseFunctionStore(option, Parallel, RetainAfterExecution, OnlyOnce)
return opts
}

// NewCloseOnceGroup is the same as NewCloseFunctionStore but ensures any closing functions are only executed once.
Expand Down
Loading