48 changes: 48 additions & 0 deletions downstreamadapter/dispatcher/block_event_executor_test.go
@@ -60,3 +60,51 @@ func TestBlockEventExecutorDoesNotBlockOtherDispatchers(t *testing.T) {

	close(unblockFirst)
}

// TestBlockEventExecutorSerializesPerDispatcherTasks verifies tasks submitted for the same dispatcher
// are executed sequentially (no overlap), even when multiple workers are available.
// This matters because concurrent execution for a single dispatcher can violate block-event ordering guarantees.
func TestBlockEventExecutorSerializesPerDispatcherTasks(t *testing.T) {
	executor := newBlockEventExecutor()
	t.Cleanup(executor.Close)

	d := &BasicDispatcher{id: common.DispatcherID{Low: 1, High: 0}}

	firstStarted := make(chan struct{})
	unblockFirst := make(chan struct{})
	secondStarted := make(chan struct{})

	executor.Submit(d, func() {
		close(firstStarted)
		<-unblockFirst
	})
	require.Eventually(t, func() bool {
		select {
		case <-firstStarted:
			return true
		default:
			return false
		}
	}, time.Second, 10*time.Millisecond)

	executor.Submit(d, func() {
		close(secondStarted)
	})

	// The second task must not start before the first task finishes.
	select {
	case <-secondStarted:
		require.FailNow(t, "unexpected parallel execution for the same dispatcher")
	case <-time.After(100 * time.Millisecond):
	}

	close(unblockFirst)
	require.Eventually(t, func() bool {
		select {
		case <-secondStarted:
			return true
		default:
			return false
		}
	}, time.Second, 10*time.Millisecond)
}
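
// For context, a minimal, hypothetical sketch of a per-key serializing executor follows.
// The names serialExecutor, newSerialExecutor, Submit, and Close are illustrative only and
// are not the actual blockEventExecutor implementation; the sketch assumes only the
// standard library sync package.

// serialExecutor runs tasks for the same key strictly in FIFO order while allowing
// different keys to make progress concurrently (one worker goroutine per key).
type serialExecutor struct {
	mu     sync.Mutex
	queues map[string]chan func()
	wg     sync.WaitGroup
}

func newSerialExecutor() *serialExecutor {
	return &serialExecutor{queues: make(map[string]chan func())}
}

// Submit enqueues task behind any earlier tasks submitted for the same key.
func (e *serialExecutor) Submit(key string, task func()) {
	e.mu.Lock()
	q, ok := e.queues[key]
	if !ok {
		q = make(chan func(), 64)
		e.queues[key] = q
		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			for t := range q {
				t() // tasks for one key never overlap
			}
		}()
	}
	e.mu.Unlock()
	q <- task
}

// Close stops accepting work and waits for already queued tasks to drain.
func (e *serialExecutor) Close() {
	e.mu.Lock()
	for _, q := range e.queues {
		close(q)
	}
	e.mu.Unlock()
	e.wg.Wait()
}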
97 changes: 97 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_redo_cache_test.go
@@ -0,0 +1,97 @@
package dispatcher

import (
"sync/atomic"
"testing"
"time"

"github.com/pingcap/ticdc/downstreamadapter/sink"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/node"
"github.com/stretchr/testify/require"
)

// TestEventDispatcherRedoGating verifies redo gating enforces "commitTs <= redoGlobalTs" before emitting events.
// This matters because emitting events ahead of redo catch-up can break downstream correctness and recovery semantics.
// The key scenario is a DML batch with commitTs greater than redoGlobalTs that must be cached and replayed later.
func TestEventDispatcherRedoGating(t *testing.T) {
	var redoGlobalTs atomic.Uint64
	redoGlobalTs.Store(50)

	mockSink := sink.NewMockSink(common.MysqlSinkType)
	txnAtomicity := config.DefaultAtomicityLevel()
	sharedInfo := NewSharedInfo(
		common.NewChangefeedID4Test("test", "dispatcher_redo_gating"),
		"system",
		false,
		false,
		nil,
		nil,
		nil, // no syncpoint config is required for this unit test
		&txnAtomicity,
		false,
		make(chan TableSpanStatusWithSeq, 16),
		make(chan *heartbeatpb.TableSpanBlockStatus, 16),
		make(chan error, 1),
	)
	t.Cleanup(sharedInfo.Close)

	tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)
	dispatcher := NewEventDispatcher(
		common.NewDispatcherID(),
		tableSpan,
		0,
		1,
		NewSchemaIDToDispatchers(),
		false,
		false,
		0,
		mockSink,
		sharedInfo,
		true, // redoEnable
		&redoGlobalTs,
	)

	// Avoid side effects on the dispatcher status dynamic stream in this unit test.
	dispatcher.componentStatus.Set(heartbeatpb.ComponentState_Working)

	dml := &commonEvent.DMLEvent{
		StartTs:  1,
		CommitTs: 100,
		Length:   1,
	}
	from := node.NewID()

	var wakeCount atomic.Int32
	wakeCallback := func() {
		wakeCount.Add(1)
	}

	// The event must be cached when redoGlobalTs < commitTs.
	block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, dml)}, wakeCallback)
	require.True(t, block)
	require.Len(t, mockSink.GetDMLs(), 0)
	require.Equal(t, int32(0), wakeCount.Load())

	// Calling HandleCacheEvents too early must not leak events to the sink.
	dispatcher.HandleCacheEvents()
	require.Len(t, mockSink.GetDMLs(), 0)
	require.Equal(t, int32(0), wakeCount.Load())

	// Once redoGlobalTs catches up, the cached events are replayed into the sink.
	redoGlobalTs.Store(100)
	dispatcher.HandleCacheEvents()
	require.Len(t, mockSink.GetDMLs(), 1)
	require.Equal(t, int32(0), wakeCount.Load())

	// Wake the upstream only after the sink flush completes.
	require.Eventually(t, func() bool {
		mockSink.FlushDMLs()
		return wakeCount.Load() == int32(1)
	}, time.Second, 10*time.Millisecond)
	require.Len(t, mockSink.GetDMLs(), 0)
}
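
// A minimal sketch of the gating rule this test exercises. The names redoGate, offer, and
// drain are hypothetical and are not the dispatcher's real fields or methods; the sketch only
// illustrates the "commitTs <= redoGlobalTs" condition and the cache-then-replay flow, and it
// relies on the sync/atomic import already used above.

// redoGate caches any event whose commitTs is still ahead of the redo progress and releases
// it only once redoGlobalTs has caught up.
type redoGate struct {
	redoGlobalTs *atomic.Uint64 // advanced by the redo module
	cached       []uint64       // commitTs of events waiting for redo to catch up
}

// offer reports whether the event had to be cached (the caller should treat this as "blocked").
func (g *redoGate) offer(commitTs uint64, emit func(uint64)) bool {
	if commitTs > g.redoGlobalTs.Load() {
		g.cached = append(g.cached, commitTs)
		return true
	}
	emit(commitTs) // safe: redo has persisted everything up to commitTs
	return false
}

// drain replays cached events whose commitTs is now covered by redoGlobalTs and keeps the rest.
func (g *redoGate) drain(emit func(uint64)) {
	remaining := g.cached[:0]
	for _, ts := range g.cached {
		if ts <= g.redoGlobalTs.Load() {
			emit(ts)
		} else {
			remaining = append(remaining, ts)
		}
	}
	g.cached = remaining
}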
29 changes: 29 additions & 0 deletions downstreamadapter/sink/mysql/sink_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/sink/mysql"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/stretchr/testify/require"
)
@@ -73,6 +74,34 @@ func MysqlSinkForTestWithMaxTxnRows(maxTxnRows int) (*Sink, sqlmock.Sqlmock) {
	return sink, mock
}

// TestMysqlSinkWriteBlockEventSkipsNonPrimaryBDRRole verifies that in BDR mode TiCDC skips executing
// non-primary-role DDLs and still calls PostFlush to unblock upstream.
// This matters because executing DDLs from non-primary roles can break downstream correctness, while skipping
// without PostFlush can deadlock the pipeline.
func TestMysqlSinkWriteBlockEventSkipsNonPrimaryBDRRole(t *testing.T) {
	_, sink, mock := getMysqlSink()
	sink.bdrMode = true

	var postFlushCount atomic.Int64
	ddlEvent := &commonEvent.DDLEvent{
		FinishedTs: 1,
		BDRMode:    string(ast.BDRRoleSecondary),
		BlockedTables: &commonEvent.InfluencedTables{
			InfluenceType: commonEvent.InfluenceTypeNormal,
			TableIDs:      []int64{0},
		},
		PostTxnFlushed: []func(){
			func() { postFlushCount.Add(1) },
		},
	}

	err := sink.WriteBlockEvent(ddlEvent)
	require.NoError(t, err)
	require.True(t, sink.IsNormal())
	require.Equal(t, int64(1), postFlushCount.Load())
	require.NoError(t, mock.ExpectationsWereMet())
}
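
// To make the expected contract concrete, a hypothetical sketch of the skip-but-unblock rule
// follows. The helper name writeDDLSketch and the exec parameter are illustrative; the real
// Sink.WriteBlockEvent may apply additional checks, but the test above asserts this same
// behavior: skip execution for non-primary BDR roles, yet still run the post-flush callbacks.
func writeDDLSketch(bdrMode bool, ddl *commonEvent.DDLEvent, exec func(*commonEvent.DDLEvent) error) error {
	if bdrMode && ddl.BDRMode != string(ast.BDRRolePrimary) {
		for _, postFlush := range ddl.PostTxnFlushed {
			postFlush() // unblock the pipeline even though nothing was written downstream
		}
		return nil
	}
	if err := exec(ddl); err != nil {
		return err
	}
	for _, postFlush := range ddl.PostTxnFlushed {
		postFlush()
	}
	return nil
}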

// Test callback and tableProgress works as expected after AddDMLEvent
func TestMysqlSinkBasicFunctionality(t *testing.T) {
	sink, mock := MysqlSinkForTest()
122 changes: 122 additions & 0 deletions maintainer/barrier_event_forward_test.go
@@ -0,0 +1,122 @@
package maintainer

import (
"testing"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/maintainer/replica"
"github.com/pingcap/ticdc/pkg/common"
"github.com/stretchr/testify/require"
)

// TestForwardBarrierEvent verifies the forwarding rules for barrier events against a span replication's
// checkpoint and block state. This matters because incorrect forwarding can either deadlock the barrier
// (never advancing) or violate ordering guarantees (advancing past an unflushed barrier).
// The key boundary is checkpointTs == commitTs, which must not be treated as "passed" except when
// ordering guarantees it (replication is blocked on a syncpoint at the same ts while the event is a DDL barrier).
func TestForwardBarrierEvent(t *testing.T) {
	makeReplication := func(t *testing.T, checkpointTs uint64, blockState *heartbeatpb.State) *replica.SpanReplication {
		t.Helper()

		span := &heartbeatpb.TableSpan{
			KeyspaceID: common.DefaultKeyspaceID,
			TableID:    1,
		}
		startKey, endKey, err := common.GetKeyspaceTableRange(span.KeyspaceID, span.TableID)
		require.NoError(t, err)
		span.StartKey = common.ToComparableKey(startKey)
		span.EndKey = common.ToComparableKey(endKey)

		cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme)
		r := replica.NewSpanReplication(
			cfID,
			common.NewDispatcherID(),
			1,
			span,
			checkpointTs,
			common.DefaultMode,
			false,
		)
		if blockState != nil {
			r.UpdateBlockState(*blockState)
		}
		return r
	}

	cases := []struct {
		name        string
		checkpoint  uint64
		blockState  *heartbeatpb.State
		event       *BarrierEvent
		shouldAllow bool
	}{
		{
			name:        "checkpoint beyond barrier",
			checkpoint:  101,
			blockState:  nil,
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: false},
			shouldAllow: true,
		},
		{
			name:        "checkpoint equals barrier without block state",
			checkpoint:  100,
			blockState:  nil,
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: false},
			shouldAllow: false,
		},
		{
			name:        "checkpoint before barrier without block state",
			checkpoint:  99,
			blockState:  nil,
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: false},
			shouldAllow: false,
		},
		{
			name:       "block state beyond barrier",
			checkpoint: 99,
			blockState: &heartbeatpb.State{
				BlockTs:     101,
				IsSyncPoint: false,
			},
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: false},
			shouldAllow: true,
		},
		{
			name:       "syncpoint at same ts forwards ddl barrier",
			checkpoint: 100,
			blockState: &heartbeatpb.State{
				BlockTs:     100,
				IsSyncPoint: true,
			},
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: false},
			shouldAllow: true,
		},
		{
			name:       "ddl at same ts does not forward syncpoint barrier",
			checkpoint: 100,
			blockState: &heartbeatpb.State{
				BlockTs:     100,
				IsSyncPoint: false,
			},
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: true},
			shouldAllow: false,
		},
		{
			name:       "syncpoint at same ts does not forward syncpoint barrier",
			checkpoint: 100,
			blockState: &heartbeatpb.State{
				BlockTs:     100,
				IsSyncPoint: true,
			},
			event:       &BarrierEvent{commitTs: 100, isSyncPoint: true},
			shouldAllow: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			r := makeReplication(t, tc.checkpoint, tc.blockState)
			require.Equal(t, tc.shouldAllow, forwardBarrierEvent(r, tc.event))
		})
	}
}
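
// For reference, the decision table these cases encode can be summarized as a small,
// hypothetical predicate. The name shouldForwardSketch is illustrative; the production logic
// lives in forwardBarrierEvent and reads the same values from the SpanReplication and
// BarrierEvent. A barrier may be forwarded when the span has already passed it, or when the
// span is blocked at or beyond it in a way that ordering makes safe.
func shouldForwardSketch(
	checkpointTs uint64,
	blocked bool, blockTs uint64, blockIsSyncPoint bool,
	eventCommitTs uint64, eventIsSyncPoint bool,
) bool {
	switch {
	case checkpointTs > eventCommitTs:
		// The span's checkpoint is already past the barrier.
		return true
	case blocked && blockTs > eventCommitTs:
		// The span is blocked on a later event, so this barrier is already behind it.
		return true
	case blocked && blockTs == eventCommitTs && blockIsSyncPoint && !eventIsSyncPoint:
		// The DDL sorts before the syncpoint at the same ts, so a span blocked on the
		// syncpoint must already have handled the DDL barrier.
		return true
	default:
		return false
	}
}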
25 changes: 25 additions & 0 deletions maintainer/barrier_helper_test.go
@@ -58,3 +58,28 @@ func TestPendingScheduleEventMapPopIfHead(t *testing.T) {
	require.True(t, ready)
	require.Equal(t, event2, candidate)
}

// TestPendingScheduleEventMapOrdersDDLBeforeSyncpointAtSameTs verifies event ordering when DDL and syncpoint
// share the same commitTs. This matters because scheduling must respect the DDL-before-syncpoint ordering
// guarantee to avoid advancing a syncpoint before the corresponding DDL barrier is handled.
func TestPendingScheduleEventMapOrdersDDLBeforeSyncpointAtSameTs(t *testing.T) {
	m := newPendingScheduleEventMap()
	ddlBarrier := &BarrierEvent{commitTs: 10, isSyncPoint: false}
	syncpointBarrier := &BarrierEvent{commitTs: 10, isSyncPoint: true}

	// Add in reverse order to ensure ordering is determined by the heap, not insertion order.
	m.add(syncpointBarrier)
	m.add(ddlBarrier)

	ready, candidate := m.popIfHead(syncpointBarrier)
	require.False(t, ready)
	require.Equal(t, ddlBarrier, candidate)

	ready, candidate = m.popIfHead(ddlBarrier)
	require.True(t, ready)
	require.Equal(t, ddlBarrier, candidate)

	ready, candidate = m.popIfHead(syncpointBarrier)
	require.True(t, ready)
	require.Equal(t, syncpointBarrier, candidate)
}
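
// The ordering relied on here can be expressed as a small, self-contained comparator sketch.
// The names barrierKeySketch and lessSketch are hypothetical; the real pendingScheduleEventMap
// is assumed to use an equivalent ordering in its heap.

// barrierKeySketch mirrors the two fields that determine ordering.
type barrierKeySketch struct {
	commitTs    uint64
	isSyncPoint bool
}

// lessSketch: smaller commitTs first; at equal commitTs the DDL barrier (isSyncPoint == false)
// sorts before the syncpoint barrier, matching the pop order asserted above.
func lessSketch(a, b barrierKeySketch) bool {
	if a.commitTs != b.commitTs {
		return a.commitTs < b.commitTs
	}
	return !a.isSyncPoint && b.isSyncPoint
}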