Skip to content

Commit 6b49119

Browse files
committed
feat(network): add security group rule mcp tools (#87)
1 parent 5d7a207 commit 6b49119

File tree

2 files changed

+261
-5
lines changed

2 files changed

+261
-5
lines changed

src/openstack_mcp_server/tools/network_tools.py

Lines changed: 127 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ def register_tools(self, mcp: FastMCP):
6363
mcp.tool()(self.get_security_group_detail)
6464
mcp.tool()(self.update_security_group)
6565
mcp.tool()(self.delete_security_group)
66+
mcp.tool()(self.create_security_group_rule)
67+
mcp.tool()(self.get_security_group_rule_detail)
68+
mcp.tool()(self.delete_security_group_rule)
69+
mcp.tool()(self.create_security_group_rules_bulk)
6670

6771
def get_networks(
6872
self,
@@ -1285,11 +1289,16 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
12851289
rule_ids: list[str] | None = None
12861290
rules = getattr(openstack_sg, "security_group_rules", None)
12871291
if rules is not None:
1288-
dto_rules = [
1289-
SecurityGroupRule.model_validate(r, from_attributes=True)
1290-
for r in rules
1291-
]
1292-
rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)]
1292+
extracted: list[str] = []
1293+
for r in rules:
1294+
rid = (
1295+
r.get("id")
1296+
if isinstance(r, dict)
1297+
else getattr(r, "id", None)
1298+
)
1299+
if rid:
1300+
extracted.append(str(rid))
1301+
rule_ids = extracted
12931302

12941303
return SecurityGroup(
12951304
id=openstack_sg.id,
@@ -1299,3 +1308,116 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
12991308
project_id=getattr(openstack_sg, "project_id", None),
13001309
security_group_rule_ids=rule_ids,
13011310
)
1311+
1312+
def create_security_group_rule(
1313+
self,
1314+
security_group_id: str,
1315+
direction: str = "ingress",
1316+
ethertype: str = "IPv4",
1317+
protocol: str | None = None,
1318+
port_range_min: int | None = None,
1319+
port_range_max: int | None = None,
1320+
remote_ip_prefix: str | None = None,
1321+
remote_group_id: str | None = None,
1322+
description: str | None = None,
1323+
project_id: str | None = None,
1324+
) -> SecurityGroupRule:
1325+
"""
1326+
Create a Security Group Rule.
1327+
1328+
:param security_group_id: Target security group ID
1329+
:param direction: "ingress" or "egress"
1330+
:param ethertype: "IPv4" or "IPv6"
1331+
:param protocol: L4 protocol (e.g., "tcp", "udp", "icmp")
1332+
:param port_range_min: Minimum port
1333+
:param port_range_max: Maximum port
1334+
:param remote_ip_prefix: Source/destination CIDR
1335+
:param remote_group_id: Peer security group ID
1336+
:param description: Rule description
1337+
:param project_id: Project ownership
1338+
:return: Created SecurityGroupRule
1339+
"""
1340+
conn = get_openstack_conn()
1341+
args: dict = {
1342+
"security_group_id": security_group_id,
1343+
"direction": direction,
1344+
"ethertype": ethertype,
1345+
}
1346+
args["protocol"] = protocol
1347+
args["port_range_min"] = port_range_min
1348+
args["port_range_max"] = port_range_max
1349+
args["remote_ip_prefix"] = remote_ip_prefix
1350+
args["remote_group_id"] = remote_group_id
1351+
args["description"] = description
1352+
args["project_id"] = project_id
1353+
rule = conn.network.create_security_group_rule(**args)
1354+
return self._convert_to_security_group_rule_model(rule)
1355+
1356+
def get_security_group_rule_detail(
1357+
self, rule_id: str
1358+
) -> SecurityGroupRule:
1359+
"""
1360+
Get detailed information about a specific Security Group Rule.
1361+
1362+
:param rule_id: Rule ID
1363+
:return: SecurityGroupRule detail
1364+
"""
1365+
conn = get_openstack_conn()
1366+
rule = conn.network.get_security_group_rule(rule_id)
1367+
return self._convert_to_security_group_rule_model(rule)
1368+
1369+
def delete_security_group_rule(self, rule_id: str) -> None:
1370+
"""
1371+
Delete a Security Group Rule.
1372+
1373+
:param rule_id: Rule ID to delete
1374+
:return: None
1375+
"""
1376+
conn = get_openstack_conn()
1377+
conn.network.delete_security_group_rule(rule_id, ignore_missing=False)
1378+
return None
1379+
1380+
def create_security_group_rules_bulk(
1381+
self,
1382+
rules: list[dict],
1383+
) -> list[SecurityGroupRule]:
1384+
"""
1385+
Create multiple Security Group Rules in bulk.
1386+
1387+
Each rule dict should follow Neutron SG rule schema keys (e.g.,
1388+
security_group_id, direction, ethertype, protocol, port_range_min,
1389+
port_range_max, remote_ip_prefix, remote_group_id, description, project_id).
1390+
1391+
:param rules: List of rule dictionaries
1392+
:return: List of created SecurityGroupRule models
1393+
"""
1394+
conn = get_openstack_conn()
1395+
created = conn.network.create_security_group_rules(rules=rules)
1396+
return [self._convert_to_security_group_rule_model(r) for r in created]
1397+
1398+
def _convert_to_security_group_rule_model(
1399+
self, openstack_rule
1400+
) -> SecurityGroupRule:
1401+
"""
1402+
Convert an OpenStack Security Group Rule object to a pydantic model.
1403+
1404+
:param openstack_rule: OpenStack rule object
1405+
:return: SecurityGroupRule model
1406+
"""
1407+
return SecurityGroupRule(
1408+
id=getattr(openstack_rule, "id"),
1409+
name=getattr(openstack_rule, "name", None),
1410+
status=getattr(openstack_rule, "status", None),
1411+
description=getattr(openstack_rule, "description", None),
1412+
project_id=getattr(openstack_rule, "project_id", None),
1413+
direction=getattr(openstack_rule, "direction", None),
1414+
ethertype=getattr(openstack_rule, "ethertype", None),
1415+
protocol=getattr(openstack_rule, "protocol", None),
1416+
port_range_min=getattr(openstack_rule, "port_range_min", None),
1417+
port_range_max=getattr(openstack_rule, "port_range_max", None),
1418+
remote_ip_prefix=getattr(openstack_rule, "remote_ip_prefix", None),
1419+
remote_group_id=getattr(openstack_rule, "remote_group_id", None),
1420+
security_group_id=getattr(
1421+
openstack_rule, "security_group_id", None
1422+
),
1423+
)

tests/tools/test_network_tools.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,3 +1768,137 @@ def test_add_get_remove_router_interface_by_port(
17681768
assert removed == RouterInterface(
17691769
router_id="r-if-2", port_id="p-2", subnet_id="s-2"
17701770
)
1771+
1772+
def test_create_security_group_rule(self, mock_openstack_connect_network):
1773+
mock_conn = mock_openstack_connect_network
1774+
rule = Mock()
1775+
rule.id = "r-1"
1776+
rule.name = None
1777+
rule.status = None
1778+
rule.description = "allow 22"
1779+
rule.project_id = "proj-1"
1780+
rule.direction = "ingress"
1781+
rule.ethertype = "IPv4"
1782+
rule.protocol = "tcp"
1783+
rule.port_range_min = 22
1784+
rule.port_range_max = 22
1785+
rule.remote_ip_prefix = "0.0.0.0/0"
1786+
rule.remote_group_id = None
1787+
rule.security_group_id = "sg-1"
1788+
mock_conn.network.create_security_group_rule.return_value = rule
1789+
1790+
tools = self.get_network_tools()
1791+
res = tools.create_security_group_rule(
1792+
security_group_id="sg-1",
1793+
direction="ingress",
1794+
ethertype="IPv4",
1795+
protocol="tcp",
1796+
port_range_min=22,
1797+
port_range_max=22,
1798+
remote_ip_prefix="0.0.0.0/0",
1799+
description="allow 22",
1800+
project_id="proj-1",
1801+
)
1802+
assert res.id == "r-1"
1803+
mock_conn.network.create_security_group_rule.assert_called_once()
1804+
1805+
def test_get_security_group_rule_detail(
1806+
self, mock_openstack_connect_network
1807+
):
1808+
mock_conn = mock_openstack_connect_network
1809+
rule = Mock()
1810+
rule.id = "r-2"
1811+
rule.name = None
1812+
rule.status = None
1813+
rule.description = None
1814+
rule.project_id = None
1815+
rule.direction = "egress"
1816+
rule.ethertype = "IPv4"
1817+
rule.protocol = None
1818+
rule.port_range_min = None
1819+
rule.port_range_max = None
1820+
rule.remote_ip_prefix = None
1821+
rule.remote_group_id = None
1822+
rule.security_group_id = "sg-1"
1823+
mock_conn.network.get_security_group_rule.return_value = rule
1824+
1825+
tools = self.get_network_tools()
1826+
res = tools.get_security_group_rule_detail("r-2")
1827+
assert res.id == "r-2"
1828+
mock_conn.network.get_security_group_rule.assert_called_once_with(
1829+
"r-2"
1830+
)
1831+
1832+
def test_delete_security_group_rule(self, mock_openstack_connect_network):
1833+
mock_conn = mock_openstack_connect_network
1834+
mock_conn.network.delete_security_group_rule.return_value = None
1835+
1836+
tools = self.get_network_tools()
1837+
res = tools.delete_security_group_rule("r-3")
1838+
assert res is None
1839+
mock_conn.network.delete_security_group_rule.assert_called_once_with(
1840+
"r-3", ignore_missing=False
1841+
)
1842+
1843+
def test_create_security_group_rules_bulk(
1844+
self, mock_openstack_connect_network
1845+
):
1846+
mock_conn = mock_openstack_connect_network
1847+
r1 = Mock()
1848+
r1.id = "r-10"
1849+
r1.name = None
1850+
r1.status = None
1851+
r1.description = None
1852+
r1.project_id = None
1853+
r1.security_group_id = "sg-1"
1854+
r1.direction = "ingress"
1855+
r1.ethertype = "IPv4"
1856+
r1.protocol = "tcp"
1857+
r1.port_range_min = 80
1858+
r1.port_range_max = 80
1859+
r1.remote_ip_prefix = "0.0.0.0/0"
1860+
r1.remote_group_id = None
1861+
1862+
r2 = Mock()
1863+
r2.id = "r-11"
1864+
r2.name = None
1865+
r2.status = None
1866+
r2.description = None
1867+
r2.project_id = None
1868+
r2.security_group_id = "sg-1"
1869+
r2.direction = "ingress"
1870+
r2.ethertype = "IPv4"
1871+
r2.protocol = "tcp"
1872+
r2.port_range_min = 443
1873+
r2.port_range_max = 443
1874+
r2.remote_ip_prefix = "0.0.0.0/0"
1875+
r2.remote_group_id = None
1876+
1877+
mock_conn.network.create_security_group_rules.return_value = [r1, r2]
1878+
1879+
tools = self.get_network_tools()
1880+
rules = [
1881+
{
1882+
"security_group_id": "sg-1",
1883+
"direction": "ingress",
1884+
"ethertype": "IPv4",
1885+
"protocol": "tcp",
1886+
"port_range_min": 80,
1887+
"port_range_max": 80,
1888+
"remote_ip_prefix": "0.0.0.0/0",
1889+
},
1890+
{
1891+
"security_group_id": "sg-1",
1892+
"direction": "ingress",
1893+
"ethertype": "IPv4",
1894+
"protocol": "tcp",
1895+
"port_range_min": 443,
1896+
"port_range_max": 443,
1897+
"remote_ip_prefix": "0.0.0.0/0",
1898+
},
1899+
]
1900+
res = tools.create_security_group_rules_bulk(rules)
1901+
assert len(res) == 2
1902+
mock_conn.network.create_security_group_rules.assert_called_once_with(
1903+
rules=rules
1904+
)

0 commit comments

Comments
 (0)