diff --git a/downstreamadapter/dispatcher/block_event_executor_test.go b/downstreamadapter/dispatcher/block_event_executor_test.go index 64fd3d4ddf..8e772c3fb8 100644 --- a/downstreamadapter/dispatcher/block_event_executor_test.go +++ b/downstreamadapter/dispatcher/block_event_executor_test.go @@ -60,3 +60,51 @@ func TestBlockEventExecutorDoesNotBlockOtherDispatchers(t *testing.T) { close(unblockFirst) } + +// TestBlockEventExecutorSerializesPerDispatcherTasks verifies tasks submitted for the same dispatcher +// are executed sequentially (no overlap), even when multiple workers are available. +// This matters because concurrent execution for a single dispatcher can violate block-event ordering guarantees. +func TestBlockEventExecutorSerializesPerDispatcherTasks(t *testing.T) { + executor := newBlockEventExecutor() + t.Cleanup(executor.Close) + + d := &BasicDispatcher{id: common.DispatcherID{Low: 1, High: 0}} + + firstStarted := make(chan struct{}) + unblockFirst := make(chan struct{}) + secondStarted := make(chan struct{}) + + executor.Submit(d, func() { + close(firstStarted) + <-unblockFirst + }) + require.Eventually(t, func() bool { + select { + case <-firstStarted: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + + executor.Submit(d, func() { + close(secondStarted) + }) + + // The second task must not start before the first task finishes. + select { + case <-secondStarted: + require.FailNow(t, "unexpected parallel execution for the same dispatcher") + case <-time.After(100 * time.Millisecond): + } + + close(unblockFirst) + require.Eventually(t, func() bool { + select { + case <-secondStarted: + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) +} diff --git a/downstreamadapter/dispatcher/event_dispatcher_redo_cache_test.go b/downstreamadapter/dispatcher/event_dispatcher_redo_cache_test.go new file mode 100644 index 0000000000..e8561a3da2 --- /dev/null +++ b/downstreamadapter/dispatcher/event_dispatcher_redo_cache_test.go @@ -0,0 +1,97 @@ +package dispatcher + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/ticdc/downstreamadapter/sink" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/node" + "github.com/stretchr/testify/require" +) + +// TestEventDispatcherRedoGating verifies redo gating enforces "commitTs <= redoGlobalTs" before emitting events. +// This matters because emitting events ahead of redo catch-up can break downstream correctness and recovery semantics. +// The key scenario is a DML batch with commitTs greater than redoGlobalTs that must be cached and replayed later. 
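+//
+// The invariant exercised below (a sketch of the expected contract, not the
+// implementation): an event may reach the sink only once event.CommitTs <= redoGlobalTs;
+// otherwise it is cached, and HandleCacheEvents replays it after redoGlobalTs advances.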
+func TestEventDispatcherRedoGating(t *testing.T) { + var redoGlobalTs atomic.Uint64 + redoGlobalTs.Store(50) + + mockSink := sink.NewMockSink(common.MysqlSinkType) + txnAtomicity := config.DefaultAtomicityLevel() + sharedInfo := NewSharedInfo( + common.NewChangefeedID4Test("test", "dispatcher_redo_gating"), + "system", + false, + false, + nil, + nil, + nil, // no syncpoint config is required for this unit test + &txnAtomicity, + false, + make(chan TableSpanStatusWithSeq, 16), + make(chan *heartbeatpb.TableSpanBlockStatus, 16), + make(chan error, 1), + ) + t.Cleanup(sharedInfo.Close) + + tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) + require.NoError(t, err) + dispatcher := NewEventDispatcher( + common.NewDispatcherID(), + tableSpan, + 0, + 1, + NewSchemaIDToDispatchers(), + false, + false, + 0, + mockSink, + sharedInfo, + true, // redoEnable + &redoGlobalTs, + ) + + // Avoid side effects on the dispatcher status dynamic stream in this unit test. + dispatcher.componentStatus.Set(heartbeatpb.ComponentState_Working) + + dml := &commonEvent.DMLEvent{ + StartTs: 1, + CommitTs: 100, + Length: 1, + } + from := node.NewID() + + var wakeCount atomic.Int32 + wakeCallback := func() { + wakeCount.Add(1) + } + + // The event must be cached when redoGlobalTs < commitTs. + block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&from, dml)}, wakeCallback) + require.True(t, block) + require.Len(t, mockSink.GetDMLs(), 0) + require.Equal(t, int32(0), wakeCount.Load()) + + // Calling HandleCacheEvents too early must not leak events to the sink. + dispatcher.HandleCacheEvents() + require.Len(t, mockSink.GetDMLs(), 0) + require.Equal(t, int32(0), wakeCount.Load()) + + // Once redoGlobalTs catches up, the cached events are replayed into the sink. + redoGlobalTs.Store(100) + dispatcher.HandleCacheEvents() + require.Len(t, mockSink.GetDMLs(), 1) + require.Equal(t, int32(0), wakeCount.Load()) + + // Wake the upstream only after the sink flush completes. + require.Eventually(t, func() bool { + mockSink.FlushDMLs() + return wakeCount.Load() == int32(1) + }, time.Second, 10*time.Millisecond) + require.Len(t, mockSink.GetDMLs(), 0) +} diff --git a/downstreamadapter/sink/mysql/sink_test.go b/downstreamadapter/sink/mysql/sink_test.go index 4fbc625115..4d61009a0a 100644 --- a/downstreamadapter/sink/mysql/sink_test.go +++ b/downstreamadapter/sink/mysql/sink_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/sink/mysql" + "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/stretchr/testify/require" ) @@ -73,6 +74,34 @@ func MysqlSinkForTestWithMaxTxnRows(maxTxnRows int) (*Sink, sqlmock.Sqlmock) { return sink, mock } +// TestMysqlSinkWriteBlockEventSkipsNonPrimaryBDRRole verifies that in BDR mode TiCDC skips executing +// non-primary-role DDLs and still calls PostFlush to unblock upstream. +// This matters because executing DDLs from non-primary roles can break downstream correctness, while skipping +// without PostFlush can deadlock the pipeline. 
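+//
+// Expected outcome, as asserted below: no SQL reaches sqlmock (no expectations are
+// registered, and ExpectationsWereMet still passes), while every PostTxnFlushed
+// callback fires exactly once so the blocked upstream dispatcher is released.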
+func TestMysqlSinkWriteBlockEventSkipsNonPrimaryBDRRole(t *testing.T) { + _, sink, mock := getMysqlSink() + sink.bdrMode = true + + var postFlushCount atomic.Int64 + ddlEvent := &commonEvent.DDLEvent{ + FinishedTs: 1, + BDRMode: string(ast.BDRRoleSecondary), + BlockedTables: &commonEvent.InfluencedTables{ + InfluenceType: commonEvent.InfluenceTypeNormal, + TableIDs: []int64{0}, + }, + PostTxnFlushed: []func(){ + func() { postFlushCount.Add(1) }, + }, + } + + err := sink.WriteBlockEvent(ddlEvent) + require.NoError(t, err) + require.True(t, sink.IsNormal()) + require.Equal(t, int64(1), postFlushCount.Load()) + require.NoError(t, mock.ExpectationsWereMet()) +} + // Test callback and tableProgress works as expected after AddDMLEvent func TestMysqlSinkBasicFunctionality(t *testing.T) { sink, mock := MysqlSinkForTest() diff --git a/maintainer/barrier_event_forward_test.go b/maintainer/barrier_event_forward_test.go new file mode 100644 index 0000000000..d055f89de8 --- /dev/null +++ b/maintainer/barrier_event_forward_test.go @@ -0,0 +1,122 @@ +package maintainer + +import ( + "testing" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/maintainer/replica" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" +) + +// TestForwardBarrierEvent verifies the forwarding rules for barrier events against a span replication's +// checkpoint and block state. This matters because incorrect forwarding can either deadlock the barrier +// (never advancing) or violate ordering guarantees (advancing past an unflushed barrier). +// The key boundary is checkpointTs == commitTs, which must not be treated as "passed" except when +// ordering guarantees it (replication is blocked on a syncpoint at the same ts while the event is a DDL barrier). 
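+//
+// Decision table derived from the cases below (not from the implementation itself):
+//   checkpointTs > commitTs                              -> forward
+//   blockState.BlockTs > commitTs                        -> forward
+//   blocked on a syncpoint at commitTs, event is a DDL   -> forward
+//   everything else, including checkpointTs == commitTs  -> hold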
+func TestForwardBarrierEvent(t *testing.T) { + makeReplication := func(t *testing.T, checkpointTs uint64, blockState *heartbeatpb.State) *replica.SpanReplication { + t.Helper() + + span := &heartbeatpb.TableSpan{ + KeyspaceID: common.DefaultKeyspaceID, + TableID: 1, + } + startKey, endKey, err := common.GetKeyspaceTableRange(span.KeyspaceID, span.TableID) + require.NoError(t, err) + span.StartKey = common.ToComparableKey(startKey) + span.EndKey = common.ToComparableKey(endKey) + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme) + r := replica.NewSpanReplication( + cfID, + common.NewDispatcherID(), + 1, + span, + checkpointTs, + common.DefaultMode, + false, + ) + if blockState != nil { + r.UpdateBlockState(*blockState) + } + return r + } + + cases := []struct { + name string + checkpoint uint64 + blockState *heartbeatpb.State + event *BarrierEvent + shouldAllow bool + }{ + { + name: "checkpoint beyond barrier", + checkpoint: 101, + blockState: nil, + event: &BarrierEvent{commitTs: 100, isSyncPoint: false}, + shouldAllow: true, + }, + { + name: "checkpoint equals barrier without block state", + checkpoint: 100, + blockState: nil, + event: &BarrierEvent{commitTs: 100, isSyncPoint: false}, + shouldAllow: false, + }, + { + name: "checkpoint before barrier without block state", + checkpoint: 99, + blockState: nil, + event: &BarrierEvent{commitTs: 100, isSyncPoint: false}, + shouldAllow: false, + }, + { + name: "block state beyond barrier", + checkpoint: 99, + blockState: &heartbeatpb.State{ + BlockTs: 101, + IsSyncPoint: false, + }, + event: &BarrierEvent{commitTs: 100, isSyncPoint: false}, + shouldAllow: true, + }, + { + name: "syncpoint at same ts forwards ddl barrier", + checkpoint: 100, + blockState: &heartbeatpb.State{ + BlockTs: 100, + IsSyncPoint: true, + }, + event: &BarrierEvent{commitTs: 100, isSyncPoint: false}, + shouldAllow: true, + }, + { + name: "ddl at same ts does not forward syncpoint barrier", + checkpoint: 100, + blockState: &heartbeatpb.State{ + BlockTs: 100, + IsSyncPoint: false, + }, + event: &BarrierEvent{commitTs: 100, isSyncPoint: true}, + shouldAllow: false, + }, + { + name: "syncpoint at same ts does not forward syncpoint barrier", + checkpoint: 100, + blockState: &heartbeatpb.State{ + BlockTs: 100, + IsSyncPoint: true, + }, + event: &BarrierEvent{commitTs: 100, isSyncPoint: true}, + shouldAllow: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + r := makeReplication(t, tc.checkpoint, tc.blockState) + require.Equal(t, tc.shouldAllow, forwardBarrierEvent(r, tc.event)) + }) + } +} diff --git a/maintainer/barrier_helper_test.go b/maintainer/barrier_helper_test.go index 364951fb50..70b2486246 100644 --- a/maintainer/barrier_helper_test.go +++ b/maintainer/barrier_helper_test.go @@ -58,3 +58,28 @@ func TestPendingScheduleEventMapPopIfHead(t *testing.T) { require.True(t, ready) require.Equal(t, event2, candidate) } + +// TestPendingScheduleEventMapOrdersDDLBeforeSyncpointAtSameTs verifies event ordering when DDL and syncpoint +// share the same commitTs. This matters because scheduling must respect the DDL-before-syncpoint ordering +// guarantee to avoid advancing a syncpoint before the corresponding DDL barrier is handled. 
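+//
+// The ordering exercised below behaves like an ascending sort on (commitTs,
+// isSyncPoint): at equal commitTs the DDL barrier (isSyncPoint == false) is the
+// head. This is inferred from the assertions, not from the comparator's code.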
+func TestPendingScheduleEventMapOrdersDDLBeforeSyncpointAtSameTs(t *testing.T) { + m := newPendingScheduleEventMap() + ddlBarrier := &BarrierEvent{commitTs: 10, isSyncPoint: false} + syncpointBarrier := &BarrierEvent{commitTs: 10, isSyncPoint: true} + + // Add in reverse order to ensure ordering is determined by the heap, not insertion order. + m.add(syncpointBarrier) + m.add(ddlBarrier) + + ready, candidate := m.popIfHead(syncpointBarrier) + require.False(t, ready) + require.Equal(t, ddlBarrier, candidate) + + ready, candidate = m.popIfHead(ddlBarrier) + require.True(t, ready) + require.Equal(t, ddlBarrier, candidate) + + ready, candidate = m.popIfHead(syncpointBarrier) + require.True(t, ready) + require.Equal(t, syncpointBarrier, candidate) +} diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go index 30d04baf6c..cf95dacd85 100644 --- a/maintainer/barrier_test.go +++ b/maintainer/barrier_test.go @@ -475,6 +475,100 @@ func TestNormalBlockWithTableTrigger(t *testing.T) { require.Len(t, barrier.blockedEvents.m, 0) } +// TestBarrierDiscardDBBlockEventWhilePendingScheduleEvents verifies the barrier does not ACK DB-level block events +// reported by the table trigger dispatcher while there are pending schedule-required DDL events. +// This matters because building DB/All range checkers based on an incomplete task snapshot can cause the barrier +// to advance incorrectly and break ordering/correctness guarantees. +func TestBarrierDiscardDBBlockEventWhilePendingScheduleEvents(t *testing.T) { + testutil.SetUpTestServices() + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + + // Create one replicating table so the normal block event can be fully covered by the range checker. + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 10) + stm := spanController.GetTasksByTableID(1)[0] + spanController.BindSpanToNode("", "node1", stm) + spanController.MarkSpanReplicating(stm) + + barrier := NewBarrier(spanController, operatorController, false, nil, common.DefaultMode) + + // Step 1: Create a schedule-required normal block event so pendingScheduleEventMap becomes non-empty. 
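+	// Both the table dispatcher and the table trigger dispatcher report the same
+	// blocked state at ts 10, and dropping table 1 makes the event schedule-required.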
+ _ = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: stm.ID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_Normal, + TableIDs: []int64{0, 1}, + }, + NeedDroppedTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_Normal, + TableIDs: []int64{1}, + }, + }, + }, + { + ID: tableTriggerEventDispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_Normal, + TableIDs: []int64{0, 1}, + }, + NeedDroppedTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_Normal, + TableIDs: []int64{1}, + }, + }, + }, + }, + }) + require.Greater(t, barrier.pendingEvents.Len(), 0) + + // Step 2: A DB-level block event from the table trigger dispatcher must be discarded (no ACK) while pending exists. + msgs := barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: tableTriggerEventDispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 20, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_DB, + SchemaID: 1, + }, + IsSyncPoint: false, + }, + }, + }, + }) + require.NotNil(t, msgs) + resp := msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse) + require.Len(t, resp.DispatcherStatuses, 0) + + // The event is tracked but should not have a range checker yet, because the report is discarded. + key := getEventKey(20, false) + event, ok := barrier.blockedEvents.Get(key) + require.True(t, ok) + require.Nil(t, event.rangeChecker) +} + func TestSchemaBlock(t *testing.T) { testutil.SetUpTestServices() nm := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) diff --git a/pkg/sink/mysql/format_ddl_test.go b/pkg/sink/mysql/format_ddl_test.go new file mode 100644 index 0000000000..73e4a27104 --- /dev/null +++ b/pkg/sink/mysql/format_ddl_test.go @@ -0,0 +1,24 @@ +package mysql + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestFormatQueryRewritesVectorColumnType verifies vector column definitions are rewritten into a +// downstream-compatible type. This matters because leaving unsupported vector types in DDL would +// cause DDL execution failures and stall replication. +// The key scenario is a CREATE TABLE statement containing a VECTOR column that must be converted. +func TestFormatQueryRewritesVectorColumnType(t *testing.T) { + in := "CREATE TABLE test.t(id int primary key, data VECTOR(5) COMMENT 'vec');" + out := formatQuery(in) + require.NotEmpty(t, out) + + upper := strings.ToUpper(out) + require.True(t, strings.Contains(upper, "LONGTEXT") || strings.Contains(upper, "LONGBLOB"), + "expected rewritten query to use a compatible long value type") + require.NotContains(t, upper, "VECTOR") + require.NotContains(t, upper, "COMMENT") +}
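+
+// Illustrative rewrite for the input above (the exact output is deliberately not
+// asserted; the test accepts any compatible long value type):
+//   in:  CREATE TABLE test.t(id int primary key, data VECTOR(5) COMMENT 'vec');
+//   out: CREATE TABLE test.t(id int primary key, data LONGTEXT);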