Skip to content

Commit 9596618

Browse files
committed
with go routine pool
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent 913e17b commit 9596618

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ const (
3939

4040
var jobInfoMap sync.Map
4141

42+
// jobInfoChan is the task queue of a simple worker pool for job info updates.
// Reconcile submits closures with a non-blocking send and skips the update
// when the buffer is full.
43+
var jobInfoChan = make(chan func(), 300) // Buffered channel: holds at most 300 pending tasks
44+
45+
// init starts the worker goroutines that drain jobInfoChan and run each
// queued task.
func init() {
46+
// Start 100 worker goroutines. Each one exits only when jobInfoChan is
// closed; no close is visible in this diff, so they presumably run for the
// lifetime of the process.
// NOTE(review): this comment previously said 10 workers while the loop
// bound is 100 - confirm which pool size is intended.
47+
for i := 0; i < 100; i++ {
48+
go func() {
49+
for task := range jobInfoChan {
50+
task() // Run the queued closure on this worker goroutine
51+
}
52+
}()
53+
}
54+
}
55+
4256
// RayJobReconciler reconciles a RayJob object
4357
type RayJobReconciler struct {
4458
client.Client
@@ -306,7 +320,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
306320
reason = rayv1.AppFailed
307321
}
308322
} else {
309-
go func() {
323+
// Submit to simple worker pool instead of creating new goroutine
324+
select {
325+
case jobInfoChan <- func() {
310326
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
311327
if err != nil {
312328
logger.Error(err, "Failed to get Job client", "JobId", rayJobInstance.Status.JobId)
@@ -318,7 +334,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
318334
return
319335
}
320336
jobInfoMap.Store(rayJobInstance.Name, *jobInfo)
321-
}()
337+
}:
338+
// Task submitted successfully
339+
default:
340+
// Channel full, skip this update
341+
logger.V(1).Info("Worker pool busy, skipping job info update")
342+
}
322343
}
323344

324345
// Always update RayClusterStatus along with JobStatus and JobDeploymentStatus updates.

0 commit comments

Comments
 (0)