From 5a5839771c4eba4c0f31823728430ab3b7795d1a Mon Sep 17 00:00:00 2001 From: Reese Date: Fri, 25 Apr 2025 20:32:17 +0100 Subject: [PATCH] Add programmatic APIs for TimescaleDB retention policy and compression This commit adds support for TimescaleDB's retention policy and compression features: - Add RetentionPolicy class with methods to add and remove retention policies - Add CompressionPolicy class with methods to enable compression, add/remove policies, and get stats - Extend TimescaleManager with methods for managing retention and compression - Add comprehensive tests using TransactionTestCase for self-contained testing - Update README with examples of how to use the new APIs, including in migrations These APIs provide a programmatic way to manage TimescaleDB's data lifecycle features. The implementation uses a method-based approach where you call methods on the `timescale` manager, rather than a declarative approach in model definitions. Part of Milestone 2 roadmap. Ref: #23 --- README.md | 112 +++++++ timescale/db/models/compression.py | 241 +++++++++++++++ timescale/db/models/managers.py | 137 ++++++++- timescale/db/models/retention.py | 128 ++++++++ timescale/tests/__init__.py | 1 + timescale/tests/test_policies.py | 193 ++++++++++++ timescale/tests/test_retention_compression.py | 281 ++++++++++++++++++ 7 files changed, 1092 insertions(+), 1 deletion(-) create mode 100644 timescale/db/models/compression.py create mode 100644 timescale/db/models/retention.py create mode 100644 timescale/tests/test_policies.py create mode 100644 timescale/tests/test_retention_compression.py diff --git a/README.md b/README.md index 14d8868..866460e 100644 --- a/README.md +++ b/README.md @@ -148,8 +148,120 @@ As such the use of the Django's ORM is perfectally suited to this type of data. ``` +### Data Retention and Compression + +TimescaleDB provides powerful features for managing data lifecycle and storage efficiency through retention policies and compression. 
Django-TimescaleDB provides programmatic APIs for these features. + +> **Note:** The current implementation uses a programmatic approach where you call methods on the `timescale` manager. These methods need to be called explicitly in your code (e.g., in a migration, management command, view, or Django shell). A future enhancement may provide a declarative approach where policies can be defined directly in model classes. + +#### Data Retention Policy [More Info](https://docs.timescale.com/use-timescale/latest/data-retention/) + +Data retention policies automatically remove old data based on time, helping manage database size. + +```python + from metrics.models import Metric + from datetime import timedelta + + # This code can be run in a Django shell, migration, management command, or view + + # Add a retention policy to drop data older than 90 days + job_id = Metric.timescale.add_retention_policy( + drop_after='90 days', + schedule_interval='1 day', + if_not_exists=True + ) + + # Or using timedelta objects + job_id = Metric.timescale.add_retention_policy( + drop_after=timedelta(days=90), + schedule_interval=timedelta(days=1) + ) + + # Remove a retention policy when no longer needed + Metric.timescale.remove_retention_policy(if_exists=True) +``` + +#### Compression [More Info](https://docs.timescale.com/use-timescale/latest/compression/) + +Compression reduces storage requirements by compressing older chunks of data. 
+ +```python + from metrics.models import Metric + from datetime import timedelta + + # This code can be run in a Django shell, migration, management command, or view + + # Enable compression on the hypertable + Metric.timescale.enable_compression( + compress_orderby=['time'], + compress_segmentby=['device'] + ) + + # Add a compression policy to compress data older than 30 days + job_id = Metric.timescale.add_compression_policy( + compress_after='30 days', + schedule_interval='1 day', + if_not_exists=True + ) + + # Or using timedelta objects + job_id = Metric.timescale.add_compression_policy( + compress_after=timedelta(days=30), + schedule_interval=timedelta(days=1) + ) + + # Get compression statistics + stats = Metric.timescale.get_compression_stats() + + # Remove a compression policy when no longer needed + Metric.timescale.remove_compression_policy(if_exists=True) +``` + +#### Example Usage in a Django Migration + +```python +from django.db import migrations + +def setup_timescale_policies(apps, schema_editor): + # Get the historical model from the apps registry (custom managers such as `timescale` are only available on it if the manager sets use_in_migrations = True) + Metric = apps.get_model('metrics', 'Metric') + + # Enable compression + Metric.timescale.enable_compression( + compress_orderby=['time'], + compress_segmentby=['device'] + ) + + # Add compression policy + Metric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Add retention policy + Metric.timescale.add_retention_policy( + drop_after='90 days', + if_not_exists=True + ) + +def reverse_timescale_policies(apps, schema_editor): + Metric = apps.get_model('metrics', 'Metric') + Metric.timescale.remove_compression_policy(if_exists=True) + Metric.timescale.remove_retention_policy(if_exists=True) + +class Migration(migrations.Migration): + dependencies = [ + ('metrics', '0001_initial'), + ] + + operations = [ + migrations.RunPython(setup_timescale_policies, reverse_timescale_policies), + ] +``` + ## Contributors - [Rasmus Schlünsen](https://github.com/schlunsen) - [Ben 
Cleary](https://github.com/bencleary) - [Jonathan Sundqvist](https://github.com/jonathan-s) - [Harsh Bhikadia](https://github.com/daadu) +- [Reese Jenner](https://github.com/voarsh2) \ No newline at end of file diff --git a/timescale/db/models/compression.py b/timescale/db/models/compression.py new file mode 100644 index 0000000..34662f4 --- /dev/null +++ b/timescale/db/models/compression.py @@ -0,0 +1,241 @@ +""" +Compression policy implementation for TimescaleDB. +""" +from django.db import connection +from typing import Optional, Union, List, Dict, Any +from datetime import datetime, timedelta + + +class CompressionPolicy: + """ + A class to manage compression policies for TimescaleDB hypertables. + + This allows setting up automatic compression of chunks to reduce storage requirements. + """ + + @staticmethod + def add_compression_policy( + model, + compress_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + compress_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a compression policy to a TimescaleDB hypertable. 
+ + Args: + model: The Django model with a TimescaleDateTimeField + compress_after: Chunks older than this interval will be compressed + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + compress_created_before: Alternative to compress_after, compresses chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + + Raises: + ValueError: If both compress_after and compress_created_before are provided + """ + if compress_after is not None and compress_created_before is not None: + raise ValueError("Cannot specify both compress_after and compress_created_before") + + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(compress_after, timedelta): + compress_after = f"{compress_after.total_seconds()} seconds" + if isinstance(compress_created_before, timedelta): + compress_created_before = f"{compress_created_before.total_seconds()} seconds" + if isinstance(schedule_interval, timedelta): + schedule_interval = f"{schedule_interval.total_seconds()} seconds" + + # Build the SQL query + params = [] + sql = "SELECT add_compression_policy(" + + # Add the table name + sql += "%s" + params.append(table_name) + + # Add compress_after or compress_created_before + if compress_after is not None: + sql += ", compress_after => INTERVAL %s" + params.append(compress_after) + if compress_created_before is not None: + sql += ", compress_created_before => INTERVAL %s" + params.append(compress_created_before) + + # Add optional parameters + if schedule_interval is not None: + sql += ", schedule_interval => INTERVAL %s" + params.append(schedule_interval) + + if initial_start is not None: + sql += ", initial_start => %s" + params.append(initial_start) + + if timezone is not None: + sql += ", timezone => %s" + params.append(timezone) + + if if_not_exists: + sql += ", 
if_not_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + job_id = cursor.fetchone()[0] + + return job_id + + @staticmethod + def remove_compression_policy(model, if_exists: bool = False) -> bool: + """ + Remove a compression policy from a TimescaleDB hypertable. + + Args: + model: The Django model with the compression policy + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + table_name = model._meta.db_table + + # Build the SQL query + params = [table_name] + sql = "SELECT remove_compression_policy(%s" + + if if_exists: + sql += ", if_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + result = cursor.fetchone()[0] + + # If the result is None, the policy was removed successfully + return True if result is None else result + + @staticmethod + def enable_compression(model, compress_segmentby: Optional[List[str]] = None, + compress_orderby: Optional[List[str]] = None, + compress_chunk_time_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False) -> bool: + """ + Enable compression on a TimescaleDB hypertable. 
+ + Args: + model: The Django model to enable compression on + compress_segmentby: List of columns to segment by + compress_orderby: List of columns to order by + compress_chunk_time_interval: Time interval for compression chunks + if_not_exists: If True, don't error if compression is already enabled + + Returns: + True if compression was enabled, False otherwise + """ + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(compress_chunk_time_interval, timedelta): + compress_chunk_time_interval = f"{compress_chunk_time_interval.total_seconds()} seconds" + + # Build the SQL query + sql = f"ALTER TABLE {table_name} SET (timescaledb.compress = TRUE" + + if compress_segmentby: + sql += f", timescaledb.compress_segmentby = '{','.join(compress_segmentby)}'" + + if compress_orderby: + sql += f", timescaledb.compress_orderby = '{','.join(compress_orderby)}'" + + if compress_chunk_time_interval: + sql += f", timescaledb.compress_chunk_time_interval = '{compress_chunk_time_interval}'" + + sql += ")" + + # Execute the query + try: + with connection.cursor() as cursor: + cursor.execute(sql) + return True + except Exception as e: + if if_not_exists and "already compressed" in str(e): + return False + raise + + @staticmethod + def compress_chunk(chunk_name: str) -> bool: + """ + Manually compress a specific chunk. + + Args: + chunk_name: The name of the chunk to compress + + Returns: + True if the chunk was compressed, False otherwise + """ + # Build the SQL query + sql = f"SELECT compress_chunk('{chunk_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + result = cursor.fetchone()[0] + + return result + + @staticmethod + def decompress_chunk(chunk_name: str) -> bool: + """ + Manually decompress a specific chunk. 
+ + Args: + chunk_name: The name of the chunk to decompress + + Returns: + True if the chunk was decompressed, False otherwise + """ + # Build the SQL query + sql = f"SELECT decompress_chunk('{chunk_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + result = cursor.fetchone()[0] + + return result + + @staticmethod + def get_compression_stats(model) -> List[Dict[str, Any]]: + """ + Get compression statistics for a hypertable. + + Args: + model: The Django model to get compression stats for + + Returns: + A list of dictionaries containing compression statistics + """ + table_name = model._meta.db_table + + # Build the SQL query + sql = f"SELECT * FROM hypertable_compression_stats('{table_name}')" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql) + columns = [col[0] for col in cursor.description] + results = [dict(zip(columns, row)) for row in cursor.fetchall()] + + return results diff --git a/timescale/db/models/managers.py b/timescale/db/models/managers.py index c3c0021..bf7c919 100644 --- a/timescale/db/models/managers.py +++ b/timescale/db/models/managers.py @@ -1,6 +1,7 @@ from django.db import models from timescale.db.models.querysets import * -from typing import Optional +from typing import Optional, Union, List, Dict, Any +from datetime import datetime, timedelta class TimescaleManager(models.Manager): @@ -26,3 +27,137 @@ def histogram(self, field: str, min_value: float, max_value: float, num_of_bucke def lttb(self, time: str, value: str, num_of_counts: int = 20): return self.get_queryset().lttb(time, value, num_of_counts) + + # Retention Policy Methods + def add_retention_policy( + self, + drop_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + drop_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a 
retention policy to automatically remove old data from the hypertable. + + Args: + drop_after: Chunks older than this interval will be dropped + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + drop_created_before: Alternative to drop_after, drops chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + """ + from timescale.db.models.retention import RetentionPolicy + return RetentionPolicy.add_retention_policy( + self.model, + drop_after=drop_after, + schedule_interval=schedule_interval, + if_not_exists=if_not_exists, + drop_created_before=drop_created_before, + initial_start=initial_start, + timezone=timezone + ) + + def remove_retention_policy(self, if_exists: bool = False) -> bool: + """ + Remove a retention policy from the hypertable. + + Args: + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + from timescale.db.models.retention import RetentionPolicy + return RetentionPolicy.remove_retention_policy(self.model, if_exists=if_exists) + + # Compression Policy Methods + def add_compression_policy( + self, + compress_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + compress_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a compression policy to automatically compress chunks in the hypertable. 
+ + Args: + compress_after: Chunks older than this interval will be compressed + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + compress_created_before: Alternative to compress_after, compresses chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.add_compression_policy( + self.model, + compress_after=compress_after, + schedule_interval=schedule_interval, + if_not_exists=if_not_exists, + compress_created_before=compress_created_before, + initial_start=initial_start, + timezone=timezone + ) + + def remove_compression_policy(self, if_exists: bool = False) -> bool: + """ + Remove a compression policy from the hypertable. + + Args: + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.remove_compression_policy(self.model, if_exists=if_exists) + + def enable_compression( + self, + compress_segmentby: Optional[List[str]] = None, + compress_orderby: Optional[List[str]] = None, + compress_chunk_time_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False + ) -> bool: + """ + Enable compression on the hypertable. 
+ + Args: + compress_segmentby: List of columns to segment by + compress_orderby: List of columns to order by + compress_chunk_time_interval: Time interval for compression chunks + if_not_exists: If True, don't error if compression is already enabled + + Returns: + True if compression was enabled, False otherwise + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.enable_compression( + self.model, + compress_segmentby=compress_segmentby, + compress_orderby=compress_orderby, + compress_chunk_time_interval=compress_chunk_time_interval, + if_not_exists=if_not_exists + ) + + def get_compression_stats(self) -> List[Dict[str, Any]]: + """ + Get compression statistics for the hypertable. + + Returns: + A list of dictionaries containing compression statistics + """ + from timescale.db.models.compression import CompressionPolicy + return CompressionPolicy.get_compression_stats(self.model) diff --git a/timescale/db/models/retention.py b/timescale/db/models/retention.py new file mode 100644 index 0000000..a2d1620 --- /dev/null +++ b/timescale/db/models/retention.py @@ -0,0 +1,128 @@ +""" +Retention policy implementation for TimescaleDB. +""" +from django.db import connection +from typing import Optional, Union +from datetime import datetime, timedelta + + +class RetentionPolicy: + """ + A class to manage retention policies for TimescaleDB hypertables. + + This allows setting up automatic removal of old data based on time, + helping manage database size. + """ + + @staticmethod + def add_retention_policy( + model, + drop_after: Union[str, int, timedelta], + schedule_interval: Optional[Union[str, timedelta]] = None, + if_not_exists: bool = False, + drop_created_before: Optional[Union[str, timedelta]] = None, + initial_start: Optional[datetime] = None, + timezone: Optional[str] = None + ) -> int: + """ + Add a retention policy to a TimescaleDB hypertable. 
+ + Args: + model: The Django model with a TimescaleDateTimeField + drop_after: Chunks older than this interval will be dropped + schedule_interval: The interval between policy executions + if_not_exists: If True, don't error if policy already exists + drop_created_before: Alternative to drop_after, drops chunks created before this interval + initial_start: Time the policy is first run + timezone: Timezone for the policy + + Returns: + The job ID of the created policy + + Raises: + ValueError: If both drop_after and drop_created_before are provided + """ + if drop_after is not None and drop_created_before is not None: + raise ValueError("Cannot specify both drop_after and drop_created_before") + + table_name = model._meta.db_table + + # Convert timedelta to string interval if needed + if isinstance(drop_after, timedelta): + drop_after = f"{drop_after.total_seconds()} seconds" + if isinstance(drop_created_before, timedelta): + drop_created_before = f"{drop_created_before.total_seconds()} seconds" + if isinstance(schedule_interval, timedelta): + schedule_interval = f"{schedule_interval.total_seconds()} seconds" + + # Build the SQL query + params = [] + sql = "SELECT add_retention_policy(" + + # Add the table name + sql += "%s" + params.append(table_name) + + # Add drop_after or drop_created_before + if drop_after is not None: + sql += ", drop_after => INTERVAL %s" + params.append(drop_after) + if drop_created_before is not None: + sql += ", drop_created_before => INTERVAL %s" + params.append(drop_created_before) + + # Add optional parameters + if schedule_interval is not None: + sql += ", schedule_interval => INTERVAL %s" + params.append(schedule_interval) + + if initial_start is not None: + sql += ", initial_start => %s" + params.append(initial_start) + + if timezone is not None: + sql += ", timezone => %s" + params.append(timezone) + + if if_not_exists: + sql += ", if_not_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + 
cursor.execute(sql, params) + job_id = cursor.fetchone()[0] + + return job_id + + @staticmethod + def remove_retention_policy(model, if_exists: bool = False) -> bool: + """ + Remove a retention policy from a TimescaleDB hypertable. + + Args: + model: The Django model with the retention policy + if_exists: If True, don't error if policy doesn't exist + + Returns: + True if the policy was removed, False otherwise + """ + table_name = model._meta.db_table + + # Build the SQL query + params = [table_name] + sql = "SELECT remove_retention_policy(%s" + + if if_exists: + sql += ", if_exists => TRUE" + + sql += ")" + + # Execute the query + with connection.cursor() as cursor: + cursor.execute(sql, params) + result = cursor.fetchone()[0] + + # If the result is None, the policy was removed successfully + return True if result is None else result diff --git a/timescale/tests/__init__.py b/timescale/tests/__init__.py index e69de29..a3d0c04 100644 --- a/timescale/tests/__init__.py +++ b/timescale/tests/__init__.py @@ -0,0 +1 @@ +# This file is intentionally left empty to make the directory a Python package. \ No newline at end of file diff --git a/timescale/tests/test_policies.py b/timescale/tests/test_policies.py new file mode 100644 index 0000000..1bdfe2e --- /dev/null +++ b/timescale/tests/test_policies.py @@ -0,0 +1,193 @@ +""" +Tests for TimescaleDB retention and compression policies. 
+""" +from django.test import TestCase +from django.db import connection +from django.utils import timezone +from datetime import timedelta +from metrics.models import Metric, AnotherMetricFromTimeScaleModel +from timescale.db.models.retention import RetentionPolicy +from timescale.db.models.compression import CompressionPolicy + + +class RetentionPolicyTests(TestCase): + """Tests for TimescaleDB retention policies.""" + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + Metric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + Metric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + Metric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_add_retention_policy(self): + """Test adding a retention policy.""" + # Add a retention policy to drop data older than 60 days + job_id = Metric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_retention_policy(self): + """Test removing a retention policy.""" + # Add a retention policy + job_id = Metric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Remove the policy + result = Metric.timescale.remove_retention_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = 
cursor.fetchone() + self.assertIsNone(result) + + +class CompressionPolicyTests(TestCase): + """Tests for TimescaleDB compression policies.""" + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + Metric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + Metric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + Metric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_enable_compression(self): + """Test enabling compression on a hypertable.""" + # Enable compression + result = Metric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Check that compression was enabled + self.assertTrue(result) + + # Check that compression is enabled in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'metrics_metric' + """) + result = cursor.fetchone() + self.assertTrue(result[0]) + + def test_add_compression_policy(self): + """Test adding a compression policy.""" + # Enable compression first + Metric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy to compress data older than 30 days + job_id = Metric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_compression_policy(self): + """Test removing a compression policy.""" + # Enable compression first + Metric.timescale.enable_compression( + 
compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy + job_id = Metric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Remove the policy + result = Metric.timescale.remove_compression_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + + def test_get_compression_stats(self): + """Test getting compression statistics.""" + # Enable compression + Metric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Get compression stats + stats = Metric.timescale.get_compression_stats() + + # Check that stats were returned + self.assertIsInstance(stats, list) + + # There should be at least one row for our hypertable + self.assertTrue(len(stats) > 0) diff --git a/timescale/tests/test_retention_compression.py b/timescale/tests/test_retention_compression.py new file mode 100644 index 0000000..f9fe70f --- /dev/null +++ b/timescale/tests/test_retention_compression.py @@ -0,0 +1,281 @@ +""" +Tests for TimescaleDB retention and compression policies. 
+""" +from django.test import TransactionTestCase +from django.db import connection, models +from django.utils import timezone +from datetime import timedelta + +# Import directly to avoid circular imports +from timescale.db.models.fields import TimescaleDateTimeField +from timescale.db.models.managers import TimescaleManager + + +# Define test models (these won't be migrated, just created/dropped in tests) +class TestMetric(models.Model): + """Test model with TimescaleDateTimeField.""" + time = TimescaleDateTimeField(interval="1 day") + temperature = models.FloatField(default=0.0) + device = models.IntegerField(default=0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_testmetric' + + +class TestTimescaleModel(models.Model): + """Test model similar to TimescaleModel.""" + time = TimescaleDateTimeField(interval="1 day") + value = models.FloatField(default=0.0) + + objects = models.Manager() + timescale = TimescaleManager() + + class Meta: + app_label = 'timescale' + db_table = 'timescale_testtimescalemodel' + + +class RetentionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB retention policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test tables + with connection.schema_editor() as schema_editor: + schema_editor.create_model(TestMetric) + schema_editor.create_model(TestTimescaleModel) + + # Create hypertables + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_testmetric', 'time', if_not_exists => TRUE)" + ) + cursor.execute( + "SELECT create_hypertable('timescale_testtimescalemodel', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_testmetric CASCADE") + cursor.execute("DROP TABLE IF EXISTS 
timescale_testtimescalemodel CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + TestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + TestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + TestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_add_retention_policy(self): + """Test adding a retention policy.""" + # Add a retention policy to drop data older than 60 days + job_id = TestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_retention_policy(self): + """Test removing a retention policy.""" + # Add a retention policy + job_id = TestMetric.timescale.add_retention_policy( + drop_after='60 days', + if_not_exists=True + ) + + # Remove the policy + result = TestMetric.timescale.remove_retention_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + +class CompressionPolicyTests(TransactionTestCase): + """Tests for TimescaleDB compression policies.""" + + @classmethod + def setUpClass(cls): + """Set up test data.""" + super().setUpClass() + + # Create the test tables + with connection.schema_editor() as schema_editor: + 
schema_editor.create_model(TestMetric) + schema_editor.create_model(TestTimescaleModel) + + # Create hypertables + with connection.cursor() as cursor: + cursor.execute( + "SELECT create_hypertable('timescale_testmetric', 'time', if_not_exists => TRUE)" + ) + cursor.execute( + "SELECT create_hypertable('timescale_testtimescalemodel', 'time', if_not_exists => TRUE)" + ) + + @classmethod + def tearDownClass(cls): + """Clean up test data.""" + # Drop the test tables + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS timescale_testmetric CASCADE") + cursor.execute("DROP TABLE IF EXISTS timescale_testtimescalemodel CASCADE") + + super().tearDownClass() + + def setUp(self): + """Set up test data.""" + # Create some test data + self.timestamp = timezone.now() + TestMetric.objects.create( + time=self.timestamp - timedelta(days=30), + temperature=10.0, + device=1 + ) + TestMetric.objects.create( + time=self.timestamp - timedelta(days=60), + temperature=15.0, + device=1 + ) + TestMetric.objects.create( + time=self.timestamp - timedelta(days=90), + temperature=20.0, + device=1 + ) + + def test_enable_compression(self): + """Test enabling compression on a hypertable.""" + # Enable compression + result = TestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Check that compression was enabled + self.assertTrue(result) + + # Check that compression is enabled in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT compression_enabled FROM timescaledb_information.hypertables + WHERE hypertable_name = 'timescale_testmetric' + """) + result = cursor.fetchone() + # If result is None, compression might not be properly enabled + self.assertIsNotNone(result, "Compression status not found in hypertables") + self.assertTrue(result[0], "Compression is not enabled") + + def test_add_compression_policy(self): + """Test adding a compression policy.""" + # Enable compression first + 
TestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy to compress data older than 30 days + job_id = TestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Check that the job was created + self.assertIsNotNone(job_id) + + # Check that the policy exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNotNone(result) + + def test_remove_compression_policy(self): + """Test removing a compression policy.""" + # Enable compression first + TestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Add a compression policy + job_id = TestMetric.timescale.add_compression_policy( + compress_after='30 days', + if_not_exists=True + ) + + # Remove the policy + result = TestMetric.timescale.remove_compression_policy(if_exists=True) + + # Check that the policy was removed + self.assertTrue(result) + + # Check that the policy no longer exists in the database + with connection.cursor() as cursor: + cursor.execute(""" + SELECT * FROM timescaledb_information.jobs + WHERE job_id = %s + """, [job_id]) + result = cursor.fetchone() + self.assertIsNone(result) + + def test_get_compression_stats(self): + """Test getting compression statistics.""" + # Enable compression + TestMetric.timescale.enable_compression( + compress_orderby=['time'], + if_not_exists=True + ) + + # Get compression stats + stats = TestMetric.timescale.get_compression_stats() + + # Check that stats were returned + self.assertIsInstance(stats, list) + + # There should be at least one row for our hypertable + self.assertTrue(len(stats) > 0)