Skip to content

Commit e9d4859

Browse files
committed
[RayCluster] Improved the efficiency when checking rayclusters' expectations
1 parent 58c2aad commit e9d4859

File tree

2 files changed

+138
-26
lines changed

2 files changed

+138
-26
lines changed

ray-operator/controllers/ray/expectations/scale_expectations.go

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -70,44 +70,47 @@ func (r *rayClusterScaleExpectationImpl) ExpectScalePod(namespace, rayClusterNam
7070
}
7171
}
7272

73+
func (r *rayClusterScaleExpectationImpl) isPodScaled(ctx context.Context, rp *rayPod, namespace string) bool {
74+
pod := &corev1.Pod{}
75+
switch rp.action {
76+
case Create:
77+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
78+
return true
79+
}
80+
// Tolerating extreme case:
81+
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
82+
// before the second reconciliation. This would lead to never satisfying the expected condition.
83+
// Avoid this by setting a timeout.
84+
return rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
85+
case Delete:
86+
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
87+
return errors.IsNotFound(err)
88+
}
89+
}
90+
return false
91+
}
92+
7393
func (r *rayClusterScaleExpectationImpl) IsSatisfied(ctx context.Context, namespace, rayClusterName, group string) (isSatisfied bool) {
7494
items, err := r.itemsCache.ByIndex(GroupIndex, fmt.Sprintf("%s/%s/%s", namespace, rayClusterName, group))
7595
if err != nil {
7696
// An error occurs when there is no corresponding IndexFunc for GroupIndex. This should be a fatal error.
7797
panic(err)
7898
}
79-
isSatisfied = true
8099
for i := range items {
81100
rp := items[i].(*rayPod)
82-
pod := &corev1.Pod{}
83-
isPodSatisfied := false
84-
switch rp.action {
85-
case Create:
86-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err == nil {
87-
isPodSatisfied = true
88-
} else {
89-
// Tolerating extreme case:
90-
// The first reconciliation created a Pod. If the Pod was quickly deleted from etcd by another component
91-
// before the second reconciliation. This would lead to never satisfying the expected condition.
92-
// Avoid this by setting a timeout.
93-
isPodSatisfied = rp.recordTimestamp.Add(ExpectationsTimeout).Before(time.Now())
94-
}
95-
case Delete:
96-
if err := r.Get(ctx, types.NamespacedName{Name: rp.name, Namespace: namespace}, pod); err != nil {
97-
isPodSatisfied = errors.IsNotFound(err)
98-
}
101+
isPodSatisfied := r.isPodScaled(ctx, rp, namespace)
102+
103+
if !isPodSatisfied {
104+
return false
99105
}
106+
100107
// delete satisfied item in cache
101-
if isPodSatisfied {
102-
if err := r.itemsCache.Delete(items[i]); err != nil {
103-
// Fatal error in KeyFunc.
104-
panic(err)
105-
}
106-
} else {
107-
isSatisfied = false
108+
if err := r.itemsCache.Delete(items[i]); err != nil {
109+
// Fatal error in KeyFunc.
110+
panic(err)
108111
}
109112
}
110-
return isSatisfied
113+
return true
111114
}
112115

113116
func (r *rayClusterScaleExpectationImpl) Delete(rayClusterName, namespace string) {

ray-operator/controllers/ray/expectations/scale_expectations_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/stretchr/testify/require"
1010
corev1 "k8s.io/api/core/v1"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
1213
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
1314
)
1415

@@ -166,3 +167,111 @@ func getTestPod() []corev1.Pod {
166167
},
167168
}
168169
}
170+
171+
func TestIsPodScaled(t *testing.T) {
172+
ctx := context.Background()
173+
tests := []struct {
174+
action ScaleAction
175+
expectedResult bool
176+
name string
177+
setupFunc func(client.Client, *corev1.Pod)
178+
}{
179+
{
180+
name: "Create action - pod exists",
181+
action: Create,
182+
expectedResult: true,
183+
setupFunc: func(client client.Client, pod *corev1.Pod) {
184+
err := client.Create(ctx, pod)
185+
require.NoError(t, err)
186+
},
187+
},
188+
{
189+
name: "Create action - pod does not exist",
190+
action: Create,
191+
expectedResult: false,
192+
setupFunc: func(_ client.Client, _ *corev1.Pod) {},
193+
},
194+
{
195+
name: "Delete action - pod exists",
196+
action: Delete,
197+
expectedResult: false,
198+
setupFunc: func(client client.Client, pod *corev1.Pod) {
199+
err := client.Create(ctx, pod)
200+
require.NoError(t, err)
201+
},
202+
},
203+
{
204+
name: "Delete action - pod does not exist",
205+
action: Delete,
206+
expectedResult: true,
207+
setupFunc: func(_ client.Client, _ *corev1.Pod) {},
208+
},
209+
}
210+
211+
for _, tt := range tests {
212+
t.Run(tt.name, func(t *testing.T) {
213+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
214+
exp := &rayClusterScaleExpectationImpl{
215+
Client: fakeClient,
216+
itemsCache: nil, // Not used in isPodScaled
217+
}
218+
219+
testPod := &corev1.Pod{
220+
ObjectMeta: metav1.ObjectMeta{
221+
Name: "test-pod",
222+
Namespace: "default",
223+
},
224+
}
225+
226+
rp := &rayPod{
227+
name: testPod.Name,
228+
namespace: testPod.Namespace,
229+
action: tt.action,
230+
recordTimestamp: time.Now(),
231+
}
232+
233+
tt.setupFunc(fakeClient, testPod)
234+
235+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
236+
assert.Equal(t, tt.expectedResult, result)
237+
})
238+
}
239+
}
240+
241+
func TestIsPodScaledTimeout(t *testing.T) {
242+
ctx := context.Background()
243+
244+
// Save original timeout and restore after test
245+
originalTimeout := ExpectationsTimeout
246+
ExpectationsTimeout = 20 * time.Millisecond
247+
defer func() { ExpectationsTimeout = originalTimeout }()
248+
249+
fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects().Build()
250+
exp := &rayClusterScaleExpectationImpl{
251+
Client: fakeClient,
252+
itemsCache: nil,
253+
}
254+
255+
testPod := &corev1.Pod{
256+
ObjectMeta: metav1.ObjectMeta{
257+
Name: "test-pod",
258+
Namespace: "default",
259+
},
260+
}
261+
262+
rp := &rayPod{
263+
name: testPod.Name,
264+
namespace: testPod.Namespace,
265+
action: Create,
266+
recordTimestamp: time.Now(),
267+
}
268+
269+
// Initially should return false (pod doesn't exist)
270+
result := exp.isPodScaled(ctx, rp, testPod.Namespace)
271+
assert.False(t, result)
272+
273+
// After timeout, should return true even though pod doesn't exist
274+
time.Sleep(ExpectationsTimeout + 10*time.Millisecond)
275+
result = exp.isPodScaled(ctx, rp, testPod.Namespace)
276+
assert.True(t, result)
277+
}

0 commit comments

Comments
 (0)