Skip to content

Commit 17d0c30

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds, setting both INCLUDE and EXCLUDE at the same time, and switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. When filter changes result in new tables being watched, they will not receive an initial scan. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent fd29a93 commit 17d0c30

File tree

3 files changed

+1094
-3
lines changed

3 files changed

+1094
-3
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 287 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
1516
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1921
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2022
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2123
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,6 +37,7 @@ import (
3537
"github.com/cockroachdb/cockroach/pkg/sql/types"
3638
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3739
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3841
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3942
"github.com/cockroachdb/errors"
4043
)
@@ -55,6 +58,9 @@ func alterChangefeedTypeCheck(
5558
toCheck := []exprutil.ToTypeCheck{
5659
exprutil.Ints{alterChangefeedStmt.Jobs},
5760
}
61+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
62+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
63+
// invalid option.
5864
for _, cmd := range alterChangefeedStmt.Cmds {
5965
switch v := cmd.(type) {
6066
case *tree.AlterChangefeedSetOptions:
@@ -129,6 +135,14 @@ func alterChangefeedPlanHook(
129135
}
130136

131137
newChangefeedStmt := &tree.CreateChangefeed{}
138+
if isDBLevelChangefeed(prevDetails) {
139+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
140+
if len(prevDetails.TargetSpecifications) != 1 {
141+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
142+
}
143+
} else {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
145+
}
132146

133147
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134148
if err != nil {
@@ -152,6 +166,7 @@ func alterChangefeedPlanHook(
152166

153167
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154168
ctx, exprEval, p,
169+
newChangefeedStmt.Level,
155170
alterChangefeedStmt.Cmds,
156171
newOptions,
157172
prevDetails, job.Progress(),
@@ -160,7 +175,14 @@ func alterChangefeedPlanHook(
160175
if err != nil {
161176
return err
162177
}
163-
newChangefeedStmt.TableTargets = newTargets
178+
179+
if err := setChangefeedTargets(ctx, p, newChangefeedStmt, prevDetails, newTargets); err != nil {
180+
return err
181+
}
182+
183+
if err := processFiltersForChangefeed(newChangefeedStmt, alterChangefeedStmt.Cmds, prevDetails); err != nil {
184+
return err
185+
}
164186

165187
if prevDetails.Select != "" {
166188
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,12 +240,54 @@ func alterChangefeedPlanHook(
218240
}
219241

220242
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
222-
223243
// newStatementTime will either be the StatementTime of the job prior to the
224244
// alteration, or it will be the high watermark of the job.
225245
newDetails.StatementTime = newStatementTime
226246

247+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
248+
// For database level changefeeds, we may have set a new value for
249+
// the filter options. In this case, the set of tables we are watching
250+
// may have changed. We need to store the new frontier so that we only
251+
// emit events for the new tables from the time of the ALTER CHANGEFEED
252+
// statement, but not before.
253+
254+
// NB: We do not need to remove spans for tables that are no longer
255+
// watched. When the changefeed is resumed, we will make the frontier
256+
// so that it will only watch the new set of watched tables. This means
257+
// that if altering a filter results in us watching fewer tables,
258+
// we will not emit ANY more events for that table, even if it was
259+
// lagging, i.e. even if they corresponded to updates made before
260+
// the ALTER CHANGEFEED statement.
261+
262+
statementTime := hlc.Timestamp{
263+
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
264+
}
265+
prevTargets, err := AllTargets(ctx, prevDetails, p.ExecCfg(), statementTime)
266+
if err != nil {
267+
return err
268+
}
269+
// These targets are different from the ones returned by the call to
270+
// createChangefeedJobRecord above since those are evaluated at the
271+
// highwater. In order to know which tables we should start watching
272+
// at this alter changefeed statement time, we need to evaluate if the
273+
// targets change at this time, even though they may have been created
274+
// between the highwater and the alter changefeed statement.
275+
newTargets, err := AllTargets(ctx, newDetails, p.ExecCfg(), statementTime)
276+
if err != nil {
277+
return err
278+
}
279+
if err := storeNewTablesJobFrontier(
280+
ctx, p, jobID, statementTime, newTargets, prevTargets, newDetails, prevDetails,
281+
); err != nil {
282+
return err
283+
}
284+
} else {
285+
// If the changefeed is a table-level changefeed, we set the initial
286+
// scan option to its default value of "yes" so that we can do an
287+
// initial scan for new targets.
288+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
289+
}
290+
227291
newPayload := job.Payload()
228292
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
229293
newPayload.Description = jobRecord.Description
@@ -268,6 +332,80 @@ func alterChangefeedPlanHook(
268332
return fn, alterChangefeedHeader, false, nil
269333
}
270334

335+
// setChangefeedTargets sets the appropriate target fields on the changefeed
336+
// statement based on the feed level. Database-level feeds get a DatabaseTarget,
337+
// while table-level feeds get TableTargets.
338+
func setChangefeedTargets(
339+
ctx context.Context,
340+
p sql.PlanHookState,
341+
stmt *tree.CreateChangefeed,
342+
prevDetails jobspb.ChangefeedDetails,
343+
newTargets tree.ChangefeedTableTargets,
344+
) error {
345+
if stmt.Level == tree.ChangefeedLevelDatabase {
346+
targetSpec := prevDetails.TargetSpecifications[0]
347+
txn := p.InternalSQLTxn()
348+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
349+
if err != nil {
350+
return err
351+
}
352+
stmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(databaseDescriptor.GetName())
353+
} else {
354+
stmt.TableTargets = newTargets
355+
}
356+
return nil
357+
}
358+
359+
// processFiltersForChangefeed validates and applies filter changes to the
360+
// changefeed statement. Filters are only supported for database-level feeds.
361+
func processFiltersForChangefeed(
362+
stmt *tree.CreateChangefeed,
363+
alterCmds tree.AlterChangefeedCmds,
364+
prevDetails jobspb.ChangefeedDetails,
365+
) error {
366+
filterCommands := generateNewFilters(alterCmds)
367+
368+
// Only DB-level feeds should have filters.
369+
if len(filterCommands) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
370+
return errors.Errorf("filters are only supported for database level changefeeds")
371+
}
372+
373+
// We add the existing filter state to the changefeed statement for
374+
// DB-level feeds. This ensures that existing filters are preserved when
375+
// we alter options unrelated to filters and that we can error if we try
376+
// to set both include and exclude filters at the same time.
377+
if stmt.Level == tree.ChangefeedLevelDatabase {
378+
targetSpec := prevDetails.TargetSpecifications[0]
379+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
380+
if err != nil {
381+
return err
382+
}
383+
stmt.FilterOption = filterOpt
384+
}
385+
386+
// For table-level feeds, this should be a no-op since there are no
387+
// filter commands as asserted above. Otherwise, apply each filter in order.
388+
for _, filter := range filterCommands {
389+
currentFilter := stmt.FilterOption
390+
// Note that the no-filters state is represented by FilterType = exclude
391+
// with an empty tables list. In that state, it's perfectly valid to
392+
// set or unset an Include filter.
393+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
394+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
395+
}
396+
if len(filter.Tables) > 0 {
397+
stmt.FilterOption = tree.ChangefeedFilterOption{
398+
FilterType: filter.FilterType,
399+
Tables: filter.Tables,
400+
}
401+
} else {
402+
stmt.FilterOption = tree.DefaultChangefeedFilterOption()
403+
}
404+
}
405+
406+
return nil
407+
}
408+
271409
func getTargetDesc(
272410
ctx context.Context,
273411
p sql.PlanHookState,
@@ -375,6 +513,7 @@ func generateAndValidateNewTargets(
375513
ctx context.Context,
376514
exprEval exprutil.Evaluator,
377515
p sql.PlanHookState,
516+
level tree.ChangefeedLevel,
378517
alterCmds tree.AlterChangefeedCmds,
379518
opts changefeedbase.StatementOptions,
380519
prevDetails jobspb.ChangefeedDetails,
@@ -387,7 +526,42 @@ func generateAndValidateNewTargets(
387526
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388527
error,
389528
) {
529+
if level == tree.ChangefeedLevelTable {
530+
return generateAndValidateNewTargetsForTableLevelFeed(
531+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
532+
)
533+
}
534+
// Database-level feeds do not support ADD/DROP target commands, so we validate
535+
// that we haven't set any, otherwise, this is a no-op.
536+
isAlteringTargets := slices.ContainsFunc(alterCmds, func(cmd tree.AlterChangefeedCmd) bool {
537+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
538+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
539+
return isAdd || isDrop
540+
})
541+
if isAlteringTargets {
542+
return nil, nil, hlc.Timestamp{}, nil, errors.Errorf("cannot alter targets for a database level changefeed")
543+
}
390544

545+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
546+
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
547+
}
548+
549+
func generateAndValidateNewTargetsForTableLevelFeed(
550+
ctx context.Context,
551+
exprEval exprutil.Evaluator,
552+
p sql.PlanHookState,
553+
alterCmds tree.AlterChangefeedCmds,
554+
opts changefeedbase.StatementOptions,
555+
prevDetails jobspb.ChangefeedDetails,
556+
prevProgress jobspb.Progress,
557+
sinkURI string,
558+
) (
559+
tree.ChangefeedTableTargets,
560+
*jobspb.Progress,
561+
hlc.Timestamp,
562+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
563+
error,
564+
) {
391565
type targetKey struct {
392566
TableID descpb.ID
393567
FamilyName tree.Name
@@ -870,6 +1044,67 @@ func generateNewProgress(
8701044
return newProgress, prevStatementTime, nil
8711045
}
8721046

1047+
// storeNewTablesJobFrontier uses the persistentjob frontier to ensure that when
1048+
// we resume the changefeed, new tables being watched by the database level feed
1049+
// (due to ALTER CHANGEFEED changing the filter options) are watched only from
1050+
// the time of the ALTER CHANGEFEED statement.
1051+
func storeNewTablesJobFrontier(
1052+
ctx context.Context,
1053+
p sql.PlanHookState,
1054+
jobID jobspb.JobID,
1055+
ts hlc.Timestamp,
1056+
targets changefeedbase.Targets,
1057+
prevTargets changefeedbase.Targets,
1058+
newDetails jobspb.ChangefeedDetails,
1059+
prevDetails jobspb.ChangefeedDetails,
1060+
) error {
1061+
getSpansForTargets := func(targets changefeedbase.Targets, details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1062+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, ts)
1063+
if err != nil {
1064+
return nil, err
1065+
}
1066+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, ts)
1067+
if err != nil {
1068+
return nil, err
1069+
}
1070+
return spans, nil
1071+
}
1072+
1073+
existingTargetSpans, err := getSpansForTargets(prevTargets, prevDetails)
1074+
if err != nil {
1075+
return err
1076+
}
1077+
trackedSpans, err := getSpansForTargets(targets, newDetails)
1078+
if err != nil {
1079+
return err
1080+
}
1081+
1082+
var newTrackedSpans roachpb.SpanGroup
1083+
newTrackedSpans.Add(trackedSpans...)
1084+
newTrackedSpans.Sub(existingTargetSpans...)
1085+
1086+
frontier, err := span.MakeFrontierAt(ts, newTrackedSpans.Slice()...)
1087+
if err != nil {
1088+
return err
1089+
}
1090+
1091+
// If an alter_changefeed frontier already exists, we add those resolved spans
1092+
// back to the frontier so as not to lose that progress. This could happen if
1093+
// we execute multiple ALTER CHANGEFEED statements that change the filter
1094+
// options to track new tables.
1095+
resolvedSpans, _, err := jobfrontier.GetResolvedSpans(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`)
1096+
if err != nil {
1097+
return err
1098+
}
1099+
for _, resolvedSpan := range resolvedSpans {
1100+
frontier.AddSpansAt(resolvedSpan.Timestamp, resolvedSpan.Span)
1101+
}
1102+
1103+
if err := jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier); err != nil {
1104+
return err
1105+
}
1106+
return nil
1107+
}
8731108
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
8741109
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
8751110
if err != nil {
@@ -934,6 +1169,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9341169
return primarySpans
9351170
}
9361171

1172+
// parseFilterOptionFromTargetSpec parses the existing filter option from the
1173+
// target specification so that it can be used to generate the new filter option.
1174+
func parseFilterOptionFromTargetSpec(
1175+
targetSpec jobspb.ChangefeedTargetSpecification,
1176+
) (tree.ChangefeedFilterOption, error) {
1177+
var tables tree.TableNames
1178+
if targetSpec.FilterList == nil {
1179+
return tree.ChangefeedFilterOption{}, errors.AssertionFailedf("filter list is nil")
1180+
}
1181+
for table := range targetSpec.FilterList.Tables {
1182+
// Parse the fully-qualified table name string back into a TableName object.
1183+
unresolvedName, err := parser.ParseTableName(table)
1184+
if err != nil {
1185+
return tree.ChangefeedFilterOption{}, err
1186+
}
1187+
tableName := unresolvedName.ToTableName()
1188+
tables = append(tables, tableName)
1189+
}
1190+
return tree.ChangefeedFilterOption{
1191+
FilterType: targetSpec.FilterList.FilterType,
1192+
Tables: tables,
1193+
}, nil
1194+
}
1195+
1196+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1197+
// It returns a slice of Filter structs containing all filter modifications.
1198+
func generateNewFilters(
1199+
alterCmds tree.AlterChangefeedCmds,
1200+
) (filters []tree.ChangefeedFilterOption) {
1201+
for _, cmd := range alterCmds {
1202+
switch v := cmd.(type) {
1203+
case *tree.AlterChangefeedSetFilterOption:
1204+
filters = append(filters, tree.ChangefeedFilterOption{
1205+
FilterType: v.ChangefeedFilterOption.FilterType,
1206+
Tables: v.ChangefeedFilterOption.Tables,
1207+
})
1208+
case *tree.AlterChangefeedUnsetFilterOption:
1209+
// Here we do not return the DefaultChangefeedFilterOption (Exclude
1210+
// with an empty tables list) so we can treat UNSET INCLUDE TABLES and
1211+
// UNSET EXCLUDE TABLES differently.
1212+
filters = append(filters, tree.ChangefeedFilterOption{
1213+
FilterType: v.ChangefeedFilterOption.FilterType,
1214+
})
1215+
}
1216+
}
1217+
1218+
return filters
1219+
}
1220+
9371221
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9381222
prevStmt, err := parser.ParseOne(prevDescription)
9391223
if err != nil {

0 commit comments

Comments
 (0)