diff --git a/apiserver/pkg/server/ray_job_submission_service_server.go b/apiserver/pkg/server/ray_job_submission_service_server.go index 4fdead1e50f..1b026858a3f 100644 --- a/apiserver/pkg/server/ray_job_submission_service_server.go +++ b/apiserver/pkg/server/ray_job_submission_service_server.go @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct { // Create RayJobSubmissionServiceServer func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer { zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel) - return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)} + return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil)} } // Submit Ray job diff --git a/go.mod b/go.mod index 472e6d593df..0db4cf6a5eb 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/go.sum b/go.sum index dddab9f7e86..7549b7d26c1 100644 --- a/go.sum +++ b/go.sum @@ -165,6 +165,8 @@ github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOT github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y= github.com/onsi/gomega v1.37.0/go.mod 
h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 6acaedeb260..d6d25ee3b8d 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -1,6 +1,7 @@ package v1alpha1 import ( + cmap "github.com/orcaman/concurrent-map/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -8,6 +9,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) //+kubebuilder:object:root=true @@ -86,7 +88,8 @@ type Configuration struct { } func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy) + jobInfoMap := cmap.New[*utiltypes.RayJobCache]() + return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap) } func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, 
podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 1741507a586..6e4b12977cc 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -30,6 +30,7 @@ import ( "github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" "github.com/ray-project/kuberay/ray-operator/pkg/features" ) @@ -41,9 +42,8 @@ const ( // RayJobReconciler reconciles a RayJob object type RayJobReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - + Scheme *runtime.Scheme + Recorder record.EventRecorder dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) options RayJobReconcilerOptions } @@ -275,10 +275,24 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) if err != nil { return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } - - jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId) - if err != nil { - // If the Ray job was not found, GetJobInfo returns a BadRequest error. + var jobInfo *utiltypes.RayJobInfo + jobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId) + if jobCache != nil { + if jobCache.Err != nil { + if errors.IsBadRequest(jobCache.Err) && isSubmitterFinished { + rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed + rayJobInstance.Status.Reason = rayv1.AppFailed + rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster." 
+ break + } + logger.Error(jobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", jobCache.Err) + rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) + return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, jobCache.Err + } + jobInfo = jobCache.JobInfo + } else { + // Cache miss: try a direct fetch to disambiguate not-found vs. transient + jobInfo, err = rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId) if errors.IsBadRequest(err) { if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode { logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId) @@ -295,11 +309,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } } + } + rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) + if jobInfo == nil { logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } - // If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job // to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob // as "Complete" or "Failed" to avoid unnecessary reconciliation. 
diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index d360ebc0af9..565f76a4d04 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -9,6 +9,7 @@ import ( "net/url" "strings" + cmap "github.com/orcaman/concurrent-map/v2" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/yaml" @@ -27,28 +28,36 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string) + InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) + AsyncGetJobInfo(ctx context.Context, jobId string) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) GetJobLog(ctx context.Context, jobName string) (*string, error) StopJob(ctx context.Context, jobName string) error DeleteJob(ctx context.Context, jobName string) error + GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache } type RayDashboardClient struct { - client *http.Client - dashboardURL string + client *http.Client + workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}] + workerPool *WorkerPool + jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache] + dashboardURL 
string } -func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) { +func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) { r.client = client r.dashboardURL = dashboardURL + r.workerPool = workerPool + r.jobInfoMap = jobInfoMap + r.workerPoolChannelContent = workerPoolChannelContent } // UpdateDeployments update the deployments in the Ray cluster. @@ -171,6 +180,25 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti return &jobInfo, nil } +func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) { + if _, ok := r.workerPoolChannelContent.Get(jobId); ok { + return + } + r.workerPoolChannelContent.Set(jobId, struct{}{}) + r.workerPool.taskQueue <- func() { + defer r.workerPoolChannelContent.Remove(jobId) + jobInfo, err := r.GetJobInfo(ctx, jobId) + if err != nil { + err = fmt.Errorf("failed to get job info: %w", err) + r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: nil, Err: err}) + return + } + if jobInfo != nil { + r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo, Err: nil}) + } + } +} + func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil) if err != nil { @@ -221,6 +249,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype } req.Header.Set("Content-Type", "application/json") + resp, err := r.client.Do(req) if err != nil { return @@ -333,6 +362,13 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro return nil } +func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache { + if jobInfo, ok := r.jobInfoMap.Get(jobId); ok { + return jobInfo + } + return nil +} + func 
ConvertRayJobToReq(rayJob *rayv1.RayJob) (*utiltypes.RayJobRequest, error) { req := &utiltypes.RayJobRequest{ Entrypoint: rayJob.Spec.Entrypoint, diff --git a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go new file mode 100644 index 00000000000..6b0c0bf6f2e --- /dev/null +++ b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go @@ -0,0 +1,48 @@ +package dashboardclient + +import ( + "sync" +) + +type WorkerPool struct { + taskQueue chan func() + stopChan chan struct{} + wg sync.WaitGroup + workers int +} + +func NewWorkerPool(taskQueue chan func()) *WorkerPool { + wp := &WorkerPool{ + taskQueue: taskQueue, + workers: 10, + stopChan: make(chan struct{}), + } + + // Start workers immediately + wp.start() + return wp +} + +// start launches the worker goroutines that consume tasks from the queue +func (wp *WorkerPool) start() { + for i := 0; i < wp.workers; i++ { + wp.wg.Add(1) + go wp.worker() + } +} + +// worker consumes and executes tasks from the queue +func (wp *WorkerPool) worker() { + defer wp.wg.Done() + + for { + select { + case <-wp.stopChan: + return + case task := <-wp.taskQueue: + if task != nil { + task() // Execute the job + } + } + } +} diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 21a3fdb91be..005a5eed561 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -6,6 +6,8 @@ import ( "net/http" "sync/atomic" + cmap "github.com/orcaman/concurrent-map/v2" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" @@ -19,7 +21,7 @@ type FakeRayDashboardClient struct { var _ dashboardclient.RayDashboardClientInterface = 
(*FakeRayDashboardClient)(nil) -func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) { +func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], _ *cmap.ConcurrentMap[string, struct{}]) { } func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error { @@ -46,6 +48,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) ( return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil } +func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) { +} + func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { if mock := r.GetJobInfoMock.Load(); mock != nil { info, err := (*mock)(ctx, "job_id") @@ -77,3 +82,7 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error { return nil } + +func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobCache { + return nil +} diff --git a/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go index a246ea399f3..54bc373a29d 100644 --- a/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go @@ -46,3 +46,8 @@ type RayJobStopResponse struct { type RayJobLogsResponse struct { Logs string `json:"logs,omitempty"` } + +type RayJobCache struct { + JobInfo *RayJobInfo + Err error +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 3bb63f79189..070c3c9ad1c 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -14,6 +14,7 @@ import ( "time" "unicode" + cmap "github.com/orcaman/concurrent-map/v2" batchv1 
"k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -27,6 +28,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) const ( @@ -758,7 +760,10 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { + taskQueue := make(chan func()) + workerPool := dashboardclient.NewWorkerPool(taskQueue) + workerPoolChannelContent := cmap.New[struct{}]() return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -777,13 +782,16 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun // configured to communicate with the Kubernetes API server. 
mgr.GetHTTPClient(), fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), + workerPool, + jobInfoMap, + &workerPoolChannelContent, ) return dashboardClient, nil } dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, - }, "http://"+url) + }, "http://"+url, workerPool, jobInfoMap, &workerPoolChannelContent) return dashboardClient, nil } } diff --git a/ray-operator/rayjob-submitter/cmd/main.go b/ray-operator/rayjob-submitter/cmd/main.go index dd4f68eb420..d11cb02f44e 100644 --- a/ray-operator/rayjob-submitter/cmd/main.go +++ b/ray-operator/rayjob-submitter/cmd/main.go @@ -64,7 +64,7 @@ func main() { } rayDashboardClient := &dashboardclient.RayDashboardClient{} address = rayjobsubmitter.JobSubmissionURL(address) - rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address) + rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil, nil) submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req) if err != nil { if strings.Contains(err.Error(), "Please use a different submission_id") { diff --git a/ray-operator/test/e2erayjob/rayjob_retry_test.go b/ray-operator/test/e2erayjob/rayjob_retry_test.go index 3cbb70ab48a..ce1e565a23c 100644 --- a/ray-operator/test/e2erayjob/rayjob_retry_test.go +++ b/ray-operator/test/e2erayjob/rayjob_retry_test.go @@ -107,7 +107,7 @@ func TestRayJobRetry(t *testing.T) { LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) // Ensure JobDeploymentStatus transit to Failed - g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). + g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) // Ensure JobStatus is empty g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)). 
diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index 42ffd19e0a2..1a6393a2761 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg g.Expect(err).ToNot(HaveOccurred()) url := fmt.Sprintf("127.0.0.1:%d", localPort) - rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false) + rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil) rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url) g.Expect(err).ToNot(HaveOccurred()) serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())