Skip to content

Commit 01217ca

Browse files
committed
Adopt nextgen-kernels-api for kernel management
Replace custom kernel management implementation with nextgen-kernels-api. Use message filtering to route outputs to processor while excluding them from websocket clients. Configure kernel client via traitlets instead of subclassing.
1 parent 63e25bb commit 01217ca

23 files changed

+294
-1971
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,6 @@ Untitled*
130130

131131
.vscode/
132132
playground/
133+
# pixi environments
134+
.pixi/*
135+
!.pixi/config.toml

jupyter_server_documents/app.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from jupyter_server.extension.application import ExtensionApp
22
from traitlets.config import Config
3-
43
from traitlets import Instance, Type
4+
5+
from nextgen_kernels_api.services.kernels.client_manager import KernelClientManager
6+
57
from .handlers import RouteHandler, FileIDIndexHandler
68
from .websockets import YRoomWebsocket
79
from .rooms.yroom_manager import YRoomManager
@@ -47,6 +49,8 @@ class ServerDocsApp(ExtensionApp):
4749

4850
yroom_manager = Instance(klass=YRoomManager, allow_none=True)
4951

52+
client_manager = Instance(klass=KernelClientManager, allow_none=True)
53+
5054
def initialize(self):
5155
super().initialize()
5256

@@ -71,6 +75,17 @@ def get_fileid_manager():
7175
self.outputs_manager = self.outputs_manager_class(parent=self)
7276
self.settings["outputs_manager"] = self.outputs_manager
7377

78+
# Initialize KernelClientManager as singleton
79+
# The KernelClientWebsocketConnection from nextgen-kernels-api will access this via .instance()
80+
self.client_manager = KernelClientManager.instance(
81+
parent=self,
82+
multi_kernel_manager=self.serverapp.kernel_manager
83+
)
84+
self.settings["client_manager"] = self.client_manager
85+
86+
# Register event listener for client management
87+
self.client_manager.register_event_listener(self.serverapp.event_logger)
88+
7489
# Serve Jupyter Collaboration API on
7590
# `self.settings["jupyter_server_ydoc"]` for compatibility with
7691
# extensions depending on Jupyter Collaboration
@@ -82,10 +97,34 @@ def get_fileid_manager():
8297
def _link_jupyter_server_extension(self, server_app):
8398
"""Setup custom config needed by this extension."""
8499
c = Config()
85-
c.ServerApp.kernel_websocket_connection_class = "jupyter_server_documents.kernels.websocket_connection.NextGenKernelWebsocketConnection"
86-
c.ServerApp.kernel_manager_class = "jupyter_server_documents.kernels.multi_kernel_manager.NextGenMappingKernelManager"
87-
c.MultiKernelManager.kernel_manager_class = "jupyter_server_documents.kernels.kernel_manager.NextGenKernelManager"
100+
# Use nextgen-kernels-api's multi-kernel manager
101+
# This manager disables activity watching and buffering (which we don't need)
102+
c.ServerApp.kernel_manager_class = "nextgen_kernels_api.services.kernels.kernelmanager.MultiKernelManager"
103+
c.ServerApp.kernel_websocket_connection_class = 'nextgen_kernels_api.services.kernels.connection.kernel_client_connection.KernelClientWebsocketConnection'
104+
105+
# Configure MultiKernelManager to use nextgen's KernelManager
106+
c.MultiKernelManager.kernel_manager_class = "nextgen_kernels_api.services.kernels.kernelmanager.KernelManager"
107+
108+
# Configure the KernelManager to use DocumentAwareKernelClient
109+
c.KernelManager.client_class = "jupyter_server_documents.kernel_client.DocumentAwareKernelClient"
110+
c.KernelManager.client_factory = "jupyter_server_documents.kernel_client.DocumentAwareKernelClient"
111+
112+
# Use custom session manager for YRoom integration
88113
c.ServerApp.session_manager_class = "jupyter_server_documents.session_manager.YDocSessionManager"
114+
115+
# Configure websocket connection to exclude output and kernel status messages
116+
# These messages are handled server-side by DocumentAwareKernelClient (outputs via its
117+
# output processor, status via YRoom awareness) and should not be sent to websocket clients to avoid duplicate processing
118+
c.KernelClientWebsocketConnection.exclude_msg_types = [
119+
("status", "iopub"),
120+
("stream", "iopub"),
121+
("display_data", "iopub"),
122+
("execute_result", "iopub"),
123+
("error", "iopub"),
124+
("update_display_data", "iopub"),
125+
("clear_output", "iopub")
126+
]
127+
89128
server_app.update_config(c)
90129
super()._link_jupyter_server_extension(server_app)
91130

jupyter_server_documents/kernel_client.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
"""Document-aware kernel client for collaborative notebook editing.
2+
3+
This module extends nextgen-kernels-api's JupyterServerKernelClient to add
4+
notebook-specific functionality required for real-time collaboration:
5+
6+
- Routes kernel messages to collaborative YRooms for document state synchronization
7+
- Processes and separates large outputs to optimize document size
8+
- Tracks cell execution states and updates awareness for real-time UI feedback
9+
- Manages notebook metadata updates from kernel info
10+
"""
11+
import asyncio
12+
import typing as t
13+
14+
from nextgen_kernels_api.services.kernels.client import JupyterServerKernelClient
15+
from traitlets import Instance, Set, Type, default
16+
17+
from jupyter_server_documents.outputs import OutputProcessor
18+
from jupyter_server_documents.rooms.yroom import YRoom
19+
20+
21+
class DocumentAwareKernelClient(JupyterServerKernelClient):
22+
"""Kernel client with collaborative document awareness and output processing.
23+
24+
Extends the base JupyterServerKernelClient to integrate with YRooms for
25+
real-time collaboration, process outputs for optimization, and track cell
26+
execution states across connected clients.
27+
"""
28+
29+
_yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set())
30+
31+
output_processor = Instance(OutputProcessor, allow_none=True)
32+
33+
output_processor_class = Type(
34+
klass=OutputProcessor, default_value=OutputProcessor
35+
).tag(config=True)
36+
37+
@default("output_processor")
38+
def _default_output_processor(self) -> OutputProcessor:
39+
return self.output_processor_class(parent=self, config=self.config)
40+
41+
def __init__(self, *args, **kwargs):
42+
super().__init__(*args, **kwargs)
43+
44+
# Register listener for document-related messages
45+
# Combines state updates and outputs to share deserialization logic
46+
self.add_listener(
47+
self._handle_document_messages,
48+
msg_types=[
49+
("kernel_info_reply", "shell"),
50+
("status", "iopub"),
51+
("execute_input", "iopub"),
52+
("stream", "iopub"),
53+
("display_data", "iopub"),
54+
("execute_result", "iopub"),
55+
("error", "iopub"),
56+
("update_display_data", "iopub"),
57+
("clear_output", "iopub"),
58+
],
59+
)
60+
61+
async def _handle_document_messages(self, channel_name: str, msg: list[bytes]):
62+
"""Route kernel messages to document state and output handlers.
63+
64+
Deserializes kernel protocol messages and dispatches them to appropriate
65+
handlers based on message type. Extracts parent message and cell ID context
66+
needed by most handlers.
67+
"""
68+
if channel_name not in ("iopub", "shell"):
69+
return
70+
71+
# Deserialize message components
72+
# Base client strips signature, leaving [header, parent_header, metadata, content, ...buffers]
73+
try:
74+
if len(msg) < 4:
75+
self.log.debug(f"Message too short: {len(msg)} parts")
76+
return
77+
78+
header = self.session.unpack(msg[0])
79+
parent_header = self.session.unpack(msg[1])
80+
metadata = self.session.unpack(msg[2])
81+
82+
dmsg = {
83+
"header": header,
84+
"parent_header": parent_header,
85+
"metadata": metadata,
86+
"content": msg[3], # Keep as bytes, unpack in handlers
87+
"buffers": msg[4:] if len(msg) > 4 else [],
88+
"msg_id": header["msg_id"],
89+
"msg_type": header["msg_type"],
90+
}
91+
except Exception as e:
92+
self.log.debug(f"Skipping message that can't be deserialized: {e}")
93+
return
94+
95+
# Extract parent message context for cell ID lookup
96+
parent_msg_id = dmsg.get("parent_header", {}).get("msg_id")
97+
parent_msg_data = self.message_cache.get(parent_msg_id) if parent_msg_id else None
98+
cell_id = parent_msg_data.get("cell_id") if parent_msg_data else None
99+
100+
# Dispatch to appropriate handler
101+
msg_type = dmsg.get("msg_type")
102+
match msg_type:
103+
case "kernel_info_reply":
104+
await self._handle_kernel_info_reply(dmsg)
105+
case "status":
106+
await self._handle_status_message(dmsg, parent_msg_data, cell_id)
107+
case "execute_input":
108+
await self._handle_execute_input(dmsg, cell_id)
109+
case "stream" | "display_data" | "execute_result" | "error" | "update_display_data" | "clear_output":
110+
await self._handle_output_message(dmsg, msg_type, cell_id)
111+
112+
async def _handle_kernel_info_reply(self, msg: dict):
113+
"""Update notebook metadata with kernel language info."""
114+
content = self.session.unpack(msg["content"])
115+
language_info = content.get("language_info")
116+
117+
if language_info:
118+
for yroom in self._yrooms:
119+
try:
120+
notebook = await yroom.get_jupyter_ydoc()
121+
metadata = notebook.ymeta
122+
metadata["metadata"]["language_info"] = language_info
123+
except Exception as e:
124+
self.log.warning(f"Failed to update language info for yroom: {e}")
125+
126+
async def _handle_status_message(
127+
self, dmsg: dict, parent_msg_data: dict | None, cell_id: str | None
128+
):
129+
"""Update kernel and cell execution states from status messages.
130+
131+
Updates both document-level kernel status and cell-specific execution states,
132+
storing them persistently and in awareness for real-time UI updates.
133+
"""
134+
content = self.session.unpack(dmsg["content"])
135+
execution_state = content.get("execution_state")
136+
137+
for yroom in self._yrooms:
138+
awareness = yroom.get_awareness()
139+
if awareness is None:
140+
continue
141+
142+
# Update document-level kernel status when the parent message was sent on the shell channel
143+
if parent_msg_data and parent_msg_data.get("channel") == "shell":
144+
awareness.set_local_state_field(
145+
"kernel", {"execution_state": execution_state}
146+
)
147+
148+
# Update cell execution state for persistence and awareness
149+
if cell_id:
150+
yroom.set_cell_execution_state(cell_id, execution_state)
151+
yroom.set_cell_awareness_state(cell_id, execution_state)
152+
break
153+
154+
async def _handle_execute_input(self, dmsg: dict, cell_id: str | None):
155+
"""Update cell execution count when execution begins."""
156+
if not cell_id:
157+
return
158+
159+
content = self.session.unpack(dmsg["content"])
160+
execution_count = content.get("execution_count")
161+
162+
if execution_count is not None:
163+
for yroom in self._yrooms:
164+
notebook = await yroom.get_jupyter_ydoc()
165+
_, target_cell = notebook.find_cell(cell_id)
166+
if target_cell:
167+
target_cell["execution_count"] = execution_count
168+
break
169+
170+
async def _handle_output_message(self, dmsg: dict, msg_type: str, cell_id: str | None):
171+
"""Process output messages through output processor."""
172+
if not cell_id:
173+
return
174+
175+
if self.output_processor:
176+
content = self.session.unpack(dmsg["content"])
177+
self.output_processor.process_output(msg_type, cell_id, content)
178+
else:
179+
self.log.warning("No output processor configured")
180+
181+
async def add_yroom(self, yroom: YRoom):
182+
"""Register a YRoom to receive kernel messages."""
183+
self._yrooms.add(yroom)
184+
185+
async def remove_yroom(self, yroom: YRoom):
186+
"""Unregister a YRoom from receiving kernel messages."""
187+
self._yrooms.discard(yroom)
188+
189+
def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
190+
"""Handle messages from WebSocket clients before routing to kernel.
191+
192+
Extends base implementation to:
193+
- Set cell awareness to 'busy' immediately on execute_request
194+
- Clear outputs when cell is re-executed
195+
196+
This ensures UI updates happen immediately rather than waiting for
197+
kernel processing, providing better UX for queued executions.
198+
"""
199+
try:
200+
header = self.session.unpack(msg[0])
201+
msg_id = header["msg_id"]
202+
msg_type = header.get("msg_type")
203+
metadata = self.session.unpack(msg[2])
204+
cell_id = metadata.get("cellId")
205+
206+
if cell_id:
207+
# Clear outputs if this is a re-execution of the same cell
208+
existing = self.message_cache.get(cell_id=cell_id)
209+
if existing and existing["msg_id"] != msg_id:
210+
asyncio.create_task(self.output_processor.clear_cell_outputs(cell_id))
211+
212+
# Set awareness state immediately for queued cells
213+
if msg_type == "execute_request" and channel_name == "shell":
214+
for yroom in self._yrooms:
215+
yroom.set_cell_awareness_state(cell_id, "busy")
216+
except Exception as e:
217+
self.log.debug(f"Error handling awareness for incoming message: {e}")
218+
219+
super().handle_incoming_message(channel_name, msg)

jupyter_server_documents/kernels/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)