Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ func (oDb *DB) ExecContext(ctx context.Context, query string, args ...any) (res
oDb.Metrics.ExecTxErr.Inc()
if !isDeadlock(err) {
oDb.Metrics.ExecTxFailed.Inc()
// We must roll back any partial updates
if err1 := tx.Rollback(); err1 != nil {
oDb.Metrics.RollbackErr.Inc()
return res, fmt.Errorf("exec and rollback failed: %w", errors.Join(err, err1))
}
oDb.Metrics.RollbackOk.Inc()
return nil, err
}
oDb.Metrics.ExecTxDeadlock.Inc()
Expand Down
8 changes: 4 additions & 4 deletions cdb/db_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ func (oDb *DB) UpdateSvcAction(ctx context.Context, a SvcAction) error {
return nil
}

// FindInstanceActionIDFromSID finds the instance action ID from the sid.
func (oDb *DB) FindInstanceActionIDFromSID(ctx context.Context, nodeID string, svcID string, sid string) (id int64, found bool, err error) {
const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND sid = ?"
err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, sid).Scan(&id)
// FindInstanceActionIDFromPidAndSID finds the instance action ID from the sid.
func (oDb *DB) FindInstanceActionIDFromPidAndSID(ctx context.Context, nodeID string, svcID string, pid, sid string) (id int64, found bool, err error) {
const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND pid = ? AND sid = ?"
err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, pid, sid).Scan(&id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
err = nil
Expand Down
2 changes: 2 additions & 0 deletions worker/base_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func RunJob(ctx context.Context, j JobRunner) error {

err := j.runOps(ctx, jlog, ops...)
if err != nil {
// TODO: confirm not anymore required rollack
if tx, ok := j.(cdb.DBTxer); ok {
jlog.Debug("call rollback on error")
if err := tx.Rollback(); err != nil {
Expand All @@ -64,6 +65,7 @@ func RunJob(ctx context.Context, j JobRunner) error {
}
return err
} else if tx, ok := j.(cdb.DBTxer); ok {
// TODO: confirm not anymore required commit
jlog.Debug("call commit")
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion worker/job_feed_instance_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (d *jobFeedInstanceAction) updateDB(ctx context.Context) error {
return fmt.Errorf("invalid end time format: %w", err)
}

actionID, found, err := d.oDb.FindInstanceActionIDFromSID(ctx, d.nodeID, d.objectID, d.data.SessionUuid)
actionID, found, err := d.oDb.FindInstanceActionIDFromPidAndSID(ctx, d.nodeID, d.objectID, d.data.Pid, d.data.SessionUuid)
if err != nil {
return fmt.Errorf("find action ID failed: %w", err)
}
Expand Down
16 changes: 8 additions & 8 deletions worker/job_feed_instance_resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe

func (j *jobFeedInstanceResourceInfo) Operations() []operation {
return []operation{
{name: "dropPending", do: j.dropPending},
{name: "getData", do: j.getData},
{name: "dbNow", do: j.dbNow},
{name: "updateDB", do: j.updateDB},
{name: "purgeDB", do: j.purgeDB},
{name: "dropPending", do: j.dropPending, blocking: true},
{name: "getData", do: j.getData, blocking: true},
{name: "dbNow", do: j.dbNow, blocking: true},
{name: "updateDB", do: j.updateDB, blocking: true},
{name: "purgeDB", do: j.purgeDB, blocking: true},
{name: "updateWSP", do: j.updateWSP, blocking: false},
{name: "pushFromTableChanges", do: j.pushFromTableChanges},
}
Expand Down Expand Up @@ -151,9 +151,9 @@ func (j *jobFeedInstanceResourceInfo) updateWSP(ctx context.Context) (err error)
okKeys = append(okKeys, v.Key)
}
}
if len(okKeys) > 0 {
j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys))
}
}
if len(okKeys) > 0 {
j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys))
}
if len(badKeys) > 0 {
return fmt.Errorf("jobFeedInstanceResourceInfo: updateWSP failed for keys %v", badKeys)
Expand Down
Loading