@@ -49,6 +49,9 @@ use crate::plans::UndropDatabasePlan;
4949use crate :: BindContext ;
5050use crate :: SelectBuilder ;
5151
52+ pub const DEFAULT_STORAGE_CONNECTION : & str = "DEFAULT_STORAGE_CONNECTION" ;
53+ pub const DEFAULT_STORAGE_PATH : & str = "DEFAULT_STORAGE_PATH" ;
54+
5255impl Binder {
5356 #[ async_backtrace:: framed]
5457 pub ( in crate :: planner:: binder) async fn bind_show_databases (
@@ -217,20 +220,33 @@ impl Binder {
217220 ) ) ) ,
218221
219222 AlterDatabaseAction :: SetOptions { options } => {
220- // Validate database options before processing
221- // For ALTER DATABASE, allow modifying single option (the other already exists)
222- self . validate_database_options ( options, false ) . await ?;
223+ let catalog_arc = self . ctx . get_catalog ( & catalog) . await ?;
224+ let db_exists = catalog_arc. exists_database ( & tenant, & database) . await ?;
225+ if !db_exists && !* if_exists {
226+ return Err ( ErrorCode :: UnknownDatabase ( format ! (
227+ "Unknown database '{}'" ,
228+ database
229+ ) ) ) ;
230+ }
223231
224- // Convert SQLProperty to BTreeMap for database options
225- let db_options = options
226- . iter ( )
227- . map ( |property| ( property. name . clone ( ) , property. value . clone ( ) ) )
228- . collect :: < BTreeMap < String , String > > ( ) ;
232+ // Validate database options only when the database exists.
233+ let db_options = if db_exists {
234+ // For ALTER DATABASE, allow modifying single option (the other already exists)
235+ self . validate_database_options ( options, false ) . await ?;
236+
237+ options
238+ . iter ( )
239+ . map ( |property| ( property. name . clone ( ) , property. value . clone ( ) ) )
240+ . collect :: < BTreeMap < String , String > > ( )
241+ } else {
242+ BTreeMap :: new ( )
243+ } ;
229244
230245 Ok ( Plan :: AlterDatabase ( Box :: new ( AlterDatabasePlan {
231246 tenant,
232247 catalog,
233248 database,
249+ if_exists : * if_exists,
234250 options : db_options,
235251 } ) ) )
236252 }
@@ -331,8 +347,7 @@ impl Binder {
331347 require_both : bool ,
332348 ) -> Result < ( ) > {
333349 // Validate database options - only allow specific connection-related options
334- const VALID_DATABASE_OPTIONS : & [ & str ] =
335- & [ "DEFAULT_STORAGE_CONNECTION" , "DEFAULT_STORAGE_PATH" ] ;
350+ const VALID_DATABASE_OPTIONS : & [ & str ] = & [ DEFAULT_STORAGE_CONNECTION , DEFAULT_STORAGE_PATH ] ;
336351
337352 // Check for duplicate options
338353 let mut seen_options = std:: collections:: HashSet :: new ( ) ;
@@ -354,33 +369,31 @@ impl Binder {
354369 }
355370
356371 // Validate pairing requirement based on operation type
357- let has_connection = options
358- . iter ( )
359- . any ( |p| p. name == "DEFAULT_STORAGE_CONNECTION" ) ;
360- let has_path = options. iter ( ) . any ( |p| p. name == "DEFAULT_STORAGE_PATH" ) ;
372+ let has_connection = options. iter ( ) . any ( |p| p. name == DEFAULT_STORAGE_CONNECTION ) ;
373+ let has_path = options. iter ( ) . any ( |p| p. name == DEFAULT_STORAGE_PATH ) ;
361374
362375 if require_both {
363376 // For CREATE DATABASE: both options must be specified together
364377 if has_connection && !has_path {
365- return Err ( ErrorCode :: BadArguments (
366- "DEFAULT_STORAGE_CONNECTION requires DEFAULT_STORAGE_PATH to be specified"
367- . to_string ( ) ,
368- ) ) ;
378+ return Err ( ErrorCode :: BadArguments ( format ! (
379+ "{} requires {} to be specified" ,
380+ DEFAULT_STORAGE_CONNECTION , DEFAULT_STORAGE_PATH
381+ ) ) ) ;
369382 }
370383
371384 if has_path && !has_connection {
372- return Err ( ErrorCode :: BadArguments (
373- "DEFAULT_STORAGE_PATH requires DEFAULT_STORAGE_CONNECTION to be specified"
374- . to_string ( ) ,
375- ) ) ;
385+ return Err ( ErrorCode :: BadArguments ( format ! (
386+ "{} requires {} to be specified" ,
387+ DEFAULT_STORAGE_PATH , DEFAULT_STORAGE_CONNECTION
388+ ) ) ) ;
376389 }
377390 }
378391 // For ALTER DATABASE: allow modifying single option (the other one already exists in database)
379392
380393 // Validate that the specified connection exists
381394 if let Some ( connection_property) = options
382395 . iter ( )
383- . find ( |p| p. name == " DEFAULT_STORAGE_CONNECTION" )
396+ . find ( |p| p. name == DEFAULT_STORAGE_CONNECTION )
384397 {
385398 let connection_name = & connection_property. value ;
386399
@@ -402,15 +415,46 @@ impl Binder {
402415 if let ( Some ( connection_prop) , Some ( path_prop) ) = (
403416 options
404417 . iter ( )
405- . find ( |p| p. name == " DEFAULT_STORAGE_CONNECTION" ) ,
406- options. iter ( ) . find ( |p| p. name == " DEFAULT_STORAGE_PATH" ) ,
418+ . find ( |p| p. name == DEFAULT_STORAGE_CONNECTION ) ,
419+ options. iter ( ) . find ( |p| p. name == DEFAULT_STORAGE_PATH ) ,
407420 ) {
408- // Validate the storage path is accessible
421+ // Validate the storage path is accessible and matches the connection protocol
409422 let connection = self . ctx . get_connection ( & connection_prop. value ) . await ?;
423+
424+ let uri_for_scheme = databend_common_ast:: ast:: UriLocation :: from_uri (
425+ path_prop. value . clone ( ) ,
426+ BTreeMap :: new ( ) ,
427+ )
428+ . map_err ( |e| {
429+ ErrorCode :: BadArguments ( format ! (
430+ "Invalid storage path '{}': {}" ,
431+ path_prop. value, e
432+ ) )
433+ } ) ?;
434+
435+ let path_protocol = normalize_storage_protocol ( & uri_for_scheme. protocol ) ;
436+ let connection_protocol = normalize_storage_protocol ( & connection. storage_type ) ;
437+
438+ if path_protocol != connection_protocol {
439+ return Err ( ErrorCode :: BadArguments ( format ! (
440+ "{} protocol '{}' does not match connection '{}' protocol '{}'" ,
441+ DEFAULT_STORAGE_PATH ,
442+ uri_for_scheme. protocol,
443+ connection_prop. value,
444+ connection. storage_type
445+ ) ) ) ;
446+ }
447+
410448 let mut uri_location = databend_common_ast:: ast:: UriLocation :: from_uri (
411449 path_prop. value . clone ( ) ,
412- connection. storage_params ,
413- ) ?;
450+ connection. storage_params . clone ( ) ,
451+ )
452+ . map_err ( |e| {
453+ ErrorCode :: BadArguments ( format ! (
454+ "Invalid storage path '{}': {}" ,
455+ path_prop. value, e
456+ ) )
457+ } ) ?;
414458
415459 // Parse and validate the URI location using parse_storage_params_from_uri
416460 // This enforces that the path must end with '/' (directory requirement)
@@ -490,3 +534,11 @@ impl Binder {
490534 } )
491535 }
492536}
537+
538+ fn normalize_storage_protocol ( protocol : & str ) -> String {
539+ let mut lower = protocol. to_ascii_lowercase ( ) ;
540+ if lower == "file" {
541+ lower = "fs" . to_string ( ) ;
542+ }
543+ lower
544+ }
0 commit comments