@@ -103,7 +103,7 @@ async fn sdam_pool_management() {
103103 . build ( )
104104 . await ;
105105
106- let mut subscriber = client. events . subscribe_all ( ) ;
106+ let mut subscriber = client. events . stream_all ( ) ;
107107
108108 if !VersionReq :: parse ( ">= 4.2.9" )
109109 . unwrap ( )
@@ -116,14 +116,14 @@ async fn sdam_pool_management() {
116116 }
117117
118118 subscriber
119- . wait_for_event ( Duration :: from_millis ( 500 ) , |event| {
119+ . next_match ( Duration :: from_millis ( 500 ) , |event| {
120120 matches ! ( event, Event :: Cmap ( CmapEvent :: PoolReady ( _) ) )
121121 } )
122122 . await
123123 . expect ( "should see pool ready event" ) ;
124124
125125 subscriber
126- . wait_for_event ( Duration :: from_millis ( 500 ) , |event| {
126+ . next_match ( Duration :: from_millis ( 500 ) , |event| {
127127 matches ! ( event, Event :: Sdam ( SdamEvent :: ServerHeartbeatSucceeded ( _) ) )
128128 } )
129129 . await
@@ -143,9 +143,7 @@ async fn sdam_pool_management() {
143143
144144 // Since there is no deterministic ordering, simply collect all the events and check for their
145145 // presence.
146- let events = subscriber
147- . collect_events ( Duration :: from_secs ( 1 ) , |_| true )
148- . await ;
146+ let events = subscriber. collect ( Duration :: from_secs ( 1 ) , |_| true ) . await ;
149147 assert ! ( events
150148 . iter( )
151149 . any( |e| matches!( e, Event :: Sdam ( SdamEvent :: ServerHeartbeatFailed ( _) ) ) ) ) ;
@@ -186,7 +184,7 @@ async fn hello_ok_true() {
186184
187185 let buffer = EventBuffer :: new ( ) ;
188186
189- let mut subscriber = buffer. subscribe ( ) ;
187+ let mut event_stream = buffer. stream ( ) ;
190188
191189 let mut options = setup_client_options. clone ( ) ;
192190 options. sdam_event_handler = Some ( buffer. handler ( ) ) ;
@@ -195,8 +193,8 @@ async fn hello_ok_true() {
195193 let _client = Client :: with_options ( options) . expect ( "client creation should succeed" ) ;
196194
197195 // first heartbeat should be legacy hello but contain helloOk
198- subscriber
199- . wait_for_event ( Duration :: from_millis ( 2000 ) , |event| {
196+ event_stream
197+ . next_match ( Duration :: from_millis ( 2000 ) , |event| {
200198 if let Event :: Sdam ( SdamEvent :: ServerHeartbeatSucceeded ( e) ) = event {
201199 assert_eq ! ( e. reply. get_bool( "helloOk" ) , Ok ( true ) ) ;
202200 assert ! ( e. reply. get( LEGACY_HELLO_COMMAND_NAME_LOWERCASE ) . is_some( ) ) ;
@@ -210,8 +208,8 @@ async fn hello_ok_true() {
210208
211209 // subsequent heartbeats should just be hello
212210 for _ in 0 ..3 {
213- subscriber
214- . wait_for_event ( Duration :: from_millis ( 2000 ) , |event| {
211+ event_stream
212+ . next_match ( Duration :: from_millis ( 2000 ) , |event| {
215213 if let Event :: Sdam ( SdamEvent :: ServerHeartbeatSucceeded ( e) ) = event {
216214 assert ! ( e. reply. get( "isWritablePrimary" ) . is_some( ) ) ;
217215 assert ! ( e. reply. get( LEGACY_HELLO_COMMAND_NAME_LOWERCASE ) . is_none( ) ) ;
@@ -270,13 +268,13 @@ async fn removed_server_monitor_stops() -> crate::error::Result<()> {
270268 let hosts = options. hosts . clone ( ) ;
271269 let set_name = options. repl_set_name . clone ( ) . unwrap ( ) ;
272270
273- let mut subscriber = buffer. subscribe ( ) ;
271+ let mut event_stream = buffer. stream ( ) ;
274272 let topology = Topology :: new ( options) ?;
275273
276274 // Wait until all three monitors have started.
277275 let mut seen_monitors = HashSet :: new ( ) ;
278- subscriber
279- . wait_for_event ( Duration :: from_millis ( 500 ) , |event| {
276+ event_stream
277+ . next_match ( Duration :: from_millis ( 500 ) , |event| {
280278 if let Event :: Sdam ( SdamEvent :: ServerHeartbeatStarted ( e) ) = event {
281279 seen_monitors. insert ( e. server_address . clone ( ) ) ;
282280 }
@@ -315,13 +313,13 @@ async fn removed_server_monitor_stops() -> crate::error::Result<()> {
315313 ) )
316314 . await ;
317315
318- subscriber . wait_for_event ( Duration :: from_secs ( 1 ) , |event| {
316+ event_stream . next_match ( Duration :: from_secs ( 1 ) , |event| {
319317 matches ! ( event, Event :: Sdam ( SdamEvent :: ServerClosed ( e) ) if e. address == hosts[ 2 ] )
320318 } ) . await . expect ( "should see server closed event" ) ;
321319
322320 // Capture heartbeat events for 1 second. The monitor for the removed server should stop
323321 // publishing them.
324- let events = subscriber . collect_events ( Duration :: from_secs ( 1 ) , |event| {
322+ let events = event_stream . collect ( Duration :: from_secs ( 1 ) , |event| {
325323 matches ! ( event, Event :: Sdam ( SdamEvent :: ServerHeartbeatStarted ( e) ) if e. server_address == hosts[ 2 ] )
326324 } ) . await ;
327325
0 commit comments