From 07ced216424e24d97465a74e6bc225fb47e383e4 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 12:21:57 +0700 Subject: [PATCH 01/10] refactor: update alert payload schema Co-Authored-By: Claude Opus 4.5 --- cmd/e2e/alert/alert.go | 37 ++++++++++--- cmd/e2e/alerts_test.go | 64 +++++++++++++++++++++- internal/alert/monitor.go | 13 +++-- internal/alert/monitor_test.go | 8 +-- internal/alert/notifier.go | 35 +++++++++--- internal/alert/notifier_test.go | 2 +- internal/deliverymq/messagehandler.go | 18 ++---- internal/deliverymq/messagehandler_test.go | 6 +- 8 files changed, 138 insertions(+), 45 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index e96443cf..abf73ce1 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -8,8 +8,6 @@ import ( "net/http" "sync" "time" - - "github.com/hookdeck/outpost/internal/models" ) type AlertRequest struct { @@ -23,12 +21,37 @@ type AlertPayload struct { Data ConsecutiveFailureData `json:"data"` } +// AlertedEvent matches internal/alert.AlertedEvent +type AlertedEvent struct { + ID string `json:"id"` + Topic string `json:"topic"` + Metadata map[string]string `json:"metadata"` + Data map[string]any `json:"data"` +} + +// AlertDestination matches internal/alert.AlertDestination +type AlertDestination struct { + ID string `json:"id"` + TenantID string `json:"tenant_id"` + Type string `json:"type"` + Topics []string `json:"topics"` + Filter map[string]any `json:"filter,omitempty"` + Config map[string]string `json:"config"` + Metadata map[string]string `json:"metadata,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + DisabledAt *time.Time `json:"disabled_at"` +} + +// ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData type ConsecutiveFailureData struct { - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - ConsecutiveFailures int `json:"consecutive_failures"` - WillDisable bool `json:"will_disable"` - 
Destination *models.Destination `json:"destination"` - Data map[string]interface{} `json:"data"` + TenantID string `json:"tenant_id"` + Event AlertedEvent `json:"event"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + ConsecutiveFailures int `json:"consecutive_failures"` + WillDisable bool `json:"will_disable"` + Destination *AlertDestination `json:"destination"` + AttemptResponse map[string]any `json:"attempt_response"` } type AlertMockServer struct { diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index cbb2cc4e..5774b5fd 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -7,7 +7,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) // Publish 20 failing events - for i := 0; i < 20; i++ { + for i := range 20 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -27,9 +27,39 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { expectedCounts := []int{10, 14, 18, 20} for i, alert := range alerts { + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + + // TenantID assertion + s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + + // Destination assertions + s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") + s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be 
webhook") + + // Event assertions + s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") + s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") + s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + + // ConsecutiveFailures assertions s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) + s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + + // WillDisable assertion (should be true for last alert only) + if i == len(alerts)-1 { + s.True(alert.Alert.Data.WillDisable, "last alert should have will_disable=true") + } + + // AttemptResponse assertion + s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") } } @@ -38,7 +68,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) // First batch: 14 failures - for i := 0; i < 14; i++ { + for i := range 14 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -56,7 +86,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.waitForNewMockServerEvents(dest.mockID, 15) // Second batch: 14 more failures - for i := 0; i < 14; i++ { + for i := range 14 { s.publish(tenant.ID, "user.created", map[string]any{ "index": i, }, withPublishMetadata(map[string]string{"should_err": "true"})) @@ -76,8 +106,36 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { expectedCounts := []int{10, 14, 10, 14} for i, alert := range alerts { + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + + // 
TenantID assertion + s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + + // Destination assertions + s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") + s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + + // Event assertions + s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") + s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") + s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + + // ConsecutiveFailures assertions s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) + s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + + // WillDisable assertion (none should have will_disable=true since counter resets) + s.False(alert.Alert.Data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) + + // AttemptResponse assertion + s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") } } diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 39ff54f4..7eeb10df 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -82,11 +82,11 @@ func WithDeploymentID(deploymentID string) AlertOption { // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { - Success bool - DeliveryTask *models.DeliveryTask - Destination *AlertDestination - Timestamp time.Time - DeliveryResponse map[string]interface{} + Success bool + DeliveryTask *models.DeliveryTask + Destination *AlertDestination + Timestamp time.Time + 
AttemptResponse map[string]interface{} } type alertMonitor struct { @@ -154,6 +154,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ + TenantID: attempt.Destination.TenantID, Event: AlertedEvent{ ID: attempt.DeliveryTask.Event.ID, Topic: attempt.DeliveryTask.Event.Topic, @@ -164,7 +165,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp ConsecutiveFailures: count, WillDisable: m.disabler != nil && level == 100, Destination: attempt.Destination, - DeliveryResponse: attempt.DeliveryResponse, + AttemptResponse: attempt.AttemptResponse, }) // If we've hit 100% and have a disabler configured, disable the destination diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 16372475..2185635e 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -58,7 +58,7 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", "data": map[string]any{"error": "test error"}, }, @@ -80,7 +80,7 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") require.Equal(t, dest, alert.Data.Destination) require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse) + require.Equal(t, attempt.AttemptResponse, alert.Data.AttemptResponse) require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") } @@ -125,7 +125,7 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - 
DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", "data": map[string]any{"error": "test error"}, }, @@ -198,7 +198,7 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { Success: false, DeliveryTask: task, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "500", }, Timestamp: time.Now(), diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index ef9589bc..92cc8b51 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -50,23 +50,42 @@ type AlertedEvent struct { } type AlertDestination struct { - ID string `json:"id" redis:"id"` - TenantID string `json:"tenant_id" redis:"-"` - Type string `json:"type" redis:"type"` - Topics models.Topics `json:"topics" redis:"-"` - Config models.Config `json:"config" redis:"-"` - CreatedAt time.Time `json:"created_at" redis:"created_at"` - DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` + ID string `json:"id" redis:"id"` + TenantID string `json:"tenant_id" redis:"-"` + Type string `json:"type" redis:"type"` + Topics models.Topics `json:"topics" redis:"-"` + Filter models.Filter `json:"filter,omitempty" redis:"-"` + Config models.Config `json:"config" redis:"-"` + Metadata models.Metadata `json:"metadata,omitempty" redis:"-"` + CreatedAt time.Time `json:"created_at" redis:"created_at"` + UpdatedAt time.Time `json:"updated_at" redis:"updated_at"` + DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` +} + +func AlertDestinationFromDestination(d *models.Destination) *AlertDestination { + return &AlertDestination{ + ID: d.ID, + TenantID: d.TenantID, + Type: d.Type, + Topics: d.Topics, + Filter: d.Filter, + Config: d.Config, + Metadata: d.Metadata, + CreatedAt: d.CreatedAt, + UpdatedAt: d.UpdatedAt, + DisabledAt: d.DisabledAt, + } } // ConsecutiveFailureData represents the data needed for a consecutive failure alert type 
ConsecutiveFailureData struct { + TenantID string `json:"tenant_id"` Event AlertedEvent `json:"event"` MaxConsecutiveFailures int `json:"max_consecutive_failures"` ConsecutiveFailures int `json:"consecutive_failures"` WillDisable bool `json:"will_disable"` Destination *AlertDestination `json:"destination"` - DeliveryResponse map[string]interface{} `json:"delivery_response"` + AttemptResponse map[string]interface{} `json:"attempt_response"` } // ConsecutiveFailureAlert represents an alert for consecutive failures diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 31661e52..43ff62e4 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -102,7 +102,7 @@ func TestAlertNotifier_Notify(t *testing.T) { ConsecutiveFailures: 5, WillDisable: true, Destination: dest, - DeliveryResponse: map[string]interface{}{ + AttemptResponse: map[string]interface{}{ "status": "error", "data": map[string]any{"code": "ETIMEDOUT"}, }, diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index f077c99d..39eca454 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -298,16 +298,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De alertAttempt := alert.DeliveryAttempt{ Success: attemptResult.Status == models.AttemptStatusSuccess, DeliveryTask: task, - Destination: &alert.AlertDestination{ - ID: destination.ID, - TenantID: destination.TenantID, - Type: destination.Type, - Topics: destination.Topics, - Config: destination.Config, - CreatedAt: destination.CreatedAt, - DisabledAt: destination.DisabledAt, - }, - Timestamp: attemptResult.Time, + Destination: alert.AlertDestinationFromDestination(destination), + Timestamp: attemptResult.Time, } if !alertAttempt.Success && err != nil { @@ -316,14 +308,14 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De if errors.As(err, &atmErr) { var 
pubErr *destregistry.ErrDestinationPublishAttempt if errors.As(atmErr.err, &pubErr) { - alertAttempt.DeliveryResponse = pubErr.Data + alertAttempt.AttemptResponse = pubErr.Data } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ + alertAttempt.AttemptResponse = map[string]interface{}{ "error": atmErr.err.Error(), } } } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ + alertAttempt.AttemptResponse = map[string]interface{}{ "error": "unexpected", "message": err.Error(), } diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index ebbab75f..f3a5212b 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -989,7 +989,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { return attempt.Success && // Should be a successful attempt attempt.Destination.ID == destination.ID && // Should have correct destination attempt.DeliveryTask != nil && // Should have delivery task - attempt.DeliveryResponse == nil // No error data for success + attempt.AttemptResponse == nil // No error data for success })).Return(nil) // Setup message handler @@ -1112,9 +1112,9 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task") if expectedData != nil { - assert.Equal(t, expectedData, attempt.DeliveryResponse, "alert attempt data should match") + assert.Equal(t, expectedData, attempt.AttemptResponse, "alert attempt data should match") } else { - assert.Nil(t, attempt.DeliveryResponse, "alert attempt should not have data") + assert.Nil(t, attempt.AttemptResponse, "alert attempt should not have data") } } From cea11271ac21457272dfb6696f1e87da7ed8a17d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 12:34:56 +0700 Subject: [PATCH 02/10] test: add failing tests for alert.destination.disabled callback TDD setup - tests will pass once feature 
is implemented. Co-Authored-By: Claude Opus 4.5 --- cmd/e2e/alert/alert.go | 75 +++++++++++++++++++- cmd/e2e/alerts_test.go | 125 +++++++++++++++++++++++++-------- cmd/e2e/helpers_test.go | 17 +++++ internal/alert/monitor_test.go | 86 +++++++++++++++++++++++ internal/alert/notifier.go | 33 +++++++++ 5 files changed, 307 insertions(+), 29 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index abf73ce1..bc901498 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -16,11 +16,25 @@ type AlertRequest struct { } type AlertPayload struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data json.RawMessage `json:"data"` +} + +// ConsecutiveFailureAlert is a parsed alert for "alert.consecutive_failure" +type ConsecutiveFailureAlert struct { Topic string `json:"topic"` Timestamp time.Time `json:"timestamp"` Data ConsecutiveFailureData `json:"data"` } +// DestinationDisabledAlert is a parsed alert for "alert.destination.disabled" +type DestinationDisabledAlert struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data DestinationDisabledData `json:"data"` +} + // AlertedEvent matches internal/alert.AlertedEvent type AlertedEvent struct { ID string `json:"id"` @@ -54,6 +68,17 @@ type ConsecutiveFailureData struct { AttemptResponse map[string]any `json:"attempt_response"` } +// DestinationDisabledData matches the expected payload for "alert.destination.disabled" +type DestinationDisabledData struct { + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` + ConsecutiveFailures int `json:"consecutive_failures"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + AttemptResponse map[string]any `json:"attempt_response"` +} + type AlertMockServer struct { server *http.Server alerts []AlertRequest @@ -140,6 +165,11 @@ func (s 
*AlertMockServer) handleAlert(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +// alertDataWithDestination is used to extract destination from any alert type +type alertDataWithDestination struct { + Destination *AlertDestination `json:"destination"` +} + // Helper methods for assertions func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertRequest { s.mu.RLock() @@ -147,7 +177,11 @@ func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertR var filtered []AlertRequest for _, alert := range s.alerts { - if alert.Alert.Data.Destination != nil && alert.Alert.Data.Destination.ID == destinationID { + var data alertDataWithDestination + if err := json.Unmarshal(alert.Alert.Data, &data); err != nil { + continue + } + if data.Destination != nil && data.Destination.ID == destinationID { filtered = append(filtered, alert) } } @@ -164,3 +198,42 @@ func (s *AlertMockServer) GetLastAlert() *AlertRequest { alert := s.alerts[len(s.alerts)-1] return &alert } + +// GetAlertsForDestinationByTopic returns alerts filtered by destination ID and topic +func (s *AlertMockServer) GetAlertsForDestinationByTopic(destinationID, topic string) []AlertRequest { + s.mu.RLock() + defer s.mu.RUnlock() + + var filtered []AlertRequest + for _, alert := range s.alerts { + if alert.Alert.Topic != topic { + continue + } + var data alertDataWithDestination + if err := json.Unmarshal(alert.Alert.Data, &data); err != nil { + continue + } + if data.Destination != nil && data.Destination.ID == destinationID { + filtered = append(filtered, alert) + } + } + return filtered +} + +// ParseConsecutiveFailureData parses the Data field as ConsecutiveFailureData +func (a *AlertRequest) ParseConsecutiveFailureData() (*ConsecutiveFailureData, error) { + var data ConsecutiveFailureData + if err := json.Unmarshal(a.Alert.Data, &data); err != nil { + return nil, err + } + return &data, nil +} + +// ParseDestinationDisabledData parses the Data field 
as DestinationDisabledData +func (a *AlertRequest) ParseDestinationDisabledData() (*DestinationDisabledData, error) { + var data DestinationDisabledData + if err := json.Unmarshal(a.Alert.Data, &data); err != nil { + return nil, err + } + return &data, nil +} diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index 5774b5fd..4622b097 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -22,11 +22,15 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { // Wait for 4 alert callbacks to be processed s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestination(dest.ID) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 18, 20} for i, alert := range alerts { + // Parse alert data + data, err := alert.ParseConsecutiveFailureData() + s.Require().NoError(err, "failed to parse consecutive failure data") + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") @@ -34,32 +38,32 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") // TenantID assertion - s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") - s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match") // Destination assertions - s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") - s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") - s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") - s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert 
destination type should be webhook") + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") // Event assertions - s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") - s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") - s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + s.NotEmpty(data.Event.ID, "alert event should have ID") + s.Equal("user.created", data.Event.Topic, "alert event topic should match") + s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") // WillDisable assertion (should be true for last alert only) if i == len(alerts)-1 { - s.True(alert.Alert.Data.WillDisable, "last alert should have will_disable=true") + s.True(data.WillDisable, "last alert should have will_disable=true") } // AttemptResponse assertion - s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") + s.NotNil(data.AttemptResponse, "alert should have attempt_response") } } @@ -101,11 +105,15 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { // Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestination(dest.ID) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, 
"alert.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 10, 14} for i, alert := range alerts { + // Parse alert data + data, err := alert.ParseConsecutiveFailureData() + s.Require().NoError(err, "failed to parse consecutive failure data") + // Auth header assertion s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") @@ -113,29 +121,90 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") // TenantID assertion - s.NotEmpty(alert.Alert.Data.TenantID, "alert should have tenant_id") - s.Equal(tenant.ID, alert.Alert.Data.TenantID, "alert tenant_id should match") + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match") // Destination assertions - s.Require().NotNil(alert.Alert.Data.Destination, "alert should have destination") - s.Equal(dest.ID, alert.Alert.Data.Destination.ID, "alert destination ID should match") - s.Equal(tenant.ID, alert.Alert.Data.Destination.TenantID, "alert destination tenant_id should match") - s.Equal("webhook", alert.Alert.Data.Destination.Type, "alert destination type should be webhook") + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") // Event assertions - s.NotEmpty(alert.Alert.Data.Event.ID, "alert event should have ID") - s.Equal("user.created", alert.Alert.Data.Event.Topic, "alert event topic should match") - s.NotNil(alert.Alert.Data.Event.Data, "alert event should have data") + s.NotEmpty(data.Event.ID, "alert event should have ID") + s.Equal("user.created", 
data.Event.Topic, "alert event topic should match") + s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, alert.Alert.Data.MaxConsecutiveFailures, "max consecutive failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") // WillDisable assertion (none should have will_disable=true since counter resets) - s.False(alert.Alert.Data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) + s.False(data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) // AttemptResponse assertion - s.NotNil(alert.Alert.Data.AttemptResponse, "alert should have attempt_response") + s.NotNil(data.AttemptResponse, "alert should have attempt_response") + } +} + +func (s *basicSuite) TestAlerts_DestinationDisabledCallback() { + tenant := s.createTenant() + dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret)) + + // Publish 20 failing events to trigger auto-disable + for i := range 20 { + s.publish(tenant.ID, "user.created", map[string]any{ + "index": i, + }, withPublishMetadata(map[string]string{"should_err": "true"})) + } + + // Wait for destination to be disabled (sync point for all 20 deliveries) + s.waitForNewDestinationDisabled(tenant.ID, dest.ID) + + // Verify destination is disabled + got := s.getDestination(tenant.ID, dest.ID) + s.NotNil(got.DisabledAt, "destination should be disabled") + + // Wait for the destination.disabled alert callback + s.waitForAlertsByTopic(dest.ID, "alert.destination.disabled", 1) + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.disabled") + s.Require().Len(alerts, 1, "should have 1 destination.disabled alert") + + alert := alerts[0] + data, err := 
alert.ParseDestinationDisabledData() + s.Require().NoError(err, "failed to parse destination disabled data") + + // Auth header assertion + s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") + + // Topic assertion + s.Equal("alert.destination.disabled", alert.Alert.Topic, "alert topic should be alert.destination.disabled") + + // TenantID assertion + s.NotEmpty(data.TenantID, "alert should have tenant_id") + s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match") + + // Destination assertions + s.Require().NotNil(data.Destination, "alert should have destination") + s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match") + s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match") + s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook") + s.NotNil(data.Destination.DisabledAt, "destination should have disabled_at set") + + // DisabledAt assertion + s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero") + + // TriggeringEvent assertions (optional but expected) + if data.TriggeringEvent != nil { + s.NotEmpty(data.TriggeringEvent.ID, "triggering event should have ID") + s.Equal("user.created", data.TriggeringEvent.Topic, "triggering event topic should match") } + + // ConsecutiveFailures assertions + s.Equal(20, data.ConsecutiveFailures, "consecutive_failures should be 20") + s.Equal(20, data.MaxConsecutiveFailures, "max_consecutive_failures should be 20") + + // AttemptResponse assertion + s.NotNil(data.AttemptResponse, "alert should have attempt_response") } diff --git a/cmd/e2e/helpers_test.go b/cmd/e2e/helpers_test.go index 852d390f..e2c3da56 100644 --- a/cmd/e2e/helpers_test.go +++ b/cmd/e2e/helpers_test.go @@ -407,6 +407,23 @@ func (s *basicSuite) waitForAlerts(destID string, count int) { s.Require().FailNowf("timeout", "timed out waiting for %d alerts for %s (got %d)", count, destID, lastCount) } +// 
waitForAlertsByTopic polls until at least count alerts with the specific topic exist for the destination. +func (s *basicSuite) waitForAlertsByTopic(destID, topic string, count int) { + s.T().Helper() + timeout := alertPollTimeout + deadline := time.Now().Add(timeout) + var lastCount int + + for time.Now().Before(deadline) { + lastCount = len(s.alertServer.GetAlertsForDestinationByTopic(destID, topic)) + if lastCount >= count { + return + } + time.Sleep(100 * time.Millisecond) + } + s.Require().FailNowf("timeout", "timed out waiting for %d %s alerts for %s (got %d)", count, topic, destID, lastCount) +} + // ============================================================================= // Absence assertion // ============================================================================= diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 2185635e..9f75cd8b 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -236,3 +236,89 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { } require.Equal(t, 6, disableCallCount, "Should have called disable 6 times (for failures 20-25)") } + +func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { + // This test verifies that when a destination is auto-disabled after reaching + // the consecutive failure threshold, a DestinationDisabledAlert is sent via + // the notifier with topic "alert.destination.disabled". 
+ t.Parallel() + ctx := context.Background() + logger := testutil.CreateTestLogger(t) + redisClient := testutil.CreateTestRedisClient(t) + notifier := &mockAlertNotifier{} + notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabler := &mockDestinationDisabler{} + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + autoDisableCount := 5 + monitor := alert.NewAlertMonitor( + logger, + redisClient, + alert.WithNotifier(notifier), + alert.WithDisabler(disabler), + alert.WithAutoDisableFailureCount(autoDisableCount), + alert.WithAlertThresholds([]int{100}), // Only alert at 100% to simplify test + ) + + modelsDest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_disabled_test"), + testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), + ) + dest := alert.AlertDestinationFromDestination(&modelsDest) + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID("event_123"), + testutil.EventFactory.WithTopic("test.event"), + ) + task := &models.DeliveryTask{Event: *event} + attemptResponse := map[string]interface{}{ + "status": "500", + "data": map[string]any{"error": "internal server error"}, + } + attempt := alert.DeliveryAttempt{ + Success: false, + DeliveryTask: task, + Destination: dest, + AttemptResponse: attemptResponse, + Timestamp: time.Now(), + } + + // Send exactly autoDisableCount failures to trigger auto-disable + for i := 1; i <= autoDisableCount; i++ { + require.NoError(t, monitor.HandleAttempt(ctx, attempt)) + } + + // Verify destination was disabled + disabler.AssertCalled(t, "DisableDestination", mock.Anything, dest.TenantID, dest.ID) + + // Find the DestinationDisabledAlert in the notifier calls + var foundDestinationDisabledAlert bool + var destinationDisabledAlert alert.DestinationDisabledAlert + for _, call := range notifier.Calls { + if call.Method == "Notify" { + alertArg := call.Arguments.Get(1) + if disabledAlert, ok := 
alertArg.(alert.DestinationDisabledAlert); ok { + foundDestinationDisabledAlert = true + destinationDisabledAlert = disabledAlert + break + } + } + } + + require.True(t, foundDestinationDisabledAlert, "Expected DestinationDisabledAlert to be sent when destination is disabled") + + // Verify the alert topic + assert.Equal(t, "alert.destination.disabled", destinationDisabledAlert.Topic, "Alert should have topic 'alert.destination.disabled'") + + // Verify the alert data + assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.TenantID, "TenantID should match") + assert.Equal(t, dest, destinationDisabledAlert.Data.Destination, "Destination should match") + assert.False(t, destinationDisabledAlert.Data.DisabledAt.IsZero(), "DisabledAt should be set") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures, "ConsecutiveFailures should match threshold") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.MaxConsecutiveFailures, "MaxConsecutiveFailures should match configured value") + assert.Equal(t, attemptResponse, destinationDisabledAlert.Data.AttemptResponse, "AttemptResponse should match") + + // Verify the triggering event is included + require.NotNil(t, destinationDisabledAlert.Data.TriggeringEvent, "TriggeringEvent should be set") + assert.Equal(t, event.ID, destinationDisabledAlert.Data.TriggeringEvent.ID, "TriggeringEvent ID should match") + assert.Equal(t, event.Topic, destinationDisabledAlert.Data.TriggeringEvent.Topic, "TriggeringEvent Topic should match") +} diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index 92cc8b51..75ba00ed 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -110,6 +110,39 @@ func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureA } } +// DestinationDisabledData represents the data for a destination disabled alert +type DestinationDisabledData struct { + TenantID string `json:"tenant_id"` + Destination 
*AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` + ConsecutiveFailures int `json:"consecutive_failures"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + AttemptResponse map[string]any `json:"attempt_response"` +} + +// DestinationDisabledAlert represents an alert for when a destination is auto-disabled +type DestinationDisabledAlert struct { + Topic string `json:"topic"` + Timestamp time.Time `json:"timestamp"` + Data DestinationDisabledData `json:"data"` +} + +// MarshalJSON implements json.Marshaler +func (a DestinationDisabledAlert) MarshalJSON() ([]byte, error) { + type Alias DestinationDisabledAlert + return json.Marshal(Alias(a)) +} + +// NewDestinationDisabledAlert creates a new destination disabled alert with defaults +func NewDestinationDisabledAlert(data DestinationDisabledData) DestinationDisabledAlert { + return DestinationDisabledAlert{ + Topic: "alert.destination.disabled", + Timestamp: time.Now(), + Data: data, + } +} + type httpAlertNotifier struct { client *http.Client callbackURL string From 419d6fb12beea80497be513eb0cf493a19c6a19d Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 13:00:34 +0700 Subject: [PATCH 03/10] feat: add alert.destination.disabled callback Send alert when destination is auto-disabled after reaching consecutive failure threshold. 
Co-Authored-By: Claude Opus 4.5 --- internal/alert/monitor.go | 31 ++++++++++- internal/alert/monitor_test.go | 98 ++++++++++++++++++++++------------ internal/services/builder.go | 12 +++-- 3 files changed, 101 insertions(+), 40 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 7eeb10df..d87b9f75 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -13,7 +13,7 @@ import ( // DestinationDisabler handles disabling destinations type DestinationDisabler interface { - DisableDestination(ctx context.Context, tenantID, destinationID string) error + DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) } // AlertMonitor is the main interface for handling delivery attempt alerts @@ -170,7 +170,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp // If we've hit 100% and have a disabler configured, disable the destination if level == 100 && m.disabler != nil { - if err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID); err != nil { + disabledDest, err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID) + if err != nil { return fmt.Errorf("failed to disable destination: %w", err) } @@ -180,6 +181,32 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), ) + + // Send destination disabled alert + if m.notifier != nil { + disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{ + TenantID: attempt.Destination.TenantID, + Destination: AlertDestinationFromDestination(&disabledDest), + DisabledAt: *disabledDest.DisabledAt, + TriggeringEvent: &AlertedEvent{ + ID: attempt.DeliveryTask.Event.ID, + Topic: attempt.DeliveryTask.Event.Topic, + Metadata: attempt.DeliveryTask.Event.Metadata, + Data: 
attempt.DeliveryTask.Event.Data, + }, + ConsecutiveFailures: count, + MaxConsecutiveFailures: m.autoDisableFailureCount, + AttemptResponse: attempt.AttemptResponse, + }) + if err := m.notifier.Notify(ctx, disabledAlert); err != nil { + m.logger.Ctx(ctx).Error("failed to send destination disabled alert", + zap.Error(err), + zap.String("tenant_id", attempt.Destination.TenantID), + zap.String("destination_id", attempt.Destination.ID), + ) + return fmt.Errorf("failed to send destination disabled alert: %w", err) + } + } } // Send alert if notifier is configured diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 9f75cd8b..62c87613 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -27,9 +27,9 @@ type mockDestinationDisabler struct { mock.Mock } -func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { - m.Called(ctx, tenantID, destinationID) - return nil +func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) { + args := m.Called(ctx, tenantID, destinationID) + return args.Get(0).(models.Destination), args.Error(1) } func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { @@ -39,8 +39,14 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := models.Destination{ + ID: "dest_1", + TenantID: "tenant_1", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -70,22 +76,23 @@ func 
TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications were sent at correct thresholds - var notifyCallCount int + // Verify consecutive failure notifications were sent at correct thresholds + var consecutiveFailureCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - failures := alert.Data.ConsecutiveFailures - require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") - require.Equal(t, dest, alert.Data.Destination) - require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.AttemptResponse, alert.Data.AttemptResponse) - require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) - require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + failures := cfAlert.Data.ConsecutiveFailures + require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") + require.Equal(t, dest, cfAlert.Data.Destination) + require.Equal(t, "alert.consecutive_failure", cfAlert.Topic) + require.Equal(t, attempt.AttemptResponse, cfAlert.Data.AttemptResponse) + require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures) + require.Equal(t, failures == 20, cfAlert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + } } } - require.Equal(t, 4, notifyCallCount, "Should have sent exactly 4 notifications") + require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications") // Verify destination was disabled exactly once at 100% var disableCallCount int @@ -106,8 +113,14 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { 
redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := models.Destination{ + ID: "dest_1", + TenantID: "tenant_1", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -159,8 +172,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { // Verify the notifications were at the right thresholds var seenCounts []int for _, call := range notifier.Calls { - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures) + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures) + } } assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)") assert.Contains(t, seenCounts, 14, "Should have alerted at 66% (14 failures)") @@ -179,8 +193,14 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + disabledAt := time.Now() + disabledDest := models.Destination{ + ID: "dest_above", + TenantID: "tenant_above", + DisabledAt: &disabledAt, + } disabler := &mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil) monitor := alert.NewAlertMonitor( logger, @@ -209,22 +229,23 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { require.NoError(t, 
monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications at 50%, 70%, 90%, and 100% thresholds + // Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds // Plus additional notifications for failures 21-25 (all at 100% level) - var notifyCallCount int + var consecutiveFailureCount int var disableNotifyCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - if alertData.Data.ConsecutiveFailures >= 20 { - disableNotifyCount++ - require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + if cfAlert.Data.ConsecutiveFailures >= 20 { + disableNotifyCount++ + require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above 100%") + } } } } // 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25 - require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)") + require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)") require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)") // Verify destination was disabled multiple times (once per failure >= 20) @@ -247,8 +268,16 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { redisClient := testutil.CreateTestRedisClient(t) notifier := &mockAlertNotifier{} notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + + // Create a destination that will be returned by the disabler + disabledAt := time.Now() + modelsDest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_disabled_test"), + testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), + ) + modelsDest.DisabledAt = &disabledAt disabler := 
&mockDestinationDisabler{} - disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil) + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(modelsDest, nil) autoDisableCount := 5 monitor := alert.NewAlertMonitor( @@ -260,10 +289,6 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { alert.WithAlertThresholds([]int{100}), // Only alert at 100% to simplify test ) - modelsDest := testutil.DestinationFactory.Any( - testutil.DestinationFactory.WithID("dest_disabled_test"), - testutil.DestinationFactory.WithTenantID("tenant_disabled_test"), - ) dest := alert.AlertDestinationFromDestination(&modelsDest) event := testutil.EventFactory.AnyPointer( testutil.EventFactory.WithID("event_123"), @@ -311,8 +336,13 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { // Verify the alert data assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.TenantID, "TenantID should match") - assert.Equal(t, dest, destinationDisabledAlert.Data.Destination, "Destination should match") + assert.Equal(t, dest.ID, destinationDisabledAlert.Data.Destination.ID, "Destination ID should match") + assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.Destination.TenantID, "Destination TenantID should match") + assert.NotNil(t, destinationDisabledAlert.Data.Destination.DisabledAt, "Destination DisabledAt should be set") assert.False(t, destinationDisabledAlert.Data.DisabledAt.IsZero(), "DisabledAt should be set") + // Verify the alert's DisabledAt matches the destination's DisabledAt exactly + assert.Equal(t, disabledAt, destinationDisabledAlert.Data.DisabledAt, "Alert DisabledAt should match destination's DisabledAt exactly") + assert.Equal(t, disabledAt, *destinationDisabledAlert.Data.Destination.DisabledAt, "Alert Destination.DisabledAt should match destination's DisabledAt exactly") assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures, 
"ConsecutiveFailures should match threshold") assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.MaxConsecutiveFailures, "MaxConsecutiveFailures should match configured value") assert.Equal(t, attemptResponse, destinationDisabledAlert.Data.AttemptResponse, "AttemptResponse should match") diff --git a/internal/services/builder.go b/internal/services/builder.go index b3c0e71b..961b1fd0 100644 --- a/internal/services/builder.go +++ b/internal/services/builder.go @@ -18,6 +18,7 @@ import ( "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logmq" "github.com/hookdeck/outpost/internal/logstore" + "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/publishmq" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/scheduler" @@ -413,17 +414,20 @@ func newDestinationDisabler(tenantStore tenantstore.TenantStore) alert.Destinati } } -func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { +func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) { destination, err := d.tenantStore.RetrieveDestination(ctx, tenantID, destinationID) if err != nil { - return err + return models.Destination{}, err } if destination == nil { - return nil + return models.Destination{}, fmt.Errorf("destination not found") } now := time.Now() destination.DisabledAt = &now - return d.tenantStore.UpsertDestination(ctx, *destination) + if err := d.tenantStore.UpsertDestination(ctx, *destination); err != nil { + return models.Destination{}, err + } + return *destination, nil } // Helper methods for serviceInstance to initialize common dependencies From 0bc231eea5f6b4130ac22ef301f98d790563bd73 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 3 Feb 2026 18:54:46 +0700 Subject: [PATCH 04/10] chore: error handling --- internal/alert/monitor.go | 21 ++-- 
internal/alert/notifier_test.go | 205 ++++++++++++++++---------------- 2 files changed, 113 insertions(+), 113 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index d87b9f75..e13f245d 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -182,7 +182,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("destination_type", attempt.Destination.Type), ) - // Send destination disabled alert + // Send destination disabled alert (best-effort, don't fail on notification error) if m.notifier != nil { disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{ TenantID: attempt.Destination.TenantID, @@ -204,30 +204,27 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), ) - return fmt.Errorf("failed to send destination disabled alert: %w", err) } } } - // Send alert if notifier is configured + // Send alert if notifier is configured (best-effort, don't fail on notification error) if m.notifier != nil { if err := m.notifier.Notify(ctx, alert); err != nil { - m.logger.Ctx(ctx).Error("failed to send alert", + m.logger.Ctx(ctx).Error("failed to send consecutive failure alert", zap.Error(err), zap.String("event_id", attempt.DeliveryTask.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), + ) + } else { + m.logger.Ctx(ctx).Audit("alert sent", + zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("tenant_id", attempt.Destination.TenantID), + zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), ) - return fmt.Errorf("failed to send alert: %w", err) } - - m.logger.Ctx(ctx).Audit("alert sent", - zap.String("event_id", attempt.DeliveryTask.Event.ID), - zap.String("tenant_id", 
attempt.Destination.TenantID), - zap.String("destination_id", attempt.Destination.ID), - zap.String("destination_type", attempt.Destination.Type), - ) } return nil diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 43ff62e4..6ca044f7 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -16,107 +17,109 @@ import ( func TestAlertNotifier_Notify(t *testing.T) { t.Parallel() - tests := []struct { - name string - handler func(w http.ResponseWriter, r *http.Request) - notifierOpts []alert.NotifierOption - wantErr bool - errContains string - }{ - { - name: "successful notification", - handler: func(w http.ResponseWriter, r *http.Request) { - // Verify request - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - - // Read and verify request body - var body map[string]interface{} - err := json.NewDecoder(r.Body).Decode(&body) - require.NoError(t, err) - - assert.Equal(t, "alert.consecutive_failure", body["topic"]) - data := body["data"].(map[string]interface{}) - assert.Equal(t, float64(10), data["max_consecutive_failures"]) - assert.Equal(t, float64(5), data["consecutive_failures"]) - assert.Equal(t, true, data["will_disable"]) - - // Log raw JSON for debugging - rawJSON, _ := json.Marshal(body) - t.Logf("Raw JSON: %s", string(rawJSON)) - - w.WriteHeader(http.StatusOK) - }, - }, - { - name: "server error", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - }, - wantErr: true, - errContains: "alert callback failed with status 500", - }, - { - name: "invalid response status", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - }, - wantErr: true, - errContains: "alert callback failed with status 400", - }, - { - name: "timeout exceeded", - handler: func(w http.ResponseWriter, r 
*http.Request) { - time.Sleep(100 * time.Millisecond) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithTimeout(50 * time.Millisecond)}, - wantErr: true, - errContains: "context deadline exceeded", - }, - { - name: "successful notification with bearer token", - handler: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) - w.WriteHeader(http.StatusOK) + t.Run("successful notification", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + // Verify request + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + // Read and verify request body + var body map[string]any + err := json.NewDecoder(r.Body).Decode(&body) + require.NoError(t, err) + + assert.Equal(t, "alert.consecutive_failure", body["topic"]) + data := body["data"].(map[string]any) + assert.Equal(t, float64(10), data["max_consecutive_failures"]) + assert.Equal(t, float64(5), data["consecutive_failures"]) + assert.Equal(t, true, data["will_disable"]) + + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + MaxConsecutiveFailures: 10, + ConsecutiveFailures: 5, + WillDisable: true, + Destination: dest, + AttemptResponse: map[string]any{ + "status": "error", + "data": map[string]any{"code": "ETIMEDOUT"}, }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithBearerToken("test-token")}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // Create test server - ts := httptest.NewServer(http.HandlerFunc(tt.handler)) - defer ts.Close() - - // Create notifier - notifier := alert.NewHTTPAlertNotifier(ts.URL, 
tt.notifierOpts...) - - // Create test alert - dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} - testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - AttemptResponse: map[string]interface{}{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, - }, - }) - - // Send alert - err := notifier.Notify(context.Background(), testAlert) - - if tt.wantErr { - require.Error(t, err) - assert.ErrorContains(t, err, tt.errContains) - } else { - require.NoError(t, err) - } }) - } + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("successful notification with bearer token", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithBearerToken("test-token")) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + MaxConsecutiveFailures: 10, + ConsecutiveFailures: 5, + WillDisable: true, + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("server error returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", 
TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + }) + + t.Run("timeout returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithTimeout(50*time.Millisecond)) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to send alert") + }) } From 7bb4bc834d8da97ec794e00b022335b29dacfa58 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 6 Feb 2026 16:44:45 +0700 Subject: [PATCH 05/10] refactor: restructure consecutive failure alert payload into nested object Replace flat fields (ConsecutiveFailures, MaxConsecutiveFailures, Progress, WillDisable) with a scoped ConsecutiveFailures struct containing Current, Max, and Threshold. This produces a cleaner JSON payload structure and removes the redundant WillDisable field (threshold == 100 implies disable). 
Co-Authored-By: Claude Opus 4.6 --- cmd/e2e/alert/alert.go | 24 ++--- cmd/e2e/alerts_test.go | 43 +++++--- internal/alert/monitor.go | 53 +++++----- internal/alert/monitor_test.go | 111 +++++++++++---------- internal/alert/notifier.go | 59 ++++------- internal/alert/notifier_test.go | 31 +++--- internal/deliverymq/messagehandler.go | 43 ++------ internal/deliverymq/messagehandler_test.go | 31 +++--- 8 files changed, 183 insertions(+), 212 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index bc901498..bbd833d2 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -8,6 +8,8 @@ import ( "net/http" "sync" "time" + + "github.com/hookdeck/outpost/internal/models" ) type AlertRequest struct { @@ -35,14 +37,6 @@ type DestinationDisabledAlert struct { Data DestinationDisabledData `json:"data"` } -// AlertedEvent matches internal/alert.AlertedEvent -type AlertedEvent struct { - ID string `json:"id"` - Topic string `json:"topic"` - Metadata map[string]string `json:"metadata"` - Data map[string]any `json:"data"` -} - // AlertDestination matches internal/alert.AlertDestination type AlertDestination struct { ID string `json:"id"` @@ -60,12 +54,13 @@ type AlertDestination struct { // ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData type ConsecutiveFailureData struct { TenantID string `json:"tenant_id"` - Event AlertedEvent `json:"event"` - MaxConsecutiveFailures int `json:"max_consecutive_failures"` + Attempt *models.Attempt `json:"attempt"` + Event *models.Event `json:"event"` + Destination *AlertDestination `json:"destination"` ConsecutiveFailures int `json:"consecutive_failures"` + MaxConsecutiveFailures int `json:"max_consecutive_failures"` + Progress int `json:"progress"` WillDisable bool `json:"will_disable"` - Destination *AlertDestination `json:"destination"` - AttemptResponse map[string]any `json:"attempt_response"` } // DestinationDisabledData matches the expected payload for "alert.destination.disabled" @@ 
-73,10 +68,11 @@ type DestinationDisabledData struct { TenantID string `json:"tenant_id"` Destination *AlertDestination `json:"destination"` DisabledAt time.Time `json:"disabled_at"` - TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` + Attempt *models.Attempt `json:"attempt,omitempty"` + Event *models.Event `json:"event,omitempty"` ConsecutiveFailures int `json:"consecutive_failures"` MaxConsecutiveFailures int `json:"max_consecutive_failures"` - AttemptResponse map[string]any `json:"attempt_response"` + Progress int `json:"progress"` } type AlertMockServer struct { diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index 4622b097..f4dd8a9e 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -22,7 +22,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { // Wait for 4 alert callbacks to be processed s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure") + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 18, 20} @@ -35,7 +35,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") // Topic assertion - s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.destination.consecutive_failure") // TenantID assertion s.NotEmpty(data.TenantID, "alert should have tenant_id") @@ -62,8 +62,13 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { s.True(data.WillDisable, "last alert should have will_disable=true") } - // AttemptResponse assertion - s.NotNil(data.AttemptResponse, "alert should have attempt_response") + // 
Attempt assertion + s.Require().NotNil(data.Attempt, "alert should have attempt") + s.NotEmpty(data.Attempt.ID, "attempt should have ID") + s.NotEmpty(data.Attempt.Status, "attempt should have status") + + // Progress assertion + s.Greater(data.Progress, 0, "progress should be > 0") } } @@ -105,7 +110,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { // Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch s.waitForAlerts(dest.ID, 4) - alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.consecutive_failure") + alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure") s.Require().Len(alerts, 4, "should have 4 alerts") expectedCounts := []int{10, 14, 10, 14} @@ -118,7 +123,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match") // Topic assertion - s.Equal("alert.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.consecutive_failure") + s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.destination.consecutive_failure") // TenantID assertion s.NotEmpty(data.TenantID, "alert should have tenant_id") @@ -143,8 +148,12 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { // WillDisable assertion (none should have will_disable=true since counter resets) s.False(data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) - // AttemptResponse assertion - s.NotNil(data.AttemptResponse, "alert should have attempt_response") + // Attempt assertion + s.Require().NotNil(data.Attempt, "alert should have attempt") + s.NotEmpty(data.Attempt.ID, "attempt should have ID") + + // Progress assertion + s.Greater(data.Progress, 0, "progress should be > 0") } } @@ -195,16 +204,22 @@ func (s *basicSuite) TestAlerts_DestinationDisabledCallback() { // 
DisabledAt assertion s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero") - // TriggeringEvent assertions (optional but expected) - if data.TriggeringEvent != nil { - s.NotEmpty(data.TriggeringEvent.ID, "triggering event should have ID") - s.Equal("user.created", data.TriggeringEvent.Topic, "triggering event topic should match") + // Event assertions (optional but expected) + if data.Event != nil { + s.NotEmpty(data.Event.ID, "event should have ID") + s.Equal("user.created", data.Event.Topic, "event topic should match") + } + + // Attempt assertions (optional but expected) + if data.Attempt != nil { + s.NotEmpty(data.Attempt.ID, "attempt should have ID") + s.NotEmpty(data.Attempt.Status, "attempt should have status") } // ConsecutiveFailures assertions s.Equal(20, data.ConsecutiveFailures, "consecutive_failures should be 20") s.Equal(20, data.MaxConsecutiveFailures, "max_consecutive_failures should be 20") - // AttemptResponse assertion - s.NotNil(data.AttemptResponse, "alert should have attempt_response") + // Progress assertion + s.Equal(100, data.Progress, "progress should be 100 for disabled destination") } diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index e13f245d..46f1a371 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -3,7 +3,6 @@ package alert import ( "context" "fmt" - "time" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" @@ -82,11 +81,9 @@ func WithDeploymentID(deploymentID string) AlertOption { // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { - Success bool - DeliveryTask *models.DeliveryTask - Destination *AlertDestination - Timestamp time.Time - AttemptResponse map[string]interface{} + Event *models.Event + Destination *AlertDestination + Attempt *models.Attempt } type alertMonitor struct { @@ -138,7 +135,7 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ... 
} func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttempt) error { - if attempt.Success { + if attempt.Attempt.Status == models.AttemptStatusSuccess { return m.store.ResetConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID) } @@ -154,18 +151,15 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ - TenantID: attempt.Destination.TenantID, - Event: AlertedEvent{ - ID: attempt.DeliveryTask.Event.ID, - Topic: attempt.DeliveryTask.Event.Topic, - Metadata: attempt.DeliveryTask.Event.Metadata, - Data: attempt.DeliveryTask.Event.Data, + TenantID: attempt.Destination.TenantID, + Attempt: attempt.Attempt, + Event: attempt.Event, + Destination: attempt.Destination, + ConsecutiveFailures: ConsecutiveFailures{ + Current: count, + Max: m.autoDisableFailureCount, + Threshold: level, }, - MaxConsecutiveFailures: m.autoDisableFailureCount, - ConsecutiveFailures: count, - WillDisable: m.disabler != nil && level == 100, - Destination: attempt.Destination, - AttemptResponse: attempt.AttemptResponse, }) // If we've hit 100% and have a disabler configured, disable the destination @@ -174,9 +168,12 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp if err != nil { return fmt.Errorf("failed to disable destination: %w", err) } + if disabledDest.DisabledAt == nil { + return fmt.Errorf("invariant violation: DisableDestination returned destination without DisabledAt set") + } m.logger.Ctx(ctx).Audit("destination disabled", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), @@ -188,15 +185,13 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp TenantID: 
attempt.Destination.TenantID, Destination: AlertDestinationFromDestination(&disabledDest), DisabledAt: *disabledDest.DisabledAt, - TriggeringEvent: &AlertedEvent{ - ID: attempt.DeliveryTask.Event.ID, - Topic: attempt.DeliveryTask.Event.Topic, - Metadata: attempt.DeliveryTask.Event.Metadata, - Data: attempt.DeliveryTask.Event.Data, + Attempt: attempt.Attempt, + Event: attempt.Event, + ConsecutiveFailures: ConsecutiveFailures{ + Current: count, + Max: m.autoDisableFailureCount, + Threshold: 100, }, - ConsecutiveFailures: count, - MaxConsecutiveFailures: m.autoDisableFailureCount, - AttemptResponse: attempt.AttemptResponse, }) if err := m.notifier.Notify(ctx, disabledAlert); err != nil { m.logger.Ctx(ctx).Error("failed to send destination disabled alert", @@ -213,13 +208,13 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp if err := m.notifier.Notify(ctx, alert); err != nil { m.logger.Ctx(ctx).Error("failed to send consecutive failure alert", zap.Error(err), - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), ) } else { m.logger.Ctx(ctx).Audit("alert sent", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 62c87613..46c5d7f4 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -59,16 +59,16 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} attempt := alert.DeliveryAttempt{ - 
Success: false, - DeliveryTask: task, - Destination: dest, - AttemptResponse: map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: "attempt_1", + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), }, - Timestamp: time.Now(), } // Send 20 consecutive failures @@ -82,13 +82,12 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { if call.Method == "Notify" { if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { consecutiveFailureCount++ - failures := cfAlert.Data.ConsecutiveFailures - require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") + cf := cfAlert.Data.ConsecutiveFailures + require.Contains(t, []int{10, 14, 18, 20}, cf.Current, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") require.Equal(t, dest, cfAlert.Data.Destination) - require.Equal(t, "alert.consecutive_failure", cfAlert.Topic) - require.Equal(t, attempt.AttemptResponse, cfAlert.Data.AttemptResponse) - require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures) - require.Equal(t, failures == 20, cfAlert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + require.Equal(t, "alert.destination.consecutive_failure", cfAlert.Topic) + require.Equal(t, "attempt_1", cfAlert.Data.Attempt.ID) + require.Equal(t, 20, cf.Max) } } } @@ -133,16 +132,16 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} failedAttempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - AttemptResponse: map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, + Event: event, + 
Destination: dest, + Attempt: &models.Attempt{ + ID: "attempt_reset", + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), }, - Timestamp: time.Now(), } // Send 14 failures (should trigger 50% and 66% alerts) @@ -154,8 +153,11 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { require.Equal(t, 2, len(notifier.Calls)) // Send a success to reset the counter - successAttempt := failedAttempt - successAttempt.Success = true + successAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{Status: models.AttemptStatusSuccess}, + } require.NoError(t, monitor.HandleAttempt(ctx, successAttempt)) // Clear the mock calls to start fresh @@ -173,7 +175,7 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { var seenCounts []int for _, call := range notifier.Calls { if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { - seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures) + seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures.Current) } } assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)") @@ -213,15 +215,16 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - AttemptResponse: map[string]interface{}{ - "status": "500", + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: "attempt_above", + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), }, - Timestamp: time.Now(), } // Send 25 consecutive failures (5 more than the threshold) @@ -237,16 +240,16 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t 
*testing.T) { if call.Method == "Notify" { if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { consecutiveFailureCount++ - if cfAlert.Data.ConsecutiveFailures >= 20 { + if cfAlert.Data.ConsecutiveFailures.Current >= 20 { disableNotifyCount++ - require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above 100%") + require.Equal(t, 100, cfAlert.Data.ConsecutiveFailures.Threshold, "Threshold should be 100 at and above max") } } } } // 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25 require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)") - require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)") + require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications at threshold 100 (20-25)") // Verify destination was disabled multiple times (once per failure >= 20) var disableCallCount int @@ -294,17 +297,17 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { testutil.EventFactory.WithID("event_123"), testutil.EventFactory.WithTopic("test.event"), ) - task := &models.DeliveryTask{Event: *event} - attemptResponse := map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "internal server error"}, + testAttempt := &models.Attempt{ + ID: "attempt_disabled", + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "internal server error"}, + Time: time.Now(), } attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - AttemptResponse: attemptResponse, - Timestamp: time.Now(), + Event: event, + Destination: dest, + Attempt: testAttempt, } // Send exactly autoDisableCount failures to trigger auto-disable @@ -343,12 +346,16 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { // Verify the alert's DisabledAt matches the destination's DisabledAt exactly assert.Equal(t, disabledAt, 
destinationDisabledAlert.Data.DisabledAt, "Alert DisabledAt should match destination's DisabledAt exactly") assert.Equal(t, disabledAt, *destinationDisabledAlert.Data.Destination.DisabledAt, "Alert Destination.DisabledAt should match destination's DisabledAt exactly") - assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures, "ConsecutiveFailures should match threshold") - assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.MaxConsecutiveFailures, "MaxConsecutiveFailures should match configured value") - assert.Equal(t, attemptResponse, destinationDisabledAlert.Data.AttemptResponse, "AttemptResponse should match") - - // Verify the triggering event is included - require.NotNil(t, destinationDisabledAlert.Data.TriggeringEvent, "TriggeringEvent should be set") - assert.Equal(t, event.ID, destinationDisabledAlert.Data.TriggeringEvent.ID, "TriggeringEvent ID should match") - assert.Equal(t, event.Topic, destinationDisabledAlert.Data.TriggeringEvent.Topic, "TriggeringEvent Topic should match") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures.Current, "ConsecutiveFailures.Current should match threshold") + assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures.Max, "ConsecutiveFailures.Max should match configured value") + assert.Equal(t, 100, destinationDisabledAlert.Data.ConsecutiveFailures.Threshold, "ConsecutiveFailures.Threshold should be 100") + + // Verify the attempt is included + require.NotNil(t, destinationDisabledAlert.Data.Attempt, "Attempt should be set") + assert.Equal(t, testAttempt.ID, destinationDisabledAlert.Data.Attempt.ID, "Attempt ID should match") + + // Verify the event is included + require.NotNil(t, destinationDisabledAlert.Data.Event, "Event should be set") + assert.Equal(t, event.ID, destinationDisabledAlert.Data.Event.ID, "Event ID should match") + assert.Equal(t, event.Topic, destinationDisabledAlert.Data.Event.Topic, "Event Topic should 
match") } diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index 75ba00ed..cbc03d67 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -12,9 +12,7 @@ import ( ) // Alert represents any alert that can be sent -type Alert interface { - json.Marshaler -} +type Alert interface{} // AlertNotifier sends alerts to configured destinations type AlertNotifier interface { @@ -42,13 +40,6 @@ func NotifierWithBearerToken(token string) NotifierOption { } } -type AlertedEvent struct { - ID string `json:"id"` - Topic string `json:"topic"` // event topic - Metadata map[string]string `json:"metadata"` // event metadata - Data map[string]interface{} `json:"data"` // event payload -} - type AlertDestination struct { ID string `json:"id" redis:"id"` TenantID string `json:"tenant_id" redis:"-"` @@ -77,15 +68,20 @@ func AlertDestinationFromDestination(d *models.Destination) *AlertDestination { } } +// ConsecutiveFailures represents the nested consecutive failure state +type ConsecutiveFailures struct { + Current int `json:"current"` + Max int `json:"max"` + Threshold int `json:"threshold"` +} + // ConsecutiveFailureData represents the data needed for a consecutive failure alert type ConsecutiveFailureData struct { - TenantID string `json:"tenant_id"` - Event AlertedEvent `json:"event"` - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - ConsecutiveFailures int `json:"consecutive_failures"` - WillDisable bool `json:"will_disable"` - Destination *AlertDestination `json:"destination"` - AttemptResponse map[string]interface{} `json:"attempt_response"` + TenantID string `json:"tenant_id"` + Attempt *models.Attempt `json:"attempt"` + Event *models.Event `json:"event"` + Destination *AlertDestination `json:"destination"` + ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"` } // ConsecutiveFailureAlert represents an alert for consecutive failures @@ -95,16 +91,10 @@ type ConsecutiveFailureAlert struct { Data 
ConsecutiveFailureData `json:"data"` } -// MarshalJSON implements json.Marshaler -func (a ConsecutiveFailureAlert) MarshalJSON() ([]byte, error) { - type Alias ConsecutiveFailureAlert - return json.Marshal(Alias(a)) -} - // NewConsecutiveFailureAlert creates a new consecutive failure alert with defaults func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureAlert { return ConsecutiveFailureAlert{ - Topic: "alert.consecutive_failure", + Topic: "alert.destination.consecutive_failure", Timestamp: time.Now(), Data: data, } @@ -112,13 +102,12 @@ func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureA // DestinationDisabledData represents the data for a destination disabled alert type DestinationDisabledData struct { - TenantID string `json:"tenant_id"` - Destination *AlertDestination `json:"destination"` - DisabledAt time.Time `json:"disabled_at"` - TriggeringEvent *AlertedEvent `json:"triggering_event,omitempty"` - ConsecutiveFailures int `json:"consecutive_failures"` - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - AttemptResponse map[string]any `json:"attempt_response"` + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + Attempt *models.Attempt `json:"attempt,omitempty"` + Event *models.Event `json:"event,omitempty"` + ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"` } // DestinationDisabledAlert represents an alert for when a destination is auto-disabled @@ -128,12 +117,6 @@ type DestinationDisabledAlert struct { Data DestinationDisabledData `json:"data"` } -// MarshalJSON implements json.Marshaler -func (a DestinationDisabledAlert) MarshalJSON() ([]byte, error) { - type Alias DestinationDisabledAlert - return json.Marshal(Alias(a)) -} - // NewDestinationDisabledAlert creates a new destination disabled alert with defaults func NewDestinationDisabledAlert(data DestinationDisabledData) 
DestinationDisabledAlert { return DestinationDisabledAlert{ @@ -162,7 +145,7 @@ func NewHTTPAlertNotifier(callbackURL string, opts ...NotifierOption) AlertNotif } func (n *httpAlertNotifier) Notify(ctx context.Context, alert Alert) error { - body, err := alert.MarshalJSON() + body, err := json.Marshal(alert) if err != nil { return fmt.Errorf("failed to marshal alert: %w", err) } diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 6ca044f7..2aece718 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -31,11 +31,12 @@ func TestAlertNotifier_Notify(t *testing.T) { err := json.NewDecoder(r.Body).Decode(&body) require.NoError(t, err) - assert.Equal(t, "alert.consecutive_failure", body["topic"]) + assert.Equal(t, "alert.destination.consecutive_failure", body["topic"]) data := body["data"].(map[string]any) - assert.Equal(t, float64(10), data["max_consecutive_failures"]) - assert.Equal(t, float64(5), data["consecutive_failures"]) - assert.Equal(t, true, data["will_disable"]) + cf := data["consecutive_failures"].(map[string]any) + assert.Equal(t, float64(5), cf["current"]) + assert.Equal(t, float64(10), cf["max"]) + assert.Equal(t, float64(50), cf["threshold"]) w.WriteHeader(http.StatusOK) })) @@ -44,13 +45,11 @@ func TestAlertNotifier_Notify(t *testing.T) { notifier := alert.NewHTTPAlertNotifier(ts.URL) dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - AttemptResponse: map[string]any{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, + Destination: dest, + ConsecutiveFailures: alert.ConsecutiveFailures{ + Current: 5, + Max: 10, + Threshold: 50, }, }) @@ -73,10 +72,12 @@ func TestAlertNotifier_Notify(t *testing.T) { notifier := alert.NewHTTPAlertNotifier(ts.URL, 
alert.NotifierWithBearerToken("test-token")) dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, + Destination: dest, + ConsecutiveFailures: alert.ConsecutiveFailures{ + Current: 5, + Max: 10, + Threshold: 50, + }, }) err := notifier.Notify(context.Background(), testAlert) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 39eca454..92642527 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -272,7 +272,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return &PostDeliveryError{err: logErr} } - go h.handleAlertAttempt(ctx, task, destination, attempt, err) + go h.handleAlertAttempt(ctx, destination, &task.Event, attempt) // If we have an AttemptError, return it as is var atmErr *AttemptError @@ -294,39 +294,18 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return nil } -func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attemptResult *models.Attempt, err error) { - alertAttempt := alert.DeliveryAttempt{ - Success: attemptResult.Status == models.AttemptStatusSuccess, - DeliveryTask: task, - Destination: alert.AlertDestinationFromDestination(destination), - Timestamp: attemptResult.Time, +func (h *messageHandler) handleAlertAttempt(ctx context.Context, destination *models.Destination, event *models.Event, attempt *models.Attempt) { + da := alert.DeliveryAttempt{ + Event: event, + Destination: alert.AlertDestinationFromDestination(destination), + Attempt: attempt, } - if !alertAttempt.Success && err != nil { - // Extract attempt data if available - var atmErr *AttemptError - if errors.As(err, &atmErr) { - var pubErr 
*destregistry.ErrDestinationPublishAttempt - if errors.As(atmErr.err, &pubErr) { - alertAttempt.AttemptResponse = pubErr.Data - } else { - alertAttempt.AttemptResponse = map[string]interface{}{ - "error": atmErr.err.Error(), - } - } - } else { - alertAttempt.AttemptResponse = map[string]interface{}{ - "error": "unexpected", - "message": err.Error(), - } - } - } - - if monitorErr := h.alertMonitor.HandleAttempt(ctx, alertAttempt); monitorErr != nil { + if monitorErr := h.alertMonitor.HandleAttempt(ctx, da); monitorErr != nil { h.logger.Ctx(ctx).Error("failed to handle alert attempt", zap.Error(monitorErr), - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) @@ -334,8 +313,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De } h.logger.Ctx(ctx).Info("alert attempt handled", - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index f3a5212b..23b0ccc9 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -258,7 +258,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { "should use GetRetryID for task ID") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + 
assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_PublishError_NotEligible(t *testing.T) { @@ -326,7 +326,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { assert.Equal(t, 1, publisher.current, "should only attempt once") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_RetryFlow(t *testing.T) { @@ -833,7 +833,7 @@ func TestManualDelivery_PublishError(t *testing.T) { assert.Empty(t, retryScheduler.schedules, "should not schedule retry for manual delivery") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestManualDelivery_CancelError(t *testing.T) { @@ -896,7 +896,7 @@ func TestManualDelivery_CancelError(t *testing.T) { assert.Equal(t, models.RetryID(task.Event.ID, task.DestinationID), retryScheduler.canceled[0], "should cancel with correct retry ID") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusSuccess, logPublisher.entries[0].Attempt.Status, "delivery status should be OK despite cancel error") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestManualDelivery_DestinationDisabled(t *testing.T) { @@ -986,10 +986,10 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Setup alert monitor expectations alertMonitor.On("HandleAttempt", mock.Anything, mock.MatchedBy(func(attempt alert.DeliveryAttempt) 
bool { - return attempt.Success && // Should be a successful attempt - attempt.Destination.ID == destination.ID && // Should have correct destination - attempt.DeliveryTask != nil && // Should have delivery task - attempt.AttemptResponse == nil // No error data for success + return attempt.Attempt.Status == models.AttemptStatusSuccess && + attempt.Destination.ID == destination.ID && + attempt.Event != nil && + attempt.Attempt != nil })).Return(nil) // Setup message handler @@ -1020,7 +1020,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Assert behavior assert.True(t, mockMsg.acked, "message should be acked on success") assert.False(t, mockMsg.nacked, "message should not be nacked on success") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestMessageHandler_AlertMonitorError(t *testing.T) { @@ -1094,7 +1094,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { } // Helper function to assert alert monitor calls -func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination, expectedData map[string]interface{}) { +func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination) { t.Helper() // Wait for the alert monitor to be called @@ -1107,15 +1107,10 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina lastCall := calls[len(calls)-1] attempt := lastCall.Arguments[1].(alert.DeliveryAttempt) - assert.Equal(t, success, attempt.Success, "alert attempt success should match") + assert.Equal(t, success, attempt.Attempt.Status == models.AttemptStatusSuccess, "alert attempt success should match") assert.Equal(t, destination.ID, attempt.Destination.ID, "alert attempt destination should match") - assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task") - - if expectedData != nil { - assert.Equal(t, expectedData, 
attempt.AttemptResponse, "alert attempt data should match") - } else { - assert.Nil(t, attempt.AttemptResponse, "alert attempt should not have data") - } + assert.NotNil(t, attempt.Event, "alert attempt should have event") + assert.NotNil(t, attempt.Attempt, "alert attempt should have attempt data") } func TestManualDelivery_DuplicateRetry(t *testing.T) { From 34840443409b60721486acfdd43dc0f24c389a48 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 6 Feb 2026 16:58:36 +0700 Subject: [PATCH 06/10] refactor: decouple disabled alert from consecutive failure mechanism Replace ConsecutiveFailures struct in DestinationDisabledData with a Reason string field. This decouples the disabled alert from the specific trigger mechanism, allowing future disable reasons (e.g., error rate) without restructuring the payload. Also update e2e types and assertions for the nested ConsecutiveFailures struct introduced in the previous commit. Co-Authored-By: Claude Opus 4.6 --- cmd/e2e/alert/alert.go | 34 ++++++++++++++++++---------------- cmd/e2e/alerts_test.go | 33 +++++++++------------------------ internal/alert/monitor.go | 6 +----- internal/alert/monitor_test.go | 4 +--- internal/alert/notifier.go | 12 ++++++------ 5 files changed, 35 insertions(+), 54 deletions(-) diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go index bbd833d2..b950e8e8 100644 --- a/cmd/e2e/alert/alert.go +++ b/cmd/e2e/alert/alert.go @@ -51,28 +51,30 @@ type AlertDestination struct { DisabledAt *time.Time `json:"disabled_at"` } +// ConsecutiveFailures represents the nested consecutive failure state +type ConsecutiveFailures struct { + Current int `json:"current"` + Max int `json:"max"` + Threshold int `json:"threshold"` +} + // ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData type ConsecutiveFailureData struct { - TenantID string `json:"tenant_id"` - Attempt *models.Attempt `json:"attempt"` - Event *models.Event `json:"event"` - Destination *AlertDestination `json:"destination"` - 
ConsecutiveFailures int `json:"consecutive_failures"` - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - Progress int `json:"progress"` - WillDisable bool `json:"will_disable"` + TenantID string `json:"tenant_id"` + Attempt *models.Attempt `json:"attempt"` + Event *models.Event `json:"event"` + Destination *AlertDestination `json:"destination"` + ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"` } // DestinationDisabledData matches the expected payload for "alert.destination.disabled" type DestinationDisabledData struct { - TenantID string `json:"tenant_id"` - Destination *AlertDestination `json:"destination"` - DisabledAt time.Time `json:"disabled_at"` - Attempt *models.Attempt `json:"attempt,omitempty"` - Event *models.Event `json:"event,omitempty"` - ConsecutiveFailures int `json:"consecutive_failures"` - MaxConsecutiveFailures int `json:"max_consecutive_failures"` - Progress int `json:"progress"` + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + Reason string `json:"reason"` + Attempt *models.Attempt `json:"attempt,omitempty"` + Event *models.Event `json:"event,omitempty"` } type AlertMockServer struct { diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go index f4dd8a9e..5af4a863 100644 --- a/cmd/e2e/alerts_test.go +++ b/cmd/e2e/alerts_test.go @@ -53,22 +53,15 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() { s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") - - // WillDisable assertion (should be true for last alert only) - if i == len(alerts)-1 { - s.True(data.WillDisable, "last alert should have 
will_disable=true") - } + s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20") + s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0") // Attempt assertion s.Require().NotNil(data.Attempt, "alert should have attempt") s.NotEmpty(data.Attempt.ID, "attempt should have ID") s.NotEmpty(data.Attempt.Status, "attempt should have status") - - // Progress assertion - s.Greater(data.Progress, 0, "progress should be > 0") } } @@ -141,19 +134,15 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() { s.NotNil(data.Event.Data, "alert event should have data") // ConsecutiveFailures assertions - s.Equal(expectedCounts[i], data.ConsecutiveFailures, + s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current, "alert %d should have %d consecutive failures", i, expectedCounts[i]) - s.Equal(20, data.MaxConsecutiveFailures, "max consecutive failures should be 20") - - // WillDisable assertion (none should have will_disable=true since counter resets) - s.False(data.WillDisable, "alert %d should have will_disable=false (counter resets)", i) + s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20") + s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0") + s.Less(data.ConsecutiveFailures.Threshold, 100, "threshold should be < 100 (counter resets)") // Attempt assertion s.Require().NotNil(data.Attempt, "alert should have attempt") s.NotEmpty(data.Attempt.ID, "attempt should have ID") - - // Progress assertion - s.Greater(data.Progress, 0, "progress should be > 0") } } @@ -216,10 +205,6 @@ func (s *basicSuite) TestAlerts_DestinationDisabledCallback() { s.NotEmpty(data.Attempt.Status, "attempt should have status") } - // ConsecutiveFailures assertions - s.Equal(20, data.ConsecutiveFailures, "consecutive_failures should be 20") - s.Equal(20, data.MaxConsecutiveFailures, "max_consecutive_failures should be 20") - - // Progress assertion - s.Equal(100, data.Progress, 
"progress should be 100 for disabled destination") + // Reason assertion + s.Equal("consecutive_failure", data.Reason, "reason should be consecutive_failure") } diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 46f1a371..2f273485 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -185,13 +185,9 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp TenantID: attempt.Destination.TenantID, Destination: AlertDestinationFromDestination(&disabledDest), DisabledAt: *disabledDest.DisabledAt, + Reason: "consecutive_failure", Attempt: attempt.Attempt, Event: attempt.Event, - ConsecutiveFailures: ConsecutiveFailures{ - Current: count, - Max: m.autoDisableFailureCount, - Threshold: 100, - }, }) if err := m.notifier.Notify(ctx, disabledAlert); err != nil { m.logger.Ctx(ctx).Error("failed to send destination disabled alert", diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 46c5d7f4..3c709152 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -346,9 +346,7 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { // Verify the alert's DisabledAt matches the destination's DisabledAt exactly assert.Equal(t, disabledAt, destinationDisabledAlert.Data.DisabledAt, "Alert DisabledAt should match destination's DisabledAt exactly") assert.Equal(t, disabledAt, *destinationDisabledAlert.Data.Destination.DisabledAt, "Alert Destination.DisabledAt should match destination's DisabledAt exactly") - assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures.Current, "ConsecutiveFailures.Current should match threshold") - assert.Equal(t, autoDisableCount, destinationDisabledAlert.Data.ConsecutiveFailures.Max, "ConsecutiveFailures.Max should match configured value") - assert.Equal(t, 100, destinationDisabledAlert.Data.ConsecutiveFailures.Threshold, "ConsecutiveFailures.Threshold should be 100") + assert.Equal(t, 
"consecutive_failure", destinationDisabledAlert.Data.Reason, "Reason should be consecutive_failure") // Verify the attempt is included require.NotNil(t, destinationDisabledAlert.Data.Attempt, "Attempt should be set") diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index cbc03d67..fd8724c9 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -102,12 +102,12 @@ func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureA // DestinationDisabledData represents the data for a destination disabled alert type DestinationDisabledData struct { - TenantID string `json:"tenant_id"` - Destination *AlertDestination `json:"destination"` - DisabledAt time.Time `json:"disabled_at"` - Attempt *models.Attempt `json:"attempt,omitempty"` - Event *models.Event `json:"event,omitempty"` - ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"` + TenantID string `json:"tenant_id"` + Destination *AlertDestination `json:"destination"` + DisabledAt time.Time `json:"disabled_at"` + Reason string `json:"reason"` + Attempt *models.Attempt `json:"attempt,omitempty"` + Event *models.Event `json:"event,omitempty"` } // DestinationDisabledAlert represents an alert for when a destination is auto-disabled From 5b326ac82a6343c38a3e59b4aa63e88b68bffff8 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 6 Feb 2026 16:59:56 +0700 Subject: [PATCH 07/10] feat: reflect disabled state in consecutive failure alert at threshold 100 After disabling a destination, update the consecutive failure alert's destination to include DisabledAt, so consumers see the post-disable state. Add test asserting this behavior. 
Co-Authored-By: Claude Opus 4.6 --- internal/alert/monitor.go | 34 +++++++++------ internal/alert/monitor_test.go | 73 ++++++++++++++++++++++++++++++++- internal/util/testutil/event.go | 6 +++ 3 files changed, 99 insertions(+), 14 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 2f273485..8b6d9977 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -150,19 +150,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp return nil } - alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ - TenantID: attempt.Destination.TenantID, - Attempt: attempt.Attempt, - Event: attempt.Event, - Destination: attempt.Destination, - ConsecutiveFailures: ConsecutiveFailures{ - Current: count, - Max: m.autoDisableFailureCount, - Threshold: level, - }, - }) - // If we've hit 100% and have a disabler configured, disable the destination + alertDest := attempt.Destination if level == 100 && m.disabler != nil { disabledDest, err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID) if err != nil { @@ -173,17 +162,20 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } m.logger.Ctx(ctx).Audit("destination disabled", + zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), ) + alertDest = AlertDestinationFromDestination(&disabledDest) + // Send destination disabled alert (best-effort, don't fail on notification error) if m.notifier != nil { disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{ TenantID: attempt.Destination.TenantID, - Destination: AlertDestinationFromDestination(&disabledDest), + Destination: alertDest, DisabledAt: *disabledDest.DisabledAt, Reason: "consecutive_failure", Attempt: attempt.Attempt, @@ 
-192,6 +184,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp if err := m.notifier.Notify(ctx, disabledAlert); err != nil { m.logger.Ctx(ctx).Error("failed to send destination disabled alert", zap.Error(err), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), ) @@ -199,17 +193,31 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } } + alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ + TenantID: attempt.Destination.TenantID, + Attempt: attempt.Attempt, + Event: attempt.Event, + Destination: alertDest, + ConsecutiveFailures: ConsecutiveFailures{ + Current: count, + Max: m.autoDisableFailureCount, + Threshold: level, + }, + }) + // Send alert if notifier is configured (best-effort, don't fail on notification error) if m.notifier != nil { if err := m.notifier.Notify(ctx, alert); err != nil { m.logger.Ctx(ctx).Error("failed to send consecutive failure alert", zap.Error(err), + zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), ) } else { m.logger.Ctx(ctx).Audit("alert sent", + zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 3c709152..03b92cc3 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -84,7 +84,13 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { consecutiveFailureCount++ cf := cfAlert.Data.ConsecutiveFailures require.Contains(t, []int{10, 14, 18, 20}, cf.Current, "Alert should be sent at 50%, 66%, 90%, and 
100% thresholds") - require.Equal(t, dest, cfAlert.Data.Destination) + require.Equal(t, dest.ID, cfAlert.Data.Destination.ID) + require.Equal(t, dest.TenantID, cfAlert.Data.Destination.TenantID) + if cf.Threshold == 100 { + require.NotNil(t, cfAlert.Data.Destination.DisabledAt, "Destination should have DisabledAt at threshold 100") + } else { + require.Nil(t, cfAlert.Data.Destination.DisabledAt, "Destination should not have DisabledAt below threshold 100") + } require.Equal(t, "alert.destination.consecutive_failure", cfAlert.Topic) require.Equal(t, "attempt_1", cfAlert.Data.Attempt.ID) require.Equal(t, 20, cf.Max) @@ -357,3 +363,68 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { assert.Equal(t, event.ID, destinationDisabledAlert.Data.Event.ID, "Event ID should match") assert.Equal(t, event.Topic, destinationDisabledAlert.Data.Event.Topic, "Event Topic should match") } + +func TestAlertMonitor_ConsecutiveFailureAlert_ReflectsDisabledDestination(t *testing.T) { + // Tests that the consecutive failure alert at threshold 100 includes + // the destination with DisabledAt set, reflecting the post-disable state. 
+ t.Parallel() + ctx := context.Background() + logger := testutil.CreateTestLogger(t) + redisClient := testutil.CreateTestRedisClient(t) + notifier := &mockAlertNotifier{} + notifier.On("Notify", mock.Anything, mock.Anything).Return(nil) + + disabledAt := time.Now() + modelsDest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithID("dest_reflect"), + testutil.DestinationFactory.WithTenantID("tenant_reflect"), + ) + modelsDest.DisabledAt = &disabledAt + disabler := &mockDestinationDisabler{} + disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(modelsDest, nil) + + autoDisableCount := 5 + monitor := alert.NewAlertMonitor( + logger, + redisClient, + alert.WithNotifier(notifier), + alert.WithDisabler(disabler), + alert.WithAutoDisableFailureCount(autoDisableCount), + alert.WithAlertThresholds([]int{100}), + ) + + dest := &alert.AlertDestination{ID: "dest_reflect", TenantID: "tenant_reflect"} + event := testutil.EventFactory.AnyPointer( + testutil.EventFactory.WithID("event_reflect"), + ) + attempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: testutil.AttemptFactory.AnyPointer( + testutil.AttemptFactory.WithID("attempt_reflect"), + testutil.AttemptFactory.WithStatus("failed"), + testutil.AttemptFactory.WithCode("500"), + ), + } + + for i := 1; i <= autoDisableCount; i++ { + require.NoError(t, monitor.HandleAttempt(ctx, attempt)) + } + + // Find the consecutive failure alert at threshold 100 + var found bool + for _, call := range notifier.Calls { + if call.Method == "Notify" { + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + if cfAlert.Data.ConsecutiveFailures.Threshold == 100 { + found = true + // The destination in the alert should reflect the disabled state + require.NotNil(t, cfAlert.Data.Destination.DisabledAt, "Destination in consecutive failure alert at threshold 100 should have DisabledAt set") + assert.Equal(t, disabledAt, 
*cfAlert.Data.Destination.DisabledAt, "DisabledAt should match") + break + } + } + } + } + require.True(t, found, "Should have found a consecutive failure alert at threshold 100") +} diff --git a/internal/util/testutil/event.go b/internal/util/testutil/event.go index 31ba24c6..9f085ab2 100644 --- a/internal/util/testutil/event.go +++ b/internal/util/testutil/event.go @@ -163,6 +163,12 @@ func (f *mockAttemptFactory) WithStatus(status string) func(*models.Attempt) { } } +func (f *mockAttemptFactory) WithCode(code string) func(*models.Attempt) { + return func(attempt *models.Attempt) { + attempt.Code = code + } +} + func (f *mockAttemptFactory) WithTime(time time.Time) func(*models.Attempt) { return func(attempt *models.Attempt) { attempt.Time = time From 04d264876b05ea9060f200fa7c84142e1f87d3ba Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 6 Feb 2026 17:11:07 +0700 Subject: [PATCH 08/10] chore: unify alert log messages with topic field Use generic "failed to send alert" / "alert sent" messages with a topic field instead of alert-type-specific message strings. 
Co-Authored-By: Claude Opus 4.6 --- internal/alert/monitor.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 8b6d9977..87cfbfe7 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -182,8 +182,9 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp Event: attempt.Event, }) if err := m.notifier.Notify(ctx, disabledAlert); err != nil { - m.logger.Ctx(ctx).Error("failed to send destination disabled alert", + m.logger.Ctx(ctx).Error("failed to send alert", zap.Error(err), + zap.String("topic", disabledAlert.Topic), zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), @@ -208,8 +209,9 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp // Send alert if notifier is configured (best-effort, don't fail on notification error) if m.notifier != nil { if err := m.notifier.Notify(ctx, alert); err != nil { - m.logger.Ctx(ctx).Error("failed to send consecutive failure alert", + m.logger.Ctx(ctx).Error("failed to send alert", zap.Error(err), + zap.String("topic", alert.Topic), zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), @@ -217,6 +219,7 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp ) } else { m.logger.Ctx(ctx).Audit("alert sent", + zap.String("topic", alert.Topic), zap.String("attempt_id", attempt.Attempt.ID), zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), From 49cb3297fcb8b7cc0e6e8e1b53095b077215924a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 6 Feb 2026 17:20:45 +0700 Subject: [PATCH 09/10] chore: use AttemptFactory in SendsDestinationDisabledAlert test Co-Authored-By: Claude Opus 4.6 --- internal/alert/monitor_test.go | 12 
+++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 03b92cc3..44c07045 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -303,13 +303,11 @@ func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) { testutil.EventFactory.WithID("event_123"), testutil.EventFactory.WithTopic("test.event"), ) - testAttempt := &models.Attempt{ - ID: "attempt_disabled", - Status: "failed", - Code: "500", - ResponseData: map[string]interface{}{"error": "internal server error"}, - Time: time.Now(), - } + testAttempt := testutil.AttemptFactory.AnyPointer( + testutil.AttemptFactory.WithID("attempt_disabled"), + testutil.AttemptFactory.WithStatus("failed"), + testutil.AttemptFactory.WithCode("500"), + ) attempt := alert.DeliveryAttempt{ Event: event, Destination: dest, From fa4811510b1e8d5be39fe7f76783ed786bd1f916 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 12 Feb 2026 21:48:44 +0700 Subject: [PATCH 10/10] docs: update alert payload schema and add destination disabled alert Co-Authored-By: Claude Opus 4.6 --- docs/pages/features/alerts.mdx | 70 ++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 15 deletions(-) diff --git a/docs/pages/features/alerts.mdx b/docs/pages/features/alerts.mdx index 22cea6a5..98a902bc 100644 --- a/docs/pages/features/alerts.mdx +++ b/docs/pages/features/alerts.mdx @@ -12,26 +12,66 @@ It's your responsibility to format and deliver the alert to your tenant using yo ## Consecutive failure alerts -The `ALERT_CONSECUTIVE_FAILURE_COUNT` variable triggers an alert when the consecutive failure count reaches 50%, 70%, 90%, and 100%. At 100%, the destination will be disabled if the configuration is enabled. +The `ALERT_CONSECUTIVE_FAILURE_COUNT` variable triggers an `alert.destination.consecutive_failure` alert when the consecutive failure count reaches 50%, 70%, 90%, and 100% of the configured threshold. 
At 100%, the destination will be disabled if `ALERT_AUTO_DISABLE_DESTINATION` is enabled. ```json { - "topic": "alert.consecutive_failure", - "timestamp": "2025-05-29T05:07:09.269672003Z", + "topic": "alert.destination.consecutive_failure", + "timestamp": "2025-01-15T10:30:00Z", "data": { - "event": { - "id": "evt_id", - "topic": "user.created", - "metadata": {}, - "data": {} + "tenant_id": "tenant_123", + "event": { ... }, + "attempt": { ... }, + "consecutive_failures": { + "current": 10, + "max": 20, + "threshold": 50 }, - "max_consecutive_failures": 3, - "consecutive_failures": 3, - "will_disable": false, - "destination": {}, - "delivery_response": { - "body": "{\"success\":false,\"verified\":false,\"payload\":{\"user_id\":\"userid\"}}", - "status": 400 + "destination": { + "id": "dest_xyz", + "tenant_id": "tenant_123", + "type": "webhook", + "topics": ["*"], + "filter": {}, + "config": {}, + "metadata": {}, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "disabled_at": null + } + } +} +``` + +The `destination` object in alert payloads does not include `credentials` or `delivery_metadata` to avoid exposing sensitive information. + +When the threshold reaches 100% and the destination is auto-disabled, `destination.disabled_at` will be set in the payload reflecting the post-disable state. + +## Destination disabled alerts + +When a destination is auto-disabled after reaching the consecutive failure threshold, an `alert.destination.disabled` alert is sent. When both alerts are triggered, the disabled alert is sent first, but ordering is not guaranteed. + +```json +{ + "topic": "alert.destination.disabled", + "timestamp": "2025-01-15T10:30:00Z", + "data": { + "tenant_id": "tenant_123", + "disabled_at": "2025-01-15T10:30:00Z", + "reason": "consecutive_failure", + "event": { ... }, + "attempt": { ... 
}, + "destination": { + "id": "dest_xyz", + "tenant_id": "tenant_123", + "type": "webhook", + "topics": ["*"], + "filter": {}, + "config": {}, + "metadata": {}, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-15T10:30:00Z", + "disabled_at": "2025-01-15T10:30:00Z" } } }