Skip to content

Commit cf8f80e

Browse files
committed
[RayCluster] Improved the efficiency when checking rayclusters' expectations
1 parent 58c2aad commit cf8f80e

File tree

2 files changed

+139
-26
lines changed

2 files changed

+139
-26
lines changed

ray-operator/controllers/ray/expectations/scale_expectations.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,44 +70,47 @@ func (r *rayClusterScaleExpectationImpl) ExpectScalePod(namespace, rayClusterNam
7070
}
7171
}
7272

73+
func (r *rayClusterScaleExpectationImpl) isPodScaled(ctx context.Context, rp *rayPod, namespace string) bool {
74+
pod := &corev1.Pod{}
75+
switch rp.action {
76+
case Create:
77+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
78+
return true
79+
}
80+
// Tolerating extreme case:
81+
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
82+
// before the second reconciliation. This would lead to never satisfying the expected condition.
83+
// Avoid this by setting a timeout.
84+
return rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
85+
case Delete:
86+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
87+
return errors.IsNotFound(err)
88+
}
89+
}
90+
return false
91+
}
92+
7393
func (r *rayClusterScaleExpectationImpl) IsSatisfied(ctx context.Context, namespace, rayClusterName, group string) (isSatisfied bool) {
7494
items, err := r.itemsCache.ByIndex(GroupIndex, fmt.Sprintf("%s/%s/%s", namespace, rayClusterName, group))
7595
if err != nil {
7696
// An error occurs when there is no corresponding IndexFunc for GroupIndex. This should be a fatal error.
7797
panic(err)
7898
}
79-
isSatisfied = true
8099
for i := range items {
81100
rp := items[i].(*rayPod)
82-
pod := &corev1.Pod{}
83-
isPodSatisfied := false
84-
switch rp.action {
85-
case Create:
86-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
87-
isPodSatisfied = true
88-
} else {
89-
// Tolerating extreme case:
90-
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
91-
// before the second reconciliation. This would lead to never satisfying the expected condition.
92-
// Avoid this by setting a timeout.
93-
isPodSatisfied = rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
94-
}
95-
case Delete:
96-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
97-
isPodSatisfied = errors.IsNotFound(err)
98-
}
101+
isPodSatisfied := r.isPodScaled(ctx, rp, namespace)
102+
103+
if !isPodSatisfied {
104+
return false
99105
}
106+
100107
// delete satisfied item in cache
101-
if isPodSatisfied {
102-
if err := r.itemsCache.Delete(items[i]); err != nil {
103-
// Fatal error in KeyFunc.
104-
panic(err)
105-
}
106-
} else {
107-
isSatisfied = false
108+
if err := r.itemsCache.Delete(items[i]); err != nil {
109+
// Fatal error in KeyFunc.
110+
panic(err)
108111
}
109112
}
110-
return isSatisfied
113+
return true
111114
}
112115

113116
func (r *rayClusterScaleExpectationImpl) Delete(rayClusterName, namespace string) {

ray-operator/controllers/ray/expectations/scale_expectations_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
corev1 "k8s.io/api/core/v1"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
1213
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
1314
)
1415

@@ -166,3 +167,112 @@ func getTestPod() []corev1.Pod {
166167
},
167168
}
168169
}
170+
171+
func TestIsPodScaled(t *testing.T) {
172+
ctx := context.Background()
173+
174+
tests := []struct {
175+
name string
176+
action ScaleAction
177+
expectedResult bool
178+
setupFunc func(client.Client, *corev1.Pod)
179+
}{
180+
{
181+
name: "Create action - pod exists",
182+
action: Create,
183+
expectedResult: true,
184+
setupFunc: func(client client.Client, pod *corev1.Pod) {
185+
err := client.Create(ctx, pod)
186+
require.NoError(t, err)
187+
},
188+
},
189+
{
190+
name: "Create action - pod does not exist",
191+
action: Create,
192+
expectedResult: false,
193+
setupFunc: func(client client.Client, pod *corev1.Pod) {},
194+
},
195+
{
196+
name: "Delete action - pod exists",
197+
action: Delete,
198+
expectedResult: false,
199+
setupFunc: func(client client.Client, pod *corev1.Pod) {
200+
err := client.Create(ctx, pod)
201+
require.NoError(t, err)
202+
},
203+
},
204+
{
205+
name: "Delete action - pod does not exist",
206+
action: Delete,
207+
expectedResult: true,
208+
setupFunc: func(client client.Client, pod *corev1.Pod) {},
209+
},
210+
}
211+
212+
for _, tt := range tests {
213+
t.Run(tt.name, func(t *testing.T) {
214+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
215+
exp := &rayClusterScaleExpectationImpl{
216+
Client: fakeClient,
217+
itemsCache: nil, // Not used in isPodScaled
218+
}
219+
220+
testPod := &corev1.Pod{
221+
ObjectMeta: metav1.ObjectMeta{
222+
Name: "test-pod",
223+
Namespace: "default",
224+
},
225+
}
226+
227+
rp := &rayPod{
228+
name: testPod.Name,
229+
namespace: testPod.Namespace,
230+
action: tt.action,
231+
recordTimestamp: time.Now(),
232+
}
233+
234+
tt.setupFunc(fakeClient, testPod)
235+
236+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
237+
assert.Equal(t, tt.expectedResult, result)
238+
})
239+
}
240+
}
241+
242+
func TestIsPodScaledTimeout(t *testing.T) {
243+
ctx := context.Background()
244+
245+
// Save original timeout and restore after test
246+
originalTimeout := ExpectationsTimeout
247+
ExpectationsTimeout = 20 * time.Millisecond
248+
defer func() { ExpectationsTimeout = originalTimeout }()
249+
250+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
251+
exp := &rayClusterScaleExpectationImpl{
252+
Client: fakeClient,
253+
itemsCache: nil,
254+
}
255+
256+
testPod := &corev1.Pod{
257+
ObjectMeta: metav1.ObjectMeta{
258+
Name: "test-pod",
259+
Namespace: "default",
260+
},
261+
}
262+
263+
rp := &rayPod{
264+
name: testPod.Name,
265+
namespace: testPod.Namespace,
266+
action: Create,
267+
recordTimestamp: time.Now(),
268+
}
269+
270+
// Initially should return false (pod doesn't exist)
271+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
272+
assert.False(t, result)
273+
274+
// After timeout, should return true even though pod doesn't exist
275+
time.Sleep(ExpectationsTimeout + 10*time.Millisecond)
276+
result = exp.isPodScaled(ctx, rp, testPod.Namespace)
277+
assert.True(t, result)
278+
}

0 commit comments

Comments
 (0)