diff --git a/README.md b/README.md index 14d8868..866460e 100644 --- a/README.md +++ b/README.md @@ -148,8 +148,120 @@ As such the use of the Django's ORM is perfectally suited to this type of data. ``` +### Data Retention and Compression + +TimescaleDB provides powerful features for managing data lifecycle and storage efficiency through retention policies and compression. Django-TimescaleDB provides programmatic APIs for these features. + +> **Note:** The current implementation uses a programmatic approach where you call methods on the `timescale` manager. These methods need to be called explicitly in your code (e.g., in a migration, management command, view, or Django shell). A future enhancement may provide a declarative approach where policies can be defined directly in model classes. + +#### Data Retention Policy [More Info](https://docs.timescale.com/use-timescale/latest/data-retention/) + +Data retention policies automatically remove old data based on time, helping manage database size. + +```python + from metrics.models import Metric + from datetime import timedelta + + # This code can be run in a Django shell, migration, management command, or view + + # Add a retention policy to drop data older than 90 days + job_id = Metric.timescale.add_retention_policy( + drop_after='90 days', + schedule_interval='1 day', + if_not_exists=True + ) + + # Or using timedelta objects + job_id = Metric.timescale.add_retention_policy( + drop_after=timedelta(days=90), + schedule_interval=timedelta(days=1) + ) + + # Remove a retention policy when no longer needed + Metric.timescale.remove_retention_policy(if_exists=True) +``` + +#### Compression [More Info](https://docs.timescale.com/use-timescale/latest/compression/) + +Compression reduces storage requirements by compressing older chunks of data. + +```python + from metrics.models import Metric + from datetime import timedelta + + # This code can be run in a Django shell, migration, management command, or view + + # Enable compression on the hypertable + Metric.timescale.enable_compression( + compress_orderby=['time'], + compress_segmentby=['device'] + ) + + # Add a compression policy to compress data older than 30 days + job_id = Metric.timescale.add_compression_policy( + compress_after='30 days', + schedule_interval='1 day', + if_not_exists=True + ) + + # Or using timedelta objects + job_id = Metric.timescale.add_compression_policy( + compress_after=timedelta(days=30), + schedule_interval=timedelta(days=1) + ) + + # Get compression statistics + stats = Metric.timescale.get_compression_stats() + + # Remove a compression policy when no longer needed + Metric.timescale.remove_compression_policy(if_exists=True) +``` + +#### Example Usage in a Django Migration + +```python +from django.db import migrations + +def setup_timescale_policies(apps, schema_editor): + # Get the model from the apps registry + Metric = apps.get_model('metrics', 'Metric') + + # Enable compression + Metric.timescale.enable_compression( + compress_orderby=['time'], + compress_segmentby=['device'] + ) + + # Add compression policy + Metric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Add retention policy + Metric.timescale.add_retention_policy( + drop_after='90 days', + if_not_exists=True + ) + +def reverse_timescale_policies(apps, schema_editor): + Metric = apps.get_model('metrics', 'Metric') + Metric.timescale.remove_compression_policy(if_exists=True) + Metric.timescale.remove_retention_policy(if_exists=True) + +class Migration(migrations.Migration): + dependencies = [ + ('metrics', '0001_initial'), + ] + + operations = [ + migrations.RunPython(setup_timescale_policies, reverse_timescale_policies), + ] +``` + ## Contributors - [Rasmus Schlünsen](https://github.com/schlunsen) - [Ben Cleary](https://github.com/bencleary) - [Jonathan Sundqvist](https://github.com/jonathan-s) - [Harsh Bhikadia](https://github.com/daadu) +- [Reese Jenner](https://github.com/voarsh2) \ No newline at end of file diff --git a/example/iot_example_app/settings.py b/example/iot_example_app/settings.py index f69a395..e47545e 100644 --- a/example/iot_example_app/settings.py +++ b/example/iot_example_app/settings.py @@ -13,9 +13,14 @@ from pathlib import Path from dotenv import load_dotenv import os +import sys +import sys load_dotenv() +# Add parent directory to Python path so we can import timescale during development +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + # Build paths inside the project like this: BASE_DIR / 'subdir'. BASE_DIR = Path(__file__).resolve().parent.parent diff --git a/timescale/apps.py b/timescale/apps.py new file mode 100644 index 0000000..fe84915 --- /dev/null +++ b/timescale/apps.py @@ -0,0 +1,33 @@ +""" +Django app configuration for TimescaleDB. + +This module handles the initialization of the TimescaleDB Django app, +including patching Django's migration system to support automatic +detection of TimescaleDB policy changes. +""" +from django.apps import AppConfig + + +class TimescaleConfig(AppConfig): + """ + Configuration for the TimescaleDB Django app. + + This app config patches Django's migration autodetector to automatically + detect changes in TimescaleDB policies defined in model classes. + """ + default_auto_field = 'django.db.models.BigAutoField' + name = 'timescale' + verbose_name = 'TimescaleDB' + + def ready(self): + """ + Called when the app is ready. + + This method patches Django's migration system to include + TimescaleDB policy detection. + """ + # Import here to avoid circular imports + from .db.migrations.autodetector import patch_migration_autodetector + + # Patch Django's migration autodetector + patch_migration_autodetector() diff --git a/timescale/db/migrations/README.md b/timescale/db/migrations/README.md new file mode 100644 index 0000000..600cd00 --- /dev/null +++ b/timescale/db/migrations/README.md @@ -0,0 +1,251 @@ +# TimescaleDB Migration Operations + +This module provides Django migration operations for managing TimescaleDB retention and compression policies declaratively in Django migrations. + +## Available Operations + +### AddRetentionPolicy + +Adds a retention policy to automatically remove old data from a TimescaleDB hypertable. + +```python +from timescale.db.migrations import AddRetentionPolicy + +AddRetentionPolicy( + model_name='MyTimeSeriesModel', + drop_after='90 days', + schedule_interval='1 day', # Optional + if_not_exists=True, # Optional, default True +) +``` + +### RemoveRetentionPolicy + +Removes a retention policy from a TimescaleDB hypertable. + +```python +from timescale.db.migrations import RemoveRetentionPolicy + +RemoveRetentionPolicy( + model_name='MyTimeSeriesModel', + if_exists=True, # Optional, default True +) +``` + +### EnableCompression + +Enables compression on a TimescaleDB hypertable with specified settings. + +```python +from timescale.db.migrations import EnableCompression + +EnableCompression( + model_name='MyTimeSeriesModel', + compress_orderby=['time'], # Optional + compress_segmentby=['device_id'], # Optional + compress_chunk_time_interval='1 hour', # Optional + if_not_exists=True, # Optional, default True +) +``` + +### AddCompressionPolicy + +Adds a compression policy to automatically compress chunks older than a specified interval. + +```python +from timescale.db.migrations import AddCompressionPolicy + +AddCompressionPolicy( + model_name='MyTimeSeriesModel', + compress_after='7 days', + schedule_interval='1 hour', # Optional + if_not_exists=True, # Optional, default True +) +``` + +### RemoveCompressionPolicy + +Removes a compression policy from a TimescaleDB hypertable. + +```python +from timescale.db.migrations import RemoveCompressionPolicy + +RemoveCompressionPolicy( + model_name='MyTimeSeriesModel', + if_exists=True, # Optional, default True +) +``` + +## Complete Migration Example + +Here's a complete example of a Django migration that sets up compression and retention policies for a TimescaleDB hypertable: + +```python +# migrations/0002_add_timescale_policies.py +from django.db import migrations +from timescale.db.migrations import ( + EnableCompression, + AddCompressionPolicy, + AddRetentionPolicy, +) + +class Migration(migrations.Migration): + dependencies = [ + ('myapp', '0001_initial'), + ] + + operations = [ + # First, enable compression on the hypertable + EnableCompression( + model_name='SensorData', + compress_orderby=['timestamp'], + compress_segmentby=['sensor_id'], + ), + + # Add a compression policy to compress data older than 7 days + AddCompressionPolicy( + model_name='SensorData', + compress_after='7 days', + schedule_interval='1 hour', + ), + + # Add a retention policy to drop data older than 1 year + AddRetentionPolicy( + model_name='SensorData', + drop_after='1 year', + schedule_interval='1 day', + ), + ] +``` + +## Using with timedelta + +You can also use Python `timedelta` objects instead of string intervals: + +```python +from datetime import timedelta +from timescale.db.migrations import AddRetentionPolicy + +AddRetentionPolicy( + model_name='MyTimeSeriesModel', + drop_after=timedelta(days=90), + schedule_interval=timedelta(hours=12), +) +``` + +## Migration Workflow + +### Setting up policies for a new hypertable + +1. Create your model with `TimescaleDateTimeField` +2. Run `makemigrations` to create the initial migration +3. Create a new migration for policies: `python manage.py makemigrations --empty myapp` +4. Add the policy operations to the new migration +5. Run `migrate` to apply the policies + +### Modifying existing policies + +To modify an existing policy, you typically need to: + +1. Remove the old policy +2. Add the new policy with updated parameters + +```python +operations = [ + # Remove old retention policy + RemoveRetentionPolicy( + model_name='SensorData', + ), + + # Add new retention policy with different interval + AddRetentionPolicy( + model_name='SensorData', + drop_after='6 months', # Changed from 1 year + ), +] +``` + +## Important Notes + +### Reversibility + +- `AddRetentionPolicy` and `AddCompressionPolicy` are reversible (they remove the policy when reversed) +- `RemoveRetentionPolicy` and `RemoveCompressionPolicy` are **not reversible** - you need to manually create the reverse operation with the original parameters +- `EnableCompression` attempts to disable compression when reversed, but TimescaleDB has limitations on disabling compression + +### Prerequisites + +- The table must already be a TimescaleDB hypertable before applying compression or retention policies +- For compression policies, compression must be enabled on the hypertable first using `EnableCompression` + +### Error Handling + +All operations use `if_not_exists=True` and `if_exists=True` by default to make migrations more robust and idempotent. + +## Advanced Usage + +### Custom Schedule Intervals + +You can specify custom schedule intervals for when policies run: + +```python +AddRetentionPolicy( + model_name='MyModel', + drop_after='90 days', + schedule_interval='6 hours', # Run every 6 hours +) +``` + +### Timezone Support + +You can specify a timezone for policy execution: + +```python +AddRetentionPolicy( + model_name='MyModel', + drop_after='90 days', + timezone='America/New_York', +) +``` + +### Initial Start Time + +You can specify when a policy should first run: + +```python +from datetime import datetime +from django.utils import timezone + +AddRetentionPolicy( + model_name='MyModel', + drop_after='90 days', + initial_start=timezone.now() + timedelta(hours=1), +) +``` + +## Troubleshooting + +### Common Issues + +1. **"relation is not a hypertable"**: Make sure your model uses `TimescaleDateTimeField` and the hypertable was created before applying policies. + +2. **"compression not enabled"**: You must use `EnableCompression` before adding compression policies. + +3. **"policy already exists"**: Use `if_not_exists=True` (default) to avoid this error. + +### Checking Policy Status + +You can check if policies are active by querying TimescaleDB system tables: + +```sql +-- Check retention policies +SELECT * FROM timescaledb_information.jobs +WHERE proc_name = 'policy_retention'; + +-- Check compression policies +SELECT * FROM timescaledb_information.jobs +WHERE proc_name = 'policy_compression'; + +-- Check compression status +SELECT * FROM timescaledb_information.hypertables; +``` diff --git a/timescale/db/migrations/__init__.py b/timescale/db/migrations/__init__.py new file mode 100644 index 0000000..25303ca --- /dev/null +++ b/timescale/db/migrations/__init__.py @@ -0,0 +1,65 @@ +""" +TimescaleDB migration operations. + +This module provides Django migration operations for managing TimescaleDB +retention and compression policies declaratively. + +The operations are automatically generated when you define TimescalePolicies +in your Django models and run makemigrations. + +Example model with declarative policies: + + from django.db import models + from timescale.db.models.fields import TimescaleDateTimeField + from timescale.db.models.managers import TimescaleManager + + class SensorData(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + sensor_id = models.CharField(max_length=50) + temperature = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class TimescalePolicies: + compression = { + 'enabled': True, + 'compress_orderby': ['timestamp'], + 'compress_segmentby': ['sensor_id'], + } + compression_policy = { + 'compress_after': '7 days', + 'schedule_interval': '1 hour', + } + retention_policy = { + 'drop_after': '90 days', + 'schedule_interval': '1 day', + } + +When you run makemigrations, Django will automatically generate migration +operations to apply these policies to your TimescaleDB hypertables. +""" + +from ..operations import TimescaleExtension +from .operations import ( + ApplyTimescalePolicies, + RemoveTimescalePolicies, + # Individual operations for backward compatibility and programmatic use + AddRetentionPolicy, + RemoveRetentionPolicy, + EnableCompression, + AddCompressionPolicy, + RemoveCompressionPolicy, +) + +__all__ = [ + 'TimescaleExtension', + 'ApplyTimescalePolicies', + 'RemoveTimescalePolicies', + # Individual operations + 'AddRetentionPolicy', + 'RemoveRetentionPolicy', + 'EnableCompression', + 'AddCompressionPolicy', + 'RemoveCompressionPolicy', +] diff --git a/timescale/db/migrations/autodetector.py b/timescale/db/migrations/autodetector.py new file mode 100644 index 0000000..c6c8d6b --- /dev/null +++ b/timescale/db/migrations/autodetector.py @@ -0,0 +1,348 @@ +""" +TimescaleDB migration autodetector. + +This module patches Django's migration autodetector to automatically +detect changes in TimescaleDB policies defined in model classes. +""" +from django.db.migrations.autodetector import MigrationAutodetector +from django.db.migrations.state import ModelState +from django.apps import apps +from typing import Dict, List, Any, Optional + +from .operations import ApplyTimescalePolicies, RemoveTimescalePolicies +from ..models.policies import get_model_timescale_policies +from ..models.fields import TimescaleDateTimeField + + +def has_timescale_field(model_class): + """Check if a model has TimescaleDB fields.""" + if not model_class: + return False + + for field in model_class._meta.get_fields(): + if isinstance(field, TimescaleDateTimeField): + return True + return False + + +def get_last_applied_policies_from_migrations(app_label: str, model_name: str, autodetector): + """ + Get the last applied TimescaleDB policies for a model from migration history. + + This uses Django's MigrationLoader to find the most recent ApplyTimescalePolicies + operation for the given model. This is more reliable than reading files directly. + """ + try: + from django.db.migrations.loader import MigrationLoader + + # Create a migration loader to access the migration graph + loader = MigrationLoader(None, ignore_no_migrations=True) + + # Get all migrations for this app, sorted by name (which includes ordering) + app_migrations = [] + for migration_key in loader.graph.nodes: + if migration_key[0] == app_label: + app_migrations.append(migration_key) + + # Sort by migration name to get chronological order + app_migrations.sort(key=lambda x: x[1]) + + # Look through migrations in reverse order to find the most recent policy + for migration_key in reversed(app_migrations): + migration = loader.graph.nodes[migration_key] + + # Check each operation in the migration + for operation in migration.operations: + if (hasattr(operation, 'model_name') and + hasattr(operation, '__class__') and + operation.__class__.__name__ == 'ApplyTimescalePolicies' and + operation.model_name.lower() == model_name.lower()): + # Found the most recent policy operation for this model + print(f"DEBUG: Found policies in migration {migration_key[1]}") + return { + 'compression': getattr(operation, 'compression_settings', None), + 'compression_policy': getattr(operation, 'compression_policy', None), + 'retention_policy': getattr(operation, 'retention_policy', None), + } + + print(f"DEBUG: No policy operations found for {model_name}") + return None + + except Exception as e: + print(f"DEBUG: Error getting policies from migration history: {e}") + # Fallback to disk reading approach if MigrationLoader fails + return get_policies_from_disk_fallback(app_label, model_name) + + +def get_policies_from_disk_fallback(app_label: str, model_name: str): + """ + Fallback method to read migration files from disk. + + This is used when the MigrationLoader approach fails. + """ + import os + import importlib.util + from django.conf import settings + + try: + # Find the app's migration directory + app_config = None + for app in settings.INSTALLED_APPS: + if app.split('.')[-1] == app_label: + app_config = app + break + + if not app_config: + return None + + # Get the migration directory path + try: + module = importlib.import_module(app_config) + app_path = os.path.dirname(module.__file__) + migrations_path = os.path.join(app_path, 'migrations') + except: + return None + + if not os.path.exists(migrations_path): + return None + + # Get all migration files + migration_files = [] + for filename in os.listdir(migrations_path): + if filename.endswith('.py') and filename != '__init__.py': + migration_files.append(filename) + + # Sort migration files by name (which includes ordering) + migration_files.sort() + + # Look through migration files in reverse order + for filename in reversed(migration_files): + filepath = os.path.join(migrations_path, filename) + + try: + # Load the migration module + spec = importlib.util.spec_from_file_location(f"{app_label}.migrations.{filename[:-3]}", filepath) + migration_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(migration_module) + + # Get the Migration class + if hasattr(migration_module, 'Migration'): + migration = migration_module.Migration + + # Check each operation in the migration + for operation in migration.operations: + if (hasattr(operation, 'model_name') and + hasattr(operation, '__class__') and + operation.__class__.__name__ == 'ApplyTimescalePolicies' and + operation.model_name.lower() == model_name.lower()): + # Found the most recent policy operation for this model + print(f"DEBUG: Found policies in migration {filename} (fallback)") + return { + 'compression': getattr(operation, 'compression_settings', None), + 'compression_policy': getattr(operation, 'compression_policy', None), + 'retention_policy': getattr(operation, 'retention_policy', None), + } + except Exception: + continue + + return None + + except Exception: + return None + + +def compare_model_policies(old_policies, new_policies): + """ + Compare two policy definitions to determine if they are different. + + Args: + old_policies: Previous policy definition (dict or TimescalePolicies object) + new_policies: New policy definition (TimescalePolicies object) + + Returns: + True if policies are different, False if they are the same + """ + if old_policies is None and new_policies is None: + return False + + if old_policies is None or new_policies is None: + return True + + # Convert old_policies to comparable format + if isinstance(old_policies, dict): + old_compression = old_policies.get('compression') + old_compression_policy = old_policies.get('compression_policy') + old_retention_policy = old_policies.get('retention_policy') + else: + old_compression = getattr(old_policies, 'compression', None) + old_compression_policy = getattr(old_policies, 'compression_policy', None) + old_retention_policy = getattr(old_policies, 'retention_policy', None) + + # Get new policies + new_compression = getattr(new_policies, 'compression', None) + new_compression_policy = getattr(new_policies, 'compression_policy', None) + new_retention_policy = getattr(new_policies, 'retention_policy', None) + + # Compare each policy type + if old_compression != new_compression: + return True + + if old_compression_policy != new_compression_policy: + return True + + if old_retention_policy != new_retention_policy: + return True + + return False + + +def convert_policies_to_dict(policies_obj): + """ + Convert a TimescalePolicies object to a dictionary for comparison. + """ + if not policies_obj: + return None + + return { + 'compression': getattr(policies_obj, 'compression', None), + 'compression_policy': getattr(policies_obj, 'compression_policy', None), + 'retention_policy': getattr(policies_obj, 'retention_policy', None), + } + + +def generate_timescale_policy_operations(autodetector): + """ + Generate TimescaleDB policy operations for the migration autodetector. + + This function detects when TimescaleDB policies are added, changed, or removed + from models and generates the appropriate migration operations. + """ + # Get all models from both old and new states + old_model_keys = set(autodetector.from_state.models.keys()) + new_model_keys = set(autodetector.to_state.models.keys()) + + # Check all models in the new state for TimescaleDB policies + # This approach focuses on detecting when policies are added to models + print(f"DEBUG: Checking {len(new_model_keys)} models for policies") + + for app_label, model_name in new_model_keys: + print(f"DEBUG: Checking model {app_label}.{model_name}") + new_model_state = autodetector.to_state.models[app_label, model_name] + + # Only process models with TimescaleDB fields + try: + model_class = apps.get_model(app_label, model_name) + if not has_timescale_field(model_class): + print(f"DEBUG: {model_name} has no TimescaleDB fields") + continue + print(f"DEBUG: {model_name} has TimescaleDB fields") + except LookupError: + print(f"DEBUG: Could not get model class for {model_name}") + continue + + # Get current policies from the model class + current_policies = get_model_timescale_policies(model_class) + print(f"DEBUG: Current policies for {model_name}: {current_policies}") + + # Get last applied policies from migration history + last_applied_policies = get_last_applied_policies_from_migrations(app_label, model_name, autodetector) + print(f"DEBUG: Last applied policies for {model_name}: {last_applied_policies}") + + # Compare policies to determine what operations are needed + operation = None + + if current_policies and ( + hasattr(current_policies, 'compression') or + hasattr(current_policies, 'compression_policy') or + hasattr(current_policies, 'retention_policy') + ): + if not last_applied_policies: + # New policies - generate ApplyTimescalePolicies operation + print(f"DEBUG: {model_name} has new policies") + operation = ApplyTimescalePolicies( + model_name=model_name, + compression_settings=getattr(current_policies, 'compression', None), + compression_policy=getattr(current_policies, 'compression_policy', None), + retention_policy=getattr(current_policies, 'retention_policy', None), + ) + elif compare_model_policies(last_applied_policies, current_policies): + # Policies changed - generate ApplyTimescalePolicies operation + print(f"DEBUG: {model_name} has changed policies") + operation = ApplyTimescalePolicies( + model_name=model_name, + compression_settings=getattr(current_policies, 'compression', None), + compression_policy=getattr(current_policies, 'compression_policy', None), + retention_policy=getattr(current_policies, 'retention_policy', None), + ) + else: + print(f"DEBUG: {model_name} policies unchanged") + elif last_applied_policies: + # Policies removed - generate RemoveTimescalePolicies operation + print(f"DEBUG: {model_name} policies removed") + operation = RemoveTimescalePolicies( + model_name=model_name, + remove_compression_policy=bool(last_applied_policies.get('compression_policy')), + remove_retention_policy=bool(last_applied_policies.get('retention_policy')), + ) + else: + print(f"DEBUG: {model_name} has no policies defined") + + # Add the operation to migrations if needed + if operation: + print(f"DEBUG: Adding operation for {model_name}: {operation}") + + # Ensure the app has a migration entry + if app_label not in autodetector.migrations: + autodetector.migrations[app_label] = [] + print(f"DEBUG: Created migration entry for {app_label}") + + # Create a migration if it doesn't exist + if not autodetector.migrations[app_label]: + from django.db.migrations import Migration + migration = Migration(f'timescale_policies_{model_name.lower()}', app_label) + autodetector.migrations[app_label].append(migration) + print(f"DEBUG: Created new migration for {app_label}") + + # Add operation to the migration + migration = autodetector.migrations[app_label][0] + migration.operations.append(operation) + print(f"DEBUG: Added operation to migration: {operation}") + + + + +def patch_migration_autodetector(): + """ + Patch Django's MigrationAutodetector to include TimescaleDB policy detection. + + This function is called during Django app initialization to enable + automatic detection of TimescaleDB policy changes. + """ + # print("DEBUG: Patching MigrationAutodetector...") + + # Store the original _detect_changes method + original_detect_changes = MigrationAutodetector._detect_changes + + def enhanced_detect_changes(self, convert_apps=None, graph=None): + """Enhanced version that also detects TimescaleDB policy changes.""" + print("DEBUG: Enhanced _detect_changes called") + + # Call the original method first + result = original_detect_changes(self, convert_apps, graph) + + # Add our TimescaleDB policy detection + try: + print("DEBUG: Calling generate_timescale_policy_operations") + generate_timescale_policy_operations(self) + print(f"DEBUG: After policy detection, migrations: {self.migrations}") + except Exception as e: + print(f"DEBUG: Error in policy detection: {e}") + # Don't break migration generation if our detection fails + pass + + # Return the result + return result + + # Patch the method + MigrationAutodetector._detect_changes = enhanced_detect_changes + print("DEBUG: MigrationAutodetector patched successfully") diff --git a/timescale/db/migrations/examples/example_migration.py b/timescale/db/migrations/examples/example_migration.py new file mode 100644 index 0000000..8ad0b8c --- /dev/null +++ b/timescale/db/migrations/examples/example_migration.py @@ -0,0 +1,189 @@ +""" +Example Django migration showing how to use TimescaleDB policy operations. + +This is an example migration file that demonstrates how to use the TimescaleDB +migration operations to set up compression and retention policies. + +To use this in your own project: +1. Copy the operations you need into your own migration file +2. Update the model_name to match your actual model +3. Adjust the time intervals as needed for your use case +""" + +from django.db import migrations +from timescale.db.migrations import ( + EnableCompression, + AddCompressionPolicy, + AddRetentionPolicy, + RemoveRetentionPolicy, + RemoveCompressionPolicy, +) + + +class Migration(migrations.Migration): + """ + Example migration for setting up TimescaleDB policies. + + This migration assumes you have a model called 'SensorData' that is already + a TimescaleDB hypertable (created by having a TimescaleDateTimeField). + """ + + dependencies = [ + ('myapp', '0001_initial'), # Replace with your actual app and migration + ] + + operations = [ + # Step 1: Enable compression on the hypertable + # This configures the hypertable for compression with specific settings + EnableCompression( + model_name='SensorData', # Replace with your model name + compress_orderby=['timestamp'], # Order by timestamp for better compression + compress_segmentby=['sensor_id'], # Segment by sensor_id for parallel processing + ), + + # Step 2: Add a compression policy + # This automatically compresses chunks older than 7 days + AddCompressionPolicy( + model_name='SensorData', # Replace with your model name + compress_after='7 days', # Compress data older than 7 days + schedule_interval='1 hour', # Check for new chunks to compress every hour + ), + + # Step 3: Add a retention policy + # This automatically removes data older than 1 year + AddRetentionPolicy( + model_name='SensorData', # Replace with your model name + drop_after='1 year', # Remove data older than 1 year + schedule_interval='1 day', # Check for old data to remove daily + ), + ] + + +# Example of a migration that modifies existing policies +class ModifyPoliciesMigration(migrations.Migration): + """ + Example migration for modifying existing TimescaleDB policies. + + This shows how to change policy parameters by removing the old policy + and adding a new one with different settings. + """ + + dependencies = [ + ('myapp', '0002_add_timescale_policies'), # The migration that added the original policies + ] + + operations = [ + # Remove the old retention policy + RemoveRetentionPolicy( + model_name='SensorData', + ), + + # Add a new retention policy with a shorter retention period + AddRetentionPolicy( + model_name='SensorData', + drop_after='6 months', # Changed from 1 year to 6 months + schedule_interval='1 day', + ), + + # You could also modify compression policies in the same way: + # RemoveCompressionPolicy( + # model_name='SensorData', + # ), + # AddCompressionPolicy( + # model_name='SensorData', + # compress_after='3 days', # Changed from 7 days to 3 days + # schedule_interval='30 minutes', # More frequent compression + # ), + ] + + +# Example using timedelta objects instead of strings +class TimedeltaExampleMigration(migrations.Migration): + """ + Example migration using Python timedelta objects for time intervals. + + This shows an alternative way to specify time intervals using Python's + timedelta objects instead of string intervals. + """ + + from datetime import timedelta + + dependencies = [ + ('myapp', '0001_initial'), + ] + + operations = [ + EnableCompression( + model_name='SensorData', + compress_orderby=['timestamp'], + ), + + AddCompressionPolicy( + model_name='SensorData', + compress_after=timedelta(days=7), # Using timedelta + schedule_interval=timedelta(hours=1), # Using timedelta + ), + + AddRetentionPolicy( + model_name='SensorData', + drop_after=timedelta(days=365), # 1 year using timedelta + schedule_interval=timedelta(days=1), # Using timedelta + ), + ] + + +# Example for multiple models +class MultipleModelsMigration(migrations.Migration): + """ + Example migration for setting up policies on multiple models. + + This shows how to apply different policies to different models + based on their specific requirements. + """ + + dependencies = [ + ('myapp', '0001_initial'), + ] + + operations = [ + # High-frequency sensor data - aggressive compression and shorter retention + EnableCompression( + model_name='HighFrequencySensorData', + compress_orderby=['timestamp'], + compress_segmentby=['sensor_id'], + ), + AddCompressionPolicy( + model_name='HighFrequencySensorData', + compress_after='1 day', # Compress quickly + ), + AddRetentionPolicy( + model_name='HighFrequencySensorData', + drop_after='30 days', # Short retention + ), + + # Low-frequency sensor data - less aggressive compression and longer retention + EnableCompression( + model_name='LowFrequencySensorData', + compress_orderby=['timestamp'], + ), + AddCompressionPolicy( + model_name='LowFrequencySensorData', + compress_after='30 days', # Compress less frequently + ), + AddRetentionPolicy( + model_name='LowFrequencySensorData', + drop_after='2 years', # Longer retention + ), + + # Audit logs - compression but no automatic deletion + EnableCompression( + model_name='AuditLog', + compress_orderby=['timestamp'], + compress_segmentby=['user_id'], + ), + AddCompressionPolicy( + model_name='AuditLog', + compress_after='90 days', + ), + # Note: No retention policy for audit logs - keep forever + ] diff --git a/timescale/db/migrations/operations.py b/timescale/db/migrations/operations.py new file mode 100644 index 0000000..1808f82 --- /dev/null +++ b/timescale/db/migrations/operations.py @@ -0,0 +1,558 @@ +""" +Django migration operations for TimescaleDB policies. + +These operations are automatically generated by Django's migration system +when TimescaleDB policies are defined in model classes. +""" +from django.db.migrations.operations.base import Operation +from typing import Optional, Union, List, Dict, Any +from datetime import datetime, timedelta + + +class ApplyTimescalePolicies(Operation): + """ + Apply TimescaleDB policies to a model. + + This operation is automatically generated when policies are defined + in a model's TimescalePolicies class. + """ + + def __init__( + self, + model_name: str, + compression_settings: Optional[Dict[str, Any]] = None, + compression_policy: Optional[Dict[str, Any]] = None, + retention_policy: Optional[Dict[str, Any]] = None, + **kwargs + ): + """ + Initialize the ApplyTimescalePolicies operation. + + Args: + model_name: The name of the model to apply policies to + compression_settings: Compression configuration + compression_policy: Compression policy configuration + retention_policy: Retention policy configuration + """ + self.model_name = model_name + self.compression_settings = compression_settings + self.compression_policy = compression_policy + self.retention_policy = retention_policy + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """ + The ApplyTimescalePolicies operation doesn't alter the model state. + """ + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """ + Apply the TimescaleDB policies to the database. + + For policy updates, we need to remove existing policies first, + then add the new ones to ensure the changes take effect. + """ + # Get the actual model class, not the migration state model + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + # For policy updates, remove existing policies first + # This ensures that policy changes actually take effect + if self.compression_policy: + model.timescale.remove_compression_policy(if_exists=True) + + if self.retention_policy: + model.timescale.remove_retention_policy(if_exists=True) + + # Apply compression settings (only if not already enabled) + if self.compression_settings and self.compression_settings.get('enabled', False): + model.timescale.enable_compression( + compress_orderby=self.compression_settings.get('compress_orderby', []), + compress_segmentby=self.compression_settings.get('compress_segmentby', []), + compress_chunk_time_interval=self.compression_settings.get('compress_chunk_time_interval'), + if_not_exists=True # Compression can only be enabled once + ) + + # Apply compression policy (after removing old one) + if self.compression_policy: + model.timescale.add_compression_policy( + compress_after=self.compression_policy['compress_after'], + schedule_interval=self.compression_policy.get('schedule_interval'), + if_not_exists=False, # We already removed the old policy + compress_created_before=self.compression_policy.get('compress_created_before'), + initial_start=self.compression_policy.get('initial_start'), + timezone=self.compression_policy.get('timezone') + ) + + # Apply retention policy (after removing old one) + if self.retention_policy: + model.timescale.add_retention_policy( + drop_after=self.retention_policy['drop_after'], + schedule_interval=self.retention_policy.get('schedule_interval'), + if_not_exists=False, # We already removed the old policy + drop_created_before=self.retention_policy.get('drop_created_before'), + initial_start=self.retention_policy.get('initial_start'), + timezone=self.retention_policy.get('timezone') + ) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """ + Remove the TimescaleDB policies from the database. + """ + # Get the actual model class, not the migration state model + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + # Remove policies (order matters - remove policies before disabling compression) + if self.compression_policy: + model.timescale.remove_compression_policy(if_exists=True) + + if self.retention_policy: + model.timescale.remove_retention_policy(if_exists=True) + + # Note: We don't disable compression in reverse because TimescaleDB + # doesn't support disabling compression once enabled + + def describe(self): + """ + Return a human-readable description of the operation. + """ + policies = [] + if self.compression_settings: + policies.append("compression") + if self.compression_policy: + policies.append("compression policy") + if self.retention_policy: + policies.append("retention policy") + + policy_str = ", ".join(policies) + return f"Apply TimescaleDB {policy_str} to {self.model_name}" + + @property + def migration_name_fragment(self): + """ + Return a fragment for the migration filename. + """ + return f"apply_timescale_policies_{self.model_name.lower()}" + + def deconstruct(self): + """ + Return a 3-tuple of class import path, positional args, and keyword args. + """ + kwargs = { + 'model_name': self.model_name, + } + + if self.compression_settings: + kwargs['compression_settings'] = self.compression_settings + if self.compression_policy: + kwargs['compression_policy'] = self.compression_policy + if self.retention_policy: + kwargs['retention_policy'] = self.retention_policy + + return ( + self.__class__.__qualname__, + [], + kwargs + ) + + +class RemoveTimescalePolicies(Operation): + """ + Remove TimescaleDB policies from a model. + + This operation is automatically generated when policies are removed + from a model's TimescalePolicies class. + """ + + def __init__( + self, + model_name: str, + remove_compression_policy: bool = False, + remove_retention_policy: bool = False, + **kwargs + ): + """ + Initialize the RemoveTimescalePolicies operation. + + Args: + model_name: The name of the model to remove policies from + remove_compression_policy: Whether to remove compression policy + remove_retention_policy: Whether to remove retention policy + """ + self.model_name = model_name + self.remove_compression_policy = remove_compression_policy + self.remove_retention_policy = remove_retention_policy + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """ + The RemoveTimescalePolicies operation doesn't alter the model state. + """ + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """ + Remove the TimescaleDB policies from the database. + """ + # Get the actual model class, not the migration state model + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + if self.remove_compression_policy: + model.timescale.remove_compression_policy(if_exists=True) + + if self.remove_retention_policy: + model.timescale.remove_retention_policy(if_exists=True) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """ + This operation is not reversible without the original policy definitions. + """ + raise NotImplementedError( + "RemoveTimescalePolicies is not reversible. " + "The original policy definitions would be needed to reverse this operation." + ) + + def describe(self): + """ + Return a human-readable description of the operation. + """ + policies = [] + if self.remove_compression_policy: + policies.append("compression policy") + if self.remove_retention_policy: + policies.append("retention policy") + + policy_str = ", ".join(policies) + return f"Remove TimescaleDB {policy_str} from {self.model_name}" + + @property + def migration_name_fragment(self): + """ + Return a fragment for the migration filename. + """ + return f"remove_timescale_policies_{self.model_name.lower()}" + + +# Individual operations for backward compatibility and programmatic use +class AddRetentionPolicy(Operation): + """ + Add a retention policy to automatically remove old data from a TimescaleDB hypertable. + """ + + def __init__( + self, + model_name: str, + drop_after: Union[str, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = True, + drop_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[str] = None, + timezone: Optional[str] = None, + **kwargs + ): + self.model_name = model_name + self.drop_after = drop_after + self.schedule_interval = schedule_interval + self.if_not_exists = if_not_exists + self.drop_created_before = drop_created_before + self.initial_start = initial_start + self.timezone = timezone + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """The AddRetentionPolicy operation doesn't alter the model state.""" + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Add the retention policy to the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + model.timescale.add_retention_policy( + drop_after=self.drop_after, + schedule_interval=self.schedule_interval, + if_not_exists=self.if_not_exists, + drop_created_before=self.drop_created_before, + initial_start=self.initial_start, + timezone=self.timezone + ) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """Remove the retention policy from the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + model.timescale.remove_retention_policy(if_exists=True) + + def describe(self): + return f"Add retention policy to {self.model_name} (drop after {self.drop_after})" + + @property + def migration_name_fragment(self): + return f"add_retention_policy_{self.model_name.lower()}" + + def deconstruct(self): + kwargs = { + 'model_name': self.model_name, + 'drop_after': self.drop_after, + } + if self.schedule_interval is not None: + kwargs['schedule_interval'] = self.schedule_interval + if not self.if_not_exists: + kwargs['if_not_exists'] = self.if_not_exists + if self.drop_created_before is not None: + kwargs['drop_created_before'] = self.drop_created_before + if self.initial_start is not None: + kwargs['initial_start'] = self.initial_start + if self.timezone is not None: + kwargs['timezone'] = self.timezone + + return ( + self.__class__.__name__, + [], + kwargs + ) + + +class RemoveRetentionPolicy(Operation): + """ + Remove a retention policy from a TimescaleDB hypertable. + """ + + def __init__(self, model_name: str, if_exists: bool = True, **kwargs): + self.model_name = model_name + self.if_exists = if_exists + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """The RemoveRetentionPolicy operation doesn't alter the model state.""" + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Remove the retention policy from the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + model.timescale.remove_retention_policy(if_exists=self.if_exists) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """This operation is not reversible without the original policy parameters.""" + raise NotImplementedError( + "RemoveRetentionPolicy is not reversible. " + "The original policy parameters would be needed to reverse this operation." + ) + + def describe(self): + return f"Remove retention policy from {self.model_name}" + + @property + def migration_name_fragment(self): + return f"remove_retention_policy_{self.model_name.lower()}" + + def deconstruct(self): + kwargs = {'model_name': self.model_name} + if not self.if_exists: + kwargs['if_exists'] = self.if_exists + + return ( + self.__class__.__name__, + [], + kwargs + ) + + +class EnableCompression(Operation): + """ + Enable compression on a TimescaleDB hypertable. + """ + + def __init__( + self, + model_name: str, + compress_orderby: Optional[List[str]] = None, + compress_segmentby: Optional[List[str]] = None, + compress_chunk_time_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = True, + **kwargs + ): + self.model_name = model_name + self.compress_orderby = compress_orderby or [] + self.compress_segmentby = compress_segmentby or [] + self.compress_chunk_time_interval = compress_chunk_time_interval + self.if_not_exists = if_not_exists + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """The EnableCompression operation doesn't alter the model state.""" + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Enable compression on the hypertable.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + model.timescale.enable_compression( + compress_orderby=self.compress_orderby, + compress_segmentby=self.compress_segmentby, + compress_chunk_time_interval=self.compress_chunk_time_interval, + if_not_exists=self.if_not_exists + ) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """ + Note: TimescaleDB doesn't support disabling compression once enabled. + This is a limitation of TimescaleDB itself. + """ + # We can't actually disable compression in TimescaleDB + # This is documented behavior + pass + + def describe(self): + return f"Enable compression on {self.model_name}" + + @property + def migration_name_fragment(self): + return f"enable_compression_{self.model_name.lower()}" + + def deconstruct(self): + kwargs = {'model_name': self.model_name} + if self.compress_orderby: + kwargs['compress_orderby'] = self.compress_orderby + if self.compress_segmentby: + kwargs['compress_segmentby'] = self.compress_segmentby + if self.compress_chunk_time_interval is not None: + kwargs['compress_chunk_time_interval'] = self.compress_chunk_time_interval + if not self.if_not_exists: + kwargs['if_not_exists'] = self.if_not_exists + + return ( + self.__class__.__name__, + [], + kwargs + ) + + +class AddCompressionPolicy(Operation): + """ + Add a compression policy to automatically compress chunks older than a specified interval. + """ + + def __init__( + self, + model_name: str, + compress_after: Union[str, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = True, + compress_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[str] = None, + timezone: Optional[str] = None, + **kwargs + ): + self.model_name = model_name + self.compress_after = compress_after + self.schedule_interval = schedule_interval + self.if_not_exists = if_not_exists + self.compress_created_before = compress_created_before + self.initial_start = initial_start + self.timezone = timezone + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """The AddCompressionPolicy operation doesn't alter the model state.""" + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Add the compression policy to the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + + model.timescale.add_compression_policy( + compress_after=self.compress_after, + schedule_interval=self.schedule_interval, + if_not_exists=self.if_not_exists, + compress_created_before=self.compress_created_before, + initial_start=self.initial_start, + timezone=self.timezone + ) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """Remove the compression policy from the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + model.timescale.remove_compression_policy(if_exists=True) + + def describe(self): + return f"Add compression policy to {self.model_name} (compress after {self.compress_after})" + + @property + def migration_name_fragment(self): + return f"add_compression_policy_{self.model_name.lower()}" + + def deconstruct(self): + kwargs = { + 'model_name': self.model_name, + 'compress_after': self.compress_after, + } + if self.schedule_interval is not None: + kwargs['schedule_interval'] = self.schedule_interval + if not self.if_not_exists: + kwargs['if_not_exists'] = self.if_not_exists + if self.compress_created_before is not None: + kwargs['compress_created_before'] = self.compress_created_before + if self.initial_start is not None: + kwargs['initial_start'] = self.initial_start + if self.timezone is not None: + kwargs['timezone'] = self.timezone + + return ( + self.__class__.__name__, + [], + kwargs + ) + + +class RemoveCompressionPolicy(Operation): + """ + Remove a compression policy from a TimescaleDB hypertable. + """ + + def __init__(self, model_name: str, if_exists: bool = True, **kwargs): + self.model_name = model_name + self.if_exists = if_exists + super().__init__(**kwargs) + + def state_forwards(self, app_label, state): + """The RemoveCompressionPolicy operation doesn't alter the model state.""" + pass + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Remove the compression policy from the database.""" + from django.apps import apps + model = apps.get_model(app_label, self.model_name) + model.timescale.remove_compression_policy(if_exists=self.if_exists) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """This operation is not reversible without the original policy parameters.""" + raise NotImplementedError( + "RemoveCompressionPolicy is not reversible. " + "The original policy parameters would be needed to reverse this operation." + ) + + def describe(self): + return f"Remove compression policy from {self.model_name}" + + @property + def migration_name_fragment(self): + return f"remove_compression_policy_{self.model_name.lower()}" + + def deconstruct(self): + kwargs = {'model_name': self.model_name} + if not self.if_exists: + kwargs['if_exists'] = self.if_exists + + return ( + self.__class__.__name__, + [], + kwargs + ) diff --git a/timescale/db/models/compression.py b/timescale/db/models/compression.py new file mode 100644 index 0000000..edd7340 --- /dev/null +++ b/timescale/db/models/compression.py @@ -0,0 +1,241 @@ +""" +Compression policy implementation for TimescaleDB. +""" +from django.db import connection +from typing import Optional, Union, List, Dict, Any +from datetime import datetime, timedelta + + +class CompressionPolicy: + """ + A class to manage compression policies for TimescaleDB hypertables. + + This allows setting up automatic compression of chunks to reduce storage requirements. + """ + + @staticmethod + def add_compression_policy( + model, + compress_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + compress_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a compression policy to a TimescaleDB hypertable. + + Args: + model: The Django model with a TimescaleDateTimeField + compress_after: Chunks older than this interval will be compressed + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + compress_created_before: Alternative to compress_after, compresses chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + + Raises: + ValueError: If both compress_after and compress_created_before are provided + """ + if compress_after is not None and compress_created_before is not None: + raise ValueError("Cannot specify both compress_after and compress_created_before") + + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(compress_after, timedelta): + compress_after = f"{compress_after.total_seconds()} seconds" + if isinstance(compress_created_before, timedelta): + compress_created_before = f"{compress_created_before.total_seconds()} seconds" + if isinstance(schedule_interval, timedelta): + schedule_interval = f"{schedule_interval.total_seconds()} seconds" + + # Build the SQL query + params = [] + sql = "SELECT add_compression_policy(" + + # Add the table name + sql += "%s" + params.append(table_name) + + # Add compress_after or compress_created_before + if compress_after is not None: + sql += ", compress_after => INTERVAL %s" + params.append(compress_after) + if compress_created_before is not None: + sql += ", compress_created_before => INTERVAL %s" + params.append(compress_created_before) + + # Add optional parameters + if schedule_interval is not None: + sql += ", schedule_interval => INTERVAL %s" + params.append(schedule_interval) + + if initial_start is not None: + sql += ", initial_start => %s" + params.append(initial_start) + + if timezone is not None: + sql += ", timezone => %s" + params.append(timezone) + + if if_not_exists: + sql += ", if_not_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + job_id = cursor.fetchone()[0] + + return job_id + + @staticmethod + def remove_compression_policy(model, if_exists: bool = False) -> bool: + """ + Remove a compression policy from a TimescaleDB hypertable. + + Args: + model: The Django model with the compression policy + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + table_name = model._meta.db_table + + # Build the SQL query + params = [table_name] + sql = "SELECT remove_compression_policy(%s" + + if if_exists: + sql += ", if_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + result = cursor.fetchone()[0] + + # TimescaleDB returns True when successful + return bool(result) + + @staticmethod + def enable_compression(model, compress_segmentby: Optional[List[str]] = None, + compress_orderby: Optional[List[str]] = None, + compress_chunk_time_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False) -> bool: + """ + Enable compression on a TimescaleDB hypertable. + + Args: + model: The Django model to enable compression on + compress_segmentby: List of columns to segment by + compress_orderby: List of columns to order by + compress_chunk_time_interval: Time interval for compression chunks + if_not_exists: If True, don't error if compression is already enabled + + Returns: + True if compression was enabled, False otherwise + """ + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(compress_chunk_time_interval, timedelta): + compress_chunk_time_interval = f"{compress_chunk_time_interval.total_seconds()} seconds" + + # Build the SQL query + sql = f"ALTER TABLE {table_name} SET (timescaledb.compress = TRUE" + + if compress_segmentby: + sql += f", timescaledb.compress_segmentby = '{','.join(compress_segmentby)}'" + + if compress_orderby: + sql += f", timescaledb.compress_orderby = '{','.join(compress_orderby)}'" + + if compress_chunk_time_interval: + sql += f", timescaledb.compress_chunk_time_interval = '{compress_chunk_time_interval}'" + + sql += ")" + + # Execute the query + try: + with connection.cursor() as cursor: + cursor.execute(sql) + return True + except Exception as e: + if if_not_exists and "already compressed" in str(e): + return False + raise + + @staticmethod + def compress_chunk(chunk_name: str) -> bool: + """ + Manually compress a specific chunk. + + Args: + chunk_name: The name of the chunk to compress + + Returns: + True if the chunk was compressed, False otherwise + """ + # Build the SQL query + sql = f"SELECT compress_chunk('{chunk_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + result = cursor.fetchone()[0] + + return result + + @staticmethod + def decompress_chunk(chunk_name: str) -> bool: + """ + Manually decompress a specific chunk. + + Args: + chunk_name: The name of the chunk to decompress + + Returns: + True if the chunk was decompressed, False otherwise + """ + # Build the SQL query + sql = f"SELECT decompress_chunk('{chunk_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + result = cursor.fetchone()[0] + + return result + + @staticmethod + def get_compression_stats(model) -> List[Dict[str, Any]]: + """ + Get compression statistics for a hypertable. + + Args: + model: The Django model to get compression stats for + + Returns: + A list of dictionaries containing compression statistics + """ + table_name = model._meta.db_table + + # Build the SQL query + sql = f"SELECT * FROM hypertable_compression_stats('{table_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + columns = [col[0] for col in cursor.description] + results = [dict(zip(columns, row)) for row in cursor.fetchall()] + + return results diff --git a/timescale/db/models/managers.py b/timescale/db/models/managers.py index c3c0021..bf7c919 100644 --- a/timescale/db/models/managers.py +++ b/timescale/db/models/managers.py @@ -1,6 +1,7 @@ from django.db import models from timescale.db.models.querysets import * -from typing import Optional +from typing import Optional, Union, List, Dict, Any +from datetime import datetime, timedelta class TimescaleManager(models.Manager): @@ -26,3 +27,137 @@ def histogram(self, field: str, min_value: float, max_value: float, num_of_bucke def lttb(self, time: str, value: str, num_of_counts: int = 20): return self.get_queryset().lttb(time, value, num_of_counts) + + # Retention Policy Methods + def add_retention_policy( + self, + drop_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + drop_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a retention policy to automatically remove old data from the hypertable. + + Args: + drop_after: Chunks older than this interval will be dropped + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + drop_created_before: Alternative to drop_after, drops chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + """ + from timescale.db.models.retention import RetentionPolicy + return RetentionPolicy.add_retention_policy( + self.model, + drop_after=drop_after, + schedule_interval=schedule_interval, + if_not_exists=if_not_exists, + drop_created_before=drop_created_before, + initial_start=initial_start, + timezone=timezone + ) + + def remove_retention_policy(self, if_exists: bool = False) -> bool: + """ + Remove a retention policy from the hypertable. + + Args: + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + from timescale.db.models.retention import RetentionPolicy + return RetentionPolicy.remove_retention_policy(self.model, if_exists=if_exists) + + # Compression Policy Methods + def add_compression_policy( + self, + compress_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + compress_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a compression policy to automatically compress chunks in the hypertable. + + Args: + compress_after: Chunks older than this interval will be compressed + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + compress_created_before: Alternative to compress_after, compresses chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.add_compression_policy( + self.model, + compress_after=compress_after, + schedule_interval=schedule_interval, + if_not_exists=if_not_exists, + compress_created_before=compress_created_before, + initial_start=initial_start, + timezone=timezone + ) + + def remove_compression_policy(self, if_exists: bool = False) -> bool: + """ + Remove a compression policy from the hypertable. + + Args: + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.remove_compression_policy(self.model, if_exists=if_exists) + + def enable_compression( + self, + compress_segmentby: Optional[List[str]] = None, + compress_orderby: Optional[List[str]] = None, + compress_chunk_time_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False + ) -> bool: + """ + Enable compression on the hypertable. + + Args: + compress_segmentby: List of columns to segment by + compress_orderby: List of columns to order by + compress_chunk_time_interval: Time interval for compression chunks + if_not_exists: If True, don't error if compression is already enabled + + Returns: + True if compression was enabled, False otherwise + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.enable_compression( + self.model, + compress_segmentby=compress_segmentby, + compress_orderby=compress_orderby, + compress_chunk_time_interval=compress_chunk_time_interval, + if_not_exists=if_not_exists + ) + + def get_compression_stats(self) -> List[Dict[str, Any]]: + """ + Get compression statistics for the hypertable. + + Returns: + A list of dictionaries containing compression statistics + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.get_compression_stats(self.model) diff --git a/timescale/db/models/policies.py b/timescale/db/models/policies.py new file mode 100644 index 0000000..4460e2f --- /dev/null +++ b/timescale/db/models/policies.py @@ -0,0 +1,228 @@ +""" +Declarative TimescaleDB policy definitions for Django models. + +This module provides a way to define TimescaleDB retention and compression +policies directly in Django model classes, which will be automatically +detected by Django's migration system. +""" +from typing import Optional, Union, List, Dict, Any +from datetime import timedelta, datetime + + +class TimescalePolicies: + """ + Base class for defining TimescaleDB policies in Django models. + + Usage: + class MyModel(models.Model): + time = TimescaleDateTimeField(interval="1 day") + # ... other fields + + class TimescalePolicies: + compression = { + 'enabled': True, + 'compress_orderby': ['time'], + 'compress_segmentby': ['device_id'], + } + compression_policy = { + 'compress_after': '7 days', + 'schedule_interval': '1 hour', + } + retention_policy = { + 'drop_after': '90 days', + 'schedule_interval': '1 day', + } + """ + + def __init_subclass__(cls, **kwargs): + """ + Called when a subclass is created. This allows us to validate + the policy definitions at class creation time. + """ + super().__init_subclass__(**kwargs) + cls._validate_policies() + + @classmethod + def _validate_policies(cls): + """ + Validate the policy definitions to catch errors early. + """ + # Validate compression settings + if hasattr(cls, 'compression'): + compression = cls.compression + if not isinstance(compression, dict): + raise ValueError("compression must be a dictionary") + + if 'enabled' in compression and not isinstance(compression['enabled'], bool): + raise ValueError("compression.enabled must be a boolean") + + if 'compress_orderby' in compression: + if not isinstance(compression['compress_orderby'], list): + raise ValueError("compression.compress_orderby must be a list") + + if 'compress_segmentby' in compression: + if not isinstance(compression['compress_segmentby'], list): + raise ValueError("compression.compress_segmentby must be a list") + + # Validate compression policy + if hasattr(cls, 'compression_policy'): + policy = cls.compression_policy + if not isinstance(policy, dict): + raise ValueError("compression_policy must be a dictionary") + + if 'compress_after' not in policy: + raise ValueError("compression_policy must have 'compress_after'") + + # Validate retention policy + if hasattr(cls, 'retention_policy'): + policy = cls.retention_policy + if not isinstance(policy, dict): + raise ValueError("retention_policy must be a dictionary") + + if 'drop_after' not in policy: + raise ValueError("retention_policy must have 'drop_after'") + + @classmethod + def get_compression_settings(cls) -> Optional[Dict[str, Any]]: + """Get compression settings if defined.""" + return getattr(cls, 'compression', None) + + @classmethod + def get_compression_policy(cls) -> Optional[Dict[str, Any]]: + """Get compression policy if defined.""" + return getattr(cls, 'compression_policy', None) + + @classmethod + def get_retention_policy(cls) -> Optional[Dict[str, Any]]: + """Get retention policy if defined.""" + return getattr(cls, 'retention_policy', None) + + @classmethod + def has_any_policies(cls) -> bool: + """Check if any policies are defined.""" + return ( + hasattr(cls, 'compression') or + hasattr(cls, 'compression_policy') or + hasattr(cls, 'retention_policy') + ) + + +def get_model_timescale_policies(model_class) -> Optional[TimescalePolicies]: + """ + Extract TimescaleDB policies from a Django model class. + + This function checks both the direct TimescalePolicies class and + the Meta options (for migration state compatibility). + + Args: + model_class: Django model class to inspect + + Returns: + TimescalePolicies instance if policies are defined, None otherwise + """ + # First check if policies are stored in Meta options (for migration state compatibility) + if hasattr(model_class, '_meta') and hasattr(model_class._meta, 'timescale_policies'): + return model_class._meta.timescale_policies + + # Fallback to direct TimescalePolicies class + if hasattr(model_class, 'TimescalePolicies'): + return model_class.TimescalePolicies + return None + + +def serialize_policies_to_meta(policies_class) -> Dict[str, Any]: + """ + Convert a TimescalePolicies class to a dictionary suitable for Meta options. + + This allows Django's migration system to track policy changes. + """ + if not policies_class: + return {} + + meta_dict = {} + + if hasattr(policies_class, 'compression'): + meta_dict['compression'] = policies_class.compression + + if hasattr(policies_class, 'compression_policy'): + meta_dict['compression_policy'] = policies_class.compression_policy + + if hasattr(policies_class, 'retention_policy'): + meta_dict['retention_policy'] = policies_class.retention_policy + + return meta_dict + + +def deserialize_policies_from_meta(meta_dict: Dict[str, Any]): + """ + Convert a Meta options dictionary back to a TimescalePolicies-like object. + """ + if not meta_dict: + return None + + # Create a dynamic class with the policy attributes + class DeserializedPolicies(TimescalePolicies): + pass + + for key, value in meta_dict.items(): + setattr(DeserializedPolicies, key, value) + + return DeserializedPolicies + + +def compare_policies(old_policies: Optional[TimescalePolicies], + new_policies: Optional[TimescalePolicies]) -> Dict[str, Any]: + """ + Compare two policy definitions and return the differences. + + This is used by the migration autodetector to determine what changes + need to be made. + + Args: + old_policies: Previous policy definition + new_policies: New policy definition + + Returns: + Dictionary describing the changes needed + """ + changes = { + 'compression_changes': None, + 'compression_policy_changes': None, + 'retention_policy_changes': None, + } + + # Compare compression settings + old_compression = old_policies.get_compression_settings() if old_policies else None + new_compression = new_policies.get_compression_settings() if new_policies else None + + if old_compression != new_compression: + changes['compression_changes'] = { + 'old': old_compression, + 'new': new_compression, + } + + # Compare compression policy + old_comp_policy = old_policies.get_compression_policy() if old_policies else None + new_comp_policy = new_policies.get_compression_policy() if new_policies else None + + if old_comp_policy != new_comp_policy: + changes['compression_policy_changes'] = { + 'old': old_comp_policy, + 'new': new_comp_policy, + } + + # Compare retention policy + old_ret_policy = old_policies.get_retention_policy() if old_policies else None + new_ret_policy = new_policies.get_retention_policy() if new_policies else None + + if old_ret_policy != new_ret_policy: + changes['retention_policy_changes'] = { + 'old': old_ret_policy, + 'new': new_ret_policy, + } + + # Return None if no changes + if all(change is None for change in changes.values()): + return None + + return changes diff --git a/timescale/db/models/retention.py b/timescale/db/models/retention.py new file mode 100644 index 0000000..544a6e8 --- /dev/null +++ b/timescale/db/models/retention.py @@ -0,0 +1,128 @@ +""" +Retention policy implementation for TimescaleDB. +""" +from django.db import connection +from typing import Optional, Union +from datetime import datetime, timedelta + + +class RetentionPolicy: + """ + A class to manage retention policies for TimescaleDB hypertables. + + This allows setting up automatic removal of old data based on time, + helping manage database size. + """ + + @staticmethod + def add_retention_policy( + model, + drop_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + drop_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a retention policy to a TimescaleDB hypertable. + + Args: + model: The Django model with a TimescaleDateTimeField + drop_after: Chunks older than this interval will be dropped + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + drop_created_before: Alternative to drop_after, drops chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + + Raises: + ValueError: If both drop_after and drop_created_before are provided + """ + if drop_after is not None and drop_created_before is not None: + raise ValueError("Cannot specify both drop_after and drop_created_before") + + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(drop_after, timedelta): + drop_after = f"{drop_after.total_seconds()} seconds" + if isinstance(drop_created_before, timedelta): + drop_created_before = f"{drop_created_before.total_seconds()} seconds" + if isinstance(schedule_interval, timedelta): + schedule_interval = f"{schedule_interval.total_seconds()} seconds" + + # Build the SQL query + params = [] + sql = "SELECT add_retention_policy(" + + # Add the table name + sql += "%s" + params.append(table_name) + + # Add drop_after or drop_created_before + if drop_after is not None: + sql += ", drop_after => INTERVAL %s" + params.append(drop_after) + if drop_created_before is not None: + sql += ", drop_created_before => INTERVAL %s" + params.append(drop_created_before) + + # Add optional parameters + if schedule_interval is not None: + sql += ", schedule_interval => INTERVAL %s" + params.append(schedule_interval) + + if initial_start is not None: + sql += ", initial_start => %s" + params.append(initial_start) + + if timezone is not None: + sql += ", timezone => %s" + params.append(timezone) + + if if_not_exists: + sql += ", if_not_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + job_id = cursor.fetchone()[0] + + return job_id + + @staticmethod + def remove_retention_policy(model, if_exists: bool = False) -> bool: + """ + Remove a retention policy from a TimescaleDB hypertable. + + Args: + model: The Django model with the retention policy + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + table_name = model._meta.db_table + + # Build the SQL query + params = [table_name] + sql = "SELECT remove_retention_policy(%s" + + if if_exists: + sql += ", if_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + result = cursor.fetchone()[0] + + # TimescaleDB returns an empty string when successful + return result == '' diff --git a/timescale/tests/__init__.py b/timescale/tests/__init__.py index e69de29..a3d0c04 100644 --- a/timescale/tests/__init__.py +++ b/timescale/tests/__init__.py @@ -0,0 +1 @@ +# This file is intentionally left empty to make the directory a Python package. \ No newline at end of file diff --git a/timescale/tests/test_comprehensive_migrations.py b/timescale/tests/test_comprehensive_migrations.py new file mode 100644 index 0000000..260c190 --- /dev/null +++ b/timescale/tests/test_comprehensive_migrations.py @@ -0,0 +1,1207 @@ +""" +Comprehensive tests for TimescaleDB migration operations and declarative policies. + +This test suite combines unit tests and integration tests to thoroughly test: +1. Migration operation functionality (unit tests) +2. Real migration scenarios with data (integration tests) +3. Edge cases and policy changes +4. Both new and existing model scenarios +""" +import os +import sys +import unittest +import tempfile +from datetime import timedelta + +# Configure Django settings before importing Django modules +import django +from django.conf import settings + +if not settings.configured: + settings.configure( + DEBUG=True, + DATABASES={ + 'default': { + 'ENGINE': 'timescale.db.backends.postgresql', + 'NAME': os.environ.get('DB_DATABASE', 'test_timescale'), + 'USER': os.environ.get('DB_USERNAME', 'postgres'), + 'PASSWORD': os.environ.get('DB_PASSWORD', 'password'), + 'HOST': os.environ.get('DB_HOST', 'localhost'), + 'PORT': os.environ.get('DB_PORT', '5433'), + } + }, + INSTALLED_APPS=[ + 'timescale', + ], + USE_TZ=True, + SECRET_KEY='test-secret-key-for-testing-only', + ) + django.setup() + +from django.test import TransactionTestCase +from django.db import connection, models, migrations +from django.db.migrations.state import ProjectState +from django.db.migrations.autodetector import MigrationAutodetector +from django.utils import timezone + +# Import TimescaleDB components +from timescale.db.models.fields import TimescaleDateTimeField +from timescale.db.models.managers import TimescaleManager +from timescale.db.migrations.operations import ( + ApplyTimescalePolicies, + RemoveTimescalePolicies, +) + + +class MigrationTestModel(models.Model): + """Test model for migration operations.""" + time = TimescaleDateTimeField(interval="1 day") + temperature = models.FloatField(default=0.0) + device = models.IntegerField(default=0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_timescale_model' + + +class MigrationTestModelWithPolicies(models.Model): + """Test model with TimescaleDB policies defined.""" + timestamp = TimescaleDateTimeField(interval="1 hour") + sensor_id = models.CharField(max_length=50) + temperature = models.FloatField() + humidity = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_timescale_model_with_policies' + + class TimescalePolicies: + compression = { + 'enabled': True, + 'compress_orderby': ['timestamp'], + 'compress_segmentby': ['sensor_id'], + } + compression_policy = { + 'compress_after': '7 days', + 'schedule_interval': '1 hour', + } + retention_policy = { + 'drop_after': '90 days', + 'schedule_interval': '1 day', + } + + +class MigrationOperationUnitTests(unittest.TestCase): + """Unit tests for migration operations that don't require a database.""" + + def test_apply_timescale_policies_init(self): + """Test ApplyTimescalePolicies initialization.""" + operation = ApplyTimescalePolicies( + model_name='TestModel', + compression_settings={'enabled': True, 'compress_orderby': ['time']}, + compression_policy={'compress_after': '7 days'}, + retention_policy={'drop_after': '90 days'} + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.compression_settings['enabled'], True) + self.assertEqual(operation.compression_policy['compress_after'], '7 days') + self.assertEqual(operation.retention_policy['drop_after'], '90 days') + + def test_apply_timescale_policies_deconstruct(self): + """Test ApplyTimescalePolicies deconstruction for serialization.""" + operation = ApplyTimescalePolicies( + model_name='TestModel', + compression_settings={'enabled': True}, + compression_policy={'compress_after': '7 days'}, + retention_policy={'drop_after': '90 days'} + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'ApplyTimescalePolicies') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertIn('compression_settings', kwargs) + self.assertIn('compression_policy', kwargs) + self.assertIn('retention_policy', kwargs) + + def test_apply_timescale_policies_describe(self): + """Test ApplyTimescalePolicies description.""" + operation = ApplyTimescalePolicies( + model_name='TestModel', + compression_settings={'enabled': True}, + compression_policy={'compress_after': '7 days'}, + retention_policy={'drop_after': '90 days'} + ) + + description = operation.describe() + self.assertIn('Apply TimescaleDB', description) + self.assertIn('TestModel', description) + self.assertIn('compression', description) + self.assertIn('retention policy', description) + + def test_remove_timescale_policies_init(self): + """Test RemoveTimescalePolicies initialization.""" + operation = RemoveTimescalePolicies( + model_name='TestModel', + remove_compression_policy=True, + remove_retention_policy=True + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertTrue(operation.remove_compression_policy) + self.assertTrue(operation.remove_retention_policy) + + def test_remove_timescale_policies_deconstruct(self): + """Test RemoveTimescalePolicies deconstruction.""" + operation = RemoveTimescalePolicies( + model_name='TestModel', + remove_compression_policy=True, + remove_retention_policy=False + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'RemoveTimescalePolicies') + self.assertEqual(list(args), []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['remove_compression_policy'], True) + self.assertEqual(kwargs['remove_retention_policy'], False) + + def test_state_forwards_no_op(self): + """Test that state_forwards methods don't modify state.""" + operations = [ + ApplyTimescalePolicies(model_name='TestModel'), + RemoveTimescalePolicies(model_name='TestModel'), + ] + + # Mock state object + class MockState: + def __init__(self): + self.modified = False + + for operation in operations: + state = MockState() + # This should not raise an exception and should not modify state + operation.state_forwards('test_app', state) + self.assertFalse(state.modified) + + +class MigrationOperationIntegrationTests(TransactionTestCase): + """Integration tests for migration operations with real database.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test table + with connection.schema_editor() as schema_editor: + schema_editor.create_model(MigrationTestModel) + + # Create hypertable + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('test_timescale_model', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test table + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_timescale_model CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + MigrationTestModel.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + MigrationTestModel.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + + def tearDown(self): + """Clean up after each test.""" + # Remove any policies that might have been created + try: + MigrationTestModel.timescale.remove_retention_policy(if_exists=True) + except: + pass + try: + MigrationTestModel.timescale.remove_compression_policy(if_exists=True) + except: + pass + + # Clear test data + MigrationTestModel.objects.all().delete() + + def get_schema_editor(self): + """Get a schema editor for testing.""" + return connection.schema_editor() + + def get_project_state(self): + """Get a project state for testing.""" + # For our TimescaleDB operations, we don't need the model state + # since they work directly with the database + return ProjectState() + + def test_apply_timescale_policies_forward(self): + """Test applying TimescaleDB policies in forward migration.""" + operation = ApplyTimescalePolicies( + model_name='MigrationTestModel', + compression_settings={ + 'enabled': True, + 'compress_orderby': ['time'], + 'compress_segmentby': ['device'], + }, + compression_policy={ + 'compress_after': '30 days', + 'schedule_interval': '1 hour', + }, + retention_policy={ + 'drop_after': '90 days', + 'schedule_interval': '1 day', + } + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify compression was enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'test_timescale_model' + """) + result = cursor.fetchone() + self.assertIsNotNone(result) + self.assertTrue(result[0]) + + # Verify compression policy exists + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'test_timescale_model' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + # Verify retention policy exists + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'test_timescale_model' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_apply_timescale_policies_backward(self): + """Test removing TimescaleDB policies in backward migration.""" + # First apply policies + MigrationTestModel.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + MigrationTestModel.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + MigrationTestModel.timescale.add_retention_policy( + drop_after='90 days', + if_not_exists=True + ) + + operation = ApplyTimescalePolicies( + model_name='MigrationTestModel', + compression_settings={'enabled': True}, + compression_policy={'compress_after': '30 days'}, + retention_policy={'drop_after': '90 days'} + ) + + # Test backward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_backwards('timescale', schema_editor, state, state) + + # Verify policies were removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_timescale_model' + AND proc_name IN ('policy_compression', 'policy_retention') + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + + def test_remove_timescale_policies_forward(self): + """Test removing TimescaleDB policies in forward migration.""" + # First clean up any existing policies and compression + try: + MigrationTestModel.timescale.remove_compression_policy(if_exists=True) + MigrationTestModel.timescale.remove_retention_policy(if_exists=True) + except: + pass + + # Apply policies with consistent compression settings + MigrationTestModel.timescale.enable_compression( + compress_orderby=['time'], + compress_segmentby=['device'], # Include segmentby to avoid conflicts + if_not_exists=True + ) + MigrationTestModel.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + MigrationTestModel.timescale.add_retention_policy( + drop_after='90 days', + if_not_exists=True + ) + + operation = RemoveTimescalePolicies( + model_name='MigrationTestModel', + remove_compression_policy=True, + remove_retention_policy=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policies were removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_timescale_model' + AND proc_name IN ('policy_compression', 'policy_retention') + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + def test_remove_timescale_policies_backward_not_reversible(self): + """Test that backward migration raises NotImplementedError.""" + operation = RemoveTimescalePolicies( + model_name='MigrationTestModel', + remove_compression_policy=True, + remove_retention_policy=True + ) + + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + with self.assertRaises(NotImplementedError): + operation.database_backwards('timescale', schema_editor, state, state) + + +class SelfContainedMigrationTests(TransactionTestCase): + """Self-contained tests that don't depend on external models or migrations.""" + + def setUp(self): + """Set up for each test.""" + self.timestamp = timezone.now() + + def tearDown(self): + """Clean up after each test.""" + # Clean up any test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_self_contained CASCADE") + + def create_test_table_and_model(self, table_name='test_self_contained'): + """Create a test table and corresponding model.""" + with connection.cursor() as cursor: + # Create table + cursor.execute(f""" + CREATE TABLE {table_name} ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + value FLOAT NOT NULL, + PRIMARY KEY (id, timestamp) + ) + """) + + # Create hypertable + cursor.execute(f"SELECT create_hypertable('{table_name}', 'timestamp')") + + # Insert test data + for i in range(10): + cursor.execute(f""" + INSERT INTO {table_name} (timestamp, value) + VALUES (%s, %s) + """, [ + self.timestamp - timedelta(days=i), + float(i * 10) + ]) + + # Create and register a test model + class TestSelfContainedModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = table_name + + # Register the model temporarily + from django.apps import apps + apps.register_model('timescale', TestSelfContainedModel) + return TestSelfContainedModel + + def test_new_model_with_policies_migration(self): + """Test creating a new model with policies from scratch.""" + # Create the table and model + TestModel = self.create_test_table_and_model() + + try: + # Simulate applying policies via migration + operation = ApplyTimescalePolicies( + model_name='TestSelfContainedModel', + compression_settings={ + 'enabled': True, + 'compress_orderby': ['timestamp'], + }, + retention_policy={ + 'drop_after': '90 days', + 'schedule_interval': '1 day', + } + ) + + # Apply the operation + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policies were applied + with connection.cursor() as cursor: + # Check compression enabled + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'test_self_contained' + """) + result = cursor.fetchone() + self.assertTrue(result[0]) + + # Check retention policy exists + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_self_contained' + AND proc_name = 'policy_retention' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + finally: + # Unregister the model + from django.apps import apps + try: + del apps.all_models['timescale']['testselfcontainedmodel'] + except KeyError: + pass + + def test_policy_update_migration(self): + """Test updating policies on an existing model.""" + # Create table and model + TestModel = self.create_test_table_and_model() + + try: + # Apply initial policies + initial_operation = ApplyTimescalePolicies( + model_name='TestSelfContainedModel', + retention_policy={'drop_after': '90 days', 'schedule_interval': '1 day'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + initial_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify initial policy + with connection.cursor() as cursor: + cursor.execute(""" + SELECT schedule_interval FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_self_contained' + AND proc_name = 'policy_retention' + """) + result = cursor.fetchone() + self.assertEqual(result[0], timedelta(days=1)) + + # Now change the policy + changed_operation = ApplyTimescalePolicies( + model_name='TestSelfContainedModel', + retention_policy={'drop_after': '60 days', 'schedule_interval': '12 hours'} + ) + + with connection.schema_editor() as schema_editor: + changed_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policy was updated + with connection.cursor() as cursor: + cursor.execute(""" + SELECT schedule_interval FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_self_contained' + AND proc_name = 'policy_retention' + """) + result = cursor.fetchone() + self.assertEqual(result[0], timedelta(hours=12)) + + finally: + # Unregister the model + from django.apps import apps + try: + del apps.all_models['timescale']['testselfcontainedmodel'] + except KeyError: + pass + + +class EdgeCaseTests(TransactionTestCase): + """Tests for edge cases and error conditions.""" + + def test_apply_policies_to_nonexistent_model(self): + """Test applying policies to a model that doesn't exist.""" + operation = ApplyTimescalePolicies( + model_name='NonExistentModel', + retention_policy={'drop_after': '90 days'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + # This should raise an exception + with self.assertRaises(Exception): + operation.database_forwards('timescale', schema_editor, state, state) + + def test_apply_policies_with_invalid_intervals(self): + """Test applying policies with invalid time intervals.""" + # Create a test table first + with connection.cursor() as cursor: + cursor.execute(""" + CREATE TABLE test_invalid_table ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_invalid_table', 'timestamp')") + + try: + operation = ApplyTimescalePolicies( + model_name='TestModel', + retention_policy={'drop_after': 'invalid_interval'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + # This should raise an exception due to invalid interval + with self.assertRaises(Exception): + operation.database_forwards('timescale', schema_editor, state, state) + finally: + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_invalid_table CASCADE") + + def test_policy_update_functionality(self): + """Test that policy updates actually change the database policies.""" + # Create a test table + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_policy_update CASCADE") + cursor.execute(""" + CREATE TABLE test_policy_update ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + value FLOAT, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_policy_update', 'timestamp')") + + # Create a test model dynamically + class TestPolicyUpdateModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_policy_update' + + # Register the model temporarily + from django.apps import apps + apps.register_model('timescale', TestPolicyUpdateModel) + + try: + # Apply initial policies + initial_operation = ApplyTimescalePolicies( + model_name='TestPolicyUpdateModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + compression_policy={'compress_after': '30 days', 'schedule_interval': '1 hour'}, + retention_policy={'drop_after': '90 days', 'schedule_interval': '1 day'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + initial_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify initial policies + with connection.cursor() as cursor: + cursor.execute(""" + SELECT config FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_policy_update' + AND proc_name = 'policy_compression' + """) + result = cursor.fetchone() + self.assertIn('"compress_after": "30 days"', result[0]) + + # Apply updated policies + updated_operation = ApplyTimescalePolicies( + model_name='TestPolicyUpdateModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + compression_policy={'compress_after': '14 days', 'schedule_interval': '30 minutes'}, + retention_policy={'drop_after': '60 days', 'schedule_interval': '12 hours'} + ) + + with connection.schema_editor() as schema_editor: + updated_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policies were updated + with connection.cursor() as cursor: + cursor.execute(""" + SELECT config FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_policy_update' + AND proc_name = 'policy_compression' + """) + result = cursor.fetchone() + self.assertIn('"compress_after": "14 days"', result[0]) + + cursor.execute(""" + SELECT config FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_policy_update' + AND proc_name = 'policy_retention' + """) + result = cursor.fetchone() + self.assertIn('"drop_after": "60 days"', result[0]) + + finally: + # Unregister the model + try: + del apps.all_models['timescale']['testpolicyupdatemodel'] + except KeyError: + pass + + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_policy_update CASCADE") + + +class IndividualOperationTests(TransactionTestCase): + """Tests for individual operations (backward compatibility).""" + + def setUp(self): + """Set up test table.""" + with connection.cursor() as cursor: + # Clean up any existing table first + cursor.execute("DROP TABLE IF EXISTS test_individual_ops CASCADE") + + cursor.execute(""" + CREATE TABLE test_individual_ops ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + value FLOAT, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_individual_ops', 'timestamp')") + + def tearDown(self): + """Clean up test table.""" + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_individual_ops CASCADE") + + def test_add_retention_policy_operation(self): + """Test AddRetentionPolicy operation.""" + from timescale.db.migrations import AddRetentionPolicy + + # Create a test model dynamically + class TestIndividualOpsModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_individual_ops' + + # Register the model temporarily + from django.apps import apps + apps.register_model('timescale', TestIndividualOpsModel) + + try: + operation = AddRetentionPolicy( + model_name='TestIndividualOpsModel', + drop_after='90 days', + schedule_interval='1 day' + ) + + # Test forward migration + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_individual_ops' + AND proc_name = 'policy_retention' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + finally: + # Unregister the model + try: + del apps.all_models['timescale']['testindividualopsmodel'] + except KeyError: + pass + + def test_enable_compression_operation(self): + """Test EnableCompression operation.""" + from timescale.db.migrations import EnableCompression + + # Create a test model dynamically + class TestEnableCompressionModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_individual_ops' + + # Register the model temporarily + from django.apps import apps + apps.register_model('timescale', TestEnableCompressionModel) + + try: + operation = EnableCompression( + model_name='TestEnableCompressionModel', + compress_orderby=['timestamp'], + compress_segmentby=['value'] + ) + + # Test forward migration + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify compression was enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'test_individual_ops' + """) + result = cursor.fetchone() + self.assertTrue(result[0]) + + finally: + # Unregister the model + try: + del apps.all_models['timescale']['testenablecompressionmodel'] + except KeyError: + pass + + def test_add_compression_policy_operation(self): + """Test AddCompressionPolicy operation.""" + from timescale.db.migrations import EnableCompression, AddCompressionPolicy + + # Create a test model dynamically + class TestCompressionPolicyModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_individual_ops' + + # Register the model temporarily + from django.apps import apps + apps.register_model('timescale', TestCompressionPolicyModel) + + try: + # First enable compression + enable_op = EnableCompression( + model_name='TestCompressionPolicyModel', + compress_orderby=['timestamp'] + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + enable_op.database_forwards('timescale', schema_editor, state, state) + + # Then add compression policy + policy_op = AddCompressionPolicy( + model_name='TestCompressionPolicyModel', + compress_after='7 days', + schedule_interval='1 hour' + ) + + with connection.schema_editor() as schema_editor: + policy_op.database_forwards('timescale', schema_editor, state, state) + + # Verify policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_individual_ops' + AND proc_name = 'policy_compression' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + finally: + # Unregister the model + try: + del apps.all_models['timescale']['testcompressionpolicymodel'] + except KeyError: + pass + + +class MigrationHistoryTests(TransactionTestCase): + """Tests for migration history detection.""" + + def test_migration_loader_approach(self): + """Test that migration history detection uses MigrationLoader correctly.""" + from timescale.db.migrations.autodetector import get_last_applied_policies_from_migrations + + # This should not raise an exception and should return None for non-existent model + result = get_last_applied_policies_from_migrations('test_app', 'NonExistentModel', None) + self.assertIsNone(result) + + def test_disk_fallback_approach(self): + """Test that disk fallback works when MigrationLoader fails.""" + from timescale.db.migrations.autodetector import get_policies_from_disk_fallback + + # This should not raise an exception and should return None for non-existent app + result = get_policies_from_disk_fallback('nonexistent_app', 'NonExistentModel') + self.assertIsNone(result) + + +class RealDataEdgeCaseTests(TransactionTestCase): + """Tests for edge cases with real data scenarios.""" + + def setUp(self): + """Set up test table with real data.""" + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_real_data CASCADE") + cursor.execute(""" + CREATE TABLE test_real_data ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + sensor_id VARCHAR(50) NOT NULL, + value FLOAT NOT NULL, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_real_data', 'timestamp')") + + # Insert test data spanning different time periods + cursor.execute(""" + INSERT INTO test_real_data (timestamp, sensor_id, value) + SELECT + NOW() - (i || ' days')::INTERVAL, + 'sensor_' || (i % 5), + random() * 100 + FROM generate_series(1, 100) i + """) + + def tearDown(self): + """Clean up test table.""" + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_real_data CASCADE") + + def test_policy_changes_with_existing_data(self): + """Test changing policies on tables with existing data.""" + # Create test model + class TestRealDataModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + sensor_id = models.CharField(max_length=50) + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_real_data' + + from django.apps import apps + apps.register_model('timescale', TestRealDataModel) + + try: + # Apply initial policies + initial_operation = ApplyTimescalePolicies( + model_name='TestRealDataModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + compression_policy={'compress_after': '30 days', 'schedule_interval': '1 hour'}, + retention_policy={'drop_after': '90 days', 'schedule_interval': '1 day'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + initial_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify data count before policy change + with connection.cursor() as cursor: + cursor.execute("SELECT COUNT(*) FROM test_real_data") + initial_count = cursor.fetchone()[0] + self.assertGreater(initial_count, 0) + + # Change policies + updated_operation = ApplyTimescalePolicies( + model_name='TestRealDataModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + compression_policy={'compress_after': '14 days', 'schedule_interval': '30 minutes'}, + retention_policy={'drop_after': '60 days', 'schedule_interval': '12 hours'} + ) + + with connection.schema_editor() as schema_editor: + updated_operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policies were updated + with connection.cursor() as cursor: + cursor.execute(""" + SELECT config FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_real_data' + AND proc_name = 'policy_compression' + """) + result = cursor.fetchone() + self.assertIn('"compress_after": "14 days"', result[0]) + + # Verify data integrity maintained + cursor.execute("SELECT COUNT(*) FROM test_real_data") + final_count = cursor.fetchone()[0] + self.assertGreater(final_count, 0) # Data should still exist + + finally: + try: + del apps.all_models['timescale']['testrealdatamodel'] + except KeyError: + pass + + def test_adding_policies_to_existing_hypertable(self): + """Test adding policies to an existing hypertable that had no policies.""" + # Create test model + class TestExistingHypertableModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_real_data' + + from django.apps import apps + apps.register_model('timescale', TestExistingHypertableModel) + + try: + # Verify no policies exist initially + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_real_data' + """) + initial_jobs = cursor.fetchone()[0] + self.assertEqual(initial_jobs, 0) + + # Add policies to existing hypertable + operation = ApplyTimescalePolicies( + model_name='TestExistingHypertableModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + compression_policy={'compress_after': '7 days', 'schedule_interval': '2 hours'}, + retention_policy={'drop_after': '180 days', 'schedule_interval': '1 day'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policies were added + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_real_data' + """) + final_jobs = cursor.fetchone()[0] + self.assertEqual(final_jobs, 2) # compression + retention + + # Verify compression was enabled + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'test_real_data' + """) + compression_enabled = cursor.fetchone()[0] + self.assertTrue(compression_enabled) + + finally: + try: + del apps.all_models['timescale']['testexistinghypertablemodel'] + except KeyError: + pass + + +class BackwardCompatibilityTests(TransactionTestCase): + """Tests for backward compatibility with existing programmatic usage.""" + + def test_declarative_and_programmatic_together(self): + """Test that declarative and programmatic approaches work together.""" + # Create two test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_declarative CASCADE") + cursor.execute("DROP TABLE IF EXISTS test_programmatic CASCADE") + + cursor.execute(""" + CREATE TABLE test_declarative ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + value FLOAT, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_declarative', 'timestamp')") + + cursor.execute(""" + CREATE TABLE test_programmatic ( + id SERIAL, + timestamp TIMESTAMPTZ NOT NULL, + value FLOAT, + PRIMARY KEY (id, timestamp) + ) + """) + cursor.execute("SELECT create_hypertable('test_programmatic', 'timestamp')") + + try: + # Create models + class TestDeclarativeModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_declarative' + + class TestProgrammaticModel(models.Model): + timestamp = TimescaleDateTimeField(interval="1 hour") + value = models.FloatField() + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'test_programmatic' + + from django.apps import apps + apps.register_model('timescale', TestDeclarativeModel) + apps.register_model('timescale', TestProgrammaticModel) + + # Apply declarative policies + declarative_operation = ApplyTimescalePolicies( + model_name='TestDeclarativeModel', + compression_settings={'enabled': True, 'compress_orderby': ['timestamp']}, + retention_policy={'drop_after': '90 days', 'schedule_interval': '1 day'} + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + declarative_operation.database_forwards('timescale', schema_editor, state, state) + + # Apply programmatic policies using individual operations + from timescale.db.migrations import EnableCompression, AddRetentionPolicy + + enable_compression = EnableCompression( + model_name='TestProgrammaticModel', + compress_orderby=['timestamp'] + ) + + add_retention = AddRetentionPolicy( + model_name='TestProgrammaticModel', + drop_after='60 days', + schedule_interval='12 hours' + ) + + with connection.schema_editor() as schema_editor: + enable_compression.database_forwards('timescale', schema_editor, state, state) + add_retention.database_forwards('timescale', schema_editor, state, state) + + # Verify both approaches worked + with connection.cursor() as cursor: + # Check declarative policies + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_declarative' + """) + declarative_jobs = cursor.fetchone()[0] + self.assertEqual(declarative_jobs, 1) # retention only + + # Check programmatic policies + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'test_programmatic' + """) + programmatic_jobs = cursor.fetchone()[0] + self.assertEqual(programmatic_jobs, 1) # retention only + + # Check compression enabled on both + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.hypertables + WHERE hypertable_name IN ('test_declarative', 'test_programmatic') + AND compression_enabled = true + """) + compressed_tables = cursor.fetchone()[0] + self.assertEqual(compressed_tables, 2) # Both tables + + finally: + try: + del apps.all_models['timescale']['testdeclarativemodel'] + del apps.all_models['timescale']['testprogrammaticmodel'] + except KeyError: + pass + + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS test_declarative CASCADE") + cursor.execute("DROP TABLE IF EXISTS test_programmatic CASCADE") + + +if __name__ == '__main__': + unittest.main() diff --git a/timescale/tests/test_migration_operations.py b/timescale/tests/test_migration_operations.py new file mode 100644 index 0000000..96dd305 --- /dev/null +++ b/timescale/tests/test_migration_operations.py @@ -0,0 +1,588 @@ +""" +Tests for TimescaleDB migration operations. + +These tests verify that the migration operations work correctly for both +forward and reverse migrations. +""" +import os + +# Configure Django settings before importing Django modules +import django +from django.conf import settings + +if not settings.configured: + settings.configure( + DEBUG=True, + DATABASES={ + 'default': { + 'ENGINE': 'timescale.db.backends.postgresql', + 'NAME': os.environ.get('DB_DATABASE', 'test_timescale'), + 'USER': os.environ.get('DB_USERNAME', 'postgres'), + 'PASSWORD': os.environ.get('DB_PASSWORD', 'password'), + 'HOST': os.environ.get('DB_HOST', 'localhost'), + 'PORT': os.environ.get('DB_PORT', '5433'), + } + }, + INSTALLED_APPS=[ + 'timescale', + ], + USE_TZ=True, + SECRET_KEY='test-secret-key-for-testing-only', + ) + django.setup() + +from django.test import TransactionTestCase +from django.db import connection, models, migrations +from django.db.migrations.state import ProjectState +from django.utils import timezone +from datetime import timedelta +import tempfile + +# Import directly to avoid circular imports +from timescale.db.models.fields import TimescaleDateTimeField +from timescale.db.models.managers import TimescaleManager +from timescale.db.migrations import ( + AddRetentionPolicy, + RemoveRetentionPolicy, + EnableCompression, + AddCompressionPolicy, + RemoveCompressionPolicy, +) + + +# Define test models (these won't be migrated, just created/dropped in tests) +class TestMigrationModel(models.Model): + """Test model for migration operations.""" + time = TimescaleDateTimeField(interval="1 day") + temperature = models.FloatField(default=0.0) + device = models.IntegerField(default=0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_testmigrationmodel' + + +class MigrationOperationTestCase(TransactionTestCase): + """Base test case for migration operations.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test table + with connection.schema_editor() as schema_editor: + schema_editor.create_model(TestMigrationModel) + + # Create hypertable + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_testmigrationmodel', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test table + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_testmigrationmodel CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + TestMigrationModel.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + TestMigrationModel.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + + def tearDown(self): + """Clean up after each test.""" + # Remove any policies that might have been created + try: + TestMigrationModel.timescale.remove_retention_policy(if_exists=True) + except: + pass + try: + TestMigrationModel.timescale.remove_compression_policy(if_exists=True) + except: + pass + + # Clear test data + TestMigrationModel.objects.all().delete() + + def get_schema_editor(self): + """Get a schema editor for testing.""" + return connection.schema_editor() + + def get_project_state(self): + """Get a project state for testing.""" + # For our TimescaleDB operations, we don't need the model state + # since they work directly with the database + return ProjectState() + + +class AddRetentionPolicyTests(MigrationOperationTestCase): + """Tests for AddRetentionPolicy migration operation.""" + + def test_add_retention_policy_forward(self): + """Test adding a retention policy in forward migration.""" + operation = AddRetentionPolicy( + model_name='TestMigrationModel', + drop_after='60 days', + if_not_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_add_retention_policy_backward(self): + """Test removing a retention policy in backward migration.""" + # First add a policy + TestMigrationModel.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + operation = AddRetentionPolicy( + model_name='TestMigrationModel', + drop_after='60 days', + if_not_exists=True + ) + + # Test backward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_backwards('timescale', schema_editor, state, state) + + # Verify the policy was removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + def test_add_retention_policy_with_timedelta(self): + """Test adding a retention policy with timedelta.""" + operation = AddRetentionPolicy( + model_name='TestMigrationModel', + drop_after=timedelta(days=30), + schedule_interval=timedelta(hours=12), + if_not_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_add_retention_policy_deconstruct(self): + """Test that the operation can be deconstructed properly.""" + operation = AddRetentionPolicy( + model_name='TestMigrationModel', + drop_after='60 days', + schedule_interval='1 day', + if_not_exists=False # Use non-default value to test inclusion + ) + + name, args, kwargs = operation.deconstruct() + self.assertEqual(name, 'AddRetentionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestMigrationModel') + self.assertEqual(kwargs['drop_after'], '60 days') + self.assertEqual(kwargs['schedule_interval'], '1 day') + self.assertEqual(kwargs['if_not_exists'], False) # Should be included when False + + +class RemoveRetentionPolicyTests(MigrationOperationTestCase): + """Tests for RemoveRetentionPolicy migration operation.""" + + def test_remove_retention_policy_forward(self): + """Test removing a retention policy in forward migration.""" + # First add a policy + TestMigrationModel.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + operation = RemoveRetentionPolicy( + model_name='TestMigrationModel', + if_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policy was removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + def test_remove_retention_policy_backward_not_reversible(self): + """Test that backward migration raises NotImplementedError.""" + operation = RemoveRetentionPolicy( + model_name='TestMigrationModel', + if_exists=True + ) + + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + with self.assertRaises(NotImplementedError): + operation.database_backwards('timescale', schema_editor, state, state) + + +class EnableCompressionTests(MigrationOperationTestCase): + """Tests for EnableCompression migration operation.""" + + def test_enable_compression_forward(self): + """Test enabling compression in forward migration.""" + operation = EnableCompression( + model_name='TestMigrationModel', + compress_orderby=['time'], + if_not_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify compression was enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_testmigrationmodel' + """) + result = cursor.fetchone() + self.assertIsNotNone(result) + self.assertTrue(result[0]) + + def test_enable_compression_backward(self): + """Test disabling compression in backward migration.""" + # First enable compression + TestMigrationModel.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + operation = EnableCompression( + model_name='TestMigrationModel', + compress_orderby=['time'], + if_not_exists=True + ) + + # Test backward migration (should succeed silently due to if_exists=True) + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + result = operation.database_backwards('timescale', schema_editor, state, state) + # Should not raise an error due to if_exists=True in disable_compression + + def test_enable_compression_with_segmentby(self): + """Test enabling compression with segment by columns.""" + operation = EnableCompression( + model_name='TestMigrationModel', + compress_orderby=['time'], + compress_segmentby=['device'], + if_not_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify compression was enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_testmigrationmodel' + """) + result = cursor.fetchone() + self.assertIsNotNone(result) + self.assertTrue(result[0]) + + +class AddCompressionPolicyTests(MigrationOperationTestCase): + """Tests for AddCompressionPolicy migration operation.""" + + def test_add_compression_policy_forward(self): + """Test adding a compression policy in forward migration.""" + # First enable compression + TestMigrationModel.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + operation = AddCompressionPolicy( + model_name='TestMigrationModel', + compress_after='30 days', + if_not_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_add_compression_policy_backward(self): + """Test removing a compression policy in backward migration.""" + # First enable compression and add policy + TestMigrationModel.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + TestMigrationModel.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + operation = AddCompressionPolicy( + model_name='TestMigrationModel', + compress_after='30 days', + if_not_exists=True + ) + + # Test backward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_backwards('timescale', schema_editor, state, state) + + # Verify the policy was removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + +class RemoveCompressionPolicyTests(MigrationOperationTestCase): + """Tests for RemoveCompressionPolicy migration operation.""" + + def test_remove_compression_policy_forward(self): + """Test removing a compression policy in forward migration.""" + # First enable compression and add policy + TestMigrationModel.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + TestMigrationModel.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + operation = RemoveCompressionPolicy( + model_name='TestMigrationModel', + if_exists=True + ) + + # Test forward migration + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify the policy was removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + def test_remove_compression_policy_backward_not_reversible(self): + """Test that backward migration raises NotImplementedError.""" + operation = RemoveCompressionPolicy( + model_name='TestMigrationModel', + if_exists=True + ) + + state = self.get_project_state() + with self.get_schema_editor() as schema_editor: + with self.assertRaises(NotImplementedError): + operation.database_backwards('timescale', schema_editor, state, state) + + +class MigrationOperationIntegrationTests(MigrationOperationTestCase): + """Integration tests for multiple migration operations.""" + + def test_full_policy_setup_workflow(self): + """Test a complete workflow of setting up compression and retention policies.""" + # Step 1: Enable compression + enable_compression_op = EnableCompression( + model_name='TestMigrationModel', + compress_orderby=['time'], + compress_segmentby=['device'], + if_not_exists=True + ) + + # Step 2: Add compression policy + add_compression_policy_op = AddCompressionPolicy( + model_name='TestMigrationModel', + compress_after='7 days', + if_not_exists=True + ) + + # Step 3: Add retention policy + add_retention_policy_op = AddRetentionPolicy( + model_name='TestMigrationModel', + drop_after='90 days', + if_not_exists=True + ) + + state = self.get_project_state() + + # Execute all operations + with self.get_schema_editor() as schema_editor: + enable_compression_op.database_forwards('timescale', schema_editor, state, state) + add_compression_policy_op.database_forwards('timescale', schema_editor, state, state) + add_retention_policy_op.database_forwards('timescale', schema_editor, state, state) + + # Verify compression is enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_testmigrationmodel' + """) + result = cursor.fetchone() + self.assertIsNotNone(result) + self.assertTrue(result[0]) + + # Verify compression policy exists + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + # Verify retention policy exists + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE proc_name = 'policy_retention' + AND hypertable_name = 'timescale_testmigrationmodel' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_operation_descriptions(self): + """Test that operations have proper descriptions.""" + add_retention = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days' + ) + self.assertIn('Add retention policy', add_retention.describe()) + self.assertIn('TestModel', add_retention.describe()) + + remove_retention = RemoveRetentionPolicy( + model_name='TestModel' + ) + self.assertIn('Remove retention policy', remove_retention.describe()) + self.assertIn('TestModel', remove_retention.describe()) + + enable_compression = EnableCompression( + model_name='TestModel' + ) + self.assertIn('Enable compression', enable_compression.describe()) + self.assertIn('TestModel', enable_compression.describe()) + + add_compression = AddCompressionPolicy( + model_name='TestModel', + compress_after='30 days' + ) + self.assertIn('Add compression policy', add_compression.describe()) + self.assertIn('TestModel', add_compression.describe()) + + remove_compression = RemoveCompressionPolicy( + model_name='TestModel' + ) + self.assertIn('Remove compression policy', remove_compression.describe()) + self.assertIn('TestModel', remove_compression.describe()) + + def test_migration_name_fragments(self): + """Test that operations generate proper migration name fragments.""" + add_retention = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days' + ) + self.assertEqual(add_retention.migration_name_fragment, 'add_retention_policy_testmodel') + + remove_retention = RemoveRetentionPolicy( + model_name='TestModel' + ) + self.assertEqual(remove_retention.migration_name_fragment, 'remove_retention_policy_testmodel') + + enable_compression = EnableCompression( + model_name='TestModel' + ) + self.assertEqual(enable_compression.migration_name_fragment, 'enable_compression_testmodel') + + add_compression = AddCompressionPolicy( + model_name='TestModel', + compress_after='30 days' + ) + self.assertEqual(add_compression.migration_name_fragment, 'add_compression_policy_testmodel') + + remove_compression = RemoveCompressionPolicy( + model_name='TestModel' + ) + self.assertEqual(remove_compression.migration_name_fragment, 'remove_compression_policy_testmodel') diff --git a/timescale/tests/test_migration_operations_unit.py b/timescale/tests/test_migration_operations_unit.py new file mode 100644 index 0000000..b03f7a0 --- /dev/null +++ b/timescale/tests/test_migration_operations_unit.py @@ -0,0 +1,341 @@ +""" +Unit tests for TimescaleDB migration operations. + +These tests verify the migration operation structure, serialization, and basic functionality +without requiring a database connection. +""" +import unittest +from datetime import timedelta + +from timescale.db.migrations import ( + AddRetentionPolicy, + RemoveRetentionPolicy, + EnableCompression, + AddCompressionPolicy, + RemoveCompressionPolicy, +) + + +class MigrationOperationUnitTests(unittest.TestCase): + """Unit tests for migration operations that don't require a database.""" + + def test_add_retention_policy_init(self): + """Test AddRetentionPolicy initialization.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days', + schedule_interval='1 day', + if_not_exists=True + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.drop_after, '60 days') + self.assertEqual(operation.schedule_interval, '1 day') + self.assertTrue(operation.if_not_exists) + self.assertIsNone(operation.drop_created_before) + self.assertIsNone(operation.initial_start) + self.assertIsNone(operation.timezone) + + def test_add_retention_policy_with_timedelta(self): + """Test AddRetentionPolicy with timedelta values.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after=timedelta(days=30), + schedule_interval=timedelta(hours=12) + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.drop_after, timedelta(days=30)) + self.assertEqual(operation.schedule_interval, timedelta(hours=12)) + + def test_add_retention_policy_deconstruct(self): + """Test AddRetentionPolicy deconstruction for serialization.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days', + schedule_interval='1 day', + if_not_exists=False # Use non-default value to test inclusion + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'AddRetentionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['drop_after'], '60 days') + self.assertEqual(kwargs['schedule_interval'], '1 day') + self.assertEqual(kwargs['if_not_exists'], False) + self.assertNotIn('drop_created_before', kwargs) + self.assertNotIn('initial_start', kwargs) + self.assertNotIn('timezone', kwargs) + + def test_add_retention_policy_deconstruct_minimal(self): + """Test AddRetentionPolicy deconstruction with minimal parameters.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days' + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'AddRetentionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['drop_after'], '60 days') + # Default values should not be included + self.assertNotIn('if_not_exists', kwargs) + self.assertNotIn('schedule_interval', kwargs) + + def test_add_retention_policy_describe(self): + """Test AddRetentionPolicy description.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days' + ) + + description = operation.describe() + self.assertIn('Add retention policy', description) + self.assertIn('TestModel', description) + self.assertIn('60 days', description) + + def test_add_retention_policy_migration_name_fragment(self): + """Test AddRetentionPolicy migration name fragment.""" + operation = AddRetentionPolicy( + model_name='TestModel', + drop_after='60 days' + ) + + fragment = operation.migration_name_fragment + self.assertEqual(fragment, 'add_retention_policy_testmodel') + + def test_remove_retention_policy_init(self): + """Test RemoveRetentionPolicy initialization.""" + operation = RemoveRetentionPolicy( + model_name='TestModel', + if_exists=False + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertFalse(operation.if_exists) + + def test_remove_retention_policy_deconstruct(self): + """Test RemoveRetentionPolicy deconstruction.""" + operation = RemoveRetentionPolicy( + model_name='TestModel', + if_exists=False + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'RemoveRetentionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['if_exists'], False) + + def test_remove_retention_policy_deconstruct_default(self): + """Test RemoveRetentionPolicy deconstruction with default values.""" + operation = RemoveRetentionPolicy( + model_name='TestModel' + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'RemoveRetentionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + # Default value should not be included + self.assertNotIn('if_exists', kwargs) + + def test_enable_compression_init(self): + """Test EnableCompression initialization.""" + operation = EnableCompression( + model_name='TestModel', + compress_segmentby=['device_id'], + compress_orderby=['time'], + compress_chunk_time_interval='1 hour', + if_not_exists=False + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.compress_segmentby, ['device_id']) + self.assertEqual(operation.compress_orderby, ['time']) + self.assertEqual(operation.compress_chunk_time_interval, '1 hour') + self.assertFalse(operation.if_not_exists) + + def test_enable_compression_init_defaults(self): + """Test EnableCompression initialization with defaults.""" + operation = EnableCompression( + model_name='TestModel' + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.compress_segmentby, []) + self.assertEqual(operation.compress_orderby, []) + self.assertIsNone(operation.compress_chunk_time_interval) + self.assertTrue(operation.if_not_exists) + + def test_enable_compression_deconstruct(self): + """Test EnableCompression deconstruction.""" + operation = EnableCompression( + model_name='TestModel', + compress_segmentby=['device_id'], + compress_orderby=['time'], + if_not_exists=False + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'EnableCompression') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['compress_segmentby'], ['device_id']) + self.assertEqual(kwargs['compress_orderby'], ['time']) + self.assertEqual(kwargs['if_not_exists'], False) + + def test_enable_compression_deconstruct_minimal(self): + """Test EnableCompression deconstruction with minimal parameters.""" + operation = EnableCompression( + model_name='TestModel' + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'EnableCompression') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + # Empty lists and default values should not be included + self.assertNotIn('compress_segmentby', kwargs) + self.assertNotIn('compress_orderby', kwargs) + self.assertNotIn('compress_chunk_time_interval', kwargs) + self.assertNotIn('if_not_exists', kwargs) + + def test_add_compression_policy_init(self): + """Test AddCompressionPolicy initialization.""" + operation = AddCompressionPolicy( + model_name='TestModel', + compress_after='7 days', + schedule_interval='1 hour', + if_not_exists=False + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.compress_after, '7 days') + self.assertEqual(operation.schedule_interval, '1 hour') + self.assertFalse(operation.if_not_exists) + + def test_add_compression_policy_with_timedelta(self): + """Test AddCompressionPolicy with timedelta values.""" + operation = AddCompressionPolicy( + model_name='TestModel', + compress_after=timedelta(days=7), + schedule_interval=timedelta(hours=1) + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertEqual(operation.compress_after, timedelta(days=7)) + self.assertEqual(operation.schedule_interval, timedelta(hours=1)) + + def test_add_compression_policy_deconstruct(self): + """Test AddCompressionPolicy deconstruction.""" + operation = AddCompressionPolicy( + model_name='TestModel', + compress_after='7 days', + schedule_interval='1 hour', + if_not_exists=False + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'AddCompressionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['compress_after'], '7 days') + self.assertEqual(kwargs['schedule_interval'], '1 hour') + self.assertEqual(kwargs['if_not_exists'], False) + + def test_remove_compression_policy_init(self): + """Test RemoveCompressionPolicy initialization.""" + operation = RemoveCompressionPolicy( + model_name='TestModel', + if_exists=False + ) + + self.assertEqual(operation.model_name, 'TestModel') + self.assertFalse(operation.if_exists) + + def test_remove_compression_policy_deconstruct(self): + """Test RemoveCompressionPolicy deconstruction.""" + operation = RemoveCompressionPolicy( + model_name='TestModel', + if_exists=False + ) + + name, args, kwargs = operation.deconstruct() + + self.assertEqual(name, 'RemoveCompressionPolicy') + self.assertEqual(args, []) + self.assertEqual(kwargs['model_name'], 'TestModel') + self.assertEqual(kwargs['if_exists'], False) + + def test_all_operations_describe(self): + """Test that all operations have proper descriptions.""" + operations = [ + AddRetentionPolicy(model_name='TestModel', drop_after='60 days'), + RemoveRetentionPolicy(model_name='TestModel'), + EnableCompression(model_name='TestModel'), + AddCompressionPolicy(model_name='TestModel', compress_after='30 days'), + RemoveCompressionPolicy(model_name='TestModel'), + ] + + for operation in operations: + description = operation.describe() + self.assertIsInstance(description, str) + self.assertGreater(len(description), 0) + self.assertIn('TestModel', description) + + def test_all_operations_migration_name_fragment(self): + """Test that all operations have proper migration name fragments.""" + operations = [ + AddRetentionPolicy(model_name='TestModel', drop_after='60 days'), + RemoveRetentionPolicy(model_name='TestModel'), + EnableCompression(model_name='TestModel'), + AddCompressionPolicy(model_name='TestModel', compress_after='30 days'), + RemoveCompressionPolicy(model_name='TestModel'), + ] + + expected_fragments = [ + 'add_retention_policy_testmodel', + 'remove_retention_policy_testmodel', + 'enable_compression_testmodel', + 'add_compression_policy_testmodel', + 'remove_compression_policy_testmodel', + ] + + for operation, expected_fragment in zip(operations, expected_fragments): + fragment = operation.migration_name_fragment + self.assertEqual(fragment, expected_fragment) + + def test_state_forwards_no_op(self): + """Test that state_forwards methods don't modify state.""" + operations = [ + AddRetentionPolicy(model_name='TestModel', drop_after='60 days'), + RemoveRetentionPolicy(model_name='TestModel'), + EnableCompression(model_name='TestModel'), + AddCompressionPolicy(model_name='TestModel', compress_after='30 days'), + RemoveCompressionPolicy(model_name='TestModel'), + ] + + # Mock state object + class MockState: + def __init__(self): + self.modified = False + + for operation in operations: + state = MockState() + # This should not raise an exception and should not modify state + operation.state_forwards('test_app', state) + self.assertFalse(state.modified) + + +if __name__ == '__main__': + unittest.main() diff --git a/timescale/tests/test_policies.py b/timescale/tests/test_policies.py new file mode 100644 index 0000000..70e5b0a --- /dev/null +++ b/timescale/tests/test_policies.py @@ -0,0 +1,282 @@ +""" +Tests for TimescaleDB retention and compression policies. +""" +import os + +# Configure Django settings before importing Django modules +import django +from django.conf import settings + +if not settings.configured: + settings.configure( + DEBUG=True, + DATABASES={ + 'default': { + 'ENGINE': 'timescale.db.backends.postgresql', + 'NAME': os.environ.get('DB_DATABASE', 'test_timescale'), + 'USER': os.environ.get('DB_USERNAME', 'postgres'), + 'PASSWORD': os.environ.get('DB_PASSWORD', 'password'), + 'HOST': os.environ.get('DB_HOST', 'localhost'), + 'PORT': os.environ.get('DB_PORT', '5433'), + } + }, + INSTALLED_APPS=[ + 'timescale', + ], + USE_TZ=True, + SECRET_KEY='test-secret-key-for-testing-only', + ) + django.setup() + +from django.test import TransactionTestCase +from django.db import connection, models +from django.utils import timezone +from datetime import timedelta +from timescale.db.models.fields import TimescaleDateTimeField +from timescale.db.models.managers import TimescaleManager + + +# Define test model (self-contained, no external dependencies) +class PolicyTestMetric(models.Model): + """Test model with TimescaleDateTimeField.""" + time = TimescaleDateTimeField(interval="1 day") + temperature = models.FloatField(default=0.0) + device = models.IntegerField(default=0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_policytestmetric' + + +class RetentionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB retention policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test table + with connection.schema_editor() as schema_editor: + schema_editor.create_model(PolicyTestMetric) + + # Create hypertable + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_policytestmetric', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test table + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_policytestmetric CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_add_retention_policy(self): + """Test adding a retention policy.""" + # Add a retention policy to drop data older than 60 days + job_id = PolicyTestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_retention_policy(self): + """Test removing a retention policy.""" + # Add a retention policy + job_id = PolicyTestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Remove the policy + result = PolicyTestMetric.timescale.remove_retention_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + + +class CompressionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB compression policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test table + with connection.schema_editor() as schema_editor: + schema_editor.create_model(PolicyTestMetric) + + # Create hypertable + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_policytestmetric', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test table + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_policytestmetric CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + PolicyTestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_enable_compression(self): + """Test enabling compression on a hypertable.""" + # Enable compression + result = PolicyTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Check that compression was enabled + self.assertTrue(result) + + # Check that compression is enabled in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_policytestmetric' + """) + result = cursor.fetchone() + self.assertTrue(result[0]) + + def test_add_compression_policy(self): + """Test adding a compression policy.""" + # Enable compression first + PolicyTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy to compress data older than 30 days + job_id = PolicyTestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_compression_policy(self): + """Test removing a compression policy.""" + # Enable compression first + PolicyTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy + job_id = PolicyTestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Remove the policy + result = PolicyTestMetric.timescale.remove_compression_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + + def test_get_compression_stats(self): + """Test getting compression statistics.""" + # Enable compression + PolicyTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Get compression stats + stats = PolicyTestMetric.timescale.get_compression_stats() + + # Check that stats were returned + self.assertIsInstance(stats, list) + + # There should be at least one row for our hypertable + self.assertTrue(len(stats) > 0) diff --git a/timescale/tests/test_retention_compression.py b/timescale/tests/test_retention_compression.py new file mode 100644 index 0000000..c92ff68 --- /dev/null +++ b/timescale/tests/test_retention_compression.py @@ -0,0 +1,542 @@ +""" +Tests for TimescaleDB retention and compression policies. +""" +import os + +# Configure Django settings before importing Django modules +import django +from django.conf import settings + +if not settings.configured: + settings.configure( + DEBUG=True, + DATABASES={ + 'default': { + 'ENGINE': 'timescale.db.backends.postgresql', + 'NAME': os.environ.get('DB_DATABASE', 'test_timescale'), + 'USER': os.environ.get('DB_USERNAME', 'postgres'), + 'PASSWORD': os.environ.get('DB_PASSWORD', 'password'), + 'HOST': os.environ.get('DB_HOST', 'localhost'), + 'PORT': os.environ.get('DB_PORT', '5433'), + } + }, + INSTALLED_APPS=[ + 'timescale', + ], + USE_TZ=True, + SECRET_KEY='test-secret-key-for-testing-only', + ) + django.setup() + +from django.test import TransactionTestCase +from django.db import connection, models +from django.utils import timezone +from datetime import timedelta + +# Import directly to avoid circular imports +from timescale.db.models.fields import TimescaleDateTimeField +from timescale.db.models.managers import TimescaleManager + + +# Define test models (these won't be migrated, just created/dropped in tests) +class RetentionTestMetric(models.Model): + """Test model with TimescaleDateTimeField.""" + time = TimescaleDateTimeField(interval="1 day") + temperature = models.FloatField(default=0.0) + device = models.IntegerField(default=0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_retentiontestmetric' + + +class RetentionTestTimescaleModel(models.Model): + """Test model similar to TimescaleModel.""" + time = TimescaleDateTimeField(interval="1 day") + value = models.FloatField(default=0.0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_retentiontesttimescalemodel' + + +class RetentionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB retention policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test tables + with connection.schema_editor() as schema_editor: + schema_editor.create_model(RetentionTestMetric) + schema_editor.create_model(RetentionTestTimescaleModel) + + # Create hypertables + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_retentiontestmetric', 'time', if_not_exists => TRUE)" + ) + cursor.execute( + "SELECT create_hypertable('timescale_retentiontesttimescalemodel', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_retentiontestmetric CASCADE") + cursor.execute("DROP TABLE IF EXISTS timescale_retentiontesttimescalemodel CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_add_retention_policy(self): + """Test adding a retention policy.""" + # Add a retention policy to drop data older than 60 days + job_id = RetentionTestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_retention_policy(self): + """Test removing a retention policy.""" + # Add a retention policy + job_id = RetentionTestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Remove the policy + result = RetentionTestMetric.timescale.remove_retention_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + +class CompressionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB compression policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test tables + with connection.schema_editor() as schema_editor: + schema_editor.create_model(RetentionTestMetric) + schema_editor.create_model(RetentionTestTimescaleModel) + + # Create hypertables + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_retentiontestmetric', 'time', if_not_exists => TRUE)" + ) + cursor.execute( + "SELECT create_hypertable('timescale_retentiontesttimescalemodel', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_retentiontestmetric CASCADE") + cursor.execute("DROP TABLE IF EXISTS timescale_retentiontesttimescalemodel CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + RetentionTestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_enable_compression(self): + """Test enabling compression on a hypertable.""" + # Enable compression + result = RetentionTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Check that compression was enabled + self.assertTrue(result) + + # Check that compression is enabled in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_retentiontestmetric' + """) + result = cursor.fetchone() + # If result is None, compression might not be properly enabled + self.assertIsNotNone(result, "Compression status not found in hypertables") + self.assertTrue(result[0], "Compression is not enabled") + + def test_add_compression_policy(self): + """Test adding a compression policy.""" + # Enable compression first + RetentionTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy to compress data older than 30 days + job_id = RetentionTestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_compression_policy(self): + """Test removing a compression policy.""" + # Enable compression first + RetentionTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy + job_id = RetentionTestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Remove the policy + result = RetentionTestMetric.timescale.remove_compression_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + + def test_get_compression_stats(self): + """Test getting compression statistics.""" + # Enable compression + RetentionTestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Get compression stats + stats = RetentionTestMetric.timescale.get_compression_stats() + + # Check that stats were returned + self.assertIsInstance(stats, list) + + # There should be at least one row for our hypertable + self.assertTrue(len(stats) > 0) + + +class IndividualMigrationOperationTests(TransactionTestCase): + """Tests for individual migration operations (programmatic API).""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test tables + with connection.schema_editor() as schema_editor: + schema_editor.create_model(RetentionTestMetric) + + # Create hypertables + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_retentiontestmetric', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_retentiontestmetric CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Clean up any existing policies + try: + RetentionTestMetric.timescale.remove_retention_policy(if_exists=True) + RetentionTestMetric.timescale.remove_compression_policy(if_exists=True) + except: + pass + + def test_individual_operations_import(self): + """Test that individual operations can be imported.""" + from timescale.db.migrations import ( + AddRetentionPolicy, + RemoveRetentionPolicy, + EnableCompression, + AddCompressionPolicy, + RemoveCompressionPolicy + ) + + # Test that all operations exist and can be instantiated + operations = [ + AddRetentionPolicy('TestModel', drop_after='30 days'), + RemoveRetentionPolicy('TestModel'), + EnableCompression('TestModel', compress_orderby=['time']), + AddCompressionPolicy('TestModel', compress_after='7 days'), + RemoveCompressionPolicy('TestModel'), + ] + + for op in operations: + # Test that each operation has required methods + self.assertTrue(hasattr(op, 'describe')) + self.assertTrue(hasattr(op, 'migration_name_fragment')) + self.assertTrue(hasattr(op, 'deconstruct')) + + # Test descriptions + description = op.describe() + self.assertIsInstance(description, str) + self.assertGreater(len(description), 0) + + # Test migration fragments + fragment = op.migration_name_fragment + self.assertIsInstance(fragment, str) + self.assertGreater(len(fragment), 0) + + def test_enable_compression_operation(self): + """Test EnableCompression migration operation.""" + from timescale.db.migrations import EnableCompression + from django.db.migrations.state import ProjectState + + operation = EnableCompression( + model_name='RetentionTestMetric', + compress_orderby=['time'], + if_not_exists=True + ) + + # Test database operation + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify compression was enabled + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_retentiontestmetric' + """) + result = cursor.fetchone() + self.assertTrue(result[0]) + + def test_add_retention_policy_operation(self): + """Test AddRetentionPolicy migration operation.""" + from timescale.db.migrations import AddRetentionPolicy + from django.db.migrations.state import ProjectState + + operation = AddRetentionPolicy( + model_name='RetentionTestMetric', + drop_after='60 days', + schedule_interval='1 day', + if_not_exists=True + ) + + # Test database operation + state = ProjectState() + with connection.schema_editor() as schema_editor: + operation.database_forwards('timescale', schema_editor, state, state) + + # Verify policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'timescale_retentiontestmetric' + AND proc_name = 'policy_retention' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_add_compression_policy_operation(self): + """Test AddCompressionPolicy migration operation.""" + from timescale.db.migrations import EnableCompression, AddCompressionPolicy + from django.db.migrations.state import ProjectState + + # First enable compression + enable_op = EnableCompression( + model_name='RetentionTestMetric', + compress_orderby=['time'] + ) + + state = ProjectState() + with connection.schema_editor() as schema_editor: + enable_op.database_forwards('timescale', schema_editor, state, state) + + # Then add compression policy + policy_op = AddCompressionPolicy( + model_name='RetentionTestMetric', + compress_after='30 days', + schedule_interval='1 hour', + if_not_exists=True + ) + + with connection.schema_editor() as schema_editor: + policy_op.database_forwards('timescale', schema_editor, state, state) + + # Verify policy was created + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'timescale_retentiontestmetric' + AND proc_name = 'policy_compression' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 1) + + def test_remove_policies_operations(self): + """Test RemoveRetentionPolicy and RemoveCompressionPolicy operations.""" + from timescale.db.migrations import ( + EnableCompression, AddCompressionPolicy, AddRetentionPolicy, + RemoveCompressionPolicy, RemoveRetentionPolicy + ) + from django.db.migrations.state import ProjectState + + # Set up policies first + state = ProjectState() + with connection.schema_editor() as schema_editor: + # Enable compression and add policies + EnableCompression( + model_name='RetentionTestMetric', + compress_orderby=['time'] + ).database_forwards('timescale', schema_editor, state, state) + + AddCompressionPolicy( + model_name='RetentionTestMetric', + compress_after='7 days' + ).database_forwards('timescale', schema_editor, state, state) + + AddRetentionPolicy( + model_name='RetentionTestMetric', + drop_after='30 days' + ).database_forwards('timescale', schema_editor, state, state) + + # Verify policies exist + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'timescale_retentiontestmetric' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 2) # compression + retention + + # Test removing compression policy + remove_compression = RemoveCompressionPolicy( + model_name='RetentionTestMetric', + if_exists=True + ) + + with connection.schema_editor() as schema_editor: + remove_compression.database_forwards('timescale', schema_editor, state, state) + + # Verify compression policy removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'timescale_retentiontestmetric' + AND proc_name = 'policy_compression' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0) + + # Test removing retention policy + remove_retention = RemoveRetentionPolicy( + model_name='RetentionTestMetric', + if_exists=True + ) + + with connection.schema_editor() as schema_editor: + remove_retention.database_forwards('timescale', schema_editor, state, state) + + # Verify retention policy removed + with connection.cursor() as cursor: + cursor.execute(""" + SELECT COUNT(*) FROM timescaledb_information.jobs + WHERE hypertable_name = 'timescale_retentiontestmetric' + AND proc_name = 'policy_retention' + """) + count = cursor.fetchone()[0] + self.assertEqual(count, 0)