@@ -277,6 +277,8 @@ def __init__(self, **kwargs):
277277 """Initialize the kernel."""
278278 super ().__init__ (** kwargs )
279279
280+ #self._iant_lock = threading.Lock()
281+
280282 # Kernel application may swap stdout and stderr to OutStream,
281283 # which is the case in `IPKernelApp.init_io`, hence `sys.stdout`
282284 # can already by different from TextIO at initialization time.
@@ -571,6 +573,10 @@ def schedule_dispatch(self, dispatch, *args):
571573
572574 def start (self ):
573575 """register dispatchers for streams"""
576+
577+ #with open("debug.txt", "a") as f:
578+ # f.write(f"--- kernelbase start --- {threading.main_thread().ident}\n")
579+
574580 self .io_loop = ioloop .IOLoop .current ()
575581 self .msg_queue : Queue [t .Any ] = Queue ()
576582 if not self .shell_channel_thread :
@@ -609,6 +615,11 @@ async def shell_channel_thread_main(self, msg):
609615 msg3 = self .session .deserialize (msg2 , content = False , copy = False )
610616 subshell_id = msg3 ["header" ].get ("subshell_id" )
611617
618+ #with open("debug.txt", "a") as f:
619+ # f.write(
620+ # f"{threading.current_thread().ident} shell_channel_thread_main msg received for {subshell_id}\n"
621+ # )
622+
612623 # Find inproc pair socket to use to send message to correct subshell.
613624 subshell_manager = self .shell_channel_thread .manager
614625 socket = subshell_manager .get_shell_channel_stream (subshell_id )
@@ -622,6 +633,10 @@ async def shell_channel_thread_main(self, msg):
622633
623634 async def shell_main (self , subshell_id : str | None , msg ):
624635 """Handler of shell messages for a single subshell"""
636+
637+ #with open("debug.txt", "a") as f:
638+ # f.write(f"{threading.current_thread().ident} shell_main msg recvd on {subshell_id}\n")
639+
625640 if self ._supports_kernel_subshells :
626641 if subshell_id is None :
627642 assert threading .current_thread () == threading .main_thread ()
@@ -667,6 +682,10 @@ def _publish_execute_input(self, code, parent, execution_count):
667682 """Publish the code request on the iopub stream."""
668683 if not self .session :
669684 return
685+ #with self._iant_lock:
686+ #with open("debug.txt", "a") as f:
687+ # f.write(f"{threading.current_thread().ident} iopub_socket execute_input\n")
688+
670689 self .session .send (
671690 self .iopub_socket ,
672691 "execute_input" ,
@@ -679,6 +698,11 @@ def _publish_status(self, status, channel, parent=None):
679698 """send status (busy/idle) on IOPub"""
680699 if not self .session :
681700 return
701+ #with self._iant_lock:
702+ #with open("debug.txt", "a") as f:
703+ # f.write(f"{threading.current_thread().ident} iopub_socket status {status}\n")
704+ # f.write(f"SESSION {self.session}\n")
705+
682706 self .session .send (
683707 self .iopub_socket ,
684708 "status" ,
@@ -696,6 +720,10 @@ def _publish_status_and_flush(self, status, channel, stream, parent=None):
696720 def _publish_debug_event (self , event ):
697721 if not self .session :
698722 return
723+ #with self._iant_lock:
724+ #with open("debug.txt", "a") as f:
725+ # f.write(f"{threading.current_thread().ident} iopub_socket debug_event\n")
726+
699727 self .session .send (
700728 self .iopub_socket ,
701729 "debug_event" ,
@@ -763,6 +791,10 @@ def send_response(
763791 """
764792 if not self .session :
765793 return None
794+
795+ #with open("debug.txt", "a") as f:
796+ # f.write(f"{threading.current_thread().ident} ? ?send_response\n")
797+
766798 return self .session .send (
767799 stream ,
768800 msg_or_type ,
@@ -835,6 +867,14 @@ async def execute_request(self, stream, ident, parent):
835867 if self ._do_exec_accepted_params ["cell_id" ]:
836868 do_execute_args ["cell_id" ] = cell_id
837869
870+ subshell_id = parent ["header" ].get ("subshell_id" )
871+ msg_id = parent ["header" ].get ("msg_id" )
872+
873+ #with open("debug.txt", "a") as f:
874+ # f.write(
875+ # f"{threading.current_thread().ident} about to execute_request {msg_id} {subshell_id} {code}\n"
876+ # )
877+
838878 # Call do_execute with the appropriate arguments
839879 reply_content = self .do_execute (** do_execute_args )
840880
@@ -854,6 +894,11 @@ async def execute_request(self, stream, ident, parent):
854894 reply_content = json_clean (reply_content )
855895 metadata = self .finish_metadata (parent , metadata , reply_content )
856896
897+ #with open("debug.txt", "a") as f:
898+ # f.write(
899+ # f"{threading.current_thread().ident} execute_reply {msg_id} {subshell_id} {reply_content}\n"
900+ # )
901+
857902 reply_msg : dict [str , t .Any ] = self .session .send ( # type:ignore[assignment]
858903 stream ,
859904 "execute_reply" ,
@@ -943,6 +988,10 @@ async def history_request(self, stream, ident, parent):
943988 reply_content = await reply_content
944989
945990 reply_content = json_clean (reply_content )
991+
992+ #with open("debug.txt", "a") as f:
993+ # f.write(f"{threading.current_thread().ident} ? history_reply\n")
994+
946995 msg = self .session .send (stream , "history_reply" , reply_content , parent , ident )
947996 self .log .debug ("%s" , msg )
948997
@@ -967,6 +1016,10 @@ async def connect_request(self, stream, ident, parent):
9671016 return
9681017 content = self ._recorded_ports .copy () if self ._recorded_ports else {}
9691018 content ["status" ] = "ok"
1019+
1020+ #with open("debug.txt", "a") as f:
1021+ # f.write(f"{threading.current_thread().ident} ? connect_reply\n")
1022+
9701023 msg = self .session .send (stream , "connect_reply" , content , parent , ident )
9711024 self .log .debug ("%s" , msg )
9721025
@@ -991,6 +1044,10 @@ async def kernel_info_request(self, stream, ident, parent):
9911044 return
9921045 content = {"status" : "ok" }
9931046 content .update (self .kernel_info )
1047+
1048+ #with open("debug.txt", "a") as f:
1049+ # f.write(f"{threading.current_thread().ident} ? kernel_info_reply\n")
1050+
9941051 msg = self .session .send (stream , "kernel_info_reply" , content , parent , ident )
9951052 self .log .debug ("%s" , msg )
9961053
@@ -1058,6 +1115,10 @@ async def shutdown_request(self, stream, ident, parent):
10581115 content = self .do_shutdown (parent ["content" ]["restart" ])
10591116 if inspect .isawaitable (content ):
10601117 content = await content
1118+
1119+ #with open("debug.txt", "a") as f:
1120+ # f.write(f"{threading.current_thread().ident} ? shutdown_reply\n")
1121+
10611122 self .session .send (stream , "shutdown_reply" , content , parent , ident = ident )
10621123 # same content, but different msg_id for broadcasting on IOPub
10631124 self ._shutdown_message = self .session .msg ("shutdown_reply" , content , parent )
@@ -1108,6 +1169,10 @@ async def debug_request(self, stream, ident, parent):
11081169 if inspect .isawaitable (reply_content ):
11091170 reply_content = await reply_content
11101171 reply_content = json_clean (reply_content )
1172+
1173+ #with open("debug.txt", "a") as f:
1174+ # f.write(f"{threading.current_thread().ident} ? debug_reply\n")
1175+
11111176 reply_msg = self .session .send (stream , "debug_reply" , reply_content , parent , ident )
11121177 self .log .debug ("%s" , reply_msg )
11131178
@@ -1170,13 +1235,20 @@ async def create_subshell_request(self, socket, ident, parent) -> None:
11701235 self .log .error ("Subshells are not supported by this kernel" )
11711236 return
11721237
1238+ #with open("debug.txt", "a") as f:
1239+ # f.write(f"{threading.current_thread().ident} ? create_subshell_request\n")
1240+
11731241 assert threading .current_thread ().name == CONTROL_THREAD_NAME
11741242
11751243 # This should only be called in the control thread if it exists.
11761244 # Request is passed to shell channel thread to process.
11771245 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
11781246 other_socket .send_json ({"type" : "create" })
11791247 reply = other_socket .recv_json ()
1248+
1249+ #with open("debug.txt", "a") as f:
1250+ # f.write(f"{threading.current_thread().ident} ? create_subshell_reply\n")
1251+
11801252 self .session .send (socket , "create_subshell_reply" , reply , parent , ident )
11811253
11821254 async def delete_subshell_request (self , socket , ident , parent ) -> None :
@@ -1200,6 +1272,10 @@ async def delete_subshell_request(self, socket, ident, parent) -> None:
12001272 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
12011273 other_socket .send_json ({"type" : "delete" , "subshell_id" : subshell_id })
12021274 reply = other_socket .recv_json ()
1275+
1276+ #with open("debug.txt", "a") as f:
1277+ # f.write(f"{threading.current_thread().ident} ? delete_subshell_reply\n")
1278+
12031279 self .session .send (socket , "delete_subshell_reply" , reply , parent , ident )
12041280
12051281 async def list_subshell_request (self , socket , ident , parent ) -> None :
@@ -1216,6 +1292,10 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
12161292 other_socket = self .shell_channel_thread .manager .get_control_other_socket ()
12171293 other_socket .send_json ({"type" : "list" })
12181294 reply = other_socket .recv_json ()
1295+
1296+ #with open("debug.txt", "a") as f:
1297+ # f.write(f"{threading.current_thread().ident} ? list_subshell_reply\n")
1298+
12191299 self .session .send (socket , "list_subshell_reply" , reply , parent , ident )
12201300
12211301 # ---------------------------------------------------------------------------
@@ -1280,6 +1360,10 @@ async def abort_request(self, stream, ident, parent): # pragma: no cover
12801360 content = dict (status = "ok" )
12811361 if not self .session :
12821362 return
1363+
1364+ #with open("debug.txt", "a") as f:
1365+ # f.write(f"{threading.current_thread().ident} ? abort_reply\n")
1366+
12831367 reply_msg = self .session .send (
12841368 stream , "abort_reply" , content = content , parent = parent , ident = ident
12851369 )
@@ -1378,6 +1462,9 @@ def _send_abort_reply(self, stream, msg, idents):
13781462 md = self .finish_metadata (msg , md , status )
13791463 md .update (status )
13801464
1465+ #with open("debug.txt", "a") as f:
1466+ # f.write(f"{threading.current_thread().ident} ? {reply_type}\n")
1467+
13811468 self .session .send (
13821469 stream ,
13831470 reply_type ,
@@ -1558,6 +1645,10 @@ async def _at_shutdown(self):
15581645
15591646 finally :
15601647 if self ._shutdown_message is not None and self .session :
1648+ #with self._iant_lock:
1649+ #with open("debug.txt", "a") as f:
1650+ # f.write(f"{threading.current_thread().ident} ? _shutdown\n")
1651+
15611652 self .session .send (
15621653 self .iopub_socket ,
15631654 self ._shutdown_message ,
0 commit comments