Skip to content

Commit f355e96

Browse files
committed
feat(FR-1562): Setup Relay Subscription for Session Status (#4423)
Resolves #4405 ([FR-1562](https://lablup.atlassian.net/browse/FR-1562)) > [!IMPORTANT] > Currently, the subscription is only applied to the notification shown when a session is created. This PR adds GraphQL subscription support to enable real-time updates for various features: - Implemented GraphQL SSE client for subscription support - Added session status subscription to replace polling-based status updates - Updated session notification components to use subscriptions when available - Added fallback to polling for older manager versions The subscription implementation provides more efficient real-time updates compared to the previous polling approach, reducing unnecessary network requests while providing immediate status changes. [FR-1562]: https://lablup.atlassian.net/browse/FR-1562?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 5b5dda8 commit f355e96

File tree

8 files changed

+1606
-553
lines changed

8 files changed

+1606
-553
lines changed

data/schema.graphql

Lines changed: 1396 additions & 245 deletions
Large diffs are not rendered by default.

pnpm-lock.yaml

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

react/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"fast-deep-equal": "^3.1.3",
3838
"gpt-tokenizer": "^3.2.0",
3939
"graphql": "^16.12.0",
40+
"graphql-sse": "^2.6.0",
4041
"i18next": "^25.6.0",
4142
"i18next-http-backend": "^3.0.2",
4243
"jotai": "^2.15.0",

react/src/RelayEnvironment.ts

Lines changed: 75 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
// import { createClient } from "graphql-ws";
22
import { manipulateGraphQLQueryWithClientDirectives } from './helper/graphql-transformer';
3+
// import { createClient } from 'graphql-ws';
4+
import { createClient } from 'graphql-sse';
35
import {
46
Environment,
57
Network,
68
RecordSource,
79
Store,
810
FetchFunction,
9-
SubscribeFunction,
1011
RelayFeatureFlags,
12+
RequestParameters,
13+
Variables,
14+
Observable,
1115
} from 'relay-runtime';
1216

1317
RelayFeatureFlags.ENABLE_RELAY_RESOLVERS = true;
1418

15-
const fetchFn: FetchFunction = async (
16-
request,
17-
variables,
18-
// cacheConfig,
19-
// uploadables
20-
) => {
19+
const waitForBAIClient = async () => {
2120
//@ts-ignore
2221
if (globalThis.backendaiclient === undefined) {
2322
// If globalThis.backendaiclient is not defined, wait for the backend-ai-connected event.
@@ -33,6 +32,26 @@ const fetchFn: FetchFunction = async (
3332
document.addEventListener('backend-ai-connected', onBackendAIConnected);
3433
});
3534
}
35+
// @ts-ignore
36+
return globalThis.backendaiclient;
37+
};
38+
const getSubscriptionEndpoint = async () => {
39+
const baliClient = await waitForBAIClient();
40+
let api_endpoint: string = baliClient._config.endpoint;
41+
if (api_endpoint) {
42+
api_endpoint = api_endpoint.replace(/^"+|"+$/g, ''); // Remove leading and trailing quotes
43+
api_endpoint += '/func/admin/gql';
44+
}
45+
return api_endpoint;
46+
};
47+
48+
const fetchFn: FetchFunction = async (
49+
request,
50+
variables,
51+
// cacheConfig,
52+
// uploadables
53+
) => {
54+
await waitForBAIClient();
3655

3756
const transformedQuery = manipulateGraphQLQueryWithClientDirectives(
3857
request.text || '',
@@ -50,15 +69,15 @@ const fetchFn: FetchFunction = async (
5069
variables: variables,
5170
};
5271

53-
//@ts-ignore
72+
// @ts-ignore
5473
const reqInfo = globalThis.backendaiclient?.newSignedRequest(
5574
'POST',
5675
'/admin/gql',
5776
reqBody,
5877
);
5978

6079
const result =
61-
//@ts-ignore
80+
// @ts-ignore
6281
(await globalThis.backendaiclient
6382
?._wrapWithPromise(reqInfo)
6483
.catch((err: any) => {
@@ -83,37 +102,58 @@ const fetchFn: FetchFunction = async (
83102
return result;
84103
};
85104

86-
const subscribeFn: SubscribeFunction | undefined = undefined;
87-
88-
// if (typeof window !== "undefined") {
89-
// // We only want to setup subscriptions if we are on the client.
90-
// const subscriptionsClient = createClient({
91-
// url: WEBSOCKET_ENDPOINT,
92-
// });
105+
const subscriptionsClient = createClient({
106+
url: getSubscriptionEndpoint,
107+
headers: () => {
108+
const sessionId: string | undefined =
109+
// @ts-ignore
110+
globalThis.backendaiclient?._loginSessionId;
111+
const headers: Record<string, string> = {};
112+
if (sessionId) {
113+
headers['X-BackendAI-SessionID'] = sessionId;
114+
}
115+
return headers;
116+
},
117+
retryAttempts: 3,
118+
retry: async () => {
119+
// Wait and retry on connection failure
120+
await new Promise((resolve) => setTimeout(resolve, 1000));
121+
},
122+
});
93123

94-
// subscribeFn = (request, variables) => {
95-
// // To understand why we return Observable.create<any>,
96-
// // please see: https://github.com/enisdenjo/graphql-ws/issues/316#issuecomment-1047605774
97-
// return Observable.create<any>((sink) => {
98-
// if (!request.text) {
99-
// return sink.error(new Error("Operation text cannot be empty"));
100-
// }
124+
function fetchForSubscribe(
125+
operation: RequestParameters,
126+
variables: Variables,
127+
): Observable<any> {
128+
return Observable.create((sink) => {
129+
if (!operation.text) {
130+
return sink.error(new Error('Operation text cannot be empty'));
131+
}
132+
const transformedOperation = manipulateGraphQLQueryWithClientDirectives(
133+
operation.text || '',
134+
variables,
135+
(version) => {
136+
// @ts-ignore
137+
return !globalThis.backendaiclient?.isManagerVersionCompatibleWith(
138+
version,
139+
);
140+
},
141+
);
101142

102-
// return subscriptionsClient.subscribe(
103-
// {
104-
// operationName: request.name,
105-
// query: request.text,
106-
// variables,
107-
// },
108-
// sink
109-
// );
110-
// });
111-
// };
112-
// }
143+
return subscriptionsClient.subscribe(
144+
{
145+
operationName: operation.name,
146+
query: transformedOperation,
147+
variables,
148+
},
149+
sink,
150+
);
151+
});
152+
}
113153

114154
function createRelayEnvironment() {
115155
return new Environment({
116-
network: Network.create(fetchFn, subscribeFn),
156+
network: Network.create(fetchFn, fetchForSubscribe),
117157
store: new Store(new RecordSource()),
118158
});
119159
}

react/src/components/BAIComputeSessionNodeNotificationItem.tsx

Lines changed: 98 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import {
1818
graphql,
1919
useFragment,
2020
useRelayEnvironment,
21+
useSubscription,
2122
} from 'react-relay';
2223
import { useNavigate } from 'react-router-dom';
2324
import { BAIComputeSessionNodeNotificationItemFragment$key } from 'src/__generated__/BAIComputeSessionNodeNotificationItemFragment.graphql';
2425
import { BAIComputeSessionNodeNotificationItemRefreshQuery } from 'src/__generated__/BAIComputeSessionNodeNotificationItemRefreshQuery.graphql';
26+
import { useSuspendedBackendaiClient } from 'src/hooks';
2527
import {
2628
NotificationState,
2729
useSetBAINotification,
@@ -40,6 +42,7 @@ const BAIComputeSessionNodeNotificationItem: React.FC<
4042
const { closeNotification } = useSetBAINotification();
4143
const { t } = useTranslation();
4244
const navigate = useNavigate();
45+
const baiClient = useSuspendedBackendaiClient();
4346
const node = useFragment(
4447
graphql`
4548
fragment BAIComputeSessionNodeNotificationItemFragment on ComputeSessionNode {
@@ -54,24 +57,6 @@ const BAIComputeSessionNodeNotificationItem: React.FC<
5457
sessionFrgmt,
5558
);
5659

57-
// TODO: delete this when Status subscription is implemented
58-
const [delay, setDelay] = useState<number | null>(null);
59-
UNSAFE_useAutoRefreshInterval(node?.id || '', delay);
60-
useEffect(() => {
61-
if (
62-
!node?.status ||
63-
node?.status === 'TERMINATED' ||
64-
node?.status === 'CANCELLED'
65-
) {
66-
setDelay(null);
67-
} else if (node?.status === 'RUNNING') {
68-
setDelay(15000);
69-
} else {
70-
setDelay(3000);
71-
}
72-
}, [node?.status]);
73-
// ---
74-
7560
useUpdateEffect(() => {
7661
if (node?.status === 'TERMINATED' || node?.status === 'CANCELLED') {
7762
setTimeout(() => {
@@ -82,46 +67,56 @@ const BAIComputeSessionNodeNotificationItem: React.FC<
8267

8368
return (
8469
node && (
85-
<BAINotificationItem
86-
title={
87-
<BAIText ellipsis>
88-
{t('general.Session')}:&nbsp;
89-
<BAILink
90-
style={{
91-
fontWeight: 'normal',
92-
}}
93-
title={node.name || ''}
94-
onClick={() => {
95-
navigate(
96-
`/session${node.row_id ? `?${new URLSearchParams({ sessionDetail: node.row_id }).toString()}` : ''}`,
97-
);
98-
closeNotification(notification.key);
99-
}}
100-
>
101-
{node.name}
102-
</BAILink>
103-
</BAIText>
104-
}
105-
description={
106-
<BAIFlex justify="between">
107-
<SessionStatusTag
108-
sessionFrgmt={node || null}
109-
showQueuePosition={false}
110-
showTooltip={false}
111-
/>
112-
<SessionActionButtons
113-
compact
114-
size="small"
115-
sessionFrgmt={node || null}
116-
hiddenButtonKeys={['containerCommit']}
117-
primaryAppOption={primaryAppOption}
118-
/>
119-
</BAIFlex>
120-
}
121-
footer={
122-
showDate ? dayjs(notification.created).format('lll') : undefined
123-
}
124-
/>
70+
<>
71+
<BAINotificationItem
72+
title={
73+
<BAIText ellipsis>
74+
{t('general.Session')}:&nbsp;
75+
<BAILink
76+
style={{
77+
fontWeight: 'normal',
78+
}}
79+
title={node.name || ''}
80+
onClick={() => {
81+
navigate(
82+
`/session${node.row_id ? `?${new URLSearchParams({ sessionDetail: node.row_id }).toString()}` : ''}`,
83+
);
84+
closeNotification(notification.key);
85+
}}
86+
>
87+
{node.name}
88+
</BAILink>
89+
</BAIText>
90+
}
91+
description={
92+
<BAIFlex justify="between">
93+
<SessionStatusTag
94+
sessionFrgmt={node || null}
95+
showQueuePosition={false}
96+
showTooltip={false}
97+
/>
98+
<SessionActionButtons
99+
compact
100+
size="small"
101+
sessionFrgmt={node || null}
102+
hiddenButtonKeys={['containerCommit']}
103+
primaryAppOption={primaryAppOption}
104+
/>
105+
</BAIFlex>
106+
}
107+
footer={
108+
showDate ? dayjs(notification.created).format('lll') : undefined
109+
}
110+
/>
111+
{baiClient.isManagerVersionCompatibleWith('25.16.0') && node.row_id ? (
112+
<SessionStatusRefresherUsingSubscription sessionRowId={node.row_id} />
113+
) : node.row_id && node.status ? (
114+
<UNSAFE_SessionStatusRefresher
115+
id={node.row_id}
116+
status={node.status}
117+
/>
118+
) : null}
119+
</>
125120
)
126121
);
127122
};
@@ -151,3 +146,48 @@ const UNSAFE_useAutoRefreshInterval = (
151146
).toPromise();
152147
}, delay);
153148
};
149+
150+
const SessionStatusRefresherUsingSubscription: React.FC<{
151+
sessionRowId: string;
152+
}> = ({ sessionRowId }) => {
153+
useSubscription({
154+
subscription: graphql`
155+
subscription BAIComputeSessionNodeNotificationItemSubscription(
156+
$session_id: ID!
157+
) {
158+
schedulingEventsBySession(sessionId: $session_id) {
159+
reason
160+
session {
161+
status
162+
...BAIComputeSessionNodeNotificationItemFragment
163+
...SessionNodesFragment
164+
...SessionDetailContentFragment
165+
}
166+
}
167+
}
168+
`,
169+
variables: { session_id: sessionRowId },
170+
});
171+
return null;
172+
};
173+
174+
const UNSAFE_SessionStatusRefresher: React.FC<{
175+
id: string;
176+
status: string;
177+
}> = ({ id, status }) => {
178+
// TODO: delete this when Status subscription is implemented
179+
const [delay, setDelay] = useState<number | null>(null);
180+
UNSAFE_useAutoRefreshInterval(id || '', delay);
181+
useEffect(() => {
182+
if (!status || status === 'TERMINATED' || status === 'CANCELLED') {
183+
setDelay(null);
184+
} else if (status === 'RUNNING') {
185+
setDelay(15000);
186+
} else {
187+
setDelay(3000);
188+
}
189+
}, [status]);
190+
// ---
191+
192+
return null;
193+
};

0 commit comments

Comments
 (0)