Skip to content

Commit 2a3d503

Browse files
committed
wip
1 parent c64b197 commit 2a3d503

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,9 @@ func maybeForceDBLevelChangefeed(
11931193
}
11941194

11951195
switch f := f.(type) {
1196-
case *sinklessFeedFactory, *externalConnectionFeedFactory:
1196+
case *externalConnectionFeedFactory:
1197+
return maybeForceDBLevelChangefeed(t, create, f.TestFeedFactory, args)
1198+
case *sinklessFeedFactory:
11971199
t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f)
11981200
return create, args, nil
11991201
}
@@ -1214,10 +1216,16 @@ func maybeForceDBLevelChangefeed(
12141216
}
12151217

12161218
opts := createAST.Options
1219+
// Read the initial scan type from the options so that we can force it to "yes"
1220+
// for DB level changefeeds in the default case.
1221+
initialScanType := ``
12171222
for _, opt := range opts {
12181223
key := opt.Key.String()
12191224
if strings.EqualFold(key, "initial_scan") {
1220-
if opt.Value != nil && opt.Value.String() == "only" {
1225+
if opt.Value != nil {
1226+
initialScanType = opt.Value.String()
1227+
}
1228+
if initialScanType == "only" {
12211229
t.Logf("did not force DB level changefeed for %s because it set initial scan only", create)
12221230
return create, args, nil
12231231
}
@@ -1226,6 +1234,9 @@ func maybeForceDBLevelChangefeed(
12261234
t.Logf("did not force DB level changefeed for %s because it set initial scan only", create)
12271235
return create, args, nil
12281236
}
1237+
if strings.EqualFold(key, "no_initial_scan") {
1238+
initialScanType = "no"
1239+
}
12291240
// Since DB level feeds set split column families, and split column families is incompatible
12301241
// with resolved for kafka and pubsub sinks, we skip DB level feeds metamorphic testing in
12311242
// that case.
@@ -1249,6 +1260,16 @@ func maybeForceDBLevelChangefeed(
12491260
createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase
12501261
createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil
12511262
createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d")
1263+
1264+
// Unlike table-level changefeeds, database-level changefeeds do not perform
1265+
// an initial scan by default. To convert a default table level feed into an
1266+
// equivalent DB level feed, we need to explicitly set the initial scan type.
1267+
if initialScanType == "" {
1268+
createStmt.AST.(*tree.CreateChangefeed).Options = append(createStmt.AST.(*tree.CreateChangefeed).Options, tree.KVOption{
1269+
Key: "initial_scan",
1270+
Value: tree.NewDString("yes"),
1271+
})
1272+
}
12521273
t.Logf("forcing DB level changefeed for %s", create)
12531274
create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords)
12541275

0 commit comments

Comments
 (0)