Skip to content

Conversation

@CrysisDeu
Copy link

@CrysisDeu CrysisDeu commented Nov 16, 2025

Description

This PR adds parallel reading support to S3SessionManager.list_messages() to improve performance when retrieving multiple messages from S3. The implementation uses ThreadPoolExecutor to fetch messages concurrently, significantly reducing latency for sessions with many messages.

Key Changes:

  • Added max_parallel_reads parameter to S3SessionManager.__init__() (defaults to 1 for backward compatibility)
  • Implemented parallel S3 reads using ThreadPoolExecutor in list_messages()
  • Support for both instance-level and per-call max_parallel_reads configuration
  • Maintains message order regardless of completion order
  • Handles individual message read failures gracefully without stopping the entire operation
  • Added comprehensive test coverage including edge cases and mock-based verification

Backward Compatibility:

  • Default max_parallel_reads=1 ensures sequential behavior (backward compatible)
  • Existing code continues to work without modification
  • No breaking changes to the API

Related Issues

#1164

Documentation PR

Type of Change

New feature

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Testing Details:

  • Added 9 new test cases covering:
    • Default behavior (sequential reads)
    • Instance-level configuration
    • Per-call override functionality
    • Edge cases (few messages, many messages, pagination)
    • Mock-based verification of ThreadPoolExecutor configuration
  • All existing tests continue to pass (1520 passed, 4 skipped)
  • Verified message ordering is preserved
  • Tested error handling for individual message read failures
  • No new warnings introduced (existing warnings are pre-existing deprecation warnings)

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

boto_session: Optional[boto3.Session] = None,
boto_client_config: Optional[BotocoreConfig] = None,
region_name: Optional[str] = None,
max_parallel_reads: int = 1,
Copy link
Contributor

@JackYPCOnline JackYPCOnline Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we shouldn't use magic number

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I will change it do DEFAULT_SEQUENTIAL_READ_THREAD_COUNT

JackYPCOnline
JackYPCOnline previously approved these changes Dec 19, 2025
@codecov
Copy link

codecov bot commented Dec 19, 2025

Codecov Report

❌ Patch coverage is 85.71429% with 3 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/strands/session/s3_session_manager.py 85.71% 1 Missing and 2 partials ⚠️

📢 Thoughts on this report? Let us know!

zastrowm
zastrowm previously approved these changes Dec 22, 2025
Copy link
Member

@pgrayy pgrayy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment about exception handling.

CrysisDeu and others added 3 commits December 23, 2025 12:02
- Add max_parallel_reads parameter to S3SessionManager.__init__()
- Implement parallel S3 reads using ThreadPoolExecutor in list_messages()
- Support both instance-level and per-call max_parallel_reads configuration
- Add comprehensive tests for parallel reading functionality
- Maintain backward compatibility (default max_parallel_reads=1)
@JackYPCOnline JackYPCOnline dismissed stale reviews from zastrowm and themself via 69ebb61 December 23, 2025 20:30
@JackYPCOnline JackYPCOnline force-pushed the feature/s3-parallel-message-reads branch from b47ece1 to 69ebb61 Compare December 23, 2025 20:30
@github-actions github-actions bot removed the size/m label Dec 23, 2025
messages.append(SessionMessage.from_dict(message_data))
return messages

with ThreadPoolExecutor(max_workers=max_workers) as executor:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a pool of threads, can we instead create one thread and use an async function to call the read_s3_object concurrently?

We had a similar approach here where an async function was created to make the s3 call.

This ended up getting reverted because of a bug when calling asyncio.run on the main thread. Sometimes the agent is run as part of another asyncio.run call and scheduled on the asyncio event_loop already, so you cannot make nested asyncio.run calls on the same thread.

To fix that issue, you can just create a new thread, and then schedule the asyncio calls on that new thread:

from .._async import run_async

...

async def _load_messages_concurrently(self, messages_dir: str, message_files: list[str]) -> list[SessionMessage]:
        """Load multiple message files concurrently using async."""
        ...
        return messages

...
def list_messages(
        self, session_id: str, agent_id: str, limit: Optional[int] = None, offset: int = 0, **kwargs: Any
    ) -> List[SessionMessage]:
  ...
return run_async(self._load_messages_concurrently(messages_dir, message_files))

Copy link
Contributor

@JackYPCOnline JackYPCOnline Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this detailed context. @zastrowm also mentioned ir. I think there are several reasons ThreadPoolExecutor makes more sense here:

  1. Better parallelism: The current approach runs multiple synchronous S3 reads across different threads simultaneously, which is more efficient than single-thread async for this I/O-bound operation.

  2. No event loop conflicts: The nested event loop issue only applies when using asyncio.run() with async S3 calls. We're using standard threading here, so no risk of that bug.

  3. Consistent with design: Our session manager avoids async throughout to keep it simple and avoid async overhead, so we should maintain that consistency.

  4. Universal compatibility: ThreadPoolExecutor works safely in both async and sync contexts without event loop complications.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants