Commit 63e25bb

Refactor OutputsManager to align with default Jupyter behavior (#163)
* Add safe msg_id handling in kernel_client.
* Constrain to Python 3.13 in dev environment.
* Create _is_stream_output utility function.
* Update stream_limit to 500 to avoid triggering too early.
* Use set literal.
* Update get_outputs to remove stream_limit logic. The stream_limit logic is being moved in this PR to the writing of outputs, so get_outputs can just return all outputs.
* Create _append_to_stream_file utility method.
* Update process_loaded_notebook to handle exclude_outputs.
* Modify private _process_loaded methods to handle exclude_outputs.
* Update process_saving_notebooks to handle exclude_outputs.
* Add comment about placeholder outputs wrt nbformat.
* Fix write to better handle stream outputs and stream_limit at write time.
* Remove call to clear in saving logic.
* Refactor OutputsManager and add experimental OptimizedOutputsManager.

This commit introduces a cleaner architecture for handling notebook outputs and adds an experimental optimized version that supports excluding outputs from saved notebook files.

Core changes to OutputsManager:
- Extract private utility functions (_create_output_url, _create_output_placeholder)
- Add comprehensive docstrings to all methods
- Simplify the write() method by removing the stream_limit logic
- Improve error handling in get_outputs() to return an empty list instead of raising
- Consolidate output processing logic into _process_outputs_from_cell()
- Add helper methods: _upgrade_notebook_format(), _ensure_cell_id()
- Always write full outputs to notebook files on save (traditional Jupyter behavior)
- Remove stream-specific handling and the StreamAPIHandler route

New OptimizedOutputsManager:
- Extends the base OutputsManager with support for an exclude_outputs metadata flag
- When exclude_outputs=True: outputs are stored only at runtime, not in saved files
- When exclude_outputs=False or unset: full outputs are included in saved files (the default)
- Implements a stream_limit (500) for large stream outputs, with link placeholders
- Provides _append_to_stream_file() for efficient stream handling
- Adds a stream API handler for accessing accumulated stream outputs

Other improvements:
- Add __all__ to outputs/__init__.py for cleaner exports
- Expand test coverage with a comprehensive test suite
- Rename private methods for clarity (_process_loaded_excluded_outputs, etc.)
- Update yroom_file_api to use process_saving_notebook correctly

The OptimizedOutputsManager is currently experimental and disabled by default. The StreamAPIHandler route is commented out until the feature is ready for production.
1 parent babd48c commit 63e25bb
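
To make the save-time behavior described in the commit message concrete, here is a minimal sketch in the spirit of OptimizedOutputsManager. Only the names (process_saving_notebook, write, _append_to_stream_file, _create_output_placeholder, the stream_limit of 500, the exclude_outputs metadata flag) come from the commit message; the runtime store, the placeholder shape, and every method body are illustrative assumptions, not the project's actual implementation.

# Illustrative sketch only; names from the commit message, bodies assumed.

STREAM_LIMIT = 500  # per the commit message; raised to avoid triggering too early


def _create_output_placeholder(output_url: str) -> dict:
    # nbformat has no dedicated placeholder output type, so a display_data
    # output carrying a link is used as a stand-in here (assumption).
    return {
        "output_type": "display_data",
        "data": {"text/html": f'<a href="{output_url}">Show full output</a>'},
        "metadata": {},
    }


class OptimizedOutputsManager:
    """Keeps outputs in a runtime store; optionally excludes them from saved files."""

    def __init__(self):
        self._outputs: dict[str, list[dict]] = {}       # cell_id -> outputs
        self._stream_files: dict[str, list[str]] = {}   # cell_id -> stream chunks

    def _append_to_stream_file(self, cell_id: str, text: str) -> None:
        # The real method appends to a file on disk; a list stands in here.
        self._stream_files.setdefault(cell_id, []).append(text)

    def write(self, file_id: str, cell_id: str, output: dict) -> dict:
        """Store one output; divert oversized streams and return a link placeholder."""
        outputs = self._outputs.setdefault(cell_id, [])
        if output.get("output_type") == "stream" and len(outputs) >= STREAM_LIMIT:
            self._append_to_stream_file(cell_id, output.get("text", ""))
            return _create_output_placeholder(f"/api/outputs/{file_id}/{cell_id}/stream")
        outputs.append(output)
        return output

    def process_saving_notebook(self, nb: dict) -> dict:
        """Decide, per cell, whether the saved file carries full outputs."""
        for cell in nb.get("cells", []):
            if cell.get("cell_type") != "code":
                continue
            if cell.get("metadata", {}).get("exclude_outputs"):
                cell["outputs"] = []  # runtime-only; not persisted
            else:
                # Default Jupyter behavior: full outputs in the saved file.
                cell["outputs"] = self._outputs.get(cell.get("id", ""), [])
        return nb

The design point the sketch mirrors is that stream_limit now applies when outputs are written rather than when they are read, so get_outputs can simply return everything stored.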

File tree

8 files changed (+1753, -394 lines changed)

dev-environment.yml

Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ name: serverdocs
 channels:
 - conda-forge
 dependencies:
-- python
+- python=3.13
 - nodejs=22
 - uv
 - jupyterlab

jupyter_server_documents/kernels/kernel_client.py

Lines changed: 21 additions & 12 deletions

@@ -102,14 +102,14 @@ async def stop_listening(self):
     def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
         """
         Handle incoming kernel messages and set up immediate cell execution state tracking.
-
+
         This method processes incoming kernel messages and caches them for response mapping.
         Importantly, it detects execute_request messages and immediately sets the corresponding
         cell state to 'busy' to provide real-time feedback for queued cell executions.
-
+
         This ensures that when multiple cells are executed simultaneously, all queued cells
         show a '*' prompt immediately, not just the currently executing cell.
-
+
         Args:
             channel_name: The kernel channel name (shell, iopub, etc.)
             msg: The raw kernel message as bytes
@@ -119,32 +119,35 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
         # source channel.
         header = self.session.unpack(msg[0])
         msg_id = header["msg_id"]
-        msg_type = header.get("msg_type")
+        msg_type = header.get("msg_type")
         metadata = self.session.unpack(msg[2])
         cell_id = metadata.get("cellId")
-
+
         # Clear cell outputs if cell is re-executed
         if cell_id:
             existing = self.message_cache.get(cell_id=cell_id)
             if existing and existing['msg_id'] != msg_id:
                 asyncio.create_task(self.output_processor.clear_cell_outputs(cell_id))
-
+
         # IMPORTANT: Set cell to 'busy' immediately when execute_request is received
         # This ensures queued cells show '*' prompt even before kernel starts processing them
         if msg_type == "execute_request" and channel_name == "shell" and cell_id:
             for yroom in self._yrooms:
                 yroom.set_cell_awareness_state(cell_id, "busy")
-
+
         self.message_cache.add({
             "msg_id": msg_id,
             "channel": channel_name,
             "cell_id": cell_id
         })
         channel = getattr(self, f"{channel_name}_channel")
+        if channel.socket is None:
+            self.log.error(f"Channel {channel_name} socket is None! Cannot send message. Channel alive: {channel.is_alive()}")
+            raise AttributeError(f"Channel {channel_name} socket is None")
         channel.session.send_raw(channel.socket, msg)
 
     def send_kernel_info(self):
-        """Sends a kernel info message on the shell channel. Useful
+        """Sends a kernel info message on the shell channel. Useful
         for determining if the kernel is busy or idle.
         """
         msg = self.session.msg("kernel_info_request")
@@ -240,10 +243,16 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona
         except Exception as e:
             self.log.error(f"Error deserializing message: {e}")
             raise
-
-        parent_msg_id = dmsg["parent_header"]["msg_id"]
-        parent_msg_data = self.message_cache.get(parent_msg_id)
-        cell_id = parent_msg_data.get('cell_id')
+
+        # Safely get parent message ID and data
+        parent_header = dmsg.get("parent_header", {})
+        parent_msg_id = parent_header.get("msg_id")
+
+        # Get parent message data from cache (may be None if not found)
+        parent_msg_data = self.message_cache.get(parent_msg_id) if parent_msg_id else None
+
+        # Safely extract cell_id
+        cell_id = parent_msg_data.get('cell_id') if parent_msg_data else None
 
         # Handle different message types using pattern matching
         match dmsg["msg_type"]:
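
The rewritten lookup in the last hunk is what the "safe msg_id handling" bullet refers to: a message whose parent_header is empty, or whose parent was never cached, now yields cell_id = None instead of raising. The same pattern in isolation (the MessageCache below is a simplified stand-in I wrote for illustration, not the project's actual cache):

# Standalone illustration of the defensive extraction above.

class MessageCache:
    def __init__(self):
        self._by_msg_id: dict[str, dict] = {}

    def add(self, record: dict) -> None:
        self._by_msg_id[record["msg_id"]] = record

    def get(self, msg_id: str):
        return self._by_msg_id.get(msg_id)


def extract_cell_id(dmsg: dict, cache: MessageCache):
    parent_header = dmsg.get("parent_header", {})  # may be missing or empty
    parent_msg_id = parent_header.get("msg_id")    # may be missing
    parent = cache.get(parent_msg_id) if parent_msg_id else None
    return parent.get("cell_id") if parent else None


# A status message whose parent was never cached no longer raises KeyError:
assert extract_cell_id({"msg_type": "status", "parent_header": {}}, MessageCache()) is None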
jupyter_server_documents/outputs/__init__.py

Lines changed: 6 additions & 0 deletions

@@ -1,3 +1,9 @@
 from .handlers import outputs_handlers
 from .manager import OutputsManager
 from .output_processor import OutputProcessor
+
+__all__ = [
+    'OutputsManager',
+    'OutputProcessor',
+    'outputs_handlers'
+]

jupyter_server_documents/outputs/handlers.py

Lines changed: 3 additions & 1 deletion

@@ -86,5 +86,7 @@ async def get(self, file_id=None, cell_id=None):
 
 outputs_handlers = [
     (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}(?:/{_output_index_regex}.output)?", OutputsAPIHandler),
-    (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
+    # We have disabled this for now as OptimizedOutputsManager is experimental.
+    # Uncomment this to use OptimizedOutputsManager.
+    # (rf"/api/outputs/{_file_id_regex}/{_cell_id_regex}/stream", StreamAPIHandler),
 ]
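
Once OptimizedOutputsManager graduates and the route above is uncommented, accumulated stream output for a cell would be reachable over HTTP. A hypothetical client-side fetch (the file_id, cell_id, and token values are invented, and the plain-text response format is an assumption; only the URL shape matches the commented-out route):

# Hypothetical fetch of a cell's accumulated stream output.
import requests

BASE = "http://localhost:8888"
file_id, cell_id = "abc123", "cell-001"  # made-up identifiers

resp = requests.get(
    f"{BASE}/api/outputs/{file_id}/{cell_id}/stream",
    headers={"Authorization": "token <your-jupyter-token>"},  # standard Jupyter Server auth
)
resp.raise_for_status()
print(resp.text)  # stream text accumulated past the limit (assumed format)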
