@@ -63,6 +63,10 @@ def register_tools(self, mcp: FastMCP):
6363 mcp .tool ()(self .get_security_group_detail )
6464 mcp .tool ()(self .update_security_group )
6565 mcp .tool ()(self .delete_security_group )
66+ mcp .tool ()(self .create_security_group_rule )
67+ mcp .tool ()(self .get_security_group_rule_detail )
68+ mcp .tool ()(self .delete_security_group_rule )
69+ mcp .tool ()(self .create_security_group_rules_bulk )
6670
6771 def get_networks (
6872 self ,
@@ -1285,11 +1289,16 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
12851289 rule_ids : list [str ] | None = None
12861290 rules = getattr (openstack_sg , "security_group_rules" , None )
12871291 if rules is not None :
1288- dto_rules = [
1289- SecurityGroupRule .model_validate (r , from_attributes = True )
1290- for r in rules
1291- ]
1292- rule_ids = [str (r .id ) for r in dto_rules if getattr (r , "id" , None )]
1292+ extracted : list [str ] = []
1293+ for r in rules :
1294+ rid = (
1295+ r .get ("id" )
1296+ if isinstance (r , dict )
1297+ else getattr (r , "id" , None )
1298+ )
1299+ if rid :
1300+ extracted .append (str (rid ))
1301+ rule_ids = extracted
12931302
12941303 return SecurityGroup (
12951304 id = openstack_sg .id ,
@@ -1299,3 +1308,116 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
12991308 project_id = getattr (openstack_sg , "project_id" , None ),
13001309 security_group_rule_ids = rule_ids ,
13011310 )
1311+
1312+ def create_security_group_rule (
1313+ self ,
1314+ security_group_id : str ,
1315+ direction : str = "ingress" ,
1316+ ethertype : str = "IPv4" ,
1317+ protocol : str | None = None ,
1318+ port_range_min : int | None = None ,
1319+ port_range_max : int | None = None ,
1320+ remote_ip_prefix : str | None = None ,
1321+ remote_group_id : str | None = None ,
1322+ description : str | None = None ,
1323+ project_id : str | None = None ,
1324+ ) -> SecurityGroupRule :
1325+ """
1326+ Create a Security Group Rule.
1327+
1328+ :param security_group_id: Target security group ID
1329+ :param direction: "ingress" or "egress"
1330+ :param ethertype: "IPv4" or "IPv6"
1331+ :param protocol: L4 protocol (e.g., "tcp", "udp", "icmp")
1332+ :param port_range_min: Minimum port
1333+ :param port_range_max: Maximum port
1334+ :param remote_ip_prefix: Source/destination CIDR
1335+ :param remote_group_id: Peer security group ID
1336+ :param description: Rule description
1337+ :param project_id: Project ownership
1338+ :return: Created SecurityGroupRule
1339+ """
1340+ conn = get_openstack_conn ()
1341+ args : dict = {
1342+ "security_group_id" : security_group_id ,
1343+ "direction" : direction ,
1344+ "ethertype" : ethertype ,
1345+ }
1346+ args ["protocol" ] = protocol
1347+ args ["port_range_min" ] = port_range_min
1348+ args ["port_range_max" ] = port_range_max
1349+ args ["remote_ip_prefix" ] = remote_ip_prefix
1350+ args ["remote_group_id" ] = remote_group_id
1351+ args ["description" ] = description
1352+ args ["project_id" ] = project_id
1353+ rule = conn .network .create_security_group_rule (** args )
1354+ return self ._convert_to_security_group_rule_model (rule )
1355+
1356+ def get_security_group_rule_detail (
1357+ self , rule_id : str
1358+ ) -> SecurityGroupRule :
1359+ """
1360+ Get detailed information about a specific Security Group Rule.
1361+
1362+ :param rule_id: Rule ID
1363+ :return: SecurityGroupRule detail
1364+ """
1365+ conn = get_openstack_conn ()
1366+ rule = conn .network .get_security_group_rule (rule_id )
1367+ return self ._convert_to_security_group_rule_model (rule )
1368+
1369+ def delete_security_group_rule (self , rule_id : str ) -> None :
1370+ """
1371+ Delete a Security Group Rule.
1372+
1373+ :param rule_id: Rule ID to delete
1374+ :return: None
1375+ """
1376+ conn = get_openstack_conn ()
1377+ conn .network .delete_security_group_rule (rule_id , ignore_missing = False )
1378+ return None
1379+
1380+ def create_security_group_rules_bulk (
1381+ self ,
1382+ rules : list [dict ],
1383+ ) -> list [SecurityGroupRule ]:
1384+ """
1385+ Create multiple Security Group Rules in bulk.
1386+
1387+ Each rule dict should follow Neutron SG rule schema keys (e.g.,
1388+ security_group_id, direction, ethertype, protocol, port_range_min,
1389+ port_range_max, remote_ip_prefix, remote_group_id, description, project_id).
1390+
1391+ :param rules: List of rule dictionaries
1392+ :return: List of created SecurityGroupRule models
1393+ """
1394+ conn = get_openstack_conn ()
1395+ created = conn .network .create_security_group_rules (rules = rules )
1396+ return [self ._convert_to_security_group_rule_model (r ) for r in created ]
1397+
1398+ def _convert_to_security_group_rule_model (
1399+ self , openstack_rule
1400+ ) -> SecurityGroupRule :
1401+ """
1402+ Convert an OpenStack Security Group Rule object to a pydantic model.
1403+
1404+ :param openstack_rule: OpenStack rule object
1405+ :return: SecurityGroupRule model
1406+ """
1407+ return SecurityGroupRule (
1408+ id = getattr (openstack_rule , "id" ),
1409+ name = getattr (openstack_rule , "name" , None ),
1410+ status = getattr (openstack_rule , "status" , None ),
1411+ description = getattr (openstack_rule , "description" , None ),
1412+ project_id = getattr (openstack_rule , "project_id" , None ),
1413+ direction = getattr (openstack_rule , "direction" , None ),
1414+ ethertype = getattr (openstack_rule , "ethertype" , None ),
1415+ protocol = getattr (openstack_rule , "protocol" , None ),
1416+ port_range_min = getattr (openstack_rule , "port_range_min" , None ),
1417+ port_range_max = getattr (openstack_rule , "port_range_max" , None ),
1418+ remote_ip_prefix = getattr (openstack_rule , "remote_ip_prefix" , None ),
1419+ remote_group_id = getattr (openstack_rule , "remote_group_id" , None ),
1420+ security_group_id = getattr (
1421+ openstack_rule , "security_group_id" , None
1422+ ),
1423+ )
0 commit comments