Skip to content

Commit 25cb92d

Browse files
committed
changefeedccl: (wip to be squashed) add tracking in job frontier
Puts in a rough fix: when ALTER CHANGEFEED changes the filter of a DB-level changefeed, the checkpoint is updated to reflect the feed's new table set by storing a resolved span for each newly watched table at the timestamp of the ALTER CHANGEFEED statement. Epic: CRDB-1421. Release note: None
1 parent 3b65458 commit 25cb92d

File tree

3 files changed

+137
-14
lines changed

3 files changed

+137
-14
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1818
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1919
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
2021
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2122
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2223
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -36,6 +37,7 @@ import (
3637
"github.com/cockroachdb/cockroach/pkg/sql/types"
3738
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3839
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3941
"github.com/cockroachdb/cockroach/pkg/util/uuid"
4042
"github.com/cockroachdb/errors"
4143
)
@@ -143,7 +145,7 @@ func alterChangefeedPlanHook(
143145
if prevDetails.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE {
144146
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
145147
if len(prevDetails.TargetSpecifications) != 1 {
146-
return errors.Errorf("database level changefeed must have exactly one target specification")
148+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
147149
}
148150
} else {
149151
newChangefeedStmt.Level = tree.ChangefeedLevelTable
@@ -248,18 +250,36 @@ func alterChangefeedPlanHook(
248250
}
249251

250252
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
251-
if newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
253+
// newStatementTime will either be the StatementTime of the job prior to the
254+
// alteration, or it will be the high watermark of the job.
255+
newDetails.StatementTime = newStatementTime
256+
257+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
258+
// For database level changefeeds, we may have set a new value for
259+
// the filter options. In this case, the set of tables we are watching
260+
// may have changed. We need to store the new frontier so that we only
261+
// emit events for the new tables from the time of the ALTER CHANGEFEED
262+
// statement, but not before.
263+
// We do not need to remove spans for tables that are no longer
264+
// watched. When the changefeed is resumed, we will make the frontier
265+
// watch only the new set of watched tables.
266+
targetTS := newProgress.GetHighWater()
267+
if targetTS == nil || targetTS.IsEmpty() {
268+
targetTS = &prevDetails.StatementTime
269+
}
270+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg(), *targetTS)
271+
if err != nil {
272+
return err
273+
}
274+
storeNewTablesJobFrontier(ctx, p, jobID, *targetTS, targets, prevTargets, prevDetails, newDetails)
275+
} else {
252276
// If the changefeed is a table-level changefeed, we set the initial
253277
// scan option to its default value of "yes" so that we can do an
254278
// initial scan for new targets. We do not allow initial scans on
255279
// database level changefeeds.
256280
newDetails.Opts[changefeedbase.OptInitialScan] = ``
257281
}
258282

259-
// newStatementTime will either be the StatementTime of the job prior to the
260-
// alteration, or it will be the high watermark of the job.
261-
newDetails.StatementTime = newStatementTime
262-
263283
newPayload := job.Payload()
264284
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
265285
newPayload.Description = jobRecord.Description
@@ -346,10 +366,10 @@ func processFiltersForChangefeed(
346366
alterCmds tree.AlterChangefeedCmds,
347367
prevDetails jobspb.ChangefeedDetails,
348368
) error {
349-
filters := generateNewFilters(alterCmds)
369+
filterCommands := generateNewFilters(alterCmds)
350370

351371
// Only DB-level feeds should have filters.
352-
if len(filters) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
372+
if len(filterCommands) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
353373
return errors.Errorf("filters are only supported for database level changefeeds")
354374
}
355375

@@ -367,8 +387,8 @@ func processFiltersForChangefeed(
367387
}
368388

369389
// For table-level feeds, this should be a no-op since there are no
370-
// filters as asserted above. Otherwise, apply each filter in order.
371-
for _, filter := range filters {
390+
// filter commands as asserted above. Otherwise, apply each filter in order.
391+
for _, filter := range filterCommands {
372392
currentFilter := stmt.FilterOption
373393
// Note that the no-filters state is represented by FilterType = exclude
374394
// with an empty tables list. In that state, it's perfectly valid to
@@ -527,9 +547,9 @@ func generateAndValidateNewTargets(
527547

528548
// Since we do not allow initial scans on database level changefeeds,
529549
// we do not need to call generateAndValidateNewTargetsForTableLevelFeed.
530-
// When filter changes result in changes to which tables are being tracked,
531-
// we can rely on startDistChangefeed to handle removing/adding tables to
532-
// the progress.
550+
// When filter changes result in our set of watched tables changing, though,
551+
// we will also need to modify the progress, but we do that once we've created
552+
// the new changefeed job record so that we have access to the new table targets.
533553
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
534554
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
535555
}
@@ -978,7 +998,6 @@ func generateNewProgress(
978998
// TODO(#142376): Consider whether we want to set the new statement time
979999
// to the actual new statement time (ALTER CHANGEFEED statement time).
9801000
newStatementTime := *prevHighWater
981-
9821001
newProgress := jobspb.Progress{
9831002
Progress: &jobspb.Progress_HighWater{},
9841003
Details: &jobspb.Progress_Changefeed{
@@ -1032,6 +1051,51 @@ func generateNewProgress(
10321051
return newProgress, prevStatementTime, nil
10331052
}
10341053

1054+
// storeNewTablesJobFrontier uses the persistentjob frontier to ensure that when
1055+
// we resume the changefeed, new tables being watched by the database level feed
1056+
// (due to ALTER CHANGEFEED changing the filter options) are watched only from
1057+
// the time of the ALTER CHANGEFEED statement.
1058+
func storeNewTablesJobFrontier(
1059+
ctx context.Context, p sql.PlanHookState, jobID jobspb.JobID, ts hlc.Timestamp,
1060+
targets changefeedbase.Targets, prevTargets changefeedbase.Targets,
1061+
newDetails jobspb.ChangefeedDetails, prevDetails jobspb.ChangefeedDetails,
1062+
) error {
1063+
getSpansForTargets := func(targets changefeedbase.Targets, details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1064+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, ts)
1065+
if err != nil {
1066+
return nil, err
1067+
}
1068+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, ts)
1069+
if err != nil {
1070+
return nil, err
1071+
}
1072+
return spans, nil
1073+
}
1074+
1075+
existingTargetSpans, err := getSpansForTargets(prevTargets, prevDetails)
1076+
if err != nil {
1077+
return err
1078+
}
1079+
trackedSpans, err := getSpansForTargets(targets, newDetails)
1080+
if err != nil {
1081+
return err
1082+
}
1083+
1084+
statementTime := hlc.Timestamp{WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano()}
1085+
var newTrackedSpans roachpb.SpanGroup
1086+
newTrackedSpans.Add(trackedSpans...)
1087+
newTrackedSpans.Sub(existingTargetSpans...)
1088+
1089+
frontier, err := span.MakeFrontierAt(statementTime, newTrackedSpans.Slice()...)
1090+
if err != nil {
1091+
return err
1092+
}
1093+
1094+
if err := jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier); err != nil {
1095+
return err
1096+
}
1097+
return nil
1098+
}
10351099
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
10361100
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
10371101
if err != nil {

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,6 +1642,63 @@ func TestAlterChangefeedDatabaseLevelChangefeedFilterSemantics(t *testing.T) {
16421642
}
16431643
}
16441644

1645+
// TestAlterChangefeedFilterChangesDontEmitPreAlterEvents tests that if
1646+
// we alter a changefeed's filter options in a way that means new tables are
1647+
// being watched that we do not emit events for the new tables from before the
1648+
// ALTER CHANGEFEED statement.
1649+
func TestAlterChangefeedFilterChangesDontEmitPreAlterEvents(t *testing.T) {
1650+
defer leaktest.AfterTest(t)()
1651+
defer log.Scope(t).Close(t)
1652+
1653+
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
1654+
sqlDB := sqlutils.MakeSQLRunner(s.DB)
1655+
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
1656+
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`)
1657+
1658+
// If the checkpoint advances, that will be the point we resume from for
1659+
// all tables, including 'bar' skipping the pre-alter event. This makes
1660+
// the test fail more consistently if we didn't add 'bar' to the frontier.
1661+
knobs := s.TestingKnobs.
1662+
DistSQL.(*execinfra.TestingKnobs).
1663+
Changefeed.(*TestingKnobs)
1664+
knobs.ShouldCheckpointToJobRecord = func(hw hlc.Timestamp) bool {
1665+
return false
1666+
}
1667+
1668+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo`)
1669+
defer closeFeed(t, testFeed)
1670+
1671+
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'before alter')`)
1672+
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'before alter')`)
1673+
1674+
assertPayloads(t, testFeed, []string{
1675+
`foo: [1]->{"after": {"a": 1, "b": "before alter"}}`,
1676+
})
1677+
1678+
sqlDB.Exec(t, fmt.Sprintf(`PAUSE JOB %d`, testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1679+
waitForJobState(sqlDB, t, testFeed.(cdctest.EnterpriseTestFeed).JobID(), `paused`)
1680+
1681+
sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d UNSET INCLUDE TABLES`,
1682+
testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1683+
1684+
sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, testFeed.(cdctest.EnterpriseTestFeed).JobID()))
1685+
waitForJobState(sqlDB, t, testFeed.(cdctest.EnterpriseTestFeed).JobID(), `running`)
1686+
1687+
sqlDB.Exec(t, `INSERT INTO foo VALUES (2, 'after alter')`)
1688+
sqlDB.Exec(t, `INSERT INTO bar VALUES (2, 'after alter')`)
1689+
1690+
// We should only see the event from bar that was inserted after the alter,
1691+
// but we should see both events from foo. This test may falsely pass if
1692+
// we "get lucky" and the pre-alter event for bar is the last one emitted.
1693+
assertPayloads(t, testFeed, []string{
1694+
`foo: [2]->{"after": {"a": 2, "b": "after alter"}}`,
1695+
`bar: [2]->{"after": {"a": 2, "b": "after alter"}}`,
1696+
})
1697+
}
1698+
1699+
cdcTest(t, testFn, feedTestEnterpriseSinks)
1700+
}
1701+
16451702
func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
16461703
defer leaktest.AfterTest(t)()
16471704
defer log.Scope(t).Close(t)

pkg/ccl/changefeedccl/changefeed_dist.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,11 @@ func startDistChangefeed(
254254

255255
dsp := execCtx.DistSQLPlanner()
256256

257+
// Pick up the span-level checkpoint, if any, from the changefeed progress.
257258
var spanLevelCheckpoint *jobspb.TimestampSpansMap
258259
if progress := localState.progress.GetChangefeed(); progress != nil && progress.SpanLevelCheckpoint != nil {
259260
spanLevelCheckpoint = progress.SpanLevelCheckpoint
261+
fmt.Println("spanLevelCheckpoint starting for changefeed", spanLevelCheckpoint)
260262
if log.V(2) {
261263
log.Changefeed.Infof(ctx, "span-level checkpoint: %s", spanLevelCheckpoint)
262264
}

0 commit comments

Comments
 (0)