-
Notifications
You must be signed in to change notification settings - Fork 579
Add parallel reading support to S3SessionManager.list_messages() #1186
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add parallel reading support to S3SessionManager.list_messages() #1186
Conversation
| boto_session: Optional[boto3.Session] = None, | ||
| boto_client_config: Optional[BotocoreConfig] = None, | ||
| region_name: Optional[str] = None, | ||
| max_parallel_reads: int = 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we shouldn't use magic number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I will change it do DEFAULT_SEQUENTIAL_READ_THREAD_COUNT
f498196 to
3ded56f
Compare
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
pgrayy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment about exception handling.
- Add max_parallel_reads parameter to S3SessionManager.__init__() - Implement parallel S3 reads using ThreadPoolExecutor in list_messages() - Support both instance-level and per-call max_parallel_reads configuration - Add comprehensive tests for parallel reading functionality - Maintain backward compatibility (default max_parallel_reads=1)
…r.list_messages()]
69ebb61
b47ece1 to
69ebb61
Compare
| messages.append(SessionMessage.from_dict(message_data)) | ||
| return messages | ||
|
|
||
| with ThreadPoolExecutor(max_workers=max_workers) as executor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of a pool of threads, can we instead create one thread and use an async function to call the read_s3_object concurrently?
We had a similar approach here where an async function was created to make the s3 call.
This ended up getting reverted because of a bug when calling asyncio.run on the main thread. Sometimes the agent is run as part of another asyncio.run call and scheduled on the asyncio event_loop already, so you cannot make nested asyncio.run calls on the same thread.
To fix that issue, you can just create a new thread, and then schedule the asyncio calls on that new thread:
from .._async import run_async
...
async def _load_messages_concurrently(self, messages_dir: str, message_files: list[str]) -> list[SessionMessage]:
"""Load multiple message files concurrently using async."""
...
return messages
...
def list_messages(
self, session_id: str, agent_id: str, limit: Optional[int] = None, offset: int = 0, **kwargs: Any
) -> List[SessionMessage]:
...
return run_async(self._load_messages_concurrently(messages_dir, message_files))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this detailed context. @zastrowm also mentioned ir. I think there are several reasons ThreadPoolExecutor makes more sense here:
-
Better parallelism: The current approach runs multiple synchronous S3 reads across different threads simultaneously, which is more efficient than single-thread async for this I/O-bound operation.
-
No event loop conflicts: The nested event loop issue only applies when using asyncio.run() with async S3 calls. We're using standard threading here, so no risk of that bug.
-
Consistent with design: Our session manager avoids async throughout to keep it simple and avoid async overhead, so we should maintain that consistency.
-
Universal compatibility: ThreadPoolExecutor works safely in both async and sync contexts without event loop complications.
Description
This PR adds parallel reading support to
S3SessionManager.list_messages()to improve performance when retrieving multiple messages from S3. The implementation usesThreadPoolExecutorto fetch messages concurrently, significantly reducing latency for sessions with many messages.Key Changes:
max_parallel_readsparameter toS3SessionManager.__init__()(defaults to 1 for backward compatibility)ThreadPoolExecutorinlist_messages()max_parallel_readsconfigurationBackward Compatibility:
max_parallel_reads=1ensures sequential behavior (backward compatible)Related Issues
#1164
Documentation PR
Type of Change
New feature
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepareTesting Details:
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.