Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/csiaddons/v1alpha1/csiaddonsnode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (

// Failed represents the Connection Failed state.
CSIAddonsNodeStateFailed CSIAddonsNodeState = "Failed"

// Retrying represents the Connection Retrying state.
CSIAddonsNodeStateRetrying CSIAddonsNodeState = "Retrying"

CSIAddonsNodeStateRetryingFmtStr = "currentAttempt: %d"
)

type CSIAddonsNodeDriver struct {
Expand Down
129 changes: 114 additions & 15 deletions internal/controller/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"net/url"
"slices"
"strconv"
"strings"
"time"

csiaddonsv1alpha1 "github.com/csi-addons/kubernetes-csi-addons/api/csiaddons/v1alpha1"
"github.com/csi-addons/kubernetes-csi-addons/internal/connection"
Expand All @@ -42,6 +45,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
// The base duration to use for exponential backoff while retrying connection
baseRetryDelay = 2 * time.Second

// Maximum attempts to make while retrying the connection
maxRetries = 3

// The duration after which a new reconcile should be triggered
// to validate the cluster state. Used only when reconciliation
// completes without any errors.
baseRequeueAfter = 1 * time.Hour
)

var (
csiAddonsNodeFinalizer = csiaddonsv1alpha1.GroupVersion.Group + "/csiaddonsnode"
)
Expand Down Expand Up @@ -102,7 +118,16 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// In case of CR is marked for deletion, we dont need the connection to be established.
if err != nil && podName == "" && csiAddonsNode.DeletionTimestamp.IsZero() {
logger.Error(err, "Failed to resolve endpoint")
return ctrl.Result{}, fmt.Errorf("failed to resolve endpoint %q: %w", csiAddonsNode.Spec.Driver.EndPoint, err)

// We will either:
// - requeue and try again with exponential backoff
// - delete the csiaddonsnode after max retries and stop the reconicle phase
return r.backoffAndRetry(
ctx,
logger,
csiAddonsNode,
fmt.Errorf("failed to resolve endpoint %q: %w", csiAddonsNode.Spec.Driver.EndPoint, err),
)
}

// namespace + "/" + leader identity(pod name) is the key for the connection.
Expand All @@ -128,20 +153,13 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques

logger.Info("Connecting to sidecar")
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name, r.EnableAuth)
if err != nil {
logger.Error(err, "Failed to establish connection with sidecar")

errMessage := util.GetErrorMessage(err)
csiAddonsNode.Status.State = csiaddonsv1alpha1.CSIAddonsNodeStateFailed
csiAddonsNode.Status.Message = fmt.Sprintf("Failed to establish connection with sidecar: %v", errMessage)
statusErr := r.Status().Update(ctx, csiAddonsNode)
if statusErr != nil {
logger.Error(statusErr, "Failed to update status")

return ctrl.Result{}, statusErr
}

return ctrl.Result{}, err
// If error occurs, we retry with exponential backoff until we reach `maxRetries`
if err != nil {
// We will either:
// - requeue and try again with exponential backoff
// - delete the csiaddonsnode after max retries and stop the reconicle phase
return r.backoffAndRetry(ctx, logger, csiAddonsNode, err)
}

nfsc, err := r.getNetworkFenceClientStatus(ctx, &logger, newConn, csiAddonsNode)
Expand All @@ -157,6 +175,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques

csiAddonsNode.Status.State = csiaddonsv1alpha1.CSIAddonsNodeStateConnected
csiAddonsNode.Status.Message = "Successfully established connection with sidecar"
csiAddonsNode.Status.Reason = ""
csiAddonsNode.Status.Capabilities = parseCapabilities(newConn.Capabilities)
err = r.Status().Update(ctx, csiAddonsNode)
if err != nil {
Expand All @@ -165,7 +184,8 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
// Reconciled successfully, requeue to validate state periodically
return ctrl.Result{RequeueAfter: baseRequeueAfter}, nil
}

// getNetworkFenceClassesForDriver gets the networkfenceclasses for the driver.
Expand Down Expand Up @@ -341,6 +361,10 @@ func (r *CSIAddonsNodeReconciler) resolveEndpoint(ctx context.Context, rawURL st
Name: podname,
}, pod)
if err != nil {
// do not return podname if the pod does not exist
if apierrors.IsNotFound(err) {
podname = ""
}
return podname, "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, podname, err)
} else if pod.Status.PodIP == "" {
return podname, "", fmt.Errorf("pod %s/%s does not have an IP-address", namespace, podname)
Expand Down Expand Up @@ -416,3 +440,78 @@ func parseCapabilities(caps []*identity.Capability) []string {

return capabilities
}

// getRetryCountFromReason expects a string and tries to extract
// and return the retry count from the string.
// If the reason string is empty, it assumes the first attempt and returns 0.
// An error is returned if the parsing is not successful.
func getRetryCountFromReason(reason string) (int, error) {
// Might not be updated yet, likely the 1st attempt
if reason == "" {
return 0, nil
}

parts := strings.SplitN(reason, ":", 2)
if len(parts) < 2 {
return 0, errors.New("got an unexpected length after splitting the reason string")
}

// Parse
if c, err := strconv.Atoi(strings.TrimSpace(parts[1])); err == nil {
return c, nil
}

return 0, errors.New("failed to parse the reason string to an integer")
}

// backoffAndRetry handles the retry mechanism with exponential backoff for establishing
// a connection with the sidecar. It updates the status of the CSIAddonsNode object to
// reflect the current retry attempt and reason for failure. If the maximum number of retries
// is reached, the CSIAddonsNode object is deleted to abort further attempts.
func (r *CSIAddonsNodeReconciler) backoffAndRetry(
ctx context.Context,
logger logr.Logger,
csiAddonsNode *csiaddonsv1alpha1.CSIAddonsNode,
err error,
) (ctrl.Result, error) {
// Only continue if we get a valid/initial retry count
currentRetries, e := getRetryCountFromReason(csiAddonsNode.Status.Reason)
if e != nil {
logger.Error(e, "failed to get the retry count from csiAddonsNode status", "csiAddonsNodeStatus", csiAddonsNode.Status)

return ctrl.Result{}, e
}
logger.Error(err, "Failed to establish connection with sidecar", "attempt", currentRetries+1)

// If reached max retries, abort and cleanup
if currentRetries >= maxRetries {
logger.Info(fmt.Sprintf("Failed to establish connection with sidecar after %d attempts, deleting the object", maxRetries))

if delErr := r.Delete(ctx, csiAddonsNode); client.IgnoreNotFound(delErr) != nil {
logger.Error(delErr, "failed to delete CSIAddonsNode object after max retries")

return ctrl.Result{}, delErr
}

// Object is deleted, stop the reconcile phase
logger.Info("successfully deleted CSIAddonsNode object due to reaching max reconnection attempts")
return ctrl.Result{}, nil
}

errMessage := util.GetErrorMessage(err)
csiAddonsNode.Status.State = csiaddonsv1alpha1.CSIAddonsNodeStateRetrying
csiAddonsNode.Status.Message = fmt.Sprintf("Connection failed: %v. Retrying attempt %d/%d.", errMessage, currentRetries+1, maxRetries)
csiAddonsNode.Status.Reason = fmt.Sprintf(csiaddonsv1alpha1.CSIAddonsNodeStateRetryingFmtStr, currentRetries+1)
statusErr := r.Status().Update(ctx, csiAddonsNode)
if statusErr != nil {
logger.Error(statusErr, "Failed to update status")

return ctrl.Result{}, statusErr
}

// Calculate backoff; baseRetryDelay * 1, 2, 4....
backoff := baseRetryDelay * time.Duration(math.Pow(2, float64(currentRetries)))
logger.Info("Requeuing request for attempting the connection again", "backoff", backoff)

return ctrl.Result{RequeueAfter: backoff}, nil
}
63 changes: 63 additions & 0 deletions internal/controller/csiaddons/csiaddonsnode_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,66 @@ func TestParseCapabilities(t *testing.T) {
})
}
}
func TestGetRetryCountFromReason(t *testing.T) {
tests := []struct {
name string
reason string
want int
wantErr bool
}{
{
name: "empty reason",
reason: "",
want: 0,
wantErr: false,
},
{
name: "valid reason",
reason: "retry: 2",
want: 2,
wantErr: false,
},
{
name: "valid with extra spaces",
reason: "retry: 5",
want: 5,
wantErr: false,
},
{
name: "valid with trailing spaces",
reason: "something: 10 ",
want: 10,
wantErr: false,
},
{
name: "no colon",
reason: "retry 3",
want: 0,
wantErr: true,
},
{
name: "non-integer value",
reason: "retry: abc",
want: 0,
wantErr: true,
},
{
name: "multiple colons",
reason: "prefix: 7:extra",
want: 0,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getRetryCountFromReason(tt.reason)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}