diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 432a56217..e6a4d6e5b 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -96,7 +96,7 @@ jobs:
       - name: Test
         run: |
           go generate ./api/pb/
-          go test -failfast -v -timeout=300s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...
+          go test -failfast -v -timeout=600s -p 1 -coverprofile=profile.cov ./cmd/... ./internal/...
 
       - name: Coveralls
         uses: coverallsapp/github-action@v2
diff --git a/internal/sinks/postgres.go b/internal/sinks/postgres.go
index b3570a02c..51c1d04c7 100644
--- a/internal/sinks/postgres.go
+++ b/internal/sinks/postgres.go
@@ -507,97 +507,13 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
 // in each metric table in admin.all_distinct_dbname_metrics.
 // This is used to avoid listing the same source multiple times in Grafana dropdowns.
 func (pgw *PostgresWriter) MaintainUniqueSources() {
+	// The WHERE clause yields no row unless pg_try_advisory_lock returns true, so
+	// admin.maintain_tables() only runs in a session that obtains the lock.
+	sql := "SELECT admin.maintain_tables() WHERE pg_try_advisory_lock(1571543679778230000)"
 	logger := log.GetLogger(pgw.ctx)
-
-	sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
-	sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
-	sqlDistinct := `
-	WITH RECURSIVE t(dbname) AS (
-		SELECT MIN(dbname) AS dbname FROM %s
-		UNION
-		SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
-	)
-	SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
-	sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
-	sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
-	sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
-	sqlAdd := `
-		INSERT INTO admin.all_distinct_dbname_metrics
-		SELECT u, $2 FROM (select unnest($1::text[]) as u) x
-		WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
-		RETURNING *`
-
-	var lock bool
-	logger.Infof("Trying to get admin.all_distinct_dbname_metrics maintainer advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
-	if err := pgw.sinkDb.QueryRow(pgw.ctx, sqlGetAdvisoryLock).Scan(&lock); err != nil {
-		logger.Error("Getting admin.all_distinct_dbname_metrics maintainer advisory lock failed:", err)
-		return
-	}
-	if !lock {
-		logger.Info("Skipping admin.all_distinct_dbname_metrics maintenance as another instance has the advisory lock...")
-		return
-	}
-
-	logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
-	rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
-	allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
-	if err != nil {
-		logger.Error(err)
-		return
-	}
-
-	for i, tableName := range allDistinctMetricTables {
-		foundDbnamesMap := make(map[string]bool)
-		foundDbnamesArr := make([]string, 0)
-
-		metricName := strings.Replace(tableName, "public.", "", 1)
-		// later usage in sqlDroppedTables requires no "public." prefix
-		allDistinctMetricTables[i] = metricName
-
-		logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
-		rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
-		ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
-		if err != nil {
-			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
-			continue
-		}
-		for _, drDbname := range ret {
-			foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
-		}
-
-		// delete all that are not known and add all that are not there
-		for k := range foundDbnamesMap {
-			foundDbnamesArr = append(foundDbnamesArr, k)
-		}
-		if len(foundDbnamesArr) == 0 { // delete all entries for given metric
-			logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
-
-			_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
-			if err != nil {
-				logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
-			}
-			continue
-		}
-		cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
-		if err != nil {
-			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
-		} else if cmdTag.RowsAffected() > 0 {
-			logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
-		}
-		cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
-		if err != nil {
-			logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
-		} else if cmdTag.RowsAffected() > 0 {
-			logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
-		}
-		time.Sleep(time.Minute)
-	}
-
-	cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
-	if err != nil {
-		logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
-	} else if cmdTag.RowsAffected() > 0 {
-		logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
+	logger.Info("Starting maintenance...")
+	if _, err := pgw.sinkDb.Exec(pgw.ctx, sql); err != nil {
+		logger.Error("Maintaining 'admin.all_distinct_dbname_metrics' listing table failed:", err)
 	}
 }
 
diff --git a/internal/sinks/postgres_test.go b/internal/sinks/postgres_test.go
index 728078ad8..b7ceeb6ae 100644
--- a/internal/sinks/postgres_test.go
+++ b/internal/sinks/postgres_test.go
@@ -669,11 +669,18 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
 	pgw, err := NewPostgresWriter(ctx, connStr, opts)
 	r.NoError(err)
 
+	// Drop tables for all builtin metrics to avoid a timeout
+	// in the `MaintainUniqueSources` test.
+	for _, metric := range metrics.GetDefaultBuiltInMetrics() {
+		_, err = pgw.sinkDb.Exec(pgw.ctx, fmt.Sprintf("DROP TABLE %s;", metric))
+		a.NoError(err)
+	}
+
 	t.Run("MaintainUniqueSources", func(_ *testing.T) {
-		// adds an entry to `admin.all_distinct_dbname_metrics`
+		// creates an empty metric table and adds
+		// an entry to `admin.all_distinct_dbname_metrics`
 		err = pgw.SyncMetric("test", "test_metric_1", AddOp)
 		r.NoError(err)
-
 		var numOfEntries int
 		err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
 		a.NoError(err)
@@ -681,8 +688,7 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
 
 		// manually call the maintenance routine
 		pgw.MaintainUniqueSources()
-
-		// entry should have been deleted, because it has no corresponding entries in `test_metric_1` table.
+		// entry should have been deleted, because there are no rows in `test_metric_1` table.
 		err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
 		a.NoError(err)
 		a.Equal(0, numOfEntries)
@@ -697,19 +703,16 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
 			},
 		}
 		pgw.flush(message)
-
 		// manually call the maintenance routine
 		pgw.MaintainUniqueSources()
-
-		// entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
+		// an entry should have been added, because there is a corresponding entry in `test_metric_1` table just written.
 		err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
 		a.NoError(err)
 		a.Equal(1, numOfEntries)
 
 		_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
 		r.NoError(err)
-
-		// the corresponding entry should be deleted
+		// all entries should be deleted
 		pgw.MaintainUniqueSources()
 		err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
 		a.NoError(err)
@@ -732,8 +735,8 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
 		_, err = conn.Exec(ctx,
 			fmt.Sprintf(
 				`CREATE TABLE subpartitions.test_metric_2_dbname_time
-				PARTITION OF subpartitions.test_metric_2_dbname
-				FOR VALUES FROM ('%s') TO ('%s')`,
+			PARTITION OF subpartitions.test_metric_2_dbname
+			FOR VALUES FROM ('%s') TO ('%s')`,
 				boundStart, boundEnd),
 		)
 		a.NoError(err)
diff --git a/internal/sinks/sql/admin_functions.sql b/internal/sinks/sql/admin_functions.sql
index a04d3433d..27867c762 100644
--- a/internal/sinks/sql/admin_functions.sql
+++ b/internal/sinks/sql/admin_functions.sql
@@ -139,3 +139,72 @@ BEGIN
     RETURN i;
 END;
 $SQL$ LANGUAGE plpgsql;
+
+-- Refreshes admin.all_distinct_dbname_metrics for every table returned by
+-- admin.get_top_level_metric_tables(), sleeping 60 seconds after each table,
+-- and finally deletes listing entries whose metric is not among those tables.
+CREATE OR REPLACE FUNCTION admin.maintain_tables()
+RETURNS VOID
+AS
+$SQL$
+DECLARE
+    rec record;
+    metric_name text;
+    existing_metrics text[];
+BEGIN
+    FOR rec IN SELECT * FROM admin.get_top_level_metric_tables()
+    LOOP
+        IF POSITION('public.' IN rec.table_name) = 1 THEN
+            metric_name = SUBSTRING(rec.table_name FROM POSITION('public.' IN rec.table_name) + LENGTH('public.'));
+        ELSE
+            metric_name = rec.table_name;
+        END IF;
+
+        SELECT array_append(existing_metrics, metric_name) INTO existing_metrics;
+
+        PERFORM admin.update_listing_table(metric_name);
+        PERFORM PG_SLEEP(60);
+    END LOOP;
+
+    -- Delete entries for dropped tables
+    DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics) OR existing_metrics IS NULL;
+END;
+$SQL$ LANGUAGE plpgsql;
+
+-- Synchronizes the admin.all_distinct_dbname_metrics rows of one metric with
+-- the distinct dbname values currently stored in public.<metric_name>.
+CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_name text)
+RETURNS VOID
+AS
+$SQL$
+BEGIN
+
+    EXECUTE FORMAT(
+    $$
+        CREATE TEMP TABLE distinct_dbnames AS
+            WITH RECURSIVE t(dbname) AS (
+                SELECT MIN(dbname) AS dbname FROM public.%I
+                UNION
+                SELECT (SELECT MIN(dbname) FROM public.%I WHERE dbname > t.dbname) FROM t
+            )
+            SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1;
+    $$, metric_name, metric_name);
+
+    EXECUTE FORMAT(
+    $$
+        DELETE FROM admin.all_distinct_dbname_metrics
+        WHERE dbname NOT IN (SELECT * FROM distinct_dbnames)
+        AND metric = %L;
+    $$, metric_name);
+
+    EXECUTE FORMAT(
+    $$
+        INSERT INTO admin.all_distinct_dbname_metrics
+        SELECT d.dbname, %L FROM distinct_dbnames AS d
+        WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = %L);
+    $$, metric_name, metric_name);
+
+    DROP TABLE distinct_dbnames;
+
+END;
+$SQL$ LANGUAGE plpgsql;