3232# Initialize MCP server
3333mcp = FastMCP (SERVER_NAME )
3434
35+ # Global Databend client singleton
36+ _databend_client = None
37+
38+
39+ def get_global_databend_client ():
40+ """Get global Databend client instance (deprecated, use create_databend_client)."""
41+ global _databend_client
42+ if _databend_client is None :
43+ _databend_client = create_databend_client ()
44+ return _databend_client
45+
3546
3647def is_sql_safe (sql : str ) -> tuple [bool , str ]:
3748 """
3849 Check if SQL query is safe to execute in safe mode.
39-
50+
4051 Args:
4152 sql: SQL query string to check
42-
53+
4354 Returns:
4455 Tuple of (is_safe, reason) where is_safe is boolean and reason is error message if unsafe
4556 """
4657 sql_upper = sql .upper ().strip ()
47-
58+
4859 # List of dangerous operations to block in safe mode
4960 dangerous_patterns = [
50- (r' \bDROP\s+' , "DROP operations are not allowed in MCP safe mode" ),
51- (r' \bDELETE\s+' , "DELETE operations are not allowed in MCP safe mode" ),
52- (r' \bTRUNCATE\s+' , "TRUNCATE operations are not allowed in MCP safe mode" ),
53- (r' \bALTER\s+' , "ALTER operations are not allowed in MCP safe mode" ),
54- (r' \bUPDATE\s+' , "UPDATE operations are not allowed in MCP safe mode" ),
55- (r' \bREVOKE\s+' , "REVOKE operations are not allowed in MCP safe mode" ),
61+ (r" \bDROP\s+" , "DROP operations are not allowed in MCP safe mode" ),
62+ (r" \bDELETE\s+" , "DELETE operations are not allowed in MCP safe mode" ),
63+ (r" \bTRUNCATE\s+" , "TRUNCATE operations are not allowed in MCP safe mode" ),
64+ (r" \bALTER\s+" , "ALTER operations are not allowed in MCP safe mode" ),
65+ (r" \bUPDATE\s+" , "UPDATE operations are not allowed in MCP safe mode" ),
66+ (r" \bREVOKE\s+" , "REVOKE operations are not allowed in MCP safe mode" ),
5667 ]
57-
68+
5869 # Check each dangerous pattern
5970 for pattern , reason in dangerous_patterns :
6071 if re .search (pattern , sql_upper , re .IGNORECASE | re .DOTALL ):
6172 return False , reason
62-
73+
6374 return True , ""
6475
6576
6677def create_databend_client ():
6778 """Create and return a Databend client instance."""
6879 config = get_config ()
69- from databend_driver import BlockingDatabendClient
7080
71- return BlockingDatabendClient (config .dsn )
81+ if config .local_mode :
82+ # Use local in-memory Databend
83+ import databend
84+
85+ return databend .SessionContext ()
86+ else :
87+ # Use remote Databend server
88+ from databend_driver import BlockingDatabendClient
89+
90+ return BlockingDatabendClient (config .dsn )
7291
7392
7493def execute_databend_query (sql : str ) -> list [dict ] | dict :
@@ -81,20 +100,28 @@ def execute_databend_query(sql: str) -> list[dict] | dict:
81100 Returns:
82101 List of dictionaries containing query results or error dictionary
83102 """
84- client = create_databend_client ()
85- conn = client . get_conn ()
103+ client = get_global_databend_client ()
104+ config = get_config ()
86105
87106 try :
88- cursor = conn .query_iter (sql )
89- column_names = [field .name for field in cursor .schema ().fields ()]
90- results = []
91-
92- for row in cursor :
93- row_data = dict (zip (column_names , list (row .values ())))
94- results .append (row_data )
95-
96- logger .info (f"Query executed successfully, returned { len (results )} rows" )
97- return results
107+ if config .local_mode :
108+ # Handle local in-memory Databend
109+ result = client .sql (sql )
110+ df = result .to_pandas ()
111+ return df .to_dict ("records" )
112+ else :
113+ # Handle remote Databend server
114+ conn = client .get_conn ()
115+ cursor = conn .query_iter (sql )
116+ column_names = [field .name for field in cursor .schema ().fields ()]
117+ results = []
118+
119+ for row in cursor :
120+ row_data = dict (zip (column_names , list (row .values ())))
121+ results .append (row_data )
122+
123+ logger .info (f"Query executed successfully, returned { len (results )} rows" )
124+ return results
98125
99126 except Exception as err :
100127 error_msg = f"Error executing query: { str (err )} "
@@ -104,7 +131,7 @@ def execute_databend_query(sql: str) -> list[dict] | dict:
104131
105132def _execute_sql (sql : str ) -> dict :
106133 logger .info (f"Executing SQL query: { sql } " )
107-
134+
108135 # Check safe mode configuration
109136 config = get_config ()
110137 if config .safe_mode :
@@ -141,12 +168,13 @@ def _execute_sql(sql: str) -> dict:
141168 logger .error (error_msg )
142169 return {"status" : "error" , "message" : error_msg }
143170
171+
144172@mcp .tool ()
145173async def execute_sql (sql : str ) -> dict :
146174 """
147175 Execute SQL query against Databend database with MCP safe mode protection.
148-
149- Safe mode (enabled by default) blocks dangerous operations like DROP, DELETE,
176+
177+ Safe mode (enabled by default) blocks dangerous operations like DROP, DELETE,
150178 TRUNCATE, ALTER, UPDATE, and REVOKE. Set SAFE_MODE=false to disable.
151179
152180 Args:
@@ -159,14 +187,14 @@ async def execute_sql(sql: str) -> dict:
159187
160188
161189@mcp .tool ()
162- def show_databases ():
190+ async def show_databases ():
163191 """List available Databend databases (safe operation, not affected by MCP safe mode)"""
164192 logger .info ("Listing all databases" )
165193 return _execute_sql ("SHOW DATABASES" )
166194
167195
168196@mcp .tool ()
169- def show_tables (database : Optional [str ] = None , filter : Optional [str ] = None ):
197+ async def show_tables (database : Optional [str ] = None , filter : Optional [str ] = None ):
170198 """
171199 List available Databend tables in a database (safe operation, not affected by MCP safe mode)
172200 Args:
@@ -186,7 +214,7 @@ def show_tables(database: Optional[str] = None, filter: Optional[str] = None):
186214
187215
188216@mcp .tool ()
189- def describe_table (table : str , database : Optional [str ] = None ):
217+ async def describe_table (table : str , database : Optional [str ] = None ):
190218 """
191219 Describe a Databend table (safe operation, not affected by MCP safe mode)
192220 Args:
@@ -201,7 +229,7 @@ def describe_table(table: str, database: Optional[str] = None):
201229 table = f"{ database } .{ table } "
202230 logger .info (f"Describing table '{ table } '" )
203231 sql = f"DESCRIBE TABLE { table } "
204- return execute_sql (sql )
232+ return _execute_sql (sql )
205233
206234
207235@mcp .tool ()
@@ -212,7 +240,7 @@ def show_stages():
212240
213241
214242@mcp .tool ()
215- def list_stage_files (stage_name : str , path : Optional [str ] = None ):
243+ async def list_stage_files (stage_name : str , path : Optional [str ] = None ):
216244 """
217245 List files in a Databend stage (safe operation, not affected by MCP safe mode)
218246 Args:
@@ -222,28 +250,30 @@ def list_stage_files(stage_name: str, path: Optional[str] = None):
222250 Returns:
223251 Dictionary containing either query results or error information
224252 """
225- if not stage_name .startswith ('@' ):
253+ if not stage_name .startswith ("@" ):
226254 stage_name = f"@{ stage_name } "
227-
255+
228256 if path :
229257 stage_path = f"{ stage_name } /{ path .strip ('/' )} "
230258 else :
231259 stage_path = stage_name
232-
260+
233261 logger .info (f"Listing files in stage '{ stage_path } '" )
234262 sql = f"LIST { stage_path } "
235263 return _execute_sql (sql )
236264
237265
238266@mcp .tool ()
239- def show_connections ():
267+ async def show_connections ():
240268 """List available Databend connections (safe operation, not affected by MCP safe mode)"""
241269 logger .info ("Listing all connections" )
242270 return _execute_sql ("SHOW CONNECTIONS" )
243271
244272
245273@mcp .tool ()
246- async def create_stage (name : str , url : str , connection_name : Optional [str ] = None , ** kwargs ) -> dict :
274+ async def create_stage (
275+ name : str , url : str , connection_name : Optional [str ] = None , ** kwargs
276+ ) -> dict :
247277 """
248278 Create a Databend stage with connection
249279 Args:
@@ -256,17 +286,17 @@ async def create_stage(name: str, url: str, connection_name: Optional[str] = Non
256286 Dictionary containing either query results or error information
257287 """
258288 logger .info (f"Creating stage '{ name } ' with URL '{ url } '" )
259-
289+
260290 sql_parts = [f"CREATE STAGE { name } " , f"URL = '{ url } '" ]
261-
291+
262292 if connection_name :
263293 sql_parts .append (f"CONNECTION = (CONNECTION_NAME = '{ connection_name } ')" )
264-
294+
265295 # Add any additional options from kwargs
266296 for key , value in kwargs .items ():
267- if key not in [' name' , ' url' , ' connection_name' ]:
297+ if key not in [" name" , " url" , " connection_name" ]:
268298 sql_parts .append (f"{ key .upper ()} = '{ value } '" )
269-
299+
270300 sql = " " .join (sql_parts )
271301 return _execute_sql (sql )
272302
0 commit comments