Skip to content

Commit f1ad028

Browse files
committed
changefeedccl: (wip to be squashed) add tracking in job frontier
Adds a rough fix for filter changes on database-level changefeeds: when ALTER CHANGEFEED changes the filter, the checkpoint is updated to reflect the new table set by storing a resolved span for each newly watched table at the timestamp of the ALTER CHANGEFEED statement. Epic: CRDB-1421 Release note: None
1 parent 3b65458 commit f1ad028

File tree

3 files changed

+155
-14
lines changed

3 files changed

+155
-14
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 90 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1919
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
2021
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2122
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -36,6 +37,7 @@ import (
3637
"github.com/cockroachdb/cockroach/pkg/sql/types"
3738
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3839
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3941
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4042
"github.com/cockroachdb/errors"
4143
)
@@ -143,7 +145,7 @@ func alterChangefeedPlanHook(
143145
if prevDetails.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE {
144146
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
145147
if len(prevDetails.TargetSpecifications) != 1 {
146-
return errors.Errorf("database level changefeed must have exactly one target specification")
148+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
147149
}
148150
} else {
149151
newChangefeedStmt.Level = tree.ChangefeedLevelTable
@@ -248,18 +250,48 @@ func alterChangefeedPlanHook(
248250
}
249251

250252
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
251-
if newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
253+
// newStatementTime will either be the StatementTime of the job prior to the
254+
// alteration, or it will be the high watermark of the job.
255+
newDetails.StatementTime = newStatementTime
256+
257+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
258+
// For database level changefeeds, we may have set a new value for
259+
// the filter options. In this case, the set of tables we are watching
260+
// may have changed. We need to store the new frontier so that we only
261+
// emit events for the new tables from the time of the ALTER CHANGEFEED
262+
// statement, but not before.
263+
264+
// NB: We do not need to remove spans for tables that are no longer
265+
// watched. When the changefeed is resumed, we will make the frontier
266+
// so that it will only watch the new set of watched tables. This means
267+
// that if altering a filter results in us watching fewer tables,
268+
// we will not emit ANY more events for those tables, even if they were
269+
// lagging, i.e. even if they corresponded to updates made before
270+
// the ALTER CHANGEFEED statement.
271+
272+
// We compare the effect of the two filters at the time of the current
273+
// high watermark. If the filter now includes a new table, but that
274+
// table was created between the previous high watermark and the
275+
// ALTER CHANGEFEED statement, then we will not store progress for it.
276+
// Instead, the tableset watcher will pick it up once the high water
277+
// advances to its creation time.
278+
targetTS := newProgress.GetHighWater()
279+
if targetTS == nil || targetTS.IsEmpty() {
280+
targetTS = &prevDetails.StatementTime
281+
}
282+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg(), *targetTS)
283+
if err != nil {
284+
return err
285+
}
286+
storeNewTablesJobFrontier(ctx, p, jobID, *targetTS, targets, prevTargets, newDetails, prevDetails)
287+
} else {
252288
// If the changefeed is a table-level changefeed, we set the initial
253289
// scan option to its default value of "yes" so that we can do an
254290
// initial scan for new targets. We do not allow initial scans on
255291
// database level changefeeds.
256292
newDetails.Opts[changefeedbase.OptInitialScan] = ``
257293
}
258294

259-
// newStatementTime will either be the StatementTime of the job prior to the
260-
// alteration, or it will be the high watermark of the job.
261-
newDetails.StatementTime = newStatementTime
262-
263295
newPayload := job.Payload()
264296
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
265297
newPayload.Description = jobRecord.Description
@@ -346,10 +378,10 @@ func processFiltersForChangefeed(
346378
alterCmds tree.AlterChangefeedCmds,
347379
prevDetails jobspb.ChangefeedDetails,
348380
) error {
349-
filters := generateNewFilters(alterCmds)
381+
filterCommands := generateNewFilters(alterCmds)
350382

351383
// Only DB-level feeds should have filters.
352-
if len(filters) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
384+
if len(filterCommands) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
353385
return errors.Errorf("filters are only supported for database level changefeeds")
354386
}
355387

@@ -367,8 +399,8 @@ func processFiltersForChangefeed(
367399
}
368400

369401
// For table-level feeds, this should be a no-op since there are no
370-
// filters as asserted above. Otherwise, apply each filter in order.
371-
for _, filter := range filters {
402+
// filter commands as asserted above. Otherwise, apply each filter in order.
403+
for _, filter := range filterCommands {
372404
currentFilter := stmt.FilterOption
373405
// Note that the no-filters state is represented by FilterType = exclude
374406
// with an empty tables list. In that state, it's perfectly valid to
@@ -527,9 +559,9 @@ func generateAndValidateNewTargets(
527559

528560
// Since we do not allow initial scans on database level changefeeds,
529561
// we do not need to call generateAndValidateNewTargetsForTableLevelFeed.
530-
// When filter changes result in changes to which tables are being tracked,
531-
// we can rely on startDistChangefeed to handle removing/adding tables to
532-
// the progress.
562+
// When filter changes result in our set of watched tables changing, though,
563+
// we will also need to modify the progress, but we do that once we've created
564+
// the new changefeed job record so that we have access to the new table targets.
533565
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
534566
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
535567
}
@@ -978,7 +1010,6 @@ func generateNewProgress(
9781010
// TODO(#142376): Consider whether we want to set the new statement time
9791011
// to the actual new statement time (ALTER CHANGEFEED statement time).
9801012
newStatementTime := *prevHighWater
981-
9821013
newProgress := jobspb.Progress{
9831014
Progress: &jobspb.Progress_HighWater{},
9841015
Details: &jobspb.Progress_Changefeed{
@@ -1032,6 +1063,51 @@ func generateNewProgress(
10321063
return newProgress, prevStatementTime, nil
10331064
}
10341065

1066+
// storeNewTablesJobFrontier uses the persistentjob frontier to ensure that when
1067+
// we resume the changefeed, new tables being watched by the database level feed
1068+
// (due to ALTER CHANGEFEED changing the filter options) are watched only from
1069+
// the time of the ALTER CHANGEFEED statement.
1070+
func storeNewTablesJobFrontier(
1071+
ctx context.Context, p sql.PlanHookState, jobID jobspb.JobID, ts hlc.Timestamp,
1072+
targets changefeedbase.Targets, prevTargets changefeedbase.Targets,
1073+
newDetails jobspb.ChangefeedDetails, prevDetails jobspb.ChangefeedDetails,
1074+
) error {
1075+
getSpansForTargets := func(targets changefeedbase.Targets, details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1076+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, ts)
1077+
if err != nil {
1078+
return nil, err
1079+
}
1080+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, ts)
1081+
if err != nil {
1082+
return nil, err
1083+
}
1084+
return spans, nil
1085+
}
1086+
1087+
existingTargetSpans, err := getSpansForTargets(prevTargets, prevDetails)
1088+
if err != nil {
1089+
return err
1090+
}
1091+
trackedSpans, err := getSpansForTargets(targets, newDetails)
1092+
if err != nil {
1093+
return err
1094+
}
1095+
1096+
statementTime := hlc.Timestamp{WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano()}
1097+
var newTrackedSpans roachpb.SpanGroup
1098+
newTrackedSpans.Add(trackedSpans...)
1099+
newTrackedSpans.Sub(existingTargetSpans...)
1100+
1101+
frontier, err := span.MakeFrontierAt(statementTime, newTrackedSpans.Slice()...)
1102+
if err != nil {
1103+
return err
1104+
}
1105+
1106+
if err := jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier); err != nil {
1107+
return err
1108+
}
1109+
return nil
1110+
}
10351111
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
10361112
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
10371113
if err != nil {

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,6 +1574,12 @@ func TestAlterChangefeedDatabaseLevelChangefeedFilterSemantics(t *testing.T) {
15741574
// since the filter check passes when tables list is empty.
15751575
expectPayloads: map[string]bool{"foo": true, "bar": false, "baz": true},
15761576
},
1577+
{
1578+
name: "unset_include_then_set_exclude",
1579+
createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo",
1580+
alters: []alterSpec{{stmt: "UNSET INCLUDE TABLES"}, {stmt: "SET EXCLUDE TABLES bar"}},
1581+
expectPayloads: map[string]bool{"foo": true, "bar": false, "baz": true},
1582+
},
15771583
{
15781584
name: "set_exclude_then_unset_include_fails",
15791585
createStmt: "CREATE CHANGEFEED FOR DATABASE d",
@@ -1642,6 +1648,63 @@ func TestAlterChangefeedDatabaseLevelChangefeedFilterSemantics(t *testing.T) {
16421648
}
16431649
}
16441650

1651+
// TestAlterChangefeedFilterChangesDontEmitPreAlterEvents tests that if
1652+
// we alter a changefeed's filter options in a way that means new tables are
1653+
// being watched that we do not emit events for the new tables from before the
1654+
// ALTER CHANGEFEED statement.
1655+
func TestAlterChangefeedFilterChangesDontEmitPreAlterEvents(t *testing.T) {
1656+
defer leaktest.AfterTest(t)()
1657+
defer log.Scope(t).Close(t)
1658+
1659+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1660+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1661+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
1662+
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
1663+
1664+
// If the checkpoint advances, that will be the point we resume from for
1665+
// all tables, including 'bar' skipping the pre-alter event. This makes
1666+
// the test fail more consistently if we didn't add 'bar' to the frontier.
1667+
knobs := s.TestingKnobs.
1668+
DistSQL.(*execinfra.TestingKnobs).
1669+
Changefeed.(*TestingKnobs)
1670+
knobs.ShouldCheckpointToJobRecord = func(hw hlc.Timestamp) bool {
1671+
return false
1672+
}
1673+
1674+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
1675+
defer closeFeed(t, testFeed)
1676+
1677+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before alter')`)
1678+
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'before alter')`)
1679+
1680+
assertPayloads(t, testFeed, []string{
1681+
`foo: [1]->{"after": {"a": 1, "b": "before alter"}}`,
1682+
})
1683+
1684+
sqlDB.Exec(t, fmt.Sprintf(`PAUSE JOB %d`, testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1685+
waitForJobState(sqlDB, t, testFeed.(cdctest.EnterpriseTestFeed).JobID(), `paused`)
1686+
1687+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET INCLUDE TABLES`,
1688+
testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1689+
1690+
sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1691+
waitForJobState(sqlDB, t, testFeed.(cdctest.EnterpriseTestFeed).JobID(), `running`)
1692+
1693+
sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after alter')`)
1694+
sqlDB.Exec(t, `INSERT INTO bar VALUES (2, 'after alter')`)
1695+
1696+
// We should only see the event from bar that was inserted after the alter,
1697+
// but we should see both events from foo. This test may falsely pass if
1698+
// we "get lucky" and the pre-alter event for bar is the last one emitted.
1699+
assertPayloads(t, testFeed, []string{
1700+
`foo: [2]->{"after": {"a": 2, "b": "after alter"}}`,
1701+
`bar: [2]->{"after": {"a": 2, "b": "after alter"}}`,
1702+
})
1703+
}
1704+
1705+
cdcTest(t, testFn, feedTestEnterpriseSinks)
1706+
}
1707+
16451708
func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
16461709
defer leaktest.AfterTest(t)()
16471710
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,11 @@ func startDistChangefeed(
254254

255255
dsp := execCtx.DistSQLPlanner()
256256

257+
// ??
257258
var spanLevelCheckpoint *jobspb.TimestampSpansMap
258259
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
259260
spanLevelCheckpoint = progress.SpanLevelCheckpoint
261+
fmt.Println("spanLevelCheckpoint starting for changefeed", spanLevelCheckpoint)
260262
if log.V(2) {
261263
log.Changefeed.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
262264
}

0 commit comments

Comments
 (0)