From a5b8322949992be1718d0dd2d1b4db086cbd39a8 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 3 Apr 2026 19:18:26 +0700 Subject: [PATCH 1/2] refactor: make AlertStore idempotent using Redis Sets Replace INCR/GET with SADD/SCARD on a per-destination Redis Set whose members are attempt IDs. Replayed messages no longer double-count consecutive failures. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/alert/store.go | 22 +++++++-------- internal/alert/store_test.go | 54 ++++++++++++++++++++++++++---------- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/internal/alert/store.go b/internal/alert/store.go index 3bc3a0a4d..fd4ebb42f 100644 --- a/internal/alert/store.go +++ b/internal/alert/store.go @@ -9,13 +9,13 @@ import ( ) const ( - keyPrefixAlert = "alert" // Base prefix for all alert keys - keyFailures = "consecutive_failures" // Counter for consecutive failures) + keyPrefixAlert = "alert" // Base prefix for all alert keys + keyFailures = "cf" // Set for consecutive failure attempt IDs ) // AlertStore manages alert-related data persistence type AlertStore interface { - IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) + IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error) ResetConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) error } @@ -32,14 +32,15 @@ func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore { } } -func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) { +func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID, attemptID string) (int, error) { key := s.getFailuresKey(destinationID) - // Use a transaction to ensure atomicity between INCR and EXPIRE operations. 
- // Since all operations use the same key, they will be routed to the same hash slot - // in Redis cluster mode, making transactions safe to use. + // Use a transaction to ensure atomicity between SADD, SCARD, and EXPIRE operations. + // SADD is idempotent — adding the same attemptID on replay is a no-op, + // preventing double-counting when messages are redelivered. pipe := s.client.TxPipeline() - incrCmd := pipe.Incr(ctx, key) + pipe.SAdd(ctx, key, attemptID) + scardCmd := pipe.SCard(ctx, key) pipe.Expire(ctx, key, 24*time.Hour) _, err := pipe.Exec(ctx) @@ -47,10 +48,9 @@ func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, return 0, fmt.Errorf("failed to execute consecutive failure count transaction: %w", err) } - // Get the incremented count from the INCR command result - count, err := incrCmd.Result() + count, err := scardCmd.Result() if err != nil { - return 0, fmt.Errorf("failed to get incremented consecutive failure count: %w", err) + return 0, fmt.Errorf("failed to get consecutive failure count: %w", err) } return int(count), nil diff --git a/internal/alert/store_test.go b/internal/alert/store_test.go index f70a01892..0fdfe5a46 100644 --- a/internal/alert/store_test.go +++ b/internal/alert/store_test.go @@ -19,12 +19,12 @@ func TestRedisAlertStore(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "") // First increment - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) - // Second increment - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + // Second increment (different attempt) + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_2") require.NoError(t, err) assert.Equal(t, 2, count) }) @@ -35,7 +35,7 @@ func 
TestRedisAlertStore(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "") // Set up initial failures - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) @@ -44,10 +44,36 @@ func TestRedisAlertStore(t *testing.T) { require.NoError(t, err) // Verify counter is reset by incrementing again - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2", "att_2") require.NoError(t, err) assert.Equal(t, 1, count) }) + + t.Run("idempotent on replay", func(t *testing.T) { + t.Parallel() + redisClient := testutil.CreateTestRedisClient(t) + store := alert.NewRedisAlertStore(redisClient, "") + + // First call + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_1") + require.NoError(t, err) + assert.Equal(t, 1, count) + + // Replay same attempt — count should not change + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_1") + require.NoError(t, err) + assert.Equal(t, 1, count, "replaying the same attemptID should not increment the count") + + // Different attempt should still increment + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_2") + require.NoError(t, err) + assert.Equal(t, 2, count) + + // Replay the second attempt — count should not change + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_3", "dest_3", "att_2") + require.NoError(t, err) + assert.Equal(t, 2, count, "replaying the same attemptID should not increment the count") + }) } func TestRedisAlertStore_WithDeploymentID(t *testing.T) { @@ -57,12 +83,12 @@ func 
TestRedisAlertStore_WithDeploymentID(t *testing.T) { store := alert.NewRedisAlertStore(redisClient, "dp_test_001") // Test increment with deployment ID - count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_1") require.NoError(t, err) assert.Equal(t, 1, count) // Second increment - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_2") require.NoError(t, err) assert.Equal(t, 2, count) @@ -71,7 +97,7 @@ func TestRedisAlertStore_WithDeploymentID(t *testing.T) { require.NoError(t, err) // Verify counter is reset - count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1") + count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1", "att_3") require.NoError(t, err) assert.Equal(t, 1, count) } @@ -90,21 +116,21 @@ func TestAlertStoreIsolation(t *testing.T) { destinationID := "dest_shared" // Increment in store1 - count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_1") require.NoError(t, err) assert.Equal(t, 1, count1) - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_2") require.NoError(t, err) assert.Equal(t, 2, count1) // Increment in store2 - should start at 1 (isolated from store1) - count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, 
"att_1") require.NoError(t, err) assert.Equal(t, 1, count2, "Store 2 should have its own counter") // Increment store1 again - should continue from 2 - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_3") require.NoError(t, err) assert.Equal(t, 3, count1, "Store 1 counter should be unaffected by store 2") @@ -113,12 +139,12 @@ func TestAlertStoreIsolation(t *testing.T) { require.NoError(t, err) // Verify store1 is reset - count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_4") require.NoError(t, err) assert.Equal(t, 1, count1, "Store 1 should be reset") // Verify store2 is unaffected - count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID) + count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID, "att_2") require.NoError(t, err) assert.Equal(t, 2, count2, "Store 2 should be unaffected by store 1 reset") } From e5e3b514fc8f437b458c337d9b82e105be089a9e Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 3 Apr 2026 19:18:35 +0700 Subject: [PATCH 2/2] refactor: simplify DeliveryAttempt to use models.Attempt Replace DeliveryAttempt fields (Success, DeliveryTask, DeliveryResponse, Timestamp) with models.Attempt and models.Event. Success is derived from Attempt.Status, delivery response from Attempt.ResponseData. Simplifies messagehandler by removing manual error extraction. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/alert/monitor.go | 34 ++-- internal/alert/monitor_test.go | 141 ++++++++------- internal/alert/notifier.go | 12 ++ internal/alert/notifier_test.go | 201 ++++++++++----------- internal/deliverymq/messagehandler.go | 51 ++---- internal/deliverymq/messagehandler_test.go | 31 ++-- 6 files changed, 231 insertions(+), 239 deletions(-) diff --git a/internal/alert/monitor.go b/internal/alert/monitor.go index 39ff54f40..90006a80c 100644 --- a/internal/alert/monitor.go +++ b/internal/alert/monitor.go @@ -3,7 +3,6 @@ package alert import ( "context" "fmt" - "time" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" @@ -82,11 +81,9 @@ func WithDeploymentID(deploymentID string) AlertOption { // DeliveryAttempt represents a single delivery attempt type DeliveryAttempt struct { - Success bool - DeliveryTask *models.DeliveryTask - Destination *AlertDestination - Timestamp time.Time - DeliveryResponse map[string]interface{} + Event *models.Event + Destination *AlertDestination + Attempt *models.Attempt } type alertMonitor struct { @@ -138,12 +135,12 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ... 
} func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttempt) error { - if attempt.Success { + if attempt.Attempt.Status == models.AttemptStatusSuccess { return m.store.ResetConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID) } - // Get alert state - count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID) + // Get alert state — attemptID ensures idempotent counting on message replay + count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID) if err != nil { return fmt.Errorf("failed to get alert state: %w", err) } @@ -155,16 +152,16 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{ Event: AlertedEvent{ - ID: attempt.DeliveryTask.Event.ID, - Topic: attempt.DeliveryTask.Event.Topic, - Metadata: attempt.DeliveryTask.Event.Metadata, - Data: attempt.DeliveryTask.Event.Data, + ID: attempt.Event.ID, + Topic: attempt.Event.Topic, + Metadata: attempt.Event.Metadata, + Data: attempt.Event.Data, }, MaxConsecutiveFailures: m.autoDisableFailureCount, ConsecutiveFailures: count, WillDisable: m.disabler != nil && level == 100, Destination: attempt.Destination, - DeliveryResponse: attempt.DeliveryResponse, + DeliveryResponse: attempt.Attempt.ResponseData, }) // If we've hit 100% and have a disabler configured, disable the destination @@ -174,7 +171,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } m.logger.Ctx(ctx).Audit("destination disabled", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), @@ -186,7 
+184,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp if err := m.notifier.Notify(ctx, alert); err != nil { m.logger.Ctx(ctx).Error("failed to send alert", zap.Error(err), - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), @@ -195,7 +194,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp } m.logger.Ctx(ctx).Audit("alert sent", - zap.String("event_id", attempt.DeliveryTask.Event.ID), + zap.String("attempt_id", attempt.Attempt.ID), + zap.String("event_id", attempt.Event.ID), zap.String("tenant_id", attempt.Destination.TenantID), zap.String("destination_id", attempt.Destination.ID), zap.String("destination_type", attempt.Destination.Type), diff --git a/internal/alert/monitor_test.go b/internal/alert/monitor_test.go index 16372475d..8f5ad0042 100644 --- a/internal/alert/monitor_test.go +++ b/internal/alert/monitor_test.go @@ -2,6 +2,7 @@ package alert_test import ( "context" + "fmt" "testing" "time" @@ -28,8 +29,8 @@ type mockDestinationDisabler struct { } func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error { - m.Called(ctx, tenantID, destinationID) - return nil + args := m.Called(ctx, tenantID, destinationID) + return args.Error(0) } func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { @@ -53,39 +54,38 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: 
map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, - }, - Timestamp: time.Now(), - } - // Send 20 consecutive failures + // Send 20 consecutive failures (each with a unique attempt ID) for i := 1; i <= 20; i++ { + attempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications were sent at correct thresholds - var notifyCallCount int + // Verify consecutive failure notifications were sent at correct thresholds + var consecutiveFailureCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - failures := alert.Data.ConsecutiveFailures - require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") - require.Equal(t, dest, alert.Data.Destination) - require.Equal(t, "alert.consecutive_failure", alert.Topic) - require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse) - require.Equal(t, 20, alert.Data.MaxConsecutiveFailures) - require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + require.Contains(t, []int{10, 14, 18, 20}, cfAlert.Data.ConsecutiveFailures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds") + require.Equal(t, dest.ID, cfAlert.Data.Destination.ID) + require.Equal(t, dest.TenantID, cfAlert.Data.Destination.TenantID) + require.Equal(t, "alert.consecutive_failure", cfAlert.Topic) + require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures) + } } } - require.Equal(t, 4, notifyCallCount, "Should have 
sent exactly 4 notifications") + require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications") // Verify destination was disabled exactly once at 100% var disableCallCount int @@ -120,20 +120,20 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - failedAttempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "500", - "data": map[string]any{"error": "test error"}, - }, - Timestamp: time.Now(), - } // Send 14 failures (should trigger 50% and 66% alerts) for i := 1; i <= 14; i++ { + failedAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt)) } @@ -141,15 +141,29 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { require.Equal(t, 2, len(notifier.Calls)) // Send a success to reset the counter - successAttempt := failedAttempt - successAttempt.Success = true + successAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{Status: models.AttemptStatusSuccess}, + } require.NoError(t, monitor.HandleAttempt(ctx, successAttempt)) // Clear the mock calls to start fresh notifier.Calls = nil - // Send 14 more failures - for i := 1; i <= 14; i++ { + // Send 14 more failures (new attempt IDs) + for i := 15; i <= 28; i++ { + failedAttempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test 
error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt)) } @@ -159,8 +173,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) { // Verify the notifications were at the right thresholds var seenCounts []int for _, call := range notifier.Calls { - alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures) + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures) + } } assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)") assert.Contains(t, seenCounts, 14, "Should have alerted at 66% (14 failures)") @@ -193,39 +208,41 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) { dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"} event := &models.Event{Topic: "test.event"} - task := &models.DeliveryTask{Event: *event} - attempt := alert.DeliveryAttempt{ - Success: false, - DeliveryTask: task, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "500", - }, - Timestamp: time.Now(), - } // Send 25 consecutive failures (5 more than the threshold) for i := 1; i <= 25; i++ { + attempt := alert.DeliveryAttempt{ + Event: event, + Destination: dest, + Attempt: &models.Attempt{ + ID: fmt.Sprintf("att_%d", i), + Status: "failed", + Code: "500", + ResponseData: map[string]interface{}{"error": "test error"}, + Time: time.Now(), + }, + } require.NoError(t, monitor.HandleAttempt(ctx, attempt)) } - // Verify notifications at 50%, 70%, 90%, and 100% thresholds + // Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds // Plus additional notifications for failures 21-25 (all at 100% level) - var notifyCallCount int + var consecutiveFailureCount int var disableNotifyCount int for _, call := range notifier.Calls { if call.Method == "Notify" { - notifyCallCount++ 
- alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert) - if alertData.Data.ConsecutiveFailures >= 20 { - disableNotifyCount++ - require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%") + if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok { + consecutiveFailureCount++ + if cfAlert.Data.ConsecutiveFailures >= 20 { + disableNotifyCount++ + require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above max") + } } } } // 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25 - require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)") - require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)") + require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)") + require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications at threshold 100 (20-25)") // Verify destination was disabled multiple times (once per failure >= 20) var disableCallCount int diff --git a/internal/alert/notifier.go b/internal/alert/notifier.go index beedcb958..25eaaf082 100644 --- a/internal/alert/notifier.go +++ b/internal/alert/notifier.go @@ -59,6 +59,18 @@ type AlertDestination struct { DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"` } +func AlertDestinationFromDestination(d *models.Destination) *AlertDestination { + return &AlertDestination{ + ID: d.ID, + TenantID: d.TenantID, + Type: d.Type, + Topics: d.Topics, + Config: d.Config, + CreatedAt: d.CreatedAt, + DisabledAt: d.DisabledAt, + } +} + // ConsecutiveFailureData represents the data needed for a consecutive failure alert type ConsecutiveFailureData struct { Event AlertedEvent `json:"event"` diff --git a/internal/alert/notifier_test.go b/internal/alert/notifier_test.go index 31661e521..716c66641 100644 --- a/internal/alert/notifier_test.go +++ 
b/internal/alert/notifier_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync/atomic" "testing" "time" @@ -16,107 +17,103 @@ import ( func TestAlertNotifier_Notify(t *testing.T) { t.Parallel() - tests := []struct { - name string - handler func(w http.ResponseWriter, r *http.Request) - notifierOpts []alert.NotifierOption - wantErr bool - errContains string - }{ - { - name: "successful notification", - handler: func(w http.ResponseWriter, r *http.Request) { - // Verify request - assert.Equal(t, "application/json", r.Header.Get("Content-Type")) - - // Read and verify request body - var body map[string]interface{} - err := json.NewDecoder(r.Body).Decode(&body) - require.NoError(t, err) - - assert.Equal(t, "alert.consecutive_failure", body["topic"]) - data := body["data"].(map[string]interface{}) - assert.Equal(t, float64(10), data["max_consecutive_failures"]) - assert.Equal(t, float64(5), data["consecutive_failures"]) - assert.Equal(t, true, data["will_disable"]) - - // Log raw JSON for debugging - rawJSON, _ := json.Marshal(body) - t.Logf("Raw JSON: %s", string(rawJSON)) - - w.WriteHeader(http.StatusOK) - }, - }, - { - name: "server error", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - }, - wantErr: true, - errContains: "alert callback failed with status 500", - }, - { - name: "invalid response status", - handler: func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - }, - wantErr: true, - errContains: "alert callback failed with status 400", - }, - { - name: "timeout exceeded", - handler: func(w http.ResponseWriter, r *http.Request) { - time.Sleep(100 * time.Millisecond) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithTimeout(50 * time.Millisecond)}, - wantErr: true, - errContains: "context deadline exceeded", - }, - { - name: "successful notification with bearer token", - handler: func(w 
http.ResponseWriter, r *http.Request) { - assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) - w.WriteHeader(http.StatusOK) - }, - notifierOpts: []alert.NotifierOption{alert.NotifierWithBearerToken("test-token")}, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - // Create test server - ts := httptest.NewServer(http.HandlerFunc(tt.handler)) - defer ts.Close() - - // Create notifier - notifier := alert.NewHTTPAlertNotifier(ts.URL, tt.notifierOpts...) - - // Create test alert - dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} - testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ - MaxConsecutiveFailures: 10, - ConsecutiveFailures: 5, - WillDisable: true, - Destination: dest, - DeliveryResponse: map[string]interface{}{ - "status": "error", - "data": map[string]any{"code": "ETIMEDOUT"}, - }, - }) - - // Send alert - err := notifier.Notify(context.Background(), testAlert) - - if tt.wantErr { - require.Error(t, err) - assert.ErrorContains(t, err, tt.errContains) - } else { - require.NoError(t, err) - } + t.Run("successful notification", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + var body map[string]any + err := json.NewDecoder(r.Body).Decode(&body) + require.NoError(t, err) + + assert.Equal(t, "alert.consecutive_failure", body["topic"]) + data := body["data"].(map[string]any) + assert.Equal(t, float64(5), data["consecutive_failures"]) + assert.Equal(t, float64(10), data["max_consecutive_failures"]) + assert.Equal(t, true, data["will_disable"]) + + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := 
alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + ConsecutiveFailures: 5, + MaxConsecutiveFailures: 10, + WillDisable: true, + Destination: dest, }) - } + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("successful notification with bearer token", func(t *testing.T) { + t.Parallel() + var called atomic.Bool + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called.Store(true) + assert.Equal(t, "Bearer test-token", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithBearerToken("test-token")) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + ConsecutiveFailures: 5, + MaxConsecutiveFailures: 10, + WillDisable: true, + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.NoError(t, err) + assert.True(t, called.Load(), "handler should have been called") + }) + + t.Run("server error returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "status 500") + }) + + t.Run("timeout returns error", func(t *testing.T) { + t.Parallel() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) 
+ w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + notifier := alert.NewHTTPAlertNotifier(ts.URL, alert.NotifierWithTimeout(50*time.Millisecond)) + dest := &alert.AlertDestination{ID: "dest_123", TenantID: "tenant_123"} + testAlert := alert.NewConsecutiveFailureAlert(alert.ConsecutiveFailureData{ + Destination: dest, + }) + + err := notifier.Notify(context.Background(), testAlert) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to send alert") + }) } diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 622e5c1db..f0ff78e00 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -293,7 +293,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return &PostDeliveryError{err: logErr} } - go h.handleAlertAttempt(ctx, task, destination, attempt, err) + go h.handleAlertAttempt(ctx, destination, &task.Event, attempt) // If we have an AttemptError, return it as is var atmErr *AttemptError @@ -315,47 +315,18 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, task *models.Del return nil } -func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.DeliveryTask, destination *models.Destination, attemptResult *models.Attempt, err error) { - alertAttempt := alert.DeliveryAttempt{ - Success: attemptResult.Status == models.AttemptStatusSuccess, - DeliveryTask: task, - Destination: &alert.AlertDestination{ - ID: destination.ID, - TenantID: destination.TenantID, - Type: destination.Type, - Topics: destination.Topics, - Config: destination.Config, - CreatedAt: destination.CreatedAt, - DisabledAt: destination.DisabledAt, - }, - Timestamp: attemptResult.Time, +func (h *messageHandler) handleAlertAttempt(ctx context.Context, destination *models.Destination, event *models.Event, attempt *models.Attempt) { + da := alert.DeliveryAttempt{ + Event: event, + Destination: 
alert.AlertDestinationFromDestination(destination), + Attempt: attempt, } - if !alertAttempt.Success && err != nil { - // Extract attempt data if available - var atmErr *AttemptError - if errors.As(err, &atmErr) { - var pubErr *destregistry.ErrDestinationPublishAttempt - if errors.As(atmErr.err, &pubErr) { - alertAttempt.DeliveryResponse = pubErr.Data - } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ - "error": atmErr.err.Error(), - } - } - } else { - alertAttempt.DeliveryResponse = map[string]interface{}{ - "error": "unexpected", - "message": err.Error(), - } - } - } - - if monitorErr := h.alertMonitor.HandleAttempt(ctx, alertAttempt); monitorErr != nil { + if monitorErr := h.alertMonitor.HandleAttempt(ctx, da); monitorErr != nil { h.logger.Ctx(ctx).Error("failed to handle alert attempt", zap.Error(monitorErr), - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) @@ -363,8 +334,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, task *models.De } h.logger.Ctx(ctx).Info("alert attempt handled", - zap.String("attempt_id", attemptResult.ID), - zap.String("event_id", task.Event.ID), + zap.String("attempt_id", attempt.ID), + zap.String("event_id", event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), zap.String("destination_type", destination.Type)) diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 8a0383919..51244643e 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -258,7 +258,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { "should use GetRetryID for task ID") require.Len(t, 
logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_PublishError_NotEligible(t *testing.T) { @@ -326,7 +326,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { assert.Equal(t, 1, publisher.current, "should only attempt once") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) } func TestMessageHandler_RetryFlow(t *testing.T) { @@ -845,7 +845,7 @@ func TestManualDelivery_PublishError(t *testing.T) { assert.Equal(t, 1, publisher.current, "should attempt publish once") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusFailed, logPublisher.entries[0].Attempt.Status, "delivery status should be Failed") - assertAlertMonitor(t, alertMonitor, false, &destination, publishErr.Data) + assertAlertMonitor(t, alertMonitor, false, &destination) // Assert retry state: the old pending retry (tier 0, 1s) was atomically // replaced with the next tier. Attempt 2 uses backoff index 1 → 2s. 
@@ -990,7 +990,7 @@ func TestManualDelivery_CancelError(t *testing.T) { assert.Equal(t, models.RetryID(task.Event.ID, task.DestinationID), retryScheduler.canceled[0], "should cancel with correct retry ID") require.Len(t, logPublisher.entries, 1, "should have one delivery") assert.Equal(t, models.AttemptStatusSuccess, logPublisher.entries[0].Attempt.Status, "delivery status should be OK despite cancel error") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestManualDelivery_DestinationDisabled(t *testing.T) { @@ -1080,10 +1080,10 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Setup alert monitor expectations alertMonitor.On("HandleAttempt", mock.Anything, mock.MatchedBy(func(attempt alert.DeliveryAttempt) bool { - return attempt.Success && // Should be a successful attempt - attempt.Destination.ID == destination.ID && // Should have correct destination - attempt.DeliveryTask != nil && // Should have delivery task - attempt.DeliveryResponse == nil // No error data for success + return attempt.Attempt.Status == models.AttemptStatusSuccess && + attempt.Destination.ID == destination.ID && + attempt.Event != nil && + attempt.Attempt != nil })).Return(nil) // Setup message handler @@ -1114,7 +1114,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Assert behavior assert.True(t, mockMsg.acked, "message should be acked on success") assert.False(t, mockMsg.nacked, "message should not be nacked on success") - assertAlertMonitor(t, alertMonitor, true, &destination, nil) + assertAlertMonitor(t, alertMonitor, true, &destination) } func TestMessageHandler_AlertMonitorError(t *testing.T) { @@ -1188,7 +1188,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { } // Helper function to assert alert monitor calls -func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination, expectedData map[string]interface{}) { +func 
assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destination *models.Destination) { t.Helper() // Wait for the alert monitor to be called @@ -1201,15 +1201,10 @@ func assertAlertMonitor(t *testing.T, m *mockAlertMonitor, success bool, destina lastCall := calls[len(calls)-1] attempt := lastCall.Arguments[1].(alert.DeliveryAttempt) - assert.Equal(t, success, attempt.Success, "alert attempt success should match") + assert.Equal(t, success, attempt.Attempt.Status == models.AttemptStatusSuccess, "alert attempt success should match") assert.Equal(t, destination.ID, attempt.Destination.ID, "alert attempt destination should match") - assert.NotNil(t, attempt.DeliveryTask, "alert attempt should have delivery task") - - if expectedData != nil { - assert.Equal(t, expectedData, attempt.DeliveryResponse, "alert attempt data should match") - } else { - assert.Nil(t, attempt.DeliveryResponse, "alert attempt should not have data") - } + assert.NotNil(t, attempt.Event, "alert attempt should have event") + assert.NotNil(t, attempt.Attempt, "alert attempt should have attempt data") } func TestManualDelivery_DuplicateRetry(t *testing.T) {