Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit a9e25ab

Browse files
authored
Merge pull request #592 from tgroh/backport_bzip_master_1x
Backport apache/beam #3669
2 parents 7a2cdb8 + 7201275 commit a9e25ab

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public boolean matches(String fileName) {
144144
public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
145145
throws IOException {
146146
return Channels.newChannel(
147-
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
147+
new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
148148
}
149149
};
150150

sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,38 @@ public void testReadConcatenatedGzip() throws IOException {
203203
p.run();
204204
}
205205

206+
/**
207+
* Test that a bzip2 file containing multiple streams is correctly decompressed.
208+
*
209+
* <p>A bzip2 file may contain multiple streams and should decompress as the concatenation of
210+
* those streams.
211+
*/
212+
@Test
213+
public void testReadMultiStreamBzip2() throws IOException {
214+
CompressionMode mode = CompressionMode.BZIP2;
215+
byte[] input1 = generateInput(5, 587973);
216+
byte[] input2 = generateInput(5, 387374);
217+
218+
ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
219+
try (OutputStream os = getOutputStreamForMode(mode, stream1)) {
220+
os.write(input1);
221+
}
222+
223+
ByteArrayOutputStream stream2 = new ByteArrayOutputStream();
224+
try (OutputStream os = getOutputStreamForMode(mode, stream2)) {
225+
os.write(input2);
226+
}
227+
228+
File tmpFile = tmpFolder.newFile();
229+
try (OutputStream os = new FileOutputStream(tmpFile)) {
230+
os.write(stream1.toByteArray());
231+
os.write(stream2.toByteArray());
232+
}
233+
234+
byte[] output = Bytes.concat(input1, input2);
235+
verifyReadContents(output, tmpFile, mode);
236+
}
237+
206238
/**
207239
* Test reading empty input with bzip2.
208240
*/
@@ -416,7 +448,14 @@ public void populateDisplayData(DisplayData.Builder builder) {
416448
*/
417449
private byte[] generateInput(int size) {
418450
// Arbitrary but fixed seed
419-
Random random = new Random(285930);
451+
return generateInput(size, 285930);
452+
}
453+
454+
/**
455+
* Generate byte array of given size, filled with pseudo-random bytes derived from the given seed.
456+
*/
457+
private byte[] generateInput(int size, int seed) {
458+
Random random = new Random(seed);
420459
byte[] buff = new byte[size];
421460
random.nextBytes(buff);
422461
return buff;

0 commit comments

Comments
 (0)