Skip to content

Commit b864423

Browse files
0xgoudapashagolub
authored andcommitted
move listing table update logic to PL/pgSQL functions.
- move individual metric update logic to `admin.update_listing_table()`. - move dropped tables removal logic to `admin.remove_dropped_tables_listing()`. - use these new functions in the maintenance routine.
1 parent 0c149a8 commit b864423

File tree

2 files changed

+66
-64
lines changed

2 files changed

+66
-64
lines changed

internal/sinks/postgres.go

Lines changed: 12 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -509,23 +509,6 @@ func (pgw *PostgresWriter) DeleteOldPartitions() {
509509
func (pgw *PostgresWriter) MaintainUniqueSources() {
510510
logger := log.GetLogger(pgw.ctx)
511511

512-
sqlTopLevelMetrics := `SELECT table_name FROM admin.get_top_level_metric_tables()`
513-
sqlDistinct := `
514-
WITH RECURSIVE t(dbname) AS (
515-
SELECT MIN(dbname) AS dbname FROM %s
516-
UNION
517-
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t
518-
)
519-
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
520-
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
521-
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
522-
sqlDroppedTables := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL($1)`
523-
sqlAdd := `
524-
INSERT INTO admin.all_distinct_dbname_metrics
525-
SELECT u, $2 FROM (select unnest($1::text[]) as u) x
526-
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
527-
RETURNING *`
528-
529512
var lock bool
530513
logger.Infof("Trying to get maintenance advisory lock...") // to only have one "maintainer" in case of a "push" setup, as can get costly
531514
if err := pgw.sinkDb.QueryRow(pgw.ctx, "SELECT admin.try_get_maintenance_lock();").Scan(&lock); err != nil {
@@ -537,66 +520,31 @@ func (pgw *PostgresWriter) MaintainUniqueSources() {
537520
return
538521
}
539522

540-
logger.Info("Refreshing admin.all_distinct_dbname_metrics listing table...")
541-
rows, _ := pgw.sinkDb.Query(pgw.ctx, sqlTopLevelMetrics)
523+
logger.Info("Updating admin.all_distinct_dbname_metrics listing table...")
524+
rows, _ := pgw.sinkDb.Query(pgw.ctx, "SELECT table_name FROM admin.get_top_level_metric_tables()")
542525
allDistinctMetricTables, err := pgx.CollectRows(rows, pgx.RowTo[string])
543526
if err != nil {
544527
logger.Error(err)
545528
return
546529
}
547530

531+
// updates listing table entries for a single metric.
532+
sqlUpdateListingTable := "SELECT admin.update_listing_table(metric_table_name => $1);"
548533
for i, tableName := range allDistinctMetricTables {
549-
foundDbnamesMap := make(map[string]bool)
550-
foundDbnamesArr := make([]string, 0)
551-
552534
metricName := strings.Replace(tableName, "public.", "", 1)
553-
// later usage in sqlDroppedTables requires no "public." prefix
554-
allDistinctMetricTables[i] = metricName
555-
556-
logger.Debugf("Refreshing all_distinct_dbname_metrics listing for metric: %s", metricName)
557-
rows, _ := pgw.sinkDb.Query(pgw.ctx, fmt.Sprintf(sqlDistinct, tableName, tableName))
558-
ret, err := pgx.CollectRows(rows, pgx.RowTo[string])
559-
if err != nil {
560-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
561-
continue
562-
}
563-
for _, drDbname := range ret {
564-
foundDbnamesMap[drDbname] = true // "set" behaviour, don't want duplicates
565-
}
566-
567-
// delete all that are not known and add all that are not there
568-
for k := range foundDbnamesMap {
569-
foundDbnamesArr = append(foundDbnamesArr, k)
570-
}
571-
if len(foundDbnamesArr) == 0 { // delete all entries for given metric
572-
logger.Debugf("Deleting Postgres all_distinct_dbname_metrics listing table entries for metric '%s':", metricName)
573-
574-
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlDeleteAll, metricName)
575-
if err != nil {
576-
logger.Errorf("Could not delete Postgres all_distinct_dbname_metrics listing table entries for metric '%s': %s", metricName, err)
577-
}
578-
continue
579-
}
580-
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDelete, foundDbnamesArr, metricName)
581-
if err != nil {
582-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
583-
} else if cmdTag.RowsAffected() > 0 {
584-
logger.Infof("Removed %d stale entries from all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
585-
}
586-
cmdTag, err = pgw.sinkDb.Exec(pgw.ctx, sqlAdd, foundDbnamesArr, metricName)
587-
if err != nil {
588-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for metric '%s': %s", metricName, err)
589-
} else if cmdTag.RowsAffected() > 0 {
590-
logger.Infof("Added %d entry to the Postgres all_distinct_dbname_metrics listing table for metric: %s", cmdTag.RowsAffected(), metricName)
535+
allDistinctMetricTables[i] = metricName // later usage in sqlDroppedTables requires no "public." prefix.
536+
logger.Debugf("Updating admin.all_distinct_dbname_metrics listing for metric: %s", metricName)
537+
if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlUpdateListingTable, tableName); err != nil {
538+
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing for metric '%s': %s", metricName, err)
591539
}
592540
time.Sleep(time.Minute)
593541
}
594542

595-
cmdTag, err := pgw.sinkDb.Exec(pgw.ctx, sqlDroppedTables, allDistinctMetricTables)
543+
// removes all entries for any non-existing table.
544+
sqlRemoveDroppedTables := "SELECT admin.remove_dropped_tables_listing(existing_metrics => $1)"
545+
_, err = pgw.sinkDb.Exec(pgw.ctx, sqlRemoveDroppedTables, allDistinctMetricTables)
596546
if err != nil {
597-
logger.Errorf("Could not refresh Postgres all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
598-
} else if cmdTag.RowsAffected() > 0 {
599-
logger.Infof("Removed %d stale entries for dropped tables from all_distinct_dbname_metrics listing table", cmdTag.RowsAffected())
547+
logger.Errorf("Could not update admin.all_distinct_dbname_metrics listing table for dropped metric tables: %s", err)
600548
}
601549
}
602550

internal/sinks/sql/admin_functions.sql

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,58 @@ BEGIN
149149
-- 1571543679778230000 is just a random bigint
150150
SELECT pg_try_advisory_lock(1571543679778230000) INTO have_lock;
151151
END;
152+
$SQL$ LANGUAGE plpgsql;
153+
154+
CREATE OR REPLACE FUNCTION admin.update_listing_table(metric_table_name text)
155+
RETURNS VOID
156+
AS
157+
$SQL$
158+
DECLARE
159+
metric_name text;
160+
schema text = 'public';
161+
BEGIN
162+
163+
IF POSITION('public.' IN metric_table_name) > 0 THEN
164+
metric_name = SUBSTRING(metric_table_name FROM POSITION('public.' IN metric_table_name) + LENGTH('public.'));
165+
ELSE
166+
metric_name = metric_table_name;
167+
END IF;
168+
169+
EXECUTE FORMAT(
170+
$$
171+
CREATE TEMP TABLE distinct_dbnames AS
172+
WITH RECURSIVE t(dbname) AS (
173+
SELECT MIN(dbname) AS dbname FROM %I.%I
174+
UNION
175+
SELECT (SELECT MIN(dbname) FROM %I.%I WHERE dbname > t.dbname) FROM t
176+
)
177+
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1;
178+
$$, schema, metric_name, schema, metric_name);
179+
180+
EXECUTE FORMAT(
181+
$$
182+
DELETE FROM admin.all_distinct_dbname_metrics
183+
WHERE dbname NOT IN (SELECT * FROM distinct_dbnames)
184+
AND metric = '%s';
185+
$$, metric_name);
186+
187+
EXECUTE FORMAT(
188+
$$
189+
INSERT INTO admin.all_distinct_dbname_metrics
190+
SELECT d.dbname, '%s' FROM distinct_dbnames AS d
191+
WHERE NOT EXISTS (SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = d.dbname AND metric = '%s');
192+
$$, metric_name, metric_name);
193+
194+
DROP TABLE distinct_dbnames;
195+
196+
END;
197+
$SQL$ LANGUAGE plpgsql;
198+
199+
CREATE OR REPLACE FUNCTION admin.remove_dropped_tables_listing(existing_metrics text[])
200+
RETURNS VOID
201+
AS
202+
$SQL$
203+
BEGIN
204+
DELETE FROM admin.all_distinct_dbname_metrics WHERE metric != ALL(existing_metrics);
205+
END;
152206
$SQL$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)