Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions targeting/cpu_combined_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package targeting

import (
"context"
"fmt"
"testing"
"time"

"github.com/adcontextprotocol/adcp-go/tmproto"
)

// TestScale_IdentityMatch_CPU_Combined measures EvaluateIdentityResolved CPU
// across the combined dimensions that matter for production sizing:
// (candidate packages per request) × (exposure log entries per identity) ×
// (identities per request). All numbers are isolated from network I/O via
// the mock store, so they represent in-process CPU only.
func TestScale_IdentityMatch_CPU_Combined(t *testing.T) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as TestPreaggregate_Crossover - no real tests, should be skipped or Benchmark

pkgCounts := []int{10, 100, 1000}
logSizes := []int{0, 100, 1000, 10000}
identityCounts := []int{1, 3}

t.Log("")
t.Log("=== IdentityMatch CPU: packages × log_size × identities ===")
t.Log("")
t.Logf(" %-10s %-10s %-10s %-15s %-12s", "packages", "log_size", "identities", "ns/op", "µs/eval")
t.Logf(" %-10s %-10s %-10s %-15s %-12s", "--------", "--------", "----------", "-----", "-------")

for _, numPkgs := range pkgCounts {
for _, logSize := range logSizes {
for _, numIdentities := range identityCounts {
now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
store := NewMockStore()
store.Now = func() time.Time { return now }

// Set up N packages, each with one fcap rule of max=10/86400s.
var pkgs []PackageConfig
var pkgIDs []string
idConfigs := make(map[string]*PackageIdentityConfig, numPkgs)
for i := range numPkgs {
pkgID := fmt.Sprintf("pkg-%d", i)
pkgs = append(pkgs, PackageConfig{PackageID: pkgID})
pkgIDs = append(pkgIDs, pkgID)
idCfg := PackageIdentityConfig{
FrequencyRules: []FrequencyRuleJSON{{MaxCount: 1_000_000, WindowSeconds: 86400}},
}
store.SetPackageIdentityConfig(pkgID, idCfg)
idConfigs[pkgID] = &idCfg
}

// Build identities and write per-identity exposure logs of `logSize`.
identities := make([]tmproto.IdentityToken, numIdentities)
for i := range numIdentities {
tok := fmt.Sprintf("tok-bench-%d", i)
identities[i] = tmproto.IdentityToken{UserToken: tok}

if logSize > 0 {
entries := make([]ExposureEntry, 0, logSize)
for j := range logSize {
pkg := pkgIDs[j%numPkgs]
entries = append(entries, ExposureEntry{
ImpressionID: fmt.Sprintf("imp-%d-%d", i, j),
PackageID: pkg,
SourceID: "bench",
Timestamp: now.Add(-time.Duration(j) * time.Minute).Unix(),
})
}
store.SetUserExposures(tok, entries)
}
}

engine := NewEngine(EngineConfig{
ProviderID: "bench",
Store: store,
Packages: pkgs,
})
engine.Now = func() time.Time { return now }

resolved := &ResolvedPackages{IdentityConfigs: idConfigs}

req := &tmproto.IdentityMatchRequest{
RequestID: "bench",
Identities: identities,
PackageIDs: pkgIDs,
}

// Warmup
for range 10 {
_, _ = engine.EvaluateIdentityResolved(context.Background(), resolved, req)
}

// Time
const iterations = 200
start := time.Now()
for range iterations {
_, _ = engine.EvaluateIdentityResolved(context.Background(), resolved, req)
}
elapsed := time.Since(start)
perEval := elapsed / iterations
nsPerOp := perEval.Nanoseconds()

t.Logf(" %-10d %-10d %-10d %-15s %-12.2f",
numPkgs, logSize, numIdentities,
fmt.Sprintf("%d ns", nsPerOp),
float64(perEval.Microseconds()),
)
}
}
}
t.Log("")
}
42 changes: 36 additions & 6 deletions targeting/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve
// 5. SegmentIndex: which packages is this user eligible for?
segmentEligible := resolved.SegmentCandidates(userSegments)

// 5a. Optional preaggregation: when many candidate packages share one
// user's logs, build a per-key bucket index once instead of re-scanning
// the logs per package. For small-package requests the build overhead
// doesn't amortize; ShouldPreaggregate gates the choice. See
// exposure_aggregate.go for the empirical crossover measurement.
var agg *PreaggregatedExposures
if ShouldPreaggregate(len(req.PackageIDs)) {
agg = BuildPreaggregatedExposures(firstLogs)
}

// 6. Evaluate each requested package using binary lazy dedup.
var eligibility []tmproto.PackageEligibility
for _, pkgID := range req.PackageIDs {
Expand All @@ -399,32 +409,52 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve
}
}

// Campaign frequency cap (binary lazy dedup across all UID logs).
// Campaign frequency cap. Use the preaggregated index when present;
// fall back to per-package log scan for small request sizes where
// the aggregation overhead doesn't amortize.
if eligible && idCfg != nil && idCfg.CampaignID != "" {
campCfg := resolved.CampaignConfigs[idCfg.CampaignID]
if campCfg != nil && len(campCfg.FrequencyRules) > 0 {
rules := toFrequencyRules(campCfg.FrequencyRules)
campHash := hashString(idCfg.CampaignID)
if CheckFrequencyRulesMultiLog(firstLogs, campHash, true, rules, nowUnix) {
var capped bool
if agg != nil {
capped = CheckFrequencyRulesAggregated(agg, campHash, true, rules, nowUnix)
} else {
capped = CheckFrequencyRulesMultiLog(firstLogs, campHash, true, rules, nowUnix)
}
if capped {
eligible = false
e.metrics.IdentityEvaluated(pkgID, StageCampaignFreq, false)
}
}
}

// Package frequency cap (binary lazy dedup across all UID logs).
// Package frequency cap.
pkgHash := hashString(pkgID)
if eligible && idCfg != nil && len(idCfg.FrequencyRules) > 0 {
rules := toFrequencyRules(idCfg.FrequencyRules)
if CheckFrequencyRulesMultiLog(firstLogs, pkgHash, false, rules, nowUnix) {
var capped bool
if agg != nil {
capped = CheckFrequencyRulesAggregated(agg, pkgHash, false, rules, nowUnix)
} else {
capped = CheckFrequencyRulesMultiLog(firstLogs, pkgHash, false, rules, nowUnix)
}
if capped {
eligible = false
e.metrics.IdentityEvaluated(pkgID, StagePackageFreq, false)
}
}

// Intent score (binary, scan across all UID logs).
// Intent score. Use the precomputed per-package latest from the
// aggregated index when present; otherwise scan all UID logs.
var intent float64
latestTS := LatestExposureMultiLog(firstLogs, pkgHash)
var latestTS int64
if agg != nil {
latestTS = LatestExposureAggregated(agg, pkgHash)
} else {
latestTS = LatestExposureMultiLog(firstLogs, pkgHash)
}
if latestTS > 0 {
intent = ComputeIntentScore(latestTS, now)
}
Expand Down
129 changes: 129 additions & 0 deletions targeting/exposure_aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package targeting

// PreaggregatedExposures partitions a user's exposure log entries by filter
// hash for fast per-package eligibility checks. Trades a one-time build pass
// (O(total log entries)) for O(matches-per-filter) per-package lookups,
// instead of the O(packages × total log entries) cost of scanning all logs
// for every package check.
//
// Use when the eligibility evaluator will check many packages against the
// same logs. For small-package requests, the naive CheckFrequencyRulesMultiLog
// is faster because the build overhead isn't amortized — at 10 packages ×
// 10K-entry log × 3 identities the build cost more than triples per-request
// CPU. ShouldPreaggregate gates the choice based on the empirical crossover.
type PreaggregatedExposures struct {
// Per-key bucket of entries that match that hash. Each entry carries the
// timestamp and impression hash so per-rule window filtering and
// per-impression dedup can run on the much smaller bucket.
byCampaign map[uint64][]aggEntry
byPackage map[uint64][]aggEntry
// Per-package latest timestamp; precomputed once during the build pass
// so intent-score lookups don't have to re-scan the log per package.
latestByPackage map[uint64]int64
}

type aggEntry struct {
impHash uint64
timestamp int64
}

// BuildPreaggregatedExposures partitions the entries across all the user's
// identity logs into per-(campaign|package) buckets and precomputes
// per-package latest timestamps. Cost: O(L × I) where L is total entries
// per identity and I is number of identities.
func BuildPreaggregatedExposures(logs []BinaryExposureLog) *PreaggregatedExposures {
// Pre-size maps to reduce growth allocations; assume a typical user has
// O(distinct_campaigns) << total_entries.
totalEntries := 0
for _, log := range logs {
totalEntries += log.Len()
}
cap := totalEntries / 4
if cap < 8 {
cap = 8
}
pa := &PreaggregatedExposures{
byCampaign: make(map[uint64][]aggEntry, cap),
byPackage: make(map[uint64][]aggEntry, cap),
latestByPackage: make(map[uint64]int64, cap),
}

for _, log := range logs {
n := log.Len()
for i := 0; i < n; i++ {
ts := log.Timestamp(i)
imp := log.ImpressionHash(i)
campH := log.CampaignHash(i)
pkgH := log.PackageHash(i)
entry := aggEntry{impHash: imp, timestamp: ts}
if campH != 0 {
pa.byCampaign[campH] = append(pa.byCampaign[campH], entry)
}
pa.byPackage[pkgH] = append(pa.byPackage[pkgH], entry)
if cur, ok := pa.latestByPackage[pkgH]; !ok || ts > cur {
pa.latestByPackage[pkgH] = ts
}
}
}
return pa
}

// LatestExposureAggregated returns the latest timestamp recorded for the
// given package hash. Equivalent to LatestExposureMultiLog but reads from
// the precomputed index, O(1) per call.
func LatestExposureAggregated(agg *PreaggregatedExposures, pkgHash uint64) int64 {
return agg.latestByPackage[pkgHash]
}

// CheckFrequencyRulesAggregated checks frequency rules against a
// pre-aggregated bucket. Equivalent to CheckFrequencyRulesMultiLog but
// avoids re-scanning the full log per check.
func CheckFrequencyRulesAggregated(agg *PreaggregatedExposures, filterHash uint64, isCampaign bool, rules []FrequencyRule, nowUnix int64) bool {
var bucket []aggEntry
if isCampaign {
bucket = agg.byCampaign[filterHash]
} else {
bucket = agg.byPackage[filterHash]
}
if len(bucket) == 0 {
return false
}
for _, rule := range rules {
cutoff := nowUnix - int64(rule.Window.Seconds())
seen := make(map[uint64]struct{})
count := 0
for i := range bucket {
if bucket[i].timestamp < cutoff {
continue
}
if _, dup := seen[bucket[i].impHash]; dup {
continue
}
seen[bucket[i].impHash] = struct{}{}
count++
if count >= rule.MaxCount {
return true
}
}
}
return false
}

// PreaggregatePackagesThreshold is the per-request candidate-package count
// above which the eligibility evaluator should build a preaggregated view of
// the user's exposure log before per-package checks.
//
// Empirically tuned (TestPreaggregate_Crossover): below ~50 packages, the
// map-build allocation overhead per log entry dominates and naive scanning
// is faster regardless of log size — at 10 packages × 10K log × 3 identities,
// preagg is ~3× slower (1.25ms vs 408µs). Above ~50 packages, preagg
// amortizes — at 1000 packages × 1000-entry log × 3 identities the speedup
// is ~26×; at 1000 × 10K × 3 identities, ~40×.
const PreaggregatePackagesThreshold = 50

// ShouldPreaggregate returns whether the eligibility evaluator should build
// a preaggregated view before per-package checks. Cheap to evaluate; safe to
// call on every request.
func ShouldPreaggregate(numPackages int) bool {
return numPackages > PreaggregatePackagesThreshold
}
Loading
Loading