Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 203 additions & 7 deletions src/openstack_mcp_server/tools/network_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ def register_tools(self, mcp: FastMCP):
mcp.tool()(self.delete_port)
mcp.tool()(self.get_port_allowed_address_pairs)
mcp.tool()(self.set_port_binding)
mcp.tool()(self.add_port_to_security_group)
mcp.tool()(self.remove_port_from_security_group)
mcp.tool()(self.get_floating_ips)
mcp.tool()(self.create_floating_ip)
mcp.tool()(self.delete_floating_ip)
Expand All @@ -63,6 +65,10 @@ def register_tools(self, mcp: FastMCP):
mcp.tool()(self.get_security_group_detail)
mcp.tool()(self.update_security_group)
mcp.tool()(self.delete_security_group)
mcp.tool()(self.create_security_group_rule)
mcp.tool()(self.get_security_group_rule_detail)
mcp.tool()(self.delete_security_group_rule)
mcp.tool()(self.create_security_group_rules_bulk)
Copy link
Collaborator

@S0okJu S0okJu Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_security_group_rules_bulk 도구를 제작하신 이유가 있으신가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리소스를 다루는 API를 만들때 벌크요청이 있으면 인터페이스를 같이 올려주는게 좋을것 같아서 추가했습니다.
보안그룹 규칙의 예를 들어.. 프롬프팅을 아래와 같이 한다면 한번에 여러 규칙을 추가할 수 있으니까요

test-security 보안그룹에 HTTP, HTTPS, SSH의 인바운드를 허용하고 ICMP를 양방향 허용하는 규칙을 추가해줘

무엇보다 SDK에 구현이 되어있어서 가져왔습니다 😋

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@platanus-kr 도구를 제작하신 이유에 대해 구체적으로 알 수 있을까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

상기 답변으로 충분하다고 생각되는데 혹시 다른 이유가 더 필요할까요?
벌크요청을 하면 단건 호출 대비 효율성이 좋고 동일한 파라미터로 요청할 경우 휴먼에러를 방지할 수 있습니다. 오픈스택에는 보안그룹 규칙 외에도 아래와 같이 자체적으로 여러개의 리소스를 다룰 수 있는 API가 구현되어 있습니다.. 일단 제가 당장 생각나는건 이거 두가지네요.


def get_networks(
self,
Expand Down Expand Up @@ -511,6 +517,62 @@ def set_port_binding(
updated = conn.network.update_port(port_id, **update_args)
return self._convert_to_port_model(updated)

def add_port_to_security_group(
self,
port_id: str,
security_group_id: str,
) -> Port:
"""
Attach a Security Group to a Port.

Idempotent: if the security group is already attached, returns the current
port state without issuing an update.

:param port_id: Port ID
:param security_group_id: Security Group ID to attach
:return: Updated (or current) Port object
"""
conn = get_openstack_conn()
current = conn.network.get_port(port_id)
current_group_ids: list[str] = list(current.security_group_ids)
if security_group_id in current_group_ids:
return self._convert_to_port_model(current)

updated_group_ids = current_group_ids + [security_group_id]
updated = conn.network.update_port(
port_id, security_groups=updated_group_ids
)
return self._convert_to_port_model(updated)

def remove_port_from_security_group(
self,
port_id: str,
security_group_id: str,
) -> Port:
"""
Detach a Security Group from a Port.

Idempotent: if the security group is not attached, returns the current
port state without issuing an update.

:param port_id: Port ID
:param security_group_id: Security Group ID to detach
:return: Updated (or current) Port object
"""
conn = get_openstack_conn()
current = conn.network.get_port(port_id)
current_group_ids: list[str] = list(current.security_group_ids)
if security_group_id not in current_group_ids:
return self._convert_to_port_model(current)

updated_group_ids = [
sg_id for sg_id in current_group_ids if sg_id != security_group_id
]
updated = conn.network.update_port(
port_id, security_groups=updated_group_ids
)
return self._convert_to_port_model(updated)

def create_port(
self,
network_id: str,
Expand Down Expand Up @@ -1171,6 +1233,25 @@ def _sanitize_server_filters(self, filters: dict) -> dict:
attrs.pop("status", None)
return attrs

def _coerce_port(self, value) -> int | None:
"""
Coerce a port value that may arrive as a string (e.g., "80") into int.
Comment on lines +1236 to +1238
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

질문 : Coerce라는 단어를 이번에 처음 봤는대 일반적으로 많이 사용되는 단어인가요?


This relaxes tool input validation issues from UIs that serialize numbers
as strings. Only base-10 digit strings are converted; otherwise returns
None so the field can be omitted.

:param value: Port as int, str, or None
:return: int port or None
"""
if isinstance(value, int):
return value
if isinstance(value, str):
stripped = value.strip()
if stripped.isdigit():
return int(stripped)
return None

def get_security_groups(
self,
project_id: str | None = None,
Expand Down Expand Up @@ -1282,14 +1363,12 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
:param openstack_sg: OpenStack security group object
:return: Pydantic SecurityGroup model
"""
rule_ids: list[str] | None = None
rules = getattr(openstack_sg, "security_group_rules", None)
if rules is not None:
dto_rules = [
SecurityGroupRule.model_validate(r, from_attributes=True)
for r in rules
]
rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)]
rule_ids: list[str] | None = (
[r.get("id") for r in rules if r.get("id")]
if rules is not None
else None
)

return SecurityGroup(
id=openstack_sg.id,
Expand All @@ -1299,3 +1378,120 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
project_id=getattr(openstack_sg, "project_id", None),
security_group_rule_ids=rule_ids,
)

def create_security_group_rule(
self,
security_group_id: str,
direction: str = "ingress",
ethertype: str = "IPv4",
protocol: str | None = None,
port_range_min: int | str | None = None,
port_range_max: int | str | None = None,
remote_ip_prefix: str | None = None,
remote_group_id: str | None = None,
description: str | None = None,
project_id: str | None = None,
) -> SecurityGroupRule:
"""
Create a Security Group Rule.

:param security_group_id: Target security group ID
:param direction: "ingress" or "egress"
:param ethertype: "IPv4" or "IPv6"
:param protocol: L4 protocol (e.g., "tcp", "udp", "icmp")
:param port_range_min: Minimum port
:param port_range_max: Maximum port
:param remote_ip_prefix: Source/destination CIDR
:param remote_group_id: Peer security group ID
:param description: Rule description
:param project_id: Project ownership
:return: Created SecurityGroupRule
"""
conn = get_openstack_conn()
create_args: dict = {}
if protocol is not None:
create_args["protocol"] = protocol
coerced_min = self._coerce_port(port_range_min)
coerced_max = self._coerce_port(port_range_max)
if coerced_min is not None:
create_args["port_range_min"] = coerced_min
if coerced_max is not None:
create_args["port_range_max"] = coerced_max
if remote_ip_prefix is not None:
create_args["remote_ip_prefix"] = remote_ip_prefix
if remote_group_id is not None:
create_args["remote_group_id"] = remote_group_id
if description is not None:
create_args["description"] = description
if project_id is not None:
create_args["project_id"] = project_id

rule = conn.network.create_security_group_rule(
security_group_id=security_group_id,
direction=direction,
ethertype=ethertype,
**create_args,
Comment on lines +1430 to +1433
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

인자 전달 방식이 일관적이였으면 좋겠습니다. security_group_id, direction, ethertype 이 필수 필드여서 positional 한 인자로 전달한걸로 보이는데, 기능상 차이가 없으니 전체를 키워드 인자로 전달하는게 더 깔끔해보입니다

)
return SecurityGroupRule(**rule)

def get_security_group_rule_detail(
self, rule_id: str
) -> SecurityGroupRule:
"""
Get detailed information about a specific Security Group Rule.

:param rule_id: Rule ID
:return: SecurityGroupRule detail
"""
conn = get_openstack_conn()
rule = conn.network.get_security_group_rule(rule_id)
return SecurityGroupRule(**rule)

def delete_security_group_rule(self, rule_id: str) -> None:
"""
Delete a Security Group Rule.

:param rule_id: Rule ID to delete
:return: None
"""
conn = get_openstack_conn()
conn.network.delete_security_group_rule(rule_id, ignore_missing=False)
return None

def create_security_group_rules_bulk(
self,
rules: list[dict],
) -> list[SecurityGroupRule]:
"""
Create multiple Security Group Rules in bulk.

Each rule dict should follow Neutron SG rule schema keys (e.g.,
Copy link
Collaborator

@halucinor halucinor Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

맥락정보로 전달되기 때문에 SG 같은 약자가 의미 표현에 좋지 않을 것 같습니다

security_group_id, direction, ethertype, protocol, port_range_min,
port_range_max, remote_ip_prefix, remote_group_id, description, project_id).

:param rules: List of rule dictionaries
:return: List of created SecurityGroupRule models
"""
conn = get_openstack_conn()

def _clean_rule_payload(raw: dict) -> dict:
payload = dict(raw)
# Remove port ranges when protocol is absent (Neutron requirement)
if not payload.get("protocol"):
payload.pop("port_range_min", None)
payload.pop("port_range_max", None)
else:
# Coerce possible string ports to integers
for key in ("port_range_min", "port_range_max"):
if key in payload:
coerced = self._coerce_port(payload.get(key))
if coerced is None:
payload.pop(key, None)
else:
payload[key] = coerced
# Drop keys explicitly set to None
return {k: v for k, v in payload.items() if v is not None}
Comment on lines +1461 to +1493
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pydantic 모델로 인자정보를 명시해도 LLM이 인자 타입을 인지하지 못하는지 확인이 필요해 보입니다


cleaned_rules = [_clean_rule_payload(r) for r in rules]
created = conn.network.create_security_group_rules(rules=cleaned_rules)
return [SecurityGroupRule(**r) for r in created]
Loading