From f18a3cabf8d5b9316804bcde81996fe2549731d7 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Fri, 14 Nov 2025 16:28:10 +0200 Subject: [PATCH 01/12] move maintenance lock acquisition logic to `admin.try_get_maintenance_lock()` --- internal/sinks/postgres.go | 9 ++++----- internal/sinks/sql/admin_functions.sql | 11 +++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index ba3b92c07..d70af5303 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -509,7 +509,6 @@ func (pgw *PostgresWriter) DeleteOldPartitions() { func (pgw *PostgresWriter) MaintainUniqueSources() { logger := log.GetLogger(pgw.ctx) - sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()` sqlDistinct := ` WITH RECURSIVE t(dbname) AS ( @@ -528,13 +527,13 @@ func (pgw *PostgresWriter) MaintainUniqueSources() { RETURNING *` var lock bool - logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly - if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil { - logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err) + logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly + if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil { + logger.Error("Getting maintenance advisory lock failed:", err) return } if !lock { - logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...") + logger.Info("Skipping maintenance as another instance has the advisory lock...") return } diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index a04d3433d..acbd9096d 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -139,3 +139,14 @@ BEGIN RETURN i; END; $SQL$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION admin.try_get_maintenance_lock( + OUT have_lock BOOLEAN +) +AS +$SQL$ +BEGIN + -- 1571543679778230000 is just a random bigint + SELECT pg_try_advisory_lock(1571543679778230000) INTO have_lock; +END; +$SQL$ LANGUAGE plpgsql; \ No newline at end of file From c43bbe35fc3f07d1de1959620016970a69072b6b Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 03:48:58 +0200 Subject: [PATCH 02/12] move listing table update logic to PL/pgSQL functions. - move individual metric update logic to `admin.update_listing_table()`. - move dropped tables removal logic to `admin.remove_dropped_tables_listing()`. - use these new functions in the maintenance routine. --- internal/sinks/postgres.go | 76 ++++---------------------- internal/sinks/sql/admin_functions.sql | 54 ++++++++++++++++++ 2 files changed, 66 insertions(+), 64 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index d70af5303..0e0b064ab 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -509,23 +509,6 @@ func (pgw *PostgresWriter) DeleteOldPartitions() { func (pgw *PostgresWriter) MaintainUniqueSources() { logger := log.GetLogger(pgw.ctx) - sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()` - sqlDistinct := ` - WITH RECURSIVE t(dbname) AS ( - SELECT MIN(dbname) AS dbname FROM %s - UNION - SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t - ) - SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1` - sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2` - sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1` - sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)` - sqlAdd := ` - INSERT INTO admin.all_distinct_dbname_metrics - SELECT u, $2 FROM (select unnest($1::text[]) as u) x - WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2) - RETURNING *` - var lock bool logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil { @@ -537,66 +520,31 @@ func (pgw *PostgresWriter) MaintainUniqueSources() { return } - logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...") - rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics) + logger.Info("Updating admin.all_distinct_dbname_metrics listing table...") + rows, _ := pgw.sinkDb.Query(pgw.ctx, "SELECT table_name FROM admin.get_top_level_metric_tables()") allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string]) if err != nil { logger.Error(err) return } + // updates listing table entries for a single metric. + sqlUpdateListingTable := "SELECT admin.update_listing_table(metric_table_name => $1);" for i, tableName := range allDistinctMetricTables { - foundDbnamesMap := make(map[string]bool) - foundDbnamesArr := make([]string, 0) - metricName := strings.Replace(tableName, "public.", "", 1) - // later usage in sqlDroppedTables requires no "public." prefix - allDistinctMetricTables[i] = metricName - - logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName) - rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName)) - ret, err := pgx.CollectRows(rows, pgx.RowTo[string]) - if err != nil { - logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err) - continue - } - for _, drDbname := range ret { - foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates - } - - // delete all that are not known and add all that are not there - for k := range foundDbnamesMap { - foundDbnamesArr = append(foundDbnamesArr, k) - } - if len(foundDbnamesArr) == 0 { // delete all entries for given metric - logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName) - - _, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName) - if err != nil { - logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err) - } - continue - } - cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName) - if err != nil { - logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err) - } else if cmdTag.RowsAffected() > 0 { - logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName) - } - cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName) - if err != nil { - logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err) - } else if cmdTag.RowsAffected() > 0 { - logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName) + allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix. + logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName) + if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlUpdateListingTable, tableName); err != nil { + logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err) } time.Sleep(time.Minute) } - cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables) + // removes all entries for any non-existing table. + sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)" + _, err = pgw.sinkDb.Exec(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables) if err != nil { - logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err) - } else if cmdTag.RowsAffected() > 0 { - logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected()) + logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err) } } diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index acbd9096d..4aa2a8bbd 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -149,4 +149,58 @@ BEGIN -- 1571543679778230000 is just a random bigint SELECT pg_try_advisory_lock(1571543679778230000) INTO have_lock; END; +$SQL$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_table_name text) +RETURNS VOID +AS +$SQL$ +DECLARE + metric_name text; + schema text = 'public'; +BEGIN + + IF POSITION('public.' IN metric_table_name) > 0 THEN + metric_name = SUBSTRING(metric_table_name FROM POSITION('public.' IN metric_table_name) + LENGTH('public.')); + ELSE + metric_name = metric_table_name; + END IF; + + EXECUTE FORMAT( + $$ + CREATE TEMP TABLE distinct_dbnames AS + WITH RECURSIVE t(dbname) AS ( + SELECT MIN(dbname) AS dbname FROM %I.%I + UNION + SELECT (SELECT MIN(dbname) FROM %I.%I WHERE dbname > t.dbname) FROM t + ) + SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1; + $$, schema, metric_name, schema, metric_name); + + EXECUTE FORMAT( + $$ + DELETE FROM admin.all_distinct_dbname_metrics + WHERE dbname NOT IN (SELECT * FROM distinct_dbnames) + AND metric = '%s'; + $$, metric_name); + + EXECUTE FORMAT( + $$ + INSERT INTO admin.all_distinct_dbname_metrics + SELECT d.dbname, '%s' FROM distinct_dbnames AS d + WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s'); + $$, metric_name, metric_name); + + DROP TABLE distinct_dbnames; + +END; +$SQL$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(existing_metrics text[]) +RETURNS VOID +AS +$SQL$ +BEGIN + DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics); +END; $SQL$ LANGUAGE plpgsql; \ No newline at end of file From 9f3746ff914e86308a21f2435d5ac0ba048e9ff9 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 05:22:51 +0200 Subject: [PATCH 03/12] Test updating tables having `public.` prefix in the maintenance routine. --- internal/sinks/postgres_test.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index d234b98a6..6b3eba9bd 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -669,10 +669,10 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { r.NoError(err) t.Run("MaintainUniqueSources", func(_ *testing.T) { - // adds an entry to `admin.all_distinct_dbname_metrics` + // creates an empty metric table and adds + // an entry to `admin.all_distinct_dbname_metrics` err = pgw.SyncMetric("test", "test_metric_1", AddOp) r.NoError(err) - var numOfEntries int err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) a.NoError(err) @@ -680,8 +680,7 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { // manually call the maintenance routine pgw.MaintainUniqueSources() - - // entry should have been deleted, because it has no corresponding entries in `test_metric_1` table. + // entry should have been deleted, because there is now rows in `test_metric_1` table. err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) a.NoError(err) a.Equal(0, numOfEntries) @@ -696,19 +695,25 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { }, } pgw.flush(message) - // manually call the maintenance routine pgw.MaintainUniqueSources() - - // entry should have been added, because there is a corresponding entry in `test_metric_1` table just written. + // an entry should have been added, because there is a corresponding entry in `test_metric_1` table just written. err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) a.NoError(err) a.Equal(1, numOfEntries) + message[0].DBName = "test_db_2" + pgw.flush(message) + // explicitly use `public.*` prefix. + pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');") + // another entry should have been added. + err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) + a.NoError(err) + a.Equal(2, numOfEntries) + _, err = conn.Exec(ctx, "DROP TABLE test_metric_1;") r.NoError(err) - - // the corresponding entry should be deleted + // all entries should be deleted pgw.MaintainUniqueSources() err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) a.NoError(err) @@ -730,10 +735,10 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { // create the 3rd level time partition with end bound yesterday _, err = conn.Exec(ctx, fmt.Sprintf( - `CREATE TABLE subpartitions.test_metric_2_dbname_time + `CREATE TABLE subpartitions.test_metric_2_dbname_time PARTITION OF subpartitions.test_metric_2_dbname FOR VALUES FROM ('%s') TO ('%s')`, - boundStart, boundEnd), + boundStart, boundEnd), ) a.NoError(err) _, err = conn.Exec(ctx, "COMMENT ON TABLE subpartitions.test_metric_2_dbname_time IS $$pgwatch-generated-metric-dbname-time-lvl$$") From e4a31675ef8d235f858e988f032b8603994e76d4 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 05:33:29 +0200 Subject: [PATCH 04/12] Fix linter errors. --- internal/sinks/postgres_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index 6b3eba9bd..cba193ba3 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -669,7 +669,7 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { r.NoError(err) t.Run("MaintainUniqueSources", func(_ *testing.T) { - // creates an empty metric table and adds + // creates an empty metric table and adds // an entry to `admin.all_distinct_dbname_metrics` err = pgw.SyncMetric("test", "test_metric_1", AddOp) r.NoError(err) @@ -705,7 +705,8 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { message[0].DBName = "test_db_2" pgw.flush(message) // explicitly use `public.*` prefix. - pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');") + _, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');") + a.NoError(err) // another entry should have been added. err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) a.NoError(err) @@ -735,10 +736,10 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { // create the 3rd level time partition with end bound yesterday _, err = conn.Exec(ctx, fmt.Sprintf( - `CREATE TABLE subpartitions.test_metric_2_dbname_time - PARTITION OF subpartitions.test_metric_2_dbname - FOR VALUES FROM ('%s') TO ('%s')`, - boundStart, boundEnd), + `CREATE TABLE subpartitions.test_metric_2_dbname_time + PARTITION OF subpartitions.test_metric_2_dbname + FOR VALUES FROM ('%s') TO ('%s')`, + boundStart, boundEnd), ) a.NoError(err) _, err = conn.Exec(ctx, "COMMENT ON TABLE subpartitions.test_metric_2_dbname_time IS $$pgwatch-generated-metric-dbname-time-lvl$$") From e31ae57aaa3e8a5ad0a1cca0e368a70122c77b23 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 05:49:10 +0200 Subject: [PATCH 05/12] remove `schema` variable from `admin.update_listing_table()` --- internal/sinks/sql/admin_functions.sql | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index 4aa2a8bbd..41a17554d 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -157,7 +157,6 @@ AS $SQL$ DECLARE metric_name text; - schema text = 'public'; BEGIN IF POSITION('public.' IN metric_table_name) > 0 THEN @@ -170,12 +169,12 @@ BEGIN $$ CREATE TEMP TABLE distinct_dbnames AS WITH RECURSIVE t(dbname) AS ( - SELECT MIN(dbname) AS dbname FROM %I.%I + SELECT MIN(dbname) AS dbname FROM public.%I UNION - SELECT (SELECT MIN(dbname) FROM %I.%I WHERE dbname > t.dbname) FROM t + SELECT (SELECT MIN(dbname) FROM public.%I WHERE dbname > t.dbname) FROM t ) SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1; - $$, schema, metric_name, schema, metric_name); + $$, metric_name, metric_name); EXECUTE FORMAT( $$ From f2ccd572bd042fd4cc5cbb0f33864237bd1139f7 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 06:21:26 +0200 Subject: [PATCH 06/12] Increase tests timeout. - the `MaintainUniqueSources` test calls `pgw.MaintainUniqueSources()` 3 times which sleeps a minute each time. --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0e8e9b82e..88d954c17 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -96,7 +96,7 @@ jobs: - name: Test run: | go generate ./api/pb/ - go test -failfast -v -timeout=300s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/... + go test -failfast -v -timeout=600s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/... - name: Coveralls uses: coverallsapp/github-action@v2 From 0429d851b96ab3221491d9d4d11bf2912ed64b31 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 06:55:07 +0200 Subject: [PATCH 07/12] Drop all tables for builtin metrics to avoid timeout. --- internal/sinks/postgres_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index cba193ba3..760eaada5 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -668,6 +668,13 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { pgw, err := NewPostgresWriter(ctx, connStr, opts) r.NoError(err) + // Drop tables for all builtin metrics to avoid timeout in + // in `MaintainUniqueSources` test. + for _, metric := range metrics.GetDefaultBuiltInMetrics() { + _, err = pgw.sinkDb.Exec(pgw.ctx, fmt.Sprintf("DROP TABLE %s;", metric)) + a.NoError(err) + } + t.Run("MaintainUniqueSources", func(_ *testing.T) { // creates an empty metric table and adds // an entry to `admin.all_distinct_dbname_metrics` From 40c1052897b047d3165801e649688f94465485db Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 06:57:26 +0200 Subject: [PATCH 08/12] Fix linter error --- internal/sinks/postgres_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index 760eaada5..193612bb1 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -668,7 +668,7 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { pgw, err := NewPostgresWriter(ctx, connStr, opts) r.NoError(err) - // Drop tables for all builtin metrics to avoid timeout in + // Drop tables for all builtin metrics to avoid timeout in // in `MaintainUniqueSources` test. for _, metric := range metrics.GetDefaultBuiltInMetrics() { _, err = pgw.sinkDb.Exec(pgw.ctx, fmt.Sprintf("DROP TABLE %s;", metric)) From 3458673c24afc8794ff0dfe19e924bb159e1cb38 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 07:42:51 +0200 Subject: [PATCH 09/12] Return number of inserted/deleted rows for logging. --- internal/sinks/postgres.go | 17 ++++++++++++++--- internal/sinks/sql/admin_functions.sql | 18 ++++++++++++++---- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index 0e0b064ab..622dda7e8 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -529,22 +529,33 @@ func (pgw *PostgresWriter) MaintainUniqueSources() { } // updates listing table entries for a single metric. - sqlUpdateListingTable := "SELECT admin.update_listing_table(metric_table_name => $1);" + sqlUpdateListingTable := "SELECT * FROM admin.update_listing_table(metric_table_name => $1);" for i, tableName := range allDistinctMetricTables { metricName := strings.Replace(tableName, "public.", "", 1) allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix. logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName) - if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlUpdateListingTable, tableName); err != nil { + var deletedRowsCnt, insertedRowsCnt int; + err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt); + if err != nil { logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err) } + if deletedRowsCnt > 0 { + logger.Infof("Removed %d stale entries from admin.all_distinct_dbname_metrics listing table for metric: %s", deletedRowsCnt, metricName) + } + if insertedRowsCnt > 0 { + logger.Infof("Added %d entries to admin.all_distinct_dbname_metrics listing table for metric: %s", insertedRowsCnt, metricName) + } time.Sleep(time.Minute) } // removes all entries for any non-existing table. sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)" - _, err = pgw.sinkDb.Exec(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables) + var deletedRowsCnt int; + err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt) if err != nil { logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err) + } else if deletedRowsCnt > 0 { + logger.Infof("Removed %d stale entries for dropped metric tables from admin.all_distinct_dbname_metrics listing table", deletedRowsCnt) } } diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index 41a17554d..8c6237f70 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -151,8 +151,11 @@ BEGIN END; $SQL$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_table_name text) -RETURNS VOID +CREATE OR REPLACE FUNCTION admin.update_listing_table( + metric_table_name text, + OUT deleted_rows_cnt int, + OUT inserted_rows_cnt int +) AS $SQL$ DECLARE @@ -183,6 +186,8 @@ BEGIN AND metric = '%s'; $$, metric_name); + GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT; + EXECUTE FORMAT( $$ INSERT INTO admin.all_distinct_dbname_metrics @@ -190,16 +195,21 @@ BEGIN WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s'); $$, metric_name, metric_name); + GET DIAGNOSTICS inserted_rows_cnt = ROW_COUNT; + DROP TABLE distinct_dbnames; END; $SQL$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(existing_metrics text[]) -RETURNS VOID +CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing( + existing_metrics text[], + OUT deleted_rows_cnt int +) AS $SQL$ BEGIN DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics); + GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT; END; $SQL$ LANGUAGE plpgsql; \ No newline at end of file From 5c6510c619b876c7ad8941a6a9c1f0138c6333ca Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Mon, 17 Nov 2025 07:45:21 +0200 Subject: [PATCH 10/12] Fix more linter errors --- internal/sinks/postgres.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index 622dda7e8..c11e9bd06 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -534,8 +534,8 @@ func (pgw *PostgresWriter) MaintainUniqueSources() { metricName := strings.Replace(tableName, "public.", "", 1) allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix. logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName) - var deletedRowsCnt, insertedRowsCnt int; - err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt); + var deletedRowsCnt, insertedRowsCnt int + err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt) if err != nil { logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err) } @@ -550,7 +550,7 @@ func (pgw *PostgresWriter) MaintainUniqueSources() { // removes all entries for any non-existing table. sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)" - var deletedRowsCnt int; + var deletedRowsCnt int err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt) if err != nil { logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err) From 1e0053acd385043aa3df8cd8f2c642ee72ca67c8 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Thu, 27 Nov 2025 00:10:57 +0200 Subject: [PATCH 11/12] Add `admin.maintain_tables()` sql function. - It increases the abstraction of the maintenance logic from the Go code by handling internally the fetch of all metric tables and updating them one by one and deleting entries for dropped tables. - To achieve so it uses the same functions used before in the Go code --- internal/sinks/postgres.go | 52 ++--------------------- internal/sinks/postgres_test.go | 10 ----- internal/sinks/sql/admin_functions.sql | 58 +++++++++++--------------- 3 files changed, 28 insertions(+), 92 deletions(-) diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go index c11e9bd06..3331aed4d 100644 --- a/internal/sinks/postgres.go +++ b/internal/sinks/postgres.go @@ -507,55 +507,11 @@ func (pgw *PostgresWriter) DeleteOldPartitions() { // in each metric table in admin.all_distinct_dbname_metrics. // This is used to avoid listing the same source multiple times in Grafana dropdowns. func (pgw *PostgresWriter) MaintainUniqueSources() { + sql := "SELECT admin.maintain_tables() WHERE pg_try_advisory_lock(1571543679778230000)" logger := log.GetLogger(pgw.ctx) - - var lock bool - logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly - if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil { - logger.Error("Getting maintenance advisory lock failed:", err) - return - } - if !lock { - logger.Info("Skipping maintenance as another instance has the advisory lock...") - return - } - - logger.Info("Updating admin.all_distinct_dbname_metrics listing table...") - rows, _ := pgw.sinkDb.Query(pgw.ctx, "SELECT table_name FROM admin.get_top_level_metric_tables()") - allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string]) - if err != nil { - logger.Error(err) - return - } - - // updates listing table entries for a single metric. - sqlUpdateListingTable := "SELECT * FROM admin.update_listing_table(metric_table_name => $1);" - for i, tableName := range allDistinctMetricTables { - metricName := strings.Replace(tableName, "public.", "", 1) - allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix. - logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName) - var deletedRowsCnt, insertedRowsCnt int - err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt) - if err != nil { - logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err) - } - if deletedRowsCnt > 0 { - logger.Infof("Removed %d stale entries from admin.all_distinct_dbname_metrics listing table for metric: %s", deletedRowsCnt, metricName) - } - if insertedRowsCnt > 0 { - logger.Infof("Added %d entries to admin.all_distinct_dbname_metrics listing table for metric: %s", insertedRowsCnt, metricName) - } - time.Sleep(time.Minute) - } - - // removes all entries for any non-existing table. - sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)" - var deletedRowsCnt int - err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt) - if err != nil { - logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err) - } else if deletedRowsCnt > 0 { - logger.Infof("Removed %d stale entries for dropped metric tables from admin.all_distinct_dbname_metrics listing table", deletedRowsCnt) + logger.Info("Starting maintenance...") + if _, err := pgw.sinkDb.Exec(pgw.ctx, sql); err != nil { + logger.Error("Maintaining 'admin.all_distinct_dbname_metrics' listing table failed:", err) } } diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go index 193612bb1..cc9c630e9 100644 --- a/internal/sinks/postgres_test.go +++ b/internal/sinks/postgres_test.go @@ -709,16 +709,6 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) { a.NoError(err) a.Equal(1, numOfEntries) - message[0].DBName = "test_db_2" - pgw.flush(message) - // explicitly use `public.*` prefix. - _, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');") - a.NoError(err) - // another entry should have been added. - err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries) - a.NoError(err) - a.Equal(2, numOfEntries) - _, err = conn.Exec(ctx, "DROP TABLE test_metric_1;") r.NoError(err) // all entries should be deleted diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index 8c6237f70..a9df0213b 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -140,34 +140,40 @@ BEGIN END; $SQL$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION admin.try_get_maintenance_lock( - OUT have_lock BOOLEAN -) +CREATE OR REPLACE FUNCTION admin.maintain_tables() +RETURNS VOID AS $SQL$ +DECLARE + rec record; + metric_name text; + existing_metrics text[]; BEGIN - -- 1571543679778230000 is just a random bigint - SELECT pg_try_advisory_lock(1571543679778230000) INTO have_lock; + FOR rec IN SELECT * FROM admin.get_top_level_metric_tables() + LOOP + IF POSITION('public.' IN rec.table_name) = 1 THEN + metric_name = SUBSTRING(rec.table_name FROM POSITION('public.' IN rec.table_name) + LENGTH('public.')); + ELSE + metric_name = rec.table_name; + END IF; + + SELECT array_append(existing_metrics, metric_name) INTO existing_metrics; + + EXECUTE FORMAT($$SELECT admin.update_listing_table(metric_name => '%s')$$, metric_name); + SELECT PG_SLEEP(60); + END LOOP; + + -- Delete entries for dropped tables + DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics) OR existing_metrics IS NULL; END; $SQL$ LANGUAGE plpgsql; -CREATE OR REPLACE FUNCTION admin.update_listing_table( - metric_table_name text, - OUT deleted_rows_cnt int, - OUT inserted_rows_cnt int -) +CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_name text) +RETURNS VOID AS $SQL$ -DECLARE - metric_name text; BEGIN - IF POSITION('public.' IN metric_table_name) > 0 THEN - metric_name = SUBSTRING(metric_table_name FROM POSITION('public.' IN metric_table_name) + LENGTH('public.')); - ELSE - metric_name = metric_table_name; - END IF; - EXECUTE FORMAT( $$ CREATE TEMP TABLE distinct_dbnames AS @@ -186,8 +192,6 @@ BEGIN AND metric = '%s'; $$, metric_name); - GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT; - EXECUTE FORMAT( $$ INSERT INTO admin.all_distinct_dbname_metrics @@ -195,21 +199,7 @@ BEGIN WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s'); $$, metric_name, metric_name); - GET DIAGNOSTICS inserted_rows_cnt = ROW_COUNT; - DROP TABLE distinct_dbnames; END; $SQL$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing( - existing_metrics text[], - OUT deleted_rows_cnt int -) -AS -$SQL$ -BEGIN - DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics); - GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT; -END; -$SQL$ LANGUAGE plpgsql; \ No newline at end of file From ef41ec74b52775d6b107945c147c9c8bc4d4bb10 Mon Sep 17 00:00:00 2001 From: 0xgouda Date: Thu, 27 Nov 2025 00:34:58 +0200 Subject: [PATCH 12/12] Ignore the result of `PG_SLEEP` by using `PERFORM`. --- internal/sinks/sql/admin_functions.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql index a9df0213b..27867c762 100644 --- a/internal/sinks/sql/admin_functions.sql +++ b/internal/sinks/sql/admin_functions.sql @@ -160,7 +160,7 @@ BEGIN SELECT array_append(existing_metrics, metric_name) INTO existing_metrics; EXECUTE FORMAT($$SELECT admin.update_listing_table(metric_name => '%s')$$, metric_name); - SELECT PG_SLEEP(60); + PERFORM PG_SLEEP(60); END LOOP; -- Delete entries for dropped tables