@@ -264,11 +264,22 @@ public IAsyncCursor<TResult> Execute(IReadBinding binding, CancellationToken can
264264 throw new ArgumentException ( "The binding value passed to ChangeStreamOperation.Execute must implement IReadBindingHandle." , nameof ( binding ) ) ;
265265 }
266266
267- var cursor = Resume ( bindingHandle , cancellationToken ) ;
268- if ( _startAtOperationTime == null && _resumeAfter == null )
267+ IAsyncCursor < RawBsonDocument > cursor ;
268+ using ( var channelSource = binding . GetReadChannelSource ( cancellationToken ) )
269+ using ( var channel = channelSource . GetChannel ( cancellationToken ) )
270+ using ( var channelBinding = new ChannelReadBinding ( channelSource . Server , channel , binding . ReadPreference , binding . Session . Fork ( ) ) )
269271 {
270- _startAtOperationTime = binding . Session . OperationTime ;
272+ cursor = Resume ( channelBinding , cancellationToken ) ;
273+ if ( _startAtOperationTime == null && _resumeAfter == null )
274+ {
275+ var maxWireVersion = channel . ConnectionDescription . IsMasterResult . MaxWireVersion ;
276+ if ( maxWireVersion >= 7 )
277+ {
278+ _startAtOperationTime = binding . Session . OperationTime ;
279+ }
280+ }
271281 }
282+
272283 return new ChangeStreamCursor < TResult > ( cursor , _resultSerializer , bindingHandle . Fork ( ) , this ) ;
273284 }
274285
@@ -281,11 +292,22 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(IReadBinding binding, Canc
281292 throw new ArgumentException ( "The binding value passed to ChangeStreamOperation.ExecuteAsync must implement IReadBindingHandle." , nameof ( binding ) ) ;
282293 }
283294
284- var cursor = await ResumeAsync ( bindingHandle , cancellationToken ) . ConfigureAwait ( false ) ;
285- if ( _startAtOperationTime == null && _resumeAfter == null )
295+ IAsyncCursor < RawBsonDocument > cursor ;
296+ using ( var channelSource = await binding . GetReadChannelSourceAsync ( cancellationToken ) . ConfigureAwait ( false ) )
297+ using ( var channel = await channelSource . GetChannelAsync ( cancellationToken ) . ConfigureAwait ( false ) )
298+ using ( var channelBinding = new ChannelReadBinding ( channelSource . Server , channel , binding . ReadPreference , binding . Session . Fork ( ) ) )
286299 {
287- _startAtOperationTime = binding . Session . OperationTime ;
300+ cursor = await ResumeAsync ( channelBinding , cancellationToken ) . ConfigureAwait ( false ) ;
301+ if ( _startAtOperationTime == null && _resumeAfter == null )
302+ {
303+ var maxWireVersion = channel . ConnectionDescription . IsMasterResult . MaxWireVersion ;
304+ if ( maxWireVersion >= 7 )
305+ {
306+ _startAtOperationTime = binding . Session . OperationTime ;
307+ }
308+ }
288309 }
310+
289311 return new ChangeStreamCursor < TResult > ( cursor , _resultSerializer , bindingHandle . Fork ( ) , this ) ;
290312 }
291313
0 commit comments