kafka-consumer: dedupe replayed DML by applied watermark #4052
Conversation
Summary of Changes (Gemini Code Assist): This pull request enhances the robustness of the Kafka consumer by refining its event deduplication and its handling of out-of-order DML events. By introducing an "applied watermark," the consumer can accurately distinguish events that were truly processed from events replayed after a restart or retry. This prevents premature dropping of needed events and preserves data consistency downstream, particularly under at-least-once replay semantics.
Code Review
This pull request introduces a crucial fix for handling replayed DML events in the Kafka consumer by using an appliedWatermark for deduplication. The changes significantly improve correctness by preventing data loss on retries/restarts and simplify the event handling logic in appendRow2Group. The addition of AppliedWatermark to EventsGroup and the logic to merge events with the same commit timestamp during forced insertion are well-implemented. The new unit tests effectively validate the new behavior.
I've made a couple of suggestions to reduce some code duplication in writer.go, which could further improve maintainability. Overall, this is a great improvement.
📝 Walkthrough: This PR adds an AppliedWatermark to EventsGroup, changes DML gating to use AppliedWatermark, ensures late and same-commitTs DML fragments are merged rather than dropped, and advances AppliedWatermark after DDL/DML flush completion. Logging and tests are updated accordingly.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as Kafka
    participant Writer as Writer
    participant Group as EventsGroup
    participant Sink as DownstreamSink
    Note over Kafka,Sink: DML ingest and merge flow
    Kafka->>Writer: Emit DML (commitTs=100)
    Writer->>Group: appendRow2Group(event commitTs=100)
    Group->>Group: Compare commitTs vs AppliedWatermark
    alt commitTs > AppliedWatermark
        Group->>Group: Append or merge into existing commitTs bucket
        Group-->>Writer: Accepted
    else commitTs <= AppliedWatermark
        Group-->>Writer: Skip (already applied)
    end
    Note over Writer,Sink: Flush path advances AppliedWatermark
    Writer->>Writer: Flush DDL / flush DMLs by watermark
    Writer->>Sink: Apply batch
    Sink-->>Writer: ACK
    Writer->>Group: Update AppliedWatermark = max(commits resolved)
    Group-->>Writer: AppliedWatermark advanced
```
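To make the gating concrete, here is a minimal Go sketch of the flow in the diagram. The type shapes and method names (`Append`, `AdvanceAppliedWatermark`, `mergeInto`) are simplified assumptions for illustration, not the PR's actual code in `cmd/util/event_group.go`.

```go
package util

// DMLEvent is a stand-in for commonEvent.DMLEvent; the row payload is omitted.
type DMLEvent struct {
	CommitTs uint64
}

// EventsGroup buffers the DML events of one table.
type EventsGroup struct {
	// AppliedWatermark is the maximum commitTs already flushed downstream.
	AppliedWatermark uint64
	events           []*DMLEvent
}

// Append classifies an incoming event: replay (skip), fragment of an
// already-buffered transaction (merge), or genuinely new event (append).
func (g *EventsGroup) Append(e *DMLEvent) bool {
	if e.CommitTs <= g.AppliedWatermark {
		// Already applied downstream; a replay after restart/retry.
		return false
	}
	for _, buffered := range g.events {
		if buffered.CommitTs == e.CommitTs {
			// Same commit timestamp: merge the fragments instead of
			// dropping the late arrival.
			mergeInto(buffered, e)
			return true
		}
	}
	g.events = append(g.events, e)
	return true
}

// mergeInto stands in for the PR's mergeDMLEvent closure (rows omitted here).
func mergeInto(dst, src *DMLEvent) {}

// AdvanceAppliedWatermark runs after a successful DDL/DML flush so that
// replays of the flushed events are recognized and skipped.
func (g *EventsGroup) AdvanceAppliedWatermark(flushedTs uint64) {
	if flushedTs > g.AppliedWatermark {
		g.AppliedWatermark = flushedTs
	}
}
```

The key point is that the skip decision uses what has actually been applied downstream, not how far the consumer has merely read.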
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (warning)
/test all
Actionable comments posted: 1
```go
mergeDMLEvent := func(dst, src *commonEvent.DMLEvent) {
	dst.Rows.Append(src.Rows, 0, src.Rows.NumRows())
	dst.RowTypes = append(dst.RowTypes, src.RowTypes...)
	dst.Length += src.Length
	dst.PostTxnFlushed = append(dst.PostTxnFlushed, src.PostTxnFlushed...)
}
```
🧩 Analysis chain

```bash
#!/bin/bash
# Search for RowKeys usage to understand its purpose and importance
rg -nP --type=go '\.RowKeys' | head -150

# Also search for ApproximateSize usage
rg -nP --type=go '\.ApproximateSize' | head -100

# Find the DMLEvent struct definition to understand all its fields
rg -nP --type=go 'type DMLEvent struct' -A 20

# Check the mergeDMLEvent function context and how it's called
rg -nP --type=go 'mergeDMLEvent' -B 5 -A 10
```
Add missing field merges in `mergeDMLEvent`.

The function merges Rows, RowTypes, Length, and PostTxnFlushed, but omits:

- `RowKeys`: used downstream in `mysql_writer_dml.go` for key-based deduplication (lines 346-348, 382-387). Omitting its merge breaks the 1:1 correspondence between row keys and row types, causing inconsistencies in key-based routing.
- `ApproximateSize`: aggregates raw entry sizes for throughput calculations and is serialized with events. It must be summed: `dst.ApproximateSize += src.ApproximateSize`.

Both fields must be merged to maintain data consistency.
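Applying this suggestion, the amended helper would look roughly like the following. This is a sketch based on the fields named in the review comment; it has not been checked against the full `commonEvent.DMLEvent` definition.

```go
mergeDMLEvent := func(dst, src *commonEvent.DMLEvent) {
	dst.Rows.Append(src.Rows, 0, src.Rows.NumRows())
	dst.RowTypes = append(dst.RowTypes, src.RowTypes...)
	// Keep RowKeys 1:1 with RowTypes so key-based dedup stays correct.
	dst.RowKeys = append(dst.RowKeys, src.RowKeys...)
	dst.Length += src.Length
	// Sum sizes so throughput accounting reflects the merged event.
	dst.ApproximateSize += src.ApproximateSize
	dst.PostTxnFlushed = append(dst.PostTxnFlushed, src.PostTxnFlushed...)
}
```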
/retest
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: tenfyzhong, wk989898. The full list of commands accepted by this bot can be found here; the pull request process is described here.
/retest

…r-applied-watermark (merge commit; conflicts: cmd/kafka-consumer/writer.go, cmd/util/event_group_test.go)

/retest
What problem does this PR solve?
Issue Number: close #4051
What is changed and how it works?
- Add `AppliedWatermark` to `cmd/util.EventsGroup` to track the maximum commitTs that has been flushed to downstream.
- Skip a replayed DML event only when `commitTs <= appliedWatermark`; do not drop events just because `commitTs < HighWatermark`.

Check List
Tests
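As an illustration of the intended behavior, a test along these lines would pin the dedupe semantics. It reuses the simplified `EventsGroup`/`DMLEvent` types from the earlier sketch and is not the PR's actual test code (which lives in `cmd/util/event_group_test.go`).

```go
package util

import "testing"

func TestReplayedDMLSkippedByAppliedWatermark(t *testing.T) {
	g := &EventsGroup{AppliedWatermark: 100}

	// A replay at or below the applied watermark was already flushed
	// downstream, so it must be skipped.
	if g.Append(&DMLEvent{CommitTs: 100}) {
		t.Fatal("expected replayed event at commitTs=100 to be skipped")
	}

	// An event above the applied watermark may have been read before a
	// restart but never flushed; it must be accepted, not dropped.
	if !g.Append(&DMLEvent{CommitTs: 101}) {
		t.Fatal("expected event at commitTs=101 to be accepted")
	}
}
```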
Questions
Will it cause performance regression or break compatibility?
No behavior change in the normal (commitTs-ordered) path. Under restart/replay, it may keep more buffered events instead of dropping them, which is required for correctness.
Do you need to update user documentation, design documentation or monitoring documentation?
None.
Release note
Summary by CodeRabbit

- Bug Fixes
- Improvements
- Tests