Skip to content

Commit 7390e9e

Browse files
Add topology spread constraints to deployments (#332)
* Add topology spread constraints * Add unit tests * Add config unit tests * Change match expressions to match labels in label selector to be added to topology pod constraints * Simplify test topology spread constraints * Add topology spread constraints feature flag
1 parent 4889ff3 commit 7390e9e

File tree

11 files changed

+399
-24
lines changed

11 files changed

+399
-24
lines changed

api/turing/cluster/knative_service.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type KnativeService struct {
4545
// and a % value (of the requested value) for cpu / memory based autoscaling.
4646
AutoscalingTarget string `json:"autoscalingTarget"`
4747

48+
// TopologySpreadConstraints contains a list of topology spread constraint to be applied on the pods of this service
49+
TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints"`
50+
4851
// Resource properties
4952
QueueProxyResourcePercentage int `json:"queueProxyResourcePercentage"`
5053
UserContainerLimitRequestFactor float64 `json:"userContainerLimitRequestFactor"`
@@ -157,6 +160,10 @@ func (cfg *KnativeService) buildSvcSpec(
157160
initContainers = cfg.buildInitContainer(cfg.InitContainers)
158161
}
159162

163+
// Add Knative app name label to the match expressions of each topology spread constraint to spread out all
164+
// the pods across the specified topologyKey
165+
topologySpreadConstraints := cfg.appendPodSpreadingLabelSelectorsToTopologySpreadConstraints(revisionName)
166+
160167
return &knservingv1.ServiceSpec{
161168
ConfigurationSpec: knservingv1.ConfigurationSpec{
162169
Template: knservingv1.RevisionTemplateSpec{
@@ -167,9 +174,10 @@ func (cfg *KnativeService) buildSvcSpec(
167174
},
168175
Spec: knservingv1.RevisionSpec{
169176
PodSpec: corev1.PodSpec{
170-
Containers: []corev1.Container{container},
171-
Volumes: cfg.Volumes,
172-
InitContainers: initContainers,
177+
Containers: []corev1.Container{container},
178+
Volumes: cfg.Volumes,
179+
InitContainers: initContainers,
180+
TopologySpreadConstraints: topologySpreadConstraints,
173181
},
174182
TimeoutSeconds: &timeout,
175183
},
@@ -193,3 +201,23 @@ func (cfg *KnativeService) getAutoscalingTarget() (string, error) {
193201
// For all other metrics, we can use the supplied value as is.
194202
return cfg.AutoscalingTarget, nil
195203
}
204+
205+
// appendPodSpreadingLabelSelectorsToTopologySpreadConstraints adds the given revisionName as a label to the
206+
// match labels of each topology spread constraint to spread out all the pods across the specified topologyKey
207+
func (cfg *KnativeService) appendPodSpreadingLabelSelectorsToTopologySpreadConstraints(
208+
revisionName string,
209+
) []corev1.TopologySpreadConstraint {
210+
for i := range cfg.TopologySpreadConstraints {
211+
if cfg.TopologySpreadConstraints[i].LabelSelector == nil {
212+
cfg.TopologySpreadConstraints[i].LabelSelector = &metav1.LabelSelector{
213+
MatchLabels: map[string]string{"app": revisionName},
214+
}
215+
} else {
216+
if cfg.TopologySpreadConstraints[i].LabelSelector.MatchLabels == nil {
217+
cfg.TopologySpreadConstraints[i].LabelSelector.MatchLabels = make(map[string]string)
218+
}
219+
cfg.TopologySpreadConstraints[i].LabelSelector.MatchLabels["app"] = revisionName
220+
}
221+
}
222+
return cfg.TopologySpreadConstraints
223+
}

api/turing/cluster/knative_service_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,160 @@ func TestBuildKnativeServiceConfig(t *testing.T) {
335335
},
336336
},
337337
},
338+
"topology spread constraints": {
339+
serviceCfg: KnativeService{
340+
BaseService: baseSvc,
341+
ContainerPort: 8080,
342+
MinReplicas: 1,
343+
MaxReplicas: 2,
344+
AutoscalingMetric: "concurrency",
345+
AutoscalingTarget: "1",
346+
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
347+
{
348+
MaxSkew: 1,
349+
TopologyKey: "kubernetes.io/hostname",
350+
WhenUnsatisfiable: corev1.ScheduleAnyway,
351+
},
352+
{
353+
MaxSkew: 2,
354+
TopologyKey: "kubernetes.io/hostname",
355+
WhenUnsatisfiable: corev1.DoNotSchedule,
356+
LabelSelector: &metav1.LabelSelector{
357+
MatchExpressions: []metav1.LabelSelectorRequirement{
358+
{
359+
Key: "app-expression",
360+
Operator: metav1.LabelSelectorOpIn,
361+
Values: []string{"1"},
362+
},
363+
},
364+
},
365+
},
366+
{
367+
MaxSkew: 3,
368+
TopologyKey: "kubernetes.io/hostname",
369+
WhenUnsatisfiable: corev1.DoNotSchedule,
370+
LabelSelector: &metav1.LabelSelector{
371+
MatchLabels: map[string]string{
372+
"app-label": "spread",
373+
},
374+
MatchExpressions: []metav1.LabelSelectorRequirement{
375+
{
376+
Key: "app-expression",
377+
Operator: metav1.LabelSelectorOpIn,
378+
Values: []string{"1"},
379+
},
380+
},
381+
},
382+
},
383+
},
384+
IsClusterLocal: true,
385+
QueueProxyResourcePercentage: 30,
386+
UserContainerLimitRequestFactor: 1.5,
387+
Protocol: routerConfig.UPI,
388+
},
389+
expectedSpec: knservingv1.Service{
390+
ObjectMeta: metav1.ObjectMeta{
391+
Name: "test-svc",
392+
Namespace: "test-namespace",
393+
Labels: map[string]string{
394+
"labelKey": "labelVal",
395+
"networking.knative.dev/visibility": "cluster-local",
396+
},
397+
},
398+
Spec: knservingv1.ServiceSpec{
399+
ConfigurationSpec: knservingv1.ConfigurationSpec{
400+
Template: knservingv1.RevisionTemplateSpec{
401+
ObjectMeta: metav1.ObjectMeta{
402+
Name: "test-svc-0",
403+
Labels: map[string]string{
404+
"labelKey": "labelVal",
405+
},
406+
Annotations: map[string]string{
407+
"autoscaling.knative.dev/minScale": "1",
408+
"autoscaling.knative.dev/maxScale": "2",
409+
"autoscaling.knative.dev/metric": "concurrency",
410+
"autoscaling.knative.dev/target": "1",
411+
"autoscaling.knative.dev/class": "kpa.autoscaling.knative.dev",
412+
"queue.sidecar.serving.knative.dev/resourcePercentage": "30",
413+
},
414+
},
415+
Spec: knservingv1.RevisionSpec{
416+
PodSpec: corev1.PodSpec{
417+
Containers: []corev1.Container{
418+
{
419+
Name: podSpec.Containers[0].Name,
420+
Image: podSpec.Containers[0].Image,
421+
Ports: []corev1.ContainerPort{
422+
{
423+
Name: "h2c",
424+
ContainerPort: 8080,
425+
},
426+
},
427+
Resources: resources,
428+
ReadinessProbe: podSpec.Containers[0].ReadinessProbe,
429+
LivenessProbe: podSpec.Containers[0].LivenessProbe,
430+
VolumeMounts: baseSvc.VolumeMounts,
431+
Env: envs,
432+
},
433+
},
434+
Volumes: baseSvc.Volumes,
435+
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{
436+
{
437+
MaxSkew: 1,
438+
TopologyKey: "kubernetes.io/hostname",
439+
WhenUnsatisfiable: corev1.ScheduleAnyway,
440+
LabelSelector: &metav1.LabelSelector{
441+
MatchLabels: map[string]string{
442+
"app": "test-svc-0",
443+
},
444+
},
445+
},
446+
{
447+
MaxSkew: 2,
448+
TopologyKey: "kubernetes.io/hostname",
449+
WhenUnsatisfiable: corev1.DoNotSchedule,
450+
LabelSelector: &metav1.LabelSelector{
451+
MatchLabels: map[string]string{
452+
"app": "test-svc-0",
453+
},
454+
MatchExpressions: []metav1.LabelSelectorRequirement{
455+
{
456+
Key: "app-expression",
457+
Operator: metav1.LabelSelectorOpIn,
458+
Values: []string{"1"},
459+
},
460+
},
461+
},
462+
},
463+
{
464+
MaxSkew: 3,
465+
TopologyKey: "kubernetes.io/hostname",
466+
WhenUnsatisfiable: corev1.DoNotSchedule,
467+
LabelSelector: &metav1.LabelSelector{
468+
MatchLabels: map[string]string{
469+
"app-label": "spread",
470+
"app": "test-svc-0",
471+
},
472+
MatchExpressions: []metav1.LabelSelectorRequirement{
473+
{
474+
Key: "app-expression",
475+
Operator: metav1.LabelSelectorOpIn,
476+
Values: []string{"1"},
477+
},
478+
},
479+
},
480+
},
481+
},
482+
},
483+
TimeoutSeconds: &timeout,
484+
ContainerConcurrency: &defaultConcurrency,
485+
},
486+
},
487+
},
488+
RouteSpec: defaultRouteSpec,
489+
},
490+
},
491+
},
338492
}
339493

340494
for name, data := range tests {

api/turing/cluster/servicebuilder/router.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ func (sb *clusterSvcBuilder) NewRouterService(
127127
if err != nil {
128128
return nil, err
129129
}
130+
131+
topologySpreadConstraints, err := sb.getTopologySpreadConstraints()
132+
if err != nil {
133+
return nil, err
134+
}
135+
130136
svc := &cluster.KnativeService{
131137
BaseService: &cluster.BaseService{
132138
Name: name,
@@ -150,6 +156,7 @@ func (sb *clusterSvcBuilder) NewRouterService(
150156
MaxReplicas: routerVersion.ResourceRequest.MaxReplica,
151157
AutoscalingMetric: string(routerVersion.AutoscalingPolicy.Metric),
152158
AutoscalingTarget: routerVersion.AutoscalingPolicy.Target,
159+
TopologySpreadConstraints: topologySpreadConstraints,
153160
QueueProxyResourcePercentage: knativeQueueProxyResourcePercentage,
154161
UserContainerLimitRequestFactor: userContainerLimitRequestFactor,
155162
}

api/turing/cluster/servicebuilder/router_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
)
2121

2222
func TestNewRouterService(t *testing.T) {
23-
sb := NewClusterServiceBuilder(resource.MustParse("2"), resource.MustParse("2Gi"), 30)
23+
sb := NewClusterServiceBuilder(resource.MustParse("2"), resource.MustParse("2Gi"), 30, testTopologySpreadConstraints)
2424
testDataBasePath := filepath.Join("..", "..", "testdata", "cluster", "servicebuilder")
2525
enrEndpoint := "http://test-svc-turing-enricher-1.test-project.svc.cluster.local/echo?delay=10ms"
2626
ensEndpoint := "http://test-svc-turing-ensembler-1.test-project.svc.cluster.local/echo?delay=20ms"
@@ -165,6 +165,7 @@ func TestNewRouterService(t *testing.T) {
165165
MaxReplicas: 4,
166166
AutoscalingMetric: "concurrency",
167167
AutoscalingTarget: "1",
168+
TopologySpreadConstraints: testTopologySpreadConstraints,
168169
QueueProxyResourcePercentage: 20,
169170
UserContainerLimitRequestFactor: 1.5,
170171
},
@@ -263,6 +264,7 @@ func TestNewRouterService(t *testing.T) {
263264
MaxReplicas: 4,
264265
AutoscalingMetric: "concurrency",
265266
AutoscalingTarget: "1",
267+
TopologySpreadConstraints: testTopologySpreadConstraints,
266268
QueueProxyResourcePercentage: 20,
267269
UserContainerLimitRequestFactor: 1.5,
268270
},
@@ -369,6 +371,7 @@ func TestNewRouterService(t *testing.T) {
369371
MaxReplicas: 4,
370372
AutoscalingMetric: "concurrency",
371373
AutoscalingTarget: "1",
374+
TopologySpreadConstraints: testTopologySpreadConstraints,
372375
QueueProxyResourcePercentage: 20,
373376
UserContainerLimitRequestFactor: 1.5,
374377
},
@@ -467,6 +470,7 @@ func TestNewRouterService(t *testing.T) {
467470
MaxReplicas: 4,
468471
AutoscalingMetric: "rps",
469472
AutoscalingTarget: "100",
473+
TopologySpreadConstraints: testTopologySpreadConstraints,
470474
QueueProxyResourcePercentage: 20,
471475
UserContainerLimitRequestFactor: 1.5,
472476
},
@@ -565,6 +569,7 @@ func TestNewRouterService(t *testing.T) {
565569
MaxReplicas: 4,
566570
AutoscalingMetric: "rps",
567571
AutoscalingTarget: "100",
572+
TopologySpreadConstraints: testTopologySpreadConstraints,
568573
QueueProxyResourcePercentage: 20,
569574
UserContainerLimitRequestFactor: 1.5,
570575
},
@@ -663,6 +668,7 @@ func TestNewRouterService(t *testing.T) {
663668
MaxReplicas: 4,
664669
AutoscalingMetric: "rps",
665670
AutoscalingTarget: "100",
671+
TopologySpreadConstraints: testTopologySpreadConstraints,
666672
QueueProxyResourcePercentage: 20,
667673
UserContainerLimitRequestFactor: 1.5,
668674
},
@@ -761,6 +767,7 @@ func TestNewRouterService(t *testing.T) {
761767
MaxReplicas: 4,
762768
AutoscalingMetric: "concurrency",
763769
AutoscalingTarget: "1",
770+
TopologySpreadConstraints: testTopologySpreadConstraints,
764771
QueueProxyResourcePercentage: 20,
765772
UserContainerLimitRequestFactor: 1.5,
766773
},
@@ -888,6 +895,7 @@ func TestNewRouterService(t *testing.T) {
888895
MaxReplicas: 4,
889896
AutoscalingMetric: "rps",
890897
AutoscalingTarget: "100",
898+
TopologySpreadConstraints: testTopologySpreadConstraints,
891899
QueueProxyResourcePercentage: 20,
892900
UserContainerLimitRequestFactor: 1.5,
893901
},
@@ -963,6 +971,7 @@ func TestNewRouterService(t *testing.T) {
963971
MaxReplicas: 4,
964972
AutoscalingMetric: "memory",
965973
AutoscalingTarget: "90",
974+
TopologySpreadConstraints: testTopologySpreadConstraints,
966975
QueueProxyResourcePercentage: 20,
967976
UserContainerLimitRequestFactor: 1.5,
968977
},
@@ -1016,7 +1025,7 @@ func TestNewRouterService(t *testing.T) {
10161025

10171026
func TestNewRouterEndpoint(t *testing.T) {
10181027
// Get router version
1019-
sb := NewClusterServiceBuilder(resource.MustParse("2"), resource.MustParse("2Gi"), 30)
1028+
sb := NewClusterServiceBuilder(resource.MustParse("2"), resource.MustParse("2Gi"), 30, testTopologySpreadConstraints)
10201029
testDataBasePath := filepath.Join("..", "..", "testdata", "cluster", "servicebuilder")
10211030
fileBytes, err := tu.ReadFile(filepath.Join(testDataBasePath, "router_version_success.json"))
10221031
require.NoError(t, err)

0 commit comments

Comments
 (0)