55from .sqeleton .abcs import DbKey , DbTime , DbPath
66from .diff_tables import Algorithm
77from .hashdiff_tables import HashDiffer , DEFAULT_BISECTION_THRESHOLD , DEFAULT_BISECTION_FACTOR
8- from .joindiff_tables import JoinDiffer
8+ from .joindiff_tables import JoinDiffer , TABLE_WRITE_LIMIT
99from .table_segment import TableSegment
10+ from .utils import eval_name_template
1011
1112__version__ = "0.3.0rc4"
1213
14+
1315def connect_to_table (
1416 db_info : Union [str , dict ],
1517 table_name : Union [DbPath , str ],
@@ -55,17 +57,27 @@ def diff_tables(
5557 # Start/end update_column values, used to restrict the segment
5658 min_update : DbTime = None ,
5759 max_update : DbTime = None ,
58- # Algorithm
59- algorithm : Algorithm = Algorithm .HASHDIFF ,
60- # Into how many segments to bisect per iteration (hashdiff only)
61- bisection_factor : int = DEFAULT_BISECTION_FACTOR ,
62- # When should we stop bisecting and compare locally (in row count; hashdiff only)
63- bisection_threshold : int = DEFAULT_BISECTION_THRESHOLD ,
6460 # Enable/disable threaded diffing. Needed to take advantage of database threads.
6561 threaded : bool = True ,
6662 # Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
6763 # There may be many pools, so number of actual threads can be a lot higher.
6864 max_threadpool_size : Optional [int ] = 1 ,
65+ # Algorithm
66+ algorithm : Algorithm = Algorithm .AUTO ,
67+ # Into how many segments to bisect per iteration (hashdiff only)
68+ bisection_factor : int = DEFAULT_BISECTION_FACTOR ,
69+ # When should we stop bisecting and compare locally (in row count; hashdiff only)
70+ bisection_threshold : int = DEFAULT_BISECTION_THRESHOLD ,
71+ # Enable/disable validating that the key columns are unique. (joindiff only)
72+ validate_unique_key : bool = True ,
73+ # Enable/disable sampling of exclusive rows. Creates a temporary table. (joindiff only)
74+ sample_exclusive_rows : bool = False ,
75+ # Path of new table to write diff results to. Disabled if not provided. (joindiff only)
76+ materialize_to_table : Union [str , DbPath ] = None ,
77+ # Materialize every row, not just those that are different. (joindiff only)
78+ materialize_all_rows : bool = False ,
79+ # Maximum number of rows to write when materializing, per thread. (joindiff only)
80+ table_write_limit : int = TABLE_WRITE_LIMIT ,
6981) -> Iterator :
7082 """Finds the diff between table1 and table2.
7183
@@ -78,14 +90,21 @@ def diff_tables(
7890 max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
7991 min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
8092 max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
81- algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
82- bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
83- bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
84- and compare locally. (Used when algorithm is `HASHDIFF`).
8593 threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
8694 max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
8795 Only relevant when `threaded` is ``True``.
8896 There may be many pools, so number of actual threads can be a lot higher.
97+ algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`. Default=`AUTO`)
98+ bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
99+ bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
100+ and compare locally. (Used when algorithm is `HASHDIFF`).
101+ validate_unique_key (bool): Enable/disable validating that the key columns are unique. (used for `JOINDIFF`. default: True)
102+ Single query, and can't be threaded, so it's very slow on non-cloud dbs.
103+ Future versions will detect UNIQUE constraints in the schema.
104+ sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table. (used for `JOINDIFF`. default: False)
105+ materialize_to_table (Union[str, DbPath], optional): Path of new table to write diff results to. Disabled if not provided. Used for `JOINDIFF`.
106+ materialize_all_rows (bool): Materialize every row, not just those that are different. (used for `JOINDIFF`. default: False)
107+ table_write_limit (int): Maximum number of rows to write when materializing, per thread.
89108
90109 Note:
91110 The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
@@ -125,6 +144,9 @@ def diff_tables(
125144 segments = [t .new (** override_attrs ) for t in tables ] if override_attrs else tables
126145
127146 algorithm = Algorithm (algorithm )
147+ if algorithm == Algorithm .AUTO :
148+ algorithm = Algorithm .JOINDIFF if table1 .database is table2 .database else Algorithm .HASHDIFF
149+
128150 if algorithm == Algorithm .HASHDIFF :
129151 differ = HashDiffer (
130152 bisection_factor = bisection_factor ,
@@ -133,9 +155,16 @@ def diff_tables(
133155 max_threadpool_size = max_threadpool_size ,
134156 )
135157 elif algorithm == Algorithm .JOINDIFF :
158+ if isinstance (materialize_to_table , str ):
159+ materialize_to_table = table1 .database .parse_table_name (eval_name_template (materialize_to_table ))
136160 differ = JoinDiffer (
137161 threaded = threaded ,
138162 max_threadpool_size = max_threadpool_size ,
163+ validate_unique_key = validate_unique_key ,
164+ sample_exclusive_rows = sample_exclusive_rows ,
165+ materialize_to_table = materialize_to_table ,
166+ materialize_all_rows = materialize_all_rows ,
167+ table_write_limit = table_write_limit ,
139168 )
140169 else :
141170 raise ValueError (f"Unknown algorithm: { algorithm } " )
0 commit comments