
InternalRunService: silent data loss when RecordAction fails before UpdateActionStatus #7257

@pingsutw

Summary

When the Actions Service's TaskAction CRD watcher (actions/k8s/client.go:notifyRunService) fails to call InternalRunService.RecordAction, subsequent UpdateActionStatus calls for the same action silently no-op — the gRPC response is success, but zero rows are updated and the action is effectively invisible in the Runs DB until a K8s watch re-list re-emits ADDED.

Reproduction

  1. Runs Service is briefly unavailable when a TaskAction's ADDED event is processed.
  2. RecordAction fails. The Actions Service logs a Warnf and moves on (no retry). The bloom filter is not updated, and nothing else is done to recover.
  3. Runs Service recovers.
  4. Subsequent MODIFIED events for that TaskAction call UpdateActionStatus. The SQL UPDATE matches zero rows. Runs Service returns nil error (see runs/repository/impl/action.go:397-416, runs/service/internal_run_service.go:232).
  5. The action never appears to clients using WatchRunDetails until a watch re-list occurs (controller restart / watcher reconnect).

Root cause

Two coupled issues in InternalRunService:

  1. UpdateActionPhase returns success on 0 rows affected. The caller has no way to know the action is missing from the DB.
  2. RecordAction and UpdateActionStatus are split RPCs. The client must call both in order; there is no single idempotent "report" operation.

Proposed fix

1. Make UpdateActionStatus return NotFound when no row is matched.

// runs/repository/impl/action.go
if rowsAffected == 0 {
    return ErrActionNotFound
}

…mapped to connect.CodeNotFound in the gRPC handler.
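A minimal, standard-library-only sketch of this check and its mapping follows. It assumes the repository can read a rows-affected count through something shaped like database/sql's sql.Result; the real DB layer may differ. The names rowsResult, fixedRows, checkUpdated, and statusCode are hypothetical, and the string codes stand in for connect.CodeNotFound and friends.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrActionNotFound signals that an UPDATE matched no action row.
var ErrActionNotFound = errors.New("action not found")

// rowsResult mirrors the RowsAffected method of database/sql's sql.Result.
// Assumption: the real repository exposes an equivalent count.
type rowsResult interface {
	RowsAffected() (int64, error)
}

// fixedRows is a hypothetical stand-in for a driver result, used for illustration.
type fixedRows int64

func (r fixedRows) RowsAffected() (int64, error) { return int64(r), nil }

// checkUpdated turns a zero-row UPDATE into ErrActionNotFound instead of nil.
func checkUpdated(res rowsResult, actionID string) error {
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("reading rows affected for %s: %w", actionID, err)
	}
	if n == 0 {
		return fmt.Errorf("update action %s: %w", actionID, ErrActionNotFound)
	}
	return nil
}

// statusCode is a hypothetical handler-side mapping. In the real handler the
// "not_found" branch would return an error carrying connect.CodeNotFound.
func statusCode(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrActionNotFound):
		return "not_found"
	default:
		return "internal"
	}
}
```

Wrapping the sentinel with %w keeps the action ID in the message while still letting the handler match it with errors.Is.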

2. Add ActionStatus status = 6 to RecordActionRequest so the initial insert carries the correct phase (no transient PHASE_UNSPECIFIED window).

3. Self-heal in actions/k8s/client.go:notifyRunService:

_, err := c.runClient.UpdateActionStatus(ctx, statusReq)
if connect.CodeOf(err) == connect.CodeNotFound {
    // Row missing — RecordAction must have failed earlier. Rebuild from CRD
    // and re-record. Follow the existing pattern: Add to the bloom filter only
    // on success (the filter is add-only; never Remove).
    recordReq := buildRecordRequestFromCRD(taskAction)
    if _, recErr := c.runClient.RecordAction(ctx, recordReq); recErr == nil {
        if c.recordedFilter != nil {
            c.recordedFilter.Add(ctx, actionKey)
        }
        // Optional: retry UpdateActionStatus now that the row exists.
    }
}

The Actions Service still has the TaskAction CRD in its informer cache, so rebuilding the RecordActionRequest is free.
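The self-heal path can be exercised against an in-memory fake, sketched below under stated assumptions. The runClient interface, memRuns, reportStatus, and errNotFound are all hypothetical simplifications: the real client takes protobuf requests, the real filter is a bloom filter rather than a map, and errNotFound stands in for connect.CodeNotFound. The sketch also assumes fix 2 is in place, so RecordAction carries the phase and no follow-up update is needed.

```go
package main

import "errors"

// errNotFound stands in for an error carrying connect.CodeNotFound.
var errNotFound = errors.New("not found")

// runClient is a hypothetical, narrowed view of the Runs Service client.
type runClient interface {
	RecordAction(key, phase string) error
	UpdateActionStatus(key, phase string) error
}

// memRuns is an in-memory fake of the Runs DB, keyed by action.
type memRuns struct{ rows map[string]string }

func (m *memRuns) RecordAction(key, phase string) error {
	m.rows[key] = phase
	return nil
}

// UpdateActionStatus reports a missing row instead of silently succeeding.
func (m *memRuns) UpdateActionStatus(key, phase string) error {
	if _, ok := m.rows[key]; !ok {
		return errNotFound
	}
	m.rows[key] = phase
	return nil
}

// reportStatus updates the status and, when the row is missing, re-records
// the action with its current phase. The recorded set is add-only and is
// touched only after RecordAction succeeds.
func reportStatus(c runClient, recorded map[string]bool, key, phase string) error {
	err := c.UpdateActionStatus(key, phase)
	if !errors.Is(err, errNotFound) {
		return err // success, or an unrelated error for the caller to handle
	}
	if recErr := c.RecordAction(key, phase); recErr != nil {
		return recErr
	}
	recorded[key] = true
	return nil
}
```

Calling reportStatus for an action whose RecordAction was lost leaves the fake with a row in the reported phase, which is the behavior the proposal is after.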

4. On ADDED events, skip the separate UpdateActionStatus call: RecordAction now carries the status.

Independent improvements

  • Bounded retry with backoff for transient errors (Unavailable, DeadlineExceeded) — 3 attempts, exponential (100ms / 500ms / 2s). Don't block the sharded worker on long outages.
  • Periodic reconciliation sweep — every ~5min list TaskAction CRDs without the flyte.org/terminal-status-recorded label and force a notifyRunService pass. Backstop for cases where the watcher doesn't reconnect but Runs Service has recovered.

References

  • actions/k8s/client.go:554-646: notifyRunService
  • runs/service/internal_run_service.go:191-255: UpdateActionStatus handler
  • runs/repository/impl/action.go:361-417: UpdateActionPhase (the silent-success site)
