Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ scheduling:
maxSchedulingDuration: 5s
enableAssertions: false
enablePreferLargeJobOrdering: false
respectNodePodLimits: false
protectedFractionOfFairShare: 1.0
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
Expand Down
4 changes: 4 additions & 0 deletions internal/common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)

// PodsResourceName is the Kubernetes resource key for per-node pod capacity
// (node.Status.Allocatable["pods"]). It is the name under which pod-slot usage
// is tracked by the scheduler's RespectNodePodLimits feature.
const PodsResourceName = "pods"

// FromResourceList function takes a map with keys of type ResourceName and values of type
// "resource.Quantity" as defined in the K8s API.
//
Expand Down
4 changes: 4 additions & 0 deletions internal/executor/utilisation/cluster_utilisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

log "github.com/armadaproject/armada/internal/common/logging"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
Expand Down Expand Up @@ -217,6 +218,9 @@ func allocatedByPriorityAndResourceTypeFromPods(pods []*v1.Pod) (map[int32]armad
priority = *(pod.Spec.Priority)
}
request := armadaresource.TotalPodResourceRequest(&pod.Spec)
// Always reported so the scheduler can track non-Armada pods against per-node pod capacity
// when RespectNodePodLimits is enabled; dropped by the scheduler's resource list factory otherwise.
request[armadaresource.PodsResourceName] = *k8sResource.NewQuantity(1, k8sResource.DecimalSI)
_, ok := resourcesByPc[priority]
if ok {
resourcesByPc[priority].Add(request)
Expand Down
7 changes: 7 additions & 0 deletions internal/executor/utilisation/cluster_utilisation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ func TestCalculateNonArmadaResource(t *testing.T) {
oneGi := resource.MustParse("1Gi")
twoGi := resource.MustParse("2Gi")
threeGi := resource.MustParse("3Gi")
onePod := resource.MustParse("1")
twoPods := resource.MustParse("2")

defaultMinToAllocatePriority := int32(3)
defaultMinToAllocate := armadaresource.ComputeResources{
Expand Down Expand Up @@ -404,6 +406,7 @@ func TestCalculateNonArmadaResource(t *testing.T) {
Resources: map[string]*resource.Quantity{
"cpu": &oneCpu,
"memory": &oneGi,
"pods": &onePod,
},
},
defaultMinToAllocatePriority: {
Expand All @@ -423,6 +426,7 @@ func TestCalculateNonArmadaResource(t *testing.T) {
Resources: map[string]*resource.Quantity{
"cpu": &threeCpu,
"memory": &threeGi,
"pods": &onePod,
},
},
},
Expand All @@ -439,6 +443,7 @@ func TestCalculateNonArmadaResource(t *testing.T) {
Resources: map[string]*resource.Quantity{
"cpu": &threeCpu,
"memory": &threeGi,
"pods": &twoPods,
},
},
},
Expand All @@ -455,12 +460,14 @@ func TestCalculateNonArmadaResource(t *testing.T) {
Resources: map[string]*resource.Quantity{
"cpu": &oneCpu,
"memory": &oneGi,
"pods": &onePod,
},
},
defaultMinToAllocatePriority: {
Resources: map[string]*resource.Quantity{
"cpu": &oneCpu,
"memory": &oneGi,
"pods": &onePod,
},
},
},
Expand Down
35 changes: 35 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type SchedulingConfig struct {
MaxNewJobSchedulingDurationPerQueue time.Duration `validate:"omitempty,ltfield=MaxSchedulingDuration"`
// Set to true to enable scheduler assertions. This results in some performance loss.
EnableAssertions bool
// If true, the scheduler tracks per-node pod capacity and refuses to schedule
// jobs onto nodes that have exhausted their pod limit.
RespectNodePodLimits bool
// Experimental
// Set to true to enable larger job preferential ordering in the candidate gang iterator.
// This will result in larger jobs being ordered earlier in the job scheduling order
Expand Down Expand Up @@ -541,3 +544,35 @@ type PricingApiConfig struct {
// It will stub the pricing api so it returns non-zero values but won't call and external service
DevModeEnabled bool
}

// ApplyRespectNodePodLimits makes "pods" a supported and indexed resource type
// on config when config.RespectNodePodLimits is set; otherwise config is left
// untouched.
//
// Call it before the ResourceListFactory / NodeDb are built, so that everything
// constructed from config already treats "pods" as a tracked resource.
//
// The return value reports whether the feature is enabled, i.e. whether
// SupportedResourceTypes and IndexedResources were (re)written.
func ApplyRespectNodePodLimits(config *SchedulingConfig) bool {
	if config.RespectNodePodLimits {
		// Both lists need the entry: one declares the resource, the other indexes it.
		config.SupportedResourceTypes = ensurePodsResourceType(config.SupportedResourceTypes)
		config.IndexedResources = ensurePodsResourceType(config.IndexedResources)
		return true
	}
	return false
}

// ensurePodsResourceType returns types with a "pods" entry of resolution 1.
//
// The first existing "pods" entry, if any, is overwritten in place (so the
// caller's slice is modified) and the same slice is returned; when there is no
// such entry one is appended. Calling it again on its own result changes
// nothing further.
//
// The resolution is forced to 1 because jobdb adds pods=1 to every job's
// requirements; any other resolution would break the one-job-one-slot
// accounting (with resolution 10, for example, each job would consume 10 pod
// slots).
func ensurePodsResourceType(types []ResourceType) []ResourceType {
	canonical := ResourceType{
		Name:       armadaresource.PodsResourceName,
		Resolution: resource.MustParse("1"),
	}
	for idx := range types {
		if types[idx].Name != armadaresource.PodsResourceName {
			continue
		}
		// Normalize whatever resolution the caller configured.
		types[idx] = canonical
		return types
	}
	types = append(types, canonical)
	return types
}
89 changes: 89 additions & 0 deletions internal/scheduler/configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/resource"

armadaresource "github.com/armadaproject/armada/internal/common/resource"
)

func TestGetProtectedFractionOfFairShare(t *testing.T) {
Expand Down Expand Up @@ -31,3 +35,88 @@ func TestGetProtectedFractionOfFairShare(t *testing.T) {
assert.Equal(t, 0.1, sc.GetProtectedFractionOfFairShare("not-set-pool"))
assert.Equal(t, 0.1, sc.GetProtectedFractionOfFairShare("missing-pool"))
}

// TestApplyRespectNodePodLimits verifies how ApplyRespectNodePodLimits rewrites
// SupportedResourceTypes and IndexedResources: nothing changes with the flag
// off, "pods" is appended when missing, an existing "pods" entry is reset to
// resolution 1, and repeated calls do not add duplicates.
func TestApplyRespectNodePodLimits(t *testing.T) {
	var (
		cpuType    = ResourceType{Name: "cpu", Resolution: resource.MustParse("1m")}
		memoryType = ResourceType{Name: "memory", Resolution: resource.MustParse("1")}
		podsAtOne  = ResourceType{Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("1")}
		podsAtTwo  = ResourceType{Name: armadaresource.PodsResourceName, Resolution: resource.MustParse("2")}
	)

	type scenario struct {
		config        SchedulingConfig // starting configuration
		calls         int              // how many times to apply
		wantApplied   bool             // expected result of the last call
		wantSupported []ResourceType
		wantIndexed   []ResourceType
	}

	scenarios := map[string]scenario{
		"flag off leaves config untouched": {
			config: SchedulingConfig{
				SupportedResourceTypes: []ResourceType{cpuType, memoryType},
				IndexedResources:       []ResourceType{cpuType, memoryType},
			},
			calls:         1,
			wantApplied:   false,
			wantSupported: []ResourceType{cpuType, memoryType},
			wantIndexed:   []ResourceType{cpuType, memoryType},
		},
		"flag on appends pods to both slices with default resolution": {
			config: SchedulingConfig{
				RespectNodePodLimits:   true,
				SupportedResourceTypes: []ResourceType{cpuType, memoryType},
				IndexedResources:       []ResourceType{cpuType},
			},
			calls:         1,
			wantApplied:   true,
			wantSupported: []ResourceType{cpuType, memoryType, podsAtOne},
			wantIndexed:   []ResourceType{cpuType, podsAtOne},
		},
		// jobdb injects pods=1 per job, so a configured resolution other than 1
		// would break 1-to-1 pod accounting; the entry must be rewritten.
		"flag on normalizes caller-supplied pods resolution to 1": {
			config: SchedulingConfig{
				RespectNodePodLimits:   true,
				SupportedResourceTypes: []ResourceType{cpuType, podsAtTwo},
				IndexedResources:       []ResourceType{cpuType, podsAtTwo},
			},
			calls:         1,
			wantApplied:   true,
			wantSupported: []ResourceType{cpuType, podsAtOne},
			wantIndexed:   []ResourceType{cpuType, podsAtOne},
		},
		"idempotent on repeated calls": {
			config: SchedulingConfig{
				RespectNodePodLimits:   true,
				SupportedResourceTypes: []ResourceType{cpuType},
				IndexedResources:       []ResourceType{cpuType},
			},
			calls:         3,
			wantApplied:   true,
			wantSupported: []ResourceType{cpuType, podsAtOne},
			wantIndexed:   []ResourceType{cpuType, podsAtOne},
		},
	}

	for label, sc := range scenarios {
		t.Run(label, func(t *testing.T) {
			cfg := sc.config
			applied := false
			for remaining := sc.calls; remaining > 0; remaining-- {
				applied = ApplyRespectNodePodLimits(&cfg)
			}

			assert.Equal(t, sc.wantApplied, applied)
			assertResourceTypesEqual(t, sc.wantSupported, cfg.SupportedResourceTypes)
			assertResourceTypesEqual(t, sc.wantIndexed, cfg.IndexedResources)
		})
	}
}

// assertResourceTypesEqual fails the test unless actual has the same length as
// expected and, position by position, the same Name and an equal Resolution.
// Resolutions are compared with Quantity.Equal rather than by struct equality.
// A length mismatch stops the test immediately (require); per-element
// mismatches are all reported (assert).
func assertResourceTypesEqual(t *testing.T, expected, actual []ResourceType) {
	t.Helper()
	require.Len(t, actual, len(expected))
	for i := range expected {
		want, got := &expected[i], &actual[i]
		assert.Equal(t, want.Name, got.Name, "index %d name", i)

		sameResolution := want.Resolution.Equal(got.Resolution)
		assert.True(
			t,
			sameResolution,
			"index %d (%s): expected resolution %s, got %s",
			i, want.Name, want.Resolution.String(), got.Resolution.String(),
		)
	}
}
16 changes: 14 additions & 2 deletions internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/utils/clock"

log "github.com/armadaproject/armada/internal/common/logging"
armadaresource "github.com/armadaproject/armada/internal/common/resource"
"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/scheduler/adapters"
Expand Down Expand Up @@ -88,7 +89,8 @@ type JobDb struct {
// Used for generating job run ids.
uuidProvider IDProvider
// Used to make efficient ResourceList types.
resourceListFactory *internaltypes.ResourceListFactory
resourceListFactory *internaltypes.ResourceListFactory
respectNodePodLimits bool
}

// IDProvider is an interface used to mock run id generation for tests.
Expand Down Expand Up @@ -155,6 +157,10 @@ func (jobDb *JobDb) SetClock(clock clock.PassiveClock) {
jobDb.clock = clock
}

// SetRespectNodePodLimits sets the respectNodePodLimits flag on this JobDb.
// When enabled, getResourceRequirements adds a "pods" request of 1 to each
// job's resource requirements before building its ResourceList.
func (jobDb *JobDb) SetRespectNodePodLimits(enabled bool) {
jobDb.respectNodePodLimits = enabled
}

// SetUUIDProvider replaces the IDProvider this JobDb uses for generating job
// run ids.
func (jobDb *JobDb) SetUUIDProvider(uuidProvider IDProvider) {
jobDb.uuidProvider = uuidProvider
}
Expand All @@ -175,6 +181,7 @@ func (jobDb *JobDb) Clone() *JobDb {
schedulingKeyGenerator: jobDb.schedulingKeyGenerator,
stringInterner: jobDb.stringInterner,
resourceListFactory: jobDb.resourceListFactory,
respectNodePodLimits: jobDb.respectNodePodLimits,
}
}

Expand Down Expand Up @@ -247,7 +254,12 @@ func (jobDb *JobDb) NewJob(
}

// getResourceRequirements builds the ResourceList for a job from the requests
// in schedulingInfo. When respectNodePodLimits is set, a "pods" request of 1 is
// added first so the job is counted against per-node pod capacity. The list is
// built with FromJobResourceListIgnoreUnknown.
func (jobDb *JobDb) getResourceRequirements(schedulingInfo *internaltypes.JobSchedulingInfo) internaltypes.ResourceList {
	requested := safeGetRequirements(schedulingInfo)
	if !jobDb.respectNodePodLimits {
		return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(requested)
	}

	// Each Armada job is exactly one pod, so it takes one pod slot from the
	// node's allocatable.
	// NOTE(review): this writes into the map returned by safeGetRequirements;
	// it presumably returns a fresh, non-nil map — confirm.
	onePod := resource.NewQuantity(1, resource.DecimalSI)
	requested[armadaresource.PodsResourceName] = *onePod
	return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(requested)
}

func safeGetRequirements(schedulingInfo *internaltypes.JobSchedulingInfo) map[string]resource.Quantity {
Expand Down
58 changes: 58 additions & 0 deletions internal/scheduler/jobdb/jobdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/armadaproject/armada/internal/common/stringinterner"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfiguration "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/pkg/bidstore"
)
Expand Down Expand Up @@ -608,6 +609,63 @@ func TestJobDb_GangInfoIsPopulated(t *testing.T) {
}
}

// TestJobDb_RespectNodePodLimits_InjectsPodsResource checks the "pods" quantity
// on jobs created by JobDb.NewJob: it is 1 only when the flag is enabled and
// the resource list factory was built with a "pods" resource type, and 0 in
// every other combination.
func TestJobDb_RespectNodePodLimits_InjectsPodsResource(t *testing.T) {
	resourceType := func(name, resolution string) schedulerconfiguration.ResourceType {
		return schedulerconfiguration.ResourceType{Name: name, Resolution: resource.MustParse(resolution)}
	}
	typesWithoutPods := []schedulerconfiguration.ResourceType{
		resourceType("cpu", "1m"),
		resourceType("memory", "1"),
	}
	typesWithPods := []schedulerconfiguration.ResourceType{
		resourceType("cpu", "1m"),
		resourceType("memory", "1"),
		resourceType("pods", "1"),
	}

	// A job that requests cpu and memory.
	cpuAndMemoryJob := &internaltypes.JobSchedulingInfo{
		PriorityClass: "foo",
		PodRequirements: &internaltypes.PodRequirements{
			ResourceRequirements: v1.ResourceRequirements{
				Requests: map[v1.ResourceName]resource.Quantity{
					"cpu":    resource.MustParse("1"),
					"memory": resource.MustParse("1Gi"),
				},
			},
		},
	}
	// A job whose pod requirements carry no resource requests at all.
	requestlessJob := &internaltypes.JobSchedulingInfo{
		PriorityClass:   "foo",
		PodRequirements: &internaltypes.PodRequirements{},
	}

	type scenario struct {
		resourceTypes  []schedulerconfiguration.ResourceType
		respect        bool
		schedulingInfo *internaltypes.JobSchedulingInfo
		expectedPods   int64
	}
	scenarios := map[string]scenario{
		"flag off": {
			resourceTypes:  typesWithPods,
			respect:        false,
			schedulingInfo: cpuAndMemoryJob,
			expectedPods:   0,
		},
		"flag on": {
			resourceTypes:  typesWithPods,
			respect:        true,
			schedulingInfo: cpuAndMemoryJob,
			expectedPods:   1,
		},
		"flag on but factory lacks pods resource": {
			resourceTypes:  typesWithoutPods,
			respect:        true,
			schedulingInfo: cpuAndMemoryJob,
			expectedPods:   0,
		},
		// A job with no resource requests must still get pods=1 so it occupies a slot.
		"flag on with no requests still injects pods": {
			resourceTypes:  typesWithPods,
			respect:        true,
			schedulingInfo: requestlessJob,
			expectedPods:   1,
		},
	}

	for label, sc := range scenarios {
		t.Run(label, func(t *testing.T) {
			factory, err := internaltypes.NewResourceListFactory(sc.resourceTypes, nil)
			require.NoError(t, err)

			priorityClasses := map[string]types.PriorityClass{"foo": {}}
			db := NewJobDb(priorityClasses, "foo", stringinterner.New(1024), factory)
			db.SetRespectNodePodLimits(sc.respect)

			job, err := db.NewJob("jobId", "jobSet", "queue", 1, sc.schedulingInfo, false, 0, false, false, false, 2, false, []string{}, 0)
			require.NoError(t, err)

			pods := job.KubernetesResourceRequirements().GetByNameZeroIfMissing("pods")
			assert.Equal(t, sc.expectedPods, pods.Value())
		})
	}
}

func TestJobDb_SchedulingKeyIsPopulated(t *testing.T) {
podRequirements := &internaltypes.PodRequirements{
NodeSelector: map[string]string{"foo": "bar"},
Expand Down
Loading
Loading