11use std:: io:: Read ;
22
33use bitflags:: bitflags;
4- use futures_io:: AsyncWrite ;
5- use futures_util:: {
6- io:: { BufReader , BufWriter } ,
7- AsyncReadExt ,
8- AsyncWriteExt ,
9- } ;
4+ use tokio:: io:: { AsyncReadExt , AsyncWrite , AsyncWriteExt , BufReader , BufWriter } ;
105
116use super :: header:: { Header , OpCode } ;
127use crate :: {
@@ -16,7 +11,7 @@ use crate::{
1611 Command ,
1712 } ,
1813 error:: { Error , ErrorKind , Result } ,
19- runtime:: { AsyncLittleEndianWrite , AsyncStream , SyncLittleEndianRead } ,
14+ runtime:: { AsyncStream , SyncLittleEndianRead } ,
2015} ;
2116
2217use crate :: compression:: { Compressor , Decoder } ;
@@ -129,7 +124,7 @@ impl Message {
129124 let mut reader = buf. as_slice ( ) ;
130125
131126 // Read original opcode (should be OP_MSG)
132- let original_opcode = reader. read_i32 ( ) ?;
127+ let original_opcode = reader. read_i32_sync ( ) ?;
133128 if original_opcode != OpCode :: Message as i32 {
134129 return Err ( ErrorKind :: InvalidResponse {
135130 message : format ! (
@@ -142,10 +137,10 @@ impl Message {
142137 }
143138
144139 // Read uncompressed size
145- let uncompressed_size = reader. read_i32 ( ) ?;
140+ let uncompressed_size = reader. read_i32_sync ( ) ?;
146141
147142 // Read compressor id
148- let compressor_id: u8 = reader. read_u8 ( ) ?;
143+ let compressor_id: u8 = reader. read_u8_sync ( ) ?;
149144
150145 // Get decoder
151146 let decoder = Decoder :: from_u8 ( compressor_id) ?;
@@ -178,7 +173,7 @@ impl Message {
178173 mut length_remaining : i32 ,
179174 header : & Header ,
180175 ) -> Result < Self > {
181- let flags = MessageFlags :: from_bits_truncate ( reader. read_u32 ( ) ?) ;
176+ let flags = MessageFlags :: from_bits_truncate ( reader. read_u32_sync ( ) ?) ;
182177 length_remaining -= std:: mem:: size_of :: < u32 > ( ) as i32 ;
183178
184179 let mut count_reader = SyncCountReader :: new ( & mut reader) ;
@@ -193,7 +188,7 @@ impl Message {
193188 let mut checksum = None ;
194189
195190 if length_remaining == 4 && flags. contains ( MessageFlags :: CHECKSUM_PRESENT ) {
196- checksum = Some ( reader. read_u32 ( ) ?) ;
191+ checksum = Some ( reader. read_u32_sync ( ) ?) ;
197192 } else if length_remaining != 0 {
198193 return Err ( ErrorKind :: InvalidResponse {
199194 message : format ! (
@@ -241,11 +236,11 @@ impl Message {
241236 } ;
242237
243238 header. write_to ( & mut writer) . await ?;
244- writer. write_u32 ( self . flags . bits ( ) ) . await ?;
239+ writer. write_u32_le ( self . flags . bits ( ) ) . await ?;
245240 writer. write_all ( & sections_bytes) . await ?;
246241
247242 if let Some ( checksum) = self . checksum {
248- writer. write_u32 ( checksum) . await ?;
243+ writer. write_u32_le ( checksum) . await ?;
249244 }
250245
251246 writer. flush ( ) . await ?;
@@ -292,9 +287,9 @@ impl Message {
292287 // Write header
293288 header. write_to ( & mut writer) . await ?;
294289 // Write original (pre-compressed) opcode (always OP_MSG)
295- writer. write_i32 ( OpCode :: Message as i32 ) . await ?;
290+ writer. write_i32_le ( OpCode :: Message as i32 ) . await ?;
296291 // Write uncompressed size
297- writer. write_i32 ( uncompressed_len as i32 ) . await ?;
292+ writer. write_i32_le ( uncompressed_len as i32 ) . await ?;
298293 // Write compressor id
299294 writer. write_u8 ( compressor_id) . await ?;
300295 // Write compressed message
@@ -329,15 +324,15 @@ pub(crate) enum MessageSection {
329324impl MessageSection {
330325 /// Reads bytes from `reader` and deserializes them into a MessageSection.
331326 fn read < R : Read > ( reader : & mut R ) -> Result < Self > {
332- let payload_type = reader. read_u8 ( ) ?;
327+ let payload_type = reader. read_u8_sync ( ) ?;
333328
334329 if payload_type == 0 {
335330 return Ok ( MessageSection :: Document ( bson_util:: read_document_bytes (
336331 reader,
337332 ) ?) ) ;
338333 }
339334
340- let size = reader. read_i32 ( ) ?;
335+ let size = reader. read_i32_sync ( ) ?;
341336 let mut length_remaining = size - std:: mem:: size_of :: < i32 > ( ) as i32 ;
342337
343338 let mut identifier = String :: new ( ) ;
@@ -385,7 +380,7 @@ impl MessageSection {
385380 // Write payload type.
386381 writer. write_u8 ( 1 ) . await ?;
387382
388- writer. write_i32 ( * size) . await ?;
383+ writer. write_i32_le ( * size) . await ?;
389384 super :: util:: write_cstring ( writer, identifier) . await ?;
390385
391386 for doc in documents {
0 commit comments