Skip to content

Commit 40ef5d9

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds, setting both INCLUDE and EXCLUDE at the same time, and switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. When filter changes result in new tables being watched, they will only emit changes from the time the filter was altered. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent fd29a93 commit 40ef5d9

File tree

3 files changed

+1092
-3
lines changed

3 files changed

+1092
-3
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 285 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
1516
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1921
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2022
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2123
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,6 +37,7 @@ import (
3537
"github.com/cockroachdb/cockroach/pkg/sql/types"
3638
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3739
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3841
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3942
"github.com/cockroachdb/errors"
4043
)
@@ -55,6 +58,9 @@ func alterChangefeedTypeCheck(
5558
toCheck := []exprutil.ToTypeCheck{
5659
exprutil.Ints{alterChangefeedStmt.Jobs},
5760
}
61+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
62+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
63+
// invalid option.
5864
for _, cmd := range alterChangefeedStmt.Cmds {
5965
switch v := cmd.(type) {
6066
case *tree.AlterChangefeedSetOptions:
@@ -129,6 +135,14 @@ func alterChangefeedPlanHook(
129135
}
130136

131137
newChangefeedStmt := &tree.CreateChangefeed{}
138+
if isDBLevelChangefeed(prevDetails) {
139+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
140+
if len(prevDetails.TargetSpecifications) != 1 {
141+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
142+
}
143+
} else {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
145+
}
132146

133147
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134148
if err != nil {
@@ -152,6 +166,7 @@ func alterChangefeedPlanHook(
152166

153167
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154168
ctx, exprEval, p,
169+
newChangefeedStmt.Level,
155170
alterChangefeedStmt.Cmds,
156171
newOptions,
157172
prevDetails, job.Progress(),
@@ -160,7 +175,14 @@ func alterChangefeedPlanHook(
160175
if err != nil {
161176
return err
162177
}
163-
newChangefeedStmt.TableTargets = newTargets
178+
179+
if err := setChangefeedTargets(ctx, p, newChangefeedStmt, prevDetails, newTargets); err != nil {
180+
return err
181+
}
182+
183+
if err := processFiltersForChangefeed(newChangefeedStmt, alterChangefeedStmt.Cmds, prevDetails); err != nil {
184+
return err
185+
}
164186

165187
if prevDetails.Select != "" {
166188
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,12 +240,31 @@ func alterChangefeedPlanHook(
218240
}
219241

220242
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
222-
223243
// newStatementTime will either be the StatementTime of the job prior to the
224244
// alteration, or it will be the high watermark of the job.
225245
newDetails.StatementTime = newStatementTime
226246

247+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
248+
// NB: We do not need to remove spans for tables that are no longer
249+
// watched from job progress, only to add spans for new tables.
250+
// When the changefeed is resumed, we will make the frontier so that
251+
// it will only watch the new set of watched tables. This means
252+
// that if altering a filter results in us watching fewer tables,
253+
// we will not emit ANY more events for that table, even if it was
254+
// lagging, i.e. even if they corresponded to updates made before
255+
// the ALTER CHANGEFEED statement.
256+
if err := storeAlterChangefeedFrontier(
257+
ctx, p, jobID, newDetails, prevDetails,
258+
); err != nil {
259+
return err
260+
}
261+
} else {
262+
// If the changefeed is a table-level changefeed, we set the initial
263+
// scan option to its default value of "yes" so that we can do an
264+
// initial scan for new targets.
265+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
266+
}
267+
227268
newPayload := job.Payload()
228269
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
229270
newPayload.Description = jobRecord.Description
@@ -268,6 +309,80 @@ func alterChangefeedPlanHook(
268309
return fn, alterChangefeedHeader, false, nil
269310
}
270311

312+
// setChangefeedTargets sets the appropriate target fields on the changefeed
313+
// statement based on the feed level. Database-level feeds get a DatabaseTarget,
314+
// while table-level feeds get TableTargets.
315+
func setChangefeedTargets(
316+
ctx context.Context,
317+
p sql.PlanHookState,
318+
stmt *tree.CreateChangefeed,
319+
prevDetails jobspb.ChangefeedDetails,
320+
newTargets tree.ChangefeedTableTargets,
321+
) error {
322+
if stmt.Level == tree.ChangefeedLevelDatabase {
323+
targetSpec := prevDetails.TargetSpecifications[0]
324+
txn := p.InternalSQLTxn()
325+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
326+
if err != nil {
327+
return err
328+
}
329+
stmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(databaseDescriptor.GetName())
330+
} else {
331+
stmt.TableTargets = newTargets
332+
}
333+
return nil
334+
}
335+
336+
// processFiltersForChangefeed validates and applies filter changes to the
337+
// changefeed statement. Filters are only supported for database-level feeds.
338+
func processFiltersForChangefeed(
339+
stmt *tree.CreateChangefeed,
340+
alterCmds tree.AlterChangefeedCmds,
341+
prevDetails jobspb.ChangefeedDetails,
342+
) error {
343+
filterCommands := generateNewFilters(alterCmds)
344+
345+
// Only DB-level feeds should have filters.
346+
if len(filterCommands) > 0 && stmt.Level != tree.ChangefeedLevelDatabase {
347+
return errors.Errorf("filters are only supported for database level changefeeds")
348+
}
349+
350+
// We add the existing filter state to the changefeed statement for
351+
// DB-level feeds. This ensures that existing filters are preserved when
352+
// we alter options unrelated to filters and that we can error if we try
353+
// to set both include and exclude filters at the same time.
354+
if stmt.Level == tree.ChangefeedLevelDatabase {
355+
targetSpec := prevDetails.TargetSpecifications[0]
356+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
357+
if err != nil {
358+
return err
359+
}
360+
stmt.FilterOption = filterOpt
361+
}
362+
363+
// For table-level feeds, this should be a no-op since there are no
364+
// filter commands as asserted above. Otherwise, apply each filter in order.
365+
for _, filter := range filterCommands {
366+
currentFilter := stmt.FilterOption
367+
// Note that the no-filters state is represented by FilterType = exclude
368+
// with an empty tables list. In that state, it's perfectly valid to
369+
// set or unset an Include filter.
370+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
371+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
372+
}
373+
if len(filter.Tables) > 0 {
374+
stmt.FilterOption = tree.ChangefeedFilterOption{
375+
FilterType: filter.FilterType,
376+
Tables: filter.Tables,
377+
}
378+
} else {
379+
stmt.FilterOption = tree.DefaultChangefeedFilterOption()
380+
}
381+
}
382+
383+
return nil
384+
}
385+
271386
func getTargetDesc(
272387
ctx context.Context,
273388
p sql.PlanHookState,
@@ -375,6 +490,7 @@ func generateAndValidateNewTargets(
375490
ctx context.Context,
376491
exprEval exprutil.Evaluator,
377492
p sql.PlanHookState,
493+
level tree.ChangefeedLevel,
378494
alterCmds tree.AlterChangefeedCmds,
379495
opts changefeedbase.StatementOptions,
380496
prevDetails jobspb.ChangefeedDetails,
@@ -387,7 +503,42 @@ func generateAndValidateNewTargets(
387503
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388504
error,
389505
) {
506+
if level == tree.ChangefeedLevelTable {
507+
return generateAndValidateNewTargetsForTableLevelFeed(
508+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
509+
)
510+
}
511+
// Database-level feeds do not support ADD/DROP target commands, so we validate
512+
// that we haven't set any, otherwise, this is a no-op.
513+
isAlteringTargets := slices.ContainsFunc(alterCmds, func(cmd tree.AlterChangefeedCmd) bool {
514+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
515+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
516+
return isAdd || isDrop
517+
})
518+
if isAlteringTargets {
519+
return nil, nil, hlc.Timestamp{}, nil, errors.Errorf("cannot alter targets for a database level changefeed")
520+
}
390521

522+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime,
523+
make(map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification), nil
524+
}
525+
526+
func generateAndValidateNewTargetsForTableLevelFeed(
527+
ctx context.Context,
528+
exprEval exprutil.Evaluator,
529+
p sql.PlanHookState,
530+
alterCmds tree.AlterChangefeedCmds,
531+
opts changefeedbase.StatementOptions,
532+
prevDetails jobspb.ChangefeedDetails,
533+
prevProgress jobspb.Progress,
534+
sinkURI string,
535+
) (
536+
tree.ChangefeedTableTargets,
537+
*jobspb.Progress,
538+
hlc.Timestamp,
539+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
540+
error,
541+
) {
391542
type targetKey struct {
392543
TableID descpb.ID
393544
FamilyName tree.Name
@@ -870,6 +1021,88 @@ func generateNewProgress(
8701021
return newProgress, prevStatementTime, nil
8711022
}
8721023

1024+
// storeAlterChangefeedFrontier uses the persistentjob frontier to ensure that when
1025+
// we resume the changefeed, new tables being watched by the database level feed
1026+
// (due to ALTER CHANGEFEED changing the filter options) are watched only from
1027+
// the time of the ALTER CHANGEFEED statement.
1028+
func storeAlterChangefeedFrontier(
1029+
ctx context.Context,
1030+
p sql.PlanHookState,
1031+
jobID jobspb.JobID,
1032+
newDetails jobspb.ChangefeedDetails,
1033+
prevDetails jobspb.ChangefeedDetails,
1034+
) error {
1035+
// For database level changefeeds, we may have set a new value for
1036+
// the filter options. In this case, the set of tables we are watching
1037+
// may have changed. We need to store the new frontier so that we only
1038+
// emit events for the new tables from the time of the ALTER CHANGEFEED
1039+
// statement, but not before.
1040+
statementTime := hlc.Timestamp{
1041+
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
1042+
}
1043+
1044+
getSpans := func(details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1045+
targets, err := AllTargets(ctx, details, p.ExecCfg(), statementTime)
1046+
if err != nil {
1047+
return nil, err
1048+
}
1049+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, statementTime)
1050+
if err != nil {
1051+
return nil, err
1052+
}
1053+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, statementTime)
1054+
if err != nil {
1055+
return nil, err
1056+
}
1057+
return spans, nil
1058+
}
1059+
1060+
// We compute the difference between the spans we would have been tracking
1061+
// at the time of the ALTER CHANGEFEED statement and the spans we expect to
1062+
// be tracking at that time with the new filter.
1063+
trackedSpansBeforeAlter, err := getSpans(prevDetails)
1064+
if err != nil {
1065+
return err
1066+
}
1067+
// Note that these spans are computed based on the spans we expect to be
1068+
// tracking when processing events from the time of the ALTER CHANGEFEED
1069+
// statement. This may be different than the spans/targets we will be
1070+
// tracking when the changefeed is resumed (which will be computed based on
1071+
// the highwater). This will happen if a table that is matched by the filter
1072+
// is created between the highwater and the ALTER CHANGEFEED statement.
1073+
trackedSpansAfterAlter, err := getSpans(newDetails)
1074+
if err != nil {
1075+
return err
1076+
}
1077+
1078+
var newTrackedSpans roachpb.SpanGroup
1079+
newTrackedSpans.Add(trackedSpansAfterAlter...)
1080+
newTrackedSpans.Sub(trackedSpansBeforeAlter...)
1081+
1082+
frontier, err := span.MakeFrontierAt(statementTime, newTrackedSpans.Slice()...)
1083+
if err != nil {
1084+
return err
1085+
}
1086+
1087+
// If an alter_changefeed frontier already exists, we add those resolved spans
1088+
// back to the frontier so as not to lose that progress. This could happen if
1089+
// we execute multiple ALTER CHANGEFEED statements that change the filter
1090+
// options to track new tables.
1091+
resolvedSpans, _, err := jobfrontier.GetResolvedSpans(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`)
1092+
if err != nil {
1093+
return err
1094+
}
1095+
for _, resolvedSpan := range resolvedSpans {
1096+
if err := frontier.AddSpansAt(resolvedSpan.Timestamp, resolvedSpan.Span); err != nil {
1097+
return err
1098+
}
1099+
}
1100+
1101+
if err := jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier); err != nil {
1102+
return err
1103+
}
1104+
return nil
1105+
}
8731106
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
8741107
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
8751108
if err != nil {
@@ -934,6 +1167,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9341167
return primarySpans
9351168
}
9361169

1170+
// parseFilterOptionFromTargetSpec parses the existing filter option from the
1171+
// target specification so that it can be used to generate the new filter option.
1172+
func parseFilterOptionFromTargetSpec(
1173+
targetSpec jobspb.ChangefeedTargetSpecification,
1174+
) (tree.ChangefeedFilterOption, error) {
1175+
var tables tree.TableNames
1176+
if targetSpec.FilterList == nil {
1177+
return tree.ChangefeedFilterOption{}, errors.AssertionFailedf("filter list is nil")
1178+
}
1179+
for table := range targetSpec.FilterList.Tables {
1180+
// Parse the fully-qualified table name string back into a TableName object.
1181+
unresolvedName, err := parser.ParseTableName(table)
1182+
if err != nil {
1183+
return tree.ChangefeedFilterOption{}, err
1184+
}
1185+
tableName := unresolvedName.ToTableName()
1186+
tables = append(tables, tableName)
1187+
}
1188+
return tree.ChangefeedFilterOption{
1189+
FilterType: targetSpec.FilterList.FilterType,
1190+
Tables: tables,
1191+
}, nil
1192+
}
1193+
1194+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1195+
// It returns a slice of Filter structs containing all filter modifications.
1196+
func generateNewFilters(
1197+
alterCmds tree.AlterChangefeedCmds,
1198+
) (filters []tree.ChangefeedFilterOption) {
1199+
for _, cmd := range alterCmds {
1200+
switch v := cmd.(type) {
1201+
case *tree.AlterChangefeedSetFilterOption:
1202+
filters = append(filters, tree.ChangefeedFilterOption{
1203+
FilterType: v.ChangefeedFilterOption.FilterType,
1204+
Tables: v.ChangefeedFilterOption.Tables,
1205+
})
1206+
case *tree.AlterChangefeedUnsetFilterOption:
1207+
// Here we do not return the DefaultChangefeedFilterOption (Exclude
1208+
// with an empty tables list) so we can treat UNSET INCLUDE TABLES and
1209+
// UNSET EXCLUDE TABLES differently.
1210+
filters = append(filters, tree.ChangefeedFilterOption{
1211+
FilterType: v.ChangefeedFilterOption.FilterType,
1212+
})
1213+
}
1214+
}
1215+
1216+
return filters
1217+
}
1218+
9371219
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9381220
prevStmt, err := parser.ParseOne(prevDescription)
9391221
if err != nil {

0 commit comments

Comments
 (0)