Skip to content

Commit 109639c

Browse files
committed
changefeedccl: support altering filters on db-level feeds
This commit allows users to change filters on existing DB-level changefeeds with commands like ALTER CHANGEFEED <job_id> SET INCLUDE TABLES (including EXCLUDE or UNSET). This also implements errors when trying to set filters on non-DB-level feeds, ADD/DROP targets on DB-level feeds, setting both INCLUDE and EXCLUDE at the same time, and switching between them without first doing an explicit unset. Note that, when setting an include filter on a feed that already has a filter set, tables not explicitly respecified in the ALTER CHANGEFEED command will no longer be included (or excluded) by the filter. When filter changes result in new tables being watched, they will only emit changes from the time the filter was altered. Epic: CRDB-1421 Fixes: #156484 Fixes: #155986 Informs: #155708 Release note: none
1 parent 4bcccc8 commit 109639c

File tree

2 files changed

+1077
-3
lines changed

2 files changed

+1077
-3
lines changed

pkg/ccl/changefeedccl/alter_changefeed_stmt.go

Lines changed: 280 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import (
99
"context"
1010
"maps"
1111
"net/url"
12+
"slices"
1213

1314
"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
1415
"github.com/cockroachdb/cockroach/pkg/build"
1516
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
1617
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
1718
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
1819
"github.com/cockroachdb/cockroach/pkg/jobs"
20+
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
1921
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
2022
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2123
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,6 +37,7 @@ import (
3537
"github.com/cockroachdb/cockroach/pkg/sql/types"
3638
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3739
"github.com/cockroachdb/cockroach/pkg/util/log"
40+
"github.com/cockroachdb/cockroach/pkg/util/span"
3841
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3942
"github.com/cockroachdb/errors"
4043
)
@@ -55,6 +58,9 @@ func alterChangefeedTypeCheck(
5558
toCheck := []exprutil.ToTypeCheck{
5659
exprutil.Ints{alterChangefeedStmt.Jobs},
5760
}
61+
// TODO(#156806): Validate the options for ADD and UNSET alter commands
62+
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
63+
// invalid option.
5864
for _, cmd := range alterChangefeedStmt.Cmds {
5965
switch v := cmd.(type) {
6066
case *tree.AlterChangefeedSetOptions:
@@ -129,6 +135,14 @@ func alterChangefeedPlanHook(
129135
}
130136

131137
newChangefeedStmt := &tree.CreateChangefeed{}
138+
if isDBLevelChangefeed(prevDetails) {
139+
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
140+
if len(prevDetails.TargetSpecifications) != 1 {
141+
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
142+
}
143+
} else {
144+
newChangefeedStmt.Level = tree.ChangefeedLevelTable
145+
}
132146

133147
prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
134148
if err != nil {
@@ -152,6 +166,7 @@ func alterChangefeedPlanHook(
152166

153167
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
154168
ctx, exprEval, p,
169+
newChangefeedStmt.Level,
155170
alterChangefeedStmt.Cmds,
156171
newOptions,
157172
prevDetails, job.Progress(),
@@ -160,7 +175,14 @@ func alterChangefeedPlanHook(
160175
if err != nil {
161176
return err
162177
}
163-
newChangefeedStmt.TableTargets = newTargets
178+
179+
if err := setChangefeedTargets(ctx, p, newChangefeedStmt, prevDetails, newTargets); err != nil {
180+
return err
181+
}
182+
183+
if err := processFiltersForChangefeed(newChangefeedStmt, alterChangefeedStmt.Cmds, prevDetails); err != nil {
184+
return err
185+
}
164186

165187
if prevDetails.Select != "" {
166188
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
@@ -218,12 +240,31 @@ func alterChangefeedPlanHook(
218240
}
219241

220242
newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
221-
newDetails.Opts[changefeedbase.OptInitialScan] = ``
222-
223243
// newStatementTime will either be the StatementTime of the job prior to the
224244
// alteration, or it will be the high watermark of the job.
225245
newDetails.StatementTime = newStatementTime
226246

247+
if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
248+
// NB: We do not need to remove spans for tables that are no longer
249+
// watched from job progress, only to add spans for new tables.
250+
// When the changefeed is resumed, we will make the frontier so that
251+
// it will only watch the new set of watched tables. This means
252+
// that if altering a filter results in us watching fewer tables,
253+
// we will not emit ANY more events for that table, even if it was
254+
// lagging, i.e. even if they corresponded to updates made before
255+
// the ALTER CHANGEFEED statement.
256+
if err := storeAlterChangefeedFrontier(
257+
ctx, p, jobID, newDetails, prevDetails,
258+
); err != nil {
259+
return err
260+
}
261+
} else {
262+
// If the changefeed is a table-level changefeed, we set the initial
263+
// scan option to its default value of "yes" so that we can do an
264+
// initial scan for new targets.
265+
newDetails.Opts[changefeedbase.OptInitialScan] = ``
266+
}
267+
227268
newPayload := job.Payload()
228269
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
229270
newPayload.Description = jobRecord.Description
@@ -268,6 +309,79 @@ func alterChangefeedPlanHook(
268309
return fn, alterChangefeedHeader, false, nil
269310
}
270311

312+
// setChangefeedTargets sets the appropriate target fields on the changefeed
313+
// statement based on the feed level. Database-level feeds get a DatabaseTarget,
314+
// while table-level feeds get TableTargets.
315+
func setChangefeedTargets(
316+
ctx context.Context,
317+
p sql.PlanHookState,
318+
stmt *tree.CreateChangefeed,
319+
prevDetails jobspb.ChangefeedDetails,
320+
newTargets tree.ChangefeedTableTargets,
321+
) error {
322+
if stmt.Level == tree.ChangefeedLevelDatabase {
323+
targetSpec := prevDetails.TargetSpecifications[0]
324+
txn := p.InternalSQLTxn()
325+
databaseDescriptor, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, targetSpec.DescID)
326+
if err != nil {
327+
return err
328+
}
329+
stmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(databaseDescriptor.GetName())
330+
} else {
331+
stmt.TableTargets = newTargets
332+
}
333+
return nil
334+
}
335+
336+
// processFiltersForChangefeed validates and applies filter changes to the
337+
// changefeed statement. Filters are only supported for database-level feeds.
338+
func processFiltersForChangefeed(
339+
stmt *tree.CreateChangefeed,
340+
alterCmds tree.AlterChangefeedCmds,
341+
prevDetails jobspb.ChangefeedDetails,
342+
) error {
343+
filterCommands := generateNewFilters(alterCmds)
344+
345+
// Only DB-level feeds should have filters.
346+
if stmt.Level != tree.ChangefeedLevelDatabase {
347+
if len(filterCommands) > 0 {
348+
return errors.Errorf("cannot set filters for table level changefeeds")
349+
}
350+
return nil
351+
}
352+
353+
// We add the existing filter state to the changefeed statement for
354+
// DB-level feeds. This ensures that existing filters are preserved when
355+
// we alter options unrelated to filters and that we can error if we try
356+
// to set both include and exclude filters at the same time.
357+
targetSpec := prevDetails.TargetSpecifications[0]
358+
filterOpt, err := parseFilterOptionFromTargetSpec(targetSpec)
359+
if err != nil {
360+
return err
361+
}
362+
stmt.FilterOption = filterOpt
363+
364+
for _, filter := range filterCommands {
365+
currentFilter := stmt.FilterOption
366+
// Note that the no-filters state is represented by FilterType = exclude
367+
// with an empty tables list. In that state, it's perfectly valid to
368+
// set or unset an Include filter.
369+
if len(currentFilter.Tables) > 0 && currentFilter.FilterType != filter.FilterType {
370+
return errors.Errorf("cannot alter filter type from %s to %s", currentFilter.FilterType, filter.FilterType)
371+
}
372+
if len(filter.Tables) > 0 {
373+
stmt.FilterOption = tree.ChangefeedFilterOption{
374+
FilterType: filter.FilterType,
375+
Tables: filter.Tables,
376+
}
377+
} else {
378+
stmt.FilterOption = tree.ChangefeedFilterOption{}
379+
}
380+
}
381+
382+
return nil
383+
}
384+
271385
func getTargetDesc(
272386
ctx context.Context,
273387
p sql.PlanHookState,
@@ -375,6 +489,7 @@ func generateAndValidateNewTargets(
375489
ctx context.Context,
376490
exprEval exprutil.Evaluator,
377491
p sql.PlanHookState,
492+
level tree.ChangefeedLevel,
378493
alterCmds tree.AlterChangefeedCmds,
379494
opts changefeedbase.StatementOptions,
380495
prevDetails jobspb.ChangefeedDetails,
@@ -387,7 +502,41 @@ func generateAndValidateNewTargets(
387502
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
388503
error,
389504
) {
505+
if level == tree.ChangefeedLevelTable {
506+
return generateAndValidateNewTargetsForTableLevelFeed(
507+
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
508+
)
509+
}
510+
// Database-level feeds do not support ADD/DROP target commands, so we validate
511+
// that we haven't set any, otherwise, this is a no-op.
512+
isAlteringTargets := slices.ContainsFunc(alterCmds, func(cmd tree.AlterChangefeedCmd) bool {
513+
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
514+
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
515+
return isAdd || isDrop
516+
})
517+
if isAlteringTargets {
518+
return nil, nil, hlc.Timestamp{}, nil, errors.Errorf("cannot alter targets for a database level changefeed")
519+
}
390520

521+
return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime, nil, nil
522+
}
523+
524+
func generateAndValidateNewTargetsForTableLevelFeed(
525+
ctx context.Context,
526+
exprEval exprutil.Evaluator,
527+
p sql.PlanHookState,
528+
alterCmds tree.AlterChangefeedCmds,
529+
opts changefeedbase.StatementOptions,
530+
prevDetails jobspb.ChangefeedDetails,
531+
prevProgress jobspb.Progress,
532+
sinkURI string,
533+
) (
534+
tree.ChangefeedTableTargets,
535+
*jobspb.Progress,
536+
hlc.Timestamp,
537+
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
538+
error,
539+
) {
391540
type targetKey struct {
392541
TableID descpb.ID
393542
FamilyName tree.Name
@@ -870,6 +1019,85 @@ func generateNewProgress(
8701019
return newProgress, prevStatementTime, nil
8711020
}
8721021

1022+
// storeAlterChangefeedFrontier uses the persistent job frontier to ensure that
1023+
// when we resume the changefeed, new tables being watched by the database level
1024+
// feed (due to ALTER CHANGEFEED changing the filter options) are watched only
1025+
// from the time of the ALTER CHANGEFEED statement.
1026+
func storeAlterChangefeedFrontier(
1027+
ctx context.Context,
1028+
p sql.PlanHookState,
1029+
jobID jobspb.JobID,
1030+
newDetails jobspb.ChangefeedDetails,
1031+
prevDetails jobspb.ChangefeedDetails,
1032+
) error {
1033+
// For database level changefeeds, we may have set a new value for
1034+
// the filter options. In this case, the set of tables we are watching
1035+
// may have changed. We need to store the new frontier so that we only
1036+
// emit events for the new tables from the time of the ALTER CHANGEFEED
1037+
// statement, but not before.
1038+
statementTime := hlc.Timestamp{
1039+
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
1040+
}
1041+
1042+
getSpans := func(details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
1043+
targets, err := AllTargets(ctx, details, p.ExecCfg(), statementTime)
1044+
if err != nil {
1045+
return nil, err
1046+
}
1047+
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, statementTime)
1048+
if err != nil {
1049+
return nil, err
1050+
}
1051+
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, statementTime)
1052+
if err != nil {
1053+
return nil, err
1054+
}
1055+
return spans, nil
1056+
}
1057+
1058+
// We compute the difference between the spans we would have been tracking
1059+
// at the time of the ALTER CHANGEFEED statement and the spans we expect to
1060+
// be tracking at that time with the new filter.
1061+
trackedSpansBeforeAlter, err := getSpans(prevDetails)
1062+
if err != nil {
1063+
return err
1064+
}
1065+
// Note that these spans are computed based on the spans we expect to be
1066+
// tracking when processing events from the time of the ALTER CHANGEFEED
1067+
// statement. This may be different than the spans/targets we will be
1068+
// tracking when the changefeed is resumed (which will be computed based on
1069+
// the highwater). This will happen if a table that is matched by the filter
1070+
// is created between the highwater and the ALTER CHANGEFEED statement.
1071+
trackedSpansAfterAlter, err := getSpans(newDetails)
1072+
if err != nil {
1073+
return err
1074+
}
1075+
1076+
var newTrackedSpans roachpb.SpanGroup
1077+
newTrackedSpans.Add(trackedSpansAfterAlter...)
1078+
newTrackedSpans.Sub(trackedSpansBeforeAlter...)
1079+
1080+
if newTrackedSpans.Len() == 0 {
1081+
return nil
1082+
}
1083+
1084+
frontier, found, err := jobfrontier.Get(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`)
1085+
if err != nil {
1086+
return err
1087+
}
1088+
if !found {
1089+
frontier, err = span.MakeFrontier()
1090+
if err != nil {
1091+
return err
1092+
}
1093+
}
1094+
1095+
if err := frontier.AddSpansAt(statementTime, newTrackedSpans.Slice()...); err != nil {
1096+
return err
1097+
}
1098+
1099+
return jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier)
1100+
}
8731101
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
8741102
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
8751103
if err != nil {
@@ -934,6 +1162,55 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
9341162
return primarySpans
9351163
}
9361164

1165+
// parseFilterOptionFromTargetSpec parses the existing filter option from the
1166+
// target specification so that it can be used to generate the new filter option.
1167+
func parseFilterOptionFromTargetSpec(
1168+
targetSpec jobspb.ChangefeedTargetSpecification,
1169+
) (tree.ChangefeedFilterOption, error) {
1170+
var tables tree.TableNames
1171+
if targetSpec.FilterList == nil {
1172+
return tree.ChangefeedFilterOption{}, errors.AssertionFailedf("filter list is nil")
1173+
}
1174+
for table := range targetSpec.FilterList.Tables {
1175+
// Parse the fully-qualified table name string back into a TableName object.
1176+
unresolvedName, err := parser.ParseTableName(table)
1177+
if err != nil {
1178+
return tree.ChangefeedFilterOption{}, err
1179+
}
1180+
tableName := unresolvedName.ToTableName()
1181+
tables = append(tables, tableName)
1182+
}
1183+
return tree.ChangefeedFilterOption{
1184+
FilterType: targetSpec.FilterList.FilterType,
1185+
Tables: tables,
1186+
}, nil
1187+
}
1188+
1189+
// generateNewFilters processes alter changefeed commands and extracts filter changes.
1190+
// It returns a slice of Filter structs containing all filter modifications.
1191+
func generateNewFilters(
1192+
alterCmds tree.AlterChangefeedCmds,
1193+
) (filters []tree.ChangefeedFilterOption) {
1194+
for _, cmd := range alterCmds {
1195+
switch v := cmd.(type) {
1196+
case *tree.AlterChangefeedSetFilterOption:
1197+
filters = append(filters, tree.ChangefeedFilterOption{
1198+
FilterType: v.ChangefeedFilterOption.FilterType,
1199+
Tables: v.ChangefeedFilterOption.Tables,
1200+
})
1201+
case *tree.AlterChangefeedUnsetFilterOption:
1202+
// Here we do not return the default changefeed filter option (Exclude
1203+
// with an empty tables list) so we can treat UNSET INCLUDE TABLES and
1204+
// UNSET EXCLUDE TABLES differently.
1205+
filters = append(filters, tree.ChangefeedFilterOption{
1206+
FilterType: v.ChangefeedFilterOption.FilterType,
1207+
})
1208+
}
1209+
}
1210+
1211+
return filters
1212+
}
1213+
9371214
func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
9381215
prevStmt, err := parser.ParseOne(prevDescription)
9391216
if err != nil {

0 commit comments

Comments
 (0)