Skip to content

Comments

Implement queue locking mechanism in StreamingResponse to prevent race conditions during activity processing#189

Closed
cleemullins wants to merge 4 commits intomainfrom
users/cleemullins/streamingResponse
Closed

Implement queue locking mechanism in StreamingResponse to prevent race conditions during activity processing#189
cleemullins wants to merge 4 commits intomainfrom
users/cleemullins/streamingResponse

Conversation

@cleemullins
Copy link
Collaborator

Fixes #188

Fixed Concurrency Errors in streaming_response.py

I've identified and fixed several concurrency issues in the streaming response implementation:

Issues Found and Fixed

1. Added asyncio.Lock for queue synchronization

  • Added self._queue_lock in __init__ to protect shared state access
  • This lock prevents race conditions when multiple async contexts access the queue simultaneously

2. Fixed race condition in _chunk_queued flag

  • Problem: The flag was reset at the beginning of create_activity(), which could cause race conditions
  • Solution: Moved self._chunk_queued = False to execute after activity creation in all code paths
  • This ensures the flag is properly reset before returning, preventing lost updates

3. Protected _queue_sync task management

  • Problem: Checking and starting the drain task without synchronization could lead to multiple drain tasks running
  • Solution:
    • Changed _queue_activity() to use a lock when checking and starting the drain task
    • Added a check for done() status to handle completed tasks correctly
    • This prevents multiple drain tasks from running simultaneously

4. Thread-safe queue operations

  • Problem: self._queue.pop(0) could be accessed by multiple async contexts
  • Solution:
    • Protected queue access with the lock in _drain_queue()
    • Added a double-check pattern to safely handle empty queue scenarios
    • Ensures the _queue_sync cleanup happens under lock protection

Key Changes Made

Location Change
Line 54 Added self._queue_lock = asyncio.Lock()
Lines 273-318 Refactored _queue_next_chunk() to reset _chunk_queued after activity creation
Lines 320-332 Refactored _queue_activity() to use lock-protected task startup
Lines 334-362 Protected queue access in _drain_queue() with lock

Impact

These changes eliminate race conditions that could occur when:

  • Multiple async contexts try to queue activities simultaneously
  • The drain task is checked and started from different contexts
  • Queue items are being added while being consumed
  • The _chunk_queued flag is being read/written from different contexts

The implementation is now thread-safe for async operations and prevents potential data corruption or lost updates.

@cleemullins cleemullins requested a review from a team as a code owner October 20, 2025 22:57
Copilot AI review requested due to automatic review settings October 20, 2025 22:57
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a queue locking mechanism to address race conditions in the StreamingResponse class's activity processing. The changes introduce an asyncio.Lock to protect shared state during concurrent access and refactor queue management logic to ensure thread-safe operations.

Key Changes:

  • Added asyncio.Lock to synchronize access to the activity queue and related state
  • Refactored _chunk_queued flag reset logic to occur after activity creation rather than at the beginning
  • Modified queue drain task management to use lock-protected startup and completion checks

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines 322 to 328
async def start_drain_if_needed():
async with self._queue_lock:
if not self._queue_sync or self._queue_sync.done():
self._queue_sync = asyncio.create_task(self._drain_queue())

# Schedule the coroutine to run
asyncio.create_task(start_drain_if_needed())
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a task to start another task introduces unnecessary complexity and potential timing issues. The inner async function start_drain_if_needed() creates a task that may not complete before _queue_activity() returns, potentially allowing race conditions if _queue_activity() is called again immediately. Since _queue_activity() is not async, consider either making it async and awaiting the lock directly, or using a synchronous lock approach with asyncio.create_task(self._drain_queue()) protected by a different synchronization mechanism.

Copilot uses AI. Check for mistakes.
Comment on lines 306 to 307
self._chunk_queued = False
return None
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _chunk_queued flag is being modified without lock protection. Since this flag is accessed from multiple async contexts (as mentioned in the PR description), resetting it here and at line 309 should be protected by self._queue_lock to prevent race conditions where multiple threads might read stale values or encounter lost updates.

Copilot uses AI. Check for mistakes.
@@ -274,7 +275,6 @@ def _queue_next_chunk(self) -> None:
self._chunk_queued = True
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting _chunk_queued = True occurs outside any lock protection. This creates a race condition where multiple callers of _queue_next_chunk() could simultaneously set the flag and queue activities. The flag assignment should be protected by self._queue_lock to ensure atomic read-modify-write operations.

Copilot uses AI. Check for mistakes.
@@ -316,8 +318,14 @@ def _queue_activity(self, factory: Callable[[], Activity]) -> None:
self._queue.append(factory)
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue append operation is not protected by the lock, but the queue access in _drain_queue() is protected. This creates a race condition where items could be added to self._queue while _drain_queue() is checking if the queue is empty or popping items. Move this append operation inside the lock in start_drain_if_needed() or add separate lock protection here.

Copilot uses AI. Check for mistakes.
Comment on lines 327 to 338
logger.debug(f"Draining queue with {len(self._queue)} activities.")
while self._queue:
factory = self._queue.pop(0)
# Use lock to safely access the queue
async with self._queue_lock:
Copy link

Copilot AI Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Accessing len(self._queue) for logging occurs outside the lock protection. While this is just for debugging, it could produce misleading log output if the queue size changes between the log call and the actual processing. Consider moving this inside the first lock acquisition or removing the count from the log message.

Suggested change
logger.debug(f"Draining queue with {len(self._queue)} activities.")
while self._queue:
factory = self._queue.pop(0)
# Use lock to safely access the queue
async with self._queue_lock:
while self._queue:
# Use lock to safely access the queue
async with self._queue_lock:
logger.debug(f"Draining queue with {len(self._queue)} activities.")

Copilot uses AI. Check for mistakes.
@cleemullins cleemullins self-assigned this Oct 21, 2025
@cleemullins
Copy link
Collaborator Author

After fighting with the PR for a day, I'm no longer convinced this is the right direction. We should hold this until the next release, and re-evaluate.

@cleemullins cleemullins reopened this Oct 21, 2025
@cleemullins cleemullins marked this pull request as draft October 21, 2025 20:42
…nt and improve concurrency handling

Note: I'm not convinced this is the right direction. I've converted the PR to draft, and we should re-evaluate.
…_text_chunk and set_citations methods for improved performance
Copilot AI review requested due to automatic review settings October 21, 2025 20:44
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@cleemullins
Copy link
Collaborator Author

This PR shouldn't be merged. Closing. Will re-do to fix the open issue in the next release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Race Condition: streaming_response._queue_activity

1 participant