33import logging
44import sys
55import re
6- from mcp .server .fastmcp import FastMCP
6+ from fastmcp import FastMCP
7+ from fastmcp .tools import Tool
78import concurrent .futures
89from dotenv import load_dotenv
910import pyarrow as pa
1011import atexit
1112from typing import Optional
12- from .env import get_config
13+ from .env import get_config , TransportType
1314
1415# Constants
1516SERVER_NAME = "mcp-databend"
@@ -165,7 +166,11 @@ def _execute_sql(sql: str) -> dict:
165166 logger .warning (error_msg )
166167 return {"status" : "error" , "message" : error_msg }
167168
168- return result
169+ # Ensure we always return a dict structure for fastmcp compatibility
170+ if isinstance (result , list ):
171+ return {"status" : "success" , "data" : result , "row_count" : len (result )}
172+ else :
173+ return {"status" : "success" , "data" : result }
169174
170175 except concurrent .futures .TimeoutError :
171176 error_msg = f"Query timed out after { DEFAULT_TIMEOUT } seconds"
@@ -179,8 +184,7 @@ def _execute_sql(sql: str) -> dict:
179184 return {"status" : "error" , "message" : error_msg }
180185
181186
182- @mcp .tool ()
183- async def execute_sql (sql : str ) -> dict :
187+ def execute_sql (sql : str ) -> dict :
184188 """
185189 Execute SQL query against Databend database with MCP safe mode protection.
186190
@@ -196,15 +200,13 @@ async def execute_sql(sql: str) -> dict:
196200 return _execute_sql (sql )
197201
198202
199- @mcp .tool ()
200- async def show_databases ():
203+ def show_databases ():
201204 """List available Databend databases (safe operation, not affected by MCP safe mode)"""
202205 logger .info ("Listing all databases" )
203206 return _execute_sql ("SHOW DATABASES" )
204207
205208
206- @mcp .tool ()
207- async def show_tables (database : Optional [str ] = None , filter : Optional [str ] = None ):
209+ def show_tables (database : Optional [str ] = None , filter : Optional [str ] = None ):
208210 """
209211 List available Databend tables in a database (safe operation, not affected by MCP safe mode)
210212 Args:
@@ -222,9 +224,20 @@ async def show_tables(database: Optional[str] = None, filter: Optional[str] = No
222224 sql += f" WHERE { filter } "
223225 return _execute_sql (sql )
224226
227+ def show_functions (filter : Optional [str ] = None ):
228+ """List available Databend functions (safe operation, not affected by MCP safe mode)
229+ Args:
230+ filter: The filter string, eg: "name like 'add%'"
231+ Returns:
232+ Dictionary containing either query results or error information
233+ """
234+ logger .info ("Listing all functions" )
235+ sql = "SHOW FUNCTIONS"
236+ if filter is not None :
237+ sql += f" WHERE { filter } "
238+ return _execute_sql (sql )
225239
226- @mcp .tool ()
227- async def describe_table (table : str , database : Optional [str ] = None ):
240+ def describe_table (table : str , database : Optional [str ] = None ):
228241 """
229242 Describe a Databend table (safe operation, not affected by MCP safe mode)
230243 Args:
@@ -242,15 +255,13 @@ async def describe_table(table: str, database: Optional[str] = None):
242255 return _execute_sql (sql )
243256
244257
245- @mcp .tool ()
246258def show_stages ():
247259 """List available Databend stages (safe operation, not affected by MCP safe mode)"""
248260 logger .info ("Listing all stages" )
249261 return _execute_sql ("SHOW STAGES" )
250262
251263
252- @mcp .tool ()
253- async def list_stage_files (stage_name : str , path : Optional [str ] = None ):
264+ def list_stage_files (stage_name : str , path : Optional [str ] = None ):
254265 """
255266 List files in a Databend stage (safe operation, not affected by MCP safe mode)
256267 Args:
@@ -273,24 +284,21 @@ async def list_stage_files(stage_name: str, path: Optional[str] = None):
273284 return _execute_sql (sql )
274285
275286
276- @mcp .tool ()
277- async def show_connections ():
287+ def show_connections ():
278288 """List available Databend connections (safe operation, not affected by MCP safe mode)"""
279289 logger .info ("Listing all connections" )
280290 return _execute_sql ("SHOW CONNECTIONS" )
281291
282292
283- @mcp .tool ()
284- async def create_stage (
285- name : str , url : str , connection_name : Optional [str ] = None , ** kwargs
293+ def create_stage (
294+ name : str , url : str , connection_name : Optional [str ] = None
286295) -> dict :
287296 """
288297 Create a Databend stage with connection
289298 Args:
290299 name: The stage name
291300 url: The stage URL (e.g., 's3://bucket-name')
292301 connection_name: Optional connection name to use
293- **kwargs: Additional stage options
294302
295303 Returns:
296304 Dictionary containing either query results or error information
@@ -302,20 +310,39 @@ async def create_stage(
302310 if connection_name :
303311 sql_parts .append (f"CONNECTION = (CONNECTION_NAME = '{ connection_name } ')" )
304312
305- # Add any additional options from kwargs
306- for key , value in kwargs .items ():
307- if key not in ["name" , "url" , "connection_name" ]:
308- sql_parts .append (f"{ key .upper ()} = '{ value } '" )
309-
310313 sql = " " .join (sql_parts )
311314 return _execute_sql (sql )
312315
313316
317+ # Register all tools
318+ mcp .add_tool (Tool .from_function (execute_sql ))
319+ mcp .add_tool (Tool .from_function (show_databases ))
320+ mcp .add_tool (Tool .from_function (show_tables ))
321+ mcp .add_tool (Tool .from_function (show_functions ))
322+ mcp .add_tool (Tool .from_function (describe_table ))
323+ mcp .add_tool (Tool .from_function (show_stages ))
324+ mcp .add_tool (Tool .from_function (list_stage_files ))
325+ mcp .add_tool (Tool .from_function (show_connections ))
326+ mcp .add_tool (Tool .from_function (create_stage ))
327+
328+
314329def main ():
315330 """Main entry point for the MCP server."""
316331 try :
317- logger .info ("Starting Databend MCP Server..." )
318- mcp .run ()
332+ config = get_config ()
333+ transport = config .mcp_server_transport
334+
335+ logger .info (f"Starting Databend MCP Server with transport: { transport } " )
336+
337+ # For HTTP and SSE transports, we need to specify host and port
338+ http_transports = [TransportType .HTTP .value , TransportType .SSE .value ]
339+ if transport in http_transports :
340+ # Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0)
341+ # and bind port (defaults to 8001)
342+ mcp .run (transport = transport , host = config .mcp_bind_host , port = config .mcp_bind_port )
343+ else :
344+ # For stdio transport, no host or port is needed
345+ mcp .run (transport = transport )
319346 except KeyboardInterrupt :
320347 logger .info ("Shutting down server by user request" )
321348 except Exception as e :
0 commit comments