Skip to content

Commit c127b6c

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES <table> (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds and setting both INCLUDE and EXCLUDE at the same time, or switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent 528bc77 commit c127b6c

File tree

4 files changed

+861
-196
lines changed

4 files changed

+861
-196
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 172 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
@@ -55,6 +56,9 @@ func alterChangefeedTypeCheck(
5556
toCheck := []exprutil.ToTypeCheck{
5657
exprutil.Ints{alterChangefeedStmt.Jobs},
5758
}
59+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
60+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
61+
// invalid option.
5862
for _, cmd := range alterChangefeedStmt.Cmds {
5963
switch v := cmd.(type) {
6064
case *tree.AlterChangefeedSetOptions:
@@ -130,6 +134,18 @@ func alterChangefeedPlanHook(
130134

131135
newChangefeedStmt := &tree.CreateChangefeed{}
132136

137+
// DB-level feeds should have exactly one target specification, but
138+
// table-level feeds should have at least one.
139+
if len(prevDetails.TargetSpecifications) == 0 {
140+
return errors.AssertionFailedf("no target specifications found for changefeed")
141+
}
142+
143+
if prevDetails.TargetSpecifications[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
145+
} else {
146+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
147+
}
148+
133149
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134150
if err != nil {
135151
return err
@@ -149,9 +165,35 @@ func alterChangefeedPlanHook(
149165
if err := validateSettings(ctx, st != changefeedbase.OnlyInitialScan, p.ExecCfg()); err != nil {
150166
return err
151167
}
168+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase && st != changefeedbase.NoInitialScan {
169+
return errors.Errorf("cannot perform initial scan on a database level changefeed")
170+
}
171+
172+
// DB-level feeds have a DatabaseTarget in the changefeed statement,
173+
// unlike table-level feeds which have TableTargets. Set those here.
174+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
175+
targetSpec := prevDetails.TargetSpecifications[0]
176+
txn := p.InternalSQLTxn()
177+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
178+
if err != nil {
179+
return err
180+
}
181+
dbName := databaseDescriptor.GetName()
182+
newChangefeedStmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(dbName)
183+
}
184+
185+
isAlteringTargets := slices.ContainsFunc(alterChangefeedStmt.Cmds, func(cmd tree.AlterChangefeedCmd) bool {
186+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
187+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
188+
return isAdd || isDrop
189+
})
190+
if isAlteringTargets && newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
191+
return errors.Errorf("cannot alter targets for a database level changefeed")
192+
}
152193

153194
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154195
ctx, exprEval, p,
196+
newChangefeedStmt.Level,
155197
alterChangefeedStmt.Cmds,
156198
newOptions,
157199
prevDetails, job.Progress(),
@@ -160,7 +202,57 @@ func alterChangefeedPlanHook(
160202
if err != nil {
161203
return err
162204
}
163-
newChangefeedStmt.TableTargets = newTargets
205+
206+
// Only table-level feeds should have table targets in their changefeed statement.
207+
if newChangefeedStmt.Level == tree.ChangefeedLevelTable {
208+
newChangefeedStmt.TableTargets = newTargets
209+
}
210+
211+
// Now we process the filters. Only DB-level feeds should have filters.
212+
filters := generateNewFilters(alterChangefeedStmt.Cmds)
213+
if len(filters) > 0 && newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
214+
return errors.Errorf("filters are only supported for database level changefeeds")
215+
}
216+
217+
// We add the existing filter state to the changefeed statement for
218+
// DB-level feeds. This ensures that existing filters are preserved when
219+
// we alter options unrelated to filters and that we can error if we try
220+
// to set both include and exclude filters at the same time.
221+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
222+
targetSpec := prevDetails.TargetSpecifications[0]
223+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
224+
if err != nil {
225+
return err
226+
}
227+
newChangefeedStmt.FilterOption = filterOpt
228+
}
229+
230+
// For table-level feeds, this should be a no-op since there are no
231+
// filters as asserted above. Otherwise, apply each filter in order.
232+
for _, filter := range filters {
233+
currentFilter := newChangefeedStmt.FilterOption
234+
// Note that the no-filters state is represented by FilterType = exclude
235+
// with an empty tables list. In that state, it's perfectly valid to
236+
// set or unset an Include filter.
237+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
238+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
239+
}
240+
if len(filter.Tables) > 0 {
241+
newChangefeedStmt.FilterOption = tree.ChangefeedFilterOption{
242+
FilterType: filter.FilterType,
243+
Tables: filter.Tables,
244+
}
245+
} else {
246+
// When unsetting a filter, the default case is to EXCLUDE nothing.
247+
// This will be the result after doing either UNSET INCLUDE TABLES
248+
// or UNSET EXCLUDE TABLES. INCLUDE with an empty tables list is
249+
// not a valid filter.
250+
newChangefeedStmt.FilterOption = tree.ChangefeedFilterOption{
251+
FilterType: tree.ExcludeFilter,
252+
Tables: tree.TableNames{},
253+
}
254+
}
255+
}
164256

165257
if prevDetails.Select != "" {
166258
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,7 +310,9 @@ func alterChangefeedPlanHook(
218310
}
219311

220312
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
313+
if newChangefeedStmt.Level != tree.ChangefeedLevelDatabase {
314+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
315+
}
222316

223317
// newStatementTime will either be the StatementTime of the job prior to the
224318
// alteration, or it will be the high watermark of the job.
@@ -372,6 +466,33 @@ func generateNewOpts(
372466
}
373467

374468
func generateAndValidateNewTargets(
469+
ctx context.Context,
470+
exprEval exprutil.Evaluator,
471+
p sql.PlanHookState,
472+
level tree.ChangefeedLevel,
473+
alterCmds tree.AlterChangefeedCmds,
474+
opts changefeedbase.StatementOptions,
475+
prevDetails jobspb.ChangefeedDetails,
476+
prevProgress jobspb.Progress,
477+
sinkURI string,
478+
) (tree.ChangefeedTableTargets, *jobspb.Progress, hlc.Timestamp, map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification, error) {
479+
if level == tree.ChangefeedLevelTable {
480+
return generateAndValidateNewTargetsForTableLevelFeed(
481+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
482+
)
483+
}
484+
485+
// Since we do not allow initial scans on database level changefeeds
486+
// and database level feeds do not support ADD/DROP tables commands,
487+
// we do not need to call generateAndValidateNewTargetsForTableLevelFeed.
488+
// When filter changes result in changes to which tables are being tracked,
489+
// we can rely on startDistChangefeed to handle removing/adding tables to
490+
// the progress.
491+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
492+
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
493+
}
494+
495+
func generateAndValidateNewTargetsForTableLevelFeed(
375496
ctx context.Context,
376497
exprEval exprutil.Evaluator,
377498
p sql.PlanHookState,
@@ -387,7 +508,6 @@ func generateAndValidateNewTargets(
387508
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388509
error,
389510
) {
390-
391511
type targetKey struct {
392512
TableID descpb.ID
393513
FamilyName tree.Name
@@ -930,6 +1050,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9301050
return primarySpans
9311051
}
9321052

1053+
// Filter represents a single filter configuration for a changefeed.
1054+
// It contains information about which tables to include or exclude.
1055+
type Filter struct {
1056+
// FilterType indicates whether this is an include or exclude filter
1057+
FilterType tree.FilterType
1058+
// Tables contains the list of table names to filter
1059+
Tables tree.TableNames
1060+
}
1061+
1062+
func parseFilterOptionFromTargetSpec(
1063+
targetSpec jobspb.ChangefeedTargetSpecification,
1064+
) (tree.ChangefeedFilterOption, error) {
1065+
var tables tree.TableNames
1066+
for table := range targetSpec.FilterList.Tables {
1067+
// Parse the fully-qualified table name string back into a TableName object.
1068+
unresolvedName, err := parser.ParseTableName(table)
1069+
if err != nil {
1070+
return tree.ChangefeedFilterOption{}, err
1071+
}
1072+
tableName := unresolvedName.ToTableName()
1073+
tables = append(tables, tableName)
1074+
}
1075+
return tree.ChangefeedFilterOption{
1076+
FilterType: targetSpec.FilterList.FilterType,
1077+
Tables: tables,
1078+
}, nil
1079+
}
1080+
1081+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1082+
// It returns a slice of Filter structs containing all filter modifications.
1083+
func generateNewFilters(alterCmds tree.AlterChangefeedCmds) (filters []Filter) {
1084+
for _, cmd := range alterCmds {
1085+
switch v := cmd.(type) {
1086+
case *tree.AlterChangefeedSetFilterOption:
1087+
filters = append(filters, Filter{
1088+
FilterType: v.ChangefeedFilterOption.FilterType,
1089+
Tables: v.ChangefeedFilterOption.Tables,
1090+
})
1091+
case *tree.AlterChangefeedUnsetFilterOption:
1092+
filters = append(filters, Filter{
1093+
FilterType: v.ChangefeedFilterOption.FilterType,
1094+
Tables: nil,
1095+
})
1096+
}
1097+
}
1098+
1099+
return filters
1100+
}
1101+
9331102
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9341103
prevStmt, err := parser.ParseOne(prevDescription)
9351104
if err != nil {

0 commit comments

Comments
 (0)