Skip to content

Commit 4b04f84

Browse files
committed
mmaprototype: make adjusted load a SignedLoadVector
This required an update to and thus review of all arithmetic involving this type (which was part of the point of doing this). A few TODOs were added, and I took the opportunity to refactor the max pending frac computation, which is now much simpler. I'm tracking writing the unit test in #157757. No AI in this workstream so far. Epic: CRDB-55052.
1 parent 8cedf9a commit 4b04f84

File tree

3 files changed

+123
-41
lines changed

3 files changed

+123
-41
lines changed

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -776,8 +776,8 @@ type storeState struct {
776776
// NB: these load values can become negative due to applying pending
777777
// changes. We need to let them be negative to retain the ability to undo
778778
// pending changes.
779-
load mmaload.LoadVector
780-
secondaryLoad SecondaryLoadVector
779+
load mmaload.SignedLoadVector
780+
secondaryLoad mmaload.SecondaryLoadVector
781781
// Pending changes for computing loadReplicas and load.
782782
// Added to at the same time as clusterState.pendingChanges.
783783
//
@@ -938,32 +938,44 @@ func (ss *storeState) computePendingChangesReflectedInLatestLoad(
938938
}
939939

940940
func (ss *storeState) computeMaxFractionPending() {
941-
fracIncrease := 0.0
942-
fracDecrease := 0.0
943-
for i := range ss.reportedLoad {
944-
if ss.reportedLoad[i] == ss.adjusted.load[i] && ss.reportedLoad[i] == 0 {
945-
// Avoid setting ss.maxFractionPendingIncrease and
946-
// ss.maxFractionPendingDecrease to 1000 when the reported load and
947-
// adjusted load are both 0 since some dimension is expected to have zero
948-
// (e.g. write bandwidth during read-only workloads).
949-
continue
950-
}
951-
if ss.reportedLoad[i] == 0 {
952-
fracIncrease = 1000
953-
fracDecrease = 1000
954-
break
955-
}
956-
f := math.Abs(float64(ss.adjusted.load[i]-ss.reportedLoad[i])) / float64(ss.reportedLoad[i])
957-
if ss.adjusted.load[i] > ss.reportedLoad[i] {
958-
if f > fracIncrease {
959-
fracIncrease = f
960-
}
961-
} else if f > fracDecrease {
962-
fracDecrease = f
941+
ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease = computeMaxFractionPending(ss.reportedLoad, ss.adjusted.load)
942+
}
943+
944+
// TODO(tbg): unit test this function.
945+
func computeMaxFractionPending(rep mmaload.LoadVector, adj mmaload.SignedLoadVector) (maxFracInc, maxFracDec float64) {
946+
for i := range rep {
947+
// The fraction pending expresses the absolute difference of the adjusted
948+
// and reported load as a multiple of the reported load. Note that this
949+
// is the case even if the adjusted load is negative: if, say, the adjusted
950+
// load is -50 and the reported load is 100, it is still correct to say that
951+
// a "magnitude 1.5x" change is pending (from 100 to -50).
952+
diff := adj[i].Subtract(rep[i])
953+
954+
switch {
955+
case diff.Abs() == 0:
956+
// Reported and adjusted are equal, so nothing is pending.
957+
// This also handles the case in which both are zero.
958+
// We don't need to update maxFracInc or maxFracDec because
959+
// they started at zero and only go up from there.
960+
case rep[i] == 0:
961+
// The adjusted load is nonzero, but the reported one is zero. We can't
962+
// express the load change as a multiple of zero. Arbitrarily assign large
963+
// value to both increase and decrease, indicating that no more changes
964+
// should be made until either the pending change clears (and we get a
965+
// zero diff above) or we register positive reported load.
966+
maxFracInc = max(maxFracInc, 1000)
967+
maxFracDec = max(maxFracInc, 1000)
968+
case diff.Nonnegative() > 0:
969+
// Vanilla case of adjusted > reported, i.e. we have load incoming.
970+
// We don't need to update maxFracDec.
971+
maxFracInc = max(maxFracInc, math.Abs(float64(diff.Abs())/float64(rep[i])))
972+
default:
973+
// Vanilla case of adjusted < reported, i.e. we have load incoming.
974+
// We don't need to update maxFracInc.
975+
maxFracDec = max(maxFracDec, math.Abs(float64(diff.Abs())/float64(rep[i])))
963976
}
964977
}
965-
ss.maxFractionPendingIncrease = fracIncrease
966-
ss.maxFractionPendingDecrease = fracDecrease
978+
return maxFracInc, maxFracDec
967979
}
968980

969981
func newStoreState() *storeState {
@@ -1401,11 +1413,17 @@ func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *Store
14011413
if ss == nil {
14021414
panic(fmt.Sprintf("store %d not found", storeMsg.StoreID))
14031415
}
1416+
// TODO(tbg): review the math here, check for negative values, and explain
1417+
// what's going on.
14041418
ns.ReportedCPU += storeMsg.Load[mmaload.CPURate] - ss.reportedLoad[mmaload.CPURate]
14051419
ns.CapacityCPU += storeMsg.Capacity[mmaload.CPURate] - ss.capacity[mmaload.CPURate]
14061420
// Undo the adjustment for the store. We will apply the adjustment again
14071421
// below.
1408-
ns.adjustedCPU += storeMsg.Load[mmaload.CPURate] - ss.adjusted.load[mmaload.CPURate]
1422+
// TODO(tbg): how do we know ssAdjCPU is positive? We probably don't, and need
1423+
// to handle this case. Revisit and avoid the direct unverified conversion to
1424+
// LoadValue.
1425+
ssAdjCPU, _ := ss.adjusted.load.UnwrapAt(mmaload.CPURate)
1426+
ns.adjustedCPU += storeMsg.Load[mmaload.CPURate] - mmaload.LoadValue(ssAdjCPU)
14091427

14101428
// The store's load sequence number is incremented on each load change. The
14111429
// store's load is updated below.
@@ -1416,7 +1434,8 @@ func (cs *clusterState) processStoreLoadMsg(ctx context.Context, storeMsg *Store
14161434

14171435
// Reset the adjusted load to be the reported load. We will re-apply any
14181436
// remaining pending change deltas to the updated adjusted load.
1419-
ss.adjusted.load = storeMsg.Load
1437+
ss.adjusted.load = mmaload.SignedLoadVector{}
1438+
ss.adjusted.load.Add(storeMsg.Load)
14201439
ss.adjusted.secondaryLoad = storeMsg.SecondaryLoad
14211440
ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease = 0, 0
14221441

@@ -1784,7 +1803,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
17841803
// The max(0, ...) is defensive, in case the adjusted load is negative.
17851804
// Given that this is a most overloaded dim, the likelihood of the
17861805
// adjusted load being negative is very low.
1787-
adjustedStoreLoadValue := max(0, ss.adjusted.load[topk.dim])
1806+
adjustedStoreLoadValue := ss.adjusted.load[topk.dim].Nonnegative()
17881807
threshold := mmaload.LoadValue(float64(adjustedStoreLoadValue) * fraction)
17891808
if ss.reportedSecondaryLoad[mmaload.ReplicaCount] > 0 {
17901809
// Allow all ranges above 90% of the mean. This is quite arbitrary.
@@ -2413,9 +2432,9 @@ func (cs *clusterState) canShedAndAddLoad(
24132432
// transitions to overloadSlow along one dimension, to allow for an
24142433
// exchange.
24152434
overloadedDimFractionIncrease := math.MaxFloat64
2416-
if targetSS.adjusted.load[overloadedDim] > 0 {
2435+
if adjLoad, pos := targetSS.adjusted.load.NonnegativeAt(overloadedDim); pos {
24172436
overloadedDimFractionIncrease = float64(deltaToAdd[overloadedDim]) /
2418-
float64(targetSS.adjusted.load[overloadedDim])
2437+
float64(adjLoad)
24192438
} else {
24202439
// Else, the adjusted load on the overloadedDim is zero or negative, which
24212440
// is possible, but extremely rare in practice. We arbitrarily set the
@@ -2433,8 +2452,8 @@ func (cs *clusterState) canShedAndAddLoad(
24332452
}
24342453
// This is an overloaded dimension in the target. Only allow small
24352454
// increases along this dimension.
2436-
if targetSS.adjusted.load[dim] > 0 {
2437-
dimFractionIncrease := float64(deltaToAdd[dim]) / float64(targetSS.adjusted.load[dim])
2455+
if adjLoad, pos := targetSS.adjusted.load.NonnegativeAt(dim); pos {
2456+
dimFractionIncrease := float64(deltaToAdd[dim]) / float64(adjLoad)
24382457
// The use of 33% is arbitrary.
24392458
if dimFractionIncrease > overloadedDimFractionIncrease/3 {
24402459
log.KvDistribution.Infof(ctx, "%v: %f > %f/3", dim, dimFractionIncrease, overloadedDimFractionIncrease)
@@ -2535,7 +2554,8 @@ func computeLoadSummary(
25352554
var worstDim mmaload.LoadDimension
25362555
for i := range msl.load {
25372556
const nodeIDForLogging = 0
2538-
ls := loadSummaryForDimension(ctx, ss.StoreID, nodeIDForLogging, mmaload.LoadDimension(i), ss.adjusted.load[i], ss.capacity[i],
2557+
ls := loadSummaryForDimension(ctx, ss.StoreID, nodeIDForLogging, mmaload.LoadDimension(i), ss.adjusted.load.Nonnegative()[i],
2558+
ss.capacity[i],
25392559
msl.load[i], msl.util[i])
25402560
if ls > sls {
25412561
sls = ls
@@ -2544,7 +2564,7 @@ func computeLoadSummary(
25442564
dimSummary[i] = ls
25452565
switch mmaload.LoadDimension(i) {
25462566
case mmaload.ByteSize:
2547-
highDiskSpaceUtil = highDiskSpaceUtilization(ss.adjusted.load[i], ss.capacity[i])
2567+
highDiskSpaceUtil = highDiskSpaceUtilization(ss.adjusted.load.Nonnegative()[i], ss.capacity[i])
25482568
}
25492569
}
25502570
const storeIDForLogging = 0

pkg/kv/kvserver/allocator/mmaprototype/mmaload/signed.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,80 @@
55

66
package mmaload
77

8+
import "github.com/cockroachdb/redact"
9+
810
type SignedLoadValue struct {
911
n int64
1012
}
1113

14+
func (s SignedLoadValue) Subtract(lv LoadValue) SignedLoadValue {
15+
return SignedLoadValue{n: s.n - int64(lv)}
16+
}
17+
18+
func (s SignedLoadValue) Abs() int64 {
19+
return max(s.n, -s.n)
20+
}
21+
22+
func (s SignedLoadValue) Nonnegative() LoadValue {
23+
if s.n > 0 {
24+
return LoadValue(s.n)
25+
}
26+
return 0
27+
}
28+
29+
func (s SignedLoadValue) String() string {
30+
return redact.StringWithoutMarkers(s)
31+
}
32+
33+
// SafeFormat implements the redact.SafeFormatter interface.
34+
func (s SignedLoadValue) SafeFormat(w redact.SafePrinter, _ rune) {
35+
w.SafeInt(redact.SafeInt(s.n))
36+
}
37+
1238
type SignedLoadVector [NumLoadDimensions]SignedLoadValue
1339

14-
func (v *SignedLoadVector) Add(o SignedLoadVector) {
40+
func (v *SignedLoadVector) Add(o LoadVector) {
1541
for i := 0; i < int(NumLoadDimensions); i++ {
16-
(*v)[i].n += o[i].n
42+
(*v)[i].n += int64(o[i])
1743
}
1844
}
1945

20-
func (v *SignedLoadVector) Subtract(o SignedLoadVector) {
46+
func (v *SignedLoadVector) Subtract(o LoadVector) {
2147
for i := 0; i < int(NumLoadDimensions); i++ {
22-
(*v)[i].n -= o[i].n
48+
(*v)[i].n -= int64(o[i])
2349
}
2450
}
51+
52+
// NonnegativeAt returns the load value at the given dimension if it is
53+
// non-negative, and zero otherwise. The boolean indicates whether the returned
54+
// load is strictly positive.
55+
func (v *SignedLoadVector) NonnegativeAt(dim LoadDimension) (LoadValue, bool) {
56+
val := (*v)[int(dim)].n
57+
return LoadValue(max(0, val)), val > 0
58+
}
59+
60+
func (v *SignedLoadVector) UnwrapAt(dim LoadDimension) (_ int64, positive bool) {
61+
val := (*v)[int(dim)].n
62+
return val, val > 0
63+
}
64+
65+
// Nonnegative returns a LoadVector which has zeroes for any dimension for which
66+
// the signed load is negative.
67+
func (v *SignedLoadVector) Nonnegative() LoadVector {
68+
var lv LoadVector
69+
for i := 0; i < int(NumLoadDimensions); i++ {
70+
if val := (*v)[i].n; val > 0 {
71+
lv[i] = LoadValue(val)
72+
}
73+
}
74+
return lv
75+
}
76+
77+
func (v SignedLoadVector) String() string {
78+
return redact.StringWithoutMarkers(v)
79+
}
80+
81+
// SafeFormat implements the redact.SafeFormatter interface.
82+
func (v SignedLoadVector) SafeFormat(w redact.SafePrinter, _ rune) {
83+
w.Printf("[cpu:%d, write-bandwidth:%d, byte-size:%d]", v[CPURate], v[WriteBandwidth], v[ByteSize])
84+
}

pkg/kv/kvserver/allocator/mmaprototype/mmaload/unsigned.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ func (lv LoadVector) String() string {
5252
}
5353

5454
// SafeFormat implements the redact.SafeFormatter interface.
55-
func (lv LoadVector) SafeFormat(w redact.SafePrinter, _ rune) {
56-
w.Printf("[cpu:%d, write-bandwidth:%d, byte-size:%d]", lv[CPURate], lv[WriteBandwidth], lv[ByteSize])
55+
func (lv LoadVector) SafeFormat(w redact.SafePrinter, r rune) {
56+
var v SignedLoadVector
57+
v.Add(lv)
58+
v.SafeFormat(w, r)
5759
}
5860

5961
func (lv *LoadVector) Add(other LoadVector) {

0 commit comments

Comments
 (0)