From aa79556de0c84e811fa545ff90d6566f7026c3df Mon Sep 17 00:00:00 2001 From: Aerin Freilich Date: Tue, 4 Nov 2025 14:46:25 -0500 Subject: [PATCH] changefeedccl: change db-level feed default to no initial scan This change the default initial scan behavior for db-level feeds to not do an initial scan if no option is specified. When an initial scan is specified, that is respected. We do not allow db-level changefeeds with initial_scan='only'. This includes a rewrite of existing db-level feeds tests which rely on default initial scans in their testing. Epic: CRDB-1421 Informs: #156484 Release note: None --- pkg/ccl/changefeedccl/changefeed_stmt.go | 29 +- pkg/ccl/changefeedccl/changefeed_test.go | 513 +++++++++++------- .../show_changefeed_jobs_test.go | 6 +- 3 files changed, 360 insertions(+), 188 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 7bbaa0a3e620..c6fd28617d85 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -230,9 +230,18 @@ func changefeedPlanHook( return nil, nil, false, err } - // Treat all tables inside a database as if "split_column_families" is set. if changefeedStmt.Level == tree.ChangefeedLevelDatabase { + // Treat all tables inside a database as if "split_column_families" is set. rawOpts[changefeedbase.OptSplitColumnFamilies] = `yes` + + // The default behavior for a database-level changefeed is + // NOT to perform an initial scan, unlike table-level changefeeds. + _, initialScanSet := rawOpts[changefeedbase.OptInitialScan] + _, initialScanOnlySet := rawOpts[changefeedbase.OptInitialScanOnly] + _, noInitialScanSet := rawOpts[changefeedbase.OptNoInitialScan] + if !initialScanOnlySet && !noInitialScanSet && !initialScanSet { + rawOpts[changefeedbase.OptInitialScan] = `no` + } } opts := changefeedbase.MakeStatementOptions(rawOpts) @@ -1462,6 +1471,16 @@ func validateDetailsAndOptions( if err := opts.ValidateForCreateChangefeed(details.Select != ""); err != nil { return err } + if isDBLevelChangefeed(details) { + scanType, err := opts.GetInitialScanType() + if err != nil { + return err + } + if scanType == changefeedbase.OnlyInitialScan { + return errors.Errorf( + `cannot specify %s on a database level changefeed`, changefeedbase.OptInitialScanOnly) + } + } if opts.HasEndTime() { scanType, err := opts.GetInitialScanType() if err != nil { @@ -2123,6 +2142,14 @@ func getQualifiedTableNameObj( return tbName, nil } +func isDBLevelChangefeed(details jobspb.ChangefeedDetails) bool { + targetSpecs := details.TargetSpecifications + if len(targetSpecs) == 0 { + return false + } + return targetSpecs[0].Type == jobspb.ChangefeedTargetSpecification_DATABASE +} + // getChangefeedTargetName gets a table name with or without the dots func getChangefeedTargetName( ctx context.Context, diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 219c14db4b08..e584df3d2dfc 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -172,22 +172,11 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) { testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`) foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) defer closeFeed(t, foo) - // 'initial' is skipped because only the latest value ('updated') is - // emitted by the initial scan. - assertPayloads(t, foo, []string{ - `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1, "b": "a"}}`, @@ -223,178 +212,244 @@ func TestDatabaseLevelChangefeedBasics(t *testing.T) { }) } - cdcTest(t, testFn) + // TODO(#158105): Add support for sinkless in db-level feeds tests. + cdcTest(t, testFn, feedTestEnterpriseSinks) } -func TestDatabaseLevelChangefeedWithIncludeFilter(t *testing.T) { +func TestDatabaseLevelChangefeedWithFilter(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - for i := range 4 { - name := fmt.Sprintf("foo%d", i+1) - sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) - sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name)) - sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name)) - } - - sqlDB.Exec(t, `CREATE SCHEMA private`) - sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`) - // Test that if there are multiple tables with the same name the correct - // one will still be found using the default schema of public. - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1`) - defer closeFeed(t, feed1) - assertPayloads(t, feed1, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + type testCase struct { + name string + createStmt string + expectedPayloads []string + expectedErr string + } - // Test that we can include multiple tables. - feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,foo2`) - defer closeFeed(t, feed2) - assertPayloads(t, feed2, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + // Each test case will create 5 tables: foo1, foo2, foo3, foo4, and private.foo1. + // Then the test case creates the changefeed with the statement shown and + // inserts a row into each table. + testCases := []testCase{ + { + name: "include_single_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_multiple_tables", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, foo2", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + }, + }, + { + name: "include_qualified_names", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "include_nonexistent_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, bob", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_partially_qualified_as_schema_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + }, + }, + { + // Test that if there are multiple tables with the same name the correct + // one will still be found using the default schema of public. + name: "exclude_potentially_ambiguous_table_name", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_single_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_multiple_tables", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2, foo3", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "exclude_qualified_names", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + }, + }, + { + name: "exclude_nonexistent_table", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + { + name: "include_error_table_outside_database", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo", + expectedErr: `table "fizz.buzz.foo" must be in target database "d"`, + }, + { + name: "include_error_table_pattern", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*", + expectedErr: `at or near "*": syntax error`, + }, + { + name: "exclude_error_table_outside_database", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo", + expectedErr: `table "fizz.buzz.foo" must be in target database "d"`, + }, + { + name: "exclude_error_table_pattern", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*", + expectedErr: `at or near "*": syntax error`, + }, + } - // Test that we can handle fully qualified, partially qualified, and unqualified - // table names. - feed3 := feed(t, f, - `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.public.foo2, public.foo3, foo4`) - defer closeFeed(t, feed3) - assertPayloads(t, feed3, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) - // Test that we can handle tables that don't exist. - feed4 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1,bob`) - defer closeFeed(t, feed4) - assertPayloads(t, feed4, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) + } + sqlDB.Exec(t, `CREATE SCHEMA private`) + sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - // Test that fully qualified table names outside of the target database will - // cause an error. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES fizz.buzz.foo`, - `table "fizz.buzz.foo" must be in target database "d"`) + if tc.expectedErr != "" { + expectErrCreatingFeed(t, f, tc.createStmt, tc.expectedErr) + return + } - // Table patterns are not supported. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo.*`, - `at or near "*": syntax error`) + feed := feed(t, f, tc.createStmt) + defer closeFeed(t, feed) - // Test that name resolution is not dependent on search_path() or current DB. - sqlDB.Exec(t, `CREATE DATABASE notd`) - sqlDB.Exec(t, `USE notd`) - sqlDB.Exec(t, `SET search_path TO notpublic`) - feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1`) - defer closeFeed(t, feed5) - assertPayloads(t, feed5, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - }) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (1, '%s')`, name, name)) + } + sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (1, 'private.foo1')`) - // Test that partially qualified table names are always assumed to be - // .. - feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES d.foo1, public.foo2`) - defer closeFeed(t, feed6) - assertPayloads(t, feed6, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, + assertPayloads(t, feed, tc.expectedPayloads) + } + cdcTest(t, testFn, feedTestEnterpriseSinks) }) } - cdcTest(t, testFn, feedTestEnterpriseSinks) } -func TestDatabaseLevelChangefeedWithExcludeFilter(t *testing.T) { +func TestDatabaseLevelChangefeedNameResolutionIsSearchPathIndependent(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { - sqlDB := sqlutils.MakeSQLRunner(s.DB) - for i := range 4 { - name := fmt.Sprintf("foo%d", i+1) - sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) - sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO %s VALUES (0, 'initial')`, name)) - sqlDB.Exec(t, fmt.Sprintf(`UPSERT INTO %s VALUES (0, 'updated')`, name)) - } - - sqlDB.Exec(t, `CREATE SCHEMA private`) - sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO private.foo1 VALUES (0, 'initial')`) - // Test that if there are multiple tables with the same name the correct - // one will still be found using the default schema of public. - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1`) - defer closeFeed(t, feed1) - assertPayloads(t, feed1, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - feed2 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2`) - defer closeFeed(t, feed2) - assertPayloads(t, feed2, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can exclude multiple tables. - feed3 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo2,foo3`) - defer closeFeed(t, feed3) - assertPayloads(t, feed3, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can handle fully qualified, partially qualified, and unqualified - // table names. - feed4 := feed(t, f, - `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES d.public.foo2, public.foo3, foo4`) - defer closeFeed(t, feed4) - assertPayloads(t, feed4, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that we can handle tables that don't exist. - feed5 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES bob`) - defer closeFeed(t, feed5) - assertPayloads(t, feed5, []string{ - `foo1: [0]->{"after": {"a": 0, "b": "initial"}}`, - `foo1: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, - }) - - // Test that fully qualified table names outside of the target database will - // cause an error. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES fizz.buzz.foo`, - `table "fizz.buzz.foo" must be in target database "d"`) - - // Table patterns are not supported. - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo.*`, - `at or near "*": syntax error`) - - // Test that name resolution is not dependent on search_path() or current DB - sqlDB.Exec(t, `CREATE DATABASE notd`) - sqlDB.Exec(t, `USE notd`) - sqlDB.Exec(t, `SET search_path TO notpublic`) - feed6 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1`) - defer closeFeed(t, feed6) - assertPayloads(t, feed6, []string{ - `foo2: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo3: [0]->{"after": {"a": 0, "b": "updated"}}`, - `foo4: [0]->{"after": {"a": 0, "b": "updated"}}`, + type testCase struct { + name string + createStmt string + expectedPayloads []string + } + + // Each test case will create 5 tables: foo1, foo2, foo3, foo4, and private.foo1. + // Then the test case creates the changefeed with the statement shown and + // inserts a row into each table. + testCases := []testCase{ + { + name: "include_filter", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + }, + }, + { + name: "include_filter_with_private_schema", + createStmt: "CREATE CHANGEFEED FOR DATABASE d INCLUDE TABLES foo1, private.foo1", + expectedPayloads: []string{ + `foo1: [1]->{"after": {"a": 1, "b": "foo1"}}`, + `foo1: [1]->{"after": {"a": 1, "b": "private.foo1"}}`, + }, + }, + { + name: "exclude_filter", + createStmt: "CREATE CHANGEFEED FOR DATABASE d EXCLUDE TABLES foo1, private.foo1", + expectedPayloads: []string{ + `foo2: [1]->{"after": {"a": 1, "b": "foo2"}}`, + `foo3: [1]->{"after": {"a": 1, "b": "foo3"}}`, + `foo4: [1]->{"after": {"a": 1, "b": "foo4"}}`, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s (a INT PRIMARY KEY, b STRING)`, name)) + } + sqlDB.Exec(t, `CREATE SCHEMA private`) + sqlDB.Exec(t, `CREATE TABLE private.foo1 (a INT PRIMARY KEY, b STRING)`) + + // Switch to a different database and search_path to show that it + // doesn't affect table resolution for the changefeed. + sqlDB.Exec(t, `CREATE DATABASE notd`) + sqlDB.Exec(t, `USE notd`) + sqlDB.Exec(t, `SET search_path TO notpublic`) + sqlDB.Exec(t, `CREATE TABLE notd.foo1 (a INT PRIMARY KEY, b STRING)`) + + // Create feed - should resolve tables relative to database d, not current search_path + feed := feed(t, f, tc.createStmt) + defer closeFeed(t, feed) + + sqlDB.Exec(t, `INSERT INTO notd.foo1 VALUES (0, 'notd.notpublic.foo1')`) + sqlDB.Exec(t, `INSERT INTO d.private.foo1 VALUES (1, 'private.foo1')`) + for i := range 4 { + name := fmt.Sprintf("foo%d", i+1) + sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO d.%s VALUES (1, '%s')`, name, name)) + } + + assertPayloads(t, feed, tc.expectedPayloads) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) }) } - cdcTest(t, testFn, feedTestEnterpriseSinks) } func TestChangefeedBasicQuery(t *testing.T) { @@ -1150,21 +1205,10 @@ func TestDatabaseLevelChangefeedDiff(t *testing.T) { testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) sqlDB.Exec(t, `CREATE TABLE foo2 (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `INSERT INTO foo2 VALUES (0, 'initial')`) - sqlDB.Exec(t, `UPSERT INTO foo2 VALUES (0, 'updated')`) d := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH diff`) defer closeFeed(t, d) - // 'initial' is skipped because only the latest value ('updated') is - // emitted by the initial scan. - assertPayloads(t, d, []string{ - `foo: [0]->{"after": {"a": 0, "b": "updated"}, "before": null}`, - `foo2: [0]->{"after": {"a": 0, "b": "updated"}, "before": null}`, - }) - sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) assertPayloads(t, d, []string{ `foo: [1]->{"after": {"a": 1, "b": "a"}, "before": null}`, @@ -1209,7 +1253,7 @@ func TestDatabaseLevelChangefeedDiff(t *testing.T) { }) } - cdcTest(t, testFn) + cdcTest(t, testFn, feedTestEnterpriseSinks) } func TestChangefeedTenants(t *testing.T) { @@ -12686,7 +12730,7 @@ func TestChangefeedBareFullProtobuf(t *testing.T) { } } -func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { +func TestDatabaseLevelChangefeedRenameDatabase(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -12694,12 +12738,14 @@ func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE DATABASE foo;`) sqlDB.Exec(t, `CREATE TABLE foo.bar (id INT PRIMARY KEY);`) + + feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`) + defer closeFeed(t, feed1) + sqlDB.Exec(t, `INSERT INTO foo.bar VALUES (1);`) expectedRows := []string{ `bar: [1]->{"after": {"id": 1}}`, } - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE foo`) - defer closeFeed(t, feed1) assertPayloads(t, feed1, expectedRows) sqlDB.Exec(t, `ALTER DATABASE foo RENAME TO bar;`) @@ -12711,22 +12757,23 @@ func TestDatabaseRenameDuringDatabaseLevelChangefeed(t *testing.T) { } // TODO(#152196): Remove feedTestUseRootUserConnection once we have ALTER // DEFAULT PRIVILEGES for databases - cdcTest(t, testFn, feedTestUseRootUserConnection) + cdcTest(t, testFn, feedTestEnterpriseSinks, feedTestUseRootUserConnection) } -func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) { +func TestDatabaseLevelChangefeedRenameTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE d.bar (id INT PRIMARY KEY);`) + feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) + defer closeFeed(t, feed1) + sqlDB.Exec(t, `INSERT INTO d.bar VALUES (1);`) expectedRows := []string{ `bar: [1]->{"after": {"id": 1}}`, } - feed1 := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) - defer closeFeed(t, feed1) assertPayloads(t, feed1, expectedRows) sqlDB.Exec(t, `ALTER TABLE d.bar RENAME TO foo;`) @@ -12736,7 +12783,7 @@ func TestTableRenameDuringDatabaseLevelChangefeed(t *testing.T) { } assertPayloads(t, feed1, expectedRows) } - cdcTest(t, testFn) + cdcTest(t, testFn, feedTestEnterpriseSinks) } func getChangefeedLoggingChannel(sv *settings.Values) logpb.Channel { @@ -12821,7 +12868,7 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) sqlDB.Exec(t, `INSERT INTO bar VALUES (0, 'initial')`) }() - foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) + foo := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d WITH initial_scan='yes'`) defer closeFeed(t, foo) assertPayloads(t, foo, []string{ `foo: [0]->{"after": {"a": 0, "b": "updated"}}`, @@ -12913,3 +12960,103 @@ func TestDatabaseLevelChangefeedChangingTableset(t *testing.T) { } }) } + +// TestDatabaseLevelChangefeedWithInitialScanOptions tests the different initial +// scan options for a database-level changefeed. It makes sure that the default +// is NOT to perform an initial scan, unlike table-level changefeeds, but that +// it still respects the initial_scan_only option. No initial scan is not supported +// no matter how it is specified. +func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + type testCase struct { + name string + option string + expectedBehavior string + } + + tests := []testCase{ + { + option: "no_initial_scan", + expectedBehavior: "no", + }, + { + option: "initial_scan = 'only'", + expectedBehavior: "only (error)", + }, + { + option: "initial_scan_only", + expectedBehavior: "only (error)", + }, + { + option: "initial_scan = 'yes'", + expectedBehavior: "yes", + }, + { + option: "initial_scan", + expectedBehavior: "yes", + }, + { + name: "no_option_specified", + option: "", + expectedBehavior: "no", + }, + { + option: "initial_scan = 'no'", + expectedBehavior: "no", + }, + } + + for _, tc := range tests { + if tc.name == "" { + tc.name = tc.option + } + t.Run(tc.name, func(t *testing.T) { + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (0, 'initial')`) + + var createStmt string + if tc.option == "" { + createStmt = `CREATE CHANGEFEED FOR DATABASE d` + } else { + createStmt = fmt.Sprintf(`CREATE CHANGEFEED FOR DATABASE d WITH %s`, tc.option) + } + + if tc.expectedBehavior == "only (error)" { + expectedErr := fmt.Sprintf( + `cannot specify %s on a database level changefeed`, + changefeedbase.OptInitialScanOnly, + ) + sqlDB.ExpectErrWithTimeout( + t, expectedErr, createStmt, + ) + return + } + + feed := feed(t, f, createStmt) + defer closeFeed(t, feed) + + if tc.expectedBehavior == "yes" { + assertPayloads(t, feed, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + `bar: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'new')`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'new')`) + + assertPayloads(t, feed, []string{ + `foo: [1]->{"after": {"a": 1, "b": "new"}}`, + `bar: [1]->{"after": {"a": 1, "b": "new"}}`, + }) + } + cdcTest(t, testFn, feedTestEnterpriseSinks) + }) + } +} diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 6a763cdaeba8..2401163b0227 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -78,10 +78,8 @@ func TestShowChangefeedJobsDatabaseLevel(t *testing.T) { dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE d`) defer closeFeed(t, dbcf) - assertPayloads(t, dbcf, []string{ - `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, - `bar: [1]->{"after": {"a": 1, "b": "initial"}}`, - }) + // Unlike the table-level changefeed, tcf, database level changefeeds + // do not perform an initial scan by default. waitForJobState(sqlDB, t, dbcf.(cdctest.EnterpriseTestFeed).JobID(), jobs.StateRunning) t.Run("without watched tables", func(t *testing.T) {