Skip to content

Commit 8cedf9a

Browse files
committed
mmaprototype: move secondary load to mmaload
1 parent 85524a1 commit 8cedf9a

File tree

8 files changed

+81
-72
lines changed

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ type ReplicaChange struct {
175175
// The load this change adds to a store. The values will be negative if the
176176
// load is being removed.
177177
loadDelta mmaload.LoadVector
178-
secondaryLoadDelta SecondaryLoadVector
178+
secondaryLoadDelta mmaload.SecondaryLoadVector
179179

180180
// target is the target {store,node} for the change.
181181
target roachpb.ReplicationTarget
@@ -328,8 +328,8 @@ func MakeLeaseTransferChanges(
328328
}
329329
removeLease.next.IsLeaseholder = false
330330
addLease.next.IsLeaseholder = true
331-
removeLease.secondaryLoadDelta[LeaseCount] = -1
332-
addLease.secondaryLoadDelta[LeaseCount] = 1
331+
removeLease.secondaryLoadDelta[mmaload.LeaseCount] = -1
332+
addLease.secondaryLoadDelta[mmaload.LeaseCount] = 1
333333

334334
// Only account for the leaseholder CPU, all other primary load dimensions
335335
// are ignored. Byte size and write bytes are not impacted by having a range
@@ -365,7 +365,7 @@ func MakeAddReplicaChange(
365365
addReplica.next.ReplicaID = unknownReplicaID
366366
addReplica.loadDelta.Add(loadVectorToAdd(rLoad.Load))
367367
if replicaIDAndType.IsLeaseholder {
368-
addReplica.secondaryLoadDelta[LeaseCount] = 1
368+
addReplica.secondaryLoadDelta[mmaload.LeaseCount] = 1
369369
} else {
370370
// Set the load delta for CPU to be just the raft CPU. The non-raft CPU we
371371
// assume is associated with the lease.
@@ -393,7 +393,7 @@ func MakeRemoveReplicaChange(
393393
}
394394
removeReplica.loadDelta.Subtract(rLoad.Load)
395395
if replicaState.IsLeaseholder {
396-
removeReplica.secondaryLoadDelta[LeaseCount] = -1
396+
removeReplica.secondaryLoadDelta[mmaload.LeaseCount] = -1
397397
} else {
398398
// Set the load delta for CPU to be just the raft CPU. The non-raft CPU is
399399
// associated with the lease.
@@ -421,10 +421,10 @@ func MakeReplicaTypeChange(
421421
replicaChangeType: ChangeReplica,
422422
}
423423
if next.IsLeaseholder {
424-
change.secondaryLoadDelta[LeaseCount] = 1
424+
change.secondaryLoadDelta[mmaload.LeaseCount] = 1
425425
change.loadDelta[mmaload.CPURate] = loadToAdd(rLoad.Load[mmaload.CPURate] - rLoad.RaftCPU)
426426
} else if prev.IsLeaseholder {
427-
change.secondaryLoadDelta[LeaseCount] = -1
427+
change.secondaryLoadDelta[mmaload.LeaseCount] = -1
428428
change.loadDelta[mmaload.CPURate] = rLoad.RaftCPU - rLoad.Load[mmaload.CPURate]
429429
}
430430
return change
@@ -1786,9 +1786,9 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
17861786
// adjusted load being negative is very low.
17871787
adjustedStoreLoadValue := max(0, ss.adjusted.load[topk.dim])
17881788
threshold := mmaload.LoadValue(float64(adjustedStoreLoadValue) * fraction)
1789-
if ss.reportedSecondaryLoad[ReplicaCount] > 0 {
1789+
if ss.reportedSecondaryLoad[mmaload.ReplicaCount] > 0 {
17901790
// Allow all ranges above 90% of the mean. This is quite arbitrary.
1791-
meanLoad := (adjustedStoreLoadValue * 9) / (ss.reportedSecondaryLoad[ReplicaCount] * 10)
1791+
meanLoad := (adjustedStoreLoadValue * 9) / (ss.reportedSecondaryLoad[mmaload.ReplicaCount] * 10)
17921792
threshold = min(meanLoad, threshold)
17931793
}
17941794
topk.threshold = threshold
@@ -2219,7 +2219,7 @@ func (cs *clusterState) undoReplicaChange(change ReplicaChange) {
22192219
func (cs *clusterState) applyChangeLoadDelta(change ReplicaChange) {
22202220
ss := cs.stores[change.target.StoreID]
22212221
ss.adjusted.load.Add(change.loadDelta)
2222-
ss.adjusted.secondaryLoad.add(change.secondaryLoadDelta)
2222+
ss.adjusted.secondaryLoad.Add(change.secondaryLoadDelta)
22232223
ss.loadSeqNum++
22242224
ss.computeMaxFractionPending()
22252225
cs.nodes[ss.NodeID].adjustedCPU += change.loadDelta[mmaload.CPURate]
@@ -2230,7 +2230,7 @@ func (cs *clusterState) applyChangeLoadDelta(change ReplicaChange) {
22302230
func (cs *clusterState) undoChangeLoadDelta(change ReplicaChange) {
22312231
ss := cs.stores[change.target.StoreID]
22322232
ss.adjusted.load.Subtract(change.loadDelta)
2233-
ss.adjusted.secondaryLoad.subtract(change.secondaryLoadDelta)
2233+
ss.adjusted.secondaryLoad.Subtract(change.secondaryLoadDelta)
22342234
ss.loadSeqNum++
22352235
ss.computeMaxFractionPending()
22362236
cs.nodes[ss.NodeID].adjustedCPU -= change.loadDelta[mmaload.CPURate]

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ func parseLoadVector(t *testing.T, in string) mmaload.LoadVector {
5858
return vec
5959
}
6060

61-
func parseSecondaryLoadVector(t *testing.T, in string) SecondaryLoadVector {
62-
var vec SecondaryLoadVector
61+
func parseSecondaryLoadVector(t *testing.T, in string) mmaload.SecondaryLoadVector {
62+
var vec mmaload.SecondaryLoadVector
6363
parts := strings.Split(stripBrackets(t, in), ",")
64-
require.LessOrEqual(t, len(parts), int(NumSecondaryLoadDimensions))
64+
require.LessOrEqual(t, len(parts), int(mmaload.NumSecondaryLoadDimensions))
6565
for dim := range parts {
6666
vec[dim] = mmaload.LoadValue(parseInt(t, parts[dim]))
6767
}

pkg/kv/kvserver/allocator/mmaprototype/load.go

Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,57 +26,6 @@ const (
2626
UnknownCapacity mmaload.LoadValue = math.MaxInt64
2727
)
2828

29-
// SecondaryLoadDimension represents secondary load dimensions that should be
30-
// considered after we are done rebalancing using loadDimensions, since these
31-
// don't represent "real" resources. Currently, only lease and replica counts
32-
// are considered here. Lease rebalancing will see if there is scope to move
33-
// some leases between stores that do not have any pending changes and are not
34-
// overloaded (and will not get overloaded by the movement). This will happen
35-
// in a separate pass (i.e., not in clusterState.rebalanceStores) -- the
36-
// current plan is to continue using the leaseQueue and call from it into MMA.
37-
//
38-
// Note that lease rebalancing will only move leases and not replicas. Also,
39-
// the rebalancing will take into account the lease preferences, as discussed
40-
// in https://github.com/cockroachdb/cockroach/issues/93258, and the lease
41-
// counts among the current candidates (see
42-
// https://github.com/cockroachdb/cockroach/pull/98893).
43-
//
44-
// To use MMA for replica count rebalancing, done by the replicateQueue, we
45-
// also have a ReplicaCount load dimension.
46-
//
47-
// These are currently unused, since the initial integration of MMA is to
48-
// replace load-based rebalancing performed by the StoreRebalancer.
49-
type SecondaryLoadDimension uint8
50-
51-
const (
52-
LeaseCount SecondaryLoadDimension = iota
53-
ReplicaCount
54-
NumSecondaryLoadDimensions
55-
)
56-
57-
type SecondaryLoadVector [NumSecondaryLoadDimensions]mmaload.LoadValue
58-
59-
func (lv *SecondaryLoadVector) add(other SecondaryLoadVector) {
60-
for i := range other {
61-
(*lv)[i] += other[i]
62-
}
63-
}
64-
65-
func (lv *SecondaryLoadVector) subtract(other SecondaryLoadVector) {
66-
for i := range other {
67-
(*lv)[i] -= other[i]
68-
}
69-
}
70-
71-
func (lv SecondaryLoadVector) String() string {
72-
return redact.StringWithoutMarkers(lv)
73-
}
74-
75-
// SafeFormat implements the redact.SafeFormatter interface.
76-
func (lv SecondaryLoadVector) SafeFormat(w redact.SafePrinter, _ rune) {
77-
w.Printf("[lease:%d, replica:%d]", lv[LeaseCount], lv[ReplicaCount])
78-
}
79-
8029
type RangeLoad struct {
8130
Load mmaload.LoadVector
8231
// Nanos per second. RaftCPU <= Load[cpu]. Handling this as a special case,
@@ -111,7 +60,7 @@ type storeLoad struct {
11160
// provisioned disk bandwidth in the near future.
11261
capacity mmaload.LoadVector
11362

114-
reportedSecondaryLoad SecondaryLoadVector
63+
reportedSecondaryLoad mmaload.SecondaryLoadVector
11564
}
11665

11766
// NodeLoad is the load information for a node.
@@ -130,7 +79,7 @@ type meanStoreLoad struct {
13079
// Util is 0 for CPURate, WriteBandwidth. Non-zero for ByteSize.
13180
util [mmaload.NumLoadDimensions]float64
13281

133-
secondaryLoad SecondaryLoadVector
82+
secondaryLoad mmaload.SecondaryLoadVector
13483
}
13584

13685
// The mean node load for a set of NodeLoad.

pkg/kv/kvserver/allocator/mmaprototype/load_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestMeansMemo(t *testing.T) {
8989
reportedLoad: mmaload.LoadVector{mmaload.LoadValue(cpuLoad), mmaload.LoadValue(wbLoad), mmaload.LoadValue(bsLoad)},
9090
capacity: mmaload.LoadVector{
9191
mmaload.LoadValue(cpuCapacity), mmaload.LoadValue(wbCapacity), mmaload.LoadValue(bsCapacity)},
92-
reportedSecondaryLoad: SecondaryLoadVector{leaseCountLoad},
92+
reportedSecondaryLoad: mmaload.SecondaryLoadVector{leaseCountLoad},
9393
}
9494
for i := range sLoad.capacity {
9595
if sLoad.capacity[i] < 0 {

pkg/kv/kvserver/allocator/mmaprototype/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type StoreLoadMsg struct {
2525
// derive a node level capacity, and then dividing that by the number of
2626
// stores.
2727
Capacity mmaload.LoadVector
28-
SecondaryLoad SecondaryLoadVector
28+
SecondaryLoad mmaload.SecondaryLoadVector
2929

3030
LoadTime time.Time
3131
}

pkg/kv/kvserver/allocator/mmaprototype/mmaload/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "mmaload",
55
srcs = [
6+
"secondary.go",
67
"signed.go",
78
"unsigned.go",
89
],

pkg/kv/kvserver/allocator/mmaprototype/mmaload/secondary.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package mmaload
7+
8+
import "github.com/cockroachdb/redact"
9+
10+
// SecondaryLoadDimension represents secondary load dimensions that should be
11+
// considered after we are done rebalancing using loadDimensions, since these
12+
// don't represent "real" resources. Currently, only lease and replica counts
13+
// are considered here. Lease rebalancing will see if there is scope to move
14+
// some leases between stores that do not have any pending changes and are not
15+
// overloaded (and will not get overloaded by the movement). This will happen
16+
// in a separate pass (i.e., not in clusterState.rebalanceStores) -- the
17+
// current plan is to continue using the leaseQueue and call from it into MMA.
18+
//
19+
// Note that lease rebalancing will only move leases and not replicas. Also,
20+
// the rebalancing will take into account the lease preferences, as discussed
21+
// in https://github.com/cockroachdb/cockroach/issues/93258, and the lease
22+
// counts among the current candidates (see
23+
// https://github.com/cockroachdb/cockroach/pull/98893).
24+
//
25+
// To use MMA for replica count rebalancing, done by the replicateQueue, we
26+
// also have a ReplicaCount load dimension.
27+
//
28+
// These are currently unused, since the initial integration of MMA is to
29+
// replace load-based rebalancing performed by the StoreRebalancer.
30+
type SecondaryLoadDimension uint8
31+
32+
const (
33+
LeaseCount SecondaryLoadDimension = iota
34+
ReplicaCount
35+
NumSecondaryLoadDimensions
36+
)
37+
38+
type SecondaryLoadVector [NumSecondaryLoadDimensions]LoadValue
39+
40+
func (lv *SecondaryLoadVector) Add(other SecondaryLoadVector) {
41+
for i := range other {
42+
(*lv)[i] += other[i]
43+
}
44+
}
45+
46+
func (lv *SecondaryLoadVector) Subtract(other SecondaryLoadVector) {
47+
for i := range other {
48+
(*lv)[i] -= other[i]
49+
}
50+
}
51+
52+
func (lv SecondaryLoadVector) String() string {
53+
return redact.StringWithoutMarkers(lv)
54+
}
55+
56+
// SafeFormat implements the redact.SafeFormatter interface.
57+
func (lv SecondaryLoadVector) SafeFormat(w redact.SafePrinter, _ rune) {
58+
w.Printf("[lease:%d, replica:%d]", lv[LeaseCount], lv[ReplicaCount])
59+
}

pkg/kv/kvserver/mmaintegration/store_load_msg.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ func MakeStoreLoadMsg(
118118
// which is desirably pessimistic.
119119
capacity[mmaload.ByteSize] = mmaload.LoadValue(desc.Capacity.Available)
120120
}
121-
var secondaryLoad mmaprototype.SecondaryLoadVector
122-
secondaryLoad[mmaprototype.LeaseCount] = mmaload.LoadValue(desc.Capacity.LeaseCount)
123-
secondaryLoad[mmaprototype.ReplicaCount] = mmaload.LoadValue(desc.Capacity.RangeCount)
121+
var secondaryLoad mmaload.SecondaryLoadVector
122+
secondaryLoad[mmaload.LeaseCount] = mmaload.LoadValue(desc.Capacity.LeaseCount)
123+
secondaryLoad[mmaload.ReplicaCount] = mmaload.LoadValue(desc.Capacity.RangeCount)
124124
// TODO(tbg): this triggers early in tests, probably we're making load messages
125125
// before having received the first capacity. Still, this is bad, should fix.
126126
// or handle properly by communicating an unknown capacity.

0 commit comments

Comments
 (0)