Skip to content

Commit 5c95a18

Browse files
Adds missing part of the domain failover history
1 parent 0589e03 commit 5c95a18

File tree

10 files changed

+249
-82
lines changed

10 files changed

+249
-82
lines changed

common/domain/replicationTaskExecutor.go

Lines changed: 136 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ package domain
2525
import (
2626
"context"
2727
"errors"
28+
"fmt"
2829
"time"
2930

31+
guuid "github.com/google/uuid"
32+
3033
"github.com/uber/cadence/common/clock"
34+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3135
"github.com/uber/cadence/common/log"
3236
"github.com/uber/cadence/common/log/tag"
3337
"github.com/uber/cadence/common/persistence"
@@ -66,23 +70,28 @@ type (
6670
}
6771

6872
domainReplicationTaskExecutorImpl struct {
69-
domainManager persistence.DomainManager
70-
timeSource clock.TimeSource
71-
logger log.Logger
73+
domainManager persistence.DomainManager
74+
timeSource clock.TimeSource
75+
logger log.Logger
76+
domainAuditManager persistence.DomainAuditManager
77+
enableDomainAuditLogging dynamicproperties.BoolPropertyFn
7278
}
7379
)
7480

7581
// NewReplicationTaskExecutor create a new instance of domain replicator
7682
func NewReplicationTaskExecutor(
7783
domainManager persistence.DomainManager,
84+
domainAuditManager persistence.DomainAuditManager,
7885
timeSource clock.TimeSource,
7986
logger log.Logger,
87+
enableDomainAuditLogging dynamicproperties.BoolPropertyFn,
8088
) ReplicationTaskExecutor {
81-
8289
return &domainReplicationTaskExecutorImpl{
83-
domainManager: domainManager,
84-
timeSource: timeSource,
85-
logger: logger,
90+
domainManager: domainManager,
91+
timeSource: timeSource,
92+
logger: logger,
93+
domainAuditManager: domainAuditManager,
94+
enableDomainAuditLogging: enableDomainAuditLogging,
8695
}
8796
}
8897

@@ -189,7 +198,21 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainCreationReplicationTask(
189198
return err
190199
}
191200

192-
return err
201+
resp, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
202+
ID: task.GetID(),
203+
})
204+
if getErr != nil {
205+
return fmt.Errorf("failed to get domain while trying to create domain audit log: %w", getErr)
206+
}
207+
h.createDomainAuditLog(
208+
ctx,
209+
task,
210+
persistence.DomainAuditOperationTypeCreate,
211+
nil,
212+
resp,
213+
)
214+
215+
return nil
193216
}
194217

195218
// handleDomainUpdateReplicationTask handles the domain update replication task
@@ -210,9 +233,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct
210233

211234
// plus, we need to check whether the config version is <= the config version set in the input
212235
// plus, we need to check whether the failover version is <= the failover version set in the input
213-
existingDomain, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
236+
originalDomainState, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
214237
Name: task.Info.GetName(),
215238
})
239+
intendedDomainState := originalDomainState.DeepCopy()
240+
216241
if err != nil {
217242
if _, ok := err.(*types.EntityNotExistsError); ok {
218243
// this can happen if the create domain replication task is to processed.
@@ -225,18 +250,18 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct
225250

226251
recordUpdated := false
227252
request := &persistence.UpdateDomainRequest{
228-
Info: existingDomain.Info,
229-
Config: existingDomain.Config,
230-
ReplicationConfig: existingDomain.ReplicationConfig,
231-
ConfigVersion: existingDomain.ConfigVersion,
232-
FailoverVersion: existingDomain.FailoverVersion,
233-
FailoverNotificationVersion: existingDomain.FailoverNotificationVersion,
234-
PreviousFailoverVersion: existingDomain.PreviousFailoverVersion,
253+
Info: intendedDomainState.Info,
254+
Config: intendedDomainState.Config,
255+
ReplicationConfig: intendedDomainState.ReplicationConfig,
256+
ConfigVersion: intendedDomainState.ConfigVersion,
257+
FailoverVersion: intendedDomainState.FailoverVersion,
258+
FailoverNotificationVersion: intendedDomainState.FailoverNotificationVersion,
259+
PreviousFailoverVersion: intendedDomainState.PreviousFailoverVersion,
235260
NotificationVersion: notificationVersion,
236261
LastUpdatedTime: h.timeSource.Now().UnixNano(),
237262
}
238263

239-
if existingDomain.ConfigVersion < task.GetConfigVersion() {
264+
if intendedDomainState.ConfigVersion < task.GetConfigVersion() {
240265
recordUpdated = true
241266
request.Info = &persistence.DomainInfo{
242267
ID: task.GetID(),
@@ -263,35 +288,71 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainUpdateReplicationTask(ct
263288
request.ConfigVersion = task.GetConfigVersion()
264289
}
265290

266-
// todo (david.porter) reason through if this is compatible with the proposed merge strategy
267-
// in active/active domains.
268-
if existingDomain.FailoverVersion < task.GetFailoverVersion() {
291+
if originalDomainState.FailoverVersion < task.GetFailoverVersion() {
269292
recordUpdated = true
270293
request.ReplicationConfig.ActiveClusterName = task.ReplicationConfig.GetActiveClusterName()
271294
request.ReplicationConfig.ActiveClusters = task.ReplicationConfig.GetActiveClusters()
272295
request.FailoverVersion = task.GetFailoverVersion()
273296
request.FailoverNotificationVersion = notificationVersion
274297
request.PreviousFailoverVersion = task.GetPreviousFailoverVersion()
275-
} else if !existingDomain.ReplicationConfig.IsActiveActive() {
298+
} else if !originalDomainState.ReplicationConfig.IsActiveActive() {
276299
h.logger.Warn("the existing failover version was more recent, indicating that the domain replication message was out of date and is consequently being dropped",
277-
tag.WorkflowDomainName(existingDomain.Info.Name),
278-
tag.FailoverVersion(existingDomain.FailoverVersion),
300+
tag.WorkflowDomainName(originalDomainState.Info.Name),
301+
tag.FailoverVersion(originalDomainState.FailoverVersion),
279302
tag.FailoverVersion(task.GetFailoverVersion()))
280303
}
281304

282-
if existingDomain.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() {
283-
mergedActiveClusters, aaChanged := mergeActiveActiveScopes(existingDomain.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters)
305+
if intendedDomainState.ReplicationConfig.IsActiveActive() || task.ReplicationConfig.IsActiveActive() {
306+
mergedActiveClusters, aaChanged := mergeActiveActiveScopes(intendedDomainState.ReplicationConfig.ActiveClusters, task.ReplicationConfig.ActiveClusters)
284307
if aaChanged {
285308
recordUpdated = true
286309
request.ReplicationConfig.ActiveClusters = mergedActiveClusters
287310
}
288311
}
289312

290313
if !recordUpdated {
314+
h.logger.Warn("no record updated while handling domain update replication task",
315+
tag.WorkflowDomainName(task.Info.GetName()),
316+
tag.WorkflowDomainID(task.GetID()))
291317
return nil
292318
}
293319

294-
return h.domainManager.UpdateDomain(ctx, request)
320+
err = h.domainManager.UpdateDomain(ctx, request)
321+
if err != nil {
322+
h.logger.Error("failed to update domain while handling domain update replication task",
323+
tag.Error(err),
324+
tag.WorkflowDomainName(task.Info.GetName()),
325+
tag.WorkflowDomainID(task.GetID()))
326+
return err
327+
}
328+
afterUpdate, getErr := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
329+
ID: task.GetID(),
330+
})
331+
if getErr != nil {
332+
return fmt.Errorf("failed to get domain while trying to create domain audit log for update: %w", getErr)
333+
}
334+
335+
// relying on the fact that for both failovers and the failover of cluster-attibutes
336+
// within the domain, in both instances the failover version will be incremented, indicating
337+
// this is failover type update.
338+
if intendedDomainState.FailoverVersion < afterUpdate.FailoverVersion {
339+
h.createDomainAuditLog(
340+
ctx,
341+
task,
342+
persistence.DomainAuditOperationTypeFailover,
343+
originalDomainState,
344+
afterUpdate,
345+
)
346+
return nil
347+
}
348+
h.createDomainAuditLog(
349+
ctx,
350+
task,
351+
persistence.DomainAuditOperationTypeUpdate,
352+
originalDomainState,
353+
afterUpdate,
354+
)
355+
return nil
295356
}
296357

297358
// handleDomainDeleteReplicationTask handles the domain delete replication task
@@ -300,6 +361,11 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct
300361
Name: task.Info.GetName(),
301362
}
302363

364+
// ignoring error since this might be already deleted
365+
getDomainResp, _ := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
366+
Name: task.Info.GetName(),
367+
})
368+
303369
err := h.domainManager.DeleteDomainByName(ctx, request)
304370
if err != nil {
305371
_, err := h.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{
@@ -310,8 +376,17 @@ func (h *domainReplicationTaskExecutorImpl) handleDomainDeleteReplicationTask(ct
310376
if errors.As(err, &entityNotExistsError) {
311377
return nil
312378
}
379+
return err
313380
}
314-
return err
381+
382+
h.createDomainAuditLog(
383+
ctx,
384+
task,
385+
persistence.DomainAuditOperationTypeDelete,
386+
getDomainResp,
387+
nil,
388+
)
389+
return nil
315390
}
316391

317392
func (h *domainReplicationTaskExecutorImpl) validateDomainReplicationTask(task *types.DomainTaskAttributes) error {
@@ -357,3 +432,37 @@ func (h *domainReplicationTaskExecutorImpl) convertDomainStatusFromThrift(input
357432
return 0, ErrInvalidDomainStatus
358433
}
359434
}
435+
436+
func (h *domainReplicationTaskExecutorImpl) createDomainAuditLog(
437+
ctx context.Context,
438+
task *types.DomainTaskAttributes,
439+
operationType persistence.DomainAuditOperationType,
440+
stateBefore *persistence.GetDomainResponse,
441+
stateAfter *persistence.GetDomainResponse,
442+
) {
443+
if !h.enableDomainAuditLogging() {
444+
return
445+
}
446+
eventID, err := guuid.NewV7()
447+
if err != nil {
448+
h.logger.Error("failed to generate event ID while creating domain audit log", tag.Error(err))
449+
return
450+
}
451+
creationTime := time.Unix(eventID.Time().UnixTime())
452+
_, err = h.domainAuditManager.CreateDomainAuditLog(ctx, &persistence.CreateDomainAuditLogRequest{
453+
EventID: eventID.String(),
454+
DomainID: task.GetID(),
455+
OperationType: operationType,
456+
CreatedTime: creationTime,
457+
StateBefore: stateBefore,
458+
StateAfter: stateAfter,
459+
Identity: "replication task executor",
460+
Comment: fmt.Sprintf("replicated domain operation %s for domain %s", operationType.String(), task.Info.GetName()),
461+
})
462+
if err != nil {
463+
h.logger.Error("failed to create domain audit log while creating domain audit log",
464+
tag.Error(err),
465+
tag.WorkflowDomainName(task.Info.GetName()),
466+
tag.WorkflowDomainID(task.GetID()))
467+
}
468+
}

common/domain/replicationTaskExecutor_integration_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131

3232
"github.com/uber/cadence/common/clock"
3333
"github.com/uber/cadence/common/constants"
34+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3435
"github.com/uber/cadence/common/persistence"
3536
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
3637
"github.com/uber/cadence/common/persistence/sql/sqlplugin/sqlite"
@@ -63,10 +64,20 @@ func (s *domainReplicationTaskExecutorSuite) setupTestBase(t *testing.T) {
6364
func (s *domainReplicationTaskExecutorSuite) SetupTest() {
6465
s.setupTestBase(s.T())
6566

67+
domainAuditManager, err := s.ExecutionMgrFactory.NewDomainAuditManager()
68+
if err != nil {
69+
s.T().Fatalf("Failed to create domain audit manager: %v", err)
70+
}
71+
72+
// Disable audit logging for integration tests as the audit manager may not be fully initialized
73+
enableAuditLogging := func(...dynamicproperties.FilterOption) bool { return false }
74+
6675
s.domainReplicator = NewReplicationTaskExecutor(
6776
s.DomainManager,
77+
domainAuditManager,
6878
clock.NewRealTimeSource(),
6979
s.Logger,
80+
enableAuditLogging,
7081
).(*domainReplicationTaskExecutorImpl)
7182
}
7283

0 commit comments

Comments
 (0)