@@ -8,9 +8,9 @@ use time::PreciseTime;
88use crate :: {
99 bson:: Document ,
1010 cmap:: Connection ,
11- error:: { ErrorKind , Result } ,
11+ error:: { Error , ErrorKind , Result } ,
1212 event:: command:: { CommandFailedEvent , CommandStartedEvent , CommandSucceededEvent } ,
13- operation:: Operation ,
13+ operation:: { Operation , Retryability } ,
1414 options:: SelectionCriteria ,
1515 sdam:: { Server , SessionSupportStatus } ,
1616} ;
@@ -93,8 +93,17 @@ impl Client {
9393 }
9494 } ;
9595
96+ let retryability = self . get_retryability ( & conn, & op) . await ?;
97+
98+ let txn_number = match session {
99+ Some ( ref mut session) if retryability == Retryability :: Write => {
100+ Some ( session. get_and_increment_txn_number ( ) )
101+ }
102+ _ => None ,
103+ } ;
104+
96105 let first_error = match self
97- . execute_operation_on_connection ( & op, & mut conn, & mut session)
106+ . execute_operation_on_connection ( & op, & mut conn, & mut session, txn_number )
98107 . await
99108 {
100109 Ok ( result) => {
@@ -103,16 +112,36 @@ impl Client {
103112 Err ( err) => {
104113 self . inner
105114 . topology
106- . handle_post_handshake_error ( err. clone ( ) , conn, server)
115+ . handle_post_handshake_error ( err. clone ( ) , & conn, server)
107116 . await ;
108- // TODO RUST-90: Do not retry if session is in a transaction
109- if self . inner . options . retry_reads == Some ( false )
110- || !op. is_read_retryable ( )
111- || !err. is_read_retryable ( )
117+
118+ // Retryable writes are only supported by storage engines with document-level
119+ // locking, so users need to disable retryable writes if using mmapv1.
120+ if let ErrorKind :: CommandError ( ref err) = err. kind . as_ref ( ) {
121+ if err. code == 20 && err. message . starts_with ( "Transaction numbers" ) {
122+ let mut err = err. clone ( ) ;
123+ err. message = "This MongoDB deployment does not support retryable writes. \
124+ Please add retryWrites=false to your connection string."
125+ . to_string ( ) ;
126+ return Err ( ErrorKind :: CommandError ( err) . into ( ) ) ;
127+ }
128+ }
129+
130+ // For a pre-4.4 connection, an error label should be added to any write-retryable
131+ // error as long as the retry_writes client option is not set to false. For a 4.4+
132+ // connection, a label should be added only to network errors.
133+ let err = match retryability {
134+ Retryability :: Write => get_error_with_retryable_write_label ( & conn, err) . await ?,
135+ _ => err,
136+ } ;
137+
138+ // TODO RUST-90: Do not retry read if session is in a transaction
139+ if retryability == Retryability :: Read && err. is_read_retryable ( )
140+ || retryability == Retryability :: Write && err. is_write_retryable ( )
112141 {
113- return Err ( err) ;
114- } else {
115142 err
143+ } else {
144+ return Err ( err) ;
116145 }
117146 }
118147 } ;
@@ -135,17 +164,28 @@ impl Client {
135164 }
136165 } ;
137166
167+ let retryability = self . get_retryability ( & conn, & op) . await ?;
168+ if retryability == Retryability :: None {
169+ return Err ( first_error) ;
170+ }
171+
138172 match self
139- . execute_operation_on_connection ( & op, & mut conn, & mut session)
173+ . execute_operation_on_connection ( & op, & mut conn, & mut session, txn_number )
140174 . await
141175 {
142176 Ok ( result) => Ok ( result) ,
143177 Err ( err) => {
144178 self . inner
145179 . topology
146- . handle_post_handshake_error ( err. clone ( ) , conn, server)
180+ . handle_post_handshake_error ( err. clone ( ) , & conn, server)
147181 . await ;
148- if err. is_server_error ( ) || err. is_read_retryable ( ) {
182+
183+ let err = match retryability {
184+ Retryability :: Write => get_error_with_retryable_write_label ( & conn, err) . await ?,
185+ _ => err,
186+ } ;
187+
188+ if err. is_server_error ( ) || err. is_read_retryable ( ) || err. is_write_retryable ( ) {
149189 Err ( err)
150190 } else {
151191 Err ( first_error)
@@ -160,6 +200,7 @@ impl Client {
160200 op : & T ,
161201 connection : & mut Connection ,
162202 session : & mut Option < & mut ClientSession > ,
203+ txn_number : Option < u64 > ,
163204 ) -> Result < T :: O > {
164205 if let Some ( wc) = op. write_concern ( ) {
165206 wc. validate ( ) ?;
@@ -174,6 +215,9 @@ impl Client {
174215 match session {
175216 Some ( ref mut session) if op. supports_sessions ( ) && op. is_acknowledged ( ) => {
176217 cmd. set_session ( session) ;
218+ if let Some ( txn_number) = txn_number {
219+ cmd. set_txn_number ( txn_number) ;
220+ }
177221 session. update_last_use ( ) ;
178222 }
179223 Some ( ref session) if !op. supports_sessions ( ) && !session. is_implicit ( ) => {
@@ -318,4 +362,37 @@ impl Client {
318362 _ => Ok ( initial_status) ,
319363 }
320364 }
365+
366+ /// Returns the retryability level for the execution of this operation.
367+ async fn get_retryability < T : Operation > (
368+ & self ,
369+ conn : & Connection ,
370+ op : & T ,
371+ ) -> Result < Retryability > {
372+ match op. retryability ( ) {
373+ Retryability :: Read if self . inner . options . retry_reads != Some ( false ) => {
374+ Ok ( Retryability :: Read )
375+ }
376+ Retryability :: Write
377+ if self . inner . options . retry_writes != Some ( false )
378+ && conn. stream_description ( ) ?. supports_retryable_writes ( ) =>
379+ {
380+ Ok ( Retryability :: Write )
381+ }
382+ _ => Ok ( Retryability :: None ) ,
383+ }
384+ }
385+ }
386+
387+ /// Returns an Error with a "RetryableWriteError" label added if necessary. On a pre-4.4
388+ /// connection, a label should be added to any write-retryable error. On a 4.4+ connection, a
389+ /// label should only be added to network errors. Regardless of server version, a label should
390+ /// only be added if the `retry_writes` client option is not set to `false`.
391+ async fn get_error_with_retryable_write_label ( conn : & Connection , err : Error ) -> Result < Error > {
392+ if let Some ( max_wire_version) = conn. stream_description ( ) ?. max_wire_version {
393+ if err. should_add_retryable_write_label ( max_wire_version) {
394+ return Ok ( err. with_label ( "RetryableWriteError" ) ) ;
395+ }
396+ }
397+ Ok ( err)
321398}
0 commit comments