@@ -74,6 +74,8 @@ uint32_t TransportMessage::fillData(const uint8_t* data, uint32_t length)
7474 }
7575 if (data) {
7676 memcpy (m_buffer.get () + m_receivedBytes, data, length);
77+ } else {
78+ length = 0 ;
7779 }
7880 m_receivedBytes += length;
7981 return length;
@@ -157,10 +159,19 @@ void TransportSession::sendData(TransportData data)
157159 return ;
158160 }
159161 auto self (shared_from_this ());
160- m_service->post (boost::bind (&TransportSession::sendHandler, self, data));
162+ m_service->post (boost::bind (&TransportSession::prepareSend, self, data));
163+ }
164+
165+ void TransportSession::prepareSend (TransportData data)
166+ {
167+ // Only access m_sendQueue in IO service thread.
168+ m_sendQueue.push (data);
169+ if (m_sendQueue.size () == 1 ) {
170+ sendHandler ();
171+ }
161172}
162173
163- void TransportSession::sendHandler (TransportData data )
174+ void TransportSession::sendHandler ()
164175{
165176 if (m_isClosed) {
166177 return ;
@@ -169,7 +180,11 @@ void TransportSession::sendHandler(TransportData data)
169180 ELOG_WARN (" sendHandler: socket is not open" );
170181 return ;
171182 }
183+ if (m_sendQueue.empty ()) {
184+ return ;
185+ }
172186
187+ TransportData data = m_sendQueue.front ();
173188 TransportMessage toSend (data.buffer .get (), data.length );
174189 TransportData wrappedData{toSend.messageData (),
175190 toSend.messageLength ()};
@@ -197,13 +212,18 @@ void TransportSession::writeHandler(
197212 const boost::system::error_code& ec,
198213 std::size_t bytes)
199214{
215+ assert (m_sendQueue.size () > 0 );
216+ m_sendQueue.pop ();
200217 if (ec) {
201218 ELOG_DEBUG (" Error writing data: %s" , ec.message ().c_str ());
202219 if (!m_isClosed) {
203220 m_isClosed = true ;
204221 // Notify the listener about the socket error if the listener is not closing me.
205222 m_listener->onClose (m_id);
206223 }
224+ } else {
225+ ELOG_DEBUG (" Wrote data: %zu" , bytes);
226+ sendHandler ();
207227 }
208228}
209229
@@ -254,6 +274,7 @@ void TransportSession::receiveData()
254274
255275 uint32_t bytesToRead = m_receivedMessage.missingBytes ();
256276 assert (bytesToRead > 0 );
277+
257278 if (bytesToRead > m_receivedBufferSize) {
258279 // Double the received buffer size
259280 m_receivedBufferSize = std::max (m_receivedBufferSize * 2 , bytesToRead);
@@ -286,7 +307,10 @@ void TransportSession::readHandler(
286307 if (!ec || ec.value () == boost::asio::error::message_size) {
287308 uint32_t bytesToRead = m_receivedMessage.missingBytes ();
288309 assert (bytesToRead >= bytes);
289- m_receivedMessage.fillData (m_receivedBuffer.get (), bytes);
310+ uint32_t filled = m_receivedMessage.fillData (m_receivedBuffer.get (), bytes);
311+ if (filled != bytes) {
312+ ELOG_WARN (" Message fill length %u, %zu\n " , filled, bytes);
313+ }
290314 receiveData ();
291315 } else {
292316 if (ec.value () != boost::system::errc::operation_canceled &&
0 commit comments