# Add a tool to check if row groups .min / .max are strictly increasing within a parquet file (#40)
**Open** · damian0815 wants to merge 9 commits into `main` from `damian/feat/is_table_sorted`.
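A quick illustration of the property the tool checks; the tuples are hypothetical (min, max) SURT-key ranges of consecutive row groups, not values from the PR:

```python
# Illustration only: consecutive row groups are "ordered" when each group's
# min is not below the previous group's max (equal boundaries are tolerated).
ordered = [("aaa", "bbb"), ("bbc", "ccc")]      # ok: "bbb" <= "bbc"
overlapping = [("aaa", "ccc"), ("bbb", "ddd")]  # fails: "bbb" < "ccc"

def ranges_ordered(groups):
    return all(prev[1] <= cur[0] for prev, cur in zip(groups, groups[1:]))

assert ranges_ordered(ordered)
assert not ranges_ordered(overlapping)
```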
## Commits (9)
- `4d4038c` is_table_sorted initial implementation
- `0b229d9` wip tests
- `2914f42` tests and fixes
- `9285ec6` reorganise
- `6d63937` file-level unit tests
- `b120571` don't fail if not filewise sorted
- `aeffeec` github workflow for python unit tests
- `a3484ac` fix github action
- `62f7a9a` add README details; clarify min/max row group checking vs full file c…

All commits are by damian0815.
## Files changed

A new GitHub Actions workflow that runs the unit test suite on pushes and pull requests against `main`:
```yaml
name: Python Unit Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.10', '3.11', '3.12', '3.13']
      fail-fast: false

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install uv
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          echo "$HOME/.cargo/bin" >> $GITHUB_PATH

      - name: Install dependencies
        run: |
          uv sync

      - name: Run tests
        run: |
          uv run -m pytest src -v
```
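The "Run tests" step can be reproduced locally. A minimal sketch of the same test selection in Python, assuming the environment is already synced (e.g. via `uv sync`):

```python
# Local equivalent of the workflow's final step, `uv run -m pytest src -v`.
import sys

import pytest

# pytest.main returns an exit code; propagate it like the CI job would.
sys.exit(pytest.main(["src", "-v"]))
```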
The project's `pyproject.toml`:
```toml
[project]
name = "cc-index-table"
version = "0.1.0"
description = "Tools for working with Common Crawl index tables."
requires-python = ">=3.12"
dependencies = [
    "boto3>=1.40.61",
    "pyarrow>=22.0.0",
    "pytest>=8.4.2",
    "tqdm>=4.67.1",
]
```
Empty file.
The checker itself, `util/are_part_min_max_increasing.py` (the path follows the test imports below):

```python
from collections import defaultdict

import pyarrow.parquet as pq
import argparse

from urllib.parse import urlparse
import boto3
import gzip
from tqdm.auto import tqdm


def are_parquet_file_row_groups_min_max_ordered(pf: pq.ParquetFile, column_name: str) -> bool:
    sort_column_index = next(i for i, name in enumerate(pf.schema.names)
                             if name == column_name)

    # keep track of min/max in this ParquetFile
    whole_min = None
    whole_max = None
    prev_max = None
    for row_group_index in range(pf.num_row_groups):
        row_group = pf.metadata.row_group(row_group_index)
        column = row_group.column(sort_column_index)
        if prev_max is not None and prev_max > column.statistics.min:
            print(f"row group {row_group_index} min is not strictly increasing w.r.t previous row group max on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping")
            return False
        whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min)
        whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max)
        prev_max = column.statistics.max
    return True


def are_all_parts_min_max_ordered(file_or_s3_url_list: list[str], sort_column_name: str) -> bool:
    is_sorted = True
    status = defaultdict(int)
    with tqdm(file_or_s3_url_list) as pbar:
        for file_or_url in pbar:
            pf = pq.ParquetFile(file_or_url)
            this_is_sorted = are_parquet_file_row_groups_min_max_ordered(pf, column_name=sort_column_name)
            if not this_is_sorted:
                print(
                    f"Row groups are *internally* not ordered by min/max in file {file_or_url}"
                )
                is_sorted = False
                status['internally_unsorted'] += 1

            pbar.set_postfix(status)
    return is_sorted


def is_gzip(content: bytes) -> bool:
    return content[:2] == b'\x1f\x8b'


def read_file_list(path_or_url: str, prefix: str) -> list[str]:
    parsed = urlparse(path_or_url)
    if parsed.scheme == "s3":
        s3 = boto3.client("s3")
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
    else:
        with open(path_or_url, "rb") as f:
            content = f.read()

    if is_gzip(content):
        content = gzip.decompress(content)
    lines = content.decode("utf-8").split("\n")
    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check if row groups within parquet files have strictly increasing non-overlapping min/max ranges. Exit code is 0 if sorted, 1 if not sorted.")
    parser.add_argument("files_or_s3_urls_file", type=str, help="path or s3:// URI to a text file containing a list of paths to check; used in combination with --prefix to recover individual file paths.")
    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check against (default: 'url_surtkey')")

    args = parser.parse_args()

    files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
    is_sorted = are_all_parts_min_max_ordered(files, sort_column_name=args.column)
    if is_sorted:
        print("✅ Files are sorted")
        exit(0)
    else:
        print("❌ Files are NOT sorted")
        exit(1)
```

A contributor left a review comment on the `column.statistics` access in the row-group loop:

> Must be prepared that no min/max statistics are available in a row group, because of overlong URLs / SURT keys. Cf. PARQUET-1685.
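In pyarrow, `ColumnChunkMetaData.statistics` is `None` when a row group carries no statistics, and `Statistics.has_min_max` reports whether min/max were written. A hedged sketch, not part of the PR, of a helper the loop could use to address that comment:

```python
# Sketch only: return usable (min, max) statistics for a column chunk, or
# None when they are absent or incomplete (cf. PARQUET-1685, e.g. when
# overlong URL / SURT values cause min/max to be dropped).
def column_min_max(row_group, column_index):
    stats = row_group.column(column_index).statistics
    if stats is None or not stats.has_min_max:
        return None
    return stats.min, stats.max
```

The loop would then have to decide whether a statistics-less row group counts as "not verifiable" (a distinct status) or simply as unsorted.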
Empty file.
The unit tests (run from `src`, per the workflow's pytest invocation):
```python
import random
from unittest.mock import MagicMock, patch

from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered, are_all_parts_min_max_ordered


def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]):
    mock_pf = MagicMock()
    mock_pf.schema.names = [column_name]
    mock_pf.num_row_groups = len(row_groups_stats)

    mock_row_groups = []
    for min_val, max_val in row_groups_stats:
        mock_row_group = MagicMock()
        mock_column = MagicMock()
        mock_column.statistics.min = min_val
        mock_column.statistics.max = max_val
        mock_row_group.column.return_value = mock_column
        mock_row_groups.append(mock_row_group)

    mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i]
    return mock_pf


def test_single_row_group_sorted():
    mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')])
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert is_sorted


def test_row_groups_sorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    for n in range(1, len(all_row_groups_stats)):
        row_groups_stats = all_row_groups_stats[:n]
        mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
        is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
        assert is_sorted


def test_row_groups_unsorted():
    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
    count = 0
    while count < 100:
        for n in range(2, len(all_row_groups_stats)):
            row_groups_stats = all_row_groups_stats[:n].copy()
            random.shuffle(row_groups_stats)
            if row_groups_stats == all_row_groups_stats[:n]:
                # shuffle resulted in same order, try again
                continue

            mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
            is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
            assert not is_sorted

        count += 1


def test_row_groups_overlapping():
    row_groups = [('a', 'c'), ('b', 'd')]
    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
    assert not is_sorted


def test_ordered_files_sorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/b', '/data/c'], 'url_surtkey')
        assert result


def test_ordered_files_unsorted():
    files_config = {
        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
    }

    def mock_parquet_file(path):
        return _create_mock_parquet_file('url_surtkey', files_config[path])

    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
        result = are_all_parts_min_max_ordered(['/data/a', '/data/c', '/data/b'], 'url_surtkey')
        assert result  # we don't care about the order of files
```
A further review comment on the strictness of the condition:

> After thinking about this strict condition: values in `url_surtkey` are not unique, and it may happen (although the probability is low) that two rows with the same SURT key end up in two row groups. Then the tool reports an error, although the column might be perfectly sorted.
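Worth noting: the comparison in the code above, `prev_max > column.statistics.min`, already tolerates equal boundary values; only a proper overlap fails. A small sketch of the two interpretations (the function and values are illustrative, not from the PR):

```python
# Sketch: boundary check between consecutive row groups, strict vs relaxed.
def row_group_boundary_ok(prev_max, current_min, strict=False):
    if prev_max is None:  # first row group: nothing to compare against
        return True
    # strict: a duplicate key spanning the boundary counts as unsorted;
    # relaxed: it is tolerated, matching the `>` comparison used above.
    return prev_max < current_min if strict else prev_max <= current_min

assert row_group_boundary_ok("bbb", "bbb")                   # relaxed: tolerated
assert not row_group_boundary_ok("bbb", "bbb", strict=True)  # strict: flagged
```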