Skip to content

Commit 1e0053a

Browse files
committed
Add admin.maintain_tables() sql function.
- It increases the abstraction of the maintenance logic from the Go code by handling internally the fetch of all metric tables and updating them one by one and deleting entries for dropped tables. - To achieve so it uses the same functions used before in the Go code
1 parent 5c6510c commit 1e0053a

File tree

3 files changed

+28
-92
lines changed

3 files changed

+28
-92
lines changed

internal/sinks/postgres.go

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -507,55 +507,11 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
507507
// in each metric table in admin.all_distinct_dbname_metrics.
508508
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
509509
func (pgw *PostgresWriter) MaintainUniqueSources() {
510+
sql := "SELECT admin.maintain_tables() WHERE pg_try_advisory_lock(1571543679778230000)"
510511
logger := log.GetLogger(pgw.ctx)
511-
512-
var lock bool
513-
logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
514-
if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil {
515-
logger.Error("Getting maintenance advisory lock failed:", err)
516-
return
517-
}
518-
if !lock {
519-
logger.Info("Skipping maintenance as another instance has the advisory lock...")
520-
return
521-
}
522-
523-
logger.Info("Updating admin.all_distinct_dbname_metrics listing table...")
524-
rows, _ := pgw.sinkDb.Query(pgw.ctx, "SELECT table_name FROM admin.get_top_level_metric_tables()")
525-
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
526-
if err != nil {
527-
logger.Error(err)
528-
return
529-
}
530-
531-
// updates listing table entries for a single metric.
532-
sqlUpdateListingTable := "SELECT * FROM admin.update_listing_table(metric_table_name => $1);"
533-
for i, tableName := range allDistinctMetricTables {
534-
metricName := strings.Replace(tableName, "public.", "", 1)
535-
allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix.
536-
logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName)
537-
var deletedRowsCnt, insertedRowsCnt int
538-
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlUpdateListingTable, tableName).Scan(&deletedRowsCnt, &insertedRowsCnt)
539-
if err != nil {
540-
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err)
541-
}
542-
if deletedRowsCnt > 0 {
543-
logger.Infof("Removed %d stale entries from admin.all_distinct_dbname_metrics listing table for metric: %s", deletedRowsCnt, metricName)
544-
}
545-
if insertedRowsCnt > 0 {
546-
logger.Infof("Added %d entries to admin.all_distinct_dbname_metrics listing table for metric: %s", insertedRowsCnt, metricName)
547-
}
548-
time.Sleep(time.Minute)
549-
}
550-
551-
// removes all entries for any non-existing table.
552-
sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)"
553-
var deletedRowsCnt int
554-
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables).Scan(&deletedRowsCnt)
555-
if err != nil {
556-
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
557-
} else if deletedRowsCnt > 0 {
558-
logger.Infof("Removed %d stale entries for dropped metric tables from admin.all_distinct_dbname_metrics listing table", deletedRowsCnt)
512+
logger.Info("Starting maintenance...")
513+
if _, err := pgw.sinkDb.Exec(pgw.ctx, sql); err != nil {
514+
logger.Error("Maintaining 'admin.all_distinct_dbname_metrics' listing table failed:", err)
559515
}
560516
}
561517

internal/sinks/postgres_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -709,16 +709,6 @@ func Test_MaintainUniqueSources_DeleteOldPartitions(t *testing.T) {
709709
a.NoError(err)
710710
a.Equal(1, numOfEntries)
711711

712-
message[0].DBName = "test_db_2"
713-
pgw.flush(message)
714-
// explicitly use `public.*` prefix.
715-
_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.update_listing_table(metric_table_name => 'public.test_metric_1');")
716-
a.NoError(err)
717-
// another entry should have been added.
718-
err = conn.QueryRow(ctx, "SELECT count(*) FROM admin.all_distinct_dbname_metrics;").Scan(&numOfEntries)
719-
a.NoError(err)
720-
a.Equal(2, numOfEntries)
721-
722712
_, err = conn.Exec(ctx, "DROP TABLE test_metric_1;")
723713
r.NoError(err)
724714
// all entries should be deleted

internal/sinks/sql/admin_functions.sql

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -140,34 +140,40 @@ BEGIN
140140
END;
141141
$SQL$ LANGUAGE plpgsql;
142142

143-
CREATE OR REPLACE FUNCTION admin.try_get_maintenance_lock(
144-
OUT have_lock BOOLEAN
145-
)
143+
CREATE OR REPLACE FUNCTION admin.maintain_tables()
144+
RETURNS VOID
146145
AS
147146
$SQL$
147+
DECLARE
148+
rec record;
149+
metric_name text;
150+
existing_metrics text[];
148151
BEGIN
149-
-- 1571543679778230000 is just a random bigint
150-
SELECT pg_try_advisory_lock(1571543679778230000) INTO have_lock;
152+
FOR rec IN SELECT * FROM admin.get_top_level_metric_tables()
153+
LOOP
154+
IF POSITION('public.' IN rec.table_name) = 1 THEN
155+
metric_name = SUBSTRING(rec.table_name FROM POSITION('public.' IN rec.table_name) + LENGTH('public.'));
156+
ELSE
157+
metric_name = rec.table_name;
158+
END IF;
159+
160+
SELECT array_append(existing_metrics, metric_name) INTO existing_metrics;
161+
162+
EXECUTE FORMAT($$SELECT admin.update_listing_table(metric_name => '%s')$$, metric_name);
163+
SELECT PG_SLEEP(60);
164+
END LOOP;
165+
166+
-- Delete entries for dropped tables
167+
DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics) OR existing_metrics IS NULL;
151168
END;
152169
$SQL$ LANGUAGE plpgsql;
153170

154-
CREATE OR REPLACE FUNCTION admin.update_listing_table(
155-
metric_table_name text,
156-
OUT deleted_rows_cnt int,
157-
OUT inserted_rows_cnt int
158-
)
171+
CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_name text)
172+
RETURNS VOID
159173
AS
160174
$SQL$
161-
DECLARE
162-
metric_name text;
163175
BEGIN
164176

165-
IF POSITION('public.' IN metric_table_name) > 0 THEN
166-
metric_name = SUBSTRING(metric_table_name FROM POSITION('public.' IN metric_table_name) + LENGTH('public.'));
167-
ELSE
168-
metric_name = metric_table_name;
169-
END IF;
170-
171177
EXECUTE FORMAT(
172178
$$
173179
CREATE TEMP TABLE distinct_dbnames AS
@@ -186,30 +192,14 @@ BEGIN
186192
AND metric = '%s';
187193
$$, metric_name);
188194

189-
GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT;
190-
191195
EXECUTE FORMAT(
192196
$$
193197
INSERT INTO admin.all_distinct_dbname_metrics
194198
SELECT d.dbname, '%s' FROM distinct_dbnames AS d
195199
WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s');
196200
$$, metric_name, metric_name);
197201

198-
GET DIAGNOSTICS inserted_rows_cnt = ROW_COUNT;
199-
200202
DROP TABLE distinct_dbnames;
201203

202204
END;
203205
$SQL$ LANGUAGE plpgsql;
204-
205-
CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(
206-
existing_metrics text[],
207-
OUT deleted_rows_cnt int
208-
)
209-
AS
210-
$SQL$
211-
BEGIN
212-
DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics);
213-
GET DIAGNOSTICS deleted_rows_cnt = ROW_COUNT;
214-
END;
215-
$SQL$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)