diff --git a/api/observability/v1/input_types.go b/api/observability/v1/input_types.go index ccd844e65..f980837da 100644 --- a/api/observability/v1/input_types.go +++ b/api/observability/v1/input_types.go @@ -15,6 +15,7 @@ limitations under the License. package v1 import ( + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -96,6 +97,12 @@ type ContainerInputTuningSpec struct { // +kubebuilder:validation:Optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Per-Container Rate Limit" RateLimitPerContainer *LimitSpec `json:"rateLimitPerContainer,omitempty"` + + // MaxMessageSize The maximum message length in bytes that a single log event can be when all + // partial log lines are merged. Messages exceeding this limit are dropped. + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Message Size" + MaxMessageSize *resource.Quantity `json:"maxMessageSize,omitempty"` } // ApplicationSource defines the type of ApplicationSource log source to use. @@ -184,6 +191,16 @@ var ( } ) +// InfrastructureInputTuningSpec is the infrastructure input tuning spec, for now available only for container sources +type InfrastructureInputTuningSpec struct { + + // Container is the input tuning spec for container sources + // + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Input Tuning" + Container *ContainerInputTuningSpec `json:"container,omitempty"` +} + // Infrastructure enables infrastructure logs. // Sources of these logs: // * container workloads deployed to namespaces: default, kube*, openshift* @@ -195,6 +212,12 @@ type Infrastructure struct { // +kubebuilder:validation:Optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Log Sources" Sources []InfrastructureSource `json:"sources,omitempty"` + + // Tuning is the infrastructure input tuning spec, for now available only for container sources + // + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Container Input Tuning" + Tuning *InfrastructureInputTuningSpec `json:"tuning,omitempty"` } // AuditSource defines which type of audit log source is used. diff --git a/api/observability/v1/zz_generated.deepcopy.go b/api/observability/v1/zz_generated.deepcopy.go index 2c7993a43..d8cb37a56 100644 --- a/api/observability/v1/zz_generated.deepcopy.go +++ b/api/observability/v1/zz_generated.deepcopy.go @@ -483,6 +483,11 @@ func (in *ContainerInputTuningSpec) DeepCopyInto(out *ContainerInputTuningSpec) *out = new(LimitSpec) **out = **in } + if in.MaxMessageSize != nil { + in, out := &in.MaxMessageSize, &out.MaxMessageSize + x := (*in).DeepCopy() + *out = &x + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerInputTuningSpec. @@ -790,6 +795,11 @@ func (in *Infrastructure) DeepCopyInto(out *Infrastructure) { *out = make([]InfrastructureSource, len(*in)) copy(*out, *in) } + if in.Tuning != nil { + in, out := &in.Tuning, &out.Tuning + *out = new(InfrastructureInputTuningSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Infrastructure. @@ -802,6 +812,26 @@ func (in *Infrastructure) DeepCopy() *Infrastructure { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *InfrastructureInputTuningSpec) DeepCopyInto(out *InfrastructureInputTuningSpec) { + *out = *in + if in.Container != nil { + in, out := &in.Container, &out.Container + *out = new(ContainerInputTuningSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InfrastructureInputTuningSpec. +func (in *InfrastructureInputTuningSpec) DeepCopy() *InfrastructureInputTuningSpec { + if in == nil { + return nil + } + out := new(InfrastructureInputTuningSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InputSpec) DeepCopyInto(out *InputSpec) { *out = *in diff --git a/bundle/manifests/cluster-logging.clusterserviceversion.yaml b/bundle/manifests/cluster-logging.clusterserviceversion.yaml index 77dee9f07..1d23e1be8 100644 --- a/bundle/manifests/cluster-logging.clusterserviceversion.yaml +++ b/bundle/manifests/cluster-logging.clusterserviceversion.yaml @@ -82,7 +82,7 @@ metadata: categories: OpenShift Optional, Logging & Tracing, Observability certified: "false" containerImage: quay.io/openshift-logging/cluster-logging-operator:latest - createdAt: "2025-11-13T11:03:18Z" + createdAt: "2025-12-12T18:19:58Z" description: The Red Hat OpenShift Logging Operator for OCP provides a means for configuring and managing log collection and forwarding. features.operators.openshift.io/cnf: "false" @@ -333,6 +333,11 @@ spec: sources displayName: Input Tuning path: inputs[0].application.tuning + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].application.tuning.maxMessageSize - description: |- RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. @@ -361,6 +366,30 @@ spec: This field is optional and omission results in the collection of all infrastructure sources. displayName: Log Sources path: inputs[0].infrastructure.sources + - description: Tuning is the infrastructure input tuning spec, for now available + only for container sources + displayName: Container Input Tuning + path: inputs[0].infrastructure.tuning + - description: Container is the input tuning spec for container sources + displayName: Input Tuning + path: inputs[0].infrastructure.tuning.container + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].infrastructure.tuning.container.maxMessageSize + - description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + displayName: Per-Container Rate Limit + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer + - description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + displayName: Max Records Per Second + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer.maxRecordsPerSecond + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:number - description: Name used to refer to the input of a `pipeline`. 
displayName: Input Name path: inputs[0].name diff --git a/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml b/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml index 6e2e45091..995992e84 100644 --- a/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml +++ b/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml @@ -1524,6 +1524,15 @@ spec: description: Tuning is the container input tuning spec for this container sources properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true rateLimitPerContainer: description: |- RateLimitPerContainer is the limit applied to each container @@ -1575,6 +1584,41 @@ spec: - node type: string type: array + tuning: + description: Tuning is the infrastructure input tuning spec, + for now available only for container sources + properties: + container: + description: Container is the input tuning spec for + container sources + properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + rateLimitPerContainer: + description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + properties: + maxRecordsPerSecond: + description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + exclusiveMinimum: true + format: int64 + minimum: 0 + type: integer + required: + - maxRecordsPerSecond + type: object + type: object + type: object type: object name: description: Name used to refer to the input of a `pipeline`. diff --git a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml index 73872f239..48bd49141 100644 --- a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml +++ b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml @@ -1524,6 +1524,15 @@ spec: description: Tuning is the container input tuning spec for this container sources properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. 
+ pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true rateLimitPerContainer: description: |- RateLimitPerContainer is the limit applied to each container @@ -1575,6 +1584,41 @@ spec: - node type: string type: array + tuning: + description: Tuning is the infrastructure input tuning spec, + for now available only for container sources + properties: + container: + description: Container is the input tuning spec for + container sources + properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + rateLimitPerContainer: + description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + properties: + maxRecordsPerSecond: + description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + exclusiveMinimum: true + format: int64 + minimum: 0 + type: integer + required: + - maxRecordsPerSecond + type: object + type: object + type: object type: object name: description: Name used to refer to the input of a `pipeline`. diff --git a/config/manifests/bases/cluster-logging.clusterserviceversion.yaml b/config/manifests/bases/cluster-logging.clusterserviceversion.yaml index 18407f7e9..d87a856f1 100644 --- a/config/manifests/bases/cluster-logging.clusterserviceversion.yaml +++ b/config/manifests/bases/cluster-logging.clusterserviceversion.yaml @@ -256,6 +256,11 @@ spec: sources displayName: Input Tuning path: inputs[0].application.tuning + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].application.tuning.maxMessageSize - description: |- RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. @@ -284,6 +289,30 @@ spec: This field is optional and omission results in the collection of all infrastructure sources. displayName: Log Sources path: inputs[0].infrastructure.sources + - description: Tuning is the infrastructure input tuning spec, for now available + only for container sources + displayName: Container Input Tuning + path: inputs[0].infrastructure.tuning + - description: Container is the input tuning spec for container sources + displayName: Input Tuning + path: inputs[0].infrastructure.tuning.container + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].infrastructure.tuning.container.maxMessageSize + - description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. 
+ displayName: Per-Container Rate Limit + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer + - description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + displayName: Max Records Per Second + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer.maxRecordsPerSecond + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:number - description: Name used to refer to the input of a `pipeline`. displayName: Input Name path: inputs[0].name diff --git a/docs/reference/operator/api_logging_v1alpha1.adoc b/docs/reference/operator/api_logging_v1alpha1.adoc index 12bb4110c..c28e6f326 100644 --- a/docs/reference/operator/api_logging_v1alpha1.adoc +++ b/docs/reference/operator/api_logging_v1alpha1.adoc @@ -37,11 +37,24 @@ Type:: object |====================== |Property|Type|Description +|networkPolicy|object| *(optional)* Define the Network Policy for the LogFileMetricExporter |nodeSelector|object| *(optional)* Define which Nodes the Pods are scheduled on. |resources|object| *(optional)* The resource requirements for the LogFileMetricExporter |tolerations|array| *(optional)* Define the tolerations the Pods will accept |====================== +=== .spec.networkPolicy + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|ruleSet|string| NetworkPolicyRuleSetType is the type of network policy rule set to use + +|====================== + === .spec.nodeSelector Type:: object diff --git a/docs/reference/operator/api_observability_v1.adoc b/docs/reference/operator/api_observability_v1.adoc index a4d6449d5..878eb805f 100644 --- a/docs/reference/operator/api_observability_v1.adoc +++ b/docs/reference/operator/api_observability_v1.adoc @@ -1427,11 +1427,79 @@ Type:: object |====================== |Property|Type|Description +|maxMessageSize|object| MaxMessageSize The maximum message length in bytes that a single log event can be when all +partial log lines are merged. Messages exceeding this limit are dropped. |rateLimitPerContainer|object| RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. |====================== +=== .spec.inputs[].application.tuning.maxMessageSize + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Format|string| Change Format at will. See the comment for Canonicalize for +more details. 
+|d|object| d is the quantity in inf.Dec form if d.Dec != nil +|i|int| i is the quantity in int64 scaled form, if d.Dec == nil +|s|string| s is the generated value of this quantity to avoid recalculation +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Dec|object| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|unscaled|object| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec.unscaled + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|abs|Word| sign +|neg|bool| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec.unscaled.abs + +Type:: Word + +=== .spec.inputs[].application.tuning.maxMessageSize.i + +Type:: int + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|value|int| +|====================== + === .spec.inputs[].application.tuning.rateLimitPerContainer Type:: object diff --git a/hack/generate-bundle.sh b/hack/generate-bundle.sh index 544439509..a5ec6ca93 100755 --- a/hack/generate-bundle.sh +++ b/hack/generate-bundle.sh @@ -1,4 +1,4 @@ -#!/usr/bin/bash +#!/usr/bin/env bash source .bingo/variables.env diff --git a/hack/revert-bundle.sh b/hack/revert-bundle.sh index f77a7ea6a..0f8af90b9 100755 --- a/hack/revert-bundle.sh +++ b/hack/revert-bundle.sh @@ -1,4 +1,4 @@ -#!/usr/bin/bash +#!/usr/bin/env bash git diff --no-ext-diff --quiet -I'^ createdAt: ' bundle/manifests/cluster-logging.clusterserviceversion.yaml if ((! $?)) ; then diff --git a/hack/run-linter b/hack/run-linter index 4ba44d57c..09973cf8a 100755 --- a/hack/run-linter +++ b/hack/run-linter @@ -1,4 +1,4 @@ -#!/usr/bin/sh +#!/usr/bin/env bash set -euo pipefail diff --git a/internal/generator/vector/input/application_with_max_merge_line_size.toml b/internal/generator/vector/input/application_with_max_merge_line_size.toml new file mode 100644 index 000000000..01683d5ea --- /dev/null +++ b/internal/generator/vector/input/application_with_max_merge_line_size.toml @@ -0,0 +1,133 @@ +# Logs from containers (including openshift containers) +[sources.input_my_app_container] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1048576 +exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"] +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = "kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true + +[transforms.input_my_app_container_meta] +type = "remap" +inputs = ["input_my_app_container"] +source = ''' + . 
= {"_internal": .} + if exists(._internal.stream) {._internal.kubernetes.container_iostream = ._internal.stream} + ._internal.log_source = "container" + # If namespace is infra, label log_type as infra + if match_any(string!(._internal.kubernetes.namespace_name), [r'^default$', r'^openshift(-.+)?$', r'^kube(-.+)?$']) { + ._internal.log_type = "infrastructure" + } else { + ._internal.log_type = "application" + } + + ._internal.hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? "" + ._internal.openshift = { "cluster_id": "${OPENSHIFT_CLUSTER_ID:-}"} + ._internal.openshift.sequence = to_unix_timestamp(now(), unit: "nanoseconds") + + if !exists(._internal.level) { + level = null + message = ._internal.message + + # attempt 1: parse as logfmt (e.g. level=error msg="Failed to connect") + + parsed_logfmt, err = parse_logfmt(message) + if err == null && is_string(parsed_logfmt.level) { + level = downcase!(parsed_logfmt.level) + } + + # attempt 2: parse as klog (e.g. I0920 14:22:00.089385 1 scheduler.go:592] "Successfully bound pod to node") + if level == null { + parsed_klog, err = parse_klog(message) + if err == null && is_string(parsed_klog.level) { + level = parsed_klog.level + } + } + + # attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc. + + if level == null { + parsed_grok, err = parse_groks(message, + patterns: [ + "%{common_prefix} %{_message}" + ], + aliases: { + "common_prefix": "%{_timestamp} %{_loglevel}", + "_timestamp": "%{TIMESTAMP_ISO8601:timestamp}", + "_loglevel": "%{LOGLEVEL:level}", + "_message": "%{GREEDYDATA:message}" + } + ) + + if err == null && is_string(parsed_grok.level) { + level = downcase!(parsed_grok.level) + } + } + + if level == null { + level = "default" + + # attempt 4: Match on well known structured patterns + # Order: emergency, alert, critical, error, warn, notice, info, debug, trace + + if match!(message, r'^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"') { + level = "emergency" + } else if match!(message, r'^A[0-9]+|level=alert|Value:alert|"level":"alert"') { + level = "alert" + } else if match!(message, r'^C[0-9]+|level=critical|Value:critical|"level":"critical"') { + level = "critical" + } else if match!(message, r'^E[0-9]+|level=error|Value:error|"level":"error"') { + level = "error" + } else if match!(message, r'^W[0-9]+|level=warn|Value:warn|"level":"warn"') { + level = "warn" + } else if match!(message, r'^N[0-9]+|level=notice|Value:notice|"level":"notice"') { + level = "notice" + } else if match!(message, r'^I[0-9]+|level=info|Value:info|"level":"info"') { + level = "info" + } else if match!(message, r'^D[0-9]+|level=debug|Value:debug|"level":"debug"') { + level = "debug" + } else if match!(message, r'^T[0-9]+|level=trace|Value:trace|"level":"trace"') { + level = "trace" + } + + # attempt 5: Match on the keyword that appears earliest in the message + + + if level == "default" { + level_patterns = r'(?i)(?emergency|)|(?alert|)|(?critical|)|(?error|)|(?warn(?:ing)?|)|(?notice|)|(?:\b(?info)\b|)|(?debug|)|(?trace|)' + parsed, err = parse_regex(message, level_patterns) + if err == null { + if is_string(parsed.emergency) { + level = "emergency" + } else if is_string(parsed.alert) { + level = "alert" + } else if is_string(parsed.critical) { + level = "critical" + } else if is_string(parsed.error) { + level = "error" + } else if is_string(parsed.warn) { + level = "warn" + } else if is_string(parsed.notice) { + level = "notice" + } else if is_string(parsed.info) { + level = "info" + 
} else if is_string(parsed.debug) { + level = "debug" + } else if is_string(parsed.trace) { + level = "trace" + } + } + } + } + ._internal.level = level +} + +''' diff --git a/internal/generator/vector/input/container.go b/internal/generator/vector/input/container.go index 189711df7..1d217d465 100644 --- a/internal/generator/vector/input/container.go +++ b/internal/generator/vector/input/container.go @@ -12,6 +12,7 @@ import ( "github.com/openshift/cluster-logging-operator/internal/generator/vector/source" "github.com/openshift/cluster-logging-operator/internal/utils/sets" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/set" ) const ( @@ -41,19 +42,29 @@ var ( func NewContainerSource(spec obs.InputSpec, namespace, includes, excludes string, logType obs.InputType, logSource interface{}) ([]framework.Element, []string) { base := helpers.MakeInputID(spec.Name, "container") var selector *metav1.LabelSelector + maxMsgSize := int64(0) if spec.Application != nil { selector = spec.Application.Selector + if spec.Application.Tuning != nil && spec.Application.Tuning.MaxMessageSize != nil { + if size, ok := spec.Application.Tuning.MaxMessageSize.AsInt64(); ok { + maxMsgSize = size + } + } + } + if spec.Infrastructure != nil { + if (len(spec.Infrastructure.Sources) == 0 || set.New(spec.Infrastructure.Sources...).Has(obs.InfrastructureSourceContainer)) && + spec.Infrastructure.Tuning != nil && spec.Infrastructure.Tuning.Container != nil && spec.Infrastructure.Tuning.Container.MaxMessageSize != nil { + if size, ok := spec.Infrastructure.Tuning.Container.MaxMessageSize.AsInt64(); ok { + maxMsgSize = size + } + } } + metaID := helpers.MakeID(base, "meta") + k8sLogs := source.NewKubernetesLogs(base, includes, excludes, maxMsgSize) + k8sLogs.ExtraLabelSelector = source.LabelSelectorFrom(selector) el := []framework.Element{ - source.KubernetesLogs{ - ComponentID: base, - Desc: "Logs from containers (including openshift containers)", - IncludePaths: includes, - ExcludePaths: excludes, - ExtraLabelSelector: source.LabelSelectorFrom(selector), - UseKubeCache: true, - }, + k8sLogs, NewInternalNormalization(metaID, logSource, logType, base), } inputID := metaID diff --git a/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml b/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml new file mode 100644 index 000000000..ab479530c --- /dev/null +++ b/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml @@ -0,0 +1,133 @@ +# Logs from containers (including openshift containers) +[sources.input_myinfra_container] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1000000 +include_paths_glob_patterns = ["/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"] +exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log"] +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = 
"kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true + +[transforms.input_myinfra_container_meta] +type = "remap" +inputs = ["input_myinfra_container"] +source = ''' + . = {"_internal": .} + if exists(._internal.stream) {._internal.kubernetes.container_iostream = ._internal.stream} + ._internal.log_source = "container" + # If namespace is infra, label log_type as infra + if match_any(string!(._internal.kubernetes.namespace_name), [r'^default$', r'^openshift(-.+)?$', r'^kube(-.+)?$']) { + ._internal.log_type = "infrastructure" + } else { + ._internal.log_type = "application" + } + ._internal.hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? "" + ._internal.openshift = { "cluster_id": "${OPENSHIFT_CLUSTER_ID:-}"} + ._internal.openshift.sequence = to_unix_timestamp(now(), unit: "nanoseconds") + + if !exists(._internal.level) { + level = null + message = ._internal.message + + # attempt 1: parse as logfmt (e.g. level=error msg="Failed to connect") + + parsed_logfmt, err = parse_logfmt(message) + if err == null && is_string(parsed_logfmt.level) { + level = downcase!(parsed_logfmt.level) + } + + # attempt 2: parse as klog (e.g. I0920 14:22:00.089385 1 scheduler.go:592] "Successfully bound pod to node") + if level == null { + parsed_klog, err = parse_klog(message) + if err == null && is_string(parsed_klog.level) { + level = parsed_klog.level + } + } + + # attempt 3: parse with groks template (if previous attempts failed) for classic text logs like Logback, Log4j etc. + + if level == null { + parsed_grok, err = parse_groks(message, + patterns: [ + "%{common_prefix} %{_message}" + ], + aliases: { + "common_prefix": "%{_timestamp} %{_loglevel}", + "_timestamp": "%{TIMESTAMP_ISO8601:timestamp}", + "_loglevel": "%{LOGLEVEL:level}", + "_message": "%{GREEDYDATA:message}" + } + ) + + if err == null && is_string(parsed_grok.level) { + level = downcase!(parsed_grok.level) + } + } + + if level == null { + level = "default" + + # attempt 4: Match on well known structured patterns + # Order: emergency, alert, critical, error, warn, notice, info, debug, trace + + if match!(message, r'^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"') { + level = "emergency" + } else if match!(message, r'^A[0-9]+|level=alert|Value:alert|"level":"alert"') { + level = "alert" + } else if match!(message, r'^C[0-9]+|level=critical|Value:critical|"level":"critical"') { + level = "critical" + } else if match!(message, r'^E[0-9]+|level=error|Value:error|"level":"error"') { + level = "error" + } else if match!(message, r'^W[0-9]+|level=warn|Value:warn|"level":"warn"') { + level = "warn" + } else if match!(message, r'^N[0-9]+|level=notice|Value:notice|"level":"notice"') { + level = "notice" + } else if match!(message, r'^I[0-9]+|level=info|Value:info|"level":"info"') { + level = "info" + } else if match!(message, r'^D[0-9]+|level=debug|Value:debug|"level":"debug"') { + level = "debug" + } else if match!(message, r'^T[0-9]+|level=trace|Value:trace|"level":"trace"') { + level = "trace" + } + + # attempt 5: Match on the keyword that appears earliest in the message + + + if level == "default" { + level_patterns = r'(?i)(?emergency|)|(?alert|)|(?critical|)|(?error|)|(?warn(?:ing)?|)|(?notice|)|(?:\b(?info)\b|)|(?debug|)|(?trace|)' + parsed, err = parse_regex(message, level_patterns) + if err == null { + if is_string(parsed.emergency) { 
+ level = "emergency" + } else if is_string(parsed.alert) { + level = "alert" + } else if is_string(parsed.critical) { + level = "critical" + } else if is_string(parsed.error) { + level = "error" + } else if is_string(parsed.warn) { + level = "warn" + } else if is_string(parsed.notice) { + level = "notice" + } else if is_string(parsed.info) { + level = "info" + } else if is_string(parsed.debug) { + level = "debug" + } else if is_string(parsed.trace) { + level = "trace" + } + } + } + } + ._internal.level = level +} + +''' \ No newline at end of file diff --git a/internal/generator/vector/input/source_test.go b/internal/generator/vector/input/source_test.go index 265216383..03325a22f 100644 --- a/internal/generator/vector/input/source_test.go +++ b/internal/generator/vector/input/source_test.go @@ -2,8 +2,11 @@ package input import ( "fmt" + obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" . "github.com/onsi/ginkgo/v2" @@ -394,5 +397,30 @@ var _ = Describe("inputs", func() { }, "receiver_syslog_tls_from_configmap.toml", ), + Entry("application input with a MaxMessageSize", obs.InputSpec{ + Name: "my_app", + Type: obs.InputTypeApplication, + Application: &obs.Application{ + Tuning: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + }, + "application_with_max_merge_line_size.toml", + ), + Entry("infrastructure input with containers source and MaxMessageSize", obs.InputSpec{ + Name: "myinfra", + Type: obs.InputTypeInfrastructure, + Infrastructure: &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceContainer}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1M")), + }, + }, + }, + }, + "infrastructure_container_with_max_merge_line_size.toml", + ), ) }) diff --git a/internal/generator/vector/source/kubernetes_logs.go b/internal/generator/vector/source/kubernetes_logs.go index aa54f2e88..870e69b7f 100644 --- a/internal/generator/vector/source/kubernetes_logs.go +++ b/internal/generator/vector/source/kubernetes_logs.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/helpers" "github.com/openshift/cluster-logging-operator/internal/utils/sets" ) @@ -17,6 +18,7 @@ type KubernetesLogs struct { ExcludePaths string ExtraLabelSelector string UseKubeCache bool + MaxMergedLineBytes helpers.OptionalPair } func (kl KubernetesLogs) Name() string { @@ -31,6 +33,7 @@ type = "kubernetes_logs" max_read_bytes = 3145728 glob_minimum_cooldown_ms = 15000 auto_partial_merge = true +{{ .MaxMergedLineBytes }} {{- if gt (len .IncludePaths) 0}} include_paths_glob_patterns = {{.IncludePaths}} {{- end}} @@ -52,14 +55,18 @@ use_apiserver_cache = {{.UseKubeCache}} } // NewKubernetesLogs element which always excludes temp and gzip files -func NewKubernetesLogs(id, includes, excludes string) KubernetesLogs { - return KubernetesLogs{ +func NewKubernetesLogs(id, includes, excludes string, maxMergedLineBytes int64) KubernetesLogs { + logs := KubernetesLogs{ ComponentID: id, Desc: "Logs from containers (including openshift containers)", IncludePaths: includes, ExcludePaths: excludes, UseKubeCache: true, } + if maxMergedLineBytes > 
0 { + logs.MaxMergedLineBytes = helpers.NewOptionalPair("max_merged_line_bytes", maxMergedLineBytes) + } + return logs } const ( diff --git a/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml b/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml new file mode 100644 index 000000000..339b7da24 --- /dev/null +++ b/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml @@ -0,0 +1,15 @@ +# Logs from containers (including openshift containers) +[sources.source_foo] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1000000 +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = "kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true diff --git a/internal/generator/vector/source/kubernetes_logs_test.go b/internal/generator/vector/source/kubernetes_logs_test.go index 3bbd9fb10..b073dbf3f 100644 --- a/internal/generator/vector/source/kubernetes_logs_test.go +++ b/internal/generator/vector/source/kubernetes_logs_test.go @@ -2,6 +2,7 @@ package source import ( "fmt" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/openshift/cluster-logging-operator/internal/generator/vector/helpers" @@ -9,25 +10,33 @@ import ( ) var _ = Describe("source", func() { - DescribeTable("#NewKubernetesLogs", func(includes, excludes string, expFile string) { + DescribeTable("#NewKubernetesLogs", func(includes, excludes string, maxMergedLineBytes int64, expFile string) { exp, err := tomlContent.ReadFile(expFile) if err != nil { Fail(fmt.Sprintf("Error reading the file %q with exp config: %v", expFile, err)) } id := helpers.MakeID("source", "foo") - conf := NewKubernetesLogs(id, includes, excludes) //, includeNS, excludes) + conf := NewKubernetesLogs(id, includes, excludes, maxMergedLineBytes) //, includeNS, excludes) Expect(string(exp)).To(EqualConfigFrom(conf), fmt.Sprintf("for exp. file %s", expFile)) }, Entry("should exclude includes/excludes globs from the config when they are empty", "", "", + int64(0), "kubernetes_logs_no_includes_excludes.toml", ), Entry("should use includes/excludes globs from the config when they exist", `["/var/log/pods/foo"]`, `["/var/log/pods/bar"]`, + int64(0), "kubernetes_logs_with_includes.toml", ), + Entry("should set correct value for max_merge_line_size", + "", + "", + int64(1000_000), + "kubernetes_logs_max_merge_line_size.toml", + ), ) DescribeTable("#normalizeNamespace", func(ns, exp string) { diff --git a/internal/validations/observability/inputs/infrastructure.go b/internal/validations/observability/inputs/infrastructure.go index e34ed02b7..f57f378a1 100644 --- a/internal/validations/observability/inputs/infrastructure.go +++ b/internal/validations/observability/inputs/infrastructure.go @@ -2,9 +2,12 @@ package inputs import ( "fmt" + "strings" + obs "github.com/openshift/cluster-logging-operator/api/observability/v1" . 
"github.com/openshift/cluster-logging-operator/internal/api/observability" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/set" ) func ValidateInfrastructure(spec obs.InputSpec) []metav1.Condition { @@ -22,6 +25,18 @@ func ValidateInfrastructure(spec obs.InputSpec) []metav1.Condition { NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, false, obs.ReasonValidationFailure, fmt.Sprintf("%s must define at least one valid source", spec.Name)), } } + if !set.New(spec.Infrastructure.Sources...).Has(obs.InfrastructureSourceContainer) && spec.Infrastructure.Tuning != nil && + spec.Infrastructure.Tuning.Container != nil && spec.Infrastructure.Tuning.Container.MaxMessageSize != nil { + sources := make([]string, len(spec.Infrastructure.Sources)) + for i, s := range spec.Infrastructure.Sources { + sources[i] = string(s) + } + strSources := strings.Join(sources, ",") + + return []metav1.Condition{ + NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, false, obs.ReasonValidationFailure, fmt.Sprintf("%s tuning section available only for \"container\" source type, but found %s", spec.Name, strSources)), + } + } return []metav1.Condition{ NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, true, obs.ReasonValidationSuccess, fmt.Sprintf("input %q is valid", spec.Name)), } diff --git a/internal/validations/observability/inputs/infrastructure_test.go b/internal/validations/observability/inputs/infrastructure_test.go index 1ae7fc094..3c8547e3e 100644 --- a/internal/validations/observability/inputs/infrastructure_test.go +++ b/internal/validations/observability/inputs/infrastructure_test.go @@ -4,7 +4,9 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" . 
"github.com/openshift/cluster-logging-operator/test/matchers" + "k8s.io/apimachinery/pkg/api/resource" ) var _ = Describe("#ValidateInfrastructure", func() { @@ -41,4 +43,26 @@ var _ = Describe("#ValidateInfrastructure", func() { input.Infrastructure.Sources = []obs.InfrastructureSource{} Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, false, obs.ReasonValidationFailure, "must define at least one valid source")) }) + It("should pass for valid infrastructure input with container source and MaxMessageSize", func() { + input.Infrastructure = &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceContainer}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + } + Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, true, obs.ReasonValidationSuccess, `input.*is valid`)) + }) + It("should fail for valid infrastructure input with node source and MaxMessageSiz", func() { + input.Infrastructure = &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceNode}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + } + Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, false, obs.ReasonValidationFailure, "tuning section available only for \"container\" source type, but found node")) + }) }) diff --git a/test/functional/outputs/splunk/forward_to_splunk_test.go b/test/functional/outputs/splunk/forward_to_splunk_test.go index a0df1d2d6..49c740c78 100644 --- a/test/functional/outputs/splunk/forward_to_splunk_test.go +++ b/test/functional/outputs/splunk/forward_to_splunk_test.go @@ -5,9 +5,11 @@ import ( "time" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" "github.com/openshift/cluster-logging-operator/test/helpers/splunk" "github.com/openshift/cluster-logging-operator/test/helpers/types" obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" + "k8s.io/apimachinery/pkg/api/resource" "strings" @@ -74,6 +76,53 @@ var _ = Describe("Forwarding to Splunk", func() { Expect(outputTestLog.LogType).To(Equal(string(obs.InputTypeApplication))) }) + It("should not send application logs more than 64Ki", func() { + obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder). + FromInput(obs.InputTypeApplication, func(spec *obs.InputSpec) { + spec.Name = "small-msg-app" + spec.Application.Tuning = &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("64Ki")), + } + }). 
+			ToSplunkOutput(hecSecretKey, func(output *obs.OutputSpec) {
+				output.Splunk.Index = "main"
+			})
+		framework.Secrets = append(framework.Secrets, secret)
+		Expect(framework.Deploy()).To(BeNil())
+
+		// Wait for splunk to be ready
+		splunk.WaitOnSplunk(framework)
+
+		// Write app logs
+		timestamp := "2020-11-04T18:13:59.061892+00:00"
+		applicationLogLine := functional.NewCRIOLogMessage(timestamp, "This is my test message", false)
+		Expect(framework.WriteMessagesToApplicationLog(applicationLogLine, 1)).To(BeNil())
+
+		// Write large app logs
+		Expect(framework.WriteApplicationLogOfSizeAsPartials(65 * 1024)).To(BeNil())
+
+		// One more normal app log
+		Expect(framework.WriteMessagesToApplicationLog(applicationLogLine, 1)).To(BeNil())
+
+		// Parse the logs
+		var appLogs []types.ApplicationLog
+		logs, err := framework.ReadLogsByTypeFromSplunk(string(obs.InputTypeApplication))
+		Expect(err).To(BeNil())
+		jsonString := fmt.Sprintf("[%s]", strings.Join(logs, ","))
+		err = types.ParseLogsFrom(jsonString, &appLogs, false)
+		Expect(err).To(BeNil(), "Expected no errors parsing the logs")
+
+		Expect(appLogs).To(HaveLen(2))
+		collectorLog, err := framework.ReadCollectorLogs()
+		Expect(err).To(BeNil(), "Expected no errors reading the collector logs")
+		Expect(collectorLog).To(
+			And(
+				ContainSubstring("Found line that exceeds max_merged_line_bytes; discarding."),
+				ContainSubstring("configured_limit=65536"),
+			),
+		)
+	})
+
 	It("should accept audit logs without timestamp unexpected type warning (see: https://issues.redhat.com/browse/LOG-4672)", func() {
 		obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
 			FromInput(obs.InputTypeAudit).
diff --git a/test/helpers/splunk/splunk.go b/test/helpers/splunk/splunk.go
index f7f0dcaf1..e0ae3c419 100644
--- a/test/helpers/splunk/splunk.go
+++ b/test/helpers/splunk/splunk.go
@@ -3,8 +3,8 @@ package splunk
 import (
 	"time"
 
-	"github.com/openshift/cluster-logging-operator/test/framework/functional"
 	. "github.com/onsi/gomega"
+	"github.com/openshift/cluster-logging-operator/test/framework/functional"
 )
 
 // WaitOnSplunk waits for Splunk to be ready by checking HEC health and service status
@@ -30,4 +30,4 @@ func WaitOnSplunk(f *functional.CollectorFunctionalFramework) {
 		ContainSubstring("splunkd is running"),
 		ContainSubstring("splunk helpers are running"),
 	))
-}
\ No newline at end of file
+}
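
Usage sketch (reviewer note): the patch itself never shows the user-facing YAML, so here is a minimal `ClusterLogForwarder` snippet inferred from the API changes and the test fixtures above. Only the `inputs` stanza is shown; the input names and quantities are the ones used in the fixtures, and outputs/pipelines are omitted.

```yaml
apiVersion: observability.openshift.io/v1
kind: ClusterLogForwarder
spec:
  inputs:
    # Application input: cap a merged (partial) log event at 1 MiB.
    # Renders as `max_merged_line_bytes = 1048576` in the generated
    # kubernetes_logs source (see application_with_max_merge_line_size.toml).
    - name: my_app
      type: application
      application:
        tuning:
          maxMessageSize: 1Mi
    # Infrastructure input: tuning.container is accepted only when the
    # sources include "container" (enforced by ValidateInfrastructure).
    # The decimal suffix renders as `max_merged_line_bytes = 1000000`
    # (see infrastructure_container_with_max_merge_line_size.toml).
    - name: myinfra
      type: infrastructure
      infrastructure:
        sources: [container]
        tuning:
          container:
            maxMessageSize: 1M
```

One behavior worth noting in `NewContainerSource`: the limit is applied only when `MaxMessageSize.AsInt64()` succeeds (`if size, ok := ...AsInt64(); ok`), so a quantity that cannot be represented as an int64 is silently ignored and no `max_merged_line_bytes` setting is emitted.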