diff --git a/api/observability/v1/output_types.go b/api/observability/v1/output_types.go index ddaaf159b..db7d14764 100644 --- a/api/observability/v1/output_types.go +++ b/api/observability/v1/output_types.go @@ -698,6 +698,12 @@ type HTTP struct { // +kubebuilder:validation:Optional // +kubebuilder:validation:XValidation:rule="self == '' || isURL(self)", message="invalid URL" ProxyURL string `json:"proxyURL,omitempty"` + + // LinePerEvent uses NDJSON instead of JSON to send data to remote destination. + // + // +kubebuilder:validation:Optional + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Line Per Event" + LinePerEvent bool `json:"line_per_event,omitempty"` } type KafkaTuningSpec struct { diff --git a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml index 1094b2718..4f4ff6b7e 100644 --- a/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml +++ b/config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml @@ -2517,6 +2517,10 @@ spec: description: Headers specify optional headers to be sent with the request type: object + line_per_event: + description: LinePerEvent uses NDJSON instead of JSON to + send data to remote destination. + type: boolean method: description: Method specifies the HTTP method to be used for sending logs. If not set, 'POST' is used. diff --git a/docs/reference/operator/api_observability_v1.adoc b/docs/reference/operator/api_observability_v1.adoc index 042b544bb..374f104ab 100644 --- a/docs/reference/operator/api_observability_v1.adoc +++ b/docs/reference/operator/api_observability_v1.adoc @@ -2410,6 +2410,8 @@ The 'username@password' part of `url` is ignored. |headers|object| Headers specify optional headers to be sent with the request +|line_per_event|bool| LinePerEvent uses NDJSON instead of JSON to send data to remote destination. + |method|string| Method specifies the HTTP method to be used for sending logs. If not set, 'POST' is used. |proxyURL|string| ProxyURL URL of a HTTP or HTTPS proxy to be used instead of direct connection. diff --git a/internal/generator/vector/output/http/http.go b/internal/generator/vector/output/http/http.go index f10826278..1a75acbed 100644 --- a/internal/generator/vector/output/http/http.go +++ b/internal/generator/vector/output/http/http.go @@ -14,11 +14,12 @@ import ( ) type Http struct { - ComponentID string - Inputs string - URI string - Method string - Proxy string + ComponentID string + Inputs string + URI string + Method string + Proxy string + LinePerEvent bool common.RootMixin } @@ -33,6 +34,9 @@ type = "http" inputs = {{.Inputs}} uri = "{{.URI}}" method = "{{.Method}}" +{{with .LinePerEvent}} +framing.method = "newline_delimited" +{{end}} {{with .Proxy -}} proxy.enabled = true proxy.http = "{{.}}" @@ -76,12 +80,13 @@ func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Sec func Output(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, op Options) *Http { return &Http{ - ComponentID: id, - Inputs: vectorhelpers.MakeInputs(inputs...), - URI: o.HTTP.URL, - Method: Method(o.HTTP), - Proxy: o.HTTP.ProxyURL, - RootMixin: common.NewRootMixin(nil), + ComponentID: id, + Inputs: vectorhelpers.MakeInputs(inputs...), + URI: o.HTTP.URL, + Method: Method(o.HTTP), + Proxy: o.HTTP.ProxyURL, + LinePerEvent: o.HTTP.LinePerEvent, + RootMixin: common.NewRootMixin(nil), } } diff --git a/internal/generator/vector/output/http/http_test.go b/internal/generator/vector/output/http/http_test.go index ad16aff48..80a9764d4 100644 --- a/internal/generator/vector/output/http/http_test.go +++ b/internal/generator/vector/output/http/http_test.go @@ -143,6 +143,9 @@ var _ = Describe("Generate vector config", func() { BaseOutputTuningSpec: *baseTune, } }, secrets, true, framework.NoOptions, "http_with_tuning.toml"), + Entry("with ndjson", func(spec *obs.OutputSpec) { + spec.HTTP.LinePerEvent = true + }, secrets, true, framework.NoOptions, "http_with_ndjson.toml"), Entry("with proxy", func(spec *obs.OutputSpec) { spec.HTTP.ProxyURL = "http://somewhere.org/proxy" spec.HTTP.Headers = nil diff --git a/internal/generator/vector/output/http/http_with_ndjson.toml b/internal/generator/vector/output/http/http_with_ndjson.toml new file mode 100644 index 000000000..41408afcc --- /dev/null +++ b/internal/generator/vector/output/http/http_with_ndjson.toml @@ -0,0 +1,20 @@ +[sinks.http_receiver] +type = "http" +inputs = ["application"] +uri = "https://my-logstore.com" +method = "post" +framing.method = "newline_delimited" + +[sinks.http_receiver.encoding] +codec = "json" +except_fields = ["_internal"] + +[sinks.http_receiver.request] +headers = {"h1"="v1","h2"="v2"} + +[sinks.http_receiver.auth] +strategy = "basic" +user = "SECRET[kubernetes_secret.http-receiver/username]" +password = "SECRET[kubernetes_secret.http-receiver/password]" + + diff --git a/test/framework/functional/framework.go b/test/framework/functional/framework.go index 8d5e46b9b..1f6001285 100644 --- a/test/framework/functional/framework.go +++ b/test/framework/functional/framework.go @@ -437,8 +437,14 @@ func (f *CollectorFunctionalFramework) addOutputContainers(b *runtime.PodBuilder } } case obs.OutputTypeHTTP: - if err := f.AddVectorHttpOutput(b, output); err != nil { - return err + if output.HTTP.LinePerEvent { + if err := f.AddVLOutput(b, output, nil); err != nil { + return err + } + } else { + if err := f.AddVectorHttpOutput(b, output); err != nil { + return err + } } case obs.OutputTypeSplunk: if err := f.AddSplunkOutput(b, output); err != nil { diff --git a/test/framework/functional/output_victorialogs.go b/test/framework/functional/output_victorialogs.go index 3a1c98761..06bb6b4d4 100644 --- a/test/framework/functional/output_victorialogs.go +++ b/test/framework/functional/output_victorialogs.go @@ -19,16 +19,27 @@ func (f *CollectorFunctionalFramework) AddVLOutput(b *runtime.PodBuilder, output log.V(2).Info("Adding output for victorialogs", "name", output.Name) name := strings.ToLower(output.Name) - esURL, err := url.Parse(output.Elasticsearch.URL) - if err != nil { - return err + port := "9428" + switch output.Type { + case obs.OutputTypeElasticsearch: + u, err := url.Parse(output.Elasticsearch.URL) + if err != nil { + return err + } + port = u.Port() + case obs.OutputTypeHTTP: + u, err := url.Parse(output.HTTP.URL) + if err != nil { + return err + } + port = u.Port() } log.V(2).Info("Adding container", "name", name) - log.V(2).Info("Adding VictoriaLogs output container", "name", obs.OutputTypeElasticsearch) + log.V(2).Info("Adding VictoriaLogs output container", "name", output.Type) cmdArgs := []string{ - "-httpListenAddr=:" + esURL.Port(), + "-httpListenAddr=:" + port, "-storageDataPath=/tmp/logs", } if len(args) > 0 { diff --git a/test/functional/outputs/http/forward_to_victorialogs_test.go b/test/functional/outputs/http/forward_to_victorialogs_test.go new file mode 100644 index 000000000..add1f682d --- /dev/null +++ b/test/functional/outputs/http/forward_to_victorialogs_test.go @@ -0,0 +1,79 @@ +package http + +import ( + "sort" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + obs "github.com/openshift/cluster-logging-operator/api/observability/v1" + "github.com/openshift/cluster-logging-operator/test/framework/functional" + + "github.com/openshift/cluster-logging-operator/test/helpers/types" + obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability" +) + +var _ = Describe("[Functional][Outputs][HTTP] Logforwarding to VictoriaLogs", func() { + + var ( + framework *functional.CollectorFunctionalFramework + + // Template expected as output Log + outputLogTemplate = functional.NewApplicationLogTemplate() + ) + + Context("should write to victorialogs", func() { + DescribeTable("with custom headers", func(headers map[string]string) { + outputLogTemplate.ViaqIndexName = "app-write" + framework = functional.NewCollectorFunctionalFramework() + obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder). + FromInput(obs.InputTypeApplication). + ToHttpOutput(func(output *obs.OutputSpec) { + output.HTTP.URL = "http://0.0.0.0:9428/insert/jsonline" + output.HTTP.LinePerEvent = true + output.HTTP.Headers = headers + }) + defer framework.Cleanup() + Expect(framework.Deploy()).To(BeNil()) + timestamp := functional.CRIOTime(time.Now()) + ukr := "привіт " + jp := "こんにちは " + ch := "你好" + msg := functional.NewCRIOLogMessage(timestamp, ukr+jp+ch, false) + Expect(framework.WriteMessagesToApplicationLog(msg, 10)).To(BeNil()) + Expect(framework.WriteMessagesWithNotUTF8SymbolsToLog()).To(BeNil()) + requestHeaders := map[string]string{ + "AccountID": "0", + "ProjectID": "0", + } + for headerName := range requestHeaders { + if v, ok := headers[headerName]; ok { + requestHeaders[headerName] = v + } + } + raw, err := framework.GetLogsFromVL(string(obs.OutputTypeHTTP), requestHeaders) + Expect(err).To(BeNil(), "Expected no errors reading the logs") + Expect(raw).To(Not(BeEmpty())) + // Parse log line + var logs []map[string]string + err = types.StrictlyParseLogsFromSlice(raw, &logs) + Expect(err).To(BeNil(), "Expected no errors parsing the logs") + Expect(len(logs)).To(Equal(11)) + //sort log by time before matching + sort.Slice(logs, func(i, j int) bool { + return strings.Compare(logs[i]["_time"], logs[j]["_time"]) < 0 + }) + + Expect(logs[0]["_msg"]).To(Equal(ukr + jp + ch)) + Expect(logs[10]["_msg"]).To(Equal("������������")) + }, + Entry("Non-default tenant ID", map[string]string{ + "AccountID": "10", + "ProjectID": "10", + "VL-Msg-Field": "message", + }), + ) + }) +})