Skip to content

Commit 50419d7

Browse files
committed
WIP: Message router
1 parent cae70c5 commit 50419d7

File tree

9 files changed

+516
-34
lines changed

9 files changed

+516
-34
lines changed

README.md

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,49 @@
22

33
[![Github Actions Status](https://github.com/jupyter-ai-contrib/jupyter-ai-router/workflows/Build/badge.svg)](https://github.com/jupyter-ai-contrib/jupyter-ai-router/actions/workflows/build.yml)
44

5-
Core routing layer of Jupyter AI
5+
Core message routing layer for Jupyter AI
66

7-
This extension is composed of a Python package named `jupyter_ai_router`
8-
for the server extension and a NPM package named `@jupyter-ai/router`
9-
for the frontend extension.
7+
This extension provides the foundational message routing functionality for Jupyter AI. It automatically detects new chat sessions and routes messages to registered callbacks based on message type (slash commands vs regular messages). Extensions can register callbacks to handle specific chat events without needing to manage chat lifecycle directly.
8+
9+
## Usage
10+
11+
### Basic MessageRouter Setup
12+
13+
```python
14+
# The router is automatically available in other extensions via settings
15+
router = self.settings['jupyter-ai']['router']
16+
17+
# Register callbacks for different event types
18+
def on_new_chat(room_id: str, ychat: YChat):
19+
print(f"New chat connected: {room_id}")
20+
21+
def on_slash_command(room_id: str, message: Message):
22+
print(f"Slash command in {room_id}: {message.body}")
23+
24+
def on_regular_message(room_id: str, message: Message):
25+
print(f"Regular message in {room_id}: {message.body}")
26+
27+
# Register the callbacks
28+
router.observe_chat_init(on_new_chat)
29+
router.observe_slash_cmd_msg("room-id", on_slash_command)
30+
router.observe_chat_msg("room-id", on_regular_message)
31+
```
32+
33+
### Message Flow
34+
35+
1. **Router detects new chats** - Automatically listens for chat room initialization events
36+
2. **Router connects chats** - Establishes observers on YChat message streams
37+
3. **Router routes messages** - Calls appropriate callbacks based on message type (slash vs regular)
38+
4. **Extensions respond** - Your callbacks receive room_id and message data
39+
40+
### Available Methods
41+
42+
- `observe_chat_init(callback)` - Called when new chat sessions are initialized with `(room_id, ychat)`
43+
- `observe_slash_cmd_msg(room_id, callback)` - Called for messages starting with `/` in a specific room
44+
- `observe_chat_msg(room_id, callback)` - Called for regular (non-slash) messages in a specific room
45+
- `connect_chat(room_id, ychat)` - Manually connect a chat (usually automatic)
46+
- `disconnect_chat(room_id)` - Disconnect a chat session
47+
- `cleanup()` - Clean up all resources when shutting down
1048

1149
## QUICK START
1250

jupyter_ai_router/__init__.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import warnings
88
warnings.warn("Importing 'jupyter_ai_router' outside a proper installation.")
99
__version__ = "dev"
10-
from .handlers import setup_handlers
10+
11+
from .extension import RouterExtension
1112

1213

1314
def _jupyter_labextension_paths():
@@ -18,19 +19,4 @@ def _jupyter_labextension_paths():
1819

1920

2021
def _jupyter_server_extension_points():
21-
return [{
22-
"module": "jupyter_ai_router"
23-
}]
24-
25-
26-
def _load_jupyter_server_extension(server_app):
27-
"""Registers the API handler to receive HTTP requests from the frontend extension.
28-
29-
Parameters
30-
----------
31-
server_app: jupyterlab.labapp.LabApp
32-
JupyterLab application instance
33-
"""
34-
setup_handlers(server_app.web_app)
35-
name = "jupyter_ai_router"
36-
server_app.log.info(f"Registered {name} server extension")
22+
return [{"module": "jupyter_ai_router", "app": RouterExtension}]

jupyter_ai_router/extension.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
from __future__ import annotations
2+
import time
3+
from jupyter_events import EventLogger
4+
from jupyter_server.extension.application import ExtensionApp
5+
from jupyter_server.serverapp import ServerApp
6+
7+
from .router import MessageRouter
8+
9+
# Check jupyter-collaboration version for compatibility
10+
try:
11+
from jupyter_collaboration import __version__ as jupyter_collaboration_version
12+
JCOLLAB_VERSION = int(jupyter_collaboration_version[0])
13+
if JCOLLAB_VERSION >= 3:
14+
from jupyter_server_ydoc.utils import JUPYTER_COLLABORATION_EVENTS_URI
15+
else:
16+
from jupyter_collaboration.utils import JUPYTER_COLLABORATION_EVENTS_URI
17+
except ImportError:
18+
# Fallback if jupyter-collaboration is not available
19+
JUPYTER_COLLABORATION_EVENTS_URI = "https://events.jupyter.org/jupyter_collaboration"
20+
21+
22+
class RouterExtension(ExtensionApp):
23+
"""
24+
Jupyter AI Router Extension
25+
"""
26+
27+
name = "jupyter_ai_router"
28+
handlers = [] # No HTTP handlers needed
29+
30+
def initialize_settings(self):
31+
"""Initialize router settings and event listeners."""
32+
start = time.time()
33+
34+
# Create MessageRouter instance
35+
self.router = MessageRouter(parent=self)
36+
37+
# Make router available to other extensions
38+
if 'jupyter-ai' not in self.settings:
39+
self.settings['jupyter-ai'] = {}
40+
self.settings['jupyter-ai']['router'] = self.router
41+
42+
# Listen for new chat room events
43+
if self.serverapp is not None:
44+
self.event_logger = self.serverapp.web_app.settings["event_logger"]
45+
self.event_logger.add_listener(
46+
schema_id=JUPYTER_COLLABORATION_EVENTS_URI,
47+
listener=self._on_chat_event
48+
)
49+
50+
elapsed = time.time() - start
51+
self.log.info(f"Initialized RouterExtension in {elapsed:.2f}s")
52+
53+
async def _on_chat_event(
54+
self, logger: EventLogger, schema_id: str, data: dict
55+
) -> None:
56+
"""Handle chat room events and connect new chats to router."""
57+
# Only handle chat room initialization events
58+
if not (
59+
data["room"].startswith("text:chat:")
60+
and data["action"] == "initialize"
61+
and data["msg"] == "Room initialized"
62+
):
63+
return
64+
65+
room_id = data["room"]
66+
self.log.info(f"New chat room detected: {room_id}")
67+
68+
# Get YChat document for the room
69+
ychat = await self._get_chat(room_id)
70+
if ychat is None:
71+
self.log.error(f"Failed to get YChat for room {room_id}")
72+
return
73+
74+
# Connect chat to router
75+
self.router.connect_chat(room_id, ychat)
76+
77+
async def _get_chat(self, room_id: str):
78+
"""Get YChat instance for a room ID."""
79+
if not self.serverapp:
80+
return None
81+
82+
try:
83+
if JCOLLAB_VERSION >= 3:
84+
collaboration = self.serverapp.web_app.settings["jupyter_server_ydoc"]
85+
document = await collaboration.get_document(room_id=room_id, copy=False)
86+
else:
87+
collaboration = self.serverapp.web_app.settings["jupyter_collaboration"]
88+
server = collaboration.ywebsocket_server
89+
room = await server.get_room(room_id)
90+
document = room._document
91+
92+
return document
93+
except Exception as e:
94+
self.log.error(f"Error getting chat document for {room_id}: {e}")
95+
return None
96+
97+
def stop_extension(self):
98+
"""Clean up router when extension stops."""
99+
try:
100+
if hasattr(self, 'router'):
101+
self.router.cleanup()
102+
except Exception as e:
103+
self.log.error(f"Error during router cleanup: {e}")

jupyter_ai_router/router.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
"""
2+
MessageRouter that manages message routing with callbacks.
3+
4+
This module provides a MessageRouter that:
5+
- Handles new chat connections
6+
- Routes slash commands and regular messages via callbacks
7+
- Manages lifecycle and cleanup
8+
"""
9+
10+
from typing import Any, Callable, Dict, List, Set, TYPE_CHECKING
11+
from functools import partial
12+
from jupyterlab_chat.models import Message
13+
from pycrdt import ArrayEvent
14+
from traitlets.config import LoggingConfigurable
15+
16+
if TYPE_CHECKING:
17+
from jupyterlab_chat.ychat import YChat
18+
19+
from .utils import get_first_word
20+
21+
22+
23+
class MessageRouter(LoggingConfigurable):
24+
"""
25+
Router that manages ychat message routing.
26+
27+
The Router provides three callback points:
28+
1. When new chats are initialized
29+
2. When slash commands are received
30+
3. When regular (non-slash) messages are received
31+
"""
32+
33+
def __init__(self, *args, **kwargs):
34+
super().__init__(*args, **kwargs)
35+
36+
# Callback lists
37+
self.chat_init_observers: List[Callable[[str, "YChat"], Any]] = []
38+
self.slash_cmd_observers: Dict[str, List[Callable[[str, Message], Any]]] = {}
39+
self.chat_msg_observers: Dict[str, List[Callable[[str, Message], Any]]] = {}
40+
41+
# Active chat rooms
42+
self.active_chats: Dict[str, "YChat"] = {}
43+
44+
# Root observers for keeping track of incoming messages
45+
self.message_observers: Dict[str, Callable] = {}
46+
47+
def observe_chat_init(self, callback: Callable[[str, "YChat"], Any]) -> None:
48+
"""
49+
Register a callback for when new chats are initialized.
50+
51+
Args:
52+
callback: Function called with (room_id: str, ychat: YChat) when chat connects
53+
"""
54+
self.chat_init_observers.append(callback)
55+
self.log.info("Registered new chat initialization callback")
56+
57+
def observe_slash_cmd_msg(self, room_id: str, callback: Callable[[str, Message], Any]) -> None:
58+
"""
59+
Register a callback for when slash commands are received.
60+
61+
Args:
62+
callback: Function called with (room_id: str, message: Message) for slash commands
63+
"""
64+
if room_id not in self.slash_cmd_observers:
65+
self.slash_cmd_observers[room_id] = []
66+
67+
self.slash_cmd_observers[room_id].append(callback)
68+
self.log.info("Registered slash command callback")
69+
70+
def observe_chat_msg(self, room_id: str, callback: Callable[[str, Message], Any]) -> None:
71+
"""
72+
Register a callback for when regular (non-slash) messages are received.
73+
74+
Args:
75+
callback: Function called with (room_id: str, message: Message) for regular messages
76+
"""
77+
if room_id not in self.chat_msg_observers:
78+
self.chat_msg_observers[room_id] = []
79+
80+
self.chat_msg_observers[room_id].append(callback)
81+
self.log.info("Registered message callback")
82+
83+
def connect_chat(self, room_id: str, ychat: "YChat") -> None:
84+
"""
85+
Connect a new chat session to the router.
86+
87+
Args:
88+
room_id: Unique identifier for the chat room
89+
ychat: YChat instance for the room
90+
"""
91+
if room_id in self.active_chats:
92+
self.log.warning(f"Chat {room_id} already connected to router")
93+
return
94+
95+
self.active_chats[room_id] = ychat
96+
97+
# Set up message observer
98+
callback = partial(self._on_message_change, room_id, ychat)
99+
ychat.ymessages.observe(callback)
100+
self.message_observers[room_id] = callback
101+
102+
self.log.info(f"Connected chat {room_id} to router")
103+
104+
# Notify new chat observers
105+
self._notify_chat_init_observers(room_id, ychat)
106+
107+
def disconnect_chat(self, room_id: str) -> None:
108+
"""
109+
Disconnect a chat session from the router.
110+
111+
Args:
112+
room_id: Unique identifier for the chat room
113+
"""
114+
if room_id not in self.active_chats:
115+
return
116+
117+
# Remove message observer
118+
if room_id in self.message_observers:
119+
ychat = self.active_chats[room_id]
120+
try:
121+
ychat.ymessages.unobserve(self.message_observers[room_id])
122+
except Exception as e:
123+
self.log.warning(f"Failed to unobserve chat {room_id}: {e}")
124+
del self.message_observers[room_id]
125+
126+
del self.active_chats[room_id]
127+
self.log.info(f"Disconnected chat {room_id} from router")
128+
129+
def _on_message_change(self, room_id: str, ychat: "YChat", events: ArrayEvent) -> None:
130+
"""Handle incoming messages from YChat."""
131+
for change in events.delta: # type: ignore[attr-defined]
132+
if "insert" not in change.keys():
133+
continue
134+
135+
# Process new messages (filter out raw_time duplicates)
136+
new_messages = [
137+
Message(**m) for m in change["insert"]
138+
if not m.get("raw_time", False)
139+
]
140+
141+
for message in new_messages:
142+
self._route_message(room_id, message)
143+
144+
def _route_message(self, room_id: str, message: Message) -> None:
145+
"""
146+
Route an incoming message to appropriate observers.
147+
148+
Args:
149+
room_id: The chat room ID
150+
message: The message to route
151+
"""
152+
first_word = get_first_word(message.body)
153+
154+
# Check if it's a slash command
155+
if first_word and first_word.startswith("/"):
156+
self._notify_slash_cmd_observers(room_id, message)
157+
else:
158+
self._notify_msg_observers(room_id, message)
159+
160+
def _notify_chat_init_observers(self, room_id: str, ychat: "YChat") -> None:
161+
"""Notify all new chat observers."""
162+
for callback in self.chat_init_observers:
163+
try:
164+
callback(room_id, ychat)
165+
except Exception as e:
166+
self.log.error(f"New chat observer error for {room_id}: {e}")
167+
168+
def _notify_slash_cmd_observers(self, room_id: str, message: Message) -> None:
169+
"""Notify all slash command observers."""
170+
callbacks = self.slash_cmd_observers.get(room_id, [])
171+
for callback in callbacks:
172+
try:
173+
callback(room_id, message)
174+
except Exception as e:
175+
self.log.error(f"Slash command observer error for {room_id}: {e}")
176+
177+
def _notify_msg_observers(self, room_id: str, message: Message) -> None:
178+
"""Notify all message observers."""
179+
callbacks = self.chat_msg_observers.get(room_id, [])
180+
for callback in callbacks:
181+
try:
182+
callback(room_id, message)
183+
except Exception as e:
184+
self.log.error(f"Message observer error for {room_id}: {e}")
185+
186+
def cleanup(self) -> None:
187+
"""Clean up router resources."""
188+
self.log.info("Cleaning up MessageRouter...")
189+
190+
# Disconnect all chats
191+
room_ids = list(self.active_chats.keys())
192+
for room_id in room_ids:
193+
self.disconnect_chat(room_id)
194+
195+
# Clear callbacks
196+
self.chat_init_observers.clear()
197+
self.slash_cmd_observers.clear()
198+
self.chat_msg_observers.clear()
199+
200+
self.log.info("MessageRouter cleanup complete")

0 commit comments

Comments
 (0)