Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import OrderedCollections
/// sut.send(3)
/// sut.finish()
/// ```
public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Sendable {
public typealias Element = Element
public typealias AsyncIterator = Iterator

Expand Down Expand Up @@ -157,27 +157,7 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)

return await withTaskCancellationHandler { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
} operation: {
return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
Expand Down Expand Up @@ -218,6 +198,26 @@ public final class AsyncBufferedChannel<Element>: AsyncSequence, Sendable {
onSuspend?()
}
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
}
}

Expand Down
42 changes: 21 additions & 21 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -178,27 +178,7 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)

return try await withTaskCancellationHandler { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
} operation: {
return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
Expand Down Expand Up @@ -245,6 +225,26 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
onSuspend?()
}
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
if awaitings.isEmpty {
state = .idle
} else {
state = .awaiting(awaitings)
}
return awaiting
default:
return nil
}
}

awaiting?.continuation?.resume(returning: nil)
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncSubjects/AsyncPassthroughSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/// passthrough.send(2)
/// passthrough.send(.finished)
/// ```
public final class AsyncPassthroughSubject<Element>: AsyncSubject {
public final class AsyncPassthroughSubject<Element: Sendable>: AsyncSubject {
public typealias Element = Element
public typealias Failure = Never
public typealias AsyncIterator = Iterator
Expand Down
6 changes: 3 additions & 3 deletions Sources/Combiners/Merge/MergeStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ struct MergeStateMachine<Element>: Sendable {

func next() async -> RegulatedElement<Element> {
await withTaskCancellationHandler {
self.unsuspendAndClearOnCancel()
} operation: {
self.requestNextRegulatedElements()

let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation<RegulatedElement<Element>, Never>) in
Expand Down Expand Up @@ -240,11 +238,13 @@ struct MergeStateMachine<Element>: Sendable {
}
}

if case .termination = regulatedElement, case .element(.failure) = regulatedElement {
if case .element(.failure) = regulatedElement {
self.task.cancel()
}

return regulatedElement
} onCancel: {
self.unsuspendAndClearOnCancel()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,7 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
let shouldReturnNil = self.isTerminated.withCriticalRegion { $0 }
guard !shouldReturnNil else { return nil }

return try await withTaskCancellationHandler { [isTerminated, othersTask] in
isTerminated.withCriticalRegion { isTerminated in
isTerminated = true
}
othersTask?.cancel()
} operation: { [othersTask, othersState, onBaseElement] in
return try await withTaskCancellationHandler { [othersTask, othersState, onBaseElement] in
do {
while true {
guard let baseElement = try await self.base.next() else {
Expand Down Expand Up @@ -219,6 +214,11 @@ where Other1: Sendable, Other2: Sendable, Other1.Element: Sendable, Other2.Eleme
othersTask?.cancel()
throw error
}
} onCancel: { [isTerminated, othersTask] in
isTerminated.withCriticalRegion { isTerminated in
isTerminated = true
}
othersTask?.cancel()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ where Other: Sendable, Other.Element: Sendable {
public mutating func next() async rethrows -> Element? {
guard !self.isTerminated else { return nil }

return try await withTaskCancellationHandler { [otherTask] in
otherTask?.cancel()
} operation: { [otherTask, otherState, onBaseElement] in
return try await withTaskCancellationHandler { [otherTask, otherState, onBaseElement] in
do {
while true {
guard let baseElement = try await self.base.next() else {
Expand Down Expand Up @@ -157,6 +155,8 @@ where Other: Sendable, Other.Element: Sendable {
otherTask?.cancel()
throw error
}
} onCancel: { [otherTask] in
otherTask?.cancel()
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/Zip2Runtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:

func next() async rethrows -> (Base1.Element, Base2.Element)? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>)?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -173,6 +167,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
self.handle(demandIsFulfilledOutput: output)

return try (results.0._rethrowGet(), results.1._rethrowGet())
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/Zip3Runtime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:

func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element)? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result<Base1.Element, Error>, Result<Base2.Element, Error>, Result<Base3.Element, Error>)?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -211,6 +205,12 @@ where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element:
self.handle(demandIsFulfilledOutput: output)

return try (results.0._rethrowGet(), results.1._rethrowGet(), results.2._rethrowGet())
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
12 changes: 6 additions & 6 deletions Sources/Combiners/Zip/ZipRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ where Base: Sendable, Base.Element: Sendable {

func next() async rethrows -> [Base.Element]? {
try await withTaskCancellationHandler {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
} operation: {
let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<[Int: Result<Base.Element, Error>]?, Never>) in
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.newDemandFromConsumer(suspendedDemand: continuation)
Expand All @@ -145,6 +139,12 @@ where Base: Sendable, Base.Element: Sendable {
self.handle(demandIsFulfilledOutput: output)

return try results.sorted { $0.key < $1.key }.map { try $0.value._rethrowGet() }
} onCancel: {
let output = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.rootTaskIsCancelled()
}

self.handle(rootTaskIsCancelledOutput: output)
}
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/Creators/AsyncTimerSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ public struct AsyncTimerSequence: AsyncSequence {
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler { [task] in
task.cancel()
} operation: {
await withTaskCancellationHandler {
guard !Task.isCancelled else { return nil }
return await self.iterator.next()
} onCancel: { [task] in
task.cancel()
}
}
}
Expand Down
11 changes: 3 additions & 8 deletions Sources/Operators/AsyncMulticastSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ where Base.Element == Subject.Element, Subject.Failure == Error, Base.AsyncItera

/// Allow the `AsyncIterator` to produce elements.
public func connect() {
self.isConnected.apply(criticalState: true)
self.connectedGate.send(())
}

Expand Down Expand Up @@ -157,14 +158,8 @@ where Base.Element == Subject.Element, Subject.Failure == Error, Base.AsyncItera
public mutating func next() async rethrows -> Element? {
guard !Task.isCancelled else { return nil }

let shouldWaitForGate = self.isConnected.withCriticalRegion { isConnected -> Bool in
if !isConnected {
isConnected = true
return true
}
return false
}
if shouldWaitForGate {
let isConnected = self.isConnected.withCriticalRegion { $0 }
if !isConnected {
await self.connectedGateIterator.next()
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/Operators/AsyncPrependSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public struct AsyncPrependSequence<Base: AsyncSequence>: AsyncSequence {

public struct Iterator: AsyncIteratorProtocol {
var base: Base.AsyncIterator
var prependElement: () async throws -> Element
var prependElement: @Sendable () -> Element
var hasBeenDelivered = false

public init(
base: Base.AsyncIterator,
prependElement: @escaping () async throws -> Element
prependElement: @Sendable @escaping () -> Element
) {
self.base = base
self.prependElement = prependElement
Expand All @@ -70,7 +70,7 @@ public struct AsyncPrependSequence<Base: AsyncSequence>: AsyncSequence {

if !self.hasBeenDelivered {
self.hasBeenDelivered = true
return try await prependElement()
return prependElement()
}

return try await self.base.next()
Expand Down
12 changes: 6 additions & 6 deletions Sources/Operators/AsyncSwitchToLatestSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ where Base.Element: AsyncSequence, Base: Sendable, Base.Element.Element: Sendabl
guard !Task.isCancelled else { return nil }
self.startBase()

return try await withTaskCancellationHandler { [baseTask, state] in
baseTask?.cancel()
state.withCriticalRegion {
$0.childTask?.cancel()
}
} operation: {
return try await withTaskCancellationHandler {
while true {
let childTask = await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Task<ChildValue?, Never>?, Never>) in
let decision = state.withCriticalRegion { state -> NextDecision in
Expand Down Expand Up @@ -303,6 +298,11 @@ where Base.Element: AsyncSequence, Base: Sendable, Base.Element.Element: Sendabl
return try element._rethrowGet()
}
}
} onCancel: { [baseTask, state] in
baseTask?.cancel()
state.withCriticalRegion {
$0.childTask?.cancel()
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions Sources/Supporting/Regulator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ final class Regulator<Base: AsyncSequence>: @unchecked Sendable {

func iterate() async {
await withTaskCancellationHandler {
self.unsuspendAndExitOnCancel()
} operation: {
var mutableBase = base.makeAsyncIterator()

do {
Expand Down Expand Up @@ -99,6 +97,8 @@ final class Regulator<Base: AsyncSequence>: @unchecked Sendable {
}
self.onNextRegulatedElement(.element(result: .failure(error)))
}
} onCancel: {
self.unsuspendAndExitOnCancel()
}
}

Expand Down
Loading