From 69886261e7be13d2984aa76e23a965a5ffd7946a Mon Sep 17 00:00:00 2001 From: Simon Davies Date: Wed, 7 Jan 2026 22:38:13 +0000 Subject: [PATCH 1/2] use fsnotify and klog Signed-off-by: Simon Davies --- device-plugin/go.mod | 3 + device-plugin/go.sum | 4 ++ device-plugin/main.go | 125 +++++++++++++++++++++++++++++++----------- 3 files changed, 99 insertions(+), 33 deletions(-) diff --git a/device-plugin/go.mod b/device-plugin/go.mod index 59cdef7..2bec215 100644 --- a/device-plugin/go.mod +++ b/device-plugin/go.mod @@ -3,11 +3,14 @@ module github.com/hyperlight-dev/hyperlight-device-plugin go 1.25.0 require ( + github.com/fsnotify/fsnotify v1.8.0 google.golang.org/grpc v1.78.0 + k8s.io/klog/v2 v2.130.1 k8s.io/kubelet v0.35.0 ) require ( + github.com/go-logr/logr v1.4.3 // indirect golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/text v0.32.0 // indirect diff --git a/device-plugin/go.sum b/device-plugin/go.sum index bb2273e..8a16f52 100644 --- a/device-plugin/go.sum +++ b/device-plugin/go.sum @@ -1,3 +1,5 @@ +github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= +github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -34,5 +36,7 @@ google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= +k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kubelet v0.35.0 h1:8cgJHCBCKLYuuQ7/Pxb/qWbJfX1LXIw7790ce9xHq7c= k8s.io/kubelet v0.35.0/go.mod h1:ciRzAXn7C4z5iB7FhG1L2CGPPXLTVCABDlbXt/Zz8YA= diff --git a/device-plugin/main.go b/device-plugin/main.go index 9c0a5c3..c6f3cff 100644 --- a/device-plugin/main.go +++ b/device-plugin/main.go @@ -18,8 +18,8 @@ package main import ( "context" + "flag" "fmt" - "log" "net" "os" "os/signal" @@ -28,8 +28,10 @@ import ( "syscall" "time" + "github.com/fsnotify/fsnotify" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "k8s.io/klog/v2" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" ) @@ -66,7 +68,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) { return nil, fmt.Errorf("no supported hypervisor found (/dev/kvm or /dev/mshv)") } - log.Printf("Detected hypervisor: %s at %s", hypervisor, devicePath) + klog.Infof("Detected hypervisor: %s at %s", hypervisor, devicePath) // Create CDI spec if err := writeCDISpec(hypervisor, devicePath); err != nil { @@ -84,7 +86,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) { if count, err := strconv.Atoi(countStr); err == nil && count > 0 { numDevices = count } else { - log.Printf("Invalid DEVICE_COUNT '%s', using default %d", countStr, defaultDeviceCount) + klog.Warningf("Invalid DEVICE_COUNT '%s', using default %d", countStr, defaultDeviceCount) } } @@ -95,7 +97,7 @@ func NewHyperlightDevicePlugin() (*HyperlightDevicePlugin, error) { Health: pluginapi.Healthy, } } - log.Printf("Advertising %d hypervisor devices (configurable via DEVICE_COUNT)", numDevices) + klog.Infof("Advertising %d hypervisor devices (configurable via DEVICE_COUNT)", numDevices) return &HyperlightDevicePlugin{ devices: devices, @@ -113,7 +115,7 @@ func writeCDISpec(hypervisor, devicePath string) error { if parsed, err := strconv.Atoi(uidStr); err == nil && parsed >= 0 { uid = parsed } else { - log.Printf("Invalid DEVICE_UID '%s', using default %d", uidStr, defaultDeviceUID) + klog.Warningf("Invalid DEVICE_UID '%s', using default %d", uidStr, defaultDeviceUID) } } @@ -122,11 +124,11 @@ func writeCDISpec(hypervisor, devicePath string) error { if parsed, err := strconv.Atoi(gidStr); err == nil && parsed >= 0 { gid = parsed } else { - log.Printf("Invalid DEVICE_GID '%s', using default %d", gidStr, defaultDeviceGID) + klog.Warningf("Invalid DEVICE_GID '%s', using default %d", gidStr, defaultDeviceGID) } } - log.Printf("CDI device ownership: uid=%d, gid=%d (configurable via DEVICE_UID/DEVICE_GID)", uid, gid) + klog.Infof("CDI device ownership: uid=%d, gid=%d (configurable via DEVICE_UID/DEVICE_GID)", uid, gid) spec := fmt.Sprintf(`{ "cdiVersion": "0.6.0", @@ -159,7 +161,7 @@ func writeCDISpec(hypervisor, devicePath string) error { if err := os.WriteFile(cdiSpecPath, []byte(spec), 0644); err != nil { return err } - log.Printf("CDI spec written to %s", cdiSpecPath) + klog.Infof("CDI spec written to %s", cdiSpecPath) return nil } @@ -173,7 +175,7 @@ func (p *HyperlightDevicePlugin) GetDevicePluginOptions(ctx context.Context, req // ListAndWatch lists devices and watches for changes func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error { - log.Printf("ListAndWatch called, sending %d devices", len(p.devices)) + klog.Infof("ListAndWatch called, sending %d devices", len(p.devices)) if err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { return err @@ -188,15 +190,19 @@ func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginap case <-p.stopCh: return nil case <-ticker.C: - health := pluginapi.Healthy + newHealth := pluginapi.Healthy if _, err := os.Stat(p.devicePath); err != nil { - health = pluginapi.Unhealthy - log.Printf("Device %s not found, marking unhealthy", p.devicePath) + newHealth = pluginapi.Unhealthy + klog.Warningf("Device %s not found, marking all devices unhealthy", p.devicePath) } - if p.devices[0].Health != health { - p.devices[0].Health = health - log.Printf("Device health changed to %s", health) + // Check if health changed (compare against first device as representative) + if p.devices[0].Health != newHealth { + // Update ALL devices - they all share the same underlying hypervisor device + for i := range p.devices { + p.devices[i].Health = newHealth + } + klog.Infof("Device health changed to %s for all %d devices", newHealth, len(p.devices)) if err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { return err } @@ -207,7 +213,7 @@ func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginap // Allocate allocates devices to a container func (p *HyperlightDevicePlugin) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { - log.Printf("Allocate called for %d containers", len(req.ContainerRequests)) + klog.V(2).Infof("Allocate called for %d containers", len(req.ContainerRequests)) responses := make([]*pluginapi.ContainerAllocateResponse, len(req.ContainerRequests)) @@ -220,7 +226,7 @@ func (p *HyperlightDevicePlugin) Allocate(ctx context.Context, req *pluginapi.Al }, }, } - log.Printf("Allocated CDI device: hyperlight.dev/hypervisor=%s", p.hypervisor) + klog.V(2).Infof("Allocated CDI device: hyperlight.dev/hypervisor=%s", p.hypervisor) } return &pluginapi.AllocateResponse{ContainerResponses: responses}, nil @@ -242,7 +248,7 @@ func (p *HyperlightDevicePlugin) Start() error { // Remove old socket if err := os.Remove(serverSock); err != nil && !os.IsNotExist(err) { - log.Printf("Warning: failed to remove old socket: %v", err) + klog.Warningf("Failed to remove old socket: %v", err) } listener, err := net.Listen("unix", serverSock) @@ -254,9 +260,9 @@ func (p *HyperlightDevicePlugin) Start() error { pluginapi.RegisterDevicePluginServer(p.server, p) go func() { - log.Printf("Starting gRPC server on %s", serverSock) + klog.Infof("Starting gRPC server on %s", serverSock) if err := p.server.Serve(listener); err != nil { - log.Printf("gRPC server stopped: %v", err) + klog.V(1).Infof("gRPC server stopped: %v", err) } }() @@ -297,7 +303,7 @@ func (p *HyperlightDevicePlugin) Register() error { return fmt.Errorf("failed to register with kubelet: %v", err) } - log.Printf("Registered with kubelet as %s", resourceName) + klog.Infof("Registered with kubelet as %s", resourceName) return nil } @@ -307,14 +313,65 @@ func (p *HyperlightDevicePlugin) Stop() { p.server.Stop() } os.Remove(serverSock) - log.Println("Device plugin stopped") + klog.Info("Device plugin stopped") } -// watchKubeletRestart monitors for kubelet restarts by watching the plugin socket. +// newFSWatcher creates a filesystem watcher for kubelet restart detection. +// This is the industry-standard approach used by NVIDIA, Intel, and other device plugins. +func newFSWatcher(files ...string) (*fsnotify.Watcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + for _, f := range files { + if err := watcher.Add(f); err != nil { + watcher.Close() + return nil, err + } + } + + return watcher, nil +} + +// watchKubeletRestart monitors for kubelet restarts using fsnotify. // When kubelet restarts, it deletes all sockets in /var/lib/kubelet/device-plugins/. -// This function blocks until the socket is deleted, signaling a kubelet restart. +// This function blocks until it detects a relevant filesystem event. func (p *HyperlightDevicePlugin) watchKubeletRestart() { - log.Println("Watching for kubelet restart (socket deletion)...") + klog.Info("Watching for kubelet restart using fsnotify...") + + watcher, err := newFSWatcher(pluginapi.DevicePluginPath) + if err != nil { + klog.Errorf("Failed to create fsnotify watcher, falling back to polling: %v", err) + p.watchKubeletRestartPolling() + return + } + defer watcher.Close() + + for { + select { + case <-p.stopCh: + return + case event := <-watcher.Events: + if event.Name == serverSock && (event.Op&fsnotify.Remove) == fsnotify.Remove { + klog.Info("Plugin socket deleted - kubelet may have restarted") + return + } + // Also watch for kubelet socket recreation (indicates kubelet restart complete) + if event.Name == kubeletSock && (event.Op&fsnotify.Create) == fsnotify.Create { + klog.Info("Kubelet socket recreated - kubelet restart detected") + return + } + case err := <-watcher.Errors: + klog.Warningf("fsnotify error: %v", err) + } + } +} + +// watchKubeletRestartPolling is a fallback method using polling. +// Used when fsnotify is unavailable. +func (p *HyperlightDevicePlugin) watchKubeletRestartPolling() { + klog.Info("Watching for kubelet restart (polling)...") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -324,9 +381,8 @@ func (p *HyperlightDevicePlugin) watchKubeletRestart() { case <-p.stopCh: return case <-ticker.C: - // Check if our socket still exists if _, err := os.Stat(serverSock); os.IsNotExist(err) { - log.Println("Plugin socket deleted - kubelet may have restarted") + klog.Info("Plugin socket deleted - kubelet may have restarted") return } } @@ -334,12 +390,15 @@ func (p *HyperlightDevicePlugin) watchKubeletRestart() { } func main() { - log.SetFlags(log.LstdFlags | log.Lshortfile) - log.Println("Starting Hyperlight Device Plugin") + klog.InitFlags(nil) + flag.Parse() + defer klog.Flush() + + klog.Info("Starting Hyperlight Device Plugin") plugin, err := NewHyperlightDevicePlugin() if err != nil { - log.Fatalf("Failed to create device plugin: %v", err) + klog.Fatalf("Failed to create device plugin: %v", err) } // Handle signals for graceful shutdown @@ -350,7 +409,7 @@ func main() { go func() { for { if err := plugin.Start(); err != nil { - log.Printf("Failed to start device plugin: %v", err) + klog.Errorf("Failed to start device plugin: %v", err) time.Sleep(5 * time.Second) continue } @@ -360,13 +419,13 @@ func main() { plugin.watchKubeletRestart() // If we get here, kubelet restarted - stop current server and re-register - log.Println("Detected kubelet restart, re-registering...") + klog.Info("Detected kubelet restart, re-registering...") plugin.server.Stop() time.Sleep(time.Second) // Brief pause before restart } }() sig := <-sigCh - log.Printf("Received signal %v, shutting down", sig) + klog.Infof("Received signal %v, shutting down", sig) plugin.Stop() } From 9ccbb3307ab34c4adf181528514e6a31b11cb989 Mon Sep 17 00:00:00 2001 From: Simon Davies Date: Wed, 7 Jan 2026 22:49:34 +0000 Subject: [PATCH 2/2] fix: address PR review comments - Rename newHealth to health (Go naming conventions) - Remove dead kubeletSock check (wrong directory being watched) - Handle closed fsnotify channels with fallback to polling - Update comment to reflect actual behavior Signed-off-by: Simon Davies --- device-plugin/main.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/device-plugin/main.go b/device-plugin/main.go index c6f3cff..6630a38 100644 --- a/device-plugin/main.go +++ b/device-plugin/main.go @@ -190,19 +190,19 @@ func (p *HyperlightDevicePlugin) ListAndWatch(req *pluginapi.Empty, srv pluginap case <-p.stopCh: return nil case <-ticker.C: - newHealth := pluginapi.Healthy + health := pluginapi.Healthy if _, err := os.Stat(p.devicePath); err != nil { - newHealth = pluginapi.Unhealthy + health = pluginapi.Unhealthy klog.Warningf("Device %s not found, marking all devices unhealthy", p.devicePath) } // Check if health changed (compare against first device as representative) - if p.devices[0].Health != newHealth { + if p.devices[0].Health != health { // Update ALL devices - they all share the same underlying hypervisor device for i := range p.devices { - p.devices[i].Health = newHealth + p.devices[i].Health = health } - klog.Infof("Device health changed to %s for all %d devices", newHealth, len(p.devices)) + klog.Infof("Device health changed to %s for all %d devices", health, len(p.devices)) if err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: p.devices}); err != nil { return err } @@ -317,7 +317,6 @@ func (p *HyperlightDevicePlugin) Stop() { } // newFSWatcher creates a filesystem watcher for kubelet restart detection. -// This is the industry-standard approach used by NVIDIA, Intel, and other device plugins. func newFSWatcher(files ...string) (*fsnotify.Watcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -336,7 +335,7 @@ func newFSWatcher(files ...string) (*fsnotify.Watcher, error) { // watchKubeletRestart monitors for kubelet restarts using fsnotify. // When kubelet restarts, it deletes all sockets in /var/lib/kubelet/device-plugins/. -// This function blocks until it detects a relevant filesystem event. +// This function blocks until it detects our plugin socket being deleted. func (p *HyperlightDevicePlugin) watchKubeletRestart() { klog.Info("Watching for kubelet restart using fsnotify...") @@ -352,17 +351,22 @@ func (p *HyperlightDevicePlugin) watchKubeletRestart() { select { case <-p.stopCh: return - case event := <-watcher.Events: + case event, ok := <-watcher.Events: + if !ok { + klog.Warning("fsnotify events channel closed, falling back to polling") + p.watchKubeletRestartPolling() + return + } if event.Name == serverSock && (event.Op&fsnotify.Remove) == fsnotify.Remove { klog.Info("Plugin socket deleted - kubelet may have restarted") return } - // Also watch for kubelet socket recreation (indicates kubelet restart complete) - if event.Name == kubeletSock && (event.Op&fsnotify.Create) == fsnotify.Create { - klog.Info("Kubelet socket recreated - kubelet restart detected") + case err, ok := <-watcher.Errors: + if !ok { + klog.Warning("fsnotify errors channel closed, falling back to polling") + p.watchKubeletRestartPolling() return } - case err := <-watcher.Errors: klog.Warningf("fsnotify error: %v", err) } }