@@ -100,21 +100,41 @@ async def stop_listening(self):
100100 _listening_task : t .Optional [t .Awaitable ] = Any (allow_none = True )
101101
102102 def handle_incoming_message (self , channel_name : str , msg : list [bytes ]):
103- """Use the given session to send the message."""
103+ """
104+ Handle incoming kernel messages and set up immediate cell execution state tracking.
105+
106+ This method processes incoming kernel messages and caches them for response mapping.
107+ Importantly, it detects execute_request messages and immediately sets the corresponding
108+ cell state to 'busy' to provide real-time feedback for queued cell executions.
109+
110+ This ensures that when multiple cells are executed simultaneously, all queued cells
111+ show a '*' prompt immediately, not just the currently executing cell.
112+
113+ Args:
114+ channel_name: The kernel channel name (shell, iopub, etc.)
115+ msg: The raw kernel message as bytes
116+ """
104117 # Cache the message ID and its socket name so that
105118 # any response message can be mapped back to the
106119 # source channel.
107120 header = self .session .unpack (msg [0 ])
108- msg_id = header ["msg_id" ]
121+ msg_id = header ["msg_id" ]
122+ msg_type = header .get ("msg_type" )
109123 metadata = self .session .unpack (msg [2 ])
110124 cell_id = metadata .get ("cellId" )
111125
112- # Clear cell outputs if cell is re-executedq
126+ # Clear cell outputs if cell is re-executed
113127 if cell_id :
114128 existing = self .message_cache .get (cell_id = cell_id )
115129 if existing and existing ['msg_id' ] != msg_id :
116130 asyncio .create_task (self .output_processor .clear_cell_outputs (cell_id ))
117131
132+ # IMPORTANT: Set cell to 'busy' immediately when execute_request is received
133+ # This ensures queued cells show '*' prompt even before kernel starts processing them
134+ if msg_type == "execute_request" and channel_name == "shell" and cell_id :
135+ for yroom in self ._yrooms :
136+ yroom .set_cell_awareness_state (cell_id , "busy" )
137+
118138 self .message_cache .add ({
119139 "msg_id" : msg_id ,
120140 "channel" : channel_name ,
@@ -240,27 +260,27 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
240260 metadata ["metadata" ]["language_info" ] = language_info
241261
242262 case "status" :
243- # Unpack cell-specific information and determine execution state
263+ # Handle kernel status messages and update cell execution states
264+ # This provides real-time feedback about cell execution progress
244265 content = self .session .unpack (dmsg ["content" ])
245266 execution_state = content .get ("execution_state" )
267+
246268 # Update status across all collaborative rooms
247269 for yroom in self ._yrooms :
248- # If this status came from the shell channel, update
249- # the notebook status.
250- if parent_msg_data [ "channel" ] == "shell" :
251- awareness = yroom . get_awareness ()
252- if awareness is not None :
270+ awareness = yroom . get_awareness ()
271+ if awareness is not None :
272+ # If this status came from the shell channel, update
273+ # the notebook kernel status.
274+ if parent_msg_data and parent_msg_data . get ( "channel" ) == "shell" :
253275 # Update the kernel execution state at the top document level
254276 awareness .set_local_state_field ("kernel" , {"execution_state" : execution_state })
255- # Specifically update the running cell's execution state if cell_id is provided
256- if cell_id :
257- notebook = await yroom .get_jupyter_ydoc ()
258- _ , target_cell = notebook .find_cell (cell_id )
259- if target_cell :
260- # Adjust state naming convention from 'busy' to 'running' as per JupyterLab expectation
261- # https://github.com/jupyterlab/jupyterlab/blob/0ad84d93be9cb1318d749ffda27fbcd013304d50/packages/cells/src/widget.ts#L1670-L1678
262- state = 'running' if execution_state == 'busy' else execution_state
263- target_cell ["execution_state" ] = state
277+
278+ # Store cell execution state for persistence across client connections
279+ # This ensures that cell execution states survive page refreshes
280+ if cell_id :
281+ for yroom in self ._yrooms :
282+ yroom .set_cell_execution_state (cell_id , execution_state )
283+ yroom .set_cell_awareness_state (cell_id , execution_state )
264284 break
265285
266286 case "execute_input" :
@@ -278,8 +298,7 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
278298 case "stream" | "display_data" | "execute_result" | "error" | "update_display_data" | "clear_output" :
279299 if cell_id :
280300 # Process specific output messages through an optional processor
281- if self .output_processor and cell_id :
282- cell_id = parent_msg_data .get ('cell_id' )
301+ if self .output_processor :
283302 content = self .session .unpack (dmsg ["content" ])
284303 self .output_processor .process_output (dmsg ['msg_type' ], cell_id , content )
285304
0 commit comments