*: add more unit tests #4017
base: master
Conversation
Skipping CI for Draft Pull Request.
Summary of Changes

Hello @wk989898, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the project's test suite by adding a substantial volume of new unit tests and improving existing ones. The changes aim to bolster the stability and correctness of critical components by covering more edge cases, validating configurations rigorously, and ensuring proper behavior under various conditions.
Code Review
This pull request introduces a significant number of unit tests across various packages, which greatly enhances the project's test coverage and robustness. The new tests are well-structured and cover many important scenarios, including concurrency and edge cases. Additionally, the PR improves the testing infrastructure by setting up leak detection in several packages. I've found one test case that appears to be testing Go's standard library functions instead of project-specific code, which I've left a comment on. Overall, this is a valuable contribution to improving the project's quality.
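The leak detection mentioned above usually means a goroutine-leak check wired into each package's TestMain. A minimal sketch of that setup, assuming the packages use uber-go/goleak (the package name below is hypothetical, and the PR may wire this up differently):

```go
package mysql

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain runs every test in this package and then fails the package
// if any unexpected goroutines are still alive, catching leaks that
// individual tests would otherwise miss.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
```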
pkg/sink/mysql/db_helper_test.go
Outdated
```go
func TestDecodePassword(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name       string
		password   string
		want       string
		needEncode bool
		needEscape bool
	}{
		{
			name:     "case1",
			password: "123456",
			want:     "123456",
		},
		{
			name:       "case2",
			password:   "asdeer112",
			want:       "asdeer112",
			needEncode: true,
		},
		{
			name:       "case3",
			password:   "asdeer112!@#&",
			want:       "asdeer112!@#&",
			needEscape: true,
		},
		{
			name:       "case4",
			password:   "!@#12312//",
			want:       "!@#12312//",
			needEncode: true,
			needEscape: true,
		},
	}
	for _, c := range tests {
		var err error
		if c.needEscape {
			c.password = url.QueryEscape(c.password)
		}
		if c.needEncode {
			c.password = base64.StdEncoding.EncodeToString([]byte(c.password))
			tem, err := base64.StdEncoding.DecodeString(c.password)
			c.password = string(tem)
			require.NoError(t, err, c.name)
		}
		if c.needEscape {
			c.password, err = url.QueryUnescape(c.password)
			require.NoError(t, err, c.name)
		}
		require.Equal(t, c.want, c.password)
	}
}
```
The test TestDecodePassword appears to test the behavior of Go's standard library functions (url.QueryEscape, url.QueryUnescape, base64.StdEncoding.EncodeToString, base64.StdEncoding.DecodeString) rather than any specific function from this package. It doesn't seem to call a decodePassword function from the project. As it stands, this test adds little value by testing standard library behavior and could be confusing. If it's intended to test a specific function, that function should be called within the test. Otherwise, it might be better to remove this test to avoid confusion.
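If the intent was to cover a project function, such a test might take the shape sketched below. This is purely illustrative: the decodePassword name, its (string) (string, error) signature, and the assumption that it reverses base64 encoding are hypothetical, not confirmed details of this package.

```go
func TestDecodePasswordSketch(t *testing.T) {
	t.Parallel()

	// Hypothetical: assumes a package-level decodePassword(string) (string, error)
	// that reverses the base64 encoding applied to a DSN password.
	encoded := base64.StdEncoding.EncodeToString([]byte("asdeer112!@#&"))
	got, err := decodePassword(encoded)
	require.NoError(t, err)
	require.Equal(t, "asdeer112!@#&", got)
}
```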
Signed-off-by: wk989898 <[email protected]>
📝 Walkthrough

This pull request adds comprehensive test coverage across the codebase, introducing over 40 new test files and test functions covering REST client operations, configuration validations, error handling, etcd operations, orchestrator workers, filter utilities, sink protocols, transaction utilities, and upstream management. One import path is updated to reflect a module migration.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ❌ 3 failed checks (2 warnings, 1 inconclusive)
Actionable comments posted: 13
🤖 Fix all issues with AI agents
In `@pkg/api/internal/rest/client_test.go`:
- Around line 90-102: The test dereferences c returned from restClient without
checking its error, which can panic if restClient fails; change the call to
capture the error (e.g., c, err := restClient(testServer)) and assert failure if
err != nil using require.NoError(t, err) or require.NotNil(t, c) before calling
c.Post/Get/Put/Delete so the test fails cleanly instead of panicking.
In `@pkg/config/large_message_test.go`:
- Around line 119-123: The comments claiming "`enable-tidb-extension` is false,
return error" are misleading for ProtocolOpen and ProtocolSimple because
AdjustAndValidate on largeMessageHandle does not validate enableTiDBExtension
for those protocols (only ProtocolCanalJSON does); update the comments near
tests that use largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) and
AdjustAndValidate(ProtocolSimple, false) to either remove the incorrect phrase
or reword to state that enableTiDBExtension has no effect for these protocols,
and keep the existing require.NoError assertions unchanged; search for
references to AdjustAndValidate, ProtocolOpen, ProtocolSimple,
ProtocolCanalJSON, and largeMessageHandle.LargeMessageHandleOption to locate all
affected test comments and correct them consistently.
In `@pkg/config/sink_protocol_test.go`:
- Around line 67-73: The loop over testCases doesn't assert that parsing
succeeded when tc.expectedErr is empty, so modify the test inside the loop that
calls ParseSinkProtocolFromString(tc.protocol) to explicitly check the error: if
tc.expectedErr != "" assert that err is non-nil and matches the regexp (using
require.Error/Regexp or require.Regexp with err.Error()), else assert
require.NoError(t, err) and then require.Equal(t, tc.expectedProtocolEnum,
protocol); reference the ParseSinkProtocolFromString call and
tc.expectedErr/tc.expectedProtocolEnum to make the checks.
In `@pkg/etcd/client_test.go`:
- Around line 34-39: The mockClient.Get currently never flips getOK from false
to true so it always returns success; change the logic in mockClient.Get to
return an error on the first call and succeed on the second by checking if getOK
is false, set m.getOK = true and return the mock error, otherwise return the
successful &clientv3.GetResponse{}; this ensures the retry path is exercised.
- Around line 124-129: The defer calling server.Close() is placed before
verifying error from SetupEmbedEtcd which can cause a nil deref if setup fails;
move the defer until after the require.Nil(t, err) (or after explicitly checking
err != nil) so server is only closed when non-nil — update the code around
SetupEmbedEtcd, the local variables ctx, url, server, err, and the deferred
server.Close() to perform error checking first and then defer server.Close().
- Around line 73-119: In TestRetry, the global maxTries is mutated for the test
and not restored if the test fails early; change the code so you capture
originValue and immediately schedule restoring maxTries via defer (or t.Cleanup)
right after setting maxTries = 2 to guarantee restoration even on early
exits/failures; reference the TestRetry function and the maxTries symbol when
making this change.
In `@pkg/etcd/util_test.go`:
- Line 30: The test assertion in util_test.go calls require.Equal with arguments
reversed; swap the order so the expected value (rev) is the second-to-last
argument and the actual value is the last, i.e. change require.Equal(t,
getRevisionFromWatchOpts(opt), rev) to require.Equal(t, rev,
getRevisionFromWatchOpts(opt)) so failures show correct messages for the
getRevisionFromWatchOpts result.
In `@pkg/orchestrator/batch_test.go`:
- Around line 47-52: The test constructs largeSizePatches using a
SingleDataPatch that currently creates a byte slice of size etcdTxnMaxSize which
equals the limit; change the allocation in the SingleDataPatch Func to make the
slice one byte larger (etcdTxnMaxSize + 1) so the patch truly exceeds the txn
max size and the test reliably exercises the "exceed size" path.
- Around line 62-68: The closure in the for-loop building manyOpsPatches
captures the loop variable i, causing all SingleDataPatch.Func closures to
reference the final i; fix by introducing a per-iteration shadow (e.g., assign
ii := i inside the loop) and use that shadow when calling
util.NewEtcdKey(fmt.Sprintf("/key%d", ii)) and inside the Func body when
building newValue (fmt.Sprintf("abc%d", ii)) so each closure captures its own
iteration value; update the loop that constructs manyOpsPatches accordingly
(referencing manyOpsPatches, SingleDataPatch, Func, and etcdTxnMaxOps).
In `@pkg/orchestrator/etcd_worker_test.go`:
- Around line 274-280: The test currently swallows timeouts by returning when
err from errg.Wait() is a DeadlineExceeded/Canceled or contains "etcdserver:
request timeout"; instead of returning, explicitly skip or fail the test—replace
the bare return with t.Skipf(...) (e.g. t.Skipf("skipping due to transient
environment timeout: %v", err)) for those transient conditions so flakiness is
recorded, and leave require.Nil(t, err) for all other errors; update the
error-check block around errg.Wait() and errors.Cause(err) accordingly.
- Around line 717-768: The test TestModifyAfterDelete spawns a goroutine that
calls require.Nil(t, err) inside the worker1.Run callback which violates Go's
testing rules; change the goroutine to capture the error returned by worker1.Run
and send it on a dedicated error channel (e.g., errCh) back to the main test
goroutine, then in the main goroutine receive from errCh and assert with
require.Nil(t, err). Specifically, replace the inline require in the anonymous
function that calls worker1.Run with sending the error into errCh, close or
signal errCh when done, and move the require.Nil(t, err) assertion to the main
test flow after wg.Wait()/receiving the error; keep the rest of
TestModifyAfterDelete, modifyReactor, deleteReactor, and worker setup unchanged.
- Around line 200-271: setUpTest currently returns a closure newClient that
calls require.Nil(t, err) (unsafe in goroutines) and TestEtcdSum also uses
require inside errgroup goroutines; change setUpTest to return func()
(etcd.Client, error) and update the closure to return the client and any error
instead of calling require, then in TestEtcdSum replace require calls inside
errg.Go functions with error returns (wrap/annotate the error) and only call
require.* from the main test goroutine after errg.Wait() or handle errors
returned by errgroup; update all references to newClient() to handle the
(client, error) tuple and propagate errors through errgroup so FailNow is never
called from spawned goroutines.
In `@pkg/upstream/upstream_test.go`:
- Around line 88-137: Add a short clarifying comment above the up.etcdCli = nil
before calling up.Close() explaining that registerTopologyInfo stores the
topology key with a lease on up.session, and that setting etcdCli to nil tests
graceful nil handling of the Upstream while calling up.Close() (which closes
up.session and triggers lease expiration and etcd key cleanup) still causes the
topology key to be removed; reference the Upstream methods/fields
registerTopologyInfo, up.etcdCli, up.session and Upstream.Close() in the
comment.
🧹 Nitpick comments (11)
pkg/etcd/util_test.go (1)
27-31: Consider adding explicit edge case coverage.

The random test is good for general coverage, but rand.Int63n(math.MaxInt64) only generates values in [0, MaxInt64). Consider adding explicit test cases for boundary values like 0, 1, and math.MaxInt64.

Optional: Add explicit edge cases

```diff
 func TestGetRevisionFromWatchOpts(t *testing.T) {
 	t.Parallel()
+	// Test explicit edge cases
+	edgeCases := []int64{0, 1, math.MaxInt64}
+	for _, rev := range edgeCases {
+		opt := clientv3.WithRev(rev)
+		require.Equal(t, rev, getRevisionFromWatchOpts(opt))
+	}
+
+	// Test random values
 	for i := 0; i < 100; i++ {
 		rev := rand.Int63n(math.MaxInt64)
 		opt := clientv3.WithRev(rev)
-		require.Equal(t, getRevisionFromWatchOpts(opt), rev)
+		require.Equal(t, rev, getRevisionFromWatchOpts(opt))
 	}
 }
```

pkg/config/sink_protocol_test.go (1)
24-65: Expand test cases to cover all supported protocols.

ParseSinkProtocolFromString supports csv, debezium, and simple, but the test tables don't include them. Consider adding those to TestParseSinkProtocolFromString, and include ProtocolCsv/ProtocolDebezium/ProtocolSimple/ProtocolUnknown in TestString and TestIsBatchEncoder to keep coverage aligned with the parser (a sketch of the missing entries follows below).
ParseSinkProtocolFromStringsupportscsv,debezium, andsimple, but the test tables don’t include them. Consider adding those toTestParseSinkProtocolFromString, and includeProtocolCsv/ProtocolDebezium/ProtocolSimple/ProtocolUnknowninTestStringandTestIsBatchEncoderto keep coverage aligned with the parser.Also applies to: 80-112, 122-154
pkg/etcd/etcdkey_test.go (1)
117-118: Swap expected/actual in require.Equal to match testify convention.

The testify require.Equal signature is func Equal(t TestingT, expected, actual interface{}, ...). The current assertions pass arguments in reversed order, which produces unclear failure diffs.

♻️ Proposed change

```diff
-	require.Equal(t, k, tc.expected)
-	require.Equal(t, k.String(), tc.key)
+	require.Equal(t, tc.expected, k)
+	require.Equal(t, tc.key, k.String())
```

pkg/errors/helper_test.go (1)
50-50: Consider swapping require.Equal arguments for clearer error messages.

The require.Equal function expects (expected, actual) but the code passes (actual, expected). While this doesn't break the test, failure messages will display "expected X but got Y" with the values swapped.

Suggested fix

```diff
-	require.Equal(t, we.Error(), tc.expected)
+	require.Equal(t, tc.expected, we.Error())
```

pkg/config/server_config_test.go (1)
134-157: Consider adding t.Parallel() for consistency.

Other tests in this file use t.Parallel() but this test does not. Adding it would maintain consistency and potentially speed up test execution.

Suggested fix

```diff
 func TestIsValidClusterID(t *testing.T) {
+	t.Parallel()
 	cases := []struct {
```

pkg/config/messages_test.go (2)
23-37: Consider adding t.Parallel() for consistency.

Most tests in the config package use t.Parallel() for concurrent execution.

Suggested fix

```diff
 func TestDefaultMessageServerConfig(t *testing.T) {
+	t.Parallel()
 	// This test case does a sanity check on default config.
```
79-83: Consider adding error message validation for consistency.

Other illegal config test cases verify the error message matches ".*ErrInvalidServerOption.*", but the negative MaxRecvMsgSize case only checks for an error without validating the message.

Suggested fix

```diff
 illegalConfig = defaultMessageConfig.Clone()
 illegalConfig.MaxRecvMsgSize = -1
 err = illegalConfig.ValidateAndAdjust()
 require.Error(t, err)
+require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error())
```

pkg/sink/kafka/claimcheck/claim_check_test.go (1)
37-43: Add a nil guard before calling FileNameWithPrefix.

It improves clarity and avoids accidental panics if construction behavior changes.

✅ Proposed tweak

```diff
 claimCheck, err = New(ctx, largeHandleConfig, changefeedID)
 require.NoError(t, err)
+require.NotNil(t, claimCheck)
 fileName := claimCheck.FileNameWithPrefix("file.json")
 require.Equal(t, "file:///tmp/abc/file.json", fileName)
```

pkg/filter/expr_filter_bench_test.go (1)
117-124: Avoid testify assertions inside the benchmark loop.

This can distort benchmark timing; validate once, then keep the hot loop lean.

♻️ Proposed refactor

```diff
 for _, c := range cases {
 	b.Run(c.name, func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			ignore, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo)
-			require.NoError(b, err)
-			require.Equal(b, c.shouldSkip, ignore)
-		}
+		ignore, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo)
+		require.NoError(b, err)
+		require.Equal(b, c.shouldSkip, ignore)
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			if _, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo); err != nil {
+				b.Fatal(err)
+			}
+		}
 	})
 }
```

pkg/api/internal/rest/request_test.go (1)
70-91: Assert the request completes without error.

Right now the test only validates the header; it will still pass if the request fails after header injection.

🛠️ Suggested tweak

```diff
-	_ = req.Do(context.Background())
+	if err := req.Do(context.Background()).Error(); err != nil {
+		t.Fatalf("unexpected request error: %v", err)
+	}
```

pkg/orchestrator/etcd_worker_test.go (1)
332-374: Replace fixed sleep with polling for improved test reliability under load.

The time.Sleep(500 * time.Millisecond) can be flaky under load. Use require.Eventually with a readiness condition, such as a channel from the reactor or polling state.val with proper synchronization (e.g., sync.Mutex or sync/atomic), to ensure the reactor has processed the initial batch before writing the second batch.

Example:

```go
// Add sync.Mutex to intReactorState, or use atomic operations
require.Eventually(t, func() bool {
	return state.GetVal() >= 999 // or appropriate checkpoint
}, 5*time.Second, 10*time.Millisecond)
```

Alternatively, expose a readiness channel from the reactor to signal when it's ready for the next batch.
```go
c, _ := restClient(testServer)

req := c.Post()
require.NotNil(t, req)

req = c.Get()
require.NotNil(t, req)

req = c.Put()
require.NotNil(t, req)

req = c.Delete()
require.NotNil(t, req)
```
Handle restClient errors before dereferencing.
If restClient fails, c may be nil and cause a panic.
✅ Proposed fix
```diff
-c, _ := restClient(testServer)
+c, err := restClient(testServer)
+require.NoError(t, err)
+require.NotNil(t, c)
```

📝 Committable suggestion
```go
c, err := restClient(testServer)
require.NoError(t, err)
require.NotNil(t, c)

req := c.Post()
require.NotNil(t, req)

req = c.Get()
require.NotNil(t, req)

req = c.Put()
require.NotNil(t, req)

req = c.Delete()
require.NotNil(t, req)
```
```go
largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
// `enable-tidb-extension` is false, return error
err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
require.NoError(t, err)
```
🧩 Analysis chain
🏁 Script executed: `rg -n "func.*AdjustAndValidate" pkg/config/large_message.go -A 50` (get the AdjustAndValidate implementation). Repository: pingcap/ticdc. Length of output: 2033.

🏁 Script executed: `sed -n '115,200p' pkg/config/large_message_test.go` (read the test file at the specified lines). Repository: pingcap/ticdc. Length of output: 3368.
Remove or correct misleading comments about "enable-tidb-extension = false" returning error for Open/Simple protocols.
For ProtocolOpen and ProtocolSimple, the AdjustAndValidate implementation does not validate the enableTiDBExtension parameter—it returns no error regardless of its value. Only ProtocolCanalJSON performs this validation. The comments at lines 119-123, 143-146, 167-171, and 191-194 stating "enable-tidb-extension is false, return error" contradict both the actual behavior and the correct assertions (require.NoError). Either remove these comments or reword them to clarify that enableTiDBExtension has no effect for these protocol types.
```go
for _, tc := range testCases {
	protocol, err := ParseSinkProtocolFromString(tc.protocol)
	if tc.expectedErr != "" {
		require.Regexp(t, tc.expectedErr, err)
	} else {
		require.Equal(t, tc.expectedProtocolEnum, protocol)
	}
```
Assert error-free parses explicitly.
A non-nil error in the success path would be ignored, potentially masking failures.
✅ Suggested fix
```diff
 for _, tc := range testCases {
 	protocol, err := ParseSinkProtocolFromString(tc.protocol)
 	if tc.expectedErr != "" {
 		require.Regexp(t, tc.expectedErr, err)
+		require.Equal(t, ProtocolUnknown, protocol)
 	} else {
+		require.NoError(t, err)
 		require.Equal(t, tc.expectedProtocolEnum, protocol)
 	}
 }
```

📝 Committable suggestion
```go
for _, tc := range testCases {
	protocol, err := ParseSinkProtocolFromString(tc.protocol)
	if tc.expectedErr != "" {
		require.Regexp(t, tc.expectedErr, err)
		require.Equal(t, ProtocolUnknown, protocol)
	} else {
		require.NoError(t, err)
		require.Equal(t, tc.expectedProtocolEnum, protocol)
	}
}
```
```go
func (m *mockClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
	if m.getOK {
		m.getOK = true
		return nil, errors.New("mock error")
	}
	return &clientv3.GetResponse{}, nil
```
Retry path isn’t exercised in mockClient.Get.
getOK never flips from false to true, so the mock always succeeds and the retry logic isn’t actually tested. Flip the flag on the first call so the second attempt can succeed.
🐛 Proposed fix
```diff
-	if m.getOK {
-		m.getOK = true
-		return nil, errors.New("mock error")
-	}
-	return &clientv3.GetResponse{}, nil
+	if !m.getOK {
+		m.getOK = true
+		return nil, errors.New("mock error")
+	}
+	return &clientv3.GetResponse{}, nil
```
+ return &clientv3.GetResponse{}, nil🤖 Prompt for AI Agents
In `@pkg/etcd/client_test.go` around lines 34 - 39, The mockClient.Get currently
never flips getOK from false to true so it always returns success; change the
logic in mockClient.Get to return an error on the first call and succeed on the
second by checking if getOK is false, set m.getOK = true and return the mock
error, otherwise return the successful &clientv3.GetResponse{}; this ensures the
retry path is exercised.
```go
func TestRetry(t *testing.T) {
	// here we need to change maxTries, which is not thread safe
	// so we don't use t.Parallel() for this test

	originValue := maxTries
	// to speedup the test
	maxTries = 2

	cli := clientv3.NewCtxClient(context.TODO())
	cli.KV = &mockClient{}
	retrycli := Wrap(cli, nil)
	get, err := retrycli.Get(context.TODO(), "")

	require.NoError(t, err)
	require.NotNil(t, get)

	_, err = retrycli.Put(context.TODO(), "", "")
	require.NotNil(t, err)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Test Txn case
	// case 0: normal
	rsp, err := retrycli.Txn(ctx, nil, nil, nil)
	require.NoError(t, err)
	require.False(t, rsp.Succeeded)

	// case 1: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, txnEmptyCmps, nil, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 2: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, nil, txnEmptyOpsThen, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 3: context.DeadlineExceeded
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, nil)
	require.Equal(t, context.DeadlineExceeded, err)

	// other case: mock error
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, TxnEmptyOpsElse)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

	maxTries = originValue
}
```
Ensure maxTries is restored even on early failures.
If any require.* fails, the test exits before the last line, leaving the global mutated and potentially impacting other tests. Use defer (or t.Cleanup) immediately after the assignment.
🐛 Proposed fix
```diff
 originValue := maxTries
 // to speedup the test
 maxTries = 2
+defer func() { maxTries = originValue }()
@@
-maxTries = originValue
```

📝 Committable suggestion
```go
func TestRetry(t *testing.T) {
	// here we need to change maxTries, which is not thread safe
	// so we don't use t.Parallel() for this test
	originValue := maxTries
	// to speedup the test
	maxTries = 2
	defer func() { maxTries = originValue }()

	cli := clientv3.NewCtxClient(context.TODO())
	cli.KV = &mockClient{}
	retrycli := Wrap(cli, nil)
	get, err := retrycli.Get(context.TODO(), "")

	require.NoError(t, err)
	require.NotNil(t, get)

	_, err = retrycli.Put(context.TODO(), "", "")
	require.NotNil(t, err)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Test Txn case
	// case 0: normal
	rsp, err := retrycli.Txn(ctx, nil, nil, nil)
	require.NoError(t, err)
	require.False(t, rsp.Succeeded)

	// case 1: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, txnEmptyCmps, nil, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 2: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, nil, txnEmptyOpsThen, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 3: context.DeadlineExceeded
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, nil)
	require.Equal(t, context.DeadlineExceeded, err)

	// other case: mock error
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, TxnEmptyOpsElse)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())
}
```
```go
for i := 0; i <= etcdTxnMaxOps*2; i++ {
	manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
		Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
		Func: func(old []byte) (newValue []byte, changed bool, err error) {
			newValue = []byte(fmt.Sprintf("abc%d", i))
			return newValue, true, nil
		},
```
🧩 Analysis chain
🏁 Scripts executed (Repository: pingcap/ticdc):
- `cd /tmp && find . -name "batch_test.go" -path "*/orchestrator/*" 2>/dev/null | head -5` (length of output: 39)
- `git ls-files | grep -E "orchestrator/batch_test\.go"` (length of output: 87)
- `cat -n pkg/orchestrator/batch_test.go | sed -n '50,85p'` (length of output: 1208)
- `cat -n pkg/orchestrator/batch_test.go | sed -n '1,80p'` (length of output: 3182)
- `grep -n "for.*:=.*i.*:=" pkg/orchestrator/batch_test.go | head -10` (length of output: 39)
- `grep -n "func getBatchChangedState" pkg/orchestrator/batch_test.go` (length of output: 39)
- `grep -n "func getBatchChangedState" pkg/orchestrator/*.go | head -5` (length of output: 214)
- `rg "func getBatchChangedState" -A 20 pkg/orchestrator/` (length of output: 1492)
- `rg "func getChangedState" -A 30 pkg/orchestrator/` (length of output: 1159)
- `rg "func.*Patch.*patch.*state.*changedSet" -A 20 pkg/orchestrator/` (length of output: 39)
- `rg "type SingleDataPatch" -A 15 pkg/orchestrator/` (length of output: 1117)
Fix loop-variable capture in patch funcs.
In the for loop starting at line 62, the closure captures the same i variable; all funcs will see the final value after the loop completes. Shadow i per-iteration like the earlier loop at line 28 to ensure each closure captures its own iteration's value.
🔧 Suggested fix
```diff
 for i := 0; i <= etcdTxnMaxOps*2; i++ {
+	i := i
 	manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
 		Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
 		Func: func(old []byte) (newValue []byte, changed bool, err error) {
 			newValue = []byte(fmt.Sprintf("abc%d", i))
 			return newValue, true, nil
```
return newValue, true, nil🤖 Prompt for AI Agents
In `@pkg/orchestrator/batch_test.go` around lines 62 - 68, The closure in the
for-loop building manyOpsPatches captures the loop variable i, causing all
SingleDataPatch.Func closures to reference the final i; fix by introducing a
per-iteration shadow (e.g., assign ii := i inside the loop) and use that shadow
when calling util.NewEtcdKey(fmt.Sprintf("/key%d", ii)) and inside the Func body
when building newValue (fmt.Sprintf("abc%d", ii)) so each closure captures its
own iteration value; update the loop that constructs manyOpsPatches accordingly
(referencing manyOpsPatches, SingleDataPatch, Func, and etcdTxnMaxOps).
```go
func setUpTest(t *testing.T) (func() etcd.Client, func()) {
	url, server, err := etcd.SetupEmbedEtcd(t.TempDir())
	require.Nil(t, err)
	endpoints := []string{url.String()}
	return func() etcd.Client {
			rawCli, err := clientv3.NewFromURLs(endpoints)
			require.Nil(t, err)
			return etcd.Wrap(rawCli, map[string]prometheus.Counter{})
		}, func() {
			server.Close()
		}
}

func TestEtcdSum(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	cli := newClient()
	defer func() {
		_ = cli.Unwrap().Close()
	}()
	_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
	require.Nil(t, err)

	initArray := make([]int, numValuesPerGroup)
	jsonStr, err := json.Marshal(initArray)
	require.Nil(t, err)

	for i := 0; i < numGroups; i++ {
		_, err := cli.Put(ctx, testEtcdKeyPrefix+"/"+strconv.Itoa(i), string(jsonStr))
		require.Nil(t, err)
	}

	errg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numValuesPerGroup+1; i++ {
		finalI := i
		errg.Go(func() error {
			values := make([][]int, numGroups)
			for j := range values {
				values[j] = make([]int, numValuesPerGroup)
			}

			reactor := &simpleReactor{
				state: nil,
				id:    finalI,
			}

			initState := &simpleReactorState{
				values:  values,
				sum:     0,
				deltas:  nil,
				patches: nil,
			}

			cli := newClient()
			cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
			require.Nil(t, err)
			defer func() {
				_ = cli.Unwrap().Close()
			}()

			etcdWorker, err := NewEtcdWorker(cdcCli, testEtcdKeyPrefix, reactor, initState,
				&migrate.NoOpMigrator{})
			if err != nil {
				return errors.Trace(err)
			}

			return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "owner"))
		})
```
🧩 Analysis chain
🌐 Web query:
testify require FailNow goroutine safety behavior
💡 Result:
In Testify, anything in require.* ultimately fails the test by calling t.FailNow() (directly or indirectly). t.FailNow() (and therefore t.Fatal, require.*, etc.) must be called only from the goroutine that is running the test/benchmark function—not from a goroutine you start with go func(){...}(). Otherwise, it will only stop that goroutine (via runtime.Goexit), while the test goroutine may keep running/waiting, which can lead to flaky tests, hangs, and resource leaks. [1][2]
Key behavior to remember:
- require.* terminates the current test on failure (it "Fatal"s), so it inherits the same goroutine restriction as t.FailNow(). [1][3]
- If called in a non-test goroutine, FailNow/Fatal/require.* does not abort the test, it just aborts that goroutine; other goroutines continue. [2]
- This is well-known enough that linters flag it (e.g., go vet's testinggoroutine check and tools like testifylint). [4]
Typical fixes:
- Do assertions in the main test goroutine (send results over a channel from worker goroutines, then call require.NoError(t, err) in the test goroutine). [2][4]
- If you must report from inside a goroutine, prefer non-FailNow reporting (t.Error, assert.*) and still ensure the test doesn't exit before the goroutine finishes (e.g., WaitGroup). [2]
Sources:
[1] https://pkg.go.dev/github.com/stretchr/testify (require goroutine restriction)
[2] https://pkg.go.dev/testing (FailNow goroutine rule + Goexit behavior)
[3] https://deepwiki.com/stretchr/testify (require calls t.FailNow)
[4] https://github.com/Antonboom/testifylint (go-require checker rationale / goroutine issue)
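A minimal, self-contained sketch of the channel pattern described above, independent of this PR's code (all names here are illustrative, not from the repository):

```go
func TestWorkerSketch(t *testing.T) {
	// runWorker stands in for any fallible background task (illustrative only).
	runWorker := func() error { return nil }

	errCh := make(chan error, 1) // buffered so the goroutine never blocks
	go func() {
		// Report the error instead of calling require.* here:
		// FailNow is only legal on the main test goroutine.
		errCh <- runWorker()
	}()

	// Assert in the main test goroutine, where FailNow is allowed.
	require.NoError(t, <-errCh)
}
```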
🏁 Script executed: `rg "require\." pkg/orchestrator/etcd_worker_test.go -A 2 -B 2` (check for other require calls in the test file). Repository: pingcap/ticdc. Length of output: 5659.

🏁 Script executed: `wc -l pkg/orchestrator/etcd_worker_test.go` (get the context around the flagged lines). Repository: pingcap/ticdc. Length of output: 97.
Fix unsafe require calls in goroutines by propagating errors instead.
Calling require.* from goroutines spawned with errgroup is unsafe—FailNow() terminates only the spawned goroutine, not the test, leading to flaky tests and potential hangs.
The code has two problematic patterns: setUpTest returns a closure newClient() containing require.Nil(t, err) that gets called from goroutines, and TestEtcdSum directly uses require.Nil(t, err) inside errg.Go().
Change setUpTest to return func() (etcd.Client, error), replace require with error returns, and handle errors in the goroutine before passing them through errgroup:
Proposed fix
```diff
-func setUpTest(t *testing.T) (func() etcd.Client, func()) {
+func setUpTest(t *testing.T) (func() (etcd.Client, error), func()) {
 	url, server, err := etcd.SetupEmbedEtcd(t.TempDir())
 	require.Nil(t, err)
 	endpoints := []string{url.String()}
-	return func() etcd.Client {
-		rawCli, err := clientv3.NewFromURLs(endpoints)
-		require.Nil(t, err)
-		return etcd.Wrap(rawCli, map[string]prometheus.Counter{})
-	}, func() {
+	return func() (etcd.Client, error) {
+		rawCli, err := clientv3.NewFromURLs(endpoints)
+		if err != nil {
+			return nil, err
+		}
+		return etcd.Wrap(rawCli, map[string]prometheus.Counter{}), nil
+	}, func() {
 		server.Close()
 	}
 }
```

```diff
-	cli := newClient()
-	cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
-	require.Nil(t, err)
+	cli, err := newClient()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
+	if err != nil {
+		return errors.Trace(err)
+	}
```

📝 Committable suggestion
```go
func setUpTest(t *testing.T) (func() (etcd.Client, error), func()) {
	url, server, err := etcd.SetupEmbedEtcd(t.TempDir())
	require.Nil(t, err)
	endpoints := []string{url.String()}
	return func() (etcd.Client, error) {
			rawCli, err := clientv3.NewFromURLs(endpoints)
			if err != nil {
				return nil, err
			}
			return etcd.Wrap(rawCli, map[string]prometheus.Counter{}), nil
		}, func() {
			server.Close()
		}
}

func TestEtcdSum(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	// In the main test goroutine it is safe to assert directly.
	cli, err := newClient()
	require.Nil(t, err)
	defer func() {
		_ = cli.Unwrap().Close()
	}()
	_, err = cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
	require.Nil(t, err)

	initArray := make([]int, numValuesPerGroup)
	jsonStr, err := json.Marshal(initArray)
	require.Nil(t, err)

	for i := 0; i < numGroups; i++ {
		_, err := cli.Put(ctx, testEtcdKeyPrefix+"/"+strconv.Itoa(i), string(jsonStr))
		require.Nil(t, err)
	}

	errg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numValuesPerGroup+1; i++ {
		finalI := i
		errg.Go(func() error {
			values := make([][]int, numGroups)
			for j := range values {
				values[j] = make([]int, numValuesPerGroup)
			}

			reactor := &simpleReactor{
				state: nil,
				id:    finalI,
			}

			initState := &simpleReactorState{
				values:  values,
				sum:     0,
				deltas:  nil,
				patches: nil,
			}

			cli, err := newClient()
			if err != nil {
				return errors.Trace(err)
			}
			cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
			if err != nil {
				return errors.Trace(err)
			}
			defer func() {
				_ = cli.Unwrap().Close()
			}()

			etcdWorker, err := NewEtcdWorker(cdcCli, testEtcdKeyPrefix, reactor, initState,
				&migrate.NoOpMigrator{})
			if err != nil {
				return errors.Trace(err)
			}

			return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "owner"))
		})
```
```go
err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
	errors.Cause(err) == context.Canceled ||
	strings.Contains(err.Error(), "etcdserver: request timeout")) {
	return
}
require.Nil(t, err)
```
Don’t silently pass on timeouts—skip or fail explicitly.
Returning here makes the test pass even if it times out, which can hide regressions. Consider t.Skipf for environmental flakiness or fail explicitly.
📝 Suggested change
```diff
-if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
-	errors.Cause(err) == context.Canceled ||
-	strings.Contains(err.Error(), "etcdserver: request timeout")) {
-	return
-}
-require.Nil(t, err)
+if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
+	errors.Cause(err) == context.Canceled ||
+	strings.Contains(err.Error(), "etcdserver: request timeout")) {
+	t.Skipf("etcd timeout: %v", err)
+}
+require.NoError(t, err)
```

📝 Committable suggestion
+require.NoError(t, err)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
	errors.Cause(err) == context.Canceled ||
	strings.Contains(err.Error(), "etcdserver: request timeout")) {
	t.Skipf("etcd timeout: %v", err)
}
require.NoError(t, err)
```
```go
func TestModifyAfterDelete(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	cli1 := newClient()
	cdcCli1, err := etcd.NewCDCEtcdClient(ctx, cli1.Unwrap(), "default")
	require.Nil(t, err)

	cli2 := newClient()
	cdcCli2, err := etcd.NewCDCEtcdClient(ctx, cli2.Unwrap(), "default")
	require.Nil(t, err)

	_, err = cli1.Put(ctx, "/test/key1", "original value")
	require.Nil(t, err)

	modifyReactor := &modifyOneReactor{
		key:      []byte("/test/key1"),
		value:    []byte("modified value"),
		waitOnCh: make(chan struct{}),
	}
	worker1, err := NewEtcdWorker(cdcCli1, "/test", modifyReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := worker1.Run(ctx, nil, time.Millisecond*100, "owner")
		require.Nil(t, err)
	}()

	modifyReactor.waitOnCh <- struct{}{}

	deleteReactor := &modifyOneReactor{
		key:   []byte("/test/key1"),
		value: nil, // deletion
	}
	worker2, err := NewEtcdWorker(cdcCli2, "/test", deleteReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	err = worker2.Run(ctx, nil, time.Millisecond*100, "owner")
	require.Nil(t, err)

	modifyReactor.waitOnCh <- struct{}{}
	wg.Wait()
```
🧩 Analysis chain
🌐 Web query:
testify require safe goroutine concurrent testing golang
💡 Result:
testify/require is not safe to use inside a goroutine (other than the test’s main goroutine), because require.* calls t.FailNow(). The Go testing docs state FailNow must be called from the goroutine running the test, not from goroutines created during the test. (pkg.go.dev) The require package docs explicitly repeat this consequence. (pkg.go.dev)
What to do instead:
- Don’t assert in goroutines: send errors/results back to the main test goroutine (via channel/errgroup/WaitGroup), then call
require.*in the main goroutine. - If you must report from a worker goroutine, use
testify/assert(it reports viat.Errorf-style failures rather thanFailNow), understanding the test won’t stop immediately. (stackoverflow.com) - Linters/analysis tools flag this pattern for the same reason (it doesn’t stop the test and can cause hangs/leaks), e.g. “go-require” checks. (github.com)
Citations:
- 1: https://pkg.go.dev/testing%40go1.25.3?utm_source=openai
- 2: https://pkg.go.dev/github.com/stretchr/testify/require?utm_source=openai
- 3: https://stackoverflow.com/questions/72850921/go-assertion-utility-functions-behave-like-non-blocking-operation?utm_source=openai
- 4: https://github.com/Antonboom/testifylint?utm_source=openai
🏁 Script executed: `fd "etcd_worker_test.go" --type f` (locate and examine the test file). Repository: pingcap/ticdc. Length of output: 93.

🏁 Script executed: `cat -n pkg/orchestrator/etcd_worker_test.go | sed -n '710,775p'` (read the test file to see the actual code and context). Repository: pingcap/ticdc. Length of output: 2383.
Avoid require inside the worker goroutine; report errors back to the main test goroutine.
Calling require.Nil(t, err) inside a spawned goroutine (line 750) violates Go's testing contract. The testing package requires that t.FailNow() (called by require.*) be invoked only from the test's main goroutine. Use an error channel to return the result to the main goroutine, then assert there.
🔧 Proposed fix
```diff
 var wg sync.WaitGroup
+errCh := make(chan error, 1)
 wg.Add(1)
 go func() {
 	defer wg.Done()
 	err := worker1.Run(ctx, nil, time.Millisecond*100, "owner")
-	require.Nil(t, err)
+	errCh <- err
 }()

 modifyReactor.waitOnCh <- struct{}{}

 deleteReactor := &modifyOneReactor{
 	key:   []byte("/test/key1"),
 	value: nil, // deletion
 }
 worker2, err := NewEtcdWorker(cdcCli2, "/test", deleteReactor, &commonReactorState{
 	state: make(map[string]string),
 }, &migrate.NoOpMigrator{})
 require.Nil(t, err)

 err = worker2.Run(ctx, nil, time.Millisecond*100, "owner")
 require.Nil(t, err)

 modifyReactor.waitOnCh <- struct{}{}
 wg.Wait()
+require.NoError(t, <-errCh)
```

📝 Committable suggestion
```go
func TestModifyAfterDelete(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	cli1 := newClient()
	cdcCli1, err := etcd.NewCDCEtcdClient(ctx, cli1.Unwrap(), "default")
	require.Nil(t, err)

	cli2 := newClient()
	cdcCli2, err := etcd.NewCDCEtcdClient(ctx, cli2.Unwrap(), "default")
	require.Nil(t, err)

	_, err = cli1.Put(ctx, "/test/key1", "original value")
	require.Nil(t, err)

	modifyReactor := &modifyOneReactor{
		key:      []byte("/test/key1"),
		value:    []byte("modified value"),
		waitOnCh: make(chan struct{}),
	}
	worker1, err := NewEtcdWorker(cdcCli1, "/test", modifyReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := worker1.Run(ctx, nil, time.Millisecond*100, "owner")
		errCh <- err
	}()

	modifyReactor.waitOnCh <- struct{}{}

	deleteReactor := &modifyOneReactor{
		key:   []byte("/test/key1"),
		value: nil, // deletion
	}
	worker2, err := NewEtcdWorker(cdcCli2, "/test", deleteReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	err = worker2.Run(ctx, nil, time.Millisecond*100, "owner")
	require.Nil(t, err)

	modifyReactor.waitOnCh <- struct{}{}
	wg.Wait()
	require.NoError(t, <-errCh)
}
```
```go
func TestRegisterTopo(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
	defer etcdServer.Close()

	require.NoError(t, err)
	logConfig := logutil.DefaultZapLoggerConfig
	logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)

	rawEtcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{clientURL.String()},
		Context:     ctx,
		LogConfig:   &logConfig,
		DialTimeout: 3 * time.Second,
	})
	require.NoError(t, err)
	defer rawEtcdCli.Close()
	etcdCli := etcd.Wrap(rawEtcdCli, make(map[string]prometheus.Counter))
	up := &Upstream{
		cancel:  func() {},
		etcdCli: etcdCli,
		wg:      &sync.WaitGroup{},
	}

	info := node.NewInfo("localhost:8300", "test")
	info.Version = "test.1.0"
	err = up.registerTopologyInfo(ctx, &NodeTopologyCfg{
		Info:        info,
		GCServiceID: "clusterID",
		SessionTTL:  2,
	})
	require.NoError(t, err)

	resp, err := etcdCli.Get(ctx, "/topology/ticdc/clusterID/localhost:8300")
	require.NoError(t, err)

	infoData, err := info.Marshal()
	require.NoError(t, err)
	require.Equal(t, infoData, resp.Kvs[0].Value)

	up.etcdCli = nil
	up.Close()
	require.Eventually(t, func() bool {
		resp, err := etcdCli.Get(ctx, "/topology/ticdc/clusterID/localhost:8300")
		require.NoError(t, err)
		return len(resp.Kvs) == 0
	}, time.Second*5, time.Millisecond*100)
```
🧩 Analysis chain
🏁 Script executed: `ast-grep --pattern $'func ($_ *Upstream) Close() { $$$ }'`. Repository: pingcap/ticdc. Length of output: 2498.

🏁 Script executed: `rg -A 20 "func.*registerTopologyInfo" pkg/upstream/upstream.go`. Repository: pingcap/ticdc. Length of output: 711.

🏁 Script executed: `rg -B 5 "up.etcdCli = nil" pkg/upstream/upstream_test.go` (check if there are comments explaining this pattern in the test file). Repository: pingcap/ticdc. Length of output: 181.
Add a clarifying comment explaining why etcdCli is set to nil before Close().
The topology key deletion is lease-based: registerTopologyInfo stores the key with a lease tied to up.session, and when Close() calls up.session.Close(), the lease expires and etcd automatically deletes the key. Setting up.etcdCli = nil before Close() tests graceful nil handling while the session closure still triggers the cleanup. Add a comment above line 131 to explain this pattern, e.g., // Test graceful nil handling; session closure will still expire the lease and cleanup topology key.
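To make the lease mechanics concrete, here is a minimal, self-contained sketch of the general clientv3 pattern (not this repository's code; the endpoint and key names are placeholders): a key written under a session's lease disappears once the session is closed and its lease is revoked.

```go
package main

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// A session owns a lease that it keeps alive in the background.
	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(2))
	if err != nil {
		panic(err)
	}

	// Bind the key to the session's lease, like a topology registration.
	ctx := context.Background()
	if _, err = cli.Put(ctx, "/topology/example", "info",
		clientv3.WithLease(sess.Lease())); err != nil {
		panic(err)
	}

	// Closing the session revokes its lease, and etcd deletes every
	// key attached to that lease (here, our registration key).
	sess.Close()

	resp, err := cli.Get(ctx, "/topology/example")
	if err != nil {
		panic(err)
	}
	fmt.Println("keys left:", len(resp.Kvs)) // expected: 0
}
```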
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: 3AceShowHand. The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing /approve in a comment.
[LGTM Timeline notifier] Timeline:
@wk989898: The following test failed, say /retest to rerun all failed tests:

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: ref #758
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit