diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index b53e80a..5298499 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -26,7 +26,7 @@ jobs: - 8.4 phpunit-versions: ['latest'] env: - extensions: mbstring, intl, xml, gmp, bcmath, snappy-kjdev/php-ext-snappy@0.2.2 + extensions: mbstring, intl, xml, gmp, bcmath, snappy-kjdev/php-ext-snappy@0.2.2, zstd key: extcache-v1 steps: diff --git a/composer.json b/composer.json index ca99130..4cf1acf 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,8 @@ "packaged/thrift": "^0.16.0" }, "suggest" : { - "ext-snappy" : "Install/compile snappy extension to get support for snappy compression reading/writing" + "ext-snappy" : "Install/compile snappy extension to get support for snappy compression reading/writing", + "ext-zstd" : "Install/compile zstd extension to get support for zstd compression reading/writing" }, "autoload": { "psr-4": { diff --git a/src/CompressionMethod.php b/src/CompressionMethod.php index c17e88a..b0f7e72 100644 --- a/src/CompressionMethod.php +++ b/src/CompressionMethod.php @@ -6,4 +6,5 @@ class CompressionMethod public const None = 0; public const Gzip = 1; public const Snappy = 2; + public const Zstd = 6; } diff --git a/src/ZstdStreamWrapper.php b/src/ZstdStreamWrapper.php new file mode 100644 index 0000000..0838196 --- /dev/null +++ b/src/ZstdStreamWrapper.php @@ -0,0 +1,419 @@ + [ + 'leave_open' => $leaveOpen, + 'compression_mode' => $compressionMode, + 'compression_level' => static::DEFAULT_COMPRESSION_LEVEL, + ] + ]); + return fopen('zstd://'.$stream, $mode, false, $context); + } + + /** + * [public description] + * @var resource + */ + public $context; + + /** + * [register description] + */ + public static function register(): void { + $wrapperExists = in_array("zstd", stream_get_wrappers()); + if ($wrapperExists) { + // stream_wrapper_unregister("gzip"); + } else { + stream_wrapper_register('zstd', static::class); + } + } + + /** + * inner stream + * @var resource + */ + protected $parent = null; + + /** + * Whether to leave the underlying stream open on stream close + * @var bool + */ + protected $leaveOpen = false; + + /** + * GZ/ZLIB compression level + * @var int + */ + protected $compressionLevel = 9; + + /** + * @inheritDoc + */ + public function __construct() + { + // throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function __destruct() + { + // throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function dir_closedir(): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function dir_opendir(string $path, int $options): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function dir_readdir(): string + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function mkdir(string $path, int $mode, int $options): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function rename(string $path_from, string $path_to): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function dir_rewinddir(): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function rmdir(string $path, int $options): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_cast(int $cast_as) + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function url_stat(string $path, int $flags): array + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function unlink(string $path): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_write(string $data): int + { + return fwrite($this->ms, $data); + } + + /** + * @inheritDoc + */ + public function stream_truncate(int $new_size): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_tell(): int + { + return ftell($this->ms); + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_stat(): array + { + return fstat($this->parent); + } + + /** + * @inheritDoc + */ + public function stream_set_option(int $option, int $arg1, int $arg2): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_seek(int $offset, int $whence = SEEK_SET): bool + { + return fseek($this->ms, $offset, $whence) === 0; + // return true; + // throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_read(int $count): string + { + return fread($this->ms, $count); + } + + protected static $prefix = 'zstd://'; + + /** + * [protected description] + * @var int + */ + protected $compressionMode; + + /** + * [protected description] + * @var resource + */ + protected $ms; + + /** + * Flag to track if compression has been written + * @var bool + */ + protected $finishedForWriting = false; + + /** + * @inheritDoc + */ + public function stream_open( + string $path, + string $mode, + int $options, + ?string &$opened_path + ): bool { + if(strpos($path, 'Resource id #') === strlen(static::$prefix)) { + // stringified resource identifier, get as handle via get_resource + $resourceId = explode('#', $path)[1]; + $this->parent = get_resources()[$resourceId]; + + // Passed via stream option + $this->leaveOpen = stream_context_get_options($this->context)['zstd']['leave_open'] ?? false; + $this->compressionMode = stream_context_get_options($this->context)['zstd']['compression_mode'] ?? null; + $this->compressionLevel = stream_context_get_options($this->context)['zstd']['compression_level'] ?? 6; // Default fallback: 6 (common default) + + if($this->compressionMode === null) { + throw new Exception('Compression mode undefined'); + } + + if($this->compressionMode === static::MODE_COMPRESS) { + $this->ms = fopen('php://memory', 'r+'); + } else { + $this->ms = $this->decompressFromStream($this->parent); + } + } else { + // non-resource, manually fopen? + throw new Exception('unsupported'); + } + + return true; + } + + /** + * [MODE_DECOMPRESS description] + * @var int + */ + const MODE_DECOMPRESS = 0; + + /** + * [MODE_COMPRESS description] + * @var int + */ + const MODE_COMPRESS = 1; + + /** + * [decompressFromStream description] + * @param resource $source [description] + * @return resource [description] + */ + protected function decompressFromStream($source) + { + $content = stream_get_contents($source); + $decompressed = zstd_uncompress($content); + $handle = fopen('php://memory', 'r+'); + fwrite($handle, $decompressed); + fflush($handle); + fseek($handle, 0); + return $handle; + } + + /** + * Writes out the final, compressed stream + */ + protected function writeCompressedStream(): void { + if($this->finishedForWriting) return; + + if($this->compressionMode === static::MODE_COMPRESS) { + $uncompressedLength = fstat($this->ms)['size']; + + $compressed = zstd_compress(stream_get_contents($this->ms, $uncompressedLength, 0), $this->compressionLevel); + + $compressedSize = strlen($compressed); + fseek($this->parent, 0); + fwrite($this->parent, $compressed); + fflush($this->parent); + // + // TODO: some error handling + // + } + + $this->finishedForWriting = true; + } + + /** + * @inheritDoc + */ + public function MarkWriteFinished(): void + { + $this->writeCompressedStream(); + } + + /** + * @inheritDoc + */ + public function stream_metadata( + string $path, + int $option, + $value + ): bool { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_lock(int $operation): bool + { + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_flush(): bool + { + // + // TODO: check, what we should do here... + // We might flush $this->ms + // + return false; + throw new \LogicException('Not implemented'); // TODO + } + + /** + * @inheritDoc + */ + public function stream_eof(): bool + { + return feof($this->ms); // ? IDEA: maybe depending on mode? + // return false; + // throw new \LogicException('Not implemented'); // TODO + // return feof($this->parent); + } + + /** + * @inheritDoc + */ + public function stream_close(): void + { + // + // Write out (compress!) on close + // As long as not closed, we delay compression until closing of this stream. + // + $this->writeCompressedStream(); + + // NOT SURE ABOUT THIS THING. + if(!$this->leaveOpen) { + if(is_resource($this->parent)) { + fclose($this->parent); + } else { + // Resource/stream already closed/disposed + // QUESTION: Should we throw an exception? + // throw new Exception('Inner stream already closed or freed'); + } + } + + // QUESTION should we do this here? + // We should kick out the reference for the GC to work... + // NOTE/SEE: https://stackoverflow.com/questions/28195655/php-garbage-collection-when-using-static-method-to-create-instance + // Different behaviour regarding resources in PHP than regular variables/members. + // This seems to work for now, but I'm unsure about using WeakRef (pecl ext) or WeakReference (PHP 7.4+) in favor. + // + $this->parent = null; + } +} diff --git a/src/file/DataStreamFactory.php b/src/file/DataStreamFactory.php index 7e450ae..e077b0e 100644 --- a/src/file/DataStreamFactory.php +++ b/src/file/DataStreamFactory.php @@ -7,6 +7,7 @@ use codename\parquet\CompressionMethod; use codename\parquet\GzipStreamWrapper; use codename\parquet\SnappyInMemoryStreamWrapper; +use codename\parquet\ZstdStreamWrapper; use codename\parquet\format\CompressionCodec; @@ -21,6 +22,7 @@ class DataStreamFactory CompressionCodec::UNCOMPRESSED => 'none', CompressionCodec::GZIP => 'gzip', CompressionCodec::SNAPPY => 'snappy', + CompressionCodec::ZSTD => 'zstd', ]; /** @@ -30,6 +32,7 @@ public static function registerStreamWrappers(): void { GapStreamWrapper::register(); GzipStreamWrapper::register(); SnappyInMemoryStreamWrapper::register(); + ZstdStreamWrapper::register(); } /** @@ -57,6 +60,11 @@ public static function CreateWriter( $leaveNakedOpen = false; break; + case CompressionMethod::Zstd: + $dest = ZstdStreamWrapper::createWrappedStream($nakedStream, 'r+', ZstdStreamWrapper::MODE_COMPRESS); + $leaveNakedOpen = false; + break; + case CompressionMethod::None: $dest = $nakedStream; break; @@ -148,6 +156,15 @@ public static function ReadPageData( $data = $uncompressedData; break; + case 'zstd': + $uncompressedData = zstd_uncompress($data); + if($uncompressedData === false) { + throw new \Exception('Decompression error (zstd)'); + } + + $data = $uncompressedData; + break; + default: throw new \Exception('unsupported compression method '. $compressionMethod); } diff --git a/tests/IssueTest.php b/tests/IssueTest.php index 81ceee1..f2a9686 100644 --- a/tests/IssueTest.php +++ b/tests/IssueTest.php @@ -10,13 +10,10 @@ class IssueTest extends TestBase * Tests a specific issue of parquet-dotnet * to be or not to be present in our package * @see https://github.com/aloneguid/parquet-dotnet/issues/81 + * @requires extension snappy */ public function testParquetDotnetIssue81(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - $reader = new ParquetReader($this->openTestFile('issues/parquet-dotnet_81_floats.parquet')); $dataColumns = $reader->ReadEntireRowGroup(); $values = $dataColumns[0]->getData(); diff --git a/tests/ParquetReaderTest.php b/tests/ParquetReaderTest.php index e4c271b..979f5c3 100644 --- a/tests/ParquetReaderTest.php +++ b/tests/ParquetReaderTest.php @@ -5,6 +5,12 @@ use Exception; use codename\parquet\ParquetReader; +use codename\parquet\ParquetWriter; +use codename\parquet\CompressionMethod; + +use codename\parquet\data\Schema; +use codename\parquet\data\DataField; +use codename\parquet\data\DataColumn; use codename\parquet\exception\ArgumentNullException; @@ -228,11 +234,9 @@ public function testReadMultiPageDictionaryWithNulls(): void // /** * [testReadBitPackedAtPageBoundary description] + * @requires extension snappy */ public function testReadBitPackedAtPageBoundary(): void { - if(!extension_loaded('snappy')) { - static::markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('special/multi_page_bit_packed_near_page_border.parquet')); $columns = $reader->ReadEntireRowGroup(); $data = $columns[0]->getData(); @@ -355,11 +359,9 @@ public function testReadLargeTimestampData(): void /** * [testReadEmptyColumn description] + * @requires extension snappy */ public function testReadEmptyColumn(): void { - if(!extension_loaded('snappy')) { - static::markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('emptycolumn.parquet')); $columns = $reader->ReadEntireRowGroup(); $col0 = $columns[0]->getData(); @@ -370,4 +372,75 @@ public function testReadEmptyColumn(): void { } } + /** + * Test reading zstd-compressed parquet file + * @requires extension zstd + */ + public function testReadZstdCompressedFile(): void { + // First, write a zstd-compressed file + $ms = fopen('php://memory', 'r+'); + $id = DataField::createFromType('id', 'integer'); + $name = DataField::createFromType('name', 'string'); + $value = DataField::createFromType('value', 'double'); + + $writer = new ParquetWriter(new Schema([$id, $name, $value]), $ms, null, false, CompressionMethod::Zstd); + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 1, 2, 3, 4, 5 ])); + $rg->WriteColumn(new DataColumn($name, [ 'Alice', 'Bob', 'Charlie', 'David', 'Eve' ])); + $rg->WriteColumn(new DataColumn($value, [ 1.1, 2.2, 3.3, 4.4, 5.5 ])); + $rg->finish(); + $writer->finish(); + + // Now read it back + fseek($ms, 0); + $reader = new ParquetReader($ms); + $columns = $reader->ReadEntireRowGroup(); + + $this->assertEquals([ 1, 2, 3, 4, 5 ], $columns[0]->getData()); + $this->assertEquals([ 'Alice', 'Bob', 'Charlie', 'David', 'Eve' ], $columns[1]->getData()); + $this->assertEquals([ 1.1, 2.2, 3.3, 4.4, 5.5 ], $columns[2]->getData()); + } + + /** + * Test reading zstd-compressed parquet file with multiple row groups + * @requires extension zstd + */ + public function testReadZstdCompressedMultipleRowGroups(): void { + $ms = fopen('php://memory', 'r+'); + $id = DataField::createFromType('id', 'integer'); + + $writer = new ParquetWriter(new Schema([$id]), $ms, null, false, CompressionMethod::Zstd); + + // Write first row group + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 1, 2, 3 ])); + $rg->finish(); + + // Write second row group + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 4, 5, 6 ])); + $rg->finish(); + + // Write third row group + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 7, 8, 9 ])); + $rg->finish(); + + $writer->finish(); + + // Read back and verify + fseek($ms, 0); + $reader = new ParquetReader($ms); + $this->assertEquals(3, $reader->getRowGroupCount()); + + $rg = $reader->OpenRowGroupReader(0); + $this->assertEquals([ 1, 2, 3 ], $rg->ReadColumn($id)->getData()); + + $rg = $reader->OpenRowGroupReader(1); + $this->assertEquals([ 4, 5, 6 ], $rg->ReadColumn($id)->getData()); + + $rg = $reader->OpenRowGroupReader(2); + $this->assertEquals([ 7, 8, 9 ], $rg->ReadColumn($id)->getData()); + } + } diff --git a/tests/ParquetWriterTest.php b/tests/ParquetWriterTest.php index 4652383..c16b637 100644 --- a/tests/ParquetWriterTest.php +++ b/tests/ParquetWriterTest.php @@ -6,6 +6,7 @@ use codename\parquet\ParquetReader; use codename\parquet\ParquetWriter; +use codename\parquet\CompressionMethod; use codename\parquet\data\Schema; use codename\parquet\data\DataField; @@ -288,11 +289,74 @@ public function testWriteReadDataPageV2(): void { $rg = $reader->OpenRowGroupReader(0); $col = $rg->ReadColumn($id); - + $this->assertEquals(4, $rg->getRowCount()); $rg = $reader->OpenRowGroupReader(1); $this->assertEquals(2, $rg->getRowCount()); } + /** + * Test writing and reading data with Zstd compression + * @requires extension zstd + */ + public function testWriteReadWithZstdCompression(): void { + $ms = fopen('php://memory', 'r+'); + $id = DataField::createFromType('id', 'integer'); + $name = DataField::createFromType('name', 'string'); + + //write with zstd compression + $writer = new ParquetWriter(new Schema([$id, $name]), $ms, null, false, CompressionMethod::Zstd); + + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 1, 2, 3, 4 ])); + $rg->WriteColumn(new DataColumn($name, [ 'Alice', 'Bob', 'Charlie', 'David' ])); + $rg->finish(); + + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 5, 6 ])); + $rg->WriteColumn(new DataColumn($name, [ 'Eve', 'Frank' ])); + $rg->finish(); + + $writer->finish(); + + //read back + fseek($ms, 0); + $reader = new ParquetReader($ms); + $this->assertEquals(6, $reader->getThriftMetadata()->num_rows); + + $rg = $reader->OpenRowGroupReader(0); + $this->assertEquals(4, $rg->getRowCount()); + $this->assertEquals([ 1, 2, 3, 4 ], $rg->ReadColumn($id)->getData()); + $this->assertEquals([ 'Alice', 'Bob', 'Charlie', 'David' ], $rg->ReadColumn($name)->getData()); + + $rg = $reader->OpenRowGroupReader(1); + $this->assertEquals(2, $rg->getRowCount()); + $this->assertEquals([ 5, 6 ], $rg->ReadColumn($id)->getData()); + $this->assertEquals([ 'Eve', 'Frank' ], $rg->ReadColumn($name)->getData()); + } + + /** + * Test writing and reading nullable columns with Zstd compression + * @requires extension zstd + */ + public function testWriteReadNullableColumnWithZstdCompression(): void { + $id = DataField::createFromType('id', 'integer'); + $ms = fopen('php://memory', 'r+'); + + $writer = new ParquetWriter(new Schema([$id]), $ms, null, false, CompressionMethod::Zstd); + $rg = $writer->CreateRowGroup(); + $rg->WriteColumn(new DataColumn($id, [ 1, null, 2, null, 3 ])); + $rg->finish(); + $writer->finish(); + + fseek($ms, 0); + + $reader = new ParquetReader($ms); + $this->assertEquals(1, $reader->getRowGroupCount()); + $rg = $reader->OpenRowGroupReader(0); + $this->assertEquals(5, $rg->getRowCount()); + $this->assertEquals([ 1, null, 2, null, 3 ], $rg->ReadColumn($id)->getData()); + } + } diff --git a/tests/PhpArrayConversionTest.php b/tests/PhpArrayConversionTest.php index 0f86d21..d6eda6d 100644 --- a/tests/PhpArrayConversionTest.php +++ b/tests/PhpArrayConversionTest.php @@ -427,12 +427,9 @@ public function testReadWriteEmptyAndNullStrings(): void { * Tests reading a file re-written by spark * Which originates from a method below * @see testSuperComplexWriteRead + * @requires extension snappy */ public function testReadSparkRewrittenSuperComplex(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - // Re-written by spark $readerSpark = new ParquetReader($this->openTestFile('custom/supercomplex1.spark.parquet')); $rgsSpark = $readerSpark->ReadEntireRowGroup(); @@ -502,12 +499,9 @@ public function testReadSparkRewrittenSuperComplex(): void { * Tests reading a file re-written by spark * Which originates from a method below * @see testComplexStructsWriteRead + * @requires extension snappy */ public function testReadSparkRewrittenComplexStructs(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - // Re-written by spark $readerSpark = new ParquetReader($this->openTestFile('custom/complex_structs1.spark.parquet')); $rgsSpark = $readerSpark->ReadEntireRowGroup(); @@ -704,12 +698,9 @@ public function testReadDataColumnIterable(string $file, ?array $flags = null): /** * Tests reading and writing of nested lists + * @requires extension snappy */ public function testWriteNestedLists(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - $reader = new ParquetReader($this->openTestFile('nested_lists.snappy.parquet')); $rgs = $reader->ReadEntireRowGroup(); $arrayConverter = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -746,12 +737,9 @@ public function testWriteNestedLists(): void { /** * Tests reading and writing of nested structs + * @requires extension snappy */ public function testWriteNestedStructs(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - $reader = new ParquetReader($this->openTestFile('nested_structs.parquet')); $rgs = $reader->ReadEntireRowGroup(); $arrayConverter = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -792,12 +780,9 @@ public function testWriteNestedStructs(): void { /** * Tests reading and writing repeated structs * and some specialties + * @requires extension snappy */ public function testWriteRepeatedFieldsWithStructs(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - $reader = new ParquetReader($this->openTestFile('repeated_no_annotation.parquet')); $rgs = $reader->ReadEntireRowGroup(); $arrayConverter = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -831,12 +816,9 @@ public function testWriteRepeatedFieldsWithStructs(): void { /** * Tests reading and writing (purely) nested maps + * @requires extension snappy */ public function testWriteNestedMaps(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } - $reader = new ParquetReader($this->openTestFile('nested_maps.snappy.parquet')); $rgs = $reader->ReadEntireRowGroup(); $arrayConverter = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -1503,11 +1485,9 @@ public function testListEmptyAlt(): void { * Subject's field path is split.leftCategoriesOrThreshold.array * So there's no intermediary 'list' field * But the element itself is repeated + * @requires extension snappy */ public function testLegacyListOneArray(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('legacy-list-onearray.parquet')); $rgs = $reader->ReadEntireRowGroup(); $conv = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -1540,11 +1520,9 @@ public function testLegacyListOneArray(): void { /** * Tests some list columns w/ and w/o nulls + * @requires extension snappy */ public function testListColumns(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('list_columns.parquet')); $rgs = $reader->ReadEntireRowGroup(); $conv = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -1558,11 +1536,9 @@ public function testListColumns(): void { /** * Tests reading of (purely) nested lists (and their elements) + * @requires extension snappy */ public function testNestedLists(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('nested_lists.snappy.parquet')); $rgs = $reader->ReadEntireRowGroup(); $conv = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -1585,11 +1561,9 @@ public function testNestedLists(): void { /** * Tests reading of (purely) nested maps (and their keys/values) + * @requires extension snappy */ public function testNestedMaps(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('nested_maps.snappy.parquet')); $rgs = $reader->ReadEntireRowGroup(); $conv = new DataColumnsToArrayConverter($reader->schema, $rgs); @@ -1606,11 +1580,9 @@ public function testNestedMaps(): void { /** * Tests reading (purely) nested structs (groups) and their fields + * @requires extension snappy */ public function testNestedStructs(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } // // NOTE: nested_structs.rust.parquet (the original file) // has been compressed using ZSTD (codec 6) and for compatibility, diff --git a/tests/Reader/TestDataTest.php b/tests/Reader/TestDataTest.php index db7cb3b..061a2b1 100644 --- a/tests/Reader/TestDataTest.php +++ b/tests/Reader/TestDataTest.php @@ -49,11 +49,9 @@ public function testAllTypesGzipCompression(): void { /** * [testAllTypesSnappyCompression description] + * @requires extension snappy */ public function testAllTypesSnappyCompression(): void { - if(!extension_loaded('snappy')) { - static::markTestSkipped('ext-snappy unavailable'); - } $this->CompareFiles('types/alltypes', 'snappy', true, [ 'integer', 'boolean', diff --git a/tests/SchemaTest.php b/tests/SchemaTest.php index 6e7b4b1..7dc6c89 100644 --- a/tests/SchemaTest.php +++ b/tests/SchemaTest.php @@ -367,11 +367,9 @@ public function testListOfStructuresValidLevels(): void /** * [testBackwardCompatListWithOneArray description] + * @requires extension snappy */ public function testBackwardCompatListWithOneArray(): void { - if(!extension_loaded('snappy')) { - static::markTestSkipped('ext-snappy unavailable'); - } $input = $this->openTestFile('legacy-list-onearray.parquet'); $reader = new ParquetReader($input); $schema = $reader->schema; diff --git a/tests/SnappyTest.php b/tests/SnappyTest.php index c021eb8..6da5f65 100644 --- a/tests/SnappyTest.php +++ b/tests/SnappyTest.php @@ -12,14 +12,12 @@ final class SnappyTest extends TestBase */ protected function setUp(): void { - if(!extension_loaded('snappy')) { - static::markTestSkipped('ext-snappy unavailable'); - } parent::setUp(); } /** * Simple test for a working snappy extension + * @requires extension snappy */ public function testSimpleSnappy(): void { $sampleString = 'sample'; @@ -32,6 +30,7 @@ public function testSimpleSnappy(): void { /** * [testCompressDecompressRandomByteChunks description] + * @requires extension snappy */ public function testCompressDecompressRandomByteChunks(): void { for ($i=0; $i < 100; $i++) { diff --git a/tests/StatisticsTest.php b/tests/StatisticsTest.php index 7532480..ce71d86 100644 --- a/tests/StatisticsTest.php +++ b/tests/StatisticsTest.php @@ -56,7 +56,7 @@ public function testDistinctStatForBasicDataTypes(): void { 'Min' => "one", 'Max' => "two" // yes, it is! ]), - "string2" => new TestDesc([ + "string3" => new TestDesc([ 'Type' => 'string', // typeof(string), 'HasNulls' => true, 'Data' => [ "one", "two", "one", "three", null, "zzz" ], diff --git a/tests/VariousTest.php b/tests/VariousTest.php index 82675a9..27d94c4 100644 --- a/tests/VariousTest.php +++ b/tests/VariousTest.php @@ -8,11 +8,9 @@ final class VariousTest extends TestBase { /** * [testParquetTestingSingleNan description] + * @requires extension snappy */ public function testParquetTestingSingleNan(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $reader = new ParquetReader($this->openTestFile('single_nan.parquet')); $columns = $reader->ReadEntireRowGroup(); $this->assertEquals(1, $reader->getThriftMetadata()->num_rows); @@ -22,11 +20,9 @@ public function testParquetTestingSingleNan(): void { /** * [testParquet1402Arrow5322 description] + * @requires extension snappy */ public function testParquet1402Arrow5322(): void { - if(!extension_loaded('snappy')) { - $this->markTestSkipped('ext-snappy unavailable'); - } $this->expectNotToPerformAssertions(); $reader = new ParquetReader($this->openTestFile('dict-page-offset-zero.parquet')); $columns = $reader->ReadEntireRowGroup(); diff --git a/tests/ZstdTest.php b/tests/ZstdTest.php new file mode 100644 index 0000000..41cb159 --- /dev/null +++ b/tests/ZstdTest.php @@ -0,0 +1,78 @@ +assertNotFalse($compressed); // Assert it's not FALSE (-> erroneous) + $this->assertEquals($sampleString, $uncompressed); + } + + /** + * [testCompressDecompressRandomByteChunks description] + */ + public function testCompressDecompressRandomByteChunks(): void { + for ($i=0; $i < 100; $i++) { + $this->Compress_decompress_random_byte_chunks($i); + } + } + + /** + * [Compress_decompress_random_byte_chunks description] + * @param int $index [description] + */ + public function Compress_decompress_random_byte_chunks(int $index): void + { + // $stage1 = RandomGenerator.GetRandomBytes(2, 1000); + // $stage1 = "A small string to compress esses a a press a mall tring \n"; // (string)random_bytes(1000); + $stage1 = random_bytes(1000); + $stage2 = null; // byte[] stage2; + $stage3 = null; // byte[] stage3; + + $source = fopen('php://memory', 'r+'); + + ZstdStreamWrapper::register(); + $zstd = ZstdStreamWrapper::createWrappedStream($source, 'r+', ZstdStreamWrapper::MODE_COMPRESS); + + fwrite($zstd, $stage1); + StreamHelper::MarkWriteFinished($zstd); + + // fseek($source, 0); // ? + $stage2 = stream_get_contents($source, -1, 0); + + $source = fopen('php://memory', 'r+'); + fwrite($source, $stage2); + fseek($source, 0); + + $zstd = ZstdStreamWrapper::createWrappedStream($source, 'r+', ZstdStreamWrapper::MODE_DECOMPRESS); + + $ms = fopen('php://memory', 'r+'); + + stream_copy_to_stream($zstd, $ms); + + $stage3 = stream_get_contents($ms, -1, 0); + + // validate + $this->assertEquals($stage1, $stage3); + } +}