Skip to content

Commit 6f53c23

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds, setting both INCLUDE and EXCLUDE at the same time, and switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent faddc23 commit 6f53c23

File tree

3 files changed

+656
-34
lines changed

3 files changed

+656
-34
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 181 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
@@ -55,6 +56,9 @@ func alterChangefeedTypeCheck(
5556
toCheck := []exprutil.ToTypeCheck{
5657
exprutil.Ints{alterChangefeedStmt.Jobs},
5758
}
59+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
60+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
61+
// invalid option.
5862
for _, cmd := range alterChangefeedStmt.Cmds {
5963
switch v := cmd.(type) {
6064
case *tree.AlterChangefeedSetOptions:
@@ -130,6 +134,18 @@ func alterChangefeedPlanHook(
130134

131135
newChangefeedStmt := &tree.CreateChangefeed{}
132136

137+
// DB-level feeds should have exactly one target specification, but
138+
// table-level feeds should have at least one.
139+
if len(prevDetails.TargetSpecifications) == 0 {
140+
return errors.AssertionFailedf("no target specifications found for changefeed")
141+
}
142+
143+
if prevDetails.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
145+
} else {
146+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
147+
}
148+
133149
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134150
if err != nil {
135151
return err
@@ -149,9 +165,35 @@ func alterChangefeedPlanHook(
149165
if err := validateSettings(ctx, st != changefeedbase.OnlyInitialScan, p.ExecCfg()); err != nil {
150166
return err
151167
}
168+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase && st != changefeedbase.NoInitialScan {
169+
return errors.Errorf("cannot perform initial scan on a database level changefeed")
170+
}
171+
172+
// DB-level feeds have a DatabaseTarget in the changefeed statement,
173+
// unlike table-level feeds which have TableTargets. Set those here.
174+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
175+
targetSpec := prevDetails.TargetSpecifications[0]
176+
txn := p.InternalSQLTxn()
177+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
178+
if err != nil {
179+
return err
180+
}
181+
dbName := databaseDescriptor.GetName()
182+
newChangefeedStmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(dbName)
183+
}
184+
185+
isAlteringTargets := slices.ContainsFunc(alterChangefeedStmt.Cmds, func(cmd tree.AlterChangefeedCmd) bool {
186+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
187+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
188+
return isAdd || isDrop
189+
})
190+
if isAlteringTargets && newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
191+
return errors.Errorf("cannot alter targets for a database level changefeed")
192+
}
152193

153194
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154195
ctx, exprEval, p,
196+
newChangefeedStmt.Level,
155197
alterChangefeedStmt.Cmds,
156198
newOptions,
157199
prevDetails, job.Progress(),
@@ -160,7 +202,57 @@ func alterChangefeedPlanHook(
160202
if err != nil {
161203
return err
162204
}
163-
newChangefeedStmt.TableTargets = newTargets
205+
206+
// Only table-level feeds should have table targets in their changefeed statement.
207+
if newChangefeedStmt.Level == tree.ChangefeedLevelTable {
208+
newChangefeedStmt.TableTargets = newTargets
209+
}
210+
211+
// Now we process the filters. Only DB-level feeds should have filters.
212+
filters := generateNewFilters(alterChangefeedStmt.Cmds)
213+
if len(filters) > 0 && newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
214+
return errors.Errorf("filters are only supported for database level changefeeds")
215+
}
216+
217+
// We add the existing filter state to the changefeed statement for
218+
// DB-level feeds. This ensures that existing filters are preserved when
219+
// we alter options unrelated to filters and that we can error if we try
220+
// to set both include and exclude filters at the same time.
221+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
222+
targetSpec := prevDetails.TargetSpecifications[0]
223+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
224+
if err != nil {
225+
return err
226+
}
227+
newChangefeedStmt.FilterOption = filterOpt
228+
}
229+
230+
// For table-level feeds, this should be a no-op since there are no
231+
// filters as asserted above. Otherwise, apply each filter in order.
232+
for _, filter := range filters {
233+
currentFilter := newChangefeedStmt.FilterOption
234+
// Note that the no-filters state is represented by FilterType = exclude
235+
// with an empty tables list. In that state, it's perfectly valid to
236+
// set or unset an Include filter.
237+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
238+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
239+
}
240+
if len(filter.Tables) > 0 {
241+
newChangefeedStmt.FilterOption = tree.ChangefeedFilterOption{
242+
FilterType: filter.FilterType,
243+
Tables: filter.Tables,
244+
}
245+
} else {
246+
// When unsetting a filter, the default case is to EXCLUDE nothing.
247+
// This will be the result after doing either UNSET INCLUDE TABLES
248+
// or UNSET EXCLUDE TABLES. INCLUDE with an empty tables list is
249+
// not a valid filter.
250+
newChangefeedStmt.FilterOption = tree.ChangefeedFilterOption{
251+
FilterType: tree.ExcludeFilter,
252+
Tables: tree.TableNames{},
253+
}
254+
}
255+
}
164256

165257
if prevDetails.Select != "" {
166258
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,7 +310,13 @@ func alterChangefeedPlanHook(
218310
}
219311

220312
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
313+
if newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
314+
// If the changefeed is a table-level changefeed, we set the initial
315+
// scan option to its default value of "yes" so that we can do an
316+
// initial scan for new targets. We do not allow initial scans on
317+
// database level changefeeds.
318+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
319+
}
222320

223321
// newStatementTime will either be the StatementTime of the job prior to the
224322
// alteration, or it will be the high watermark of the job.
@@ -375,6 +473,7 @@ func generateAndValidateNewTargets(
375473
ctx context.Context,
376474
exprEval exprutil.Evaluator,
377475
p sql.PlanHookState,
476+
level tree.ChangefeedLevel,
378477
alterCmds tree.AlterChangefeedCmds,
379478
opts changefeedbase.StatementOptions,
380479
prevDetails jobspb.ChangefeedDetails,
@@ -387,7 +486,38 @@ func generateAndValidateNewTargets(
387486
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388487
error,
389488
) {
489+
if level == tree.ChangefeedLevelTable {
490+
return generateAndValidateNewTargetsForTableLevelFeed(
491+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
492+
)
493+
}
494+
495+
// Since we do not allow initial scans on database level changefeeds
496+
// and database level feeds do not support ADD/DROP tables commands,
497+
// we do not need to call generateAndValidateNewTargetsForTableLevelFeed.
498+
// When filter changes result in changes to which tables are being tracked,
499+
// we can rely on startDistChangefeed to handle removing/adding tables to
500+
// the progress.
501+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
502+
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
503+
}
390504

505+
func generateAndValidateNewTargetsForTableLevelFeed(
506+
ctx context.Context,
507+
exprEval exprutil.Evaluator,
508+
p sql.PlanHookState,
509+
alterCmds tree.AlterChangefeedCmds,
510+
opts changefeedbase.StatementOptions,
511+
prevDetails jobspb.ChangefeedDetails,
512+
prevProgress jobspb.Progress,
513+
sinkURI string,
514+
) (
515+
tree.ChangefeedTableTargets,
516+
*jobspb.Progress,
517+
hlc.Timestamp,
518+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
519+
error,
520+
) {
391521
type targetKey struct {
392522
TableID descpb.ID
393523
FamilyName tree.Name
@@ -930,6 +1060,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9301060
return primarySpans
9311061
}
9321062

1063+
// Filter represents a single filter configuration for a changefeed.
1064+
// It contains information about which tables to include or exclude.
1065+
type Filter struct {
1066+
// FilterType indicates whether this is an include or exclude filter
1067+
FilterType tree.FilterType
1068+
// Tables contains the list of table names to filter
1069+
Tables tree.TableNames
1070+
}
1071+
1072+
func parseFilterOptionFromTargetSpec(
1073+
targetSpec jobspb.ChangefeedTargetSpecification,
1074+
) (tree.ChangefeedFilterOption, error) {
1075+
var tables tree.TableNames
1076+
for table := range targetSpec.FilterList.Tables {
1077+
// Parse the fully-qualified table name string back into a TableName object.
1078+
unresolvedName, err := parser.ParseTableName(table)
1079+
if err != nil {
1080+
return tree.ChangefeedFilterOption{}, err
1081+
}
1082+
tableName := unresolvedName.ToTableName()
1083+
tables = append(tables, tableName)
1084+
}
1085+
return tree.ChangefeedFilterOption{
1086+
FilterType: targetSpec.FilterList.FilterType,
1087+
Tables: tables,
1088+
}, nil
1089+
}
1090+
1091+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1092+
// It returns a slice of Filter structs containing all filter modifications.
1093+
func generateNewFilters(alterCmds tree.AlterChangefeedCmds) (filters []Filter) {
1094+
for _, cmd := range alterCmds {
1095+
switch v := cmd.(type) {
1096+
case *tree.AlterChangefeedSetFilterOption:
1097+
filters = append(filters, Filter{
1098+
FilterType: v.ChangefeedFilterOption.FilterType,
1099+
Tables: v.ChangefeedFilterOption.Tables,
1100+
})
1101+
case *tree.AlterChangefeedUnsetFilterOption:
1102+
filters = append(filters, Filter{
1103+
FilterType: v.ChangefeedFilterOption.FilterType,
1104+
Tables: nil,
1105+
})
1106+
}
1107+
}
1108+
1109+
return filters
1110+
}
1111+
9331112
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9341113
prevStmt, err := parser.ParseOne(prevDescription)
9351114
if err != nil {

0 commit comments

Comments
 (0)