diff --git a/cdb/db.go b/cdb/db.go index 464cd3f..5923665 100644 --- a/cdb/db.go +++ b/cdb/db.go @@ -246,6 +246,12 @@ func (oDb *DB) ExecContext(ctx context.Context, query string, args ...any) (res oDb.Metrics.ExecTxErr.Inc() if !isDeadlock(err) { oDb.Metrics.ExecTxFailed.Inc() + // We must roll back any partial updates + if err1 := tx.Rollback(); err1 != nil { + oDb.Metrics.RollbackErr.Inc() + return res, fmt.Errorf("exec and rollback failed: %w", errors.Join(err, err1)) + } + oDb.Metrics.RollbackOk.Inc() return nil, err } oDb.Metrics.ExecTxDeadlock.Inc() diff --git a/cdb/db_actions.go b/cdb/db_actions.go index 9858df6..38f8803 100644 --- a/cdb/db_actions.go +++ b/cdb/db_actions.go @@ -326,10 +326,10 @@ func (oDb *DB) UpdateSvcAction(ctx context.Context, a SvcAction) error { return nil } -// FindInstanceActionIDFromSID finds the instance action ID from the sid. -func (oDb *DB) FindInstanceActionIDFromSID(ctx context.Context, nodeID string, svcID string, sid string) (id int64, found bool, err error) { - const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND sid = ?" - err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, sid).Scan(&id) +// FindInstanceActionIDFromPidAndSID finds the instance action ID from the pid and sid. +func (oDb *DB) FindInstanceActionIDFromPidAndSID(ctx context.Context, nodeID string, svcID string, pid, sid string) (id int64, found bool, err error) { + const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND pid = ? AND sid = ?" + err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, pid, sid).Scan(&id) if err != nil { if errors.Is(err, sql.ErrNoRows) { err = nil diff --git a/worker/base_job.go b/worker/base_job.go index 159adff..e9bc93d 100644 --- a/worker/base_job.go +++ b/worker/base_job.go @@ -56,6 +56,7 @@ func RunJob(ctx context.Context, j JobRunner) error { err := j.runOps(ctx, jlog, ops...) 
if err != nil { + // TODO: confirm this rollback is no longer required if tx, ok := j.(cdb.DBTxer); ok { jlog.Debug("call rollback on error") if err := tx.Rollback(); err != nil { @@ -64,6 +65,7 @@ func RunJob(ctx context.Context, j JobRunner) error { } return err } else if tx, ok := j.(cdb.DBTxer); ok { + // TODO: confirm this commit is no longer required jlog.Debug("call commit") if err := tx.Commit(); err != nil { return fmt.Errorf("commit: %w", err) diff --git a/worker/job_feed_instance_action.go b/worker/job_feed_instance_action.go index e88c0d3..13efd54 100644 --- a/worker/job_feed_instance_action.go +++ b/worker/job_feed_instance_action.go @@ -142,7 +142,7 @@ func (d *jobFeedInstanceAction) updateDB(ctx context.Context) error { return fmt.Errorf("invalid end time format: %w", err) } - actionID, found, err := d.oDb.FindInstanceActionIDFromSID(ctx, d.nodeID, d.objectID, d.data.SessionUuid) + actionID, found, err := d.oDb.FindInstanceActionIDFromPidAndSID(ctx, d.nodeID, d.objectID, d.data.Pid, d.data.SessionUuid) if err != nil { return fmt.Errorf("find action ID failed: %w", err) } diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index 413ad6a..69fec64 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -68,11 +68,11 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe func (j *jobFeedInstanceResourceInfo) Operations() []operation { return []operation{ - {name: "dropPending", do: j.dropPending}, - {name: "getData", do: j.getData}, - {name: "dbNow", do: j.dbNow}, - {name: "updateDB", do: j.updateDB}, - {name: "purgeDB", do: j.purgeDB}, + {name: "dropPending", do: j.dropPending, blocking: true}, + {name: "getData", do: j.getData, blocking: true}, + {name: "dbNow", do: j.dbNow, blocking: true}, + {name: "updateDB", do: j.updateDB, blocking: true}, + {name: "purgeDB", do: j.purgeDB, blocking: true}, {name: "updateWSP", do: j.updateWSP, blocking: 
false}, {name: "pushFromTableChanges", do: j.pushFromTableChanges}, } @@ -151,9 +151,9 @@ func (j *jobFeedInstanceResourceInfo) updateWSP(ctx context.Context) (err error) okKeys = append(okKeys, v.Key) } } - if len(okKeys) > 0 { - j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys)) - } + } + if len(okKeys) > 0 { + j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys)) } if len(badKeys) > 0 { return fmt.Errorf("jobFeedInstanceResourceInfo: updateWSP failed for keys %v", badKeys)