Skip to content

Commit 8d8aa0f

Browse files
LOG-7892: support ndjson for http sink
1 parent 05eea16 commit 8d8aa0f

File tree

9 files changed

+154
-18
lines changed

9 files changed

+154
-18
lines changed

api/observability/v1/output_types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,12 @@ type HTTP struct {
675675
// +kubebuilder:validation:Optional
676676
// +kubebuilder:validation:XValidation:rule="self == '' || isURL(self)", message="invalid URL"
677677
ProxyURL string `json:"proxyURL,omitempty"`
678+
679+
// LinePerEvent uses NDJSON instead of JSON to send data to remote destination.
680+
//
681+
// +kubebuilder:validation:Optional
682+
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Line Per Event"
683+
LinePerEvent bool `json:"line_per_event,omitempty"`
678684
}
679685

680686
type KafkaTuningSpec struct {

config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2465,6 +2465,10 @@ spec:
24652465
description: Headers specify optional headers to be sent
24662466
with the request
24672467
type: object
2468+
line_per_event:
2469+
description: LinePerEvent uses NDJSON instead of JSON to
2470+
send data to remote destination.
2471+
type: boolean
24682472
method:
24692473
description: Method specifies the HTTP method to be used
24702474
for sending logs. If not set, 'POST' is used.

docs/reference/operator/api_observability_v1.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2297,6 +2297,8 @@ The 'username@password' part of `url` is ignored.
22972297

22982298
|headers|object| Headers specify optional headers to be sent with the request
22992299

2300+
|line_per_event|bool| LinePerEvent uses NDJSON instead of JSON to send data to remote destination.
2301+
23002302
|method|string| Method specifies the HTTP method to be used for sending logs. If not set, 'POST' is used.
23012303

23022304
|proxyURL|string| ProxyURL URL of a HTTP or HTTPS proxy to be used instead of direct connection.

internal/generator/vector/output/http/http.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ import (
1414
)
1515

1616
type Http struct {
17-
ComponentID string
18-
Inputs string
19-
URI string
20-
Method string
21-
Proxy string
17+
ComponentID string
18+
Inputs string
19+
URI string
20+
Method string
21+
Proxy string
22+
LinePerEvent bool
2223
common.RootMixin
2324
}
2425

@@ -33,6 +34,9 @@ type = "http"
3334
inputs = {{.Inputs}}
3435
uri = "{{.URI}}"
3536
method = "{{.Method}}"
37+
{{with .LinePerEvent}}
38+
framing.method = "newline_delimited"
39+
{{end}}
3640
{{with .Proxy -}}
3741
proxy.enabled = true
3842
proxy.http = "{{.}}"
@@ -76,12 +80,13 @@ func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Sec
7680

7781
func Output(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, op Options) *Http {
7882
return &Http{
79-
ComponentID: id,
80-
Inputs: vectorhelpers.MakeInputs(inputs...),
81-
URI: o.HTTP.URL,
82-
Method: Method(o.HTTP),
83-
Proxy: o.HTTP.ProxyURL,
84-
RootMixin: common.NewRootMixin(nil),
83+
ComponentID: id,
84+
Inputs: vectorhelpers.MakeInputs(inputs...),
85+
URI: o.HTTP.URL,
86+
Method: Method(o.HTTP),
87+
Proxy: o.HTTP.ProxyURL,
88+
LinePerEvent: o.HTTP.LinePerEvent,
89+
RootMixin: common.NewRootMixin(nil),
8590
}
8691
}
8792

internal/generator/vector/output/http/http_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ var _ = Describe("Generate vector config", func() {
143143
BaseOutputTuningSpec: *baseTune,
144144
}
145145
}, secrets, true, framework.NoOptions, "http_with_tuning.toml"),
146+
Entry("with ndjson", func(spec *obs.OutputSpec) {
147+
spec.HTTP.LinePerEvent = true
148+
}, secrets, true, framework.NoOptions, "http_with_ndjson.toml"),
146149
Entry("with proxy", func(spec *obs.OutputSpec) {
147150
spec.HTTP.ProxyURL = "http://somewhere.org/proxy"
148151
spec.HTTP.Headers = nil
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[sinks.http_receiver]
2+
type = "http"
3+
inputs = ["application"]
4+
uri = "https://my-logstore.com"
5+
method = "post"
6+
framing.method = "newline_delimited"
7+
8+
[sinks.http_receiver.encoding]
9+
codec = "json"
10+
except_fields = ["_internal"]
11+
12+
[sinks.http_receiver.request]
13+
headers = {"h1"="v1","h2"="v2"}
14+
15+
[sinks.http_receiver.auth]
16+
strategy = "basic"
17+
user = "SECRET[kubernetes_secret.http-receiver/username]"
18+
password = "SECRET[kubernetes_secret.http-receiver/password]"
19+
20+

test/framework/functional/framework.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,14 @@ func (f *CollectorFunctionalFramework) addOutputContainers(b *runtime.PodBuilder
437437
}
438438
}
439439
case obs.OutputTypeHTTP:
440-
if err := f.AddVectorHttpOutput(b, output); err != nil {
441-
return err
440+
if output.HTTP.LinePerEvent {
441+
if err := f.AddVLOutput(b, output, nil); err != nil {
442+
return err
443+
}
444+
} else {
445+
if err := f.AddVectorHttpOutput(b, output); err != nil {
446+
return err
447+
}
442448
}
443449
case obs.OutputTypeSplunk:
444450
if err := f.AddSplunkOutput(b, output); err != nil {

test/framework/functional/output_victorialogs.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,27 @@ func (f *CollectorFunctionalFramework) AddVLOutput(b *runtime.PodBuilder, output
1919
log.V(2).Info("Adding output for victorialogs", "name", output.Name)
2020
name := strings.ToLower(output.Name)
2121

22-
esURL, err := url.Parse(output.Elasticsearch.URL)
23-
if err != nil {
24-
return err
22+
port := "9428"
23+
switch output.Type {
24+
case obs.OutputTypeElasticsearch:
25+
u, err := url.Parse(output.Elasticsearch.URL)
26+
if err != nil {
27+
return err
28+
}
29+
port = u.Port()
30+
case obs.OutputTypeHTTP:
31+
u, err := url.Parse(output.HTTP.URL)
32+
if err != nil {
33+
return err
34+
}
35+
port = u.Port()
2536
}
2637

2738
log.V(2).Info("Adding container", "name", name)
28-
log.V(2).Info("Adding VictoriaLogs output container", "name", obs.OutputTypeElasticsearch)
39+
log.V(2).Info("Adding VictoriaLogs output container", "name", output.Type)
2940

3041
cmdArgs := []string{
31-
"-httpListenAddr=:" + esURL.Port(),
42+
"-httpListenAddr=:" + port,
3243
"-storageDataPath=/tmp/logs",
3344
}
3445
if len(args) > 0 {
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package http
2+
3+
import (
4+
"sort"
5+
"strings"
6+
"time"
7+
8+
. "github.com/onsi/ginkgo/v2"
9+
. "github.com/onsi/gomega"
10+
11+
obs "github.com/openshift/cluster-logging-operator/api/observability/v1"
12+
"github.com/openshift/cluster-logging-operator/test/framework/functional"
13+
14+
"github.com/openshift/cluster-logging-operator/test/helpers/types"
15+
obstestruntime "github.com/openshift/cluster-logging-operator/test/runtime/observability"
16+
)
17+
18+
var _ = Describe("[Functional][Outputs][HTTP] Logforwarding to VictoriaLogs", func() {
19+
20+
var (
21+
framework *functional.CollectorFunctionalFramework
22+
23+
// Template expected as output Log
24+
outputLogTemplate = functional.NewApplicationLogTemplate()
25+
)
26+
27+
Context("should write to victorialogs", func() {
28+
DescribeTable("with custom headers", func(headers map[string]string) {
29+
outputLogTemplate.ViaqIndexName = "app-write"
30+
framework = functional.NewCollectorFunctionalFramework()
31+
obstestruntime.NewClusterLogForwarderBuilder(framework.Forwarder).
32+
FromInput(obs.InputTypeApplication).
33+
ToHttpOutput(func(output *obs.OutputSpec) {
34+
output.HTTP.URL = "http://0.0.0.0:9428/insert/jsonline"
35+
output.HTTP.LinePerEvent = true
36+
output.HTTP.Headers = headers
37+
})
38+
defer framework.Cleanup()
39+
Expect(framework.Deploy()).To(BeNil())
40+
timestamp := functional.CRIOTime(time.Now())
41+
ukr := "привіт "
42+
jp := "こんにちは "
43+
ch := "你好"
44+
msg := functional.NewCRIOLogMessage(timestamp, ukr+jp+ch, false)
45+
Expect(framework.WriteMessagesToApplicationLog(msg, 10)).To(BeNil())
46+
Expect(framework.WriteMessagesWithNotUTF8SymbolsToLog()).To(BeNil())
47+
requestHeaders := map[string]string{
48+
"AccountID": "0",
49+
"ProjectID": "0",
50+
}
51+
for headerName := range requestHeaders {
52+
if v, ok := headers[headerName]; ok {
53+
requestHeaders[headerName] = v
54+
}
55+
}
56+
raw, err := framework.GetLogsFromVL(string(obs.OutputTypeHTTP), requestHeaders)
57+
Expect(err).To(BeNil(), "Expected no errors reading the logs")
58+
Expect(raw).To(Not(BeEmpty()))
59+
// Parse log line
60+
var logs []map[string]string
61+
err = types.StrictlyParseLogsFromSlice(raw, &logs)
62+
Expect(err).To(BeNil(), "Expected no errors parsing the logs")
63+
Expect(len(logs)).To(Equal(11))
64+
//sort log by time before matching
65+
sort.Slice(logs, func(i, j int) bool {
66+
return strings.Compare(logs[i]["_time"], logs[j]["_time"]) < 0
67+
})
68+
69+
Expect(logs[0]["_msg"]).To(Equal(ukr + jp + ch))
70+
Expect(logs[10]["_msg"]).To(Equal("������������"))
71+
},
72+
Entry("Non-default tenant ID", map[string]string{
73+
"AccountID": "10",
74+
"ProjectID": "10",
75+
"VL-Msg-Field": "message",
76+
}),
77+
)
78+
})
79+
})

0 commit comments

Comments
 (0)