diff --git a/cmd/e2e/alert/alert.go b/cmd/e2e/alert/alert.go
index e96443cf..b950e8e8 100644
--- a/cmd/e2e/alert/alert.go
+++ b/cmd/e2e/alert/alert.go
@@ -18,17 +18,63 @@ type AlertRequest struct {
 }
 
 type AlertPayload struct {
+	Topic     string          `json:"topic"`
+	Timestamp time.Time       `json:"timestamp"`
+	Data      json.RawMessage `json:"data"`
+}
+
+// ConsecutiveFailureAlert is a parsed alert for "alert.destination.consecutive_failure"
+type ConsecutiveFailureAlert struct {
 	Topic     string                 `json:"topic"`
 	Timestamp time.Time              `json:"timestamp"`
 	Data      ConsecutiveFailureData `json:"data"`
 }
 
+// DestinationDisabledAlert is a parsed alert for "alert.destination.disabled"
+type DestinationDisabledAlert struct {
+	Topic     string                  `json:"topic"`
+	Timestamp time.Time               `json:"timestamp"`
+	Data      DestinationDisabledData `json:"data"`
+}
+
+// AlertDestination matches internal/alert.AlertDestination
+type AlertDestination struct {
+	ID         string            `json:"id"`
+	TenantID   string            `json:"tenant_id"`
+	Type       string            `json:"type"`
+	Topics     []string          `json:"topics"`
+	Filter     map[string]any    `json:"filter,omitempty"`
+	Config     map[string]string `json:"config"`
+	Metadata   map[string]string `json:"metadata,omitempty"`
+	CreatedAt  time.Time         `json:"created_at"`
+	UpdatedAt  time.Time         `json:"updated_at"`
+	DisabledAt *time.Time        `json:"disabled_at"`
+}
+
+// ConsecutiveFailures represents the nested consecutive failure state
+type ConsecutiveFailures struct {
+	Current   int `json:"current"`
+	Max       int `json:"max"`
+	Threshold int `json:"threshold"`
+}
+
+// ConsecutiveFailureData matches internal/alert.ConsecutiveFailureData
 type ConsecutiveFailureData struct {
-	MaxConsecutiveFailures int                    `json:"max_consecutive_failures"`
-	ConsecutiveFailures    int                    `json:"consecutive_failures"`
-	WillDisable            bool                   `json:"will_disable"`
-	Destination            *models.Destination    `json:"destination"`
-	Data                   map[string]interface{} `json:"data"`
+	TenantID            string              `json:"tenant_id"`
+	Attempt             *models.Attempt     `json:"attempt"`
+	Event               *models.Event       `json:"event"`
+	Destination         *AlertDestination   `json:"destination"`
+	ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"`
+}
+
+// DestinationDisabledData matches the expected payload for "alert.destination.disabled"
+type DestinationDisabledData struct {
+	TenantID    string            `json:"tenant_id"`
+	Destination *AlertDestination `json:"destination"`
+	DisabledAt  time.Time         `json:"disabled_at"`
+	Reason      string            `json:"reason"`
+	Attempt     *models.Attempt   `json:"attempt,omitempty"`
+	Event       *models.Event     `json:"event,omitempty"`
 }
 
 type AlertMockServer struct {
@@ -117,6 +163,11 @@ func (s *AlertMockServer) handleAlert(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 }
 
+// alertDataWithDestination is used to extract destination from any alert type
+type alertDataWithDestination struct {
+	Destination *AlertDestination `json:"destination"`
+}
+
 // Helper methods for assertions
 func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertRequest {
 	s.mu.RLock()
@@ -124,7 +175,11 @@ func (s *AlertMockServer) GetAlertsForDestination(destinationID string) []AlertR
 
 	var filtered []AlertRequest
 	for _, alert := range s.alerts {
-		if alert.Alert.Data.Destination != nil && alert.Alert.Data.Destination.ID == destinationID {
+		var data alertDataWithDestination
+		if err := json.Unmarshal(alert.Alert.Data, &data); err != nil {
+			continue
+		}
+		if data.Destination != nil && data.Destination.ID == destinationID {
 			filtered = append(filtered, alert)
 		}
 	}
@@ -141,3 +196,42 @@ func (s *AlertMockServer) GetLastAlert() *AlertRequest {
 	alert := s.alerts[len(s.alerts)-1]
 	return &alert
 }
+
+// GetAlertsForDestinationByTopic returns alerts filtered by destination ID and topic
+func (s *AlertMockServer) GetAlertsForDestinationByTopic(destinationID, topic string) []AlertRequest {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	var filtered []AlertRequest
+	for _, alert := range s.alerts {
+		if alert.Alert.Topic != topic {
+			continue
+		}
+		var data alertDataWithDestination
+		if err := json.Unmarshal(alert.Alert.Data, &data); err != nil {
+			continue
+		}
+		if data.Destination != nil && data.Destination.ID == destinationID {
+			filtered = append(filtered, alert)
+		}
+	}
+	return filtered
+}
+
+// ParseConsecutiveFailureData parses the Data field as ConsecutiveFailureData
+func (a *AlertRequest) ParseConsecutiveFailureData() (*ConsecutiveFailureData, error) {
+	var data ConsecutiveFailureData
+	if err := json.Unmarshal(a.Alert.Data, &data); err != nil {
+		return nil, err
+	}
+	return &data, nil
+}
+
+// ParseDestinationDisabledData parses the Data field as DestinationDisabledData
+func (a *AlertRequest) ParseDestinationDisabledData() (*DestinationDisabledData, error) {
+	var data DestinationDisabledData
+	if err := json.Unmarshal(a.Alert.Data, &data); err != nil {
+		return nil, err
+	}
+	return &data, nil
+}
diff --git a/cmd/e2e/alerts_test.go b/cmd/e2e/alerts_test.go
index cbb2cc4e..5af4a863 100644
--- a/cmd/e2e/alerts_test.go
+++ b/cmd/e2e/alerts_test.go
@@ -7,7 +7,7 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {
 	dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))
 
 	// Publish 20 failing events
-	for i := 0; i < 20; i++ {
+	for i := range 20 {
 		s.publish(tenant.ID, "user.created", map[string]any{
 			"index": i,
 		}, withPublishMetadata(map[string]string{"should_err": "true"}))
@@ -22,14 +22,46 @@ func (s *basicSuite) TestAlerts_ConsecutiveFailuresTriggerAlertCallback() {
 	// Wait for 4 alert callbacks to be processed
 	s.waitForAlerts(dest.ID, 4)
 
-	alerts := s.alertServer.GetAlertsForDestination(dest.ID)
+	alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure")
 	s.Require().Len(alerts, 4, "should have 4 alerts")
 
 	expectedCounts := []int{10, 14, 18, 20}
 	for i, alert := range alerts {
+		// Parse alert data
+		data, err := alert.ParseConsecutiveFailureData()
+		s.Require().NoError(err, "failed to parse consecutive failure data")
+
+		// Auth header assertion
 		s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader,
 			"auth header should match")
-		s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,
+
+		// Topic assertion
+		s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.destination.consecutive_failure")
+
+		// TenantID assertion
+		s.NotEmpty(data.TenantID, "alert should have tenant_id")
+		s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")
+
+		// Destination assertions
+		s.Require().NotNil(data.Destination, "alert should have destination")
+		s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
+		s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
+		s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")
+
+		// Event assertions
+		s.NotEmpty(data.Event.ID, "alert event should have ID")
+		s.Equal("user.created", data.Event.Topic, "alert event topic should match")
+		s.NotNil(data.Event.Data, "alert event should have data")
+
+		// ConsecutiveFailures assertions
+		s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current,
 			"alert %d should have %d consecutive failures", i, expectedCounts[i])
+		s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20")
+		s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0")
+
+		// Attempt assertion
+		s.Require().NotNil(data.Attempt, "alert should have attempt")
+		s.NotEmpty(data.Attempt.ID, "attempt should have ID")
+		s.NotEmpty(data.Attempt.Status, "attempt should have status")
 	}
 }
 
@@ -38,7 +70,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
 	dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))
 
 	// First batch: 14 failures
-	for i := 0; i < 14; i++ {
+	for i := range 14 {
 		s.publish(tenant.ID, "user.created", map[string]any{
 			"index": i,
 		}, withPublishMetadata(map[string]string{"should_err": "true"}))
@@ -56,7 +88,7 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
 	s.waitForNewMockServerEvents(dest.mockID, 15)
 
 	// Second batch: 14 more failures
-	for i := 0; i < 14; i++ {
+	for i := range 14 {
 		s.publish(tenant.ID, "user.created", map[string]any{
 			"index": i,
 		}, withPublishMetadata(map[string]string{"should_err": "true"}))
@@ -71,13 +103,108 @@ func (s *basicSuite) TestAlerts_SuccessResetsConsecutiveFailureCounter() {
 	// Wait for 4 alert callbacks: [10, 14] from first batch, [10, 14] from second batch
 	s.waitForAlerts(dest.ID, 4)
 
-	alerts := s.alertServer.GetAlertsForDestination(dest.ID)
+	alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.consecutive_failure")
 	s.Require().Len(alerts, 4, "should have 4 alerts")
 
 	expectedCounts := []int{10, 14, 10, 14}
 	for i, alert := range alerts {
+		// Parse alert data
+		data, err := alert.ParseConsecutiveFailureData()
+		s.Require().NoError(err, "failed to parse consecutive failure data")
+
+		// Auth header assertion
 		s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader,
 			"auth header should match")
-		s.Equal(expectedCounts[i], alert.Alert.Data.ConsecutiveFailures,
+
+		// Topic assertion
+		s.Equal("alert.destination.consecutive_failure", alert.Alert.Topic, "alert topic should be alert.destination.consecutive_failure")
+
+		// TenantID assertion
+		s.NotEmpty(data.TenantID, "alert should have tenant_id")
+		s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")
+
+		// Destination assertions
+		s.Require().NotNil(data.Destination, "alert should have destination")
+		s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
+		s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
+		s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")
+
+		// Event assertions
+		s.NotEmpty(data.Event.ID, "alert event should have ID")
+		s.Equal("user.created", data.Event.Topic, "alert event topic should match")
+		s.NotNil(data.Event.Data, "alert event should have data")
+
+		// ConsecutiveFailures assertions
+		s.Equal(expectedCounts[i], data.ConsecutiveFailures.Current,
 			"alert %d should have %d consecutive failures", i, expectedCounts[i])
+		s.Equal(20, data.ConsecutiveFailures.Max, "max consecutive failures should be 20")
+		s.Greater(data.ConsecutiveFailures.Threshold, 0, "threshold should be > 0")
+		s.Less(data.ConsecutiveFailures.Threshold, 100, "threshold should be < 100 (counter resets)")
+
+		// Attempt assertion
+		s.Require().NotNil(data.Attempt, "alert should have attempt")
+		s.NotEmpty(data.Attempt.ID, "attempt should have ID")
+	}
+}
+
+func (s *basicSuite) TestAlerts_DestinationDisabledCallback() {
+	tenant := s.createTenant()
+	dest := s.createWebhookDestination(tenant.ID, "*", withSecret(testSecret))
+
+	// Publish 20 failing events to trigger auto-disable
+	for i := range 20 {
+		s.publish(tenant.ID, "user.created", map[string]any{
+			"index": i,
+		}, withPublishMetadata(map[string]string{"should_err": "true"}))
 	}
+
+	// Wait for destination to be disabled (sync point for all 20 deliveries)
+	s.waitForNewDestinationDisabled(tenant.ID, dest.ID)
+
+	// Verify destination is disabled
+	got := s.getDestination(tenant.ID, dest.ID)
+	s.NotNil(got.DisabledAt, "destination should be disabled")
+
+	// Wait for the destination.disabled alert callback
+	s.waitForAlertsByTopic(dest.ID, "alert.destination.disabled", 1)
+	alerts := s.alertServer.GetAlertsForDestinationByTopic(dest.ID, "alert.destination.disabled")
+	s.Require().Len(alerts, 1, "should have 1 destination.disabled alert")
+
+	alert := alerts[0]
+	data, err := alert.ParseDestinationDisabledData()
+	s.Require().NoError(err, "failed to parse destination disabled data")
+
+	// Auth header assertion
+	s.Equal(fmt.Sprintf("Bearer %s", s.config.APIKey), alert.AuthHeader, "auth header should match")
+
+	// Topic assertion
+	s.Equal("alert.destination.disabled", alert.Alert.Topic, "alert topic should be alert.destination.disabled")
+
+	// TenantID assertion
+	s.NotEmpty(data.TenantID, "alert should have tenant_id")
+	s.Equal(tenant.ID, data.TenantID, "alert tenant_id should match")
+
+	// Destination assertions
+	s.Require().NotNil(data.Destination, "alert should have destination")
+	s.Equal(dest.ID, data.Destination.ID, "alert destination ID should match")
+	s.Equal(tenant.ID, data.Destination.TenantID, "alert destination tenant_id should match")
+	s.Equal("webhook", data.Destination.Type, "alert destination type should be webhook")
+	s.NotNil(data.Destination.DisabledAt, "destination should have disabled_at set")
+
+	// DisabledAt assertion
+	s.False(data.DisabledAt.IsZero(), "disabled_at should not be zero")
+
+	// Event assertions (optional but expected)
+	if data.Event != nil {
+		s.NotEmpty(data.Event.ID, "event should have ID")
+		s.Equal("user.created", data.Event.Topic, "event topic should match")
+	}
+
+	// Attempt assertions (optional but expected)
+	if data.Attempt != nil {
+		s.NotEmpty(data.Attempt.ID, "attempt should have ID")
+		s.NotEmpty(data.Attempt.Status, "attempt should have status")
+	}
+
+	// Reason assertion
+	s.Equal("consecutive_failure", data.Reason, "reason should be consecutive_failure")
 }
diff --git a/cmd/e2e/helpers_test.go b/cmd/e2e/helpers_test.go
index 852d390f..e2c3da56 100644
--- a/cmd/e2e/helpers_test.go
+++ b/cmd/e2e/helpers_test.go
@@ -407,6 +407,23 @@ func (s *basicSuite) waitForAlerts(destID string, count int) {
 	s.Require().FailNowf("timeout", "timed out waiting for %d alerts for %s (got %d)", count, destID, lastCount)
 }
 
+// waitForAlertsByTopic polls until at least count alerts with the specific topic exist for the destination.
+func (s *basicSuite) waitForAlertsByTopic(destID, topic string, count int) {
+	s.T().Helper()
+	timeout := alertPollTimeout
+	deadline := time.Now().Add(timeout)
+	var lastCount int
+
+	for time.Now().Before(deadline) {
+		lastCount = len(s.alertServer.GetAlertsForDestinationByTopic(destID, topic))
+		if lastCount >= count {
+			return
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	s.Require().FailNowf("timeout", "timed out waiting for %d %s alerts for %s (got %d)", count, topic, destID, lastCount)
+}
+
 // =============================================================================
 // Absence assertion
 // =============================================================================
diff --git a/docs/pages/features/alerts.mdx b/docs/pages/features/alerts.mdx
index 22cea6a5..98a902bc 100644
--- a/docs/pages/features/alerts.mdx
+++ b/docs/pages/features/alerts.mdx
@@ -12,26 +12,66 @@ It's your responsibility to format and deliver the alert to your tenant using yo
 
 ## Consecutive failure alerts
 
-The `ALERT_CONSECUTIVE_FAILURE_COUNT` variable triggers an alert when the consecutive failure count reaches 50%, 70%, 90%, and 100%. At 100%, the destination will be disabled if the configuration is enabled.
+The `ALERT_CONSECUTIVE_FAILURE_COUNT` variable triggers an `alert.destination.consecutive_failure` alert when the consecutive failure count reaches 50%, 70%, 90%, and 100% of the configured threshold. At 100%, the destination will be disabled if `ALERT_AUTO_DISABLE_DESTINATION` is enabled.
 
 ```json
 {
-  "topic": "alert.consecutive_failure",
-  "timestamp": "2025-05-29T05:07:09.269672003Z",
+  "topic": "alert.destination.consecutive_failure",
+  "timestamp": "2025-01-15T10:30:00Z",
   "data": {
-    "event": {
-      "id": "evt_id",
-      "topic": "user.created",
-      "metadata": {},
-      "data": {}
+    "tenant_id": "tenant_123",
+    "event": { ... },
+    "attempt": { ... },
+    "consecutive_failures": {
+      "current": 10,
+      "max": 20,
+      "threshold": 50
     },
-    "max_consecutive_failures": 3,
-    "consecutive_failures": 3,
-    "will_disable": false,
-    "destination": {},
-    "delivery_response": {
-      "body": "{\"success\":false,\"verified\":false,\"payload\":{\"user_id\":\"userid\"}}",
-      "status": 400
+    "destination": {
+      "id": "dest_xyz",
+      "tenant_id": "tenant_123",
+      "type": "webhook",
+      "topics": ["*"],
+      "filter": {},
+      "config": {},
+      "metadata": {},
+      "created_at": "2025-01-01T00:00:00Z",
+      "updated_at": "2025-01-01T00:00:00Z",
+      "disabled_at": null
+    }
+  }
+}
 ```
+
+The `destination` object in alert payloads does not include `credentials` or `delivery_metadata` to avoid exposing sensitive information.
+
+When the failure count reaches 100% of the threshold and the destination is auto-disabled, `destination.disabled_at` will be set in the payload, reflecting the post-disable state.
+
+## Destination disabled alerts
+
+When a destination is auto-disabled after reaching the consecutive failure threshold, an `alert.destination.disabled` alert is sent. When both alerts are triggered, the disabled alert is dispatched first, though delivery ordering is not guaranteed.
+
+```json
+{
+  "topic": "alert.destination.disabled",
+  "timestamp": "2025-01-15T10:30:00Z",
+  "data": {
+    "tenant_id": "tenant_123",
+    "disabled_at": "2025-01-15T10:30:00Z",
+    "reason": "consecutive_failure",
+    "event": { ... },
+    "attempt": { ... },
+    "destination": {
+      "id": "dest_xyz",
+      "tenant_id": "tenant_123",
+      "type": "webhook",
+      "topics": ["*"],
+      "filter": {},
+      "config": {},
+      "metadata": {},
+      "created_at": "2025-01-01T00:00:00Z",
+      "updated_at": "2025-01-15T10:30:00Z",
+      "disabled_at": "2025-01-15T10:30:00Z"
     }
   }
 }
diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go
index 39ff54f4..87cfbfe7 100644
--- a/internal/alert/monitor.go
+++ b/internal/alert/monitor.go
@@ -3,7 +3,6 @@ package alert
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/hookdeck/outpost/internal/logging"
 	"github.com/hookdeck/outpost/internal/models"
@@ -13,7 +12,7 @@ import (
 
 // DestinationDisabler handles disabling destinations
 type DestinationDisabler interface {
-	DisableDestination(ctx context.Context, tenantID, destinationID string) error
+	DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error)
 }
 
 // AlertMonitor is the main interface for handling delivery attempt alerts
@@ -82,11 +81,9 @@ func WithDeploymentID(deploymentID string) AlertOption {
 
 // DeliveryAttempt represents a single delivery attempt
 type DeliveryAttempt struct {
-	Success          bool
-	DeliveryTask     *models.DeliveryTask
-	Destination      *AlertDestination
-	Timestamp        time.Time
-	DeliveryResponse map[string]interface{}
+	Event       *models.Event
+	Destination *AlertDestination
+	Attempt     *models.Attempt
 }
 
 type alertMonitor struct {
@@ -138,7 +135,7 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ...
 }
 
 func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttempt) error {
-	if attempt.Success {
+	if attempt.Attempt.Status == models.AttemptStatusSuccess {
 		return m.store.ResetConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID)
 	}
 
@@ -153,53 +150,83 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
 		return nil
 	}
 
-	alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{
-		Event: AlertedEvent{
-			ID:       attempt.DeliveryTask.Event.ID,
-			Topic:    attempt.DeliveryTask.Event.Topic,
-			Metadata: attempt.DeliveryTask.Event.Metadata,
-			Data:     attempt.DeliveryTask.Event.Data,
-		},
-		MaxConsecutiveFailures: m.autoDisableFailureCount,
-		ConsecutiveFailures:    count,
-		WillDisable:            m.disabler != nil && level == 100,
-		Destination:            attempt.Destination,
-		DeliveryResponse:       attempt.DeliveryResponse,
-	})
-
 	// If we've hit 100% and have a disabler configured, disable the destination
+	alertDest := attempt.Destination
 	if level == 100 && m.disabler != nil {
-		if err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID); err != nil {
+		disabledDest, err := m.disabler.DisableDestination(ctx, attempt.Destination.TenantID, attempt.Destination.ID)
+		if err != nil {
 			return fmt.Errorf("failed to disable destination: %w", err)
 		}
+		if disabledDest.DisabledAt == nil {
+			return fmt.Errorf("invariant violation: DisableDestination returned destination without DisabledAt set")
+		}
 		m.logger.Ctx(ctx).Audit("destination disabled",
-			zap.String("event_id", attempt.DeliveryTask.Event.ID),
+			zap.String("attempt_id", attempt.Attempt.ID),
+			zap.String("event_id", attempt.Event.ID),
 			zap.String("tenant_id", attempt.Destination.TenantID),
 			zap.String("destination_id", attempt.Destination.ID),
 			zap.String("destination_type", attempt.Destination.Type),
 		)
+
+		alertDest = AlertDestinationFromDestination(&disabledDest)
+
+		// Send destination disabled alert (best-effort, don't fail on notification error)
+		if m.notifier != nil {
+			disabledAlert := NewDestinationDisabledAlert(DestinationDisabledData{
+				TenantID:    attempt.Destination.TenantID,
+				Destination: alertDest,
+				DisabledAt:  *disabledDest.DisabledAt,
+				Reason:      "consecutive_failure",
+				Attempt:     attempt.Attempt,
+				Event:       attempt.Event,
+			})
+			if err := m.notifier.Notify(ctx, disabledAlert); err != nil {
+				m.logger.Ctx(ctx).Error("failed to send alert",
+					zap.Error(err),
+					zap.String("topic", disabledAlert.Topic),
+					zap.String("attempt_id", attempt.Attempt.ID),
+					zap.String("event_id", attempt.Event.ID),
+					zap.String("tenant_id", attempt.Destination.TenantID),
+					zap.String("destination_id", attempt.Destination.ID),
+				)
+			}
+		}
 	}
 
-	// Send alert if notifier is configured
+	alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{
+		TenantID:    attempt.Destination.TenantID,
+		Attempt:     attempt.Attempt,
+		Event:       attempt.Event,
+		Destination: alertDest,
+		ConsecutiveFailures: ConsecutiveFailures{
+			Current:   count,
+			Max:       m.autoDisableFailureCount,
+			Threshold: level,
+		},
+	})
+
+	// Send alert if notifier is configured (best-effort, don't fail on notification error)
 	if m.notifier != nil {
 		if err := m.notifier.Notify(ctx, alert); err != nil {
 			m.logger.Ctx(ctx).Error("failed to send alert",
 				zap.Error(err),
-				zap.String("event_id", attempt.DeliveryTask.Event.ID),
+				zap.String("topic", alert.Topic),
+				zap.String("attempt_id", attempt.Attempt.ID),
+				zap.String("event_id", attempt.Event.ID),
+				zap.String("tenant_id", attempt.Destination.TenantID),
+				zap.String("destination_id", attempt.Destination.ID),
+			)
+		} else {
+			m.logger.Ctx(ctx).Audit("alert sent",
+				zap.String("topic", alert.Topic),
+				zap.String("attempt_id", attempt.Attempt.ID),
+				zap.String("event_id", attempt.Event.ID),
 				zap.String("tenant_id", attempt.Destination.TenantID),
 				zap.String("destination_id", attempt.Destination.ID),
 				zap.String("destination_type", attempt.Destination.Type),
 			)
-			return fmt.Errorf("failed to send alert: %w", err)
 		}
-
-		m.logger.Ctx(ctx).Audit("alert sent",
-			zap.String("event_id", attempt.DeliveryTask.Event.ID),
-			zap.String("tenant_id", attempt.Destination.TenantID),
-			zap.String("destination_id", attempt.Destination.ID),
-			zap.String("destination_type", attempt.Destination.Type),
-		)
 	}
 
 	return nil
diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go
index 16372475..44c07045 100644
--- a/internal/alert/monitor_test.go
+++ b/internal/alert/monitor_test.go
@@ -27,9 +27,9 @@ type mockDestinationDisabler struct {
 	mock.Mock
 }
 
-func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error {
-	m.Called(ctx, tenantID, destinationID)
-	return nil
+func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) {
+	args := m.Called(ctx, tenantID, destinationID)
+	return args.Get(0).(models.Destination), args.Error(1)
 }
 
 func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {
@@ -39,8 +39,14 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {
 	redisClient := testutil.CreateTestRedisClient(t)
 	notifier := &mockAlertNotifier{}
 	notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
+	disabledAt := time.Now()
+	disabledDest := models.Destination{
+		ID:         "dest_1",
+		TenantID:   "tenant_1",
+		DisabledAt: &disabledAt,
+	}
 	disabler := &mockDestinationDisabler{}
-	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil)
 
 	monitor := alert.NewAlertMonitor(
 		logger,
@@ -53,16 +59,16 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {
 
 	dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"}
 	event := &models.Event{Topic: "test.event"}
-	task := &models.DeliveryTask{Event: *event}
 	attempt := alert.DeliveryAttempt{
-		Success:      false,
-		DeliveryTask: task,
-		Destination:  dest,
-		DeliveryResponse: map[string]interface{}{
-			"status": "500",
-			"data":   map[string]any{"error": "test error"},
+		Event:       event,
+		Destination: dest,
+		Attempt: &models.Attempt{
+			ID:           "attempt_1",
+			Status:       "failed",
+			Code:         "500",
+			ResponseData: map[string]interface{}{"error": "test error"},
+			Time:         time.Now(),
 		},
-		Timestamp: time.Now(),
 	}
 
 	// Send 20 consecutive failures
@@ -70,22 +76,28 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {
 		require.NoError(t, monitor.HandleAttempt(ctx, attempt))
 	}
 
-	// Verify notifications were sent at correct thresholds
-	var notifyCallCount int
+	// Verify consecutive failure notifications were sent at correct thresholds
+	var consecutiveFailureCount int
 	for _, call := range notifier.Calls {
 		if call.Method == "Notify" {
-			notifyCallCount++
-			alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
-			failures := alert.Data.ConsecutiveFailures
-			require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds")
-			require.Equal(t, dest, alert.Data.Destination)
-			require.Equal(t, "alert.consecutive_failure", alert.Topic)
-			require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse)
-			require.Equal(t, 20, alert.Data.MaxConsecutiveFailures)
-			require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)")
+			if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
+				consecutiveFailureCount++
+				cf := cfAlert.Data.ConsecutiveFailures
+				require.Contains(t, []int{10, 14, 18, 20}, cf.Current, "Alert should be sent at 50%, 70%, 90%, and 100% thresholds")
+				require.Equal(t, dest.ID, cfAlert.Data.Destination.ID)
+				require.Equal(t, dest.TenantID, cfAlert.Data.Destination.TenantID)
+				if cf.Threshold == 100 {
+					require.NotNil(t, cfAlert.Data.Destination.DisabledAt, "Destination should have DisabledAt at threshold 100")
+				} else {
+					require.Nil(t, cfAlert.Data.Destination.DisabledAt, "Destination should not have DisabledAt below threshold 100")
+				}
+				require.Equal(t, "alert.destination.consecutive_failure", cfAlert.Topic)
+				require.Equal(t, "attempt_1", cfAlert.Data.Attempt.ID)
+				require.Equal(t, 20, cf.Max)
+			}
 		}
 	}
-	require.Equal(t, 4, notifyCallCount, "Should have sent exactly 4 notifications")
+	require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications")
 
 	// Verify destination was disabled exactly once at 100%
 	var disableCallCount int
@@ -106,8 +118,14 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
 	redisClient := testutil.CreateTestRedisClient(t)
 	notifier := &mockAlertNotifier{}
 	notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
+	disabledAt := time.Now()
+	disabledDest := models.Destination{
+		ID:         "dest_1",
+		TenantID:   "tenant_1",
+		DisabledAt: &disabledAt,
+	}
 	disabler := &mockDestinationDisabler{}
-	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil)
 
 	monitor := alert.NewAlertMonitor(
 		logger,
@@ -120,16 +138,16 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
 
 	dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"}
 	event := &models.Event{Topic: "test.event"}
-	task := &models.DeliveryTask{Event: *event}
 	failedAttempt := alert.DeliveryAttempt{
-		Success:      false,
-		DeliveryTask: task,
-		Destination:  dest,
-		DeliveryResponse: map[string]interface{}{
-			"status": "500",
-			"data":   map[string]any{"error": "test error"},
+		Event:       event,
+		Destination: dest,
+		Attempt: &models.Attempt{
+			ID:           "attempt_reset",
+			Status:       "failed",
+			Code:         "500",
+			ResponseData: map[string]interface{}{"error": "test error"},
+			Time:         time.Now(),
 		},
-		Timestamp: time.Now(),
 	}
 
 	// Send 14 failures (should trigger 50% and 66% alerts)
@@ -141,8 +159,11 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
 	require.Equal(t, 2, len(notifier.Calls))
 
 	// Send a success to reset the counter
-	successAttempt := failedAttempt
-	successAttempt.Success = true
+	successAttempt := alert.DeliveryAttempt{
+		Event:       event,
+		Destination: dest,
+		Attempt:     &models.Attempt{Status: models.AttemptStatusSuccess},
+	}
 	require.NoError(t, monitor.HandleAttempt(ctx, successAttempt))
 
 	// Clear the mock calls to start fresh
@@ -159,8 +180,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
 	// Verify the notifications were at the right thresholds
 	var seenCounts []int
 	for _, call := range notifier.Calls {
-		alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
-		seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures)
+		if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
+			seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures.Current)
+		}
 	}
 	assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)")
 	assert.Contains(t, seenCounts, 14, "Should have alerted at 66% (14 failures)")
@@ -179,8 +201,14 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {
 	redisClient := testutil.CreateTestRedisClient(t)
 	notifier := &mockAlertNotifier{}
 	notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
+	disabledAt := time.Now()
+	disabledDest := models.Destination{
+		ID:         "dest_above",
+		TenantID:   "tenant_above",
+		DisabledAt: &disabledAt,
+	}
 	disabler := &mockDestinationDisabler{}
-	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(disabledDest, nil)
 
 	monitor := alert.NewAlertMonitor(
 		logger,
@@ -193,15 +221,16 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {
 
 	dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"}
 	event := &models.Event{Topic: "test.event"}
-	task := &models.DeliveryTask{Event: *event}
 	attempt := alert.DeliveryAttempt{
-		Success:      false,
-		DeliveryTask: task,
-		Destination:  dest,
-		DeliveryResponse: map[string]interface{}{
-			"status": "500",
+		Event:       event,
+		Destination: dest,
+		Attempt: &models.Attempt{
+			ID:           "attempt_above",
+			Status:       "failed",
+			Code:         "500",
+			ResponseData: map[string]interface{}{"error": "test error"},
+			Time:         time.Now(),
 		},
-		Timestamp: time.Now(),
 	}
 
 	// Send 25 consecutive failures (5 more than the threshold)
@@ -209,23 +238,24 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {
 		require.NoError(t, monitor.HandleAttempt(ctx, attempt))
 	}
 
-	// Verify notifications at 50%, 70%, 90%, and 100% thresholds
+	// Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds
 	// Plus additional notifications for failures 21-25 (all at 100% level)
-	var notifyCallCount int
+	var consecutiveFailureCount int
 	var disableNotifyCount int
 	for _, call := range notifier.Calls {
 		if call.Method == "Notify" {
-			notifyCallCount++
-			alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
-			if alertData.Data.ConsecutiveFailures >= 20 {
-				disableNotifyCount++
-				require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%")
+			if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
+				consecutiveFailureCount++
+				if cfAlert.Data.ConsecutiveFailures.Current >= 20 {
+					disableNotifyCount++
+					require.Equal(t, 100, cfAlert.Data.ConsecutiveFailures.Threshold, "Threshold should be 100 at and above max")
+				}
 			}
 		}
 	}
 
 	// 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25
-	require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)")
-	require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)")
+	require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)")
+	require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications at threshold 100 (20-25)")
 
 	// Verify destination was disabled multiple times (once per failure >= 20)
 	var disableCallCount int
@@ -236,3 +266,163 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {
 	}
 	require.Equal(t, 6, disableCallCount, "Should have called disable 6 times (for failures 20-25)")
 }
+
+func TestAlertMonitor_SendsDestinationDisabledAlert(t *testing.T) {
+	// This test verifies that when a destination is auto-disabled after reaching
+	// the consecutive failure threshold, a DestinationDisabledAlert is sent via
+	// the notifier with topic "alert.destination.disabled".
+	t.Parallel()
+	ctx := context.Background()
+	logger := testutil.CreateTestLogger(t)
+	redisClient := testutil.CreateTestRedisClient(t)
+	notifier := &mockAlertNotifier{}
+	notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
+
+	// Create a destination that will be returned by the disabler
+	disabledAt := time.Now()
+	modelsDest := testutil.DestinationFactory.Any(
+		testutil.DestinationFactory.WithID("dest_disabled_test"),
+		testutil.DestinationFactory.WithTenantID("tenant_disabled_test"),
+	)
+	modelsDest.DisabledAt = &disabledAt
+	disabler := &mockDestinationDisabler{}
+	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(modelsDest, nil)
+
+	autoDisableCount := 5
+	monitor := alert.NewAlertMonitor(
+		logger,
+		redisClient,
+		alert.WithNotifier(notifier),
+		alert.WithDisabler(disabler),
+		alert.WithAutoDisableFailureCount(autoDisableCount),
+		alert.WithAlertThresholds([]int{100}), // Only alert at 100% to simplify test
+	)
+
+	dest := alert.AlertDestinationFromDestination(&modelsDest)
+	event := testutil.EventFactory.AnyPointer(
+		testutil.EventFactory.WithID("event_123"),
+		testutil.EventFactory.WithTopic("test.event"),
+	)
+	testAttempt := testutil.AttemptFactory.AnyPointer(
+		testutil.AttemptFactory.WithID("attempt_disabled"),
+		testutil.AttemptFactory.WithStatus("failed"),
+		testutil.AttemptFactory.WithCode("500"),
+	)
+	attempt := alert.DeliveryAttempt{
+		Event:       event,
+		Destination: dest,
+		Attempt:     testAttempt,
+	}
+
+	// Send exactly autoDisableCount failures to trigger auto-disable
+	for i := 1; i <= autoDisableCount; i++ {
+		require.NoError(t, monitor.HandleAttempt(ctx, attempt))
+	}
+
+	// Verify destination was disabled
+	disabler.AssertCalled(t, "DisableDestination", mock.Anything, dest.TenantID, dest.ID)
+
+	// Find the DestinationDisabledAlert in the notifier calls
+	var foundDestinationDisabledAlert bool
+	var destinationDisabledAlert alert.DestinationDisabledAlert
+	for _, call := range notifier.Calls {
+		if call.Method == "Notify" {
+			alertArg := call.Arguments.Get(1)
+			if disabledAlert, ok := alertArg.(alert.DestinationDisabledAlert); ok {
+				foundDestinationDisabledAlert = true
+				destinationDisabledAlert = disabledAlert
+				break
+			}
+		}
+	}
+
+	require.True(t, foundDestinationDisabledAlert, "Expected DestinationDisabledAlert to be sent when destination is disabled")
+
+	// Verify the alert topic
+	assert.Equal(t, "alert.destination.disabled", destinationDisabledAlert.Topic, "Alert should have topic 'alert.destination.disabled'")
+
+	// Verify the alert data
+	assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.TenantID, "TenantID should match")
+	assert.Equal(t, dest.ID, destinationDisabledAlert.Data.Destination.ID, "Destination ID should match")
+	assert.Equal(t, dest.TenantID, destinationDisabledAlert.Data.Destination.TenantID, "Destination TenantID should match")
+	assert.NotNil(t, destinationDisabledAlert.Data.Destination.DisabledAt, "Destination DisabledAt should be set")
+	assert.False(t, destinationDisabledAlert.Data.DisabledAt.IsZero(), "DisabledAt should be set")
+	// Verify the alert's DisabledAt matches the destination's DisabledAt exactly
+	assert.Equal(t, disabledAt, destinationDisabledAlert.Data.DisabledAt, "Alert DisabledAt should match destination's DisabledAt exactly")
+	assert.Equal(t, disabledAt, *destinationDisabledAlert.Data.Destination.DisabledAt, "Alert Destination.DisabledAt should match destination's DisabledAt exactly")
+	assert.Equal(t, "consecutive_failure", destinationDisabledAlert.Data.Reason, "Reason should be consecutive_failure")
+
+	// Verify the attempt is included
+	require.NotNil(t, destinationDisabledAlert.Data.Attempt, "Attempt should be set")
+	assert.Equal(t, testAttempt.ID, destinationDisabledAlert.Data.Attempt.ID, "Attempt ID should match")
+
+	// Verify the event is included
+	require.NotNil(t, destinationDisabledAlert.Data.Event, "Event should be set")
+	assert.Equal(t, event.ID, destinationDisabledAlert.Data.Event.ID, "Event ID should match")
+	assert.Equal(t, event.Topic, destinationDisabledAlert.Data.Event.Topic, "Event Topic should match")
+}
+
+func TestAlertMonitor_ConsecutiveFailureAlert_ReflectsDisabledDestination(t *testing.T) {
+	// Tests that the consecutive failure alert at threshold 100 includes
+	// the destination with DisabledAt set, reflecting the post-disable state.
+	t.Parallel()
+	ctx := context.Background()
+	logger := testutil.CreateTestLogger(t)
+	redisClient := testutil.CreateTestRedisClient(t)
+	notifier := &mockAlertNotifier{}
+	notifier.On("Notify", mock.Anything, mock.Anything).Return(nil)
+
+	disabledAt := time.Now()
+	modelsDest := testutil.DestinationFactory.Any(
+		testutil.DestinationFactory.WithID("dest_reflect"),
+		testutil.DestinationFactory.WithTenantID("tenant_reflect"),
+	)
+	modelsDest.DisabledAt = &disabledAt
+	disabler := &mockDestinationDisabler{}
+	disabler.On("DisableDestination", mock.Anything, mock.Anything, mock.Anything).Return(modelsDest, nil)
+
+	autoDisableCount := 5
+	monitor := alert.NewAlertMonitor(
+		logger,
+		redisClient,
+		alert.WithNotifier(notifier),
+		alert.WithDisabler(disabler),
+		alert.WithAutoDisableFailureCount(autoDisableCount),
+		alert.WithAlertThresholds([]int{100}),
+	)
+
+	dest := &alert.AlertDestination{ID: "dest_reflect", TenantID: "tenant_reflect"}
+	event := testutil.EventFactory.AnyPointer(
+		testutil.EventFactory.WithID("event_reflect"),
+	)
+	attempt := alert.DeliveryAttempt{
+		Event:       event,
+		Destination: dest,
+		Attempt: testutil.AttemptFactory.AnyPointer(
+			testutil.AttemptFactory.WithID("attempt_reflect"),
+			testutil.AttemptFactory.WithStatus("failed"),
+			testutil.AttemptFactory.WithCode("500"),
+		),
+	}
+
+	for i := 1; i <= autoDisableCount; i++ {
+		require.NoError(t, monitor.HandleAttempt(ctx, attempt))
+	}
+
+	// Find the consecutive failure alert at threshold 100
+	var found bool
+	for _, call := range notifier.Calls {
+		if call.Method == "Notify" {
+			if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
+				if cfAlert.Data.ConsecutiveFailures.Threshold == 100 {
+					found = true
+					// The destination in the alert should reflect the disabled state
+					require.NotNil(t, cfAlert.Data.Destination.DisabledAt, "Destination in consecutive failure alert at threshold 100 should have DisabledAt set")
+					assert.Equal(t, disabledAt, *cfAlert.Data.Destination.DisabledAt, "DisabledAt should match")
+					break
+				}
+			}
+		}
+	}
+	require.True(t, found, "Should have found a consecutive failure alert at threshold 100")
+}
diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go
index ef9589bc..fd8724c9 100644
--- a/internal/alert/notifier.go
+++ b/internal/alert/notifier.go
@@ -12,9 +12,7 @@ import (
 )
 
 // Alert represents any alert that can be sent
-type Alert interface {
-	json.Marshaler
-}
+type Alert interface{}
 
 // AlertNotifier sends alerts to configured destinations
 type AlertNotifier interface {
@@ -42,31 +40,48 @@ func NotifierWithBearerToken(token string) NotifierOption {
 	}
 }
 
-type AlertedEvent struct {
-	ID       string                 `json:"id"`
-	Topic    string                 `json:"topic"`    // event topic
-	Metadata map[string]string      `json:"metadata"` // event metadata
-	Data     map[string]interface{} `json:"data"`     // event payload
+type AlertDestination struct {
+	ID         string          `json:"id" redis:"id"`
+	TenantID   string          `json:"tenant_id" redis:"-"`
+	Type       string          `json:"type" redis:"type"`
+	Topics     models.Topics   `json:"topics" redis:"-"`
+	Filter     models.Filter   `json:"filter,omitempty" redis:"-"`
+	Config     models.Config   `json:"config" redis:"-"`
+	Metadata   models.Metadata `json:"metadata,omitempty" redis:"-"`
+	CreatedAt  time.Time       `json:"created_at" redis:"created_at"`
+	UpdatedAt  time.Time       `json:"updated_at" redis:"updated_at"`
+	DisabledAt *time.Time      `json:"disabled_at" redis:"disabled_at"`
 }
 
-type AlertDestination struct {
-	ID         string        `json:"id" redis:"id"`
-	TenantID   string        `json:"tenant_id" redis:"-"`
-	Type       string        `json:"type" redis:"type"`
-	Topics     models.Topics `json:"topics" redis:"-"`
-	Config     models.Config `json:"config" redis:"-"`
-	CreatedAt  time.Time     `json:"created_at" redis:"created_at"`
-	DisabledAt *time.Time    `json:"disabled_at" redis:"disabled_at"`
+func AlertDestinationFromDestination(d *models.Destination) *AlertDestination {
+	return &AlertDestination{
+		ID:         d.ID,
+		TenantID:   d.TenantID,
+		Type:       d.Type,
+		Topics:     d.Topics,
+		Filter:     d.Filter,
+		Config:     d.Config,
+		Metadata:   d.Metadata,
+		CreatedAt:  d.CreatedAt,
+		UpdatedAt:  d.UpdatedAt,
+		DisabledAt: d.DisabledAt,
+	}
+}
+
+// ConsecutiveFailures represents the nested consecutive failure state
+type ConsecutiveFailures struct {
+	Current   int `json:"current"`
+	Max       int `json:"max"`
+	Threshold int `json:"threshold"`
 }
 
 // ConsecutiveFailureData represents the data needed for a consecutive failure alert
 type ConsecutiveFailureData struct {
-	Event                  AlertedEvent           `json:"event"`
-	MaxConsecutiveFailures int                    `json:"max_consecutive_failures"`
-	ConsecutiveFailures    int                    `json:"consecutive_failures"`
-	WillDisable            bool                   `json:"will_disable"`
-	Destination            *AlertDestination      `json:"destination"`
-	DeliveryResponse       map[string]interface{} `json:"delivery_response"`
+	TenantID            string              `json:"tenant_id"`
+	Attempt             *models.Attempt     `json:"attempt"`
+	Event               *models.Event       `json:"event"`
+	Destination         *AlertDestination   `json:"destination"`
+	ConsecutiveFailures ConsecutiveFailures `json:"consecutive_failures"`
 }
 
 // ConsecutiveFailureAlert represents an alert for consecutive failures
@@ -76,16 +91,36 @@ type ConsecutiveFailureAlert struct {
 	Data      ConsecutiveFailureData `json:"data"`
 }
-// MarshalJSON implements json.Marshaler
-func (a ConsecutiveFailureAlert) MarshalJSON() ([]byte, error) {
-	type Alias ConsecutiveFailureAlert
-	return json.Marshal(Alias(a))
-}
-
 // NewConsecutiveFailureAlert creates a new consecutive failure alert with defaults
 func NewConsecutiveFailureAlert(data ConsecutiveFailureData) ConsecutiveFailureAlert {
 	return ConsecutiveFailureAlert{
-		Topic:     "alert.consecutive_failure",
+		Topic:     "alert.destination.consecutive_failure",
 		Timestamp: time.Now(),
 		Data:      data,
 	}
 }
+
+// DestinationDisabledData represents the data for a destination disabled alert
+type DestinationDisabledData struct {
+	TenantID    string            `json:"tenant_id"`
+	Destination *AlertDestination `json:"destination"`
+	DisabledAt  time.Time         `json:"disabled_at"`
+	Reason      string            `json:"reason"`
+	Attempt     *models.Attempt   `json:"attempt,omitempty"`
+	Event       *models.Event     `json:"event,omitempty"`
+}
+
+// DestinationDisabledAlert represents an alert for when a destination is auto-disabled
+type DestinationDisabledAlert struct {
+	Topic     string                  `json:"topic"`
+	Timestamp time.Time               `json:"timestamp"`
+	Data      DestinationDisabledData `json:"data"`
+}
+
+// NewDestinationDisabledAlert creates a new destination disabled alert with defaults
+func NewDestinationDisabledAlert(data DestinationDisabledData) DestinationDisabledAlert {
+	return DestinationDisabledAlert{
+		Topic:     "alert.destination.disabled",
+		Timestamp: time.Now(),
+		Data:      data,
+	}
+}
@@ -110,7 +145,7 @@ func NewHTTPAlertNotifier(callbackURL string, opts ...NotifierOption) AlertNotif
 }
 
 func (n *httpAlertNotifier) Notify(ctx context.Context, alert Alert) error {
-	body, err := alert.MarshalJSON()
+	body, err := json.Marshal(alert)
 	if err != nil {
 		return fmt.Errorf("failed to marshal alert: %w", err)
 	}
diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go
index 31661e52..2aece718 100644
--- a/internal/alert/notifier_test.go
+++ b/internal/alert/notifier_test.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"net/http"
 	"net/http/httptest"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -16,107 +17,110 @@ import (
 func TestAlertNotifier_Notify(t *testing.T) {
 	t.Parallel()
 
-	tests := []struct {
-		name         string
-		handler      func(w http.ResponseWriter, r *http.Request)
-		notifierOpts []alert.NotifierOption
-		wantErr      bool
-		errContains  string
-	}{
-		{
-			name: "successful notification",
-			handler: func(w http.ResponseWriter, r *http.Request) {
-				// Verify request
-				assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
-
-				// Read and verify request body
-				var body map[string]interface{}
-				err := json.NewDecoder(r.Body).Decode(&body)
-				require.NoError(t, err)
-
-				assert.Equal(t, "alert.consecutive_failure", body["topic"])
-				data := body["data"].(map[string]interface{})
-				assert.Equal(t, float64(10), data["max_consecutive_failures"])
-				assert.Equal(t, float64(5), data["consecutive_failures"])
-				assert.Equal(t, true, data["will_disable"])
-
-				// Log raw JSON for debugging
-				rawJSON, _ := json.Marshal(body)
-				t.Logf("Raw JSON: %s", string(rawJSON))
-
-				w.WriteHeader(http.StatusOK)
-			},
-		},
-		{
-			name: "server error",
-			handler: func(w http.ResponseWriter, r *http.Request) {
-				w.WriteHeader(http.StatusInternalServerError)
-			},
-			wantErr:     true,
-			errContains: "alert callback failed with status 500",
-		},
-		{
-			name: "invalid response status",
-			handler: func(w http.ResponseWriter, r *http.Request) {
-				w.WriteHeader(http.StatusBadRequest)
-			},
-			wantErr:     true,
-			errContains: "alert callback failed with status 400",
-		},
-		{
-			name: "timeout exceeded",
-			handler: func(w http.ResponseWriter, r *http.Request) {
-				time.Sleep(100 * time.Millisecond)
-				w.WriteHeader(http.StatusOK)
+	t.Run("successful notification", func(t *testing.T) {
+		t.Parallel()
+		var called atomic.Bool
+
+		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			called.Store(true)
+			// Verify request
+			assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
+
+			// Read and verify request body
+			var body map[string]any
+			err := json.NewDecoder(r.Body).Decode(&body)
+			require.NoError(t, err)
+
+			assert.Equal(t, "alert.destination.consecutive_failure", body["topic"])
+			data := body["data"].(map[string]any)
+			cf := data["consecutive_failures"].(map[string]any)
+			assert.Equal(t, float64(5), cf["current"])
+			assert.Equal(t, float64(10), cf["max"])
+			assert.Equal(t, float64(50), cf["threshold"])
+
+			w.WriteHeader(http.StatusOK)
+		}))
+		defer ts.Close()
+
+		notifier := alert.NewHTTPAlertNotifier(ts.URL)
+		dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"}
"tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + ConsecutiveFailures: alert.ConsecutiveFailures{ + Current: 5, + Max: 10, + Threshold: 50, }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithTimeout(50 * time.Millisecond)}, - wantErr: true, - errContains: "context deadline exceeded", - }, - { - name: "successful notification with bearer token", - handler: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) - w.WriteHeader(http.StatusOK) + }) + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("successful notification with bearer token", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithBearerToken("test-token")) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + ConsecutiveFailures: alert.ConsecutiveFailures{ + Current: 5, + Max: 10, + Threshold: 50, }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithBearerToken("test-token")}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // Create test server - ts := httptest.NewServer(http.HandlerFunc(tt.handler)) - defer ts.Close() - - // Create notifier - notifier := alert.NewHTTPAlertNotifier(ts.URL, tt.notifierOpts...) 
- - // Create test alert - dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} - testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, - }, - }) - - // Send alert - err := notifier.Notify(context.Background(), testAlert) - - if tt.wantErr { - require.Error(t, err) - assert.ErrorContains(t, err, tt.errContains) - } else { - require.NoError(t, err) - } }) - } + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("server error returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + }) + + t.Run("timeout returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithTimeout(50*time.Millisecond)) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to send alert") + }) } diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index f077c99d..92642527 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -272,7 +272,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return &PostDeliveryError{err: logErr} } - go h.handleAlertAttempt(ctx, task, destination, attempt, err) + go h.handleAlertAttempt(ctx, destination, &task.Event, attempt) // If we have an AttemptError, return it as is var atmErr *AttemptError @@ -294,47 +294,18 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return nil } -func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attemptResult *models.Attempt, err error) { - alertAttempt := alert.DeliveryAttempt{ - Success: attemptResult.Status == models.AttemptStatusSuccess, - DeliveryTask: task, - Destination: &alert.AlertDestination{ - ID: destination.ID, - TenantID: destination.TenantID, - Type: destination.Type, - Topics: destination.Topics, - Config: destination.Config, - CreatedAt: destination.CreatedAt, - DisabledAt: destination.DisabledAt, - }, - Timestamp: attemptResult.Time, +func (h *messageHandler) handleAlertAttempt(ctx context.Context, destination *models.Destination, event *models.Event, attempt *models.Attempt) { + da := alert.DeliveryAttempt{ + Event: event, + Destination: 
+		Attempt:     attempt,
 	}
-	if !alertAttempt.Success && err != nil {
-		// Extract attempt data if available
-		var atmErr *AttemptError
-		if errors.As(err, &atmErr) {
-			var pubErr *destregistry.ErrDestinationPublishAttempt
-			if errors.As(atmErr.err, &pubErr) {
-				alertAttempt.DeliveryResponse = pubErr.Data
-			} else {
-				alertAttempt.DeliveryResponse = map[string]interface{}{
-					"error": atmErr.err.Error(),
-				}
-			}
-		} else {
-			alertAttempt.DeliveryResponse = map[string]interface{}{
-				"error":   "unexpected",
-				"message": err.Error(),
-			}
-		}
-	}
-
-	if monitorErr := h.alertMonitor.HandleAttempt(ctx, alertAttempt); monitorErr != nil {
+	if monitorErr := h.alertMonitor.HandleAttempt(ctx, da); monitorErr != nil {
 		h.logger.Ctx(ctx).Error("failed to handle alert attempt",
 			zap.Error(monitorErr),
-			zap.String("attempt_id", attemptResult.ID),
-			zap.String("event_id", task.Event.ID),
+			zap.String("attempt_id", attempt.ID),
+			zap.String("event_id", event.ID),
 			zap.String("tenant_id", destination.TenantID),
 			zap.String("destination_id", destination.ID),
 			zap.String("destination_type", destination.Type))
@@ -342,8 +313,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De
 	}
 
 	h.logger.Ctx(ctx).Info("alert attempt handled",
-		zap.String("attempt_id", attemptResult.ID),
-		zap.String("event_id", task.Event.ID),
+		zap.String("attempt_id", attempt.ID),
+		zap.String("event_id", event.ID),
 		zap.String("tenant_id", destination.TenantID),
 		zap.String("destination_id", destination.ID),
 		zap.String("destination_type", destination.Type))
diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go
index ebbab75f..23b0ccc9 100644
--- a/internal/deliverymq/messagehandler_test.go
+++ b/internal/deliverymq/messagehandler_test.go
@@ -258,7 +258,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
 		"should use GetRetryID for task ID")
 	require.Len(t, logPublisher.entries, 1, "should have one delivery")
 	assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed")
-	assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data)
+	assertAlertMonitor(t, alertMonitor, false, &destination)
 }
 
 func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
@@ -326,7 +326,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
 	assert.Equal(t, 1, publisher.current, "should only attempt once")
 	require.Len(t, logPublisher.entries, 1, "should have one delivery")
 	assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed")
-	assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data)
+	assertAlertMonitor(t, alertMonitor, false, &destination)
 }
 
 func TestMessageHandler_RetryFlow(t *testing.T) {
@@ -833,7 +833,7 @@ func TestManualDelivery_PublishError(t *testing.T) {
 	assert.Empty(t, retryScheduler.schedules, "should not schedule retry for manual delivery")
 	require.Len(t, logPublisher.entries, 1, "should have one delivery")
 	assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed")
-	assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data)
+	assertAlertMonitor(t, alertMonitor, false, &destination)
 }
 
 func TestManualDelivery_CancelError(t *testing.T) {
@@ -896,7 +896,7 @@ func TestManualDelivery_CancelError(t *testing.T) {
 	assert.Equal(t, models.RetryID(task.Event.ID, task.DestinationID), retryScheduler.canceled[0], "should cancel with correct retry ID")
 	require.Len(t, logPublisher.entries, 1, "should have one delivery")
 	assert.Equal(t, models.AttemptStatusSuccess, logPublisher.entries[0].Attempt.Status, "delivery status should be OK despite cancel error")
-	assertAlertMonitor(t, alertMonitor, true, &destination, nil)
+	assertAlertMonitor(t, alertMonitor, true, &destination)
 }
 
 func TestManualDelivery_DestinationDisabled(t *testing.T) {
@@ -986,10 +986,10 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
 
 	// Setup alert monitor expectations
 	alertMonitor.On("HandleAttempt", mock.Anything, mock.MatchedBy(func(attempt alert.DeliveryAttempt) bool {
-		return attempt.Success && // Should be a successful attempt
-			attempt.Destination.ID == destination.ID && // Should have correct destination
-			attempt.DeliveryTask != nil && // Should have delivery task
-			attempt.DeliveryResponse == nil // No error data for success
+		return attempt.Attempt.Status == models.AttemptStatusSuccess &&
+			attempt.Destination.ID == destination.ID &&
+			attempt.Event != nil &&
+			attempt.Attempt != nil
 	})).Return(nil)
 
 	// Setup message handler
@@ -1020,7 +1020,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
 	// Assert behavior
 	assert.True(t, mockMsg.acked, "message should be acked on success")
 	assert.False(t, mockMsg.nacked, "message should not be nacked on success")
-	assertAlertMonitor(t, alertMonitor, true, &destination, nil)
+	assertAlertMonitor(t, alertMonitor, true, &destination)
 }
 
 func TestMessageHandler_AlertMonitorError(t *testing.T) {
@@ -1094,7 +1094,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
 }
 
 // Helper function to assert alert monitor calls
-func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination, expectedData map[string]interface{}) {
+func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination) {
 	t.Helper()
 
 	// Wait for the alert monitor to be called
@@ -1107,15 +1107,10 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina
 	lastCall := calls[len(calls)-1]
 	attempt := lastCall.Arguments[1].(alert.DeliveryAttempt)
 
-	assert.Equal(t, success, attempt.Success, "alert attempt success should match")
+	assert.Equal(t, success, attempt.Attempt.Status == models.AttemptStatusSuccess, "alert attempt success should match")
 	assert.Equal(t, destination.ID, attempt.Destination.ID, "alert attempt destination should match")
-	assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task")
-
-	if expectedData != nil {
-		assert.Equal(t, expectedData, attempt.DeliveryResponse, "alert attempt data should match")
-	} else {
-		assert.Nil(t, attempt.DeliveryResponse, "alert attempt should not have data")
-	}
+	assert.NotNil(t, attempt.Event, "alert attempt should have event")
+	assert.NotNil(t, attempt.Attempt, "alert attempt should have attempt data")
 }
 
 func TestManualDelivery_DuplicateRetry(t *testing.T) {
diff --git a/internal/services/builder.go b/internal/services/builder.go
index b3c0e71b..961b1fd0 100644
--- a/internal/services/builder.go
+++ b/internal/services/builder.go
@@ -18,6 +18,7 @@ import (
 	"github.com/hookdeck/outpost/internal/logging"
 	"github.com/hookdeck/outpost/internal/logmq"
 	"github.com/hookdeck/outpost/internal/logstore"
+	"github.com/hookdeck/outpost/internal/models"
 	"github.com/hookdeck/outpost/internal/publishmq"
 	"github.com/hookdeck/outpost/internal/redis"
 	"github.com/hookdeck/outpost/internal/scheduler"
@@ -413,17 +414,20 @@ func newDestinationDisabler(tenantStore tenantstore.TenantStore) alert.Destinati
 	}
 }
 
-func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error {
+func (d *destinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) (models.Destination, error) {
 	destination, err := d.tenantStore.RetrieveDestination(ctx, tenantID, destinationID)
 	if err != nil {
-		return err
+		return models.Destination{}, err
 	}
 	if destination == nil {
-		return nil
+		return models.Destination{}, fmt.Errorf("destination not found")
 	}
 	now := time.Now()
 	destination.DisabledAt = &now
-	return d.tenantStore.UpsertDestination(ctx, *destination)
+	if err := d.tenantStore.UpsertDestination(ctx, *destination); err != nil {
+		return models.Destination{}, err
+	}
+	return *destination, nil
 }
 
 // Helper methods for serviceInstance to initialize common dependencies
diff --git a/internal/util/testutil/event.go b/internal/util/testutil/event.go
index 31ba24c6..9f085ab2 100644
--- a/internal/util/testutil/event.go
+++ b/internal/util/testutil/event.go
@@ -163,6 +163,12 @@ func (f *mockAttemptFactory) WithStatus(status string) func(*models.Attempt) {
 	}
 }
 
+func (f *mockAttemptFactory) WithCode(code string) func(*models.Attempt) {
+	return func(attempt *models.Attempt) {
+		attempt.Code = code
+	}
+}
+
 func (f *mockAttemptFactory) WithTime(time time.Time) func(*models.Attempt) {
 	return func(attempt *models.Attempt) {
 		attempt.Time = time