@@ -773,12 +773,6 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
773773 * out = * updated
774774 }
775775
776- // Check if the current CSV is being replaced, return with replacing status if so
777- if err := a .checkReplacementsAndUpdateStatus (out ); err != nil {
778- logger .WithError (err ).Info ("replacement check" )
779- return
780- }
781-
782776 // Verify CSV operatorgroup (and update annotations if needed)
783777 operatorGroup , err := a .operatorGroupForCSV (out , logger )
784778 if operatorGroup == nil {
@@ -889,6 +883,15 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
889883 }
890884 out .SetRequirementStatus (statuses )
891885
886+ // Check if we need to requeue the previous
887+ if prev := a .isReplacing (out ); prev != nil {
888+ if prev .Status .Phase == v1alpha1 .CSVPhaseSucceeded {
889+ if err := a .csvQueueSet .Requeue (prev .GetName (), prev .GetNamespace ()); err != nil {
890+ a .Log .WithError (err ).Warn ("error requeueing previous" )
891+ }
892+ }
893+ }
894+
892895 if ! met {
893896 logger .Info ("requirements were not met" )
894897 out .SetPhaseWithEventIfChanged (v1alpha1 .CSVPhasePending , v1alpha1 .CSVReasonRequirementsNotMet , "one or more requirements couldn't be found" , now , a .recorder )
@@ -912,6 +915,13 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
912915 return
913916 }
914917
918+ // Check if we're not ready to install part of the replacement chain yet
919+ if prev := a .isReplacing (out ); prev != nil {
920+ if prev .Status .Phase != v1alpha1 .CSVPhaseReplacing {
921+ return
922+ }
923+ }
924+
915925 logger .Info ("scheduling ClusterServiceVersion for install" )
916926 out .SetPhaseWithEvent (v1alpha1 .CSVPhaseInstallReady , v1alpha1 .CSVReasonRequirementsMet , "all requirements found, attempting install" , now , a .recorder )
917927 case v1alpha1 .CSVPhaseInstallReady :
@@ -955,6 +965,12 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
955965 }
956966
957967 case v1alpha1 .CSVPhaseSucceeded :
968+ // Check if the current CSV is being replaced, return with replacing status if so
969+ if err := a .checkReplacementsAndUpdateStatus (out ); err != nil {
970+ logger .WithError (err ).Info ("replacement check" )
971+ return
972+ }
973+
958974 installer , strategy , _ := a .parseStrategiesAndUpdateStatus (out )
959975 if strategy == nil {
960976 return
@@ -1073,6 +1089,14 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
10731089 return
10741090 }
10751091
1092+ // If we are a leaf, we should requeue the replacement for processing
1093+ if next := a .isBeingReplaced (out , a .csvSet (out .GetNamespace (), v1alpha1 .CSVPhaseAny )); next != nil {
1094+ err := a .csvQueueSet .Requeue (next .GetName (), next .GetNamespace ())
1095+ if err != nil {
1096+ a .Log .WithError (err ).Warn ("error requeuing replacement" )
1097+ }
1098+ }
1099+
10761100 // If we can find a newer version that's successfully installed, we're safe to mark all intermediates
10771101 for _ , csv := range a .findIntermediatesForDeletion (out ) {
10781102 // we only mark them in this step, in case some get deleted but others fail and break the replacement chain
@@ -1093,6 +1117,10 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
10931117 }
10941118 case v1alpha1 .CSVPhaseDeleting :
10951119 var immediate int64 = 0
1120+
1121+ if err := a .csvQueueSet .Remove (out .GetNamespace (), out .GetName ()); err != nil {
1122+ logger .WithError (err ).Debug ("error removing from queue" )
1123+ }
10961124 syncError = a .client .OperatorsV1alpha1 ().ClusterServiceVersions (out .GetNamespace ()).Delete (out .GetName (), & metav1.DeleteOptions {GracePeriodSeconds : & immediate })
10971125 if syncError != nil {
10981126 logger .Debugf ("unable to get delete csv marked for deletion: %s" , syncError .Error ())
@@ -1158,7 +1186,7 @@ func (a *Operator) checkReplacementsAndUpdateStatus(csv *v1alpha1.ClusterService
11581186 }
11591187 if replacement := a .isBeingReplaced (csv , a .csvSet (csv .GetNamespace (), v1alpha1 .CSVPhaseAny )); replacement != nil {
11601188 a .Log .Infof ("newer ClusterServiceVersion replacing %s, no-op" , csv .SelfLink )
1161- msg := fmt .Sprintf ("being replaced by csv: %s" , replacement .SelfLink )
1189+ msg := fmt .Sprintf ("being replaced by csv: %s" , replacement .GetName () )
11621190 csv .SetPhaseWithEvent (v1alpha1 .CSVPhaseReplacing , v1alpha1 .CSVReasonBeingReplaced , msg , timeNow (), a .recorder )
11631191 metrics .CSVUpgradeCount .Inc ()
11641192
@@ -1237,10 +1265,15 @@ func (a *Operator) parseStrategiesAndUpdateStatus(csv *v1alpha1.ClusterServiceVe
12371265 return installer , strategy , previousStrategy
12381266}
12391267
1240- func (a * Operator ) crdOwnerConflicts (in * v1alpha1.ClusterServiceVersion , csvs map [string ]* v1alpha1.ClusterServiceVersion ) error {
1241- for _ , crd := range in .Spec .CustomResourceDefinitions .Owned {
1242- for name , csv := range csvs {
1243- if name != in .GetName () && in .Spec .Replaces != name && csv .OwnsCRD (crd .Name ) {
1268+ func (a * Operator ) crdOwnerConflicts (in * v1alpha1.ClusterServiceVersion , csvsInNamespace map [string ]* v1alpha1.ClusterServiceVersion ) error {
1269+ csvsInChain := a .getReplacementChain (in , csvsInNamespace )
1270+ // find csvs in the namespace that are not part of the replacement chain
1271+ for name , csv := range csvsInNamespace {
1272+ if _ , ok := csvsInChain [name ]; ok {
1273+ continue
1274+ }
1275+ for _ , crd := range in .Spec .CustomResourceDefinitions .Owned {
1276+ if name != in .GetName () && csv .OwnsCRD (crd .Name ) {
12441277 return ErrCRDOwnerConflict
12451278 }
12461279 }
@@ -1249,6 +1282,53 @@ func (a *Operator) crdOwnerConflicts(in *v1alpha1.ClusterServiceVersion, csvs ma
12491282 return nil
12501283}
12511284
1285+ func (a * Operator ) getReplacementChain (in * v1alpha1.ClusterServiceVersion , csvsInNamespace map [string ]* v1alpha1.ClusterServiceVersion ) map [string ]struct {} {
1286+ current := in .GetName ()
1287+ csvsInChain := map [string ]struct {}{
1288+ current : {},
1289+ }
1290+
1291+ replacement := func (csvName string ) * string {
1292+ for _ , csv := range csvsInNamespace {
1293+ if csv .Spec .Replaces == csvName {
1294+ name := csv .GetName ()
1295+ return & name
1296+ }
1297+ }
1298+ return nil
1299+ }
1300+
1301+ replaces := func (replaces string ) * string {
1302+ for _ , csv := range csvsInNamespace {
1303+ name := csv .GetName ()
1304+ if name == replaces {
1305+ rep := csv .Spec .Replaces
1306+ return & rep
1307+ }
1308+ }
1309+ return nil
1310+ }
1311+
1312+ next := replacement (current )
1313+ for next != nil {
1314+ csvsInChain [* next ] = struct {}{}
1315+ current = * next
1316+ next = replacement (current )
1317+ }
1318+
1319+ current = in .Spec .Replaces
1320+ prev := replaces (current )
1321+ if prev != nil {
1322+ csvsInChain [current ] = struct {}{}
1323+ }
1324+ for prev != nil && * prev != "" {
1325+ current = * prev
1326+ csvsInChain [current ] = struct {}{}
1327+ prev = replaces (current )
1328+ }
1329+ return csvsInChain
1330+ }
1331+
12521332func (a * Operator ) apiServiceOwnerConflicts (csv * v1alpha1.ClusterServiceVersion ) error {
12531333 // Get replacing CSV if exists
12541334 replacing , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().ClusterServiceVersions (csv .GetNamespace ()).Get (csv .Spec .Replaces )
@@ -1297,6 +1377,8 @@ func (a *Operator) isReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.Clu
12971377 if in .Spec .Replaces == "" {
12981378 return nil
12991379 }
1380+
1381+ // using the client instead of a lister; missing an object because of a cache sync can cause upgrades to fail
13001382 previous , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().ClusterServiceVersions (in .GetNamespace ()).Get (in .Spec .Replaces )
13011383 if err != nil {
13021384 a .Log .WithField ("replacing" , in .Spec .Replaces ).WithError (err ).Debugf ("unable to get previous csv" )
0 commit comments