@@ -511,6 +511,8 @@ async def read(self, bytes_needed: int) -> bytes:
511511 raise OSError ("connection is already closed" ) from None
512512 if self .transport and self .transport .is_closing ():
513513 raise OSError ("connection is already closed" )
514+ if self ._bytes_ready >= bytes_needed :
515+ return self ._read (bytes_needed )
514516 self ._pending_reads .append (bytes_needed )
515517 read_waiter = asyncio .get_running_loop ().create_future ()
516518 self ._pending_listeners .append (read_waiter )
@@ -540,18 +542,24 @@ def buffer_updated(self, nbytes: int) -> None:
540542 self ._bytes_ready += nbytes
541543
542544 # Bail we don't have the current requested number of bytes.
543- n_requested = self ._bytes_requested
544- if n_requested == 0 and self ._pending_reads :
545- n_requested = self ._pending_reads .popleft ()
546- if n_requested == 0 or self ._bytes_ready < n_requested :
545+ bytes_needed = self ._bytes_requested
546+ if bytes_needed == 0 and self ._pending_reads :
547+ bytes_needed = self ._pending_reads .popleft ()
548+ if bytes_needed == 0 or self ._bytes_ready < bytes_needed :
547549 return
548550
551+ data = self ._read (bytes_needed )
552+ waiter = self ._pending_listeners .popleft ()
553+ waiter .set_result (data )
554+
555+ def _read (self , bytes_needed ):
556+ """Read bytes from the buffer."""
549557 # Send the bytes to the listener.
550- self ._bytes_ready -= n_requested
558+ self ._bytes_ready -= bytes_needed
551559 self ._bytes_requested = 0
552- waiter = self . _pending_listeners . popleft ()
553- output_buf = bytearray (n_requested )
554- n_remaining = n_requested
560+
561+ output_buf = bytearray (bytes_needed )
562+ n_remaining = bytes_needed
555563 out_index = 0
556564 while n_remaining > 0 :
557565 buffer = self ._buffers .popleft ()
@@ -572,7 +580,7 @@ def buffer_updated(self, nbytes: int) -> None:
572580 n_remaining -= buffer_remaining
573581 buffer .start_index = 0
574582 self ._buffer_pool .append (buffer )
575- waiter . set_result ( output_buf )
583+ return output_buf
576584
577585
578586async def async_sendall (conn : PyMongoBaseProtocol , buf : bytes ) -> None :
0 commit comments