1111 Port ,
1212 Router ,
1313 RouterInterface ,
14+ SecurityGroup ,
15+ SecurityGroupRule ,
1416 Subnet ,
1517)
1618
@@ -56,6 +58,11 @@ def register_tools(self, mcp: FastMCP):
5658 mcp .tool ()(self .add_router_interface )
5759 mcp .tool ()(self .get_router_interfaces )
5860 mcp .tool ()(self .remove_router_interface )
61+ mcp .tool ()(self .get_security_groups )
62+ mcp .tool ()(self .create_security_group )
63+ mcp .tool ()(self .get_security_group_detail )
64+ mcp .tool ()(self .update_security_group )
65+ mcp .tool ()(self .delete_security_group )
5966
6067 def get_networks (
6168 self ,
@@ -171,9 +178,9 @@ def update_network(
171178
172179 update_args = {}
173180
174- if name is not None :
181+ if name :
175182 update_args ["name" ] = name
176- if description is not None :
183+ if description :
177184 update_args ["description" ] = description
178185 if is_admin_state_up is not None :
179186 update_args ["admin_state_up" ] = is_admin_state_up
@@ -305,11 +312,11 @@ def create_subnet(
305312 "ip_version" : ip_version ,
306313 "enable_dhcp" : is_dhcp_enabled ,
307314 }
308- if name is not None :
315+ if name :
309316 subnet_args ["name" ] = name
310- if description is not None :
317+ if description :
311318 subnet_args ["description" ] = description
312- if gateway_ip is not None :
319+ if gateway_ip :
313320 subnet_args ["gateway_ip" ] = gateway_ip
314321 if dns_nameservers is not None :
315322 subnet_args ["dns_nameservers" ] = dns_nameservers
@@ -378,13 +385,13 @@ def update_subnet(
378385 """
379386 conn = get_openstack_conn ()
380387 update_args : dict = {}
381- if name is not None :
388+ if name :
382389 update_args ["name" ] = name
383- if description is not None :
390+ if description :
384391 update_args ["description" ] = description
385392 if clear_gateway :
386393 update_args ["gateway_ip" ] = None
387- elif gateway_ip is not None :
394+ elif gateway_ip :
388395 update_args ["gateway_ip" ] = gateway_ip
389396 if is_dhcp_enabled is not None :
390397 update_args ["enable_dhcp" ] = is_dhcp_enabled
@@ -492,9 +499,9 @@ def set_port_binding(
492499 """
493500 conn = get_openstack_conn ()
494501 update_args : dict = {}
495- if host_id is not None :
502+ if host_id :
496503 update_args ["binding_host_id" ] = host_id
497- if vnic_type is not None :
504+ if vnic_type :
498505 update_args ["binding_vnic_type" ] = vnic_type
499506 if profile is not None :
500507 update_args ["binding_profile" ] = profile
@@ -531,11 +538,11 @@ def create_port(
531538 "network_id" : network_id ,
532539 "admin_state_up" : is_admin_state_up ,
533540 }
534- if name is not None :
541+ if name :
535542 port_args ["name" ] = name
536- if description is not None :
543+ if description :
537544 port_args ["description" ] = description
538- if device_id is not None :
545+ if device_id :
539546 port_args ["device_id" ] = device_id
540547 if fixed_ips is not None :
541548 port_args ["fixed_ips" ] = fixed_ips
@@ -604,13 +611,13 @@ def update_port(
604611 """
605612 conn = get_openstack_conn ()
606613 update_args : dict = {}
607- if name is not None :
614+ if name :
608615 update_args ["name" ] = name
609- if description is not None :
616+ if description :
610617 update_args ["description" ] = description
611618 if is_admin_state_up is not None :
612619 update_args ["admin_state_up" ] = is_admin_state_up
613- if device_id is not None :
620+ if device_id :
614621 update_args ["device_id" ] = device_id
615622 if security_group_ids is not None :
616623 update_args ["security_groups" ] = security_group_ids
@@ -717,13 +724,13 @@ def create_floating_ip(
717724 """
718725 conn = get_openstack_conn ()
719726 ip_args : dict = {"floating_network_id" : floating_network_id }
720- if description is not None :
727+ if description :
721728 ip_args ["description" ] = description
722- if fixed_ip_address is not None :
729+ if fixed_ip_address :
723730 ip_args ["fixed_ip_address" ] = fixed_ip_address
724- if port_id is not None :
731+ if port_id :
725732 ip_args ["port_id" ] = port_id
726- if project_id is not None :
733+ if project_id :
727734 ip_args ["project_id" ] = project_id
728735 ip = conn .network .create_ip (** ip_args )
729736 return self ._convert_to_floating_ip_model (ip )
@@ -744,7 +751,7 @@ def attach_floating_ip_to_port(
744751 """
745752 conn = get_openstack_conn ()
746753 update_args : dict = {"port_id" : port_id }
747- if fixed_ip_address is not None :
754+ if fixed_ip_address :
748755 update_args ["fixed_ip_address" ] = fixed_ip_address
749756 ip = conn .network .update_ip (floating_ip_id , ** update_args )
750757 return self ._convert_to_floating_ip_model (ip )
@@ -783,11 +790,11 @@ def update_floating_ip(
783790 """
784791 conn = get_openstack_conn ()
785792 update_args : dict = {}
786- if description is not None :
793+ if description :
787794 update_args ["description" ] = description
788- if port_id is not None :
795+ if port_id :
789796 update_args ["port_id" ] = port_id
790- if fixed_ip_address is not None :
797+ if fixed_ip_address :
791798 update_args ["fixed_ip_address" ] = fixed_ip_address
792799 else :
793800 if clear_port :
@@ -947,13 +954,13 @@ def create_router(
947954 """
948955 conn = get_openstack_conn ()
949956 router_args : dict = {"admin_state_up" : is_admin_state_up }
950- if name is not None :
957+ if name :
951958 router_args ["name" ] = name
952- if description is not None :
959+ if description :
953960 router_args ["description" ] = description
954961 if is_distributed is not None :
955962 router_args ["distributed" ] = is_distributed
956- if project_id is not None :
963+ if project_id :
957964 router_args ["project_id" ] = project_id
958965 if external_gateway_info is not None :
959966 router_args ["external_gateway_info" ] = (
@@ -1008,9 +1015,9 @@ def update_router(
10081015 """
10091016 conn = get_openstack_conn ()
10101017 update_args : dict = {}
1011- if name is not None :
1018+ if name :
10121019 update_args ["name" ] = name
1013- if description is not None :
1020+ if description :
10141021 update_args ["description" ] = description
10151022 if is_admin_state_up is not None :
10161023 update_args ["admin_state_up" ] = is_admin_state_up
@@ -1114,9 +1121,9 @@ def remove_router_interface(
11141121 """
11151122 conn = get_openstack_conn ()
11161123 args : dict = {}
1117- if subnet_id is not None :
1124+ if subnet_id :
11181125 args ["subnet_id" ] = subnet_id
1119- if port_id is not None :
1126+ if port_id :
11201127 args ["port_id" ] = port_id
11211128 res = conn .network .remove_interface_from_router (router_id , ** args )
11221129 return RouterInterface (
@@ -1161,6 +1168,134 @@ def _sanitize_server_filters(self, filters: dict) -> dict:
11611168 if not filters :
11621169 return {}
11631170 attrs = dict (filters )
1164- # Remove client-only or unsupported filters
11651171 attrs .pop ("status" , None )
11661172 return attrs
1173+
1174+ def get_security_groups (
1175+ self ,
1176+ project_id : str | None = None ,
1177+ name : str | None = None ,
1178+ id : str | None = None ,
1179+ ) -> list [SecurityGroup ]:
1180+ """
1181+ Get the list of Security Groups with optional filtering.
1182+
1183+ :param project_id: Filter by project ID
1184+ :param name: Filter by security group name
1185+ :param id: Filter by security group ID
1186+ :return: List of SecurityGroup objects
1187+ """
1188+ conn = get_openstack_conn ()
1189+ filters : dict = {}
1190+ if project_id :
1191+ filters ["project_id" ] = project_id
1192+ if name :
1193+ filters ["name" ] = name
1194+ if id :
1195+ filters ["id" ] = id
1196+ security_groups = conn .network .security_groups (** filters )
1197+ return [
1198+ self ._convert_to_security_group_model (sg ) for sg in security_groups
1199+ ]
1200+
1201+ def create_security_group (
1202+ self ,
1203+ name : str ,
1204+ description : str | None = None ,
1205+ project_id : str | None = None ,
1206+ ) -> SecurityGroup :
1207+ """
1208+ Create a new Security Group.
1209+
1210+ :param name: Security group name
1211+ :param description: Security group description
1212+ :param project_id: Project ID to assign ownership
1213+ :return: Created SecurityGroup object
1214+ """
1215+ conn = get_openstack_conn ()
1216+ args : dict = {"name" : name }
1217+ if description :
1218+ args ["description" ] = description
1219+ if project_id :
1220+ args ["project_id" ] = project_id
1221+ sg = conn .network .create_security_group (** args )
1222+ return self ._convert_to_security_group_model (sg )
1223+
1224+ def get_security_group_detail (
1225+ self , security_group_id : str
1226+ ) -> SecurityGroup :
1227+ """
1228+ Get detailed information about a specific Security Group.
1229+
1230+ :param security_group_id: ID of the security group to retrieve
1231+ :return: SecurityGroup details
1232+ """
1233+ conn = get_openstack_conn ()
1234+ sg = conn .network .get_security_group (security_group_id )
1235+ return self ._convert_to_security_group_model (sg )
1236+
1237+ def update_security_group (
1238+ self ,
1239+ security_group_id : str ,
1240+ name : str | None = None ,
1241+ description : str | None = None ,
1242+ ) -> SecurityGroup :
1243+ """
1244+ Update an existing Security Group.
1245+
1246+ :param security_group_id: ID of the security group to update
1247+ :param name: New security group name
1248+ :param description: New security group description
1249+ :return: Updated SecurityGroup object
1250+ """
1251+ conn = get_openstack_conn ()
1252+ update_args : dict = {}
1253+ if name :
1254+ update_args ["name" ] = name
1255+ if description :
1256+ update_args ["description" ] = description
1257+ if not update_args :
1258+ current = conn .network .get_security_group (security_group_id )
1259+ return self ._convert_to_security_group_model (current )
1260+ sg = conn .network .update_security_group (
1261+ security_group_id , ** update_args
1262+ )
1263+ return self ._convert_to_security_group_model (sg )
1264+
1265+ def delete_security_group (self , security_group_id : str ) -> None :
1266+ """
1267+ Delete a Security Group.
1268+
1269+ :param security_group_id: ID of the security group to delete
1270+ :return: None
1271+ """
1272+ conn = get_openstack_conn ()
1273+ conn .network .delete_security_group (
1274+ security_group_id , ignore_missing = False
1275+ )
1276+ return None
1277+
1278+ def _convert_to_security_group_model (self , openstack_sg ) -> SecurityGroup :
1279+ """
1280+ Convert an OpenStack Security Group object to a SecurityGroup pydantic model.
1281+
1282+ :param openstack_sg: OpenStack security group object
1283+ :return: Pydantic SecurityGroup model
1284+ """
1285+ rule_ids : list [str ] | None = None
1286+ rules = getattr (openstack_sg , "security_group_rules" , None )
1287+ if rules is not None :
1288+ dto_rules = [
1289+ SecurityGroupRule .model_validate (r , from_attributes = True )
1290+ for r in rules
1291+ ]
1292+ rule_ids = [str (r .id ) for r in dto_rules if getattr (r , "id" , None )]
1293+
1294+ return SecurityGroup (
1295+ id = openstack_sg .id ,
1296+ name = getattr (openstack_sg , "name" , None ),
1297+ status = getattr (openstack_sg , "status" , None ),
1298+ description = getattr (openstack_sg , "description" , None ),
1299+ project_id = getattr (openstack_sg , "project_id" , None ),
1300+ security_group_rule_ids = rule_ids ,
1301+ )
0 commit comments