diff --git a/common/domain/replicationTaskExecutor.go b/common/domain/replicationTaskExecutor.go index a02727af3fc..f874099bd13 100644 --- a/common/domain/replicationTaskExecutor.go +++ b/common/domain/replicationTaskExecutor.go @@ -25,9 +25,13 @@ package domain import ( "context" "errors" + "fmt" "time" + guuid "github.com/google/uuid" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" @@ -66,23 +70,28 @@ type ( } domainReplicationTaskExecutorImpl struct { - domainManager persistence.DomainManager - timeSource clock.TimeSource - logger log.Logger + domainManager persistence.DomainManager + timeSource clock.TimeSource + logger log.Logger + domainAuditManager persistence.DomainAuditManager + enableDomainAuditLogging dynamicproperties.BoolPropertyFn } ) // NewReplicationTaskExecutor create a new instance of domain replicator func NewReplicationTaskExecutor( domainManager persistence.DomainManager, + domainAuditManager persistence.DomainAuditManager, timeSource clock.TimeSource, logger log.Logger, + enableDomainAuditLogging dynamicproperties.BoolPropertyFn, ) ReplicationTaskExecutor { - return &domainReplicationTaskExecutorImpl{ - domainManager: domainManager, - timeSource: timeSource, - logger: logger, + domainManager: domainManager, + timeSource: timeSource, + logger: logger, + domainAuditManager: domainAuditManager, + enableDomainAuditLogging: enableDomainAuditLogging, } } @@ -189,7 +198,21 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainCreationReplicationTask( return err } - return err + resp, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ + ID: task.GetID(), + }) + if getErr != nil { + return fmt.Errorf("failed to get domain while trying to create domain audit log: %w", getErr) + } + h.createDomainAuditLog( + ctx, + task, + persistence.DomainAuditOperationTypeCreate, + nil, + resp, + ) + + return nil } // handleDomainUpdateReplicationTask handles the domain update replication task @@ -210,9 +233,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct // plus, we need to check whether the config version is <= the config version set in the input // plus, we need to check whether the failover version is <= the failover version set in the input - existingDomain, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ + originalDomainState, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ Name: task.Info.GetName(), }) + intendedDomainState := originalDomainState.DeepCopy() + if err != nil { if _, ok := err.(*types.EntityNotExistsError); ok { // this can happen if the create domain replication task is to processed. @@ -225,18 +250,18 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct recordUpdated := false request := &persistence.UpdateDomainRequest{ - Info: existingDomain.Info, - Config: existingDomain.Config, - ReplicationConfig: existingDomain.ReplicationConfig, - ConfigVersion: existingDomain.ConfigVersion, - FailoverVersion: existingDomain.FailoverVersion, - FailoverNotificationVersion: existingDomain.FailoverNotificationVersion, - PreviousFailoverVersion: existingDomain.PreviousFailoverVersion, + Info: intendedDomainState.Info, + Config: intendedDomainState.Config, + ReplicationConfig: intendedDomainState.ReplicationConfig, + ConfigVersion: intendedDomainState.ConfigVersion, + FailoverVersion: intendedDomainState.FailoverVersion, + FailoverNotificationVersion: intendedDomainState.FailoverNotificationVersion, + PreviousFailoverVersion: intendedDomainState.PreviousFailoverVersion, NotificationVersion: notificationVersion, LastUpdatedTime: h.timeSource.Now().UnixNano(), } - if existingDomain.ConfigVersion < task.GetConfigVersion() { + if intendedDomainState.ConfigVersion < task.GetConfigVersion() { recordUpdated = true request.Info = &persistence.DomainInfo{ ID: task.GetID(), @@ -263,24 +288,22 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct request.ConfigVersion = task.GetConfigVersion() } - // todo (david.porter) reason through if this is compatible with the proposed merge strategy - // in active/active domains. - if existingDomain.FailoverVersion < task.GetFailoverVersion() { + if originalDomainState.FailoverVersion < task.GetFailoverVersion() { recordUpdated = true request.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName() request.ReplicationConfig.ActiveClusters = task.ReplicationConfig.GetActiveClusters() request.FailoverVersion = task.GetFailoverVersion() request.FailoverNotificationVersion = notificationVersion request.PreviousFailoverVersion = task.GetPreviousFailoverVersion() - } else if !existingDomain.ReplicationConfig.IsActiveActive() { + } else if !originalDomainState.ReplicationConfig.IsActiveActive() { h.logger.Warn("the existing failover version was more recent, indicating that the domain replication message was out of date and is consequently being dropped", - tag.WorkflowDomainName(existingDomain.Info.Name), - tag.FailoverVersion(existingDomain.FailoverVersion), + tag.WorkflowDomainName(originalDomainState.Info.Name), + tag.FailoverVersion(originalDomainState.FailoverVersion), tag.FailoverVersion(task.GetFailoverVersion())) } - if existingDomain.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() { - mergedActiveClusters, aaChanged := mergeActiveActiveScopes(existingDomain.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters) + if intendedDomainState.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() { + mergedActiveClusters, aaChanged := mergeActiveActiveScopes(intendedDomainState.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters) if aaChanged { recordUpdated = true request.ReplicationConfig.ActiveClusters = mergedActiveClusters @@ -288,10 +311,48 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct } if !recordUpdated { + h.logger.Warn("no record updated while handling domain update replication task", + tag.WorkflowDomainName(task.Info.GetName()), + tag.WorkflowDomainID(task.GetID())) return nil } - return h.domainManager.UpdateDomain(ctx, request) + err = h.domainManager.UpdateDomain(ctx, request) + if err != nil { + h.logger.Error("failed to update domain while handling domain update replication task", + tag.Error(err), + tag.WorkflowDomainName(task.Info.GetName()), + tag.WorkflowDomainID(task.GetID())) + return err + } + afterUpdate, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ + ID: task.GetID(), + }) + if getErr != nil { + return fmt.Errorf("failed to get domain while trying to create domain audit log for update: %w", getErr) + } + + // relying on the fact that for both failovers and the failover of cluster-attibutes + // within the domain, in both instances the failover version will be incremented, indicating + // this is failover type update. + if intendedDomainState.FailoverVersion < afterUpdate.FailoverVersion { + h.createDomainAuditLog( + ctx, + task, + persistence.DomainAuditOperationTypeFailover, + originalDomainState, + afterUpdate, + ) + return nil + } + h.createDomainAuditLog( + ctx, + task, + persistence.DomainAuditOperationTypeUpdate, + originalDomainState, + afterUpdate, + ) + return nil } // handleDomainDeleteReplicationTask handles the domain delete replication task @@ -300,6 +361,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct Name: task.Info.GetName(), } + // ignoring error since this might be already deleted + getDomainResp, _ := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ + Name: task.Info.GetName(), + }) + err := h.domainManager.DeleteDomainByName(ctx, request) if err != nil { _, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ @@ -310,8 +376,17 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct if errors.As(err, &entityNotExistsError) { return nil } + return err } - return err + + h.createDomainAuditLog( + ctx, + task, + persistence.DomainAuditOperationTypeDelete, + getDomainResp, + nil, + ) + return nil } func (h *domainReplicationTaskExecutorImpl) validateDomainReplicationTask(task *types.DomainTaskAttributes) error { @@ -357,3 +432,37 @@ func (h *domainReplicationTaskExecutorImpl) convertDomainStatusFromThrift(input return 0, ErrInvalidDomainStatus } } + +func (h *domainReplicationTaskExecutorImpl) createDomainAuditLog( + ctx context.Context, + task *types.DomainTaskAttributes, + operationType persistence.DomainAuditOperationType, + stateBefore *persistence.GetDomainResponse, + stateAfter *persistence.GetDomainResponse, +) { + if !h.enableDomainAuditLogging() { + return + } + eventID, err := guuid.NewV7() + if err != nil { + h.logger.Error("failed to generate event ID while creating domain audit log", tag.Error(err)) + return + } + creationTime := time.Unix(eventID.Time().UnixTime()) + _, err = h.domainAuditManager.CreateDomainAuditLog(ctx, &persistence.CreateDomainAuditLogRequest{ + EventID: eventID.String(), + DomainID: task.GetID(), + OperationType: operationType, + CreatedTime: creationTime, + StateBefore: stateBefore, + StateAfter: stateAfter, + Identity: "replication task executor", + Comment: fmt.Sprintf("replicated domain operation %s for domain %s", operationType.String(), task.Info.GetName()), + }) + if err != nil { + h.logger.Error("failed to create domain audit log while creating domain audit log", + tag.Error(err), + tag.WorkflowDomainName(task.Info.GetName()), + tag.WorkflowDomainID(task.GetID())) + } +} diff --git a/common/domain/replicationTaskExecutor_integration_test.go b/common/domain/replicationTaskExecutor_integration_test.go index 5b408884b04..db8d147ddf9 100644 --- a/common/domain/replicationTaskExecutor_integration_test.go +++ b/common/domain/replicationTaskExecutor_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/constants" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/persistence" persistencetests "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite" @@ -63,10 +64,20 @@ func (s *domainReplicationTaskExecutorSuite) setupTestBase(t *testing.T) { func (s *domainReplicationTaskExecutorSuite) SetupTest() { s.setupTestBase(s.T()) + domainAuditManager, err := s.ExecutionMgrFactory.NewDomainAuditManager() + if err != nil { + s.T().Fatalf("Failed to create domain audit manager: %v", err) + } + + // Disable audit logging for integration tests as SQLite doesn't fully support the audit manager + enableAuditLogging := func(...dynamicproperties.FilterOption) bool { return false } + s.domainReplicator = NewReplicationTaskExecutor( s.DomainManager, + domainAuditManager, clock.NewRealTimeSource(), s.Logger, + enableAuditLogging, ).(*domainReplicationTaskExecutorImpl) } diff --git a/common/domain/replicationTaskExecutor_test.go b/common/domain/replicationTaskExecutor_test.go index 314ee2843c3..100024afdff 100644 --- a/common/domain/replicationTaskExecutor_test.go +++ b/common/domain/replicationTaskExecutor_test.go @@ -32,6 +32,7 @@ import ( "go.uber.org/mock/gomock" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence" @@ -41,14 +42,14 @@ import ( func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { tests := []struct { name string - setupMock func(mockDomainManager persistence.MockDomainManager) + setupMock func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) task *types.DomainTaskAttributes wantErr bool errType interface{} }{ { name: "Validate Domain Task - Empty Task", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { // No setup required as the task itself is nil, triggering the validation error }, task: nil, @@ -57,11 +58,28 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { }, { name: "Handle Create Domain Task - Valid", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(&persistence.CreateDomainResponse{ID: "validDomainID"}, nil). Times(1) + + // Expect GetDomain for audit log + mockDomainManager.EXPECT(). + GetDomain(gomock.Any(), &persistence.GetDomainRequest{ID: "validDomainID"}). + Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ + ID: "validDomainID", + Name: "validDomain", + }, + }, nil). + Times(1) + + // Expect audit log creation + mockAuditManager.EXPECT(). + CreateDomainAuditLog(gomock.Any(), gomock.Any()). + Return(&persistence.CreateDomainAuditLogResponse{}, nil). + Times(1) }, task: &types.DomainTaskAttributes{ DomainOperation: types.DomainOperationCreate.Ptr(), @@ -109,7 +127,7 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { }, { name: "Handle Create Domain Task - Name UUID Collision", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { // call to GetDomain simulates a name collision by returning a different domain ID mockDomainManager.EXPECT(). GetDomain(gomock.Any(), &persistence.GetDomainRequest{Name: "collisionDomain"}). @@ -155,7 +173,7 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { }, { name: "Handle Update Domain Task - Valid Update", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetMetadata(gomock.Any()). Return(&persistence.GetMetadataResponse{NotificationVersion: 123}, nil). @@ -163,17 +181,36 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { // Mock GetDomain to simulate domain fetch before update mockDomainManager.EXPECT(). - GetDomain(gomock.Any(), gomock.Any()). + GetDomain(gomock.Any(), &persistence.GetDomainRequest{Name: "existingDomainName"}). Return(&persistence.GetDomainResponse{ Info: &persistence.DomainInfo{ID: "existingDomainID", Name: "existingDomainName"}, Config: &persistence.DomainConfig{}, ReplicationConfig: &persistence.DomainReplicationConfig{}, - }, nil).AnyTimes() + ConfigVersion: 1, + FailoverVersion: 50, + }, nil).Times(1) // Mock UpdateDomain to simulate a successful domain update mockDomainManager.EXPECT(). UpdateDomain(gomock.Any(), gomock.Any()). Return(nil).Times(1) + + // Mock GetDomain after update for audit log + mockDomainManager.EXPECT(). + GetDomain(gomock.Any(), &persistence.GetDomainRequest{ID: "existingDomainID"}). + Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: "existingDomainID", Name: "existingDomainName"}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + ConfigVersion: 2, + FailoverVersion: 100, + }, nil).Times(1) + + // Expect audit log creation + mockAuditManager.EXPECT(). + CreateDomainAuditLog(gomock.Any(), gomock.Any()). + Return(&persistence.CreateDomainAuditLogResponse{}, nil). + Times(1) }, task: &types.DomainTaskAttributes{ DomainOperation: types.DomainOperationUpdate.Ptr(), @@ -211,7 +248,7 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { }, { name: "Handle Invalid Domain Operation", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { // No mock setup is required as the operation should not proceed to any database calls }, task: &types.DomainTaskAttributes{ @@ -233,7 +270,7 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { }, { name: "Handle Unsupported Domain Operation", - setupMock: func(mockDomainManager persistence.MockDomainManager) { + setupMock: func(mockDomainManager persistence.MockDomainManager, mockAuditManager persistence.MockDomainAuditManager) { // No mock setup is needed as the operation should immediately return an error }, task: &types.DomainTaskAttributes{ @@ -268,11 +305,13 @@ func TestDomainReplicationTaskExecutor_Execute(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) mockDomainManager := persistence.NewMockDomainManager(ctrl) + mockDomainAuditManager := persistence.NewMockDomainAuditManager(ctrl) mockTimeSource := clock.NewRealTimeSource() mockLogger := log.NewNoop() + enableAuditLogging := func(...dynamicproperties.FilterOption) bool { return true } - executor := NewReplicationTaskExecutor(mockDomainManager, mockTimeSource, mockLogger).(*domainReplicationTaskExecutorImpl) - tt.setupMock(*mockDomainManager) + executor := NewReplicationTaskExecutor(mockDomainManager, mockDomainAuditManager, mockTimeSource, mockLogger, enableAuditLogging).(*domainReplicationTaskExecutorImpl) + tt.setupMock(*mockDomainManager, *mockDomainAuditManager) err := executor.Execute(tt.task) if tt.wantErr { require.Error(t, err) @@ -324,23 +363,38 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { tests := []struct { name string task *types.DomainTaskAttributes - setup func(mockDomainManager *persistence.MockDomainManager) + setup func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) wantError bool }{ { name: "Successful Domain Creation", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(&persistence.CreateDomainResponse{ID: "testDomainID"}, nil) + + // Expect GetDomain for audit log + mockDomainManager.EXPECT(). + GetDomain(gomock.Any(), &persistence.GetDomainRequest{ID: "testDomainID"}). + Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ + ID: "testDomainID", + Name: "testDomain", + }, + }, nil) + + // Expect audit log creation + mockAuditManager.EXPECT(). + CreateDomainAuditLog(gomock.Any(), gomock.Any()). + Return(&persistence.CreateDomainAuditLogResponse{}, nil) }, wantError: false, }, { name: "Generic Error During Domain Creation", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, types.InternalServiceError{Message: "an internal error"}). @@ -357,7 +411,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Handle Name/UUID Collision - EntityNotExistsError", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, ErrNameUUIDCollision).Times(1) @@ -369,7 +423,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { }, { name: "Immediate Error Return from CreateDomain", - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, types.InternalServiceError{Message: "internal error"}). @@ -392,7 +446,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { // Status is intentionally left as nil to trigger the error }, }, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { // No need to set up a mock for CreateDomain as the call should not reach this point }, wantError: true, @@ -407,7 +461,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { Status: types.DomainStatus(999).Ptr(), // Assuming 999 is an unrecognized status }, }, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { // As before, no need for mock setup for CreateDomain }, wantError: true, @@ -415,7 +469,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Unexpected Error Type from GetDomain Leads to Default Error Handling", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, ErrInvalidDomainStatus).Times(1) @@ -429,7 +483,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Successful GetDomain with Name/UUID Mismatch", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, ErrNameUUIDCollision).AnyTimes() @@ -445,7 +499,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Handle Domain Creation with Unhandled Error", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetDomain(gomock.Any(), gomock.Any()). Return(nil, &types.EntityNotExistsError{}). @@ -461,7 +515,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Handle Domain Creation - Unexpected Error from GetDomain", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, errors.New("test error")).Times(1) @@ -479,7 +533,7 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { { name: "Duplicate Domain Creation With Same ID and Name", task: domainCreationTask(), - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). CreateDomain(gomock.Any(), gomock.Any()). Return(nil, ErrNameUUIDCollision).Times(1) @@ -501,16 +555,20 @@ func TestHandleDomainCreationReplicationTask(t *testing.T) { ctrl := gomock.NewController(t) mockDomainManager := persistence.NewMockDomainManager(ctrl) + mockDomainAuditManager := persistence.NewMockDomainAuditManager(ctrl) mockLogger := testlogger.New(t) mockTimeSource := clock.NewMockedTimeSourceAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) // Fixed time + enableAuditLogging := func(...dynamicproperties.FilterOption) bool { return true } executor := &domainReplicationTaskExecutorImpl{ - domainManager: mockDomainManager, - logger: mockLogger, - timeSource: mockTimeSource, + domainManager: mockDomainManager, + domainAuditManager: mockDomainAuditManager, + logger: mockLogger, + timeSource: mockTimeSource, + enableDomainAuditLogging: enableAuditLogging, } - tt.setup(mockDomainManager) + tt.setup(mockDomainManager, mockDomainAuditManager) err := executor.handleDomainCreationReplicationTask(context.Background(), tt.task) if tt.wantError { assert.Error(t, err) @@ -526,7 +584,7 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { name string task *types.DomainTaskAttributes wantErr bool - setup func(mockDomainManager *persistence.MockDomainManager) + setup func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) }{ { name: "Convert Status Error", @@ -536,13 +594,13 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { }, }, wantErr: true, - setup: func(dm *persistence.MockDomainManager) {}, + setup: func(dm *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) {}, }, { name: "Error Fetching Metadata", task: domainCreationTask(), wantErr: true, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetMetadata(gomock.Any()). Return(nil, errors.New("Error getting metadata while handling replication task")). @@ -557,7 +615,7 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { }, }, wantErr: true, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetDomain(gomock.Any(), gomock.Any()). Return(nil, errors.New("general error")).AnyTimes() @@ -576,7 +634,7 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { }, }, wantErr: true, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetDomain(gomock.Any(), &persistence.GetDomainRequest{ Name: "nonexistentDomain", @@ -620,7 +678,7 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { }, }, wantErr: false, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetMetadata(gomock.Any()). Return(&persistence.GetMetadataResponse{NotificationVersion: 1}, nil).AnyTimes() @@ -654,7 +712,7 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { }, }, wantErr: true, - setup: func(mockDomainManager *persistence.MockDomainManager) { + setup: func(mockDomainManager *persistence.MockDomainManager, mockAuditManager *persistence.MockDomainAuditManager) { mockDomainManager.EXPECT(). GetMetadata(gomock.Any()). Return(&persistence.GetMetadataResponse{NotificationVersion: 1}, nil). @@ -678,15 +736,19 @@ func TestHandleDomainUpdateReplicationTask(t *testing.T) { mockCtrl := gomock.NewController(t) mockDomainManager := persistence.NewMockDomainManager(mockCtrl) + mockDomainAuditManager := persistence.NewMockDomainAuditManager(mockCtrl) mockLogger := testlogger.New(t) mockTimeSource := clock.NewMockedTimeSourceAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) // Fixed time + enableAuditLogging := func(...dynamicproperties.FilterOption) bool { return true } executor := &domainReplicationTaskExecutorImpl{ - domainManager: mockDomainManager, - logger: mockLogger, - timeSource: mockTimeSource, + domainManager: mockDomainManager, + domainAuditManager: mockDomainAuditManager, + logger: mockLogger, + timeSource: mockTimeSource, + enableDomainAuditLogging: enableAuditLogging, } - tt.setup(mockDomainManager) + tt.setup(mockDomainManager, mockDomainAuditManager) err := executor.handleDomainUpdateReplicationTask(context.Background(), tt.task) if tt.wantErr { diff --git a/common/dynamicconfig/dynamicproperties/constants.go b/common/dynamicconfig/dynamicproperties/constants.go index a621b6ccee0..319170421dd 100644 --- a/common/dynamicconfig/dynamicproperties/constants.go +++ b/common/dynamicconfig/dynamicproperties/constants.go @@ -1624,6 +1624,12 @@ const ( // Default value: false // Allowed filters: N/A EnableConnectionRetainingDirectChooser + // EnableDomainAuditLogging enables audit logging for a domain to the domain audit log table + // KeyName: system.enableDomainAuditLogging + // Value type: Bool + // Default value: false + // Allowed filters: N/A + EnableDomainAuditLogging // key for frontend @@ -1652,13 +1658,6 @@ const ( // Allowed filters: N/A EnableQueryAttributeValidation - // EnableDomainAuditLogging enables audit logging for a domain to the domain audit log table - // Keyname: frontend.enableDomainAuditLogging - // Value type: Bool - // Default value: false - // Allowed filters: N/A - FrontendEnableDomainAuditLogging - // key for matching // MatchingEnableSyncMatch is to enable sync match @@ -4183,6 +4182,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableGracefulFailover is whether enabling graceful failover", DefaultValue: true, }, + EnableDomainAuditLogging: { + KeyName: "system.enableDomainAuditLogging", + Description: "EnableDomainAuditLogging enables audit logging for a domain to the domain audit log table", + DefaultValue: false, + }, DisallowQuery: { KeyName: "system.disallowQuery", Filters: []Filter{DomainName}, @@ -4231,11 +4235,6 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableQueryAttributeValidation enables validation of queries' search attributes against the dynamic config whitelist", DefaultValue: true, }, - FrontendEnableDomainAuditLogging: { - KeyName: "frontend.enableDomainAuditLogging", - Description: "FrontendEnableDomainAuditLogging enables audit logging for a domain to the domain audit log table", - DefaultValue: false, - }, MatchingEnableSyncMatch: { KeyName: "matching.enableSyncMatch", Filters: []Filter{DomainName, TaskListName, TaskType}, diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index d25c67acd26..fca9f731fd7 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -22,7 +22,7 @@ history.EnableConsistentQueryByDomain: history.useNewInitialFailoverVersion: - value: true constraints: {} -frontend.enableDomainAuditLogging: +system.enableDomainAuditLogging: - value: true constraints: {} history.enableStrongIdempotency: diff --git a/host/testcluster.go b/host/testcluster.go index 95f76562af8..52b9254a814 100644 --- a/host/testcluster.go +++ b/host/testcluster.go @@ -185,7 +185,7 @@ func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, par MatchingConfig: options.MatchingConfig, WorkerConfig: options.WorkerConfig, MockAdminClient: options.MockAdminClient, - DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger), + DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, newNoopDomainAuditManager(), clock.NewRealTimeSource(), logger, dynamicproperties.GetBoolPropertyFn(false)), AuthorizationConfig: aConfig, AsyncWFQueues: options.AsyncWFQueues, TimeSource: options.TimeSource, @@ -266,7 +266,7 @@ func NewPinotTestCluster(t *testing.T, options *TestClusterConfig, logger log.Lo MatchingConfig: options.MatchingConfig, WorkerConfig: options.WorkerConfig, MockAdminClient: options.MockAdminClient, - DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger), + DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, newNoopDomainAuditManager(), clock.NewRealTimeSource(), logger, dynamicproperties.GetBoolPropertyFn(false)), AuthorizationConfig: aConfig, PinotConfig: options.PinotConfig, PinotClient: pinotClient, @@ -354,6 +354,26 @@ func NewPersistenceTestCluster(t *testing.T, clusterConfig *TestClusterConfig) t return testCluster } +func newNoopDomainAuditManager() persistence.DomainAuditManager { + return &noopDomainAuditManager{} +} + +type noopDomainAuditManager struct{} + +func (n *noopDomainAuditManager) GetName() string { + return "noop" +} + +func (n *noopDomainAuditManager) Close() {} + +func (n *noopDomainAuditManager) CreateDomainAuditLog(ctx context.Context, request *persistence.CreateDomainAuditLogRequest) (*persistence.CreateDomainAuditLogResponse, error) { + return &persistence.CreateDomainAuditLogResponse{}, nil +} + +func (n *noopDomainAuditManager) GetDomainAuditLogs(ctx context.Context, request *persistence.GetDomainAuditLogsRequest) (*persistence.GetDomainAuditLogsResponse, error) { + return &persistence.GetDomainAuditLogsResponse{}, nil +} + func setupShards(testBase *persistencetests.TestBase, numHistoryShards int, logger log.Logger) { // shard 0 is always created, we create additional shards if needed for shardID := 1; shardID < numHistoryShards; shardID++ { diff --git a/service/frontend/admin/handler.go b/service/frontend/admin/handler.go index bda692db8bd..6eee2a9590d 100644 --- a/service/frontend/admin/handler.go +++ b/service/frontend/admin/handler.go @@ -118,8 +118,10 @@ func NewHandler( domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor( resource.GetDomainManager(), + resource.GetDomainAuditManager(), resource.GetTimeSource(), resource.GetLogger(), + dynamicproperties.GetBoolPropertyFn(false), // audit operations are not needed for DLQ operations ) return &adminHandlerImpl{ diff --git a/service/frontend/config/config.go b/service/frontend/config/config.go index 728425e9e0a..f0d8ff65423 100644 --- a/service/frontend/config/config.go +++ b/service/frontend/config/config.go @@ -189,7 +189,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVis EmitSignalNameMetricsTag: dc.GetBoolPropertyFilteredByDomain(dynamicproperties.FrontendEmitSignalNameMetricsTag), Lockdown: dc.GetBoolPropertyFilteredByDomain(dynamicproperties.Lockdown), EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicproperties.EnableTasklistIsolation), - EnableDomainAuditLogging: dc.GetBoolProperty(dynamicproperties.FrontendEnableDomainAuditLogging), + EnableDomainAuditLogging: dc.GetBoolProperty(dynamicproperties.EnableDomainAuditLogging), DomainConfig: domain.Config{ MaxBadBinaryCount: dc.GetIntPropertyFilteredByDomain(dynamicproperties.FrontendMaxBadBinaries), MinRetentionDays: dc.GetIntProperty(dynamicproperties.MinRetentionDays), @@ -197,7 +197,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVis FailoverCoolDown: dc.GetDurationPropertyFilteredByDomain(dynamicproperties.FrontendFailoverCoolDown), RequiredDomainDataKeys: dc.GetMapProperty(dynamicproperties.RequiredDomainDataKeys), FailoverHistoryMaxSize: dc.GetIntPropertyFilteredByDomain(dynamicproperties.FrontendFailoverHistoryMaxSize), - EnableDomainAuditLogging: dc.GetBoolProperty(dynamicproperties.FrontendEnableDomainAuditLogging), + EnableDomainAuditLogging: dc.GetBoolProperty(dynamicproperties.EnableDomainAuditLogging), }, HostName: hostName, } diff --git a/service/frontend/config/config_test.go b/service/frontend/config/config_test.go index 0b325359e19..e6bb87ce0c5 100644 --- a/service/frontend/config/config_test.go +++ b/service/frontend/config/config_test.go @@ -108,7 +108,7 @@ func TestNewConfig(t *testing.T) { "GlobalRatelimiterKeyMode": {dynamicproperties.FrontendGlobalRatelimiterMode, "disabled"}, "GlobalRatelimiterUpdateInterval": {dynamicproperties.GlobalRatelimiterUpdateInterval, 3 * time.Second}, "PinotOptimizedQueryColumns": {dynamicproperties.PinotOptimizedQueryColumns, map[string]interface{}{"foo": "bar"}}, - "EnableDomainAuditLogging": {dynamicproperties.FrontendEnableDomainAuditLogging, true}, + "EnableDomainAuditLogging": {dynamicproperties.EnableDomainAuditLogging, true}, } domainFields := map[string]configTestCase{ "MaxBadBinaryCount": {dynamicproperties.FrontendMaxBadBinaries, 40}, @@ -117,7 +117,7 @@ func TestNewConfig(t *testing.T) { "FailoverCoolDown": {dynamicproperties.FrontendFailoverCoolDown, time.Duration(43)}, "RequiredDomainDataKeys": {dynamicproperties.RequiredDomainDataKeys, map[string]interface{}{"bar": "baz"}}, "FailoverHistoryMaxSize": {dynamicproperties.FrontendFailoverHistoryMaxSize, 44}, - "EnableDomainAuditLogging": {dynamicproperties.FrontendEnableDomainAuditLogging, true}, + "EnableDomainAuditLogging": {dynamicproperties.EnableDomainAuditLogging, true}, } client := dynamicconfig.NewInMemoryClient() logger := testlogger.New(t) diff --git a/service/worker/service.go b/service/worker/service.go index 0a443e463c8..cbc352f78d5 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -90,6 +90,7 @@ type ( DomainReplicationMaxRetryDuration dynamicproperties.DurationPropertyFn EnableESAnalyzer dynamicproperties.BoolPropertyFn EnableAsyncWorkflowConsumption dynamicproperties.BoolPropertyFn + EnableDomainAuditLogging dynamicproperties.BoolPropertyFn HostName string } ) @@ -189,6 +190,7 @@ func NewConfig(params *resource.Params) *Config { PersistenceMaxQPS: dc.GetIntProperty(dynamicproperties.WorkerPersistenceMaxQPS), DomainReplicationMaxRetryDuration: dc.GetDurationProperty(dynamicproperties.WorkerReplicationTaskMaxRetryDuration), EnableAsyncWorkflowConsumption: dc.GetBoolProperty(dynamicproperties.EnableAsyncWorkflowConsumption), + EnableDomainAuditLogging: dc.GetBoolProperty(dynamicproperties.EnableDomainAuditLogging), HostName: params.HostName, } advancedVisWritingMode := dc.GetStringProperty( @@ -376,8 +378,10 @@ func (s *Service) startDiagnostics() { func (s *Service) startReplicator() { domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor( s.Resource.GetDomainManager(), + s.Resource.GetDomainAuditManager(), s.Resource.GetTimeSource(), s.Resource.GetLogger(), + s.config.EnableDomainAuditLogging, ) msgReplicator := replicator.NewReplicator( s.GetClusterMetadata(),