@@ -3,6 +3,7 @@ use super::store::Store;
33use crate :: bee_msg:: misc:: AuthenticateChannel ;
44use crate :: bee_msg:: { Header , Msg , deserialize_body, deserialize_header, serialize} ;
55use crate :: bee_serde:: { Deserializable , Serializable } ;
6+ use crate :: conn:: TCP_BUF_LEN ;
67use crate :: conn:: store:: StoredStream ;
78use crate :: conn:: stream:: Stream ;
89use crate :: types:: { AuthSecret , Uid } ;
@@ -52,9 +53,7 @@ impl Pool {
5253 let mut buf = self . store . pop_buf_or_create ( ) ;
5354
5455 let msg_len = serialize ( msg, & mut buf) ?;
55- let resp_header = self
56- . comm_stream ( node_uid, & mut buf[ 0 ..msg_len] , true )
57- . await ?;
56+ let resp_header = self . comm_stream ( node_uid, & mut buf, msg_len, true ) . await ?;
5857 let resp_msg = deserialize_body ( & resp_header, & buf[ Header :: LEN ..] ) ?;
5958
6059 self . store . push_buf ( buf) ;
@@ -71,8 +70,7 @@ impl Pool {
7170 let mut buf = self . store . pop_buf_or_create ( ) ;
7271
7372 let msg_len = serialize ( msg, & mut buf) ?;
74- self . comm_stream ( node_uid, & mut buf[ 0 ..msg_len] , false )
75- . await ?;
73+ self . comm_stream ( node_uid, & mut buf, msg_len, false ) . await ?;
7674
7775 self . store . push_buf ( buf) ;
7876
@@ -95,12 +93,15 @@ impl Pool {
9593 & self ,
9694 node_uid : Uid ,
9795 buf : & mut [ u8 ] ,
96+ send_len : usize ,
9897 expect_response : bool ,
9998 ) -> Result < Header > {
99+ debug_assert_eq ! ( buf. len( ) , TCP_BUF_LEN ) ;
100+
100101 // 1. Pop open streams until communication succeeds or none are left
101102 while let Some ( stream) = self . store . try_pop_stream ( node_uid) {
102103 match self
103- . write_and_read_stream ( buf, stream, expect_response)
104+ . write_and_read_stream ( buf, stream, send_len , expect_response)
104105 . await
105106 {
106107 Ok ( header) => return Ok ( header) ,
@@ -147,7 +148,7 @@ impl Pool {
147148 // Communication using the newly opened stream should usually not fail. If
148149 // it does, abort. It might be better to just try the next address though.
149150 let resp_header = self
150- . write_and_read_stream ( buf, stream, expect_response)
151+ . write_and_read_stream ( buf, stream, send_len , expect_response)
151152 . await
152153 . with_context ( err_context) ?;
153154
@@ -166,7 +167,7 @@ impl Pool {
166167 let stream = self . store . pop_stream ( node_uid) . await ?;
167168
168169 let resp_header = self
169- . write_and_read_stream ( buf, stream, expect_response)
170+ . write_and_read_stream ( buf, stream, send_len , expect_response)
170171 . await
171172 . with_context ( || {
172173 format ! ( "Communication using existing stream to {node_uid:?} failed" )
@@ -181,9 +182,10 @@ impl Pool {
181182 & self ,
182183 buf : & mut [ u8 ] ,
183184 mut stream : StoredStream ,
185+ send_len : usize ,
184186 expect_response : bool ,
185187 ) -> Result < Header > {
186- stream. as_mut ( ) . write_all ( buf) . await ?;
188+ stream. as_mut ( ) . write_all ( & buf[ 0 ..send_len ] ) . await ?;
187189
188190 let header = if expect_response {
189191 // Read header
0 commit comments