Skip to content

Commit 8a17f38

Browse files
committed
first go of jobset wroking
we were not able to use the jobset networking, but instead fell back to our custom headless service with a job-group selector Signed-off-by: vsoch <[email protected]>
1 parent b78ae56 commit 8a17f38

File tree

16 files changed

+179
-135
lines changed

16 files changed

+179
-135
lines changed

.github/workflows/main.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ jobs:
101101
fi
102102
make deploy-local
103103
minikube image load ghcr.io/flux-framework/flux-operator:test
104+
VERSION=v0.1.3
105+
kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
104106
kubectl apply -f examples/dist/flux-operator-local.yaml
105107
106108
- name: Test ${{ matrix.test[0] }}

.github/workflows/test-python.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ jobs:
7171
minikube ssh docker pull ${container}
7272
make deploy-local
7373
minikube image load ghcr.io/flux-framework/flux-operator:test
74+
VERSION=v0.1.3
75+
kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
7476
kubectl apply -f examples/dist/flux-operator-local.yaml
7577
7678
- name: Test ${{ matrix.test[0] }}

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ clean:
165165
kubectl delete -n flux-operator pods --all --grace-period=0 --force
166166
kubectl delete -n flux-operator pvc --all --grace-period=0 --force
167167
kubectl delete -n flux-operator pv --all --grace-period=0 --force
168+
kubectl delete -n flux-operator jobset --all --grace-period=0 --force
168169
kubectl delete -n flux-operator jobs --all --grace-period=0 --force
169170
kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force
170171

chart/templates/manager-rbac.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,32 @@ rules:
273273
- patch
274274
- update
275275
- watch
276+
- apiGroups:
277+
- jobset.x-k8s.io
278+
resources:
279+
- jobsets
280+
verbs:
281+
- create
282+
- delete
283+
- get
284+
- list
285+
- patch
286+
- update
287+
- watch
288+
- apiGroups:
289+
- jobset.x-k8s.io
290+
resources:
291+
- jobsets/finalizers
292+
verbs:
293+
- update
294+
- apiGroups:
295+
- jobset.x-k8s.io
296+
resources:
297+
- jobsets/status
298+
verbs:
299+
- get
300+
- patch
301+
- update
276302
- apiGroups:
277303
- networking.k8s.io
278304
resources:

config/rbac/role.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,32 @@ rules:
273273
- patch
274274
- update
275275
- watch
276+
- apiGroups:
277+
- jobset.x-k8s.io
278+
resources:
279+
- jobsets
280+
verbs:
281+
- create
282+
- delete
283+
- get
284+
- list
285+
- patch
286+
- update
287+
- watch
288+
- apiGroups:
289+
- jobset.x-k8s.io
290+
resources:
291+
- jobsets/finalizers
292+
verbs:
293+
- update
294+
- apiGroups:
295+
- jobset.x-k8s.io
296+
resources:
297+
- jobsets/status
298+
verbs:
299+
- get
300+
- patch
301+
- update
276302
- apiGroups:
277303
- networking.k8s.io
278304
resources:

controllers/flux/containers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers(
2323
specs []api.MiniClusterContainer,
2424
defaultName string,
2525
mounts []corev1.VolumeMount,
26+
entrypoint string,
2627
) ([]corev1.Container, error) {
2728

2829
// Create the containers for the pod
@@ -45,7 +46,7 @@ func (r *MiniClusterReconciler) getContainers(
4546
if container.RunFlux {
4647

4748
// wait.sh path corresponds to container identifier
48-
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
49+
waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i)
4950
command = []string{"/bin/bash", waitScript, container.Command}
5051
containerName = defaultName
5152
}

controllers/flux/job.go

Lines changed: 0 additions & 100 deletions
This file was deleted.

controllers/flux/jobset.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,37 @@ import (
1717

1818
api "flux-framework/flux-operator/api/v1alpha1"
1919

20+
ctrl "sigs.k8s.io/controller-runtime"
2021
jobset "sigs.k8s.io/jobset/api/v1alpha1"
2122
)
2223

2324
func (r *MiniClusterReconciler) newJobSet(
2425
cluster *api.MiniCluster,
2526
) (*jobset.JobSet, error) {
2627

27-
suspend := true
28+
// When suspend is true we have a hard time debugging jobs, so keep false
29+
suspend := false
2830
jobs := jobset.JobSet{
2931
ObjectMeta: metav1.ObjectMeta{
30-
Name: cluster.Name,
32+
Name: "minicluster",
3133
Namespace: cluster.Namespace,
3234
Labels: cluster.Spec.JobLabels,
3335
},
3436
Spec: jobset.JobSetSpec{
3537

36-
// Suspend child jobs (the worker pods) when broker finishes
38+
// This might be the control for child jobs (worker)
39+
// But I don't think we need this anymore.
3740
Suspend: &suspend,
3841
// TODO decide on FailurePolicy here
3942
// default is to fail if all jobs in jobset fail
4043
},
4144
}
4245

4346
// Get leader broker job, the parent in the JobSet (worker or follower pods)
47+
// Both are required to be in indexed completion mode to have a service!
48+
// I'm not sure that totally makes sense, will suggest a change.
4449
// cluster, size, entrypoint, indexed
45-
leaderJob, err := r.getJob(cluster, 1, "broker", false)
50+
leaderJob, err := r.getJob(cluster, 1, "broker", true)
4651
if err != nil {
4752
return &jobs, err
4853
}
@@ -51,10 +56,11 @@ func (r *MiniClusterReconciler) newJobSet(
5156
return &jobs, err
5257
}
5358
jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
59+
ctrl.SetControllerReference(cluster, &jobs, r.Scheme)
5460
return &jobs, nil
5561
}
5662

57-
// getBrokerJob creates the job for the main leader broker
63+
// getJob creates a job for a main leader (broker) or worker (followers)
5864
func (r *MiniClusterReconciler) getJob(
5965
cluster *api.MiniCluster,
6066
size int32,
@@ -64,18 +70,19 @@ func (r *MiniClusterReconciler) getJob(
6470

6571
backoffLimit := int32(100)
6672
podLabels := r.getPodLabels(cluster)
67-
enableDNSHostnames := true
73+
enableDNSHostnames := false
6874
completionMode := batchv1.NonIndexedCompletion
6975

7076
if indexed {
7177
completionMode = batchv1.IndexedCompletion
7278
}
7379

74-
// TODO how are these named
7580
job := jobset.ReplicatedJob{
7681
Name: cluster.Name + "-" + entrypoint,
7782

78-
// Allow pods to be reached by their hostnames! A simple boolean! Chef's kiss!
83+
// This would allow pods to be reached by their hostnames!
84+
// It doesn't work for the Flux broker config at the moment,
85+
// but could if we are allowed to specify the service name.
7986
// <jobSet.name>-<spec.replicatedJob.name>-<job-index>-<pod-index>.<jobSet.name>-<spec.replicatedJob.name>
8087
Network: &jobset.Network{
8188
EnableDNSHostnames: &enableDNSHostnames,
@@ -110,7 +117,7 @@ func (r *MiniClusterReconciler) getJob(
110117
},
111118
Spec: corev1.PodSpec{
112119
// matches the service
113-
// Subdomain: restfulServiceName,
120+
Subdomain: restfulServiceName,
114121
Volumes: getVolumes(cluster, entrypoint),
115122
RestartPolicy: corev1.RestartPolicyOnFailure,
116123
ImagePullSecrets: getImagePullSecrets(cluster),
@@ -130,7 +137,12 @@ func (r *MiniClusterReconciler) getJob(
130137

131138
// Get volume mounts, add on container specific ones
132139
mounts := getVolumeMounts(cluster)
133-
containers, err := r.getContainers(cluster.Spec.Containers, cluster.Name, mounts)
140+
containers, err := r.getContainers(
141+
cluster.Spec.Containers,
142+
cluster.Name,
143+
mounts,
144+
entrypoint,
145+
)
134146
jobspec.Template.Spec.Containers = containers
135147
job.Template.Spec = jobspec
136148
return job, err

controllers/flux/minicluster.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,11 @@ func (r *MiniClusterReconciler) ensureMiniCluster(
8989
}
9090

9191
// Create headless service for the MiniCluster
92-
// We should not technically need this anymore.
93-
// TODO I need to test the cluster, but I can't get the JobSet working
94-
//selector := map[string]string{"job-name": cluster.Name}
95-
//result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
96-
//if err != nil {
97-
// return result, err
98-
//}
92+
selector := map[string]string{"job-group": cluster.Name}
93+
result, err = r.exposeServices(ctx, cluster, restfulServiceName, selector)
94+
if err != nil {
95+
return result, err
96+
}
9997

10098
// Create the batch job that brings it all together!
10199
// A batchv1.Job can hold a spec for containers that use the configs we just made
@@ -452,16 +450,19 @@ func (r *MiniClusterReconciler) getConfigMap(
452450
func generateHostlist(cluster *api.MiniCluster, size int) string {
453451

454452
// The hosts are generated through the max size, so the cluster can expand
455-
return fmt.Sprintf("%s-[%s]", cluster.Name, generateRange(size))
453+
// minicluster-flux-sample-broker-0-0
454+
// minicluster-flux-sample-worker-0-1 through 0-3 for a size 4 cluster
455+
return fmt.Sprintf("minicluster-%s-broker-0-0,minicluster-%s-worker-0-[%s]", cluster.Name, cluster.Name, generateRange(size-1))
456456
}
457457

458458
// generateFluxConfig creates the broker.toml file used to boostrap flux
459459
func generateFluxConfig(cluster *api.MiniCluster) string {
460460

461461
// The hosts are generated through the max size, so the cluster can expand
462+
brokerFqdn := fmt.Sprintf("minicluster-%s-broker-0-0", cluster.Name)
462463
fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", restfulServiceName, cluster.Namespace)
463-
hosts := fmt.Sprintf("[%s]", generateRange(int(cluster.Spec.MaxSize)))
464-
fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, cluster.Name, hosts)
464+
hosts := fmt.Sprintf("%s, minicluster-%s-worker-0-[%s]", brokerFqdn, cluster.Name, generateRange(int(cluster.Spec.MaxSize-1)))
465+
fluxConfig := fmt.Sprintf(brokerConfigTemplate, fqdn, hosts)
465466
fluxConfig += "\n" + brokerArchiveSection
466467
return fluxConfig
467468
}

controllers/flux/minicluster_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ func NewMiniClusterReconciler(
8989
//+kubebuilder:rbac:groups=networking.k8s.io,resources="ingresses",verbs=get;list;watch;create;update;patch;delete
9090

9191
//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update
92+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
93+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
94+
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
9295
//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;exec
9396
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get;list;watch;create;update;patch;delete;exec
9497

0 commit comments

Comments
 (0)