Skip to content

Commit c76320a

Browse files
committed
Implementing maxAvailableComponentSets for resource models
Signed-off-by: mszacillo <[email protected]>
1 parent 0fde1d2 commit c76320a

File tree

2 files changed

+576
-10
lines changed

2 files changed

+576
-10
lines changed

pkg/estimator/client/general.go

Lines changed: 242 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"math"
23+
"sort"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
"k8s.io/apimachinery/pkg/api/resource"
@@ -54,7 +55,7 @@ func (ge *GeneralEstimator) MaxAvailableReplicas(_ context.Context, clusters []*
5455
}
5556

5657
func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) int32 {
57-
//Note: resourceSummary must be deep-copied before using in the function to avoid modifying the original data structure.
58+
// Note: resourceSummary must be deep-copied before using in the function to avoid modifying the original data structure.
5859
resourceSummary := cluster.Status.ResourceSummary.DeepCopy()
5960
if resourceSummary == nil {
6061
return 0
@@ -149,8 +150,7 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
149150
}
150151

151152
if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) && len(cluster.Status.ResourceSummary.AllocatableModelings) > 0 {
152-
num, err := getMaximumSetsBasedOnResourceModels(cluster, components)
153-
if err != nil {
153+
if num, err := getMaximumSetsBasedOnResourceModels(cluster, components, podBound); err != nil {
154154
klog.Warningf("Failed to get maximum sets based on resource models, skipping: %v", err)
155155
} else if num < maxSets {
156156
maxSets = num
@@ -160,13 +160,245 @@ func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.C
160160
return int32(maxSets) // #nosec G115: integer overflow conversion int64 -> int32
161161
}
162162

163-
// getMaximumSetsBasedOnResourceModels is a placeholder for future implementation.
164-
// It should refine the maximum sets based on cluster resource models, similar
165-
// to getMaximumReplicasBasedOnResourceModels but adapted to full component sets.
166-
func getMaximumSetsBasedOnResourceModels(_ *clusterv1alpha1.Cluster, _ []workv1alpha2.Component) (int64, error) {
167-
// TODO: implement logic based on cluster.Spec.ResourceModels
168-
// For now, just return MaxInt64 so it never reduces the upper bound.
169-
return math.MaxInt64, nil
163+
// getMaximumSetsBasedOnResourceModels computes the maximum number of full sets that can be
164+
// placed on a cluster using the cluster's ResourceModels. It expands one set into
165+
// replica kinds (demand + count) and performs a first-fit-decreasing placement onto model-grade nodes.
166+
// `upperBound` caps the search. We can set this using the podBound (allowedPods / podsPerSet)
167+
func getMaximumSetsBasedOnResourceModels(
168+
cluster *clusterv1alpha1.Cluster,
169+
components []workv1alpha2.Component,
170+
upperBound int64,
171+
) (int64, error) {
172+
if upperBound <= 0 {
173+
return 0, nil
174+
}
175+
176+
// Compressed one-set: per-kind (identical replicas grouped)
177+
oneSetKinds := expandKindsOneSet(components)
178+
if len(oneSetKinds) == 0 {
179+
// If there are no pods to schedule, just return upperBound
180+
return upperBound, nil
181+
}
182+
183+
// Use cluster "available" totals (allocatable - allocated - allocating) for normalized scoring
184+
// This reflects what the cluster can actually accept now
185+
totals := availableResourceMap(cluster.Status.ResourceSummary)
186+
187+
for i := range oneSetKinds {
188+
oneSetKinds[i].score = demandScoreNormalized(oneSetKinds[i].dem, totals)
189+
}
190+
sort.Slice(oneSetKinds, func(i, j int) bool {
191+
if oneSetKinds[i].score == oneSetKinds[j].score {
192+
return demandSum(oneSetKinds[i].dem) > demandSum(oneSetKinds[j].dem)
193+
}
194+
return oneSetKinds[i].score > oneSetKinds[j].score
195+
})
196+
197+
//Build model nodes from Spec.ResourceModels and Status.AllocatableModelings
198+
nodes, err := buildModelNodes(cluster)
199+
if err != nil {
200+
return -1, err
201+
}
202+
if len(nodes) == 0 {
203+
return 0, nil
204+
}
205+
206+
var sets int64
207+
for sets < upperBound {
208+
if !placeOneSet(oneSetKinds, nodes) {
209+
break
210+
}
211+
sets++
212+
}
213+
return sets, nil
214+
}
215+
216+
// placeOneSet attempts to place exactly ONE full set (all kinds with their per-set replica counts)
217+
// onto the provided working node capacities (in-place)
218+
// Returns true if successful
219+
func placeOneSet(orderedKinds []replicaKind, work []modelNode) bool {
220+
for _, k := range orderedKinds {
221+
remaining := k.count
222+
if remaining <= 0 {
223+
continue
224+
}
225+
// first-fit across nodes
226+
for n := range work {
227+
if remaining <= 0 {
228+
break
229+
}
230+
fit := maxFit(work[n].cap, k.dem)
231+
if fit <= 0 {
232+
continue
233+
}
234+
place := fit
235+
if place > remaining {
236+
place = remaining
237+
}
238+
consumeMul(work[n].cap, k.dem, place)
239+
remaining -= place
240+
}
241+
if remaining > 0 {
242+
return false
243+
}
244+
}
245+
return true
246+
}
247+
248+
// modelNode holds remaining capacity for a given node across all resource types
249+
type modelNode struct {
250+
cap map[corev1.ResourceName]int64
251+
}
252+
253+
// buildModelNodes constructs identical nodes for each model grade using its Min vector,
254+
// repeated `AllocatableModelings[i].Count` times
255+
func buildModelNodes(cluster *clusterv1alpha1.Cluster) ([]modelNode, error) {
256+
if len(cluster.Spec.ResourceModels) == 0 {
257+
return nil, fmt.Errorf("resource model is inapplicable as no grades are defined")
258+
}
259+
260+
if len(cluster.Spec.ResourceModels) > len(cluster.Status.ResourceSummary.AllocatableModelings) {
261+
// Shouldn’t happen - status is malformed
262+
return nil, fmt.Errorf("resource model/status mismatch: %d grades in spec, %d in status",
263+
len(cluster.Spec.ResourceModels), len(cluster.Status.ResourceSummary.AllocatableModelings))
264+
}
265+
266+
// Convert Spec.ResourceModels to a map of resource -> []MinByGrade
267+
minMap := convertToResourceModelsMinMap(cluster.Spec.ResourceModels)
268+
269+
// Build nodes for each grade index i
270+
var nodes []modelNode
271+
for i := 0; i < len(cluster.Spec.ResourceModels); i++ {
272+
count := cluster.Status.ResourceSummary.AllocatableModelings[i].Count
273+
if count == 0 {
274+
continue
275+
}
276+
277+
// Capacity vector for this grade = Min boundary of each resource at grade i (normalized)
278+
capTemplate := make(map[corev1.ResourceName]int64, len(minMap))
279+
for resName, mins := range minMap {
280+
if i >= len(mins) {
281+
// Model shape mismatch; treat as missing resource for this grade
282+
return nil, fmt.Errorf("resource model is inapplicable as missing resource %q in grade %d", string(resName), i)
283+
}
284+
capTemplate[resName] = quantityAsInt64(mins[i])
285+
}
286+
287+
// Append `count` identical nodes of this grade
288+
for n := 0; n < count; n++ {
289+
// Copy capTemplate to each node
290+
capCopy := make(map[corev1.ResourceName]int64, len(capTemplate))
291+
for k, v := range capTemplate {
292+
capCopy[k] = v
293+
}
294+
nodes = append(nodes, modelNode{cap: capCopy})
295+
}
296+
}
297+
return nodes, nil
298+
}
299+
300+
// replicaKind represents a single type of component, including replica demand and count
301+
type replicaKind struct {
302+
dem map[corev1.ResourceName]int64 // per-replica demand
303+
count int64 // how many replicas
304+
score float64 // ordering heuristic (higher first)
305+
}
306+
307+
// expandKindsOneSet flattens components into a slice of unique replica kinds.
308+
// Each entry holds the per-replica demand and how many replicas of that kind a set needs.
309+
func expandKindsOneSet(components []workv1alpha2.Component) []replicaKind {
310+
kinds := make([]replicaKind, 0, len(components))
311+
for _, c := range components {
312+
if c.ReplicaRequirements == nil || c.ReplicaRequirements.ResourceRequest == nil {
313+
continue
314+
}
315+
// normalize per-replica demand
316+
base := make(map[corev1.ResourceName]int64, len(c.ReplicaRequirements.ResourceRequest))
317+
for name, qty := range c.ReplicaRequirements.ResourceRequest {
318+
base[name] = quantityAsInt64(qty)
319+
}
320+
// skip zero-demand or non-positive replica count
321+
if allZero(base) || c.Replicas <= 0 {
322+
continue
323+
}
324+
325+
k := replicaKind{
326+
dem: base,
327+
count: int64(c.Replicas),
328+
// score is filled later once we know cluster-wide totals
329+
}
330+
kinds = append(kinds, k)
331+
}
332+
return kinds
333+
}
334+
335+
// demandScoreNormalized returns the "max utilization ratio" of a demand vector against total capacities
336+
// If a resource is missing/zero in total, treat it as maximally constrained
337+
func demandScoreNormalized(
338+
demand map[corev1.ResourceName]int64,
339+
total map[corev1.ResourceName]int64,
340+
) float64 {
341+
var maxRatio float64
342+
for res, req := range demand {
343+
if req <= 0 {
344+
continue
345+
}
346+
totalCap := float64(total[res])
347+
if totalCap <= 0 {
348+
return math.MaxFloat64
349+
}
350+
ratio := float64(req) / totalCap
351+
if ratio > maxRatio {
352+
maxRatio = ratio
353+
}
354+
}
355+
return maxRatio
356+
}
357+
358+
// demandSum is used as a tie-breaker when initial scores are equal
359+
func demandSum(m map[corev1.ResourceName]int64) int64 {
360+
var s int64
361+
for _, v := range m {
362+
if v > 0 {
363+
s += v
364+
}
365+
}
366+
return s
367+
}
368+
369+
// maxFit returns how many copies of `dem` fit in `cap` simultaneously
370+
func maxFit(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64) int64 {
371+
var limit int64 = math.MaxInt64
372+
for k, req := range dem {
373+
if req <= 0 {
374+
continue
375+
}
376+
avail := capacity[k]
377+
if avail <= 0 {
378+
return 0
379+
}
380+
bound := avail / req
381+
if bound < limit {
382+
limit = bound
383+
}
384+
}
385+
if limit == math.MaxInt64 {
386+
return 0
387+
}
388+
return limit
389+
}
390+
391+
// consumeMul subtracts mult * dem from cap
392+
func consumeMul(capacity map[corev1.ResourceName]int64, dem map[corev1.ResourceName]int64, mult int64) {
393+
if mult <= 0 {
394+
return
395+
}
396+
for k, req := range dem {
397+
if req <= 0 {
398+
continue
399+
}
400+
capacity[k] -= req * mult
401+
}
170402
}
171403

172404
// podsInSet computes the total number of pods in the CRD

0 commit comments

Comments
 (0)