Skip to content

Commit 5bcbc1a

Browse files
0xgoudapashagolub
andauthored
[+] support variable partition intervals in postgres sink (#1012)
* [+] allow to specify interval for Postgres partitioning `admin.ensure_partition_metric_dbname_time()` acccepts now partition_period interval. The naming format is 'YYYYMMDD' for intervals bigger than 1 day and 'YYYYMMDD_HH24' for hour-based intervals. The function finds the last available partition bound and creates new partition based on it. So it's possible to use mixed length partitions, possibly with scheduler based on monitored source and/or metrics. * Add new `PartitionInterval` cli parameter. * Use `PartitionInterval` parameter in `EnsureMetricDbnameTime()` * Mention `--partition-interval` in docs. * Add validation logic for `PartitionInterval` * Add test for `PartitionInterval` validation * Test actual partitions creation * Add example usage of `--partition-interval` in docs. --------- Co-authored-by: Pavlo Golub <pavlo.golub@gmail.com>
1 parent 5410092 commit 5bcbc1a

File tree

6 files changed

+220
-165
lines changed

6 files changed

+220
-165
lines changed

docs/reference/cli_env.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ It reads the configuration from the specified sources and metrics, then begins c
9797
Sink-specific batching flush delay; may be ignored by some sinks (default: 950ms).
9898
ENV: `$PW_BATCHING_DELAY`
9999

100+
- `--partition-interval=`
101+
Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression. (default: 1 week)
102+
ENV: `$PW_PARTITION_INTERVAL`
103+
104+
Example:
105+
`--partition-inteval="3 weeks 4 days"`,
106+
100107
- `--retention=`
101108

102109
If set, metrics older than that will be deleted (default: "14 days").

internal/sinks/cmdopts.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import "time"
66
type CmdOpts struct {
77
Sinks []string `long:"sink" mapstructure:"sink" description:"URI where metrics will be stored, can be used multiple times" env:"PW_SINK"`
88
BatchingDelay time.Duration `long:"batching-delay" mapstructure:"batching-delay" description:"Sink-specific batching flush delay; may be ignored by some sinks" default:"950ms" env:"PW_BATCHING_DELAY"`
9+
PartitionInterval string `long:"partition-interval" mapstructure:"partition-interval" description:"Time range for PostgreSQL sink table partitions. Must be a valid PostgreSQL interval expression." default:"1 week" env:"PW_PARTITION_INTERVAL"`
910
Retention string `long:"retention" mapstructure:"retention" description:"If set, metrics older than that will be deleted" default:"14 days" env:"PW_RETENTION"`
1011
RealDbnameField string `long:"real-dbname-field" mapstructure:"real-dbname-field" description:"Tag key for real database name" env:"PW_REAL_DBNAME_FIELD" default:"real_dbname"`
1112
SystemIdentifierField string `long:"system-identifier-field" mapstructure:"system-identifier-field" description:"Tag key for system identifier value" env:"PW_SYSTEM_IDENTIFIER_FIELD" default:"sys_id"`

internal/sinks/postgres.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
4848
if err = conn.QueryRow(ctx, "SELECT $1::interval > '0'::interval", opts.Retention).Scan(&runDeleteOldPartitions); err != nil {
4949
return err
5050
}
51+
var isValidInterval bool
52+
err = conn.QueryRow(ctx, "SELECT $1::interval >= '1h'::interval", opts.PartitionInterval).Scan(&isValidInterval)
53+
if err != nil {
54+
return err
55+
}
56+
if !isValidInterval {
57+
return fmt.Errorf("partition interval must be at least 1 hour, got: %s", opts.PartitionInterval)
58+
}
59+
5160
l.Info("initialising measurements database...")
5261
exists, err := db.DoesSchemaExist(ctx, conn, "admin")
5362
if err != nil || exists {
@@ -318,7 +327,6 @@ func (pgw *PostgresWriter) flush(msgs []metrics.MeasurementEnvelope) {
318327
return
319328
}
320329
logger := log.GetLogger(pgw.ctx)
321-
// metricsToStorePerMetric := make(map[string][]MeasurementMessagePostgres)
322330
pgPartBounds := make(map[string]ExistingPartitionInfo) // metric=min/max
323331
pgPartBoundsDbName := make(map[string]map[string]ExistingPartitionInfo) // metric=[dbname=min/max]
324332
var err error
@@ -421,7 +429,7 @@ func (pgw *PostgresWriter) EnsureMetricTimescale(pgPartBounds map[string]Existin
421429

422430
func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[string]map[string]ExistingPartitionInfo, force bool) (err error) {
423431
var rows pgx.Rows
424-
sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3)`
432+
sqlEnsure := `select * from admin.ensure_partition_metric_dbname_time($1, $2, $3, $4)`
425433
for metric, dbnameTimestampMap := range metricDbnamePartBounds {
426434
_, ok := partitionMapMetricDbname[metric]
427435
if !ok {
@@ -434,7 +442,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
434442
}
435443
partInfo, ok := partitionMapMetricDbname[metric][dbname]
436444
if !ok || (ok && (pb.StartTime.Before(partInfo.StartTime))) || force {
437-
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
445+
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
438446
return
439447
}
440448
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {
@@ -443,7 +451,7 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
443451
partitionMapMetricDbname[metric][dbname] = partInfo
444452
}
445453
if pb.EndTime.After(partInfo.EndTime) || pb.EndTime.Equal(partInfo.EndTime) || force {
446-
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime); err != nil {
454+
if rows, err = pgw.sinkDb.Query(pgw.ctx, sqlEnsure, metric, dbname, pb.StartTime, pgw.opts.PartitionInterval); err != nil {
447455
return
448456
}
449457
if partInfo, err = pgx.CollectOneRow(rows, pgx.RowToStructByPos[ExistingPartitionInfo]); err != nil {

internal/sinks/postgres_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,18 @@ func TestNewWriterFromPostgresConn(t *testing.T) {
4444

4545
conn.ExpectPing()
4646
conn.ExpectQuery("SELECT \\$1::interval").WithArgs("1 year").WillReturnRows(pgxmock.NewRows([]string{"col"}).AddRow(true))
47+
conn.ExpectQuery("SELECT \\$1::interval").WithArgs("1 hour").WillReturnRows(pgxmock.NewRows([]string{"col"}).AddRow(true))
4748
conn.ExpectQuery("SELECT EXISTS").WithArgs("admin").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4849
conn.ExpectQuery("SELECT schema_type").WillReturnRows(pgxmock.NewRows([]string{"schema_type"}).AddRow(true))
4950
for _, m := range metrics.GetDefaultBuiltInMetrics() {
5051
conn.ExpectExec("SELECT admin.ensure_dummy_metrics_table").WithArgs(m).WillReturnResult(pgxmock.NewResult("EXECUTE", 1))
5152
}
5253

53-
opts := &CmdOpts{BatchingDelay: time.Hour, Retention: "1 year"}
54+
opts := &CmdOpts{
55+
BatchingDelay: time.Hour,
56+
Retention: "1 year",
57+
PartitionInterval: "1 hour",
58+
}
5459
pgw, err := NewWriterFromPostgresConn(ctx, conn, opts)
5560
assert.NoError(t, err)
5661
assert.NotNil(t, pgw)
@@ -499,3 +504,79 @@ func TestCopyFromMeasurements_CopyFail(t *testing.T) {
499504
}
500505

501506
}
507+
508+
func TestPartitionInterval(t *testing.T) {
509+
a := assert.New(t)
510+
r := require.New(t)
511+
512+
const ImageName = "docker.io/postgres:17-alpine"
513+
pgContainer, err := postgres.Run(ctx,
514+
ImageName,
515+
postgres.WithDatabase("mydatabase"),
516+
testcontainers.WithWaitStrategy(
517+
wait.ForLog("database system is ready to accept connections").
518+
WithOccurrence(2).
519+
WithStartupTimeout(5*time.Second)),
520+
)
521+
r.NoError(err)
522+
defer func() { a.NoError(pgContainer.Terminate(ctx)) }()
523+
524+
connStr, _ := pgContainer.ConnectionString(ctx, "sslmode=disable")
525+
526+
opts := &CmdOpts{
527+
PartitionInterval: "1 minute",
528+
Retention: "14 days",
529+
BatchingDelay: time.Second,
530+
}
531+
532+
t.Run("Interval Validation", func(_ *testing.T) {
533+
_, err = NewPostgresWriter(ctx, connStr, opts)
534+
a.EqualError(err, "partition interval must be at least 1 hour, got: 1 minute")
535+
536+
opts.PartitionInterval = "not an interval"
537+
_, err = NewPostgresWriter(ctx, connStr, opts)
538+
a.Error(err)
539+
540+
validIntervals := []string{
541+
"3 days 4 hours", "1 year",
542+
"P3D", "PT3H", "0-02", "1 00:00:00",
543+
"P0-02", "P1", "2 weeks",
544+
}
545+
546+
for _, interval := range validIntervals {
547+
opts.PartitionInterval = interval
548+
_, err = NewPostgresWriter(ctx, connStr, opts)
549+
a.NoError(err)
550+
}
551+
})
552+
553+
t.Run("Partitions Creation", func(_ *testing.T) {
554+
opts.PartitionInterval = "3 weeks"
555+
pgw, err := NewPostgresWriter(ctx, connStr, opts)
556+
r.NoError(err)
557+
558+
conn, err := pgx.Connect(ctx, connStr)
559+
r.NoError(err)
560+
561+
m := map[string]map[string]ExistingPartitionInfo{
562+
"test_metric": {
563+
"test_db": {
564+
time.Now(), time.Now().Add(time.Hour),
565+
},
566+
},
567+
}
568+
err = pgw.EnsureMetricDbnameTime(m, false)
569+
r.NoError(err)
570+
571+
var partitionsNum int;
572+
err = conn.QueryRow(ctx, "SELECT COUNT(*) FROM pg_partition_tree('test_metric');").Scan(&partitionsNum)
573+
a.NoError(err)
574+
// 1 the metric table itself + 1 dbname partition
575+
// + 4 time partitions (1 we asked for + 3 precreated)
576+
a.Equal(6, partitionsNum)
577+
578+
part := partitionMapMetricDbname["test_metric"]["test_db"]
579+
// partition bounds should have a difference of 3 weeks
580+
a.Equal(part.StartTime.Add(3 * 7 * 24 * time.Hour), part.EndTime)
581+
})
582+
}
Lines changed: 105 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
1-
-- DROP FUNCTION admin.ensure_partition_metric_dbname_time(text,text,timestamp with time zone,integer);
2-
-- select * from admin.ensure_partition_metric_dbname_time('wal', 'kala', now());
3-
1+
/*
2+
ensure_partition_metric_dbname_time creates partitioned metric tables if not already existing
3+
metric - name of the metric (top level table)
4+
dbname - name of the database (2nd level partition)
5+
metric_timestamp - timestamp of the metric (used to determine the time partition)
6+
partition_period - interval for partitioning (e.g., '1 week', '1 day', '1 month')
7+
partitions_to_precreate - how many future time partitions to create (default 3)
8+
part_available_from - output parameter, start time of the time partition where the given metric_timestamp fits in
9+
part_available_to - output parameter, end time of the time partition where the given metric_timestamp fits in
10+
*/
411
CREATE OR REPLACE FUNCTION admin.ensure_partition_metric_dbname_time(
512
metric text,
613
dbname text,
714
metric_timestamp timestamptz,
15+
partition_period interval default '1 week'::interval,
816
partitions_to_precreate int default 3,
917
OUT part_available_from timestamptz,
1018
OUT part_available_to timestamptz)
@@ -15,92 +23,145 @@ RETURNS record AS
1523
*/
1624
$SQL$
1725
DECLARE
18-
l_year int;
19-
l_week int;
20-
l_doy int;
2126
l_part_name_2nd text;
2227
l_part_name_3rd text;
23-
l_part_start date;
24-
l_part_end date;
25-
l_sql text;
28+
l_part_start timestamptz;
29+
l_part_end timestamptz;
2630
ideal_length int;
2731
l_template_table text := 'admin.metrics_template';
2832
MAX_IDENT_LEN CONSTANT integer := current_setting('max_identifier_length')::int;
33+
l_partition_format text;
34+
l_time_suffix text;
35+
l_existing_upper_bound timestamptz;
2936
BEGIN
37+
-- Validate partition period
38+
IF partition_period < interval '1 hour' THEN
39+
RAISE EXCEPTION 'Partition period must be at least 1 hour, got: %', partition_period;
40+
END IF;
41+
42+
-- Determine partition naming format based on period
43+
CASE
44+
WHEN partition_period >= interval '1 day' THEN
45+
l_partition_format := 'YYYYMMDD';
46+
ELSE
47+
-- For hourly partitions (>= 1 hour, < 1 day)
48+
l_partition_format := 'YYYYMMDD_HH24';
49+
END CASE;
3050

3151
PERFORM pg_advisory_xact_lock(regexp_replace( md5(metric) , E'\\D', '', 'g')::varchar(10)::int8);
3252

53+
3354
-- 1. level
34-
IF NOT EXISTS (SELECT 1
35-
FROM pg_tables
36-
WHERE tablename = metric
37-
AND schemaname = 'public')
55+
IF to_regclass('public.' || quote_ident(metric)) IS NULL
3856
THEN
39-
-- RAISE NOTICE 'creating partition % ...', metric;
40-
EXECUTE format('CREATE TABLE IF NOT EXISTS public.%I (LIKE admin.metrics_template INCLUDING INDEXES) PARTITION BY LIST (dbname)', metric);
57+
EXECUTE format('CREATE TABLE public.%I (LIKE admin.metrics_template INCLUDING INDEXES) PARTITION BY LIST (dbname)', metric);
4158
EXECUTE format('COMMENT ON TABLE public.%I IS $$pgwatch-generated-metric-lvl$$', metric);
4259
END IF;
4360

4461
-- 2. level
45-
4662
l_part_name_2nd := metric || '_' || dbname;
47-
4863
IF char_length(l_part_name_2nd) > MAX_IDENT_LEN -- use "dbname" hash instead of name for overly long ones
4964
THEN
5065
ideal_length = MAX_IDENT_LEN - char_length(format('%s_', metric));
5166
l_part_name_2nd := metric || '_' || substring(md5(dbname) from 1 for ideal_length);
5267
END IF;
5368

54-
IF NOT EXISTS (SELECT 1
55-
FROM pg_tables
56-
WHERE tablename = l_part_name_2nd
57-
AND schemaname = 'subpartitions')
69+
IF to_regclass('subpartitions.' || quote_ident(l_part_name_2nd)) IS NULL
5870
THEN
59-
--RAISE NOTICE 'creating partition % ...', l_part_name_2nd;
60-
EXECUTE format('CREATE TABLE IF NOT EXISTS subpartitions.%I PARTITION OF public.%I FOR VALUES IN (%L) PARTITION BY RANGE (time)',
71+
EXECUTE format('CREATE TABLE subpartitions.%I PARTITION OF public.%I FOR VALUES IN (%L) PARTITION BY RANGE (time)',
6172
l_part_name_2nd, metric, dbname);
6273
EXECUTE format('COMMENT ON TABLE subpartitions.%s IS $$pgwatch-generated-metric-dbname-lvl$$', l_part_name_2nd);
6374
END IF;
6475

65-
-- 3. level
66-
FOR i IN 0..partitions_to_precreate LOOP
76+
-- 3. level
77+
78+
-- Get existing partition upper bound
79+
SELECT max(substring(pg_catalog.pg_get_expr(c.relpartbound, c.oid, true) from 'TO \(''([^'']+)''')::timestamptz)
80+
INTO l_existing_upper_bound
81+
FROM pg_catalog.pg_class c
82+
JOIN pg_catalog.pg_inherits i ON i.inhrelid = c.oid
83+
JOIN pg_catalog.pg_class parent ON parent.oid = i.inhparent
84+
WHERE c.relispartition
85+
AND c.relnamespace = 'subpartitions'::regnamespace
86+
AND parent.relname = l_part_name_2nd;
6787

68-
l_year := extract(isoyear from (metric_timestamp + '1week'::interval * i));
69-
l_week := extract(week from (metric_timestamp + '1week'::interval * i));
88+
-- Determine starting point for new partitions
89+
IF l_existing_upper_bound IS NOT NULL THEN
90+
-- Start from the existing upper bound to maintain continuity
91+
l_part_start := l_existing_upper_bound;
92+
ELSE
93+
-- No existing partitions, align to clean boundaries based on period size
94+
CASE
95+
WHEN partition_period >= interval '1 week' THEN
96+
l_part_start := date_trunc('week', metric_timestamp);
97+
WHEN partition_period >= interval '1 day' THEN
98+
l_part_start := date_trunc('day', metric_timestamp);
99+
ELSE
100+
-- For hourly periods (>= 1 hour, < 1 day)
101+
l_part_start := date_trunc('hour', metric_timestamp);
102+
END CASE;
103+
104+
-- For the first partition, set the available range
105+
part_available_from := l_part_start;
106+
part_available_to := l_part_start + partition_period;
107+
END IF;
70108

71-
IF i = 0 THEN
72-
l_part_start := to_date(l_year::text || l_week::text, 'iyyyiw');
73-
l_part_end := l_part_start + '1week'::interval;
109+
-- Create partitions
110+
FOR i IN 0..partitions_to_precreate LOOP
111+
l_part_end := l_part_start + partition_period;
112+
113+
-- Update the available range for the first partition only if we started from metric_timestamp
114+
IF i = 0 AND l_existing_upper_bound IS NULL THEN
74115
part_available_from := l_part_start;
75116
part_available_to := l_part_end;
76-
ELSE
77-
l_part_start := l_part_start + '1week'::interval;
78-
l_part_end := l_part_start + '1week'::interval;
79-
part_available_to := l_part_end;
117+
ELSIF i = 0 AND l_existing_upper_bound IS NOT NULL THEN
118+
-- For existing partitions, we need to find which partition contains the metric_timestamp
119+
IF metric_timestamp >= l_part_start AND metric_timestamp < l_part_end THEN
120+
part_available_from := l_part_start;
121+
part_available_to := l_part_end;
122+
END IF;
80123
END IF;
81124

82-
l_part_name_3rd := format('%s_%s_y%sw%s', metric, dbname, l_year, to_char(l_week, 'fm00' ));
125+
l_time_suffix := to_char(l_part_start, l_partition_format);
126+
l_part_name_3rd := format('%s_%s_%s', metric, dbname, l_time_suffix);
83127

84128
IF char_length(l_part_name_3rd) > MAX_IDENT_LEN -- use "dbname" hash instead of name for overly long ones
85129
THEN
86-
ideal_length = MAX_IDENT_LEN - char_length(format('%s__y%sw%s', metric, l_year, to_char(l_week, 'fm00')));
87-
l_part_name_3rd := format('%s_%s_y%sw%s', metric, substring(md5(dbname) from 1 for ideal_length), l_year, to_char(l_week, 'fm00' ));
130+
ideal_length = MAX_IDENT_LEN - char_length(format('%s__%s', metric, l_time_suffix));
131+
l_part_name_3rd := format('%s_%s_%s', metric, substring(md5(dbname) from 1 for ideal_length), l_time_suffix);
88132
END IF;
89133

90-
IF NOT EXISTS (SELECT 1
91-
FROM pg_tables
92-
WHERE tablename = l_part_name_3rd
93-
AND schemaname = 'subpartitions')
134+
IF to_regclass('subpartitions.' || quote_ident(l_part_name_3rd)) IS NULL
94135
THEN
95-
l_sql := format($$CREATE TABLE IF NOT EXISTS subpartitions.%I PARTITION OF subpartitions.%I FOR VALUES FROM ('%s') TO ('%s')$$,
136+
EXECUTE format('CREATE TABLE subpartitions.%I PARTITION OF subpartitions.%I FOR VALUES FROM ($$%s$$) TO ($$%s$$)',
96137
l_part_name_3rd, l_part_name_2nd, l_part_start, l_part_end);
97-
EXECUTE l_sql;
98138
EXECUTE format('COMMENT ON TABLE subpartitions.%I IS $$pgwatch-generated-metric-dbname-time-lvl$$', l_part_name_3rd);
99139
END IF;
100140

141+
l_part_start := l_part_end;
101142
END LOOP;
143+
144+
-- If we still don't have part_available_from/to set, find the partition containing metric_timestamp
145+
IF part_available_from IS NULL THEN
146+
SELECT lower_text::timestamptz, upper_text::timestamptz
147+
INTO part_available_from, part_available_to
148+
FROM (
149+
SELECT substring(pg_catalog.pg_get_expr(c.relpartbound, c.oid, true) from 'FOR VALUES FROM \(''([^'']+)''') AS lower_text,
150+
substring(pg_catalog.pg_get_expr(c.relpartbound, c.oid, true) from 'TO \(''([^'']+)''') AS upper_text
151+
FROM pg_catalog.pg_class c
152+
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
153+
JOIN pg_catalog.pg_inherits i ON i.inhrelid = c.oid
154+
JOIN pg_catalog.pg_class parent ON parent.oid = i.inhparent
155+
WHERE c.relispartition
156+
AND n.nspname = 'subpartitions'
157+
AND parent.relname = l_part_name_2nd
158+
) AS partitions
159+
WHERE metric_timestamp >= lower_text::timestamptz
160+
AND metric_timestamp < upper_text::timestamptz
161+
LIMIT 1;
162+
END IF;
102163

103164
END;
104165
$SQL$ LANGUAGE plpgsql;
105166

106-
-- GRANT EXECUTE ON FUNCTION admin.ensure_partition_metric_dbname_time(text,text,timestamp with time zone,integer) TO pgwatch;
167+
-- GRANT EXECUTE ON FUNCTION admin.ensure_partition_metric_dbname_time(text,text,timestamp with time zone,interval,integer) TO pgwatch;

0 commit comments

Comments
 (0)