-
Notifications
You must be signed in to change notification settings - Fork 6
feat: Add security group rule tools #91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
c8cb9ce
a666530
db21adc
da6bdb5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,8 @@ def register_tools(self, mcp: FastMCP): | |
| mcp.tool()(self.delete_port) | ||
| mcp.tool()(self.get_port_allowed_address_pairs) | ||
| mcp.tool()(self.set_port_binding) | ||
| mcp.tool()(self.add_port_to_security_group) | ||
| mcp.tool()(self.remove_port_from_security_group) | ||
| mcp.tool()(self.get_floating_ips) | ||
| mcp.tool()(self.create_floating_ip) | ||
| mcp.tool()(self.delete_floating_ip) | ||
|
|
@@ -63,6 +65,10 @@ def register_tools(self, mcp: FastMCP): | |
| mcp.tool()(self.get_security_group_detail) | ||
| mcp.tool()(self.update_security_group) | ||
| mcp.tool()(self.delete_security_group) | ||
| mcp.tool()(self.create_security_group_rule) | ||
| mcp.tool()(self.get_security_group_rule_detail) | ||
| mcp.tool()(self.delete_security_group_rule) | ||
| mcp.tool()(self.create_security_group_rules_bulk) | ||
|
|
||
| def get_networks( | ||
| self, | ||
|
|
@@ -511,6 +517,62 @@ def set_port_binding( | |
| updated = conn.network.update_port(port_id, **update_args) | ||
| return self._convert_to_port_model(updated) | ||
|
|
||
| def add_port_to_security_group( | ||
| self, | ||
| port_id: str, | ||
| security_group_id: str, | ||
| ) -> Port: | ||
| """ | ||
| Attach a Security Group to a Port. | ||
|
|
||
| Idempotent: if the security group is already attached, returns the current | ||
| port state without issuing an update. | ||
|
|
||
| :param port_id: Port ID | ||
| :param security_group_id: Security Group ID to attach | ||
| :return: Updated (or current) Port object | ||
| """ | ||
| conn = get_openstack_conn() | ||
| current = conn.network.get_port(port_id) | ||
| current_group_ids: list[str] = list(current.security_group_ids) | ||
| if security_group_id in current_group_ids: | ||
| return self._convert_to_port_model(current) | ||
|
|
||
| updated_group_ids = current_group_ids + [security_group_id] | ||
| updated = conn.network.update_port( | ||
| port_id, security_groups=updated_group_ids | ||
| ) | ||
| return self._convert_to_port_model(updated) | ||
|
|
||
| def remove_port_from_security_group( | ||
| self, | ||
| port_id: str, | ||
| security_group_id: str, | ||
| ) -> Port: | ||
| """ | ||
| Detach a Security Group from a Port. | ||
|
|
||
| Idempotent: if the security group is not attached, returns the current | ||
| port state without issuing an update. | ||
|
|
||
| :param port_id: Port ID | ||
| :param security_group_id: Security Group ID to detach | ||
| :return: Updated (or current) Port object | ||
| """ | ||
| conn = get_openstack_conn() | ||
| current = conn.network.get_port(port_id) | ||
| current_group_ids: list[str] = list(current.security_group_ids) | ||
| if security_group_id not in current_group_ids: | ||
| return self._convert_to_port_model(current) | ||
|
|
||
| updated_group_ids = [ | ||
| sg_id for sg_id in current_group_ids if sg_id != security_group_id | ||
| ] | ||
| updated = conn.network.update_port( | ||
| port_id, security_groups=updated_group_ids | ||
| ) | ||
| return self._convert_to_port_model(updated) | ||
|
|
||
| def create_port( | ||
| self, | ||
| network_id: str, | ||
|
|
@@ -1171,6 +1233,25 @@ def _sanitize_server_filters(self, filters: dict) -> dict: | |
| attrs.pop("status", None) | ||
| return attrs | ||
|
|
||
| def _coerce_port(self, value) -> int | None: | ||
| """ | ||
| Coerce a port value that may arrive as a string (e.g., "80") into int. | ||
|
Comment on lines
+1236
to
+1238
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 질문 : Coerce라는 단어를 이번에 처음 봤는대 일반적으로 많이 사용되는 단어인가요? |
||
|
|
||
| This relaxes tool input validation issues from UIs that serialize numbers | ||
| as strings. Only base-10 digit strings are converted; otherwise returns | ||
| None so the field can be omitted. | ||
|
|
||
| :param value: Port as int, str, or None | ||
| :return: int port or None | ||
| """ | ||
| if isinstance(value, int): | ||
| return value | ||
| if isinstance(value, str): | ||
| stripped = value.strip() | ||
| if stripped.isdigit(): | ||
| return int(stripped) | ||
| return None | ||
|
|
||
| def get_security_groups( | ||
| self, | ||
| project_id: str | None = None, | ||
|
|
@@ -1282,14 +1363,12 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: | |
| :param openstack_sg: OpenStack security group object | ||
| :return: Pydantic SecurityGroup model | ||
| """ | ||
| rule_ids: list[str] | None = None | ||
| rules = getattr(openstack_sg, "security_group_rules", None) | ||
| if rules is not None: | ||
| dto_rules = [ | ||
| SecurityGroupRule.model_validate(r, from_attributes=True) | ||
| for r in rules | ||
| ] | ||
| rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)] | ||
| rule_ids: list[str] | None = ( | ||
| [r.get("id") for r in rules if r.get("id")] | ||
| if rules is not None | ||
| else None | ||
| ) | ||
|
|
||
| return SecurityGroup( | ||
| id=openstack_sg.id, | ||
|
|
@@ -1299,3 +1378,120 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: | |
| project_id=getattr(openstack_sg, "project_id", None), | ||
| security_group_rule_ids=rule_ids, | ||
| ) | ||
|
|
||
| def create_security_group_rule( | ||
| self, | ||
| security_group_id: str, | ||
| direction: str = "ingress", | ||
| ethertype: str = "IPv4", | ||
| protocol: str | None = None, | ||
| port_range_min: int | str | None = None, | ||
| port_range_max: int | str | None = None, | ||
| remote_ip_prefix: str | None = None, | ||
| remote_group_id: str | None = None, | ||
| description: str | None = None, | ||
| project_id: str | None = None, | ||
| ) -> SecurityGroupRule: | ||
| """ | ||
| Create a Security Group Rule. | ||
|
|
||
| :param security_group_id: Target security group ID | ||
| :param direction: "ingress" or "egress" | ||
| :param ethertype: "IPv4" or "IPv6" | ||
| :param protocol: L4 protocol (e.g., "tcp", "udp", "icmp") | ||
| :param port_range_min: Minimum port | ||
| :param port_range_max: Maximum port | ||
| :param remote_ip_prefix: Source/destination CIDR | ||
| :param remote_group_id: Peer security group ID | ||
| :param description: Rule description | ||
| :param project_id: Project ownership | ||
| :return: Created SecurityGroupRule | ||
| """ | ||
| conn = get_openstack_conn() | ||
| create_args: dict = {} | ||
| if protocol is not None: | ||
| create_args["protocol"] = protocol | ||
| coerced_min = self._coerce_port(port_range_min) | ||
| coerced_max = self._coerce_port(port_range_max) | ||
| if coerced_min is not None: | ||
| create_args["port_range_min"] = coerced_min | ||
| if coerced_max is not None: | ||
| create_args["port_range_max"] = coerced_max | ||
| if remote_ip_prefix is not None: | ||
| create_args["remote_ip_prefix"] = remote_ip_prefix | ||
| if remote_group_id is not None: | ||
| create_args["remote_group_id"] = remote_group_id | ||
| if description is not None: | ||
| create_args["description"] = description | ||
| if project_id is not None: | ||
| create_args["project_id"] = project_id | ||
|
|
||
| rule = conn.network.create_security_group_rule( | ||
| security_group_id=security_group_id, | ||
| direction=direction, | ||
| ethertype=ethertype, | ||
| **create_args, | ||
|
Comment on lines
+1430
to
+1433
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 인자 전달 방식이 일관적이였으면 좋겠습니다. security_group_id, direction, ethertype 이 필수 필드여서 positional 한 인자로 전달한걸로 보이는데, 기능상 차이가 없으니 전체를 키워드 인자로 전달하는게 더 깔끔해보입니다 |
||
| ) | ||
| return SecurityGroupRule(**rule) | ||
|
|
||
| def get_security_group_rule_detail( | ||
| self, rule_id: str | ||
| ) -> SecurityGroupRule: | ||
| """ | ||
| Get detailed information about a specific Security Group Rule. | ||
|
|
||
| :param rule_id: Rule ID | ||
| :return: SecurityGroupRule detail | ||
| """ | ||
| conn = get_openstack_conn() | ||
| rule = conn.network.get_security_group_rule(rule_id) | ||
| return SecurityGroupRule(**rule) | ||
|
|
||
| def delete_security_group_rule(self, rule_id: str) -> None: | ||
| """ | ||
| Delete a Security Group Rule. | ||
|
|
||
| :param rule_id: Rule ID to delete | ||
| :return: None | ||
| """ | ||
| conn = get_openstack_conn() | ||
| conn.network.delete_security_group_rule(rule_id, ignore_missing=False) | ||
| return None | ||
|
|
||
| def create_security_group_rules_bulk( | ||
| self, | ||
| rules: list[dict], | ||
| ) -> list[SecurityGroupRule]: | ||
| """ | ||
| Create multiple Security Group Rules in bulk. | ||
|
|
||
| Each rule dict should follow Neutron SG rule schema keys (e.g., | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 맥락정보로 전달되기 때문에 SG 같은 약자가 의미 표현에 좋지 않을 것 같습니다 |
||
| security_group_id, direction, ethertype, protocol, port_range_min, | ||
| port_range_max, remote_ip_prefix, remote_group_id, description, project_id). | ||
|
|
||
| :param rules: List of rule dictionaries | ||
| :return: List of created SecurityGroupRule models | ||
| """ | ||
| conn = get_openstack_conn() | ||
|
|
||
| def _clean_rule_payload(raw: dict) -> dict: | ||
| payload = dict(raw) | ||
| # Remove port ranges when protocol is absent (Neutron requirement) | ||
| if not payload.get("protocol"): | ||
| payload.pop("port_range_min", None) | ||
| payload.pop("port_range_max", None) | ||
| else: | ||
| # Coerce possible string ports to integers | ||
| for key in ("port_range_min", "port_range_max"): | ||
| if key in payload: | ||
| coerced = self._coerce_port(payload.get(key)) | ||
| if coerced is None: | ||
| payload.pop(key, None) | ||
| else: | ||
| payload[key] = coerced | ||
| # Drop keys explicitly set to None | ||
| return {k: v for k, v in payload.items() if v is not None} | ||
|
Comment on lines
+1461
to
+1493
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pydantic 모델로 인자정보를 명시해도 LLM이 인자 타입을 인지하지 못하는지 확인이 필요해 보입니다 |
||
|
|
||
| cleaned_rules = [_clean_rule_payload(r) for r in rules] | ||
| created = conn.network.create_security_group_rules(rules=cleaned_rules) | ||
| return [SecurityGroupRule(**r) for r in created] | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create_security_group_rules_bulk도구를 제작하신 이유가 있으신가요?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리소스를 다루는 API를 만들때 벌크요청이 있으면 인터페이스를 같이 올려주는게 좋을것 같아서 추가했습니다.
보안그룹 규칙의 예를 들어.. 프롬프팅을 아래와 같이 한다면 한번에 여러 규칙을 추가할 수 있으니까요
무엇보다 SDK에 구현이 되어있어서 가져왔습니다 😋
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@platanus-kr 도구를 제작하신 이유에 대해 구체적으로 알 수 있을까요?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
상기 답변으로 충분하다고 생각되는데 혹시 다른 이유가 더 필요할까요?
벌크요청을 하면 단건 호출 대비 효율성이 좋고 동일한 파라미터로 요청할 경우 휴먼에러를 방지할 수 있습니다. 오픈스택에는 보안그룹 규칙 외에도 아래와 같이 자체적으로 여러개의 리소스를 다룰 수 있는 API가 구현되어 있습니다.. 일단 제가 당장 생각나는건 이거 두가지네요.