From 3be7f6fb974172ac8ee4ab8df09a3fcebda028a7 Mon Sep 17 00:00:00 2001 From: asoliman Date: Wed, 8 Apr 2026 15:54:17 +0400 Subject: [PATCH] feat(cctp): parallelize attestation fetching with bounded worker pool Circle's attestation API does not support batching, so each task requires a separate HTTP call. Process tasks concurrently using an errgroup capped at 10 workers to avoid sequential bottlenecks on large batches without unbounded goroutine spawning. The 10 cap is simialr to what we have in v1.6 at the moment. --- verifier/pkg/token/cctp/verifier.go | 125 +++++++++++++++------------- 1 file changed, 67 insertions(+), 58 deletions(-) diff --git a/verifier/pkg/token/cctp/verifier.go b/verifier/pkg/token/cctp/verifier.go index 4da14d000..da6955ffa 100644 --- a/verifier/pkg/token/cctp/verifier.go +++ b/verifier/pkg/token/cctp/verifier.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "golang.org/x/sync/errgroup" + "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/commit" verifier "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/vtypes" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -54,71 +56,78 @@ func NewVerifierWithConfig( } } +const workerPoolSize = 10 + func (v *Verifier) VerifyMessages( ctx context.Context, tasks []verifier.VerificationTask, ) []verifier.VerificationResult { - results := make([]verifier.VerificationResult, 0, len(tasks)) - - // TODO: `attestationService.Fetch` is an IO-bound operation and can be parallelized. Large number of tasks - // may lead to performance bottlenecks. Consider using a worker pool or goroutines with a semaphore to limit - // concurrency. - for _, task := range tasks { - lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash) - lggr.Infow("Verifying CCTP task") - - // 1. Fetch attestation - attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message) - if err != nil { - lggr.Warnw("Failed to fetch attestation", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - if !attestation.IsReady() { - lggr.Debugw("Attestation not ready for message") - verificationError := v.attestationErrorRetry( - fmt.Errorf("attestation not ready for message ID: %s", task.MessageID), - task, + results := make([]verifier.VerificationResult, len(tasks)) + + var g errgroup.Group + g.SetLimit(workerPoolSize) + + for i, task := range tasks { + g.Go(func() error { + lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash) + lggr.Infow("Verifying CCTP task") + + // 1. Fetch attestation + attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message) + if err != nil { + lggr.Warnw("Failed to fetch attestation", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + if !attestation.IsReady() { + lggr.Debugw("Attestation not ready for message") + verificationError := v.attestationErrorRetry( + fmt.Errorf("attestation not ready for message ID: %s", task.MessageID), + task, + ) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + verifierFormat, err := attestation.ToVerifierFormat() + if err != nil { + lggr.Errorw("Failed to decode attestation data", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + lggr.Infow("Attestation fetched and decoded successfully", + "status", attestation.status, + "attestation", attestation.attestation, + "encodedCCTPMessage", attestation.encodedCCTPMessage, + "verifierFormat", verifierFormat, ) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - verifierFormat, err := attestation.ToVerifierFormat() - if err != nil { - lggr.Errorw("Failed to decode attestation data", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - lggr.Infow("Attestation fetched and decoded successfully", - "status", attestation.status, - "attestation", attestation.attestation, - "encodedCCTPMessage", attestation.encodedCCTPMessage, - "verifierFormat", verifierFormat, - ) - - // 2. Create VerifierNodeResult - result, err := commit.CreateVerifierNodeResult( - &task, - verifierFormat, - attestation.verifierVersion, - ) - if err != nil { - lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - // 3. Return successful result - lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature) - results = append(results, verifier.VerificationResult{Result: result}) + + // 2. Create VerifierNodeResult + result, err := commit.CreateVerifierNodeResult( + &task, + verifierFormat, + attestation.verifierVersion, + ) + if err != nil { + lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + // 3. Return successful result + lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature) + results[i] = verifier.VerificationResult{Result: result} + return nil + }) } + _ = g.Wait() + return results }