"""Check whether a collection of Parquet files, considered as a whole, is sorted."""

import argparse
import gzip
from collections import defaultdict
from urllib.parse import urlparse
from urllib.request import urlopen

import boto3
import pyarrow.parquet as pq
from tqdm.auto import tqdm


def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str | None, str | None]:
    """Check that the row groups of a single Parquet file are sorted on `column_name`.

    Returns (is_sorted, file_min, file_max); file_min and file_max are None if unsorted.
    """
    sort_column_index = next(i for i, name in enumerate(pf.schema.names)
                             if name == column_name)

    # keep track of min/max over the whole ParquetFile
    whole_min = None
    whole_max = None
    prev_max = None
    for row_group_index in range(pf.num_row_groups):
        row_group = pf.metadata.row_group(row_group_index)
        column = row_group.column(sort_column_index)
        if prev_max is not None and prev_max > column.statistics.min:
            # row groups within this file are not in order
            print(f"row group {row_group_index} is not sorted on {column_name}: '{column.statistics.min}' < '{prev_max}' ; stopping")
            return False, None, None
        # the first row group holds the file-wide minimum; the current one the maximum so far
        whole_min = column.statistics.min if whole_min is None else whole_min
        whole_max = column.statistics.max
        prev_max = column.statistics.max
    return True, whole_min, whole_max


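# For reference, a minimal sketch (with a hypothetical file name) of the per-row-group
# statistics that the check above relies on:
#
#   pf = pq.ParquetFile("part-00000.parquet")
#   stats = pf.metadata.row_group(0).column(0).statistics
#   print(stats.min, stats.max)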
def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_name: str) -> bool:
    """Check that each file is internally sorted and that the files are sorted with respect to each other."""
    is_sorted = True
    prev_max = None
    prev_file_or_url = None
    status = defaultdict(int)
    with tqdm(file_or_s3_url_list_ordered) as pbar:
        for file_or_url in pbar:
            pf = pq.ParquetFile(file_or_url)
            this_is_sorted, pf_min, pf_max = are_parquet_file_row_groups_sorted(pf, column_name=sort_column_name)
            if not this_is_sorted:
                print(
                    f"Row groups are *internally* not sorted in file {file_or_url}"
                )
                is_sorted = False
                status['internally_unsorted'] += 1

            # pf_min/pf_max are None if the file itself was unsorted; skip the cross-file check then
            if pf_min is not None and prev_max is not None and prev_max > pf_min:
                print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'")
                is_sorted = False
                status['filewise_unsorted'] += 1
            pbar.set_postfix(status)
            prev_max = pf_max
            prev_file_or_url = file_or_url
    return is_sorted


def is_gzip(content: bytes) -> bool:
    # gzip streams start with the magic bytes 0x1f 0x8b
    return content[:2] == b'\x1f\x8b'


def read_file_list(path_or_url: str, prefix: str) -> list[str]:
    parsed = urlparse(path_or_url)
    if parsed.scheme == "s3":
        s3 = boto3.client("s3")
        bucket = parsed.netloc
        key = parsed.path.lstrip("/")
        obj = s3.get_object(Bucket=bucket, Key=key)
        content = obj["Body"].read()
    elif parsed.scheme in ("http", "https"):
        with urlopen(path_or_url) as f:
            content = f.read()
    else:
        # read as bytes so gzip detection and utf-8 decoding work the same as for S3/HTTP
        with open(path_or_url, "rb") as f:
            content = f.read()

    if is_gzip(content):
        content = gzip.decompress(content)
    lines = content.decode("utf-8").split("\n")
    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]


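# A note on the expected input, as a sketch: the list file contains one path per
# line, and each line is joined with --prefix to form the final location. The
# line below is a hypothetical placeholder, not a real file:
#
#   part-00000.parquet   ->   s3://commoncrawl/part-00000.parquet  (with the default prefix)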
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check if a collection of Parquet files, considered as a whole, is sorted. Exit code is 0 if sorted, 1 if not sorted.")
    parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths or S3 URLs, one per line, in the expected sorted order.")
    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check sorting against (default: 'url_surtkey')")

    args = parser.parse_args()

    files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
    is_sorted = is_full_table_sorted(files, sort_column_name=args.column)
    if is_sorted:
        print("✅ Files are sorted")
        exit(0)
    else:
        print("❌ Files are NOT sorted")
        exit(1)
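# A minimal usage sketch (hypothetical file names, not paths from this repository):
# the functions can also be called directly instead of via the CLI, e.g.
#
#   files = ["part-00000.parquet", "part-00001.parquet"]
#   print(is_full_table_sorted(files, sort_column_name="url_surtkey"))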