11use std:: io:: Read ;
22
33use bitflags:: bitflags;
4- use futures_io:: AsyncWrite ;
5- use futures_util:: {
6- io:: { BufReader , BufWriter } ,
7- AsyncReadExt ,
8- AsyncWriteExt ,
9- } ;
4+ use tokio:: io:: { AsyncReadExt , AsyncWrite , AsyncWriteExt , BufReader , BufWriter } ;
105
116use super :: header:: { Header , OpCode } ;
127use crate :: {
@@ -16,7 +11,7 @@ use crate::{
1611 Command ,
1712 } ,
1813 error:: { Error , ErrorKind , Result } ,
19- runtime:: { AsyncLittleEndianWrite , AsyncStream , SyncLittleEndianRead } ,
14+ runtime:: { AsyncStream , SyncLittleEndianRead } ,
2015} ;
2116
2217use crate :: compression:: { Compressor , Decoder } ;
@@ -139,7 +134,7 @@ impl Message {
139134 let mut reader = buf. as_slice ( ) ;
140135
141136 // Read original opcode (should be OP_MSG)
142- let original_opcode = reader. read_i32 ( ) ?;
137+ let original_opcode = reader. read_i32_sync ( ) ?;
143138 if original_opcode != OpCode :: Message as i32 {
144139 return Err ( ErrorKind :: InvalidResponse {
145140 message : format ! (
@@ -152,10 +147,10 @@ impl Message {
152147 }
153148
154149 // Read uncompressed size
155- let uncompressed_size = reader. read_i32 ( ) ?;
150+ let uncompressed_size = reader. read_i32_sync ( ) ?;
156151
157152 // Read compressor id
158- let compressor_id: u8 = reader. read_u8 ( ) ?;
153+ let compressor_id: u8 = reader. read_u8_sync ( ) ?;
159154
160155 // Get decoder
161156 let decoder = Decoder :: from_u8 ( compressor_id) ?;
@@ -188,7 +183,7 @@ impl Message {
188183 mut length_remaining : i32 ,
189184 header : & Header ,
190185 ) -> Result < Self > {
191- let flags = MessageFlags :: from_bits_truncate ( reader. read_u32 ( ) ?) ;
186+ let flags = MessageFlags :: from_bits_truncate ( reader. read_u32_sync ( ) ?) ;
192187 length_remaining -= std:: mem:: size_of :: < u32 > ( ) as i32 ;
193188
194189 let mut count_reader = SyncCountReader :: new ( & mut reader) ;
@@ -203,7 +198,7 @@ impl Message {
203198 let mut checksum = None ;
204199
205200 if length_remaining == 4 && flags. contains ( MessageFlags :: CHECKSUM_PRESENT ) {
206- checksum = Some ( reader. read_u32 ( ) ?) ;
201+ checksum = Some ( reader. read_u32_sync ( ) ?) ;
207202 } else if length_remaining != 0 {
208203 return Err ( ErrorKind :: InvalidResponse {
209204 message : format ! (
@@ -251,11 +246,11 @@ impl Message {
251246 } ;
252247
253248 header. write_to ( & mut writer) . await ?;
254- writer. write_u32 ( self . flags . bits ( ) ) . await ?;
249+ writer. write_u32_le ( self . flags . bits ( ) ) . await ?;
255250 writer. write_all ( & sections_bytes) . await ?;
256251
257252 if let Some ( checksum) = self . checksum {
258- writer. write_u32 ( checksum) . await ?;
253+ writer. write_u32_le ( checksum) . await ?;
259254 }
260255
261256 writer. flush ( ) . await ?;
@@ -302,9 +297,9 @@ impl Message {
302297 // Write header
303298 header. write_to ( & mut writer) . await ?;
304299 // Write original (pre-compressed) opcode (always OP_MSG)
305- writer. write_i32 ( OpCode :: Message as i32 ) . await ?;
300+ writer. write_i32_le ( OpCode :: Message as i32 ) . await ?;
306301 // Write uncompressed size
307- writer. write_i32 ( uncompressed_len as i32 ) . await ?;
302+ writer. write_i32_le ( uncompressed_len as i32 ) . await ?;
308303 // Write compressor id
309304 writer. write_u8 ( compressor_id) . await ?;
310305 // Write compressed message
@@ -341,15 +336,15 @@ pub(crate) enum MessageSection {
341336impl MessageSection {
342337 /// Reads bytes from `reader` and deserializes them into a MessageSection.
343338 fn read < R : Read > ( reader : & mut R ) -> Result < Self > {
344- let payload_type = reader. read_u8 ( ) ?;
339+ let payload_type = reader. read_u8_sync ( ) ?;
345340
346341 if payload_type == 0 {
347342 return Ok ( MessageSection :: Document ( bson_util:: read_document_bytes (
348343 reader,
349344 ) ?) ) ;
350345 }
351346
352- let size = reader. read_i32 ( ) ?;
347+ let size = reader. read_i32_sync ( ) ?;
353348 let mut length_remaining = size - std:: mem:: size_of :: < i32 > ( ) as i32 ;
354349
355350 let mut identifier = String :: new ( ) ;
@@ -397,7 +392,7 @@ impl MessageSection {
397392 // Write payload type.
398393 writer. write_u8 ( 1 ) . await ?;
399394
400- writer. write_i32 ( * size) . await ?;
395+ writer. write_i32_le ( * size) . await ?;
401396 super :: util:: write_cstring ( writer, identifier) . await ?;
402397
403398 for doc in documents {
0 commit comments