Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2925,3 +2925,99 @@ func RemoveTableFromCDCMetadata(ctx context.Context, db DBQuerier, tableName, pu

return nil
}

func GetReplicationOriginByName(ctx context.Context, db DBQuerier, originName string) (*uint32, error) {
sql, err := RenderSQL(SQLTemplates.GetReplicationOriginByName, nil)
if err != nil {
return nil, fmt.Errorf("failed to render GetReplicationOriginByName SQL: %w", err)
}

var originID uint32
err = db.QueryRow(ctx, sql, originName).Scan(&originID)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("query to get replication origin by name '%s' failed: %w", originName, err)
}

return &originID, nil
}

func CreateReplicationOrigin(ctx context.Context, db DBQuerier, originName string) (uint32, error) {
sql, err := RenderSQL(SQLTemplates.CreateReplicationOrigin, nil)
if err != nil {
return 0, fmt.Errorf("failed to render CreateReplicationOrigin SQL: %w", err)
}

var originID uint32
err = db.QueryRow(ctx, sql, originName).Scan(&originID)
if err != nil {
return 0, fmt.Errorf("query to create replication origin '%s' failed: %w", originName, err)
}

return originID, nil
}

func SetupReplicationOriginSession(ctx context.Context, db DBQuerier, originName string) error {
sql, err := RenderSQL(SQLTemplates.SetupReplicationOriginSession, nil)
if err != nil {
return fmt.Errorf("failed to render SetupReplicationOriginSession SQL: %w", err)
}

_, err = db.Exec(ctx, sql, originName)
if err != nil {
return fmt.Errorf("query to setup replication origin session for origin '%s' failed: %w", originName, err)
}

return nil
}

func ResetReplicationOriginSession(ctx context.Context, db DBQuerier) error {
sql, err := RenderSQL(SQLTemplates.ResetReplicationOriginSession, nil)
if err != nil {
return fmt.Errorf("failed to render ResetReplicationOriginSession SQL: %w", err)
}

_, err = db.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("query to reset replication origin session failed: %w", err)
}

return nil
}

func SetupReplicationOriginXact(ctx context.Context, db DBQuerier, originLSN string, originTimestamp *time.Time) error {
sql, err := RenderSQL(SQLTemplates.SetupReplicationOriginXact, nil)
if err != nil {
return fmt.Errorf("failed to render SetupReplicationOriginXact SQL: %w", err)
}

var timestampParam any
if originTimestamp != nil {
timestampParam = originTimestamp.Format(time.RFC3339)
} else {
timestampParam = nil
}

_, err = db.Exec(ctx, sql, originLSN, timestampParam)
if err != nil {
return fmt.Errorf("query to setup replication origin xact with LSN %s failed: %w", originLSN, err)
}

return nil
}

func ResetReplicationOriginXact(ctx context.Context, db DBQuerier) error {
sql, err := RenderSQL(SQLTemplates.ResetReplicationOriginXact, nil)
if err != nil {
return fmt.Errorf("failed to render ResetReplicationOriginXact SQL: %w", err)
}

_, err = db.Exec(ctx, sql)
if err != nil {
return fmt.Errorf("query to reset replication origin xact failed: %w", err)
}

return nil
}
24 changes: 24 additions & 0 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ type Templates struct {
RemoveTableFromCDCMetadata *template.Template
GetSpockOriginLSNForNode *template.Template
GetSpockSlotLSNForNode *template.Template
GetReplicationOriginByName *template.Template
CreateReplicationOrigin *template.Template
SetupReplicationOriginSession *template.Template
ResetReplicationOriginSession *template.Template
SetupReplicationOriginXact *template.Template
ResetReplicationOriginXact *template.Template
}

var SQLTemplates = Templates{
Expand Down Expand Up @@ -1543,4 +1549,22 @@ var SQLTemplates = Templates{
ORDER BY rs.confirmed_flush_lsn DESC
LIMIT 1
`)),
GetReplicationOriginByName: template.Must(template.New("getReplicationOriginByName").Parse(`
SELECT roident FROM pg_replication_origin WHERE roname = $1
`)),
CreateReplicationOrigin: template.Must(template.New("createReplicationOrigin").Parse(`
SELECT pg_replication_origin_create($1)
`)),
SetupReplicationOriginSession: template.Must(template.New("setupReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_setup($1)
`)),
ResetReplicationOriginSession: template.Must(template.New("resetReplicationOriginSession").Parse(`
SELECT pg_replication_origin_session_reset()
`)),
SetupReplicationOriginXact: template.Must(template.New("setupReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_setup($1, $2)
`)),
ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_reset()
`)),
}
163 changes: 135 additions & 28 deletions docs/commands/repair/table-repair.md
Original file line number Diff line number Diff line change
@@ -1,45 +1,79 @@
# table-repair

Performs repairs on tables of divergent nodes based on the diff report generated by `table-diff`.
Performs repairs on tables of divergent nodes based on the diff report
generated by `table-diff`.

## Syntax

When calling `table-repair`, use the following syntax:

```bash
./ace table-repair <cluster_name> <schema.table_name> --diff-file=<diff_file> [--source-of-truth=<node>] [options]
./ace table-repair <cluster_name> <schema.table_name> \
--diff-file=<diff_file> [--source-of-truth=<node>] [options]
```

- `cluster_name`: Name of the cluster. Optional. Overrides `default_cluster` in `ace.yaml` if specified.
Provide the following details in the call to `table-repair`:

- `cluster_name`: Name of the cluster. Optional. Overrides
`default_cluster` in `ace.yaml` if specified.
- `schema.table_name`: Schema‑qualified table name to repair.
- `--diff-file`: Path to the diff JSON file.
- `--source-of-truth` (`-r`): Authoritative node for repairs. *Required* unless you run in bidirectional insert-only mode **or** `--fix-nulls`.
- `--source-of-truth` (`-r`): Authoritative node for repairs. Required
unless you run in bidirectional insert-only mode *or* `--fix-nulls`.

## Options
Include the following options as needed:

| Option | Alias | Description | Default |
|---|---|---|---|
| `--diff-file <path>` | `-f` | Path to the diff file (required) | |
| `--dbname <name>` | `-d` | Database name override | |
| `--nodes <list>` | `-n` | Nodes to include when resolving cluster metadata | `all` |
| `--source-of-truth <node>` | `-r` | Authoritative node for repairs (required unless `--bidirectional`) | |
| `--dry-run` | `-y` | Emit a dry-run plan instead of executing changes | `false` |
| `--generate-report` | `-g` | Write a JSON repair report to `reports/<YYYY-MM-DD>/repair_report_<HHMMSS.mmm>.json` | `false` |
| `--insert-only` | `-i` | Only insert missing rows; skip updates/deletes | `false` |
| `--nodes <list>` | `-n` | Nodes to include when resolving |
| | | cluster metadata | `all` |
| `--source-of-truth <node>` | `-r` | Authoritative node for repairs |
| | | (required unless `--bidirectional`) | |
| `--dry-run` | `-y` | Emit a dry-run plan instead of |
| | | executing changes | `false` |
| `--generate-report` | `-g` | Write a JSON repair report to |
| | | `reports/<YYYY-MM-DD>/` |
| | | `repair_report_<HHMMSS.mmm>.json` | `false` |
| `--insert-only` | `-i` | Only insert missing rows; skip |
| | | updates/deletes | `false` |
| `--upsert-only` | `-P` | Insert or update rows; skip deletes | `false` |
| `--repair-plan <path>` | `-p` | Path to an advanced repair plan (YAML/JSON). Overrides `--source-of-truth` and uses rule-based actions. | |
| `--fix-nulls` | `-X` | Fill NULL columns on each node using non-NULL values from its peers (no source-of-truth needed) | `false` |
| `--bidirectional` | `-Z` | Perform insert-only repairs in both directions | `false` |
| `--fire-triggers` | `-t` | Execute triggers (otherwise runs with `session_replication_role='replica'`) | `false` |
| `--recovery-mode` | | Enable recovery-mode repair when the diff was generated with `--against-origin`; can auto-select a source of truth using Spock LSNs | `false` |
| `--repair-plan <path>` | `-p` | Path to an advanced repair plan |
| | | (YAML/JSON). Overrides |
| | | `--source-of-truth` and uses |
| | | rule-based actions. | |
| `--fix-nulls` | `-X` | Fill NULL columns on each node |
| | | using non-NULL values from its |
| | | peers (no source-of-truth needed) | `false` |
| `--bidirectional` | `-Z` | Perform insert-only repairs in |
| | | both directions | `false` |
| `--fire-triggers` | `-t` | Execute triggers (otherwise runs |
| | | with `session_replication_role=` |
| | | `'replica'`) | `false` |
| `--recovery-mode` | | Enable recovery-mode repair when |
| | | the diff was generated with |
| | | `--against-origin`; can auto-select |
| | | a source of truth using Spock LSNs | `false` |
| `--preserve-origin` | | Preserve replication origin node |
| | | ID and LSN for repaired rows. When |
| | | enabled, repaired rows will have |
| | | commits with the original node's |
| | | origin ID instead of the local node |
| | | ID. Requires LSN to be available |
| | | from a survivor node. | `true` |
| `--quiet` | `-q` | Suppress non-essential logging | `false` |
| `--debug` | `-v` | Enable verbose logging | `false` |

### Advanced repair plans
- Use `--repair-plan` to drive repairs from a plan. Source-of-truth becomes optional; the plan sets per-row decisions.
- Mutually exclusive with `--bidirectional` and `--fix-nulls` (those are separate modes).
- Dry-run and reports include rule usage counts per node.
- See [Advanced repair](advanced-repair.md) for grammar and [Examples](advanced-repair-examples.md) for recipes.

## Example
- Use `--repair-plan` to drive repairs from a plan. Source-of-truth
becomes optional; the plan sets per-row decisions.
- Mutually exclusive with `--bidirectional` and `--fix-nulls` (those are
separate modes).
- Dry-run and reports include rule usage counts per node.
- See [Advanced repair](advanced-repair.md) for grammar and
[Examples](advanced-repair-examples.md) for recipes.

```sh
./ace table-repair acctg public.customers_large \
Expand All @@ -48,24 +82,97 @@ Performs repairs on tables of divergent nodes based on the diff report generated
--generate-report
```

Diff reports share the same prefix generated by `table-diff` (for example `public_customers_large_diffs-20250718134542.json`). When you request a dry run or report, ACE also writes JSON summaries under `reports/<YYYY-MM-DD>/repair_report_<HHMMSS.mmm>.json` (or `dry_run_report_<...>.json`).
Diff reports share the same prefix generated by `table-diff` (for
example `public_customers_large_diffs-20250718134542.json`). When you
request a dry run or report, ACE also writes JSON summaries under
`reports/<YYYY-MM-DD>/repair_report_<HHMMSS.mmm>.json` (or
`dry_run_report_<...>.json`).

### Recovery-mode behavior

- If the diff file indicates `only_origin`, `table-repair` refuses to run unless `--recovery-mode` is set.
- In recovery-mode, if no `--source-of-truth` is provided, ACE probes surviving nodes for the failed node’s Spock origin LSN (preferred) and slot LSN (fallback) and picks the highest. Ties or missing LSNs require you to provide `--source-of-truth`.
- The chosen source (auto or explicit) is recorded in the repair report along with the LSN probes.
- Recovery-mode still accepts `--repair-plan`; the plan is applied after the source of truth is determined. If no repair plan is provided, ACE performs a standard repair with the recovery-mode source selection.
- If the diff file indicates `only_origin`, `table-repair` refuses to run
unless `--recovery-mode` is set.
- In recovery-mode, if no `--source-of-truth` is provided, ACE probes
surviving nodes for the failed node's Spock origin LSN (preferred) and
slot LSN (fallback) and picks the highest. Ties or missing LSNs require
you to provide `--source-of-truth`.
- The chosen source (auto or explicit) is recorded in the repair report
along with the LSN probes.
- Recovery-mode still accepts `--repair-plan`; the plan is applied after
the source of truth is determined. If no repair plan is provided, ACE
performs a standard repair with the recovery-mode source selection.

## Sample Output

```
2025/07/22 12:05:24 INFO Starting table repair for public.customers_large on cluster acctg
2025/07/22 12:05:24 INFO Starting table repair for
public.customers_large on cluster acctg
2025/07/22 12:05:24 INFO Processing repairs for divergent node: n2
2025/07/22 12:05:24 INFO Executed 99 upsert operations on n2
2025/07/22 12:05:24 INFO Repair of public.customers_large complete in 0.003s. Nodes n2 repaired (99 upserted).
2025/07/22 12:05:24 INFO Repair of public.customers_large complete
in 0.003s. Nodes n2 repaired (99 upserted).
```

## Fixing null-only drifts (`--fix-nulls`)

Replication hiccups can leave some columns NULL on one node while populated on another. The `--fix-nulls` mode cross-fills those NULLs in both directions using values from the paired node(s); it does **not** require a source-of-truth. Use it when the diff shows only NULL/NOT NULL mismatches and you want to reconcile columns without preferring a single node.
Replication hiccups can leave some columns NULL on one node while
populated on another. The `--fix-nulls` mode cross-fills those NULLs in
both directions using values from the paired node(s); it does **not**
require a source-of-truth. Use it when the diff shows only NULL/NOT NULL
mismatches and you want to reconcile columns without preferring a single
node.

## Preserving replication origin (`--preserve-origin`)

By default, `--preserve-origin` is enabled. When repairing rows, this
ensures that the repaired rows maintain the correct replication origin
node ID and LSN from the original transaction, rather than using the
local node's ID. This is particularly important in recovery scenarios
where:

- A node fails and rows are repaired from a survivor
- The failed node may come back online
- Without origin tracking, the repaired rows would have the local
node's origin ID, which could cause conflicts when the original node
resumes replication

### How it works

1. ACE extracts the `node_origin` and `commit_ts` from the diff file
metadata for each row being repaired.

2. For each origin node, ACE queries a survivor node to obtain the
origin LSN. This LSN must be available - if it's not, the repair
will fail (as required for data consistency).

3. Before executing repairs for each origin group, ACE:
- gets or creates a replication origin for the origin node.
- sets up a replication origin session.
- configures the session with the origin LSN and timestamp.
- executes the repairs.
- resets the session.

4. Rows are automatically grouped by origin node to minimize session
setup overhead.

### Requirements and limitations

- The origin LSN must be available from at least one survivor node. If
not available, the repair will fail with an error.
- At least one survivor node must be accessible to fetch the origin LSN.
- Replication origin functions require superuser or replication
privileges on the target database.
- Origin metadata may be missing from the diff file for some rows. Those
rows will be repaired without origin tracking (a warning will be
logged).

### When to disable

You may want to disable `--preserve-origin` with `--no-preserve-origin` if:
- You're certain the origin node will not come back online
- You've permanently removed the origin node from the cluster
- You want repaired rows to be treated as local writes

**Note**: Disabling origin preservation should only be done when you're
certain about the node's status, as it can cause replication conflicts if
the origin node returns.
5 changes: 5 additions & 0 deletions internal/api/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type tableRepairRequest struct {
GenerateReport bool `json:"generate_report"`
FixNulls bool `json:"fix_nulls"`
Bidirectional bool `json:"bidirectional"`
PreserveOrigin *bool `json:"preserve_origin,omitempty"`
}

type spockDiffRequest struct {
Expand Down Expand Up @@ -434,6 +435,10 @@ func (s *APIServer) handleTableRepair(w http.ResponseWriter, r *http.Request) {
task.GenerateReport = req.GenerateReport
task.FixNulls = req.FixNulls
task.Bidirectional = req.Bidirectional
// PreserveOrigin defaults to true if not explicitly set
if req.PreserveOrigin != nil {
task.PreserveOrigin = *req.PreserveOrigin
}
task.Ctx = r.Context()
task.ClientRole = clientInfo.role
task.InvokeMethod = "api"
Expand Down
6 changes: 6 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func SetupCLI() *cli.App {
Usage: "Enable recovery-mode repair using origin-only diffs",
Value: false,
},
&cli.BoolFlag{
Name: "preserve-origin",
Usage: "Preserve replication origin node ID and LSN for repaired rows (default: true)",
Value: true,
},
&cli.BoolFlag{
Name: "fix-nulls",
Aliases: []string{"X"},
Expand Down Expand Up @@ -1199,6 +1204,7 @@ func TableRepairCLI(ctx *cli.Context) error {
task.Bidirectional = ctx.Bool("bidirectional")
task.GenerateReport = ctx.Bool("generate-report")
task.RecoveryMode = ctx.Bool("recovery-mode")
task.PreserveOrigin = ctx.Bool("preserve-origin")

if err := task.ValidateAndPrepare(); err != nil {
return fmt.Errorf("validation failed: %w", err)
Expand Down
Loading
Loading