@@ -24,8 +24,8 @@ Bytes SerializationBuffer::serializeSingleBoolToBytes(bool value) {
2424 return Bytes ((const char *)existsValue, 2 );
2525}
2626
27- void SerializationBuffer ::compress () {
28- if (m_last_compression_point == m_size ) {
27+ void SerializationBufferBlock ::compress () {
28+ if (m_compressed ) {
2929 return ;
3030 }
3131
@@ -37,7 +37,7 @@ void SerializationBuffer::compress() {
3737
3838 // replace the data we have here with a block of 4 bytes of size of compressed data and
3939 // then the data stream
40- size_t bytesRequired = LZ4F_compressFrameBound (m_size - m_last_compression_point , &lz4Prefs);
40+ size_t bytesRequired = LZ4F_compressFrameBound (m_size, &lz4Prefs);
4141
4242 void * compressedBytes = malloc (bytesRequired);
4343
@@ -49,26 +49,64 @@ void SerializationBuffer::compress() {
4949 compressedBytecount = LZ4F_compressFrame (
5050 compressedBytes,
5151 bytesRequired,
52- m_buffer + m_last_compression_point ,
53- m_size - m_last_compression_point ,
52+ m_buffer,
53+ m_size,
5454 &lz4Prefs
5555 );
5656
5757 if (LZ4F_isError (compressedBytecount)) {
58+ free (compressedBytes);
59+
5860 throw std::runtime_error (
5961 std::string (" Error compressing data using LZ4: " )
6062 + LZ4F_getErrorName (compressedBytecount)
6163 );
6264 }
6365 }
6466
65- m_size = m_last_compression_point ;
67+ m_size = 0 ;
6668
6769 write<uint32_t >(compressedBytecount);
6870
69- write_bytes ((uint8_t *)compressedBytes, compressedBytecount, false );
71+ write_bytes ((uint8_t *)compressedBytes, compressedBytecount);
7072
7173 free (compressedBytes);
7274
73- m_last_compression_point = m_size;
75+ m_compressed = true ;
76+ }
77+
78+ void SerializationBuffer::consolidate () {
79+ if (m_wants_compress) {
80+ for (auto blockPtr: m_blocks) {
81+ blockPtr->compress ();
82+ }
83+ }
84+
85+ if (m_blocks.size () == 1 ) {
86+ return ;
87+ }
88+
89+ size_t totalSize = 0 ;
90+
91+ for (auto blockPtr: m_blocks) {
92+ totalSize += blockPtr->size ();
93+ }
94+
95+ SerializationBufferBlock* block = new SerializationBufferBlock ();
96+ block->ensure (totalSize);
97+
98+ for (auto blockPtr: m_blocks) {
99+ block->write_bytes (blockPtr->buffer (), blockPtr->size ());
100+ }
101+
102+ if (m_wants_compress) {
103+ block->markCompressed ();
104+ }
105+
106+ m_blocks.clear ();
107+ m_blocks.push_back (
108+ std::shared_ptr<SerializationBufferBlock>(
109+ block
110+ )
111+ );
74112}
0 commit comments