
Conversation

@wlwilliamx wlwilliamx commented Jan 22, 2026

What problem does this PR solve?

Issue Number: close #4051

What is changed and how it works?

  • Introduce an "applied" watermark in cmd/util.EventsGroup to track the maximum commitTs that has been flushed to downstream.
  • Advance the applied watermark after a successful flush (both watermark-driven DML flush and DDL-preflush).
  • In kafka-consumer, ignore a replayed DML only when its commitTs <= AppliedWatermark; do not drop events merely because commitTs < HighWatermark (see the sketch after this list).
  • When force-inserting out-of-order commitTs, merge into the existing commitTs event to avoid splitting one upstream transaction.
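
For illustration, here is a minimal Go sketch of the new gating rule under simplified, assumed types (shouldSkipReplay is a hypothetical helper name; the real logic lives in cmd/kafka-consumer/writer.go and cmd/util/event_group.go):

// Sketch only: simplified stand-in for cmd/util.EventsGroup.
type EventsGroup struct {
	HighWatermark    uint64 // max commitTs ever observed in this group
	AppliedWatermark uint64 // max commitTs confirmed flushed to the downstream
}

// shouldSkipReplay reports whether a replayed DML can be safely ignored.
// Only events at or below the applied watermark have provably reached the
// downstream; anything above it may be an unflushed late fragment.
func (g *EventsGroup) shouldSkipReplay(commitTs uint64) bool {
	return commitTs <= g.AppliedWatermark // not: commitTs < g.HighWatermark
}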

Check List

Tests

  • Unit test
  • Integration test

Questions

Will it cause performance regression or break compatibility?

No behavior change in the normal (commitTs-ordered) path. Under restart/replay, it may keep more buffered events instead of dropping them, which is required for correctness.

Do you need to update user documentation, design documentation or monitoring documentation?

None.

Release note

Fix kafka-consumer dropping replayed DML events by deduplicating based on an applied (flushed) watermark, preventing downstream inconsistency after retries/restarts.

Summary by CodeRabbit

  • Bug Fixes

    • Prevented splitting of upstream transactions by merging related event fragments into a single downstream transaction.
    • Ensured events are not dropped prematurely by aligning append behavior with applied watermarks.
  • Improvements

    • Enhanced watermark tracking and visibility to improve event ordering and processing reliability.
    • Unified append behavior to reduce fallback divergence and improve logging clarity.
  • Tests

    • Added tests for late-fragment merging and commit-timestamp/watermark progression.


@ti-chi-bot ti-chi-bot bot added release-note Denotes a PR that will be considered when it comes time to generate release notes. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jan 22, 2026
@gemini-code-assist

Summary of Changes

Hello @wlwilliamx, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the robustness of the Kafka consumer by refining its event deduplication and handling of out-of-order DML events. By introducing an "applied watermark," the consumer can now accurately distinguish between truly processed events and those that are replayed due to system restarts or retries. This prevents premature dropping of necessary events, thereby ensuring data consistency in the downstream system, particularly in scenarios involving at-least-once replay semantics.

Highlights

  • Introduced Applied Watermark: A new AppliedWatermark field is added to util.EventsGroup to track the maximum CommitTs that has been successfully flushed to the downstream system.
  • Enhanced DML Deduplication Logic: The Kafka consumer now uses the AppliedWatermark to determine if a replayed DML event should be ignored, ensuring that events are only dropped if they have already been processed downstream.
  • Improved Out-of-Order Event Handling: When DML events with an older CommitTs arrive after a newer one (e.g., due to retries), the system now merges these "fallback" events into existing transactions to maintain data integrity and avoid splitting upstream transactions.
  • Watermark Advancement: The AppliedWatermark is advanced after both watermark-driven DML flushes and DDL pre-flush operations are successfully written to the sink (see the sketch below).
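
A minimal sketch of the flush-then-advance ordering (advanceApplied is an illustrative name; per this PR, the advancement happens after both the watermark-driven DML flush and the DDL pre-flush are acknowledged):

// advanceApplied raises the applied watermark once the sink has
// acknowledged a flush; it never moves the watermark backwards.
func (g *EventsGroup) advanceApplied(maxFlushedCommitTs uint64) {
	if maxFlushedCommitTs > g.AppliedWatermark {
		g.AppliedWatermark = maxFlushedCommitTs
	}
}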


@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a crucial fix for handling replayed DML events in the Kafka consumer by using an appliedWatermark for deduplication. The changes significantly improve correctness by preventing data loss on retries/restarts and simplify the event handling logic in appendRow2Group. The addition of AppliedWatermark to EventsGroup and the logic to merge events with the same commit timestamp during forced insertion are well-implemented. The new unit tests effectively validate the new behavior.

I've made a couple of suggestions to reduce some code duplication in writer.go, which could further improve maintainability. Overall, this is a great improvement.

@ti-chi-bot ti-chi-bot bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jan 22, 2026
@coderabbitai coderabbitai bot commented Jan 29, 2026

📝 Walkthrough

This PR adds an AppliedWatermark to EventsGroup, changes DML gating to use AppliedWatermark, ensures late/same-commitTs DML fragments are merged (not dropped), and advances AppliedWatermark after DDL/DML flush completion. Logging and tests updated accordingly.

Changes

Writer logic (cmd/kafka-consumer/writer.go)
  Replace watermark-based gating with commitTs <= group.AppliedWatermark; record resolvedGroups during the DDL/DML flush paths and advance each group's AppliedWatermark after flush completion; unify the DML append path (removing protocol branching); extend logging with appliedWatermark, highWatermark, and protocol fields.

EventsGroup internals (cmd/util/event_group.go)
  Add public HighWatermark and AppliedWatermark fields; introduce an atomic merge helper for DMLEvent merges on equal CommitTs; in the force-insert path, insert by commitTs order and merge when an event with the same CommitTs already exists, avoiding split transactions (see the sketch below); update HighWatermark management.

Tests and test setup (cmd/kafka-consumer/writer_test.go, cmd/util/event_group_test.go)
  Add tests validating that fallback-commitTs events are not dropped before the AppliedWatermark and that late fragments merge into existing commitTs events; update imports/aliases and test wiring for the EventRouter-based sink and the partition table accessor.

Sequence Diagram(s)

sequenceDiagram
    participant Kafka as Kafka
    participant Writer as Writer
    participant Group as EventsGroup
    participant Sink as DownstreamSink

    Note over Kafka,Sink: DML ingest and merge flow
    Kafka->>Writer: Emit DML (commitTs=100)
    Writer->>Group: appendRow2Group(event commitTs=100)
    Group->>Group: Compare commitTs vs AppliedWatermark
    alt commitTs > AppliedWatermark
        Group->>Group: Append or merge into existing commitTs bucket
        Group-->>Writer: Accepted
    else commitTs <= AppliedWatermark
        Group-->>Writer: Skip (already applied)
    end

    Note over Writer,Sink: Flush path advances AppliedWatermark
    Writer->>Writer: Flush DDL / flush DMLs by watermark
    Writer->>Sink: Apply batch
    Sink-->>Writer: ACK
    Writer->>Group: Update AppliedWatermark = max(commits resolved)
    Group-->>Writer: AppliedWatermark advanced
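
To make the two branches concrete, consider an illustrative example: the group has observed commitTs up to 200 (HighWatermark = 200) but only flushed up to 100 (AppliedWatermark = 100). A replayed event with commitTs = 80 satisfies 80 <= 100 and is skipped; a replayed event with commitTs = 150 satisfies 150 > 100 and is kept, even though 150 < 200. Under the old HighWatermark-based rule, that second event would have been dropped, losing a transaction fragment downstream.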

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I nibbled logs and stitched the seam,
Where seen and flushed now split the stream.
No fragment lost, no duplicate fray—
Applied marks show the honest way. ✨

🚥 Pre-merge checks: 4 passed, 1 warning

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4)

  • Title check: the title accurately describes the main change: introducing deduplication of replayed DML events using an applied watermark instead of the HighWatermark.
  • Description check: the description includes all required sections: the issue number linking to #4051, a detailed explanation of the changes, test coverage (unit and integration tests), an impact assessment, and a release note.
  • Linked Issues check: the PR fully addresses issue #4051 by replacing HighWatermark-based deduplication with applied-watermark tracking, merging out-of-order events, and adding corresponding tests.
  • Out of Scope Changes check: all changes are directly related to the PR objectives: EventsGroup field additions, kafka-consumer logic changes for watermark handling, and new test cases validating merge behavior; no extraneous modifications.


@wlwilliamx (Collaborator, Author)

/test all

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1


Comment on lines +66 to +71
mergeDMLEvent := func(dst, src *commonEvent.DMLEvent) {
dst.Rows.Append(src.Rows, 0, src.Rows.NumRows())
dst.RowTypes = append(dst.RowTypes, src.RowTypes...)
dst.Length += src.Length
dst.PostTxnFlushed = append(dst.PostTxnFlushed, src.PostTxnFlushed...)
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for RowKeys usage to understand its purpose and importance
rg -nP --type=go '\.RowKeys' | head -150

Repository: pingcap/ticdc

Length of output: 1600


🏁 Script executed:

#!/bin/bash
# Also search for ApproximateSize usage
rg -nP --type=go '\.ApproximateSize' | head -100

Repository: pingcap/ticdc

Length of output: 391


🏁 Script executed:

#!/bin/bash
# Find the DMLEvent struct definition to understand all its fields
rg -nP --type=go 'type DMLEvent struct' -A 20

Repository: pingcap/ticdc

Length of output: 1872


🏁 Script executed:

#!/bin/bash
# Check the mergeDMLEvent function context and how it's called
rg -nP --type=go 'mergeDMLEvent' -B 5 -A 10

Repository: pingcap/ticdc

Length of output: 2609


Add missing field merges in mergeDMLEvent.

The function merges Rows, RowTypes, Length, and PostTxnFlushed, but omits:

  • RowKeys: Used downstream in mysql_writer_dml.go for key-based deduplication (lines 346-348, 382-387). Omitting its merge breaks the 1:1 correspondence between row keys and row types, causing inconsistencies in key-based routing.
  • ApproximateSize: Aggregates raw entry sizes for throughput calculations and is serialized with events. Must be summed: dst.ApproximateSize += src.ApproximateSize.

Both fields must be merged to maintain data consistency.
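
Applying the suggestion, the helper would look roughly like this (a sketch extending the snippet quoted above; the RowKeys and ApproximateSize lines are the additions):

mergeDMLEvent := func(dst, src *commonEvent.DMLEvent) {
	dst.Rows.Append(src.Rows, 0, src.Rows.NumRows())
	dst.RowTypes = append(dst.RowTypes, src.RowTypes...)
	dst.RowKeys = append(dst.RowKeys, src.RowKeys...) // keep 1:1 with RowTypes
	dst.Length += src.Length
	dst.ApproximateSize += src.ApproximateSize // sum raw entry sizes
	dst.PostTxnFlushed = append(dst.PostTxnFlushed, src.PostTxnFlushed...)
}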

🤖 Prompt for AI Agents
In `@cmd/util/event_group.go` around lines 66 - 71, mergeDMLEvent currently
appends Rows/RowTypes/Length/PostTxnFlushed but omits merging RowKeys and
ApproximateSize; update mergeDMLEvent(dst, src *commonEvent.DMLEvent) to also
append src.RowKeys into dst.RowKeys (preserving ordering to keep 1:1
correspondence with RowTypes) and add src.ApproximateSize to dst.ApproximateSize
(dst.ApproximateSize += src.ApproximateSize) so downstream key-based
deduplication and size aggregation remain correct.

@wlwilliamx (Collaborator, Author)

/retest

@ti-chi-bot ti-chi-bot bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels Jan 30, 2026
@ti-chi-bot ti-chi-bot bot added the lgtm label Jan 30, 2026
@ti-chi-bot ti-chi-bot bot commented Jan 30, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: tenfyzhong, wk989898

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details: Needs approval from an approver in each of these files:
  • OWNERS [tenfyzhong,wk989898]

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot removed the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label Jan 30, 2026
@ti-chi-bot ti-chi-bot bot commented Jan 30, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-01-30 10:37:38.727039645 +0000 UTC m=+1354286.340996501: ☑️ agreed by wk989898.
  • 2026-01-30 10:46:48.447624162 +0000 UTC m=+1354836.061581008: ☑️ agreed by tenfyzhong.

@wlwilliamx (Collaborator, Author)

/retest

…r-applied-watermark

# Conflicts:
#	cmd/kafka-consumer/writer.go
#	cmd/util/event_group_test.go
@wlwilliamx (Collaborator, Author)

/retest

@wlwilliamx wlwilliamx merged commit 50a3798 into pingcap:master Jan 30, 2026
21 of 27 checks passed

Development

Successfully merging this pull request may close these issues.

kafka-consumer may drop replayed DML events based on HighWatermark and cause downstream inconsistency

3 participants