Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions internal/collector/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ func EnablePatroniLogging(ctx context.Context,
inCluster *v1beta1.PostgresCluster,
outConfig *Config,
) {
var spec *v1beta1.InstrumentationLogsSpec
if inCluster != nil && inCluster.Spec.Instrumentation != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Did something change so that these checks are no longer necessary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we checked them to ensure that accessing inCluster.Spec.Instrumentation.Logs wouldn't result in a nil pointer error, but the inCluster.Spec.Instrumentation != nil check happens in the OpenTelemetryLogsEnabled function call below anyway, so we can get rid of them and move the spec assignment inside the if block. That said, OpenTelemetryLogsEnabled doesn't check that inCluster != nil, so we theoretically could have had a nil pointer exception even before my change — although I can't think of a reason why we'd be passing around a nil pointer to a v1beta1.PostgresCluster. Either way, it's probably a good idea to stick a check in there.

spec = inCluster.Spec.Instrumentation.Logs
}

if OpenTelemetryLogsEnabled(ctx, inCluster) {
spec := inCluster.Spec.Instrumentation
directory := naming.PatroniPGDataLogPath

// Keep track of what log records and files have been processed.
Expand Down Expand Up @@ -117,21 +113,31 @@ func EnablePatroniLogging(ctx context.Context,
// If there are exporters to be added to the logs pipelines defined in
// the spec, add them to the pipeline. Otherwise, add the DebugExporter.
exporters := []ComponentID{DebugExporter}
if spec != nil && spec.Exporters != nil {
exporters = slices.Clone(spec.Exporters)
if spec.Logs != nil && spec.Logs.Exporters != nil {
exporters = slices.Clone(spec.Logs.Exporters)
}

patroniProcessors := []ComponentID{
"resource/patroni",
"transform/patroni_logs",
}

// We can only add the ResourceDetectionProcessor if there are detectors set,
// otherwise it will fail. This is due to a change in the following upstream commit:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298
if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 {
patroniProcessors = append(patroniProcessors, ResourceDetectionProcessor)
}

// Order of processors matters, so we add the batching and compacting processors after
// potentially adding the resourcedetection processor
patroniProcessors = append(patroniProcessors, LogsBatchProcessor, CompactingProcessor)

outConfig.Pipelines["logs/patroni"] = Pipeline{
Extensions: []ComponentID{"file_storage/patroni_logs"},
Receivers: []ComponentID{"filelog/patroni_jsonlog"},
Processors: []ComponentID{
"resource/patroni",
"transform/patroni_logs",
ResourceDetectionProcessor,
LogsBatchProcessor,
CompactingProcessor,
},
Exporters: exporters,
Processors: patroniProcessors,
Exporters: exporters,
}
}
}
Expand Down
111 changes: 109 additions & 2 deletions internal/collector/patroni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,14 @@ service:
processors:
- resource/patroni
- transform/patroni_logs
- resourcedetection
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/patroni_jsonlog
`)
})

t.Run("InstrumentationSpecDefined", func(t *testing.T) {
t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
Expand Down Expand Up @@ -202,6 +201,114 @@ receivers:
to: body.original
type: move
storage: file_storage/patroni_logs
service:
extensions:
- file_storage/patroni_logs
pipelines:
logs/patroni:
exporters:
- googlecloud
processors:
- resource/patroni
- transform/patroni_logs
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/patroni_jsonlog
`)
})

t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))
ctx := feature.NewContext(context.Background(), gate)

cluster := new(v1beta1.PostgresCluster)
cluster.Spec.Instrumentation = testInstrumentationSpec()
cluster.Spec.Instrumentation.Config.Detectors = []v1beta1.OpenTelemetryResourceDetector{
{
Name: "gcp",
},
}
config := NewConfig(cluster.Spec.Instrumentation)

EnablePatroniLogging(ctx, cluster, config)

result, err := config.ToYAML()
assert.NilError(t, err)
assert.DeepEqual(t, result, `# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
debug:
verbosity: detailed
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
project: google-project-name
extensions:
file_storage/patroni_logs:
create_directory: true
directory: /pgdata/patroni/log/receiver
fsync: true
processors:
batch/1s:
timeout: 1s
batch/200ms:
timeout: 200ms
batch/logs:
send_batch_size: 8192
timeout: 200ms
groupbyattrs/compact: {}
resource/patroni:
attributes:
- action: insert
key: k8s.container.name
value: database
- action: insert
key: k8s.namespace.name
value: ${env:K8S_POD_NAMESPACE}
- action: insert
key: k8s.pod.name
value: ${env:K8S_POD_NAME}
- action: insert
key: process.executable.name
value: patroni
resourcedetection:
detectors:
- gcp
override: false
timeout: 30s
transform/patroni_logs:
log_statements:
- statements:
- set(instrumentation_scope.name, "patroni")
- set(log.cache, ParseJSON(log.body["original"]))
- set(log.severity_text, log.cache["levelname"])
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text ==
"DEBUG"
- set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text ==
"INFO"
- set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text ==
"WARNING"
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text ==
"ERROR"
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text ==
"CRITICAL"
- set(log.time, Time(log.cache["asctime"], "%F %T,%L")) where IsString(log.cache["asctime"])
- set(log.attributes["log.record.original"], log.body["original"])
- set(log.body, log.cache["message"])
receivers:
filelog/patroni_jsonlog:
include:
- /pgdata/patroni/log/*.log
- /pgdata/patroni/log/*.log.1
operators:
- from: body
to: body.original
type: move
storage: file_storage/patroni_logs
service:
extensions:
- file_storage/patroni_logs
Expand Down
36 changes: 20 additions & 16 deletions internal/collector/pgadmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,34 @@ func EnablePgAdminLogging(ctx context.Context, spec *v1beta1.InstrumentationSpec
exporters = slices.Clone(spec.Logs.Exporters)
}

pgadminProcessors := []ComponentID{
"resource/pgadmin",
"transform/pgadmin_log",
}

// We can only add the ResourceDetectionProcessor if there are detectors set,
// otherwise it will fail. This is due to a change in the following upstream commit:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/commit/50cd2e8433cee1e292e7b7afac9758365f3a1298
if spec.Config != nil && spec.Config.Detectors != nil && len(spec.Config.Detectors) > 0 {
pgadminProcessors = append(pgadminProcessors, ResourceDetectionProcessor)
}

// Order of processors matters, so we add the batching and compacting processors after
// potentially adding the resourcedetection processor
pgadminProcessors = append(pgadminProcessors, LogsBatchProcessor, CompactingProcessor)

otelConfig.Pipelines["logs/pgadmin"] = Pipeline{
Extensions: []ComponentID{"file_storage/pgadmin_data_logs"},
Receivers: []ComponentID{"filelog/pgadmin"},
Processors: []ComponentID{
"resource/pgadmin",
"transform/pgadmin_log",
ResourceDetectionProcessor,
LogsBatchProcessor,
CompactingProcessor,
},
Exporters: exporters,
Processors: pgadminProcessors,
Exporters: exporters,
}

otelConfig.Pipelines["logs/gunicorn"] = Pipeline{
Extensions: []ComponentID{"file_storage/pgadmin_data_logs"},
Receivers: []ComponentID{"filelog/gunicorn"},
Processors: []ComponentID{
"resource/pgadmin",
"transform/pgadmin_log",
ResourceDetectionProcessor,
LogsBatchProcessor,
CompactingProcessor,
},
Exporters: exporters,
Processors: pgadminProcessors,
Exporters: exporters,
}

otelYAML, err := otelConfig.ToYAML()
Expand Down
132 changes: 129 additions & 3 deletions internal/collector/pgadmin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ collector.yaml: |
processors:
- resource/pgadmin
- transform/pgadmin_log
- resourcedetection
- batch/logs
- groupbyattrs/compact
receivers:
Expand All @@ -125,15 +124,14 @@ collector.yaml: |
processors:
- resource/pgadmin
- transform/pgadmin_log
- resourcedetection
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/pgadmin
`))
})

t.Run("InstrumentationSpecDefined", func(t *testing.T) {
t.Run("InstrumentationSpecDefinedNoDetectors", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
Expand Down Expand Up @@ -232,6 +230,134 @@ collector.yaml: |
include:
- /var/lib/pgadmin/logs/pgadmin.log
storage: file_storage/pgadmin_data_logs
service:
extensions:
- file_storage/pgadmin_data_logs
pipelines:
logs/gunicorn:
exporters:
- googlecloud
processors:
- resource/pgadmin
- transform/pgadmin_log
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/gunicorn
logs/pgadmin:
exporters:
- googlecloud
processors:
- resource/pgadmin
- transform/pgadmin_log
- batch/logs
- groupbyattrs/compact
receivers:
- filelog/pgadmin
`))
})

t.Run("InstrumentationSpecDefinedDetectorSet", func(t *testing.T) {
gate := feature.NewGate()
assert.NilError(t, gate.SetFromMap(map[string]bool{
feature.OpenTelemetryLogs: true,
}))

ctx := feature.NewContext(context.Background(), gate)

var spec v1beta1.InstrumentationSpec
require.UnmarshalInto(t, &spec, `{
config: {
detectors: [{name: gcp}],
exporters: {
googlecloud: {
log: { default_log_name: opentelemetry.io/collector-exported-log },
project: google-project-name,
},
},
},
logs: { exporters: [googlecloud] },
}`)

configmap := new(corev1.ConfigMap)
initialize.Map(&configmap.Data)
err := collector.EnablePgAdminLogging(ctx, &spec, configmap)
assert.NilError(t, err)

assert.Assert(t, cmp.MarshalMatches(configmap.Data, `
collector.yaml: |
# Generated by postgres-operator. DO NOT EDIT.
# Your changes will not be saved.
exporters:
debug:
verbosity: detailed
googlecloud:
log:
default_log_name: opentelemetry.io/collector-exported-log
project: google-project-name
extensions:
file_storage/pgadmin_data_logs:
create_directory: false
directory: /var/lib/pgadmin/logs/receiver
fsync: true
processors:
batch/1s:
timeout: 1s
batch/200ms:
timeout: 200ms
batch/logs:
send_batch_size: 8192
timeout: 200ms
groupbyattrs/compact: {}
resource/pgadmin:
attributes:
- action: insert
key: k8s.container.name
value: pgadmin
- action: insert
key: k8s.namespace.name
value: ${env:K8S_POD_NAMESPACE}
- action: insert
key: k8s.pod.name
value: ${env:K8S_POD_NAME}
- action: insert
key: process.executable.name
value: pgadmin
resourcedetection:
detectors:
- gcp
override: false
timeout: 30s
transform/pgadmin_log:
log_statements:
- statements:
- set(log.attributes["log.record.original"], log.body)
- set(log.cache, ParseJSON(log.body))
- merge_maps(log.attributes, ExtractPatterns(log.cache["message"], "(?P<webrequest>[A-Z]{3}.*?[\\d]{3})"),
"insert")
- set(log.body, log.cache["message"])
- set(instrumentation_scope.name, log.cache["name"])
- set(log.severity_text, log.cache["level"])
- set(log.time_unix_nano, Int(log.cache["time"]*1000000000))
- set(log.severity_number, SEVERITY_NUMBER_DEBUG) where log.severity_text ==
"DEBUG"
- set(log.severity_number, SEVERITY_NUMBER_INFO) where log.severity_text ==
"INFO"
- set(log.severity_number, SEVERITY_NUMBER_WARN) where log.severity_text ==
"WARNING"
- set(log.severity_number, SEVERITY_NUMBER_ERROR) where log.severity_text ==
"ERROR"
- set(log.severity_number, SEVERITY_NUMBER_FATAL) where log.severity_text ==
"CRITICAL"
receivers:
filelog/gunicorn:
include:
- /var/lib/pgadmin/logs/gunicorn.log
storage: file_storage/pgadmin_data_logs
filelog/pgadmin:
include:
- /var/lib/pgadmin/logs/pgadmin.log
storage: file_storage/pgadmin_data_logs
service:
extensions:
- file_storage/pgadmin_data_logs
Expand Down
Loading
Loading