From 28e4615ef429e8ae7e68fa2815ba6ea32e9b3d07 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 9 Jun 2026 00:41:50 +0200 Subject: [PATCH 1/5] [job/feed_instance_status] Fix missed end action when multiple pids within the same sid. - Adjusted related logic in `job_feed_instance_action.go` to include `pid` in action ID retrieval. => retrieve correct action id when actions are run with the same sid - Updated `FindInstanceActionIDFromSID` to `FindInstanceActionIDFromPidAndSID`, adding the `pid` parameter for more precise queries. --- cdb/db_actions.go | 8 ++++---- worker/job_feed_instance_action.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cdb/db_actions.go b/cdb/db_actions.go index 9858df6..38f8803 100644 --- a/cdb/db_actions.go +++ b/cdb/db_actions.go @@ -326,10 +326,10 @@ func (oDb *DB) UpdateSvcAction(ctx context.Context, a SvcAction) error { return nil } -// FindInstanceActionIDFromSID finds the instance action ID from the sid. -func (oDb *DB) FindInstanceActionIDFromSID(ctx context.Context, nodeID string, svcID string, sid string) (id int64, found bool, err error) { - const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND sid = ?" - err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, sid).Scan(&id) +// FindInstanceActionIDFromPidAndSID finds the instance action ID from the sid. +func (oDb *DB) FindInstanceActionIDFromPidAndSID(ctx context.Context, nodeID string, svcID string, pid, sid string) (id int64, found bool, err error) { + const query = "SELECT id FROM svcactions WHERE node_id = ? AND svc_id = ? AND pid = ? AND sid = ?" + err = oDb.DB.QueryRowContext(ctx, query, nodeID, svcID, pid, sid).Scan(&id) if err != nil { if errors.Is(err, sql.ErrNoRows) { err = nil diff --git a/worker/job_feed_instance_action.go b/worker/job_feed_instance_action.go index e88c0d3..13efd54 100644 --- a/worker/job_feed_instance_action.go +++ b/worker/job_feed_instance_action.go @@ -142,7 +142,7 @@ func (d *jobFeedInstanceAction) updateDB(ctx context.Context) error { return fmt.Errorf("invalid end time format: %w", err) } - actionID, found, err := d.oDb.FindInstanceActionIDFromSID(ctx, d.nodeID, d.objectID, d.data.SessionUuid) + actionID, found, err := d.oDb.FindInstanceActionIDFromPidAndSID(ctx, d.nodeID, d.objectID, d.data.Pid, d.data.SessionUuid) if err != nil { return fmt.Errorf("find action ID failed: %w", err) } From d9da327351dc4d3093bdc3e52aeb50cc6a648cc6 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 9 Jun 2026 00:43:31 +0200 Subject: [PATCH 2/5] [worker] Fix misplaced log message in `updateWSP` logic - Moved the `updateWSP` success log outside of the loop for proper execution context. --- worker/job_feed_instance_resource_info.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index 413ad6a..077876a 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -151,9 +151,9 @@ func (j *jobFeedInstanceResourceInfo) updateWSP(ctx context.Context) (err error) okKeys = append(okKeys, v.Key) } } - if len(okKeys) > 0 { - j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys)) - } + } + if len(okKeys) > 0 { + j.logger.Debug(fmt.Sprintf("updateWSP done for keys %v", okKeys)) } if len(badKeys) > 0 { return fmt.Errorf("jobFeedInstanceResourceInfo: updateWSP failed for keys %v", badKeys) From b341d67fea61ad88e4d8e43909c365a4d7166f00 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 9 Jun 2026 00:44:31 +0200 Subject: [PATCH 3/5] [jobFeedInstanceResourceInfo] Mark all existing operations as blocking in `Operations` list - Added `blocking: true` for operations (`dropPending`, `getData`, `dbNow`, `updateDB`, `purgeDB`) to ensure accurate execution behavior. - Retained `blocking: false` for `updateWSP` to maintain its non-blocking behavior. --- worker/job_feed_instance_resource_info.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/worker/job_feed_instance_resource_info.go b/worker/job_feed_instance_resource_info.go index 077876a..69fec64 100644 --- a/worker/job_feed_instance_resource_info.go +++ b/worker/job_feed_instance_resource_info.go @@ -68,11 +68,11 @@ func newjobFeedInstanceResourceInfo(objectName, nodeID, clusterID string) *jobFe func (j *jobFeedInstanceResourceInfo) Operations() []operation { return []operation{ - {name: "dropPending", do: j.dropPending}, - {name: "getData", do: j.getData}, - {name: "dbNow", do: j.dbNow}, - {name: "updateDB", do: j.updateDB}, - {name: "purgeDB", do: j.purgeDB}, + {name: "dropPending", do: j.dropPending, blocking: true}, + {name: "getData", do: j.getData, blocking: true}, + {name: "dbNow", do: j.dbNow, blocking: true}, + {name: "updateDB", do: j.updateDB, blocking: true}, + {name: "purgeDB", do: j.purgeDB, blocking: true}, {name: "updateWSP", do: j.updateWSP, blocking: false}, {name: "pushFromTableChanges", do: j.pushFromTableChanges}, } From ebe2f24a0e0a16b143e5fea01ab5d14b5d8cd399 Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 9 Jun 2026 00:50:23 +0200 Subject: [PATCH 4/5] [cdb] Add missing rollback call on non retryable errors - Added rollback handling logic in `ExecTx` for failed transactions. --- cdb/db.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cdb/db.go b/cdb/db.go index 464cd3f..5923665 100644 --- a/cdb/db.go +++ b/cdb/db.go @@ -246,6 +246,12 @@ func (oDb *DB) ExecContext(ctx context.Context, query string, args ...any) (res oDb.Metrics.ExecTxErr.Inc() if !isDeadlock(err) { oDb.Metrics.ExecTxFailed.Inc() + // We must roll back any partial updates + if err1 := tx.Rollback(); err1 != nil { + oDb.Metrics.RollbackErr.Inc() + return res, fmt.Errorf("exec and rollback failed: %w", errors.Join(err, err1)) + } + oDb.Metrics.RollbackOk.Inc() return nil, err } oDb.Metrics.ExecTxDeadlock.Inc() From f622eb259d77c1b3da72be1f38d35843068b903b Mon Sep 17 00:00:00 2001 From: Cyril Galibern Date: Tue, 9 Jun 2026 00:52:54 +0200 Subject: [PATCH 5/5] [worker] Add TODOs to verify rollback/commit necessity in transaction logic - Added TODO comments in `base_job.go` to review whether rollback and commit calls are still required. --- worker/base_job.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/worker/base_job.go b/worker/base_job.go index 159adff..e9bc93d 100644 --- a/worker/base_job.go +++ b/worker/base_job.go @@ -56,6 +56,7 @@ func RunJob(ctx context.Context, j JobRunner) error { err := j.runOps(ctx, jlog, ops...) if err != nil { + // TODO: confirm not anymore required rollack if tx, ok := j.(cdb.DBTxer); ok { jlog.Debug("call rollback on error") if err := tx.Rollback(); err != nil { @@ -64,6 +65,7 @@ func RunJob(ctx context.Context, j JobRunner) error { } return err } else if tx, ok := j.(cdb.DBTxer); ok { + // TODO: confirm not anymore required commit jlog.Debug("call commit") if err := tx.Commit(); err != nil { return fmt.Errorf("commit: %w", err)