diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 1461f8fbe62..6bea9242c89 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -83,6 +83,7 @@ scheduling: maxSchedulingDuration: 5s enableAssertions: false enablePreferLargeJobOrdering: false + respectNodePodLimits: false protectedFractionOfFairShare: 1.0 nodeIdLabel: "kubernetes.io/hostname" priorityClasses: diff --git a/internal/common/resource/resource.go b/internal/common/resource/resource.go index 1ad5e939a4f..1ef9144867e 100644 --- a/internal/common/resource/resource.go +++ b/internal/common/resource/resource.go @@ -7,6 +7,10 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) +// PodsResourceName is the Kubernetes resource key for per-node pod capacity +// (node.Status.Allocatable["pods"]), used by the scheduler's RespectNodePodLimits feature. +const PodsResourceName = "pods" + // FromResourceList function takes a map with keys of type ResourceName and values of type // "resource.Quantity" as defined in the K8s API. // diff --git a/internal/executor/utilisation/cluster_utilisation.go b/internal/executor/utilisation/cluster_utilisation.go index fbc5062021e..b86be7180f4 100644 --- a/internal/executor/utilisation/cluster_utilisation.go +++ b/internal/executor/utilisation/cluster_utilisation.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" v1 "k8s.io/api/core/v1" + k8sResource "k8s.io/apimachinery/pkg/api/resource" log "github.com/armadaproject/armada/internal/common/logging" armadaresource "github.com/armadaproject/armada/internal/common/resource" @@ -217,6 +218,9 @@ func allocatedByPriorityAndResourceTypeFromPods(pods []*v1.Pod) (map[int32]armad priority = *(pod.Spec.Priority) } request := armadaresource.TotalPodResourceRequest(&pod.Spec) + // Always reported so the scheduler can track non-Armada pods against per-node pod capacity + // when RespectNodePodLimits is enabled; dropped by the scheduler's resource list factory otherwise. + request[armadaresource.PodsResourceName] = *k8sResource.NewQuantity(1, k8sResource.DecimalSI) _, ok := resourcesByPc[priority] if ok { resourcesByPc[priority].Add(request) diff --git a/internal/executor/utilisation/cluster_utilisation_test.go b/internal/executor/utilisation/cluster_utilisation_test.go index 4a891b137a6..901d685fd1d 100644 --- a/internal/executor/utilisation/cluster_utilisation_test.go +++ b/internal/executor/utilisation/cluster_utilisation_test.go @@ -369,6 +369,8 @@ func TestCalculateNonArmadaResource(t *testing.T) { oneGi := resource.MustParse("1Gi") twoGi := resource.MustParse("2Gi") threeGi := resource.MustParse("3Gi") + onePod := resource.MustParse("1") + twoPods := resource.MustParse("2") defaultMinToAllocatePriority := int32(3) defaultMinToAllocate := armadaresource.ComputeResources{ @@ -404,6 +406,7 @@ func TestCalculateNonArmadaResource(t *testing.T) { Resources: map[string]*resource.Quantity{ "cpu": &oneCpu, "memory": &oneGi, + "pods": &onePod, }, }, defaultMinToAllocatePriority: { @@ -423,6 +426,7 @@ func TestCalculateNonArmadaResource(t *testing.T) { Resources: map[string]*resource.Quantity{ "cpu": &threeCpu, "memory": &threeGi, + "pods": &onePod, }, }, }, @@ -439,6 +443,7 @@ func TestCalculateNonArmadaResource(t *testing.T) { Resources: map[string]*resource.Quantity{ "cpu": &threeCpu, "memory": &threeGi, + "pods": &twoPods, }, }, }, @@ -455,12 +460,14 @@ func TestCalculateNonArmadaResource(t *testing.T) { Resources: map[string]*resource.Quantity{ "cpu": &oneCpu, "memory": &oneGi, + "pods": &onePod, }, }, defaultMinToAllocatePriority: { Resources: map[string]*resource.Quantity{ "cpu": &oneCpu, "memory": &oneGi, + "pods": &onePod, }, }, }, diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index cd56cb0a60d..13a0da11705 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -222,6 +222,9 @@ type SchedulingConfig struct { MaxNewJobSchedulingDurationPerQueue time.Duration `validate:"omitempty,ltfield=MaxSchedulingDuration"` // Set to true to enable scheduler assertions. This results in some performance loss. EnableAssertions bool + // If true, the scheduler tracks per-node pod capacity and refuses to schedule + // jobs onto nodes that have exhausted their pod limit. + RespectNodePodLimits bool // Experimental // Set to true to enable larger job preferential ordering in the candidate gang iterator. // This will result in larger jobs being ordered earlier in the job scheduling order @@ -541,3 +544,35 @@ type PricingApiConfig struct { // It will stub the pricing api so it returns non-zero values but won't call and external service DevModeEnabled bool } + +// ApplyRespectNodePodLimits registers "pods" as a supported and indexed resource +// when RespectNodePodLimits is on. Must be called before constructing the +// ResourceListFactory / NodeDb so every downstream consumer sees "pods" as tracked. +// Returns true if config was modified. +func ApplyRespectNodePodLimits(config *SchedulingConfig) bool { + if !config.RespectNodePodLimits { + return false + } + config.SupportedResourceTypes = ensurePodsResourceType(config.SupportedResourceTypes) + config.IndexedResources = ensurePodsResourceType(config.IndexedResources) + return true +} + +// ensurePodsResourceType ensures a "pods" entry with resolution 1 is present. +// If an entry already exists with a different resolution it is normalized. +// Resolution must be 1: jobdb injects a pods=1 quantity per job, so any other +// scale would break 1-to-1 pod accounting (e.g. resolution 10 would cause each +// job to consume 10 pod slots). Idempotent. +func ensurePodsResourceType(types []ResourceType) []ResourceType { + podsEntry := ResourceType{ + Name: armadaresource.PodsResourceName, + Resolution: resource.MustParse("1"), + } + for i, t := range types { + if t.Name == armadaresource.PodsResourceName { + types[i] = podsEntry + return types + } + } + return append(types, podsEntry) +} diff --git a/internal/scheduler/configuration/configuration_test.go b/internal/scheduler/configuration/configuration_test.go index f01ed685a9b..68800a9d489 100644 --- a/internal/scheduler/configuration/configuration_test.go +++ b/internal/scheduler/configuration/configuration_test.go @@ -4,6 +4,10 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/resource" + + armadaresource "github.com/armadaproject/armada/internal/common/resource" ) func TestGetProtectedFractionOfFairShare(t *testing.T) { @@ -31,3 +35,88 @@ func TestGetProtectedFractionOfFairShare(t *testing.T) { assert.Equal(t, 0.1, sc.GetProtectedFractionOfFairShare("not-set-pool")) assert.Equal(t, 0.1, sc.GetProtectedFractionOfFairShare("missing-pool")) } + +func TestApplyRespectNodePodLimits(t *testing.T) { + cpu := ResourceType{Name: "cpu", Resolution: resource.MustParse("1m")} + mem := ResourceType{Name: "memory", Resolution: resource.MustParse("1")} + podsDefault := ResourceType{Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("1")} + podsCustom := ResourceType{Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("2")} + + tests := map[string]struct { + initial SchedulingConfig + applyTimes int + expectApplied bool + expectSupported []ResourceType + expectIndexed []ResourceType + }{ + "flag off leaves config untouched": { + initial: SchedulingConfig{ + SupportedResourceTypes: []ResourceType{cpu, mem}, + IndexedResources: []ResourceType{cpu, mem}, + }, + applyTimes: 1, + expectApplied: false, + expectSupported: []ResourceType{cpu, mem}, + expectIndexed: []ResourceType{cpu, mem}, + }, + "flag on appends pods to both slices with default resolution": { + initial: SchedulingConfig{ + RespectNodePodLimits: true, + SupportedResourceTypes: []ResourceType{cpu, mem}, + IndexedResources: []ResourceType{cpu}, + }, + applyTimes: 1, + expectApplied: true, + expectSupported: []ResourceType{cpu, mem, podsDefault}, + expectIndexed: []ResourceType{cpu, podsDefault}, + }, + "flag on normalizes caller-supplied pods resolution to 1": { + // jobdb injects pods=1 per job; any resolution other than 1 would break + // 1-to-1 pod accounting, so ensurePodsResourceType rewrites the entry. + initial: SchedulingConfig{ + RespectNodePodLimits: true, + SupportedResourceTypes: []ResourceType{cpu, podsCustom}, + IndexedResources: []ResourceType{cpu, podsCustom}, + }, + applyTimes: 1, + expectApplied: true, + expectSupported: []ResourceType{cpu, podsDefault}, + expectIndexed: []ResourceType{cpu, podsDefault}, + }, + "idempotent on repeated calls": { + initial: SchedulingConfig{ + RespectNodePodLimits: true, + SupportedResourceTypes: []ResourceType{cpu}, + IndexedResources: []ResourceType{cpu}, + }, + applyTimes: 3, + expectApplied: true, + expectSupported: []ResourceType{cpu, podsDefault}, + expectIndexed: []ResourceType{cpu, podsDefault}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + sc := tc.initial + var applied bool + for i := 0; i < tc.applyTimes; i++ { + applied = ApplyRespectNodePodLimits(&sc) + } + assert.Equal(t, tc.expectApplied, applied) + assertResourceTypesEqual(t, tc.expectSupported, sc.SupportedResourceTypes) + assertResourceTypesEqual(t, tc.expectIndexed, sc.IndexedResources) + }) + } +} + +func assertResourceTypesEqual(t *testing.T, expected, actual []ResourceType) { + t.Helper() + require.Len(t, actual, len(expected)) + for i := range expected { + assert.Equal(t, expected[i].Name, actual[i].Name, "index %d name", i) + assert.True(t, expected[i].Resolution.Equal(actual[i].Resolution), + "index %d (%s): expected resolution %s, got %s", + i, expected[i].Name, expected[i].Resolution.String(), actual[i].Resolution.String()) + } +} diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 6ed2716391c..03d7351fbad 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -15,6 +15,7 @@ import ( "k8s.io/utils/clock" log "github.com/armadaproject/armada/internal/common/logging" + armadaresource "github.com/armadaproject/armada/internal/common/resource" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/scheduler/adapters" @@ -88,7 +89,8 @@ type JobDb struct { // Used for generating job run ids. uuidProvider IDProvider // Used to make efficient ResourceList types. - resourceListFactory *internaltypes.ResourceListFactory + resourceListFactory *internaltypes.ResourceListFactory + respectNodePodLimits bool } // IDProvider is an interface used to mock run id generation for tests. @@ -155,6 +157,10 @@ func (jobDb *JobDb) SetClock(clock clock.PassiveClock) { jobDb.clock = clock } +func (jobDb *JobDb) SetRespectNodePodLimits(enabled bool) { + jobDb.respectNodePodLimits = enabled +} + func (jobDb *JobDb) SetUUIDProvider(uuidProvider IDProvider) { jobDb.uuidProvider = uuidProvider } @@ -175,6 +181,7 @@ func (jobDb *JobDb) Clone() *JobDb { schedulingKeyGenerator: jobDb.schedulingKeyGenerator, stringInterner: jobDb.stringInterner, resourceListFactory: jobDb.resourceListFactory, + respectNodePodLimits: jobDb.respectNodePodLimits, } } @@ -247,7 +254,12 @@ func (jobDb *JobDb) NewJob( } func (jobDb *JobDb) getResourceRequirements(schedulingInfo *internaltypes.JobSchedulingInfo) internaltypes.ResourceList { - return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(safeGetRequirements(schedulingInfo)) + requirements := safeGetRequirements(schedulingInfo) + if jobDb.respectNodePodLimits { + // Each Armada job is exactly one pod; consume one pod slot from the node's allocatable. + requirements[armadaresource.PodsResourceName] = *resource.NewQuantity(1, resource.DecimalSI) + } + return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(requirements) } func safeGetRequirements(schedulingInfo *internaltypes.JobSchedulingInfo) map[string]resource.Quantity { diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index 9add27bd512..6497a2a15ed 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -18,6 +18,7 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" + schedulerconfiguration "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/pkg/bidstore" ) @@ -608,6 +609,63 @@ func TestJobDb_GangInfoIsPopulated(t *testing.T) { } } +func TestJobDb_RespectNodePodLimits_InjectsPodsResource(t *testing.T) { + cpuMem := []schedulerconfiguration.ResourceType{ + {Name: "cpu", Resolution: resource.MustParse("1m")}, + {Name: "memory", Resolution: resource.MustParse("1")}, + } + cpuMemPods := []schedulerconfiguration.ResourceType{ + {Name: "cpu", Resolution: resource.MustParse("1m")}, + {Name: "memory", Resolution: resource.MustParse("1")}, + {Name: "pods", Resolution: resource.MustParse("1")}, + } + + withCpuMem := &internaltypes.JobSchedulingInfo{ + PriorityClass: "foo", + PodRequirements: &internaltypes.PodRequirements{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("1Gi"), + }, + }, + }, + } + withNoRequests := &internaltypes.JobSchedulingInfo{ + PriorityClass: "foo", + PodRequirements: &internaltypes.PodRequirements{}, + } + + tests := map[string]struct { + resourceTypes []schedulerconfiguration.ResourceType + respect bool + schedulingInfo *internaltypes.JobSchedulingInfo + expectedPods int64 + }{ + "flag off": {resourceTypes: cpuMemPods, respect: false, schedulingInfo: withCpuMem, expectedPods: 0}, + "flag on": {resourceTypes: cpuMemPods, respect: true, schedulingInfo: withCpuMem, expectedPods: 1}, + "flag on but factory lacks pods resource": {resourceTypes: cpuMem, respect: true, schedulingInfo: withCpuMem, expectedPods: 0}, + // A job with no resource requests must still get pods=1 so it occupies a slot. + "flag on with no requests still injects pods": {resourceTypes: cpuMemPods, respect: true, schedulingInfo: withNoRequests, expectedPods: 1}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + factory, err := internaltypes.NewResourceListFactory(tc.resourceTypes, nil) + require.NoError(t, err) + + jobDb := NewJobDb(map[string]types.PriorityClass{"foo": {}}, "foo", stringinterner.New(1024), factory) + jobDb.SetRespectNodePodLimits(tc.respect) + + job, err := jobDb.NewJob("jobId", "jobSet", "queue", 1, tc.schedulingInfo, false, 0, false, false, false, 2, false, []string{}, 0) + require.NoError(t, err) + + podsQty := job.KubernetesResourceRequirements().GetByNameZeroIfMissing("pods") + assert.Equal(t, tc.expectedPods, podsQty.Value()) + }) + } +} + func TestJobDb_SchedulingKeyIsPopulated(t *testing.T) { podRequirements := &internaltypes.PodRequirements{ NodeSelector: map[string]string{"foo": "bar"}, diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index ba36008b991..d32a7804646 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -13,6 +13,8 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/pointer" + armadaresource "github.com/armadaproject/armada/internal/common/resource" + "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" @@ -231,6 +233,93 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { assert.Empty(t, unboundNode.EvictedJobRunIds) } +// Covers the pods resource path, which TestNodeBindingEvictionUnbinding does not exercise. +// Bind, evict, and unbind flow resource requirements through KubernetesResourceRequirements, +// so a regression in pod-slot accounting would surface here even though the cpu/memory/GPU +// cases look green. +func TestNodeBindingEvictionUnbinding_ReleasesPodSlot(t *testing.T) { + resourceTypes := []schedulerconfig.ResourceType{ + {Name: "cpu", Resolution: resource.MustParse("1m")}, + {Name: "memory", Resolution: resource.MustParse("1")}, + {Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("1")}, + } + rlFactory, err := internaltypes.NewResourceListFactory(resourceTypes, nil) + require.NoError(t, err) + + jobDb := jobdb.NewJobDbWithSchedulingKeyGenerator( + testfixtures.TestPriorityClasses, + testfixtures.TestDefaultPriorityClass, + testfixtures.SchedulingKeyGenerator, + stringinterner.New(1024), + rlFactory, + ) + jobDb.SetRespectNodePodLimits(true) + + nodeDb, err := NewNodeDb( + testfixtures.TestPriorityClasses, + resourceTypes, + testfixtures.TestIndexedTaints, + testfixtures.TestIndexedNodeLabels, + testfixtures.TestWellKnownNodeTypes, + rlFactory, + ) + require.NoError(t, err) + + nodeProto := testfixtures.TestSchedulerObjectsNode(testfixtures.TestPriorities, map[string]*resource.Quantity{ + "cpu": pointer.MustParseResource("10"), + "memory": pointer.MustParseResource("64Gi"), + armadaresource.PodsResourceName: pointer.MustParseResource("1"), + }) + nodeFactory := internaltypes.NewNodeFactory( + testfixtures.TestIndexedTaints, + testfixtures.TestIndexedNodeLabels, + testfixtures.TestPriorityClasses, + rlFactory, + ) + node := nodeFactory.FromSchedulerObjectsNode(nodeProto) + + txn := nodeDb.Txn(true) + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(txn, nil, node)) + txn.Commit() + + job := newPodsJob(t, jobDb, "jobA") + priority := job.PriorityClass().Priority + entry, err := nodeDb.GetNode(node.GetId()) + require.NoError(t, err) + + boundNode, err := nodeDb.BindJobToNode(entry, job, priority) + require.NoError(t, err) + boundPods := boundNode.AllocatableByPriority[priority].GetByNameZeroIfMissing(armadaresource.PodsResourceName) + assert.Equal(t, int64(0), boundPods.Value(), "bind should consume the pod slot") + + evictedNode, err := nodeDb.EvictJobsFromNode([]*jobdb.Job{job}, boundNode) + require.NoError(t, err) + unboundNode, err := nodeDb.UnbindJobFromNode(job, evictedNode) + require.NoError(t, err) + releasedPods := unboundNode.AllocatableByPriority[priority].GetByNameZeroIfMissing(armadaresource.PodsResourceName) + assert.Equal(t, int64(1), releasedPods.Value(), "evict+unbind should free the pod slot") + + followUp := newPodsJob(t, jobDb, "jobB") + rebindNode, err := nodeDb.BindJobToNode(unboundNode, followUp, followUp.PriorityClass().Priority) + require.NoError(t, err, "second job should bind after the slot is freed") + rebindPods := rebindNode.AllocatableByPriority[priority].GetByNameZeroIfMissing(armadaresource.PodsResourceName) + assert.Equal(t, int64(0), rebindPods.Value(), "second bind should also consume the pod slot") +} + +func newPodsJob(t *testing.T, db *jobdb.JobDb, jobId string) *jobdb.Job { + t.Helper() + info := &internaltypes.JobSchedulingInfo{ + PriorityClass: testfixtures.PriorityClass0, + PodRequirements: testfixtures.TestPodReqs(v1.ResourceList{ + "cpu": resource.MustParse("1"), + "memory": resource.MustParse("1Gi"), + }), + } + job, err := db.NewJob(jobId, "jobSet", "queue", 1, info, false, 0, false, false, false, 0, false, []string{testfixtures.TestPool}, 0) + require.NoError(t, err) + return job +} + func assertNodeAccountingEqual(t *testing.T, node1, node2 *internaltypes.Node) { assert.True( t, diff --git a/internal/scheduler/nodedb/nodematching_test.go b/internal/scheduler/nodedb/nodematching_test.go index 7cd2553eb5f..225951b4d6e 100644 --- a/internal/scheduler/nodedb/nodematching_test.go +++ b/internal/scheduler/nodedb/nodematching_test.go @@ -4,10 +4,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" armadamaps "github.com/armadaproject/armada/internal/common/maps" + armadaresource "github.com/armadaproject/armada/internal/common/resource" + schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/internaltypes" schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context" "github.com/armadaproject/armada/internal/scheduler/testfixtures" @@ -650,7 +653,61 @@ func BenchmarkInsufficientResourcesSum64(b *testing.B) { } } -func TestResourceRequirementsMet(t *testing.T) { +func TestResourceRequirementsMet_RespectNodePodLimits(t *testing.T) { + rlFactory, err := internaltypes.NewResourceListFactory( + []schedulerconfig.ResourceType{ + {Name: "cpu", Resolution: resource.MustParse("1m")}, + {Name: "memory", Resolution: resource.MustParse("1")}, + {Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("1")}, + }, + nil, + ) + require.NoError(t, err) + + required := rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{ + "cpu": resource.MustParse("1"), + armadaresource.PodsResourceName: resource.MustParse("1"), + }) + + tests := map[string]struct { + availableCpu, availablePods string + expectOk bool + expectReason string + }{ + "saturated by pods": { + availableCpu: "10", availablePods: "0", + expectOk: false, expectReason: armadaresource.PodsResourceName, + }, + "slot available": { + availableCpu: "10", availablePods: "1", + expectOk: true, + }, + "saturated by both cpu and pods": { + // Factory iterates resources in declared order (cpu, memory, pods), + // so cpu wins the tiebreak. Pins down operator-facing reason ordering. + availableCpu: "0", availablePods: "0", + expectOk: false, expectReason: "cpu", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + available := rlFactory.FromJobResourceListIgnoreUnknown(map[string]resource.Quantity{ + "cpu": resource.MustParse(tc.availableCpu), + "memory": resource.MustParse("64Gi"), + armadaresource.PodsResourceName: resource.MustParse(tc.availablePods), + }) + ok, reason := resourceRequirementsMet(available, required) + assert.Equal(t, tc.expectOk, ok) + if tc.expectOk { + assert.Nil(t, reason) + return + } + insuff, isInsufficient := reason.(*InsufficientResources) + require.True(t, isInsufficient, "expected InsufficientResources, got %T", reason) + assert.Equal(t, tc.expectReason, insuff.ResourceName) + }) + } } func makeTestNodeTaintsLabels(taints []v1.Taint, labels map[string]string) *internaltypes.Node { diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 84dbcb7eb5e..8ec24a06f62 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -80,6 +80,7 @@ func Run(config schedulerconfig.Configuration) error { // //////////////////////////////////////////////////////////////////////// // Resource list factory // //////////////////////////////////////////////////////////////////////// + schedulerconfig.ApplyRespectNodePodLimits(&config.Scheduling) resourceListFactory, err := internaltypes.NewResourceListFactory( config.Scheduling.SupportedResourceTypes, config.Scheduling.FloatingResources, @@ -364,6 +365,7 @@ func Run(config schedulerconfig.Configuration) error { stringInterner, resourceListFactory, ) + jobDb.SetRespectNodePodLimits(config.Scheduling.RespectNodePodLimits) schedulerMetrics, err := metrics.New( config.Metrics.TrackedErrorRegexes, diff --git a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go index 3d0503e8c12..46fe79c2ac3 100644 --- a/internal/scheduler/scheduling/preempting_queue_scheduler_test.go +++ b/internal/scheduler/scheduling/preempting_queue_scheduler_test.go @@ -13,11 +13,14 @@ import ( "golang.org/x/exp/slices" "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" + k8sResource "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/clock" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/logging" armadamaps "github.com/armadaproject/armada/internal/common/maps" + "github.com/armadaproject/armada/internal/common/pointer" + armadaresource "github.com/armadaproject/armada/internal/common/resource" armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" @@ -3007,6 +3010,219 @@ func TestEvictionAssertions_MixedStateGangWithRequeuedMember(t *testing.T) { assert.NoError(t, err, "requeued member not on a node should be excluded from eviction count") } +// TestPreemptingQueueScheduler_RespectNodePodLimits verifies that with the +// respectNodePodLimits flag enabled and pods registered as a tracked resource, +// the preempting queue scheduler treats per-node pod capacity as a binding +// constraint: single jobs and gangs honour the pod cap, gangs reject atomically +// when they don't fit, and preemption can free slots for a higher-priority gang. +func TestPreemptingQueueScheduler_RespectNodePodLimits(t *testing.T) { + tests := map[string]struct { + incumbentPriorityClass string + challengerCount int + challengerIsGang bool + nodePodCapacity int64 + extraNodePodCapacities []int64 + incumbentCount int + expectedPreemptions int + expectedNewlyScheduled int + }{ + "saturated with preemptible incumbents, higher-priority challenger arrives": { + incumbentPriorityClass: testfixtures.PriorityClass0, + challengerCount: 1, + nodePodCapacity: 5, + incumbentCount: 5, + expectedPreemptions: 1, + expectedNewlyScheduled: 1, + }, + "node has free pod slot, higher-priority challenger arrives": { + incumbentPriorityClass: testfixtures.PriorityClass0, + challengerCount: 1, + nodePodCapacity: 5, + incumbentCount: 4, + expectedPreemptions: 0, + expectedNewlyScheduled: 1, + }, + "gang challenger exceeds node pod cap, atomic rejection": { + // Gang must reject as a unit; no partial schedule for the executor to clean up. + challengerCount: 5, + challengerIsGang: true, + nodePodCapacity: 3, + incumbentCount: 0, + expectedPreemptions: 0, + expectedNewlyScheduled: 0, + }, + "gang challenger fits exactly after preempting all incumbents": { + incumbentPriorityClass: testfixtures.PriorityClass0, + challengerCount: 5, + challengerIsGang: true, + nodePodCapacity: 5, + incumbentCount: 5, + expectedPreemptions: 5, + expectedNewlyScheduled: 5, + }, + "saturated node + free node, challenger picks free node": { + // Proves the per-node pod predicate is consulted during cross-node + // candidate iteration: landing on the empty node instead of preempting + // on the full one is only correct if pods is part of node fitness, not + // just within-node accounting. + incumbentPriorityClass: testfixtures.PriorityClass0, + challengerCount: 1, + nodePodCapacity: 5, + extraNodePodCapacities: []int64{5}, + incumbentCount: 5, + expectedPreemptions: 0, + expectedNewlyScheduled: 1, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + config := testfixtures.TestSchedulingConfig() + config.RespectNodePodLimits = true + require.True(t, configuration.ApplyRespectNodePodLimits(&config)) + + // The global testfixtures.TestResourceListFactory does not track pods, so + // we build a local pods-aware factory and thread it through both the jobDb + // and the nodeDb. Without this, assertSameResourceListFactory panics when + // the scheduler compares job and node resource lists. + podsAwareFactory, err := internaltypes.NewResourceListFactory(config.SupportedResourceTypes, nil) + require.NoError(t, err) + + jobDb := jobdb.NewJobDb(config.PriorityClasses, config.DefaultPriorityClassName, stringinterner.New(1024), podsAwareFactory) + jobDb.SetRespectNodePodLimits(true) + + nodeFactory := internaltypes.NewNodeFactory( + config.IndexedTaints, + config.IndexedNodeLabels, + config.PriorityClasses, + podsAwareFactory, + ) + buildNode := func(podCap int64) *internaltypes.Node { + return nodeFactory.FromSchedulerObjectsNode(testfixtures.TestSchedulerObjectsNode(testfixtures.TestPriorities, map[string]*k8sResource.Quantity{ + "cpu": pointer.MustParseResource("10"), + "memory": pointer.MustParseResource("64Gi"), + armadaresource.PodsResourceName: pointer.MustParseResource(fmt.Sprintf("%d", podCap)), + })) + } + node := buildNode(tc.nodePodCapacity) + extraNodes := make([]*internaltypes.Node, len(tc.extraNodePodCapacities)) + for i, capacity := range tc.extraNodePodCapacities { + extraNodes[i] = buildNode(capacity) + } + + makeJob := func(priorityClass string, queued bool) *jobdb.Job { + jobId := util.ULID() + now := time.Now() + job, err := jobDb.NewJob( + jobId.String(), + testfixtures.TestJobset, + "A", + 1000, + &internaltypes.JobSchedulingInfo{ + PriorityClass: priorityClass, + SubmitTime: now, + PodRequirements: testfixtures.Test1Cpu4GiPodReqs("A", jobId, 0), + }, + queued, + 0, + false, + false, + false, + now.UnixNano(), + false, + []string{testfixtures.TestPool}, + 0, + ) + require.NoError(t, err) + return job + } + + incumbents := make([]*jobdb.Job, tc.incumbentCount) + for i := 0; i < tc.incumbentCount; i++ { + j := makeJob(tc.incumbentPriorityClass, false) + incumbents[i] = j.WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), j.PriorityClass().Priority) + } + challengers := make([]*jobdb.Job, tc.challengerCount) + for i := 0; i < tc.challengerCount; i++ { + challengers[i] = makeJob(testfixtures.PriorityClass3, true) + } + if tc.challengerIsGang { + challengers = testfixtures.WithGangJobDetails(challengers, "gang-1", tc.challengerCount, "") + } + + nodeDb, err := nodedb.NewNodeDb( + config.PriorityClasses, + config.IndexedResources, + config.IndexedTaints, + config.IndexedNodeLabels, + config.WellKnownNodeTypes, + podsAwareFactory, + ) + require.NoError(t, err) + nodeDbTxn := nodeDb.Txn(true) + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, incumbents, node)) + for _, n := range extraNodes { + require.NoError(t, nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, nil, n)) + } + nodeDbTxn.Commit() + + jobDbTxn := jobDb.WriteTxn() + require.NoError(t, jobDbTxn.Upsert(incumbents)) + require.NoError(t, jobDbTxn.Upsert(challengers)) + + totalResources := nodeDb.TotalKubernetesResources() + fairnessCostProvider, err := fairness.NewDominantResourceFairness(totalResources, testfixtures.TestPool, config) + require.NoError(t, err) + + demand := internaltypes.ResourceList{} + for _, j := range challengers { + demand = demand.Add(j.AllResourceRequirements()) + } + // Seed allocated-by-priority-class so eviction bookkeeping can subtract + // the incumbents without underflowing the queue allocation to negative. + allocatedByPriorityClass := map[string]internaltypes.ResourceList{} + for _, j := range incumbents { + allocatedByPriorityClass[j.PriorityClassName()] = allocatedByPriorityClass[j.PriorityClassName()].Add(j.AllResourceRequirements()) + } + + sctx := schedulingcontext.NewSchedulingContext(testfixtures.TestPool, fairnessCostProvider, rate.NewLimiter(rate.Inf, 1000), totalResources) + require.NoError(t, sctx.AddQueueSchedulingContext( + "A", 1, 1, + allocatedByPriorityClass, + demand, + demand, + internaltypes.ResourceList{}, + rate.NewLimiter(rate.Inf, 1000), + )) + sctx.UpdateFairShares() + + constraints := schedulerconstraints.NewSchedulingConstraints("pool", totalResources, config, []*api.Queue{{Name: "A"}}) + + sch := NewPreemptingQueueScheduler( + sctx, constraints, testfixtures.TestEmptyFloatingResources, config, + jobDbTxn, nodeDb, false, clock.RealClock{}, + ) + result, err := sch.Schedule(armadacontext.Background()) + require.NoError(t, err) + + assert.Len(t, result.PreemptedJobs, tc.expectedPreemptions, "unexpected preemption count") + assert.Len(t, result.ScheduledJobs, tc.expectedNewlyScheduled, "unexpected scheduling count") + for _, jctx := range result.PreemptedJobs { + assert.Equal(t, tc.incumbentPriorityClass, jctx.Job.PriorityClassName(), + "preempted job should be from the incumbent priority class") + } + for _, jctx := range result.ScheduledJobs { + assert.Equal(t, testfixtures.PriorityClass3, jctx.Job.PriorityClassName(), + "scheduled job should be the higher-priority challenger") + if len(extraNodes) > 0 { + assert.NotEqual(t, node.GetId(), jctx.PodSchedulingContext.NodeId, + "challenger should land on an extra node, not the saturated one") + } + } + }) + } +} + func testNodeWithTaints(node *internaltypes.Node, taints []v1.Taint) *internaltypes.Node { return internaltypes.CreateNode( node.GetId(), diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index f9cf64b19e5..26e72648e4c 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -120,6 +120,7 @@ func NewSimulator( schedulerCyclePeriodSeconds int, sink sink.Sink, ) (*Simulator, error) { + configuration.ApplyRespectNodePodLimits(&schedulingConfig) resourceListFactory, err := internaltypes.NewResourceListFactory( schedulingConfig.SupportedResourceTypes, schedulingConfig.FloatingResources, @@ -147,6 +148,7 @@ func NewSimulator( stringinterner.New(1024), resourceListFactory, ) + jobDb.SetRespectNodePodLimits(schedulingConfig.RespectNodePodLimits) randomSeed := workloadSpec.RandomSeed if randomSeed == 0 { // Seed the RNG using the local time if no explicit random seed is provided.