Skip to content

Conversation

@sivakumar-77
Copy link

Please ensure you have read the contribution guide before creating a pull request.

Link to Issue or Description of Change

1. Link to an existing issue (if applicable):

2. Or, if no issue exists, describe the change:

Problem:

DatabaseSessionService was not reliably rolling back database sessions when commit() failed, leaving connections in a "pending rollback" state and eventually exhausting the connection pool under load (see #3328). This manifested as errors like sqlalchemy.exc.PendingRollbackError: Can't reconnect until invalid transaction is rolled back and idle transactions remaining open after failures.

Solution:

  • Introduced a _commit_or_rollback helper in DatabaseSessionService that:
    • Attempts to commit() and, on any exception, performs a rollback() and then re-raises the original error to preserve failure semantics.
  • Updated all write paths in DatabaseSessionService (create_session, delete_session, append_event, etc.) to use _commit_or_rollback instead of calling commit() directly.
  • Added a unit test in test_session_service.py that monkeypatches commit() to fail and verifies that:
    • _commit_or_rollback re-raises the error, and
    • rollback() is invoked, ensuring transactions are not left in a pending state.

This aligns DatabaseSessionService behavior with the expectations for production persistence and prevents connection leaks under error conditions.

Testing Plan

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.

Summary of pytest results:

pytest tests/unittests/sessions/test_session_service.py -v

All above tests passed locally.

Manual End-to-End (E2E) Tests:

To manually exercise the behavior:

  1. Configure DatabaseSessionService with a SQLite or PostgreSQL DB URL.
  2. Run an agent using DatabaseSessionService as session storage.
  3. Inject a failure on commit (e.g., temporarily simulate a constraint violation or use a local patch that forces commit to raise).
  4. Observe that:
    • Requests now fail with a clear error, and
    • No “pending rollback” / idle-in-transaction sessions accumulate in the DB.

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

This change directly addresses the connection / transaction lifecycle issue described in #3328, ensuring that failed commits in DatabaseSessionService do not leave sessions in an invalid or pending-rollback state.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @sivakumar-77, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical stability issue in the DatabaseSessionService where database connections could become exhausted due to unhandled commit failures. By introducing a robust commit-or-rollback mechanism and enhancing error handling across all database write operations, the change ensures that transactions are always properly managed, preventing connection leaks and improving the overall reliability of the session storage system.

Highlights

  • Reliable Session Rollback: Introduced a new helper method, _commit_or_rollback, in DatabaseSessionService to ensure that database transactions are reliably rolled back if a commit operation fails. This prevents connections from being left in a 'pending rollback' state and exhausting the connection pool.
  • Integration Across Write Paths: All critical write operations within DatabaseSessionService, including create_session, delete_session, append_event, and _ensure_tables_created, have been updated to utilize the new _commit_or_rollback helper, standardizing error handling for commit failures.
  • Comprehensive Error Handling: Implemented try...except blocks around the main logic in various session management methods (create_session, get_session, list_sessions, delete_session, append_event) to catch any exceptions that might occur during database operations and explicitly call sql_session.rollback() if a transaction is active, further safeguarding against orphaned transactions.
  • New Unit Test: Added a dedicated unit test that simulates a commit() failure and verifies that the _commit_or_rollback method correctly triggers a rollback() and re-raises the original exception, confirming the intended behavior.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@adk-bot adk-bot added the services [Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc label Nov 30, 2025
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a critical issue by ensuring database sessions are rolled back on commit failures, preventing connection pool exhaustion. The introduction of the _commit_or_rollback helper is a clean solution, and the accompanying unit test is well-written and correctly validates the new behavior. However, the changes also introduce several redundant try...except blocks around the main logic in the service methods. These blocks duplicate the rollback-on-exception behavior already provided by SQLAlchemy's async with session context manager. I've left specific comments with suggestions to remove this redundant code to improve maintainability.

Comment on lines 423 to 478
try:
if session_id and await sql_session.get(
StorageSession, (app_name, user_id, session_id)
):
raise AlreadyExistsError(
f"Session with id {session_id} already exists."
)
# Fetch app and user states from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
# Fetch app and user states from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)

# Create state tables if not exist
if not storage_app_state:
storage_app_state = StorageAppState(app_name=app_name, state={})
sql_session.add(storage_app_state)
if not storage_user_state:
storage_user_state = StorageUserState(
app_name=app_name, user_id=user_id, state={}

# Create state tables if not exist
if not storage_app_state:
storage_app_state = StorageAppState(app_name=app_name, state={})
sql_session.add(storage_app_state)
if not storage_user_state:
storage_user_state = StorageUserState(
app_name=app_name, user_id=user_id, state={}
)
sql_session.add(storage_user_state)

# Extract state deltas
state_deltas = _session_util.extract_state_delta(state)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state = state_deltas["session"]

# Apply state delta
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = storage_user_state.state | user_state_delta

# Store the session
storage_session = StorageSession(
app_name=app_name,
user_id=user_id,
id=session_id,
state=session_state,
)
sql_session.add(storage_user_state)

# Extract state deltas
state_deltas = _session_util.extract_state_delta(state)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state = state_deltas["session"]

# Apply state delta
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = storage_user_state.state | user_state_delta

# Store the session
storage_session = StorageSession(
app_name=app_name,
user_id=user_id,
id=session_id,
state=session_state,
)
sql_session.add(storage_session)
await sql_session.commit()
sql_session.add(storage_session)
await self._commit_or_rollback(sql_session)

await sql_session.refresh(storage_session)
await sql_session.refresh(storage_session)

# Merge states for response
merged_state = _merge_state(
storage_app_state.state, storage_user_state.state, session_state
)
session = storage_session.to_session(state=merged_state)
# Merge states for response
merged_state = _merge_state(
storage_app_state.state, storage_user_state.state, session_state
)
session = storage_session.to_session(state=merged_state)
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The outer try...except block that manually rolls back the transaction is redundant. The async with self.database_session_factory() as sql_session: context manager already ensures that the transaction is rolled back if any exception occurs within the block. The new _commit_or_rollback helper correctly handles the specific case of a commit() failure. Relying on the async with context manager for automatic rollback simplifies the code and removes unnecessary boilerplate.

      if session_id and await sql_session.get(
          StorageSession, (app_name, user_id, session_id)
      ):
        raise AlreadyExistsError(
            f"Session with id {session_id} already exists."
        )
      # Fetch app and user states from storage
      storage_app_state = await sql_session.get(StorageAppState, (app_name))
      storage_user_state = await sql_session.get(
          StorageUserState, (app_name, user_id)
      )

      # Create state tables if not exist
      if not storage_app_state:
        storage_app_state = StorageAppState(app_name=app_name, state={})
        sql_session.add(storage_app_state)
      if not storage_user_state:
        storage_user_state = StorageUserState(
            app_name=app_name, user_id=user_id, state={}
        )
        sql_session.add(storage_user_state)

      # Extract state deltas
      state_deltas = _session_util.extract_state_delta(state)
      app_state_delta = state_deltas["app"]
      user_state_delta = state_deltas["user"]
      session_state = state_deltas["session"]

      # Apply state delta
      if app_state_delta:
        storage_app_state.state = storage_app_state.state | app_state_delta
      if user_state_delta:
        storage_user_state.state = storage_user_state.state | user_state_delta

      # Store the session
      storage_session = StorageSession(
          app_name=app_name,
          user_id=user_id,
          id=session_id,
          state=session_state,
      )
      sql_session.add(storage_session)
      await self._commit_or_rollback(sql_session)

      await sql_session.refresh(storage_session)

      # Merge states for response
      merged_state = _merge_state(
          storage_app_state.state, storage_user_state.state, session_state
      )
      session = storage_session.to_session(state=merged_state)

Comment on lines 495 to 540
try:
storage_session = await sql_session.get(
StorageSession, (app_name, user_id, session_id)
)
if storage_session is None:
return None

stmt = (
select(StorageEvent)
.filter(StorageEvent.app_name == app_name)
.filter(StorageEvent.session_id == storage_session.id)
.filter(StorageEvent.user_id == user_id)
)

if config and config.after_timestamp:
after_dt = datetime.fromtimestamp(config.after_timestamp)
stmt = stmt.filter(StorageEvent.timestamp >= after_dt)

stmt = stmt.order_by(StorageEvent.timestamp.desc())

if config and config.num_recent_events:
stmt = stmt.limit(config.num_recent_events)

result = await sql_session.execute(stmt)
storage_events = result.scalars().all()

# Fetch states from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)

app_state = storage_app_state.state if storage_app_state else {}
user_state = storage_user_state.state if storage_user_state else {}
session_state = storage_session.state

# Merge states
merged_state = _merge_state(app_state, user_state, session_state)

# Convert storage session to session
events = [e.to_event() for e in reversed(storage_events)]
session = storage_session.to_session(state=merged_state, events=events)
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This try...except block is redundant. Since get_session is a read-only operation and doesn't perform a commit, there's no need for this manual rollback logic. The async with context manager will correctly handle the transaction lifecycle, including rolling back (and thus releasing the connection) on error.

      storage_session = await sql_session.get(
          StorageSession, (app_name, user_id, session_id)
      )
      if storage_session is None:
        return None

      stmt = (
          select(StorageEvent)
          .filter(StorageEvent.app_name == app_name)
          .filter(StorageEvent.session_id == storage_session.id)
          .filter(StorageEvent.user_id == user_id)
      )

      if config and config.after_timestamp:
        after_dt = datetime.fromtimestamp(config.after_timestamp)
        stmt = stmt.filter(StorageEvent.timestamp >= after_dt)

      stmt = stmt.order_by(StorageEvent.timestamp.desc())

      if config and config.num_recent_events:
        stmt = stmt.limit(config.num_recent_events)

      result = await sql_session.execute(stmt)
      storage_events = result.scalars().all()

      # Fetch states from storage
      storage_app_state = await sql_session.get(StorageAppState, (app_name))
      storage_user_state = await sql_session.get(
          StorageUserState, (app_name, user_id)
      )

      app_state = storage_app_state.state if storage_app_state else {}
      user_state = storage_user_state.state if storage_user_state else {}
      session_state = storage_session.state

      # Merge states
      merged_state = _merge_state(app_state, user_state, session_state)

      # Convert storage session to session
      events = [e.to_event() for e in reversed(storage_events)]
      session = storage_session.to_session(state=merged_state, events=events)

Comment on lines 549 to 592
try:
stmt = select(StorageSession).filter(
StorageSession.app_name == app_name
)
if user_id is not None:
stmt = stmt.filter(StorageSession.user_id == user_id)

result = await sql_session.execute(stmt)
results = result.scalars().all()
result = await sql_session.execute(stmt)
results = result.scalars().all()

# Fetch app state from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
app_state = storage_app_state.state if storage_app_state else {}
# Fetch app state from storage
storage_app_state = await sql_session.get(StorageAppState, (app_name))
app_state = storage_app_state.state if storage_app_state else {}

# Fetch user state(s) from storage
user_states_map = {}
if user_id is not None:
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
if storage_user_state:
user_states_map[user_id] = storage_user_state.state
else:
user_state_stmt = select(StorageUserState).filter(
StorageUserState.app_name == app_name
)
user_state_result = await sql_session.execute(user_state_stmt)
all_user_states_for_app = user_state_result.scalars().all()
for storage_user_state in all_user_states_for_app:
user_states_map[storage_user_state.user_id] = storage_user_state.state
# Fetch user state(s) from storage
user_states_map = {}
if user_id is not None:
storage_user_state = await sql_session.get(
StorageUserState, (app_name, user_id)
)
if storage_user_state:
user_states_map[user_id] = storage_user_state.state
else:
user_state_stmt = select(StorageUserState).filter(
StorageUserState.app_name == app_name
)
user_state_result = await sql_session.execute(user_state_stmt)
all_user_states_for_app = user_state_result.scalars().all()
for storage_user_state in all_user_states_for_app:
user_states_map[storage_user_state.user_id] = (
storage_user_state.state
)

sessions = []
for storage_session in results:
session_state = storage_session.state
user_state = user_states_map.get(storage_session.user_id, {})
merged_state = _merge_state(app_state, user_state, session_state)
sessions.append(storage_session.to_session(state=merged_state))
return ListSessionsResponse(sessions=sessions)
sessions = []
for storage_session in results:
session_state = storage_session.state
user_state = user_states_map.get(storage_session.user_id, {})
merged_state = _merge_state(app_state, user_state, session_state)
sessions.append(storage_session.to_session(state=merged_state))
return ListSessionsResponse(sessions=sessions)
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This try...except block is redundant. The async with context manager automatically handles rollback on exceptions, which is sufficient for this read-only operation.

      stmt = select(StorageSession).filter(
          StorageSession.app_name == app_name
      )
      if user_id is not None:
        stmt = stmt.filter(StorageSession.user_id == user_id)

      result = await sql_session.execute(stmt)
      results = result.scalars().all()

      # Fetch app state from storage
      storage_app_state = await sql_session.get(StorageAppState, (app_name))
      app_state = storage_app_state.state if storage_app_state else {}

      # Fetch user state(s) from storage
      user_states_map = {}
      if user_id is not None:
        storage_user_state = await sql_session.get(
            StorageUserState, (app_name, user_id)
        )
        if storage_user_state:
          user_states_map[user_id] = storage_user_state.state
      else:
        user_state_stmt = select(StorageUserState).filter(
            StorageUserState.app_name == app_name
        )
        user_state_result = await sql_session.execute(user_state_stmt)
        all_user_states_for_app = user_state_result.scalars().all()
        for storage_user_state in all_user_states_for_app:
          user_states_map[storage_user_state.user_id] = (
              storage_user_state.state
          )

      sessions = []
      for storage_session in results:
        session_state = storage_session.state
        user_state = user_states_map.get(storage_session.user_id, {})
        merged_state = _merge_state(app_state, user_state, session_state)
        sessions.append(storage_session.to_session(state=merged_state))
      return ListSessionsResponse(sessions=sessions)

Comment on lines +600 to +611
try:
stmt = delete(StorageSession).where(
StorageSession.app_name == app_name,
StorageSession.user_id == user_id,
StorageSession.id == session_id,
)
await sql_session.execute(stmt)
await self._commit_or_rollback(sql_session)
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This try...except block is redundant. The combination of the async with context manager for general exceptions and the _commit_or_rollback helper for commit-specific failures is sufficient to ensure correct transaction handling.

      stmt = delete(StorageSession).where(
          StorageSession.app_name == app_name,
          StorageSession.user_id == user_id,
          StorageSession.id == session_id,
      )
      await sql_session.execute(stmt)
      await self._commit_or_rollback(sql_session)

Comment on lines 626 to 683
try:
storage_session = await sql_session.get(
StorageSession, (session.app_name, session.user_id, session.id)
)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state_delta = state_deltas["session"]
# Merge state and update storage
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = storage_user_state.state | user_state_delta
if session_state_delta:
storage_session.state = storage_session.state | session_state_delta

if storage_session._dialect_name == "sqlite":
update_time = datetime.fromtimestamp(
event.timestamp, timezone.utc
).replace(tzinfo=None)
else:
update_time = datetime.fromtimestamp(event.timestamp)
storage_session.update_time = update_time
sql_session.add(StorageEvent.from_event(session, event))
if storage_session.update_timestamp_tz > session.last_update_time:
raise ValueError(
"The last_update_time provided in the session object"
f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is"
" earlier than the update_time in the storage_session"
f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}."
" Please check if it is a stale session."
)

await sql_session.commit()
await sql_session.refresh(storage_session)
# Fetch states from storage
storage_app_state = await sql_session.get(
StorageAppState, (session.app_name)
)
storage_user_state = await sql_session.get(
StorageUserState, (session.app_name, session.user_id)
)

# Update timestamp with commit time
session.last_update_time = storage_session.update_timestamp_tz
# Extract state delta
if event.actions and event.actions.state_delta:
state_deltas = _session_util.extract_state_delta(
event.actions.state_delta
)
app_state_delta = state_deltas["app"]
user_state_delta = state_deltas["user"]
session_state_delta = state_deltas["session"]
# Merge state and update storage
if app_state_delta:
storage_app_state.state = storage_app_state.state | app_state_delta
if user_state_delta:
storage_user_state.state = (
storage_user_state.state | user_state_delta
)
if session_state_delta:
storage_session.state = storage_session.state | session_state_delta

if storage_session._dialect_name == "sqlite":
update_time = datetime.fromtimestamp(
event.timestamp, timezone.utc
).replace(tzinfo=None)
else:
update_time = datetime.fromtimestamp(event.timestamp)
storage_session.update_time = update_time
sql_session.add(StorageEvent.from_event(session, event))

await self._commit_or_rollback(sql_session)
await sql_session.refresh(storage_session)

# Update timestamp with commit time
session.last_update_time = storage_session.update_timestamp_tz
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This try...except block is redundant. The async with context manager and the _commit_or_rollback helper already provide the necessary transaction safety and rollback guarantees.

      storage_session = await sql_session.get(
          StorageSession, (session.app_name, session.user_id, session.id)
      )

      if storage_session.update_timestamp_tz > session.last_update_time:
        raise ValueError(
            "The last_update_time provided in the session object"
            f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is"
            " earlier than the update_time in the storage_session"
            f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}."
            " Please check if it is a stale session."
        )

      # Fetch states from storage
      storage_app_state = await sql_session.get(
          StorageAppState, (session.app_name)
      )
      storage_user_state = await sql_session.get(
          StorageUserState, (session.app_name, session.user_id)
      )

      # Extract state delta
      if event.actions and event.actions.state_delta:
        state_deltas = _session_util.extract_state_delta(
            event.actions.state_delta
        )
        app_state_delta = state_deltas["app"]
        user_state_delta = state_deltas["user"]
        session_state_delta = state_deltas["session"]
        # Merge state and update storage
        if app_state_delta:
          storage_app_state.state = storage_app_state.state | app_state_delta
        if user_state_delta:
          storage_user_state.state = (
              storage_user_state.state | user_state_delta
          )
        if session_state_delta:
          storage_session.state = storage_session.state | session_state_delta

      if storage_session._dialect_name == "sqlite":
        update_time = datetime.fromtimestamp(
            event.timestamp, timezone.utc
        ).replace(tzinfo=None)
      else:
        update_time = datetime.fromtimestamp(event.timestamp)
      storage_session.update_time = update_time
      sql_session.add(StorageEvent.from_event(session, event))

      await self._commit_or_rollback(sql_session)
      await sql_session.refresh(storage_session)

      # Update timestamp with commit time
      session.last_update_time = storage_session.update_timestamp_tz

@ryanaiagent ryanaiagent self-assigned this Dec 2, 2025
@sivakumar-77
Copy link
Author

sivakumar-77 commented Dec 3, 2025

@ryanaiagent

Why the try/except is still useful

  • _commit_or_rollback() only handles failures that happen during commit().
    Any exception before that point such as during get(...), state merging, add(...), or timestamp handling will not be caught by _commit_or_rollback().

  • For these pre-commit failures, the outer except block (with the in_transaction() guard) ensures that you explicitly roll back any transaction that was started, instead of relying on implicit behavior from the session context manager.

  • For actual commit failures, _commit_or_rollback() already performs the rollback, so the in_transaction() check becomes effectively a no-op.
    However, it is harmless and keeps a consistent coding pattern across all methods.

@gemini-code-assist review

@sivakumar-77
Copy link
Author

sivakumar-77 commented Dec 3, 2025

/gemini review

@gemini-code-assist
Copy link
Contributor

It looks like there might be a typo in your command. Did you mean to type /gemini review? If so, please let me know and I can initiate a new review for you.

@sivakumar-77
Copy link
Author

It looks like there might be a typo in your command. Did you mean to type /gemini review? If so, please let me know and I can initiate a new review for you.

yes

@gemini-code-assist
Copy link
Contributor

Understood! I will initiate a new review for you now.

@ryanaiagent
Copy link
Collaborator

Hi @sivakumar-77 , Thank you for your contribution through this pull request! This PR has merge conflicts that require changes from your end. Could you please rebase your branch with the latest main branch to address these? Once this is complete, please let us know so we can proceed with the review.

@ryanaiagent ryanaiagent added the request clarification [Status] The maintainer need clarification or more information from the author label Dec 4, 2025
@sivakumar-77
Copy link
Author

/gemini review

@sivakumar-77
Copy link
Author

Hi @sivakumar-77 , Thank you for your contribution through this pull request! This PR has merge conflicts that require changes from your end. Could you please rebase your branch with the latest main branch to address these? Once this is complete, please let us know so we can proceed with the review.

conflict resolved

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly introduces a _commit_or_rollback helper to ensure transactions are rolled back on commit failure, addressing a potential connection pool exhaustion issue. The accompanying unit test effectively validates this new helper. However, the implementation has introduced some critical issues in the append_event method, including the removal of a vital stale session check and a copy-paste error that will cause a runtime failure. Additionally, there is significant code duplication in the new try...except error handling blocks across multiple methods, which should be refactored for better maintainability. While the core idea is sound, the identified critical issues must be addressed before merging.

Comment on lines +727 to 729
storage_session = await sql_session.get(
StorageSession, (session.app_name, session.user_id, session.id)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The critical check for stale sessions, which prevents race conditions by comparing session.last_update_time with the timestamp in storage, has been removed during this refactoring. This could lead to data corruption if multiple updates happen concurrently. Please reintroduce this validation after fetching storage_session.

Suggested change
storage_session = await sql_session.get(
StorageSession, (session.app_name, session.user_id, session.id)
)
storage_session = await sql_session.get(
StorageSession, (session.app_name, session.user_id, session.id)
)
if storage_session.update_timestamp_tz > session.last_update_time:
raise ValueError(
"The last_update_time provided in the session object"
f" {datetime.fromtimestamp(session.last_update_time):'%Y-%m-%d %H:%M:%S'} is"
" earlier than the update_time in the storage_session"
f" {datetime.fromtimestamp(storage_session.update_timestamp_tz):'%Y-%m-%d %H:%M:%S'}."
" Please check if it is a stale session."
)

Comment on lines 731 to 736
if storage_session._dialect_name == "sqlite":
update_time = datetime.utcfromtimestamp(event.timestamp)
else:
update_time = datetime.fromtimestamp(event.timestamp)
storage_session.update_time = update_time
sql_session.add(StorageEvent.from_event(session, event))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This block of code appears to be a copy-paste error from the refactoring. It's misplaced outside the try block and will cause a runtime UnboundLocalError because storage_session is not defined in this scope. This code is also duplicated later within the try block (lines 764-771). This entire block should be removed.

Comment on lines +575 to +578
except Exception:
if sql_session.in_transaction():
await sql_session.rollback()
raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The try...except block for rolling back transactions on failure is duplicated across several methods (create_session, get_session, list_sessions, delete_session, append_event). This introduces significant boilerplate and makes the code harder to maintain.

Consider refactoring this common pattern into a reusable component, such as a custom asynccontextmanager. This would centralize the transaction management logic and make the service methods cleaner.

For example, you could define a context manager like this:

from contextlib import asynccontextmanager

@asynccontextmanager
async def _safe_db_transaction(self):
    """Provides a session with automatic rollback on failure."""
    async with self.database_session_factory() as sql_session:
        try:
            yield sql_session
        except Exception:
            if sql_session.in_transaction():
                await sql_session.rollback()
            raise

And then use it in each method:

async def create_session(self, ...) -> Session:
    await self._ensure_tables_created()
    async with self._safe_db_transaction() as sql_session:
        # ... method-specific logic ...
        await self._commit_or_rollback(sql_session)
        # ...
    return session

This would remove the duplicated try...except blocks from each method.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

request clarification [Status] The maintainer need clarification or more information from the author services [Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DatabaseSessionSerive sessions are not properly rolled back.

3 participants