-
Notifications
You must be signed in to change notification settings - Fork 2.6k
fix(sessions): ensure database session rollback on commit failure #3766
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
fix(sessions): ensure database session rollback on commit failure #3766
Conversation
Summary of ChangesHello @sivakumar-77, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical stability issue in the Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request effectively addresses a critical issue by ensuring database sessions are rolled back on commit failures, preventing connection pool exhaustion. The introduction of the _commit_or_rollback helper is a clean solution, and the accompanying unit test is well-written and correctly validates the new behavior. However, the changes also introduce several redundant try...except blocks around the main logic in the service methods. These blocks duplicate the rollback-on-exception behavior already provided by SQLAlchemy's async with session context manager. I've left specific comments with suggestions to remove this redundant code to improve maintainability.
| try: | ||
| if session_id and await sql_session.get( | ||
| StorageSession, (app_name, user_id, session_id) | ||
| ): | ||
| raise AlreadyExistsError( | ||
| f"Session with id {session_id} already exists." | ||
| ) | ||
| # Fetch app and user states from storage | ||
| storage_app_state = await sql_session.get(StorageAppState, (app_name)) | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (app_name, user_id) | ||
| ) | ||
| # Fetch app and user states from storage | ||
| storage_app_state = await sql_session.get(StorageAppState, (app_name)) | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (app_name, user_id) | ||
| ) | ||
|
|
||
| # Create state tables if not exist | ||
| if not storage_app_state: | ||
| storage_app_state = StorageAppState(app_name=app_name, state={}) | ||
| sql_session.add(storage_app_state) | ||
| if not storage_user_state: | ||
| storage_user_state = StorageUserState( | ||
| app_name=app_name, user_id=user_id, state={} | ||
|
|
||
| # Create state tables if not exist | ||
| if not storage_app_state: | ||
| storage_app_state = StorageAppState(app_name=app_name, state={}) | ||
| sql_session.add(storage_app_state) | ||
| if not storage_user_state: | ||
| storage_user_state = StorageUserState( | ||
| app_name=app_name, user_id=user_id, state={} | ||
| ) | ||
| sql_session.add(storage_user_state) | ||
|
|
||
| # Extract state deltas | ||
| state_deltas = _session_util.extract_state_delta(state) | ||
| app_state_delta = state_deltas["app"] | ||
| user_state_delta = state_deltas["user"] | ||
| session_state = state_deltas["session"] | ||
|
|
||
| # Apply state delta | ||
| if app_state_delta: | ||
| storage_app_state.state = storage_app_state.state | app_state_delta | ||
| if user_state_delta: | ||
| storage_user_state.state = storage_user_state.state | user_state_delta | ||
|
|
||
| # Store the session | ||
| storage_session = StorageSession( | ||
| app_name=app_name, | ||
| user_id=user_id, | ||
| id=session_id, | ||
| state=session_state, | ||
| ) | ||
| sql_session.add(storage_user_state) | ||
|
|
||
| # Extract state deltas | ||
| state_deltas = _session_util.extract_state_delta(state) | ||
| app_state_delta = state_deltas["app"] | ||
| user_state_delta = state_deltas["user"] | ||
| session_state = state_deltas["session"] | ||
|
|
||
| # Apply state delta | ||
| if app_state_delta: | ||
| storage_app_state.state = storage_app_state.state | app_state_delta | ||
| if user_state_delta: | ||
| storage_user_state.state = storage_user_state.state | user_state_delta | ||
|
|
||
| # Store the session | ||
| storage_session = StorageSession( | ||
| app_name=app_name, | ||
| user_id=user_id, | ||
| id=session_id, | ||
| state=session_state, | ||
| ) | ||
| sql_session.add(storage_session) | ||
| await sql_session.commit() | ||
| sql_session.add(storage_session) | ||
| await self._commit_or_rollback(sql_session) | ||
|
|
||
| await sql_session.refresh(storage_session) | ||
| await sql_session.refresh(storage_session) | ||
|
|
||
| # Merge states for response | ||
| merged_state = _merge_state( | ||
| storage_app_state.state, storage_user_state.state, session_state | ||
| ) | ||
| session = storage_session.to_session(state=merged_state) | ||
| # Merge states for response | ||
| merged_state = _merge_state( | ||
| storage_app_state.state, storage_user_state.state, session_state | ||
| ) | ||
| session = storage_session.to_session(state=merged_state) | ||
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outer try...except block that manually rolls back the transaction is redundant. The async with self.database_session_factory() as sql_session: context manager already ensures that the transaction is rolled back if any exception occurs within the block. The new _commit_or_rollback helper correctly handles the specific case of a commit() failure. Relying on the async with context manager for automatic rollback simplifies the code and removes unnecessary boilerplate.
if session_id and await sql_session.get(
StorageSession, (app_name, user_id, session_id)
):
raise AlreadyExistsError(
f"Session with id {session_id} already exists."
)
# Fetch app and user states from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
# Create state tables if not exist
if not storage_app_state:
storage_app_state = StorageAppState(app_name=app_name, state={})
sql_session.add(storage_app_state)
if not storage_user_state:
storage_user_state = StorageUserState(
app_name=app_name, user_id=user_id, state={}
)
sql_session.add(storage_user_state)
# Extract state deltas
state_deltas = _session_util.extract_state_delta(state)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state = state_deltas["session"]
# Apply state delta
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = storage_user_state.state | user_state_delta
# Store the session
storage_session = StorageSession(
app_name=app_name,
user_id=user_id,
id=session_id,
state=session_state,
)
sql_session.add(storage_session)
await self._commit_or_rollback(sql_session)
await sql_session.refresh(storage_session)
# Merge states for response
merged_state = _merge_state(
storage_app_state.state, storage_user_state.state, session_state
)
session = storage_session.to_session(state=merged_state)| try: | ||
| storage_session = await sql_session.get( | ||
| StorageSession, (app_name, user_id, session_id) | ||
| ) | ||
| if storage_session is None: | ||
| return None | ||
|
|
||
| stmt = ( | ||
| select(StorageEvent) | ||
| .filter(StorageEvent.app_name == app_name) | ||
| .filter(StorageEvent.session_id == storage_session.id) | ||
| .filter(StorageEvent.user_id == user_id) | ||
| ) | ||
|
|
||
| if config and config.after_timestamp: | ||
| after_dt = datetime.fromtimestamp(config.after_timestamp) | ||
| stmt = stmt.filter(StorageEvent.timestamp >= after_dt) | ||
|
|
||
| stmt = stmt.order_by(StorageEvent.timestamp.desc()) | ||
|
|
||
| if config and config.num_recent_events: | ||
| stmt = stmt.limit(config.num_recent_events) | ||
|
|
||
| result = await sql_session.execute(stmt) | ||
| storage_events = result.scalars().all() | ||
|
|
||
| # Fetch states from storage | ||
| storage_app_state = await sql_session.get(StorageAppState, (app_name)) | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (app_name, user_id) | ||
| ) | ||
|
|
||
| app_state = storage_app_state.state if storage_app_state else {} | ||
| user_state = storage_user_state.state if storage_user_state else {} | ||
| session_state = storage_session.state | ||
|
|
||
| # Merge states | ||
| merged_state = _merge_state(app_state, user_state, session_state) | ||
|
|
||
| # Convert storage session to session | ||
| events = [e.to_event() for e in reversed(storage_events)] | ||
| session = storage_session.to_session(state=merged_state, events=events) | ||
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This try...except block is redundant. Since get_session is a read-only operation and doesn't perform a commit, there's no need for this manual rollback logic. The async with context manager will correctly handle the transaction lifecycle, including rolling back (and thus releasing the connection) on error.
storage_session = await sql_session.get(
StorageSession, (app_name, user_id, session_id)
)
if storage_session is None:
return None
stmt = (
select(StorageEvent)
.filter(StorageEvent.app_name == app_name)
.filter(StorageEvent.session_id == storage_session.id)
.filter(StorageEvent.user_id == user_id)
)
if config and config.after_timestamp:
after_dt = datetime.fromtimestamp(config.after_timestamp)
stmt = stmt.filter(StorageEvent.timestamp >= after_dt)
stmt = stmt.order_by(StorageEvent.timestamp.desc())
if config and config.num_recent_events:
stmt = stmt.limit(config.num_recent_events)
result = await sql_session.execute(stmt)
storage_events = result.scalars().all()
# Fetch states from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
app_state = storage_app_state.state if storage_app_state else {}
user_state = storage_user_state.state if storage_user_state else {}
session_state = storage_session.state
# Merge states
merged_state = _merge_state(app_state, user_state, session_state)
# Convert storage session to session
events = [e.to_event() for e in reversed(storage_events)]
session = storage_session.to_session(state=merged_state, events=events)| try: | ||
| stmt = select(StorageSession).filter( | ||
| StorageSession.app_name == app_name | ||
| ) | ||
| if user_id is not None: | ||
| stmt = stmt.filter(StorageSession.user_id == user_id) | ||
|
|
||
| result = await sql_session.execute(stmt) | ||
| results = result.scalars().all() | ||
| result = await sql_session.execute(stmt) | ||
| results = result.scalars().all() | ||
|
|
||
| # Fetch app state from storage | ||
| storage_app_state = await sql_session.get(StorageAppState, (app_name)) | ||
| app_state = storage_app_state.state if storage_app_state else {} | ||
| # Fetch app state from storage | ||
| storage_app_state = await sql_session.get(StorageAppState, (app_name)) | ||
| app_state = storage_app_state.state if storage_app_state else {} | ||
|
|
||
| # Fetch user state(s) from storage | ||
| user_states_map = {} | ||
| if user_id is not None: | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (app_name, user_id) | ||
| ) | ||
| if storage_user_state: | ||
| user_states_map[user_id] = storage_user_state.state | ||
| else: | ||
| user_state_stmt = select(StorageUserState).filter( | ||
| StorageUserState.app_name == app_name | ||
| ) | ||
| user_state_result = await sql_session.execute(user_state_stmt) | ||
| all_user_states_for_app = user_state_result.scalars().all() | ||
| for storage_user_state in all_user_states_for_app: | ||
| user_states_map[storage_user_state.user_id] = storage_user_state.state | ||
| # Fetch user state(s) from storage | ||
| user_states_map = {} | ||
| if user_id is not None: | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (app_name, user_id) | ||
| ) | ||
| if storage_user_state: | ||
| user_states_map[user_id] = storage_user_state.state | ||
| else: | ||
| user_state_stmt = select(StorageUserState).filter( | ||
| StorageUserState.app_name == app_name | ||
| ) | ||
| user_state_result = await sql_session.execute(user_state_stmt) | ||
| all_user_states_for_app = user_state_result.scalars().all() | ||
| for storage_user_state in all_user_states_for_app: | ||
| user_states_map[storage_user_state.user_id] = ( | ||
| storage_user_state.state | ||
| ) | ||
|
|
||
| sessions = [] | ||
| for storage_session in results: | ||
| session_state = storage_session.state | ||
| user_state = user_states_map.get(storage_session.user_id, {}) | ||
| merged_state = _merge_state(app_state, user_state, session_state) | ||
| sessions.append(storage_session.to_session(state=merged_state)) | ||
| return ListSessionsResponse(sessions=sessions) | ||
| sessions = [] | ||
| for storage_session in results: | ||
| session_state = storage_session.state | ||
| user_state = user_states_map.get(storage_session.user_id, {}) | ||
| merged_state = _merge_state(app_state, user_state, session_state) | ||
| sessions.append(storage_session.to_session(state=merged_state)) | ||
| return ListSessionsResponse(sessions=sessions) | ||
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This try...except block is redundant. The async with context manager automatically handles rollback on exceptions, which is sufficient for this read-only operation.
stmt = select(StorageSession).filter(
StorageSession.app_name == app_name
)
if user_id is not None:
stmt = stmt.filter(StorageSession.user_id == user_id)
result = await sql_session.execute(stmt)
results = result.scalars().all()
# Fetch app state from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
app_state = storage_app_state.state if storage_app_state else {}
# Fetch user state(s) from storage
user_states_map = {}
if user_id is not None:
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
if storage_user_state:
user_states_map[user_id] = storage_user_state.state
else:
user_state_stmt = select(StorageUserState).filter(
StorageUserState.app_name == app_name
)
user_state_result = await sql_session.execute(user_state_stmt)
all_user_states_for_app = user_state_result.scalars().all()
for storage_user_state in all_user_states_for_app:
user_states_map[storage_user_state.user_id] = (
storage_user_state.state
)
sessions = []
for storage_session in results:
session_state = storage_session.state
user_state = user_states_map.get(storage_session.user_id, {})
merged_state = _merge_state(app_state, user_state, session_state)
sessions.append(storage_session.to_session(state=merged_state))
return ListSessionsResponse(sessions=sessions)| try: | ||
| stmt = delete(StorageSession).where( | ||
| StorageSession.app_name == app_name, | ||
| StorageSession.user_id == user_id, | ||
| StorageSession.id == session_id, | ||
| ) | ||
| await sql_session.execute(stmt) | ||
| await self._commit_or_rollback(sql_session) | ||
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This try...except block is redundant. The combination of the async with context manager for general exceptions and the _commit_or_rollback helper for commit-specific failures is sufficient to ensure correct transaction handling.
stmt = delete(StorageSession).where(
StorageSession.app_name == app_name,
StorageSession.user_id == user_id,
StorageSession.id == session_id,
)
await sql_session.execute(stmt)
await self._commit_or_rollback(sql_session)| try: | ||
| storage_session = await sql_session.get( | ||
| StorageSession, (session.app_name, session.user_id, session.id) | ||
| ) | ||
| app_state_delta = state_deltas["app"] | ||
| user_state_delta = state_deltas["user"] | ||
| session_state_delta = state_deltas["session"] | ||
| # Merge state and update storage | ||
| if app_state_delta: | ||
| storage_app_state.state = storage_app_state.state | app_state_delta | ||
| if user_state_delta: | ||
| storage_user_state.state = storage_user_state.state | user_state_delta | ||
| if session_state_delta: | ||
| storage_session.state = storage_session.state | session_state_delta | ||
|
|
||
| if storage_session._dialect_name == "sqlite": | ||
| update_time = datetime.fromtimestamp( | ||
| event.timestamp, timezone.utc | ||
| ).replace(tzinfo=None) | ||
| else: | ||
| update_time = datetime.fromtimestamp(event.timestamp) | ||
| storage_session.update_time = update_time | ||
| sql_session.add(StorageEvent.from_event(session, event)) | ||
| if storage_session.update_timestamp_tz > session.last_update_time: | ||
| raise ValueError( | ||
| "The last_update_time provided in the session object" | ||
| f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is" | ||
| " earlier than the update_time in the storage_session" | ||
| f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}." | ||
| " Please check if it is a stale session." | ||
| ) | ||
|
|
||
| await sql_session.commit() | ||
| await sql_session.refresh(storage_session) | ||
| # Fetch states from storage | ||
| storage_app_state = await sql_session.get( | ||
| StorageAppState, (session.app_name) | ||
| ) | ||
| storage_user_state = await sql_session.get( | ||
| StorageUserState, (session.app_name, session.user_id) | ||
| ) | ||
|
|
||
| # Update timestamp with commit time | ||
| session.last_update_time = storage_session.update_timestamp_tz | ||
| # Extract state delta | ||
| if event.actions and event.actions.state_delta: | ||
| state_deltas = _session_util.extract_state_delta( | ||
| event.actions.state_delta | ||
| ) | ||
| app_state_delta = state_deltas["app"] | ||
| user_state_delta = state_deltas["user"] | ||
| session_state_delta = state_deltas["session"] | ||
| # Merge state and update storage | ||
| if app_state_delta: | ||
| storage_app_state.state = storage_app_state.state | app_state_delta | ||
| if user_state_delta: | ||
| storage_user_state.state = ( | ||
| storage_user_state.state | user_state_delta | ||
| ) | ||
| if session_state_delta: | ||
| storage_session.state = storage_session.state | session_state_delta | ||
|
|
||
| if storage_session._dialect_name == "sqlite": | ||
| update_time = datetime.fromtimestamp( | ||
| event.timestamp, timezone.utc | ||
| ).replace(tzinfo=None) | ||
| else: | ||
| update_time = datetime.fromtimestamp(event.timestamp) | ||
| storage_session.update_time = update_time | ||
| sql_session.add(StorageEvent.from_event(session, event)) | ||
|
|
||
| await self._commit_or_rollback(sql_session) | ||
| await sql_session.refresh(storage_session) | ||
|
|
||
| # Update timestamp with commit time | ||
| session.last_update_time = storage_session.update_timestamp_tz | ||
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This try...except block is redundant. The async with context manager and the _commit_or_rollback helper already provide the necessary transaction safety and rollback guarantees.
storage_session = await sql_session.get(
StorageSession, (session.app_name, session.user_id, session.id)
)
if storage_session.update_timestamp_tz > session.last_update_time:
raise ValueError(
"The last_update_time provided in the session object"
f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is"
" earlier than the update_time in the storage_session"
f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}."
" Please check if it is a stale session."
)
# Fetch states from storage
storage_app_state = await sql_session.get(
StorageAppState, (session.app_name)
)
storage_user_state = await sql_session.get(
StorageUserState, (session.app_name, session.user_id)
)
# Extract state delta
if event.actions and event.actions.state_delta:
state_deltas = _session_util.extract_state_delta(
event.actions.state_delta
)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state_delta = state_deltas["session"]
# Merge state and update storage
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = (
storage_user_state.state | user_state_delta
)
if session_state_delta:
storage_session.state = storage_session.state | session_state_delta
if storage_session._dialect_name == "sqlite":
update_time = datetime.fromtimestamp(
event.timestamp, timezone.utc
).replace(tzinfo=None)
else:
update_time = datetime.fromtimestamp(event.timestamp)
storage_session.update_time = update_time
sql_session.add(StorageEvent.from_event(session, event))
await self._commit_or_rollback(sql_session)
await sql_session.refresh(storage_session)
# Update timestamp with commit time
session.last_update_time = storage_session.update_timestamp_tz
Why the
|
|
/gemini review |
|
It looks like there might be a typo in your command. Did you mean to type |
yes |
|
Understood! I will initiate a new review for you now. |
|
Hi @sivakumar-77 , Thank you for your contribution through this pull request! This PR has merge conflicts that require changes from your end. Could you please rebase your branch with the latest main branch to address these? Once this is complete, please let us know so we can proceed with the review. |
|
/gemini review |
conflict resolved |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request correctly introduces a _commit_or_rollback helper to ensure transactions are rolled back on commit failure, addressing a potential connection pool exhaustion issue. The accompanying unit test effectively validates this new helper. However, the implementation has introduced some critical issues in the append_event method, including the removal of a vital stale session check and a copy-paste error that will cause a runtime failure. Additionally, there is significant code duplication in the new try...except error handling blocks across multiple methods, which should be refactored for better maintainability. While the core idea is sound, the identified critical issues must be addressed before merging.
| storage_session = await sql_session.get( | ||
| StorageSession, (session.app_name, session.user_id, session.id) | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The critical check for stale sessions, which prevents race conditions by comparing session.last_update_time with the timestamp in storage, has been removed during this refactoring. This could lead to data corruption if multiple updates happen concurrently. Please reintroduce this validation after fetching storage_session.
| storage_session = await sql_session.get( | |
| StorageSession, (session.app_name, session.user_id, session.id) | |
| ) | |
| storage_session = await sql_session.get( | |
| StorageSession, (session.app_name, session.user_id, session.id) | |
| ) | |
| if storage_session.update_timestamp_tz > session.last_update_time: | |
| raise ValueError( | |
| "The last_update_time provided in the session object" | |
| f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is" | |
| " earlier than the update_time in the storage_session" | |
| f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}." | |
| " Please check if it is a stale session." | |
| ) |
| if storage_session._dialect_name == "sqlite": | ||
| update_time = datetime.utcfromtimestamp(event.timestamp) | ||
| else: | ||
| update_time = datetime.fromtimestamp(event.timestamp) | ||
| storage_session.update_time = update_time | ||
| sql_session.add(StorageEvent.from_event(session, event)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block of code appears to be a copy-paste error from the refactoring. It's misplaced outside the try block and will cause a runtime UnboundLocalError because storage_session is not defined in this scope. This code is also duplicated later within the try block (lines 764-771). This entire block should be removed.
| except Exception: | ||
| if sql_session.in_transaction(): | ||
| await sql_session.rollback() | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The try...except block for rolling back transactions on failure is duplicated across several methods (create_session, get_session, list_sessions, delete_session, append_event). This introduces significant boilerplate and makes the code harder to maintain.
Consider refactoring this common pattern into a reusable component, such as a custom asynccontextmanager. This would centralize the transaction management logic and make the service methods cleaner.
For example, you could define a context manager like this:
from contextlib import asynccontextmanager
@asynccontextmanager
async def _safe_db_transaction(self):
"""Provides a session with automatic rollback on failure."""
async with self.database_session_factory() as sql_session:
try:
yield sql_session
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raiseAnd then use it in each method:
async def create_session(self, ...) -> Session:
await self._ensure_tables_created()
async with self._safe_db_transaction() as sql_session:
# ... method-specific logic ...
await self._commit_or_rollback(sql_session)
# ...
return sessionThis would remove the duplicated try...except blocks from each method.
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
DatabaseSessionServicewas not reliably rolling back database sessions whencommit()failed, leaving connections in a "pending rollback" state and eventually exhausting the connection pool under load (see #3328). This manifested as errors likesqlalchemy.exc.PendingRollbackError: Can't reconnect until invalid transaction is rolled backand idle transactions remaining open after failures.Solution:
_commit_or_rollbackhelper inDatabaseSessionServicethat:commit()and, on any exception, performs arollback()and then re-raises the original error to preserve failure semantics.DatabaseSessionService(create_session,delete_session,append_event, etc.) to use_commit_or_rollbackinstead of callingcommit()directly.test_session_service.pythat monkeypatchescommit()to fail and verifies that:_commit_or_rollbackre-raises the error, androllback()is invoked, ensuring transactions are not left in a pending state.This aligns
DatabaseSessionServicebehavior with the expectations for production persistence and prevents connection leaks under error conditions.Testing Plan
Unit Tests:
Summary of
pytestresults:All above tests passed locally.
Manual End-to-End (E2E) Tests:
To manually exercise the behavior:
DatabaseSessionServicewith a SQLite or PostgreSQL DB URL.DatabaseSessionServiceas session storage.Checklist
Additional context
This change directly addresses the connection / transaction lifecycle issue described in #3328, ensuring that failed commits in
DatabaseSessionServicedo not leave sessions in an invalid or pending-rollback state.