Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
283 changes: 280 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"context"
"maps"
"net/url"
"slices"

"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobfrontier"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsauth"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand All @@ -55,6 +58,9 @@ func alterChangefeedTypeCheck(
toCheck := []exprutil.ToTypeCheck{
exprutil.Ints{alterChangefeedStmt.Jobs},
}
// TODO(#156806): Validate the options for ADD and UNSET alter commands
// to fail when 'PREPARE'ing an ALTER CHANGEFEED statement specifying an
// invalid option.
for _, cmd := range alterChangefeedStmt.Cmds {
switch v := cmd.(type) {
case *tree.AlterChangefeedSetOptions:
Expand Down Expand Up @@ -129,6 +135,14 @@ func alterChangefeedPlanHook(
}

newChangefeedStmt := &tree.CreateChangefeed{}
if isDBLevelChangefeed(prevDetails) {
newChangefeedStmt.Level = tree.ChangefeedLevelDatabase
if len(prevDetails.TargetSpecifications) != 1 {
return errors.AssertionFailedf("database level changefeed must have exactly one target specification")
}
} else {
newChangefeedStmt.Level = tree.ChangefeedLevelTable
}

prevOpts, err := getPrevOpts(job.Payload().Description, prevDetails.Opts)
if err != nil {
Expand All @@ -152,6 +166,7 @@ func alterChangefeedPlanHook(

newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
ctx, exprEval, p,
newChangefeedStmt.Level,
alterChangefeedStmt.Cmds,
newOptions,
prevDetails, job.Progress(),
Expand All @@ -160,7 +175,14 @@ func alterChangefeedPlanHook(
if err != nil {
return err
}
newChangefeedStmt.TableTargets = newTargets

if err := setChangefeedTargets(ctx, p, newChangefeedStmt, prevDetails, newTargets); err != nil {
return err
}

if err := processFiltersForChangefeed(newChangefeedStmt, alterChangefeedStmt.Cmds, prevDetails); err != nil {
return err
}

if prevDetails.Select != "" {
query, err := cdceval.ParseChangefeedExpression(prevDetails.Select)
Expand Down Expand Up @@ -218,12 +240,31 @@ func alterChangefeedPlanHook(
}

newDetails := jobRecord.Details.(jobspb.ChangefeedDetails)
newDetails.Opts[changefeedbase.OptInitialScan] = ``

// newStatementTime will either be the StatementTime of the job prior to the
// alteration, or it will be the high watermark of the job.
newDetails.StatementTime = newStatementTime

if newChangefeedStmt.Level == tree.ChangefeedLevelDatabase {
// NB: We do not need to remove spans for tables that are no longer
// watched from job progress, only to add spans for new tables.
// When the changefeed is resumed, we will make the frontier so that
// it will only watch the new set of watched tables. This means
// that if altering a filter results in us watching fewer tables,
// we will not emit ANY more events for that table, even if it was
// lagging, i.e. even if they corresponded to updates made before
// the ALTER CHANGEFEED statement.
if err := storeAlterChangefeedFrontier(
ctx, p, jobID, newDetails, prevDetails,
); err != nil {
return err
}
} else {
// If the changefeed is a table-level changefeed, we set the initial
// scan option to its default value of "yes" so that we can do an
// initial scan for new targets.
newDetails.Opts[changefeedbase.OptInitialScan] = ``
}

newPayload := job.Payload()
newPayload.Details = jobspb.WrapPayloadDetails(newDetails)
newPayload.Description = jobRecord.Description
Expand Down Expand Up @@ -268,6 +309,76 @@ func alterChangefeedPlanHook(
return fn, alterChangefeedHeader, false, nil
}

// setChangefeedTargets sets the appropriate target fields on the changefeed
// statement based on the feed level. Database-level feeds get a DatabaseTarget,
// while table-level feeds get TableTargets.
func setChangefeedTargets(
	ctx context.Context,
	p sql.PlanHookState,
	stmt *tree.CreateChangefeed,
	prevDetails jobspb.ChangefeedDetails,
	newTargets tree.ChangefeedTableTargets,
) error {
	// Table-level feeds simply carry the validated table targets forward.
	if stmt.Level != tree.ChangefeedLevelDatabase {
		stmt.TableTargets = newTargets
		return nil
	}
	// Database-level feeds have a single target specification whose descriptor
	// ID identifies the watched database; resolve it to recover the name.
	spec := prevDetails.TargetSpecifications[0]
	txn := p.InternalSQLTxn()
	dbDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Database(ctx, spec.DescID)
	if err != nil {
		return err
	}
	stmt.DatabaseTarget = tree.ChangefeedDatabaseTarget(dbDesc.GetName())
	return nil
}

// processFiltersForChangefeed validates and applies filter changes to the
// changefeed statement. Filters are only supported for database-level feeds.
func processFiltersForChangefeed(
	stmt *tree.CreateChangefeed,
	alterCmds tree.AlterChangefeedCmds,
	prevDetails jobspb.ChangefeedDetails,
) error {
	filterCmds := generateNewFilters(alterCmds)

	// Table-level feeds cannot carry filters at all.
	if stmt.Level != tree.ChangefeedLevelDatabase {
		if len(filterCmds) == 0 {
			return nil
		}
		return errors.Errorf("cannot set filters for table level changefeeds")
	}

	// Seed the statement with the feed's existing filter state. This ensures
	// existing filters are preserved when altering unrelated options, and lets
	// us detect an attempt to mix include and exclude filters.
	existing, err := parseFilterOptionFromTargetSpec(prevDetails.TargetSpecifications[0])
	if err != nil {
		return err
	}
	stmt.FilterOption = existing

	for _, f := range filterCmds {
		cur := stmt.FilterOption
		if !cur.IsEmpty() && cur.FilterType != f.FilterType {
			return errors.Errorf("cannot alter filter type from %s to %s", cur.FilterType, f.FilterType)
		}
		if len(f.Tables) == 0 {
			// An UNSET command clears the filter entirely.
			stmt.FilterOption = tree.ChangefeedFilterOption{}
			continue
		}
		stmt.FilterOption = tree.ChangefeedFilterOption{
			FilterType: f.FilterType,
			Tables:     f.Tables,
		}
	}

	return nil
}

func getTargetDesc(
ctx context.Context,
p sql.PlanHookState,
Expand Down Expand Up @@ -375,6 +486,7 @@ func generateAndValidateNewTargets(
ctx context.Context,
exprEval exprutil.Evaluator,
p sql.PlanHookState,
level tree.ChangefeedLevel,
alterCmds tree.AlterChangefeedCmds,
opts changefeedbase.StatementOptions,
prevDetails jobspb.ChangefeedDetails,
Expand All @@ -387,7 +499,41 @@ func generateAndValidateNewTargets(
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
error,
) {
if level == tree.ChangefeedLevelTable {
return generateAndValidateNewTargetsForTableLevelFeed(
ctx, exprEval, p, alterCmds, opts, prevDetails, prevProgress, sinkURI,
)
}
// Database-level feeds do not support ADD/DROP target commands, so we validate
// that we haven't set any, otherwise, this is a no-op.
isAlteringTargets := slices.ContainsFunc(alterCmds, func(cmd tree.AlterChangefeedCmd) bool {
_, isAdd := cmd.(*tree.AlterChangefeedAddTarget)
_, isDrop := cmd.(*tree.AlterChangefeedDropTarget)
return isAdd || isDrop
})
if isAlteringTargets {
return nil, nil, hlc.Timestamp{}, nil, errors.Errorf("cannot alter targets for a database level changefeed")
}

return tree.ChangefeedTableTargets{}, &prevProgress, prevDetails.StatementTime, nil, nil
}

func generateAndValidateNewTargetsForTableLevelFeed(
ctx context.Context,
exprEval exprutil.Evaluator,
p sql.PlanHookState,
alterCmds tree.AlterChangefeedCmds,
opts changefeedbase.StatementOptions,
prevDetails jobspb.ChangefeedDetails,
prevProgress jobspb.Progress,
sinkURI string,
) (
tree.ChangefeedTableTargets,
*jobspb.Progress,
hlc.Timestamp,
map[tree.ChangefeedTableTarget]jobspb.ChangefeedTargetSpecification,
error,
) {
type targetKey struct {
TableID descpb.ID
FamilyName tree.Name
Expand Down Expand Up @@ -870,6 +1016,85 @@ func generateNewProgress(
return newProgress, prevStatementTime, nil
}

// storeAlterChangefeedFrontier uses the persistent job frontier to ensure that
// when we resume the changefeed, new tables being watched by the database level
// feed (due to ALTER CHANGEFEED changing the filter options) are watched only
// from the time of the ALTER CHANGEFEED statement.
func storeAlterChangefeedFrontier(
ctx context.Context,
p sql.PlanHookState,
jobID jobspb.JobID,
newDetails jobspb.ChangefeedDetails,
prevDetails jobspb.ChangefeedDetails,
) error {
// For database level changefeeds, we may have set a new value for
// the filter options. In this case, the set of tables we are watching
// may have changed. We need to store the new frontier so that we only
// emit events for the new tables from the time of the ALTER CHANGEFEED
// statement, but not before.
statementTime := hlc.Timestamp{
WallTime: p.ExtendedEvalContext().GetStmtTimestamp().UnixNano(),
}

getSpans := func(details jobspb.ChangefeedDetails) ([]roachpb.Span, error) {
targets, err := AllTargets(ctx, details, p.ExecCfg(), statementTime)
if err != nil {
return nil, err
}
tableDescs, err := fetchTableDescriptors(ctx, p.ExecCfg(), targets, statementTime)
if err != nil {
return nil, err
}
spans, err := fetchSpansForTables(ctx, p, tableDescs, details, statementTime)
if err != nil {
return nil, err
}
return spans, nil
}

// We compute the difference between the spans we would have been tracking
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not so sure this is right. cant we do something like identify new spans, load the old frontier, add those spans to it at statementtime, and save it back?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're saying that maybe the order should be to 1) load the old frontier then 2) add the new spans instead of what I'm doing (1- creating a new frontier with the new spans and then 2- adding back in the old frontier by iterating over resolved spans). I'm going to try to make that change.

But are you saying something should be different about how I'm identifying the new spans or what timestamp I add the spans to the frontier with? I think that logic is right and I have tests that verify that, but it was kind of tricky. We spoke about this briefly last week. Any specific concerns?

// at the time of the ALTER CHANGEFEED statement and the spans we expect to
// be tracking at that time with the new filter.
trackedSpansBeforeAlter, err := getSpans(prevDetails)
if err != nil {
return err
}
// Note that these spans are computed based on the spans we expect to be
// tracking when processing events from the time of the ALTER CHANGEFEED
// statement. This may be different than the spans/targets we will be
// tracking when the changefeed is resumed (which will be computed based on
// the highwater). This will happen if a table that is matched by the filter
// is created between the highwater and the ALTER CHANGEFEED statement.
trackedSpansAfterAlter, err := getSpans(newDetails)
if err != nil {
return err
}

var newTrackedSpans roachpb.SpanGroup
newTrackedSpans.Add(trackedSpansAfterAlter...)
newTrackedSpans.Sub(trackedSpansBeforeAlter...)

if newTrackedSpans.Len() == 0 {
return nil
}

frontier, found, err := jobfrontier.Get(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`)
if err != nil {
return err
}
if !found {
frontier, err = span.MakeFrontier()
if err != nil {
return err
}
}

if err := frontier.AddSpansAt(statementTime, newTrackedSpans.Slice()...); err != nil {
return err
}

return jobfrontier.Store(ctx, p.InternalSQLTxn(), jobID, `alter_changefeed`, frontier)
}
func removeSpansFromProgress(progress jobspb.Progress, spansToRemove []roachpb.Span) error {
spanLevelCheckpoint, err := getSpanLevelCheckpointFromProgress(progress)
if err != nil {
Expand Down Expand Up @@ -934,6 +1159,58 @@ func fetchSpansForDescs(p sql.PlanHookState, spanIDs []spanID) (primarySpans []r
return primarySpans
}

// parseFilterOptionFromTargetSpec parses the existing filter option from the
// target specification so that it can be used to generate the new filter option.
func parseFilterOptionFromTargetSpec(
	targetSpec jobspb.ChangefeedTargetSpecification,
) (tree.ChangefeedFilterOption, error) {
	if targetSpec.FilterList == nil {
		return tree.ChangefeedFilterOption{}, errors.AssertionFailedf("filter list is nil")
	}
	var tables tree.TableNames
	// Iterate the filter tables in sorted order so that the reconstructed
	// filter option (and anything derived from it, such as the statement we
	// rebuild for the job) is deterministic; map iteration order is random.
	for _, table := range slices.Sorted(maps.Keys(targetSpec.FilterList.Tables)) {
		// Parse the fully-qualified table name string back into a TableName object.
		unresolvedName, err := parser.ParseTableName(table)
		if err != nil {
			return tree.ChangefeedFilterOption{}, err
		}
		tables = append(tables, unresolvedName.ToTableName())
	}
	return tree.ChangefeedFilterOption{
		FilterType: targetSpec.FilterList.FilterType,
		Tables:     tables,
	}, nil
}

// generateNewFilters processes alter changefeed commands and extracts filter
// changes. It returns, in command order, one filter option per SET/UNSET
// filter command; other commands are ignored.
func generateNewFilters(
	alterCmds tree.AlterChangefeedCmds,
) (filters []tree.ChangefeedFilterOption) {
	for _, cmd := range alterCmds {
		switch c := cmd.(type) {
		case *tree.AlterChangefeedSetFilterOption:
			opt := tree.ChangefeedFilterOption{
				FilterType: c.ChangefeedFilterOption.FilterType,
				Tables:     c.ChangefeedFilterOption.Tables,
			}
			filters = append(filters, opt)
		case *tree.AlterChangefeedUnsetFilterOption:
			// Here we do not return the default changefeed filter option (Exclude
			// with an empty tables list) so we can treat UNSET INCLUDE TABLES and
			// UNSET EXCLUDE TABLES differently. For example, if we create a feed
			// with EXCLUDE TABLES foo, bar, and then UNSET EXCLUDE TABLES, that
			// would succeed. However, we expect UNSET INCLUDE TABLES to fail,
			// even though both correspond to the empty filter state.
			opt := tree.ChangefeedFilterOption{
				FilterType: c.ChangefeedFilterOption.FilterType,
			}
			filters = append(filters, opt)
		}
	}

	return filters
}

func getPrevOpts(prevDescription string, opts map[string]string) (map[string]string, error) {
prevStmt, err := parser.ParseOne(prevDescription)
if err != nil {
Expand Down
Loading
Loading