From c8cb9ceb00b273363785c901b3024fc422343cee Mon Sep 17 00:00:00 2001 From: platanus-kr Date: Tue, 21 Oct 2025 23:24:28 +0900 Subject: [PATCH 1/4] feat(network): add security group rule mcp tools (#87) --- .../tools/network_tools.py | 132 ++++++++++++++++- tests/tools/test_network_tools.py | 134 ++++++++++++++++++ 2 files changed, 261 insertions(+), 5 deletions(-) diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index c1101df..0c94fe9 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -63,6 +63,10 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.get_security_group_detail) mcp.tool()(self.update_security_group) mcp.tool()(self.delete_security_group) + mcp.tool()(self.create_security_group_rule) + mcp.tool()(self.get_security_group_rule_detail) + mcp.tool()(self.delete_security_group_rule) + mcp.tool()(self.create_security_group_rules_bulk) def get_networks( self, @@ -1285,11 +1289,16 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: rule_ids: list[str] | None = None rules = getattr(openstack_sg, "security_group_rules", None) if rules is not None: - dto_rules = [ - SecurityGroupRule.model_validate(r, from_attributes=True) - for r in rules - ] - rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)] + extracted: list[str] = [] + for r in rules: + rid = ( + r.get("id") + if isinstance(r, dict) + else getattr(r, "id", None) + ) + if rid: + extracted.append(str(rid)) + rule_ids = extracted return SecurityGroup( id=openstack_sg.id, @@ -1299,3 +1308,116 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: project_id=getattr(openstack_sg, "project_id", None), security_group_rule_ids=rule_ids, ) + + def create_security_group_rule( + self, + security_group_id: str, + direction: str = "ingress", + ethertype: str = "IPv4", + protocol: str | None = None, + port_range_min: int | None = None, + port_range_max: int | None = None, + remote_ip_prefix: str | None = None, + remote_group_id: str | None = None, + description: str | None = None, + project_id: str | None = None, + ) -> SecurityGroupRule: + """ + Create a Security Group Rule. + + :param security_group_id: Target security group ID + :param direction: "ingress" or "egress" + :param ethertype: "IPv4" or "IPv6" + :param protocol: L4 protocol (e.g., "tcp", "udp", "icmp") + :param port_range_min: Minimum port + :param port_range_max: Maximum port + :param remote_ip_prefix: Source/destination CIDR + :param remote_group_id: Peer security group ID + :param description: Rule description + :param project_id: Project ownership + :return: Created SecurityGroupRule + """ + conn = get_openstack_conn() + args: dict = { + "security_group_id": security_group_id, + "direction": direction, + "ethertype": ethertype, + } + args["protocol"] = protocol + args["port_range_min"] = port_range_min + args["port_range_max"] = port_range_max + args["remote_ip_prefix"] = remote_ip_prefix + args["remote_group_id"] = remote_group_id + args["description"] = description + args["project_id"] = project_id + rule = conn.network.create_security_group_rule(**args) + return self._convert_to_security_group_rule_model(rule) + + def get_security_group_rule_detail( + self, rule_id: str + ) -> SecurityGroupRule: + """ + Get detailed information about a specific Security Group Rule. + + :param rule_id: Rule ID + :return: SecurityGroupRule detail + """ + conn = get_openstack_conn() + rule = conn.network.get_security_group_rule(rule_id) + return self._convert_to_security_group_rule_model(rule) + + def delete_security_group_rule(self, rule_id: str) -> None: + """ + Delete a Security Group Rule. + + :param rule_id: Rule ID to delete + :return: None + """ + conn = get_openstack_conn() + conn.network.delete_security_group_rule(rule_id, ignore_missing=False) + return None + + def create_security_group_rules_bulk( + self, + rules: list[dict], + ) -> list[SecurityGroupRule]: + """ + Create multiple Security Group Rules in bulk. + + Each rule dict should follow Neutron SG rule schema keys (e.g., + security_group_id, direction, ethertype, protocol, port_range_min, + port_range_max, remote_ip_prefix, remote_group_id, description, project_id). + + :param rules: List of rule dictionaries + :return: List of created SecurityGroupRule models + """ + conn = get_openstack_conn() + created = conn.network.create_security_group_rules(rules=rules) + return [self._convert_to_security_group_rule_model(r) for r in created] + + def _convert_to_security_group_rule_model( + self, openstack_rule + ) -> SecurityGroupRule: + """ + Convert an OpenStack Security Group Rule object to a pydantic model. + + :param openstack_rule: OpenStack rule object + :return: SecurityGroupRule model + """ + return SecurityGroupRule( + id=getattr(openstack_rule, "id"), + name=getattr(openstack_rule, "name", None), + status=getattr(openstack_rule, "status", None), + description=getattr(openstack_rule, "description", None), + project_id=getattr(openstack_rule, "project_id", None), + direction=getattr(openstack_rule, "direction", None), + ethertype=getattr(openstack_rule, "ethertype", None), + protocol=getattr(openstack_rule, "protocol", None), + port_range_min=getattr(openstack_rule, "port_range_min", None), + port_range_max=getattr(openstack_rule, "port_range_max", None), + remote_ip_prefix=getattr(openstack_rule, "remote_ip_prefix", None), + remote_group_id=getattr(openstack_rule, "remote_group_id", None), + security_group_id=getattr( + openstack_rule, "security_group_id", None + ), + ) diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index 5108bed..d29c6c4 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -1768,3 +1768,137 @@ def test_add_get_remove_router_interface_by_port( assert removed == RouterInterface( router_id="r-if-2", port_id="p-2", subnet_id="s-2" ) + + def test_create_security_group_rule(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + rule = Mock() + rule.id = "r-1" + rule.name = None + rule.status = None + rule.description = "allow 22" + rule.project_id = "proj-1" + rule.direction = "ingress" + rule.ethertype = "IPv4" + rule.protocol = "tcp" + rule.port_range_min = 22 + rule.port_range_max = 22 + rule.remote_ip_prefix = "0.0.0.0/0" + rule.remote_group_id = None + rule.security_group_id = "sg-1" + mock_conn.network.create_security_group_rule.return_value = rule + + tools = self.get_network_tools() + res = tools.create_security_group_rule( + security_group_id="sg-1", + direction="ingress", + ethertype="IPv4", + protocol="tcp", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="0.0.0.0/0", + description="allow 22", + project_id="proj-1", + ) + assert res.id == "r-1" + mock_conn.network.create_security_group_rule.assert_called_once() + + def test_get_security_group_rule_detail( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + rule = Mock() + rule.id = "r-2" + rule.name = None + rule.status = None + rule.description = None + rule.project_id = None + rule.direction = "egress" + rule.ethertype = "IPv4" + rule.protocol = None + rule.port_range_min = None + rule.port_range_max = None + rule.remote_ip_prefix = None + rule.remote_group_id = None + rule.security_group_id = "sg-1" + mock_conn.network.get_security_group_rule.return_value = rule + + tools = self.get_network_tools() + res = tools.get_security_group_rule_detail("r-2") + assert res.id == "r-2" + mock_conn.network.get_security_group_rule.assert_called_once_with( + "r-2" + ) + + def test_delete_security_group_rule(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + mock_conn.network.delete_security_group_rule.return_value = None + + tools = self.get_network_tools() + res = tools.delete_security_group_rule("r-3") + assert res is None + mock_conn.network.delete_security_group_rule.assert_called_once_with( + "r-3", ignore_missing=False + ) + + def test_create_security_group_rules_bulk( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + r1 = Mock() + r1.id = "r-10" + r1.name = None + r1.status = None + r1.description = None + r1.project_id = None + r1.security_group_id = "sg-1" + r1.direction = "ingress" + r1.ethertype = "IPv4" + r1.protocol = "tcp" + r1.port_range_min = 80 + r1.port_range_max = 80 + r1.remote_ip_prefix = "0.0.0.0/0" + r1.remote_group_id = None + + r2 = Mock() + r2.id = "r-11" + r2.name = None + r2.status = None + r2.description = None + r2.project_id = None + r2.security_group_id = "sg-1" + r2.direction = "ingress" + r2.ethertype = "IPv4" + r2.protocol = "tcp" + r2.port_range_min = 443 + r2.port_range_max = 443 + r2.remote_ip_prefix = "0.0.0.0/0" + r2.remote_group_id = None + + mock_conn.network.create_security_group_rules.return_value = [r1, r2] + + tools = self.get_network_tools() + rules = [ + { + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 80, + "port_range_max": 80, + "remote_ip_prefix": "0.0.0.0/0", + }, + { + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 443, + "port_range_max": 443, + "remote_ip_prefix": "0.0.0.0/0", + }, + ] + res = tools.create_security_group_rules_bulk(rules) + assert len(res) == 2 + mock_conn.network.create_security_group_rules.assert_called_once_with( + rules=rules + ) From a66653094cacc75eb0e74f01a23fe70997005efb Mon Sep 17 00:00:00 2001 From: platanus-kr Date: Tue, 21 Oct 2025 23:35:22 +0900 Subject: [PATCH 2/4] feat(network): add security group binding to port to mcp tools (#87) --- .../tools/network_tools.py | 58 +++++++++++++ tests/tools/test_network_tools.py | 81 +++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index 0c94fe9..9849d2f 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -44,6 +44,8 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.delete_port) mcp.tool()(self.get_port_allowed_address_pairs) mcp.tool()(self.set_port_binding) + mcp.tool()(self.add_security_group_to_port) + mcp.tool()(self.remove_security_group_from_port) mcp.tool()(self.get_floating_ips) mcp.tool()(self.create_floating_ip) mcp.tool()(self.delete_floating_ip) @@ -515,6 +517,62 @@ def set_port_binding( updated = conn.network.update_port(port_id, **update_args) return self._convert_to_port_model(updated) + def add_security_group_to_port( + self, + port_id: str, + security_group_id: str, + ) -> Port: + """ + Attach a Security Group to a Port. + + Idempotent: if the security group is already attached, returns the current + port state without issuing an update. + + :param port_id: Port ID + :param security_group_id: Security Group ID to attach + :return: Updated (or current) Port object + """ + conn = get_openstack_conn() + current = conn.network.get_port(port_id) + current_group_ids: list[str] = list(current.security_group_ids or []) + if security_group_id in current_group_ids: + return self._convert_to_port_model(current) + + updated_group_ids = current_group_ids + [security_group_id] + updated = conn.network.update_port( + port_id, security_groups=updated_group_ids + ) + return self._convert_to_port_model(updated) + + def remove_security_group_from_port( + self, + port_id: str, + security_group_id: str, + ) -> Port: + """ + Detach a Security Group from a Port. + + Idempotent: if the security group is not attached, returns the current + port state without issuing an update. + + :param port_id: Port ID + :param security_group_id: Security Group ID to detach + :return: Updated (or current) Port object + """ + conn = get_openstack_conn() + current = conn.network.get_port(port_id) + current_group_ids: list[str] = list(current.security_group_ids or []) + if security_group_id not in current_group_ids: + return self._convert_to_port_model(current) + + updated_group_ids = [ + sg_id for sg_id in current_group_ids if sg_id != security_group_id + ] + updated = conn.network.update_port( + port_id, security_groups=updated_group_ids + ) + return self._convert_to_port_model(updated) + def create_port( self, network_id: str, diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index d29c6c4..d3dc251 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -869,6 +869,87 @@ def test_set_port_binding_and_admin_state( ) assert res_toggle.is_admin_state_up is True + def test_add_remove_security_group_on_port( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + + # current without sg-9 + current = Mock() + current.id = "port-1" + current.name = "p1" + current.status = "ACTIVE" + current.description = None + current.project_id = None + current.network_id = "net-1" + current.admin_state_up = True + current.is_admin_state_up = True + current.device_id = None + current.device_owner = None + current.mac_address = "fa:16:3e:00:00:09" + current.fixed_ips = [] + current.security_group_ids = ["sg-1", "sg-2"] + mock_conn.network.get_port.return_value = current + + # updated after add + updated_add = Mock() + updated_add.id = "port-1" + updated_add.name = "p1" + updated_add.status = "ACTIVE" + updated_add.description = None + updated_add.project_id = None + updated_add.network_id = "net-1" + updated_add.admin_state_up = True + updated_add.is_admin_state_up = True + updated_add.device_id = None + updated_add.device_owner = None + updated_add.mac_address = "fa:16:3e:00:00:09" + updated_add.fixed_ips = [] + updated_add.security_group_ids = ["sg-1", "sg-2", "sg-9"] + mock_conn.network.update_port.return_value = updated_add + + tools = self.get_network_tools() + res_add = tools.add_security_group_to_port("port-1", "sg-9") + assert isinstance(res_add, Port) + mock_conn.network.update_port.assert_called_with( + "port-1", security_groups=["sg-1", "sg-2", "sg-9"] + ) + + # idempotent add when sg already present + mock_conn.network.get_port.return_value = updated_add + res_add_again = tools.add_security_group_to_port("port-1", "sg-9") + assert isinstance(res_add_again, Port) + + # updated after remove + updated_remove = Mock() + updated_remove.id = "port-1" + updated_remove.name = "p1" + updated_remove.status = "ACTIVE" + updated_remove.description = None + updated_remove.project_id = None + updated_remove.network_id = "net-1" + updated_remove.admin_state_up = True + updated_remove.is_admin_state_up = True + updated_remove.device_id = None + updated_remove.device_owner = None + updated_remove.mac_address = "fa:16:3e:00:00:09" + updated_remove.fixed_ips = [] + updated_remove.security_group_ids = ["sg-1", "sg-2"] + mock_conn.network.update_port.return_value = updated_remove + + res_remove = tools.remove_security_group_from_port("port-1", "sg-9") + assert isinstance(res_remove, Port) + mock_conn.network.update_port.assert_called_with( + "port-1", security_groups=["sg-1", "sg-2"] + ) + + # idempotent remove when sg not present + mock_conn.network.get_port.return_value = updated_remove + res_remove_again = tools.remove_security_group_from_port( + "port-1", "sg-9" + ) + assert isinstance(res_remove_again, Port) + def test_get_subnets_filters_and_has_gateway_true( self, mock_openstack_connect_network, From db21adc010c69eae7d8a91cedd962e5047ff18a1 Mon Sep 17 00:00:00 2001 From: platanus-kr Date: Sat, 25 Oct 2025 18:33:04 +0900 Subject: [PATCH 3/4] feat(network): improve feedback (#87) --- .../tools/network_tools.py | 138 ++++++++++-------- tests/tools/test_network_tools.py | 127 ++++++++-------- 2 files changed, 141 insertions(+), 124 deletions(-) diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index 9849d2f..a106209 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -44,7 +44,7 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.delete_port) mcp.tool()(self.get_port_allowed_address_pairs) mcp.tool()(self.set_port_binding) - mcp.tool()(self.add_security_group_to_port) + mcp.tool()(self.add_port_to_security_group) mcp.tool()(self.remove_security_group_from_port) mcp.tool()(self.get_floating_ips) mcp.tool()(self.create_floating_ip) @@ -517,7 +517,7 @@ def set_port_binding( updated = conn.network.update_port(port_id, **update_args) return self._convert_to_port_model(updated) - def add_security_group_to_port( + def add_port_to_security_group( self, port_id: str, security_group_id: str, @@ -534,7 +534,7 @@ def add_security_group_to_port( """ conn = get_openstack_conn() current = conn.network.get_port(port_id) - current_group_ids: list[str] = list(current.security_group_ids or []) + current_group_ids: list[str] = list(current.security_group_ids) if security_group_id in current_group_ids: return self._convert_to_port_model(current) @@ -561,7 +561,7 @@ def remove_security_group_from_port( """ conn = get_openstack_conn() current = conn.network.get_port(port_id) - current_group_ids: list[str] = list(current.security_group_ids or []) + current_group_ids: list[str] = list(current.security_group_ids) if security_group_id not in current_group_ids: return self._convert_to_port_model(current) @@ -1233,6 +1233,25 @@ def _sanitize_server_filters(self, filters: dict) -> dict: attrs.pop("status", None) return attrs + def _coerce_port(self, value) -> int | None: + """ + Coerce a port value that may arrive as a string (e.g., "80") into int. + + This relaxes tool input validation issues from UIs that serialize numbers + as strings. Only base-10 digit strings are converted; otherwise returns + None so the field can be omitted. + + :param value: Port as int, str, or None + :return: int port or None + """ + if isinstance(value, int): + return value + if isinstance(value, str): + stripped = value.strip() + if stripped.isdigit(): + return int(stripped) + return None + def get_security_groups( self, project_id: str | None = None, @@ -1344,19 +1363,12 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: :param openstack_sg: OpenStack security group object :return: Pydantic SecurityGroup model """ - rule_ids: list[str] | None = None rules = getattr(openstack_sg, "security_group_rules", None) - if rules is not None: - extracted: list[str] = [] - for r in rules: - rid = ( - r.get("id") - if isinstance(r, dict) - else getattr(r, "id", None) - ) - if rid: - extracted.append(str(rid)) - rule_ids = extracted + rule_ids: list[str] | None = ( + [r.get("id") for r in rules if r.get("id")] + if rules is not None + else None + ) return SecurityGroup( id=openstack_sg.id, @@ -1373,8 +1385,8 @@ def create_security_group_rule( direction: str = "ingress", ethertype: str = "IPv4", protocol: str | None = None, - port_range_min: int | None = None, - port_range_max: int | None = None, + port_range_min: int | str | None = None, + port_range_max: int | str | None = None, remote_ip_prefix: str | None = None, remote_group_id: str | None = None, description: str | None = None, @@ -1396,20 +1408,31 @@ def create_security_group_rule( :return: Created SecurityGroupRule """ conn = get_openstack_conn() - args: dict = { - "security_group_id": security_group_id, - "direction": direction, - "ethertype": ethertype, - } - args["protocol"] = protocol - args["port_range_min"] = port_range_min - args["port_range_max"] = port_range_max - args["remote_ip_prefix"] = remote_ip_prefix - args["remote_group_id"] = remote_group_id - args["description"] = description - args["project_id"] = project_id - rule = conn.network.create_security_group_rule(**args) - return self._convert_to_security_group_rule_model(rule) + create_args: dict = {} + if protocol is not None: + create_args["protocol"] = protocol + coerced_min = self._coerce_port(port_range_min) + coerced_max = self._coerce_port(port_range_max) + if coerced_min is not None: + create_args["port_range_min"] = coerced_min + if coerced_max is not None: + create_args["port_range_max"] = coerced_max + if remote_ip_prefix is not None: + create_args["remote_ip_prefix"] = remote_ip_prefix + if remote_group_id is not None: + create_args["remote_group_id"] = remote_group_id + if description is not None: + create_args["description"] = description + if project_id is not None: + create_args["project_id"] = project_id + + rule = conn.network.create_security_group_rule( + security_group_id=security_group_id, + direction=direction, + ethertype=ethertype, + **create_args, + ) + return SecurityGroupRule(**rule) def get_security_group_rule_detail( self, rule_id: str @@ -1422,7 +1445,7 @@ def get_security_group_rule_detail( """ conn = get_openstack_conn() rule = conn.network.get_security_group_rule(rule_id) - return self._convert_to_security_group_rule_model(rule) + return SecurityGroupRule(**rule) def delete_security_group_rule(self, rule_id: str) -> None: """ @@ -1450,32 +1473,25 @@ def create_security_group_rules_bulk( :return: List of created SecurityGroupRule models """ conn = get_openstack_conn() - created = conn.network.create_security_group_rules(rules=rules) - return [self._convert_to_security_group_rule_model(r) for r in created] - def _convert_to_security_group_rule_model( - self, openstack_rule - ) -> SecurityGroupRule: - """ - Convert an OpenStack Security Group Rule object to a pydantic model. - - :param openstack_rule: OpenStack rule object - :return: SecurityGroupRule model - """ - return SecurityGroupRule( - id=getattr(openstack_rule, "id"), - name=getattr(openstack_rule, "name", None), - status=getattr(openstack_rule, "status", None), - description=getattr(openstack_rule, "description", None), - project_id=getattr(openstack_rule, "project_id", None), - direction=getattr(openstack_rule, "direction", None), - ethertype=getattr(openstack_rule, "ethertype", None), - protocol=getattr(openstack_rule, "protocol", None), - port_range_min=getattr(openstack_rule, "port_range_min", None), - port_range_max=getattr(openstack_rule, "port_range_max", None), - remote_ip_prefix=getattr(openstack_rule, "remote_ip_prefix", None), - remote_group_id=getattr(openstack_rule, "remote_group_id", None), - security_group_id=getattr( - openstack_rule, "security_group_id", None - ), - ) + def _clean_rule_payload(raw: dict) -> dict: + payload = dict(raw) + # Remove port ranges when protocol is absent (Neutron requirement) + if not payload.get("protocol"): + payload.pop("port_range_min", None) + payload.pop("port_range_max", None) + else: + # Coerce possible string ports to integers + for key in ("port_range_min", "port_range_max"): + if key in payload: + coerced = self._coerce_port(payload.get(key)) + if coerced is None: + payload.pop(key, None) + else: + payload[key] = coerced + # Drop keys explicitly set to None + return {k: v for k, v in payload.items() if v is not None} + + cleaned_rules = [_clean_rule_payload(r) for r in rules] + created = conn.network.create_security_group_rules(rules=cleaned_rules) + return [SecurityGroupRule(**r) for r in created] diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index d3dc251..cb0d6bd 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -909,7 +909,7 @@ def test_add_remove_security_group_on_port( mock_conn.network.update_port.return_value = updated_add tools = self.get_network_tools() - res_add = tools.add_security_group_to_port("port-1", "sg-9") + res_add = tools.add_port_to_security_group("port-1", "sg-9") assert isinstance(res_add, Port) mock_conn.network.update_port.assert_called_with( "port-1", security_groups=["sg-1", "sg-2", "sg-9"] @@ -917,7 +917,7 @@ def test_add_remove_security_group_on_port( # idempotent add when sg already present mock_conn.network.get_port.return_value = updated_add - res_add_again = tools.add_security_group_to_port("port-1", "sg-9") + res_add_again = tools.add_port_to_security_group("port-1", "sg-9") assert isinstance(res_add_again, Port) # updated after remove @@ -1460,10 +1460,7 @@ def test_get_security_groups_filters(self, mock_openstack_connect_network): sg.status = None sg.description = "desc" sg.project_id = "proj-1" - sg.security_group_rules = [ - {"id": "r-1"}, - {"id": "r-2"}, - ] + sg.security_group_rules = [{"id": "r-1"}, {"id": "r-2"}] expected_sg = SecurityGroup( id="sg-1", @@ -1852,20 +1849,21 @@ def test_add_get_remove_router_interface_by_port( def test_create_security_group_rule(self, mock_openstack_connect_network): mock_conn = mock_openstack_connect_network - rule = Mock() - rule.id = "r-1" - rule.name = None - rule.status = None - rule.description = "allow 22" - rule.project_id = "proj-1" - rule.direction = "ingress" - rule.ethertype = "IPv4" - rule.protocol = "tcp" - rule.port_range_min = 22 - rule.port_range_max = 22 - rule.remote_ip_prefix = "0.0.0.0/0" - rule.remote_group_id = None - rule.security_group_id = "sg-1" + rule = { + "id": "r-1", + "name": None, + "status": None, + "description": "allow 22", + "project_id": "proj-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 22, + "port_range_max": 22, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + "security_group_id": "sg-1", + } mock_conn.network.create_security_group_rule.return_value = rule tools = self.get_network_tools() @@ -1887,20 +1885,21 @@ def test_get_security_group_rule_detail( self, mock_openstack_connect_network ): mock_conn = mock_openstack_connect_network - rule = Mock() - rule.id = "r-2" - rule.name = None - rule.status = None - rule.description = None - rule.project_id = None - rule.direction = "egress" - rule.ethertype = "IPv4" - rule.protocol = None - rule.port_range_min = None - rule.port_range_max = None - rule.remote_ip_prefix = None - rule.remote_group_id = None - rule.security_group_id = "sg-1" + rule = { + "id": "r-2", + "name": None, + "status": None, + "description": None, + "project_id": None, + "direction": "egress", + "ethertype": "IPv4", + "protocol": None, + "port_range_min": None, + "port_range_max": None, + "remote_ip_prefix": None, + "remote_group_id": None, + "security_group_id": "sg-1", + } mock_conn.network.get_security_group_rule.return_value = rule tools = self.get_network_tools() @@ -1925,35 +1924,37 @@ def test_create_security_group_rules_bulk( self, mock_openstack_connect_network ): mock_conn = mock_openstack_connect_network - r1 = Mock() - r1.id = "r-10" - r1.name = None - r1.status = None - r1.description = None - r1.project_id = None - r1.security_group_id = "sg-1" - r1.direction = "ingress" - r1.ethertype = "IPv4" - r1.protocol = "tcp" - r1.port_range_min = 80 - r1.port_range_max = 80 - r1.remote_ip_prefix = "0.0.0.0/0" - r1.remote_group_id = None - - r2 = Mock() - r2.id = "r-11" - r2.name = None - r2.status = None - r2.description = None - r2.project_id = None - r2.security_group_id = "sg-1" - r2.direction = "ingress" - r2.ethertype = "IPv4" - r2.protocol = "tcp" - r2.port_range_min = 443 - r2.port_range_max = 443 - r2.remote_ip_prefix = "0.0.0.0/0" - r2.remote_group_id = None + r1 = { + "id": "r-10", + "name": None, + "status": None, + "description": None, + "project_id": None, + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 80, + "port_range_max": 80, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + } + + r2 = { + "id": "r-11", + "name": None, + "status": None, + "description": None, + "project_id": None, + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 443, + "port_range_max": 443, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + } mock_conn.network.create_security_group_rules.return_value = [r1, r2] From da6bdb5e461d189b9451e943846e92e24bf46d0f Mon Sep 17 00:00:00 2001 From: platanus-kr Date: Sat, 25 Oct 2025 22:17:39 +0900 Subject: [PATCH 4/4] feat(network): rename port to security group (#87) --- src/openstack_mcp_server/tools/network_tools.py | 4 ++-- tests/tools/test_network_tools.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index a106209..51a5e33 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -45,7 +45,7 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.get_port_allowed_address_pairs) mcp.tool()(self.set_port_binding) mcp.tool()(self.add_port_to_security_group) - mcp.tool()(self.remove_security_group_from_port) + mcp.tool()(self.remove_port_from_security_group) mcp.tool()(self.get_floating_ips) mcp.tool()(self.create_floating_ip) mcp.tool()(self.delete_floating_ip) @@ -544,7 +544,7 @@ def add_port_to_security_group( ) return self._convert_to_port_model(updated) - def remove_security_group_from_port( + def remove_port_from_security_group( self, port_id: str, security_group_id: str, diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index cb0d6bd..f919299 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -937,7 +937,7 @@ def test_add_remove_security_group_on_port( updated_remove.security_group_ids = ["sg-1", "sg-2"] mock_conn.network.update_port.return_value = updated_remove - res_remove = tools.remove_security_group_from_port("port-1", "sg-9") + res_remove = tools.remove_port_from_security_group("port-1", "sg-9") assert isinstance(res_remove, Port) mock_conn.network.update_port.assert_called_with( "port-1", security_groups=["sg-1", "sg-2"] @@ -945,7 +945,7 @@ def test_add_remove_security_group_on_port( # idempotent remove when sg not present mock_conn.network.get_port.return_value = updated_remove - res_remove_again = tools.remove_security_group_from_port( + res_remove_again = tools.remove_port_from_security_group( "port-1", "sg-9" ) assert isinstance(res_remove_again, Port)