Commit 7c5fc77

nixpanic authored and mergify[bot] committed
sidecar: add Volume Condition Reporter
Signed-off-by: Niels de Vos <ndevos@ibm.com>
1 parent c1a02fd commit 7c5fc77

1 file changed: 199 additions, 0 deletions
/*
Copyright 2025 The Kubernetes-CSI-Addons Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package condition

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/volume-condition/node"
	"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/volume-condition/platform"
	"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/volume-condition/volume"
)

// VolumeConditionReporter provides the entrypoint for running the volume
// condition checks and reporting the results.
type VolumeConditionReporter interface {
	// Run starts checking the volumes at the given interval. This
	// function is not expected to return, unless a critical error
	// occurs.
	Run(ctx context.Context, interval time.Duration) error
}

type volumeConditionReporter struct {
	client *kubernetes.Clientset

	driver    volume.Driver
	localNode node.Node
	recorders []conditionRecorder

	// conditionCache tracks the condition by volume-handle
	conditionCache map[string]volume.VolumeCondition
}

// NewVolumeConditionReporter creates a new VolumeConditionReporter. The
// drivername that is passed is used to find the matching CSI-driver on the
// local node. Multiple RecorderOptions can be passed, which will be used
// to report the volume condition.
func NewVolumeConditionReporter(
	ctx context.Context,
	client *kubernetes.Clientset,
	hostname, drivername string,
	recorderOptions []RecorderOption,
) (VolumeConditionReporter, error) {
	drv, err := volume.FindDriver(ctx, drivername)
	if err != nil {
		return nil, fmt.Errorf("failed to find driver %q: %w", drivername, err)
	}

	if !drv.SupportsVolumeCondition() {
		return nil, fmt.Errorf("driver %q does not support volume-condition", drivername)
	}

	n, err := node.NewNode(ctx, client, hostname)
	if err != nil {
		return nil, fmt.Errorf("failed to get node %q: %w", hostname, err)
	}

	// TODO: use options for the recorders
	recorders := make([]conditionRecorder, len(recorderOptions))
	for i, opt := range recorderOptions {
		var rec conditionRecorder
		rec, err = opt.newRecorder(client, hostname)
		if err != nil {
			return nil, fmt.Errorf("failed to create recorder: %w", err)
		}

		recorders[i] = rec
	}

	return &volumeConditionReporter{
		client:         client,
		recorders:      recorders,
		driver:         drv,
		localNode:      n,
		conditionCache: make(map[string]volume.VolumeCondition),
	}, nil
}

// Run starts the volume condition reporter. This function is not expected to
// return, unless a critical error occurs.
// Only new/updated volume conditions are reported, which prevents the noise
// of recurring conditions.
func (cvr *volumeConditionReporter) Run(ctx context.Context, interval time.Duration) error {
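	// time.Tick returns a nil channel when the interval is not positive;
	// checking for that here makes an invalid interval fail loudly instead
	// of blocking forever on a nil channel.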
	running := time.Tick(interval)
	if running == nil {
		return fmt.Errorf("interval %v is invalid", interval)
	}

	for range running {
		volumes, err := cvr.localNode.ListCSIVolumes(ctx)
		if err != nil {
			return fmt.Errorf("failed to list volumes: %w", err)
		}

		for _, v := range volumes {
			if v.GetDriver() != cvr.driver.GetDrivername() {
				continue
			}

			vc, err := cvr.driver.GetVolumeCondition(v)
			if err != nil {
				klog.Errorf("failed to check if %q is healthy: %v", v.GetVolumeID(), err)
				continue
			} else if vc == nil {
				klog.Errorf(
					"driver %q did not return a volume condition for volume %q",
					cvr.driver.GetDrivername(),
					v.GetVolumeID(),
				)
				continue
			}

			if !cvr.isUpdatedVolumeCondition(v.GetVolumeID(), vc) {
				// skip recording if there is no update
				continue
			}

			cvr.recordVolumeCondition(ctx, v.GetVolumeID(), vc)
		}

		cvr.pruneConditionCache(volumes)
	}

	return nil
}

// isUpdatedVolumeCondition checks the conditionCache, and updates it when the
// passed vc differs from the cached condition.
func (cvr *volumeConditionReporter) isUpdatedVolumeCondition(volumeID string, vc volume.VolumeCondition) bool {
	lastCondition := cvr.conditionCache[volumeID]
	if lastCondition == nil ||
		(lastCondition.IsHealthy() != vc.IsHealthy() || lastCondition.GetMessage() != vc.GetMessage()) {
		// vc is an update, store it in the cache
		cvr.conditionCache[volumeID] = vc
		return true
	}

	return false
}

// pruneConditionCache removes inactive volume-handles from the
// cvr.conditionCache. This is done by creating a new cache and letting the
// old cvr.conditionCache get garbage collected.
func (cvr *volumeConditionReporter) pruneConditionCache(volumes []volume.CSIVolume) {
	newConditionCache := make(map[string]volume.VolumeCondition, len(volumes))
	for _, v := range volumes {
		volumeID := v.GetVolumeID()
		newConditionCache[volumeID] = cvr.conditionCache[volumeID]
	}

	cvr.conditionCache = newConditionCache
}

// recordVolumeCondition resolves the PersistentVolume from the given volumeID
// and loops through all recorders to report the volume condition.
func (cvr *volumeConditionReporter) recordVolumeCondition(ctx context.Context, volumeID string, vc volume.VolumeCondition) {
	pvName, err := platform.GetPlatform().ResolvePersistentVolumeName(cvr.driver.GetDrivername(), volumeID)
	if err != nil {
		klog.Errorf("failed to resolve persistent volume name: %v", err)
		return
	}

	pv, err := cvr.client.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
	if err != nil {
		klog.Errorf("failed to get persistent volume %q: %v", pvName, err)
		return
	}
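
	// Each recorder reports independently: a failure in one recorder is
	// logged (the %T verb names the failing recorder type) but does not
	// prevent the remaining recorders from running.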
	for _, recorder := range cvr.recorders {
		err = recorder.record(ctx, pv, vc)
		if err != nil {
			klog.Warningf(
				"%T failed to record volume condition for persistent volume %q: %v",
				recorder,
				pv.Name,
				err,
			)
		}
	}
}
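
For reference, a minimal sketch of how a sidecar's main() might wire up this reporter. This is not part of the commit: the import path is inferred from the imports above, the NODE_NAME/DRIVER_NAME environment variables and the one-minute interval are made up, and no RecorderOptions are passed because their constructors do not appear in this file.

package main

import (
	"context"
	"os"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"

	condition "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/volume-condition"
)

func main() {
	ctx := context.Background()

	// In-cluster configuration, as the sidecar runs inside a pod.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatalf("failed to load in-cluster config: %v", err)
	}

	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("failed to create Kubernetes client: %v", err)
	}

	// Hypothetical: a real deployment would inject the node name via the
	// downward API and pass the CSI driver name as a flag or env var.
	hostname := os.Getenv("NODE_NAME")
	drivername := os.Getenv("DRIVER_NAME")

	// No RecorderOptions are passed, as their constructors are not part of
	// this commit; with an empty slice the reporter checks volumes but
	// records nothing.
	vcr, err := condition.NewVolumeConditionReporter(ctx, client, hostname, drivername, nil)
	if err != nil {
		klog.Fatalf("failed to create volume condition reporter: %v", err)
	}

	// Run blocks, checking the volume conditions once per minute.
	if err := vcr.Run(ctx, time.Minute); err != nil {
		klog.Fatalf("volume condition reporter failed: %v", err)
	}
}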
