@@ -8,11 +8,9 @@ import (
88 "sync"
99 "time"
1010
11- "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
1211 registryclient "github.com/operator-framework/operator-registry/pkg/client"
1312 errorwrap "github.com/pkg/errors"
1413 "github.com/sirupsen/logrus"
15- "google.golang.org/grpc/connectivity"
1614 corev1 "k8s.io/api/core/v1"
1715 rbacv1 "k8s.io/api/rbac/v1"
1816 v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
@@ -350,6 +348,11 @@ func (o *Operator) handleCatSrcDeletion(obj interface{}) {
350348 func () {
351349 o .sourcesLock .Lock ()
352350 defer o .sourcesLock .Unlock ()
351+ if s , ok := o .sources [sourceKey ]; ok {
352+ if err := s .Client .Close (); err != nil {
353+ o .Log .WithError (err ).Warn ("error closing client" )
354+ }
355+ }
353356 delete (o .sources , sourceKey )
354357 }()
355358 o .Log .WithField ("source" , sourceKey ).Info ("removed client for deleted catalogsource" )
@@ -449,6 +452,12 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
449452 func () {
450453 o .sourcesLock .Lock ()
451454 defer o .sourcesLock .Unlock ()
455+ s , ok := o .sources [sourceKey ]
456+ if ok {
457+ if err := s .Client .Close (); err != nil {
458+ o .Log .WithError (err ).Debug ("error closing client connection" )
459+ }
460+ }
452461 delete (o .sources , sourceKey )
453462 }()
454463
@@ -464,50 +473,65 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
464473 address := catsrc .Address ()
465474 currentSource , ok := o .sources [sourceKey ]
466475 logger = logger .WithField ("currentSource" , sourceKey )
467- if ! ok || currentSource .Address != address || catsrc .Status .LastSync .After (currentSource .LastConnect .Time ) {
476+
477+ connect := false
478+
479+ // this connection is out of date, close and reconnect
480+ if ok && (currentSource .Address != address || catsrc .Status .LastSync .After (currentSource .LastConnect .Time )) {
481+ logger .Info ("rebuilding connection to registry" )
482+ if currentSource .Client != nil {
483+ if err := currentSource .Client .Close (); err != nil {
484+ logger .WithError (err ).Warn ("couldn't close outdated connection to registry" )
485+ return
486+ }
487+ }
488+ delete (o .sources , sourceKey )
489+ connect = true
490+ } else if ! ok {
491+ // have never made a connection, so need to build a new one
492+ connect = true
493+ }
494+
495+ if connect {
468496 logger .Info ("building connection to registry" )
469- client , err := registryclient .NewClient (address )
497+ c , err := registryclient .NewClient (address )
470498 if err != nil {
471499 logger .WithError (err ).Warn ("couldn't connect to registry" )
472500 }
473501 sourceRef := resolver.SourceRef {
474502 Address : address ,
475- Client : client ,
503+ Client : c ,
476504 LastConnect : timeNow (),
477505 LastHealthy : metav1.Time {}, // haven't detected healthy yet
478506 }
479507 o .sources [sourceKey ] = sourceRef
480508 currentSource = sourceRef
481509 sourcesUpdated = true
482510 }
511+
483512 if currentSource .LastHealthy .IsZero () {
484513 logger .Info ("client hasn't yet become healthy, attempt a health check" )
485- client , ok := currentSource .Client .(* registryclient.Client )
486- if ! ok {
487- logger .WithField ("client" , currentSource .Client ).Warn ("unexpected client" )
488- return
489- }
490- res , err := client .Health .Check (context .TODO (), & grpc_health_v1.HealthCheckRequest {Service : "Registry" })
514+ healthy , err := currentSource .Client .HealthCheck (context .TODO (), 1 * time .Second )
491515 if err != nil {
492- logger .WithError (err ).Debug ("error checking health" )
493- if client .Conn .GetState () == connectivity .TransientFailure {
494- logger .Debug ("wait for state to change" )
495- ctx , _ := context .WithTimeout (context .TODO (), 1 * time .Second )
496- if ! client .Conn .WaitForStateChange (ctx , connectivity .TransientFailure ) {
497- logger .Debug ("state didn't change, trigger reconnect. this may happen when cached dns is wrong." )
498- delete (o .sources , sourceKey )
499- if err := o .catSrcQueueSet .Requeue (sourceKey .Name , sourceKey .Namespace ); err != nil {
500- logger .WithError (err ).Debug ("error requeueing" )
501- }
516+ if registryclient .IsErrorUnrecoverable (err ) {
517+ logger .Debug ("state didn't change, trigger reconnect. this may happen when cached dns is wrong." )
518+ if err := currentSource .Client .Close (); err != nil {
519+ logger .WithError (err ).Warn ("couldn't close outdated connection to registry" )
502520 return
503521 }
522+ delete (o .sources , sourceKey )
523+ if err := o .catSrcQueueSet .Requeue (sourceKey .Name , sourceKey .Namespace ); err != nil {
524+ logger .WithError (err ).Debug ("error requeueing" )
525+ }
504526 }
527+ logger .WithError (err ).Debug ("connection error" )
505528 return
506529 }
507- if res . Status != grpc_health_v1 . HealthCheckResponse_SERVING {
508- logger .WithField ( "status" , res . Status . String ()). Debug ("source not healthy" )
530+ if ! healthy {
531+ logger .Debug ("source not healthy" )
509532 return
510533 }
534+
511535 currentSource .LastHealthy = timeNow ()
512536 o .sources [sourceKey ] = currentSource
513537 sourcesUpdated = true
@@ -694,26 +718,14 @@ func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string)
694718 }()
695719
696720 for k , s := range resolverSources {
697- client , ok := s .(* registryclient.Client )
698- if ! ok {
699- logger .Warn ("unexpected client" )
700- continue
701- }
702-
703721 logger = logger .WithField ("resolverSource" , k )
704- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source" )
705- if client .Conn .GetState () == connectivity .TransientFailure {
706- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("waiting for connection" )
707- ctx , _ := context .WithTimeout (context .TODO (), 2 * time .Second )
708- changed := client .Conn .WaitForStateChange (ctx , connectivity .TransientFailure )
709- if ! changed {
710- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source in transient failure and didn't recover" )
711- delete (resolverSources , k )
712- } else {
713- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("connection re-established" )
714- }
722+ healthy , err := s .HealthCheck (context .TODO (), 2 * time .Second )
723+ if err != nil || ! healthy {
724+ logger .WithError (err ).Debug ("omitting source due to unhealthy source" )
725+ delete (resolverSources , k )
715726 }
716727 }
728+
717729 return resolverSources
718730}
719731
0 commit comments