2 changes: 2 additions & 0 deletions .github/workflows/main.yaml
@@ -101,6 +101,8 @@ jobs:
fi
make deploy-local
minikube image load ghcr.io/flux-framework/flux-operator:test
+VERSION=v0.1.3
+kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
kubectl apply -f examples/dist/flux-operator-local.yaml

- name: Test ${{ matrix.test[0] }}
2 changes: 2 additions & 0 deletions .github/workflows/test-python.yaml
@@ -71,6 +71,8 @@ jobs:
minikube ssh docker pull ${container}
make deploy-local
minikube image load ghcr.io/flux-framework/flux-operator:test
+VERSION=v0.1.3
+kubectl apply --server-side -f https://github.com/kubernetes-sigs/jobset/releases/download/$VERSION/manifests.yaml
kubectl apply -f examples/dist/flux-operator-local.yaml

- name: Test ${{ matrix.test[0] }}
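Both workflows now install the JobSet CRDs (pinned to v0.1.3, applied server-side) before deploying the operator. For the operator itself to create JobSet objects, the JobSet API types must also be registered with the controller-runtime scheme. A minimal sketch of what that registration typically looks like in main.go; this is hypothetical and not part of this diff:

	package main

	import (
		"k8s.io/apimachinery/pkg/runtime"
		utilruntime "k8s.io/apimachinery/pkg/util/runtime"
		clientgoscheme "k8s.io/client-go/kubernetes/scheme"

		jobset "sigs.k8s.io/jobset/api/v1alpha1"
	)

	var scheme = runtime.NewScheme()

	func init() {
		// Built-in Kubernetes types (Jobs, Pods, Services, ...)
		utilruntime.Must(clientgoscheme.AddToScheme(scheme))
		// JobSet types, so the client can create and watch JobSets
		utilruntime.Must(jobset.AddToScheme(scheme))
	}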
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
-FROM golang:1.18 as builder
+FROM golang:1.20 as builder

WORKDIR /workspace

1 change: 1 addition & 0 deletions Makefile
@@ -165,6 +165,7 @@ clean:
kubectl delete -n flux-operator pods --all --grace-period=0 --force
kubectl delete -n flux-operator pvc --all --grace-period=0 --force
kubectl delete -n flux-operator pv --all --grace-period=0 --force
+kubectl delete -n flux-operator jobset --all --grace-period=0 --force
kubectl delete -n flux-operator jobs --all --grace-period=0 --force
kubectl delete -n flux-operator MiniCluster --all --grace-period=0 --force

26 changes: 26 additions & 0 deletions chart/templates/manager-rbac.yaml
@@ -273,6 +273,32 @@ rules:
  - patch
  - update
  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
- apiGroups:
  - networking.k8s.io
  resources:
26 changes: 26 additions & 0 deletions config/rbac/role.yaml
@@ -273,6 +273,32 @@ rules:
  - patch
  - update
  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets
+  verbs:
+  - create
+  - delete
+  - get
+  - list
+  - patch
+  - update
+  - watch
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - jobset.x-k8s.io
+  resources:
+  - jobsets/status
+  verbs:
+  - get
+  - patch
+  - update
- apiGroups:
  - networking.k8s.io
  resources:
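The Helm chart template and config/rbac/role.yaml gain identical rules for the jobset.x-k8s.io group. Since config/rbac/role.yaml is normally generated by controller-gen, these rules would come from kubebuilder markers on the reconciler. A sketch of markers that would produce them; assumed here, not shown in this diff:

	//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=create;delete;get;list;patch;update;watch
	//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
	//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;patch;update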
3 changes: 2 additions & 1 deletion controllers/flux/containers.go
@@ -23,6 +23,7 @@ func (r *MiniClusterReconciler) getContainers(
specs []api.MiniClusterContainer,
defaultName string,
mounts []corev1.VolumeMount,
+entrypoint string,
) ([]corev1.Container, error) {

// Create the containers for the pod
@@ -45,7 +46,7 @@
if container.RunFlux {

// wait.sh path corresponds to container identifier
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
waitScript := fmt.Sprintf("/flux_operator/%s-%d.sh", entrypoint, i)
command = []string{"/bin/bash", waitScript, container.Command}
containerName = defaultName
}
99 changes: 0 additions & 99 deletions controllers/flux/job.go

This file was deleted.

157 changes: 157 additions & 0 deletions controllers/flux/jobset.go
@@ -0,0 +1,157 @@
/*
Copyright 2023 Lawrence Livermore National Security, LLC
(c.f. AUTHORS, NOTICE.LLNS, COPYING)

This is part of the Flux resource manager framework.
For details, see https://github.com/flux-framework.

SPDX-License-Identifier: Apache-2.0
*/

package controllers

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "flux-framework/flux-operator/api/v1alpha1"

ctrl "sigs.k8s.io/controller-runtime"
jobset "sigs.k8s.io/jobset/api/v1alpha1"
)

func (r *MiniClusterReconciler) newJobSet(
cluster *api.MiniCluster,
) (*jobset.JobSet, error) {

// Name of the broker job, used by the success policy below
brokerJobName := cluster.Name + "-broker"

// When suspend is true we have a hard time debugging jobs, so keep false
suspend := false
jobs := jobset.JobSet{
ObjectMeta: metav1.ObjectMeta{
Name: "minicluster",
Namespace: cluster.Namespace,
Labels: cluster.Spec.JobLabels,
},
Spec: jobset.JobSetSpec{

// The JobSet is successful when the broker job completes (exit code 0)
SuccessPolicy: &jobset.SuccessPolicy{
Operator: jobset.OperatorAny,
TargetReplicatedJobs: []string{brokerJobName},
},

// This might be the control for child jobs (worker)
// But I don't think we need this anymore.
Suspend: &suspend,
// TODO decide on FailurePolicy here
// default is to fail if all jobs in jobset fail
},
}

// Get the leader (broker) job, the parent in the JobSet; workers are the follower pods
// We have to do this as indexed for the predictable hostname
// cluster, size, entrypoint, indexed
leaderJob, err := r.getJob(cluster, 1, "broker", true)
if err != nil {
return &jobs, err
}
workerJob, err := r.getJob(cluster, cluster.Spec.Size-1, "worker", true)
if err != nil {
return &jobs, err
}
jobs.Spec.ReplicatedJobs = []jobset.ReplicatedJob{leaderJob, workerJob}
ctrl.SetControllerReference(cluster, &jobs, r.Scheme)
return &jobs, nil
}

// getJob creates a job for a main leader (broker) or worker (followers)
func (r *MiniClusterReconciler) getJob(
cluster *api.MiniCluster,
size int32,
entrypoint string,
indexed bool,
) (jobset.ReplicatedJob, error) {

backoffLimit := int32(100)
podLabels := r.getPodLabels(cluster)
enableDNSHostnames := false
completionMode := batchv1.NonIndexedCompletion

if indexed {
completionMode = batchv1.IndexedCompletion
}

job := jobset.ReplicatedJob{
Name: cluster.Name + "-" + entrypoint,

// This would allow pods to be reached by their hostnames!
// It doesn't work for the Flux broker config at the moment,
// but could if we are allowed to specify the service name.
// <jobSet.name>-<spec.replicatedJob.name>-<job-index>-<pod-index>.<jobSet.name>-<spec.replicatedJob.name>
Network: &jobset.Network{
EnableDNSHostnames: &enableDNSHostnames,
},

Template: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
Labels: cluster.Spec.JobLabels,
},
},
// This is the default, but let's be explicit
Replicas: 1,
}

// Create the JobSpec for the job -> Template -> Spec
jobspec := batchv1.JobSpec{
BackoffLimit: &backoffLimit,
Completions: &size,
Parallelism: &size,
CompletionMode: &completionMode,
ActiveDeadlineSeconds: &cluster.Spec.DeadlineSeconds,

// Note: ActiveDeadlineSeconds above already bounds the total runtime
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name,
Namespace: cluster.Namespace,
Labels: podLabels,
Annotations: cluster.Spec.Pod.Annotations,
},
Spec: corev1.PodSpec{
// Matches the service name so pods get DNS entries under this subdomain
Subdomain: restfulServiceName,
Volumes: getVolumes(cluster, entrypoint),
RestartPolicy: corev1.RestartPolicyOnFailure,
ImagePullSecrets: getImagePullSecrets(cluster),
ServiceAccountName: cluster.Spec.Pod.ServiceAccountName,
NodeSelector: cluster.Spec.Pod.NodeSelector,
},
},
}
// Get resources for the pod
resources, err := r.getPodResources(cluster)
r.log.Info("🌀 MiniCluster", "Pod.Resources", resources)
if err != nil {
return job, err
}
jobspec.Template.Spec.Overhead = resources

// Get volume mounts, add on container specific ones
mounts := getVolumeMounts(cluster)
containers, err := r.getContainers(
cluster.Spec.Containers,
cluster.Name,
mounts,
entrypoint,
)
jobspec.Template.Spec.Containers = containers
job.Template.Spec = jobspec
return job, err
}
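To show how newJobSet would plug into reconciliation, here is a minimal hypothetical helper; it is not part of this diff, and it assumes the reconciler exposes a controller-runtime client as r.Client (in codebases that embed the client, the calls would be r.Get and r.Create). Assumed imports: context, apierrors "k8s.io/apimachinery/pkg/api/errors", and types "k8s.io/apimachinery/pkg/types".

	// ensureJobSet fetches the MiniCluster's JobSet, creating it on first reconcile.
	func (r *MiniClusterReconciler) ensureJobSet(
		ctx context.Context,
		cluster *api.MiniCluster,
	) (*jobset.JobSet, error) {

		// The JobSet is created with the fixed name "minicluster" above
		existing := &jobset.JobSet{}
		name := types.NamespacedName{Name: "minicluster", Namespace: cluster.Namespace}
		err := r.Client.Get(ctx, name, existing)
		if err == nil {
			return existing, nil
		}
		if !apierrors.IsNotFound(err) {
			return nil, err
		}

		// Not found: build the JobSet (owner reference included) and create it
		js, err := r.newJobSet(cluster)
		if err != nil {
			return nil, err
		}
		return js, r.Client.Create(ctx, js)
	}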