diff --git a/verifier/pkg/token/cctp/verifier.go b/verifier/pkg/token/cctp/verifier.go index 4da14d000..da6955ffa 100644 --- a/verifier/pkg/token/cctp/verifier.go +++ b/verifier/pkg/token/cctp/verifier.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "golang.org/x/sync/errgroup" + "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/commit" verifier "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/vtypes" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -54,71 +56,78 @@ func NewVerifierWithConfig( } } +const workerPoolSize = 10 + func (v *Verifier) VerifyMessages( ctx context.Context, tasks []verifier.VerificationTask, ) []verifier.VerificationResult { - results := make([]verifier.VerificationResult, 0, len(tasks)) - - // TODO: `attestationService.Fetch` is an IO-bound operation and can be parallelized. Large number of tasks - // may lead to performance bottlenecks. Consider using a worker pool or goroutines with a semaphore to limit - // concurrency. - for _, task := range tasks { - lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash) - lggr.Infow("Verifying CCTP task") - - // 1. Fetch attestation - attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message) - if err != nil { - lggr.Warnw("Failed to fetch attestation", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - if !attestation.IsReady() { - lggr.Debugw("Attestation not ready for message") - verificationError := v.attestationErrorRetry( - fmt.Errorf("attestation not ready for message ID: %s", task.MessageID), - task, + results := make([]verifier.VerificationResult, len(tasks)) + + var g errgroup.Group + g.SetLimit(workerPoolSize) + + for i, task := range tasks { + g.Go(func() error { + lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash) + lggr.Infow("Verifying CCTP task") + + // 1. Fetch attestation + attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message) + if err != nil { + lggr.Warnw("Failed to fetch attestation", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + if !attestation.IsReady() { + lggr.Debugw("Attestation not ready for message") + verificationError := v.attestationErrorRetry( + fmt.Errorf("attestation not ready for message ID: %s", task.MessageID), + task, + ) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + verifierFormat, err := attestation.ToVerifierFormat() + if err != nil { + lggr.Errorw("Failed to decode attestation data", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + lggr.Infow("Attestation fetched and decoded successfully", + "status", attestation.status, + "attestation", attestation.attestation, + "encodedCCTPMessage", attestation.encodedCCTPMessage, + "verifierFormat", verifierFormat, ) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - verifierFormat, err := attestation.ToVerifierFormat() - if err != nil { - lggr.Errorw("Failed to decode attestation data", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - lggr.Infow("Attestation fetched and decoded successfully", - "status", attestation.status, - "attestation", attestation.attestation, - "encodedCCTPMessage", attestation.encodedCCTPMessage, - "verifierFormat", verifierFormat, - ) - - // 2. Create VerifierNodeResult - result, err := commit.CreateVerifierNodeResult( - &task, - verifierFormat, - attestation.verifierVersion, - ) - if err != nil { - lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err) - verificationError := v.errorRetry(err, task) - results = append(results, verifier.VerificationResult{Error: &verificationError}) - continue - } - - // 3. Return successful result - lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature) - results = append(results, verifier.VerificationResult{Result: result}) + + // 2. Create VerifierNodeResult + result, err := commit.CreateVerifierNodeResult( + &task, + verifierFormat, + attestation.verifierVersion, + ) + if err != nil { + lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err) + verificationError := v.errorRetry(err, task) + results[i] = verifier.VerificationResult{Error: &verificationError} + return nil + } + + // 3. Return successful result + lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature) + results[i] = verifier.VerificationResult{Result: result} + return nil + }) } + _ = g.Wait() + return results }