Skip to content

Commit e73dea7

Browse files
authored
Add Streaming Support for Kubernetes Logs (#63)
If a user selects the "Kubernetes: Logs" query type, it is now possible to stream logs in real-time.
1 parent bee3d30 commit e73dea7

File tree

5 files changed

+329
-4
lines changed

5 files changed

+329
-4
lines changed

pkg/kubernetes/client.go

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type Client interface {
4444
GetResources(ctx context.Context, user string, groups []string, resourceId, namespace, parameterName, parameterValue string, wide bool) (*data.Frame, error)
4545
GetContainers(ctx context.Context, user string, groups []string, resourceId, namespace, name string) (*data.Frame, error)
4646
GetLogs(ctx context.Context, user string, groups []string, resourceId, namespace, name, container, filter string, tail int64, previous bool, timeRange backend.TimeRange) (*data.Frame, error)
47+
StreamLogs(ctx context.Context, user string, groups []string, resourceId, namespace, name, container, filter string, sender *backend.StreamSender) error
4748
GetResource(ctx context.Context, resourceId string) (*Resource, error)
4849
Proxy(user string, groups []string, requestUrl string, w http.ResponseWriter, r *http.Request)
4950
}
@@ -466,12 +467,23 @@ func (c *client) GetLogs(ctx context.Context, user string, groups []string, reso
466467
var bodys []string
467468
var labels []json.RawMessage
468469

469-
r, _ := regexp.Compile(filter)
470+
var r *regexp.Regexp
471+
if filter != "" {
472+
r, err = regexp.Compile(filter)
473+
if err != nil {
474+
span.RecordError(err)
475+
span.SetStatus(codes.Error, err.Error())
476+
return nil, err
477+
}
478+
}
470479

471480
for _, stream := range streams {
472481
scanner := bufio.NewScanner(stream.Stream)
473482
for scanner.Scan() {
474483
parts := strings.SplitN(scanner.Text(), " ", 2)
484+
if len(parts) != 2 {
485+
continue
486+
}
475487

476488
timestamp, err := time.Parse(time.RFC3339Nano, parts[0])
477489
if err != nil {
@@ -481,12 +493,16 @@ func (c *client) GetLogs(ctx context.Context, user string, groups []string, reso
481493
continue
482494
}
483495

496+
if r != nil && !r.MatchString(parts[1]) {
497+
continue
498+
}
499+
484500
var label json.RawMessage
485501
if err := json.Unmarshal([]byte(strings.Replace(parts[1], "{", `{"pod": "`+stream.Pod+`", `, 1)), &label); err != nil {
486502
label = json.RawMessage(fmt.Sprintf(`{"pod": "%s"}`, stream.Pod))
487503
}
488504

489-
if timestamp.After(timeRange.From) && timestamp.Before(timeRange.To) && r.MatchString(parts[1]) {
505+
if timestamp.After(timeRange.From) && timestamp.Before(timeRange.To) {
490506
timestamps = append(timestamps, timestamp)
491507
bodys = append(bodys, parts[1])
492508
labels = append(labels, label)
@@ -515,6 +531,131 @@ func (c *client) GetLogs(ctx context.Context, user string, groups []string, reso
515531
return frame, nil
516532
}
517533

534+
// StreamLogs streams the logs for the requested resource as data frame. If the
535+
// resource is a pod the logs for the pod are streamed. If the resource is a
536+
// daemonset, deployment, job or statefulset, the logs for all pods belonging to
537+
// the resource are streamed.
538+
//
539+
// The logs are fetched in parallel for all pods and sent to the stream sender.
540+
// Each log line is prefixed with a timestamp in RFC3339Nano format and is split
541+
// into two fields: "timestamp" and "body". The "body" field contains the log
542+
// line itself.
543+
//
544+
// If the log line is a JSON object, the log line is parsed and stored in the
545+
// "labels" field as JSON object.
546+
//
547+
// The filter parameter is a regular expression that is used to filter the log
548+
// lines. Only log lines that match the regular expression are sent to the
549+
// stream sender.
550+
func (c *client) StreamLogs(ctx context.Context, user string, groups []string, resourceId, namespace, name, container, filter string, sender *backend.StreamSender) error {
551+
ctx, span := tracing.DefaultTracer().Start(ctx, "StreamLogs")
552+
defer span.End()
553+
span.SetAttributes(attribute.Key("user").String(user))
554+
span.SetAttributes(attribute.Key("groups").StringSlice(groups))
555+
span.SetAttributes(attribute.Key("resourceId").String(resourceId))
556+
span.SetAttributes(attribute.Key("namespace").String(namespace))
557+
span.SetAttributes(attribute.Key("name").String(name))
558+
span.SetAttributes(attribute.Key("container").String(container))
559+
span.SetAttributes(attribute.Key("filter").String(filter))
560+
561+
// Get the pods for the requested resource.
562+
pods, _, err := c.getPodsAndContainers(ctx, user, groups, resourceId, namespace, name)
563+
if err != nil {
564+
span.RecordError(err)
565+
span.SetStatus(codes.Error, err.Error())
566+
return err
567+
}
568+
569+
var r *regexp.Regexp
570+
if filter != "" {
571+
r, err = regexp.Compile(filter)
572+
if err != nil {
573+
span.RecordError(err)
574+
span.SetStatus(codes.Error, err.Error())
575+
return err
576+
}
577+
}
578+
579+
var streamsWG sync.WaitGroup
580+
streamsWG.Add(len(pods))
581+
582+
for _, pod := range pods {
583+
go func(pod string) {
584+
defer streamsWG.Done()
585+
586+
options := &corev1.PodLogOptions{
587+
Container: container,
588+
Timestamps: true,
589+
Follow: true,
590+
}
591+
592+
stream, err := c.clientset.CoreV1().Pods(namespace).GetLogs(pod, options).Stream(ctx)
593+
if err != nil {
594+
span.RecordError(err)
595+
span.SetStatus(codes.Error, err.Error())
596+
c.logger.Error("Failed to get stream", "error", err.Error())
597+
return
598+
}
599+
defer stream.Close()
600+
601+
scanner := bufio.NewScanner(stream)
602+
for scanner.Scan() {
603+
select {
604+
case <-ctx.Done():
605+
return
606+
default:
607+
parts := strings.SplitN(scanner.Text(), " ", 2)
608+
if len(parts) != 2 {
609+
continue
610+
}
611+
612+
timestamp, err := time.Parse(time.RFC3339Nano, parts[0])
613+
if err != nil {
614+
span.RecordError(err)
615+
span.SetStatus(codes.Error, err.Error())
616+
c.logger.Error("Failed to parse timestamp", "error", err.Error())
617+
continue
618+
}
619+
620+
if r != nil && !r.MatchString(parts[1]) {
621+
continue
622+
}
623+
624+
var label json.RawMessage
625+
if err := json.Unmarshal([]byte(strings.Replace(parts[1], "{", `{"pod": "`+pod+`", `, 1)), &label); err != nil {
626+
label = json.RawMessage(fmt.Sprintf(`{"pod": "%s"}`, pod))
627+
}
628+
629+
frame := data.NewFrame(
630+
"Logs",
631+
data.NewField("timestamp", nil, []time.Time{timestamp}),
632+
data.NewField("body", nil, []string{parts[1]}),
633+
data.NewField("labels", nil, []json.RawMessage{label}),
634+
)
635+
636+
frame.SetMeta(&data.FrameMeta{
637+
PreferredVisualization: data.VisTypeLogs,
638+
Type: data.FrameTypeLogLines,
639+
})
640+
641+
if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
642+
c.logger.Error("Failed to send frame", "error", err.Error())
643+
return
644+
}
645+
}
646+
}
647+
648+
if err := scanner.Err(); err != nil {
649+
span.RecordError(err)
650+
span.SetStatus(codes.Error, err.Error())
651+
}
652+
}(pod)
653+
}
654+
655+
streamsWG.Wait()
656+
return nil
657+
}
658+
518659
// GetResource returns the resource for the given resource ID from the cache. If
519660
// the resource is not found in the cache, an error is returned.
520661
func (c *client) GetResource(ctx context.Context, resourceId string) (*Resource, error) {

pkg/kubernetes/client_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/plugin/datasource.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plugin
22

33
import (
44
"context"
5+
"encoding/json"
56
"net/http"
67
"time"
78

@@ -16,6 +17,7 @@ import (
1617
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
1718
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
1819
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
20+
"go.opentelemetry.io/otel/attribute"
1921
"go.opentelemetry.io/otel/codes"
2022
)
2123

@@ -29,6 +31,7 @@ var (
2931
_ backend.CheckHealthHandler = (*Datasource)(nil)
3032
_ backend.QueryDataHandler = (*Datasource)(nil)
3133
_ backend.CallResourceHandler = (*Datasource)(nil)
34+
_ backend.StreamHandler = (*Datasource)(nil)
3235
_ instancemgmt.InstanceDisposer = (*Datasource)(nil)
3336
)
3437

@@ -194,6 +197,111 @@ func (d *Datasource) CallResource(ctx context.Context, req *backend.CallResource
194197
return d.resourceHandler.CallResource(ctx, req, sender)
195198
}
196199

200+
// SubscribeStream is called when a client wants to connect to a stream. As soon
201+
// as first subscriber joins channel "RunStream" will be called.
202+
//
203+
// Before a user can subscribe to a stream, we verify that the user has access
204+
// to the stream / logs he wants to subscribe to.
205+
func (d *Datasource) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
206+
_, span := tracing.DefaultTracer().Start(ctx, "SubscribeStream")
207+
defer span.End()
208+
209+
d.logger.Debug("SubscribeStream", "path", req.Path)
210+
span.SetAttributes(attribute.Key("path").String(req.Path))
211+
212+
user, err := d.grafanaClient.GetImpersonateUser(ctx, req.GetHTTPHeaders())
213+
if err != nil {
214+
d.logger.Error("Failed to get user", "error", err.Error())
215+
span.RecordError(err)
216+
span.SetStatus(codes.Error, err.Error())
217+
return nil, err
218+
}
219+
220+
groups, err := d.grafanaClient.GetImpersonateGroups(ctx, req.GetHTTPHeaders())
221+
if err != nil {
222+
d.logger.Error("Failed to get groups", "error", err.Error())
223+
span.RecordError(err)
224+
span.SetStatus(codes.Error, err.Error())
225+
return nil, err
226+
}
227+
228+
var qm models.QueryModelKubernetesLogs
229+
err = json.Unmarshal(req.Data, &qm)
230+
if err != nil {
231+
d.logger.Error("Failed to unmarshal query model", "error", err.Error())
232+
span.RecordError(err)
233+
span.SetStatus(codes.Error, err.Error())
234+
return nil, err
235+
}
236+
237+
d.logger.Info("SubscribeStream request", "user", user, "groups", groups, "resourceId", qm.ResourceId, "namespace", qm.Namespace, "name", qm.Name, "container", qm.Container, "filter", qm.Filter)
238+
span.SetAttributes(attribute.Key("user").String(user))
239+
span.SetAttributes(attribute.Key("groups").StringSlice(groups))
240+
span.SetAttributes(attribute.Key("resourceId").String(qm.ResourceId))
241+
span.SetAttributes(attribute.Key("namespace").String(qm.Namespace))
242+
span.SetAttributes(attribute.Key("name").String(qm.Name))
243+
244+
_, err = d.kubeClient.GetContainers(ctx, user, groups, qm.ResourceId, qm.Namespace, qm.Name)
245+
if err != nil {
246+
return &backend.SubscribeStreamResponse{
247+
Status: backend.SubscribeStreamStatusPermissionDenied,
248+
}, nil
249+
}
250+
251+
return &backend.SubscribeStreamResponse{
252+
Status: backend.SubscribeStreamStatusOK,
253+
}, nil
254+
}
255+
256+
// RunStream is called once for any open channel. It handles all the streaming
257+
// logic for the channel and sends data to the sender as needed, via the
258+
// "StreamLogs" method of the Kubernetes client.
259+
//
260+
// We do not pass the user and groups to the "StreamLogs" method here, because
261+
// multiple we can have multiple subscribers on the same stream and each
262+
// subscriber might have a different user and groups. Therefore we extract the
263+
// user and groups inside the "StreamLogs" method for each HTTP request.
264+
func (d *Datasource) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
265+
_, span := tracing.DefaultTracer().Start(ctx, "RunStream")
266+
defer span.End()
267+
268+
d.logger.Debug("RunStream", "path", req.Path)
269+
span.SetAttributes(attribute.Key("path").String(req.Path))
270+
271+
var qm models.QueryModelKubernetesLogs
272+
err := json.Unmarshal(req.Data, &qm)
273+
if err != nil {
274+
d.logger.Error("Failed to unmarshal query model", "error", err.Error())
275+
span.RecordError(err)
276+
span.SetStatus(codes.Error, err.Error())
277+
return err
278+
}
279+
280+
d.logger.Info("RunStream request", "resourceId", qm.ResourceId, "namespace", qm.Namespace, "name", qm.Name, "container", qm.Container, "filter", qm.Filter)
281+
span.SetAttributes(attribute.Key("resourceId").String(qm.ResourceId))
282+
span.SetAttributes(attribute.Key("namespace").String(qm.Namespace))
283+
span.SetAttributes(attribute.Key("name").String(qm.Name))
284+
span.SetAttributes(attribute.Key("container").String(qm.Container))
285+
span.SetAttributes(attribute.Key("filter").String(qm.Filter))
286+
287+
return d.kubeClient.StreamLogs(ctx, "", nil, qm.ResourceId, qm.Namespace, qm.Name, qm.Container, qm.Filter, sender)
288+
}
289+
290+
// PublishStream is called when a client sends a message to the stream. Since
291+
// this datasource does not support publishing to streams, we return permission
292+
// denied response.
293+
func (d *Datasource) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
294+
_, span := tracing.DefaultTracer().Start(ctx, "PublishStream")
295+
defer span.End()
296+
297+
d.logger.Debug("PublishStream", "path", req.Path)
298+
span.SetAttributes(attribute.Key("path").String(req.Path))
299+
300+
return &backend.PublishStreamResponse{
301+
Status: backend.PublishStreamStatusPermissionDenied,
302+
}, nil
303+
}
304+
197305
// Dispose here tells plugin SDK that plugin wants to clean up resources when a
198306
// new instance created. As soon as datasource settings change detected by SDK
199307
// old datasource instance will be disposed and a new one will be created using

0 commit comments

Comments
 (0)