Skip to content

Commit 2348e8d

Browse files
committed
add sse
1 parent b3ba438 commit 2348e8d

File tree

6 files changed

+850
-42
lines changed

6 files changed

+850
-42
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,15 @@ Add to your MCP client configuration (e.g., Claude Desktop, Windsurf):
108108
- **Claude Code** (CLI)
109109
- **Windsurf** / **Claude Desktop** / **Continue.dev** / **Cursor IDE**
110110

111+
#### Options variables
112+
113+
- `DATABEND_DSN`: Databend connection string
114+
- `LOCAL_MODE`: Set to `true` to use local Databend
115+
- `SAFE_MODE`: Set to `false` to disable safe mode
116+
- `DATABEND_MCP_SERVER_TRANSPORT`: Default to `stdio`, set to `http` or `sse` to enable HTTP/SSE transport
117+
- `DATABEND_MCP_BIND_HOST`: Default to `127.0.0.1`, set to bind host for HTTP/SSE transport
118+
- `DATABEND_MCP_BIND_PORT`: Default to `8001`, set to bind port for HTTP/SSE transport
119+
111120
### Step 4: Start Using
112121

113122
Once configured, you can ask your AI assistant to:

mcp_databend/env.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
from dataclasses import dataclass
44
import os
5+
from enum import Enum
6+
7+
8+
class TransportType(Enum):
9+
"""Transport types for MCP server."""
10+
STDIO = "stdio"
11+
HTTP = "http"
12+
SSE = "sse"
13+
14+
@classmethod
15+
def values(cls):
16+
"""Get all transport type values."""
17+
return [member.value for member in cls]
518

619

720
@dataclass
@@ -40,6 +53,46 @@ def local_mode(self) -> bool:
4053
"on",
4154
)
4255

56+
@property
57+
def mcp_server_transport(self) -> str:
58+
"""Get the MCP server transport method.
59+
60+
Valid options: "stdio", "http", "sse"
61+
Default: "stdio"
62+
"""
63+
transport = os.getenv("DATABEND_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower()
64+
65+
# Validate transport type
66+
if transport not in TransportType.values():
67+
valid_options = ", ".join(f'"{t}"' for t in TransportType.values())
68+
raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}")
69+
return transport
70+
71+
@property
72+
def mcp_bind_host(self) -> str:
73+
"""Get the MCP server bind host for HTTP/SSE transports.
74+
75+
Default: "127.0.0.1"
76+
"""
77+
return os.getenv("DATABEND_MCP_BIND_HOST", "127.0.0.1")
78+
79+
@property
80+
def mcp_bind_port(self) -> int:
81+
"""Get the MCP server bind port for HTTP/SSE transports.
82+
83+
Default: 8001
84+
"""
85+
port_str = os.getenv("DATABEND_MCP_BIND_PORT", "8001")
86+
try:
87+
port = int(port_str)
88+
if port < 1 or port > 65535:
89+
raise ValueError(f"Port must be between 1 and 65535, got {port}")
90+
return port
91+
except ValueError as e:
92+
if "invalid literal" in str(e):
93+
raise ValueError(f"Invalid port value '{port_str}'. Must be a valid integer.")
94+
raise
95+
4396

4497
# Global instance placeholder for the singleton pattern
4598
_CONFIG_INSTANCE = None

mcp_databend/main.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,26 @@
33
import sys
44
import logging
55
from .server import mcp, logger
6+
from .env import get_config, TransportType
67

78

89
def main():
910
"""Main entry point for the MCP server."""
1011
try:
11-
logger.info("Starting Databend MCP Server...")
12-
mcp.run()
12+
config = get_config()
13+
transport = config.mcp_server_transport
14+
15+
logger.info(f"Starting Databend MCP Server with transport: {transport}")
16+
17+
# For HTTP and SSE transports, we need to specify host and port
18+
http_transports = [TransportType.HTTP.value, TransportType.SSE.value]
19+
if transport in http_transports:
20+
# Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0)
21+
# and bind port (defaults to 8001)
22+
mcp.run(transport=transport, host=config.mcp_bind_host, port=config.mcp_bind_port)
23+
else:
24+
# For stdio transport, no host or port is needed
25+
mcp.run(transport=transport)
1326
except KeyboardInterrupt:
1427
logger.info("Shutting down server by user request")
1528
except Exception as e:

mcp_databend/server.py

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
import logging
44
import sys
55
import re
6-
from mcp.server.fastmcp import FastMCP
6+
from fastmcp import FastMCP
7+
from fastmcp.tools import Tool
78
import concurrent.futures
89
from dotenv import load_dotenv
910
import pyarrow as pa
1011
import atexit
1112
from typing import Optional
12-
from .env import get_config
13+
from .env import get_config, TransportType
1314

1415
# Constants
1516
SERVER_NAME = "mcp-databend"
@@ -165,7 +166,11 @@ def _execute_sql(sql: str) -> dict:
165166
logger.warning(error_msg)
166167
return {"status": "error", "message": error_msg}
167168

168-
return result
169+
# Ensure we always return a dict structure for fastmcp compatibility
170+
if isinstance(result, list):
171+
return {"status": "success", "data": result, "row_count": len(result)}
172+
else:
173+
return {"status": "success", "data": result}
169174

170175
except concurrent.futures.TimeoutError:
171176
error_msg = f"Query timed out after {DEFAULT_TIMEOUT} seconds"
@@ -179,8 +184,7 @@ def _execute_sql(sql: str) -> dict:
179184
return {"status": "error", "message": error_msg}
180185

181186

182-
@mcp.tool()
183-
async def execute_sql(sql: str) -> dict:
187+
def execute_sql(sql: str) -> dict:
184188
"""
185189
Execute SQL query against Databend database with MCP safe mode protection.
186190
@@ -196,15 +200,13 @@ async def execute_sql(sql: str) -> dict:
196200
return _execute_sql(sql)
197201

198202

199-
@mcp.tool()
200-
async def show_databases():
203+
def show_databases():
201204
"""List available Databend databases (safe operation, not affected by MCP safe mode)"""
202205
logger.info("Listing all databases")
203206
return _execute_sql("SHOW DATABASES")
204207

205208

206-
@mcp.tool()
207-
async def show_tables(database: Optional[str] = None, filter: Optional[str] = None):
209+
def show_tables(database: Optional[str] = None, filter: Optional[str] = None):
208210
"""
209211
List available Databend tables in a database (safe operation, not affected by MCP safe mode)
210212
Args:
@@ -222,9 +224,20 @@ async def show_tables(database: Optional[str] = None, filter: Optional[str] = No
222224
sql += f" WHERE {filter}"
223225
return _execute_sql(sql)
224226

227+
def show_functions(filter: Optional[str] = None):
228+
"""List available Databend functions (safe operation, not affected by MCP safe mode)
229+
Args:
230+
filter: The filter string, eg: "name like 'add%'"
231+
Returns:
232+
Dictionary containing either query results or error information
233+
"""
234+
logger.info("Listing all functions")
235+
sql = "SHOW FUNCTIONS"
236+
if filter is not None:
237+
sql += f" WHERE {filter}"
238+
return _execute_sql(sql)
225239

226-
@mcp.tool()
227-
async def describe_table(table: str, database: Optional[str] = None):
240+
def describe_table(table: str, database: Optional[str] = None):
228241
"""
229242
Describe a Databend table (safe operation, not affected by MCP safe mode)
230243
Args:
@@ -242,15 +255,13 @@ async def describe_table(table: str, database: Optional[str] = None):
242255
return _execute_sql(sql)
243256

244257

245-
@mcp.tool()
246258
def show_stages():
247259
"""List available Databend stages (safe operation, not affected by MCP safe mode)"""
248260
logger.info("Listing all stages")
249261
return _execute_sql("SHOW STAGES")
250262

251263

252-
@mcp.tool()
253-
async def list_stage_files(stage_name: str, path: Optional[str] = None):
264+
def list_stage_files(stage_name: str, path: Optional[str] = None):
254265
"""
255266
List files in a Databend stage (safe operation, not affected by MCP safe mode)
256267
Args:
@@ -273,24 +284,21 @@ async def list_stage_files(stage_name: str, path: Optional[str] = None):
273284
return _execute_sql(sql)
274285

275286

276-
@mcp.tool()
277-
async def show_connections():
287+
def show_connections():
278288
"""List available Databend connections (safe operation, not affected by MCP safe mode)"""
279289
logger.info("Listing all connections")
280290
return _execute_sql("SHOW CONNECTIONS")
281291

282292

283-
@mcp.tool()
284-
async def create_stage(
285-
name: str, url: str, connection_name: Optional[str] = None, **kwargs
293+
def create_stage(
294+
name: str, url: str, connection_name: Optional[str] = None
286295
) -> dict:
287296
"""
288297
Create a Databend stage with connection
289298
Args:
290299
name: The stage name
291300
url: The stage URL (e.g., 's3://bucket-name')
292301
connection_name: Optional connection name to use
293-
**kwargs: Additional stage options
294302
295303
Returns:
296304
Dictionary containing either query results or error information
@@ -302,20 +310,39 @@ async def create_stage(
302310
if connection_name:
303311
sql_parts.append(f"CONNECTION = (CONNECTION_NAME = '{connection_name}')")
304312

305-
# Add any additional options from kwargs
306-
for key, value in kwargs.items():
307-
if key not in ["name", "url", "connection_name"]:
308-
sql_parts.append(f"{key.upper()} = '{value}'")
309-
310313
sql = " ".join(sql_parts)
311314
return _execute_sql(sql)
312315

313316

317+
# Register all tools
318+
mcp.add_tool(Tool.from_function(execute_sql))
319+
mcp.add_tool(Tool.from_function(show_databases))
320+
mcp.add_tool(Tool.from_function(show_tables))
321+
mcp.add_tool(Tool.from_function(show_functions))
322+
mcp.add_tool(Tool.from_function(describe_table))
323+
mcp.add_tool(Tool.from_function(show_stages))
324+
mcp.add_tool(Tool.from_function(list_stage_files))
325+
mcp.add_tool(Tool.from_function(show_connections))
326+
mcp.add_tool(Tool.from_function(create_stage))
327+
328+
314329
def main():
315330
"""Main entry point for the MCP server."""
316331
try:
317-
logger.info("Starting Databend MCP Server...")
318-
mcp.run()
332+
config = get_config()
333+
transport = config.mcp_server_transport
334+
335+
logger.info(f"Starting Databend MCP Server with transport: {transport}")
336+
337+
# For HTTP and SSE transports, we need to specify host and port
338+
http_transports = [TransportType.HTTP.value, TransportType.SSE.value]
339+
if transport in http_transports:
340+
# Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0)
341+
# and bind port (defaults to 8001)
342+
mcp.run(transport=transport, host=config.mcp_bind_host, port=config.mcp_bind_port)
343+
else:
344+
# For stdio transport, no host or port is needed
345+
mcp.run(transport=transport)
319346
except KeyboardInterrupt:
320347
logger.info("Shutting down server by user request")
321348
except Exception as e:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ classifiers = [
2323
dependencies = [
2424
"databend>=1.2.810",
2525
"databend-driver>=0.27.3",
26-
"mcp>=1.9.0",
26+
"fastmcp>=2.12.3",
2727
"pyarrow>=21.0.0",
2828
"python-dotenv>=1.1.0",
2929
]

0 commit comments

Comments
 (0)