From 50de515350c29ae86b0cf91ad3e696b36a182ef9 Mon Sep 17 00:00:00 2001 From: Sebastien FELIX Date: Sun, 29 Jun 2025 19:41:31 +0200 Subject: [PATCH 1/4] feature(syncwave) - add BTree order for syncwave management --- pkg/sync/common/types.go | 4 +- pkg/sync/doc.go | 24 +++++ pkg/sync/sync_context.go | 14 +-- pkg/sync/sync_context_test.go | 22 ++--- pkg/sync/sync_tasks.go | 136 +++++++++++++++++++++++++-- pkg/sync/sync_tasks_test.go | 153 ++++++++++++++++++++++++++++++- pkg/sync/syncwaves/waves.go | 11 +++ pkg/sync/syncwaves/waves_test.go | 6 ++ 8 files changed, 339 insertions(+), 31 deletions(-) diff --git a/pkg/sync/common/types.go b/pkg/sync/common/types.go index d5052ed4a..9c4ea3d27 100644 --- a/pkg/sync/common/types.go +++ b/pkg/sync/common/types.go @@ -13,6 +13,8 @@ const ( // AnnotationSyncWave indicates which wave of the sync the resource or hook should be in AnnotationSyncWave = "argocd.argoproj.io/sync-wave" // AnnotationKeyHook contains the hook type of a resource + AnnotationSyncWaveOrder = "argocd.argoproj.io/sync-wave-order" + // AnnotationKeyHook contains the hook type of a resource AnnotationKeyHook = "argocd.argoproj.io/hook" // AnnotationKeyHookDeletePolicy is the policy of deleting a hook AnnotationKeyHookDeletePolicy = "argocd.argoproj.io/hook-delete-policy" @@ -58,7 +60,7 @@ type SyncPhase string // SyncWaveHook is a callback function which will be invoked after each sync wave is successfully // applied during a sync operation. The callback indicates which phase and wave it had just // executed, and whether or not that wave was the final one. -type SyncWaveHook func(phase SyncPhase, wave int, final bool) error +type SyncWaveHook func(phase SyncPhase, waves []int, final bool) error const ( SyncPhasePreSync = "PreSync" diff --git a/pkg/sync/doc.go b/pkg/sync/doc.go index f4f5d8725..f6410971f 100644 --- a/pkg/sync/doc.go +++ b/pkg/sync/doc.go @@ -4,6 +4,7 @@ Package implements Kubernetes resources synchronization and provides the followi - resource pruning - resource hooks - sync waves + - sync waves ordering - sync options # Basic Syncing @@ -75,6 +76,29 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat annotations: argocd.argoproj.io/sync-wave: "5" +# Sync Waves Ordering + +The wave ordering feature allows to run parallel waves of synchronisation where the sync-wave values correspond to a complete +binary tree with root's label equal to 1. A sync-wave value X would be considered less than Y if and only if there exists +integers N and M such that : +Y = X * 2**N + M where 0 <= M < N. + +The `argocd.argoproj.io/sync-wave-order` annotation define the type of wave's ordering used for a resource's wave: + + metadata: + annotations: + argocd.argoproj.io/sync-wave: "5" + argocd.argoproj.io/sync-wave-order: "BTree" + +example of sync-waves ordering using BTree: + + 1 -----> 2 -----> 4 + \ \----> 5 + \---> 3 -----> 6 + \----> 7 + +Note that a resource using a BTree ordering for it's sync-wave will always be synced after all resources using a Normal ordering. + # Sync Options The sync options allows customizing the synchronization of selected resources. The options are specified using the diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 8f4d51e4f..9daa465bd 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "reflect" + "slices" "sort" "strings" "sync" @@ -562,16 +564,16 @@ func (sc *syncContext) Sync() { // remove any tasks not in this wave phase := tasks.phase() - wave := tasks.wave() - finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave() + waves := tasks.waves() + finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, tasks.lastWaves()) // if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful // EVEN if those objects subsequently degraded // This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately. - remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() }) + remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() }) - sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") - tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave }) + sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") + tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && slices.Contains(waves, t.wave()) }) sc.setOperationPhase(common.OperationRunning, "one or more tasks are running") @@ -579,7 +581,7 @@ func (sc *syncContext) Sync() { runState := sc.runTasks(tasks, false) if sc.syncWaveHook != nil && runState != failed { - err := sc.syncWaveHook(phase, wave, finalWave) + err := sc.syncWaveHook(phase, waves, finalWaves) if err != nil { sc.deleteHooks(hooksPendingDeletionFailed) sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err)) diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index 0e8d01ebb..132beb55b 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -1631,10 +1631,10 @@ func TestSyncWaveHook(t *testing.T) { syncCtx.hooks = []*unstructured.Unstructured{pod3} called := false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, -1, wave) + assert.True(t, reflect.DeepEqual([]int{-1}, waves)) assert.False(t, final) return nil } @@ -1644,7 +1644,7 @@ func TestSyncWaveHook(t *testing.T) { // call sync again, it should not invoke the SyncWaveHook callback since we only should be // doing this after an apply, and not every reconciliation called = false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return nil } @@ -1657,10 +1657,10 @@ func TestSyncWaveHook(t *testing.T) { pod1Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod1Res.ResourceKey, synccommon.SyncPhaseSync)] = pod1Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.False(t, final) return nil } @@ -1673,10 +1673,10 @@ func TestSyncWaveHook(t *testing.T) { pod2Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod2Res.ResourceKey, synccommon.SyncPhaseSync)] = pod2Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhasePostSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.True(t, final) return nil } @@ -1695,7 +1695,7 @@ func TestSyncWaveHookFail(t *testing.T) { }) called := false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return errors.New("intentional error") } @@ -1728,7 +1728,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for non-prune task + 1 - assert.Equal(t, 1, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{1}, tasks.lastWaves())) }) t.Run("syncPhaseDifferentWave", func(t *testing.T) { @@ -1744,7 +1744,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{8}, tasks.lastWaves())) }) t.Run("pruneLastIndividualResources", func(t *testing.T) { @@ -1762,7 +1762,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{8}, tasks.lastWaves())) }) } diff --git a/pkg/sync/sync_tasks.go b/pkg/sync/sync_tasks.go index 813533a23..624bb4ae8 100644 --- a/pkg/sync/sync_tasks.go +++ b/pkg/sync/sync_tasks.go @@ -2,12 +2,15 @@ package sync import ( "fmt" + "math" + "reflect" "sort" "strings" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/argoproj/gitops-engine/pkg/sync/common" + "github.com/argoproj/gitops-engine/pkg/sync/syncwaves" "github.com/argoproj/gitops-engine/pkg/utils/kube" ) @@ -137,6 +140,70 @@ func (s syncTasks) Sort() { }) } +func LessBTree(u int, v int) bool { + if (u <= 1) && (v <= 1) { + return u < v + } + if u <= 1 { + return true + } + if v <= 1 { + return false + } + xMax := int(math.Floor(math.Log2(float64(v)/float64(u))) + 1) + for i := 1; i <= xMax; i++ { + N := int(math.Pow(2, float64(i))) + w := v - int(N)*u + if (w >= 0) && (w < N) { + return true + } + } + return false +} + +func (s syncTasks) GetSyncTasksWithNoAntecedent() syncTasks { + tasksWithNoAntecedent := syncTasks{} + for iSyncTask := range s { + candidateTask := s[iSyncTask] + candidateTaskWaveOrdering := syncwaves.WaveOrdering(candidateTask.obj()) + candidateTaskHasNoAntecedent := true + for jSyncTask := range s { + task := s[jSyncTask] + taskWaveOrdering := syncwaves.WaveOrdering(task.obj()) + if (candidateTaskWaveOrdering == "Normal") && (taskWaveOrdering == "Normal") { + if syncwaves.Wave(task.obj()) < syncwaves.Wave(candidateTask.obj()) { + candidateTaskHasNoAntecedent = false + break + } + } + if (candidateTaskWaveOrdering == "Normal") && (taskWaveOrdering == "BTree") { + if syncwaves.Wave(task.obj()) < syncwaves.Wave(candidateTask.obj()) { + // Warning, a resource using Normal syncwave ordering will always be synced before a + // resource using a BTree syncwave ordering, even it has a greater syncwave value. + } + } + if (candidateTaskWaveOrdering == "Btree") && (taskWaveOrdering == "Normal") { + candidateTaskHasNoAntecedent = false + break + } + if (candidateTaskWaveOrdering == "Btree") && (taskWaveOrdering == "BTree") { + fmt.Println("TASK WAVE") + fmt.Println(syncwaves.Wave(task.obj())) + fmt.Println("CANDIDATE TASK WAVE") + fmt.Println(syncwaves.Wave(candidateTask.obj())) + if LessBTree(syncwaves.Wave(task.obj()), syncwaves.Wave(candidateTask.obj())) { + candidateTaskHasNoAntecedent = false + break + } + } + } + if candidateTaskHasNoAntecedent { + tasksWithNoAntecedent = append(tasksWithNoAntecedent, candidateTask) + } + } + return tasksWithNoAntecedent +} + // adjust order of tasks and bubble up tasks which are dependencies of other tasks // (e.g. namespace sync should happen before resources that resides in that namespace) func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) { @@ -250,11 +317,33 @@ func (s syncTasks) phase() common.SyncPhase { return "" } -func (s syncTasks) wave() int { - if len(s) > 0 { - return s[0].wave() +func (s syncTasks) waves() []int { + var waves []int + tasksNormalWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "Normal" }) + tasksBTreeWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "BTree" }) + if len(tasksNormalWaves) > 0 { + waves = append(waves, tasksNormalWaves[0].wave()) + return waves } - return 0 + if len(tasksBTreeWaves) > 0 { + for iSyncTask := range tasksBTreeWaves { + candidateTask := tasksBTreeWaves[iSyncTask] + candidateTaskHasNoAntecedent := true + for jSyncTask := range tasksBTreeWaves { + task := tasksBTreeWaves[jSyncTask] + if LessBTree(syncwaves.Wave(task.obj()), syncwaves.Wave(candidateTask.obj())) { + candidateTaskHasNoAntecedent = false + break + } + } + if candidateTaskHasNoAntecedent { + waves = append(waves, syncwaves.Wave(candidateTask.obj())) + } + } + return waves + } + waves = append(waves, 0) + return waves } func (s syncTasks) lastPhase() common.SyncPhase { @@ -264,13 +353,42 @@ func (s syncTasks) lastPhase() common.SyncPhase { return "" } -func (s syncTasks) lastWave() int { - if len(s) > 0 { - return s[len(s)-1].wave() +func (s syncTasks) lastWaves() []int { + tasksNormalWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "Normal" }) + tasksBTreeWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "BTree" }) + + var lastwaves []int + + if len(tasksBTreeWaves) > 0 && len(tasksNormalWaves) > 0 { + lastwaves = append(lastwaves, tasksNormalWaves[len(tasksNormalWaves)-1].wave()+1) + return lastwaves + } + if len(tasksNormalWaves) > 0 { + lastwaves = append(lastwaves, tasksNormalWaves[len(tasksNormalWaves)-1].wave()) + return lastwaves + } + if len(tasksBTreeWaves) > 0 { + for iSyncTask := range tasksBTreeWaves { + candidateTask := s[iSyncTask] + candidateTaskHasNoSuccessor := true + for jSyncTask := range s { + task := s[jSyncTask] + if LessBTree(syncwaves.Wave(candidateTask.obj()), syncwaves.Wave(task.obj())) { + candidateTaskHasNoSuccessor = false + break + } + + } + if candidateTaskHasNoSuccessor { + lastwaves = append(lastwaves, syncwaves.Wave(candidateTask.obj())) + } + } + return lastwaves } - return 0 + lastwaves = append(lastwaves, 0) + return lastwaves } func (s syncTasks) multiStep() bool { - return s.wave() != s.lastWave() || s.phase() != s.lastPhase() + return !reflect.DeepEqual(s.waves(), s.lastWaves()) || s.phase() != s.lastPhase() } diff --git a/pkg/sync/sync_tasks_test.go b/pkg/sync/sync_tasks_test.go index 18135beab..0a9d3e266 100644 --- a/pkg/sync/sync_tasks_test.go +++ b/pkg/sync/sync_tasks_test.go @@ -1,7 +1,10 @@ package sync import ( + "fmt" + "reflect" "sort" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -9,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/argoproj/gitops-engine/pkg/sync/common" + "github.com/argoproj/gitops-engine/pkg/sync/syncwaves" testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing" ) @@ -465,9 +469,9 @@ func Test_syncTasks_multiStep(t *testing.T) { t.Run("Single", func(t *testing.T) { tasks := syncTasks{{liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "-1"), phase: common.SyncPhaseSync}} assert.Equal(t, common.SyncPhaseSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.waves())) assert.Equal(t, common.SyncPhaseSync, string(tasks.lastPhase())) - assert.Equal(t, -1, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.lastWaves())) assert.False(t, tasks.multiStep()) }) t.Run("Double", func(t *testing.T) { @@ -476,9 +480,150 @@ func Test_syncTasks_multiStep(t *testing.T) { {liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1"), phase: common.SyncPhasePostSync}, } assert.Equal(t, common.SyncPhasePreSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.waves())) assert.Equal(t, common.SyncPhasePostSync, string(tasks.lastPhase())) - assert.Equal(t, 1, tasks.lastWave()) + fmt.Println("ICI") + fmt.Println(tasks.lastWaves()) + assert.True(t, reflect.DeepEqual([]int{1}, tasks.lastWaves())) assert.True(t, tasks.multiStep()) }) } + +var tasksSingletonNormal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "-1", + }, + }, + }, + }, + }, +} + +func Test_GetSyncTasksWithNoAntecedent_Singleton(t *testing.T) { + tasks := tasksSingletonNormal + minimalTasks := tasks.GetSyncTasksWithNoAntecedent() + assert.Equal(t, tasks, minimalTasks) +} + +var tasksNormal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "-1", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "0", + }, + }, + }, + }, + }, +} + +func Test_GetSyncTasksWithNoAntecedent_Normal(t *testing.T) { + tasks := tasksNormal + minimalTasks := tasks.GetSyncTasksWithNoAntecedent() + assert.True(t, reflect.DeepEqual(tasksSingletonNormal, minimalTasks)) +} + +var tasksBTreeMinimal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "2", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, +} + +var tasksBTreeBothMinimal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "2", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "3", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, +} + +func Test_GetSyncTasksWithNoAntecedent_BTree_BothMinimal(t *testing.T) { + tasks := tasksBTreeBothMinimal + minimalTasks := tasks.GetSyncTasksWithNoAntecedent() + assert.True(t, reflect.DeepEqual(tasksBTreeBothMinimal, minimalTasks)) +} + +var tasksBTreeOneMinimal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "2", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "4", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, +} + +func Test_GetSyncTasksWithNoAntecedent_BTree_OneMinimal(t *testing.T) { + tasks := tasksBTreeOneMinimal + minimalTasks := tasks.GetSyncTasksWithNoAntecedent() + tasksBTreeOneMinimalWaves := tasksBTreeOneMinimal.Map(func(task *syncTask) string { + return strconv.Itoa(syncwaves.Wave(task.obj())) + }) + minimalTasksWaves := minimalTasks.Map(func(task *syncTask) string { + return strconv.Itoa(syncwaves.Wave(task.obj())) + }) + + assert.True(t, reflect.DeepEqual(tasksBTreeOneMinimalWaves, minimalTasksWaves)) +} diff --git a/pkg/sync/syncwaves/waves.go b/pkg/sync/syncwaves/waves.go index 200433571..888d4c69f 100644 --- a/pkg/sync/syncwaves/waves.go +++ b/pkg/sync/syncwaves/waves.go @@ -19,3 +19,14 @@ func Wave(obj *unstructured.Unstructured) int { } return helmhook.Weight(obj) } + +func WaveOrdering(obj *unstructured.Unstructured) string { + text, ok := obj.GetAnnotations()[common.AnnotationSyncWaveOrder] + if ok { + if text == "BTree" { + return text + } + return "Normal" + } + return "Normal" +} diff --git a/pkg/sync/syncwaves/waves_test.go b/pkg/sync/syncwaves/waves_test.go index 2fadf4fd7..64f8826c2 100644 --- a/pkg/sync/syncwaves/waves_test.go +++ b/pkg/sync/syncwaves/waves_test.go @@ -13,3 +13,9 @@ func TestWave(t *testing.T) { assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave", "1"))) assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "helm.sh/hook-weight", "1"))) } + +func TestWaveOrdering(t *testing.T) { + assert.Equal(t, "Normal", WaveOrdering(testingutils.NewPod())) + assert.Equal(t, "Normal", Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave-order", "Normal"))) + assert.Equal(t, "BTree", Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave-order", "BTree"))) +} From ab53ce59e706cb09e9615a463567177a273702a5 Mon Sep 17 00:00:00 2001 From: Sebastien FELIX Date: Sun, 29 Jun 2025 19:41:31 +0200 Subject: [PATCH 2/4] feature(syncwave) - add BTree order for syncwave management --- pkg/sync/common/types.go | 4 +- pkg/sync/doc.go | 24 ++++++ pkg/sync/sync_context.go | 14 ++-- pkg/sync/sync_context_test.go | 22 ++--- pkg/sync/sync_tasks.go | 93 +++++++++++++++++++-- pkg/sync/sync_tasks_test.go | 137 ++++++++++++++++++++++++++++++- pkg/sync/syncwaves/waves.go | 11 +++ pkg/sync/syncwaves/waves_test.go | 6 ++ 8 files changed, 280 insertions(+), 31 deletions(-) diff --git a/pkg/sync/common/types.go b/pkg/sync/common/types.go index d5052ed4a..9c4ea3d27 100644 --- a/pkg/sync/common/types.go +++ b/pkg/sync/common/types.go @@ -13,6 +13,8 @@ const ( // AnnotationSyncWave indicates which wave of the sync the resource or hook should be in AnnotationSyncWave = "argocd.argoproj.io/sync-wave" // AnnotationKeyHook contains the hook type of a resource + AnnotationSyncWaveOrder = "argocd.argoproj.io/sync-wave-order" + // AnnotationKeyHook contains the hook type of a resource AnnotationKeyHook = "argocd.argoproj.io/hook" // AnnotationKeyHookDeletePolicy is the policy of deleting a hook AnnotationKeyHookDeletePolicy = "argocd.argoproj.io/hook-delete-policy" @@ -58,7 +60,7 @@ type SyncPhase string // SyncWaveHook is a callback function which will be invoked after each sync wave is successfully // applied during a sync operation. The callback indicates which phase and wave it had just // executed, and whether or not that wave was the final one. -type SyncWaveHook func(phase SyncPhase, wave int, final bool) error +type SyncWaveHook func(phase SyncPhase, waves []int, final bool) error const ( SyncPhasePreSync = "PreSync" diff --git a/pkg/sync/doc.go b/pkg/sync/doc.go index f4f5d8725..f6410971f 100644 --- a/pkg/sync/doc.go +++ b/pkg/sync/doc.go @@ -4,6 +4,7 @@ Package implements Kubernetes resources synchronization and provides the followi - resource pruning - resource hooks - sync waves + - sync waves ordering - sync options # Basic Syncing @@ -75,6 +76,29 @@ that runs before all other resources. The `argocd.argoproj.io/sync-wave` annotat annotations: argocd.argoproj.io/sync-wave: "5" +# Sync Waves Ordering + +The wave ordering feature allows to run parallel waves of synchronisation where the sync-wave values correspond to a complete +binary tree with root's label equal to 1. A sync-wave value X would be considered less than Y if and only if there exists +integers N and M such that : +Y = X * 2**N + M where 0 <= M < N. + +The `argocd.argoproj.io/sync-wave-order` annotation define the type of wave's ordering used for a resource's wave: + + metadata: + annotations: + argocd.argoproj.io/sync-wave: "5" + argocd.argoproj.io/sync-wave-order: "BTree" + +example of sync-waves ordering using BTree: + + 1 -----> 2 -----> 4 + \ \----> 5 + \---> 3 -----> 6 + \----> 7 + +Note that a resource using a BTree ordering for it's sync-wave will always be synced after all resources using a Normal ordering. + # Sync Options The sync options allows customizing the synchronization of selected resources. The options are specified using the diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 8f4d51e4f..9daa465bd 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "reflect" + "slices" "sort" "strings" "sync" @@ -562,16 +564,16 @@ func (sc *syncContext) Sync() { // remove any tasks not in this wave phase := tasks.phase() - wave := tasks.wave() - finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave() + waves := tasks.waves() + finalWaves := phase == tasks.lastPhase() && reflect.DeepEqual(waves, tasks.lastWaves()) // if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful // EVEN if those objects subsequently degraded // This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately. - remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() }) + remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || !slices.Contains(waves, t.wave()) || t.isHook() }) - sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") - tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave }) + sc.log.WithValues("phase", phase, "wave", waves, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave") + tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && slices.Contains(waves, t.wave()) }) sc.setOperationPhase(common.OperationRunning, "one or more tasks are running") @@ -579,7 +581,7 @@ func (sc *syncContext) Sync() { runState := sc.runTasks(tasks, false) if sc.syncWaveHook != nil && runState != failed { - err := sc.syncWaveHook(phase, wave, finalWave) + err := sc.syncWaveHook(phase, waves, finalWaves) if err != nil { sc.deleteHooks(hooksPendingDeletionFailed) sc.setOperationPhase(common.OperationFailed, fmt.Sprintf("SyncWaveHook failed: %v", err)) diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index 0e8d01ebb..132beb55b 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -1631,10 +1631,10 @@ func TestSyncWaveHook(t *testing.T) { syncCtx.hooks = []*unstructured.Unstructured{pod3} called := false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, -1, wave) + assert.True(t, reflect.DeepEqual([]int{-1}, waves)) assert.False(t, final) return nil } @@ -1644,7 +1644,7 @@ func TestSyncWaveHook(t *testing.T) { // call sync again, it should not invoke the SyncWaveHook callback since we only should be // doing this after an apply, and not every reconciliation called = false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return nil } @@ -1657,10 +1657,10 @@ func TestSyncWaveHook(t *testing.T) { pod1Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod1Res.ResourceKey, synccommon.SyncPhaseSync)] = pod1Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhaseSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.False(t, final) return nil } @@ -1673,10 +1673,10 @@ func TestSyncWaveHook(t *testing.T) { pod2Res.HookPhase = synccommon.OperationSucceeded syncCtx.syncRes[resourceResultKey(pod2Res.ResourceKey, synccommon.SyncPhaseSync)] = pod2Res called = false - syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, wave int, final bool) error { + syncCtx.syncWaveHook = func(phase synccommon.SyncPhase, waves []int, final bool) error { called = true assert.Equal(t, synccommon.SyncPhasePostSync, string(phase)) - assert.Equal(t, 0, wave) + assert.True(t, reflect.DeepEqual([]int{0}, waves)) assert.True(t, final) return nil } @@ -1695,7 +1695,7 @@ func TestSyncWaveHookFail(t *testing.T) { }) called := false - syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ int, _ bool) error { + syncCtx.syncWaveHook = func(_ synccommon.SyncPhase, _ []int, _ bool) error { called = true return errors.New("intentional error") } @@ -1728,7 +1728,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for non-prune task + 1 - assert.Equal(t, 1, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{1}, tasks.lastWaves())) }) t.Run("syncPhaseDifferentWave", func(t *testing.T) { @@ -1744,7 +1744,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{8}, tasks.lastWaves())) }) t.Run("pruneLastIndividualResources", func(t *testing.T) { @@ -1762,7 +1762,7 @@ func TestPruneLast(t *testing.T) { assert.True(t, successful) assert.Len(t, tasks, 3) // last wave is the last sync wave for tasks + 1 - assert.Equal(t, 8, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{8}, tasks.lastWaves())) }) } diff --git a/pkg/sync/sync_tasks.go b/pkg/sync/sync_tasks.go index 813533a23..48d7f352e 100644 --- a/pkg/sync/sync_tasks.go +++ b/pkg/sync/sync_tasks.go @@ -2,12 +2,15 @@ package sync import ( "fmt" + "math" + "reflect" "sort" "strings" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/argoproj/gitops-engine/pkg/sync/common" + "github.com/argoproj/gitops-engine/pkg/sync/syncwaves" "github.com/argoproj/gitops-engine/pkg/utils/kube" ) @@ -137,6 +140,27 @@ func (s syncTasks) Sort() { }) } +func LessBTree(u int, v int) bool { + if (u <= 1) && (v <= 1) { + return u < v + } + if u <= 1 { + return true + } + if v <= 1 { + return false + } + xMax := int(math.Floor(math.Log2(float64(v)/float64(u))) + 1) + for i := 1; i <= xMax; i++ { + N := int(math.Pow(2, float64(i))) + w := v - int(N)*u + if (w >= 0) && (w < N) { + return true + } + } + return false +} + // adjust order of tasks and bubble up tasks which are dependencies of other tasks // (e.g. namespace sync should happen before resources that resides in that namespace) func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) { @@ -250,11 +274,33 @@ func (s syncTasks) phase() common.SyncPhase { return "" } -func (s syncTasks) wave() int { - if len(s) > 0 { - return s[0].wave() +func (s syncTasks) waves() []int { + var waves []int + tasksNormalWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "Normal" }) + tasksBTreeWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "BTree" }) + if len(tasksNormalWaves) > 0 { + waves = append(waves, tasksNormalWaves[0].wave()) + return waves } - return 0 + if len(tasksBTreeWaves) > 0 { + for iSyncTask := range tasksBTreeWaves { + candidateTask := tasksBTreeWaves[iSyncTask] + candidateTaskHasNoAntecedent := true + for jSyncTask := range tasksBTreeWaves { + task := tasksBTreeWaves[jSyncTask] + if LessBTree(syncwaves.Wave(task.obj()), syncwaves.Wave(candidateTask.obj())) { + candidateTaskHasNoAntecedent = false + break + } + } + if candidateTaskHasNoAntecedent { + waves = append(waves, syncwaves.Wave(candidateTask.obj())) + } + } + return waves + } + waves = append(waves, 0) + return waves } func (s syncTasks) lastPhase() common.SyncPhase { @@ -264,13 +310,42 @@ func (s syncTasks) lastPhase() common.SyncPhase { return "" } -func (s syncTasks) lastWave() int { - if len(s) > 0 { - return s[len(s)-1].wave() +func (s syncTasks) lastWaves() []int { + tasksNormalWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "Normal" }) + tasksBTreeWaves := s.Filter(func(t *syncTask) bool { return syncwaves.WaveOrdering(t.obj()) == "BTree" }) + + var lastwaves []int + + if len(tasksBTreeWaves) > 0 && len(tasksNormalWaves) > 0 { + lastwaves = append(lastwaves, tasksNormalWaves[len(tasksNormalWaves)-1].wave()+1) + return lastwaves + } + if len(tasksNormalWaves) > 0 { + lastwaves = append(lastwaves, tasksNormalWaves[len(tasksNormalWaves)-1].wave()) + return lastwaves + } + if len(tasksBTreeWaves) > 0 { + for iSyncTask := range tasksBTreeWaves { + candidateTask := s[iSyncTask] + candidateTaskHasNoSuccessor := true + for jSyncTask := range s { + task := s[jSyncTask] + if LessBTree(syncwaves.Wave(candidateTask.obj()), syncwaves.Wave(task.obj())) { + candidateTaskHasNoSuccessor = false + break + } + + } + if candidateTaskHasNoSuccessor { + lastwaves = append(lastwaves, syncwaves.Wave(candidateTask.obj())) + } + } + return lastwaves } - return 0 + lastwaves = append(lastwaves, 0) + return lastwaves } func (s syncTasks) multiStep() bool { - return s.wave() != s.lastWave() || s.phase() != s.lastPhase() + return !reflect.DeepEqual(s.waves(), s.lastWaves()) || s.phase() != s.lastPhase() } diff --git a/pkg/sync/sync_tasks_test.go b/pkg/sync/sync_tasks_test.go index 18135beab..5c9cada67 100644 --- a/pkg/sync/sync_tasks_test.go +++ b/pkg/sync/sync_tasks_test.go @@ -1,6 +1,8 @@ package sync import ( + "fmt" + "reflect" "sort" "testing" @@ -465,9 +467,9 @@ func Test_syncTasks_multiStep(t *testing.T) { t.Run("Single", func(t *testing.T) { tasks := syncTasks{{liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "-1"), phase: common.SyncPhaseSync}} assert.Equal(t, common.SyncPhaseSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.waves())) assert.Equal(t, common.SyncPhaseSync, string(tasks.lastPhase())) - assert.Equal(t, -1, tasks.lastWave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.lastWaves())) assert.False(t, tasks.multiStep()) }) t.Run("Double", func(t *testing.T) { @@ -476,9 +478,136 @@ func Test_syncTasks_multiStep(t *testing.T) { {liveObj: testingutils.Annotate(testingutils.NewPod(), common.AnnotationSyncWave, "1"), phase: common.SyncPhasePostSync}, } assert.Equal(t, common.SyncPhasePreSync, string(tasks.phase())) - assert.Equal(t, -1, tasks.wave()) + assert.True(t, reflect.DeepEqual([]int{-1}, tasks.waves())) assert.Equal(t, common.SyncPhasePostSync, string(tasks.lastPhase())) - assert.Equal(t, 1, tasks.lastWave()) + fmt.Println("ICI") + fmt.Println(tasks.lastWaves()) + assert.True(t, reflect.DeepEqual([]int{1}, tasks.lastWaves())) assert.True(t, tasks.multiStep()) }) } + +var tasksSingletonNormal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "-1", + }, + }, + }, + }, + }, +} + +func Test_waveSingleton(t *testing.T) { + tasks := tasksSingletonNormal + tasksWaves := tasks.waves() + tasksLastWaves := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{-1})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{-1})) +} + +var tasksNormal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "-1", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "0", + }, + }, + }, + }, + }, +} + +func Test_waveNormal(t *testing.T) { + tasks := tasksNormal + tasksWaves := tasks.waves() + tasksLastWaves := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{-1})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{0})) +} + +var tasksBTreeBothMinimal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "2", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "3", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, +} + +func Test_waveBTree_BothMinimal(t *testing.T) { + tasks := tasksBTreeBothMinimal + tasksWaves := tasks.waves() + tasksLastWaves := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{2,3})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{2,3})) +} + +var tasksBTreeOneMinimal = syncTasks{ + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "2", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, + { + targetObj: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + "argocd.argoproj.io/sync-wave": "4", + "argocd.argoproj.io/sync-wave-order": "BTree", + }, + }, + }, + }, + }, +} + +func Test_waveBTree_OneMinimal(t *testing.T) { + tasks := tasksBTreeOneMinimal + tasksWaves := tasks.waves() + tasksLastWaves := tasks.lastWaves() + assert.True(t, reflect.DeepEqual(tasksWaves, []int{2})) + assert.True(t, reflect.DeepEqual(tasksLastWaves, []int{4})) +} diff --git a/pkg/sync/syncwaves/waves.go b/pkg/sync/syncwaves/waves.go index 200433571..888d4c69f 100644 --- a/pkg/sync/syncwaves/waves.go +++ b/pkg/sync/syncwaves/waves.go @@ -19,3 +19,14 @@ func Wave(obj *unstructured.Unstructured) int { } return helmhook.Weight(obj) } + +func WaveOrdering(obj *unstructured.Unstructured) string { + text, ok := obj.GetAnnotations()[common.AnnotationSyncWaveOrder] + if ok { + if text == "BTree" { + return text + } + return "Normal" + } + return "Normal" +} diff --git a/pkg/sync/syncwaves/waves_test.go b/pkg/sync/syncwaves/waves_test.go index 2fadf4fd7..64f8826c2 100644 --- a/pkg/sync/syncwaves/waves_test.go +++ b/pkg/sync/syncwaves/waves_test.go @@ -13,3 +13,9 @@ func TestWave(t *testing.T) { assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave", "1"))) assert.Equal(t, 1, Wave(testingutils.Annotate(testingutils.NewPod(), "helm.sh/hook-weight", "1"))) } + +func TestWaveOrdering(t *testing.T) { + assert.Equal(t, "Normal", WaveOrdering(testingutils.NewPod())) + assert.Equal(t, "Normal", Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave-order", "Normal"))) + assert.Equal(t, "BTree", Wave(testingutils.Annotate(testingutils.NewPod(), "argocd.argoproj.io/sync-wave-order", "BTree"))) +} From 7bd1bddbe73754e506a4505da13c17a73dfcbf6b Mon Sep 17 00:00:00 2001 From: SebastienFelix Date: Wed, 2 Jul 2025 21:30:41 +0200 Subject: [PATCH 3/4] Cleaned up version of BTree syncwaves --- pkg/sync/sync_context.go | 48 +++++++++++++++++++++++++--------------- pkg/sync/sync_tasks.go | 11 +++++++++ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 5d3adf571..0a3ca0df4 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -906,25 +906,26 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) + // if all prune tasks have a normal syncWaveOrdering, use the legacy method. Otherwise, change the + // syncWaveOrdering of all prune tasks to BTree and modify the waves to decreasing power of 2. + // For prune tasks which already had a BTree syncWaveOrdering, set an identical syncWave to tasks which + // have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children. + normalPruneTasks := make(map[int][]*syncTask) for _, task := range tasks { - //if task.isPrune() { if task.isPrune() && task.waveOrdering() == "Normal" { normalPruneTasks[task.wave()] = append(normalPruneTasks[task.wave()], task) } } - var uniqueNormalPruneWaves []int for k := range normalPruneTasks { uniqueNormalPruneWaves = append(uniqueNormalPruneWaves, k) } sort.Ints(uniqueNormalPruneWaves) - bTreePruneTasks := make(map[int][]*syncTask) for _, task := range tasks { if task.isPrune() && task.waveOrdering() == "BTree" { - //if task.isPrune() && task.waveOrdering() == "Normal" { bTreePruneTasks[task.wave()] = append(bTreePruneTasks[task.wave()], task) } } @@ -934,47 +935,57 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { for k := range bTreePruneTasks { uniqueBTreePruneWaves = append(uniqueBTreePruneWaves, k) } - sort.Ints(uniqueBTreePruneWaves) pruneWaves := []int{0} for i := 1; i < len(uniqueNormalPruneWaves); i++ { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + pruneWaves = append(pruneWaves, i) } + nextPotentialWave := len(uniqueNormalPruneWaves) if len(uniqueNormalPruneWaves) != 0 { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + pruneWaves = append(pruneWaves, nextPotentialWave) } for i := 1; i < len(uniqueBTreePruneWaves); i++ { - if int(math.Floor(math.Log2(float64(uniqueBTreePruneWaves[i])))) == int(math.Floor(math.Log2(float64(uniqueBTreePruneWaves[i-1])))) { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]) + currentBTreeWaveLevel := biggestPowerOf2InferiorThan(uniqueBTreePruneWaves[i]) + previousBTreeWaveLevel := biggestPowerOf2InferiorThan(uniqueBTreePruneWaves[i-1]) + if currentBTreeWaveLevel == previousBTreeWaveLevel { + pruneWaves = append(pruneWaves, nextPotentialWave) } else { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + nextPotentialWave++ + pruneWaves = append(pruneWaves, nextPotentialWave) } } - reversedPruneWaves := []int{} - for i := 0; i < len(pruneWaves); i++ { - reversedPruneWaves = append(reversedPruneWaves, int(math.Pow(2, float64(pruneWaves[len(pruneWaves)-1-i])))) + bTreeWave := int(math.Pow(2, float64(pruneWaves[len(pruneWaves)-1]))) + newPruneWaves := []int{bTreeWave} + n := len(pruneWaves) + for i := 1; i < len(pruneWaves); i++ { + if pruneWaves[n-i-1] == pruneWaves[n-i] { + newPruneWaves = append(newPruneWaves, bTreeWave) + } else { + bTreeWave = bTreeWave / 2 + newPruneWaves = append(newPruneWaves, bTreeWave) + } } bTreeWaveOrdering := "BTree" for i := 0; i < len(uniqueNormalPruneWaves); i++ { - // waves to swap + // Normal waves to reorder iWave := uniqueNormalPruneWaves[i] for _, task := range normalPruneTasks[iWave] { - task.waveOverride = &reversedPruneWaves[i] + task.waveOverride = &newPruneWaves[i] task.waveOrderingOverride = &bTreeWaveOrdering } } for i := len(uniqueNormalPruneWaves); i < len(uniqueNormalPruneWaves)+len(uniqueBTreePruneWaves); i++ { - // waves to swap + // BTree waves to reorder iWave := uniqueBTreePruneWaves[i-len(uniqueNormalPruneWaves)] for _, task := range bTreePruneTasks[iWave] { - task.waveOverride = &(reversedPruneWaves[i]) + task.waveOverride = &(newPruneWaves[i]) task.waveOrderingOverride = &bTreeWaveOrdering } } @@ -1011,10 +1022,11 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } } + // if prune tasks contain BTree ordering syncWaves, then set the tasks with PruneLast if syncPhaseLastWaveOrdering == "Normal" { syncPhaseLastWave = syncPhaseLastWave + 1 } else { - syncPhaseLastWave = int(math.Pow(2, math.Floor(math.Log2(float64(syncPhaseLastWave))+1))) + syncPhaseLastWave = syncPhaseLastWave * 2 } for _, task := range tasks { diff --git a/pkg/sync/sync_tasks.go b/pkg/sync/sync_tasks.go index 1b904cb4e..a2e68d1d1 100644 --- a/pkg/sync/sync_tasks.go +++ b/pkg/sync/sync_tasks.go @@ -161,6 +161,17 @@ func LessBTree(u int, v int) bool { return false } +func biggestPowerOf2InferiorThan(n int) int { + if n < 1 { + return 0 + } + i := 1 + for i <= n { + i = i * 2 + } + return i / 2 +} + // adjust order of tasks and bubble up tasks which are dependencies of other tasks // (e.g. namespace sync should happen before resources that resides in that namespace) func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) { From 12d9d0bfc68d3fad9b6178a80048b9417da72d92 Mon Sep 17 00:00:00 2001 From: SebastienFelix Date: Wed, 2 Jul 2025 21:30:41 +0200 Subject: [PATCH 4/4] Cleaned up version of BTree syncwaves --- pkg/sync/doc.go | 11 ++++----- pkg/sync/sync_context.go | 48 +++++++++++++++++++++++++--------------- pkg/sync/sync_tasks.go | 11 +++++++++ 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/pkg/sync/doc.go b/pkg/sync/doc.go index 72b551089..20bc82aee 100644 --- a/pkg/sync/doc.go +++ b/pkg/sync/doc.go @@ -92,12 +92,13 @@ The `argocd.argoproj.io/sync-wave-order` annotation define the type of wave's or example of sync-waves ordering using BTree: - 1 -----> 2 -----> 4 - \ \----> 5 - \---> 3 -----> 6 - \----> 7 + 1 -----> 2 -----> 4 + \ \----> 5 + \---> 3 -----> 6 + \----> 7 -Note that a resource using a BTree ordering for it's sync-wave will always be synced after all resources using a Normal ordering. +Note that a resource using a BTree ordering will always be synced after all resources using a Normal ordering. +Note also that all resources using a BTree ordering with a syncWave < 1 will behave like resources having a Normal ordering. # Sync Options diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index 5d3adf571..0a3ca0df4 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -906,25 +906,26 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } // for prune tasks, modify the waves for proper cleanup i.e reverse of sync wave (creation order) + // if all prune tasks have a normal syncWaveOrdering, use the legacy method. Otherwise, change the + // syncWaveOrdering of all prune tasks to BTree and modify the waves to decreasing power of 2. + // For prune tasks which already had a BTree syncWaveOrdering, set an identical syncWave to tasks which + // have the same level in a complete binary tree rooted at 1 where each node n has 2*n and 2*n+1 as children. + normalPruneTasks := make(map[int][]*syncTask) for _, task := range tasks { - //if task.isPrune() { if task.isPrune() && task.waveOrdering() == "Normal" { normalPruneTasks[task.wave()] = append(normalPruneTasks[task.wave()], task) } } - var uniqueNormalPruneWaves []int for k := range normalPruneTasks { uniqueNormalPruneWaves = append(uniqueNormalPruneWaves, k) } sort.Ints(uniqueNormalPruneWaves) - bTreePruneTasks := make(map[int][]*syncTask) for _, task := range tasks { if task.isPrune() && task.waveOrdering() == "BTree" { - //if task.isPrune() && task.waveOrdering() == "Normal" { bTreePruneTasks[task.wave()] = append(bTreePruneTasks[task.wave()], task) } } @@ -934,47 +935,57 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { for k := range bTreePruneTasks { uniqueBTreePruneWaves = append(uniqueBTreePruneWaves, k) } - sort.Ints(uniqueBTreePruneWaves) pruneWaves := []int{0} for i := 1; i < len(uniqueNormalPruneWaves); i++ { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + pruneWaves = append(pruneWaves, i) } + nextPotentialWave := len(uniqueNormalPruneWaves) if len(uniqueNormalPruneWaves) != 0 { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + pruneWaves = append(pruneWaves, nextPotentialWave) } for i := 1; i < len(uniqueBTreePruneWaves); i++ { - if int(math.Floor(math.Log2(float64(uniqueBTreePruneWaves[i])))) == int(math.Floor(math.Log2(float64(uniqueBTreePruneWaves[i-1])))) { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]) + currentBTreeWaveLevel := biggestPowerOf2InferiorThan(uniqueBTreePruneWaves[i]) + previousBTreeWaveLevel := biggestPowerOf2InferiorThan(uniqueBTreePruneWaves[i-1]) + if currentBTreeWaveLevel == previousBTreeWaveLevel { + pruneWaves = append(pruneWaves, nextPotentialWave) } else { - pruneWaves = append(pruneWaves, pruneWaves[len(pruneWaves)-1]+1) + nextPotentialWave++ + pruneWaves = append(pruneWaves, nextPotentialWave) } } - reversedPruneWaves := []int{} - for i := 0; i < len(pruneWaves); i++ { - reversedPruneWaves = append(reversedPruneWaves, int(math.Pow(2, float64(pruneWaves[len(pruneWaves)-1-i])))) + bTreeWave := int(math.Pow(2, float64(pruneWaves[len(pruneWaves)-1]))) + newPruneWaves := []int{bTreeWave} + n := len(pruneWaves) + for i := 1; i < len(pruneWaves); i++ { + if pruneWaves[n-i-1] == pruneWaves[n-i] { + newPruneWaves = append(newPruneWaves, bTreeWave) + } else { + bTreeWave = bTreeWave / 2 + newPruneWaves = append(newPruneWaves, bTreeWave) + } } bTreeWaveOrdering := "BTree" for i := 0; i < len(uniqueNormalPruneWaves); i++ { - // waves to swap + // Normal waves to reorder iWave := uniqueNormalPruneWaves[i] for _, task := range normalPruneTasks[iWave] { - task.waveOverride = &reversedPruneWaves[i] + task.waveOverride = &newPruneWaves[i] task.waveOrderingOverride = &bTreeWaveOrdering } } for i := len(uniqueNormalPruneWaves); i < len(uniqueNormalPruneWaves)+len(uniqueBTreePruneWaves); i++ { - // waves to swap + // BTree waves to reorder iWave := uniqueBTreePruneWaves[i-len(uniqueNormalPruneWaves)] for _, task := range bTreePruneTasks[iWave] { - task.waveOverride = &(reversedPruneWaves[i]) + task.waveOverride = &(newPruneWaves[i]) task.waveOrderingOverride = &bTreeWaveOrdering } } @@ -1011,10 +1022,11 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) { } } + // if prune tasks contain BTree ordering syncWaves, then set the tasks with PruneLast if syncPhaseLastWaveOrdering == "Normal" { syncPhaseLastWave = syncPhaseLastWave + 1 } else { - syncPhaseLastWave = int(math.Pow(2, math.Floor(math.Log2(float64(syncPhaseLastWave))+1))) + syncPhaseLastWave = syncPhaseLastWave * 2 } for _, task := range tasks { diff --git a/pkg/sync/sync_tasks.go b/pkg/sync/sync_tasks.go index 1b904cb4e..a2e68d1d1 100644 --- a/pkg/sync/sync_tasks.go +++ b/pkg/sync/sync_tasks.go @@ -161,6 +161,17 @@ func LessBTree(u int, v int) bool { return false } +func biggestPowerOf2InferiorThan(n int) int { + if n < 1 { + return 0 + } + i := 1 + for i <= n { + i = i * 2 + } + return i / 2 +} + // adjust order of tasks and bubble up tasks which are dependencies of other tasks // (e.g. namespace sync should happen before resources that resides in that namespace) func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) {