|
1 | 1 | import contextlib |
| 2 | +import logging |
2 | 3 | import threading |
3 | 4 | from contextlib import contextmanager |
4 | 5 |
|
| 6 | +from pymongo.errors import ConnectionFailure, OperationFailure |
5 | 7 | from pymongo.read_concern import ReadConcern |
6 | 8 | from pymongo.write_concern import WriteConcern |
7 | 9 |
|
8 | 10 | from mongoengine.base.fields import _no_dereference_for_fields |
9 | 11 | from mongoengine.common import _import_class |
10 | | -from mongoengine.connection import DEFAULT_CONNECTION_NAME, get_db |
| 12 | +from mongoengine.connection import ( |
| 13 | + DEFAULT_CONNECTION_NAME, |
| 14 | + _clear_session, |
| 15 | + _get_session, |
| 16 | + _set_session, |
| 17 | + get_connection, |
| 18 | + get_db, |
| 19 | +) |
11 | 20 | from mongoengine.pymongo_support import count_documents |
12 | 21 |
|
13 | 22 | __all__ = ( |
|
19 | 28 | "set_write_concern", |
20 | 29 | "set_read_write_concern", |
21 | 30 | "no_dereferencing_active_for_class", |
| 31 | + "run_in_transaction", |
22 | 32 | ) |
23 | 33 |
|
24 | 34 |
|
@@ -231,11 +241,11 @@ def __init__(self, alias=DEFAULT_CONNECTION_NAME): |
231 | 241 | } |
232 | 242 |
|
233 | 243 | def _turn_on_profiling(self): |
234 | | - profile_update_res = self.db.command({"profile": 0}) |
| 244 | + profile_update_res = self.db.command({"profile": 0}, session=_get_session()) |
235 | 245 | self.initial_profiling_level = profile_update_res["was"] |
236 | 246 |
|
237 | 247 | self.db.system.profile.drop() |
238 | | - self.db.command({"profile": 2}) |
| 248 | + self.db.command({"profile": 2}, session=_get_session()) |
239 | 249 |
|
240 | 250 | def _resets_profiling(self): |
241 | 251 | self.db.command({"profile": self.initial_profiling_level}) |
@@ -311,3 +321,60 @@ def set_read_write_concern(collection, write_concerns, read_concerns): |
311 | 321 | write_concern=WriteConcern(**combined_write_concerns), |
312 | 322 | read_concern=ReadConcern(**combined_read_concerns), |
313 | 323 | ) |
| 324 | + |
| 325 | + |
| 326 | +def _commit_with_retry(session): |
| 327 | + while True: |
| 328 | + try: |
| 329 | + # Commit uses write concern set at transaction start. |
| 330 | + session.commit_transaction() |
| 331 | + break |
| 332 | + except (ConnectionFailure, OperationFailure) as exc: |
| 333 | + # Can retry commit |
| 334 | + if exc.has_error_label("UnknownTransactionCommitResult"): |
| 335 | + logging.warning( |
| 336 | + "UnknownTransactionCommitResult, retrying commit operation ..." |
| 337 | + ) |
| 338 | + continue |
| 339 | + else: |
| 340 | + # Error during commit |
| 341 | + raise |
| 342 | + |
| 343 | + |
| 344 | +@contextmanager |
| 345 | +def run_in_transaction( |
| 346 | + alias=DEFAULT_CONNECTION_NAME, session_kwargs=None, transaction_kwargs=None |
| 347 | +): |
| 348 | + """run_in_transaction context manager |
| 349 | + Execute queries within the context in a database transaction. |
| 350 | +
|
| 351 | + Usage: |
| 352 | +
|
| 353 | + .. code-block:: python |
| 354 | +
|
| 355 | + class A(Document): |
| 356 | + name = StringField() |
| 357 | +
|
| 358 | + with run_in_transaction(): |
| 359 | + a_doc = A.objects.create(name="a") |
| 360 | + a_doc.update(name="b") |
| 361 | +
|
| 362 | + Be aware that: |
| 363 | + - Mongo transactions run inside a session which is bound to a connection. If you attempt to |
| 364 | + execute a transaction across a different connection alias, pymongo will raise an exception. In |
| 365 | + other words: you cannot create a transaction that crosses different database connections. That |
| 366 | + said, multiple transaction can be nested within the same session for particular connection. |
| 367 | +
|
| 368 | + For more information regarding pymongo transactions: https://pymongo.readthedocs.io/en/stable/api/pymongo/client_session.html#transactions |
| 369 | + """ |
| 370 | + conn = get_connection(alias) |
| 371 | + session_kwargs = session_kwargs or {} |
| 372 | + with conn.start_session(**session_kwargs) as session: |
| 373 | + transaction_kwargs = transaction_kwargs or {} |
| 374 | + with session.start_transaction(**transaction_kwargs): |
| 375 | + try: |
| 376 | + _set_session(session) |
| 377 | + yield |
| 378 | + _commit_with_retry(session) |
| 379 | + finally: |
| 380 | + _clear_session() |
0 commit comments