Skip to content

Commit 7f8be8c

Browse files
authored
[+] allow to specify sink retention period as interval value (#988)
Retention is now treated as Postgres interval. This allow people to use custom retention intervals, e.g. "1 month", "14 days", "42 hours"
1 parent 560c3e8 commit 7f8be8c

File tree

5 files changed

+14
-10
lines changed

5 files changed

+14
-10
lines changed

cmd/pgwatch/doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
// (default: 950ms)
6161
// [$PW_BATCHING_DELAY]
6262
// --retention= If set, metrics older than that will
63-
// be deleted (default: 14)
63+
// be deleted (default: "14 days")
6464
// [$PW_RETENTION]
6565
// --real-dbname-field= Tag key for real database name
6666
// (default: real_dbname)

docs/reference/cli_env.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ It reads the configuration from the specified sources and metrics, then begins c
9999

100100
- `--retention=`
101101

102-
If set, metrics older than that will be deleted (default: 14).
102+
If set, metrics older than that will be deleted (default: "14 days").
103103
ENV: `$PW_RETENTION`
104104

105105
- `--real-dbname-field=`

internal/sinks/cmdopts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import "time"
66
type CmdOpts struct {
77
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
88
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
9-
Retention int `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14" env:"PW_RETENTION"`
9+
Retention string `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14 days" env:"PW_RETENTION"`
1010
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
1111
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`
1212
}

internal/sinks/postgres.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
4343
lastError: make(chan error),
4444
sinkDb: conn,
4545
}
46+
var runDeleteOldPartitions bool
4647
if err = db.Init(ctx, pgw.sinkDb, func(ctx context.Context, conn db.PgxIface) error {
48+
if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
49+
return err
50+
}
4751
l.Info("initialising measurements database...")
4852
exists, err := db.DoesSchemaExist(ctx, conn, "admin")
4953
if err != nil || exists {
@@ -64,7 +68,9 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
6468
if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
6569
return
6670
}
67-
go pgw.deleteOldPartitions()
71+
if runDeleteOldPartitions {
72+
go pgw.deleteOldPartitions()
73+
}
6874
go pgw.maintainUniqueSources()
6975
go pgw.poll()
7076
l.Info(`measurements sink is activated`)
@@ -452,14 +458,11 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
452458

453459
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
454460
func (pgw *PostgresWriter) deleteOldPartitions() {
455-
if pgw.opts.Retention <= 0 {
456-
return
457-
}
458461
l := log.GetLogger(pgw.ctx)
459462
for {
460463
var partsDropped int
461-
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1)`,
462-
24*time.Hour*time.Duration(pgw.opts.Retention)).Scan(&partsDropped)
464+
err := pgw.sinkDb.QueryRow(pgw.ctx, `SELECT admin.drop_old_time_partitions(older_than => $1::interval)`,
465+
pgw.opts.Retention).Scan(&partsDropped)
463466
if err != nil {
464467
l.Error("Could not drop old time partitions:", err)
465468
} else if partsDropped > 0 {

internal/sinks/postgres_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
4343
assert.NoError(t, err)
4444

4545
conn.ExpectPing()
46+
conn.ExpectQuery("SELECT \\$1::interval").WithArgs("1 year").WillReturnRows(pgxmock.NewRows([]string{"col"}).AddRow(true))
4647
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4748
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4849
for _, m := range metrics.GetDefaultBuiltInMetrics() {
4950
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
5051
}
5152

52-
opts := &CmdOpts{BatchingDelay: time.Hour, Retention: 356}
53+
opts := &CmdOpts{BatchingDelay: time.Hour, Retention: "1 year"}
5354
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
5455
assert.NoError(t, err)
5556
assert.NotNil(t, pgw)

0 commit comments

Comments
 (0)