1- use std:: { borrow:: Cow , collections:: HashMap , time:: Duration } ;
1+ use std:: { borrow:: Cow , collections:: HashMap , sync :: Arc , time:: Duration } ;
22
33use bson:: Document ;
44use serde:: Deserialize ;
@@ -7,11 +7,25 @@ use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
77use crate :: {
88 bson:: { doc, Bson } ,
99 error:: { CommandError , Error , ErrorKind } ,
10+ hello:: LEGACY_HELLO_COMMAND_NAME ,
1011 options:: { AuthMechanism , ClientOptions , Credential , ListDatabasesOptions , ServerAddress } ,
1112 runtime,
1213 selection_criteria:: { ReadPreference , ReadPreferenceOptions , SelectionCriteria } ,
13- test:: { log_uncaptured, util:: TestClient , CLIENT_OPTIONS , LOCK } ,
14+ test:: {
15+ log_uncaptured,
16+ util:: TestClient ,
17+ CmapEvent ,
18+ Event ,
19+ EventHandler ,
20+ FailCommandOptions ,
21+ FailPoint ,
22+ FailPointMode ,
23+ SdamEvent ,
24+ CLIENT_OPTIONS ,
25+ LOCK ,
26+ } ,
1427 Client ,
28+ ServerType ,
1529} ;
1630
1731#[ derive( Debug , Deserialize ) ]
@@ -663,3 +677,134 @@ async fn plain_auth() {
663677 }
664678 ) ;
665679}
680+
681+ /// Test verifies that retrying a commitTransaction operation after a checkOut
682+ /// failure works.
683+ #[ cfg_attr( feature = "tokio-runtime" , tokio:: test( flavor = "multi_thread" ) ) ]
684+ #[ cfg_attr( feature = "async-std-runtime" , async_std:: test) ]
685+ async fn retry_commit_txn_check_out ( ) {
686+ let _guard: RwLockWriteGuard < _ > = LOCK . run_exclusively ( ) . await ;
687+
688+ let setup_client = TestClient :: new ( ) . await ;
689+ if !setup_client. is_replica_set ( ) {
690+ log_uncaptured ( "skipping retry_commit_txn_check_out due to non-replicaset topology" ) ;
691+ return ;
692+ }
693+
694+ if !setup_client. supports_transactions ( ) {
695+ log_uncaptured ( "skipping retry_commit_txn_check_out due to lack of transaction support" ) ;
696+ return ;
697+ }
698+
699+ if !setup_client. supports_fail_command_appname_initial_handshake ( ) {
700+ log_uncaptured (
701+ "skipping retry_commit_txn_check_out due to insufficient failCommand support" ,
702+ ) ;
703+ return ;
704+ }
705+
706+ // ensure namespace exists
707+ setup_client
708+ . database ( "retry_commit_txn_check_out" )
709+ . collection ( "retry_commit_txn_check_out" )
710+ . insert_one ( doc ! { } , None )
711+ . await
712+ . unwrap ( ) ;
713+
714+ let mut options = CLIENT_OPTIONS . clone ( ) ;
715+ let handler = Arc :: new ( EventHandler :: new ( ) ) ;
716+ options. cmap_event_handler = Some ( handler. clone ( ) ) ;
717+ options. sdam_event_handler = Some ( handler. clone ( ) ) ;
718+ options. heartbeat_freq = Some ( Duration :: from_secs ( 120 ) ) ;
719+ options. app_name = Some ( "retry_commit_txn_check_out" . to_string ( ) ) ;
720+ let client = Client :: with_options ( options) . unwrap ( ) ;
721+
722+ let mut session = client. start_session ( None ) . await . unwrap ( ) ;
723+ session. start_transaction ( None ) . await . unwrap ( ) ;
724+ // transition transaction to "in progress" so that the commit
725+ // actually executes an operation.
726+ client
727+ . database ( "retry_commit_txn_check_out" )
728+ . collection ( "retry_commit_txn_check_out" )
729+ . insert_one_with_session ( doc ! { } , None , & mut session)
730+ . await
731+ . unwrap ( ) ;
732+
733+ // enable a fail point that clears the connection pools so that
734+ // commitTransaction will create a new connection during check out.
735+ let fp = FailPoint :: fail_command (
736+ & [ "ping" ] ,
737+ FailPointMode :: Times ( 1 ) ,
738+ FailCommandOptions :: builder ( ) . error_code ( 11600 ) . build ( ) ,
739+ ) ;
740+ let _guard = setup_client. enable_failpoint ( fp, None ) . await . unwrap ( ) ;
741+
742+ let mut subscriber = handler. subscribe ( ) ;
743+ client
744+ . database ( "foo" )
745+ . run_command ( doc ! { "ping" : 1 } , None )
746+ . await
747+ . unwrap_err ( ) ;
748+
749+ // failing with a state change error will request an immediate check
750+ // wait for the mark unknown and subsequent succeeded heartbeat
751+ let mut primary = None ;
752+ subscriber
753+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
754+ if let Event :: Sdam ( SdamEvent :: ServerDescriptionChanged ( event) ) = e {
755+ if event. is_marked_unknown_event ( ) {
756+ primary = Some ( event. address . clone ( ) ) ;
757+ return true ;
758+ }
759+ }
760+ false
761+ } )
762+ . await
763+ . expect ( "should see marked unknown event" ) ;
764+
765+ subscriber
766+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
767+ if let Event :: Sdam ( SdamEvent :: ServerDescriptionChanged ( event) ) = e {
768+ if & event. address == primary. as_ref ( ) . unwrap ( )
769+ && event. previous_description . server_type ( ) == ServerType :: Unknown
770+ {
771+ return true ;
772+ }
773+ }
774+ false
775+ } )
776+ . await
777+ . expect ( "should see mark available event" ) ;
778+
779+ // enable a failpoint on the handshake to cause check_out
780+ // to fail with a retryable error
781+ let fp = FailPoint :: fail_command (
782+ & [ LEGACY_HELLO_COMMAND_NAME , "hello" ] ,
783+ FailPointMode :: Times ( 1 ) ,
784+ FailCommandOptions :: builder ( )
785+ . error_code ( 11600 )
786+ . app_name ( "retry_commit_txn_check_out" . to_string ( ) )
787+ . build ( ) ,
788+ ) ;
789+ let _guard2 = setup_client. enable_failpoint ( fp, None ) . await . unwrap ( ) ;
790+
791+ // finally, attempt the commit.
792+ // this should succeed due to retry
793+ session. commit_transaction ( ) . await . unwrap ( ) ;
794+
795+ // ensure the first check out attempt fails
796+ subscriber
797+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
798+ matches ! ( e, Event :: Cmap ( CmapEvent :: ConnectionCheckOutFailed ( _) ) )
799+ } )
800+ . await
801+ . expect ( "should see check out failed event" ) ;
802+
803+ // ensure the second one succeeds
804+ subscriber
805+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
806+ matches ! ( e, Event :: Cmap ( CmapEvent :: ConnectionCheckedOut ( _) ) )
807+ } )
808+ . await
809+ . expect ( "should see checked out event" ) ;
810+ }
0 commit comments