Skip to content

Commit c7d0159

Browse files
NatalyaGrigorevaNatalia Grigoreva
authored andcommitted
add base db interface
1 parent 4f0bec2 commit c7d0159

File tree

1 file changed

+237
-0
lines changed

1 file changed

+237
-0
lines changed
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import logging
2+
from typing import Any, Literal, Optional, Type, Union
3+
4+
from sqlalchemy import and_, delete, func, select
5+
from sqlalchemy.exc import IntegrityError
6+
from sqlalchemy.ext.asyncio import AsyncSession
7+
from sqlalchemy.sql import Select, column, distinct
8+
from sqlalchemy.sql.elements import UnaryExpression
9+
from sqlalchemy.sql.expression import BinaryExpression
10+
11+
from fastapi_jsonapi.data_layers.sqla.query_building import RelationshipInfo
12+
from fastapi_jsonapi.data_typing import TypeModel
13+
from fastapi_jsonapi.exceptions import BadRequest, InternalServerError, ObjectNotFound
14+
15+
log = logging.getLogger(__name__)
16+
17+
18+
class BaseSQLA:
19+
@classmethod
20+
def _check_field_exists(
21+
cls,
22+
model: TypeModel,
23+
key: str,
24+
) -> None:
25+
try:
26+
getattr(model, key)
27+
except AttributeError as ex:
28+
err_message = f"No fields `{key}` on `{type(model).__name__}`. Make sure schema conforms model."
29+
log.exception(err_message, exc_info=ex)
30+
raise InternalServerError(
31+
detail=err_message,
32+
pointer="/data",
33+
)
34+
35+
@classmethod
36+
def _fill(
37+
cls,
38+
model: TypeModel,
39+
relationships: list[tuple[str, TypeModel]],
40+
**kwargs,
41+
) -> None:
42+
for key, value in kwargs.items():
43+
cls._check_field_exists(model, key)
44+
setattr(model, key, value)
45+
for relation_name, related_data in relationships:
46+
cls._check_field_exists(model, relation_name)
47+
setattr(model, relation_name, related_data)
48+
49+
@classmethod
50+
async def _save(
51+
cls,
52+
session: AsyncSession,
53+
model: TypeModel,
54+
action_trigger: Literal["update", "create", "delete"],
55+
resource_type: str,
56+
commit: bool = True,
57+
id_: Optional[str] = None,
58+
**kwargs,
59+
) -> TypeModel:
60+
try:
61+
if not commit:
62+
await session.flush()
63+
return model
64+
65+
await session.commit()
66+
return model
67+
except IntegrityError as ex:
68+
err_message = f"Could not {action_trigger} object"
69+
log.exception("%s with data %s", err_message, kwargs, exc_info=ex)
70+
raise BadRequest(
71+
detail=err_message,
72+
pointer="/data",
73+
meta={
74+
"type": resource_type,
75+
"id": id_,
76+
},
77+
)
78+
except Exception as ex:
79+
err_message = f"Got an error {ex.__class__.__name__} during updating obj {kwargs} data in DB"
80+
log.exception(err_message, exc_info=ex)
81+
await session.rollback()
82+
raise InternalServerError(
83+
detail=err_message,
84+
pointer="/data",
85+
meta={
86+
"type": resource_type,
87+
"id": id_,
88+
},
89+
)
90+
91+
@classmethod
92+
async def all(
93+
cls,
94+
session: AsyncSession,
95+
stmt: Select,
96+
) -> Union[Type[TypeModel], Any]:
97+
return (await session.execute(stmt)).unique().scalars().all()
98+
99+
@classmethod
100+
async def count(
101+
cls,
102+
session: AsyncSession,
103+
stmt: Select,
104+
) -> int:
105+
stmt = select(func.count(distinct(column("id")))).select_from(stmt.subquery())
106+
return (await session.execute(stmt)).scalar_one()
107+
108+
@classmethod
109+
async def create(
110+
cls,
111+
session: AsyncSession,
112+
model: TypeModel,
113+
resource_type: str,
114+
relationships: list[tuple[str, TypeModel]],
115+
commit: bool = True,
116+
id_: Optional[str] = None,
117+
**kwargs,
118+
) -> TypeModel:
119+
cls._fill(model, relationships, **kwargs)
120+
session.add(model)
121+
return await cls._save(
122+
session=session,
123+
model=model,
124+
action_trigger="create",
125+
resource_type=resource_type,
126+
commit=commit,
127+
id_=id_,
128+
**kwargs,
129+
)
130+
131+
@classmethod
132+
async def delete(
133+
cls,
134+
session: AsyncSession,
135+
model: TypeModel,
136+
filters: list[Union[BinaryExpression, bool]],
137+
resource_type: str,
138+
commit: bool = True,
139+
id_: Optional[str] = None,
140+
**kwargs,
141+
) -> None:
142+
await session.execute(delete(model).where(*filters))
143+
await cls._save(
144+
session=session,
145+
model=model,
146+
action_trigger="delete",
147+
resource_type=resource_type,
148+
commit=commit,
149+
id_=id_,
150+
**kwargs,
151+
)
152+
153+
@classmethod
154+
async def one_or_raise(
155+
cls,
156+
session: AsyncSession,
157+
model: TypeModel,
158+
filters: list[Union[BinaryExpression, bool]],
159+
stmt: Select,
160+
) -> Union[TypeModel, Any]:
161+
result = (await session.execute(stmt)).scalar_one_or_none()
162+
if result is None:
163+
compiled_conditions = and_(*filters).compile(
164+
dialect=session.bind.dialect,
165+
compile_kwargs={"literal_binds": True},
166+
)
167+
raise ObjectNotFound(
168+
detail=f"Resource {model.__name__} `{compiled_conditions}` not found",
169+
)
170+
return result
171+
172+
@classmethod
173+
async def query(
174+
cls,
175+
model: TypeModel,
176+
distinct_: bool = False,
177+
fields: Optional[list] = None,
178+
filters: Optional[list[Union[BinaryExpression, bool]]] = None,
179+
for_update: Optional[dict] = None,
180+
join: Optional[list[RelationshipInfo]] = None,
181+
number: Optional[int] = None,
182+
options: set = (),
183+
order: Optional[Union[str, UnaryExpression]] = None,
184+
size: Optional[int] = None,
185+
stmt: Optional[Select] = None,
186+
) -> Select:
187+
if stmt is None:
188+
stmt = select(model) if fields is None else select(*fields)
189+
190+
if filters is not None:
191+
stmt = stmt.where(*filters)
192+
193+
if options:
194+
stmt = stmt.options(*options)
195+
196+
if for_update is not None:
197+
stmt = stmt.with_for_update(**for_update)
198+
199+
if order is not None:
200+
stmt = stmt.order_by(*order)
201+
202+
if join:
203+
for relationship_info in join:
204+
stmt = stmt.join(relationship_info.aliased_model, relationship_info.join_column)
205+
206+
if size not in [0, None]:
207+
stmt = stmt.limit(size)
208+
if number:
209+
stmt = stmt.offset((number - 1) * size)
210+
211+
if distinct_:
212+
stmt = stmt.distinct()
213+
214+
return stmt
215+
216+
@classmethod
217+
async def update(
218+
cls,
219+
session: AsyncSession,
220+
model: TypeModel,
221+
resource_type: str,
222+
relationships: list[tuple[str, TypeModel]],
223+
commit: bool = True,
224+
id_: Optional[str] = None,
225+
**kwargs,
226+
) -> TypeModel:
227+
cls._fill(model, relationships, **kwargs)
228+
session.add(model)
229+
return await cls._save(
230+
session=session,
231+
model=model,
232+
action_trigger="update",
233+
resource_type=resource_type,
234+
commit=commit,
235+
id_=id_,
236+
**kwargs,
237+
)

0 commit comments

Comments
 (0)