Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 136 additions & 27 deletions common/domain/replicationTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ package domain
import (
"context"
"errors"
"fmt"
"time"

guuid "github.com/google/uuid"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -66,23 +70,28 @@ type (
}

// domainReplicationTaskExecutorImpl is the concrete type returned by
// NewReplicationTaskExecutor. It carries the dependencies used by the
// replication task handlers defined on it.
domainReplicationTaskExecutorImpl struct {
domainManager persistence.DomainManager
timeSource clock.TimeSource
logger log.Logger
domainManager persistence.DomainManager
timeSource clock.TimeSource
logger log.Logger
// domainAuditManager is the store that createDomainAuditLog writes audit entries to.
domainAuditManager persistence.DomainAuditManager
// enableDomainAuditLogging is consulted on every createDomainAuditLog call;
// when it returns false no audit entry is written.
enableDomainAuditLogging dynamicproperties.BoolPropertyFn
}
)

// NewReplicationTaskExecutor creates a new domain replication task executor.
// It stores the given domain manager, domain audit manager, time source, logger
// and audit-logging toggle on a domainReplicationTaskExecutorImpl and returns
// that value as a ReplicationTaskExecutor.
func NewReplicationTaskExecutor(
domainManager persistence.DomainManager,
domainAuditManager persistence.DomainAuditManager,
timeSource clock.TimeSource,
logger log.Logger,
enableDomainAuditLogging dynamicproperties.BoolPropertyFn,
) ReplicationTaskExecutor {

return &domainReplicationTaskExecutorImpl{
domainManager: domainManager,
timeSource: timeSource,
logger: logger,
domainManager: domainManager,
timeSource: timeSource,
logger: logger,
domainAuditManager: domainAuditManager,
enableDomainAuditLogging: enableDomainAuditLogging,
}
}

Expand Down Expand Up @@ -189,7 +198,21 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainCreationReplicationTask(
return err
}

return err
resp, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
ID: task.GetID(),
})
if getErr != nil {
return fmt.Errorf("failed to get domain while trying to create domain audit log: %w", getErr)
}
h.createDomainAuditLog(
ctx,
task,
persistence.DomainAuditOperationTypeCreate,
nil,
resp,
)

return nil
}

// handleDomainUpdateReplicationTask handles the domain update replication task
Expand All @@ -210,9 +233,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct

// plus, we need to check whether the config version is <= the config version set in the input
// plus, we need to check whether the failover version is <= the failover version set in the input
existingDomain, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this gets mutated due to the substructs getting dereferenced and then edited, so it was necessary to deepcopy even though there's no actual editing of the existingDomain variable

originalDomainState, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
Name: task.Info.GetName(),
})
intendedDomainState := originalDomainState.DeepCopy()

if err != nil {
if _, ok := err.(*types.EntityNotExistsError); ok {
// this can happen if the create domain replication task is yet to be processed.
Expand All @@ -225,18 +250,18 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct

recordUpdated := false
request := &persistence.UpdateDomainRequest{
Info: existingDomain.Info,
Config: existingDomain.Config,
ReplicationConfig: existingDomain.ReplicationConfig,
ConfigVersion: existingDomain.ConfigVersion,
FailoverVersion: existingDomain.FailoverVersion,
FailoverNotificationVersion: existingDomain.FailoverNotificationVersion,
PreviousFailoverVersion: existingDomain.PreviousFailoverVersion,
Info: intendedDomainState.Info,
Config: intendedDomainState.Config,
ReplicationConfig: intendedDomainState.ReplicationConfig,
ConfigVersion: intendedDomainState.ConfigVersion,
FailoverVersion: intendedDomainState.FailoverVersion,
FailoverNotificationVersion: intendedDomainState.FailoverNotificationVersion,
PreviousFailoverVersion: intendedDomainState.PreviousFailoverVersion,
NotificationVersion: notificationVersion,
LastUpdatedTime: h.timeSource.Now().UnixNano(),
}

if existingDomain.ConfigVersion < task.GetConfigVersion() {
if intendedDomainState.ConfigVersion < task.GetConfigVersion() {
recordUpdated = true
request.Info = &persistence.DomainInfo{
ID: task.GetID(),
Expand All @@ -263,35 +288,71 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct
request.ConfigVersion = task.GetConfigVersion()
}

// todo (david.porter) reason through if this is compatible with the proposed merge strategy
// in active/active domains.
if existingDomain.FailoverVersion < task.GetFailoverVersion() {
if originalDomainState.FailoverVersion < task.GetFailoverVersion() {
recordUpdated = true
request.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName()
request.ReplicationConfig.ActiveClusters = task.ReplicationConfig.GetActiveClusters()
request.FailoverVersion = task.GetFailoverVersion()
request.FailoverNotificationVersion = notificationVersion
request.PreviousFailoverVersion = task.GetPreviousFailoverVersion()
} else if !existingDomain.ReplicationConfig.IsActiveActive() {
} else if !originalDomainState.ReplicationConfig.IsActiveActive() {
h.logger.Warn("the existing failover version was more recent, indicating that the domain replication message was out of date and is consequently being dropped",
tag.WorkflowDomainName(existingDomain.Info.Name),
tag.FailoverVersion(existingDomain.FailoverVersion),
tag.WorkflowDomainName(originalDomainState.Info.Name),
tag.FailoverVersion(originalDomainState.FailoverVersion),
tag.FailoverVersion(task.GetFailoverVersion()))
}

if existingDomain.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() {
mergedActiveClusters, aaChanged := mergeActiveActiveScopes(existingDomain.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters)
if intendedDomainState.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by the places where we use intendedDomainState and other places where we use originalDomainState

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is fair, let me annotate

mergedActiveClusters, aaChanged := mergeActiveActiveScopes(intendedDomainState.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters)
if aaChanged {
recordUpdated = true
request.ReplicationConfig.ActiveClusters = mergedActiveClusters
}
}

if !recordUpdated {
h.logger.Warn("no record updated while handling domain update replication task",
tag.WorkflowDomainName(task.Info.GetName()),
tag.WorkflowDomainID(task.GetID()))
return nil
}

return h.domainManager.UpdateDomain(ctx, request)
err = h.domainManager.UpdateDomain(ctx, request)
if err != nil {
h.logger.Error("failed to update domain while handling domain update replication task",
tag.Error(err),
tag.WorkflowDomainName(task.Info.GetName()),
tag.WorkflowDomainID(task.GetID()))
return err
}
afterUpdate, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
ID: task.GetID(),
})
if getErr != nil {
return fmt.Errorf("failed to get domain while trying to create domain audit log for update: %w", getErr)
}

// relying on the fact that both domain failovers and failovers of cluster-attributes
// within the domain increment the failover version, so an increased failover version
// indicates that this is a failover-type update.
if intendedDomainState.FailoverVersion < afterUpdate.FailoverVersion {
h.createDomainAuditLog(
ctx,
task,
persistence.DomainAuditOperationTypeFailover,
originalDomainState,
afterUpdate,
)
return nil
}
h.createDomainAuditLog(
ctx,
task,
persistence.DomainAuditOperationTypeUpdate,
originalDomainState,
afterUpdate,
)
return nil
}

// handleDomainDeleteReplicationTask handles the domain delete replication task
Expand All @@ -300,6 +361,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct
Name: task.Info.GetName(),
}

// ignoring error since this might be already deleted
getDomainResp, _ := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
Name: task.Info.GetName(),
})

err := h.domainManager.DeleteDomainByName(ctx, request)
if err != nil {
_, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
Expand All @@ -310,8 +376,17 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct
if errors.As(err, &entityNotExistsError) {
return nil
}
return err
}
return err

h.createDomainAuditLog(
ctx,
task,
persistence.DomainAuditOperationTypeDelete,
getDomainResp,
nil,
)
return nil
}

func (h *domainReplicationTaskExecutorImpl) validateDomainReplicationTask(task *types.DomainTaskAttributes) error {
Expand Down Expand Up @@ -357,3 +432,37 @@ func (h *domainReplicationTaskExecutorImpl) convertDomainStatusFromThrift(input
return 0, ErrInvalidDomainStatus
}
}

// createDomainAuditLog writes one audit log entry for a replicated domain
// operation, recording the domain state before and after the operation.
//
// It is a no-op when enableDomainAuditLogging returns false. The entry's event
// ID is a freshly generated UUIDv7 and its CreatedTime is the timestamp carried
// by that UUID. The method returns nothing: a failure to generate the ID or to
// persist the entry is logged and otherwise ignored.
func (h *domainReplicationTaskExecutorImpl) createDomainAuditLog(
	ctx context.Context,
	task *types.DomainTaskAttributes,
	operationType persistence.DomainAuditOperationType,
	stateBefore *persistence.GetDomainResponse,
	stateAfter *persistence.GetDomainResponse,
) {
	if auditEnabled := h.enableDomainAuditLogging(); !auditEnabled {
		return
	}

	id, idErr := guuid.NewV7()
	if idErr != nil {
		h.logger.Error("failed to generate event ID while creating domain audit log", tag.Error(idErr))
		return
	}

	// Reuse the timestamp embedded in the UUIDv7 as the entry's creation time.
	sec, nsec := id.Time().UnixTime()

	auditRequest := &persistence.CreateDomainAuditLogRequest{
		EventID:       id.String(),
		DomainID:      task.GetID(),
		OperationType: operationType,
		CreatedTime:   time.Unix(sec, nsec),
		StateBefore:   stateBefore,
		StateAfter:    stateAfter,
		Identity:      "replication task executor",
		Comment:       fmt.Sprintf("replicated domain operation %s for domain %s", operationType.String(), task.Info.GetName()),
	}

	if _, createErr := h.domainAuditManager.CreateDomainAuditLog(ctx, auditRequest); createErr != nil {
		h.logger.Error("failed to create domain audit log while creating domain audit log",
			tag.Error(createErr),
			tag.WorkflowDomainName(task.Info.GetName()),
			tag.WorkflowDomainID(task.GetID()))
	}
}
11 changes: 11 additions & 0 deletions common/domain/replicationTaskExecutor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/persistence"
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite"
Expand Down Expand Up @@ -63,10 +64,20 @@ func (s *domainReplicationTaskExecutorSuite) setupTestBase(t *testing.T) {
// SetupTest rebuilds the test base and the executor under test before each
// test. The executor is wired with a domain audit manager from the execution
// manager factory, but audit logging itself is switched off.
func (s *domainReplicationTaskExecutorSuite) SetupTest() {
	s.setupTestBase(s.T())

	auditManager, auditErr := s.ExecutionMgrFactory.NewDomainAuditManager()
	if auditErr != nil {
		s.T().Fatalf("Failed to create domain audit manager: %v", auditErr)
	}

	// Disable audit logging for integration tests as SQLite doesn't fully support the audit manager
	auditLoggingDisabled := func(...dynamicproperties.FilterOption) bool { return false }

	executor := NewReplicationTaskExecutor(
		s.DomainManager,
		auditManager,
		clock.NewRealTimeSource(),
		s.Logger,
		auditLoggingDisabled,
	)
	s.domainReplicator = executor.(*domainReplicationTaskExecutorImpl)
}

Expand Down
Loading
Loading