Skip to content

Commit af0c5bb

Browse files
committed
wip
1 parent c64b197 commit af0c5bb

File tree

1 file changed

+27
-2
lines changed

1 file changed

+27
-2
lines changed

pkg/ccl/changefeedccl/helpers_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,7 +1193,9 @@ func maybeForceDBLevelChangefeed(
11931193
}
11941194

11951195
switch f := f.(type) {
1196-
case *sinklessFeedFactory, *externalConnectionFeedFactory:
1196+
case *externalConnectionFeedFactory:
1197+
return maybeForceDBLevelChangefeed(t, create, f.TestFeedFactory, args)
1198+
case *sinklessFeedFactory:
11971199
t.Logf("did not force DB level changefeed for %s because %T is not supported", create, f)
11981200
return create, args, nil
11991201
}
@@ -1214,10 +1216,17 @@ func maybeForceDBLevelChangefeed(
12141216
}
12151217

12161218
opts := createAST.Options
1219+
// Read the initial scan type from the options so that we can force it to "yes"
1220+
// for DB level changefeeds in the default case.
1221+
initialScanType := ``
1222+
hasCursor := false
12171223
for _, opt := range opts {
12181224
key := opt.Key.String()
12191225
if strings.EqualFold(key, "initial_scan") {
1220-
if opt.Value != nil && opt.Value.String() == "only" {
1226+
if opt.Value != nil {
1227+
initialScanType = opt.Value.String()
1228+
}
1229+
if strings.EqualFold(initialScanType, "'only'") {
12211230
t.Logf("did not force DB level changefeed for %s because it set initial scan only", create)
12221231
return create, args, nil
12231232
}
@@ -1226,6 +1235,12 @@ func maybeForceDBLevelChangefeed(
12261235
t.Logf("did not force DB level changefeed for %s because it set initial scan only", create)
12271236
return create, args, nil
12281237
}
1238+
if strings.EqualFold(key, "no_initial_scan") {
1239+
initialScanType = "no"
1240+
}
1241+
if strings.EqualFold(key, "cursor") {
1242+
hasCursor = true
1243+
}
12291244
// Since DB level feeds set split column families, and split column families is incompatible
12301245
// with resolved for kafka and pubsub sinks, we skip DB level feeds metamorphic testing in
12311246
// that case.
@@ -1249,6 +1264,16 @@ func maybeForceDBLevelChangefeed(
12491264
createStmt.AST.(*tree.CreateChangefeed).Level = tree.ChangefeedLevelDatabase
12501265
createStmt.AST.(*tree.CreateChangefeed).TableTargets = nil
12511266
createStmt.AST.(*tree.CreateChangefeed).DatabaseTarget = tree.ChangefeedDatabaseTarget("d")
1267+
1268+
// Unlike table-level changefeeds, database-level changefeeds do not perform
1269+
// an initial scan by default. To convert a default table level feed into an
1270+
// equivalent DB level feed, we need to explicitly set the initial scan type.
1271+
if initialScanType == "" && !hasCursor {
1272+
createStmt.AST.(*tree.CreateChangefeed).Options = append(createStmt.AST.(*tree.CreateChangefeed).Options, tree.KVOption{
1273+
Key: "initial_scan",
1274+
Value: tree.NewDString("yes"),
1275+
})
1276+
}
12521277
t.Logf("forcing DB level changefeed for %s", create)
12531278
create = tree.AsStringWithFlags(createStmt.AST, tree.FmtShowPasswords)
12541279

0 commit comments

Comments
 (0)