Skip to content
2 changes: 1 addition & 1 deletion apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct {
// Create RayJobSubmissionServiceServer
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil, nil)}
}

// Submit Ray job
Expand Down
6 changes: 4 additions & 2 deletions ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package v1alpha1

import (
cmap "github.com/orcaman/concurrent-map/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -85,8 +87,8 @@ type Configuration struct {
EnableMetrics bool `json:"enableMetrics,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
func (config Configuration) GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, jobInfoMap)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
Expand Down
28 changes: 20 additions & 8 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-logr/logr"
cmap "github.com/orcaman/concurrent-map/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

Expand All @@ -40,9 +42,9 @@ const (
// RayJobReconciler reconciles a RayJob object
type RayJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

Scheme *runtime.Scheme
Recorder record.EventRecorder
JobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we try not injecting this into the RayJobReconciler? I think it should be an implementation detail of the dashboard client and should be better hidden by the it.

dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
}
Expand All @@ -53,11 +55,13 @@ type RayJobReconcilerOptions struct {

// NewRayJobReconciler returns a new reconcile.Reconciler
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler {
dashboardClientFunc := provider.GetDashboardClient(mgr)
JobInfoMap := cmap.New[*utiltypes.RayJobInfo]()
dashboardClientFunc := provider.GetDashboardClient(mgr, &JobInfoMap)
return &RayJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
JobInfoMap: &JobInfoMap,
dashboardClientFunc: dashboardClientFunc,
options: options,
}
Expand Down Expand Up @@ -263,9 +267,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
var jobInfo *utiltypes.RayJobInfo
if loadedJobInfo, ok := r.JobInfoMap.Get(rayJobInstance.Status.JobId); ok {
logger.Info("Job info found in map", "JobId", rayJobInstance.Status.JobId, "JobInfo", loadedJobInfo)
jobInfo = loadedJobInfo
} else {
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) {
logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
Expand All @@ -275,10 +281,16 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
logger.Info("Job info not found in map", "JobId", rayJobInstance.Status.JobId)
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
if jobInfo == nil {
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
// to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
// as "Complete" or "Failed" to avoid unnecessary reconciliation.
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type RayServiceReconciler struct {

// NewRayServiceReconciler returns a new reconcile.Reconciler
func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayServiceReconciler {
dashboardClientFunc := provider.GetDashboardClient(mgr)
dashboardClientFunc := provider.GetDashboardClient(mgr, nil)
httpProxyClientFunc := provider.GetHttpProxyClient(mgr)
return &RayServiceReconciler{
Client: mgr.GetClient(),
Expand Down
4 changes: 3 additions & 1 deletion ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
cmap "github.com/orcaman/concurrent-map/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -36,6 +37,7 @@ import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
Expand All @@ -52,7 +54,7 @@ var (

type TestClientProvider struct{}

func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return func(_ *rayv1.RayCluster, _ string) (dashboardclient.RayDashboardClientInterface, error) {
return fakeRayDashboardClient, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strings"

cmap "github.com/orcaman/concurrent-map/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -25,12 +26,13 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo])
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error)
GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error)
AsyncGetJobInfo(ctx context.Context, jobId string)
ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error)
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error)
Expand All @@ -41,12 +43,16 @@ type RayDashboardClientInterface interface {

type RayDashboardClient struct {
client *http.Client
workerPool *WorkerPool
jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]
dashboardURL string
}

func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) {
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) {
r.client = client
r.dashboardURL = dashboardURL
r.workerPool = workerPool
r.jobInfoMap = jobInfoMap
}

// UpdateDeployments update the deployments in the Ray cluster.
Expand Down Expand Up @@ -161,6 +167,24 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti
return &jobInfo, nil
}

func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
if _, ok := r.workerPool.channelContent.Get(jobId); ok {
return
}
r.workerPool.channelContent.Set(jobId, struct{}{})
r.workerPool.taskQueue <- func() {
jobInfo, err := r.GetJobInfo(ctx, jobId)
r.workerPool.channelContent.Remove(jobId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use defer r.workerPool.channelContent.Remove(jobId)

if err != nil {
fmt.Printf("AsyncGetJobInfo: error: %v\n", err)
return
}
if jobInfo != nil {
r.jobInfoMap.Set(jobId, jobInfo)
}
}
}
Comment on lines 183 to 200
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's actually an edge case.
Let's assume

  1. RayJob finalizer deletes the jobID item from jobInfoMap and workerPool.channelContent
  2. the background go routine pool retrieves jobID from r.workerPool.taskQueue
  3. query job info using jobID from 2
  4. store result from 3 in jobInfoMap

In this case, we shouldn't store the result.
However, it's hard to handle this edge case, and the data we store will be near 100 bytes, is it ok not to handle this?
(Let's do the calculation, let's say we have 100,000 RayJob CR, the most stale cache we can produce will be 10MB (100 bytes *100000)

I think the solution to handle this edge case is using another backgroud go routine to list all rayjob CR, and check is there any additional key in jobInfoMap, and delete them

cc @rueian @andrewsykim

need your two's advice

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is not hard to avoid. We just need to put a placeholder into the map and only update the map if the placeholder exists.

Copy link
Collaborator

@rueian rueian Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we also need to clear the jobInfoMap before each job retry and deletion.


func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil)
if err != nil {
Expand Down Expand Up @@ -211,6 +235,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype
}

req.Header.Set("Content-Type", "application/json")

resp, err := r.client.Do(req)
if err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package dashboardclient

import (
"sync"

cmap "github.com/orcaman/concurrent-map/v2"
)

type WorkerPool struct {
channelContent cmap.ConcurrentMap[string, struct{}]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to the dashboard client.

taskQueue chan func()
stop chan struct{}
wg sync.WaitGroup
workers int
}

func NewWorkerPool(taskQueue chan func()) *WorkerPool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewWorkerPool(taskQueue chan func()) *WorkerPool {
func NewWorkerPool(workers int) *WorkerPool {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing a task queue channel is weird. Specifying a worker count is more understandable. You can also make a buffered channel based on the worker count internally.

wp := &WorkerPool{
taskQueue: taskQueue,
workers: 10,
stop: make(chan struct{}),
channelContent: cmap.New[struct{}](),
}

// Start workers immediately
wp.Start()
return wp
}

// Start launches worker goroutines to consume from queue
func (wp *WorkerPool) Start() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this private.

for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker()
}
}

// worker consumes and executes tasks from the queue
func (wp *WorkerPool) worker() {
defer wp.wg.Done()

for {
select {
case <-wp.stop:
return
case task := <-wp.taskQueue:
if task != nil {
task() // Execute the job
}
}
}
}

// Stop shuts down all workers
func (wp *WorkerPool) Stop() {
close(wp.stop)
wp.wg.Wait()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"sync/atomic"

cmap "github.com/orcaman/concurrent-map/v2"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
Expand All @@ -19,7 +21,7 @@ type FakeRayDashboardClient struct {

var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) {
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) {
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
Expand All @@ -46,6 +48,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (
return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil
}

func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) {
}

func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
if mock := r.GetJobInfoMock.Load(); mock != nil {
info, err := (*mock)(ctx, "job_id")
Expand Down
12 changes: 9 additions & 3 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"
"unicode"

cmap "github.com/orcaman/concurrent-map/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -27,6 +28,7 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

const (
Expand Down Expand Up @@ -641,7 +643,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)
}

type ClientProvider interface {
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface
}

Expand Down Expand Up @@ -758,7 +760,9 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
taskQueue := make(chan func())
workerPool := dashboardclient.NewWorkerPool(taskQueue)
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
if useKubernetesProxy {
Expand All @@ -777,13 +781,15 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// configured to communicate with the Kubernetes API server.
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
workerPool,
jobInfoMap,
)
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)
}, "http://"+url, workerPool, jobInfoMap)
return dashboardClient, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/rayjob-submitter/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
}
rayDashboardClient := &dashboardclient.RayDashboardClient{}
address = rayjobsubmitter.JobSubmissionURL(address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil)
submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req)
if err != nil {
if strings.Contains(err.Error(), "Please use a different submission_id") {
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/test/sampleyaml/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil)
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())
Expand Down