-
Notifications
You must be signed in to change notification settings - Fork 3
fix: Add generic mechanism to cancel queries on exception #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Add generic mechanism to cancel queries on exception #64
Conversation
|
Note Reviews pausedUse the following commands to manage reviews:
📝 WalkthroughWalkthroughAdds cursor-tracking wrappers to SQL execution: Sequence Diagram(s)sequenceDiagram
participant Client
participant Exec as _execute_sql_on_engine
participant TrackerConn as CursorTrackingSQLAlchemyConnection
participant DBAPIConn as DB-API Connection
participant Cursor
participant Database
participant QueryJob as Optional QueryJob
Client->>Exec: request execution(sql, engine)
Exec->>TrackerConn: obtain wrapped connection
TrackerConn->>DBAPIConn: install DB-API wrapper (tracks cursors)
Exec->>TrackerConn: execute(sql)
TrackerConn->>Cursor: cursor() created and registered
Cursor->>Database: run query (long-running)
alt External cancellation (KeyboardInterrupt)
Client->>Exec: KeyboardInterrupt
Exec->>TrackerConn: cancel_all_cursors()
TrackerConn->>Cursor: invoke cursor.cancel()
opt query job present
Exec->>QueryJob: query_job.cancel()
end
Cursor->>Database: request cancel
Database-->>Cursor: cancelled
Exec-->>Client: propagate interrupt/exception
else Normal completion
Database-->>Cursor: results
Cursor-->>Exec: return results
Exec-->>Client: return results
end
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
|
📦 Python package built successfully!
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #64 +/- ##
==========================================
+ Coverage 73.39% 73.50% +0.10%
==========================================
Files 93 93
Lines 5206 5253 +47
Branches 758 764 +6
==========================================
+ Hits 3821 3861 +40
- Misses 1143 1147 +4
- Partials 242 245 +3
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
|
🚀 Review App Deployment Started
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 525-540: The cursor tracking currently uses a WeakSet and will
raise TypeError for non‑weakrefable cursor objects; update __init__ to also
create a fallback strong set (e.g., self._self_strong_cursor_set = set()),
change cursor(self, *args, **kwargs) to attempt
self._self_cursor_registry.add(cursor) inside a try/except TypeError and on
exception add the cursor to self._self_strong_cursor_set, and modify
cancel_all_cursors() to iterate both self._self_cursor_registry and
self._self_strong_cursor_set to call _cancel_cursor(cursor) while catching
errors, then clear both collections after attempting cancellation.
- Around line 522-597: Add explicit typing to the new wrappers and helpers:
annotate CursorTrackingDBAPIConnection.__init__(self, wrapped: Any,
cursor_registry: Optional[weakref.WeakSet] = None) -> None,
CursorTrackingDBAPIConnection.cursor(self, *args: Any, **kwargs: Any) -> Any,
and CursorTrackingDBAPIConnection.cancel_all_cursors(self) -> None; annotate
CursorTrackingSQLAlchemyConnection.__init__(self, wrapped: Any) -> None,
_install_dbapi_wrapper(self) -> None, and cancel_all_cursors(self) -> None;
annotate _cancel_cursor(cursor: Any) -> None (and update _execute_sql_on_engine
signature similarly if it accepts nullable/Any types). Use Optional[T] for
parameters that can be None and import typing names (Any, Optional) as needed.
Ensure return types use -> None or -> Any where appropriate.
- Around line 555-566: The _install_dbapi_wrapper currently replaces
self.__wrapped__._dbapi_connection unconditionally, which can create chains when
it's already a CursorTrackingDBAPIConnection; update _install_dbapi_wrapper to
check the existing self.__wrapped__._dbapi_connection first, detect if it's an
instance of CursorTrackingDBAPIConnection (or exposes the registry/registry
holder), and if so reuse that wrapper by attaching/merging self._self_cursors
into the existing wrapper's registry instead of rewrapping; only create a new
CursorTrackingDBAPIConnection when the current _dbapi_connection is a raw DBAPI
connection (not already wrapped).
In `@tests/unit/test_sql_execution_internal.py`:
- Around line 12-33: Add explicit type hints to the
_setup_mock_engine_with_cursor helper: annotate the parameter mock_cursor as
unittest.mock.Mock (or typing.Any) and the return type as unittest.mock.Mock (or
sqlalchemy.engine.Connection) to match what the function returns; also type the
inner mock_exec_driver_sql callable (e.g., def mock_exec_driver_sql(sql: str,
*args: Any) -> unittest.mock.Mock) and annotate mock_sa_connection and
mock_dbapi_connection variables where declared to clarify their types. Ensure
you import typing.Any and unittest.mock.Mock (or appropriate sqlalchemy types)
at the top of the test file and update the function signature and nested
function signature accordingly.
…hen-cancelling-block-execution-3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@deepnote_toolkit/sql/sql_execution.py`:
- Around line 576-582: The _cancel_cursor function currently only attempts
cursor.cancel() but the tests expect cursor.query_job.cancel() to be invoked for
BigQuery cursors; update _cancel_cursor to also check for a query_job attribute
and call cursor.query_job.cancel() when available (e.g., if hasattr(cursor,
"query_job") and hasattr(cursor.query_job, "cancel") then call it), while
preserving the existing try/except best-effort behavior and still attempting
cursor.cancel() when present; ensure both calls are wrapped in the same
exception handling so failures are ignored as before.
In `@tests/unit/test_sql_execution_internal.py`:
- Around line 82-99: The test fails because when a KeyboardInterrupt occurs
_execute_sql_on_engine (via _cancel_cursor) only calls cursor.cancel() but does
not cancel cursor.query_job; update the cancellation logic so that
_cancel_cursor (or the cancellation path inside _execute_sql_on_engine) checks
for a BigQuery query job on the cursor (e.g., cursor.query_job) and calls its
cancel() method before/after calling cursor.cancel(), ensuring both
mock_query_job.cancel() and cursor.cancel() are invoked.
|
@coderabbitai pause |
✅ Actions performedReviews paused. |
de3fa24 to
5595abc
Compare
m1so
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm with a minor concern about accessing private attributes and SQLAlchemy 1.x and 2.x compatibility
This add generic system to cancel query on exception. We now wrap SQLAlchemy connection to track produced cursors and if we receive
BaseException, we try to cancel them before reraising.This was tested with Trino. See instructions for testing Trino in #63
Unfortunately, for BigQuery I had to keep old runtime patch because at the moment of exception, there is no way to get reference to job (to cancel it) from cursor object, and cursor doesn't provide any method like
.cancel()Summary by CodeRabbit
New Features
Bug Fixes
Tests