diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 39ff54f4..90006a80 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -3,7 +3,6 @@ package alert import ( "context" "fmt" - "time" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" @@ -82,11 +81,9 @@ func WithDeploymentID(deploymentID string) AlertOption { // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { - Success bool - DeliveryTask *models.DeliveryTask - Destination *AlertDestination - Timestamp time.Time - DeliveryResponse map[string]interface{} + Event *models.Event + Destination *AlertDestination + Attempt *models.Attempt } type alertMonitor struct { @@ -138,12 +135,12 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ... } func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttempt) error { - if attempt.Success { + if attempt.Attempt.Status == models.AttemptStatusSuccess { return m.store.ResetConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID) } - // Get alert state - count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID) + // Get alert state — attemptID ensures idempotent counting on message replay + count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID) if err != nil { return fmt.Errorf("failed to get alert state: %w", err) } @@ -155,16 +152,16 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ Event: AlertedEvent{ - ID: attempt.DeliveryTask.Event.ID, - Topic: attempt.DeliveryTask.Event.Topic, - Metadata: attempt.DeliveryTask.Event.Metadata, - Data: attempt.DeliveryTask.Event.Data, + ID: attempt.Event.ID, + Topic: attempt.Event.Topic, + Metadata: attempt.Event.Metadata, + Data: attempt.Event.Data, }, MaxConsecutiveFailures: m.autoDisableFailureCount, ConsecutiveFailures: count, WillDisable: m.disabler != nil && level == 100, Destination: attempt.Destination, - DeliveryResponse: attempt.DeliveryResponse, + DeliveryResponse: attempt.Attempt.ResponseData, }) // If we've hit 100% and have a disabler configured, disable the destination @@ -174,7 +171,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } m.logger.Ctx(ctx).Audit("destination disabled", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), @@ -186,7 +184,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp if err := m.notifier.Notify(ctx, alert); err != nil { m.logger.Ctx(ctx).Error("failed to send alert", zap.Error(err), - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), @@ -195,7 +194,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } m.logger.Ctx(ctx).Audit("alert sent", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 16372475..8f5ad004 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -2,6 +2,7 @@ package alert_test import ( "context" + "fmt" "testing" "time" @@ -28,8 +29,8 @@ type mockDestinationDisabler struct { } func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { - m.Called(ctx, tenantID, destinationID) - return nil + args := m.Called(ctx, tenantID, destinationID) + return args.Error(0) } func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { @@ -53,39 +54,38 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, - }, - Timestamp: time.Now(), - } - // Send 20 consecutive failures + // Send 20 consecutive failures (each with a unique attempt ID) for i := 1; i <= 20; i++ { + attempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications were sent at correct thresholds - var notifyCallCount int + // Verify consecutive failure notifications were sent at correct thresholds + var consecutiveFailureCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - failures := alert.Data.ConsecutiveFailures - require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") - require.Equal(t, dest, alert.Data.Destination) - require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse) - require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) - require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + require.Contains(t, []int{10, 14, 18, 20}, cfAlert.Data.ConsecutiveFailures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") + require.Equal(t, dest.ID, cfAlert.Data.Destination.ID) + require.Equal(t, dest.TenantID, cfAlert.Data.Destination.TenantID) + require.Equal(t, "alert.consecutive_failure", cfAlert.Topic) + require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures) + } } } - require.Equal(t, 4, notifyCallCount, "Should have sent exactly 4 notifications") + require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications") // Verify destination was disabled exactly once at 100% var disableCallCount int @@ -120,20 +120,20 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - failedAttempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, - }, - Timestamp: time.Now(), - } // Send 14 failures (should trigger 50% and 66% alerts) for i := 1; i <= 14; i++ { + failedAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt)) } @@ -141,15 +141,29 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { require.Equal(t, 2, len(notifier.Calls)) // Send a success to reset the counter - successAttempt := failedAttempt - successAttempt.Success = true + successAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{Status: models.AttemptStatusSuccess}, + } require.NoError(t, monitor.HandleAttempt(ctx, successAttempt)) // Clear the mock calls to start fresh notifier.Calls = nil - // Send 14 more failures - for i := 1; i <= 14; i++ { + // Send 14 more failures (new attempt IDs) + for i := 15; i <= 28; i++ { + failedAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt)) } @@ -159,8 +173,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { // Verify the notifications were at the right thresholds var seenCounts []int for _, call := range notifier.Calls { - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures) + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures) + } } assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)") assert.Contains(t, seenCounts, 14, "Should have alerted at 66% (14 failures)") @@ -193,39 +208,41 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "500", - }, - Timestamp: time.Now(), - } // Send 25 consecutive failures (5 more than the threshold) for i := 1; i <= 25; i++ { + attempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications at 50%, 70%, 90%, and 100% thresholds + // Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds // Plus additional notifications for failures 21-25 (all at 100% level) - var notifyCallCount int + var consecutiveFailureCount int var disableNotifyCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - if alertData.Data.ConsecutiveFailures >= 20 { - disableNotifyCount++ - require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + if cfAlert.Data.ConsecutiveFailures >= 20 { + disableNotifyCount++ + require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above max") + } } } } // 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25 - require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)") - require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)") + require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)") + require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications at threshold 100 (20-25)") // Verify destination was disabled multiple times (once per failure >= 20) var disableCallCount int diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index beedcb95..25eaaf08 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -59,6 +59,18 @@ type AlertDestination struct { DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` } +func AlertDestinationFromDestination(d *models.Destination) *AlertDestination { + return &AlertDestination{ + ID: d.ID, + TenantID: d.TenantID, + Type: d.Type, + Topics: d.Topics, + Config: d.Config, + CreatedAt: d.CreatedAt, + DisabledAt: d.DisabledAt, + } +} + // ConsecutiveFailureData represents the data needed for a consecutive failure alert type ConsecutiveFailureData struct { Event AlertedEvent `json:"event"` diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 31661e52..716c6664 100644 --- a/internal/alert/notifier_test.go +++ b/internal/alert/notifier_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -16,107 +17,103 @@ import ( func TestAlertNotifier_Notify(t *testing.T) { t.Parallel() - tests := []struct { - name string - handler func(w http.ResponseWriter, r *http.Request) - notifierOpts []alert.NotifierOption - wantErr bool - errContains string - }{ - { - name: "successful notification", - handler: func(w http.ResponseWriter, r *http.Request) { - // Verify request - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - - // Read and verify request body - var body map[string]interface{} - err := json.NewDecoder(r.Body).Decode(&body) - require.NoError(t, err) - - assert.Equal(t, "alert.consecutive_failure", body["topic"]) - data := body["data"].(map[string]interface{}) - assert.Equal(t, float64(10), data["max_consecutive_failures"]) - assert.Equal(t, float64(5), data["consecutive_failures"]) - assert.Equal(t, true, data["will_disable"]) - - // Log raw JSON for debugging - rawJSON, _ := json.Marshal(body) - t.Logf("Raw JSON: %s", string(rawJSON)) - - w.WriteHeader(http.StatusOK) - }, - }, - { - name: "server error", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - }, - wantErr: true, - errContains: "alert callback failed with status 500", - }, - { - name: "invalid response status", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - }, - wantErr: true, - errContains: "alert callback failed with status 400", - }, - { - name: "timeout exceeded", - handler: func(w http.ResponseWriter, r *http.Request) { - time.Sleep(100 * time.Millisecond) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithTimeout(50 * time.Millisecond)}, - wantErr: true, - errContains: "context deadline exceeded", - }, - { - name: "successful notification with bearer token", - handler: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithBearerToken("test-token")}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // Create test server - ts := httptest.NewServer(http.HandlerFunc(tt.handler)) - defer ts.Close() - - // Create notifier - notifier := alert.NewHTTPAlertNotifier(ts.URL, tt.notifierOpts...) - - // Create test alert - dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} - testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, - }, - }) - - // Send alert - err := notifier.Notify(context.Background(), testAlert) - - if tt.wantErr { - require.Error(t, err) - assert.ErrorContains(t, err, tt.errContains) - } else { - require.NoError(t, err) - } + t.Run("successful notification", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + var body map[string]any + err := json.NewDecoder(r.Body).Decode(&body) + require.NoError(t, err) + + assert.Equal(t, "alert.consecutive_failure", body["topic"]) + data := body["data"].(map[string]any) + assert.Equal(t, float64(5), data["consecutive_failures"]) + assert.Equal(t, float64(10), data["max_consecutive_failures"]) + assert.Equal(t, true, data["will_disable"]) + + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + ConsecutiveFailures: 5, + MaxConsecutiveFailures: 10, + WillDisable: true, + Destination: dest, }) - } + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("successful notification with bearer token", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithBearerToken("test-token")) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + ConsecutiveFailures: 5, + MaxConsecutiveFailures: 10, + WillDisable: true, + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("server error returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + }) + + t.Run("timeout returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithTimeout(50*time.Millisecond)) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to send alert") + }) } diff --git a/internal/alert/store.go b/internal/alert/store.go index 3bc3a0a4..fd4ebb42 100644 --- a/internal/alert/store.go +++ b/internal/alert/store.go @@ -9,13 +9,13 @@ import ( ) const ( - keyPrefixAlert = "alert" // Base prefix for all alert keys - keyFailures = "consecutive_failures" // Counter for consecutive failures) + keyPrefixAlert = "alert" // Base prefix for all alert keys + keyFailures = "cf" // Set for consecutive failure attempt IDs ) // AlertStore manages alert-related data persistence type AlertStore interface { - IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) + IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error) ResetConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) error } @@ -32,14 +32,15 @@ func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore { } } -func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) { +func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error) { key := s.getFailuresKey(destinationID) - // Use a transaction to ensure atomicity between INCR and EXPIRE operations. - // Since all operations use the same key, they will be routed to the same hash slot - // in Redis cluster mode, making transactions safe to use. + // Use a transaction to ensure atomicity between SADD, SCARD, and EXPIRE operations. + // SADD is idempotent — adding the same attemptID on replay is a no-op, + // preventing double-counting when messages are redelivered. pipe := s.client.TxPipeline() - incrCmd := pipe.Incr(ctx, key) + pipe.SAdd(ctx, key, attemptID) + scardCmd := pipe.SCard(ctx, key) pipe.Expire(ctx, key, 24*time.Hour) _, err := pipe.Exec(ctx) @@ -47,10 +48,9 @@ func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, return 0, fmt.Errorf("failed to execute consecutive failure count transaction: %w", err) } - // Get the incremented count from the INCR command result - count, err := incrCmd.Result() + count, err := scardCmd.Result() if err != nil { - return 0, fmt.Errorf("failed to get incremented consecutive failure count: %w", err) + return 0, fmt.Errorf("failed to get consecutive failure count: %w", err) } return int(count), nil diff --git a/internal/alert/store_test.go b/internal/alert/store_test.go index f70a0189..0fdfe5a4 100644 --- a/internal/alert/store_test.go +++ b/internal/alert/store_test.go @@ -19,12 +19,12 @@ func TestRedisAlertStore(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "") // First increment - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) - // Second increment - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + // Second increment (different attempt) + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_2") require.NoError(t, err) assert.Equal(t, 2, count) }) @@ -35,7 +35,7 @@ func TestRedisAlertStore(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "") // Set up initial failures - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) @@ -44,10 +44,36 @@ func TestRedisAlertStore(t *testing.T) { require.NoError(t, err) // Verify counter is reset by incrementing again - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2", "att_2") require.NoError(t, err) assert.Equal(t, 1, count) }) + + t.Run("idempotent on replay", func(t *testing.T) { + t.Parallel() + redisClient := testutil.CreateTestRedisClient(t) + store := alert.NewRedisAlertStore(redisClient, "") + + // First call + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_1") + require.NoError(t, err) + assert.Equal(t, 1, count) + + // Replay same attempt — count should not change + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_1") + require.NoError(t, err) + assert.Equal(t, 1, count, "replaying the same attemptID should not increment the count") + + // Different attempt should still increment + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_2") + require.NoError(t, err) + assert.Equal(t, 2, count) + + // Replay the second attempt — count should not change + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_2") + require.NoError(t, err) + assert.Equal(t, 2, count, "replaying the same attemptID should not increment the count") + }) } func TestRedisAlertStore_WithDeploymentID(t *testing.T) { @@ -57,12 +83,12 @@ func TestRedisAlertStore_WithDeploymentID(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "dp_test_001") // Test increment with deployment ID - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) // Second increment - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_2") require.NoError(t, err) assert.Equal(t, 2, count) @@ -71,7 +97,7 @@ func TestRedisAlertStore_WithDeploymentID(t *testing.T) { require.NoError(t, err) // Verify counter is reset - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_3") require.NoError(t, err) assert.Equal(t, 1, count) } @@ -90,21 +116,21 @@ func TestAlertStoreIsolation(t *testing.T) { destinationID := "dest_shared" // Increment in store1 - count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_1") require.NoError(t, err) assert.Equal(t, 1, count1) - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_2") require.NoError(t, err) assert.Equal(t, 2, count1) // Increment in store2 - should start at 1 (isolated from store1) - count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_1") require.NoError(t, err) assert.Equal(t, 1, count2, "Store 2 should have its own counter") // Increment store1 again - should continue from 2 - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_3") require.NoError(t, err) assert.Equal(t, 3, count1, "Store 1 counter should be unaffected by store 2") @@ -113,12 +139,12 @@ func TestAlertStoreIsolation(t *testing.T) { require.NoError(t, err) // Verify store1 is reset - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_4") require.NoError(t, err) assert.Equal(t, 1, count1, "Store 1 should be reset") // Verify store2 is unaffected - count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_2") require.NoError(t, err) assert.Equal(t, 2, count2, "Store 2 should be unaffected by store 1 reset") } diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 622e5c1d..f0ff78e0 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -293,7 +293,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return &PostDeliveryError{err: logErr} } - go h.handleAlertAttempt(ctx, task, destination, attempt, err) + go h.handleAlertAttempt(ctx, destination, &task.Event, attempt) // If we have an AttemptError, return it as is var atmErr *AttemptError @@ -315,47 +315,18 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return nil } -func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attemptResult *models.Attempt, err error) { - alertAttempt := alert.DeliveryAttempt{ - Success: attemptResult.Status == models.AttemptStatusSuccess, - DeliveryTask: task, - Destination: &alert.AlertDestination{ - ID: destination.ID, - TenantID: destination.TenantID, - Type: destination.Type, - Topics: destination.Topics, - Config: destination.Config, - CreatedAt: destination.CreatedAt, - DisabledAt: destination.DisabledAt, - }, - Timestamp: attemptResult.Time, +func (h *messageHandler) handleAlertAttempt(ctx context.Context, destination *models.Destination, event *models.Event, attempt *models.Attempt) { + da := alert.DeliveryAttempt{ + Event: event, + Destination: alert.AlertDestinationFromDestination(destination), + Attempt: attempt, } - if !alertAttempt.Success && err != nil { - // Extract attempt data if available - var atmErr *AttemptError - if errors.As(err, &atmErr) { - var pubErr *destregistry.ErrDestinationPublishAttempt - if errors.As(atmErr.err, &pubErr) { - alertAttempt.DeliveryResponse = pubErr.Data - } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ - "error": atmErr.err.Error(), - } - } - } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ - "error": "unexpected", - "message": err.Error(), - } - } - } - - if monitorErr := h.alertMonitor.HandleAttempt(ctx, alertAttempt); monitorErr != nil { + if monitorErr := h.alertMonitor.HandleAttempt(ctx, da); monitorErr != nil { h.logger.Ctx(ctx).Error("failed to handle alert attempt", zap.Error(monitorErr), - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) @@ -363,8 +334,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De } h.logger.Ctx(ctx).Info("alert attempt handled", - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 8a038391..51244643 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -258,7 +258,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { "should use GetRetryID for task ID") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_PublishError_NotEligible(t *testing.T) { @@ -326,7 +326,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { assert.Equal(t, 1, publisher.current, "should only attempt once") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_RetryFlow(t *testing.T) { @@ -845,7 +845,7 @@ func TestManualDelivery_PublishError(t *testing.T) { assert.Equal(t, 1, publisher.current, "should attempt publish once") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) // Assert retry state: the old pending retry (tier 0, 1s) was atomically // replaced with the next tier. Attempt 2 uses backoff index 1 → 2s. @@ -990,7 +990,7 @@ func TestManualDelivery_CancelError(t *testing.T) { assert.Equal(t, models.RetryID(task.Event.ID, task.DestinationID), retryScheduler.canceled[0], "should cancel with correct retry ID") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusSuccess, logPublisher.entries[0].Attempt.Status, "delivery status should be OK despite cancel error") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestManualDelivery_DestinationDisabled(t *testing.T) { @@ -1080,10 +1080,10 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Setup alert monitor expectations alertMonitor.On("HandleAttempt", mock.Anything, mock.MatchedBy(func(attempt alert.DeliveryAttempt) bool { - return attempt.Success && // Should be a successful attempt - attempt.Destination.ID == destination.ID && // Should have correct destination - attempt.DeliveryTask != nil && // Should have delivery task - attempt.DeliveryResponse == nil // No error data for success + return attempt.Attempt.Status == models.AttemptStatusSuccess && + attempt.Destination.ID == destination.ID && + attempt.Event != nil && + attempt.Attempt != nil })).Return(nil) // Setup message handler @@ -1114,7 +1114,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Assert behavior assert.True(t, mockMsg.acked, "message should be acked on success") assert.False(t, mockMsg.nacked, "message should not be nacked on success") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestMessageHandler_AlertMonitorError(t *testing.T) { @@ -1188,7 +1188,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { } // Helper function to assert alert monitor calls -func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination, expectedData map[string]interface{}) { +func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination) { t.Helper() // Wait for the alert monitor to be called @@ -1201,15 +1201,10 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina lastCall := calls[len(calls)-1] attempt := lastCall.Arguments[1].(alert.DeliveryAttempt) - assert.Equal(t, success, attempt.Success, "alert attempt success should match") + assert.Equal(t, success, attempt.Attempt.Status == models.AttemptStatusSuccess, "alert attempt success should match") assert.Equal(t, destination.ID, attempt.Destination.ID, "alert attempt destination should match") - assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task") - - if expectedData != nil { - assert.Equal(t, expectedData, attempt.DeliveryResponse, "alert attempt data should match") - } else { - assert.Nil(t, attempt.DeliveryResponse, "alert attempt should not have data") - } + assert.NotNil(t, attempt.Event, "alert attempt should have event") + assert.NotNil(t, attempt.Attempt, "alert attempt should have attempt data") } func TestManualDelivery_DuplicateRetry(t *testing.T) {