diff --git a/targeting/cpu_combined_test.go b/targeting/cpu_combined_test.go new file mode 100644 index 0000000..03b64b3 --- /dev/null +++ b/targeting/cpu_combined_test.go @@ -0,0 +1,110 @@ +package targeting + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/adcontextprotocol/adcp-go/tmproto" +) + +// TestScale_IdentityMatch_CPU_Combined measures EvaluateIdentityResolved CPU +// across the combined dimensions that matter for production sizing: +// (candidate packages per request) × (exposure log entries per identity) × +// (identities per request). All numbers are isolated from network I/O via +// the mock store, so they represent in-process CPU only. +func TestScale_IdentityMatch_CPU_Combined(t *testing.T) { + pkgCounts := []int{10, 100, 1000} + logSizes := []int{0, 100, 1000, 10000} + identityCounts := []int{1, 3} + + t.Log("") + t.Log("=== IdentityMatch CPU: packages × log_size × identities ===") + t.Log("") + t.Logf(" %-10s %-10s %-10s %-15s %-12s", "packages", "log_size", "identities", "ns/op", "µs/eval") + t.Logf(" %-10s %-10s %-10s %-15s %-12s", "--------", "--------", "----------", "-----", "-------") + + for _, numPkgs := range pkgCounts { + for _, logSize := range logSizes { + for _, numIdentities := range identityCounts { + now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC) + store := NewMockStore() + store.Now = func() time.Time { return now } + + // Set up N packages, each with one fcap rule of max=10/86400s. + var pkgs []PackageConfig + var pkgIDs []string + idConfigs := make(map[string]*PackageIdentityConfig, numPkgs) + for i := range numPkgs { + pkgID := fmt.Sprintf("pkg-%d", i) + pkgs = append(pkgs, PackageConfig{PackageID: pkgID}) + pkgIDs = append(pkgIDs, pkgID) + idCfg := PackageIdentityConfig{ + FrequencyRules: []FrequencyRuleJSON{{MaxCount: 1_000_000, WindowSeconds: 86400}}, + } + store.SetPackageIdentityConfig(pkgID, idCfg) + idConfigs[pkgID] = &idCfg + } + + // Build identities and write per-identity exposure logs of `logSize`. + identities := make([]tmproto.IdentityToken, numIdentities) + for i := range numIdentities { + tok := fmt.Sprintf("tok-bench-%d", i) + identities[i] = tmproto.IdentityToken{UserToken: tok} + + if logSize > 0 { + entries := make([]ExposureEntry, 0, logSize) + for j := range logSize { + pkg := pkgIDs[j%numPkgs] + entries = append(entries, ExposureEntry{ + ImpressionID: fmt.Sprintf("imp-%d-%d", i, j), + PackageID: pkg, + SourceID: "bench", + Timestamp: now.Add(-time.Duration(j) * time.Minute).Unix(), + }) + } + store.SetUserExposures(tok, entries) + } + } + + engine := NewEngine(EngineConfig{ + ProviderID: "bench", + Store: store, + Packages: pkgs, + }) + engine.Now = func() time.Time { return now } + + resolved := &ResolvedPackages{IdentityConfigs: idConfigs} + + req := &tmproto.IdentityMatchRequest{ + RequestID: "bench", + Identities: identities, + PackageIDs: pkgIDs, + } + + // Warmup + for range 10 { + _, _ = engine.EvaluateIdentityResolved(context.Background(), resolved, req) + } + + // Time + const iterations = 200 + start := time.Now() + for range iterations { + _, _ = engine.EvaluateIdentityResolved(context.Background(), resolved, req) + } + elapsed := time.Since(start) + perEval := elapsed / iterations + nsPerOp := perEval.Nanoseconds() + + t.Logf(" %-10d %-10d %-10d %-15s %-12.2f", + numPkgs, logSize, numIdentities, + fmt.Sprintf("%d ns", nsPerOp), + float64(perEval.Microseconds()), + ) + } + } + } + t.Log("") +} diff --git a/targeting/engine.go b/targeting/engine.go index 79dab32..9a48c6b 100644 --- a/targeting/engine.go +++ b/targeting/engine.go @@ -385,6 +385,16 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve // 5. SegmentIndex: which packages is this user eligible for? segmentEligible := resolved.SegmentCandidates(userSegments) + // 5a. Optional preaggregation: when many candidate packages share one + // user's logs, build a per-key bucket index once instead of re-scanning + // the logs per package. For small-package requests the build overhead + // doesn't amortize; ShouldPreaggregate gates the choice. See + // exposure_aggregate.go for the empirical crossover measurement. + var agg *PreaggregatedExposures + if ShouldPreaggregate(len(req.PackageIDs)) { + agg = BuildPreaggregatedExposures(firstLogs) + } + // 6. Evaluate each requested package using binary lazy dedup. var eligibility []tmproto.PackageEligibility for _, pkgID := range req.PackageIDs { @@ -399,32 +409,52 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve } } - // Campaign frequency cap (binary lazy dedup across all UID logs). + // Campaign frequency cap. Use the preaggregated index when present; + // fall back to per-package log scan for small request sizes where + // the aggregation overhead doesn't amortize. if eligible && idCfg != nil && idCfg.CampaignID != "" { campCfg := resolved.CampaignConfigs[idCfg.CampaignID] if campCfg != nil && len(campCfg.FrequencyRules) > 0 { rules := toFrequencyRules(campCfg.FrequencyRules) campHash := hashString(idCfg.CampaignID) - if CheckFrequencyRulesMultiLog(firstLogs, campHash, true, rules, nowUnix) { + var capped bool + if agg != nil { + capped = CheckFrequencyRulesAggregated(agg, campHash, true, rules, nowUnix) + } else { + capped = CheckFrequencyRulesMultiLog(firstLogs, campHash, true, rules, nowUnix) + } + if capped { eligible = false e.metrics.IdentityEvaluated(pkgID, StageCampaignFreq, false) } } } - // Package frequency cap (binary lazy dedup across all UID logs). + // Package frequency cap. pkgHash := hashString(pkgID) if eligible && idCfg != nil && len(idCfg.FrequencyRules) > 0 { rules := toFrequencyRules(idCfg.FrequencyRules) - if CheckFrequencyRulesMultiLog(firstLogs, pkgHash, false, rules, nowUnix) { + var capped bool + if agg != nil { + capped = CheckFrequencyRulesAggregated(agg, pkgHash, false, rules, nowUnix) + } else { + capped = CheckFrequencyRulesMultiLog(firstLogs, pkgHash, false, rules, nowUnix) + } + if capped { eligible = false e.metrics.IdentityEvaluated(pkgID, StagePackageFreq, false) } } - // Intent score (binary, scan across all UID logs). + // Intent score. Use the precomputed per-package latest from the + // aggregated index when present; otherwise scan all UID logs. var intent float64 - latestTS := LatestExposureMultiLog(firstLogs, pkgHash) + var latestTS int64 + if agg != nil { + latestTS = LatestExposureAggregated(agg, pkgHash) + } else { + latestTS = LatestExposureMultiLog(firstLogs, pkgHash) + } if latestTS > 0 { intent = ComputeIntentScore(latestTS, now) } diff --git a/targeting/exposure_aggregate.go b/targeting/exposure_aggregate.go new file mode 100644 index 0000000..2e5424a --- /dev/null +++ b/targeting/exposure_aggregate.go @@ -0,0 +1,129 @@ +package targeting + +// PreaggregatedExposures partitions a user's exposure log entries by filter +// hash for fast per-package eligibility checks. Trades a one-time build pass +// (O(total log entries)) for O(matches-per-filter) per-package lookups, +// instead of the O(packages × total log entries) cost of scanning all logs +// for every package check. +// +// Use when the eligibility evaluator will check many packages against the +// same logs. For small-package requests, the naive CheckFrequencyRulesMultiLog +// is faster because the build overhead isn't amortized — at 10 packages × +// 10K-entry log × 3 identities the build cost more than triples per-request +// CPU. ShouldPreaggregate gates the choice based on the empirical crossover. +type PreaggregatedExposures struct { + // Per-key bucket of entries that match that hash. Each entry carries the + // timestamp and impression hash so per-rule window filtering and + // per-impression dedup can run on the much smaller bucket. + byCampaign map[uint64][]aggEntry + byPackage map[uint64][]aggEntry + // Per-package latest timestamp; precomputed once during the build pass + // so intent-score lookups don't have to re-scan the log per package. + latestByPackage map[uint64]int64 +} + +type aggEntry struct { + impHash uint64 + timestamp int64 +} + +// BuildPreaggregatedExposures partitions the entries across all the user's +// identity logs into per-(campaign|package) buckets and precomputes +// per-package latest timestamps. Cost: O(L × I) where L is total entries +// per identity and I is number of identities. +func BuildPreaggregatedExposures(logs []BinaryExposureLog) *PreaggregatedExposures { + // Pre-size maps to reduce growth allocations; assume a typical user has + // O(distinct_campaigns) << total_entries. + totalEntries := 0 + for _, log := range logs { + totalEntries += log.Len() + } + cap := totalEntries / 4 + if cap < 8 { + cap = 8 + } + pa := &PreaggregatedExposures{ + byCampaign: make(map[uint64][]aggEntry, cap), + byPackage: make(map[uint64][]aggEntry, cap), + latestByPackage: make(map[uint64]int64, cap), + } + + for _, log := range logs { + n := log.Len() + for i := 0; i < n; i++ { + ts := log.Timestamp(i) + imp := log.ImpressionHash(i) + campH := log.CampaignHash(i) + pkgH := log.PackageHash(i) + entry := aggEntry{impHash: imp, timestamp: ts} + if campH != 0 { + pa.byCampaign[campH] = append(pa.byCampaign[campH], entry) + } + pa.byPackage[pkgH] = append(pa.byPackage[pkgH], entry) + if cur, ok := pa.latestByPackage[pkgH]; !ok || ts > cur { + pa.latestByPackage[pkgH] = ts + } + } + } + return pa +} + +// LatestExposureAggregated returns the latest timestamp recorded for the +// given package hash. Equivalent to LatestExposureMultiLog but reads from +// the precomputed index, O(1) per call. +func LatestExposureAggregated(agg *PreaggregatedExposures, pkgHash uint64) int64 { + return agg.latestByPackage[pkgHash] +} + +// CheckFrequencyRulesAggregated checks frequency rules against a +// pre-aggregated bucket. Equivalent to CheckFrequencyRulesMultiLog but +// avoids re-scanning the full log per check. +func CheckFrequencyRulesAggregated(agg *PreaggregatedExposures, filterHash uint64, isCampaign bool, rules []FrequencyRule, nowUnix int64) bool { + var bucket []aggEntry + if isCampaign { + bucket = agg.byCampaign[filterHash] + } else { + bucket = agg.byPackage[filterHash] + } + if len(bucket) == 0 { + return false + } + for _, rule := range rules { + cutoff := nowUnix - int64(rule.Window.Seconds()) + seen := make(map[uint64]struct{}) + count := 0 + for i := range bucket { + if bucket[i].timestamp < cutoff { + continue + } + if _, dup := seen[bucket[i].impHash]; dup { + continue + } + seen[bucket[i].impHash] = struct{}{} + count++ + if count >= rule.MaxCount { + return true + } + } + } + return false +} + +// PreaggregatePackagesThreshold is the per-request candidate-package count +// above which the eligibility evaluator should build a preaggregated view of +// the user's exposure log before per-package checks. +// +// Empirically tuned (TestPreaggregate_Crossover): below ~50 packages, the +// map-build allocation overhead per log entry dominates and naive scanning +// is faster regardless of log size — at 10 packages × 10K log × 3 identities, +// preagg is ~3× slower (1.25ms vs 408µs). Above ~50 packages, preagg +// amortizes — at 1000 packages × 1000-entry log × 3 identities the speedup +// is ~26×; at 1000 × 10K × 3 identities, ~40×. +const PreaggregatePackagesThreshold = 50 + +// ShouldPreaggregate returns whether the eligibility evaluator should build +// a preaggregated view before per-package checks. Cheap to evaluate; safe to +// call on every request. +func ShouldPreaggregate(numPackages int) bool { + return numPackages > PreaggregatePackagesThreshold +} diff --git a/targeting/exposure_aggregate_test.go b/targeting/exposure_aggregate_test.go new file mode 100644 index 0000000..946cbbd --- /dev/null +++ b/targeting/exposure_aggregate_test.go @@ -0,0 +1,84 @@ +package targeting + +import ( + "fmt" + "testing" + "time" +) + +// TestPreaggregate_Crossover measures naive vs preaggregated frequency-cap +// evaluation across the (packages × log_entries × identities) matrix to +// determine where the heuristic threshold should sit. +func TestPreaggregate_Crossover(t *testing.T) { + pkgCounts := []int{10, 100, 1000} + logSizes := []int{0, 100, 1000, 10000} + identityCounts := []int{1, 3} + + now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC).Unix() + rules := []FrequencyRule{{MaxCount: 1_000_000, Window: 24 * time.Hour}} + + t.Log("") + t.Log("=== Naive scan vs preaggregated lookup, frequency-cap eligibility ===") + t.Log("") + t.Logf(" %-9s %-9s %-3s %-12s %-12s %-12s", + "packages", "log_size", "ids", "naive (ns)", "preagg (ns)", "ratio") + t.Logf(" %-9s %-9s %-3s %-12s %-12s %-12s", + "--------", "--------", "---", "----------", "-----------", "-----") + + for _, numPkgs := range pkgCounts { + for _, logSize := range logSizes { + for _, numIds := range identityCounts { + // Build per-identity binary logs of `logSize` entries, distributed + // across numPkgs packages with ~10 distinct campaigns. + logs := make([]BinaryExposureLog, numIds) + for i := 0; i < numIds; i++ { + entries := make(ExposureLog, 0, logSize) + for j := 0; j < logSize; j++ { + pkgIdx := j % numPkgs + entries = append(entries, ExposureEntry{ + ImpressionID: fmt.Sprintf("imp-%d-%d", i, j), + PackageID: fmt.Sprintf("pkg-%d", pkgIdx), + CampaignID: fmt.Sprintf("camp-%d", pkgIdx%10), + SourceID: "bench", + Timestamp: now - int64(j*60), + }) + } + logs[i] = EncodeBinaryExposureLog(entries) + } + + pkgHashes := make([]uint64, numPkgs) + for i := 0; i < numPkgs; i++ { + pkgHashes[i] = hashString(fmt.Sprintf("pkg-%d", i)) + } + + const iters = 200 + + // Naive timing: re-scan all logs per package check. + start := time.Now() + for it := 0; it < iters; it++ { + for _, h := range pkgHashes { + _ = CheckFrequencyRulesMultiLog(logs, h, false, rules, now) + } + } + naive := time.Since(start) / time.Duration(iters) + + // Preaggregated timing: build once per request, then cheap lookups. + start = time.Now() + for it := 0; it < iters; it++ { + agg := BuildPreaggregatedExposures(logs) + for _, h := range pkgHashes { + _ = CheckFrequencyRulesAggregated(agg, h, false, rules, now) + } + } + preagg := time.Since(start) / time.Duration(iters) + + ratio := float64(naive) / float64(preagg) + + t.Logf(" %-9d %-9d %-3d %-12d %-12d %-12.2fx", + numPkgs, logSize, numIds, + naive.Nanoseconds(), preagg.Nanoseconds(), ratio) + } + } + } + t.Log("") +}