diff --git a/ydb/tests/functional/backup_collection/basic_user_scenarios.py b/ydb/tests/functional/backup_collection/basic_user_scenarios.py index b7f5370b1d2e..080dce984e50 100644 --- a/ydb/tests/functional/backup_collection/basic_user_scenarios.py +++ b/ydb/tests/functional/backup_collection/basic_user_scenarios.py @@ -176,9 +176,6 @@ class BackupType(str, Enum): FULL = "FULL" INCREMENTAL = "INCREMENTAL" - def __str__(self) -> str: - return self.value - class StorageType(str, Enum): """Enum for storage types.""" @@ -186,9 +183,6 @@ class StorageType(str, Enum): LOCAL = "local" S3 = "s3" - def __str__(self) -> str: - return self.value - # ================ DATA STRUCTURES ================ @dataclass @@ -222,6 +216,34 @@ def get_table(self, table_name: str) -> Optional[TableSnapshot]: return None +@dataclass +class BackupResult: + """Result of a backup operation.""" + success: bool + snapshot_name: Optional[str] = None + error_message: Optional[str] = None + + def __bool__(self) -> bool: + """Allow using result in boolean context: if backup_result: ...""" + return self.success + + +@dataclass +class RestoreResult: + """Result of a restore operation.""" + success: bool + expected_failure: bool = False + data_verified: bool = False + schema_verified: bool = False + acl_verified: bool = False + error_message: Optional[str] = None + diagnostics: Optional[Dict] = None + + def __bool__(self) -> bool: + """Allow using result in boolean context: if restore_result: ...""" + return self.success + + @dataclass class BackupStage: """Represents a stage in the backup lifecycle.""" @@ -454,15 +476,6 @@ def wait_for_table_rows(self, raise AssertionError(f"Timeout waiting for table '{table}' rows to match expected (timeout {timeout_s}s). Last error: {last_exc}") - def _drop_tables(self, tables: List[str]): - with self.session_scope() as session: - for t in tables: - full = f"/Root/{t}" if not t.startswith("/Root") else t - try: - session.execute_scheme(f"DROP TABLE `{full}`;") - except Exception: - pass - def _count_restore_operations(self): endpoint = f"grpc://localhost:{self.cluster.nodes[1].grpc_port}" database = self.root_dir @@ -706,14 +719,52 @@ def _add_more_tables(self, prefix: str, count: int = 1): created.append(f"/Root/{name}") return created - def _remove_tables(self, table_paths: List[str]): + def _try_remove_tables(self, table_paths: List[str]): with self.session_scope() as session: for tp in table_paths: full = tp if tp.startswith("/Root") else f"/Root/{tp}" try: session.execute_scheme(f"DROP TABLE `{full}`;") - except Exception: - pass + logger.debug(f"Successfully dropped table: {full}") + except Exception as e: + logger.error(f"Failed to drop table {full}: {e}") + + def try_drop_table_from_backup(self, collection_name: str, backup_type: str, table_name: str, snapshot_index: int = -1) -> bool: + try: + # Get all snapshots in the collection + children = self.get_collection_children(collection_name) + if not children: + return False + + # Filter snapshots by type + backup_suffix = f"_{backup_type.lower()}" + matching_snapshots = [ + child for child in children + if child.endswith(backup_suffix) + ] + + if not matching_snapshots: + return False + + # Sort to ensure consistent ordering + matching_snapshots.sort() + + # Get the requested snapshot + try: + target_snapshot = matching_snapshots[snapshot_index] + except IndexError: + return False + + # Construct full path to the table in backup + table_basename = os.path.basename(table_name) if "/" in table_name else table_name + full_path = 
self.root_dir + f"/.backups/collections/{collection_name}/{target_snapshot}/{table_basename}" + + with self.session_scope() as session: + session.execute_scheme(f"DROP TABLE `{full_path}`;") + return True + + except Exception: + return False def _capture_schema(self, table_path: str): desc = self.driver.scheme_client.describe_path(table_path) @@ -781,6 +832,84 @@ def _get_columns_from_scheme_entry(self, desc, path_hint: str = None): raise AssertionError("describe_path returned SchemeEntry in unexpected shape. Cannot locate columns.") + def has_changefeeds(self, table_name: str) -> Tuple[bool, int]: + r = yatest.common.execute( + [ + backup_bin(), + "--endpoint", + f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", + "--database", + self.root_dir, + "scheme", + "describe", + table_name + ], + check_exit_code=False, + ) + out = (r.std_out or b"").decode("utf-8", "ignore") + + # Count total backup changefeeds and their states + total_backup_cfs = out.count("_continuousBackupImpl") + enabled_count = out.count("Enabled") + disabled_count = out.count("Disabled") + + # Check if all changefeeds are accounted for (enabled + disabled) + has_backup_cfs = total_backup_cfs > 0 and total_backup_cfs == (disabled_count + enabled_count) + + return has_backup_cfs, enabled_count + + def drop_backup_collection(self, collection_name: str) -> None: + res = self._execute_yql(f"DROP BACKUP COLLECTION `{collection_name}`;") + assert res.exit_code == 0, f"Failed to drop backup collection '{collection_name}': {res.std_err}" + + def wait_for_changefeed_state(self, table_name: str, expected_enabled: int = 1, + timeout: float = 5.0, poll_interval: float = 0.3) -> Tuple[bool, int, int]: + start_time = time.time() + + while time.time() - start_time < timeout: + has_cfs, enabled_count = self.has_changefeeds(table_name) + + # Count total changefeeds by checking the output directly + r = yatest.common.execute( + [ + backup_bin(), + "--endpoint", + f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", + "--database", + self.root_dir, + "scheme", + "describe", + table_name + ], + check_exit_code=False, + ) + out = (r.std_out or b"").decode("utf-8", "ignore") + total_cfs = out.count("_continuousBackupImpl") + + if enabled_count == expected_enabled: + return True, total_cfs, enabled_count + + time.sleep(poll_interval) + + has_cfs, enabled_count = self.has_changefeeds(table_name) + r = yatest.common.execute( + [ + backup_bin(), + "--endpoint", + f"grpc://localhost:{self.cluster.nodes[1].grpc_port}", + "--database", + self.root_dir, + "scheme", + "describe", + table_name + ], + check_exit_code=False, + ) + out = (r.std_out or b"").decode("utf-8", "ignore") + total_cfs = out.count("_continuousBackupImpl") + + return False, total_cfs, enabled_count + # ================ BUILDER AND HELPER CLASSES ================ class BackupBuilder: @@ -802,8 +931,8 @@ def incremental(self) -> 'BackupBuilder': self._backup_type = BackupType.INCREMENTAL return self - def execute(self) -> Tuple[bool, str]: - """Execute the backup and return success status and snapshot name.""" + def execute(self) -> BackupResult: + """Execute the backup and return result.""" time.sleep(1.1) if self._backup_type == BackupType.INCREMENTAL: @@ -815,13 +944,22 @@ def execute(self) -> Tuple[bool, str]: if res.exit_code != 0: out = (res.std_out or b"").decode('utf-8', 'ignore') err = (res.std_err or b"").decode('utf-8', 'ignore') - raise AssertionError(f"BACKUP failed: code={res.exit_code} STDOUT: {out} STDERR: {err}") + error_msg = f"BACKUP failed: 
code={res.exit_code} STDOUT: {out} STDERR: {err}" + return BackupResult( + success=False, + snapshot_name=None, + error_message=error_msg + ) self.test.wait_for_collection_has_snapshot(self.collection, timeout_s=self._timeout) kids = sorted(self.test.get_collection_children(self.collection)) snap_name = kids[-1] if kids else None - return True, snap_name + return BackupResult( + success=True, + snapshot_name=snap_name, + error_message=None + ) class RestoreBuilder: @@ -866,11 +1004,11 @@ def timeout(self, seconds: int) -> 'RestoreBuilder': self._timeout = seconds return self - def execute(self) -> Dict: + def execute(self) -> RestoreResult: """Execute restore and return results.""" # Remove tables if specified if self._remove_tables: - self.test._remove_tables(self._remove_tables) + self.test._try_remove_tables(self._remove_tables) # Track restore operations BEFORE restore start_total, start_success, _ = self.test._count_restore_operations() @@ -879,10 +1017,24 @@ def execute(self) -> Dict: res = self.test._execute_yql(f"RESTORE `{self._collection}`;") if self._should_fail: - assert res.exit_code != 0, "Expected RESTORE to fail but it succeeded" - return {'expected_failure': True} + if res.exit_code != 0: + return RestoreResult( + success=True, + expected_failure=True, + error_message="Restore failed as expected" + ) + else: + return RestoreResult( + success=False, + expected_failure=True, + error_message="Expected RESTORE to fail but it succeeded" + ) - assert res.exit_code == 0, f"RESTORE failed: {res.std_err}" + if res.exit_code != 0: + return RestoreResult( + success=False, + error_message=f"RESTORE failed: {res.std_err}" + ) if self._use_polling: # Poll for completion @@ -895,14 +1047,21 @@ def execute(self) -> Dict: ) if not ok: - raise AssertionError(f"Timeout waiting restore. 
Diagnostics: {info}") + return RestoreResult( + success=False, + error_message="Timeout waiting restore", + diagnostics=info + ) - # Verify if expected snapshot provided - result = {'success': True} + # Create result with success + result = RestoreResult(success=True) + # Verify if expected snapshot provided if self._expected_snapshot: - verified = self._verify_restored_data() - result.update(verified) + verification = self._verify_restored_data() + result.data_verified = verification.get('data_verified', False) + result.schema_verified = verification.get('schema_verified', False) + result.acl_verified = verification.get('acl_verified', False) return result @@ -1009,18 +1168,22 @@ def create_collection(self, incremental_enabled: bool = False) -> 'BackupTestOrc self.test.wait_for_collection(self.collection, timeout_s=30) return self - def stage(self, backup_type: BackupType = BackupType.FULL, - description: str = "") -> BackupStage: + def stage(self, backup_type: BackupType = BackupType.FULL, description: str = "") -> BackupStage: """Execute a complete backup stage with snapshot capture.""" if isinstance(backup_type, str): backup_type = BackupType(backup_type) snapshot = self.snapshot_capture.capture_tables(self.tables) - success, snap_name = BackupBuilder(self.test, self.collection).full().execute() \ - if backup_type == BackupType.FULL else \ - BackupBuilder(self.test, self.collection).incremental().execute() + if backup_type == BackupType.FULL: + result = BackupBuilder(self.test, self.collection).full().execute() + else: + result = BackupBuilder(self.test, self.collection).incremental().execute() + + if not result.success: + raise AssertionError(f"Backup failed: {result.error_message}") + snap_name = result.snapshot_name self.created_snapshots.append(snap_name) snapshot.name = snap_name @@ -1182,18 +1345,19 @@ def test_full_cycle_local_backup_restore(self): # Test 1: Restore should fail when tables exist logger.info("\nTEST 1: Verifying restore fails when tables already exist...") result = backup.restore_to_stage(2, auto_remove_tables=False).should_fail().execute() - assert result['expected_failure'], "Expected RESTORE to fail when tables already exist" - logger.info("✓ TEST 1 PASSED: Restore correctly failed with existing tables") + assert result.expected_failure, "Expected RESTORE to fail when tables already exist" + assert result.success, "Restore should report success when failing as expected" # Test 2: Restore Stage 1 (Initial state) # Remove all tables first - self._remove_tables([full_orders, full_products, "/Root/extra_table_1"]) + self._try_remove_tables([full_orders, full_products, "/Root/extra_table_1"]) # Restore to stage 1 result = backup.restore_to_stage(1, auto_remove_tables=False).execute() - assert result['data_verified'], "Stage 1 data verification failed" - assert result['schema_verified'], "Stage 1 schema verification failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 1 data verification failed" + assert result.schema_verified, "Stage 1 schema verification failed" # Additional verification: extra_table_1 should NOT exist in stage 1 try: @@ -1206,15 +1370,16 @@ def test_full_cycle_local_backup_restore(self): restored_rows = self._capture_snapshot(t_orders) assert len(restored_rows) == 4, f"Expected 4 rows (header + 3 data), got {len(restored_rows)}" - # ---------------- Test 3: Restore Stage 2 (Modified state) ---------------- + # Test 3: Restore Stage 2 (Modified state) # Remove all tables again - 
self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) # Restore to stage 2 result = backup.restore_to_stage(2, auto_remove_tables=False).execute() - assert result['data_verified'], "Stage 2 data verification failed" - assert result['schema_verified'], "Stage 2 schema verification failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 2 data verification failed" + assert result.schema_verified, "Stage 2 schema verification failed" # Verify orders table has 4 rows (original 3 + 1 added) restored_rows = self._capture_snapshot(t_orders) @@ -1264,7 +1429,7 @@ def test_full_cycle_local_backup_restore_with_incrementals(self): data_helper.modify(add_rows=[(20, 2000, "b1")], remove_ids=[1]) extras += self._add_more_tables("extra2", 1) if extras: - self._remove_tables([extras[0]]) + self._try_remove_tables([extras[0]]) backup.stage(BackupType.INCREMENTAL, "First incremental after removing extra1") @@ -1273,7 +1438,7 @@ def test_full_cycle_local_backup_restore_with_incrementals(self): data_helper.modify(add_rows=[(30, 3000, "c1")], remove_ids=[10]) extras += self._add_more_tables("extra3", 1) if len(extras) >= 2: - self._remove_tables([extras[1]]) + self._try_remove_tables([extras[1]]) backup.stage(BackupType.INCREMENTAL, "Second incremental with extra3") @@ -1281,7 +1446,7 @@ def test_full_cycle_local_backup_restore_with_incrementals(self): # Modifications: More changes + extra4 table - extra3 table extras += self._add_more_tables("extra4", 1) if len(extras) >= 3: - self._remove_tables([extras[2]]) + self._try_remove_tables([extras[2]]) data_helper.modify(add_rows=[(40, 4000, "d1")], remove_ids=[20]) backup.stage(BackupType.FULL, "Second full backup as new baseline") @@ -1294,7 +1459,7 @@ def test_full_cycle_local_backup_restore_with_incrementals(self): # Modifications: Final data state + extra5 table - extra4 table extras += self._add_more_tables("extra5", 1) if len(extras) >= 4: - self._remove_tables([extras[3]]) + self._try_remove_tables([extras[3]]) data_helper.modify(add_rows=[(50, 5000, "e1")], remove_ids=[30]) backup.stage(BackupType.INCREMENTAL, "Final incremental with latest data") @@ -1306,43 +1471,54 @@ def test_full_cycle_local_backup_restore_with_incrementals(self): # Test 1: Should fail when tables exist result = backup.restore_to_stage(6, auto_remove_tables=False).should_fail().execute() - assert result['expected_failure'], "Expected RESTORE to fail when tables already exist" + assert result.expected_failure, "Expected RESTORE to fail when tables already exist" + assert result.success, "Restore should report success when failing as expected" # Remove all tables for subsequent restore tests - self._remove_tables([full_orders, full_products] + extras) + self._try_remove_tables([full_orders, full_products] + extras) # Test 2: Restore to stage 1 (full backup 1) result = backup.restore_to_stage(1, auto_remove_tables=False).execute() - assert result['data_verified'] and result['schema_verified'], "Stage 1 restore failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 1 data verification failed" + assert result.schema_verified, "Stage 1 schema verification failed" # Test 3: Restore to stage 2 (full1 + inc1) - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(2, auto_remove_tables=False).execute() - assert result['success'] and 
result['data_verified'], "Stage 2 restore failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 2 data verification failed" + assert result.schema_verified, "Stage 2 schema verification failed" # Test 4: Restore to stage 3 (full1 + inc1 + inc2) - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(3, auto_remove_tables=False).execute() - assert result['success'] and result['data_verified'], "Stage 3 restore failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 3 data verification failed" + assert result.schema_verified, "Stage 3 schema verification failed" # Test 5: Restore to stage 4 (full backup 2) - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(4, auto_remove_tables=False).execute() - assert result['data_verified'] and result['schema_verified'], "Stage 4 restore failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 4 data verification failed" + assert result.schema_verified, "Stage 4 schema verification failed" # SPECIAL TEST: Incremental-only restore (should fail) self._test_incremental_only_restore_failure(backup, export_dir) # Test 6: Restore to final stage (full2 + inc3 + inc4) - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(6, auto_remove_tables=False).execute() - assert result['success'] and result['data_verified'], "Final stage restore failed" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 6 data verification failed" + assert result.schema_verified, "Stage 6 schema verification failed" # ADVANCED TEST: Cross-full restore # Verify we can restore to stage 5 (full2 + inc3) - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(5, auto_remove_tables=False).execute() - assert result['success'], "Stage 5 restore failed" + assert result.success, f"Restore failed: {result.error_message}" # Cleanup if os.path.exists(export_dir): @@ -1569,7 +1745,7 @@ def modify_stage2(self): extra_tables.append("/Root/extra_table_2") # Remove extra_table_1 - self._remove_tables(["/Root/extra_table_1"]) + self._try_remove_tables(["/Root/extra_table_1"]) extra_tables.remove("/Root/extra_table_1") # Apply schema changes @@ -1594,16 +1770,17 @@ def modify_stage2(self): # Test 1: Should fail when tables exist result = backup.restore_to_stage(2, auto_remove_tables=False).should_fail().execute() - assert result['expected_failure'], "Expected RESTORE to fail when tables already exist" + assert result.expected_failure, "Expected RESTORE to fail when tables already exist" + assert result.success, "Restore should report success when failing as expected" # Remove all tables for restore tests - self._remove_tables([full_orders, full_products] + extra_tables) + self._try_remove_tables([full_orders, full_products] + extra_tables) # Test 2: Restore to stage 1 (initial state with extra_table_1) - logger.info("=== RESTORING STAGE 1 ===") result = backup.restore_to_stage(1, auto_remove_tables=False).execute() - assert result['data_verified'], "Data verification failed for stage 1" - assert result['schema_verified'], "Schema verification 
failed for stage 1" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 1 data verification failed" + assert result.schema_verified, "Stage 1 schema verification failed" # Verify that schema is original (without new_col, with number column) restored_schema = self._capture_schema(full_orders) @@ -1619,13 +1796,13 @@ def modify_stage2(self): logger.info(f"Stage 1 ACL verification: {grants_output}") # Remove all tables again for stage 2 restore - self._remove_tables([full_orders, full_products, "/Root/extra_table_1"]) + self._try_remove_tables([full_orders, full_products, "/Root/extra_table_1"]) # Test 3: Restore to stage 2 (with schema changes and extra_table_2, without extra_table_1) - logger.info("=== RESTORING STAGE 2 ===") result = backup.restore_to_stage(2, auto_remove_tables=False).execute() - assert result['data_verified'], "Data verification failed for stage 2" - assert result['schema_verified'], "Schema verification failed for stage 2" + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 2 data verification failed" + assert result.schema_verified, "Stage 2 schema verification failed" # Verify schema changes are present restored_schema2 = self._capture_schema(full_orders) @@ -1722,11 +1899,10 @@ def _record(stage: BackupStage) -> str: target_stage_number = 3 # stage numbering: 1=full, 2=inc1, 3=inc2, 4=inc3 target_snap_name = snap_inc2 - self._remove_tables([t_orders, t_products]) + self._try_remove_tables([t_orders, t_products]) - restore_builder = orchestrator.restore_to_stage(target_stage_number, auto_remove_tables=False) - res = restore_builder.execute() - assert res.get("success", False) is True, f"Restore reported failure: {res}" + result = orchestrator.restore_to_stage(target_stage_number, auto_remove_tables=False).execute() + assert result.success, f"Restore failed: {result.error_message}" # Verify restored rows match the recorded snapshot for inc2 expected_orders = snapshot_rows[target_snap_name]["orders"] @@ -1795,7 +1971,7 @@ def test_full_cycle_local_backup_restore_with_complex_schema_changes(self): # remove first extra if created if extras: - self._remove_tables([extras[0]]) + self._try_remove_tables([extras[0]]) # alter schema: add column (and defensively try to drop) with self.session_scope() as session: @@ -1820,23 +1996,27 @@ def test_full_cycle_local_backup_restore_with_complex_schema_changes(self): # RESTORE TESTS # Test A: attempt restore when targets exist -> should fail (use last stage) - res_fail = backup.restore_to_stage(len(backup.stages), new_collection_name=None, auto_remove_tables=False).should_fail().execute() - assert res_fail.get('expected_failure', False), "Expected restore to fail when tables already exist" + result = backup.restore_to_stage(len(backup.stages), new_collection_name=None, auto_remove_tables=False).should_fail().execute() + # assert res_fail.get('expected_failure', False), "Expected restore to fail when tables already exist" + assert result.expected_failure, "Expected RESTORE to fail when tables already exist" + assert result.success, "Restore should report success when failing as expected" - self._remove_tables([full_orders, full_products] + extras) + self._try_remove_tables([full_orders, full_products] + extras) # Test B: restore to stage1 and verify exact match of data/schema/acl - rb1 = backup.restore_to_stage(1, auto_remove_tables=False) - rb1_result = rb1.execute() - assert rb1_result.get("success", False) is True, f"Restore to 
stage1 failed: {rb1_result}" + result = backup.restore_to_stage(1, auto_remove_tables=False).execute() + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 1 data verification failed" + assert result.schema_verified, "Stage 1 schema verification failed" # Clean tables - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) # Test C: restore to stage2 and verify (note: orders data may be on orders_copy) - rb2 = backup.restore_to_stage(2, auto_remove_tables=False) - rb2_result = rb2.execute() - assert rb2_result.get("success", False) is True, f"Restore to stage2 failed: {rb2_result}" + result = backup.restore_to_stage(2, auto_remove_tables=False).execute() + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 2 data verification failed" + assert result.schema_verified, "Stage 2 schema verification failed" expected_orders_snapshot = stage2.snapshot.get_table(full_orders) or stage2.snapshot.get_table(full_orders_copy) if expected_orders_snapshot: @@ -1896,7 +2076,7 @@ def test_full_cycle_local_backup_restore_with_incrementals_complex_schema_change data_helper.modify(add_rows=[(20, 2000, "b1")], remove_ids=[1]) extras += self._add_more_tables("extra2", 1) if extras: - self._remove_tables([extras[0]]) + self._try_remove_tables([extras[0]]) self._apply_acl_changes(full_orders, "root@builtin", "SELECT") self._copy_table(t_orders, "orders_v1") @@ -1907,7 +2087,7 @@ def test_full_cycle_local_backup_restore_with_incrementals_complex_schema_change data_helper.modify(add_rows=[(30, 3000, "c1")], remove_ids=[10]) extras += self._add_more_tables("extra3", 1) if len(extras) >= 2: - self._remove_tables([extras[1]]) + self._try_remove_tables([extras[1]]) self._copy_table(t_orders, "orders_v2") # STAGE 3: Second incremental @@ -1916,7 +2096,7 @@ def test_full_cycle_local_backup_restore_with_incrementals_complex_schema_change # Modifications for stage 4 extras += self._add_more_tables("extra4", 1) if len(extras) >= 3: - self._remove_tables([extras[2]]) + self._try_remove_tables([extras[2]]) data_helper.modify(add_rows=[(40, 4000, "d1")], remove_ids=[20]) # STAGE 4: Second FULL backup @@ -1928,7 +2108,7 @@ def test_full_cycle_local_backup_restore_with_incrementals_complex_schema_change # Final modifications extras += self._add_more_tables("extra5", 1) if len(extras) >= 4: - self._remove_tables([extras[3]]) + self._try_remove_tables([extras[3]]) data_helper.modify(add_rows=[(50, 5000, "e1")], remove_ids=[30]) self._apply_acl_changes(full_orders, "root1@builtin", "SELECT") @@ -1942,35 +2122,230 @@ def test_full_cycle_local_backup_restore_with_incrementals_complex_schema_change # Test 1: Should fail when tables exist (do not remove the tables!) 
result = backup.restore_to_stage(6, auto_remove_tables=False).should_fail().execute() - assert result['expected_failure'], "Expected RESTORE to fail when tables already exist" + assert result.expected_failure, "Expected RESTORE to fail when tables already exist" + assert result.success, "Restore should report success when failing as expected" # Remove all tables for subsequent restore tests - self._remove_tables([full_orders, full_products] + extras[4:]) + self._try_remove_tables([full_orders, full_products] + extras[4:]) # Test 2: Restore to stage 1 result = backup.restore_to_stage(1, auto_remove_tables=False).execute() - assert result['data_verified'] and result['schema_verified'] + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 1 data verification failed" + assert result.schema_verified, "Stage 1 schema verification failed" # Test 3: Restore to stage 2 - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(2, auto_remove_tables=False).execute() - assert result['success'] + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 2 data verification failed" + assert result.schema_verified, "Stage 2 schema verification failed" # Test 4: Restore to stage 3 - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(3, auto_remove_tables=False).execute() - assert result['success'] + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 3 data verification failed" + assert result.schema_verified, "Stage 3 schema verification failed" # Test 5: Restore to stage 4 - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(4, auto_remove_tables=False).execute() - assert result['data_verified'] and result['schema_verified'] + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 4 data verification failed" + assert result.schema_verified, "Stage 4 schema verification failed" # Test 6: Restore to final stage - self._remove_tables([full_orders, full_products]) + self._try_remove_tables([full_orders, full_products]) result = backup.restore_to_stage(6, auto_remove_tables=False).execute() - assert result['success'] + assert result.success, f"Restore failed: {result.error_message}" + assert result.data_verified, "Stage 6 data verification failed" + assert result.schema_verified, "Stage 6 schema verification failed" # Cleanup if os.path.exists(export_dir): shutil.rmtree(export_dir) + + +class TestBackupCollectionServiceObjectsCleanup(BaseTestBackupInFiles): + def test_service_schema_objects_cleanup_on_delete(self): + # Setup + t_orders = "orders" + t_products = "products" + full_orders = f"/Root/{t_orders}" + full_products = f"/Root/{t_products}" + + # Create initial tables + with self.session_scope() as session: + create_table_with_data(session, t_orders) + create_table_with_data(session, t_products) + + # Before creating backup collection - should have no changefeeds + has_cfs_orders, enabled_orders = self.has_changefeeds(t_orders) + has_cfs_products, enabled_products = self.has_changefeeds(t_products) + + assert not has_cfs_orders, "Table orders should not have changefeeds before backup collection creation" + assert not has_cfs_products, "Table products should 
not have changefeeds before backup collection creation" + assert enabled_orders == 0, f"No enabled changefeeds expected on orders, got {enabled_orders}" + assert enabled_products == 0, f"No enabled changefeeds expected on products, got {enabled_products}" + + collection_name = f"test_cleanup_{uuid.uuid4().hex[:8]}" + + # Use orchestrator for backup lifecycle + with backup_lifecycle(self, collection_name, [full_orders, full_products]) as backup: + + # Create backup collection with incremental enabled + backup.create_collection(incremental_enabled=True) + + # Check changefeeds after collection creation (should still be 0 - created only on first backup) + has_cfs_orders, enabled_orders = self.has_changefeeds(t_orders) + has_cfs_products, enabled_products = self.has_changefeeds(t_products) + + assert not has_cfs_orders, "Changefeeds should not exist yet on orders (created on first backup)" + assert not has_cfs_products, "Changefeeds should not exist yet on products (created on first backup)" + + # Create a full backup - this should create the first changefeed + backup.stage(BackupType.FULL, "Initial full backup") + + # Check changefeeds after full backup - should have 1 enabled changefeed + has_cfs_orders, enabled_orders = self.has_changefeeds(t_orders) + has_cfs_products, enabled_products = self.has_changefeeds(t_products) + + assert has_cfs_orders, "Changefeeds should exist on orders after full backup" + assert has_cfs_products, "Changefeeds should exist on products after full backup" + assert enabled_orders == 1, f"Expected 1 enabled changefeed on orders after full backup, got {enabled_orders}" + assert enabled_products == 1, f"Expected 1 enabled changefeed on products after full backup, got {enabled_products}" + + # Modify data for incremental + DataHelper(self, t_orders).modify(add_rows=[(100, 1000, "for_incremental")], remove_ids=[1]) + + # Create incremental backup - old changefeed becomes Disabled, new one is Enabled + backup.stage(BackupType.INCREMENTAL, "First incremental backup") + + # Check changefeeds after incremental backup + has_cfs_orders, enabled_orders = self.has_changefeeds(t_orders) + has_cfs_products, enabled_products = self.has_changefeeds(t_products) + + assert has_cfs_orders, "Changefeeds should still exist on orders after incremental backup" + assert has_cfs_products, "Changefeeds should still exist on products after incremental backup" + # After incremental, we still have 1 enabled (previous disabled, new enabled) + assert enabled_orders == 1, f"Expected 1 enabled changefeed on orders after incremental, got {enabled_orders}" + assert enabled_products == 1, f"Expected 1 enabled changefeed on products after incremental, got {enabled_products}" + + # Drop the backup collection + self.drop_backup_collection(collection_name) + + # Verify collection no longer exists + assert not self.collection_exists(collection_name), f"Collection {collection_name} should not exist after DROP" + + # Changefeeds should be cleaned up + has_cfs_orders, enabled_orders = self.has_changefeeds(t_orders) + has_cfs_products, enabled_products = self.has_changefeeds(t_products) + + assert not has_cfs_orders, ( + "CRITICAL: Changefeeds were NOT cleaned up on orders table after dropping backup collection! " + "This may lead to resource leaks." + ) + assert not has_cfs_products, ( + "CRITICAL: Changefeeds were NOT cleaned up on products table after dropping backup collection! " + "This may lead to resource leaks." 
+ ) + assert enabled_orders == 0, f"No enabled changefeeds expected on orders after cleanup, got {enabled_orders}" + assert enabled_products == 0, f"No enabled changefeeds expected on products after cleanup, got {enabled_products}" + + +class TestBackupCollectionServiceObjectsRotation(BaseTestBackupInFiles): + def test_service_schema_objects_cleanup_on_rotate(self): + # Setup + t_orders = "orders" + t_products = "products" + full_orders = f"/Root/{t_orders}" + full_products = f"/Root/{t_products}" + + # Create initial tables + with self.session_scope() as session: + create_table_with_data(session, t_orders) + create_table_with_data(session, t_products) + + collection_name = f"collection_{uuid.uuid4().hex[:8]}" + data_helper = DataHelper(self, t_orders) + + with backup_lifecycle(self, collection_name, [full_orders, full_products]) as backup: + + # Create backup collection with incremental enabled + backup.create_collection(incremental_enabled=True) + + # Initial state - no changefeeds + has_cfs, enabled = self.has_changefeeds(t_orders) + assert not has_cfs, "Should have no changefeeds initially" + + # Create first full backup + backup.stage(BackupType.FULL, "First full backup") + success, total, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=1) + assert success, f"Expected 1 enabled changefeed after full backup, got {enabled}" + + # Create first incremental + data_helper.modify(add_rows=[(100, 1000, "inc1")]) + backup.stage(BackupType.INCREMENTAL, "First incremental") + + # Wait for old changefeed to be cleaned up (should happen after ~0.9s) + success, total, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=1, timeout=3.0) + assert success, f"Expected 1 enabled changefeed after incremental, got {enabled}" + + success = self.try_drop_table_from_backup( + collection_name=collection_name, + backup_type="full", + table_name="orders", + snapshot_index=-1 # Latest full backup + ) + assert success, "Expected ability to delete backup" + + # Create second incremental + data_helper.modify(add_rows=[(101, 1001, "inc2")]) + backup.stage(BackupType.INCREMENTAL, "Second incremental") + + success, total, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=1, timeout=3.0) + assert success, f"Expected 1 enabled changefeed after second incremental, got {enabled}" + + # Create third incremental + data_helper.modify(add_rows=[(102, 1002, "inc3")]) + backup.stage(BackupType.INCREMENTAL, "Third incremental") + + success, total_after_inc3, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=1, timeout=3.0) + assert success, f"Expected 1 enabled changefeed after third incremental, got {enabled}" + + success = self.try_drop_table_from_backup( + collection_name=collection_name, + backup_type="incremental", + table_name="orders", + snapshot_index=-1 # Latest incr backup + ) + assert success, "Expected ability to delete backup" + + # Create second full backup - this might trigger the double-enabled bug + data_helper.modify(add_rows=[(103, 1003, "full2")]) + backup.stage(BackupType.FULL, "Second full backup") + + # Check immediately for the bug + has_cfs, enabled_immediate = self.has_changefeeds(t_orders) + if enabled_immediate > 1: + logger.error(f"{enabled_immediate} enabled changefeeds right after full backup!") + + # Wait for proper state + success, total_after_full2, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=1, timeout=5.0) + + if not success or enabled > 1: + logger.error(f"Changefeed rotation issue: {enabled} 
enabled changefeeds (expected 1)") + else: + logger.info(f"After second full backup: {total_after_full2} total changefeeds, {enabled} enabled") + + self.drop_backup_collection(collection_name) + + # Verify all changefeeds are removed + success, total, enabled = self.wait_for_changefeed_state(t_orders, expected_enabled=0, timeout=5.0) + has_cfs, _ = self.has_changefeeds(t_orders) + assert not has_cfs, "All changefeeds should be cleaned up after dropping collection" + + # Check products table too + has_cfs_products, _ = self.has_changefeeds(t_products) + assert not has_cfs_products, "Products changefeeds should also be cleaned up"
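
For reviewers, a minimal usage sketch of the BackupResult contract introduced by this patch (not part of the patch itself; the ensure_backup helper and the run_full_backup callable are hypothetical stand-ins for BackupBuilder(test, collection).full().execute()):

# Hypothetical sketch only: illustrates how callers can rely on the __bool__
# override and the snapshot_name/error_message fields added in this patch.
from typing import Callable

def ensure_backup(run_full_backup: Callable[[], "BackupResult"]) -> str:
    result = run_full_backup()
    if not result:  # BackupResult.__bool__ delegates to result.success
        raise AssertionError(f"Backup failed: {result.error_message}")
    # A successful backup is expected to report the snapshot it created.
    assert result.snapshot_name is not None
    return result.snapshot_name

RestoreResult is meant to be consumed the same way: branch on the object (or on .success) first, then read the data_verified/schema_verified/acl_verified flags and diagnostics instead of indexing into a plain dict.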