Merged
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.1] - 2026-03-13

### Added

- **`DRY_RUN_COMPLETED` event** — terminal event that closes the dry-run observation loop for each evaluation period. Published after `DRY_RUN_WOULD_TRIGGER` and `DRY_RUN_SLA_PROJECTION`, carrying the SLA verdict (`met`, `breach`, or `n/a`) so operators can see each period resolve.

### Fixed

- **Dry-run pipelines could start Step Function executions via rerun and job-failure paths** — `handleRerunRequest` and `handleJobFailure` did not check `cfg.DryRun` before calling `startSFNWithName`, so rerun requests and job-failure retries could start real SFN executions for dry-run pipelines. Added dry-run guards to both handlers, plus a defense-in-depth guard in `startSFNWithName` that unconditionally suppresses execution. The watchdog reconciliation loop now also skips dry-run pipelines, preventing orphaned trigger locks.
- **Watchdog scheduled real SLA alerts for dry-run pipelines** — `scheduleSLAAlerts`, `detectMissedSchedules`, `detectMissedInclusionSchedules`, `checkTriggerDeadlines`, `detectMissingPostRunSensors`, and `detectRelativeSLABreaches` all iterated over dry-run pipelines without checking `cfg.DryRun`. This caused EventBridge Scheduler entries for SLA_WARNING/SLA_BREACH, SCHEDULE_MISSED events, and RELATIVE_SLA_BREACH alerts to fire for observation-only pipelines. Added `cfg.DryRun` guard to all six watchdog functions.
- **Duplicate `JOB_COMPLETED` alerts for polled jobs** — `handleCheckJob` in the orchestrator published `JOB_COMPLETED` when polling detected success, but the stream-router's `handleJobSuccess` also published the same event when the JOB# record arrived via DynamoDB stream. Removed the orchestrator emission; the stream-router is now the single canonical source for `JOB_COMPLETED` across all job types.

## [0.9.0] - 2026-03-12

### Added
9 changes: 9 additions & 0 deletions deploy/terraform/eventbridge.tf
@@ -91,6 +91,15 @@ resource "aws_cloudwatch_event_rule" "alert_events" {
"TRIGGER_RECOVERED",
"BASELINE_CAPTURE_FAILED",
"PIPELINE_EXCLUDED",
"SENSOR_DEADLINE_EXPIRED",
"IRREGULAR_SCHEDULE_MISSED",
"RELATIVE_SLA_WARNING",
"RELATIVE_SLA_BREACH",
"DRY_RUN_WOULD_TRIGGER",
"DRY_RUN_LATE_DATA",
"DRY_RUN_SLA_PROJECTION",
"DRY_RUN_DRIFT",
"DRY_RUN_COMPLETED",
]
})
}
2 changes: 1 addition & 1 deletion docs/content/docs/configuration/pipelines.md
@@ -79,7 +79,7 @@ Declarative rules that determine pipeline readiness. See [Validation Rules](#val

### `dryRun`

Enables observation-only mode. When `true`, Interlock evaluates trigger conditions and validation rules against real sensor data but never starts a Step Function execution or triggers any job. All observations are published as EventBridge events (`DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`).
Enables observation-only mode. When `true`, Interlock evaluates trigger conditions and validation rules against real sensor data but never starts a Step Function execution or triggers any job. All observations are published as EventBridge events (`DRY_RUN_WOULD_TRIGGER`, `DRY_RUN_LATE_DATA`, `DRY_RUN_SLA_PROJECTION`, `DRY_RUN_DRIFT`, `DRY_RUN_COMPLETED`).

| Field | Type | Default | Description |
|---|---|---|---|
5 changes: 3 additions & 2 deletions docs/content/docs/guides/dry-run.md
@@ -72,13 +72,14 @@ Sensor arrives → Stream-router

## Monitoring Events

Dry-run pipelines emit four event types to EventBridge:
Dry-run pipelines emit five event types to EventBridge:

| Event | Meaning |
|---|---|
| `DRY_RUN_WOULD_TRIGGER` | All validation rules passed — Interlock would have triggered the job |
| `DRY_RUN_SLA_PROJECTION` | Estimated completion time vs. deadline — `met` or `breach` status |
| `DRY_RUN_COMPLETED` | Observation loop closed — carries the SLA verdict (`met`, `breach`, or `n/a`) |
| `DRY_RUN_LATE_DATA` | Sensor data arrived after the trigger point was already recorded |
| `DRY_RUN_DRIFT` | Post-run sensor data changed from the baseline captured at trigger time |

Create an EventBridge rule to capture these events:
3 changes: 3 additions & 0 deletions docs/content/docs/reference/alerting.md
@@ -80,9 +80,12 @@ Published by the **stream-router** Lambda for pipelines with `dryRun: true`. The
| `DRY_RUN_LATE_DATA` | Sensor updated after trigger point | Sensor data arrived after the dry-run trigger was recorded — would have triggered a re-run |
| `DRY_RUN_SLA_PROJECTION` | Estimated completion vs. deadline | Projects whether the SLA would be met or breached based on `expectedDuration` and `deadline` |
| `DRY_RUN_DRIFT` | Post-run sensor data changed | Sensor value drifted from baseline captured at trigger time — would have triggered a drift re-run |
| `DRY_RUN_COMPLETED` | Observation loop closed | Terminal event for the evaluation period — carries the SLA verdict (`met`, `breach`, or `n/a`) |

The `DRY_RUN_SLA_PROJECTION` detail includes `status` (`"met"` or `"breach"`), `estimatedCompletion`, `deadline`, and `marginSeconds` fields.

The `DRY_RUN_COMPLETED` detail includes `triggeredAt`, `slaStatus` (`"met"`, `"breach"`, or `"n/a"`), and optionally `estimatedCompletion` and `deadline` when SLA is configured.

### Watchdog Events

Published by the **watchdog** Lambda, invoked on an EventBridge schedule (default: every 5 minutes).
64 changes: 55 additions & 9 deletions internal/lambda/dryrun.go
@@ -23,8 +23,17 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig

if marker != nil {
// Late data: sensor arrived after we already recorded a would-trigger.
triggeredAtStr, _ := marker.Data["triggeredAt"].(string)
triggeredAt, _ := time.Parse(time.RFC3339, triggeredAtStr)
triggeredAtStr, ok := marker.Data["triggeredAt"].(string)
if !ok || triggeredAtStr == "" {
d.Logger.WarnContext(ctx, "dry-run marker missing triggeredAt", "pipelineId", pipelineID)
return nil
}
triggeredAt, parseErr := time.Parse(time.RFC3339, triggeredAtStr)
if parseErr != nil {
d.Logger.WarnContext(ctx, "dry-run marker has invalid triggeredAt",
"pipelineId", pipelineID, "value", triggeredAtStr, "error", parseErr)
return nil
}
lateBy := now.Sub(triggeredAt)

if pubErr := publishEvent(ctx, d, string(types.EventDryRunLateData), pipelineID, scheduleID, date,
@@ -83,8 +92,29 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig
}

// SLA projection if configured.
var slaVerdict *dryRunSLAVerdict
if cfg.SLA != nil && cfg.SLA.ExpectedDuration != "" {
publishDryRunSLAProjection(ctx, d, cfg, pipelineID, scheduleID, date, now)
slaVerdict = publishDryRunSLAProjection(ctx, d, cfg, pipelineID, scheduleID, date, now)
}

// Publish DRY_RUN_COMPLETED to close the observation loop.
completedDetail := map[string]interface{}{
"triggeredAt": now.UTC().Format(time.RFC3339),
}
if slaVerdict != nil {
completedDetail["slaStatus"] = slaVerdict.Status
completedDetail["estimatedCompletion"] = slaVerdict.EstimatedCompletion
if slaVerdict.Deadline != "" {
completedDetail["deadline"] = slaVerdict.Deadline
}
} else {
completedDetail["slaStatus"] = "n/a"
}

if pubErr := publishEvent(ctx, d, string(types.EventDryRunCompleted), pipelineID, scheduleID, date,
fmt.Sprintf("dry-run: observation complete for %s/%s", pipelineID, date),
completedDetail); pubErr != nil {
d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunCompleted, "error", pubErr)
}

d.Logger.Info("dry-run: would trigger",
@@ -95,15 +125,24 @@ func handleDryRunTrigger(ctx context.Context, d *Deps, cfg *types.PipelineConfig
return nil
}

// dryRunSLAVerdict holds the SLA projection result for inclusion in the
// DRY_RUN_COMPLETED event detail.
type dryRunSLAVerdict struct {
Status string // "met" or "breach"
EstimatedCompletion string // RFC3339
Deadline string // RFC3339, empty if no deadline configured
}

// publishDryRunSLAProjection computes and publishes an SLA projection event
// for a dry-run pipeline. Reuses handleSLACalculate to resolve the breach
// deadline consistently with production SLA monitoring (including hourly
// pipeline T+1 adjustment).
func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, triggeredAt time.Time) {
// pipeline T+1 adjustment). Returns the verdict for inclusion in the
// DRY_RUN_COMPLETED event.
func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date string, triggeredAt time.Time) *dryRunSLAVerdict {
expectedDur, err := time.ParseDuration(cfg.SLA.ExpectedDuration)
if err != nil {
d.Logger.WarnContext(ctx, "dry-run: invalid expectedDuration", "error", err)
return
return nil
}

estimatedCompletion := triggeredAt.Add(expectedDur)
@@ -113,7 +152,11 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin
"expectedDuration": cfg.SLA.ExpectedDuration,
}

status := "met"
verdict := &dryRunSLAVerdict{
Status: "met",
EstimatedCompletion: estimatedCompletion.UTC().Format(time.RFC3339),
}

message := fmt.Sprintf("dry-run: SLA projection for %s — estimated completion %s",
pipelineID, estimatedCompletion.Format(time.RFC3339))

@@ -135,10 +178,11 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin
breachAt, parseErr := time.Parse(time.RFC3339, slaOutput.BreachAt)
if parseErr == nil {
detail["deadline"] = slaOutput.BreachAt
verdict.Deadline = slaOutput.BreachAt
margin := breachAt.Sub(estimatedCompletion)
detail["marginSeconds"] = margin.Seconds()
if estimatedCompletion.After(breachAt) {
status = "breach"
verdict.Status = "breach"
message = fmt.Sprintf("dry-run: SLA projection for %s — would breach by %.0fm",
pipelineID, math.Abs(margin.Minutes()))
} else {
@@ -149,11 +193,13 @@ func publishDryRunSLAProjection(ctx context.Context, d *Deps, cfg *types.Pipelin
}
}

detail["status"] = status
detail["status"] = verdict.Status

if pubErr := publishEvent(ctx, d, string(types.EventDryRunSLAProjection), pipelineID, scheduleID, date, message, detail); pubErr != nil {
d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventDryRunSLAProjection, "error", pubErr)
}

return verdict
}

// handleDryRunPostRunSensor handles post-run sensor events for dry-run pipelines.
6 changes: 2 additions & 4 deletions internal/lambda/e2e_test.go
@@ -558,7 +558,7 @@ func TestE2E_PrimarySFNPaths(t *testing.T) {
assert.Equal(t, "SLA_MET", r.slaOutcome)
assert.Contains(t, r.events, "VALIDATION_PASSED")
assert.Contains(t, r.events, "JOB_TRIGGERED")
assert.Contains(t, r.events, "JOB_COMPLETED")
// JOB_COMPLETED is emitted by stream-router (not SFN), verified in stream_router_test.go.
assert.Contains(t, r.events, "SLA_MET")
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-a1"))
assert.Contains(t, collectJoblogEvents(mock, "pipe-a1"), "success")
@@ -589,7 +589,6 @@ func TestE2E_PrimarySFNPaths(t *testing.T) {
assert.Equal(t, 4, r.evalCount)
assert.Contains(t, r.events, "VALIDATION_PASSED")
assert.Contains(t, r.events, "JOB_TRIGGERED")
assert.Contains(t, r.events, "JOB_COMPLETED")
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-a2"))
assertAlertFormats(t, eb)
})
@@ -754,7 +753,6 @@ func TestE2E_PostRunMonitoring(t *testing.T) {
})

assert.Equal(t, sfnDone, r.terminal)
assert.Contains(t, r.events, "JOB_COMPLETED")
assert.NotContains(t, r.events, "POST_RUN_DRIFT")
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-b1"))
assertAlertFormats(t, eb)
@@ -1932,7 +1930,7 @@ func TestE2E_CrossHandlerEdgeCases(t *testing.T) {

assert.Equal(t, sfnDone, r.terminal)
// check-job should have skipped the non-terminal entry and polled StatusChecker.
assert.Contains(t, r.events, "JOB_COMPLETED", "check-job should fall through non-terminal joblog to StatusChecker")
// JOB_COMPLETED is emitted by stream-router when the JOB# record arrives, not by the orchestrator.
assert.Equal(t, types.TriggerStatusCompleted, e2eTriggerStatus(mock, "pipe-j2"))
joblogs := collectJoblogEvents(mock, "pipe-j2")
assert.Contains(t, joblogs, "success", "StatusChecker success should be written to joblog")
6 changes: 3 additions & 3 deletions internal/lambda/orchestrator.go
@@ -172,9 +172,9 @@ func handleCheckJob(ctx context.Context, d *Deps, input OrchestratorInput) (Orch
if err := d.Store.WriteJobEvent(ctx, input.PipelineID, input.ScheduleID, input.Date, types.JobEventSuccess, input.RunID, 0, ""); err != nil {
d.Logger.Warn("failed to write polled job success joblog", "error", err, "pipeline", input.PipelineID, "schedule", input.ScheduleID, "date", input.Date)
}
if err := publishEvent(ctx, d, string(types.EventJobCompleted), input.PipelineID, input.ScheduleID, input.Date, "job succeeded"); err != nil {
d.Logger.WarnContext(ctx, "failed to publish event", "type", types.EventJobCompleted, "error", err)
}
// JOB_COMPLETED is published by the stream-router when the JOB#
// record arrives via DynamoDB stream (handleJobSuccess). Publishing
// here as well would cause duplicate alerts for polled jobs.
return OrchestratorOutput{Mode: "check-job", Event: "success"}, nil
case "failed":
var writeOpts []store.JobEventOption
14 changes: 14 additions & 0 deletions internal/lambda/rerun.go
@@ -34,6 +34,13 @@ func handleRerunRequest(ctx context.Context, d *Deps, pk, sk string, record even
return nil
}

// Dry-run pipelines never start real executions.
if cfg.DryRun {
d.Logger.Info("dry-run: skipping rerun request",
"pipelineId", pipelineID, "schedule", schedule, "date", date)
return nil
}

// --- Calendar exclusion check (execution date) ---
if isExcludedDate(cfg, date) {
if err := d.Store.WriteJobEvent(ctx, pipelineID, schedule, date, types.JobEventRerunRejected, "", 0, "excluded by calendar"); err != nil {
@@ -199,6 +206,13 @@ func handleJobFailure(ctx context.Context, d *Deps, pipelineID, schedule, date,
return nil
}

// Dry-run pipelines never start real executions.
if cfg.DryRun {
d.Logger.Info("dry-run: skipping job failure rerun",
"pipelineId", pipelineID, "schedule", schedule, "date", date)
return nil
}

maxRetries := cfg.Job.MaxRetries

// Check if the latest failure has a category for budget selection.
7 changes: 7 additions & 0 deletions internal/lambda/sfn.go
@@ -79,7 +79,14 @@ func startSFN(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineI
}

// startSFNWithName starts a Step Function execution with a custom execution name.
// Defense-in-depth: refuses to start if the pipeline is in dry-run mode.
func startSFNWithName(ctx context.Context, d *Deps, cfg *types.PipelineConfig, pipelineID, scheduleID, date, name string) error {
if cfg.DryRun {
d.Logger.Warn("startSFNWithName called for dry-run pipeline, suppressing execution",
"pipelineId", pipelineID, "schedule", scheduleID, "date", date)
return nil
}

sc := buildSFNConfig(cfg)

// Warn if the sum of evaluation + poll windows exceeds the SFN timeout.