From 6a3b3b2baa6582d25effa1d407773e8a314468b0 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 12:47:08 +0700 Subject: [PATCH 01/11] Feature: Supporting the dimension columns in diffa --- src/diffa/cli.py | 9 ++ src/diffa/config.py | 12 +- src/diffa/db/data_models.py | 82 ++++++++---- src/diffa/db/diffa_check.py | 4 +- src/diffa/db/source_target.py | 64 ++++++++-- src/diffa/managers/check_manager.py | 109 +++++++++++++--- tests/managers/test_check_manager.py | 178 +++++++++++++++++++++------ 7 files changed, 366 insertions(+), 92 deletions(-) diff --git a/src/diffa/cli.py b/src/diffa/cli.py index 512764f..6592c7a 100644 --- a/src/diffa/cli.py +++ b/src/diffa/cli.py @@ -4,6 +4,7 @@ import click from alembic import command from alembic.config import Config +from typing import List from diffa.managers.check_manager import CheckManager from diffa.managers.run_manager import RunManager @@ -57,6 +58,12 @@ def cli(): type=str, help="Target table name.", ) +@click.option( + "--diff-dimenstions", + nargs=-1, + type=str, + help="Diff dimension columns.", +) def data_diff( *, source_db_uri: str = None, @@ -68,6 +75,7 @@ def data_diff( target_database: str = None, target_schema: str = "public", target_table: str, + diff_dimenstions: List[str] = None, ): config_manager = ConfigManager().configure( source_database=source_database, @@ -79,6 +87,7 @@ def data_diff( source_db_uri=source_db_uri, target_db_uri=target_db_uri, diffa_db_uri=diffa_db_uri, + diff_dimension_cols=diff_dimenstions, ) run_manager = RunManager(config_manager=config_manager) check_manager = CheckManager(config_manager=config_manager) diff --git a/src/diffa/config.py b/src/diffa/config.py index 9f6d8de..70f468f 100644 --- a/src/diffa/config.py +++ b/src/diffa/config.py @@ -3,6 +3,7 @@ from datetime import date from enum import Enum from urllib.parse import urlparse +from typing import List, Optional from diffa.utils import Logger @@ -103,8 +104,12 @@ def get_db_table(self): class SourceConfig(DBConfig): """A class to handle the configs for the Source DBs""" + def __init__(self, *args, diff_dimension_cols: Optional[List[str]] = None, **kwargs): + super().__init__(*args, **kwargs) + self.diff_dimension_cols = diff_dimension_cols or [] - + def get_diff_dimension_cols(self): + return self.diff_dimension_cols class DiffaConfig(DBConfig): """A class to handle the configs for the Diffa DB""" @@ -143,18 +148,21 @@ def configure( target_schema: str = "public", target_table: str, diffa_db_uri: str = None, + diff_dimension_cols: List[str] = None, ): self.source.update( db_uri=source_db_uri, db_name=source_database, db_schema=source_schema, db_table=source_table, + diff_dimension_cols=diff_dimension_cols, ) self.target.update( db_uri=target_db_uri, db_name=target_database, db_schema=target_schema, db_table=target_table, + diff_dimension_cols=diff_dimension_cols, ) self.diffa_check.update( db_uri=diffa_db_uri, @@ -203,7 +211,7 @@ def save_config(self, source_uri: str, target_uri: str, diffa_uri: str): logger.info("Configuration saved to successfully.") def __getattr__(self, __name: str) -> DBConfig: - """Dynamically access DBConfig attributes (e.g config_manager.source.database)""" + """Dynamically access DBConfig attributes (e.g config_manager.source.get_db_name())""" if __name in self.config: return self.config[__name] diff --git a/src/diffa/db/data_models.py b/src/diffa/db/data_models.py index c864adc..6f30348 100644 --- a/src/diffa/db/data_models.py +++ b/src/diffa/db/data_models.py @@ -1,6 +1,6 @@ from datetime import date -from typing import Optional -from dataclasses import dataclass, field +from typing import Optional, List, Tuple +from dataclasses import dataclass, field, fields, make_dataclass, asdict import uuid from sqlalchemy import ( @@ -148,15 +148,46 @@ def validate_status(self): return self -@dataclass +@dataclass(frozen=True) class CountCheck: """A single count check in Source/Target Database""" - cnt: int check_date: date + cnt: int + + @classmethod + def create_with_dimensions(cls, dimension_cols: Optional[List[str]] = None): + """Factory method to create a CountCheck class with dimension fields""" + + return make_dataclass( + cls.__name__, + [(col, str) for col in sorted(dimension_cols)] if dimension_cols else [], + bases=(cls,), + frozen=True, + ) + + @classmethod + def get_base_fields(cls) -> List[Tuple[str, type]]: + return [("check_date", date), ("cnt", int)] + + @classmethod + def get_dimension_fields(cls) -> List[Tuple[str, type]]: + base_fields = {name for name, _ in cls.get_base_fields()} + + return [(f.name, f.type) for f in fields(cls) if f.name not in base_fields] + + def get_dimension_values(self): + # check_date is still considered as a dimension field. In fact, it's a main dimension field. + return { + f[0]: getattr(self, f[0]) + for f in self.get_dimension_fields() + [("check_date", date)] + } + def to_flatten_dimension_format(self) -> dict: + return {tuple(self.get_dimension_values().items()): self} -@dataclass + +@dataclass(frozen=True) class MergedCountCheck: """A merged count check after checking count in Source/Target Databases""" @@ -166,33 +197,40 @@ class MergedCountCheck: is_valid: bool = field(init=False) def __post_init__(self): - self.is_valid = True if self.source_count <= self.target_count else False + object.__setattr__(self, 'is_valid', True if self.source_count <= self.target_count else False) def __eq__(self, other): if not isinstance(other, MergedCountCheck): return NotImplemented - return ( - self.source_count == other.source_count - and self.target_count == other.target_count - and self.check_date == other.check_date - and self.is_valid == other.is_valid - ) + return asdict(self) == asdict(other) + + def __lt__(self, other): + if not isinstance(other, MergedCountCheck): + return NotImplemented + return asdict(self) < asdict(other) + + def __str__(self): + field_strs = [f"{f.name}={getattr(self, f.name)}" for f in fields(self)] + return f"MergedCountCheck({', '.join(field_strs)})" + + @classmethod + def create_with_dimensions(cls, dimension_fields: List[Tuple[str, type]]): + """Factory method to dynamically create a MergedCountCheck with a CountCheck schema""" + + return make_dataclass(cls.__name__, dimension_fields, bases=(cls,), frozen=True, eq=False) @classmethod def from_counts( cls, source: Optional[CountCheck] = None, target: Optional[CountCheck] = None ): - if source and target: - if source.check_date != target.check_date: - raise ValueError("Source and target counts are not for the same date.") - elif not source and not target: - raise ValueError("Both source and target counts are missing.") + count_check = source if source else target + merged_count_check_values = count_check.get_dimension_values() + merged_count_check_values["source_count"] = source.cnt if source else 0 + merged_count_check_values["target_count"] = target.cnt if target else 0 - check_date = source.check_date if source else target.check_date - source_count = source.cnt if source else 0 - target_count = target.cnt if target else 0 - - return cls(source_count, target_count, check_date) + return cls.create_with_dimensions(count_check.get_dimension_fields())( + **merged_count_check_values + ) def to_diffa_check_schema( self, diff --git a/src/diffa/db/diffa_check.py b/src/diffa/db/diffa_check.py index af24a2f..a08eece 100644 --- a/src/diffa/db/diffa_check.py +++ b/src/diffa/db/diffa_check.py @@ -6,7 +6,7 @@ from sqlalchemy.dialects.postgresql import insert from diffa.db.connect import DiffaConnection -from diffa.config import DBConfig, ConfigManager, DIFFA_BEGIN_DATE +from diffa.config import DiffaConfig, ConfigManager, DIFFA_BEGIN_DATE from diffa.db.data_models import ( DiffaCheckSchema, DiffaCheck, @@ -20,7 +20,7 @@ class DiffaCheckDatabase: """SQLAlchemy Database Adapter for Diffa state management""" - def __init__(self, db_config: DBConfig): + def __init__(self, db_config: DiffaConfig): self.db_config = db_config self.conn = DiffaConnection(self.db_config.get_db_config()) diff --git a/src/diffa/db/source_target.py b/src/diffa/db/source_target.py index 96cdad9..0f7fa87 100644 --- a/src/diffa/db/source_target.py +++ b/src/diffa/db/source_target.py @@ -1,12 +1,13 @@ from datetime import date -from typing import List, Iterable +from typing import List, Iterable, Optional from concurrent.futures import ThreadPoolExecutor +from functools import partial import psycopg2.extras from diffa.utils import Logger from diffa.db.connect import PostgresConnection -from diffa.config import DBConfig +from diffa.config import SourceConfig from diffa.db.data_models import CountCheck from diffa.config import ConfigManager @@ -16,7 +17,7 @@ class SourceTargetDatabase: """Base class for the Source Target DB handling""" - def __init__(self, db_config: DBConfig) -> None: + def __init__(self, db_config: SourceConfig) -> None: self.db_config = db_config self.conn = PostgresConnection(self.db_config.get_db_config()) @@ -34,7 +35,10 @@ def _execute_query(self, query: str, sql_params: tuple = None): raise e def _build_count_query( - self, latest_check_date: date, invalid_check_dates: List[date] + self, + latest_check_date: date, + invalid_check_dates: List[date], + diff_dimension_cols: Optional[List[str]] = None, ): backfill_where_clause = ( f" (created_at::DATE IN ({','.join([f"'{date}'" for date in invalid_check_dates])})) OR" @@ -47,21 +51,44 @@ def _build_count_query( created_at::DATE <= CURRENT_DATE - INTERVAL '2 DAY' ) """ + group_by_diff_dimensions_clause = ( + f"GROUP BY {','.join(diff_dimension_cols)}" if diff_dimension_cols else "" + ) + select_diff_dimensions_clause = ( + f"{','.join([f'{col}::text' for col in diff_dimension_cols])}" + if diff_dimension_cols + else "" + ) + return f""" SELECT created_at::DATE as check_date, - COUNT(*) AS cnt + COUNT(*) AS cnt, + {select_diff_dimensions_clause} FROM {self.db_config.get_db_schema()}.{self.db_config.get_db_table()} WHERE {backfill_where_clause} {catchup_where_clause} GROUP BY created_at::DATE + {group_by_diff_dimensions_clause} ORDER BY created_at::DATE ASC """ def count(self, latest_check_date: date, invalid_check_dates: List[date]): - count_query = self._build_count_query(latest_check_date, invalid_check_dates) + if self.db_config.get_diff_dimension_cols(): + count_query = self._build_count_query( + latest_check_date, + invalid_check_dates, + self.db_config.get_diff_dimension_cols(), + ) + logger.warning( + "Diff dimensions are enabled. May impact the performance of the query" + ) + else: + count_query = self._build_count_query( + latest_check_date, invalid_check_dates + ) logger.info( f"Executing the count query on {self.db_config.get_db_scheme()}: {count_query}" ) @@ -77,8 +104,15 @@ def __init__(self, config_manager: ConfigManager): def get_counts( self, last_check_date: date, invalid_check_dates: Iterable[date] ) -> Iterable[CountCheck]: - def to_count_check(count_dict: dict) -> CountCheck: - return CountCheck(**count_dict) + def to_count_check( + count_dict: dict, diff_dimension_cols: Optional[List[str]] = None + ) -> CountCheck: + if diff_dimension_cols: + return CountCheck.create_with_dimensions(diff_dimension_cols)( + **count_dict + ) + else: + return CountCheck(**count_dict) with ThreadPoolExecutor(max_workers=2) as executor: future_source_count = executor.submit( @@ -92,4 +126,16 @@ def to_count_check(count_dict: dict) -> CountCheck: future_source_count.result(), future_target_count.result(), ) - return map(to_count_check, source_counts), map(to_count_check, target_counts) + return map( + partial( + to_count_check, + diff_dimension_cols=self.source_db.db_config.get_diff_dimension_cols(), + ), + source_counts, + ), map( + partial( + to_count_check, + diff_dimension_cols=self.target_db.db_config.get_diff_dimension_cols(), + ), + target_counts, + ) diff --git a/src/diffa/managers/check_manager.py b/src/diffa/managers/check_manager.py index 28049c2..3710b05 100644 --- a/src/diffa/managers/check_manager.py +++ b/src/diffa/managers/check_manager.py @@ -1,4 +1,7 @@ from typing import Iterable +from datetime import date +from collections import defaultdict +from functools import reduce from diffa.db.data_models import CountCheck, MergedCountCheck from diffa.db.diffa_check import DiffaCheckService @@ -20,9 +23,9 @@ def data_diff(self): """This will interupt the process when there are invalid diff found.""" if self.compare_tables(): - logger.error("There is an invalid diff between source and target.") + logger.error("❌ There is an invalid diff between source and target.") raise InvalidDiffException - logger.info("There is no invalid diff between source and target.") + logger.info("✅ There is no invalid diff between source and target.") def compare_tables(self): """Data-diff comparison service. Will return True if there is any invalid diff.""" @@ -45,6 +48,7 @@ def compare_tables(self): last_check_date, invalid_check_dates ) merged_count_checks = self._merge_count_checks(source_counts, target_counts) + merged_by_date = self._merge_by_check_date(merged_count_checks) # Step 4: Save the merged count checks to the diffa database self.diffa_check_service.save_diffa_checks( @@ -57,20 +61,81 @@ def compare_tables(self): target_schema=self.cm.target.get_db_schema(), target_table=self.cm.target.get_db_table(), ), - merged_count_checks, + merged_by_date.values(), ) ) + # Step 5: Build and log the check summary + self._build_check_summary(merged_count_checks, merged_by_date) + # Return True if there is any invalid diff - return self._check_if_invalid_diff(merged_count_checks) + return any(not mcc.is_valid for mcc in merged_by_date.values()) + + def _build_check_summary( + self, + merged_count_checks: Iterable[MergedCountCheck], + merged_by_date: dict[date, MergedCountCheck], + ): + stats_by_day = { + check_date: { + "detailed_msgs": self._get_check_messages( + self._get_checks_by_date(merged_count_checks, check_date) + ), + "summary_msg": self._get_check_messages([mcc])[0], + } + for check_date, mcc in merged_by_date.items() + } + + summary_lines = [ + f""" + - {check_date}: + summary: + {stats['summary_msg']} + detailed: + {stats['detailed_msgs']} + """ + for check_date, stats in stats_by_day.items() + ] + stats_summary = "\n".join(summary_lines) - def _check_if_invalid_diff( - self, merged_count_checks: Iterable[MergedCountCheck] - ) -> bool: - for merged_count_check in merged_count_checks: - if not merged_count_check.is_valid: - return True - return False + logger.info( + f""" + Data-diff comparison result: + Summary: + - Total days checked: {len(stats_by_day)} + - Stats by day: + {stats_summary} + """ + ) + + @staticmethod + def _get_check_messages(merged_count_checks: Iterable[MergedCountCheck]): + return [ + f"{'✅ No Diff' if mcc.is_valid else '❌ Diff'} {mcc}" + for mcc in merged_count_checks + ] + + @staticmethod + def _get_checks_by_date( + merged_count_checks: Iterable[MergedCountCheck], check_date: date + ) -> list[MergedCountCheck]: + return [mcc for mcc in merged_count_checks if mcc.check_date == check_date] + + @staticmethod + def _merge_by_check_date( + merged_count_checks: Iterable[MergedCountCheck], + ) -> dict[date, MergedCountCheck]: + merged = defaultdict( + lambda: dict(check_date=None, source_count=0, target_count=0, is_valid=True) + ) + for mcc in merged_count_checks: + entry = merged[mcc.check_date] + entry["source_count"] += mcc.source_count + entry["target_count"] += mcc.target_count + entry["is_valid"] &= mcc.is_valid + entry["check_date"] = mcc.check_date + + return {cd: MergedCountCheck(**data) for cd, data in merged.items()} def _merge_count_checks( self, source_counts: Iterable[CountCheck], target_counts: Iterable[CountCheck] @@ -83,18 +148,26 @@ def _merge_count_checks( Output [(1,0), (2,2), (0,4), (5,5), (6,0), (0,7)] """ - source_dict = {count.check_date: count for count in source_counts} - target_dict = {count.check_date: count for count in target_counts} + source_dict = reduce( + lambda x, y: x | y, + map(lambda x: x.to_flatten_dimension_format(), source_counts), + {}, + ) + target_dict = reduce( + lambda x, y: x | y, + map(lambda x: x.to_flatten_dimension_format(), target_counts), + {}, + ) - all_dates = set(source_dict.keys()) | set(target_dict.keys()) + all_dims = set(source_dict.keys()) | set(target_dict.keys()) merged_count_checks = [] - for check_date in all_dates: - source_count = source_dict.get(check_date) - target_count = target_dict.get(check_date) + for dim in all_dims: + source_count = source_dict.get(dim) + target_count = target_dict.get(dim) merged_count_check = MergedCountCheck.from_counts( source_count, target_count ) merged_count_checks.append(merged_count_check) - return merged_count_checks + return sorted(merged_count_checks, key=lambda x: x.check_date) diff --git a/tests/managers/test_check_manager.py b/tests/managers/test_check_manager.py index b419c75..84021fe 100644 --- a/tests/managers/test_check_manager.py +++ b/tests/managers/test_check_manager.py @@ -71,7 +71,34 @@ def check_manager(): ) ], ), - # Case 4: Checking dates are in neither source nor target + # Case 4: Checking different dates in source and target + ( + [ + CountCheck( + cnt=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + ) + ], + [ + CountCheck( + cnt=200, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + ) + ], + [ + MergedCountCheck( + source_count=200, + target_count=0, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + ), + MergedCountCheck( + source_count=0, + target_count=200, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + ), + ], + ), + # Case 5: Checking dates are in neither source nor target ([], [], []), ], ) @@ -81,60 +108,133 @@ def test__merge_count_check( merged_counts = check_manager._merge_count_checks(source_counts, target_counts) assert expected_merged_counts == merged_counts - @pytest.mark.parametrize( - "merged_count_checks, expected_result", + "source_counts, target_counts, expected_merged_counts", [ - # Case 1: All merged count checks are valid - [ + # Case 1: Checking dates are in both source and target + ( [ - MergedCountCheck( - source_count=100, - target_count=100, + CountCheck.create_with_dimensions(["status", "country"])( + cnt=100, check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), - ), - MergedCountCheck( + status="True", + country="US" + ) + ], + [ + CountCheck.create_with_dimensions(["status", "country"])( + cnt=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ) + ], + [ + MergedCountCheck.create_with_dimensions(["status", "country"])( source_count=100, - target_count=150, - check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), - ), + target_count=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ) ], - False, - ], - # Case 2: All merged count checks are invalid - [ + ), + # Case 2: Checking dates are in source only + ( [ - MergedCountCheck( - source_count=150, - target_count=100, + CountCheck.create_with_dimensions( + ["status", "country"])( + cnt=100, check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), - ), + status="True", + country="US" + ) ], - True, - ], - # Case 3: Mixed valid and invalid merged count checks - [ + [], [ - MergedCountCheck( + MergedCountCheck.create_with_dimensions(["status", "country"])( source_count=100, - target_count=100, + target_count=0, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ) + ], + ), + # Case 3: Checking dates are in target only + ( + [], + [ + CountCheck.create_with_dimensions(["status", "country"])( + cnt=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ) + ], + [ + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=0, + target_count=200, check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ) + ], + ), + # Case 4: Checking different dates in source and target + ( + [ + CountCheck.create_with_dimensions(["status", "country"])( + cnt=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" ), - MergedCountCheck( - source_count=150, - target_count=100, + CountCheck.create_with_dimensions(["status", "country"])( + cnt=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="Singapore" + ) + ], + [ + CountCheck.create_with_dimensions(["status", "country"])( + cnt=200, check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + status="False", + country="US" + ) + ], + [ + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=200, + target_count=0, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="Singapore" ), - MergedCountCheck( - source_count=100, - target_count=150, - check_date=datetime.strptime("2024-01-03", "%Y-%m-%d").date(), + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=200, + target_count=0, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ), + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=0, + target_count=200, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + status="False", + country="US" ), ], - True, - ], + ), + # Case 5: Checking dates are in neither source nor target + ([], [], []), + ], ) -def test__check_if_invalid_diff(check_manager, merged_count_checks, expected_result): - is_invalid_diff = check_manager._check_if_invalid_diff(merged_count_checks) - assert is_invalid_diff == expected_result +def test__merge_count_check_with_dimensions(check_manager, source_counts, target_counts, expected_merged_counts): + merged_counts = check_manager._merge_count_checks(source_counts, target_counts) + assert expected_merged_counts == merged_counts \ No newline at end of file From c540a8975895af3d520e465abf2d1c9e846e7e03 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 15:08:47 +0700 Subject: [PATCH 02/11] Convert MergedCountCheck to normal class (get rid of too complicated dataclass) --- src/diffa/db/data_models.py | 64 ++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/src/diffa/db/data_models.py b/src/diffa/db/data_models.py index 6f30348..583edac 100644 --- a/src/diffa/db/data_models.py +++ b/src/diffa/db/data_models.py @@ -1,6 +1,7 @@ from datetime import date -from typing import Optional, List, Tuple -from dataclasses import dataclass, field, fields, make_dataclass, asdict +from typing import Optional, List, Tuple, Any +from dataclasses import dataclass, fields, make_dataclass +from functools import reduce import uuid from sqlalchemy import ( @@ -187,37 +188,64 @@ def to_flatten_dimension_format(self) -> dict: return {tuple(self.get_dimension_values().items()): self} -@dataclass(frozen=True) class MergedCountCheck: """A merged count check after checking count in Source/Target Databases""" - source_count: int - target_count: int - check_date: date - is_valid: bool = field(init=False) - - def __post_init__(self): - object.__setattr__(self, 'is_valid', True if self.source_count <= self.target_count else False) + def __init__( + self, + source_count: int, + target_count: int, + check_date: date, + is_valid: Optional[bool] = None, + **kwargs: Any, + ): + self.source_count = source_count + self.target_count = target_count + self.check_date = check_date + for key, value in kwargs.items(): + setattr(self, key, value) + + self.is_valid = ( + is_valid if is_valid is not None else source_count <= target_count + ) def __eq__(self, other): if not isinstance(other, MergedCountCheck): return NotImplemented - return asdict(self) == asdict(other) - + return self.__dict__ == other.__dict__ + def __lt__(self, other): if not isinstance(other, MergedCountCheck): return NotImplemented - return asdict(self) < asdict(other) + dynamic_fields = [ + f + for f in self.__dict__.keys() + if f not in ["source_count", "target_count", "check_date", "is_valid"] + ] + precedence = ( + ["check_date"] + + dynamic_fields + + ["source_count", "target_count", "is_valid"] + ) + + return tuple(getattr(self, f) for f in precedence) < tuple( + getattr(other, f) for f in precedence + ) def __str__(self): - field_strs = [f"{f.name}={getattr(self, f.name)}" for f in fields(self)] - return f"MergedCountCheck({', '.join(field_strs)})" + return f"MergedCountCheck({", ".join(f"{k}={v!r}" for k, v in self.__dict__.items())})" @classmethod def create_with_dimensions(cls, dimension_fields: List[Tuple[str, type]]): """Factory method to dynamically create a MergedCountCheck with a CountCheck schema""" - return make_dataclass(cls.__name__, dimension_fields, bases=(cls,), frozen=True, eq=False) + return type( + cls.__name__, + (cls,), + reduce( + lambda x, y: x | y, map(lambda x: {x[0]: x[1]}, dimension_fields), {} + ), + ) @classmethod def from_counts( @@ -228,9 +256,7 @@ def from_counts( merged_count_check_values["source_count"] = source.cnt if source else 0 merged_count_check_values["target_count"] = target.cnt if target else 0 - return cls.create_with_dimensions(count_check.get_dimension_fields())( - **merged_count_check_values - ) + return cls(**merged_count_check_values) def to_diffa_check_schema( self, From 378c13510b6af20e5f2ca6054609298ba6781869 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 15:08:59 +0700 Subject: [PATCH 03/11] adding unit tests --- tests/managers/test_check_manager.py | 118 ++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 1 deletion(-) diff --git a/tests/managers/test_check_manager.py b/tests/managers/test_check_manager.py index 84021fe..3345b70 100644 --- a/tests/managers/test_check_manager.py +++ b/tests/managers/test_check_manager.py @@ -237,4 +237,120 @@ def test__merge_count_check( ) def test__merge_count_check_with_dimensions(check_manager, source_counts, target_counts, expected_merged_counts): merged_counts = check_manager._merge_count_checks(source_counts, target_counts) - assert expected_merged_counts == merged_counts \ No newline at end of file + assert expected_merged_counts == merged_counts + + +@pytest.mark.parametrize( + "merged_count_checks, expected_merged_by_date", + [ + # Case 1: Merge count checks by check date with base dimension field + ( + [ + MergedCountCheck( + source_count=100, + target_count=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date() + ), + MergedCountCheck( + source_count=300, + target_count=400, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date() + ), + ], + { + datetime.strptime("2024-01-01", "%Y-%m-%d").date(): MergedCountCheck( + source_count=100, + target_count=200, + is_valid=True, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date() + ), + datetime.strptime("2024-01-02", "%Y-%m-%d").date(): MergedCountCheck( + source_count=300, + target_count=400, + is_valid=True, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date() + ), + } + ), + # Case 2: Merge count checks by check date with 1 dimension field (happy case) + ( + [ + MergedCountCheck.create_with_dimensions(["status"])( + source_count=100, + target_count=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True" + ), + MergedCountCheck.create_with_dimensions(["status"])( + source_count=200, + target_count=300, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="False" + ), + MergedCountCheck.create_with_dimensions(["status"])( + source_count=400, + target_count=300, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + status="True" + ), + ], + { + datetime.strptime("2024-01-01", "%Y-%m-%d").date(): MergedCountCheck( + source_count=300, + target_count=500, + is_valid=True, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + ), + datetime.strptime("2024-01-02", "%Y-%m-%d").date(): MergedCountCheck( + source_count=400, + target_count=300, + is_valid=False, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + ), + } + ), + # Case 3: Merge count checks by check date with 2 dimension fields (unhappy case: dimenssion failure => invalid diff) + ( + [ + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=100, + target_count=200, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="True", + country="US" + ), + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=200, + target_count=100, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + status="False", + country="US" + ), + MergedCountCheck.create_with_dimensions(["status", "country"])( + source_count=300, + target_count=400, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + status="True", + country="Singapore" + ), + ], + { + datetime.strptime("2024-01-01", "%Y-%m-%d").date(): MergedCountCheck( + source_count=300, + target_count=300, + is_valid=False, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date(), + ), + datetime.strptime("2024-01-02", "%Y-%m-%d").date(): MergedCountCheck( + source_count=300, + target_count=400, + is_valid=True, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date(), + ), + } + ), + ] +) +def test__merge_by_check_date(check_manager, merged_count_checks, expected_merged_by_date): + merged_by_date = check_manager._merge_by_check_date(merged_count_checks) + assert expected_merged_by_date == merged_by_date From e4ecea7e7d220f4019c6373275f000feab5b5742 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 15:30:09 +0700 Subject: [PATCH 04/11] adding the unit test --- src/diffa/managers/check_manager.py | 5 ++- tests/managers/test_check_manager.py | 46 ++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/src/diffa/managers/check_manager.py b/src/diffa/managers/check_manager.py index 3710b05..f06c809 100644 --- a/src/diffa/managers/check_manager.py +++ b/src/diffa/managers/check_manager.py @@ -69,7 +69,10 @@ def compare_tables(self): self._build_check_summary(merged_count_checks, merged_by_date) # Return True if there is any invalid diff - return any(not mcc.is_valid for mcc in merged_by_date.values()) + return self._check_if_valid_diff(merged_by_date.values()) + + def _check_if_valid_diff(self, merged_by_date: list[MergedCountCheck]) -> bool: + return all(mcc.is_valid for mcc in merged_by_date) def _build_check_summary( self, diff --git a/tests/managers/test_check_manager.py b/tests/managers/test_check_manager.py index 3345b70..b6370f9 100644 --- a/tests/managers/test_check_manager.py +++ b/tests/managers/test_check_manager.py @@ -354,3 +354,49 @@ def test__merge_count_check_with_dimensions(check_manager, source_counts, target def test__merge_by_check_date(check_manager, merged_count_checks, expected_merged_by_date): merged_by_date = check_manager._merge_by_check_date(merged_count_checks) assert expected_merged_by_date == merged_by_date + +@pytest.mark.parametrize( + "merged_by_date, expected_is_valid_diff", + [ + # Case 1: Happy case + ( + [ + MergedCountCheck( + source_count=100, + target_count=200, + is_valid=True, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date() + ), + MergedCountCheck( + source_count=200, + target_count=200, + is_valid=True, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date() + ), + ], + True + ), + # Case 2: Unhappy case + ( + [ + MergedCountCheck( + source_count=100, + target_count=200, + is_valid=True, + check_date=datetime.strptime("2024-01-01", "%Y-%m-%d").date() + ), + MergedCountCheck( + source_count=200, + target_count=200, + is_valid=False, + check_date=datetime.strptime("2024-01-02", "%Y-%m-%d").date() + ), + ], + False + ) + ], + +) +def test__check_if_valid_diff(check_manager, merged_by_date, expected_is_valid_diff): + is_valid_diff = check_manager._check_if_valid_diff(merged_by_date) + assert is_valid_diff == expected_is_valid_diff From d71795e22e447a41c6b36b5b22628c095454ba24 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 16:24:29 +0700 Subject: [PATCH 05/11] fixing the nit --- src/diffa/cli.py | 9 ++++----- src/diffa/db/source_target.py | 5 ++--- src/diffa/managers/check_manager.py | 4 ++-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/diffa/cli.py b/src/diffa/cli.py index 6592c7a..9bee573 100644 --- a/src/diffa/cli.py +++ b/src/diffa/cli.py @@ -4,7 +4,6 @@ import click from alembic import command from alembic.config import Config -from typing import List from diffa.managers.check_manager import CheckManager from diffa.managers.run_manager import RunManager @@ -59,8 +58,8 @@ def cli(): help="Target table name.", ) @click.option( - "--diff-dimenstions", - nargs=-1, + "--diff-dimensions", + multiple=True, type=str, help="Diff dimension columns.", ) @@ -75,7 +74,7 @@ def data_diff( target_database: str = None, target_schema: str = "public", target_table: str, - diff_dimenstions: List[str] = None, + diff_dimensions: tuple = None, ): config_manager = ConfigManager().configure( source_database=source_database, @@ -87,7 +86,7 @@ def data_diff( source_db_uri=source_db_uri, target_db_uri=target_db_uri, diffa_db_uri=diffa_db_uri, - diff_dimension_cols=diff_dimenstions, + diff_dimension_cols=list(diff_dimensions) if diff_dimensions else None, ) run_manager = RunManager(config_manager=config_manager) check_manager = CheckManager(config_manager=config_manager) diff --git a/src/diffa/db/source_target.py b/src/diffa/db/source_target.py index 0f7fa87..8008b74 100644 --- a/src/diffa/db/source_target.py +++ b/src/diffa/db/source_target.py @@ -52,7 +52,7 @@ def _build_count_query( ) """ group_by_diff_dimensions_clause = ( - f"GROUP BY {','.join(diff_dimension_cols)}" if diff_dimension_cols else "" + f"{','.join(diff_dimension_cols)}" if diff_dimension_cols else "" ) select_diff_dimensions_clause = ( f"{','.join([f'{col}::text' for col in diff_dimension_cols])}" @@ -69,8 +69,7 @@ def _build_count_query( WHERE {backfill_where_clause} {catchup_where_clause} - GROUP BY created_at::DATE - {group_by_diff_dimensions_clause} + GROUP BY created_at::DATE, {group_by_diff_dimensions_clause} ORDER BY created_at::DATE ASC """ diff --git a/src/diffa/managers/check_manager.py b/src/diffa/managers/check_manager.py index f06c809..e84fcaf 100644 --- a/src/diffa/managers/check_manager.py +++ b/src/diffa/managers/check_manager.py @@ -22,7 +22,7 @@ def __init__(self, config_manager: ConfigManager): def data_diff(self): """This will interupt the process when there are invalid diff found.""" - if self.compare_tables(): + if not self.compare_tables(): logger.error("❌ There is an invalid diff between source and target.") raise InvalidDiffException logger.info("✅ There is no invalid diff between source and target.") @@ -99,7 +99,7 @@ def _build_check_summary( """ for check_date, stats in stats_by_day.items() ] - stats_summary = "\n".join(summary_lines) + stats_summary = "\n".join(summary_lines) if summary_lines else "No stats available" logger.info( f""" From 30e448f16091dabbe75732e6ef3174dab22cd9a3 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Fri, 7 Nov 2025 16:57:47 +0700 Subject: [PATCH 06/11] fixing the nits --- src/diffa/db/data_models.py | 2 +- src/diffa/db/source_target.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/diffa/db/data_models.py b/src/diffa/db/data_models.py index 583edac..559954d 100644 --- a/src/diffa/db/data_models.py +++ b/src/diffa/db/data_models.py @@ -153,8 +153,8 @@ def validate_status(self): class CountCheck: """A single count check in Source/Target Database""" - check_date: date cnt: int + check_date: date @classmethod def create_with_dimensions(cls, dimension_cols: Optional[List[str]] = None): diff --git a/src/diffa/db/source_target.py b/src/diffa/db/source_target.py index 8008b74..9c418fe 100644 --- a/src/diffa/db/source_target.py +++ b/src/diffa/db/source_target.py @@ -52,10 +52,10 @@ def _build_count_query( ) """ group_by_diff_dimensions_clause = ( - f"{','.join(diff_dimension_cols)}" if diff_dimension_cols else "" + f", {','.join(diff_dimension_cols)}" if diff_dimension_cols else "" ) select_diff_dimensions_clause = ( - f"{','.join([f'{col}::text' for col in diff_dimension_cols])}" + f", {','.join([f'{col}::text' for col in diff_dimension_cols])}" if diff_dimension_cols else "" ) @@ -63,13 +63,14 @@ def _build_count_query( return f""" SELECT created_at::DATE as check_date, - COUNT(*) AS cnt, + COUNT(*) AS cnt {select_diff_dimensions_clause} FROM {self.db_config.get_db_schema()}.{self.db_config.get_db_table()} WHERE {backfill_where_clause} {catchup_where_clause} - GROUP BY created_at::DATE, {group_by_diff_dimensions_clause} + GROUP BY created_at::DATE + {group_by_diff_dimensions_clause} ORDER BY created_at::DATE ASC """ From bd58f68c31205dddb4796dd6e819a832b970f837 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Mon, 10 Nov 2025 10:09:12 +0700 Subject: [PATCH 07/11] full-diff mode support --- src/diffa/cli.py | 7 +++++++ src/diffa/config.py | 8 +++++++- src/diffa/db/diffa_check.py | 23 ++++++++++++++++++++--- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/diffa/cli.py b/src/diffa/cli.py index 9bee573..9ff412c 100644 --- a/src/diffa/cli.py +++ b/src/diffa/cli.py @@ -63,6 +63,11 @@ def cli(): type=str, help="Diff dimension columns.", ) +@click.option( + "--full-diff", + is_flag=True, + help="Full diff mode. Not saving the diffa checks to the database.", +) def data_diff( *, source_db_uri: str = None, @@ -75,6 +80,7 @@ def data_diff( target_schema: str = "public", target_table: str, diff_dimensions: tuple = None, + full_diff: bool = False, ): config_manager = ConfigManager().configure( source_database=source_database, @@ -87,6 +93,7 @@ def data_diff( target_db_uri=target_db_uri, diffa_db_uri=diffa_db_uri, diff_dimension_cols=list(diff_dimensions) if diff_dimensions else None, + full_diff=full_diff, ) run_manager = RunManager(config_manager=config_manager) check_manager = CheckManager(config_manager=config_manager) diff --git a/src/diffa/config.py b/src/diffa/config.py index 70f468f..a8cf057 100644 --- a/src/diffa/config.py +++ b/src/diffa/config.py @@ -112,8 +112,12 @@ def get_diff_dimension_cols(self): return self.diff_dimension_cols class DiffaConfig(DBConfig): """A class to handle the configs for the Diffa DB""" + def __init__(self, *args, full_diff: bool = False, **kwargs): + super().__init__(*args, **kwargs) + self.full_diff = full_diff - + def is_full_diff(self): + return self.full_diff class ConfigManager: """Manage all the configuration needed for Diffa Operations""" @@ -149,6 +153,7 @@ def configure( target_table: str, diffa_db_uri: str = None, diff_dimension_cols: List[str] = None, + full_diff: bool = False, ): self.source.update( db_uri=source_db_uri, @@ -166,6 +171,7 @@ def configure( ) self.diffa_check.update( db_uri=diffa_db_uri, + full_diff=full_diff, ) self.diffa_check_run.update( db_uri=diffa_db_uri, diff --git a/src/diffa/db/diffa_check.py b/src/diffa/db/diffa_check.py index a08eece..c1e8f9f 100644 --- a/src/diffa/db/diffa_check.py +++ b/src/diffa/db/diffa_check.py @@ -103,6 +103,7 @@ class DiffaCheckService: def __init__(self, config_manager: ConfigManager): self.config_manager = config_manager self.diffa_db = DiffaCheckDatabase(self.config_manager.diffa_check) + self.is_full_diff = self.config_manager.diffa_check.is_full_diff() def get_last_check_date(self) -> date: @@ -115,8 +116,16 @@ def get_last_check_date(self) -> date: target_table=self.config_manager.target.get_db_table(), ) - check_date = latest_check["check_date"] if latest_check else DIFFA_BEGIN_DATE - logger.info(f"Last check date: {check_date}") + if not self.is_full_diff: + check_date = ( + latest_check["check_date"] if latest_check else DIFFA_BEGIN_DATE + ) + logger.info(f"Last check date: {check_date}") + else: + check_date = DIFFA_BEGIN_DATE + logger.info( + f"Full diff mode is enabled. Checking from the beginning. Last check date: {check_date}" + ) return check_date @@ -134,7 +143,9 @@ def get_invalid_check_dates(self) -> Iterable[date]: invalid_check_dates = [ invalid_check["check_date"] for invalid_check in invalid_checks ] - if len(invalid_check_dates) > 0: + if self.is_full_diff: + return None + elif len(invalid_check_dates) > 0: logger.info( f"The number of invalid check dates is: {len(invalid_check_dates)}" ) @@ -146,6 +157,12 @@ def get_invalid_check_dates(self) -> Iterable[date]: def save_diffa_checks(self, merged_count_check_schemas: Iterable[DiffaCheckSchema]): """Upsert all the merged count checks to the diffa database""" + if self.is_full_diff: + logger.info( + "Full diff mode is enabled. Not saving diffa checks to the database." + ) + return + diffa_checks = [ diffa_check.model_dump() for diffa_check in merged_count_check_schemas ] From 1de341d400dff9dc50c04312113e8865da7cc587 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Mon, 10 Nov 2025 10:12:00 +0700 Subject: [PATCH 08/11] adding trailing line --- src/diffa/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/diffa/config.py b/src/diffa/config.py index a8cf057..22642fb 100644 --- a/src/diffa/config.py +++ b/src/diffa/config.py @@ -118,6 +118,7 @@ def __init__(self, *args, full_diff: bool = False, **kwargs): def is_full_diff(self): return self.full_diff + class ConfigManager: """Manage all the configuration needed for Diffa Operations""" From 9251a453f7c626a9689ef645df44afc9ae8c7a24 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Mon, 10 Nov 2025 11:46:38 +0700 Subject: [PATCH 09/11] still save the diff check results into the DB --- src/diffa/db/diffa_check.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/diffa/db/diffa_check.py b/src/diffa/db/diffa_check.py index c1e8f9f..23c031b 100644 --- a/src/diffa/db/diffa_check.py +++ b/src/diffa/db/diffa_check.py @@ -157,12 +157,6 @@ def get_invalid_check_dates(self) -> Iterable[date]: def save_diffa_checks(self, merged_count_check_schemas: Iterable[DiffaCheckSchema]): """Upsert all the merged count checks to the diffa database""" - if self.is_full_diff: - logger.info( - "Full diff mode is enabled. Not saving diffa checks to the database." - ) - return - diffa_checks = [ diffa_check.model_dump() for diffa_check in merged_count_check_schemas ] From 5aa691334ef878eb38aa1476036c0da2156f5368 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Mon, 10 Nov 2025 12:10:06 +0700 Subject: [PATCH 10/11] fixing the description --- src/diffa/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/diffa/cli.py b/src/diffa/cli.py index 9ff412c..688428e 100644 --- a/src/diffa/cli.py +++ b/src/diffa/cli.py @@ -66,7 +66,7 @@ def cli(): @click.option( "--full-diff", is_flag=True, - help="Full diff mode. Not saving the diffa checks to the database.", + help="Full diff mode. Re-run the diff from the beginning.", ) def data_diff( *, From 19e27f4b2004504ae96354f1182e0ee279c65f05 Mon Sep 17 00:00:00 2001 From: Quoc Nguyen Date: Mon, 10 Nov 2025 12:43:04 +0700 Subject: [PATCH 11/11] changing the DIFFA_BEGIN_DATE to 06/01/2020 --- src/diffa/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/diffa/config.py b/src/diffa/config.py index 22642fb..bd15db4 100644 --- a/src/diffa/config.py +++ b/src/diffa/config.py @@ -12,7 +12,7 @@ DIFFA_DB_SCHEMA = "diffa" DIFFA_DB_TABLE = "diffa_checks" DIFFA_CHECK_RUNS_TABLE = "diffa_check_runs" -DIFFA_BEGIN_DATE = date(2024, 1, 1) +DIFFA_BEGIN_DATE = date(2020, 6, 1) # Matching with Ascenda start date class ExitCode(Enum):