Skip to content

Commit 85524a1

Browse files
committed
mmaprototype: introduce mmaload package
I want to avoid footguns related to negative load values. Having a package for the load primitives allows hiding the naked slice, so that use from the mmaprototype package can be enforced to be automatically correct. This first commit creates the package and moves the LoadValue LoadVector types. It also creates a SignledLoad{Value,Vector} type that will be adopted as representation for the adjusted load (which can become negative). Epic: CRDB-55052
1 parent 60165a6 commit 85524a1

22 files changed

+281
-215
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1495,6 +1495,7 @@ GO_TARGETS = [
14951495
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
14961496
"//pkg/kv/kvserver/allocator/load:load",
14971497
"//pkg/kv/kvserver/allocator/load:load_test",
1498+
"//pkg/kv/kvserver/allocator/mmaprototype/mmaload:mmaload",
14981499
"//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype",
14991500
"//pkg/kv/kvserver/allocator/mmaprototype:mmaprototype_test",
15001501
"//pkg/kv/kvserver/allocator/plan:plan",

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ go_library(
132132
"//pkg/kv/kvserver/allocator/allocatorimpl",
133133
"//pkg/kv/kvserver/allocator/load",
134134
"//pkg/kv/kvserver/allocator/mmaprototype",
135+
"//pkg/kv/kvserver/allocator/mmaprototype/mmaload",
135136
"//pkg/kv/kvserver/allocator/plan",
136137
"//pkg/kv/kvserver/allocator/storepool",
137138
"//pkg/kv/kvserver/apply",

pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ go_library(
2323
visibility = ["//visibility:public"],
2424
deps = [
2525
"//pkg/kv/kvpb",
26+
"//pkg/kv/kvserver/allocator/mmaprototype/mmaload",
2627
"//pkg/roachpb",
2728
"//pkg/util/log",
2829
"//pkg/util/metric",
@@ -48,6 +49,7 @@ go_test(
4849
data = glob(["testdata/**"]),
4950
embed = [":mmaprototype"],
5051
deps = [
52+
"//pkg/kv/kvserver/allocator/mmaprototype/mmaload",
5153
"//pkg/roachpb",
5254
"//pkg/spanconfig/spanconfigtestutils",
5355
"//pkg/testutils/datapathutils",

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"sync"
1717
"time"
1818

19+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype/mmaload"
1920
"github.com/cockroachdb/cockroach/pkg/roachpb"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
2122
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -492,7 +493,7 @@ func sortTargetCandidateSetAndPick(
492493
cands candidateSet,
493494
loadThreshold loadSummary,
494495
ignoreLevel ignoreLevel,
495-
overloadedDim LoadDimension,
496+
overloadedDim mmaload.LoadDimension,
496497
rng *rand.Rand,
497498
) roachpb.StoreID {
498499
var b strings.Builder
@@ -570,7 +571,7 @@ func sortTargetCandidateSetAndPick(
570571
if j == 0 {
571572
// This is the lowestLoad set being considered now.
572573
lowestLoadSet = cand.sls
573-
} else if ignoreLevel < ignoreHigherThanLoadThreshold || overloadedDim == NumLoadDimensions {
574+
} else if ignoreLevel < ignoreHigherThanLoadThreshold || overloadedDim == mmaload.NumLoadDimensions {
574575
// Past the lowestLoad set. We don't care about these.
575576
break
576577
}
@@ -583,7 +584,7 @@ func sortTargetCandidateSetAndPick(
583584
}
584585
candDiscardedByNLS := cand.nls > loadThreshold ||
585586
(cand.nls == loadThreshold && ignoreLevel < ignoreHigherThanLoadThreshold)
586-
candDiscardedByOverloadDim := overloadedDim != NumLoadDimensions &&
587+
candDiscardedByOverloadDim := overloadedDim != mmaload.NumLoadDimensions &&
587588
cand.dimSummary[overloadedDim] >= loadNoChange
588589
if candDiscardedByNLS || candDiscardedByOverloadDim ||
589590
cand.maxFractionPendingIncrease >= maxFractionPendingThreshold {
@@ -664,7 +665,7 @@ func sortTargetCandidateSetAndPick(
664665
return 0
665666
}
666667
cands.candidates = cands.candidates[:j]
667-
if lowestLoadSet != highestLoadSet || (lowestLoadSet >= loadNoChange && overloadedDim != NumLoadDimensions) {
668+
if lowestLoadSet != highestLoadSet || (lowestLoadSet >= loadNoChange && overloadedDim != mmaload.NumLoadDimensions) {
668669
// Sort candidates from lowest to highest along overloaded dimension. We
669670
// limit when we do this, since this will further restrict the pool of
670671
// candidates and in general we don't want to restrict the pool.

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
18+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype/mmaload"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/util/log"
2021
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -173,7 +174,7 @@ func (s ReplicaChangeType) String() string {
173174
type ReplicaChange struct {
174175
// The load this change adds to a store. The values will be negative if the
175176
// load is being removed.
176-
loadDelta LoadVector
177+
loadDelta mmaload.LoadVector
177178
secondaryLoadDelta SecondaryLoadVector
178179

179180
// target is the target {store,node} for the change.
@@ -333,9 +334,9 @@ func MakeLeaseTransferChanges(
333334
// Only account for the leaseholder CPU, all other primary load dimensions
334335
// are ignored. Byte size and write bytes are not impacted by having a range
335336
// lease.
336-
nonRaftCPU := rLoad.Load[CPURate] - rLoad.RaftCPU
337-
removeLease.loadDelta[CPURate] = -nonRaftCPU
338-
addLease.loadDelta[CPURate] = loadToAdd(nonRaftCPU)
337+
nonRaftCPU := rLoad.Load[mmaload.CPURate] - rLoad.RaftCPU
338+
removeLease.loadDelta[mmaload.CPURate] = -nonRaftCPU
339+
addLease.loadDelta[mmaload.CPURate] = loadToAdd(nonRaftCPU)
339340
return [2]ReplicaChange{removeLease, addLease}
340341
}
341342

@@ -362,13 +363,13 @@ func MakeAddReplicaChange(
362363
replicaChangeType: AddReplica,
363364
}
364365
addReplica.next.ReplicaID = unknownReplicaID
365-
addReplica.loadDelta.add(loadVectorToAdd(rLoad.Load))
366+
addReplica.loadDelta.Add(loadVectorToAdd(rLoad.Load))
366367
if replicaIDAndType.IsLeaseholder {
367368
addReplica.secondaryLoadDelta[LeaseCount] = 1
368369
} else {
369370
// Set the load delta for CPU to be just the raft CPU. The non-raft CPU we
370371
// assume is associated with the lease.
371-
addReplica.loadDelta[CPURate] = loadToAdd(rLoad.RaftCPU)
372+
addReplica.loadDelta[mmaload.CPURate] = loadToAdd(rLoad.RaftCPU)
372373
}
373374
return addReplica
374375
}
@@ -390,13 +391,13 @@ func MakeRemoveReplicaChange(
390391
},
391392
replicaChangeType: RemoveReplica,
392393
}
393-
removeReplica.loadDelta.subtract(rLoad.Load)
394+
removeReplica.loadDelta.Subtract(rLoad.Load)
394395
if replicaState.IsLeaseholder {
395396
removeReplica.secondaryLoadDelta[LeaseCount] = -1
396397
} else {
397398
// Set the load delta for CPU to be just the raft CPU. The non-raft CPU is
398399
// associated with the lease.
399-
removeReplica.loadDelta[CPURate] = -rLoad.RaftCPU
400+
removeReplica.loadDelta[mmaload.CPURate] = -rLoad.RaftCPU
400401
}
401402
return removeReplica
402403
}
@@ -421,10 +422,10 @@ func MakeReplicaTypeChange(
421422
}
422423
if next.IsLeaseholder {
423424
change.secondaryLoadDelta[LeaseCount] = 1
424-
change.loadDelta[CPURate] = loadToAdd(rLoad.Load[CPURate] - rLoad.RaftCPU)
425+
change.loadDelta[mmaload.CPURate] = loadToAdd(rLoad.Load[mmaload.CPURate] - rLoad.RaftCPU)
425426
} else if prev.IsLeaseholder {
426427
change.secondaryLoadDelta[LeaseCount] = -1
427-
change.loadDelta[CPURate] = rLoad.RaftCPU - rLoad.Load[CPURate]
428+
change.loadDelta[mmaload.CPURate] = rLoad.RaftCPU - rLoad.Load[mmaload.CPURate]
428429
}
429430
return change
430431
}
@@ -775,7 +776,7 @@ type storeState struct {
775776
// NB: these load values can become negative due to applying pending
776777
// changes. We need to let them be negative to retain the ability to undo
777778
// pending changes.
778-
load LoadVector
779+
load mmaload.LoadVector
779780
secondaryLoad SecondaryLoadVector
780781
// Pending changes for computing loadReplicas and load.
781782
// Added to at the same time as clusterState.pendingChanges.
@@ -977,7 +978,7 @@ type nodeState struct {
977978
stores []roachpb.StoreID
978979
NodeLoad
979980
// NB: adjustedCPU can be negative.
980-
adjustedCPU LoadValue
981+
adjustedCPU mmaload.LoadValue
981982
}
982983

983984
func newNodeState(nodeID roachpb.NodeID) *nodeState {
@@ -1400,11 +1401,11 @@ func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *Store
14001401
if ss == nil {
14011402
panic(fmt.Sprintf("store %d not found", storeMsg.StoreID))
14021403
}
1403-
ns.ReportedCPU += storeMsg.Load[CPURate] - ss.reportedLoad[CPURate]
1404-
ns.CapacityCPU += storeMsg.Capacity[CPURate] - ss.capacity[CPURate]
1404+
ns.ReportedCPU += storeMsg.Load[mmaload.CPURate] - ss.reportedLoad[mmaload.CPURate]
1405+
ns.CapacityCPU += storeMsg.Capacity[mmaload.CPURate] - ss.capacity[mmaload.CPURate]
14051406
// Undo the adjustment for the store. We will apply the adjustment again
14061407
// below.
1407-
ns.adjustedCPU += storeMsg.Load[CPURate] - ss.adjusted.load[CPURate]
1408+
ns.adjustedCPU += storeMsg.Load[mmaload.CPURate] - ss.adjusted.load[mmaload.CPURate]
14081409

14091410
// The store's load sequence number is incremented on each load change. The
14101411
// store's load is updated below.
@@ -1702,18 +1703,18 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
17021703
topk.startInit()
17031704
sls := cs.computeLoadSummary(ctx, ss.StoreID, &clusterMeans.storeLoad, &clusterMeans.nodeLoad)
17041705
if ss.StoreID == localss.StoreID {
1705-
topk.dim = CPURate
1706+
topk.dim = mmaload.CPURate
17061707
} else {
1707-
topk.dim = WriteBandwidth
1708+
topk.dim = mmaload.WriteBandwidth
17081709
}
17091710
if sls.highDiskSpaceUtilization {
1710-
topk.dim = ByteSize
1711+
topk.dim = mmaload.ByteSize
17111712
} else if sls.sls > loadNoChange {
17121713
// If multiple dimensions are contributing the same loadSummary, we will pick
17131714
// CPURate before WriteBandwidth before ByteSize.
17141715
for i := range sls.dimSummary {
17151716
if sls.dimSummary[i] == sls.sls {
1716-
topk.dim = LoadDimension(i)
1717+
topk.dim = mmaload.LoadDimension(i)
17171718
break
17181719
}
17191720
}
@@ -1775,7 +1776,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
17751776
minReplicaLoadFraction = 0.02
17761777
)
17771778
fraction := minReplicaLoadFraction
1778-
if ss.StoreID == msg.StoreID && topk.dim == CPURate {
1779+
if ss.StoreID == msg.StoreID && topk.dim == mmaload.CPURate {
17791780
// We are assuming we will be able to shed leases, but if we can't we
17801781
// will start shedding replicas, so this is just a heuristic.
17811782
fraction = minLeaseLoadFraction
@@ -1784,7 +1785,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
17841785
// Given that this is a most overloaded dim, the likelihood of the
17851786
// adjusted load being negative is very low.
17861787
adjustedStoreLoadValue := max(0, ss.adjusted.load[topk.dim])
1787-
threshold := LoadValue(float64(adjustedStoreLoadValue) * fraction)
1788+
threshold := mmaload.LoadValue(float64(adjustedStoreLoadValue) * fraction)
17881789
if ss.reportedSecondaryLoad[ReplicaCount] > 0 {
17891790
// Allow all ranges above 90% of the mean. This is quite arbitrary.
17901791
meanLoad := (adjustedStoreLoadValue * 9) / (ss.reportedSecondaryLoad[ReplicaCount] * 10)
@@ -1818,16 +1819,16 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
18181819
ss := cs.stores[replica.StoreID]
18191820
topk := ss.adjusted.topKRanges[msg.StoreID]
18201821
switch topk.dim {
1821-
case CPURate:
1822-
l := rs.load.Load[CPURate]
1822+
case mmaload.CPURate:
1823+
l := rs.load.Load[mmaload.CPURate]
18231824
if !replica.ReplicaState.IsLeaseholder {
18241825
l = rs.load.RaftCPU
18251826
}
18261827
topk.addReplica(ctx, rangeID, l, replica.StoreID, msg.StoreID)
1827-
case WriteBandwidth:
1828-
topk.addReplica(ctx, rangeID, rs.load.Load[WriteBandwidth], replica.StoreID, msg.StoreID)
1829-
case ByteSize:
1830-
topk.addReplica(ctx, rangeID, rs.load.Load[ByteSize], replica.StoreID, msg.StoreID)
1828+
case mmaload.WriteBandwidth:
1829+
topk.addReplica(ctx, rangeID, rs.load.Load[mmaload.WriteBandwidth], replica.StoreID, msg.StoreID)
1830+
case mmaload.ByteSize:
1831+
topk.addReplica(ctx, rangeID, rs.load.Load[mmaload.ByteSize], replica.StoreID, msg.StoreID)
18311832
}
18321833
}
18331834
}
@@ -2217,22 +2218,22 @@ func (cs *clusterState) undoReplicaChange(change ReplicaChange) {
22172218
// store and node affected.
22182219
func (cs *clusterState) applyChangeLoadDelta(change ReplicaChange) {
22192220
ss := cs.stores[change.target.StoreID]
2220-
ss.adjusted.load.add(change.loadDelta)
2221+
ss.adjusted.load.Add(change.loadDelta)
22212222
ss.adjusted.secondaryLoad.add(change.secondaryLoadDelta)
22222223
ss.loadSeqNum++
22232224
ss.computeMaxFractionPending()
2224-
cs.nodes[ss.NodeID].adjustedCPU += change.loadDelta[CPURate]
2225+
cs.nodes[ss.NodeID].adjustedCPU += change.loadDelta[mmaload.CPURate]
22252226
}
22262227

22272228
// undoChangeLoadDelta subtracts the change load delta from the adjusted load
22282229
// of the store and node affected.
22292230
func (cs *clusterState) undoChangeLoadDelta(change ReplicaChange) {
22302231
ss := cs.stores[change.target.StoreID]
2231-
ss.adjusted.load.subtract(change.loadDelta)
2232+
ss.adjusted.load.Subtract(change.loadDelta)
22322233
ss.adjusted.secondaryLoad.subtract(change.secondaryLoadDelta)
22332234
ss.loadSeqNum++
22342235
ss.computeMaxFractionPending()
2235-
cs.nodes[ss.NodeID].adjustedCPU -= change.loadDelta[CPURate]
2236+
cs.nodes[ss.NodeID].adjustedCPU -= change.loadDelta[mmaload.CPURate]
22362237
}
22372238

22382239
// setStore updates the store attributes and locality in the cluster state. If
@@ -2303,12 +2304,12 @@ func (cs *clusterState) canShedAndAddLoad(
23032304
ctx context.Context,
23042305
srcSS *storeState,
23052306
targetSS *storeState,
2306-
delta LoadVector,
2307+
delta mmaload.LoadVector,
23072308
means *meansLoad,
23082309
onlyConsiderTargetCPUSummary bool,
2309-
overloadedDim LoadDimension,
2310+
overloadedDim mmaload.LoadDimension,
23102311
) (canAddLoad bool) {
2311-
if overloadedDim == NumLoadDimensions {
2312+
if overloadedDim == mmaload.NumLoadDimensions {
23122313
panic("overloadedDim must not be NumLoadDimensions")
23132314
}
23142315
// TODO(tbg): in experiments, we often see interesting behavior right when
@@ -2318,26 +2319,26 @@ func (cs *clusterState) canShedAndAddLoad(
23182319
targetNS := cs.nodes[targetSS.NodeID]
23192320
// Add the delta.
23202321
deltaToAdd := loadVectorToAdd(delta)
2321-
targetSS.adjusted.load.add(deltaToAdd)
2322+
targetSS.adjusted.load.Add(deltaToAdd)
23222323
// TODO(tbg): why does NodeLoad have an adjustedCPU field but not fields for
23232324
// the other load dimensions? We just added deltaToAdd to targetSS.adjusted,
23242325
// shouldn't this be wholly reflected in targetNS as well, not just for CPU?
23252326
// Or maybe CPU is the only dimension that matters at the node level. It feels
23262327
// sloppy/confusing though.
2327-
targetNS.adjustedCPU += deltaToAdd[CPURate]
2328+
targetNS.adjustedCPU += deltaToAdd[mmaload.CPURate]
23282329
targetSLS := computeLoadSummary(ctx, targetSS, targetNS, &means.storeLoad, &means.nodeLoad)
23292330
// Undo the addition.
2330-
targetSS.adjusted.load.subtract(deltaToAdd)
2331-
targetNS.adjustedCPU -= deltaToAdd[CPURate]
2331+
targetSS.adjusted.load.Subtract(deltaToAdd)
2332+
targetNS.adjustedCPU -= deltaToAdd[mmaload.CPURate]
23322333

23332334
// Remove the delta.
23342335
srcNS := cs.nodes[srcSS.NodeID]
2335-
srcSS.adjusted.load.subtract(delta)
2336-
srcNS.adjustedCPU -= delta[CPURate]
2336+
srcSS.adjusted.load.Subtract(delta)
2337+
srcNS.adjustedCPU -= delta[mmaload.CPURate]
23372338
srcSLS := computeLoadSummary(ctx, srcSS, srcNS, &means.storeLoad, &means.nodeLoad)
23382339
// Undo the removal.
2339-
srcSS.adjusted.load.add(delta)
2340-
srcNS.adjustedCPU += delta[CPURate]
2340+
srcSS.adjusted.load.Add(delta)
2341+
srcNS.adjustedCPU += delta[mmaload.CPURate]
23412342

23422343
var reason strings.Builder
23432344
defer func() {
@@ -2365,7 +2366,7 @@ func (cs *clusterState) canShedAndAddLoad(
23652366
// loadNoChange, and the source must have been > loadNoChange.
23662367
var targetSummary loadSummary
23672368
if onlyConsiderTargetCPUSummary {
2368-
targetSummary = targetSLS.dimSummary[CPURate]
2369+
targetSummary = targetSLS.dimSummary[mmaload.CPURate]
23692370
if targetSummary < targetSLS.nls {
23702371
targetSummary = targetSLS.nls
23712372
}
@@ -2423,7 +2424,7 @@ func (cs *clusterState) canShedAndAddLoad(
24232424
}
24242425
otherDimensionsBecameWorseInTarget := false
24252426
for i := range targetSLS.dimSummary {
2426-
dim := LoadDimension(i)
2427+
dim := mmaload.LoadDimension(i)
24272428
if dim == overloadedDim {
24282429
continue
24292430
}
@@ -2530,24 +2531,24 @@ func computeLoadSummary(
25302531
) storeLoadSummary {
25312532
sls := loadLow
25322533
var highDiskSpaceUtil bool
2533-
var dimSummary [NumLoadDimensions]loadSummary
2534-
var worstDim LoadDimension
2534+
var dimSummary [mmaload.NumLoadDimensions]loadSummary
2535+
var worstDim mmaload.LoadDimension
25352536
for i := range msl.load {
25362537
const nodeIDForLogging = 0
2537-
ls := loadSummaryForDimension(ctx, ss.StoreID, nodeIDForLogging, LoadDimension(i), ss.adjusted.load[i], ss.capacity[i],
2538+
ls := loadSummaryForDimension(ctx, ss.StoreID, nodeIDForLogging, mmaload.LoadDimension(i), ss.adjusted.load[i], ss.capacity[i],
25382539
msl.load[i], msl.util[i])
25392540
if ls > sls {
25402541
sls = ls
2541-
worstDim = LoadDimension(i)
2542+
worstDim = mmaload.LoadDimension(i)
25422543
}
25432544
dimSummary[i] = ls
2544-
switch LoadDimension(i) {
2545-
case ByteSize:
2545+
switch mmaload.LoadDimension(i) {
2546+
case mmaload.ByteSize:
25462547
highDiskSpaceUtil = highDiskSpaceUtilization(ss.adjusted.load[i], ss.capacity[i])
25472548
}
25482549
}
25492550
const storeIDForLogging = 0
2550-
nls := loadSummaryForDimension(ctx, storeIDForLogging, ns.NodeID, CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
2551+
nls := loadSummaryForDimension(ctx, storeIDForLogging, ns.NodeID, mmaload.CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
25512552
return storeLoadSummary{
25522553
worstDim: worstDim,
25532554
sls: sls,

0 commit comments

Comments
 (0)