From fb7b9601329cf63743f235ef3a3bcc0b10fcc320 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sat, 29 Mar 2025 13:08:37 +1100 Subject: [PATCH 01/28] feat: enhance kernel message handling with memory streams to run execute_request in separate task --- ipykernel/kernelbase.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e260f2d27..da4d8cba3 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -37,7 +37,13 @@ import psutil import zmq import zmq_anyio -from anyio import TASK_STATUS_IGNORED, create_task_group, sleep, to_thread +from anyio import ( + TASK_STATUS_IGNORED, + create_memory_object_stream, + create_task_group, + sleep, + to_thread, +) from anyio.abc import TaskStatus from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session @@ -418,27 +424,39 @@ async def shell_main(self, subshell_id: str | None): assert subshell_id is None assert threading.current_thread() == threading.main_thread() socket = None - + send_stream, receive_stream = create_memory_object_stream() async with create_task_group() as tg: if not socket.started.is_set(): await tg.start(socket.start) - tg.start_soon(self.process_shell, socket) + tg.start_soon(self.process_shell, socket, send_stream) + tg.start_soon(self._execute_request_handler, receive_stream) if subshell_id is None: # Main subshell. await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() - async def process_shell(self, socket=None): + async def _execute_request_handler(self, receive_stream): + async with receive_stream: + async for handler, (socket, idents, msg) in receive_stream: + try: + result = handler(socket, idents, msg) + self.set_parent(idents, msg, channel="shell") + if inspect.isawaitable(result): + await result + except Exception as e: + self.log.exception("Execute request", exc_info=e) + + async def process_shell(self, socket, send_stream): # socket=None is valid if kernel subshells are not supported. try: while True: - await self.process_shell_message(socket=socket) + await self.process_shell_message(socket=socket, send_stream=send_stream) except BaseException: if self.shell_stop.is_set(): return raise - async def process_shell_message(self, msg=None, socket=None): + async def process_shell_message(self, msg=None, socket=None, send_stream=None): # If socket is None kernel subshells are not supported so use socket=shell_socket. # If msg is set, process that message. # If msg is None, await the next message to arrive on the socket. 
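The hunks above move execute_request handling onto an anyio memory object stream: process_shell acts as the producer and _execute_request_handler drains the stream in a separate task. A standalone sketch of that producer/consumer shape, using illustrative names rather than ipykernel APIs and assuming anyio 3 or later:

    # Standalone sketch of the producer/consumer pattern above; echo,
    # queue_requests and handle_requests are illustrative names, not ipykernel APIs.
    import anyio
    from anyio import create_memory_object_stream, create_task_group


    async def echo(payload):
        print("handled", payload)


    async def handle_requests(receive_stream):
        # Consumer: drain queued (handler, payload) pairs one at a time.
        async with receive_stream:
            async for handler, payload in receive_stream:
                await handler(payload)


    async def queue_requests(send_stream):
        # Producer: push work items; closing the send end ends the consumer loop.
        async with send_stream:
            for i in range(3):
                await send_stream.send((echo, f"request {i}"))


    async def main():
        send_stream, receive_stream = create_memory_object_stream(max_buffer_size=10)
        async with create_task_group() as tg:
            tg.start_soon(handle_requests, receive_stream)
            tg.start_soon(queue_requests, send_stream)


    anyio.run(main)
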
@@ -507,9 +525,12 @@ async def process_shell_message(self, msg=None, socket=None): except Exception: self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) try: - result = handler(socket, idents, msg) - if inspect.isawaitable(result): - await result + if msg_type == "execute_request" and send_stream: + await send_stream.send((handler, (socket, idents, msg))) + else: + result = handler(socket, idents, msg) + if inspect.isawaitable(result): + await result except Exception: self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 except KeyboardInterrupt: From 17ebe6ba5082ca03cd2864d19dd13365be73b8d2 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 30 Mar 2025 10:49:38 +1100 Subject: [PATCH 02/28] Allow infinite buffer size for memory object streams in kernel --- ipykernel/kernelbase.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index da4d8cba3..5d8337120 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -7,6 +7,7 @@ import inspect import itertools import logging +import math import os import queue import sys @@ -424,7 +425,7 @@ async def shell_main(self, subshell_id: str | None): assert subshell_id is None assert threading.current_thread() == threading.main_thread() socket = None - send_stream, receive_stream = create_memory_object_stream() + send_stream, receive_stream = create_memory_object_stream(max_buffer_size=math.inf) async with create_task_group() as tg: if not socket.started.is_set(): await tg.start(socket.start) From 5f71eaa5a9be9001656fec9a745c16d3dc0d3f17 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 30 Mar 2025 11:25:55 +1100 Subject: [PATCH 03/28] fix: handle kernel abort requests gracefully in message processing --- ipykernel/kernelbase.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 5d8337120..9f0bf6d3a 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -440,11 +440,14 @@ async def _execute_request_handler(self, receive_stream): async with receive_stream: async for handler, (socket, idents, msg) in receive_stream: try: + if self._aborting: + await self._send_abort_reply(socket, msg, idents) + continue result = handler(socket, idents, msg) self.set_parent(idents, msg, channel="shell") if inspect.isawaitable(result): await result - except Exception as e: + except BaseException as e: self.log.exception("Execute request", exc_info=e) async def process_shell(self, socket, send_stream): From 62300edbabec006c40c2cae00325ddde8ac48f1b Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 30 Mar 2025 12:25:31 +1100 Subject: [PATCH 04/28] Simplify abort handling and make send_stream a required parameter for processing shell messages. 
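The simplification below replaces the boolean _aborting flag with a monotonic timestamp: each queued execute_request carries the time it was received, and it is aborted only if that time predates the most recent error. A minimal sketch of that check, with illustrative names apart from time.monotonic():

    # Sketch of the timestamp-based abort check; everything except
    # time.monotonic() is an illustrative name.
    import time

    aborted_time = 0.0  # monotonic time of the last execute error


    def should_abort(received_time: float) -> bool:
        # Requests queued before the error are aborted; newer ones run normally.
        return received_time < aborted_time


    def on_execute_error() -> None:
        global aborted_time
        aborted_time = time.monotonic()


    queued_at = time.monotonic()        # a request already waiting in the stream
    on_execute_error()                  # an earlier cell fails with stop_on_error
    assert should_abort(queued_at)      # the queued request gets an aborted reply
    assert not should_abort(time.monotonic())  # later requests execute as usual
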
--- ipykernel/kernelbase.py | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 9f0bf6d3a..fc500ed6b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -46,6 +46,7 @@ to_thread, ) from anyio.abc import TaskStatus +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session from traitlets.config.configurable import SingletonConfigurable @@ -92,7 +93,7 @@ def _accepts_parameters(meth, param_names): class Kernel(SingletonConfigurable): """The base kernel class.""" - _aborted_time: float + _aborted_time: float = time.monotonic() # --------------------------------------------------------------------------- # Kernel interface @@ -436,11 +437,11 @@ async def shell_main(self, subshell_id: str | None): await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() - async def _execute_request_handler(self, receive_stream): + async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: - async for handler, (socket, idents, msg) in receive_stream: + async for handler, (received_time, socket, idents, msg) in receive_stream: try: - if self._aborting: + if received_time < self._aborted_time: await self._send_abort_reply(socket, msg, idents) continue result = handler(socket, idents, msg) @@ -450,7 +451,7 @@ async def _execute_request_handler(self, receive_stream): except BaseException as e: self.log.exception("Execute request", exc_info=e) - async def process_shell(self, socket, send_stream): + async def process_shell(self, socket, send_stream: MemoryObjectSendStream): # socket=None is valid if kernel subshells are not supported. try: while True: @@ -460,7 +461,9 @@ async def process_shell(self, socket, send_stream): return raise - async def process_shell_message(self, msg=None, socket=None, send_stream=None): + async def process_shell_message( + self, msg=None, socket=None, *, send_stream: MemoryObjectSendStream + ): # If socket is None kernel subshells are not supported so use socket=shell_socket. # If msg is set, process that message. # If msg is None, await the next message to arrive on the socket. @@ -476,10 +479,8 @@ async def process_shell_message(self, msg=None, socket=None, send_stream=None): assert socket is None socket = self.shell_socket - no_msg = msg is None if self._is_test else not await socket.apoll(0).wait() msg = msg or await socket.arecv_multipart(copy=False).wait() - received_time = time.monotonic() copy = not isinstance(msg[0], zmq.Message) idents, msg = self.session.feed_identities(msg, copy=copy) try: @@ -494,18 +495,6 @@ async def process_shell_message(self, msg=None, socket=None, send_stream=None): msg_type = msg["header"]["msg_type"] - # Only abort execute requests - if self._aborting and msg_type == "execute_request": - if not self.stop_on_error_timeout: - if no_msg: - self._aborting = False - elif received_time - self._aborted_time > self.stop_on_error_timeout: - self._aborting = False - if self._aborting: - await self._send_abort_reply(socket, msg, idents) - self._publish_status("idle", "shell") - return - # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each # handler prints its message at the end. 
@@ -529,8 +518,8 @@ async def process_shell_message(self, msg=None, socket=None, send_stream=None): except Exception: self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) try: - if msg_type == "execute_request" and send_stream: - await send_stream.send((handler, (socket, idents, msg))) + if msg_type == "execute_request": + await send_stream.send((handler, (time.monotonic(), socket, idents, msg))) else: result = handler(socket, idents, msg) if inspect.isawaitable(result): @@ -824,9 +813,7 @@ async def execute_request(self, socket, ident, parent): assert reply_msg is not None if not silent and reply_msg["content"]["status"] == "error" and stop_on_error: - # while this flag is true, - # execute requests will be aborted - self._aborting = True + # execute requests will be aborted if the received time is prior to the _aborted_time self._aborted_time = time.monotonic() self.log.info("Aborting queue") From 1828a88968a6ebfac65c434842e32411e3493bf3 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 10:09:06 +1100 Subject: [PATCH 05/28] Fix publish status for execute_request --- ipykernel/kernelbase.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index fc500ed6b..fc90f0039 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -446,8 +446,11 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre continue result = handler(socket, idents, msg) self.set_parent(idents, msg, channel="shell") + self._publish_status("busy", "shell") if inspect.isawaitable(result): await result + self.set_parent(idents, msg, channel="shell") + self._publish_status("idle", "shell") except BaseException as e: self.log.exception("Execute request", exc_info=e) @@ -489,11 +492,11 @@ async def process_shell_message( self.log.error("Invalid Message", exc_info=True) # noqa: G201 return - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") - msg_type = msg["header"]["msg_type"] + if msg_type != "execute_request": + # Set the parent message for side effects. + self.set_parent(idents, msg, channel="shell") + self._publish_status("busy", "shell") # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. 
Each @@ -534,12 +537,13 @@ async def process_shell_message( self.post_handler_hook() except Exception: self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) - - if sys.stdout is not None: - sys.stdout.flush() - if sys.stderr is not None: - sys.stderr.flush() - self._publish_status("idle", "shell") + if msg_type != "execute_request": + self.set_parent(idents, msg, channel="shell") + if sys.stdout is not None: + sys.stdout.flush() + if sys.stderr is not None: + sys.stderr.flush() + self._publish_status("idle", "shell") async def control_main(self): assert self.control_socket is not None From 211094b9d7902344be724e22ec78cd1c4b2778e2 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 14:42:27 +1100 Subject: [PATCH 06/28] Ensure one send_stream per socket/thread --- ipykernel/kernelbase.py | 47 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index fc90f0039..02990bf91 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -124,6 +124,9 @@ class Kernel(SingletonConfigurable): iopub_socket = Any() iopub_thread = Any() stdin_socket = Any() + + _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() + log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] # identities: @@ -426,16 +429,20 @@ async def shell_main(self, subshell_id: str | None): assert subshell_id is None assert threading.current_thread() == threading.main_thread() socket = None - send_stream, receive_stream = create_memory_object_stream(max_buffer_size=math.inf) - async with create_task_group() as tg: - if not socket.started.is_set(): - await tg.start(socket.start) - tg.start_soon(self.process_shell, socket, send_stream) - tg.start_soon(self._execute_request_handler, receive_stream) - if subshell_id is None: - # Main subshell. - await to_thread.run_sync(self.shell_stop.wait) - tg.cancel_scope.cancel() + socket = socket or self.shell_socket + if socket not in self._send_exec_request: + send_stream, receive_stream = create_memory_object_stream(max_buffer_size=math.inf) + self._send_exec_request[socket] = send_stream + async with create_task_group() as tg: + if not socket.started.is_set(): + await tg.start(socket.start) + tg.start_soon(self.process_shell, socket) + tg.start_soon(self._execute_request_handler, receive_stream) + if subshell_id is None: + # Main subshell. + await to_thread.run_sync(self.shell_stop.wait) + tg.cancel_scope.cancel() + self._send_exec_request.pop(socket, None) async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: @@ -444,9 +451,9 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre if received_time < self._aborted_time: await self._send_abort_reply(socket, msg, idents) continue - result = handler(socket, idents, msg) self.set_parent(idents, msg, channel="shell") self._publish_status("busy", "shell") + result = handler(socket, idents, msg) if inspect.isawaitable(result): await result self.set_parent(idents, msg, channel="shell") @@ -454,19 +461,17 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre except BaseException as e: self.log.exception("Execute request", exc_info=e) - async def process_shell(self, socket, send_stream: MemoryObjectSendStream): + async def process_shell(self, socket): # socket=None is valid if kernel subshells are not supported. 
try: while True: - await self.process_shell_message(socket=socket, send_stream=send_stream) + await self.process_shell_message(socket=socket) except BaseException: if self.shell_stop.is_set(): return raise - async def process_shell_message( - self, msg=None, socket=None, *, send_stream: MemoryObjectSendStream - ): + async def process_shell_message(self, msg=None, socket=None): # If socket is None kernel subshells are not supported so use socket=shell_socket. # If msg is set, process that message. # If msg is None, await the next message to arrive on the socket. @@ -479,7 +484,6 @@ async def process_shell_message( assert socket is not None else: assert threading.current_thread() == threading.main_thread() - assert socket is None socket = self.shell_socket msg = msg or await socket.arecv_multipart(copy=False).wait() @@ -522,6 +526,7 @@ async def process_shell_message( self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) try: if msg_type == "execute_request": + send_stream = self._send_exec_request[socket] await send_stream.send((handler, (time.monotonic(), socket, idents, msg))) else: result = handler(socket, idents, msg) @@ -577,9 +582,8 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.shell_is_blocking = False self.shell_stop = threading.Event() + tg.start_soon(self.shell_main, None) if self.shell_channel_thread: - tg.start_soon(self.shell_main, None) - # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager self.shell_channel_thread.start_soon(self.shell_channel_thread_main) @@ -588,9 +592,6 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: ) self.shell_channel_thread.start_soon(manager.listen_from_subshells) self.shell_channel_thread.start() - else: - if not self._is_test and self.shell_socket is not None: - tg.start_soon(self.shell_main, None) def stop(self): if not self._eventloop_set.is_set(): @@ -1219,8 +1220,6 @@ def _topic(self, topic): """prefixed topic for IOPub messages""" return (f"kernel.{self.ident}.{topic}").encode() - _aborting = Bool(False) - async def _send_abort_reply(self, socket, msg, idents): """Send a reply to an aborted request""" if not self.session: From f13959b163dead6e873b5a902cc27add3b6daa7b Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 14:45:06 +1100 Subject: [PATCH 07/28] Disable timeouts for debugging tests with debugpy --- pyproject.toml | 1 - tests/conftest.py | 7 +++++++ tests/utils.py | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1853544fa..3a582d161 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -210,7 +210,6 @@ testpaths = [ # "tests/inprocess" ] norecursedirs = "tests/inprocess" -timeout = 60 # Restore this setting to debug failures # timeout_method = "thread" filterwarnings= [ diff --git a/tests/conftest.py b/tests/conftest.py index 32524e0ca..5161cbb16 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,7 @@ import gc import logging +import os +import sys import warnings from math import inf from threading import Event @@ -37,6 +39,11 @@ def _garbage_collection(request): pytestmark = pytest.mark.anyio +@pytest.hookimpl +def pytest_configure(config): + os.environ["PYTEST_TIMEOUT"] = str(1e6) if "debugpy" in sys.modules else str(60) + + # Handle resource limit # Ensure a minimal soft limit of DEFAULT_SOFT if the current hard limit is at least that much. 
if resource is not None: diff --git a/tests/utils.py b/tests/utils.py index a3a6de298..b6648e48d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,8 +16,8 @@ from jupyter_client import manager from jupyter_client.blocking.client import BlockingKernelClient -STARTUP_TIMEOUT = 60 -TIMEOUT = 100 +STARTUP_TIMEOUT = 10 if "debugpy" not in sys.modules else 1e6 +TIMEOUT = 10 if "debugpy" not in sys.modules else 1e6 KM: manager.KernelManager = None # type:ignore KC: BlockingKernelClient = None # type:ignore From bee82d19716faea864fce5616af0559a2baccc5f Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 15:38:33 +1100 Subject: [PATCH 08/28] Minor tweaks to tests to make them work with kernel changes. --- tests/test_eventloop.py | 7 +++++-- tests/test_ipkernel_direct.py | 3 ++- tests/test_kernel_direct.py | 3 ++- tests/test_message_spec.py | 19 ++++++------------- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index e4aef5711..42ccae1ed 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -80,13 +80,16 @@ def do_thing(): @windows_skip @pytest.mark.parametrize("anyio_backend", ["asyncio"]) -def test_asyncio_loop(kernel): +async def test_asyncio_loop(kernel): + okay = asyncio.Event() + def do_thing(): - loop.call_later(0.01, loop.stop) + okay.set() loop = asyncio.get_event_loop() loop.call_soon(do_thing) loop_asyncio(kernel) + await asyncio.wait_for(okay.wait(), 1) @windows_skip diff --git a/tests/test_ipkernel_direct.py b/tests/test_ipkernel_direct.py index 98ebd2015..2704447a0 100644 --- a/tests/test_ipkernel_direct.py +++ b/tests/test_ipkernel_direct.py @@ -1,6 +1,7 @@ """Test IPythonKernel directly""" import os +import time import pytest from IPython.core.history import DummyDB @@ -45,7 +46,7 @@ async def test_direct_execute_request(ipkernel: MockIPyKernel) -> None: async def test_direct_execute_request_aborting(ipkernel): - ipkernel._aborting = True + ipkernel._aborted_time = time.monotonic() + 10 # Set in the future reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False)) assert reply["header"]["msg_type"] == "execute_reply" assert reply["content"]["status"] == "aborted" diff --git a/tests/test_kernel_direct.py b/tests/test_kernel_direct.py index 6ff1c1d9c..5368c84e2 100644 --- a/tests/test_kernel_direct.py +++ b/tests/test_kernel_direct.py @@ -4,6 +4,7 @@ # Distributed under the terms of the Modified BSD License. 
import os +import time import pytest @@ -26,7 +27,7 @@ async def test_direct_execute_request(kernel): async def test_direct_execute_request_aborting(kernel): - kernel._aborting = True + kernel._aborted_time = time.monotonic() + 10 reply = await kernel.test_shell_message("execute_request", dict(code="hello", silent=False)) assert reply["header"]["msg_type"] == "execute_reply" assert reply["content"]["status"] == "aborted" diff --git a/tests/test_message_spec.py b/tests/test_message_spec.py index 9dd58d2f2..2926850c4 100644 --- a/tests/test_message_spec.py +++ b/tests/test_message_spec.py @@ -402,20 +402,12 @@ def test_non_execute_stop_on_error(): """test that non-execute_request's are not aborted after an error""" flush_channels() - fail = "\n".join( - [ - # sleep to ensure subsequent message is waiting in the queue to be aborted - "import time", - "time.sleep(0.5)", - "raise ValueError", - ] - ) - KC.execute(code=fail) + KC.execute(code="raise ValueError") + reply = KC.get_shell_msg(timeout=TIMEOUT) # execute + assert reply["content"]["status"] == "error" KC.kernel_info() KC.comm_info() KC.inspect(code="print") - reply = KC.get_shell_msg(timeout=TIMEOUT) # execute - assert reply["content"]["status"] == "error" reply = KC.get_shell_msg(timeout=TIMEOUT) # kernel_info assert reply["content"]["status"] == "ok" reply = KC.get_shell_msg(timeout=TIMEOUT) # comm_info @@ -427,8 +419,9 @@ def test_non_execute_stop_on_error(): def test_user_expressions(): flush_channels() - msg_id, reply = execute(code="x=1", user_expressions=dict(foo="x+1")) - user_expressions = reply["user_expressions"] + msg_id = KC.execute(code="x=1", user_expressions=dict(foo="x+1")) + reply = get_reply(KC, msg_id, TIMEOUT) # execute + user_expressions = reply["content"]["user_expressions"] assert user_expressions == { "foo": { "status": "ok", From aaa29fe2d9ca639cc5cc55d01d4b8ef206bf8064 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 17:32:15 +1100 Subject: [PATCH 09/28] Add anyio Event for _main_shell_ready to aid with start and test reliability. 
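The readiness handshake added by this patch is a plain anyio.Event that shell_main sets once the main subshell is serving, so that start() and the test fixtures can await it before sending messages. The same pattern in isolation, with made-up serve()/main() names, assuming anyio 3 or later:

    # Illustrative readiness handshake with anyio.Event; serve() and main()
    # are made-up names, not ipykernel APIs.
    import anyio


    async def serve(ready: anyio.Event):
        # ... bind sockets, start worker tasks ...
        ready.set()             # equivalent of _main_shell_ready.set()
        await anyio.sleep(0.1)  # stand-in for the real serve loop


    async def main():
        ready = anyio.Event()
        async with anyio.create_task_group() as tg:
            tg.start_soon(serve, ready)
            await ready.wait()  # safe to start sending messages from here on
            print("server is ready")


    anyio.run(main)
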
--- ipykernel/kernelapp.py | 8 +------- ipykernel/kernelbase.py | 18 +++++++++++------- tests/conftest.py | 2 ++ tests/test_ipkernel_direct.py | 11 ++++++++--- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 8078f97ce..5caa2a71a 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -20,7 +20,7 @@ import zmq import zmq_anyio -from anyio import create_task_group, run, to_thread +from anyio import create_task_group, run from IPython.core.application import ( # type:ignore[attr-defined] BaseIPythonApplication, base_aliases, @@ -764,15 +764,9 @@ def start(self) -> None: backend = "trio" if self.trio_loop else "asyncio" run(partial(self._start, backend), backend=backend) - async def _wait_to_enter_eventloop(self) -> None: - await to_thread.run_sync(self.kernel._eventloop_set.wait) - await self.kernel.enter_eventloop() - async def main(self) -> None: async with create_task_group() as tg: - tg.start_soon(self._wait_to_enter_eventloop) tg.start_soon(self.kernel.start) - if self.kernel.eventloop: self.kernel._eventloop_set.set() diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 02990bf91..b4bcea02c 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -40,6 +40,7 @@ import zmq_anyio from anyio import ( TASK_STATUS_IGNORED, + Event, create_memory_object_stream, create_task_group, sleep, @@ -126,6 +127,7 @@ class Kernel(SingletonConfigurable): stdin_socket = Any() _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() + _main_shell_ready = Instance(Event, ()) log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] @@ -436,13 +438,15 @@ async def shell_main(self, subshell_id: str | None): async with create_task_group() as tg: if not socket.started.is_set(): await tg.start(socket.start) - tg.start_soon(self.process_shell, socket) + tg.start_soon(self._process_shell, socket) tg.start_soon(self._execute_request_handler, receive_stream) if subshell_id is None: # Main subshell. + self._main_shell_ready.set() await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) + await send_stream.aclose() async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: @@ -461,8 +465,9 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre except BaseException as e: self.log.exception("Execute request", exc_info=e) - async def process_shell(self, socket): + async def _process_shell(self, socket): # socket=None is valid if kernel subshells are not supported. + await self._main_shell_ready.wait() try: while True: await self.process_shell_message(socket=socket) @@ -476,15 +481,13 @@ async def process_shell_message(self, msg=None, socket=None): # If msg is set, process that message. # If msg is None, await the next message to arrive on the socket. 
assert self.session is not None + socket = socket or self.shell_socket if self._supports_kernel_subshells: assert threading.current_thread() not in ( self.control_thread, self.shell_channel_thread, ) assert socket is not None - else: - assert threading.current_thread() == threading.main_thread() - socket = self.shell_socket msg = msg or await socket.arecv_multipart(copy=False).wait() @@ -532,8 +535,8 @@ async def process_shell_message(self, msg=None, socket=None): result = handler(socket, idents, msg) if inspect.isawaitable(result): await result - except Exception: - self.log.error("Exception in message handler:", exc_info=True) # noqa: G201 + except Exception as e: + self.log.error("Exception in message handler:", exc_info=e) except KeyboardInterrupt: # Ctrl-c shouldn't crash the kernel here. self.log.error("KeyboardInterrupt caught in kernel.") @@ -583,6 +586,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.shell_stop = threading.Event() tg.start_soon(self.shell_main, None) + await self._main_shell_ready.wait() if self.shell_channel_thread: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager diff --git a/tests/conftest.py b/tests/conftest.py index 5161cbb16..c1552ca47 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -207,6 +207,7 @@ async def kernel(anyio_backend): async with create_task_group() as tg: kernel = MockKernel() tg.start_soon(kernel.start) + await kernel._main_shell_ready.wait() try: yield kernel finally: @@ -218,6 +219,7 @@ async def ipkernel(anyio_backend): async with create_task_group() as tg: kernel = MockIPyKernel() tg.start_soon(kernel.start) + await kernel._main_shell_ready.wait() try: yield kernel finally: diff --git a/tests/test_ipkernel_direct.py b/tests/test_ipkernel_direct.py index 2704447a0..fe7321a66 100644 --- a/tests/test_ipkernel_direct.py +++ b/tests/test_ipkernel_direct.py @@ -34,14 +34,19 @@ async def test_direct_kernel_info_request(ipkernel): async def test_direct_execute_request(ipkernel: MockIPyKernel) -> None: - reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False)) + reply = await ipkernel.test_shell_message( + "execute_request", dict(code="invalid_call()", silent=False) + ) assert reply["header"]["msg_type"] == "execute_reply" + ipkernel._aborted_time += 10 reply = await ipkernel.test_shell_message( "execute_request", dict(code="trigger_error", silent=False) ) assert reply["content"]["status"] == "aborted" - - reply = await ipkernel.test_shell_message("execute_request", dict(code="hello", silent=False)) + ipkernel._aborted_time = time.monotonic() + reply = await ipkernel.test_shell_message( + "execute_request", dict(code="okay=True", silent=False) + ) assert reply["header"]["msg_type"] == "execute_reply" From 5bc198554a4060eafa4243ed85b6400e8b95aa5f Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 20:28:16 +1100 Subject: [PATCH 10/28] Update timing assertions in concurrent test to improve reliability --- tests/test_subshells.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/test_subshells.py b/tests/test_subshells.py index 889df1b2e..faa018696 100644 --- a/tests/test_subshells.py +++ b/tests/test_subshells.py @@ -6,7 +6,6 @@ import platform import time -from datetime import datetime, timedelta import pytest from jupyter_client.blocking.client import BlockingKernelClient @@ -186,7 +185,9 @@ def test_run_concurrently_timing(include_main_shell): kc, "import 
threading as t, time; b=t.Barrier(2); print('ok')", None ) - times = (0.2, 0.2) + times = (0.4,) * 2 + min_duration = times[0] + max_duration = sum(times) # Prepare messages, times are sleep times in seconds. # Identical times for both subshells is a harder test as preparing and sending # the execute_reply messages may overlap. @@ -198,24 +199,27 @@ def test_run_concurrently_timing(include_main_shell): msgs.append(msg) # Send messages - start = datetime.now() + start = time.monotonic() for msg in msgs: kc.shell_channel.send(msg) - _ = get_replies(kc, [msg["msg_id"] for msg in msgs]) - end = datetime.now() + replies = get_replies(kc, [msg["msg_id"] for msg in msgs]) + end = time.monotonic() + for reply in replies: + assert reply["content"]["status"] == "ok" for subshell_id in subshell_ids: if subshell_id: delete_subshell_helper(kc, subshell_id) duration = end - start - assert duration >= timedelta(seconds=max(times)) + assert duration >= min_duration # Care is needed with this test as runtime conditions such as gathering # coverage can slow it down causing the following assert to fail. # The sleep time of 0.2 is empirically determined to run OK in CI, but # consider increasing it if the following fails. - assert duration < timedelta(seconds=sum(times)) + + assert duration < max_duration @pytest.mark.xfail(strict=False, reason="subshell still sometime give different results") From 533ca757afbffcd1edb6acd9eb76edfe02a51703 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Mon, 31 Mar 2025 21:09:05 +1100 Subject: [PATCH 11/28] Restore test timeouts to original values --- tests/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index b6648e48d..93cfb5bf6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,8 +16,8 @@ from jupyter_client import manager from jupyter_client.blocking.client import BlockingKernelClient -STARTUP_TIMEOUT = 10 if "debugpy" not in sys.modules else 1e6 -TIMEOUT = 10 if "debugpy" not in sys.modules else 1e6 +STARTUP_TIMEOUT = 60 if "debugpy" not in sys.modules else 1e6 +TIMEOUT = 100 if "debugpy" not in sys.modules else 1e6 KM: manager.KernelManager = None # type:ignore KC: BlockingKernelClient = None # type:ignore From f0e5116c80f3d1c4541fe45bf5a81acd5bb77da3 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 10:56:18 +1100 Subject: [PATCH 12/28] Add small delay in process_control_message to aid with test_sequential_control_messages --- ipykernel/kernelbase.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index b4bcea02c..572499513 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -317,6 +317,9 @@ async def process_control_message(self, msg=None): result = handler(self.control_socket, idents, msg) if inspect.isawaitable(result): await result + else: + # If the handler is not awaitable, ensure it completes before proceeding + time.sleep(0.00001) # Small delay to ensure sequential processing except Exception: self.log.error("Exception in control handler:", exc_info=True) # noqa: G201 From 8b4b0863b28e5ca590b0d1b001a3ae292b14cd8c Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 11:20:05 +1100 Subject: [PATCH 13/28] Remove unused _eventloop_set --- ipykernel/eventloops.py | 10 ---------- ipykernel/kernelapp.py | 2 -- ipykernel/kernelbase.py | 8 -------- 3 files changed, 20 deletions(-) diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index baca4dcdb..9e42b40b8 100644 --- 
a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -602,13 +602,3 @@ def enable_gui(gui, kernel=None): msg = "Cannot activate multiple GUI eventloops" # type:ignore[unreachable] raise RuntimeError(msg) kernel.eventloop = loop - # We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus - # any exceptions raised during the event loop will not be shown in the client. - - # If running in async loop then set anyio event to trigger starting the eventloop. - # If not running in async loop do nothing as this will be handled in IPKernelApp.main(). - try: - kernel._eventloop_set.set() - except RuntimeError: - # Expecting sniffio.AsyncLibraryNotFoundError but don't want to import sniffio just for that - pass diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 5caa2a71a..b4674de0e 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -767,8 +767,6 @@ def start(self) -> None: async def main(self) -> None: async with create_task_group() as tg: tg.start_soon(self.kernel.start) - if self.kernel.eventloop: - self.kernel._eventloop_set.set() def stop(self) -> None: """Stop the kernel, thread-safe.""" diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 572499513..4eec54f16 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -249,14 +249,10 @@ def _parent_header(self): "list_subshell_request", ] - _eventloop_set: threading.Event - def __init__(self, **kwargs): """Initialize the kernel.""" super().__init__(**kwargs) - self._eventloop_set = threading.Event() - # Kernel application may swap stdout and stderr to OutStream, # which is the case in `IPKernelApp.init_io`, hence `sys.stdout` # can already by different from TextIO at initialization time. @@ -601,10 +597,6 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.shell_channel_thread.start() def stop(self): - if not self._eventloop_set.is_set(): - # Stop the async task that is waiting for the eventloop to be set. - self._eventloop_set.set() - self.shell_stop.set() self.control_stop.set() From fa09c5fe33f334dcb626225a35a1baa35b62fca7 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 15:14:37 +1100 Subject: [PATCH 14/28] Rename _main_shell_ready to _main_subshell_ready --- ipykernel/kernelbase.py | 8 ++++---- tests/conftest.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 4eec54f16..dd0a85c8b 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -127,7 +127,7 @@ class Kernel(SingletonConfigurable): stdin_socket = Any() _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() - _main_shell_ready = Instance(Event, ()) + _main_subshell_ready = Instance(Event, ()) log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] @@ -441,7 +441,7 @@ async def shell_main(self, subshell_id: str | None): tg.start_soon(self._execute_request_handler, receive_stream) if subshell_id is None: # Main subshell. - self._main_shell_ready.set() + self._main_subshell_ready.set() await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) @@ -466,7 +466,7 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre async def _process_shell(self, socket): # socket=None is valid if kernel subshells are not supported. 
- await self._main_shell_ready.wait() + await self._main_subshell_ready.wait() try: while True: await self.process_shell_message(socket=socket) @@ -585,7 +585,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: self.shell_stop = threading.Event() tg.start_soon(self.shell_main, None) - await self._main_shell_ready.wait() + await self._main_subshell_ready.wait() if self.shell_channel_thread: # Assign tasks to and start shell channel thread. manager = self.shell_channel_thread.manager diff --git a/tests/conftest.py b/tests/conftest.py index c1552ca47..59fda8643 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -207,7 +207,7 @@ async def kernel(anyio_backend): async with create_task_group() as tg: kernel = MockKernel() tg.start_soon(kernel.start) - await kernel._main_shell_ready.wait() + await kernel._main_subshell_ready.wait() try: yield kernel finally: @@ -219,7 +219,7 @@ async def ipkernel(anyio_backend): async with create_task_group() as tg: kernel = MockIPyKernel() tg.start_soon(kernel.start) - await kernel._main_shell_ready.wait() + await kernel._main_subshell_ready.wait() try: yield kernel finally: From 6a68f02155cb8adcf73f361434422b818dbb1ace Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 15:16:25 +1100 Subject: [PATCH 15/28] Try to make test_run_concurrently_timing more reliable --- tests/test_subshells.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_subshells.py b/tests/test_subshells.py index faa018696..2a0df12e0 100644 --- a/tests/test_subshells.py +++ b/tests/test_subshells.py @@ -186,11 +186,12 @@ def test_run_concurrently_timing(include_main_shell): ) times = (0.4,) * 2 - min_duration = times[0] + min_duration = max(times) max_duration = sum(times) # Prepare messages, times are sleep times in seconds. # Identical times for both subshells is a harder test as preparing and sending # the execute_reply messages may overlap. + time.sleep(1) # sleep to let kernel achieve idle state msgs = [] for id, sleep in zip(subshell_ids, times): code = f"b.wait(); time.sleep({sleep})" From 104ef76725b0b96c3a49ca641e155b91096ce47d Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 15:19:15 +1100 Subject: [PATCH 16/28] Improve test_tk_loop to run localy. --- tests/test_eventloop.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index 42ccae1ed..ed7957e12 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -64,18 +64,20 @@ def do_thing(): time.sleep(1) try: kernel.app_wrapper.app.quit() + kernel.app_wrapper.app.destroy() # guard for tk failing to start (if there is no display) except AttributeError: pass t = threading.Thread(target=do_thing) + t.daemon = True t.start() # guard for tk failing to start (if there is no display) try: loop_tk(kernel) except Exception: pass - t.join() + t.join(1) @windows_skip From 7b5d3e60357d381f300e80fcfe54e49b6ccfb002 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Tue, 1 Apr 2025 15:40:22 +1100 Subject: [PATCH 17/28] Add asyncio_event_loop (currently only set for asyncio backend) to the kernel and drop old integration code. 
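This change records asyncio.get_running_loop() on the kernel when the asyncio backend is running and turns loop_asyncio into a simple capability check. get_running_loop() raises RuntimeError when no asyncio loop is running in the current thread (for example under the trio backend), which is why the diff below wraps it in contextlib.suppress(RuntimeError). A minimal standalone illustration, with capture_loop() as a hypothetical helper:

    # Minimal illustration of capturing the running asyncio loop; capture_loop()
    # is a hypothetical helper, not an ipykernel API.
    import asyncio
    import contextlib

    import anyio


    def capture_loop():
        # Returns the running asyncio loop, or None when there is no asyncio
        # loop in this thread (for example under the trio backend).
        with contextlib.suppress(RuntimeError):
            return asyncio.get_running_loop()
        return None


    async def main():
        print("captured loop:", capture_loop())


    anyio.run(main)                      # asyncio backend: prints the loop
    # anyio.run(main, backend="trio")    # trio backend (if installed): prints None
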
--- ipykernel/eventloops.py | 61 +++-------------------------------------- ipykernel/kernelbase.py | 8 +++++- tests/test_eventloop.py | 19 ++++--------- 3 files changed, 17 insertions(+), 71 deletions(-) diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index 9e42b40b8..e0dc2a2f2 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -6,7 +6,6 @@ import os import platform import sys -from functools import partial import zmq from packaging.version import Version as V @@ -400,63 +399,11 @@ def loop_cocoa_exit(kernel): @register_integration("asyncio") def loop_asyncio(kernel): - """Start a kernel with asyncio event loop support.""" - import asyncio + """Verify the asyncio event loop is supported.""" - loop = asyncio.get_event_loop() - # loop is already running (e.g. tornado 5), nothing left to do - if loop.is_running(): - return - - if loop.is_closed(): - # main loop is closed, create a new one - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop._should_close = False # type:ignore[attr-defined] - - # pause eventloop when there's an event on a zmq socket - def process_stream_events(socket): - """fall back to main loop when there's a socket event""" - loop.stop() - - notifier = partial(process_stream_events, kernel.shell_socket) - loop.add_reader(kernel.shell_socket.getsockopt(zmq.FD), notifier) - loop.call_soon(notifier) - - while True: - error = None - try: - loop.run_forever() - except KeyboardInterrupt: - continue - except Exception as e: - error = e - if loop._should_close: # type:ignore[attr-defined] - loop.close() - if error is not None: - raise error - break - - -@loop_asyncio.exit -def loop_asyncio_exit(kernel): - """Exit hook for asyncio""" - import asyncio - - loop = asyncio.get_event_loop() - - async def close_loop(): - if hasattr(loop, "shutdown_asyncgens"): - yield loop.shutdown_asyncgens() - loop._should_close = True # type:ignore[attr-defined] - loop.stop() - - if loop.is_running(): - close_loop() - - elif not loop.is_closed(): - loop.run_until_complete(close_loop) # type:ignore[arg-type] - loop.close() + if not kernel.asyncio_event_loop or not kernel.asyncio_event_loop.is_running(): + msg = "The asyncio event loop is not running so is not supported." + raise RuntimeError(msg) def set_qt_api_env_from_gui(gui): diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index dd0a85c8b..9c53dde79 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -4,6 +4,8 @@ # Distributed under the terms of the Modified BSD License. from __future__ import annotations +import asyncio +import contextlib import inspect import itertools import logging @@ -128,6 +130,7 @@ class Kernel(SingletonConfigurable): _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() _main_subshell_ready = Instance(Event, ()) + asyncio_event_loop = Instance(asyncio.AbstractEventLoop, allow_none=True, read_only=True) # type:ignore[call-overload] log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] @@ -441,10 +444,13 @@ async def shell_main(self, subshell_id: str | None): tg.start_soon(self._execute_request_handler, receive_stream) if subshell_id is None: # Main subshell. 
+ with contextlib.suppress(RuntimeError): + self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) self._main_subshell_ready.set() await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) + self.set_trait("asyncio_event_loop", None) await send_stream.aclose() async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): @@ -821,7 +827,7 @@ async def execute_request(self, socket, ident, parent): self._aborted_time = time.monotonic() self.log.info("Aborting queue") - def do_execute( + async def do_execute( self, code, silent, diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index ed7957e12..7d04d7a56 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -1,6 +1,5 @@ """Test eventloop integration""" -import asyncio import os import sys import threading @@ -80,18 +79,12 @@ def do_thing(): t.join(1) -@windows_skip -@pytest.mark.parametrize("anyio_backend", ["asyncio"]) -async def test_asyncio_loop(kernel): - okay = asyncio.Event() - - def do_thing(): - okay.set() - - loop = asyncio.get_event_loop() - loop.call_soon(do_thing) - loop_asyncio(kernel) - await asyncio.wait_for(okay.wait(), 1) +async def test_asyncio_loop(kernel, anyio_backend): + if anyio_backend == "asyncio": + loop_asyncio(kernel) + else: + with pytest.raises(RuntimeError): + loop_asyncio(kernel) @windows_skip From ee8b7685393ae23339c14987652c19b56976f6a9 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Wed, 2 Apr 2025 09:20:30 +1100 Subject: [PATCH 18/28] Fix typos in CHANGELOG.md --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fbfbcd46..a41e3bc99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,15 +29,15 @@ - Fix expected text depending on IPython version. [#1354](https://github.com/ipython/ipykernel/pull/1354) ([@Carreau](https://github.com/Carreau)) - Another try at tracking down ResourceWarning with tracemalloc. [#1353](https://github.com/ipython/ipykernel/pull/1353) ([@Carreau](https://github.com/Carreau)) - Remove deprecated modules since 4.3 (2016). [#1352](https://github.com/ipython/ipykernel/pull/1352) ([@Carreau](https://github.com/Carreau)) -- Try to reenable tests from downstream ipywidgets [#1350](https://github.com/ipython/ipykernel/pull/1350) ([@Carreau](https://github.com/Carreau)) +- Try to re-enable tests from downstream ipywidgets [#1350](https://github.com/ipython/ipykernel/pull/1350) ([@Carreau](https://github.com/Carreau)) - Disable 3 failing downstream tests, but keep testing the rest. 
[#1349](https://github.com/ipython/ipykernel/pull/1349) ([@Carreau](https://github.com/Carreau)) -- Licence :: * trove classifers are deprecated [#1348](https://github.com/ipython/ipykernel/pull/1348) ([@Carreau](https://github.com/Carreau)) +- Licence :: * trove classifiers are deprecated [#1348](https://github.com/ipython/ipykernel/pull/1348) ([@Carreau](https://github.com/Carreau)) - Pin sphinx to resolve docs build failures [#1347](https://github.com/ipython/ipykernel/pull/1347) ([@krassowski](https://github.com/krassowski)) - Make our own mock kernel methods async [#1346](https://github.com/ipython/ipykernel/pull/1346) ([@Carreau](https://github.com/Carreau)) - Try to debug non-closed iopub socket [#1345](https://github.com/ipython/ipykernel/pull/1345) ([@Carreau](https://github.com/Carreau)) - Email is @python.org since 2018 [#1343](https://github.com/ipython/ipykernel/pull/1343) ([@Carreau](https://github.com/Carreau)) - Remove unused ignores lints. [#1342](https://github.com/ipython/ipykernel/pull/1342) ([@Carreau](https://github.com/Carreau)) -- Enable ruff G002 and fix 6 occurences [#1341](https://github.com/ipython/ipykernel/pull/1341) ([@Carreau](https://github.com/Carreau)) +- Enable ruff G002 and fix 6 occurrences [#1341](https://github.com/ipython/ipykernel/pull/1341) ([@Carreau](https://github.com/Carreau)) - Check ignores warnings are still relevant. [#1340](https://github.com/ipython/ipykernel/pull/1340) ([@Carreau](https://github.com/Carreau)) - Move mypy disablinging error codes on a per-file basis [#1338](https://github.com/ipython/ipykernel/pull/1338) ([@Carreau](https://github.com/Carreau)) - try to fix spyder kernel install [#1337](https://github.com/ipython/ipykernel/pull/1337) ([@Carreau](https://github.com/Carreau)) @@ -46,7 +46,7 @@ - Bump mypy [#1333](https://github.com/ipython/ipykernel/pull/1333) ([@Carreau](https://github.com/Carreau)) - Remove dead code. [#1332](https://github.com/ipython/ipykernel/pull/1332) ([@Carreau](https://github.com/Carreau)) - Ignore or fix most of the remaining ruff 0.9.6 errors [#1331](https://github.com/ipython/ipykernel/pull/1331) ([@Carreau](https://github.com/Carreau)) -- minor code reformating valid ruff 0.9.6 [#1330](https://github.com/ipython/ipykernel/pull/1330) ([@Carreau](https://github.com/Carreau)) +- minor code reformatting valid ruff 0.9.6 [#1330](https://github.com/ipython/ipykernel/pull/1330) ([@Carreau](https://github.com/Carreau)) - Some formatting changes to prepare bumping ruff pre-commit. [#1329](https://github.com/ipython/ipykernel/pull/1329) ([@Carreau](https://github.com/Carreau)) - Manually update Codespell and fix new errors. [#1328](https://github.com/ipython/ipykernel/pull/1328) ([@Carreau](https://github.com/Carreau)) - Manually update mdformat pre-commit and run it. [#1327](https://github.com/ipython/ipykernel/pull/1327) ([@Carreau](https://github.com/Carreau)) From 555f4b2d61bfdaa7a4b71d0f82673e979f45ed5f Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Wed, 2 Apr 2025 12:40:45 +1100 Subject: [PATCH 19/28] Close receive_stream after task group has exited. 
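Closing the send end of a memory object stream is what lets a consumer's async for loop finish draining and exit; this patch additionally closes the receive end once the task group has exited so that both halves are released. A small sketch of those close semantics, as illustrative code rather than ipykernel's:

    # Sketch of the close semantics of an anyio memory object stream;
    # illustrative code only.
    import anyio
    from anyio import create_memory_object_stream


    async def main():
        send_stream, receive_stream = create_memory_object_stream(max_buffer_size=1)
        await send_stream.send("last item")
        await send_stream.aclose()     # lets a consumer's `async for` finish draining
        async for item in receive_stream:
            print("drained:", item)    # buffered items are still delivered
        await receive_stream.aclose()  # release the receive end as well


    anyio.run(main)
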
--- ipykernel/kernelbase.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 9c53dde79..79516f962 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -452,6 +452,7 @@ async def shell_main(self, subshell_id: str | None): self._send_exec_request.pop(socket, None) self.set_trait("asyncio_event_loop", None) await send_stream.aclose() + await receive_stream.aclose() async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: From 2b3d6ec3d77ced5d96917790d0b606a709ee8053 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Wed, 2 Apr 2025 12:48:47 +1100 Subject: [PATCH 20/28] Pass parent directly to publish_status calls to avoid changing parent for non-execute requests. --- ipykernel/kernelbase.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 79516f962..d940dd00e 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -303,7 +303,7 @@ async def process_control_message(self, msg=None): # Set the parent message for side effects. self.set_parent(idents, msg, channel="control") - self._publish_status("busy", "control") + self._publish_status("busy", "control", parent=msg) header = msg["header"] msg_type = header["msg_type"] @@ -462,12 +462,12 @@ async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStre await self._send_abort_reply(socket, msg, idents) continue self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") + self._publish_status("busy", "shell", parent=msg) result = handler(socket, idents, msg) if inspect.isawaitable(result): await result self.set_parent(idents, msg, channel="shell") - self._publish_status("idle", "shell") + self._publish_status("idle", "shell", parent=msg) except BaseException as e: self.log.exception("Execute request", exc_info=e) @@ -507,9 +507,7 @@ async def process_shell_message(self, msg=None, socket=None): msg_type = msg["header"]["msg_type"] if msg_type != "execute_request": - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell") + self._publish_status("busy", "shell", parent=msg) # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. 
Each @@ -552,12 +550,7 @@ async def process_shell_message(self, msg=None, socket=None): except Exception: self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) if msg_type != "execute_request": - self.set_parent(idents, msg, channel="shell") - if sys.stdout is not None: - sys.stdout.flush() - if sys.stderr is not None: - sys.stderr.flush() - self._publish_status("idle", "shell") + self._publish_status("idle", "shell", parent=msg) async def control_main(self): assert self.control_socket is not None From 4586e346e4c76b2ff0f2580e00b9db873dd6e6ef Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Wed, 2 Apr 2025 16:28:53 +1100 Subject: [PATCH 21/28] Pass subshell_id to _execute_request_handler --- ipykernel/kernelbase.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index d940dd00e..9ae8d75f6 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -441,12 +441,9 @@ async def shell_main(self, subshell_id: str | None): if not socket.started.is_set(): await tg.start(socket.start) tg.start_soon(self._process_shell, socket) - tg.start_soon(self._execute_request_handler, receive_stream) + tg.start_soon(self._execute_request_handler, receive_stream, subshell_id) if subshell_id is None: # Main subshell. - with contextlib.suppress(RuntimeError): - self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) - self._main_subshell_ready.set() await to_thread.run_sync(self.shell_stop.wait) tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) @@ -454,7 +451,13 @@ async def shell_main(self, subshell_id: str | None): await send_stream.aclose() await receive_stream.aclose() - async def _execute_request_handler(self, receive_stream: MemoryObjectReceiveStream): + async def _execute_request_handler( + self, receive_stream: MemoryObjectReceiveStream, subshell_id: str | None + ): + if subshell_id is None: + with contextlib.suppress(RuntimeError): + self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) + self._main_subshell_ready.set() async with receive_stream: async for handler, (received_time, socket, idents, msg) in receive_stream: try: From 62d46b455aa78fb565fdcf5c6a0fd4456c6d705f Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Wed, 2 Apr 2025 21:35:19 +1100 Subject: [PATCH 22/28] Provide a TaskGroup in the kernel with a shielded CancelScope --- ipykernel/eventloops.py | 6 +++--- ipykernel/kernelbase.py | 26 ++++++++++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index e0dc2a2f2..32c499b57 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -401,9 +401,9 @@ def loop_cocoa_exit(kernel): def loop_asyncio(kernel): """Verify the asyncio event loop is supported.""" - if not kernel.asyncio_event_loop or not kernel.asyncio_event_loop.is_running(): - msg = "The asyncio event loop is not running so is not supported." 
- raise RuntimeError(msg) + import asyncio + + asyncio.get_running_loop() def set_qt_api_env_from_gui(gui): diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 9ae8d75f6..605036d74 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -42,13 +42,14 @@ import zmq_anyio from anyio import ( TASK_STATUS_IGNORED, + CancelScope, Event, create_memory_object_stream, create_task_group, sleep, to_thread, ) -from anyio.abc import TaskStatus +from anyio.abc import TaskGroup, TaskStatus from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session @@ -131,6 +132,7 @@ class Kernel(SingletonConfigurable): _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() _main_subshell_ready = Instance(Event, ()) asyncio_event_loop = Instance(asyncio.AbstractEventLoop, allow_none=True, read_only=True) # type:ignore[call-overload] + tg = Instance(TaskGroup, read_only=True) log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] @@ -441,23 +443,23 @@ async def shell_main(self, subshell_id: str | None): if not socket.started.is_set(): await tg.start(socket.start) tg.start_soon(self._process_shell, socket) - tg.start_soon(self._execute_request_handler, receive_stream, subshell_id) + tg.start_soon(self._execute_request_loop, receive_stream) if subshell_id is None: # Main subshell. - await to_thread.run_sync(self.shell_stop.wait) - tg.cancel_scope.cancel() + with contextlib.suppress(RuntimeError): + self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) + async with create_task_group() as tg_main: + with CancelScope(shield=True) as scope: + self.set_trait("tg", tg_main) + self._main_subshell_ready.set() + await to_thread.run_sync(self.shell_stop.wait) + scope.cancel() + tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) - self.set_trait("asyncio_event_loop", None) await send_stream.aclose() await receive_stream.aclose() - async def _execute_request_handler( - self, receive_stream: MemoryObjectReceiveStream, subshell_id: str | None - ): - if subshell_id is None: - with contextlib.suppress(RuntimeError): - self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) - self._main_subshell_ready.set() + async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: async for handler, (received_time, socket, idents, msg) in receive_stream: try: From 8fc455c22c5f239bba6ab9d2c19c49811ac06136 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Fri, 4 Apr 2025 18:43:47 +1100 Subject: [PATCH 23/28] Add BlockingPortal and enhance task management in Kernel class --- ipykernel/kernelbase.py | 35 +++++++++++++++++++++++++++-------- tests/test_ipkernel_direct.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 605036d74..73d33f49c 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -50,6 +50,7 @@ to_thread, ) from anyio.abc import TaskGroup, TaskStatus +from anyio.from_thread import BlockingPortal from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session @@ -132,7 +133,8 @@ class Kernel(SingletonConfigurable): _send_exec_request: Dict[dict[zmq_anyio.Socket, MemoryObjectSendStream]] = Dict() _main_subshell_ready 
= Instance(Event, ()) asyncio_event_loop = Instance(asyncio.AbstractEventLoop, allow_none=True, read_only=True) # type:ignore[call-overload] - tg = Instance(TaskGroup, read_only=True) + _tg_main = Instance(TaskGroup) + _portal = Instance(BlockingPortal) log: logging.Logger = Instance(logging.Logger, allow_none=True) # type:ignore[assignment] @@ -444,17 +446,22 @@ async def shell_main(self, subshell_id: str | None): await tg.start(socket.start) tg.start_soon(self._process_shell, socket) tg.start_soon(self._execute_request_loop, receive_stream) - if subshell_id is None: - # Main subshell. + if not subshell_id: + # Main subshell with contextlib.suppress(RuntimeError): self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) async with create_task_group() as tg_main: with CancelScope(shield=True) as scope: - self.set_trait("tg", tg_main) - self._main_subshell_ready.set() - await to_thread.run_sync(self.shell_stop.wait) - scope.cancel() - tg.cancel_scope.cancel() + self._tg_main = tg_main + async with BlockingPortal() as portal: + # Provide a portal for general threadsafe access + self._portal = portal + self._main_subshell_ready.set() + await to_thread.run_sync(self.shell_stop.wait) + await portal.stop(True) + scope.cancel() + tg_main.cancel_scope.cancel() + tg.cancel_scope.cancel() self._send_exec_request.pop(socket, None) await send_stream.aclose() await receive_stream.aclose() @@ -573,6 +580,17 @@ def pre_handler_hook(self): def post_handler_hook(self): """Hook to execute after calling message handler""" + def start_soon(self, func, *args): + "Run a coroutine in the main thread taskgroup." + try: + if self._portal._event_loop_thread_id == threading.get_ident(): + self._tg_main.start_soon(func, *args) + else: + self._portal.start_task_soon(func, *args) + except Exception: + self.log.exception("portal call failed") + raise + async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: """Process messages on shell and control channels""" async with create_task_group() as tg: @@ -604,6 +622,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: def stop(self): self.shell_stop.set() self.control_stop.set() + self._main_subshell_ready = Event() def record_ports(self, ports): """Record the ports that this kernel is using. 
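[editor's note, not part of the patch] For readers unfamiliar with anyio's BlockingPortal, the following standalone sketch shows the pattern the new start_soon helper relies on: a portal opened on the event-loop thread lets plain worker threads schedule coroutines back onto that loop safely. The names main, coro and worker are illustrative only; this is a minimal sketch, not ipykernel code.

import threading

import anyio
from anyio import to_thread
from anyio.from_thread import BlockingPortal


async def main():
    async with BlockingPortal() as portal:
        done = anyio.Event()

        async def coro(label):
            # Runs on the event-loop thread even though it was scheduled from elsewhere.
            print("running", label, "on", threading.current_thread().name)
            done.set()

        def worker():
            # Called from a plain thread: hand the coroutine to the loop thread via the portal.
            portal.start_task_soon(coro, "scheduled from worker thread")

        await to_thread.run_sync(worker)
        await done.wait()


anyio.run(main)
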
diff --git a/tests/test_ipkernel_direct.py b/tests/test_ipkernel_direct.py index fe7321a66..cb521fec9 100644 --- a/tests/test_ipkernel_direct.py +++ b/tests/test_ipkernel_direct.py @@ -177,3 +177,32 @@ async def test_do_debug_request(ipkernel: IPythonKernel) -> None: msg = ipkernel.session.msg("debug_request", {}) ipkernel.session.serialize(msg) await ipkernel.do_debug_request(msg) + + +@pytest.mark.parametrize("mode", ["main", "external"]) +@pytest.mark.parametrize("exception", [True, False]) +async def test_start_soon(mode, exception: bool, ipkernel: IPythonKernel, anyio_backend: str): + # Test we can start coroutines from various scopes + import anyio + from anyio import to_thread + + async def my_test(event: anyio.Event): + event.set() + if exception: + raise ValueError + + events = [] + + async def start(): + event = anyio.Event() + if mode == "main": + ipkernel.start_soon(my_test, event) + else: + await to_thread.run_sync(ipkernel.start_soon, my_test, event) + events.append(event) + + for _ in range(50): + await start() + + for event in events: + await event.wait() From 02c59e81e118ea2f52d3e5d6ababaf522634e8a8 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Fri, 4 Apr 2025 20:20:03 +1100 Subject: [PATCH 24/28] Catch errors on main subshell stop and remove redundant cancel scope. --- ipykernel/kernelbase.py | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 73d33f49c..bd5c8a9b8 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -42,7 +42,6 @@ import zmq_anyio from anyio import ( TASK_STATUS_IGNORED, - CancelScope, Event, create_memory_object_stream, create_task_group, @@ -441,17 +440,18 @@ async def shell_main(self, subshell_id: str | None): if socket not in self._send_exec_request: send_stream, receive_stream = create_memory_object_stream(max_buffer_size=math.inf) self._send_exec_request[socket] = send_stream - async with create_task_group() as tg: - if not socket.started.is_set(): - await tg.start(socket.start) - tg.start_soon(self._process_shell, socket) - tg.start_soon(self._execute_request_loop, receive_stream) - if not subshell_id: - # Main subshell - with contextlib.suppress(RuntimeError): - self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) - async with create_task_group() as tg_main: - with CancelScope(shield=True) as scope: + try: + async with create_task_group() as tg: + if not socket.started.is_set(): + await tg.start(socket.start) + tg.start_soon(self._process_shell, socket) + tg.start_soon(self._execute_request_loop, receive_stream) + if not subshell_id: + # Main subshell + with contextlib.suppress(RuntimeError): + self.set_trait("asyncio_event_loop", asyncio.get_running_loop()) + async with create_task_group() as tg_main: + tg_main.cancel_scope.shield = True self._tg_main = tg_main async with BlockingPortal() as portal: # Provide a portal for general threadsafe access @@ -459,12 +459,15 @@ async def shell_main(self, subshell_id: str | None): self._main_subshell_ready.set() await to_thread.run_sync(self.shell_stop.wait) await portal.stop(True) - scope.cancel() - tg_main.cancel_scope.cancel() - tg.cancel_scope.cancel() - self._send_exec_request.pop(socket, None) - await send_stream.aclose() - await receive_stream.aclose() + tg_main.cancel_scope.cancel() + tg.cancel_scope.cancel() + except BaseException: + if not self.shell_stop.is_set(): + raise + finally: + self._send_exec_request.pop(socket, None) + await 
send_stream.aclose() + await receive_stream.aclose() async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: From da2d27fcbbb01fc2d2286fe512a4721cf96cade9 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Fri, 4 Apr 2025 21:00:43 +1100 Subject: [PATCH 25/28] Add optional name parameter to start_soon for improved task identification --- ipykernel/kernelbase.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index bd5c8a9b8..1da09be44 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -583,13 +583,13 @@ def pre_handler_hook(self): def post_handler_hook(self): """Hook to execute after calling message handler""" - def start_soon(self, func, *args): + def start_soon(self, func, *args, name: str | None = None): "Run a coroutine in the main thread taskgroup." try: if self._portal._event_loop_thread_id == threading.get_ident(): - self._tg_main.start_soon(func, *args) + self._tg_main.start_soon(func, *args, name=name) else: - self._portal.start_task_soon(func, *args) + self._portal.start_task_soon(func, *args, name=name) except Exception: self.log.exception("portal call failed") raise From 7a85d84803bed2f1750ffe7e77b2f8c8e797d9b3 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 6 Apr 2025 08:13:59 +1000 Subject: [PATCH 26/28] Refactor _execute_request_loop to ensure status is published in finally block --- ipykernel/kernelbase.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 1da09be44..ce74c2473 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -472,19 +472,19 @@ async def shell_main(self, subshell_id: str | None): async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: async for handler, (received_time, socket, idents, msg) in receive_stream: + self.set_parent(idents, msg, channel="shell") + self._publish_status("busy", "shell", parent=msg) try: if received_time < self._aborted_time: await self._send_abort_reply(socket, msg, idents) continue - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell", parent=msg) result = handler(socket, idents, msg) if inspect.isawaitable(result): await result - self.set_parent(idents, msg, channel="shell") - self._publish_status("idle", "shell", parent=msg) except BaseException as e: self.log.exception("Execute request", exc_info=e) + finally: + self._publish_status("idle", "shell", parent=msg) async def _process_shell(self, socket): # socket=None is valid if kernel subshells are not supported. 
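[editor's note, not part of the patch] The refactor above moves the idle status publication into a finally block so it is emitted even when a handler raises. A simplified, self-contained sketch of that bracketing pattern follows; the names request_loop, publish_status and handler are illustrative, not ipykernel API.

import inspect
import math

import anyio
from anyio import create_memory_object_stream, create_task_group


async def request_loop(receive_stream, publish_status):
    # Drain requests one at a time; every request is bracketed by busy/idle,
    # and the idle notification is guaranteed by the finally block.
    async with receive_stream:
        async for handler, payload in receive_stream:
            publish_status("busy")
            try:
                result = handler(payload)
                if inspect.isawaitable(result):
                    await result
            except Exception:
                pass  # a real loop would log the error and keep going
            finally:
                publish_status("idle")


async def main():
    statuses = []
    send, receive = create_memory_object_stream(max_buffer_size=math.inf)

    async def handler(payload):
        await anyio.sleep(0)

    async with create_task_group() as tg:
        tg.start_soon(request_loop, receive, statuses.append)
        async with send:
            await send.send((handler, "msg-1"))
    print(statuses)  # ['busy', 'idle']


anyio.run(main)
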
From 19ee3e21cbbbe7610c36b870e1e31cb7706a8cd7 Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 6 Apr 2025 09:28:24 +1000 Subject: [PATCH 27/28] Remove deprecated metadata handling and unused log module --- ipykernel/ipkernel.py | 99 ----------------------------------- ipykernel/kernelbase.py | 26 --------- ipykernel/log.py | 30 ----------- pyproject.toml | 1 - tests/test_ipkernel_direct.py | 17 ------ tests/test_kernel.py | 38 ++++---------- 6 files changed, 10 insertions(+), 201 deletions(-) delete mode 100644 ipykernel/log.py diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index b170d55e7..a6910a540 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -294,35 +294,6 @@ def set_parent(self, ident, parent, channel="shell"): if channel == "shell" and self.shell: self.shell.set_parent(parent) - def init_metadata(self, parent): - """Initialize metadata. - - Run at the beginning of each execution request. - """ - md = super().init_metadata(parent) - # FIXME: remove deprecated ipyparallel-specific code - # This is required for ipyparallel < 5.0 - md.update( - { - "dependencies_met": True, - "engine": self.ident, - } - ) - return md - - def finish_metadata(self, parent, metadata, reply_content): - """Finish populating metadata. - - Run after completing an execution request. - """ - # FIXME: remove deprecated ipyparallel-specific code - # This is required by ipyparallel < 5.0 - metadata["status"] = reply_content["status"] - if reply_content["status"] == "error" and reply_content["ename"] == "UnmetDependency": - metadata["dependencies_met"] = False - - return metadata - def _forward_input(self, allow_stdin=False): """Forward raw_input and getpass to the current frontend. @@ -478,10 +449,6 @@ async def run(execution: Execution) -> None: } ) - # FIXME: deprecated piece for ipyparallel (remove in 5.0): - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method="execute") - reply_content["engine_info"] = e_info - # Return the execution counter so clients can display prompts reply_content["execution_count"] = shell.execution_count - 1 @@ -662,72 +629,6 @@ def do_is_complete(self, code): r["indent"] = " " * indent_spaces return r - def do_apply(self, content, bufs, msg_id, reply_metadata): - """Handle an apply request.""" - try: - from ipyparallel.serialize import serialize_object, unpack_apply_message - except ImportError: - from .serialize import serialize_object, unpack_apply_message - - shell = self.shell - assert shell is not None - try: - working = shell.user_ns - - prefix = "_" + str(msg_id).replace("-", "") + "_" - f, args, kwargs = unpack_apply_message(bufs, working, copy=False) - - fname = getattr(f, "__name__", "f") - - fname = prefix + "f" - argname = prefix + "args" - kwargname = prefix + "kwargs" - resultname = prefix + "result" - - ns = {fname: f, argname: args, kwargname: kwargs, resultname: None} - # print ns - working.update(ns) - code = f"{resultname} = {fname}(*{argname},**{kwargname})" - try: - exec(code, shell.user_global_ns, shell.user_ns) - result = working.get(resultname) - finally: - for key in ns: - working.pop(key) - - assert self.session is not None - result_buf = serialize_object( - result, - buffer_threshold=self.session.buffer_threshold, - item_threshold=self.session.item_threshold, - ) - - except BaseException as e: - # invoke IPython traceback formatting - shell.showtraceback() - reply_content = { - "traceback": shell._last_traceback or [], - "ename": str(type(e).__name__), - "evalue": str(e), - } - # FIXME: deprecated piece for 
ipyparallel (remove in 5.0): - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method="apply") - reply_content["engine_info"] = e_info - - self.send_response( - self.iopub_socket, - "error", - reply_content, - ident=self._topic("error"), - ) - self.log.info("Exception in apply request:\n%s", "\n".join(reply_content["traceback"])) - result_buf = [] - reply_content["status"] = "error" - else: - reply_content = {"status": "ok"} - - return reply_content, result_buf - def do_clear(self): """Clear the kernel.""" if self.shell: diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index ce74c2473..7739f4c18 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -745,24 +745,6 @@ def send_response( metadata, ) - def init_metadata(self, parent): - """Initialize metadata. - - Run at the beginning of execution requests. - """ - # FIXME: `started` is part of ipyparallel - # Remove for ipykernel 5.0 - return { - "started": now(), - } - - def finish_metadata(self, parent, metadata, reply_content): - """Finish populating metadata. - - Run after completing an execution request. - """ - return metadata - async def execute_request(self, socket, ident, parent): """handle an execute_request""" if not self.session: @@ -782,8 +764,6 @@ async def execute_request(self, socket, ident, parent): stop_on_error = content.get("stop_on_error", True) - metadata = self.init_metadata(parent) - # Re-broadcast our input for the benefit of listening clients, and # start computing output if not silent: @@ -829,14 +809,12 @@ async def execute_request(self, socket, ident, parent): # Send the reply. reply_content = json_clean(reply_content) - metadata = self.finish_metadata(parent, metadata, reply_content) reply_msg = self.session.send( socket, "execute_reply", content=reply_content, parent=parent, - metadata=metadata, ident=ident, ) @@ -1253,15 +1231,11 @@ async def _send_abort_reply(self, socket, msg, idents): self.log.info("Aborting %s: %s", msg["header"]["msg_id"], msg["header"]["msg_type"]) reply_type = msg["header"]["msg_type"].rsplit("_", 1)[0] + "_reply" status = {"status": "aborted"} - md = self.init_metadata(msg) - md = self.finish_metadata(msg, md, status) - md.update(status) assert self.session is not None self.session.send( socket, reply_type, - metadata=md, content=status, parent=msg, ident=idents, diff --git a/ipykernel/log.py b/ipykernel/log.py deleted file mode 100644 index c230065e8..000000000 --- a/ipykernel/log.py +++ /dev/null @@ -1,30 +0,0 @@ -"""A PUB log handler.""" - -import warnings - -from zmq.log.handlers import PUBHandler - -warnings.warn( - "ipykernel.log is deprecated since ipykernel 4.3.0 (2016). 
It has moved to ipyparallel.engine.log", - DeprecationWarning, - stacklevel=2, -) - - -class EnginePUBHandler(PUBHandler): - """A simple PUBHandler subclass that sets root_topic""" - - engine = None - - def __init__(self, engine, *args, **kwargs): - """Initialize the handler.""" - PUBHandler.__init__(self, *args, **kwargs) - self.engine = engine - - @property # type:ignore[misc] - def root_topic(self): - """this is a property, in case the handler is created - before the engine gets registered with an id""" - if isinstance(getattr(self.engine, "id", None), int): - return "engine.%i" % self.engine.id # type:ignore[union-attr] - return "engine" diff --git a/pyproject.toml b/pyproject.toml index 3a582d161..367d3dcda 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,6 @@ docs = [ ] test = [ "flaky", - "ipyparallel", "pre-commit", "pytest-cov", "pytest-timeout", diff --git a/tests/test_ipkernel_direct.py b/tests/test_ipkernel_direct.py index cb521fec9..58fa84f12 100644 --- a/tests/test_ipkernel_direct.py +++ b/tests/test_ipkernel_direct.py @@ -137,17 +137,6 @@ async def test_is_complete_request(ipkernel: MockIPyKernel) -> None: assert reply["header"]["msg_type"] == "is_complete_reply" -def test_do_apply(ipkernel: MockIPyKernel) -> None: - from ipyparallel import pack_apply_message - - def hello(): - pass - - msg = pack_apply_message(hello, (), {}) - ipkernel.do_apply(None, msg, "1", {}) - ipkernel.do_apply(None, [], "1", {}) - - async def test_direct_debug_request(ipkernel): reply = await ipkernel.test_control_message("debug_request", {}) assert reply["header"]["msg_type"] == "debug_reply" @@ -167,12 +156,6 @@ def test_create_comm(): assert isinstance(_create_comm(), BaseComm) -def test_finish_metadata(ipkernel: IPythonKernel) -> None: - reply_content = dict(status="error", ename="UnmetDependency") - metadata = ipkernel.finish_metadata({}, {}, reply_content) - assert metadata["dependencies_met"] is False - - async def test_do_debug_request(ipkernel: IPythonKernel) -> None: msg = ipkernel.session.msg("debug_request", {}) ipkernel.session.serialize(msg) diff --git a/tests/test_kernel.py b/tests/test_kernel.py index 2b379eb83..a7038a976 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -10,7 +10,6 @@ import subprocess import sys import time -from datetime import datetime, timedelta from subprocess import Popen from tempfile import TemporaryDirectory @@ -602,16 +601,20 @@ def test_control_thread_priority(): def test_sequential_control_messages(): with new_kernel() as kc: - msg_id = kc.execute("import time") + msg_id = kc.execute("import anyio") get_reply(kc, msg_id) # Send multiple messages on the control channel. # Using execute messages to vary duration. - sleeps = [0.6, 0.3, 0.1] + sleeps = [0.0, 0.6, 0.3, 0.1, 0.0, 0.0] # Prepare messages msgs = [ - kc.session.msg("execute_request", {"code": f"time.sleep({sleep})"}) for sleep in sleeps + kc.session.msg( + "execute_request", + {"code": f"await anyio.sleep({sleep})", "user_expressions": {"i": str(i)}}, + ) + for i, sleep in enumerate(sleeps) ] msg_ids = [msg["header"]["msg_id"] for msg in msgs] @@ -620,30 +623,9 @@ def test_sequential_control_messages(): kc.control_channel.send(msg) # Get replies - replies = [get_reply(kc, msg_id, channel="control") for msg_id in msg_ids] - - def ensure_datetime(arg): - # Support arg which is a datetime or str. 
- if isinstance(arg, str): - if sys.version_info[:2] < (3, 11) and arg.endswith("Z"): - # Python < 3.11 doesn't support "Z" suffix in datetime.fromisoformat, - # so use alternative timezone format. - # https://github.com/python/cpython/issues/80010 - arg = arg[:-1] + "+00:00" - return datetime.fromisoformat(arg) - return arg - - # Check messages are processed in order, one at a time, and of a sensible duration. - previous_end = None - for reply, sleep in zip(replies, sleeps): - start = ensure_datetime(reply["metadata"]["started"]) - end = ensure_datetime(reply["header"]["date"]) - - if previous_end is not None: - assert start >= previous_end - previous_end = end - - assert end >= start + timedelta(seconds=sleep) + for ii, reply in enumerate(get_reply(kc, msg_id, channel="control") for msg_id in msg_ids): + i = reply["content"]["user_expressions"]["i"]["data"]["text/plain"] + assert str(ii) == i def _child(): From 5798b4ce680c6112d7e6b4e83cf94ff81e2595cb Mon Sep 17 00:00:00 2001 From: Alan Fleming Date: Sun, 6 Apr 2025 16:40:27 +1000 Subject: [PATCH 28/28] Only set parent and ident from _execute_request_loop. --- ipykernel/comm/comm.py | 2 +- ipykernel/inprocess/ipkernel.py | 6 +- ipykernel/ipkernel.py | 9 +-- ipykernel/kernelbase.py | 130 ++++++++++++-------------------- tests/test_kernel.py | 6 +- tests/test_subshells.py | 2 +- 6 files changed, 58 insertions(+), 97 deletions(-) diff --git a/ipykernel/comm/comm.py b/ipykernel/comm/comm.py index 1747d4ce7..b01af0ca9 100644 --- a/ipykernel/comm/comm.py +++ b/ipykernel/comm/comm.py @@ -39,7 +39,7 @@ def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys): msg_type, content, metadata=json_clean(metadata), - parent=self.kernel.get_parent(), + parent=self.kernel.parent_msg, ident=self.topic, buffers=buffers, ) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index efaa594bd..d40e653e9 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -95,7 +95,7 @@ def stop(self): def _abort_queues(self): """The in-process kernel doesn't abort requests.""" - def _input_request(self, prompt, ident, parent, password=False): + def _input_request(self, prompt, *, password=False): # Flush output before making the request. self.raw_input_str = None if sys.stdout is not None: @@ -106,10 +106,10 @@ def _input_request(self, prompt, ident, parent, password=False): # Send the input request. content = json_clean(dict(prompt=prompt, password=password)) assert self.session is not None - msg = self.session.msg("input_request", content, parent) + msg = self.session.msg("input_request", content, self.parent_msg) for frontend in self.frontends: assert frontend is not None - if frontend.session.session == parent["header"]["session"]: + if frontend.session.session == self.parent_msg["header"]["session"]: frontend.stdin_channel.call_handlers(msg) break else: diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index a6910a540..b74b97184 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -286,12 +286,9 @@ def stop(self): if self.debugpy_socket is not None: self.debugpy_stop.set() - def set_parent(self, ident, parent, channel="shell"): - """Overridden from parent to tell the display hook and output streams - about the parent message. 
- """ - super().set_parent(ident, parent, channel) - if channel == "shell" and self.shell: + def _set_parent_ident(self, parent, ident): + super()._set_parent_ident(parent, ident) + if self.shell: self.shell.set_parent(parent) def _forward_input(self, allow_stdin=False): diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 7739f4c18..658875254 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -22,8 +22,6 @@ from functools import partial from signal import SIGINT, SIGTERM, Signals -from .thread import CONTROL_THREAD_NAME - if sys.platform != "win32": from signal import SIGKILL else: @@ -174,17 +172,6 @@ def _default_ident(self): # track associations with current request _allow_stdin = Bool(False) - _parents: Dict[str, t.Any] = Dict({"shell": {}, "control": {}}) - _parent_ident = Dict({"shell": b"", "control": b""}) - - @property - def _parent_header(self): - warnings.warn( - "Kernel._parent_header is deprecated in ipykernel 6. Use .get_parent()", - DeprecationWarning, - stacklevel=2, - ) - return self.get_parent() # Time to sleep after flushing the stdout/err buffers in each execute # cycle. While this introduces a hard limit on the minimal latency of the @@ -303,11 +290,7 @@ async def process_control_message(self, msg=None): return self.log.debug("Control received: %s", msg) - - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="control") - self._publish_status("busy", "control", parent=msg) - + self._publish_status("busy", msg) header = msg["header"] msg_type = header["msg_type"] @@ -329,7 +312,7 @@ async def process_control_message(self, msg=None): sys.stdout.flush() if sys.stderr is not None: sys.stderr.flush() - self._publish_status("idle", "control") + self._publish_status("idle", msg) def should_handle(self, stream, msg, idents): """Check whether a shell-channel message should be handled @@ -472,8 +455,8 @@ async def shell_main(self, subshell_id: str | None): async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream): async with receive_stream: async for handler, (received_time, socket, idents, msg) in receive_stream: - self.set_parent(idents, msg, channel="shell") - self._publish_status("busy", "shell", parent=msg) + self._set_parent_ident(msg, idents) + self._publish_status("busy", msg) try: if received_time < self._aborted_time: await self._send_abort_reply(socket, msg, idents) @@ -484,7 +467,7 @@ async def _execute_request_loop(self, receive_stream: MemoryObjectReceiveStream) except BaseException as e: self.log.exception("Execute request", exc_info=e) finally: - self._publish_status("idle", "shell", parent=msg) + self._publish_status("idle", msg) async def _process_shell(self, socket): # socket=None is valid if kernel subshells are not supported. @@ -522,7 +505,7 @@ async def process_shell_message(self, msg=None, socket=None): msg_type = msg["header"]["msg_type"] if msg_type != "execute_request": - self._publish_status("busy", "shell", parent=msg) + self._publish_status("busy", msg) # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. 
Each @@ -565,7 +548,7 @@ async def process_shell_message(self, msg=None, socket=None): except Exception: self.log.debug("Unable to signal in post_handler_hook:", exc_info=True) if msg_type != "execute_request": - self._publish_status("idle", "shell", parent=msg) + self._publish_status("idle", msg) async def control_main(self): assert self.control_socket is not None @@ -651,7 +634,7 @@ def _publish_execute_input(self, code, parent, execution_count): ident=self._topic("execute_input"), ) - def _publish_status(self, status, channel, parent=None): + def _publish_status(self, status: str, parent): """send status (busy/idle) on IOPub""" if not self.session: return @@ -659,7 +642,7 @@ def _publish_status(self, status, channel, parent=None): self.iopub_socket, "status", {"execution_state": status}, - parent=parent or self.get_parent(channel), + parent=parent, ident=self._topic("status"), ) @@ -670,46 +653,35 @@ def _publish_debug_event(self, event): self.iopub_socket, "debug_event", event, - parent=self.get_parent(), + parent=self.parent_msg, ident=self._topic("debug_event"), ) - def set_parent(self, ident, parent, channel="shell"): - """Set the current parent request + def _set_parent_ident(self, parent, ident): + self._parent_msg = parent + self._parent_ident = ident - Side effects (IOPub messages) and replies are associated with - the request that caused them via the parent_header. + @property + def parent_msg(self): + """The message of the most recent execution request. - The parent identity is used to route input_request messages - on the stdin channel. + .. versionadded:: 7 """ - self._parent_ident[channel] = ident - self._parents[channel] = parent - - def get_parent(self, channel=None): - """Get the parent request associated with a channel. - - .. versionadded:: 6 + try: + return self._parent_msg + except AttributeError: + return {} - Parameters - ---------- - channel : str - the name of the channel ('shell' or 'control') + @property + def parent_ident(self): + """The ident of the most recent execution request. - Returns - ------- - message : dict - the parent message for the most recent request on the channel. + .. versionadded:: 7 """ - - if channel is None: - # If a channel is not specified, get information from current thread - if threading.current_thread().name == CONTROL_THREAD_NAME: - channel = "control" - else: - channel = "shell" - - return self._parents.get(channel, {}) + try: + return self._parent_ident + except AttributeError: + return [] def send_response( self, @@ -721,28 +693,24 @@ def send_response( track=False, header=None, metadata=None, - channel=None, ): """Send a response to the message we're currently processing. This accepts all the parameters of :meth:`jupyter_client.session.Session.send` except ``parent``. - - This relies on :meth:`set_parent` having been called for the current - message. 
""" if not self.session: return None return self.session.send( socket, msg_or_type, - content, - self.get_parent(channel), - ident, - buffers, - track, - header, - metadata, + content=content, + parent=self.parent_msg, + ident=ident, + buffers=buffers, + track=track, + header=header, + metadata=metadata, ) async def execute_request(self, socket, ident, parent): @@ -1265,12 +1233,7 @@ def getpass(self, prompt="", stream=None): UserWarning, stacklevel=2, ) - return self._input_request( - prompt, - self._parent_ident["shell"], - self.get_parent("shell"), - password=True, - ) + return self._input_request(prompt, password=True) def raw_input(self, prompt=""): """Forward raw_input to frontends @@ -1282,14 +1245,9 @@ def raw_input(self, prompt=""): if not self._allow_stdin: msg = "raw_input was called, but this frontend does not support input requests." raise StdinNotImplementedError(msg) - return self._input_request( - str(prompt), - self._parent_ident["shell"], - self.get_parent("shell"), - password=False, - ) + return self._input_request(str(prompt), password=False) - def _input_request(self, prompt, ident, parent, password=False): + def _input_request(self, prompt, *, password=False): # Flush output before making the request. if sys.stdout is not None: sys.stdout.flush() @@ -1308,7 +1266,13 @@ def _input_request(self, prompt, ident, parent, password=False): # Send the input request. assert self.session is not None content = json_clean(dict(prompt=prompt, password=password)) - self.session.send(self.stdin_socket, "input_request", content, parent, ident=ident) + self.session.send( + self.stdin_socket, + "input_request", + content, + parent=self.parent_msg, + ident=self.parent_ident, + ) # Await a response. while True: @@ -1334,7 +1298,7 @@ def _input_request(self, prompt, ident, parent, password=False): try: value = reply["content"]["value"] # type:ignore[index] except Exception: - self.log.error("Bad input_reply: %s", parent) + self.log.error("Bad input_reply: %s", self.parent_msg) value = "" if value == "\x04": # EOF diff --git a/tests/test_kernel.py b/tests/test_kernel.py index a7038a976..4d591760c 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -601,18 +601,18 @@ def test_control_thread_priority(): def test_sequential_control_messages(): with new_kernel() as kc: - msg_id = kc.execute("import anyio") + msg_id = kc.execute("import time") get_reply(kc, msg_id) # Send multiple messages on the control channel. # Using execute messages to vary duration. - sleeps = [0.0, 0.6, 0.3, 0.1, 0.0, 0.0] + sleeps = [0.0, 0.6, 0.3, 0.1, 0.0] # Prepare messages msgs = [ kc.session.msg( "execute_request", - {"code": f"await anyio.sleep({sleep})", "user_expressions": {"i": str(i)}}, + {"code": f"time.sleep({sleep})", "user_expressions": {"i": str(i)}}, ) for i, sleep in enumerate(sleeps) ] diff --git a/tests/test_subshells.py b/tests/test_subshells.py index 2a0df12e0..58d71ceaf 100644 --- a/tests/test_subshells.py +++ b/tests/test_subshells.py @@ -259,7 +259,7 @@ def test_execution_count(): def test_create_while_execute(): with new_kernel() as kc: # Send request to execute code on main subshell. - msg = kc.session.msg("execute_request", {"code": "import time; time.sleep(0.05)"}) + msg = kc.session.msg("execute_request", {"code": "import time; time.sleep(0.5)"}) kc.shell_channel.send(msg) # Create subshell via control channel.