Skip to content

Commit 560c3e8

Browse files
authored
[+] improve partition deletion in postgres sink, closes #786 (#985)
This commit addresses #786 by using DETACH PARTITION before actually dropping a partition, to avoid locking.
1 parent a3e9150 commit 560c3e8

File tree

3 files changed

+98
-283
lines changed

3 files changed

+98
-283
lines changed

internal/sinks/postgres.go

Lines changed: 16 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
var (
2323
cacheLimit = 256
2424
highLoadTimeout = time.Second * 5
25-
deleterDelay = time.Hour
2625
targetColumns = [...]string{"time", "dbname", "data", "tag_data"}
2726
)
2827

@@ -65,7 +64,7 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
6564
if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
6665
return
6766
}
68-
go pgw.deleteOldPartitions(deleterDelay)
67+
go pgw.deleteOldPartitions()
6968
go pgw.maintainUniqueSources()
7069
go pgw.poll()
7170
l.Info(`measurements sink is activated`)
@@ -171,7 +170,7 @@ func (pgw *PostgresWriter) EnsureBuiltinMetricDummies() (err error) {
171170

172171
// EnsureMetricDummy creates an empty table for a metric's measurements if it doesn't exist
173172
func (pgw *PostgresWriter) EnsureMetricDummy(metric string) (err error) {
174-
_, err = pgw.sinkDb.Exec(pgw.ctx, "select admin.ensure_dummy_metrics_table($1)", metric)
173+
_, err = pgw.sinkDb.Exec(pgw.ctx, "SELECT admin.ensure_dummy_metrics_table($1)", metric)
175174
return
176175
}
177176

@@ -452,49 +451,19 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
452451
}
453452

454453
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
455-
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
456-
metricAgeDaysThreshold := pgw.opts.Retention
457-
if metricAgeDaysThreshold <= 0 {
454+
func (pgw *PostgresWriter) deleteOldPartitions() {
455+
if pgw.opts.Retention <= 0 {
458456
return
459457
}
460-
logger := log.GetLogger(pgw.ctx)
461-
select {
462-
case <-pgw.ctx.Done():
463-
return
464-
case <-time.After(delay):
465-
// to reduce distracting log messages at startup
466-
}
467-
458+
l := log.GetLogger(pgw.ctx)
468459
for {
469-
if pgw.metricSchema == DbStorageSchemaTimescale {
470-
partsDropped, err := pgw.DropOldTimePartitions(metricAgeDaysThreshold)
471-
if err != nil {
472-
logger.Errorf("Failed to drop old partitions (>%d days) from Postgres: %v", metricAgeDaysThreshold, err)
473-
continue
474-
}
475-
logger.Infof("Dropped %d old metric partitions...", partsDropped)
476-
} else if pgw.metricSchema == DbStorageSchemaPostgres {
477-
partsToDrop, err := pgw.GetOldTimePartitions(metricAgeDaysThreshold)
478-
if err != nil {
479-
logger.Errorf("Failed to get a listing of old (>%d days) time partitions from Postgres metrics DB - check that the admin.get_old_time_partitions() function is rolled out: %v", metricAgeDaysThreshold, err)
480-
time.Sleep(time.Second * 300)
481-
continue
482-
}
483-
if len(partsToDrop) > 0 {
484-
logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
485-
for _, toDrop := range partsToDrop {
486-
sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
487-
488-
if _, err := pgw.sinkDb.Exec(pgw.ctx, sqlDropTable); err != nil {
489-
logger.Errorf("Failed to drop old partition %s from Postgres metrics DB: %w", toDrop, err)
490-
time.Sleep(time.Second * 300)
491-
} else {
492-
time.Sleep(time.Second * 5)
493-
}
494-
}
495-
} else {
496-
logger.Infof("No old metric partitions found to drop...")
497-
}
460+
var partsDropped int
461+
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1)`,
462+
24*time.Hour*time.Duration(pgw.opts.Retention)).Scan(&partsDropped)
463+
if err != nil {
464+
l.Error("Could not drop old time partitions:", err)
465+
} else if partsDropped > 0 {
466+
l.Infof("Dropped %d old time partitions", partsDropped)
498467
}
499468
select {
500469
case <-pgw.ctx.Done():
@@ -597,26 +566,11 @@ func (pgw *PostgresWriter) maintainUniqueSources() {
597566
}
598567
}
599568

600-
func (pgw *PostgresWriter) DropOldTimePartitions(metricAgeDaysThreshold int) (res int, err error) {
601-
sqlOldPart := `select admin.drop_old_time_partitions($1, $2)`
602-
err = pgw.sinkDb.QueryRow(pgw.ctx, sqlOldPart, metricAgeDaysThreshold, false).Scan(&res)
603-
return
604-
}
605-
606-
func (pgw *PostgresWriter) GetOldTimePartitions(metricAgeDaysThreshold int) ([]string, error) {
607-
sqlGetOldParts := `select admin.get_old_time_partitions($1)`
608-
rows, err := pgw.sinkDb.Query(pgw.ctx, sqlGetOldParts, metricAgeDaysThreshold)
609-
if err == nil {
610-
return pgx.CollectRows(rows, pgx.RowTo[string])
611-
}
612-
return nil, err
613-
}
614-
615569
func (pgw *PostgresWriter) AddDBUniqueMetricToListingTable(dbUnique, metric string) error {
616-
sql := `insert into admin.all_distinct_dbname_metrics
617-
select $1, $2
618-
where not exists (
619-
select * from admin.all_distinct_dbname_metrics where dbname = $1 and metric = $2
570+
sql := `INSERT INTO admin.all_distinct_dbname_metrics
571+
SELECT $1, $2
572+
WHERE NOT EXISTS (
573+
SELECT * FROM admin.all_distinct_dbname_metrics WHERE dbname = $1 AND metric = $2
620574
)`
621575
_, err := pgw.sinkDb.Exec(pgw.ctx, sql, dbUnique, metric)
622576
return err

internal/sinks/postgres_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
4646
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4747
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4848
for _, m := range metrics.GetDefaultBuiltInMetrics() {
49-
conn.ExpectExec("select admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
49+
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
5050
}
5151

5252
opts := &CmdOpts{BatchingDelay: time.Hour, Retention: 356}
@@ -67,8 +67,8 @@ func TestSyncMetric(t *testing.T) {
6767
dbUnique := "mydb"
6868
metricName := "mymetric"
6969
op := AddOp
70-
conn.ExpectExec("insert into admin\\.all_distinct_dbname_metrics").WithArgs(dbUnique, metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
71-
conn.ExpectExec("select admin\\.ensure_dummy_metrics_table").WithArgs(metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
70+
conn.ExpectExec("INSERT INTO admin\\.all_distinct_dbname_metrics").WithArgs(dbUnique, metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
71+
conn.ExpectExec("SELECT admin\\.ensure_dummy_metrics_table").WithArgs(metricName).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
7272
err = pgw.SyncMetric(dbUnique, metricName, op)
7373
assert.NoError(t, err)
7474
assert.NoError(t, conn.ExpectationsWereMet())

0 commit comments

Comments
 (0)