Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions bundle/direct/dstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type Database struct {
Lineage string `json:"lineage"`
Serial int `json:"serial"`
State map[string]ResourceEntry `json:"state"`
WAL []LogEntry `json:"wal,omitempty"`
}

type LogEntry struct {
Op string `json:"op"`
Key string `json:"key"`
ID string `json:"id,omitempty"`
State any `json:"state,omitempty"`
}

type ResourceEntry struct {
Expand All @@ -34,15 +42,16 @@ func (db *DeploymentState) SaveState(key, newID string, state any) error {
db.mu.Lock()
defer db.mu.Unlock()

if db.Data.State == nil {
db.Data.State = make(map[string]ResourceEntry)
db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "save", Key: key, ID: newID, State: state})
if err := db.writeState(); err != nil {
db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure
return err
}

db.Data.State[key] = ResourceEntry{
ID: newID,
State: state,
if db.Data.State == nil {
db.Data.State = make(map[string]ResourceEntry)
}

db.Data.State[key] = ResourceEntry{ID: newID, State: state}
return nil
}

Expand All @@ -51,12 +60,13 @@ func (db *DeploymentState) DeleteState(key string) error {
db.mu.Lock()
defer db.mu.Unlock()

if db.Data.State == nil {
return nil
db.Data.WAL = append(db.Data.WAL, LogEntry{Op: "delete", Key: key})
if err := db.writeState(); err != nil {
db.Data.WAL = db.Data.WAL[:len(db.Data.WAL)-1] // Rollback on failure
return err
}

delete(db.Data.State, key)

return nil
}

Expand Down Expand Up @@ -104,14 +114,21 @@ func (db *DeploymentState) Open(path string) error {
}

db.Path = path
db.replayWAL()
return nil
}

func (db *DeploymentState) Finalize() error {
db.mu.Lock()
defer db.mu.Unlock()

return db.unlockedSave()
oldWAL := db.Data.WAL
db.Data.WAL = nil
if err := db.writeState(); err != nil {
db.Data.WAL = oldWAL // Restore on failure
return err
}
return nil
}

func (db *DeploymentState) AssertOpened() {
Expand Down Expand Up @@ -148,17 +165,25 @@ func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.Export
return result
}

func (db *DeploymentState) unlockedSave() error {
func (db *DeploymentState) writeState() error {
db.AssertOpened()
data, err := json.MarshalIndent(db.Data, "", " ")
if err != nil {
return err
}
return os.WriteFile(db.Path, data, 0o600)
}

err = os.WriteFile(db.Path, data, 0o600)
if err != nil {
return fmt.Errorf("failed to save resources state to %#v: %w", db.Path, err)
func (db *DeploymentState) replayWAL() {
for _, entry := range db.Data.WAL {
switch entry.Op {
case "save":
if db.Data.State == nil {
db.Data.State = make(map[string]ResourceEntry)
}
db.Data.State[entry.Key] = ResourceEntry{ID: entry.ID, State: entry.State}
case "delete":
delete(db.Data.State, entry.Key)
}
}

return nil
}
193 changes: 193 additions & 0 deletions bundle/direct/dstate/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package dstate

import (
"encoding/json"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWALSaveAndReplay(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
require.NoError(t, err)

err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
require.NoError(t, err)

// Verify WAL exists in file
data, err := os.ReadFile(statePath)
require.NoError(t, err)
var loaded Database
require.NoError(t, json.Unmarshal(data, &loaded))
assert.Len(t, loaded.WAL, 2)

// Simulate crash: open a new state (WAL replays on open)
db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

entry1, ok := db2.GetResourceEntry("resources.jobs.job1")
assert.True(t, ok)
assert.Equal(t, "123", entry1.ID)

entry2, ok := db2.GetResourceEntry("resources.jobs.job2")
assert.True(t, ok)
assert.Equal(t, "456", entry2.ID)
}

func TestWALDeleteAndReplay(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
require.NoError(t, err)

err = db.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
require.NoError(t, err)

err = db.DeleteState("resources.jobs.job1")
require.NoError(t, err)

// Simulate crash and reopen
db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

_, ok := db2.GetResourceEntry("resources.jobs.job1")
assert.False(t, ok, "job1 should be deleted")

entry2, ok := db2.GetResourceEntry("resources.jobs.job2")
assert.True(t, ok)
assert.Equal(t, "456", entry2.ID)
}

func TestWALFinalizeClearsWAL(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
require.NoError(t, err)

// Verify WAL has entries before finalize
data, err := os.ReadFile(statePath)
require.NoError(t, err)
var before Database
require.NoError(t, json.Unmarshal(data, &before))
assert.NotEmpty(t, before.WAL)

err = db.Finalize()
require.NoError(t, err)

// Verify WAL is cleared after finalize
data, err = os.ReadFile(statePath)
require.NoError(t, err)
var after Database
require.NoError(t, json.Unmarshal(data, &after))
assert.Empty(t, after.WAL)

// State still has the entry
db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

entry, ok := db2.GetResourceEntry("resources.jobs.job1")
assert.True(t, ok)
assert.Equal(t, "123", entry.ID)
}

func TestWALWithExistingState(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

// Create state with one entry and finalize
db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
require.NoError(t, err)

err = db.Finalize()
require.NoError(t, err)

// Open again and add more entries without finalizing
db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

err = db2.SaveState("resources.jobs.job2", "456", map[string]any{"name": "job2"})
require.NoError(t, err)

// Simulate crash and reopen
db3 := &DeploymentState{}
err = db3.Open(statePath)
require.NoError(t, err)

entry1, ok := db3.GetResourceEntry("resources.jobs.job1")
assert.True(t, ok)
assert.Equal(t, "123", entry1.ID)

entry2, ok := db3.GetResourceEntry("resources.jobs.job2")
assert.True(t, ok)
assert.Equal(t, "456", entry2.ID)
}

func TestWALUpdateExistingEntry(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "123", map[string]any{"name": "job1"})
require.NoError(t, err)

err = db.SaveState("resources.jobs.job1", "789", map[string]any{"name": "job1-updated"})
require.NoError(t, err)

// Simulate crash and reopen
db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

entry, ok := db2.GetResourceEntry("resources.jobs.job1")
assert.True(t, ok)
assert.Equal(t, "789", entry.ID)
}

func TestNoStateFile(t *testing.T) {
tmpDir := t.TempDir()
statePath := filepath.Join(tmpDir, "state.json")

db := &DeploymentState{}
err := db.Open(statePath)
require.NoError(t, err)

err = db.Finalize()
require.NoError(t, err)

db2 := &DeploymentState{}
err = db2.Open(statePath)
require.NoError(t, err)

assert.NotEmpty(t, db2.Data.Lineage)
}