Skip to content

Commit f564a06

Browse files
committed
changefeedccl: add metamorphic testing for db-level feeds
Epic: CRDB-1421 Fixes: #148858
1 parent e04fc04 commit f564a06

File tree

7 files changed

+364
-73
lines changed

7 files changed

+364
-73
lines changed

pkg/ccl/changefeedccl/alter_changefeed_test.go

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,9 @@ func TestAlterChangefeedAddTargetAfterInitialScan(t *testing.T) {
261261
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
262262
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b INT)`)
263263

264-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
264+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
265+
reason: "db level changefeeds don't support ALTER CHANGEFEED commands with initial_scan",
266+
})
265267
defer closeFeed(t, testFeed)
266268

267269
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -493,7 +495,11 @@ func TestAlterChangefeedDropTargetAfterTableDrop(t *testing.T) {
493495
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
494496
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
495497

496-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`)
498+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH on_error='pause'`,
499+
optOutOfMetamorphicDBLevelChangefeed{
500+
reason: "db level changefeeds don't support ADD/DROP TARGETS in ALTER CHANGEFEEDs",
501+
},
502+
)
497503
defer closeFeed(t, testFeed)
498504

499505
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -668,7 +674,9 @@ func TestAlterChangefeedErrors(t *testing.T) {
668674
sqlDB := sqlutils.MakeSQLRunner(s.DB)
669675
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
670676
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
671-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`)
677+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo`, optOutOfMetamorphicDBLevelChangefeed{
678+
reason: "test initializes multiple tables but doesn't watch all of them",
679+
})
672680
defer closeFeed(t, testFeed)
673681

674682
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
@@ -799,7 +807,10 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
799807
// Reset the counts.
800808
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
801809

802-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
810+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`,
811+
optOutOfMetamorphicDBLevelChangefeed{
812+
reason: "test initializes multiple tables but doesn't watch all of them",
813+
})
803814
defer closeFeed(t, testFeed)
804815
feed := testFeed.(cdctest.EnterpriseTestFeed)
805816

@@ -1068,7 +1079,10 @@ func TestAlterChangefeedDatabaseQualifiedNames(t *testing.T) {
10681079
sqlDB.Exec(t, `CREATE TABLE d.users (id INT PRIMARY KEY, name STRING)`)
10691080
sqlDB.Exec(t, `INSERT INTO d.drivers VALUES (1, 'Alice')`)
10701081
sqlDB.Exec(t, `INSERT INTO d.users VALUES (1, 'Bob')`)
1071-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`)
1082+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR d.drivers WITH resolved = '100ms', diff`,
1083+
optOutOfMetamorphicDBLevelChangefeed{
1084+
reason: "test initializes multiple tables but doesn't watch all of them",
1085+
})
10721086
defer closeFeed(t, testFeed)
10731087

10741088
assertPayloads(t, testFeed, []string{
@@ -1119,7 +1133,11 @@ func TestAlterChangefeedDatabaseScope(t *testing.T) {
11191133
`INSERT INTO new_movr.drivers VALUES (1, 'Bob')`,
11201134
)
11211135

1122-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`)
1136+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff`,
1137+
optOutOfMetamorphicDBLevelChangefeed{
1138+
reason: "changefeed watches tables not in the default database",
1139+
},
1140+
)
11231141
defer closeFeed(t, testFeed)
11241142

11251143
assertPayloads(t, testFeed, []string{
@@ -1162,7 +1180,10 @@ func TestAlterChangefeedDatabaseScopeUnqualifiedName(t *testing.T) {
11621180
)
11631181

11641182
sqlDB.Exec(t, `USE movr`)
1165-
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`)
1183+
testFeed := feed(t, f, `CREATE CHANGEFEED FOR drivers WITH diff, resolved = '100ms'`,
1184+
optOutOfMetamorphicDBLevelChangefeed{
1185+
reason: "test initializes multiple tables but doesn't watch all of them",
1186+
})
11661187
defer closeFeed(t, testFeed)
11671188

11681189
assertPayloads(t, testFeed, []string{
@@ -1211,6 +1232,9 @@ func TestAlterChangefeedColumnFamilyDatabaseScope(t *testing.T) {
12111232
if _, ok := f.(*webhookFeedFactory); ok {
12121233
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "metamorphic enriched envelope does not support column families for webhook sinks"})
12131234
}
1235+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{
1236+
reason: "changefeed watches tables not in the default database",
1237+
})
12141238
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.drivers WITH diff, split_column_families`, args...)
12151239
defer closeFeed(t, testFeed)
12161240

@@ -1263,6 +1287,7 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
12631287
if _, ok := f.(*webhookFeedFactory); ok {
12641288
args = append(args, optOutOfMetamorphicEnrichedEnvelope{reason: "see comment"})
12651289
}
1290+
args = append(args, optOutOfMetamorphicDBLevelChangefeed{reason: "uses non default DB"})
12661291

12671292
testFeed := feed(t, f, `CREATE CHANGEFEED FOR movr.users WITH diff, resolved = '100ms'`, args...)
12681293
defer closeFeed(t, testFeed)
@@ -1551,7 +1576,10 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
15511576

15521577
registry := s.Server.JobRegistry().(*jobs.Registry)
15531578
testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo
1554-
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`)
1579+
WITH resolved = '100ms', min_checkpoint_frequency='1ns'`,
1580+
optOutOfMetamorphicDBLevelChangefeed{
1581+
reason: "test initializes multiple tables but doesn't watch all of them",
1582+
})
15551583

15561584
g := ctxgroup.WithContext(context.Background())
15571585
g.Go(func() error {
@@ -1861,7 +1889,9 @@ func TestAlterChangefeedAccessControl(t *testing.T) {
18611889
rootDB := sqlutils.MakeSQLRunner(s.DB)
18621890

18631891
createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
1864-
successfulFeed := feed(t, f, stmt)
1892+
successfulFeed := feed(t, f, stmt, optOutOfMetamorphicDBLevelChangefeed{
1893+
reason: "test initializes multiple tables but doesn't watch all of them",
1894+
})
18651895
closeCf := func() {
18661896
closeFeed(t, successfulFeed)
18671897
}
@@ -2050,7 +2080,9 @@ func TestAlterChangefeedRandomizedTargetChanges(t *testing.T) {
20502080
createStmt := fmt.Sprintf(
20512081
`CREATE CHANGEFEED FOR %s WITH updated`, strings.Join(initialTables, ", "))
20522082
t.Log(createStmt)
2053-
testFeed := feed(t, f, createStmt)
2083+
testFeed := feed(t, f, createStmt, optOutOfMetamorphicDBLevelChangefeed{
2084+
reason: "db level feeds don't support ALTERing targets with ADD/DROP TARGETS",
2085+
})
20542086
defer closeFeed(t, testFeed)
20552087

20562088
feed, ok := testFeed.(cdctest.EnterpriseTestFeed)

pkg/ccl/changefeedccl/changefeed.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ func AllTargets(
7979
if len(cd.TargetSpecifications) > 1 {
8080
return changefeedbase.Targets{}, errors.AssertionFailedf("database-level changefeed is not supported with multiple targets")
8181
}
82-
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp)
82+
_, useFullTableName := cd.Opts[changefeedbase.OptFullTableName]
83+
targets, err = getTargetsFromDatabaseSpec(ctx, ts, execCfg, timestamp, useFullTableName)
8384
if err != nil {
8485
return changefeedbase.Targets{}, err
8586
}
@@ -117,6 +118,7 @@ func getTargetsFromDatabaseSpec(
117118
ts jobspb.ChangefeedTargetSpecification,
118119
execCfg *sql.ExecutorConfig,
119120
timestamp hlc.Timestamp,
121+
useFullTableName bool,
120122
) (targets changefeedbase.Targets, err error) {
121123
err = sql.DescsTxn(ctx, execCfg, func(
122124
ctx context.Context, txn isql.Txn, descs *descs.Collection,
@@ -172,15 +174,21 @@ func getTargetsFromDatabaseSpec(
172174
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
173175
}
174176

177+
tableName := func() string {
178+
if useFullTableName {
179+
return fullyQualifiedTableName
180+
}
181+
return desc.GetName()
182+
}()
175183
targets.Add(changefeedbase.Target{
176184
Type: tableType,
177185
DescID: desc.GetID(),
178-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
186+
StatementTimeName: changefeedbase.StatementTimeName(tableName),
179187
})
180188
}
181189
case tree.IncludeFilter:
182-
for name := range ts.FilterList.Tables {
183-
tn, err := parser.ParseTableName(name)
190+
for fullyQualifiedTableName := range ts.FilterList.Tables {
191+
tn, err := parser.ParseTableName(fullyQualifiedTableName)
184192
if err != nil {
185193
return err
186194
}
@@ -215,10 +223,16 @@ func getTargetsFromDatabaseSpec(
215223
tableType = jobspb.ChangefeedTargetSpecification_EACH_FAMILY
216224
}
217225

226+
tableName := func() string {
227+
if useFullTableName {
228+
return fullyQualifiedTableName
229+
}
230+
return desc.GetName()
231+
}()
218232
targets.Add(changefeedbase.Target{
219233
Type: tableType,
220234
DescID: tableID,
221-
StatementTimeName: changefeedbase.StatementTimeName(desc.GetName()),
235+
StatementTimeName: changefeedbase.StatementTimeName(tableName),
222236
})
223237
}
224238
default:

0 commit comments

Comments
 (0)