100100
101101from bson .binary import Binary
102102from bson .int64 import Int64
103- from bson .py3compat import abc , reraise_instance
103+ from bson .py3compat import abc , integer_types , reraise_instance
104104from bson .son import SON
105105from bson .timestamp import Timestamp
106106
@@ -158,18 +158,35 @@ class TransactionOptions(object):
158158 """Options for :meth:`ClientSession.start_transaction`.
159159
160160 :Parameters:
161- - `read_concern`: The :class:`~pymongo.read_concern.ReadConcern` to use
162- for this transaction.
163- - `write_concern`: The :class:`~pymongo.write_concern.WriteConcern` to
164- use for this transaction.
161+ - `read_concern` (optional): The
162+ :class:`~pymongo.read_concern.ReadConcern` to use for this transaction.
163+ If ``None`` (the default) the :attr:`read_preference` of
164+ the :class:`MongoClient` is used.
165+ - `write_concern` (optional): The
166+ :class:`~pymongo.write_concern.WriteConcern` to use for this
167+ transaction. If ``None`` (the default) the :attr:`read_preference` of
168+ the :class:`MongoClient` is used.
169+ - `read_preference` (optional): The read preference to use. If
170+ ``None`` (the default) the :attr:`read_preference` of this
171+ :class:`MongoClient` is used. See :mod:`~pymongo.read_preferences`
172+ for options. Transactions which read must use
173+ :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`.
174+ - `max_commit_time_ms` (optional): The maximum amount of time to allow a
175+ single commitTransaction command to run. This option is an alias for
176+ maxTimeMS option on the commitTransaction command. If ``None`` (the
177+ default) maxTimeMS is not used.
178+
179+ .. versionchanged:: 3.9
180+ Added the ``max_commit_time_ms`` option.
165181
166182 .. versionadded:: 3.7
167183 """
168184 def __init__ (self , read_concern = None , write_concern = None ,
169- read_preference = None ):
185+ read_preference = None , max_commit_time_ms = None ):
170186 self ._read_concern = read_concern
171187 self ._write_concern = write_concern
172188 self ._read_preference = read_preference
189+ self ._max_commit_time_ms = max_commit_time_ms
173190 if read_concern is not None :
174191 if not isinstance (read_concern , ReadConcern ):
175192 raise TypeError ("read_concern must be an instance of "
@@ -189,6 +206,10 @@ def __init__(self, read_concern=None, write_concern=None,
189206 raise TypeError ("%r is not valid for read_preference. See "
190207 "pymongo.read_preferences for valid "
191208 "options." % (read_preference ,))
209+ if max_commit_time_ms is not None :
210+ if not isinstance (max_commit_time_ms , integer_types ):
211+ raise TypeError (
212+ "max_commit_time_ms must be an integer or None" )
192213
193214 @property
194215 def read_concern (self ):
@@ -206,6 +227,14 @@ def read_preference(self):
206227 """
207228 return self ._read_preference
208229
230+ @property
231+ def max_commit_time_ms (self ):
232+ """The maxTimeMS to use when running a commitTransaction command.
233+
234+ .. versionadded:: 3.9
235+ """
236+ return self ._max_commit_time_ms
237+
209238
210239def _validate_session_write_concern (session , write_concern ):
211240 """Validate that an explicit session is not used with an unack'ed write.
@@ -279,10 +308,16 @@ def _reraise_with_unknown_commit(exc):
279308 reraise_instance (exc , trace = sys .exc_info ()[2 ])
280309
281310
311+ def _max_time_expired_error (exc ):
312+ """Return true if exc is a MaxTimeMSExpired error."""
313+ return isinstance (exc , OperationFailure ) and exc .code == 50
314+
315+
282316# From the transactions spec, all the retryable writes errors plus
283317# WriteConcernFailed.
284318_UNKNOWN_COMMIT_ERROR_CODES = _RETRYABLE_ERROR_CODES | frozenset ([
285319 64 , # WriteConcernFailed
320+ 50 , # MaxTimeMSExpired
286321])
287322
288323# From the Convenient API for Transactions spec, with_transaction must
@@ -380,7 +415,7 @@ def _inherit_option(self, name, val):
380415 return getattr (self .client , name )
381416
382417 def with_transaction (self , callback , read_concern = None , write_concern = None ,
383- read_preference = None ):
418+ read_preference = None , max_commit_time_ms = None ):
384419 """Execute a callback in a transaction.
385420
386421 This method starts a transaction on this session, executes ``callback``
@@ -465,7 +500,8 @@ def callback(session, custom_arg, custom_kwarg=None):
465500 start_time = monotonic .time ()
466501 while True :
467502 self .start_transaction (
468- read_concern , write_concern , read_preference )
503+ read_concern , write_concern , read_preference ,
504+ max_commit_time_ms )
469505 try :
470506 ret = callback (self )
471507 except Exception as exc :
@@ -488,7 +524,8 @@ def callback(session, custom_arg, custom_kwarg=None):
488524 self .commit_transaction ()
489525 except PyMongoError as exc :
490526 if (exc .has_error_label ("UnknownTransactionCommitResult" )
491- and _within_time_limit (start_time )):
527+ and _within_time_limit (start_time )
528+ and not _max_time_expired_error (exc )):
492529 # Retry the commit.
493530 continue
494531
@@ -502,11 +539,14 @@ def callback(session, custom_arg, custom_kwarg=None):
502539 return ret
503540
504541 def start_transaction (self , read_concern = None , write_concern = None ,
505- read_preference = None ):
542+ read_preference = None , max_commit_time_ms = None ):
506543 """Start a multi-statement transaction.
507544
508545 Takes the same arguments as :class:`TransactionOptions`.
509546
547+ .. versionchanged:: 3.9
548+ Added the ``max_commit_time_ms`` option.
549+
510550 .. versionadded:: 3.7
511551 """
512552 self ._check_ended ()
@@ -518,9 +558,13 @@ def start_transaction(self, read_concern=None, write_concern=None,
518558 write_concern = self ._inherit_option ("write_concern" , write_concern )
519559 read_preference = self ._inherit_option (
520560 "read_preference" , read_preference )
561+ if max_commit_time_ms is None :
562+ opts = self .options .default_transaction_options
563+ if opts :
564+ max_commit_time_ms = opts .max_commit_time_ms
521565
522566 self ._transaction .opts = TransactionOptions (
523- read_concern , write_concern , read_preference )
567+ read_concern , write_concern , read_preference , max_commit_time_ms )
524568 self ._transaction .reset ()
525569 self ._transaction .state = _TxnState .STARTING
526570 self ._start_retryable_write ()
@@ -631,18 +675,25 @@ def _finish_transaction_with_retry(self, command_name, explict_retry):
631675 raise exc
632676
633677 def _finish_transaction (self , command_name , retrying ):
634- # Transaction spec says that after the initial commit attempt,
635- # subsequent commitTransaction commands should be upgraded to use
636- # w:"majority" and set a default value of 10 seconds for wtimeout.
637- wc = self ._transaction .opts .write_concern
638- if retrying and command_name == "commitTransaction" :
639- wc_doc = wc .document
640- wc_doc ["w" ] = "majority"
641- wc_doc .setdefault ("wtimeout" , 10000 )
642- wc = WriteConcern (** wc_doc )
678+ opts = self ._transaction .opts
679+ wc = opts .write_concern
643680 cmd = SON ([(command_name , 1 )])
681+ if command_name == "commitTransaction" :
682+ if opts .max_commit_time_ms :
683+ cmd ['maxTimeMS' ] = opts .max_commit_time_ms
684+
685+ # Transaction spec says that after the initial commit attempt,
686+ # subsequent commitTransaction commands should be upgraded to use
687+ # w:"majority" and set a default value of 10 seconds for wtimeout.
688+ if retrying :
689+ wc_doc = wc .document
690+ wc_doc ["w" ] = "majority"
691+ wc_doc .setdefault ("wtimeout" , 10000 )
692+ wc = WriteConcern (** wc_doc )
693+
644694 if self ._transaction .recovery_token :
645695 cmd ['recoveryToken' ] = self ._transaction .recovery_token
696+
646697 with self ._client ._socket_for_writes (self ) as sock_info :
647698 return self ._client .admin ._command (
648699 sock_info ,
0 commit comments