Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions internal/alert/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package alert
import (
"context"
"fmt"
"time"

"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/models"
Expand Down Expand Up @@ -82,11 +81,9 @@ func WithDeploymentID(deploymentID string) AlertOption {

// DeliveryAttempt represents a single delivery attempt observed by the
// alert monitor. It carries the event that was delivered, the destination
// it was delivered to, and the attempt record (whose Status, ID, and
// ResponseData drive failure counting, idempotency, and alert payloads).
type DeliveryAttempt struct {
	Event       *models.Event
	Destination *AlertDestination
	Attempt     *models.Attempt
}

type alertMonitor struct {
Expand Down Expand Up @@ -138,12 +135,12 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ...
}

func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttempt) error {
if attempt.Success {
if attempt.Attempt.Status == models.AttemptStatusSuccess {
return m.store.ResetConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID)
}

// Get alert state
count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID)
// Get alert state — attemptID ensures idempotent counting on message replay
count, err := m.store.IncrementConsecutiveFailureCount(ctx, attempt.Destination.TenantID, attempt.Destination.ID, attempt.Attempt.ID)
if err != nil {
return fmt.Errorf("failed to get alert state: %w", err)
}
Expand All @@ -155,16 +152,16 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp

alert := NewConsecutiveFailureAlert(ConsecutiveFailureData{
Event: AlertedEvent{
ID: attempt.DeliveryTask.Event.ID,
Topic: attempt.DeliveryTask.Event.Topic,
Metadata: attempt.DeliveryTask.Event.Metadata,
Data: attempt.DeliveryTask.Event.Data,
ID: attempt.Event.ID,
Topic: attempt.Event.Topic,
Metadata: attempt.Event.Metadata,
Data: attempt.Event.Data,
},
MaxConsecutiveFailures: m.autoDisableFailureCount,
ConsecutiveFailures: count,
WillDisable: m.disabler != nil && level == 100,
Destination: attempt.Destination,
DeliveryResponse: attempt.DeliveryResponse,
DeliveryResponse: attempt.Attempt.ResponseData,
})

// If we've hit 100% and have a disabler configured, disable the destination
Expand All @@ -174,7 +171,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
}

m.logger.Ctx(ctx).Audit("destination disabled",
zap.String("event_id", attempt.DeliveryTask.Event.ID),
zap.String("attempt_id", attempt.Attempt.ID),
zap.String("event_id", attempt.Event.ID),
zap.String("tenant_id", attempt.Destination.TenantID),
zap.String("destination_id", attempt.Destination.ID),
zap.String("destination_type", attempt.Destination.Type),
Expand All @@ -186,7 +184,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
if err := m.notifier.Notify(ctx, alert); err != nil {
m.logger.Ctx(ctx).Error("failed to send alert",
zap.Error(err),
zap.String("event_id", attempt.DeliveryTask.Event.ID),
zap.String("attempt_id", attempt.Attempt.ID),
zap.String("event_id", attempt.Event.ID),
zap.String("tenant_id", attempt.Destination.TenantID),
zap.String("destination_id", attempt.Destination.ID),
zap.String("destination_type", attempt.Destination.Type),
Expand All @@ -195,7 +194,8 @@ func (m *alertMonitor) HandleAttempt(ctx context.Context, attempt DeliveryAttemp
}

m.logger.Ctx(ctx).Audit("alert sent",
zap.String("event_id", attempt.DeliveryTask.Event.ID),
zap.String("attempt_id", attempt.Attempt.ID),
zap.String("event_id", attempt.Event.ID),
zap.String("tenant_id", attempt.Destination.TenantID),
zap.String("destination_id", attempt.Destination.ID),
zap.String("destination_type", attempt.Destination.Type),
Expand Down
141 changes: 79 additions & 62 deletions internal/alert/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alert_test

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -28,8 +29,8 @@ type mockDestinationDisabler struct {
}

// DisableDestination records the invocation on the mock and returns the
// error configured for this call (nil when the test set up no error),
// letting tests exercise the monitor's disable-failure path.
func (m *mockDestinationDisabler) DisableDestination(ctx context.Context, tenantID, destinationID string) error {
	args := m.Called(ctx, tenantID, destinationID)
	return args.Error(0)
}

func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {
Expand All @@ -53,39 +54,38 @@ func TestAlertMonitor_ConsecutiveFailures_MaxFailures(t *testing.T) {

dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"}
event := &models.Event{Topic: "test.event"}
task := &models.DeliveryTask{Event: *event}
attempt := alert.DeliveryAttempt{
Success: false,
DeliveryTask: task,
Destination: dest,
DeliveryResponse: map[string]interface{}{
"status": "500",
"data": map[string]any{"error": "test error"},
},
Timestamp: time.Now(),
}

// Send 20 consecutive failures
// Send 20 consecutive failures (each with a unique attempt ID)
for i := 1; i <= 20; i++ {
attempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: fmt.Sprintf("att_%d", i),
Status: "failed",
Code: "500",
ResponseData: map[string]interface{}{"error": "test error"},
Time: time.Now(),
},
}
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
}

// Verify notifications were sent at correct thresholds
var notifyCallCount int
// Verify consecutive failure notifications were sent at correct thresholds
var consecutiveFailureCount int
for _, call := range notifier.Calls {
if call.Method == "Notify" {
notifyCallCount++
alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
failures := alert.Data.ConsecutiveFailures
require.Contains(t, []int{10, 14, 18, 20}, failures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds")
require.Equal(t, dest, alert.Data.Destination)
require.Equal(t, "alert.consecutive_failure", alert.Topic)
require.Equal(t, attempt.DeliveryResponse, alert.Data.DeliveryResponse)
require.Equal(t, 20, alert.Data.MaxConsecutiveFailures)
require.Equal(t, failures == 20, alert.Data.WillDisable, "WillDisable should only be true at 100% (20 failures)")
if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
consecutiveFailureCount++
require.Contains(t, []int{10, 14, 18, 20}, cfAlert.Data.ConsecutiveFailures, "Alert should be sent at 50%, 66%, 90%, and 100% thresholds")
require.Equal(t, dest.ID, cfAlert.Data.Destination.ID)
require.Equal(t, dest.TenantID, cfAlert.Data.Destination.TenantID)
require.Equal(t, "alert.consecutive_failure", cfAlert.Topic)
require.Equal(t, 20, cfAlert.Data.MaxConsecutiveFailures)
}
}
}
require.Equal(t, 4, notifyCallCount, "Should have sent exactly 4 notifications")
require.Equal(t, 4, consecutiveFailureCount, "Should have sent exactly 4 consecutive failure notifications")

// Verify destination was disabled exactly once at 100%
var disableCallCount int
Expand Down Expand Up @@ -120,36 +120,50 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {

dest := &alert.AlertDestination{ID: "dest_1", TenantID: "tenant_1"}
event := &models.Event{Topic: "test.event"}
task := &models.DeliveryTask{Event: *event}
failedAttempt := alert.DeliveryAttempt{
Success: false,
DeliveryTask: task,
Destination: dest,
DeliveryResponse: map[string]interface{}{
"status": "500",
"data": map[string]any{"error": "test error"},
},
Timestamp: time.Now(),
}

// Send 14 failures (should trigger 50% and 66% alerts)
for i := 1; i <= 14; i++ {
failedAttempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: fmt.Sprintf("att_%d", i),
Status: "failed",
Code: "500",
ResponseData: map[string]interface{}{"error": "test error"},
Time: time.Now(),
},
}
require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt))
}

// Verify we got exactly 2 notifications (50% and 66%)
require.Equal(t, 2, len(notifier.Calls))

// Send a success to reset the counter
successAttempt := failedAttempt
successAttempt.Success = true
successAttempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{Status: models.AttemptStatusSuccess},
}
require.NoError(t, monitor.HandleAttempt(ctx, successAttempt))

// Clear the mock calls to start fresh
notifier.Calls = nil

// Send 14 more failures
for i := 1; i <= 14; i++ {
// Send 14 more failures (new attempt IDs)
for i := 15; i <= 28; i++ {
failedAttempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: fmt.Sprintf("att_%d", i),
Status: "failed",
Code: "500",
ResponseData: map[string]interface{}{"error": "test error"},
Time: time.Now(),
},
}
require.NoError(t, monitor.HandleAttempt(ctx, failedAttempt))
}

Expand All @@ -159,8 +173,9 @@ func TestAlertMonitor_ConsecutiveFailures_Reset(t *testing.T) {
// Verify the notifications were at the right thresholds
var seenCounts []int
for _, call := range notifier.Calls {
alert := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
seenCounts = append(seenCounts, alert.Data.ConsecutiveFailures)
if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
seenCounts = append(seenCounts, cfAlert.Data.ConsecutiveFailures)
}
}
assert.Contains(t, seenCounts, 10, "Should have alerted at 50% (10 failures)")
assert.Contains(t, seenCounts, 14, "Should have alerted at 66% (14 failures)")
Expand Down Expand Up @@ -193,39 +208,41 @@ func TestAlertMonitor_ConsecutiveFailures_AboveThreshold(t *testing.T) {

dest := &alert.AlertDestination{ID: "dest_above", TenantID: "tenant_above"}
event := &models.Event{Topic: "test.event"}
task := &models.DeliveryTask{Event: *event}
attempt := alert.DeliveryAttempt{
Success: false,
DeliveryTask: task,
Destination: dest,
DeliveryResponse: map[string]interface{}{
"status": "500",
},
Timestamp: time.Now(),
}

// Send 25 consecutive failures (5 more than the threshold)
for i := 1; i <= 25; i++ {
attempt := alert.DeliveryAttempt{
Event: event,
Destination: dest,
Attempt: &models.Attempt{
ID: fmt.Sprintf("att_%d", i),
Status: "failed",
Code: "500",
ResponseData: map[string]interface{}{"error": "test error"},
Time: time.Now(),
},
}
require.NoError(t, monitor.HandleAttempt(ctx, attempt))
}

// Verify notifications at 50%, 70%, 90%, and 100% thresholds
// Verify consecutive failure notifications at 50%, 70%, 90%, and 100% thresholds
// Plus additional notifications for failures 21-25 (all at 100% level)
var notifyCallCount int
var consecutiveFailureCount int
var disableNotifyCount int
for _, call := range notifier.Calls {
if call.Method == "Notify" {
notifyCallCount++
alertData := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert)
if alertData.Data.ConsecutiveFailures >= 20 {
disableNotifyCount++
require.True(t, alertData.Data.WillDisable, "WillDisable should be true at and above 100%")
if cfAlert, ok := call.Arguments.Get(1).(alert.ConsecutiveFailureAlert); ok {
consecutiveFailureCount++
if cfAlert.Data.ConsecutiveFailures >= 20 {
disableNotifyCount++
require.True(t, cfAlert.Data.WillDisable, "WillDisable should be true at and above max")
}
}
}
}
// 4 alerts at thresholds (10, 14, 18, 20) + 5 alerts for 21-25
require.Equal(t, 9, notifyCallCount, "Should have sent 9 notifications (4 at thresholds + 5 above)")
require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications with WillDisable=true (20-25)")
require.Equal(t, 9, consecutiveFailureCount, "Should have sent 9 consecutive failure notifications (4 at thresholds + 5 above)")
require.Equal(t, 6, disableNotifyCount, "Should have 6 notifications at threshold 100 (20-25)")

// Verify destination was disabled multiple times (once per failure >= 20)
var disableCallCount int
Expand Down
12 changes: 12 additions & 0 deletions internal/alert/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ type AlertDestination struct {
DisabledAt *time.Time `json:"disabled_at" redis:"disabled_at"`
}

// AlertDestinationFromDestination converts a models.Destination into the
// alert package's AlertDestination view, copying the identifying fields
// (ID, TenantID), routing metadata (Type, Topics, Config), and lifecycle
// timestamps (CreatedAt, DisabledAt) one-for-one.
func AlertDestinationFromDestination(d *models.Destination) *AlertDestination {
return &AlertDestination{
ID: d.ID,
TenantID: d.TenantID,
Type: d.Type,
Topics: d.Topics,
Config: d.Config,
CreatedAt: d.CreatedAt,
DisabledAt: d.DisabledAt,
}
}

// ConsecutiveFailureData represents the data needed for a consecutive failure alert
type ConsecutiveFailureData struct {
Event AlertedEvent `json:"event"`
Expand Down
Loading
Loading