
Conversation

@wk989898 (Collaborator) commented Jan 16, 2026

What problem does this PR solve?

Issue Number: ref #758

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Tests
    • Added comprehensive test coverage across REST client functionality, configuration validation, sink protocol handling, error handling, etcd operations, filter utilities, orchestrator state management, and upstream manager behavior.
    • Added test harnesses for leak detection across multiple packages to improve test reliability.


@ti-chi-bot bot commented Jan 16, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot added the following labels Jan 16, 2026:

  • do-not-merge/needs-linked-issue
  • release-note (Denotes a PR that will be considered when it comes time to generate release notes.)
  • do-not-merge/work-in-progress (Indicates that a PR should not merge because it is a work in progress.)
  • size/XXL (Denotes a PR that changes 1000+ lines, ignoring generated files.)
@gemini-code-assist bot commented

Summary of Changes

Hello @wk989898, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the project's test suite by adding a substantial volume of new unit tests and improving existing ones. The changes aim to bolster the stability and correctness of critical components by covering more edge cases, validating configurations rigorously, and ensuring proper behavior under various conditions. The integration of goleak further strengthens the project's ability to identify and manage goroutine leaks, contributing to overall system health and reliability.

Highlights

  • Expanded Unit Test Coverage: Introduced a large number of new unit tests across various packages including API clients, configuration, error handling, etcd interactions, data filtering, orchestration, sink codecs, data structures, garbage collection, and upstream management.
  • Improved Goroutine Leak Detection: Integrated go.uber.org/goleak into TestMain functions in multiple packages to enhance the detection of goroutine leaks during testing, with specific ignorable functions configured.
  • Refined Test Logic: Updated existing test logic for better determinism, such as using time.UTC for timestamp conversions and simplifying default value assertions in pkg/common/event/mounter_test.go.
  • Comprehensive Configuration Validation Tests: Added extensive tests for configuration validation and adjustment across ServerConfig, DBConfig, KVClientConfig, SchedulerConfig, MessagesConfig, LargeMessageHandle, and SinkConfig.
  • Etcd Client and Orchestrator Robustness Tests: Included detailed tests for etcd client retry mechanisms, lease management, watch channel behavior, and orchestrator state linearizability, batching, and snapshot isolation during concurrent operations.
  • Sink Codec and URI Utility Tests: Added thorough tests for various sink codecs (Canal, Open Protocol, Simple) covering encoding/decoding, message size limits, batching, and callback handling. Also, new tests for URI parsing, IPv6 validation, and sensitive data masking.
  • Performance Benchmarks: Introduced benchmarks for expression filters (pkg/filter/expr_filter_bench_test.go) and map implementations (pkg/spanz/map_bench_test.go) to evaluate and compare performance characteristics.



@gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant number of unit tests across various packages, which greatly enhances the project's test coverage and robustness. The new tests are well-structured and cover many important scenarios, including concurrency and edge cases. Additionally, the PR improves the testing infrastructure by setting up leak detection in several packages. I've found one test case that appears to be testing Go's standard library functions instead of project-specific code, which I've left a comment on. Overall, this is a valuable contribution to improving the project's quality.

Comment on lines 24 to 76
func TestDecodePassword(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name       string
		password   string
		want       string
		needEncode bool
		needEscape bool
	}{
		{
			name:     "case1",
			password: "123456",
			want:     "123456",
		},
		{
			name:       "case2",
			password:   "asdeer112",
			want:       "asdeer112",
			needEncode: true,
		},
		{
			name:       "case3",
			password:   "asdeer112!@#&",
			want:       "asdeer112!@#&",
			needEscape: true,
		},
		{
			name:       "case4",
			password:   "!@#12312//",
			want:       "!@#12312//",
			needEncode: true,
			needEscape: true,
		},
	}
	for _, c := range tests {
		var err error
		if c.needEscape {
			c.password = url.QueryEscape(c.password)
		}
		if c.needEncode {
			c.password = base64.StdEncoding.EncodeToString([]byte(c.password))
			tem, err := base64.StdEncoding.DecodeString(c.password)
			c.password = string(tem)
			require.NoError(t, err, c.name)
		}
		if c.needEscape {
			c.password, err = url.QueryUnescape(c.password)
			require.NoError(t, err, c.name)
		}
		require.Equal(t, c.want, c.password)
	}
}

Severity: medium

The test TestDecodePassword appears to test the behavior of Go's standard library functions (url.QueryEscape, url.QueryUnescape, base64.StdEncoding.EncodeToString, base64.StdEncoding.DecodeString) rather than any specific function from this package. It doesn't seem to call a decodePassword function from the project. As it stands, this test adds little value by testing standard library behavior and could be confusing. If it's intended to test a specific function, that function should be called within the test. Otherwise, it might be better to remove this test to avoid confusion.
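For illustration, a version that exercised project code instead might look like the sketch below. It assumes a package-level decodePassword(raw string) (string, error) helper that reverses the encoding; that name and signature are hypothetical, not confirmed by this PR.

func TestDecodePassword(t *testing.T) {
	t.Parallel()

	// decodePassword is hypothetical: assumed to base64-decode the input
	// and then URL-unescape it, reversing the encoding applied below.
	plain := "!@#12312//"
	encoded := base64.StdEncoding.EncodeToString([]byte(url.QueryEscape(plain)))

	got, err := decodePassword(encoded)
	require.NoError(t, err)
	require.Equal(t, plain, got)
}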

Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
@wk989898 changed the title from "[WIP]: add more unit test" to "*: add more unit tests" Jan 21, 2026
@wk989898 wk989898 marked this pull request as ready for review January 21, 2026 09:47
@ti-chi-bot removed the do-not-merge/work-in-progress label Jan 21, 2026
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
Signed-off-by: wk989898 <[email protected]>
@coderabbitai bot commented Jan 28, 2026

📝 Walkthrough

This pull request adds comprehensive test coverage across the codebase, introducing over 40 new test files and test functions covering REST client operations, configuration validations, error handling, etcd operations, orchestrator workers, filter utilities, sink protocols, transaction utilities, and upstream management. One import path is updated to reflect a module migration.

Changes

  • Import path update (downstreamadapter/sink/eventrouter/topic/expression_test.go): Updated import paths from github.com/pingcap/tiflow/pkg/errors to github.com/pingcap/ticdc/pkg/errors; updated benchmark and package reference comments.
  • REST API client tests (pkg/api/internal/rest/client_test.go, config_test.go, request_test.go): Added comprehensive REST client tests covering successful requests, error handling, HTTP method availability, TLS configuration, request parameters/URIs/bodies/headers, context cancellation, timeouts, and response decoding.
  • Configuration validation tests (pkg/config/large_message_test.go, messages_test.go, server_config_test.go, sink_protocol_test.go, sink_test.go): Added extensive configuration tests validating compression options, protocol-specific behavior, TLS handling, message server/client configs, sink atomicity settings, CSV parameters, storage configuration, and bootstrap message semantics.
  • Error handling and utility tests (pkg/errors/helper_test.go, pkg/leakutil/leak_helper_test.go): Added tests for error wrapping, RFC code resolution, retryability detection, and CLI-unprintable error detection; updated the channel type in the leak helper test.
  • Etcd client and key parsing tests (pkg/etcd/client_test.go, etcdkey_test.go, util_test.go): Added tests for etcd retry logic, lease delegation, watch channel behavior, CDC key parsing/formatting, and revision extraction from watch options.
  • Orchestrator tests (pkg/orchestrator/batch_test.go, etcd_worker_test.go): Added tests for batch state computation and boundary condition handling, plus extensive etcd worker tests with mock reactors validating patch generation, consistency guarantees, and state synchronization.
  • Filter tests (pkg/filter/expr_filter_bench_test.go, utils_test.go): Added benchmark tests for DML filtering performance and unit tests for schema utilities and table rule verification.
  • Sink tests (pkg/sink/cloudstorage/main_test.go, pkg/sink/kafka/claimcheck/claim_check_test.go, pkg/sink/mysql/main_test.go): Added a test harness for cloudstorage, a unit test for claim-check initialization and file naming, and a test harness for the MySQL sink.
  • Transaction utility tests (pkg/txnutil/gc/gc_manager_test.go, gc_service_test.go): Added tests for GC safe-point management, stale checkpoint detection, PD leader switch simulation, and GC barrier management with comprehensive mock implementations.
  • Upstream manager tests (pkg/upstream/manager_test.go, upstream_test.go): Added tests for upstream initialization, idle time management, close behavior, visitor traversal, and topology registration with embedded etcd.
  • Utility function tests (pkg/util/json_writer_test.go, memory_test.go, uri_test.go): Added comprehensive tests and benchmarks for JSON writing operations, memory limit retrieval, and URI validation/masking utilities.
  • Test harnesses for leak detection (main_test.go in pkg/config, pkg/errors, pkg/etcd, pkg/eventservice, pkg/filter, pkg/orchestrator, pkg/txnutil/gc, pkg/upstream, pkg/util, pkg/version): Added TestMain functions to initialize leak detection via leakutil.SetUpLeakTest(m); simplified pkg/eventservice/main_test.go by removing explicit goleak options. See the sketch below.
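For reference, the leak-detection harness these main_test.go files add is roughly the following sketch (shown for pkg/config; the import path is assumed from the pingcap/ticdc module layout, and leakutil.SetUpLeakTest is assumed to wrap goleak.VerifyTestMain with the package's ignore options):

package config

import (
	"testing"

	"github.com/pingcap/ticdc/pkg/leakutil"
)

// TestMain installs goroutine-leak detection for every test in the package.
func TestMain(m *testing.M) {
	leakutil.SetUpLeakTest(m)
}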

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 With whiskers twitched and test files spun,
Configuration validations, one by one!
From etcd keys to REST client calls,
We've tested the paths through all the halls. 🏔️
No logic bent, just coverage grew—
Now leak detection watches too!

🚥 Pre-merge checks: 3 failed (2 warnings, 1 inconclusive)

  • Description check (⚠️ Warning): The pull request description is incomplete; the "What is changed and how it works?" section is empty, the performance/compatibility and documentation questions are unanswered, and the release note is not filled out despite the PR carrying the release-note label. Resolution: fill in the empty sections, answer the performance/compatibility questions, clarify whether documentation updates are needed, and provide a release note or explicitly state "None".
  • Docstring coverage (⚠️ Warning): Docstring coverage is 3.57%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Title check (❓ Inconclusive): The title "*: add more unit tests" is vague and does not convey which packages or functionality are being tested. Resolution: use a more specific title, e.g. "Add unit tests for config, errors, etcd, orchestrator, and other packages", or name the most critical package being tested.


@coderabbitai bot left a comment

Actionable comments posted: 13

🤖 Fix all issues with AI agents
In `@pkg/api/internal/rest/client_test.go`:
- Around line 90-102: The test dereferences c returned from restClient without
checking its error, which can panic if restClient fails. Capture the error
(e.g., c, err := restClient(testServer)) and assert it with require.NoError(t,
err) and require.NotNil(t, c) before calling c.Post/Get/Put/Delete, so the
test fails cleanly instead of panicking.

In `@pkg/config/large_message_test.go`:
- Around line 119-123: The comments claiming "`enable-tidb-extension` is false,
return error" are misleading for ProtocolOpen and ProtocolSimple because
AdjustAndValidate on largeMessageHandle does not validate enableTiDBExtension
for those protocols (only ProtocolCanalJSON does); update the comments near
tests that use largeMessageHandle.AdjustAndValidate(ProtocolOpen, false) and
AdjustAndValidate(ProtocolSimple, false) to either remove the incorrect phrase
or reword to state that enableTiDBExtension has no effect for these protocols,
and keep the existing require.NoError assertions unchanged; search for
references to AdjustAndValidate, ProtocolOpen, ProtocolSimple,
ProtocolCanalJSON, and largeMessageHandle.LargeMessageHandleOption to locate all
affected test comments and correct them consistently.

In `@pkg/config/sink_protocol_test.go`:
- Around line 67-73: The loop over testCases doesn't assert that parsing
succeeded when tc.expectedErr is empty, so modify the test inside the loop that
calls ParseSinkProtocolFromString(tc.protocol) to explicitly check the error: if
tc.expectedErr != "" assert that err is non-nil and matches the regexp (using
require.Error/Regexp or require.Regexp with err.Error()), else assert
require.NoError(t, err) and then require.Equal(t, tc.expectedProtocolEnum,
protocol); reference the ParseSinkProtocolFromString call and
tc.expectedErr/tc.expectedProtocolEnum to make the checks.

In `@pkg/etcd/client_test.go`:
- Around line 34-39: The mockClient.Get currently never flips getOK from false
to true so it always returns success; change the logic in mockClient.Get to
return an error on the first call and succeed on the second by checking if getOK
is false, set m.getOK = true and return the mock error, otherwise return the
successful &clientv3.GetResponse{}; this ensures the retry path is exercised.
- Around line 124-129: The defer calling server.Close() is placed before
verifying error from SetupEmbedEtcd which can cause a nil deref if setup fails;
move the defer until after the require.Nil(t, err) (or after explicitly checking
err != nil) so server is only closed when non-nil — update the code around
SetupEmbedEtcd, the local variables ctx, url, server, err, and the deferred
server.Close() to perform error checking first and then defer server.Close().
- Around line 73-119: In TestRetry, the global maxTries is mutated for the test
and not restored if the test fails early; change the code so you capture
originValue and immediately schedule restoring maxTries via defer (or t.Cleanup)
right after setting maxTries = 2 to guarantee restoration even on early
exits/failures; reference the TestRetry function and the maxTries symbol when
making this change.

In `@pkg/etcd/util_test.go`:
- Line 30: The test assertion in util_test.go calls require.Equal with arguments
reversed; swap the order so the expected value (rev) is the second-to-last
argument and the actual value is the last, i.e. change require.Equal(t,
getRevisionFromWatchOpts(opt), rev) to require.Equal(t, rev,
getRevisionFromWatchOpts(opt)) so failures show correct messages for the
getRevisionFromWatchOpts result.

In `@pkg/orchestrator/batch_test.go`:
- Around line 47-52: The test constructs largeSizePatches using a
SingleDataPatch that currently creates a byte slice of size etcdTxnMaxSize which
equals the limit; change the allocation in the SingleDataPatch Func to make the
slice one byte larger (etcdTxnMaxSize + 1) so the patch truly exceeds the txn
max size and the test reliably exercises the "exceed size" path.
- Around line 62-68: The closure in the for-loop building manyOpsPatches
captures the loop variable i, causing all SingleDataPatch.Func closures to
reference the final i; fix by introducing a per-iteration shadow (e.g., assign
ii := i inside the loop) and use that shadow when calling
util.NewEtcdKey(fmt.Sprintf("/key%d", ii)) and inside the Func body when
building newValue (fmt.Sprintf("abc%d", ii)) so each closure captures its own
iteration value; update the loop that constructs manyOpsPatches accordingly
(referencing manyOpsPatches, SingleDataPatch, Func, and etcdTxnMaxOps).

In `@pkg/orchestrator/etcd_worker_test.go`:
- Around line 274-280: The test currently swallows timeouts by returning when
err from errg.Wait() is a DeadlineExceeded/Canceled or contains "etcdserver:
request timeout"; instead of returning, explicitly skip or fail the test—replace
the bare return with t.Skipf(...) (e.g. t.Skipf("skipping due to transient
environment timeout: %v", err)) for those transient conditions so flakiness is
recorded, and leave require.Nil(t, err) for all other errors; update the
error-check block around errg.Wait() and errors.Cause(err) accordingly.
- Around line 717-768: The test TestModifyAfterDelete spawns a goroutine that
calls require.Nil(t, err) inside the worker1.Run callback which violates Go's
testing rules; change the goroutine to capture the error returned by worker1.Run
and send it on a dedicated error channel (e.g., errCh) back to the main test
goroutine, then in the main goroutine receive from errCh and assert with
require.Nil(t, err). Specifically, replace the inline require in the anonymous
function that calls worker1.Run with sending the error into errCh, close or
signal errCh when done, and move the require.Nil(t, err) assertion to the main
test flow after wg.Wait()/receiving the error; keep the rest of
TestModifyAfterDelete, modifyReactor, deleteReactor, and worker setup unchanged.
- Around line 200-271: setUpTest currently returns a closure newClient that
calls require.Nil(t, err) (unsafe in goroutines) and TestEtcdSum also uses
require inside errgroup goroutines; change setUpTest to return func()
(etcd.Client, error) and update the closure to return the client and any error
instead of calling require, then in TestEtcdSum replace require calls inside
errg.Go functions with error returns (wrap/annotate the error) and only call
require.* from the main test goroutine after errg.Wait() or handle errors
returned by errgroup; update all references to newClient() to handle the
(client, error) tuple and propagate errors through errgroup so FailNow is never
called from spawned goroutines.

In `@pkg/upstream/upstream_test.go`:
- Around line 88-137: Add a short clarifying comment above the up.etcdCli = nil
before calling up.Close() explaining that registerTopologyInfo stores the
topology key with a lease on up.session, and that setting etcdCli to nil tests
graceful nil handling of the Upstream while calling up.Close() (which closes
up.session and triggers lease expiration and etcd key cleanup) still causes the
topology key to be removed; reference the Upstream methods/fields
registerTopologyInfo, up.etcdCli, up.session and Upstream.Close() in the
comment.
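
A sketch of the clarifying comment the last item asks for, using the names cited in the prompt (registerTopologyInfo, up.etcdCli, up.session, up.Close); the exact placement inside upstream_test.go may differ:

	// registerTopologyInfo stored the topology key under a lease bound to
	// up.session. Setting etcdCli to nil exercises graceful nil handling in
	// Close; closing up.session still expires the lease, so etcd removes
	// the topology key even without an explicit delete.
	up.etcdCli = nil
	up.Close()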
🧹 Nitpick comments (11)
pkg/etcd/util_test.go (1)

27-31: Consider adding explicit edge case coverage.

The random test is good for general coverage, but rand.Int63n(math.MaxInt64) only generates values in [0, MaxInt64). Consider adding explicit test cases for boundary values like 0, 1, and math.MaxInt64.

Optional: Add explicit edge cases
 func TestGetRevisionFromWatchOpts(t *testing.T) {
 	t.Parallel()

+	// Test explicit edge cases
+	edgeCases := []int64{0, 1, math.MaxInt64}
+	for _, rev := range edgeCases {
+		opt := clientv3.WithRev(rev)
+		require.Equal(t, rev, getRevisionFromWatchOpts(opt))
+	}
+
+	// Test random values
 	for i := 0; i < 100; i++ {
 		rev := rand.Int63n(math.MaxInt64)
 		opt := clientv3.WithRev(rev)
-		require.Equal(t, getRevisionFromWatchOpts(opt), rev)
+		require.Equal(t, rev, getRevisionFromWatchOpts(opt))
 	}
 }
pkg/config/sink_protocol_test.go (1)

24-65: Expand test cases to cover all supported protocols.
ParseSinkProtocolFromString supports csv, debezium, and simple, but the test tables don’t include them. Consider adding those to TestParseSinkProtocolFromString, and include ProtocolCsv/ProtocolDebezium/ProtocolSimple/ProtocolUnknown in TestString and TestIsBatchEncoder to keep coverage aligned with the parser.

Also applies to: 80-112, 122-154
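
A sketch of the missing table entries, reusing the test's existing fields (protocol, expectedProtocolEnum) and the enum names cited above:

		{
			protocol:             "csv",
			expectedProtocolEnum: ProtocolCsv,
		},
		{
			protocol:             "debezium",
			expectedProtocolEnum: ProtocolDebezium,
		},
		{
			protocol:             "simple",
			expectedProtocolEnum: ProtocolSimple,
		},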

pkg/etcd/etcdkey_test.go (1)

117-118: Swap expected/actual in require.Equal to match testify convention.

The testify require.Equal signature is func Equal(t TestingT, expected, actual interface{}, ...). The current assertions pass arguments in reversed order, which produces unclear failure diffs.

♻️ Proposed change
-		require.Equal(t, k, tc.expected)
-		require.Equal(t, k.String(), tc.key)
+		require.Equal(t, tc.expected, k)
+		require.Equal(t, tc.key, k.String())
pkg/errors/helper_test.go (1)

50-50: Consider swapping require.Equal arguments for clearer error messages.

The require.Equal function expects (expected, actual) but the code passes (actual, expected). While this doesn't break the test, failure messages will display "expected X but got Y" with the values swapped.

Suggested fix
-			require.Equal(t, we.Error(), tc.expected)
+			require.Equal(t, tc.expected, we.Error())
pkg/config/server_config_test.go (1)

134-157: Consider adding t.Parallel() for consistency.

Other tests in this file use t.Parallel() but this test does not. Adding it would maintain consistency and potentially speed up test execution.

Suggested fix
 func TestIsValidClusterID(t *testing.T) {
+	t.Parallel()
 	cases := []struct {
pkg/config/messages_test.go (2)

23-37: Consider adding t.Parallel() for consistency.

Most tests in the config package use t.Parallel() for concurrent execution.

Suggested fix
 func TestDefaultMessageServerConfig(t *testing.T) {
+	t.Parallel()
 	// This test case does a sanity check on default config.

79-83: Consider adding error message validation for consistency.

Other illegal config test cases verify the error message matches ".*ErrInvalidServerOption.*", but the negative MaxRecvMsgSize case only checks for an error without validating the message.

Suggested fix
 	illegalConfig = defaultMessageConfig.Clone()
 	illegalConfig.MaxRecvMsgSize = -1
 	err = illegalConfig.ValidateAndAdjust()
 	require.Error(t, err)
+	require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error())
pkg/sink/kafka/claimcheck/claim_check_test.go (1)

37-43: Add a nil guard before calling FileNameWithPrefix.

It improves clarity and avoids accidental panics if construction behavior changes.

✅ Proposed tweak
 claimCheck, err = New(ctx, largeHandleConfig, changefeedID)
 require.NoError(t, err)
+require.NotNil(t, claimCheck)

 fileName := claimCheck.FileNameWithPrefix("file.json")
 require.Equal(t, "file:///tmp/abc/file.json", fileName)
pkg/filter/expr_filter_bench_test.go (1)

117-124: Avoid testify assertions inside the benchmark loop.

This can distort benchmark timing; validate once, then keep the hot loop lean.

♻️ Proposed refactor
 for _, c := range cases {
 	b.Run(c.name, func(b *testing.B) {
-		for i := 0; i < b.N; i++ {
-			ignore, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo)
-			require.NoError(b, err)
-			require.Equal(b, c.shouldSkip, ignore)
-		}
+		ignore, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo)
+		require.NoError(b, err)
+		require.Equal(b, c.shouldSkip, ignore)
+
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			if _, err := f.shouldSkipDML(c.dmlType, c.preRow, c.row, tableInfo); err != nil {
+				b.Fatal(err)
+			}
+		}
 	})
 }
pkg/api/internal/rest/request_test.go (1)

70-91: Assert the request completes without error.
Right now the test only validates the header; it will still pass if the request fails after header injection.

🛠️ Suggested tweak
-	_ = req.Do(context.Background())
+	if err := req.Do(context.Background()).Error(); err != nil {
+		t.Fatalf("unexpected request error: %v", err)
+	}
pkg/orchestrator/etcd_worker_test.go (1)

332-374: Replace fixed sleep with polling for improved test reliability under load.

The time.Sleep(500 * time.Millisecond) can be flaky under load. Use require.Eventually with a readiness condition—such as a channel from the reactor or by polling state.val with proper synchronization (e.g., sync.Mutex or sync/atomic)—to ensure the reactor has processed the initial batch before writing the second batch.

Example:

// Add sync.Mutex to intReactorState, or use atomic operations
require.Eventually(t, func() bool {
    return state.GetVal() >= 999  // or appropriate checkpoint
}, 5*time.Second, 10*time.Millisecond)

Alternatively, expose a readiness channel from the reactor to signal when it's ready for the next batch.

Comment on lines +90 to +102
	c, _ := restClient(testServer)

	req := c.Post()
	require.NotNil(t, req)

	req = c.Get()
	require.NotNil(t, req)

	req = c.Put()
	require.NotNil(t, req)

	req = c.Delete()
	require.NotNil(t, req)

⚠️ Potential issue | 🟡 Minor

Handle restClient errors before dereferencing.

If restClient fails, c may be nil and cause a panic.

✅ Proposed fix
-	c, _ := restClient(testServer)
+	c, err := restClient(testServer)
+	require.NoError(t, err)
+	require.NotNil(t, c)

Comment on lines +119 to +123
	largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
	// `enable-tidb-extension` is false, return error
	err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
	require.NoError(t, err)


⚠️ Potential issue | 🟡 Minor


Remove or correct misleading comments about "enable-tidb-extension = false" returning error for Open/Simple protocols.

For ProtocolOpen and ProtocolSimple, the AdjustAndValidate implementation does not validate the enableTiDBExtension parameter—it returns no error regardless of its value. Only ProtocolCanalJSON performs this validation. The comments at lines 119-123, 143-146, 167-171, and 191-194 stating "enable-tidb-extension is false, return error" contradict both the actual behavior and the correct assertions (require.NoError). Either remove these comments or reword them to clarify that enableTiDBExtension has no effect for these protocol types.
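
For example, the comment for the ProtocolOpen case could be reworded along these lines:

	largeMessageHandle.LargeMessageHandleOption = LargeMessageHandleOptionHandleKeyOnly
	// `enable-tidb-extension` is not validated for ProtocolOpen, so this
	// succeeds regardless of the flag; only ProtocolCanalJSON enforces it.
	err = largeMessageHandle.AdjustAndValidate(ProtocolOpen, false)
	require.NoError(t, err)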


Comment on lines +67 to +73
	for _, tc := range testCases {
		protocol, err := ParseSinkProtocolFromString(tc.protocol)
		if tc.expectedErr != "" {
			require.Regexp(t, tc.expectedErr, err)
		} else {
			require.Equal(t, tc.expectedProtocolEnum, protocol)
		}

⚠️ Potential issue | 🟡 Minor

Assert error-free parses explicitly.
A non-nil error in the success path would be ignored, potentially masking failures.

✅ Suggested fix
	for _, tc := range testCases {
		protocol, err := ParseSinkProtocolFromString(tc.protocol)
		if tc.expectedErr != "" {
			require.Regexp(t, tc.expectedErr, err)
+			require.Equal(t, ProtocolUnknown, protocol)
		} else {
+			require.NoError(t, err)
			require.Equal(t, tc.expectedProtocolEnum, protocol)
		}
	}

Comment on lines +34 to +39
func (m *mockClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
	if m.getOK {
		m.getOK = true
		return nil, errors.New("mock error")
	}
	return &clientv3.GetResponse{}, nil

⚠️ Potential issue | 🟠 Major

Retry path isn’t exercised in mockClient.Get.

getOK never flips from false to true, so the mock always succeeds and the retry logic isn’t actually tested. Flip the flag on the first call so the second attempt can succeed.

🐛 Proposed fix
-	if m.getOK {
-		m.getOK = true
-		return nil, errors.New("mock error")
-	}
-	return &clientv3.GetResponse{}, nil
+	if !m.getOK {
+		m.getOK = true
+		return nil, errors.New("mock error")
+	}
+	return &clientv3.GetResponse{}, nil

Comment on lines +73 to +119
func TestRetry(t *testing.T) {
	// here we need to change maxTries, which is not thread safe
	// so we don't use t.Parallel() for this test

	originValue := maxTries
	// to speedup the test
	maxTries = 2

	cli := clientv3.NewCtxClient(context.TODO())
	cli.KV = &mockClient{}
	retrycli := Wrap(cli, nil)
	get, err := retrycli.Get(context.TODO(), "")

	require.NoError(t, err)
	require.NotNil(t, get)

	_, err = retrycli.Put(context.TODO(), "", "")
	require.NotNil(t, err)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Test Txn case
	// case 0: normal
	rsp, err := retrycli.Txn(ctx, nil, nil, nil)
	require.NoError(t, err)
	require.False(t, rsp.Succeeded)

	// case 1: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, txnEmptyCmps, nil, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 2: errors.ErrReachMaxTry
	_, err = retrycli.Txn(ctx, nil, txnEmptyOpsThen, nil)
	require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

	// case 3: context.DeadlineExceeded
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, nil)
	require.Equal(t, context.DeadlineExceeded, err)

	// other case: mock error
	_, err = retrycli.Txn(ctx, txnEmptyCmps, txnEmptyOpsThen, TxnEmptyOpsElse)
	require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

	maxTries = originValue
}

⚠️ Potential issue | 🟠 Major

Ensure maxTries is restored even on early failures.

If any require.* fails, the test exits before the last line, leaving the global mutated and potentially impacting other tests. Use defer (or t.Cleanup) immediately after the assignment.

🐛 Proposed fix
 	originValue := maxTries
 	// to speedup the test
 	maxTries = 2
+	defer func() { maxTries = originValue }()
@@
-	maxTries = originValue

Comment on lines +62 to +68
	for i := 0; i <= etcdTxnMaxOps*2; i++ {
		manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
			Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
			Func: func(old []byte) (newValue []byte, changed bool, err error) {
				newValue = []byte(fmt.Sprintf("abc%d", i))
				return newValue, true, nil
			},

⚠️ Potential issue | 🟡 Minor


Fix loop-variable capture in patch funcs.

In the for loop starting at line 62, the closure captures the same i variable; all funcs will see the final value after the loop completes. Shadow i per-iteration like the earlier loop at line 28 to ensure each closure captures its own iteration's value.

🔧 Suggested fix
 for i := 0; i <= etcdTxnMaxOps*2; i++ {
+    i := i
     manyOpsPatches = append(manyOpsPatches, &SingleDataPatch{
         Key: util.NewEtcdKey(fmt.Sprintf("/key%d", i)),
         Func: func(old []byte) (newValue []byte, changed bool, err error) {
             newValue = []byte(fmt.Sprintf("abc%d", i))
             return newValue, true, nil

Comment on lines +200 to +271
func setUpTest(t *testing.T) (func() etcd.Client, func()) {
	url, server, err := etcd.SetupEmbedEtcd(t.TempDir())
	require.Nil(t, err)
	endpoints := []string{url.String()}
	return func() etcd.Client {
			rawCli, err := clientv3.NewFromURLs(endpoints)
			require.Nil(t, err)
			return etcd.Wrap(rawCli, map[string]prometheus.Counter{})
		}, func() {
			server.Close()
		}
}

func TestEtcdSum(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	cli := newClient()
	defer func() {
		_ = cli.Unwrap().Close()
	}()
	_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
	require.Nil(t, err)

	initArray := make([]int, numValuesPerGroup)
	jsonStr, err := json.Marshal(initArray)
	require.Nil(t, err)

	for i := 0; i < numGroups; i++ {
		_, err := cli.Put(ctx, testEtcdKeyPrefix+"/"+strconv.Itoa(i), string(jsonStr))
		require.Nil(t, err)
	}

	errg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < numValuesPerGroup+1; i++ {
		finalI := i
		errg.Go(func() error {
			values := make([][]int, numGroups)
			for j := range values {
				values[j] = make([]int, numValuesPerGroup)
			}

			reactor := &simpleReactor{
				state: nil,
				id:    finalI,
			}

			initState := &simpleReactorState{
				values:  values,
				sum:     0,
				deltas:  nil,
				patches: nil,
			}

			cli := newClient()
			cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
			require.Nil(t, err)
			defer func() {
				_ = cli.Unwrap().Close()
			}()

			etcdWorker, err := NewEtcdWorker(cdcCli, testEtcdKeyPrefix, reactor, initState,
				&migrate.NoOpMigrator{})
			if err != nil {
				return errors.Trace(err)
			}

			return errors.Trace(etcdWorker.Run(ctx, nil, 10*time.Millisecond, "owner"))
		})

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

testify require FailNow goroutine safety behavior

💡 Result:

In Testify, anything in require.* ultimately fails the test by calling t.FailNow() (directly or indirectly). t.FailNow() (and therefore t.Fatal, require.*, etc.) must be called only from the goroutine that is running the test/benchmark function, not from a goroutine you start with go func(){...}(). Otherwise, it will only stop that goroutine (via runtime.Goexit), while the test goroutine may keep running/waiting, which can lead to flaky tests, hangs, and resource leaks. [1][2]

Key behavior to remember:

  • require.* terminates the current test on failure (it “Fatal”s) → so it inherits the same goroutine restriction as t.FailNow(). [1][3]
  • If called in a non-test goroutine, FailNow/Fatal/require.* does not abort the test, it just aborts that goroutine; other goroutines continue. [2]
  • This is well-known enough that linters flag it (e.g., go vet’s testinggoroutine and tools like testifylint). [4]

Typical fixes:

  • Do assertions in the main test goroutine (send results over a channel from worker goroutines, then require.NoError(t, err) in the test goroutine). [2][4]
  • If you must report from inside a goroutine, prefer non-FailNow reporting (t.Error, assert.*) and still ensure the test doesn’t exit before the goroutine finishes (e.g., WaitGroup). [2]

Sources:
[1] https://pkg.go.dev/github.com/stretchr/testify (require goroutine restriction)
[2] https://pkg.go.dev/testing (FailNow goroutine rule + Goexit behavior)
[3] https://deepwiki.com/stretchr/testify (require calls t.FailNow)
[4] https://github.com/Antonboom/testifylint (go-require checker rationale / goroutine issue)



Fix unsafe require calls in goroutines by propagating errors instead.

Calling require.* from goroutines spawned with errgroup is unsafe—FailNow() terminates only the spawned goroutine, not the test, leading to flaky tests and potential hangs.

The code has two problematic patterns: setUpTest returns a closure newClient() containing require.Nil(t, err) that gets called from goroutines, and TestEtcdSum directly uses require.Nil(t, err) inside errg.Go().

Change setUpTest to return func() (etcd.Client, error), replace require with error returns, and handle errors in the goroutine before passing them through errgroup:

Proposed fix
-func setUpTest(t *testing.T) (func() etcd.Client, func()) {
+func setUpTest(t *testing.T) (func() (etcd.Client, error), func()) {
 	url, server, err := etcd.SetupEmbedEtcd(t.TempDir())
 	require.Nil(t, err)
 	endpoints := []string{url.String()}
-	return func() etcd.Client {
-			rawCli, err := clientv3.NewFromURLs(endpoints)
-			require.Nil(t, err)
-			return etcd.Wrap(rawCli, map[string]prometheus.Counter{})
-		}, func() {
+	return func() (etcd.Client, error) {
+			rawCli, err := clientv3.NewFromURLs(endpoints)
+			if err != nil {
+				return nil, err
+			}
+			return etcd.Wrap(rawCli, map[string]prometheus.Counter{}), nil
+		}, func() {
 			server.Close()
 		}
 }
-			cli := newClient()
-			cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
-			require.Nil(t, err)
+			cli, err := newClient()
+			if err != nil {
+				return errors.Trace(err)
+			}
+			cdcCli, err := etcd.NewCDCEtcdClient(ctx, cli.Unwrap(), "default")
+			if err != nil {
+				return errors.Trace(err)
+			}

Comment on lines +274 to +280
	err = errg.Wait()
	if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
		errors.Cause(err) == context.Canceled ||
		strings.Contains(err.Error(), "etcdserver: request timeout")) {
		return
	}
	require.Nil(t, err)

⚠️ Potential issue | 🟡 Minor

Don’t silently pass on timeouts—skip or fail explicitly.

Returning here makes the test pass even if it times out, which can hide regressions. Consider t.Skipf for environmental flakiness or fail explicitly.

📝 Suggested change
-if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
-	errors.Cause(err) == context.Canceled ||
-	strings.Contains(err.Error(), "etcdserver: request timeout")) {
-	return
-}
-require.Nil(t, err)
+if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
+	errors.Cause(err) == context.Canceled ||
+	strings.Contains(err.Error(), "etcdserver: request timeout")) {
+	t.Skipf("etcd timeout: %v", err)
+}
+require.NoError(t, err)
🤖 Prompt for AI Agents
In `@pkg/orchestrator/etcd_worker_test.go` around lines 274 - 280, The test
currently swallows timeouts by returning when err from errg.Wait() is a
DeadlineExceeded/Canceled or contains "etcdserver: request timeout"; instead of
returning, explicitly skip or fail the test—replace the bare return with
t.Skipf(...) (e.g. t.Skipf("skipping due to transient environment timeout: %v",
err)) for those transient conditions so flakiness is recorded, and leave
require.Nil(t, err) for all other errors; update the error-check block around
errg.Wait() and errors.Cause(err) accordingly.

Comment on lines +717 to +768
func TestModifyAfterDelete(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()

	newClient, closer := setUpTest(t)
	defer closer()

	cli1 := newClient()
	cdcCli1, err := etcd.NewCDCEtcdClient(ctx, cli1.Unwrap(), "default")
	require.Nil(t, err)

	cli2 := newClient()
	cdcCli2, err := etcd.NewCDCEtcdClient(ctx, cli2.Unwrap(), "default")
	require.Nil(t, err)

	_, err = cli1.Put(ctx, "/test/key1", "original value")
	require.Nil(t, err)

	modifyReactor := &modifyOneReactor{
		key:      []byte("/test/key1"),
		value:    []byte("modified value"),
		waitOnCh: make(chan struct{}),
	}
	worker1, err := NewEtcdWorker(cdcCli1, "/test", modifyReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := worker1.Run(ctx, nil, time.Millisecond*100, "owner")
		require.Nil(t, err)
	}()

	modifyReactor.waitOnCh <- struct{}{}

	deleteReactor := &modifyOneReactor{
		key:   []byte("/test/key1"),
		value: nil, // deletion
	}
	worker2, err := NewEtcdWorker(cdcCli2, "/test", deleteReactor, &commonReactorState{
		state: make(map[string]string),
	}, &migrate.NoOpMigrator{})
	require.Nil(t, err)

	err = worker2.Run(ctx, nil, time.Millisecond*100, "owner")
	require.Nil(t, err)

	modifyReactor.waitOnCh <- struct{}{}
	wg.Wait()

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

testify require safe goroutine concurrent testing golang

💡 Result:

testify/require is not safe to use inside a goroutine (other than the test’s main goroutine), because require.* calls t.FailNow(). The Go testing docs state FailNow must be called from the goroutine running the test, not from goroutines created during the test. (pkg.go.dev) The require package docs explicitly repeat this consequence. (pkg.go.dev)

What to do instead:

  • Don’t assert in goroutines: send errors/results back to the main test goroutine (via channel/errgroup/WaitGroup), then call require.* in the main goroutine.
  • If you must report from a worker goroutine, use testify/assert (it reports via t.Errorf-style failures rather than FailNow), understanding the test won’t stop immediately. (stackoverflow.com)
  • Linters/analysis tools flag this pattern for the same reason (it doesn’t stop the test and can cause hangs/leaks), e.g. “go-require” checks. (github.com)
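
As a concrete illustration of the first bullet, a minimal self-contained sketch (doWork is a hypothetical stand-in, not code from this PR):

package example_test

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

// doWork is a hypothetical worker; it reports failure via its return value.
func doWork() error { return nil }

func TestAssertOnMainGoroutine(t *testing.T) {
	var wg sync.WaitGroup
	errCh := make(chan error, 1) // buffered so the worker never blocks

	wg.Add(1)
	go func() {
		defer wg.Done()
		errCh <- doWork() // send the error instead of calling require
	}()

	wg.Wait()
	// require.* may call t.FailNow, so it runs only here, on the main goroutine.
	require.NoError(t, <-errCh)
}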


🏁 Script executed:

# First, let's locate and examine the test file
fd "etcd_worker_test.go" --type f

Repository: pingcap/ticdc

Length of output: 93


🏁 Script executed:

# Read the test file to see the actual code and context
cat -n pkg/orchestrator/etcd_worker_test.go | sed -n '710,775p'

Repository: pingcap/ticdc

Length of output: 2383


Avoid require inside the worker goroutine; report errors back to the main test goroutine.

Calling require.Nil(t, err) inside a spawned goroutine (line 750) violates Go's testing contract. The testing package requires that t.FailNow() (called by require.*) be invoked only from the test's main goroutine. Use an error channel to return the result to the main goroutine, then assert there.

🔧 Proposed fix
 var wg sync.WaitGroup
+errCh := make(chan error, 1)
 wg.Add(1)
 go func() {
 	defer wg.Done()
 	err := worker1.Run(ctx, nil, time.Millisecond*100, "owner")
-	require.Nil(t, err)
+	errCh <- err
 }()

 modifyReactor.waitOnCh <- struct{}{}

 deleteReactor := &modifyOneReactor{
 	key:   []byte("/test/key1"),
 	value: nil, // deletion
 }
 worker2, err := NewEtcdWorker(cdcCli2, "/test", deleteReactor, &commonReactorState{
 	state: make(map[string]string),
 }, &migrate.NoOpMigrator{})
 require.Nil(t, err)

 err = worker2.Run(ctx, nil, time.Millisecond*100, "owner")
 require.Nil(t, err)

 modifyReactor.waitOnCh <- struct{}{}
 wg.Wait()
+require.NoError(t, <-errCh)
🤖 Prompt for AI Agents
In `@pkg/orchestrator/etcd_worker_test.go` around lines 717 - 768, The test
TestModifyAfterDelete spawns a goroutine that calls require.Nil(t, err) inside
the worker1.Run callback which violates Go's testing rules; change the goroutine
to capture the error returned by worker1.Run and send it on a dedicated error
channel (e.g., errCh) back to the main test goroutine, then in the main
goroutine receive from errCh and assert with require.Nil(t, err). Specifically,
replace the inline require in the anonymous function that calls worker1.Run with
sending the error into errCh, close or signal errCh when done, and move the
require.Nil(t, err) assertion to the main test flow after wg.Wait()/receiving
the error; keep the rest of TestModifyAfterDelete, modifyReactor, deleteReactor,
and worker setup unchanged.

Comment on lines +88 to +137
func TestRegisterTopo(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
	defer etcdServer.Close()

	require.NoError(t, err)
	logConfig := logutil.DefaultZapLoggerConfig
	logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)

	rawEtcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{clientURL.String()},
		Context:     ctx,
		LogConfig:   &logConfig,
		DialTimeout: 3 * time.Second,
	})
	require.NoError(t, err)
	defer rawEtcdCli.Close()
	etcdCli := etcd.Wrap(rawEtcdCli, make(map[string]prometheus.Counter))
	up := &Upstream{
		cancel:  func() {},
		etcdCli: etcdCli,
		wg:      &sync.WaitGroup{},
	}

	info := node.NewInfo("localhost:8300", "test")
	info.Version = "test.1.0"
	err = up.registerTopologyInfo(ctx, &NodeTopologyCfg{
		Info:        info,
		GCServiceID: "clusterID",
		SessionTTL:  2,
	})
	require.NoError(t, err)

	resp, err := etcdCli.Get(ctx, "/topology/ticdc/clusterID/localhost:8300")
	require.NoError(t, err)

	infoData, err := info.Marshal()
	require.NoError(t, err)
	require.Equal(t, infoData, resp.Kvs[0].Value)

	up.etcdCli = nil
	up.Close()
	require.Eventually(t, func() bool {
		resp, err := etcdCli.Get(ctx, "/topology/ticdc/clusterID/localhost:8300")
		require.NoError(t, err)
		return len(resp.Kvs) == 0
	}, time.Second*5, time.Millisecond*100)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

ast-grep --pattern $'func ($_ *Upstream) Close() {
  $$$
}'

Repository: pingcap/ticdc

Length of output: 2498


🏁 Script executed:

rg -A 20 "func.*registerTopologyInfo" pkg/upstream/upstream.go

Repository: pingcap/ticdc

Length of output: 711


🏁 Script executed:

# Check if there are comments explaining this pattern in the test file
rg -B 5 "up.etcdCli = nil" pkg/upstream/upstream_test.go

Repository: pingcap/ticdc

Length of output: 181


Add a clarifying comment explaining why etcdCli is set to nil before Close().

The topology key deletion is lease-based: registerTopologyInfo stores the key with a lease tied to up.session, and when Close() calls up.session.Close(), the lease expires and etcd automatically deletes the key. Setting up.etcdCli = nil before Close() tests graceful nil handling while the session closure still triggers the cleanup. Add a comment above line 131 to explain this pattern, e.g., // Test graceful nil handling; session closure will still expire the lease and cleanup topology key.

🤖 Prompt for AI Agents
In `@pkg/upstream/upstream_test.go` around lines 88 - 137, Add a short clarifying
comment above the up.etcdCli = nil before calling up.Close() explaining that
registerTopologyInfo stores the topology key with a lease on up.session, and
that setting etcdCli to nil tests graceful nil handling of the Upstream while
calling up.Close() (which closes up.session and triggers lease expiration and
etcd key cleanup) still causes the topology key to be removed; reference the
Upstream methods/fields registerTopologyInfo, up.etcdCli, up.session and
Upstream.Close() in the comment.
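
For background, a minimal sketch of the lease-based cleanup this comment describes, using the public clientv3 concurrency API (the endpoint, TTL, and key below are placeholders, not the PR's code):

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// A session owns a lease that is kept alive until the session closes.
	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(2))
	if err != nil {
		panic(err)
	}

	// Bind the key to the session's lease, as registerTopologyInfo does.
	ctx := context.Background()
	if _, err = cli.Put(ctx, "/topology/example", "info",
		clientv3.WithLease(sess.Lease())); err != nil {
		panic(err)
	}

	// Closing the session revokes the lease, so etcd deletes the key
	// without any explicit Delete call.
	sess.Close()

	resp, _ := cli.Get(ctx, "/topology/example")
	fmt.Println("keys left:", len(resp.Kvs)) // expected: 0
}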

@ti-chi-bot ti-chi-bot bot added the needs-1-more-lgtm (Indicates a PR needs 1 more LGTM.) label Jan 28, 2026
@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 28, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: 3AceShowHand
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 28, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-01-28 09:56:19.266587255 +0000 UTC m=+1179006.880544111: ☑️ agreed by 3AceShowHand.

@ti-chi-bot
Copy link

ti-chi-bot bot commented Jan 29, 2026

@wk989898: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name | Commit | Details | Required | Rerun command
pull-error-log-review | 3a67c63 | link | true | /test pull-error-log-review

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.


Labels

needs-1-more-lgtm: Indicates a PR needs 1 more LGTM.
release-note: Denotes a PR that will be considered when it comes time to generate release notes.
size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants