Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/observability/v1/output_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,12 @@ type HTTP struct {
// +kubebuilder:validation:Optional
// +kubebuilder:validation:XValidation:rule="self == '' || isURL(self)", message="invalid URL"
ProxyURL string `json:"proxyURL,omitempty"`

// LinePerEvent uses NDJSON instead of JSON to send data to remote destination.
//
// +kubebuilder:validation:Optional
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Line Per Event"
LinePerEvent bool `json:"line_per_event,omitempty"`
}

type KafkaTuningSpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2517,6 +2517,10 @@ spec:
description: Headers specify optional headers to be sent
with the request
type: object
line_per_event:
description: LinePerEvent uses NDJSON instead of JSON to
send data to remote destination.
type: boolean
method:
description: Method specifies the HTTP method to be used
for sending logs. If not set, 'POST' is used.
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/operator/api_observability_v1.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2410,6 +2410,8 @@ The 'username@password' part of `url` is ignored.

|headers|object| Headers specify optional headers to be sent with the request

|line_per_event|bool| LinePerEvent uses NDJSON instead of JSON to send data to remote destination.

|method|string| Method specifies the HTTP method to be used for sending logs. If not set, 'POST' is used.

|proxyURL|string| ProxyURL URL of a HTTP or HTTPS proxy to be used instead of direct connection.
Expand Down
27 changes: 16 additions & 11 deletions internal/generator/vector/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type Http struct {
ComponentID string
Inputs string
URI string
Method string
Proxy string
ComponentID string
Inputs string
URI string
Method string
Proxy string
LinePerEvent bool
common.RootMixin
}

Expand All @@ -33,6 +34,9 @@ type = "http"
inputs = {{.Inputs}}
uri = "{{.URI}}"
method = "{{.Method}}"
{{with .LinePerEvent}}
framing.method = "newline_delimited"
{{end}}
{{with .Proxy -}}
proxy.enabled = true
proxy.http = "{{.}}"
Expand Down Expand Up @@ -76,12 +80,13 @@ func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Sec

func Output(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, op Options) *Http {
return &Http{
ComponentID: id,
Inputs: vectorhelpers.MakeInputs(inputs...),
URI: o.HTTP.URL,
Method: Method(o.HTTP),
Proxy: o.HTTP.ProxyURL,
RootMixin: common.NewRootMixin(nil),
ComponentID: id,
Inputs: vectorhelpers.MakeInputs(inputs...),
URI: o.HTTP.URL,
Method: Method(o.HTTP),
Proxy: o.HTTP.ProxyURL,
LinePerEvent: o.HTTP.LinePerEvent,
RootMixin: common.NewRootMixin(nil),
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/generator/vector/output/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ var _ = Describe("Generate vector config", func() {
BaseOutputTuningSpec: *baseTune,
}
}, secrets, true, framework.NoOptions, "http_with_tuning.toml"),
Entry("with ndjson", func(spec *obs.OutputSpec) {
spec.HTTP.LinePerEvent = true
}, secrets, true, framework.NoOptions, "http_with_ndjson.toml"),
Entry("with proxy", func(spec *obs.OutputSpec) {
spec.HTTP.ProxyURL = "http://somewhere.org/proxy"
spec.HTTP.Headers = nil
Expand Down
20 changes: 20 additions & 0 deletions internal/generator/vector/output/http/http_with_ndjson.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[sinks.http_receiver]
type = "http"
inputs = ["application"]
uri = "https://my-logstore.com"
method = "post"
framing.method = "newline_delimited"

[sinks.http_receiver.encoding]
codec = "json"
except_fields = ["_internal"]

[sinks.http_receiver.request]
headers = {"h1"="v1","h2"="v2"}

[sinks.http_receiver.auth]
strategy = "basic"
user = "SECRET[kubernetes_secret.http-receiver/username]"
password = "SECRET[kubernetes_secret.http-receiver/password]"


10 changes: 8 additions & 2 deletions test/framework/functional/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,14 @@ func (f *CollectorFunctionalFramework) addOutputContainers(b *runtime.PodBuilder
}
}
case obs.OutputTypeHTTP:
if err := f.AddVectorHttpOutput(b, output); err != nil {
return err
if output.HTTP.LinePerEvent {
if err := f.AddVLOutput(b, output, nil); err != nil {
return err
}
} else {
if err := f.AddVectorHttpOutput(b, output); err != nil {
return err
}
}
case obs.OutputTypeSplunk:
if err := f.AddSplunkOutput(b, output); err != nil {
Expand Down
21 changes: 16 additions & 5 deletions test/framework/functional/output_victorialogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,27 @@ func (f *CollectorFunctionalFramework) AddVLOutput(b *runtime.PodBuilder, output
log.V(2).Info("Adding output for victorialogs", "name", output.Name)
name := strings.ToLower(output.Name)

esURL, err := url.Parse(output.Elasticsearch.URL)
if err != nil {
return err
port := "9428"
switch output.Type {
case obs.OutputTypeElasticsearch:
u, err := url.Parse(output.Elasticsearch.URL)
if err != nil {
return err
}
port = u.Port()
case obs.OutputTypeHTTP:
u, err := url.Parse(output.HTTP.URL)
if err != nil {
return err
}
port = u.Port()
}

log.V(2).Info("Adding container", "name", name)
log.V(2).Info("Adding VictoriaLogs output container", "name", obs.OutputTypeElasticsearch)
log.V(2).Info("Adding VictoriaLogs output container", "name", output.Type)

cmdArgs := []string{
"-httpListenAddr=:" + esURL.Port(),
"-httpListenAddr=:" + port,
"-storageDataPath=/tmp/logs",
}
if len(args) > 0 {
Expand Down
79 changes: 79 additions & 0 deletions test/functional/outputs/http/forward_to_victorialogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package http

import (
"sort"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
"github.com/openshift/cluster-logging-operator/test/framework/functional"

"github.com/openshift/cluster-logging-operator/test/helpers/types"
obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
)

var _ = Describe("[Functional][Outputs][HTTP] Logforwarding to VictoriaLogs", func() {

var (
framework *functional.CollectorFunctionalFramework

// Template expected as output Log
outputLogTemplate = functional.NewApplicationLogTemplate()
)

Context("should write to victorialogs", func() {
DescribeTable("with custom headers", func(headers map[string]string) {
outputLogTemplate.ViaqIndexName = "app-write"
framework = functional.NewCollectorFunctionalFramework()
obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
FromInput(obs.InputTypeApplication).
ToHttpOutput(func(output *obs.OutputSpec) {
output.HTTP.URL = "http://0.0.0.0:9428/insert/jsonline"
output.HTTP.LinePerEvent = true
output.HTTP.Headers = headers
})
defer framework.Cleanup()
Expect(framework.Deploy()).To(BeNil())
timestamp := functional.CRIOTime(time.Now())
ukr := "привіт "
jp := "こんにちは "
ch := "你好"
msg := functional.NewCRIOLogMessage(timestamp, ukr+jp+ch, false)
Expect(framework.WriteMessagesToApplicationLog(msg, 10)).To(BeNil())
Expect(framework.WriteMessagesWithNotUTF8SymbolsToLog()).To(BeNil())
requestHeaders := map[string]string{
"AccountID": "0",
"ProjectID": "0",
}
for headerName := range requestHeaders {
if v, ok := headers[headerName]; ok {
requestHeaders[headerName] = v
}
}
raw, err := framework.GetLogsFromVL(string(obs.OutputTypeHTTP), requestHeaders)
Expect(err).To(BeNil(), "Expected no errors reading the logs")
Expect(raw).To(Not(BeEmpty()))
// Parse log line
var logs []map[string]string
err = types.StrictlyParseLogsFromSlice(raw, &logs)
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
Expect(len(logs)).To(Equal(11))
//sort log by time before matching
sort.Slice(logs, func(i, j int) bool {
return strings.Compare(logs[i]["_time"], logs[j]["_time"]) < 0
})

Expect(logs[0]["_msg"]).To(Equal(ukr + jp + ch))
Expect(logs[10]["_msg"]).To(Equal("������������"))
},
Entry("Non-default tenant ID", map[string]string{
"AccountID": "10",
"ProjectID": "10",
"VL-Msg-Field": "message",
}),
)
})
})