
Conversation

voarsh2 commented Apr 25, 2025

This commit adds support for TimescaleDB's retention policy and compression features:

  • Add RetentionPolicy class with methods to add and remove retention policies
  • Add CompressionPolicy class with methods to enable compression, add/remove policies, and get stats
  • Extend TimescaleManager with methods for managing retention and compression
  • Add comprehensive tests using TransactionTestCase for self-contained testing
  • Update README with examples of how to use the new APIs, including in migrations

These APIs provide a programmatic way to manage TimescaleDB's data lifecycle features. The implementation uses a method-based approach where you call methods on the timescale manager, rather than a declarative approach in model definitions.

Part of Milestone 2 roadmap. Ref: #23
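
A hypothetical usage sketch of the programmatic API (the method names below are illustrative only; the README in this PR's diff documents the actual calls):

# Illustrative only - method names may differ from the PR's actual API.
from myapp.models import Metric  # any model using the timescale manager

# Retention: drop chunks older than 90 days
Metric.timescale.add_retention_policy(drop_after='90 days')

# Compression: enable it, then compress chunks older than 30 days
Metric.timescale.enable_compression(orderby=['time'], segmentby=['device'])
Metric.timescale.add_compression_policy(compress_after='30 days')

# Inspect and remove
stats = Metric.timescale.get_compression_stats()
Metric.timescale.remove_retention_policy()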

voarsh2 (Author) commented Apr 25, 2025

@jamessewell is anyone able to review this?

voarsh2 changed the title from "Add programmatic APIs for TimescaleDB retention policy and compression" to "Add programmatic APIs for TimescaleDB (retention policy and compression(" on Apr 25, 2025
voarsh2 mentioned this pull request on Apr 25, 2025
voarsh2 changed the title from "Add programmatic APIs for TimescaleDB (retention policy and compression(" to "Add programmatic APIs for TimescaleDB (retention policy and compression)" on Apr 25, 2025
ghost commented May 13, 2025

I'm looking for something like this right now, but I was wondering if this should be coming from the model Meta instead in a more declarative way. Can this be considered?

voarsh2 (Author) commented May 14, 2025

> I'm looking for something like this right now, but I was wondering if this should be coming from the model Meta instead in a more declarative way. Can this be considered?

My changes were a PoC (proof of concept). The referenced issue/roadmap talked about declaring this in models and generating migrations as a future addition, in addition to the API-based PoC I've attempted here (not migration-based, though you can call the methods manually in your migration files; see my README), which has passing tests. That declarative approach would take considerable effort on my part, and if the maintainer won't even look at my PR, I won't bother.

I am maintaining my TimescaleDB tables manually outside of Django.

If people actually expressed interest, I could take more time on it, since I have a project that needs these missing features. Contributing them here is a big effort when I need them now (it's easier to do it myself, outside of this project), especially if the PR isn't reviewed or merged.

jamessewell (Owner) commented

Oooh! Neat!

So this is similar to the way these other libs work right?

voarsh2 (Author) commented May 14, 2025

> Oooh! Neat!
>
> So this is similar to the way these other libs work right?

Hi @jamessewell,

Yes, my implementation is similar in approach to the TimescaleDB Ruby and TypeScript libraries, but with some differences due to Django's architecture (and my time constraints, given it's unclear whether this PR is of interest to this repo):

Similarities to timescaledb-ruby and timescaledb-ts:

  1. Programmatic API: Like both libraries, I've implemented a programmatic API that allows users to manage retention and compression policies through method calls.

  2. Manager-based access: Similar to how the Ruby gem provides access through a manager, I've added methods to the existing TimescaleManager class to provide a clean API for managing TimescaleDB features.

  3. Access through model manager: The implementation provides access to TimescaleDB features through the model's timescale manager, which is a simpler approach compared to the deeper ORM integrations in the TypeScript and Ruby libraries. While those libraries extend their respective ORMs with custom decorators, query builders, and model macros, this implementation focuses on providing straightforward method access.

Key differences:

  1. Declarative vs. Programmatic: The Ruby and TypeScript libraries offer both declarative (in model definitions) and programmatic approaches. My current implementation is programmatic only, with declarative configuration planned for a future enhancement (as mentioned in the roadmap).

  2. Migration support: The Ruby gem has more advanced migration support with special syntax for creating hypertables with compression and retention in one step. My implementation requires separate method calls, but they can be included in migration files (a sketch follows after this list).

  3. Scope: My implementation focuses specifically on retention and compression policies, while the Ruby and TypeScript libraries have broader feature sets including continuous aggregates, time buckets, etc.
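
A hypothetical migration sketch for point 2, using the same illustrative method names as above (Django's historical models from apps.get_model() only expose custom managers that set use_in_migrations = True, so the concrete model is imported directly here):

# Illustrative only - not taken verbatim from this PR's README.
from django.db import migrations

def apply_policies(apps, schema_editor):
    # Import the concrete model: a historical model would lack the custom
    # timescale manager unless it sets use_in_migrations = True.
    from myapp.models import Metric
    Metric.timescale.enable_compression(orderby=['time'], segmentby=['device'])
    Metric.timescale.add_compression_policy(compress_after='30 days')
    Metric.timescale.add_retention_policy(drop_after='90 days')

def remove_policies(apps, schema_editor):
    from myapp.models import Metric
    Metric.timescale.remove_compression_policy()
    Metric.timescale.remove_retention_policy()

class Migration(migrations.Migration):
    dependencies = [('myapp', '0001_initial')]
    operations = [migrations.RunPython(apply_policies, remove_policies)]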

Future enhancements:

As mentioned in the roadmap (issue #23), a future enhancement could add declarative configuration in model definitions, similar to how the Ruby gem allows configuration in model classes:

# Future declarative approach (not yet implemented)
class MyTimeseriesModel(TimescaleModel):
    time = TimescaleDateTimeField(interval="1 day")
    value = models.FloatField()
    
    class TimescaleOptions:
        retention_policy = {
            'drop_after': '90 days',
            'schedule_interval': '1 day'
        }
        compression = {
            'enabled': True,
            'orderby': ['time'],
            'segmentby': ['device'],
            'policy': {
                'compress_after': '30 days'
            }
        }

This would be a natural evolution of the current programmatic API, giving users both options depending on their preference. A declarative, model-level approach would be the next step (and more work).

Feel free to give it a test and/or share feedback. I am managing these features outside of Django on my project, which is a pain point - but I also need them now.

How I'm currently managing and configuring TimescaleDB tables outside of this package:
Config generator:

"""
TimescaleDB configuration generator.

This module automatically generates TimescaleDB configuration from Django models,
reducing the maintenance burden by keeping configuration in sync with model definitions.
"""

import inspect
import sys
from dataclasses import dataclass
from typing import List, Dict, Optional, Type, Set
from django.db import models
from django.apps import apps

from apps.ml_pipeline.metrics import (
    TimeSeriesMetricBase,
    MessageMetrics,
    UserActivityMetrics,
    CommunityHealthMetrics,
    GrowthAnalysis,
    NetworkAnalysis,
    CommunityHealth,
    TrendAnalysis,
    UserSegmentAnalysis,
    ReactionPatternAnalysis,
    TopicEngagementAnalysis
)

@dataclass
class HypertableConfig:
    """Configuration for a TimescaleDB hypertable."""
    table_name: str
    time_column: str = 'timestamp'
    partition_columns: List[str] = None
    chunk_interval: str = '1 day'

    def __post_init__(self):
        if self.partition_columns is None:
            self.partition_columns = ['community_id']

@dataclass
class CompressionConfig:
    """Configuration for TimescaleDB compression."""
    table_name: str
    compress_after: str = '7 days'
    segment_by: List[str] = None
    order_by: List[str] = None

    def __post_init__(self):
        if self.segment_by is None:
            self.segment_by = ['community_id']
        if self.order_by is None:
            self.order_by = ['timestamp DESC']

@dataclass
class ContinuousAggregateConfig:
    """Configuration for a TimescaleDB continuous aggregate view."""
    name: str
    source_table: str
    time_bucket: str
    group_by: List[str]
    aggregates: Dict[str, str]
    create_after: str = '1 hour'
    refresh_interval: str = '1 hour'
    retention_period: str = '6 months'
    custom_from: Optional[str] = None  # For complex FROM clauses with LATERAL joins

    def get_sql(self) -> str:
        """Generate SQL for creating the continuous aggregate view."""
        aggregates_sql = ',\n'.join(
            f"{expr} AS {name}"
            for name, expr in self.aggregates.items()
        )

        from_clause = self.custom_from if self.custom_from else self.source_table

        return f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS {self.name}
        WITH (timescaledb.continuous) AS
        SELECT
            time_bucket('{self.time_bucket}', timestamp) AS bucket,
            {','.join(self.group_by)},
            {aggregates_sql}
        FROM {from_clause}
        GROUP BY bucket, {','.join(self.group_by)};
        """

@dataclass
class RetentionConfig:
    """Configuration for TimescaleDB data retention."""
    table_name: str
    retention_period: str
    is_view: bool = False

class ModelInspector:
    """Inspects Django models to generate TimescaleDB configurations."""

    @staticmethod
    def get_timescale_models() -> List[Type[TimeSeriesMetricBase]]:
        """Get all models that inherit from TimeSeriesMetricBase."""
        return [
            MessageMetrics,
            UserActivityMetrics,
            CommunityHealthMetrics,
            GrowthAnalysis,
            NetworkAnalysis,
            CommunityHealth,
            TrendAnalysis,
            UserSegmentAnalysis,
            ReactionPatternAnalysis,
            TopicEngagementAnalysis
        ]

    @staticmethod
    def get_table_name(model: Type[models.Model]) -> str:
        """Get the database table name for a model."""
        return model._meta.db_table

    @staticmethod
    def get_field_names(model: Type[models.Model]) -> List[str]:
        """Get all field names for a model."""
        return [field.name for field in model._meta.fields]

    @staticmethod
    def get_numeric_fields(model: Type[models.Model]) -> List[str]:
        """Get names of numeric fields in a model."""
        numeric_fields = []
        for field in model._meta.fields:
            if isinstance(field, (models.IntegerField, models.FloatField, models.DecimalField)):
                numeric_fields.append(field.name)
        return numeric_fields

    @staticmethod
    def get_json_fields(model: Type[models.Model]) -> List[str]:
        """Get names of JSON fields in a model."""
        json_fields = []
        for field in model._meta.fields:
            if isinstance(field, models.JSONField):
                json_fields.append(field.name)
        return json_fields

    @staticmethod
    def get_default_aggregation(field_name: str, field_type: type) -> str:
        """Determine the appropriate aggregation function based on field name and type.

        This helps automatically select the right aggregation function for fields
        based on naming conventions and field types.

        Args:
            field_name: The name of the field
            field_type: The Django field type

        Returns:
            The appropriate aggregation function name (MAX, AVG, SUM)
        """
        # Count-like fields should use MAX to avoid duplication
        if any(term in field_name.lower() for term in ['count', 'total', 'num']):
            return 'MAX'

        # Rate or score fields should use AVG
        if any(term in field_name.lower() for term in ['rate', 'ratio', 'score', 'avg']):
            return 'AVG'

        # User-related counts might need SUM
        if any(term in field_name.lower() for term in ['users', 'members']):
            if any(prefix in field_name.lower() for prefix in ['new', 'returning', 'churned']):
                return 'SUM'
            return 'MAX'

        # Default based on field type
        if field_type in (models.IntegerField, models.PositiveIntegerField):
            return 'MAX'
        if field_type in (models.FloatField, models.DecimalField):
            return 'AVG'

        return 'MAX'  # Safe default

    @staticmethod
    def json_path(field: str, path: str, cast_type: str = 'float') -> str:
        """Generate a JSON path expression with proper casting.

        This creates consistent JSON path expressions for accessing JSON fields
        in PostgreSQL, with proper type casting.

        Args:
            field: The JSON field name
            path: The path within the JSON, using dot notation for nesting
            cast_type: The PostgreSQL type to cast to

        Returns:
            A properly formatted JSON path expression with casting

        Examples:
            >>> ModelInspector.json_path('data', 'score', 'float')
            "(data->>'score')::float"

            >>> ModelInspector.json_path('metrics', 'engagement.score', 'float')
            "(metrics->'engagement'->>'score')::float"
        """
        if '.' not in path:
            return f"({field}->>{path!r})::{cast_type}"

        # For nested paths
        segments = path.split('.')
        result = field
        for i, segment in enumerate(segments):
            if i == len(segments) - 1:
                result = f"({result}->>'{segment}')"
            else:
                result = f"({result}->'{segment}')"

        return f"{result}::{cast_type}"

    @staticmethod
    def generate_hypertable_configs() -> List[HypertableConfig]:
        """Generate hypertable configurations for all TimescaleDB models."""
        configs = []
        for model in ModelInspector.get_timescale_models():
            configs.append(HypertableConfig(
                table_name=ModelInspector.get_table_name(model)
            ))
        return configs

    @staticmethod
    def generate_compression_configs() -> List[CompressionConfig]:
        """Generate compression configurations for all TimescaleDB models."""
        configs = []
        for model in ModelInspector.get_timescale_models():
            configs.append(CompressionConfig(
                table_name=ModelInspector.get_table_name(model)
            ))
        return configs

    @staticmethod
    def generate_continuous_aggregate_configs() -> List[ContinuousAggregateConfig]:
        """Generate continuous aggregate configurations based on model fields."""
        configs = []

        # MessageMetrics hourly aggregates
        message_metrics = MessageMetrics
        configs.append(ContinuousAggregateConfig(
            name='message_metrics_hourly',
            source_table=ModelInspector.get_table_name(message_metrics),
            time_bucket='1 hour',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'message_count': 'MAX(message_count)',  # Using MAX instead of SUM to avoid duplication
                'unique_users': 'AVG(unique_users)',
                'reaction_count': 'MAX(reaction_count)',
                'thread_count': 'MAX(thread_count)',
                'avg_response_time': 'AVG(avg_response_time)',
                'avg_sentiment_score': 'AVG(avg_sentiment_score)'
            }
        ))

        # UserActivityMetrics daily aggregates
        user_metrics = UserActivityMetrics
        configs.append(ContinuousAggregateConfig(
            name='user_metrics_daily',
            source_table=ModelInspector.get_table_name(user_metrics),
            time_bucket='1 day',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'daily_active_users': 'AVG(daily_active_users)',
                'weekly_active_users': 'AVG(weekly_active_users)',
                'new_users': 'SUM(new_users)',
                'returning_users': 'SUM(returning_users)',
                'churned_users': 'SUM(churned_users)',
                'direct_churned_users': 'SUM(direct_churned_users)',
                'inactivity_churned_users': 'SUM(inactivity_churned_users)',
                'seasonal_churned_users': 'SUM(seasonal_churned_users)',
                'partial_churned_users': 'SUM(partial_churned_users)',
                'rejoined_users': 'SUM(rejoined_users)',
                'total_server_members': 'MAX(total_server_members)',
                'engagement_score': 'AVG(engagement_score)',
                'participation_rate': 'AVG(participation_rate)'
            },
            refresh_interval='1 day',
            retention_period='1 year'
        ))

        # CommunityHealthMetrics weekly aggregates
        health_metrics = CommunityHealthMetrics
        configs.append(ContinuousAggregateConfig(
            name='health_metrics_weekly',
            source_table=ModelInspector.get_table_name(health_metrics),
            time_bucket='7 days',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'health_score': 'AVG(health_score)',
                'engagement_rate': 'AVG(engagement_rate)',
                'growth_rate': 'AVG(growth_rate)',
                'retention_rate': 'AVG(retention_rate)',
                'network_density': 'AVG(network_density)',
                'topic_diversity': 'AVG(topic_diversity)',
                'avg_thread_depth': 'AVG(avg_thread_depth)',
                'active_user_ratio': 'AVG(active_user_ratio)'
            },
            refresh_interval='1 day',
            retention_period='2 years'
        ))

        # CommunityHealth weekly trends
        community_health = CommunityHealth
        configs.append(ContinuousAggregateConfig(
            name='health_trends_weekly',
            source_table=ModelInspector.get_table_name(community_health),
            time_bucket='7 days',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'engagement_score': "AVG((engagement->>'score')::float)",
                'cohesion_score': "AVG((cohesion->>'score')::float)",
                'growth_score': "AVG((growth->>'score')::float)",
                'retention_score': "AVG((retention->>'score')::float)"
            }
        ))

        # GrowthAnalysis daily aggregates
        growth_analysis = GrowthAnalysis
        configs.append(ContinuousAggregateConfig(
            name='growth_analysis_daily',
            source_table=ModelInspector.get_table_name(growth_analysis),
            time_bucket='1 day',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'growth_rate': "AVG((current_metrics->>'growth_rate')::float)",
                'retention_rate': "AVG((retention_metrics->>'overall_rate')::float)",
                'new_user_conversion': "AVG((current_metrics->>'new_user_conversion')::float)",
                'prediction_accuracy': "AVG((predictions->>'accuracy')::float)"
            },
            refresh_interval='1 day',
            retention_period='3 months'
        ))

        # NetworkAnalysis weekly aggregates
        network_analysis = NetworkAnalysis
        configs.append(ContinuousAggregateConfig(
            name='network_analysis_weekly',
            source_table=ModelInspector.get_table_name(network_analysis),
            time_bucket='7 days',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'network_density': "AVG((graph_metrics->>'density')::float)",
                'clustering_coefficient': "AVG((graph_metrics->>'clustering_coefficient')::float)",
                'avg_path_length': "AVG((graph_metrics->>'avg_path_length')::float)",
                'key_user_count': "AVG((key_users->>'count')::float)"
            },
            refresh_interval='1 day',
            retention_period='3 months'
        ))

        # TrendAnalysis hourly aggregates
        trend_analysis = TrendAnalysis
        configs.append(ContinuousAggregateConfig(
            name='trend_analysis_hourly',
            source_table=ModelInspector.get_table_name(trend_analysis),
            time_bucket='1 hour',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'total_messages': "MAX((engagement_patterns->>'total_messages')::int)",
                'unique_authors': "AVG((hourly_trends->>'unique_authors')::float)",
                'avg_engagement_rate': "AVG((hourly_trends->>'engagement_rate')::float)"
            },
            refresh_interval='1 hour',
            retention_period='1 month'
        ))

        # UserSegmentAnalysis daily aggregates
        user_segments = UserSegmentAnalysis
        configs.append(ContinuousAggregateConfig(
            name='user_segments_daily',
            source_table=ModelInspector.get_table_name(user_segments),
            time_bucket='1 day',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'total_segments': 'COUNT(DISTINCT segment_key)',
                'total_transitions': "SUM((segment_transitions->>'total_transitions')::int)",
                # Updated to use standardized segment name 'highly_engaged_users' instead of 'power_users'
                # Note: The metric name 'power_user_ratio' is kept for backward compatibility
                # but may be renamed to 'highly_engaged_user_ratio' in a future update
                'power_user_ratio': """
                    AVG(
                        ((segments->'highly_engaged_users'->>'count')::float /
                        NULLIF((segments->>'total_users')::float, 0))
                    )
                """
            },
            refresh_interval='1 day',
            retention_period='3 months',
            custom_from=f"{ModelInspector.get_table_name(user_segments)} CROSS JOIN LATERAL jsonb_object_keys(segments) AS t(segment_key)"
        ))

        # ReactionPatternAnalysis hourly aggregates
        reaction_patterns = ReactionPatternAnalysis
        configs.append(ContinuousAggregateConfig(
            name='reaction_patterns_hourly',
            source_table=ModelInspector.get_table_name(reaction_patterns),
            time_bucket='1 hour',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'total_reactions': "MAX((reaction_metrics->>'total_messages')::int)",
                'avg_reactions_per_message': "AVG((reaction_metrics->>'avg_reactions_per_message')::float)",
                'messages_with_reactions': "MAX((reaction_metrics->>'messages_with_reactions')::int)"
            },
            refresh_interval='1 hour',
            retention_period='1 month'
        ))

        # TopicEngagementAnalysis daily aggregates
        topic_engagement = TopicEngagementAnalysis
        configs.append(ContinuousAggregateConfig(
            name='topic_engagement_daily',
            source_table=ModelInspector.get_table_name(topic_engagement),
            time_bucket='1 day',
            group_by=['community_id'],
            aggregates={
                'id': 'MAX(id)',  # Include id field
                'avg_topic_engagement': "AVG((topics->0->'engagement_metrics'->>'avg_reactions')::float)",
                'total_topics': "COUNT(DISTINCT t.topic_key)",
                'trending_topics': "COUNT(*) FILTER (WHERE topics->0->>'trend' = 'increasing')"
            },
            refresh_interval='1 day',
            retention_period='3 months',
            custom_from=f"{ModelInspector.get_table_name(topic_engagement)} CROSS JOIN LATERAL jsonb_object_keys(topic_evolution) AS t(topic_key)"
        ))

        return configs

    @staticmethod
    def generate_retention_configs() -> List[RetentionConfig]:
        """Generate retention configurations for all TimescaleDB models and views."""
        configs = []

        # Raw data retention
        for model in ModelInspector.get_timescale_models():
            table_name = ModelInspector.get_table_name(model)

            # Determine appropriate retention period based on model type
            if table_name in ['ml_pipeline_trend_analysis', 'ml_pipeline_reaction_pattern_analysis']:
                retention_period = '1 month'
            else:
                retention_period = '3 months'

            configs.append(RetentionConfig(table_name, retention_period))

        # Aggregate retention
        aggregate_retention = {
            'message_metrics_hourly': '6 months',
            'user_metrics_daily': '1 year',
            'health_metrics_weekly': '2 years',
            'health_trends_weekly': '1 year',
            'growth_analysis_daily': '6 months',  # New aggregate
            'network_analysis_weekly': '1 year',  # New aggregate
            'trend_analysis_hourly': '3 months',
            'user_segments_daily': '6 months',
            'reaction_patterns_hourly': '3 months',
            'topic_engagement_daily': '6 months'
        }

        for view_name, retention_period in aggregate_retention.items():
            configs.append(RetentionConfig(view_name, retention_period, is_view=True))

        return configs

class TimescaleConfigGenerator:
    """Generates TimescaleDB configuration by inspecting Django models."""

    @staticmethod
    def generate_config():
        """Generate a complete TimescaleDB configuration."""
        return TimescaleConfig()

class TimescaleConfig:
    """Central configuration for all TimescaleDB features, generated from model inspection."""

    def __init__(self):
        # Generate configurations by inspecting models
        self.hypertables = ModelInspector.generate_hypertable_configs()
        self.compression_policies = ModelInspector.generate_compression_configs()
        self.continuous_aggregates = ModelInspector.generate_continuous_aggregate_configs()
        self.retention_policies = ModelInspector.generate_retention_configs()

    def validate(self) -> List[str]:
        """Validate the TimescaleDB configuration for common issues.

        Returns:
            A list of validation issues found, empty if no issues
        """
        issues = []

        # Check for duplicate aggregate names
        aggregate_names = [ca.name for ca in self.continuous_aggregates]
        duplicates = [name for name in set(aggregate_names) if aggregate_names.count(name) > 1]
        if duplicates:
            issues.append(f"Duplicate continuous aggregate names: {', '.join(duplicates)}")

        # Check for missing retention policies
        aggregate_names = set(ca.name for ca in self.continuous_aggregates)
        retention_view_names = set(rc.table_name for rc in self.retention_policies if rc.is_view)
        missing_retention = aggregate_names - retention_view_names
        if missing_retention:
            issues.append(f"Missing retention policies for aggregates: {', '.join(missing_retention)}")

        # Check for invalid refresh intervals
        for ca in self.continuous_aggregates:
            if ca.time_bucket == '1 hour' and ca.refresh_interval not in ('1 hour', '30 minutes'):
                issues.append(f"Aggregate {ca.name} has time bucket {ca.time_bucket} but refresh {ca.refresh_interval}")
            elif ca.time_bucket == '1 day' and ca.refresh_interval not in ('1 day', '12 hours', '6 hours'):
                issues.append(f"Aggregate {ca.name} has time bucket {ca.time_bucket} but refresh {ca.refresh_interval}")

        # Check for hypertables without compression
        hypertable_names = set(ht.table_name for ht in self.hypertables)
        compression_table_names = set(cp.table_name for cp in self.compression_policies)
        missing_compression = hypertable_names - compression_table_names
        if missing_compression:
            issues.append(f"Missing compression policies for hypertables: {', '.join(missing_compression)}")

        return issues

    def get_sql(self) -> List[str]:
        """Generate SQL statements for the entire configuration."""
        sql_statements = []

        # First modify primary keys and ensure unique constraints
        for config in self.hypertables:
            sql = f"""
            DO $$
            BEGIN
                -- Drop existing primary key
                IF EXISTS (
                    SELECT 1 FROM pg_constraint
                    WHERE conname = '{config.table_name}_pkey'
                ) THEN
                    ALTER TABLE {config.table_name} DROP CONSTRAINT {config.table_name}_pkey;
                END IF;

                -- Create new composite primary key
                ALTER TABLE {config.table_name}
                ADD PRIMARY KEY (id, timestamp);

                -- Ensure unique constraint exists
                IF NOT EXISTS (
                    SELECT 1 FROM pg_indexes
                    WHERE tablename = '{config.table_name}'
                    AND indexdef LIKE '%UNIQUE%timestamp%community_id%'
                ) THEN
                    CREATE UNIQUE INDEX IF NOT EXISTS {config.table_name}_ts_comm_id_idx
                    ON {config.table_name} (timestamp, community_id);
                END IF;
            END $$;
            """
            sql_statements.append(sql)

        # Then create hypertables
        for config in self.hypertables:
            sql = f"""
            SELECT create_hypertable(
                '{config.table_name}',
                '{config.time_column}',
                chunk_time_interval => INTERVAL '{config.chunk_interval}',
                if_not_exists => TRUE
            );
            """
            sql_statements.append(sql)

        # Compression setup
        for config in self.compression_policies:
            sql = f"""
            ALTER TABLE {config.table_name} SET (
                timescaledb.compress = true,
                timescaledb.compress_segmentby = '{','.join(config.segment_by)}',
                timescaledb.compress_orderby = '{','.join(config.order_by)}'
            );

            SELECT add_compression_policy(
                '{config.table_name}',
                INTERVAL '{config.compress_after}',
                if_not_exists => TRUE
            );
            """
            sql_statements.append(sql)

        # Continuous aggregates
        for config in self.continuous_aggregates:
            sql = config.get_sql()
            sql_statements.append(sql)

            # Add policy as separate statement
            sql = f"""
            SELECT add_continuous_aggregate_policy(
                '{config.name}',
                start_offset => INTERVAL '{config.time_bucket}' * 3,
                end_offset => INTERVAL '{config.time_bucket}',
                schedule_interval => INTERVAL '{config.refresh_interval}',
                if_not_exists => TRUE
            );
            """
            sql_statements.append(sql)

        # Retention policies
        for config in self.retention_policies:
            sql = f"""
            SELECT add_retention_policy(
                '{config.table_name}',
                INTERVAL '{config.retention_period}',
                if_not_exists => TRUE
            );
            """
            sql_statements.append(sql)

        return sql_statements
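
For completeness, a minimal sketch of how the generated statements could be applied (this helper is illustrative and not part of my actual code; note that CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous) cannot run inside a transaction block, so in practice those statements have to be executed outside atomic()):

from django.db import connection

def apply_timescale_config():
    """Validate the generated configuration, then execute each statement in turn."""
    config = TimescaleConfig()  # assumes this lives alongside the generator module above
    issues = config.validate()
    if issues:
        raise RuntimeError(f"TimescaleDB configuration issues: {issues}")
    with connection.cursor() as cursor:
        for sql in config.get_sql():
            cursor.execute(sql)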

Validation as management command:


"""
Management command to verify the TimescaleDB configuration against model definitions.

This command checks that the TimescaleDB configuration matches the model definitions
and reports any discrepancies.
"""

from django.core.management.base import BaseCommand
from django.db import connection
from apps.ml_pipeline.timescale.config_generator import TimescaleConfig, ModelInspector

class Command(BaseCommand):
    help = 'Verify TimescaleDB configuration against model definitions'

    def add_arguments(self, parser):
        parser.add_argument(
            '--show-sql',
            action='store_true',
            help='Show SQL statements that would be executed'
        )
        parser.add_argument(
            '--check-models',
            action='store_true',
            help='Check model fields against configuration'
        )
        parser.add_argument(
            '--check-db',
            action='store_true',
            help='Check database state against configuration'
        )
        parser.add_argument(
            '--validate',
            action='store_true',
            help='Validate the configuration for common issues'
        )

    def handle(self, *args, **options):
        config = TimescaleConfig()

        if options['validate']:
            self._validate_config(config)
            return

        if options['show_sql']:
            self.stdout.write("SQL statements that would be executed:")
            for sql in config.get_sql():
                self.stdout.write("\n" + sql)
            return

        if options['check_models']:
            self._check_models(config)
            return

        if options['check_db']:
            self._check_db(config)
            return

        # Default: show configuration summary
        self._show_summary(config)

    def _validate_config(self, config):
        """Validate the configuration for common issues."""
        self.stdout.write(self.style.SUCCESS("Validating TimescaleDB Configuration"))
        self.stdout.write("=" * 80)

        issues = config.validate()

        if issues:
            self.stdout.write(self.style.ERROR("Found configuration issues:"))
            for issue in issues:
                self.stdout.write(self.style.ERROR(f"  - {issue}"))
        else:
            self.stdout.write(self.style.SUCCESS("Configuration validation passed! No issues found."))

    def _show_summary(self, config):
        """Show a summary of the TimescaleDB configuration."""
        self.stdout.write(self.style.SUCCESS("TimescaleDB Configuration Summary"))
        self.stdout.write("=" * 80)

        self.stdout.write(f"\nHypertables ({len(config.hypertables)}):")
        for hypertable in config.hypertables:
            self.stdout.write(f"  - {hypertable.table_name} (time column: {hypertable.time_column})")

        self.stdout.write(f"\nCompression Policies ({len(config.compression_policies)}):")
        for policy in config.compression_policies:
            self.stdout.write(f"  - {policy.table_name} (compress after: {policy.compress_after})")

        self.stdout.write(f"\nContinuous Aggregates ({len(config.continuous_aggregates)}):")
        for agg in config.continuous_aggregates:
            self.stdout.write(f"  - {agg.name} (source: {agg.source_table}, bucket: {agg.time_bucket})")
            self.stdout.write(f"    Group by: {', '.join(agg.group_by)}")
            self.stdout.write(f"    Aggregates: {len(agg.aggregates)} fields")

        self.stdout.write(f"\nRetention Policies ({len(config.retention_policies)}):")
        for policy in config.retention_policies:
            table_type = "View" if policy.is_view else "Table"
            self.stdout.write(f"  - {policy.table_name} ({table_type}, retention: {policy.retention_period})")

    def _check_models(self, config):
        """Check that all models have corresponding configurations."""
        self.stdout.write(self.style.SUCCESS("Checking Models Against Configuration"))
        self.stdout.write("=" * 80)

        # Get all TimescaleDB models
        models = ModelInspector.get_timescale_models()
        model_tables = {model._meta.db_table: model for model in models}

        # Check hypertables
        hypertable_tables = {h.table_name for h in config.hypertables}
        missing_hypertables = set(model_tables.keys()) - hypertable_tables
        extra_hypertables = hypertable_tables - set(model_tables.keys())

        if missing_hypertables:
            self.stdout.write(self.style.ERROR(f"Missing hypertable configurations for:"))
            for table in missing_hypertables:
                self.stdout.write(f"  - {table}")

        if extra_hypertables:
            self.stdout.write(self.style.WARNING(f"Extra hypertable configurations for:"))
            for table in extra_hypertables:
                self.stdout.write(f"  - {table}")

        # Check compression policies
        compression_tables = {c.table_name for c in config.compression_policies}
        missing_compression = set(model_tables.keys()) - compression_tables

        if missing_compression:
            self.stdout.write(self.style.ERROR(f"Missing compression policies for:"))
            for table in missing_compression:
                self.stdout.write(f"  - {table}")

        # Check continuous aggregates
        source_tables = {ca.source_table for ca in config.continuous_aggregates}
        missing_aggregates = set(model_tables.keys()) - source_tables

        if missing_aggregates:
            self.stdout.write(self.style.WARNING(f"Models without continuous aggregates:"))
            for table in missing_aggregates:
                self.stdout.write(f"  - {table}")

        # Check retention policies
        retention_tables = {r.table_name for r in config.retention_policies if not r.is_view}
        missing_retention = set(model_tables.keys()) - retention_tables

        if missing_retention:
            self.stdout.write(self.style.ERROR(f"Missing retention policies for:"))
            for table in missing_retention:
                self.stdout.write(f"  - {table}")

        # Check field coverage in continuous aggregates
        self.stdout.write("\nChecking field coverage in continuous aggregates:")
        for ca in config.continuous_aggregates:
            if ca.source_table in model_tables:
                model = model_tables[ca.source_table]
                numeric_fields = ModelInspector.get_numeric_fields(model)
                aggregate_fields = set(ca.aggregates.keys())

                # Fields used in group_by are already included in the aggregate
                group_by_fields = set(ca.group_by)

                # Fields that are neither in aggregates nor in group_by
                missing_fields = set(numeric_fields) - aggregate_fields - group_by_fields

                if missing_fields:
                    self.stdout.write(self.style.WARNING(f"  {ca.name} is missing fields from {ca.source_table}:"))
                    for field in missing_fields:
                        self.stdout.write(f"    - {field}")

        if not (missing_hypertables or missing_compression or missing_retention):
            self.stdout.write(self.style.SUCCESS("\nAll models have required TimescaleDB configurations!"))

    def _check_db(self, config):
        """Check the database state against the configuration."""
        self.stdout.write(self.style.SUCCESS("Checking Database State Against Configuration"))
        self.stdout.write("=" * 80)

        with connection.cursor() as cursor:
            # Check hypertables
            cursor.execute("SELECT hypertable_name FROM timescaledb_information.hypertables")
            db_hypertables = {row[0] for row in cursor.fetchall()}

            config_hypertables = {h.table_name for h in config.hypertables}
            missing_hypertables = config_hypertables - db_hypertables

            if missing_hypertables:
                self.stdout.write(self.style.ERROR(f"Hypertables in config but not in database:"))
                for table in missing_hypertables:
                    self.stdout.write(f"  - {table}")

            # Check compression
            cursor.execute("SELECT hypertable_name FROM timescaledb_information.compression_settings")
            db_compression = {row[0] for row in cursor.fetchall()}

            config_compression = {c.table_name for c in config.compression_policies}
            missing_compression = config_compression - db_compression

            if missing_compression:
                self.stdout.write(self.style.ERROR(f"Compression policies in config but not in database:"))
                for table in missing_compression:
                    self.stdout.write(f"  - {table}")

            # Check continuous aggregates
            cursor.execute("SELECT view_name FROM timescaledb_information.continuous_aggregates")
            db_aggregates = {row[0] for row in cursor.fetchall()}

            config_aggregates = {ca.name for ca in config.continuous_aggregates}
            missing_aggregates = config_aggregates - db_aggregates

            if missing_aggregates:
                self.stdout.write(self.style.ERROR(f"Continuous aggregates in config but not in database:"))
                for name in missing_aggregates:
                    self.stdout.write(f"  - {name}")

            # Check retention policies for regular hypertables
            cursor.execute("SELECT hypertable_name FROM timescaledb_information.jobs WHERE proc_name = 'policy_retention'")
            db_hypertable_retention = {row[0] for row in cursor.fetchall()}

            # Check retention policies for continuous aggregates
            cursor.execute("""
            SELECT ca.view_name
            FROM timescaledb_information.jobs j
            JOIN timescaledb_information.continuous_aggregates ca
                ON j.hypertable_name = ca.materialization_hypertable_name
            WHERE j.proc_name = 'policy_retention'
            """)
            db_cagg_retention = {row[0] for row in cursor.fetchall()}

            # Combine both sets of retention policies
            db_retention = db_hypertable_retention.union(db_cagg_retention)

            # Check for missing retention policies
            config_retention = {r.table_name for r in config.retention_policies}
            missing_retention = config_retention - db_retention

            if missing_retention:
                self.stdout.write(self.style.ERROR(f"Retention policies in config but not in database:"))
                for table in missing_retention:
                    self.stdout.write(f"  - {table}")

            if not (missing_hypertables or missing_compression or missing_aggregates or missing_retention):
                self.stdout.write(self.style.SUCCESS("\nDatabase state matches configuration!"))

jamessewell (Owner) commented

Oh there is this one too. https://github.com/jmitchel3/timescaledb-python

This all sounds great. I will try to get a review done this week.

I'd love to learn more about your project - if you want to chat join the TimescaleDB Slack :)
