diff --git a/app/eth2wrap/cache.go b/app/eth2wrap/cache.go index 66fe9b0e64..75daf0b5e4 100644 --- a/app/eth2wrap/cache.go +++ b/app/eth2wrap/cache.go @@ -374,49 +374,55 @@ func (c *DutiesCache) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoc allActive := c.activeValIdxs.valIdxs c.activeValIdxs.RUnlock() - requestVidxs := vidxs + // Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below + // and must never alias either the caller's slice or the shared activeValIdxs slice. + requestVidxs := slices.Clone(vidxs) if len(requestVidxs) == 0 { - requestVidxs = allActive + requestVidxs = slices.Clone(allActive) } dutiesForEpoch, ok := c.fetchProposerDuties(epoch) dutiesResult := make([]*eth2v1.ProposerDuty, 0, len(vidxs)) if ok { - // If the request was for all validators and also all duties are already cached, skip more expensive operations. - // This is the common case for most validator clients and Charon, which usually request duties for all active validators. - if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) { - for _, d := range dutiesForEpoch.duties { - dutiesResult = append(dutiesResult, &d) - } + // previouslyRequested is the set of indices already queried from the beacon for this epoch. + // A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present + // in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits. + previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs)) + for _, idx := range dutiesForEpoch.requestedIdxs { + previouslyRequested[idx] = struct{}{} + } - cacheUsed = true + requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs)) - return ProposerDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil + var missing []eth2p0.ValidatorIndex + + for _, idx := range requestVidxs { + requestedSet[idx] = struct{}{} + + if _, hit := previouslyRequested[idx]; !hit { + missing = append(missing, idx) + } } - // Filter out the found duties. for _, d := range dutiesForEpoch.duties { - if slices.Contains(requestVidxs, d.ValidatorIndex) { + if _, hit := requestedSet[d.ValidatorIndex]; hit { dutiesResult = append(dutiesResult, &d) } } - if len(dutiesResult) > 0 { + // Fast path: every requested index has been queried previously, so the cache answer is complete. + if len(missing) == 0 { cacheUsed = true - } - - // Check if all requested duties were found in the cache (= being a subset of it). - if len(dutiesResult) == len(requestVidxs) { return ProposerDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil } - for _, duty := range dutiesForEpoch.duties { - requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool { - return requestVidx == duty.ValidatorIndex - }) + if len(dutiesResult) > 0 { + cacheUsed = true } + requestVidxs = missing + log.Debug(ctx, "Cached proposer duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs)) } @@ -462,49 +468,55 @@ func (c *DutiesCache) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoc allActive := c.activeValIdxs.valIdxs c.activeValIdxs.RUnlock() - requestVidxs := vidxs + // Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below + // and must never alias either the caller's slice or the shared activeValIdxs slice. + requestVidxs := slices.Clone(vidxs) if len(requestVidxs) == 0 { - requestVidxs = allActive + requestVidxs = slices.Clone(allActive) } dutiesForEpoch, ok := c.fetchAttesterDuties(epoch) dutiesResult := make([]*eth2v1.AttesterDuty, 0, len(vidxs)) if ok { - // If the request was for all validators and also all duties are already cached, this is done to skip more expensive operations. - // This is the common case for most validator clients and Charon, which usually request duties for all active validators. - if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) { - for _, d := range dutiesForEpoch.duties { - dutiesResult = append(dutiesResult, &d) - } + // previouslyRequested is the set of indices already queried from the beacon for this epoch. + // A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present + // in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits. + previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs)) + for _, idx := range dutiesForEpoch.requestedIdxs { + previouslyRequested[idx] = struct{}{} + } - cacheUsed = true + requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs)) - return AttesterDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil + var missing []eth2p0.ValidatorIndex + + for _, idx := range requestVidxs { + requestedSet[idx] = struct{}{} + + if _, hit := previouslyRequested[idx]; !hit { + missing = append(missing, idx) + } } - // Filter out the found duties. for _, d := range dutiesForEpoch.duties { - if slices.Contains(requestVidxs, d.ValidatorIndex) { + if _, hit := requestedSet[d.ValidatorIndex]; hit { dutiesResult = append(dutiesResult, &d) } } - if len(dutiesResult) > 0 { + // Fast path: every requested index has been queried previously, so the cache answer is complete. + if len(missing) == 0 { cacheUsed = true - } - - // Check if all requested duties were found in the cache (= being a subset of it). - if len(dutiesResult) == len(requestVidxs) { return AttesterDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil } - for _, duty := range dutiesForEpoch.duties { - requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool { - return requestVidx == duty.ValidatorIndex - }) + if len(dutiesResult) > 0 { + cacheUsed = true } + requestVidxs = missing + log.Debug(ctx, "Cached attester duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs)) } @@ -550,49 +562,55 @@ func (c *DutiesCache) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoc allActive := c.activeValIdxs.valIdxs c.activeValIdxs.RUnlock() - requestVidxs := vidxs + // Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below + // and must never alias either the caller's slice or the shared activeValIdxs slice. + requestVidxs := slices.Clone(vidxs) if len(requestVidxs) == 0 { - requestVidxs = allActive + requestVidxs = slices.Clone(allActive) } dutiesForEpoch, ok := c.fetchSyncDuties(epoch) dutiesResult := make([]*eth2v1.SyncCommitteeDuty, 0, len(vidxs)) if ok { - // If the request was for all validators and also all duties are already cached, skip more expensive operations. - // This is the common case for most validator clients and Charon, which usually request duties for all active validators. - if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) { - for _, d := range dutiesForEpoch.duties { - dutiesResult = append(dutiesResult, &d) - } + // previouslyRequested is the set of indices already queried from the beacon for this epoch. + // A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present + // in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits. + previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs)) + for _, idx := range dutiesForEpoch.requestedIdxs { + previouslyRequested[idx] = struct{}{} + } - cacheUsed = true + requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs)) - return SyncDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil + var missing []eth2p0.ValidatorIndex + + for _, idx := range requestVidxs { + requestedSet[idx] = struct{}{} + + if _, hit := previouslyRequested[idx]; !hit { + missing = append(missing, idx) + } } - // Filter out the found duties. for _, d := range dutiesForEpoch.duties { - if slices.Contains(requestVidxs, d.ValidatorIndex) { + if _, hit := requestedSet[d.ValidatorIndex]; hit { dutiesResult = append(dutiesResult, &d) } } - if len(dutiesResult) > 0 { + // Fast path: every requested index has been queried previously, so the cache answer is complete. + if len(missing) == 0 { cacheUsed = true - } - - // Check if all requested duties were found in the cache (= being a subset of it). - if len(dutiesResult) == len(requestVidxs) { return SyncDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil } - for _, duty := range dutiesForEpoch.duties { - requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool { - return requestVidx == duty.ValidatorIndex - }) + if len(dutiesResult) > 0 { + cacheUsed = true } + requestVidxs = missing + log.Debug(ctx, "Cached sync duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs)) } diff --git a/app/eth2wrap/cache_test.go b/app/eth2wrap/cache_test.go index c547fdf0c3..97a0c9ccc1 100644 --- a/app/eth2wrap/cache_test.go +++ b/app/eth2wrap/cache_test.go @@ -291,3 +291,314 @@ func TestDutiesCache(t *testing.T) { require.NoError(t, err) require.True(t, proposerDutiesCalled) } + +// cacheAdapter captures the per-duty-type differences needed to exercise DutiesCache +// under the shared scenario runners below. installBN wires the mock's duty endpoint to +// record every call's indices and return one duty per requested index; callCache +// invokes the matching cache method and returns the number of duties in the response. +type cacheAdapter struct { + installBN func(m *beaconmock.Mock, record func([]eth2p0.ValidatorIndex), valSet beaconmock.ValidatorSet) + callCache func(c *eth2wrap.DutiesCache, ctx context.Context, vidxs []eth2p0.ValidatorIndex) (int, error) +} + +func proposerCacheAdapter() cacheAdapter { + return cacheAdapter{ + installBN: func(m *beaconmock.Mock, record func([]eth2p0.ValidatorIndex), valSet beaconmock.ValidatorSet) { + m.ProposerDutiesFunc = func(_ context.Context, _ eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.ProposerDuty, error) { + record(vidxs) + + resp := make([]*eth2v1.ProposerDuty, 0, len(vidxs)) + for _, vidx := range vidxs { + val, ok := valSet[vidx] + if !ok { + continue + } + + resp = append(resp, ð2v1.ProposerDuty{ + PubKey: val.Validator.PublicKey, + ValidatorIndex: vidx, + }) + } + + return resp, nil + } + }, + callCache: func(c *eth2wrap.DutiesCache, ctx context.Context, vidxs []eth2p0.ValidatorIndex) (int, error) { + r, err := c.ProposerDutiesCache(ctx, 0, vidxs) + return len(r.Duties), err + }, + } +} + +func attesterCacheAdapter() cacheAdapter { + return cacheAdapter{ + installBN: func(m *beaconmock.Mock, record func([]eth2p0.ValidatorIndex), valSet beaconmock.ValidatorSet) { + m.AttesterDutiesFunc = func(_ context.Context, _ eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.AttesterDuty, error) { + record(vidxs) + + resp := make([]*eth2v1.AttesterDuty, 0, len(vidxs)) + for _, vidx := range vidxs { + val, ok := valSet[vidx] + if !ok { + continue + } + + resp = append(resp, ð2v1.AttesterDuty{ + PubKey: val.Validator.PublicKey, + ValidatorIndex: vidx, + }) + } + + return resp, nil + } + }, + callCache: func(c *eth2wrap.DutiesCache, ctx context.Context, vidxs []eth2p0.ValidatorIndex) (int, error) { + r, err := c.AttesterDutiesCache(ctx, 0, vidxs) + return len(r.Duties), err + }, + } +} + +func syncCommitteeCacheAdapter() cacheAdapter { + return cacheAdapter{ + installBN: func(m *beaconmock.Mock, record func([]eth2p0.ValidatorIndex), valSet beaconmock.ValidatorSet) { + m.SyncCommitteeDutiesFunc = func(_ context.Context, _ eth2p0.Epoch, vidxs []eth2p0.ValidatorIndex) ([]*eth2v1.SyncCommitteeDuty, error) { + record(vidxs) + + resp := make([]*eth2v1.SyncCommitteeDuty, 0, len(vidxs)) + for _, vidx := range vidxs { + val, ok := valSet[vidx] + if !ok { + continue + } + + resp = append(resp, ð2v1.SyncCommitteeDuty{ + PubKey: val.Validator.PublicKey, + ValidatorIndex: vidx, + }) + } + + return resp, nil + } + }, + callCache: func(c *eth2wrap.DutiesCache, ctx context.Context, vidxs []eth2p0.ValidatorIndex) (int, error) { + r, err := c.SyncCommDutiesCache(ctx, 0, vidxs) + return len(r.Duties), err + }, + } +} + +// cacheHarnessValidators is the validator-set size used by every scenario runner; kept +// as a shared constant so assertions can reference it without plumbing it through. +const cacheHarnessValidators = 8 + +// newCacheHarness builds a DutiesCache wired to a beaconmock whose duty endpoint (as +// installed by the adapter) records every call. It returns the cache, the sorted list +// of all validator indices, and a pointer to the growing slice of recorded BN calls. +func newCacheHarness(t *testing.T, a cacheAdapter) (*eth2wrap.DutiesCache, []eth2p0.ValidatorIndex, *[][]eth2p0.ValidatorIndex) { + t.Helper() + + valSet := testutil.RandomValidatorSet(t, cacheHarnessValidators) + allIdxs := slices.Collect(maps.Keys(valSet)) + slices.Sort(allIdxs) + + eth2Cl, err := beaconmock.New(t.Context(), beaconmock.WithValidatorSet(valSet)) + require.NoError(t, err) + + var bnCalls [][]eth2p0.ValidatorIndex + + a.installBN(ð2Cl, func(vidxs []eth2p0.ValidatorIndex) { + bnCalls = append(bnCalls, slices.Clone(vidxs)) + }, valSet) + + return eth2wrap.NewDutiesCache(eth2Cl, allIdxs), allIdxs, &bnCalls +} + +func sortedIdxs(idxs []eth2p0.ValidatorIndex) []eth2p0.ValidatorIndex { + out := slices.Clone(idxs) + slices.Sort(out) + + return out +} + +// runAllThenAllCached covers: cache empty -> request all validators causes a single BN +// fetch; a second request for all validators is fully served from the cache. +func runAllThenAllCached(t *testing.T, a cacheAdapter) { + t.Helper() + + cache, allIdxs, bnCalls := newCacheHarness(t, a) + ctx := t.Context() + + // Call 1: all validators -> cache miss, BN fetch for all. + count, err := a.callCache(cache, ctx, slices.Clone(allIdxs)) + require.NoError(t, err) + require.Equal(t, cacheHarnessValidators, count) + require.Len(t, *bnCalls, 1) + require.Equal(t, allIdxs, sortedIdxs((*bnCalls)[0])) + + // Call 2: all validators -> fully served from cache, no BN call. + count, err = a.callCache(cache, ctx, slices.Clone(allIdxs)) + require.NoError(t, err) + require.Equal(t, cacheHarnessValidators, count) + require.Len(t, *bnCalls, 1, "cache should serve without hitting the beacon node") +} + +// runSingleThenAllThenCached covers: cache empty -> request one validator X causes a BN +// fetch for [X]; a follow-up request for all validators causes a BN fetch for +// all-except-X; a subsequent request for a now-cached validator Y is served entirely +// from the cache. +func runSingleThenAllThenCached(t *testing.T, a cacheAdapter) { + t.Helper() + + cache, allIdxs, bnCalls := newCacheHarness(t, a) + ctx := t.Context() + + x := allIdxs[0] + y := allIdxs[1] + + // Call 1: validator X only -> BN fetch [X]. + count, err := a.callCache(cache, ctx, []eth2p0.ValidatorIndex{x}) + require.NoError(t, err) + require.Equal(t, 1, count) + require.Len(t, *bnCalls, 1) + require.Equal(t, []eth2p0.ValidatorIndex{x}, (*bnCalls)[0]) + + // Call 2: all validators -> BN fetch for all-except-X; X is served from cache. + count, err = a.callCache(cache, ctx, slices.Clone(allIdxs)) + require.NoError(t, err) + require.Equal(t, cacheHarnessValidators, count) + require.Len(t, *bnCalls, 2) + + expectedSecondCall := slices.DeleteFunc(slices.Clone(allIdxs), func(i eth2p0.ValidatorIndex) bool { + return i == x + }) + require.Equal(t, expectedSecondCall, sortedIdxs((*bnCalls)[1])) + + // Call 3: validator Y only -> fully served from cache, no BN call. + count, err = a.callCache(cache, ctx, []eth2p0.ValidatorIndex{y}) + require.NoError(t, err) + require.Equal(t, 1, count) + require.Len(t, *bnCalls, 2, "Y was populated by the all-validators call; no BN call expected") +} + +// runSingleThenSingleThenAll covers: cache empty -> request one validator X causes a BN +// fetch for [X]; a request for a different validator Y (not yet cached) causes a BN +// fetch for [Y]; a subsequent request for all validators causes a BN fetch for +// all-except-{X,Y} while X and Y are served from the cache. +func runSingleThenSingleThenAll(t *testing.T, a cacheAdapter) { + t.Helper() + + cache, allIdxs, bnCalls := newCacheHarness(t, a) + ctx := t.Context() + + x := allIdxs[0] + y := allIdxs[1] + + // Call 1: validator X -> BN fetch [X]. + count, err := a.callCache(cache, ctx, []eth2p0.ValidatorIndex{x}) + require.NoError(t, err) + require.Equal(t, 1, count) + require.Len(t, *bnCalls, 1) + require.Equal(t, []eth2p0.ValidatorIndex{x}, (*bnCalls)[0]) + + // Call 2: validator Y (not yet cached) -> BN fetch [Y]. + count, err = a.callCache(cache, ctx, []eth2p0.ValidatorIndex{y}) + require.NoError(t, err) + require.Equal(t, 1, count) + require.Len(t, *bnCalls, 2) + require.Equal(t, []eth2p0.ValidatorIndex{y}, (*bnCalls)[1]) + + // Call 3: all validators -> BN fetch for all-except-{X,Y}; X and Y come from cache. + count, err = a.callCache(cache, ctx, slices.Clone(allIdxs)) + require.NoError(t, err) + require.Equal(t, cacheHarnessValidators, count) + require.Len(t, *bnCalls, 3) + + expectedThirdCall := slices.DeleteFunc(slices.Clone(allIdxs), func(i eth2p0.ValidatorIndex) bool { + return i == x || i == y + }) + require.Equal(t, expectedThirdCall, sortedIdxs((*bnCalls)[2])) +} + +// runCallerSliceNotMutated covers the regression that used to make the cache mutate the +// caller's vidxs via slices.DeleteFunc. Exercises both the non-empty vidxs path and the +// len(vidxs)==0 "all active" path, across a cold miss and a subsequent call that needs to +// fetch missing indices (so requestVidxs is narrowed internally). +func runCallerSliceNotMutated(t *testing.T, a cacheAdapter) { + t.Helper() + + cache, allIdxs, _ := newCacheHarness(t, a) + ctx := t.Context() + + // Non-empty vidxs path: pre-seed the cache with one index, then request a subset + // that overlaps (forces the internal "narrow to missing" branch). + _, err := a.callCache(cache, ctx, []eth2p0.ValidatorIndex{allIdxs[0]}) + require.NoError(t, err) + + subset := []eth2p0.ValidatorIndex{allIdxs[0], allIdxs[1], allIdxs[2]} + original := slices.Clone(subset) + _, err = a.callCache(cache, ctx, subset) + require.NoError(t, err) + require.Equal(t, original, subset, "caller's vidxs must not be mutated") + + // Empty vidxs path: the cache expands to allActive internally. Pass nil and an empty + // slice (both are valid "all validators" inputs) and assert they are unchanged. + var nilVidxs []eth2p0.ValidatorIndex + + _, err = a.callCache(cache, ctx, nilVidxs) + require.NoError(t, err) + require.Nil(t, nilVidxs, "nil vidxs must remain nil") + + emptyVidxs := []eth2p0.ValidatorIndex{} + _, err = a.callCache(cache, ctx, emptyVidxs) + require.NoError(t, err) + require.Empty(t, emptyVidxs, "empty vidxs must remain empty") +} + +func TestProposerDutiesCache_AllValidators(t *testing.T) { + runAllThenAllCached(t, proposerCacheAdapter()) +} + +func TestProposerDutiesCache_SingleThenAllThenCached(t *testing.T) { + runSingleThenAllThenCached(t, proposerCacheAdapter()) +} + +func TestProposerDutiesCache_SingleThenSingleThenAll(t *testing.T) { + runSingleThenSingleThenAll(t, proposerCacheAdapter()) +} + +func TestAttesterDutiesCache_AllValidators(t *testing.T) { + runAllThenAllCached(t, attesterCacheAdapter()) +} + +func TestAttesterDutiesCache_SingleThenAllThenCached(t *testing.T) { + runSingleThenAllThenCached(t, attesterCacheAdapter()) +} + +func TestAttesterDutiesCache_SingleThenSingleThenAll(t *testing.T) { + runSingleThenSingleThenAll(t, attesterCacheAdapter()) +} + +func TestSyncCommDutiesCache_AllValidators(t *testing.T) { + runAllThenAllCached(t, syncCommitteeCacheAdapter()) +} + +func TestSyncCommDutiesCache_SingleThenAllThenCached(t *testing.T) { + runSingleThenAllThenCached(t, syncCommitteeCacheAdapter()) +} + +func TestSyncCommDutiesCache_SingleThenSingleThenAll(t *testing.T) { + runSingleThenSingleThenAll(t, syncCommitteeCacheAdapter()) +} + +func TestProposerDutiesCache_CallerSliceNotMutated(t *testing.T) { + runCallerSliceNotMutated(t, proposerCacheAdapter()) +} + +func TestAttesterDutiesCache_CallerSliceNotMutated(t *testing.T) { + runCallerSliceNotMutated(t, attesterCacheAdapter()) +} + +func TestSyncCommDutiesCache_CallerSliceNotMutated(t *testing.T) { + runCallerSliceNotMutated(t, syncCommitteeCacheAdapter()) +} diff --git a/app/version/version.go b/app/version/version.go index e103590e81..9c799b8305 100644 --- a/app/version/version.go +++ b/app/version/version.go @@ -15,7 +15,7 @@ import ( ) // version a string since it is overwritten at build-time with the git tag for official releases. -var version = "v1.9" +var version = "v1.9-rc" // Version is the branch version of the codebase. // - Main branch: v0.X-dev