Skip to content

Commit 372c33b

Browse files
committed
with go routine pool
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent fc730de commit 372c33b

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ const (
3939

4040
var jobInfoMap sync.Map
4141

42+
// Simple worker pool for job info updates
43+
var jobInfoChan = make(chan func(), 300) // Unbuffered channel with unlimited capacity
44+
45+
func init() {
46+
// Start 10 worker goroutines that will live for the entire program
47+
for i := 0; i < 100; i++ {
48+
go func() {
49+
for task := range jobInfoChan {
50+
task() // Execute the function
51+
}
52+
}()
53+
}
54+
}
55+
4256
// RayJobReconciler reconciles a RayJob object
4357
type RayJobReconciler struct {
4458
client.Client
@@ -312,7 +326,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
312326
reason = rayv1.AppFailed
313327
}
314328
} else {
315-
go func() {
329+
// Submit to simple worker pool instead of creating new goroutine
330+
select {
331+
case jobInfoChan <- func() {
316332
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
317333
if err != nil {
318334
logger.Error(err, "Failed to get Job client", "JobId", rayJobInstance.Status.JobId)
@@ -324,7 +340,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
324340
return
325341
}
326342
jobInfoMap.Store(rayJobInstance.Name, *jobInfo)
327-
}()
343+
}:
344+
// Task submitted successfully
345+
default:
346+
// Channel full, skip this update
347+
logger.V(1).Info("Worker pool busy, skipping job info update")
348+
}
328349
}
329350

330351
// Always update RayClusterStatus along with JobStatus and JobDeploymentStatus updates.

0 commit comments

Comments
 (0)