|
3 | 3 | Plugin, FunctionContext, ClassDefContext, DynamicClassDefContext, |
4 | 4 | SemanticAnalyzerPluginInterface |
5 | 5 | ) |
6 | | -from mypy.plugins.common import add_method |
| 6 | +from mypy.plugins.common import add_method, _get_argument |
7 | 7 | from mypy.nodes import ( |
8 | 8 | NameExpr, Expression, StrExpr, TypeInfo, ClassDef, Block, SymbolTable, SymbolTableNode, GDEF, |
9 | | - Argument, Var, ARG_STAR2, MDEF, TupleExpr, RefExpr |
| 9 | + Argument, Var, ARG_STAR2, MDEF, TupleExpr, RefExpr, AssignmentStmt, CallExpr, MemberExpr |
10 | 10 | ) |
11 | 11 | from mypy.types import ( |
12 | 12 | UnionType, NoneTyp, Instance, Type, AnyType, TypeOfAny, UninhabitedType, CallableType |
|
22 | 22 |
|
23 | 23 | COLUMN_NAME = 'sqlalchemy.sql.schema.Column' # type: Final |
24 | 24 | RELATIONSHIP_NAME = 'sqlalchemy.orm.relationships.RelationshipProperty' # type: Final |
| 25 | +FOREIGN_KEY_NAME = 'sqlalchemy.sql.schema.ForeignKey' # type: Final |
25 | 26 |
|
26 | 27 |
|
27 | 28 | def is_declarative(info: TypeInfo) -> bool: |
@@ -60,17 +61,17 @@ def get_function_hook(self, fullname: str) -> Optional[Callable[[FunctionContext |
60 | 61 | return model_hook |
61 | 62 | return None |
62 | 63 |
|
63 | | - def get_dynamic_class_hook(self, fullname: str) -> CB[DynamicClassDefContext]: |
| 64 | + def get_dynamic_class_hook(self, fullname: str): |
64 | 65 | if fullname == 'sqlalchemy.ext.declarative.api.declarative_base': |
65 | 66 | return decl_info_hook |
66 | 67 | return None |
67 | 68 |
|
68 | | - def get_class_decorator_hook(self, fullname: str) -> CB[ClassDefContext]: |
| 69 | + def get_class_decorator_hook(self, fullname: str): |
69 | 70 | if fullname == 'sqlalchemy.ext.declarative.api.as_declarative': |
70 | 71 | return decl_deco_hook |
71 | 72 | return None |
72 | 73 |
|
73 | | - def get_base_class_hook(self, fullname: str) -> CB[ClassDefContext]: |
| 74 | + def get_base_class_hook(self, fullname: str): |
74 | 75 | sym = self.lookup_fully_qualified(fullname) |
75 | 76 | if sym and isinstance(sym.node, TypeInfo): |
76 | 77 | if is_declarative(sym.node): |
@@ -105,6 +106,32 @@ def add_model_init_hook(ctx: ClassDefContext) -> None: |
105 | 106 | add_method(ctx, '__init__', [kw_arg], NoneTyp()) |
106 | 107 | ctx.cls.info.metadata.setdefault('sqlalchemy', {})['generated_init'] = True |
107 | 108 |
|
| 109 | + for stmt in ctx.cls.defs.body: |
| 110 | + if not (isinstance(stmt, AssignmentStmt) and len(stmt.lvalues) == 1 and isinstance(stmt.lvalues[0], NameExpr)): |
| 111 | + continue |
| 112 | + |
| 113 | + if stmt.lvalues[0].name == "__tablename__" and isinstance(stmt.rvalue, StrExpr): |
| 114 | + ctx.cls.info.metadata.setdefault('sqlalchemy', {})['tablename'] = stmt.rvalue.value |
| 115 | + |
| 116 | + if isinstance(stmt.rvalue, CallExpr) and stmt.rvalue.callee.fullname == COLUMN_NAME: |
| 117 | + colname = stmt.lvalues[0].name |
| 118 | + has_explicit_colname = stmt.rvalue |
| 119 | + ctx.cls.info.metadata.setdefault('sqlalchemy', {}).setdefault('columns', []).append(colname) |
| 120 | + for arg in stmt.rvalue.args: |
| 121 | + if isinstance(arg, CallExpr) and arg.callee.fullname == FOREIGN_KEY_NAME and len(arg.args) >= 1: |
| 122 | + fk = arg.args[0] |
| 123 | + if isinstance(fk, StrExpr): |
| 124 | + *_, parent_table, parent_col = fk.value.split(".") |
| 125 | + ctx.cls.info.metadata.setdefault('sqlalchemy', {}).setdefault('foreign_keys', {})[colname] = { |
| 126 | + "column": parent_col, |
| 127 | + "table": parent_table |
| 128 | + } |
| 129 | + elif isinstance(fk, MemberExpr): |
| 130 | + ctx.cls.info.metadata.setdefault('sqlalchemy', {}).setdefault('foreign_keys', {})[colname] = { |
| 131 | + "column": fk.name, |
| 132 | + "model": fk.expr.fullname |
| 133 | + } |
| 134 | + |
108 | 135 | # Also add a selection of auto-generated attributes. |
109 | 136 | sym = ctx.api.lookup_fully_qualified_or_none('sqlalchemy.sql.schema.Table') |
110 | 137 | if sym: |
|
0 commit comments