Skip to content

Commit 316d69d

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds, setting both INCLUDE and EXCLUDE at the same time, and switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. When filter changes result in new tables being watched, they will not receive an initial scan. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent fd29a93 commit 316d69d

File tree

3 files changed

+876
-3
lines changed

3 files changed

+876
-3
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 265 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
1516
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1921
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2022
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2123
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,6 +37,7 @@ import (
3537
"github.com/cockroachdb/cockroach/pkg/sql/types"
3638
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3739
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3841
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3942
"github.com/cockroachdb/errors"
4043
)
@@ -55,6 +58,9 @@ func alterChangefeedTypeCheck(
5558
toCheck := []exprutil.ToTypeCheck{
5659
exprutil.Ints{alterChangefeedStmt.Jobs},
5760
}
61+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
62+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
63+
// invalid option.
5864
for _, cmd := range alterChangefeedStmt.Cmds {
5965
switch v := cmd.(type) {
6066
case *tree.AlterChangefeedSetOptions:
@@ -129,6 +135,14 @@ func alterChangefeedPlanHook(
129135
}
130136

131137
newChangefeedStmt := &tree.CreateChangefeed{}
138+
if isDBLevelChangefeed(prevDetails) {
139+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
140+
if len(prevDetails.TargetSpecifications) != 1 {
141+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
142+
}
143+
} else {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
145+
}
132146

133147
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134148
if err != nil {
@@ -152,6 +166,7 @@ func alterChangefeedPlanHook(
152166

153167
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154168
ctx, exprEval, p,
169+
newChangefeedStmt.Level,
155170
alterChangefeedStmt.Cmds,
156171
newOptions,
157172
prevDetails, job.Progress(),
@@ -160,7 +175,14 @@ func alterChangefeedPlanHook(
160175
if err != nil {
161176
return err
162177
}
163-
newChangefeedStmt.TableTargets = newTargets
178+
179+
if err := setChangefeedTargets(ctx, p, newChangefeedStmt, prevDetails, newTargets); err != nil {
180+
return err
181+
}
182+
183+
if err := processFiltersForChangefeed(newChangefeedStmt, alterChangefeedStmt.Cmds, prevDetails); err != nil {
184+
return err
185+
}
164186

165187
if prevDetails.Select != "" {
166188
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,12 +240,44 @@ func alterChangefeedPlanHook(
218240
}
219241

220242
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
222-
223243
// newStatementTime will either be the StatementTime of the job prior to the
224244
// alteration, or it will be the high watermark of the job.
225245
newDetails.StatementTime = newStatementTime
226246

247+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
248+
// For database level changefeeds, we may have set a new value for
249+
// the filter options. In this case, the set of tables we are watching
250+
// may have changed. We need to store the new frontier so that we only
251+
// emit events for the new tables from the time of the ALTER CHANGEFEED
252+
// statement, but not before.
253+
254+
// NB: We do not need to remove spans for tables that are no longer
255+
// watched. When the changefeed is resumed, we will make the frontier
256+
// so that it will only watch the new set of watched tables. This means
257+
// that if altering a filter results in us watching fewer tables,
258+
// we will not emit ANY more events for that table, even if it was
259+
// lagging, i.e. even if they corresponded to updates made before
260+
// the ALTER CHANGEFEED statement.
261+
statementTime := hlc.Timestamp{
262+
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
263+
}
264+
265+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg(), statementTime)
266+
if err != nil {
267+
return err
268+
}
269+
if err := storeNewTablesJobFrontier(
270+
ctx, p, jobID, statementTime, targets, prevTargets, newDetails, prevDetails,
271+
); err != nil {
272+
return err
273+
}
274+
} else {
275+
// If the changefeed is a table-level changefeed, we set the initial
276+
// scan option to its default value of "yes" so that we can do an
277+
// initial scan for new targets.
278+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
279+
}
280+
227281
newPayload := job.Payload()
228282
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
229283
newPayload.Description = jobRecord.Description
@@ -268,6 +322,80 @@ func alterChangefeedPlanHook(
268322
return fn, alterChangefeedHeader, false, nil
269323
}
270324

325+
// setChangefeedTargets sets the appropriate target fields on the changefeed
326+
// statement based on the feed level. Database-level feeds get a DatabaseTarget,
327+
// while table-level feeds get TableTargets.
328+
func setChangefeedTargets(
329+
ctx context.Context,
330+
p sql.PlanHookState,
331+
stmt *tree.CreateChangefeed,
332+
prevDetails jobspb.ChangefeedDetails,
333+
newTargets tree.ChangefeedTableTargets,
334+
) error {
335+
if stmt.Level == tree.ChangefeedLevelDatabase {
336+
targetSpec := prevDetails.TargetSpecifications[0]
337+
txn := p.InternalSQLTxn()
338+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
339+
if err != nil {
340+
return err
341+
}
342+
stmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(databaseDescriptor.GetName())
343+
} else {
344+
stmt.TableTargets = newTargets
345+
}
346+
return nil
347+
}
348+
349+
// processFiltersForChangefeed validates and applies filter changes to the
350+
// changefeed statement. Filters are only supported for database-level feeds.
351+
func processFiltersForChangefeed(
352+
stmt *tree.CreateChangefeed,
353+
alterCmds tree.AlterChangefeedCmds,
354+
prevDetails jobspb.ChangefeedDetails,
355+
) error {
356+
filterCommands := generateNewFilters(alterCmds)
357+
358+
// Only DB-level feeds should have filters.
359+
if len(filterCommands) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
360+
return errors.Errorf("filters are only supported for database level changefeeds")
361+
}
362+
363+
// We add the existing filter state to the changefeed statement for
364+
// DB-level feeds. This ensures that existing filters are preserved when
365+
// we alter options unrelated to filters and that we can error if we try
366+
// to set both include and exclude filters at the same time.
367+
if stmt.Level == tree.ChangefeedLevelDatabase {
368+
targetSpec := prevDetails.TargetSpecifications[0]
369+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
370+
if err != nil {
371+
return err
372+
}
373+
stmt.FilterOption = filterOpt
374+
}
375+
376+
// For table-level feeds, this should be a no-op since there are no
377+
// filter commands as asserted above. Otherwise, apply each filter in order.
378+
for _, filter := range filterCommands {
379+
currentFilter := stmt.FilterOption
380+
// Note that the no-filters state is represented by FilterType = exclude
381+
// with an empty tables list. In that state, it's perfectly valid to
382+
// set or unset an Include filter.
383+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
384+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
385+
}
386+
if len(filter.Tables) > 0 {
387+
stmt.FilterOption = tree.ChangefeedFilterOption{
388+
FilterType: filter.FilterType,
389+
Tables: filter.Tables,
390+
}
391+
} else {
392+
stmt.FilterOption = tree.DefaultChangefeedFilterOption()
393+
}
394+
}
395+
396+
return nil
397+
}
398+
271399
func getTargetDesc(
272400
ctx context.Context,
273401
p sql.PlanHookState,
@@ -375,6 +503,7 @@ func generateAndValidateNewTargets(
375503
ctx context.Context,
376504
exprEval exprutil.Evaluator,
377505
p sql.PlanHookState,
506+
level tree.ChangefeedLevel,
378507
alterCmds tree.AlterChangefeedCmds,
379508
opts changefeedbase.StatementOptions,
380509
prevDetails jobspb.ChangefeedDetails,
@@ -387,7 +516,42 @@ func generateAndValidateNewTargets(
387516
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388517
error,
389518
) {
519+
if level == tree.ChangefeedLevelTable {
520+
return generateAndValidateNewTargetsForTableLevelFeed(
521+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
522+
)
523+
}
524+
// Database-level feeds do not support ADD/DROP target commands, so we validate
525+
// that we haven't set any, otherwise, this is a no-op.
526+
isAlteringTargets := slices.ContainsFunc(alterCmds, func(cmd tree.AlterChangefeedCmd) bool {
527+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
528+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
529+
return isAdd || isDrop
530+
})
531+
if isAlteringTargets {
532+
return nil, nil, hlc.Timestamp{}, nil, errors.Errorf("cannot alter targets for a database level changefeed")
533+
}
534+
535+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
536+
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
537+
}
390538

539+
func generateAndValidateNewTargetsForTableLevelFeed(
540+
ctx context.Context,
541+
exprEval exprutil.Evaluator,
542+
p sql.PlanHookState,
543+
alterCmds tree.AlterChangefeedCmds,
544+
opts changefeedbase.StatementOptions,
545+
prevDetails jobspb.ChangefeedDetails,
546+
prevProgress jobspb.Progress,
547+
sinkURI string,
548+
) (
549+
tree.ChangefeedTableTargets,
550+
*jobspb.Progress,
551+
hlc.Timestamp,
552+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
553+
error,
554+
) {
391555
type targetKey struct {
392556
TableID descpb.ID
393557
FamilyName tree.Name
@@ -870,6 +1034,55 @@ func generateNewProgress(
8701034
return newProgress, prevStatementTime, nil
8711035
}
8721036

1037+
// storeNewTablesJobFrontier uses the persistentjob frontier to ensure that when
1038+
// we resume the changefeed, new tables being watched by the database level feed
1039+
// (due to ALTER CHANGEFEED changing the filter options) are watched only from
1040+
// the time of the ALTER CHANGEFEED statement.
1041+
func storeNewTablesJobFrontier(
1042+
ctx context.Context,
1043+
p sql.PlanHookState,
1044+
jobID jobspb.JobID,
1045+
ts hlc.Timestamp,
1046+
targets changefeedbase.Targets,
1047+
prevTargets changefeedbase.Targets,
1048+
newDetails jobspb.ChangefeedDetails,
1049+
prevDetails jobspb.ChangefeedDetails,
1050+
) error {
1051+
getSpansForTargets := func(targets changefeedbase.Targets, details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1052+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, ts)
1053+
if err != nil {
1054+
return nil, err
1055+
}
1056+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, ts)
1057+
if err != nil {
1058+
return nil, err
1059+
}
1060+
return spans, nil
1061+
}
1062+
1063+
existingTargetSpans, err := getSpansForTargets(prevTargets, prevDetails)
1064+
if err != nil {
1065+
return err
1066+
}
1067+
trackedSpans, err := getSpansForTargets(targets, newDetails)
1068+
if err != nil {
1069+
return err
1070+
}
1071+
1072+
var newTrackedSpans roachpb.SpanGroup
1073+
newTrackedSpans.Add(trackedSpans...)
1074+
newTrackedSpans.Sub(existingTargetSpans...)
1075+
1076+
frontier, err := span.MakeFrontierAt(ts, newTrackedSpans.Slice()...)
1077+
if err != nil {
1078+
return err
1079+
}
1080+
1081+
if err := jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier); err != nil {
1082+
return err
1083+
}
1084+
return nil
1085+
}
8731086
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
8741087
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
8751088
if err != nil {
@@ -934,6 +1147,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9341147
return primarySpans
9351148
}
9361149

1150+
// parseFilterOptionFromTargetSpec parses the existing filter option from the
1151+
// target specification so that it can be used to generate the new filter option.
1152+
func parseFilterOptionFromTargetSpec(
1153+
targetSpec jobspb.ChangefeedTargetSpecification,
1154+
) (tree.ChangefeedFilterOption, error) {
1155+
var tables tree.TableNames
1156+
if targetSpec.FilterList == nil {
1157+
return tree.ChangefeedFilterOption{}, errors.AssertionFailedf("filter list is nil")
1158+
}
1159+
for table := range targetSpec.FilterList.Tables {
1160+
// Parse the fully-qualified table name string back into a TableName object.
1161+
unresolvedName, err := parser.ParseTableName(table)
1162+
if err != nil {
1163+
return tree.ChangefeedFilterOption{}, err
1164+
}
1165+
tableName := unresolvedName.ToTableName()
1166+
tables = append(tables, tableName)
1167+
}
1168+
return tree.ChangefeedFilterOption{
1169+
FilterType: targetSpec.FilterList.FilterType,
1170+
Tables: tables,
1171+
}, nil
1172+
}
1173+
1174+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1175+
// It returns a slice of Filter structs containing all filter modifications.
1176+
func generateNewFilters(
1177+
alterCmds tree.AlterChangefeedCmds,
1178+
) (filters []tree.ChangefeedFilterOption) {
1179+
for _, cmd := range alterCmds {
1180+
switch v := cmd.(type) {
1181+
case *tree.AlterChangefeedSetFilterOption:
1182+
filters = append(filters, tree.ChangefeedFilterOption{
1183+
FilterType: v.ChangefeedFilterOption.FilterType,
1184+
Tables: v.ChangefeedFilterOption.Tables,
1185+
})
1186+
case *tree.AlterChangefeedUnsetFilterOption:
1187+
// Here we do not return the DefaultChangefeedFilterOption (Exclude
1188+
// with an empty tables list) so we can treat UNSET INCLUDE TABLES and
1189+
// UNSET EXCLUDE TABLES differently.
1190+
filters = append(filters, tree.ChangefeedFilterOption{
1191+
FilterType: v.ChangefeedFilterOption.FilterType,
1192+
})
1193+
}
1194+
}
1195+
1196+
return filters
1197+
}
1198+
9371199
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9381200
prevStmt, err := parser.ParseOne(prevDescription)
9391201
if err != nil {

0 commit comments

Comments
 (0)