1515from kombu .utils .json import dumps , loads
1616
1717from django .conf import settings
18- from django .db import transaction , close_old_connections
18+ from django .db import transaction , close_old_connections , router , DEFAULT_DB_ALIAS
1919from django .db .utils import DatabaseError , InterfaceError
2020from django .core .exceptions import ObjectDoesNotExist
2121
@@ -258,7 +258,7 @@ def schedule_changed(self):
258258 # other transactions until the current transaction is
259259 # committed (Issue #41).
260260 try :
261- transaction .commit ()
261+ transaction .commit (using = self . target_db )
262262 except transaction .TransactionManagementError :
263263 pass # not in transaction management.
264264
@@ -287,7 +287,17 @@ def reserve(self, entry):
287287 self ._dirty .add (new_entry .name )
288288 return new_entry
289289
290- def sync (self ):
290+ @property
291+ def target_db (self ):
292+ """Determine if there is a django route"""
293+ if not settings .DATABASE_ROUTERS :
294+ return DEFAULT_DB_ALIAS
295+ # If the project does not actually implement this method, DEFAULT_DB_ALIAS will be automatically returned.
296+ # The exception will be located to the django routing section
297+ db = router .db_for_write (self .Model )
298+ return db
299+
300+ def _sync (self ):
291301 if logger .isEnabledFor (logging .DEBUG ):
292302 debug ('Writing entries...' )
293303 _tried = set ()
@@ -313,6 +323,10 @@ def sync(self):
313323 # retry later, only for the failed ones
314324 self ._dirty |= _failed
315325
326+ def sync (self ):
327+ with transaction .atomic (using = self .target_db ):
328+ self ._sync ()
329+
316330 def update_from_dict (self , mapping ):
317331 s = {}
318332 for name , entry_fields in mapping .items ():
0 commit comments