From 6d5bb35a394b1fa3c8358fd431f1073fd7c8ad88 Mon Sep 17 00:00:00 2001 From: shverma Date: Tue, 9 Dec 2025 21:41:22 +0530 Subject: [PATCH] Fix `fialed to watch` context canceled error when streaming logs When streaming pipelineRun or taskRun logs, closing the stop channel to terminate the informer was causing "Failed to watch" error messages to be logged with "context canceled". This patch add custom watchErrorHandler function that filter out context.Canceled errors while passing other errors to the default handler. This prevents the error log messages when the CLI intentionally stops watching after a run completes. Signed-off-by: Shiv Verma --- pkg/pipelinerun/tracker.go | 14 ++++++++++++++ pkg/pipelinerun/tracker_test.go | 27 +++++++++++++++++++++++++++ pkg/pods/pod.go | 17 ++++++++++++++++- pkg/pods/pod_test.go | 27 +++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 1 deletion(-) diff --git a/pkg/pipelinerun/tracker.go b/pkg/pipelinerun/tracker.go index a2dc2cdde1..cac39174c6 100644 --- a/pkg/pipelinerun/tracker.go +++ b/pkg/pipelinerun/tracker.go @@ -16,6 +16,7 @@ package pipelinerun import ( "context" + "errors" "sync" "time" @@ -68,6 +69,11 @@ func (t *Tracker) Monitor(allowed []string) <-chan []taskrunpkg.Run { genericInformer, _ := factory.ForResource(*gvr) informer := genericInformer.Informer() + + // Set a custom watch error handler that ignores context.Canceled errors + // to prevent "Failed to watch" log messages when the informer is stopped intentionally + _ = informer.SetWatchErrorHandlerWithContext(watchErrorHandler) + mu := &sync.Mutex{} stopC := make(chan struct{}) trC := make(chan []taskrunpkg.Run) @@ -154,7 +160,15 @@ func pipelinerunOpts(name string) func(opts *metav1.ListOptions) { return func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String() } +} +// watchErrorHandler is a custom watch error handler that filters out context.Canceled errors +// to prevent "Failed to watch" log messages when the informer is stopped intentionally. +// Other errors are passed to the default handler. +func watchErrorHandler(ctx context.Context, r *cache.Reflector, err error) { + if !errors.Is(err, context.Canceled) { + cache.DefaultWatchErrorHandler(ctx, r, err) + } } // handles changes to pipelinerun and pushes the Run information to the diff --git a/pkg/pipelinerun/tracker_test.go b/pkg/pipelinerun/tracker_test.go index c8ee96ef07..49d5822cd8 100644 --- a/pkg/pipelinerun/tracker_test.go +++ b/pkg/pipelinerun/tracker_test.go @@ -15,6 +15,8 @@ package pipelinerun import ( + "context" + "errors" "fmt" "testing" "time" @@ -239,3 +241,28 @@ func startPipelineRun(t *testing.T, data pipelinetest.Data, prStatus ...v1.Pipel Dynamic: dynamic, } } + +func TestTracker_watchErrorHandler(t *testing.T) { + tests := []struct { + name string + err error + }{ + { + name: "context.Canceled should be filtered", + err: context.Canceled, + }, + { + name: "wrapped context.Canceled should be filtered", + err: errors.Join(errors.New("watch failed"), context.Canceled), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(_ *testing.T) { + // Call watchErrorHandler with context.Canceled errors + // These should be filtered (not passed to DefaultWatchErrorHandler) + // so passing nil reflector is safe + watchErrorHandler(context.Background(), nil, tt.err) + }) + } +} diff --git a/pkg/pods/pod.go b/pkg/pods/pod.go index d1da2a7bd6..b931184180 100644 --- a/pkg/pods/pod.go +++ b/pkg/pods/pod.go @@ -16,6 +16,7 @@ package pods import ( "context" + "errors" "fmt" "io" "sync" @@ -115,7 +116,12 @@ func (p *Pod) watcher(stopC chan struct{}, result *podResult, mu *sync.Mutex) { } } - _, err := factory.Core().V1().Pods().Informer().AddEventHandler( + informer := factory.Core().V1().Pods().Informer() + // Set a custom watch error handler that ignores context.Canceled errors + // to prevent "Failed to watch" log messages when the informer is stopped intentionally + _ = informer.SetWatchErrorHandlerWithContext(watchErrorHandler) + + _, err := informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { select { @@ -156,6 +162,15 @@ func podOpts(name string) func(opts *metav1.ListOptions) { } } +// watchErrorHandler is a custom watch error handler that filters out context.Canceled errors +// to prevent "Failed to watch" log messages when the informer is stopped intentionally. +// Other errors are passed to the default handler. +func watchErrorHandler(ctx context.Context, r *cache.Reflector, err error) { + if !errors.Is(err, context.Canceled) { + cache.DefaultWatchErrorHandler(ctx, r, err) + } +} + func checkPodStatus(obj interface{}) (*corev1.Pod, error) { pod, ok := obj.(*corev1.Pod) if !ok { diff --git a/pkg/pods/pod_test.go b/pkg/pods/pod_test.go index 438cef61ab..02879d7a2a 100644 --- a/pkg/pods/pod_test.go +++ b/pkg/pods/pod_test.go @@ -15,6 +15,8 @@ package pods import ( + "context" + "errors" "testing" "time" @@ -212,3 +214,28 @@ func simulateDeleteWatch(t *testing.T, initial *corev1.Pod, later *corev1.Pod) k return clients.Kube } + +func Test_watchErrorHandler(t *testing.T) { + tests := []struct { + name string + err error + }{ + { + name: "context.Canceled should be filtered", + err: context.Canceled, + }, + { + name: "wrapped context.Canceled should be filtered", + err: errors.Join(errors.New("watch failed"), context.Canceled), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(_ *testing.T) { + // Call watchErrorHandler with context.Canceled errors + // These should be filtered (not passed to DefaultWatchErrorHandler) + // so passing nil reflector is safe + watchErrorHandler(context.Background(), nil, tt.err) + }) + } +}