Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,19 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
let terminalState = self.state.withCriticalRegion { state -> Termination? in
state.terminalState
}

if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
return state.ids
Comment on lines 103 to 107

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid resuming consumers while holding subject lock

Calling asyncBufferedChannel.send(state.current) while inside state.withCriticalRegion can synchronously resume an awaiting consumer. If that consumer’s code touches the subject again (e.g., reads value or calls send), it will try to re-enter the same ManagedCriticalState that uses a non‑reentrant os_unfair_lock, leading to a deadlock. This only occurs when a consumer awaits the channel and then re-enters the subject on receipt, but that pattern is common in for‑await loops.

Useful? React with 👍 / 👎.

Copy link
Contributor

@twittemb twittemb Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex couldn't complete this request. Try again later.

}

Expand Down
7 changes: 3 additions & 4 deletions Sources/AsyncSubjects/AsyncThrowingCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
) -> (iterator: AsyncThrowingBufferedChannel<Element, Error>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncThrowingBufferedChannel<Element, Error>()

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
let terminalState = self.state.withCriticalRegion { state -> Termination? in
state.terminalState
}

if let terminalState = terminalState {
Expand All @@ -111,11 +111,10 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
return state.ids
Comment on lines 114 to 118

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid resuming consumers while holding subject lock

Calling asyncBufferedChannel.send(state.current) while inside state.withCriticalRegion can synchronously resume an awaiting consumer. If that consumer’s code touches the subject again (e.g., reads value or calls send), it will try to re-enter the same ManagedCriticalState that uses a non‑reentrant os_unfair_lock, leading to a deadlock. This only occurs when a consumer awaits the channel and then re-enters the subject on receipt, but that pattern is common in for‑await loops.

Useful? React with 👍 / 👎.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

Expand Down