Skip to content

Commit 60a56c5

Browse files
committed
Use stream in compression codecs
And not ByteBuf, better interop.
1 parent 2dd2d5e commit 60a56c5

File tree

8 files changed

+57
-62
lines changed

8 files changed

+57
-62
lines changed

src/main/java/com/rabbitmq/stream/compression/CommonsCompressCompressionCodecFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
* href="https://commons.apache.org/proper/commons-compress/">Apache Commons Compress</a>
2525
* implementations.
2626
*
27-
* The framed format is used for SNAPPY and LZ4.
27+
* <p>The framed format is used for SNAPPY and LZ4.
2828
*
29-
* All but ZSTD compression codecs are implemented in Commons Compress. The ZSTD codec
30-
* uses the <a href="https://github.com/luben/zstd-jni/">zstd-jni</a> library.
29+
* <p>All but ZSTD compression codecs are implemented in Commons Compress. The ZSTD codec uses the
30+
* <a href="https://github.com/luben/zstd-jni/">zstd-jni</a> library.
3131
*/
3232
public class CommonsCompressCompressionCodecFactory implements CompressionCodecFactory {
3333
private final CompressionCodec[] codecs = new CompressionCodec[5];

src/main/java/com/rabbitmq/stream/compression/CompressionCodec.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
package com.rabbitmq.stream.compression;
1616

17-
import io.netty.buffer.ByteBuf;
1817
import java.io.InputStream;
1918
import java.io.OutputStream;
2019

@@ -32,18 +31,18 @@ public interface CompressionCodec {
3231
/**
3332
* Creates an {@link OutputStream} to compress data.
3433
*
35-
* @param target {@link ByteBuf} to write compressed data to
34+
* @param target the stream where compressed data will end up
3635
* @return output stream to write plain data to
3736
*/
38-
OutputStream compress(ByteBuf target);
37+
OutputStream compress(OutputStream target);
3938

4039
/**
4140
* Creates an {@link InputStream} to read decompressed data from.
4241
*
43-
* @param source the {@link ByteBuf} to read compressed from
44-
* @return input stream to read decompressed from
42+
* @param source the stream to read compressed data from
43+
* @return input stream to read decompressed data from
4544
*/
46-
InputStream decompress(ByteBuf source);
45+
InputStream decompress(InputStream source);
4746

4847
/**
4948
* Return the code for this type of codec.

src/main/java/com/rabbitmq/stream/compression/CompressionCodecFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414

1515
package com.rabbitmq.stream.compression;
1616

17-
/**
18-
* Factory to create {@link CompressionCodec} instances.
19-
*/
17+
/** Factory to create {@link CompressionCodec} instances. */
2018
public interface CompressionCodecFactory {
2119

2220
/**
2321
* Get a compression codec for a given type of compression.
22+
*
2423
* @param compression the type of compression codec
2524
* @return the appropriate compression codec
2625
*/
2726
CompressionCodec get(Compression compression);
28-
}
27+
}

src/main/java/com/rabbitmq/stream/compression/CompressionUtils.java

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import com.github.luben.zstd.Zstd;
44
import com.github.luben.zstd.ZstdInputStream;
55
import com.github.luben.zstd.ZstdOutputStream;
6-
import io.netty.buffer.ByteBuf;
7-
import io.netty.buffer.ByteBufInputStream;
8-
import io.netty.buffer.ByteBufOutputStream;
96
import java.io.IOException;
107
import java.io.InputStream;
118
import java.io.OutputStream;
@@ -30,9 +27,7 @@
3027
import org.xerial.snappy.SnappyFramedInputStream;
3128
import org.xerial.snappy.SnappyFramedOutputStream;
3229

33-
/**
34-
* Implementation of {@link CompressionCodec}s.
35-
*/
30+
/** Implementation of {@link CompressionCodec}s. */
3631
public final class CompressionUtils {
3732

3833
private CompressionUtils() {}
@@ -45,18 +40,18 @@ public int maxCompressedLength(int sourceLength) {
4540
}
4641

4742
@Override
48-
public OutputStream compress(ByteBuf byteBuf) {
43+
public OutputStream compress(OutputStream outputStream) {
4944
try {
50-
return new GZIPOutputStream(new ByteBufOutputStream(byteBuf));
45+
return new GZIPOutputStream(outputStream);
5146
} catch (IOException e) {
5247
throw new CompressionException("Error while creating GZIP compression output stream", e);
5348
}
5449
}
5550

5651
@Override
57-
public InputStream decompress(ByteBuf byteBuf) {
52+
public InputStream decompress(InputStream inputStream) {
5853
try {
59-
return new GZIPInputStream(new ByteBufInputStream(byteBuf));
54+
return new GZIPInputStream(inputStream);
6055
} catch (IOException e) {
6156
throw new CompressionException("Error while creating GZIP compression input stream", e);
6257
}
@@ -81,18 +76,18 @@ public int maxCompressedLength(int sourceLength) {
8176
}
8277

8378
@Override
84-
public OutputStream compress(ByteBuf byteBuf) {
79+
public OutputStream compress(OutputStream outputStream) {
8580
try {
86-
return new ZstdOutputStream(new ByteBufOutputStream(byteBuf));
81+
return new ZstdOutputStream(outputStream);
8782
} catch (IOException e) {
8883
throw new CompressionException("Error while creating Zstd compression output stream", e);
8984
}
9085
}
9186

9287
@Override
93-
public InputStream decompress(ByteBuf byteBuf) {
88+
public InputStream decompress(InputStream inputStream) {
9489
try {
95-
return new ZstdInputStream(new ByteBufInputStream(byteBuf));
90+
return new ZstdInputStream(inputStream);
9691
} catch (IOException e) {
9792
throw new CompressionException("Error while creating Zstd compression input stream", e);
9893
}
@@ -120,19 +115,18 @@ public int maxCompressedLength(int sourceLength) {
120115
}
121116

122117
@Override
123-
public OutputStream compress(ByteBuf byteBuf) {
118+
public OutputStream compress(OutputStream outputStream) {
124119
try {
125-
return new LZ4FrameOutputStream(
126-
new ByteBufOutputStream(byteBuf), BLOCKSIZE.SIZE_64KB, DEFAULT_FEATURES);
120+
return new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB, DEFAULT_FEATURES);
127121
} catch (IOException e) {
128122
throw new CompressionException("Error while creating LZ4 compression output stream", e);
129123
}
130124
}
131125

132126
@Override
133-
public InputStream decompress(ByteBuf byteBuf) {
127+
public InputStream decompress(InputStream inputStream) {
134128
try {
135-
return new LZ4FrameInputStream(new ByteBufInputStream(byteBuf));
129+
return new LZ4FrameInputStream(inputStream);
136130
} catch (IOException e) {
137131
throw new CompressionException("Error while creating LZ4 compression input stream", e);
138132
}
@@ -157,18 +151,18 @@ public int maxCompressedLength(int sourceLength) {
157151
}
158152

159153
@Override
160-
public OutputStream compress(ByteBuf byteBuf) {
154+
public OutputStream compress(OutputStream outputStream) {
161155
try {
162-
return new SnappyFramedOutputStream(new ByteBufOutputStream(byteBuf));
156+
return new SnappyFramedOutputStream(outputStream);
163157
} catch (IOException e) {
164158
throw new CompressionException("Error while creating snappy compression output stream", e);
165159
}
166160
}
167161

168162
@Override
169-
public InputStream decompress(ByteBuf byteBuf) {
163+
public InputStream decompress(InputStream inputStream) {
170164
try {
171-
return new SnappyFramedInputStream(new ByteBufInputStream(byteBuf));
165+
return new SnappyFramedInputStream(inputStream);
172166
} catch (IOException e) {
173167
throw new CompressionException("Error while creating snappy compression input stream", e);
174168
}
@@ -193,18 +187,18 @@ public int maxCompressedLength(int sourceLength) {
193187
}
194188

195189
@Override
196-
public OutputStream compress(ByteBuf byteBuf) {
190+
public OutputStream compress(OutputStream outputStream) {
197191
try {
198-
return new GzipCompressorOutputStream(new ByteBufOutputStream(byteBuf));
192+
return new GzipCompressorOutputStream(outputStream);
199193
} catch (IOException e) {
200194
throw new CompressionException("Error while creating GZIP compression output stream", e);
201195
}
202196
}
203197

204198
@Override
205-
public InputStream decompress(ByteBuf byteBuf) {
199+
public InputStream decompress(InputStream inputStream) {
206200
try {
207-
return new GzipCompressorInputStream(new ByteBufInputStream(byteBuf));
201+
return new GzipCompressorInputStream(inputStream);
208202
} catch (IOException e) {
209203
throw new CompressionException("Error while creating GZIP compression input stream", e);
210204
}
@@ -230,21 +224,19 @@ public int maxCompressedLength(int sourceLength) {
230224
}
231225

232226
@Override
233-
public OutputStream compress(ByteBuf byteBuf) {
227+
public OutputStream compress(OutputStream outputStream) {
234228
try {
235-
return new FramedSnappyCompressorOutputStream(new ByteBufOutputStream(byteBuf));
229+
return new FramedSnappyCompressorOutputStream(outputStream);
236230
} catch (IOException e) {
237231
throw new CompressionException("Error while creating Snappy compression output stream", e);
238232
}
239233
}
240234

241235
@Override
242-
public InputStream decompress(ByteBuf byteBuf) {
236+
public InputStream decompress(InputStream inputStream) {
243237
try {
244238
return new FramedSnappyCompressorInputStream(
245-
new ByteBufInputStream(byteBuf),
246-
SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE,
247-
FramedSnappyDialect.STANDARD);
239+
inputStream, SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE, FramedSnappyDialect.STANDARD);
248240
} catch (IOException e) {
249241
throw new CompressionException("Error while creating Snappy compression input stream", e);
250242
}
@@ -272,18 +264,18 @@ public int maxCompressedLength(int sourceLength) {
272264
}
273265

274266
@Override
275-
public OutputStream compress(ByteBuf byteBuf) {
267+
public OutputStream compress(OutputStream outputStream) {
276268
try {
277-
return new FramedLZ4CompressorOutputStream(new ByteBufOutputStream(byteBuf), DEFAULT);
269+
return new FramedLZ4CompressorOutputStream(outputStream, DEFAULT);
278270
} catch (IOException e) {
279271
throw new CompressionException("Error while creating LZ4 compression output stream", e);
280272
}
281273
}
282274

283275
@Override
284-
public InputStream decompress(ByteBuf byteBuf) {
276+
public InputStream decompress(InputStream inputStream) {
285277
try {
286-
return new FramedLZ4CompressorInputStream(new ByteBufInputStream(byteBuf));
278+
return new FramedLZ4CompressorInputStream(inputStream);
287279
} catch (IOException e) {
288280
throw new CompressionException("Error while creating LZ4 compression input stream", e);
289281
}
@@ -308,18 +300,18 @@ public int maxCompressedLength(int sourceLength) {
308300
}
309301

310302
@Override
311-
public OutputStream compress(ByteBuf byteBuf) {
303+
public OutputStream compress(OutputStream outputStream) {
312304
try {
313-
return new ZstdCompressorOutputStream(new ByteBufOutputStream(byteBuf));
305+
return new ZstdCompressorOutputStream(outputStream);
314306
} catch (IOException e) {
315307
throw new CompressionException("Error while creating Zstd compression output stream", e);
316308
}
317309
}
318310

319311
@Override
320-
public InputStream decompress(ByteBuf byteBuf) {
312+
public InputStream decompress(InputStream inputStream) {
321313
try {
322-
return new ZstdCompressorInputStream(new ByteBufInputStream(byteBuf));
314+
return new ZstdCompressorInputStream(inputStream);
323315
} catch (IOException e) {
324316
throw new CompressionException("Error while creating Zstd compression input stream", e);
325317
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import io.netty.bootstrap.Bootstrap;
7474
import io.netty.buffer.ByteBuf;
7575
import io.netty.buffer.ByteBufAllocator;
76+
import io.netty.buffer.ByteBufOutputStream;
7677
import io.netty.channel.Channel;
7778
import io.netty.channel.ChannelFuture;
7879
import io.netty.channel.ChannelHandlerContext;
@@ -1543,7 +1544,7 @@ public void add(Codec.EncodedMessage encodedMessage) {
15431544
public void close() {
15441545
int maxCompressedLength = codec.maxCompressedLength(this.uncompressedByteSize);
15451546
this.buffer = allocator.buffer(maxCompressedLength);
1546-
OutputStream outputStream = this.codec.compress(buffer);
1547+
OutputStream outputStream = this.codec.compress(new ByteBufOutputStream(buffer));
15471548
try {
15481549
for (int i = 0; i < messages.size(); i++) {
15491550
final int size = messages.get(i).getSize();

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
6565
import com.rabbitmq.stream.metrics.MetricsCollector;
6666
import io.netty.buffer.ByteBuf;
67+
import io.netty.buffer.ByteBufInputStream;
6768
import io.netty.channel.ChannelHandlerContext;
6869
import io.netty.handler.timeout.IdleStateHandler;
6970
import java.io.IOException;
@@ -393,7 +394,7 @@ static int handleDeliver(
393394
CompressionCodec compressionCodec = client.compressionCodecFactory.get(comp);
394395
ByteBuf outBb = client.channel.alloc().heapBuffer(uncompressedDataSize);
395396
ByteBuf slice = message.slice(message.readerIndex(), dataSize);
396-
InputStream inputStream = compressionCodec.decompress(slice);
397+
InputStream inputStream = compressionCodec.decompress(new ByteBufInputStream(slice));
397398
byte[] inBuffer = new byte[uncompressedDataSize < 1024 ? uncompressedDataSize : 1024];
398399
int n;
399400
try {

src/test/java/com/rabbitmq/stream/benchmark/CompressDecompressBenchmark.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.rabbitmq.stream.compression.CompressionCodec;
1818
import io.netty.buffer.ByteBuf;
1919
import io.netty.buffer.ByteBufAllocator;
20+
import io.netty.buffer.ByteBufInputStream;
21+
import io.netty.buffer.ByteBufOutputStream;
2022
import java.io.InputStream;
2123
import java.io.OutputStream;
2224
import java.nio.charset.StandardCharsets;
@@ -87,7 +89,7 @@ public void setUp() throws Exception {
8789

8890
int maxCompressedLength = codec.maxCompressedLength(plainData.length);
8991
ByteBuf bb = allocator.heapBuffer(maxCompressedLength);
90-
OutputStream compress = codec.compress(bb);
92+
OutputStream compress = codec.compress(new ByteBufOutputStream(bb));
9193
compress.write(plainData);
9294
compress.flush();
9395
compress.close();
@@ -102,7 +104,7 @@ public void setUp() throws Exception {
102104
public void compress() throws Exception {
103105
int maxCompressedLength = codec.maxCompressedLength(plainData.length);
104106
ByteBuf bb = allocator.buffer(maxCompressedLength);
105-
OutputStream compress = codec.compress(bb);
107+
OutputStream compress = codec.compress(new ByteBufOutputStream(bb));
106108
compress.write(plainData);
107109
compress.flush();
108110
compress.close();
@@ -113,7 +115,7 @@ public void compress() throws Exception {
113115
public void decodeReadBytePerByte() throws Exception {
114116
compressedDataBb.readerIndex(0);
115117
ByteBuf outBb = allocator.buffer(compressedData.length);
116-
InputStream inputStream = codec.decompress(compressedDataBb);
118+
InputStream inputStream = codec.decompress(new ByteBufInputStream(compressedDataBb));
117119
int n;
118120
while (-1 != (n = inputStream.read())) {
119121
outBb.writeByte(n);
@@ -134,7 +136,7 @@ public void decodeReadBytePerByte() throws Exception {
134136
@Benchmark
135137
public void decodePreAllocatedArray() throws Exception {
136138
compressedDataBb.readerIndex(0);
137-
InputStream inputStream = codec.decompress(compressedDataBb);
139+
InputStream inputStream = codec.decompress(new ByteBufInputStream(compressedDataBb));
138140
ByteBuf outBb = allocator.buffer(plainData.length);
139141
byte[] inBuffer = new byte[compressedData.length];
140142
int n;
@@ -157,7 +159,7 @@ public void decodePreAllocatedArray() throws Exception {
157159
@Benchmark
158160
public void decodeWriteInPreAllocatedHeapByteBuf() throws Exception {
159161
compressedDataBb.readerIndex(0);
160-
InputStream inputStream = codec.decompress(compressedDataBb);
162+
InputStream inputStream = codec.decompress(new ByteBufInputStream(compressedDataBb));
161163
ByteBuf outBb = allocator.heapBuffer(plainData.length);
162164
inputStream.read(outBb.array(), 0, plainData.length);
163165

src/test/java/com/rabbitmq/stream/impl/CompressionCodecsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.rabbitmq.stream.impl.Client.EncodedMessageBatch;
3131
import io.netty.buffer.ByteBuf;
3232
import io.netty.buffer.ByteBufAllocator;
33+
import io.netty.buffer.ByteBufInputStream;
3334
import java.io.IOException;
3435
import java.io.InputStream;
3536
import java.nio.charset.StandardCharsets;
@@ -98,7 +99,7 @@ void compressedEncodedMessageBatch(
9899

99100
ByteBuf outBb = allocator.buffer(plainSize);
100101
destinationBb.readerIndex(0);
101-
InputStream inputStream = decompressionCodec.decompress(destinationBb);
102+
InputStream inputStream = decompressionCodec.decompress(new ByteBufInputStream(destinationBb));
102103
int n;
103104
while (-1 != (n = inputStream.read())) {
104105
outBb.writeByte(n);

0 commit comments

Comments
 (0)