diff --git a/Makefile b/Makefile index e086eaf0b6..ac7c17a158 100644 --- a/Makefile +++ b/Makefile @@ -232,7 +232,7 @@ test-functional-benchmarker-vector: bin/functional-benchmarker @out=$$(RELATED_IMAGE_VECTOR=$(IMAGE_LOGGING_VECTOR) bin/functional-benchmarker --image=$(IMAGE_LOGGING_VECTOR) --artifact-dir=/tmp/benchmark-test-vector 2>&1); if [ "$$?" != "0" ] ; then echo "$$out"; exit 1; fi .PHONY: test-unit -test-unit: test-forwarder-generator test-unit-api +test-unit: test-unit-api RELATED_IMAGE_VECTOR=$(IMAGE_LOGGING_VECTOR) \ RELATED_IMAGE_LOG_FILE_METRIC_EXPORTER=$(IMAGE_LOGFILEMETRICEXPORTER) \ go test -coverprofile=test.cov -race ./api/... ./internal/... `go list ./test/... | grep -Ev 'test/(e2e|functional|framework|client|helpers)'` diff --git a/api/observability/v1/input_types.go b/api/observability/v1/input_types.go index 509abfbe2b..5ca67f336f 100644 --- a/api/observability/v1/input_types.go +++ b/api/observability/v1/input_types.go @@ -15,6 +15,7 @@ limitations under the License. package v1 import ( + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -96,6 +97,12 @@ type ContainerInputTuningSpec struct { // +kubebuilder:validation:Optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Per-Container Rate Limit" RateLimitPerContainer *LimitSpec `json:"rateLimitPerContainer,omitempty"` + + // MaxMessageSize The maximum message length in bytes that a single log event can be when all + // partial log lines are merged. Messages exceeding this limit are dropped. + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Message Size" + MaxMessageSize *resource.Quantity `json:"maxMessageSize,omitempty"` } // ApplicationSource defines the type of ApplicationSource log source to use. @@ -184,6 +191,16 @@ var ( } ) +// InfrastructureInputTuningSpec is the infrastructure input tuning spec, for now available only for container sources +type InfrastructureInputTuningSpec struct { + + // Container is the input tuning spec for container sources + // + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Input Tuning" + Container *ContainerInputTuningSpec `json:"container,omitempty"` +} + // Infrastructure enables infrastructure logs. // Sources of these logs: // * container workloads deployed to namespaces: default, kube*, openshift* @@ -195,6 +212,12 @@ type Infrastructure struct { // +kubebuilder:validation:Optional // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Log Sources" Sources []InfrastructureSource `json:"sources,omitempty"` + + // Tuning is the infrastructure input tuning spec, for now available only for container sources + // + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Container Input Tuning" + Tuning *InfrastructureInputTuningSpec `json:"tuning,omitempty"` } // AuditSource defines which type of audit log source is used. 
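
NOTE (reviewer aid, not part of the patch): the new knob is a k8s resource.Quantity, so binary and
decimal suffixes resolve to different byte counts. A minimal, hypothetical ClusterLogForwarder
snippet exercising both input types (names and sizes here are illustrative only):

    spec:
      inputs:
        - name: my-apps
          type: application
          application:
            tuning:
              maxMessageSize: 1Mi        # 1048576 bytes after partial lines are merged
        - name: my-infra
          type: infrastructure
          infrastructure:
            sources: [container]         # validation rejects maxMessageSize when "container" is absent
            tuning:
              container:
                maxMessageSize: 1M       # decimal suffix: 1000000 bytes
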
diff --git a/api/observability/v1/zz_generated.deepcopy.go b/api/observability/v1/zz_generated.deepcopy.go index d9e786f9a9..47ee5519f5 100644 --- a/api/observability/v1/zz_generated.deepcopy.go +++ b/api/observability/v1/zz_generated.deepcopy.go @@ -478,6 +478,11 @@ func (in *ContainerInputTuningSpec) DeepCopyInto(out *ContainerInputTuningSpec) *out = new(LimitSpec) **out = **in } + if in.MaxMessageSize != nil { + in, out := &in.MaxMessageSize, &out.MaxMessageSize + x := (*in).DeepCopy() + *out = &x + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerInputTuningSpec. @@ -785,6 +790,11 @@ func (in *Infrastructure) DeepCopyInto(out *Infrastructure) { *out = make([]InfrastructureSource, len(*in)) copy(*out, *in) } + if in.Tuning != nil { + in, out := &in.Tuning, &out.Tuning + *out = new(InfrastructureInputTuningSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Infrastructure. @@ -797,6 +807,26 @@ func (in *Infrastructure) DeepCopy() *Infrastructure { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InfrastructureInputTuningSpec) DeepCopyInto(out *InfrastructureInputTuningSpec) { + *out = *in + if in.Container != nil { + in, out := &in.Container, &out.Container + *out = new(ContainerInputTuningSpec) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InfrastructureInputTuningSpec. +func (in *InfrastructureInputTuningSpec) DeepCopy() *InfrastructureInputTuningSpec { + if in == nil { + return nil + } + out := new(InfrastructureInputTuningSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InputSpec) DeepCopyInto(out *InputSpec) { *out = *in diff --git a/bundle/manifests/cluster-logging.clusterserviceversion.yaml b/bundle/manifests/cluster-logging.clusterserviceversion.yaml index b6cd803f61..755effb9f3 100644 --- a/bundle/manifests/cluster-logging.clusterserviceversion.yaml +++ b/bundle/manifests/cluster-logging.clusterserviceversion.yaml @@ -82,7 +82,7 @@ metadata: categories: OpenShift Optional, Logging & Tracing certified: "false" containerImage: quay.io/openshift-logging/cluster-logging-operator:latest - createdAt: "2025-07-29T15:25:15Z" + createdAt: "2025-12-12T17:14:22Z" description: The Red Hat OpenShift Logging Operator for OCP provides a means for configuring and managing log collection and forwarding. features.operators.openshift.io/cnf: "false" @@ -357,6 +357,11 @@ spec: sources displayName: Input Tuning path: inputs[0].application.tuning + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].application.tuning.maxMessageSize - description: |- RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. @@ -385,6 +390,30 @@ spec: This field is optional and omission results in the collection of all infrastructure sources. 
displayName: Log Sources path: inputs[0].infrastructure.sources + - description: Tuning is the infrastructure input tuning spec, for now available + only for container sources + displayName: Container Input Tuning + path: inputs[0].infrastructure.tuning + - description: Container is the input tuning spec for container sources + displayName: Input Tuning + path: inputs[0].infrastructure.tuning.container + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].infrastructure.tuning.container.maxMessageSize + - description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + displayName: Per-Container Rate Limit + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer + - description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + displayName: Max Records Per Second + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer.maxRecordsPerSecond + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:number - description: Name used to refer to the input of a `pipeline`. displayName: Input Name path: inputs[0].name diff --git a/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml b/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml index 0486b41d0a..561a2cd4de 100644 --- a/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml +++ b/bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml @@ -629,6 +629,15 @@ spec: description: Tuning is the container input tuning spec for this container sources properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true rateLimitPerContainer: description: |- RateLimitPerContainer is the limit applied to each container @@ -680,6 +689,41 @@ spec: - node type: string type: array + tuning: + description: Tuning is the infrastructure input tuning spec, + for now available only for container sources + properties: + container: + description: Container is the input tuning spec for + container sources + properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + rateLimitPerContainer: + description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. 
+ properties: + maxRecordsPerSecond: + description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + exclusiveMinimum: true + format: int64 + minimum: 0 + type: integer + required: + - maxRecordsPerSecond + type: object + type: object + type: object type: object name: description: Name used to refer to the input of a `pipeline`. diff --git a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml index 1e73d95258..f848901070 100644 --- a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml +++ b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml @@ -629,6 +629,15 @@ spec: description: Tuning is the container input tuning spec for this container sources properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true rateLimitPerContainer: description: |- RateLimitPerContainer is the limit applied to each container @@ -680,6 +689,41 @@ spec: - node type: string type: array + tuning: + description: Tuning is the infrastructure input tuning spec, + for now available only for container sources + properties: + container: + description: Container is the input tuning spec for + container sources + properties: + maxMessageSize: + anyOf: + - type: integer + - type: string + description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + rateLimitPerContainer: + description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + properties: + maxRecordsPerSecond: + description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + exclusiveMinimum: true + format: int64 + minimum: 0 + type: integer + required: + - maxRecordsPerSecond + type: object + type: object + type: object type: object name: description: Name used to refer to the input of a `pipeline`. diff --git a/config/manifests/bases/cluster-logging.clusterserviceversion.yaml b/config/manifests/bases/cluster-logging.clusterserviceversion.yaml index f3ff9e3ff1..cc9290f444 100644 --- a/config/manifests/bases/cluster-logging.clusterserviceversion.yaml +++ b/config/manifests/bases/cluster-logging.clusterserviceversion.yaml @@ -280,6 +280,11 @@ spec: sources displayName: Input Tuning path: inputs[0].application.tuning + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].application.tuning.maxMessageSize - description: |- RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. 
@@ -308,6 +313,30 @@ spec: This field is optional and omission results in the collection of all infrastructure sources. displayName: Log Sources path: inputs[0].infrastructure.sources + - description: Tuning is the infrastructure input tuning spec, for now available + only for container sources + displayName: Container Input Tuning + path: inputs[0].infrastructure.tuning + - description: Container is the input tuning spec for container sources + displayName: Input Tuning + path: inputs[0].infrastructure.tuning.container + - description: |- + MaxMessageSize The maximum message length in bytes that a single log event can be when all + partial log lines are merged. Messages exceeding this limit are dropped. + displayName: Max Message Size + path: inputs[0].infrastructure.tuning.container.maxMessageSize + - description: |- + RateLimitPerContainer is the limit applied to each container + by this input. This limit is applied per collector deployment. + displayName: Per-Container Rate Limit + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer + - description: |- + MaxRecordsPerSecond is the maximum number of log records + allowed per input/output in a pipeline + displayName: Max Records Per Second + path: inputs[0].infrastructure.tuning.container.rateLimitPerContainer.maxRecordsPerSecond + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:number - description: Name used to refer to the input of a `pipeline`. displayName: Input Name path: inputs[0].name diff --git a/docs/reference/operator/api_logging_v1alpha1.adoc b/docs/reference/operator/api_logging_v1alpha1.adoc index 12bb4110cf..004ac8acfd 100644 --- a/docs/reference/operator/api_logging_v1alpha1.adoc +++ b/docs/reference/operator/api_logging_v1alpha1.adoc @@ -13,22 +13,22 @@ toc::[] ** This reference is generated from the content in the openshift/cluster-logging-operator repository. ** Do not modify the content here manually except for the metadata and section IDs - changes to the content should be made in the source code. //// - + [id="logging-6-x-reference-LogFileMetricExporter"] == LogFileMetricExporter A Log File Metric Exporter instance. LogFileMetricExporter is the Schema for the logFileMetricExporters API - + [options="header"] |====================== |Property|Type|Description - + |spec|object| |status|object| |====================== - + === .spec - + LogFileMetricExporterSpec defines the desired state of LogFileMetricExporter Type:: object @@ -36,24 +36,24 @@ Type:: object [options="header"] |====================== |Property|Type|Description - + |nodeSelector|object| *(optional)* Define which Nodes the Pods are scheduled on. |resources|object| *(optional)* The resource requirements for the LogFileMetricExporter |tolerations|array| *(optional)* Define the tolerations the Pods will accept |====================== - + === .spec.nodeSelector - + Type:: object - + === .spec.resources - + Type:: object [options="header"] |====================== |Property|Type|Description - + |claims|array| *(optional)* Claims lists the names of resources, defined in spec.resourceClaims, that are used by this container. @@ -69,36 +69,36 @@ If Requests is omitted for a container, it defaults to Limits if that is explici otherwise to an implementation-defined value. Requests cannot exceed Limits. 
More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ |====================== - + === .spec.resources.claims[] - + Type:: array [options="header"] |====================== |Property|Type|Description - + |name|string| Name must match the name of one entry in pod.spec.resourceClaims of the Pod where this field is used. It makes that resource available inside a container. |====================== - + === .spec.resources.limits - + Type:: object - + === .spec.resources.requests - + Type:: object - + === .spec.tolerations[] - + Type:: array [options="header"] |====================== |Property|Type|Description - + |effect|string| *(optional)* Effect indicates the taint effect to match. Empty means match all taint effects. When specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute. |key|string| *(optional)* Key is the taint key that the toleration applies to. Empty means match all taint keys. @@ -114,13 +114,13 @@ negative values will be treated as 0 (evict immediately) by the system. |value|string| *(optional)* Value is the taint value the toleration matches to. If the operator is Exists, the value should be empty, otherwise just a regular string. |====================== - + === .spec.tolerations[].tolerationSeconds - + Type:: int - + === .status - + LogFileMetricExporterStatus defines the observed state of LogFileMetricExporter Type:: object @@ -128,18 +128,18 @@ Type:: object [options="header"] |====================== |Property|Type|Description - + |conditions|array| Conditions of the Log File Metrics Exporter. |====================== - + === .status.conditions[] - + Type:: array [options="header"] |====================== |Property|Type|Description - + |lastTransitionTime|string| lastTransitionTime is the last time the condition transitioned from one status to another. This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. |message|string| message is a human readable message indicating details about the transition. @@ -159,4 +159,4 @@ Many .condition.type values are consistent across resources like Available, but useful (see .node.status.conditions), the ability to deconflict is important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) |====================== - + diff --git a/docs/reference/operator/api_observability_v1.adoc b/docs/reference/operator/api_observability_v1.adoc index 26535a4349..20df8ea8b6 100644 --- a/docs/reference/operator/api_observability_v1.adoc +++ b/docs/reference/operator/api_observability_v1.adoc @@ -628,11 +628,79 @@ Type:: object |====================== |Property|Type|Description +|maxMessageSize|object| MaxMessageSize The maximum message length in bytes that a single log event can be when all +partial log lines are merged. Messages exceeding this limit are dropped. |rateLimitPerContainer|object| RateLimitPerContainer is the limit applied to each container by this input. This limit is applied per collector deployment. |====================== +=== .spec.inputs[].application.tuning.maxMessageSize + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Format|string| Change Format at will. See the comment for Canonicalize for +more details. 
+|d|object| d is the quantity in inf.Dec form if d.Dec != nil +|i|int| i is the quantity in int64 scaled form, if d.Dec == nil +|s|string| s is the generated value of this quantity to avoid recalculation +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Dec|object| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|unscaled|object| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec.unscaled + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|abs|Word| sign +|neg|bool| +|====================== + +=== .spec.inputs[].application.tuning.maxMessageSize.d.Dec.unscaled.abs + +Type:: Word + +=== .spec.inputs[].application.tuning.maxMessageSize.i + +Type:: int + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|value|int| +|====================== + === .spec.inputs[].application.tuning.rateLimitPerContainer Type:: object @@ -683,6 +751,8 @@ Type:: object |sources|array| Sources defines the list of infrastructure sources to collect. This field is optional and omission results in the collection of all infrastructure sources. +|tuning|object| Tuning is the infrastructure input tuning spec, for now available only for container sources + |====================== === .spec.inputs[].infrastructure.sources[] @@ -691,6 +761,114 @@ InfrastructureSource defines the type of infrastructure log source to use. Type:: array +=== .spec.inputs[].infrastructure.tuning + +InfrastructureInputTuningSpec is the infrastructure input tuning spec, for now available only for container sources + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|container|object| Container is the input tuning spec for container sources + +|====================== + +=== .spec.inputs[].infrastructure.tuning.container + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|maxMessageSize|object| MaxMessageSize The maximum message length in bytes that a single log event can be when all +partial log lines are merged. Messages exceeding this limit are dropped. +|rateLimitPerContainer|object| RateLimitPerContainer is the limit applied to each container +by this input. This limit is applied per collector deployment. + +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Format|string| Change Format at will. See the comment for Canonicalize for +more details. 
+|d|object| d is the quantity in inf.Dec form if d.Dec != nil +|i|int| i is the quantity in int64 scaled form, if d.Dec == nil +|s|string| s is the generated value of this quantity to avoid recalculation +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize.d + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|Dec|object| +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize.d.Dec + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|unscaled|object| +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize.d.Dec.unscaled + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|abs|Word| sign +|neg|bool| +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize.d.Dec.unscaled.abs + +Type:: Word + +=== .spec.inputs[].infrastructure.tuning.container.maxMessageSize.i + +Type:: int + +[options="header"] +|====================== +|Property|Type|Description + +|scale|int| +|value|int| +|====================== + +=== .spec.inputs[].infrastructure.tuning.container.rateLimitPerContainer + +Type:: object + +[options="header"] +|====================== +|Property|Type|Description + +|maxRecordsPerSecond|int| MaxRecordsPerSecond is the maximum number of log records +allowed per input/output in a pipeline + +|====================== + === .spec.inputs[].receiver ReceiverSpec is a union of input Receiver types. diff --git a/hack/generate-bundle.sh b/hack/generate-bundle.sh index 544439509a..a5ec6ca93a 100755 --- a/hack/generate-bundle.sh +++ b/hack/generate-bundle.sh @@ -1,4 +1,4 @@ -#!/usr/bin/bash +#!/usr/bin/env bash source .bingo/variables.env diff --git a/hack/revert-bundle.sh b/hack/revert-bundle.sh index f77a7ea6aa..0f8af90b95 100755 --- a/hack/revert-bundle.sh +++ b/hack/revert-bundle.sh @@ -1,4 +1,4 @@ -#!/usr/bin/bash +#!/usr/bin/env bash git diff --no-ext-diff --quiet -I'^ createdAt: ' bundle/manifests/cluster-logging.clusterserviceversion.yaml if ((! 
$?)) ; then diff --git a/internal/generator/vector/input/application_with_max_merge_line_size.toml b/internal/generator/vector/input/application_with_max_merge_line_size.toml new file mode 100644 index 0000000000..2818815e4a --- /dev/null +++ b/internal/generator/vector/input/application_with_max_merge_line_size.toml @@ -0,0 +1,29 @@ +# Logs from containers (including openshift containers) +[sources.input_my_app_container] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1048576 +exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/default_*/*/*.log", "/var/log/pods/kube-*_*/*/*.log", "/var/log/pods/kube_*/*/*.log", "/var/log/pods/openshift-*_*/*/*.log", "/var/log/pods/openshift_*/*/*.log"] +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = "kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true + +[transforms.input_my_app_container_meta] +type = "remap" +inputs = ["input_my_app_container"] +source = ''' + .log_source = "container" + # If namespace is infra, label log_type as infra + if match_any(string!(.kubernetes.namespace_name), [r'^default$', r'^openshift(-.+)?$', r'^kube(-.+)?$']) { + .log_type = "infrastructure" + } else { + .log_type = "application" + } +''' \ No newline at end of file diff --git a/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml b/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml new file mode 100644 index 0000000000..b489f29524 --- /dev/null +++ b/internal/generator/vector/input/infrastructure_container_with_max_merge_line_size.toml @@ -0,0 +1,30 @@ +# Logs from containers (including openshift containers) +[sources.input_myinfra_container] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1000000 +include_paths_glob_patterns = ["/var/log/pods/default_*/*/*.log", "/var/log/pods/kube-*_*/*/*.log", "/var/log/pods/kube_*/*/*.log", "/var/log/pods/openshift-*_*/*/*.log", "/var/log/pods/openshift_*/*/*.log"] +exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log"] +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = "kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true + +[transforms.input_myinfra_container_meta] +type = "remap" +inputs = ["input_myinfra_container"] +source = ''' + .log_source = "container" + # If namespace is infra, label log_type as infra + if 
match_any(string!(.kubernetes.namespace_name), [r'^default$', r'^openshift(-.+)?$', r'^kube(-.+)?$']) { + .log_type = "infrastructure" + } else { + .log_type = "application" + } +''' \ No newline at end of file diff --git a/internal/generator/vector/input/source.go b/internal/generator/vector/input/source.go index ad706eda23..99fb631295 100644 --- a/internal/generator/vector/input/source.go +++ b/internal/generator/vector/input/source.go @@ -158,19 +158,28 @@ func NewSource(input obs.InputSpec, collectorNS string, resNames factory.Forward func NewContainerSource(spec obs.InputSpec, namespace, includes, excludes string, logType obs.InputType, logSource interface{}) ([]framework.Element, []string) { base := helpers.MakeInputID(spec.Name, "container") var selector *metav1.LabelSelector + maxMsgSize := int64(0) if spec.Application != nil { selector = spec.Application.Selector + if spec.Application.Tuning != nil && spec.Application.Tuning.MaxMessageSize != nil { + if size, ok := spec.Application.Tuning.MaxMessageSize.AsInt64(); ok { + maxMsgSize = size + } + } + } + if spec.Infrastructure != nil { + if (len(spec.Infrastructure.Sources) == 0 || set.New(spec.Infrastructure.Sources...).Has(obs.InfrastructureSourceContainer)) && + spec.Infrastructure.Tuning != nil && spec.Infrastructure.Tuning.Container != nil && spec.Infrastructure.Tuning.Container.MaxMessageSize != nil { + if size, ok := spec.Infrastructure.Tuning.Container.MaxMessageSize.AsInt64(); ok { + maxMsgSize = size + } + } } metaID := helpers.MakeID(base, "meta") + k8sLogs := source.NewKubernetesLogs(base, includes, excludes, maxMsgSize) + k8sLogs.ExtraLabelSelector = source.LabelSelectorFrom(selector) el := []framework.Element{ - source.KubernetesLogs{ - ComponentID: base, - Desc: "Logs from containers (including openshift containers)", - IncludePaths: includes, - ExcludePaths: excludes, - ExtraLabelSelector: source.LabelSelectorFrom(selector), - UseKubeCache: true, - }, + k8sLogs, NewLogSourceAndType(metaID, logSource, logType, base, func(remap *elements.Remap) { remap.VRL = fmt.Sprintf( ` diff --git a/internal/generator/vector/input/source_test.go b/internal/generator/vector/input/source_test.go index 439d1442e5..ce15c4ee42 100644 --- a/internal/generator/vector/input/source_test.go +++ b/internal/generator/vector/input/source_test.go @@ -2,8 +2,11 @@ package input import ( "fmt" + obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" . 
"github.com/onsi/ginkgo" @@ -395,5 +398,30 @@ var _ = Describe("inputs", func() { }, "receiver_syslog_tls_from_configmap.toml", ), + Entry("application input with a MaxMessageSize", obs.InputSpec{ + Name: "my_app", + Type: obs.InputTypeApplication, + Application: &obs.Application{ + Tuning: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + }, + "application_with_max_merge_line_size.toml", + ), + Entry("infrastructure input with containers source and MaxMessageSize", obs.InputSpec{ + Name: "myinfra", + Type: obs.InputTypeInfrastructure, + Infrastructure: &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceContainer}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1M")), + }, + }, + }, + }, + "infrastructure_container_with_max_merge_line_size.toml", + ), ) }) diff --git a/internal/generator/vector/source/kubernetes_logs.go b/internal/generator/vector/source/kubernetes_logs.go index aa54f2e886..870e69b7ff 100644 --- a/internal/generator/vector/source/kubernetes_logs.go +++ b/internal/generator/vector/source/kubernetes_logs.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/openshift/cluster-logging-operator/internal/generator/framework" + "github.com/openshift/cluster-logging-operator/internal/generator/helpers" "github.com/openshift/cluster-logging-operator/internal/utils/sets" ) @@ -17,6 +18,7 @@ type KubernetesLogs struct { ExcludePaths string ExtraLabelSelector string UseKubeCache bool + MaxMergedLineBytes helpers.OptionalPair } func (kl KubernetesLogs) Name() string { @@ -31,6 +33,7 @@ type = "kubernetes_logs" max_read_bytes = 3145728 glob_minimum_cooldown_ms = 15000 auto_partial_merge = true +{{ .MaxMergedLineBytes }} {{- if gt (len .IncludePaths) 0}} include_paths_glob_patterns = {{.IncludePaths}} {{- end}} @@ -52,14 +55,18 @@ use_apiserver_cache = {{.UseKubeCache}} } // NewKubernetesLogs element which always excludes temp and gzip files -func NewKubernetesLogs(id, includes, excludes string) KubernetesLogs { - return KubernetesLogs{ +func NewKubernetesLogs(id, includes, excludes string, maxMergedLineBytes int64) KubernetesLogs { + logs := KubernetesLogs{ ComponentID: id, Desc: "Logs from containers (including openshift containers)", IncludePaths: includes, ExcludePaths: excludes, UseKubeCache: true, } + if maxMergedLineBytes > 0 { + logs.MaxMergedLineBytes = helpers.NewOptionalPair("max_merged_line_bytes", maxMergedLineBytes) + } + return logs } const ( diff --git a/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml b/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml new file mode 100644 index 0000000000..339b7da24c --- /dev/null +++ b/internal/generator/vector/source/kubernetes_logs_max_merge_line_size.toml @@ -0,0 +1,15 @@ +# Logs from containers (including openshift containers) +[sources.source_foo] +type = "kubernetes_logs" +max_read_bytes = 3145728 +glob_minimum_cooldown_ms = 15000 +auto_partial_merge = true +max_merged_line_bytes = 1000000 +pod_annotation_fields.pod_labels = "kubernetes.labels" +pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" +pod_annotation_fields.pod_annotations = "kubernetes.annotations" +pod_annotation_fields.pod_uid = "kubernetes.pod_id" +pod_annotation_fields.pod_node_name = "hostname" +namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" +rotate_wait_secs = 5 +use_apiserver_cache = true diff --git 
a/internal/generator/vector/source/kubernetes_logs_test.go b/internal/generator/vector/source/kubernetes_logs_test.go
index bb13edcb79..ec943119ca 100644
--- a/internal/generator/vector/source/kubernetes_logs_test.go
+++ b/internal/generator/vector/source/kubernetes_logs_test.go
@@ -2,6 +2,7 @@ package source
 
 import (
 	"fmt"
+
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/ginkgo/extensions/table"
 	. "github.com/onsi/gomega"
@@ -10,25 +11,33 @@ import (
 )
 
 var _ = Describe("source", func() {
-	DescribeTable("#NewKubernetesLogs", func(includes, excludes string, expFile string) {
+	DescribeTable("#NewKubernetesLogs", func(includes, excludes string, maxMergedLineBytes int64, expFile string) {
 		exp, err := tomlContent.ReadFile(expFile)
 		if err != nil {
 			Fail(fmt.Sprintf("Error reading the file %q with exp config: %v", expFile, err))
 		}
 		id := helpers.MakeID("source", "foo")
-		conf := NewKubernetesLogs(id, includes, excludes) //, includeNS, excludes)
+		conf := NewKubernetesLogs(id, includes, excludes, maxMergedLineBytes)
 		Expect(string(exp)).To(EqualConfigFrom(conf), fmt.Sprintf("for exp. file %s", expFile))
 	},
 		Entry("should exclude includes/excludes globs from the config when they are empty",
 			"",
 			"",
+			int64(0),
 			"kubernetes_logs_no_includes_excludes.toml",
 		),
 		Entry("should use includes/excludes globs from the config when they exist",
 			`["/var/log/pods/foo"]`,
 			`["/var/log/pods/bar"]`,
+			int64(0),
 			"kubernetes_logs_with_includes.toml",
 		),
+		Entry("should set the correct value for max_merged_line_bytes",
+			"",
+			"",
+			int64(1000_000),
+			"kubernetes_logs_max_merge_line_size.toml",
+		),
 	)
 
 	DescribeTable("#normalizeNamespace", func(ns, exp string) {
diff --git a/internal/validations/observability/inputs/infrastructure.go b/internal/validations/observability/inputs/infrastructure.go
index e34ed02b72..f57f378a12 100644
--- a/internal/validations/observability/inputs/infrastructure.go
+++ b/internal/validations/observability/inputs/infrastructure.go
@@ -2,9 +2,12 @@ package inputs
 
 import (
 	"fmt"
+	"strings"
+
 	obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
 	. "github.com/openshift/cluster-logging-operator/internal/api/observability"
"github.com/openshift/cluster-logging-operator/internal/api/observability" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/set" ) func ValidateInfrastructure(spec obs.InputSpec) []metav1.Condition { @@ -22,6 +25,18 @@ func ValidateInfrastructure(spec obs.InputSpec) []metav1.Condition { NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, false, obs.ReasonValidationFailure, fmt.Sprintf("%s must define at least one valid source", spec.Name)), } } + if !set.New(spec.Infrastructure.Sources...).Has(obs.InfrastructureSourceContainer) && spec.Infrastructure.Tuning != nil && + spec.Infrastructure.Tuning.Container != nil && spec.Infrastructure.Tuning.Container.MaxMessageSize != nil { + sources := make([]string, len(spec.Infrastructure.Sources)) + for i, s := range spec.Infrastructure.Sources { + sources[i] = string(s) + } + strSources := strings.Join(sources, ",") + + return []metav1.Condition{ + NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, false, obs.ReasonValidationFailure, fmt.Sprintf("%s tuning section available only for \"container\" source type, but found %s", spec.Name, strSources)), + } + } return []metav1.Condition{ NewConditionFromPrefix(obs.ConditionTypeValidInputPrefix, spec.Name, true, obs.ReasonValidationSuccess, fmt.Sprintf("input %q is valid", spec.Name)), } diff --git a/internal/validations/observability/inputs/infrastructure_test.go b/internal/validations/observability/inputs/infrastructure_test.go index 15f6ae23d0..a006e6f928 100644 --- a/internal/validations/observability/inputs/infrastructure_test.go +++ b/internal/validations/observability/inputs/infrastructure_test.go @@ -4,7 +4,9 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" . 
"github.com/openshift/cluster-logging-operator/test/matchers" + "k8s.io/apimachinery/pkg/api/resource" ) var _ = Describe("#ValidateInfrastructure", func() { @@ -41,4 +43,26 @@ var _ = Describe("#ValidateInfrastructure", func() { input.Infrastructure.Sources = []obs.InfrastructureSource{} Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, false, obs.ReasonValidationFailure, "must define at least one valid source")) }) + It("should pass for valid infrastructure input with container source and MaxMessageSize", func() { + input.Infrastructure = &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceContainer}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + } + Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, true, obs.ReasonValidationSuccess, `input.*is valid`)) + }) + It("should fail for valid infrastructure input with node source and MaxMessageSiz", func() { + input.Infrastructure = &obs.Infrastructure{ + Sources: []obs.InfrastructureSource{obs.InfrastructureSourceNode}, + Tuning: &obs.InfrastructureInputTuningSpec{ + Container: &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("1Mi")), + }, + }, + } + Expect(ValidateInfrastructure(input)).To(HaveCondition(expConditionTypeRE, false, obs.ReasonValidationFailure, "tuning section available only for \"container\" source type, but found node")) + }) }) diff --git a/test/framework/functional/output_splunk.go b/test/framework/functional/output_splunk.go index 38e854011d..0842e177b3 100644 --- a/test/framework/functional/output_splunk.go +++ b/test/framework/functional/output_splunk.go @@ -130,6 +130,40 @@ func GenerateConfigmapData() (data map[string]string, err error) { return data, nil } +func (f *CollectorFunctionalFramework) SplunkHealthCheck() (string, error) { + var output string + cmd := fmt.Sprintf(`curl http://localhost:%d/services/collector/health/1.0 -H "Authorization: Splunk %s"`, SplunkHecPort, HecToken) + err := wait.PollUntilContextTimeout(context.TODO(), defaultRetryInterval, f.GetMaxReadDuration(), true, func(cxt context.Context) (done bool, err error) { + output, err = oc.Exec().WithNamespace(f.Namespace).Pod(f.Name).Container(string(obs.OutputTypeSplunk)).WithCmd("/bin/sh", "-c", cmd).Run() + if output == "" || err != nil { + return false, err + } + return true, nil + }) + + if err != nil { + return err.Error(), err + } + return output, nil +} + +func (f *CollectorFunctionalFramework) ReadSplunkStatus() (string, error) { + var output string + cmd := "/opt/splunk/bin/splunk status" + err := wait.PollUntilContextTimeout(context.TODO(), defaultRetryInterval, f.GetMaxReadDuration(), true, func(cxt context.Context) (done bool, err error) { + output, err = oc.Exec().WithNamespace(f.Namespace).Pod(f.Name).Container(string(obs.OutputTypeSplunk)).WithCmd("/bin/sh", "-c", cmd).Run() + if output == "" || err != nil { + return false, err + } + return true, nil + }) + + if err != nil { + return err.Error(), err + } + return output, nil +} + func (f *CollectorFunctionalFramework) ReadLogsByTypeFromSplunk(namespace, name, logType string) (results []string, err error) { var output string cmd := fmt.Sprintf(`/opt/splunk/bin/splunk search log_type=%s -auth "admin:%s"`, logType, AdminPassword) diff --git a/test/framework/functional/write.go b/test/framework/functional/write.go index 5678eea689..868c081d03 100644 --- 
+++ b/test/framework/functional/write.go
@@ -134,6 +134,31 @@ func (f *CollectorFunctionalFramework) WritesApplicationLogsWithDelay(numOfLogs
 	return f.WritesNApplicationLogsOfSize(numOfLogs, 100, delay)
 }
 
+func (f *CollectorFunctionalFramework) WriteApplicationLogOfSizeAsPartials(size int) error {
+	partialLimit := 1000 // payload bytes written per partial CRI line
+	partialMsg := "$(date -u +'%Y-%m-%dT%H:%M:%S.%N%:z') stdout P $msg " // "P" marks a partial CRI line
+	numOfLogs := size / partialLimit
+
+	file := fmt.Sprintf("%s/%s_%s_%s/%s/0.log", fileLogPaths[applicationLog], f.Pod.Namespace, f.Pod.Name, f.Pod.UID, constants.CollectorName)
+	logPath := filepath.Dir(file)
+	if numOfLogs > 1 {
+		log.V(3).Info("Writing message to app log with path", "path", logPath)
+		result, err := f.RunCommand(constants.CollectorName, "bash", "-c",
+			fmt.Sprintf("bash -c 'mkdir -p %s;msg=$(cat /dev/urandom|tr -dc 'a-zA-Z0-9'|fold -w %d|head -n 1);for n in $(seq 1 %d);do echo %s >> %s; done'", logPath, partialLimit, numOfLogs, partialMsg, file))
+		log.V(3).Info("WriteApplicationLogOfSizeAsPartials: partials", "namespace", f.Pod.Namespace, "result", result, "err", err)
+		if err != nil {
+			return err
+		}
+	}
+
+	finalMsg := "$(date -u +'%Y-%m-%dT%H:%M:%S.%N%:z') stdout F $msg " // "F" marks the final line that completes the merged event
+	finalLength := size - (numOfLogs * partialLimit)
+	result, err := f.RunCommand(constants.CollectorName, "bash", "-c",
+		fmt.Sprintf("bash -c 'mkdir -p %s;msg=$(cat /dev/urandom|tr -dc 'a-zA-Z0-9'|fold -w %d|head -n 1); echo %s >> %s'", logPath, finalLength, finalMsg, file))
+	log.V(3).Info("WriteApplicationLogOfSizeAsPartials: final", "namespace", f.Pod.Namespace, "result", result, "err", err)
+	return err
+}
+
 func (f *CollectorFunctionalFramework) WritesNApplicationLogsOfSize(numOfLogs, size int, delay float32) error {
 	msg := "$(date -u +'%Y-%m-%dT%H:%M:%S.%N%:z') stdout F $msg "
 	file := fmt.Sprintf("%s/%s_%s_%s/%s/0.log", fileLogPaths[applicationLog], f.Pod.Namespace, f.Pod.Name, f.Pod.UID, constants.CollectorName)
diff --git a/test/functional/outputs/splunk/forward_to_splunk_test.go b/test/functional/outputs/splunk/forward_to_splunk_test.go
index 0f77108fea..ea857fef77 100644
--- a/test/functional/outputs/splunk/forward_to_splunk_test.go
+++ b/test/functional/outputs/splunk/forward_to_splunk_test.go
@@ -2,33 +2,39 @@ package splunk
 
 import (
 	"fmt"
-	"strings"
 	"time"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/ginkgo/extensions/table"
 	. "github.com/onsi/gomega"
"github.com/onsi/gomega" obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/internal/utils" + "github.com/openshift/cluster-logging-operator/test/helpers/splunk" + "github.com/openshift/cluster-logging-operator/test/helpers/types" + obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" + "k8s.io/apimachinery/pkg/api/resource" + + "strings" + internalobs "github.com/openshift/cluster-logging-operator/internal/api/observability" "github.com/openshift/cluster-logging-operator/internal/constants" "github.com/openshift/cluster-logging-operator/internal/runtime" "github.com/openshift/cluster-logging-operator/test/framework/functional" - "github.com/openshift/cluster-logging-operator/test/helpers/types" - obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" - v1 "k8s.io/api/core/v1" ) +const SplunkSecretName = "splunk-secret" + var _ = Describe("Forwarding to Splunk", func() { - const splunkSecretName = "splunk-secret" var ( framework *functional.CollectorFunctionalFramework secret *v1.Secret - hecSecretKey = *internalobs.NewSecretReference(constants.SplunkHECTokenKey, splunkSecretName) + hecSecretKey = *internalobs.NewSecretReference(constants.SplunkHECTokenKey, SplunkSecretName) ) + BeforeEach(func() { framework = functional.NewCollectorFunctionalFramework() - secret = runtime.NewSecret(framework.Namespace, splunkSecretName, + secret = runtime.NewSecret(framework.Namespace, SplunkSecretName, map[string][]byte{ "hecToken": functional.HecToken, }, @@ -49,7 +55,7 @@ var _ = Describe("Forwarding to Splunk", func() { Expect(framework.Deploy()).To(BeNil()) // Wait for splunk to be ready - time.Sleep(90 * time.Second) + splunk.WaitOnSplunk(framework) // Write app logs timestamp := "2020-11-04T18:13:59.061892+00:00" @@ -71,6 +77,53 @@ var _ = Describe("Forwarding to Splunk", func() { Expect(outputTestLog.LogType).To(Equal(string(obs.InputTypeApplication))) }) + It("should not send application logs more than 64Ki", func() { + obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder). + FromInput(obs.InputTypeApplication, func(spec *obs.InputSpec) { + spec.Name = "small-msg-app" + spec.Application.Tuning = &obs.ContainerInputTuningSpec{ + MaxMessageSize: utils.GetPtr(resource.MustParse("64Ki")), + } + }). 
+			ToSplunkOutput(hecSecretKey, func(output *obs.OutputSpec) {
+				output.Splunk.Index = "main"
+			})
+		framework.Secrets = append(framework.Secrets, secret)
+		Expect(framework.Deploy()).To(BeNil())
+
+		// Wait for splunk to be ready
+		splunk.WaitOnSplunk(framework)
+
+		// Write app logs
+		timestamp := "2020-11-04T18:13:59.061892+00:00"
+		applicationLogLine := functional.NewCRIOLogMessage(timestamp, "This is my test message", false)
+		Expect(framework.WriteMessagesToApplicationLog(applicationLogLine, 1)).To(BeNil())
+
+		// Write large app logs
+		Expect(framework.WriteApplicationLogOfSizeAsPartials(65 * 1024)).To(BeNil())
+
+		// One more normal app log
+		Expect(framework.WriteMessagesToApplicationLog(applicationLogLine, 1)).To(BeNil())
+
+		// Parse the logs
+		var appLogs []types.ApplicationLog
+		logs, err := framework.ReadLogsByTypeFromSplunk(framework.Namespace, framework.Name, string(obs.InputTypeApplication))
+		Expect(err).To(BeNil())
+		jsonString := fmt.Sprintf("[%s]", strings.Join(logs, ","))
+		err = types.ParseLogsFrom(jsonString, &appLogs, false)
+		Expect(err).To(BeNil(), "Expected no errors parsing the logs")
+
+		Expect(appLogs).To(HaveLen(2))
+		collectorLog, err := framework.ReadCollectorLogs()
+		Expect(err).To(BeNil(), "Expected no errors reading the collector logs")
+		Expect(collectorLog).To(
+			And(
+				ContainSubstring("Found line that exceeds max_merged_line_bytes; discarding."),
+				ContainSubstring("configured_limit=65536"),
+			),
+		)
+	})
+
 	It("should accept audit logs without timestamp unexpected type warning (see: https://issues.redhat.com/browse/LOG-4672)", func() {
 		obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
 			FromInput(obs.InputTypeAudit).
@@ -81,7 +134,7 @@
 		Expect(framework.Deploy()).To(BeNil())
 
 		// Wait for splunk to be ready
-		time.Sleep(90 * time.Second)
+		splunk.WaitOnSplunk(framework)
 
 		// Write audit logs
 		timestamp, _ := time.Parse(time.RFC3339Nano, "2024-04-16T09:46:19.116+00:00")
@@ -125,7 +178,7 @@
 		Expect(framework.Deploy()).To(BeNil())
 
 		// Wait for splunk to be ready
-		time.Sleep(90 * time.Second)
+		splunk.WaitOnSplunk(framework)
 
 		// Write app logs
 		timestamp := "2020-11-04T18:13:59.061892+00:00"
@@ -168,7 +221,7 @@
 		Expect(framework.Deploy()).To(BeNil())
 
 		// Wait for splunk to be ready
-		time.Sleep(90 * time.Second)
+		splunk.WaitOnSplunk(framework)
 
 		// Write app logs
 		timestamp := "2020-11-04T18:13:59.061892+00:00"
diff --git a/test/helpers/splunk/splunk.go b/test/helpers/splunk/splunk.go
new file mode 100644
index 0000000000..e0ae3c419e
--- /dev/null
+++ b/test/helpers/splunk/splunk.go
@@ -0,0 +1,33 @@
+package splunk
+
+import (
+	"time"
+
+	. "github.com/onsi/gomega"
"github.com/onsi/gomega" + "github.com/openshift/cluster-logging-operator/test/framework/functional" +) + +// WaitOnSplunk waits for Splunk to be ready by checking HEC health and service status +func WaitOnSplunk(f *functional.CollectorFunctionalFramework) { + time.Sleep(20 * time.Second) + Eventually(func() string { + // Run the Splunk CLI status command to check if splunkd is running + output, err := f.SplunkHealthCheck() + if err != nil { + return output + } + return output + }, 90*time.Second, 3*time.Second).Should(ContainSubstring("HEC is healthy")) + time.Sleep(1 * time.Second) + Eventually(func() string { + // Run the Splunk CLI status command to check if splunkd is running + output, err := f.ReadSplunkStatus() + if err != nil { + return output + } + return output + }, 15*time.Second, 3*time.Second).Should(SatisfyAll( + ContainSubstring("splunkd is running"), + ContainSubstring("splunk helpers are running"), + )) +}