diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index ab9f07f1a9..3fc8f3480d 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -22,6 +22,14 @@ type Database struct { Lineage string `json:"lineage"` Serial int `json:"serial"` State map[string]ResourceEntry `json:"state"` + WAL []LogEntry `json:"wal,omitempty"` +} + +type LogEntry struct { + Op string `json:"op"` + Key string `json:"key"` + ID string `json:"id,omitempty"` + State any `json:"state,omitempty"` } type ResourceEntry struct { @@ -34,15 +42,16 @@ func (db *DeploymentState) SaveState(key, newID string, state any) error { db.mu.Lock() defer db.mu.Unlock() - if db.Data.State == nil { - db.Data.State = make(map[string]ResourceEntry) + db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "save", Key: key, ID: newID, State: state}) + if err := db.writeState(); err != nil { + db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure + return err } - db.Data.State[key] = ResourceEntry{ - ID: newID, - State: state, + if db.Data.State == nil { + db.Data.State = make(map[string]ResourceEntry) } - + db.Data.State[key] = ResourceEntry{ID: newID, State: state} return nil } @@ -51,12 +60,13 @@ func (db *DeploymentState) DeleteState(key string) error { db.mu.Lock() defer db.mu.Unlock() - if db.Data.State == nil { - return nil + db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "delete", Key: key}) + if err := db.writeState(); err != nil { + db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure + return err } delete(db.Data.State, key) - return nil } @@ -104,6 +114,7 @@ func (db *DeploymentState) Open(path string) error { } db.Path = path + db.replayWAL() return nil } @@ -111,7 +122,13 @@ func (db *DeploymentState) Finalize() error { db.mu.Lock() defer db.mu.Unlock() - return db.unlockedSave() + oldWAL := db.Data.WAL + db.Data.WAL = nil + if err := db.writeState(); err != nil { + db.Data.WAL = oldWAL // Restore on failure + return err + } + return nil } func (db *DeploymentState) AssertOpened() { @@ -148,17 +165,25 @@ func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.Export return result } -func (db *DeploymentState) unlockedSave() error { +func (db *DeploymentState) writeState() error { db.AssertOpened() data, err := json.MarshalIndent(db.Data, "", " ") if err != nil { return err } + return os.WriteFile(db.Path, data, 0o600) +} - err = os.WriteFile(db.Path, data, 0o600) - if err != nil { - return fmt.Errorf("failed to save resources state to %#v: %w", db.Path, err) +func (db *DeploymentState) replayWAL() { + for _, entry := range db.Data.WAL { + switch entry.Op { + case "save": + if db.Data.State == nil { + db.Data.State = make(map[string]ResourceEntry) + } + db.Data.State[entry.Key] = ResourceEntry{ID: entry.ID, State: entry.State} + case "delete": + delete(db.Data.State, entry.Key) + } } - - return nil } diff --git a/bundle/direct/dstate/state_test.go b/bundle/direct/dstate/state_test.go new file mode 100644 index 0000000000..9be97b2696 --- /dev/null +++ b/bundle/direct/dstate/state_test.go @@ -0,0 +1,193 @@ +package dstate + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWALSaveAndReplay(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"}) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"}) + require.NoError(t, err) + + // Verify WAL exists in file + data, err := os.ReadFile(statePath) + require.NoError(t, err) + var loaded Database + require.NoError(t, json.Unmarshal(data, &loaded)) + assert.Len(t, loaded.WAL, 2) + + // Simulate crash: open a new state (WAL replays on open) + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + entry1, ok := db2.GetResourceEntry("resources.jobs.job1") + assert.True(t, ok) + assert.Equal(t, "123", entry1.ID) + + entry2, ok := db2.GetResourceEntry("resources.jobs.job2") + assert.True(t, ok) + assert.Equal(t, "456", entry2.ID) +} + +func TestWALDeleteAndReplay(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"}) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"}) + require.NoError(t, err) + + err = db.DeleteState("resources.jobs.job1") + require.NoError(t, err) + + // Simulate crash and reopen + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + _, ok := db2.GetResourceEntry("resources.jobs.job1") + assert.False(t, ok, "job1 should be deleted") + + entry2, ok := db2.GetResourceEntry("resources.jobs.job2") + assert.True(t, ok) + assert.Equal(t, "456", entry2.ID) +} + +func TestWALFinalizeClearsWAL(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"}) + require.NoError(t, err) + + // Verify WAL has entries before finalize + data, err := os.ReadFile(statePath) + require.NoError(t, err) + var before Database + require.NoError(t, json.Unmarshal(data, &before)) + assert.NotEmpty(t, before.WAL) + + err = db.Finalize() + require.NoError(t, err) + + // Verify WAL is cleared after finalize + data, err = os.ReadFile(statePath) + require.NoError(t, err) + var after Database + require.NoError(t, json.Unmarshal(data, &after)) + assert.Empty(t, after.WAL) + + // State still has the entry + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + entry, ok := db2.GetResourceEntry("resources.jobs.job1") + assert.True(t, ok) + assert.Equal(t, "123", entry.ID) +} + +func TestWALWithExistingState(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + // Create state with one entry and finalize + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"}) + require.NoError(t, err) + + err = db.Finalize() + require.NoError(t, err) + + // Open again and add more entries without finalizing + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + err = db2.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"}) + require.NoError(t, err) + + // Simulate crash and reopen + db3 := &DeploymentState{} + err = db3.Open(statePath) + require.NoError(t, err) + + entry1, ok := db3.GetResourceEntry("resources.jobs.job1") + assert.True(t, ok) + assert.Equal(t, "123", entry1.ID) + + entry2, ok := db3.GetResourceEntry("resources.jobs.job2") + assert.True(t, ok) + assert.Equal(t, "456", entry2.ID) +} + +func TestWALUpdateExistingEntry(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"}) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "789", map[string]any{"name": "job1-updated"}) + require.NoError(t, err) + + // Simulate crash and reopen + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + entry, ok := db2.GetResourceEntry("resources.jobs.job1") + assert.True(t, ok) + assert.Equal(t, "789", entry.ID) +} + +func TestNoStateFile(t *testing.T) { + tmpDir := t.TempDir() + statePath := filepath.Join(tmpDir, "state.json") + + db := &DeploymentState{} + err := db.Open(statePath) + require.NoError(t, err) + + err = db.Finalize() + require.NoError(t, err) + + db2 := &DeploymentState{} + err = db2.Open(statePath) + require.NoError(t, err) + + assert.NotEmpty(t, db2.Data.Lineage) +}