From 961501b1f074c6e2df2e3df613a6b1e0f02cd618 Mon Sep 17 00:00:00 2001 From: Drew Sessler Date: Sun, 25 Jan 2026 22:48:07 -0800 Subject: [PATCH 1/2] Fix issue with resourcedetection processor failing with no detectors set. Minor refactoring of some collector code. Add/adjust tests for different otel detector configs. --- internal/collector/patroni.go | 36 ++-- internal/collector/patroni_test.go | 111 ++++++++++- internal/collector/pgadmin.go | 36 ++-- internal/collector/pgadmin_test.go | 132 ++++++++++++- internal/collector/pgbackrest.go | 26 ++- internal/collector/pgbackrest_test.go | 119 ++++++++++- internal/collector/pgbouncer.go | 36 ++-- internal/collector/pgbouncer_test.go | 115 ++++++++++- internal/collector/postgres.go | 62 +++--- internal/collector/postgres_test.go | 273 +++++++++++++++++++++++++- internal/collector/util.go | 4 +- 11 files changed, 857 insertions(+), 93 deletions(-) diff --git a/internal/collector/patroni.go b/internal/collector/patroni.go index 307d203fa5..5642493c92 100644 --- a/internal/collector/patroni.go +++ b/internal/collector/patroni.go @@ -17,12 +17,8 @@ func EnablePatroniLogging(ctx context.Context, inCluster *v1beta1.PostgresCluster, outConfig *Config, ) { - var spec *v1beta1.InstrumentationLogsSpec - if inCluster != nil && inCluster.Spec.Instrumentation != nil { - spec = inCluster.Spec.Instrumentation.Logs - } - if OpenTelemetryLogsEnabled(ctx, inCluster) { + spec := inCluster.Spec.Instrumentation directory := naming.PatroniPGDataLogPath // Keep track of what log records and files have been processed. @@ -117,21 +113,31 @@ func EnablePatroniLogging(ctx context.Context, // If there are exporters to be added to the logs pipelines defined in // the spec, add them to the pipeline. Otherwise, add the DebugExporter. exporters := []ComponentID{DebugExporter} - if spec != nil && spec.Exporters != nil { - exporters = slices.Clone(spec.Exporters) + if spec.Logs != nil && spec.Logs.Exporters != nil { + exporters = slices.Clone(spec.Logs.Exporters) + } + + patroniProcessors := []ComponentID{ + "resource/patroni", + "transform/patroni_logs", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. 
This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + patroniProcessors = append(patroniProcessors, ResourceDetectionProcessor) + } + + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + patroniProcessors = append(patroniProcessors, LogsBatchProcessor, CompactingProcessor) + outConfig.Pipelines["logs/patroni"] = Pipeline{ Extensions: []ComponentID{"file_storage/patroni_logs"}, Receivers: []ComponentID{"filelog/patroni_jsonlog"}, - Processors: []ComponentID{ - "resource/patroni", - "transform/patroni_logs", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: patroniProcessors, + Exporters: exporters, } } } diff --git a/internal/collector/patroni_test.go b/internal/collector/patroni_test.go index 1cf7738f1a..ab4c3777b0 100644 --- a/internal/collector/patroni_test.go +++ b/internal/collector/patroni_test.go @@ -109,7 +109,6 @@ service: processors: - resource/patroni - transform/patroni_logs - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -117,7 +116,7 @@ `) }) - t.Run("InstrumentationSpecDefined", func(t *testing.T) { + t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) { gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ feature.OpenTelemetryLogs: true, @@ -202,6 +201,114 @@ receivers: to: body.original type: move storage: file_storage/patroni_logs +service: + extensions: + - file_storage/patroni_logs + pipelines: + logs/patroni: + exporters: + - googlecloud + processors: + - resource/patroni + - transform/patroni_logs + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/patroni_jsonlog +`) + }) + + t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.Instrumentation = testInstrumentationSpec() + cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{ + { + Name: "gcp", + }, + } + config := NewConfig(cluster.Spec.Instrumentation) + + EnablePatroniLogging(ctx, cluster, config) + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT. +# Your changes will not be saved.
+exporters: + debug: + verbosity: detailed + googlecloud: + log: + default_log_name: opentelemetry.io/collector-exported-log + project: google-project-name +extensions: + file_storage/patroni_logs: + create_directory: true + directory: /pgdata/patroni/log/receiver + fsync: true +processors: + batch/1s: + timeout: 1s + batch/200ms: + timeout: 200ms + batch/logs: + send_batch_size: 8192 + timeout: 200ms + groupbyattrs/compact: {} + resource/patroni: + attributes: + - action: insert + key: k8s.container.name + value: database + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: patroni + resourcedetection: + detectors: + - gcp + override: false + timeout: 30s + transform/patroni_logs: + log_statements: + - statements: + - set(instrumentation_scope.name, "patroni") + - set(log.cache, ParseJSON(log.body["original"])) + - set(log.severity_text, log.cache["levelname"]) + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "DEBUG" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "INFO" + - set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text == + "WARNING" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + - set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text == + "CRITICAL" + - set(log.time, Time(log.cache["asctime"], "%F %T,%L")) where IsString(log.cache["asctime"]) + - set(log.attributes["log.record.original"], log.body["original"]) + - set(log.body, log.cache["message"]) +receivers: + filelog/patroni_jsonlog: + include: + - /pgdata/patroni/log/*.log + - /pgdata/patroni/log/*.log.1 + operators: + - from: body + to: body.original + type: move + storage: file_storage/patroni_logs service: extensions: - file_storage/patroni_logs diff --git a/internal/collector/pgadmin.go b/internal/collector/pgadmin.go index f506d3ccdb..d1ddb8d7af 100644 --- a/internal/collector/pgadmin.go +++ b/internal/collector/pgadmin.go @@ -100,30 +100,34 @@ func EnablePgAdminLogging(ctx context.Context, spec *v1beta1.InstrumentationSpec exporters = slices.Clone(spec.Logs.Exporters) } + pgadminProcessors := []ComponentID{ + "resource/pgadmin", + "transform/pgadmin_log", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. 
This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + pgadminProcessors = append(pgadminProcessors, ResourceDetectionProcessor) + } + + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + pgadminProcessors = append(pgadminProcessors, LogsBatchProcessor, CompactingProcessor) + otelConfig.Pipelines["logs/pgadmin"] = Pipeline{ Extensions: []ComponentID{"file_storage/pgadmin_data_logs"}, Receivers: []ComponentID{"filelog/pgadmin"}, - Processors: []ComponentID{ - "resource/pgadmin", - "transform/pgadmin_log", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: pgadminProcessors, + Exporters: exporters, } otelConfig.Pipelines["logs/gunicorn"] = Pipeline{ Extensions: []ComponentID{"file_storage/pgadmin_data_logs"}, Receivers: []ComponentID{"filelog/gunicorn"}, - Processors: []ComponentID{ - "resource/pgadmin", - "transform/pgadmin_log", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: pgadminProcessors, + Exporters: exporters, } otelYAML, err := otelConfig.ToYAML() diff --git a/internal/collector/pgadmin_test.go b/internal/collector/pgadmin_test.go index 5b1d2d7cd8..871a709988 100644 --- a/internal/collector/pgadmin_test.go +++ b/internal/collector/pgadmin_test.go @@ -114,7 +114,6 @@ collector.yaml: | processors: - resource/pgadmin - transform/pgadmin_log - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -125,7 +124,6 @@ collector.yaml: | processors: - resource/pgadmin - transform/pgadmin_log - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -133,7 +131,7 @@ collector.yaml: | `)) }) - t.Run("InstrumentationSpecDefined", func(t *testing.T) { + t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) { gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ feature.OpenTelemetryLogs: true, @@ -232,6 +230,134 @@ collector.yaml: | include: - /var/lib/pgadmin/logs/pgadmin.log storage: file_storage/pgadmin_data_logs + service: + extensions: + - file_storage/pgadmin_data_logs + pipelines: + logs/gunicorn: + exporters: + - googlecloud + processors: + - resource/pgadmin + - transform/pgadmin_log + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/gunicorn + logs/pgadmin: + exporters: + - googlecloud + processors: + - resource/pgadmin + - transform/pgadmin_log + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/pgadmin +`)) + }) + + t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + + ctx := feature.NewContext(context.Background(), gate) + + var spec v1beta1.InstrumentationSpec + require.UnmarshalInto(t, &spec, `{ + config: { + detectors: [{name: gcp}], + exporters: { + googlecloud: { + log: { default_log_name: opentelemetry.io/collector-exported-log }, + project: google-project-name, + }, + }, + }, + logs: { exporters: [googlecloud] }, + }`) + + configmap := new(corev1.ConfigMap) + initialize.Map(&configmap.Data) + err := collector.EnablePgAdminLogging(ctx, &spec, configmap) + assert.NilError(t, err) + + assert.Assert(t,
cmp.MarshalMatches(configmap.Data, ` +collector.yaml: | + # Generated by postgres-operator. DO NOT EDIT. + # Your changes will not be saved. + exporters: + debug: + verbosity: detailed + googlecloud: + log: + default_log_name: opentelemetry.io/collector-exported-log + project: google-project-name + extensions: + file_storage/pgadmin_data_logs: + create_directory: false + directory: /var/lib/pgadmin/logs/receiver + fsync: true + processors: + batch/1s: + timeout: 1s + batch/200ms: + timeout: 200ms + batch/logs: + send_batch_size: 8192 + timeout: 200ms + groupbyattrs/compact: {} + resource/pgadmin: + attributes: + - action: insert + key: k8s.container.name + value: pgadmin + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: pgadmin + resourcedetection: + detectors: + - gcp + override: false + timeout: 30s + transform/pgadmin_log: + log_statements: + - statements: + - set(log.attributes["log.record.original"], log.body) + - set(log.cache, ParseJSON(log.body)) + - merge_maps(log.attributes, ExtractPatterns(log.cache["message"], "(?P[A-Z]{3}.*?[\\d]{3})"), + "insert") + - set(log.body, log.cache["message"]) + - set(instrumentation_scope.name, log.cache["name"]) + - set(log.severity_text, log.cache["level"]) + - set(log.time_unix_nano, Int(log.cache["time"]*1000000000)) + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "DEBUG" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "INFO" + - set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text == + "WARNING" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + - set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text == + "CRITICAL" + receivers: + filelog/gunicorn: + include: + - /var/lib/pgadmin/logs/gunicorn.log + storage: file_storage/pgadmin_data_logs + filelog/pgadmin: + include: + - /var/lib/pgadmin/logs/pgadmin.log + storage: file_storage/pgadmin_data_logs service: extensions: - file_storage/pgadmin_data_logs diff --git a/internal/collector/pgbackrest.go b/internal/collector/pgbackrest.go index 480c360af4..d7041ee07b 100644 --- a/internal/collector/pgbackrest.go +++ b/internal/collector/pgbackrest.go @@ -95,17 +95,27 @@ func NewConfigForPgBackrestRepoHostPod( exporters = slices.Clone(spec.Logs.Exporters) } + pgbackrestProcessors := []ComponentID{ + "resource/pgbackrest", + "transform/pgbackrest_logs", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. 
This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + pgbackrestProcessors = append(pgbackrestProcessors, ResourceDetectionProcessor) + } + + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + pgbackrestProcessors = append(pgbackrestProcessors, LogsBatchProcessor, CompactingProcessor) + config.Pipelines["logs/pgbackrest"] = Pipeline{ Extensions: []ComponentID{"file_storage/pgbackrest_logs"}, Receivers: []ComponentID{"filelog/pgbackrest_log"}, - Processors: []ComponentID{ - "resource/pgbackrest", - "transform/pgbackrest_logs", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: pgbackrestProcessors, + Exporters: exporters, } } return config diff --git a/internal/collector/pgbackrest_test.go b/internal/collector/pgbackrest_test.go index 2a539b6fd7..ef8c5bcbd7 100644 --- a/internal/collector/pgbackrest_test.go +++ b/internal/collector/pgbackrest_test.go @@ -115,7 +115,6 @@ service: processors: - resource/pgbackrest - transform/pgbackrest_logs - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -123,7 +122,7 @@ `) }) - t.Run("InstrumentationSpecDefined", func(t *testing.T) { + t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) { gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ feature.OpenTelemetryLogs: true, @@ -214,6 +213,122 @@ receivers: multiline: line_start_pattern: ^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19} storage: file_storage/pgbackrest_logs +service: + extensions: + - file_storage/pgbackrest_logs + pipelines: + logs/pgbackrest: + exporters: + - googlecloud + processors: + - resource/pgbackrest + - transform/pgbackrest_logs + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/pgbackrest_log +`) + }) + + t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + repos := []v1beta1.PGBackRestRepo{ + { + Name: "repo1", + Volume: new(v1beta1.RepoPVC), + }, + } + + instrumentationSpec := testInstrumentationSpec() + instrumentationSpec.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{ + { + Name: "gcp", + }, + } + config := NewConfigForPgBackrestRepoHostPod(ctx, instrumentationSpec, repos, "/another/directory") + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT. +# Your changes will not be saved.
+exporters: + debug: + verbosity: detailed + googlecloud: + log: + default_log_name: opentelemetry.io/collector-exported-log + project: google-project-name +extensions: + file_storage/pgbackrest_logs: + create_directory: false + directory: /another/directory/receiver + fsync: true +processors: + batch/1s: + timeout: 1s + batch/200ms: + timeout: 200ms + batch/logs: + send_batch_size: 8192 + timeout: 200ms + groupbyattrs/compact: {} + resource/pgbackrest: + attributes: + - action: insert + key: k8s.container.name + value: pgbackrest + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: pgbackrest + resourcedetection: + detectors: + - gcp + override: false + timeout: 30s + transform/pgbackrest_logs: + log_statements: + - statements: + - set(instrumentation_scope.name, "pgbackrest") + - set(instrumentation_scope.schema_url, "https://opentelemetry.io/schemas/1.29.0") + - 'merge_maps(log.cache, ExtractPatterns(log.body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2} + \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?<process_id>P\\d{2,3})\\s*(?<error_severity>\\S*): + (?<message>(?s).*)$"), "insert") where Len(log.body) > 0' + - set(log.severity_text, log.cache["error_severity"]) where IsString(log.cache["error_severity"]) + - set(log.severity_number, SEVERITY_NUMBER_TRACE) where log.severity_text == + "TRACE" + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "DEBUG" + - set(log.severity_number, SEVERITY_NUMBER_DEBUG2) where log.severity_text == + "DETAIL" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "INFO" + - set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text == + "WARN" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + - set(log.time, Time(log.cache["timestamp"], "%Y-%m-%d %H:%M:%S.%L")) where + IsString(log.cache["timestamp"]) + - set(log.attributes["process.pid"], log.cache["process_id"]) + - set(log.attributes["log.record.original"], log.body) + - set(log.body, log.cache["message"]) +receivers: + filelog/pgbackrest_log: + include: + - /another/directory/*.log + - /another/directory/*.log.1 + multiline: + line_start_pattern: ^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19} + storage: file_storage/pgbackrest_logs service: extensions: - file_storage/pgbackrest_logs diff --git a/internal/collector/pgbouncer.go b/internal/collector/pgbouncer.go index 829c673210..d59d936669 100644 --- a/internal/collector/pgbouncer.go +++ b/internal/collector/pgbouncer.go @@ -52,12 +52,8 @@ func EnablePgBouncerLogging(ctx context.Context, outConfig *Config, logfile string, ) { - var spec *v1beta1.InstrumentationLogsSpec - if inCluster != nil && inCluster.Spec.Instrumentation != nil { - spec = inCluster.Spec.Instrumentation.Logs - } - if OpenTelemetryLogsEnabled(ctx, inCluster) { + spec := inCluster.Spec.Instrumentation directory := filepath.Dir(logfile) // Keep track of what log records and files have been processed. @@ -151,21 +147,31 @@ func EnablePgBouncerLogging(ctx context.Context, // If there are exporters to be added to the logs pipelines defined in // the spec, add them to the pipeline. Otherwise, add the DebugExporter.
exporters := []ComponentID{DebugExporter} - if spec != nil && spec.Exporters != nil { - exporters = slices.Clone(spec.Exporters) + if spec.Logs != nil && spec.Logs.Exporters != nil { + exporters = slices.Clone(spec.Logs.Exporters) + } + + pgbouncerProcessors := []ComponentID{ + "resource/pgbouncer", + "transform/pgbouncer_logs", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + pgbouncerProcessors = append(pgbouncerProcessors, ResourceDetectionProcessor) } + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + pgbouncerProcessors = append(pgbouncerProcessors, LogsBatchProcessor, CompactingProcessor) + outConfig.Pipelines["logs/pgbouncer"] = Pipeline{ Extensions: []ComponentID{"file_storage/pgbouncer_logs"}, Receivers: []ComponentID{"filelog/pgbouncer_log"}, - Processors: []ComponentID{ - "resource/pgbouncer", - "transform/pgbouncer_logs", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: pgbouncerProcessors, + Exporters: exporters, } } } diff --git a/internal/collector/pgbouncer_test.go b/internal/collector/pgbouncer_test.go index 1a0f13e038..f0b1e2035c 100644 --- a/internal/collector/pgbouncer_test.go +++ b/internal/collector/pgbouncer_test.go @@ -108,7 +108,6 @@ service: processors: - resource/pgbouncer - transform/pgbouncer_logs - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -116,18 +115,18 @@ `) }) - t.Run("InstrumentationSpecDefined", func(t *testing.T) { + t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) { gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ feature.OpenTelemetryLogs: true, })) ctx := feature.NewContext(context.Background(), gate) - config := NewConfig(testInstrumentationSpec()) - cluster := new(v1beta1.PostgresCluster) cluster.Spec.Instrumentation = testInstrumentationSpec() + config := NewConfig(cluster.Spec.Instrumentation) + EnablePgBouncerLogging(ctx, cluster, config, naming.PGBouncerFullLogPath) result, err := config.ToYAML() @@ -201,6 +200,114 @@ receivers: - /tmp/*.log - /tmp/*.log.1 storage: file_storage/pgbouncer_logs +service: + extensions: + - file_storage/pgbouncer_logs + pipelines: + logs/pgbouncer: + exporters: + - googlecloud + processors: + - resource/pgbouncer + - transform/pgbouncer_logs + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/pgbouncer_log +`) + }) + + t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.Instrumentation = testInstrumentationSpec() + cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{ + { + Name: "gcp", + }, + } + + config := NewConfig(cluster.Spec.Instrumentation) + + EnablePgBouncerLogging(ctx, cluster, config, naming.PGBouncerFullLogPath) + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.DeepEqual(t, result, `# Generated by
postgres-operator. DO NOT EDIT. +# Your changes will not be saved. +exporters: + debug: + verbosity: detailed + googlecloud: + log: + default_log_name: opentelemetry.io/collector-exported-log + project: google-project-name +extensions: + file_storage/pgbouncer_logs: + create_directory: false + directory: /tmp/receiver + fsync: true +processors: + batch/1s: + timeout: 1s + batch/200ms: + timeout: 200ms + batch/logs: + send_batch_size: 8192 + timeout: 200ms + groupbyattrs/compact: {} + resource/pgbouncer: + attributes: + - action: insert + key: k8s.container.name + value: pgbouncer + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: pgbouncer + resourcedetection: + detectors: + - gcp + override: false + timeout: 30s + transform/pgbouncer_logs: + log_statements: + - statements: + - set(instrumentation_scope.name, "pgbouncer") + - merge_maps(log.cache, ExtractPatterns(log.body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2} + \\d{2}:\\d{2}:\\d{2}\\.\\d{3} [A-Z]{3}) \\[(?<pid>\\d+)\\] (?<log_level>[A-Z]+) + (?<msg>.*$)"), "insert") + - set(log.severity_text, log.cache["log_level"]) + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "NOISE" or log.severity_text == "DEBUG" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "LOG" + - set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text == + "WARNING" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + - set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text == + "FATAL" + - set(log.time, Time(log.cache["timestamp"], "%F %T.%L %Z")) where IsString(log.cache["timestamp"]) + - set(log.attributes["log.record.original"], log.body) + - set(log.attributes["process.pid"], log.cache["pid"]) + - set(log.body, log.cache["msg"]) +receivers: + filelog/pgbouncer_log: + include: + - /tmp/*.log + - /tmp/*.log.1 + storage: file_storage/pgbouncer_logs service: extensions: - file_storage/pgbouncer_logs diff --git a/internal/collector/postgres.go b/internal/collector/postgres.go index 2e06035e66..e993fcc15f 100644 --- a/internal/collector/postgres.go +++ b/internal/collector/postgres.go @@ -134,12 +134,8 @@ func EnablePostgresLogging( inParameters *postgres.ParameterSet, outConfig *Config, ) { - var spec *v1beta1.InstrumentationLogsSpec - if inCluster != nil && inCluster.Spec.Instrumentation != nil { - spec = inCluster.Spec.Instrumentation.Logs - } - if OpenTelemetryLogsEnabled(ctx, inCluster) { + spec := inCluster.Spec.Instrumentation directory := inParameters.Value("log_directory") version := inCluster.Spec.PostgresVersion @@ -229,8 +225,8 @@ func EnablePostgresLogging( // If there are exporters to be added to the logs pipelines defined in // the spec, add them to the pipeline. Otherwise, add the DebugExporter. exporters := []ComponentID{DebugExporter} - if spec != nil && spec.Exporters != nil { - exporters = slices.Clone(spec.Exporters) + if spec.Logs != nil && spec.Logs.Exporters != nil { + exporters = slices.Clone(spec.Logs.Exporters) } // JSON logs are preferable since PostgreSQL v15. These are enabled in [PostgreSQLParameters].
@@ -239,17 +235,27 @@ func EnablePostgresLogging( receivers = []ComponentID{"filelog/postgres_csvlog"} } + postgresProcessors := []ComponentID{ + "resource/postgres", + "transform/postgres_logs", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + postgresProcessors = append(postgresProcessors, ResourceDetectionProcessor) + } + + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + postgresProcessors = append(postgresProcessors, LogsBatchProcessor, CompactingProcessor) + outConfig.Pipelines["logs/postgres"] = Pipeline{ Extensions: []ComponentID{"file_storage/postgres_logs"}, Receivers: receivers, - Processors: []ComponentID{ - "resource/postgres", - "transform/postgres_logs", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: postgresProcessors, + Exporters: exporters, } // pgBackRest pipeline @@ -305,17 +311,27 @@ func EnablePostgresLogging( "log_statements": slices.Clone(pgBackRestLogsTransforms), } + pgbackrestProcessors := []ComponentID{ + "resource/pgbackrest", + "transform/pgbackrest_logs", + } + + // We can only add the ResourceDetectionProcessor if there are detectors set, + // otherwise it will fail. This is due to a change in the following upstream commit: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298 + if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 { + pgbackrestProcessors = append(pgbackrestProcessors, ResourceDetectionProcessor) + } + + // Order of processors matters, so we add the batching and compacting processors after + // potentially adding the resourcedetection processor. + pgbackrestProcessors = append(pgbackrestProcessors, LogsBatchProcessor, CompactingProcessor) + outConfig.Pipelines["logs/pgbackrest"] = Pipeline{ Extensions: []ComponentID{"file_storage/pgbackrest_logs"}, Receivers: []ComponentID{"filelog/pgbackrest_log"}, - Processors: []ComponentID{ - "resource/pgbackrest", - "transform/pgbackrest_logs", - ResourceDetectionProcessor, - LogsBatchProcessor, - CompactingProcessor, - }, - Exporters: exporters, + Processors: pgbackrestProcessors, + Exporters: exporters, } } } diff --git a/internal/collector/postgres_test.go b/internal/collector/postgres_test.go index 62ecf25276..e0e76c1896 100644 --- a/internal/collector/postgres_test.go +++ b/internal/collector/postgres_test.go @@ -261,7 +261,6 @@ service: processors: - resource/pgbackrest - transform/pgbackrest_logs - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -272,7 +271,6 @@ service: processors: - resource/postgres - transform/postgres_logs - - resourcedetection - batch/logs - groupbyattrs/compact receivers: @@ -280,7 +278,7 @@ `) }) - t.Run("InstrumentationSpecDefined", func(t *testing.T) { + t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) { gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ feature.OpenTelemetryLogs: true, @@ -515,6 +513,275 @@ receivers: type: add value: json storage: file_storage/postgres_logs +service: + extensions: + -
file_storage/pgbackrest_logs + - file_storage/postgres_logs + pipelines: + logs/pgbackrest: + exporters: + - googlecloud + processors: + - resource/pgbackrest + - transform/pgbackrest_logs + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/pgbackrest_log + logs/postgres: + exporters: + - googlecloud + processors: + - resource/postgres + - transform/postgres_logs + - batch/logs + - groupbyattrs/compact + receivers: + - filelog/postgres_jsonlog +`) + }) + + t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) { + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.OpenTelemetryLogs: true, + })) + ctx := feature.NewContext(context.Background(), gate) + + cluster := new(v1beta1.PostgresCluster) + cluster.Spec.PostgresVersion = 99 + cluster.Spec.Instrumentation = testInstrumentationSpec() + cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{ + { + Name: "gcp", + }, + } + + config := NewConfig(cluster.Spec.Instrumentation) + params := postgres.NewParameters() + + EnablePostgresLogging(ctx, cluster, params.Default, config) + + result, err := config.ToYAML() + assert.NilError(t, err) + assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT. +# Your changes will not be saved. +exporters: + debug: + verbosity: detailed + googlecloud: + log: + default_log_name: opentelemetry.io/collector-exported-log + project: google-project-name +extensions: + file_storage/pgbackrest_logs: + create_directory: false + directory: /pgdata/pgbackrest/log/receiver + fsync: true + file_storage/postgres_logs: + create_directory: true + directory: /pgdata/logs/postgres/receiver + fsync: true +processors: + batch/1s: + timeout: 1s + batch/200ms: + timeout: 200ms + batch/logs: + send_batch_size: 8192 + timeout: 200ms + groupbyattrs/compact: {} + resource/pgbackrest: + attributes: + - action: insert + key: k8s.container.name + value: database + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: pgbackrest + resource/postgres: + attributes: + - action: insert + key: k8s.container.name + value: database + - action: insert + key: k8s.namespace.name + value: ${env:K8S_POD_NAMESPACE} + - action: insert + key: k8s.pod.name + value: ${env:K8S_POD_NAME} + - action: insert + key: process.executable.name + value: postgres + - action: insert + key: db.system + value: postgresql + - action: insert + key: db.version + value: "99" + resourcedetection: + detectors: + - gcp + override: false + timeout: 30s + transform/pgbackrest_logs: + log_statements: + - statements: + - set(instrumentation_scope.name, "pgbackrest") + - set(instrumentation_scope.schema_url, "https://opentelemetry.io/schemas/1.29.0") + - 'merge_maps(log.cache, ExtractPatterns(log.body, "^(?<timestamp>\\d{4}-\\d{2}-\\d{2} + \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?<process_id>P\\d{2,3})\\s*(?<error_severity>\\S*): + (?<message>(?s).*)$"), "insert") where Len(log.body) > 0' + - set(log.severity_text, log.cache["error_severity"]) where IsString(log.cache["error_severity"]) + - set(log.severity_number, SEVERITY_NUMBER_TRACE) where log.severity_text == + "TRACE" + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "DEBUG" + - set(log.severity_number, SEVERITY_NUMBER_DEBUG2) where log.severity_text == + "DETAIL" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "INFO" + - set(log.severity_number,
SEVERITY_NUMBER_WARN) where log.severity_text == + "WARN" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + - set(log.time, Time(log.cache["timestamp"], "%Y-%m-%d %H:%M:%S.%L")) where + IsString(log.cache["timestamp"]) + - set(log.attributes["process.pid"], log.cache["process_id"]) + - set(log.attributes["log.record.original"], log.body) + - set(log.body, log.cache["message"]) + transform/postgres_logs: + log_statements: + - conditions: + - log.body["format"] == "csv" + statements: + - set(log.cache, ParseCSV(log.body["original"], log.body["headers"], delimiter=",", + mode="strict")) + - merge_maps(log.cache, ExtractPatterns(log.cache["connection_from"], "(?:^[[]local[]]:(?<remote_port>.+)|:(?<remote_port>[^:]+))$"), + "insert") where Len(log.cache["connection_from"]) > 0 + - set(log.cache["remote_host"], Substring(log.cache["connection_from"], 0, Len(log.cache["connection_from"]) + - Len(log.cache["remote_port"]) - 1)) where Len(log.cache["connection_from"]) + > 0 and IsString(log.cache["remote_port"]) + - set(log.cache["remote_host"], log.cache["connection_from"]) where Len(log.cache["connection_from"]) + > 0 and not IsString(log.cache["remote_host"]) + - merge_maps(log.cache, ExtractPatterns(log.cache["location"], "^(?:(?<func_name>[^,]+), + )?(?<file_name>[^:]+):(?<file_line_num>\\d+)$"), "insert") where Len(log.cache["location"]) + > 0 + - set(log.cache["cursor_position"], Double(log.cache["cursor_position"])) where + IsMatch(log.cache["cursor_position"], "^[0-9.]+$") + - set(log.cache["file_line_num"], Double(log.cache["file_line_num"])) where + IsMatch(log.cache["file_line_num"], "^[0-9.]+$") + - set(log.cache["internal_position"], Double(log.cache["internal_position"])) + where IsMatch(log.cache["internal_position"], "^[0-9.]+$") + - set(log.cache["leader_pid"], Double(log.cache["leader_pid"])) where IsMatch(log.cache["leader_pid"], + "^[0-9.]+$") + - set(log.cache["line_num"], Double(log.cache["line_num"])) where IsMatch(log.cache["line_num"], + "^[0-9.]+$") + - set(log.cache["pid"], Double(log.cache["pid"])) where IsMatch(log.cache["pid"], + "^[0-9.]+$") + - set(log.cache["query_id"], Double(log.cache["query_id"])) where IsMatch(log.cache["query_id"], + "^[0-9.]+$") + - set(log.cache["remote_port"], Double(log.cache["remote_port"])) where IsMatch(log.cache["remote_port"], + "^[0-9.]+$") + - set(log.body["parsed"], log.cache) + - statements: + - set(instrumentation_scope.name, "postgres") + - set(instrumentation_scope.version, resource.attributes["db.version"]) + - set(log.cache, log.body["parsed"]) where log.body["format"] == "csv" + - set(log.cache, ParseJSON(log.body["original"])) where log.body["format"] == + "json" + - set(log.severity_text, log.cache["error_severity"]) + - set(log.severity_number, SEVERITY_NUMBER_TRACE) where log.severity_text == + "DEBUG5" + - set(log.severity_number, SEVERITY_NUMBER_TRACE2) where log.severity_text == + "DEBUG4" + - set(log.severity_number, SEVERITY_NUMBER_TRACE3) where log.severity_text == + "DEBUG3" + - set(log.severity_number, SEVERITY_NUMBER_TRACE4) where log.severity_text == + "DEBUG2" + - set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text == + "DEBUG1" + - set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text == + "INFO" or log.severity_text == "LOG" + - set(log.severity_number, SEVERITY_NUMBER_INFO2) where log.severity_text == + "NOTICE" + - set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text == + "WARNING" + - set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text == + "ERROR" + -
set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text == + "FATAL" + - set(log.severity_number, SEVERITY_NUMBER_FATAL2) where log.severity_text == + "PANIC" + - set(log.time, Time(log.cache["timestamp"], "%F %T.%L %Z")) where IsString(log.cache["timestamp"]) + - set(instrumentation_scope.schema_url, "https://opentelemetry.io/schemas/1.29.0") + - set(resource.attributes["db.system"], "postgresql") + - set(log.attributes["log.record.original"], log.body["original"]) + - set(log.body, log.cache) + - set(log.attributes["client.address"], log.body["remote_host"]) where IsString(log.body["remote_host"]) + - set(log.attributes["client.port"], Int(log.body["remote_port"])) where IsDouble(log.body["remote_port"]) + - set(log.attributes["code.filepath"], log.body["file_name"]) where IsString(log.body["file_name"]) + - set(log.attributes["code.function"], log.body["func_name"]) where IsString(log.body["func_name"]) + - set(log.attributes["code.lineno"], Int(log.body["file_line_num"])) where IsDouble(log.body["file_line_num"]) + - set(log.attributes["db.namespace"], log.body["dbname"]) where IsString(log.body["dbname"]) + - set(log.attributes["db.response.status_code"], log.body["state_code"]) where + IsString(log.body["state_code"]) + - set(log.attributes["process.creation.time"], Concat([ Substring(log.body["session_start"], + 0, 10), "T", Substring(log.body["session_start"], 11, 8), "Z"], "")) where + IsMatch(log.body["session_start"], "^[^ ]{10} [^ ]{8} UTC$") + - set(log.attributes["process.pid"], Int(log.body["pid"])) where IsDouble(log.body["pid"]) + - set(log.attributes["process.title"], log.body["ps"]) where IsString(log.body["ps"]) + - set(log.attributes["user.name"], log.body["user"]) where IsString(log.body["user"]) + - conditions: + - 'Len(log.body["message"]) > 7 and Substring(log.body["message"], 0, 7) == + "AUDIT: "' + statements: + - set(log.body["pgaudit"], ParseCSV(Substring(log.body["message"], 7, Len(log.body["message"]) + - 7), "audit_type,statement_id,substatement_id,class,command,object_type,object_name,statement,parameter", + delimiter=",", mode="strict")) + - set(instrumentation_scope.name, "pgaudit") where Len(log.body["pgaudit"]) + > 0 +receivers: + filelog/pgbackrest_log: + include: + - /pgdata/pgbackrest/log/*.log + - /pgdata/pgbackrest/log/*.log.1 + multiline: + line_start_pattern: ^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}|^-{19} + storage: file_storage/pgbackrest_logs + filelog/postgres_csvlog: + include: + - /pgdata/logs/postgres/*.csv + multiline: + line_start_pattern: ^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d.\d{3} UTC,(?:"[_\D](?:[^"]|"")*")?,(?:"[_\D](?:[^"]|"")*")?,\d*,(?:"(?:[^"]|"")+")?,[0-9a-f]+[.][0-9a-f]+,\d+, + operators: + - from: body + to: body.original + type: move + - field: body.format + type: add + value: csv + - field: body.headers + type: add + value: timestamp,user,dbname,pid,connection_from,session_id,line_num,ps,session_start,vxid,txid,error_severity,state_code,message,detail,hint,internal_query,internal_position,context,statement,cursor_position,location,application_name,backend_type,leader_pid,query_id + storage: file_storage/postgres_logs + filelog/postgres_jsonlog: + include: + - /pgdata/logs/postgres/*.json + operators: + - from: body + to: body.original + type: move + - field: body.format + type: add + value: json + storage: file_storage/postgres_logs service: extensions: - file_storage/pgbackrest_logs diff --git a/internal/collector/util.go b/internal/collector/util.go index ff5ae4ba3e..bb1cdc5a4e 100644 --- 
a/internal/collector/util.go +++ b/internal/collector/util.go @@ -21,9 +21,9 @@ func OpenTelemetrySpecPresent[T CrunchyCRD](object T) bool { case *v1beta1.InstrumentationSpec: return v != nil case *v1beta1.PostgresCluster: - return v.Spec.Instrumentation != nil + return v != nil && v.Spec.Instrumentation != nil case *v1beta1.PGAdmin: - return v.Spec.Instrumentation != nil + return v != nil && v.Spec.Instrumentation != nil default: return false } From 85629a9ba0812490122aadf7db4669e073b9a69b Mon Sep 17 00:00:00 2001 From: Drew Sessler Date: Mon, 26 Jan 2026 23:54:17 -0800 Subject: [PATCH 2/2] Bump collector version to 0.144.0 for pgdg build. --- components/image-collector/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/image-collector/Dockerfile b/components/image-collector/Dockerfile index dbef42f597..08d5f11081 100644 --- a/components/image-collector/Dockerfile +++ b/components/image-collector/Dockerfile @@ -4,7 +4,7 @@ # Rather than build the binary, retrieve the already-built binary from # the OpenTelemetry image -FROM otel/opentelemetry-collector-contrib:0.139.0 AS collector +FROM otel/opentelemetry-collector-contrib:0.144.0 AS collector # Aggregate the collector licenses from binary # and from root of the PGO repo