|
| 1 | +"""4XX error classes, which handle exception groups. |
| 2 | +
|
| 3 | +ASYNC400 except-star-invalid-attribute checks for invalid attribute access on except* |
| 4 | +""" |
| 5 | + |
| 6 | +from __future__ import annotations |
| 7 | + |
| 8 | +import ast |
| 9 | +from typing import TYPE_CHECKING, Any |
| 10 | + |
| 11 | +from .flake8asyncvisitor import Flake8AsyncVisitor |
| 12 | +from .helpers import error_class |
| 13 | + |
| 14 | +if TYPE_CHECKING: |
| 15 | + from collections.abc import Mapping |
| 16 | + |
| 17 | +EXCGROUP_ATTRS = ( |
| 18 | + # from ExceptionGroup |
| 19 | + "message", |
| 20 | + "exceptions", |
| 21 | + "subgroup", |
| 22 | + "split", |
| 23 | + "derive", |
| 24 | + # from BaseException |
| 25 | + "args", |
| 26 | + "with_traceback", |
| 27 | + "add_note", |
| 28 | + # in the backport |
| 29 | + "_is_protocol", |
| 30 | +) |
| 31 | + |
| 32 | + |
| 33 | +@error_class |
| 34 | +class Visitor4xx(Flake8AsyncVisitor): |
| 35 | + |
| 36 | + error_codes: Mapping[str, str] = { |
| 37 | + "ASYNC400": ( |
| 38 | + "Accessing attribute {} on ExceptionGroup as if it was a bare Exception." |
| 39 | + ) |
| 40 | + } |
| 41 | + |
| 42 | + def __init__(self, *args: Any, **kwargs: Any): |
| 43 | + super().__init__(*args, **kwargs) |
| 44 | + self.exception_groups: list[str] = [] |
| 45 | + self.trystar = False |
| 46 | + |
| 47 | + def visit_TryStar(self, node: ast.TryStar): # type: ignore[name-defined] |
| 48 | + self.save_state(node, "trystar") |
| 49 | + self.trystar = True |
| 50 | + |
| 51 | + def visit_Try(self, node: ast.Try): |
| 52 | + self.save_state(node, "trystar") |
| 53 | + self.trystar = False |
| 54 | + |
| 55 | + def visit_ExceptHandler(self, node: ast.ExceptHandler): |
| 56 | + if not self.trystar or node.name is None: |
| 57 | + return |
| 58 | + self.save_state(node, "exception_groups", copy=True) |
| 59 | + self.exception_groups.append(node.name) |
| 60 | + self.visit_nodes(node.body) |
| 61 | + |
| 62 | + def visit_Attribute(self, node: ast.Attribute): |
| 63 | + if ( |
| 64 | + isinstance(node.value, ast.Name) |
| 65 | + and node.value.id in self.exception_groups |
| 66 | + and node.attr not in EXCGROUP_ATTRS |
| 67 | + and not (node.attr.startswith("__") and node.attr.endswith("__")) |
| 68 | + ): |
| 69 | + self.error(node, node.attr) |
| 70 | + |
| 71 | + def _clear_if_name(self, node: ast.AST | None): |
| 72 | + if isinstance(node, ast.Name) and node.id in self.exception_groups: |
| 73 | + self.exception_groups.remove(node.id) |
| 74 | + |
| 75 | + def _walk_and_clear(self, node: ast.AST | None): |
| 76 | + if node is None: |
| 77 | + return |
| 78 | + for n in ast.walk(node): |
| 79 | + self._clear_if_name(n) |
| 80 | + |
| 81 | + def visit_Assign(self, node: ast.Assign): |
| 82 | + for t in node.targets: |
| 83 | + self._walk_and_clear(t) |
| 84 | + |
| 85 | + def visit_AnnAssign(self, node: ast.AnnAssign): |
| 86 | + self._clear_if_name(node.target) |
| 87 | + |
| 88 | + def visit_withitem(self, node: ast.withitem): |
| 89 | + self._walk_and_clear(node.optional_vars) |
| 90 | + |
| 91 | + def visit_FunctionDef( |
| 92 | + self, node: ast.FunctionDef | ast.AsyncFunctionDef | ast.Lambda |
| 93 | + ): |
| 94 | + self.save_state(node, "exception_groups", "trystar", copy=False) |
| 95 | + self.exception_groups = [] |
| 96 | + |
| 97 | + visit_AsyncFunctionDef = visit_FunctionDef |
| 98 | + visit_Lambda = visit_FunctionDef |
0 commit comments