Skip to content

Commit 1e50423

Browse files
authored
Fix shutdown of CRD process proxy (and subclasses) (#1234)
* Fix CRD/SparkOperator shutdown
* Add some troubleshooting, prefer using 'namespaced' methods
* Set EG_INHERITED_ENVS to include PATH for bootstrapping kernels
* Refactor k8s-based process proxies regarding status and termination
* Apply labels to spark operator executors
* Minor refactor to reference k8s kind names
* Cap pyzmq < 25 since jupyter_client 6 references removed code
1 parent dcaeb56 commit 1e50423

File tree

6 files changed

+96
-112
lines changed

6 files changed

+96
-112
lines changed

enterprise_gateway/services/processproxies/crd.py

Lines changed: 28 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -15,57 +15,49 @@
1515
class CustomResourceProcessProxy(KubernetesProcessProxy):
1616
"""A custom resource process proxy."""
1717

18-
group = version = plural = None
19-
custom_resource_template_name = None
20-
kernel_resource_name = None
18+
# Identifies the kind of object being managed by this process proxy.
19+
# For these values we will prefer the values found in the 'kind' field
20+
# of the object's metadata. This attribute is strictly used to provide
21+
# context to log messages.
22+
object_kind = "CustomResourceDefinition"
2123

2224
def __init__(self, kernel_manager: RemoteKernelManager, proxy_config: dict):
2325
"""Initialize the proxy."""
2426
super().__init__(kernel_manager, proxy_config)
27+
self.group = self.version = self.plural = None
28+
self.kernel_resource_name = None
2529

2630
async def launch_process(
2731
self, kernel_cmd: str, **kwargs: dict[str, Any] | None
2832
) -> "CustomResourceProcessProxy":
2933
"""Launch the process for a kernel."""
30-
kwargs["env"][
31-
"KERNEL_RESOURCE_NAME"
32-
] = self.kernel_resource_name = self._determine_kernel_pod_name(**kwargs)
34+
self.kernel_resource_name = self._determine_kernel_pod_name(**kwargs)
35+
kwargs["env"]["KERNEL_RESOURCE_NAME"] = self.kernel_resource_name
3336
kwargs["env"]["KERNEL_CRD_GROUP"] = self.group
3437
kwargs["env"]["KERNEL_CRD_VERSION"] = self.version
3538
kwargs["env"]["KERNEL_CRD_PLURAL"] = self.plural
3639

3740
await super().launch_process(kernel_cmd, **kwargs)
3841
return self
3942

40-
def terminate_container_resources(self) -> bool | None:
41-
"""Terminate the resources for the container."""
42-
result = None
43-
44-
if self.kernel_resource_name:
45-
if self.delete_kernel_namespace and not self.kernel_manager.restarting:
46-
body = client.V1DeleteOptions(
47-
grace_period_seconds=0, propagation_policy="Background"
48-
)
49-
v1_status = client.CoreV1Api().delete_namespace(
50-
name=self.kernel_namespace, body=body
51-
)
52-
53-
if v1_status and v1_status.status:
54-
termination_status = ["Succeeded", "Failed", "Terminating"]
55-
if any(status in v1_status.status for status in termination_status):
56-
result = True
57-
else:
58-
try:
59-
delete_status = client.CustomObjectsApi().delete_cluster_custom_object(
60-
self.group, self.version, self.plurals, self.kernel_resource_name
61-
)
62-
63-
result = delete_status and delete_status.get("status", None) == "Success"
64-
65-
except Exception as err:
66-
result = isinstance(err, client.rest.ApiException) and err.status == 404
67-
68-
if result:
69-
self.kernel_resource_name = None
43+
def delete_managed_object(self, termination_stati: list[str]) -> bool:
44+
"""Deletes the object managed by this process-proxy
45+
46+
A return value of True indicates the object is considered deleted,
47+
otherwise a False or None value is returned.
48+
49+
Note: the caller is responsible for handling exceptions.
50+
"""
51+
delete_status = client.CustomObjectsApi().delete_namespaced_custom_object(
52+
self.group,
53+
self.version,
54+
self.kernel_namespace,
55+
self.plural,
56+
self.kernel_resource_name,
57+
grace_period_seconds=0,
58+
propagation_policy="Background",
59+
)
60+
61+
result = delete_status and delete_status.get("status", None) in termination_stati
7062

7163
return result

enterprise_gateway/services/processproxies/k8s.py

Lines changed: 53 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,12 @@ class KubernetesProcessProxy(ContainerProcessProxy):
3737
Kernel lifecycle management for Kubernetes kernels.
3838
"""
3939

40+
# Identifies the kind of object being managed by this process proxy.
41+
# For these values we will prefer the values found in the 'kind' field
42+
# of the object's metadata. This attribute is strictly used to provide
43+
# context to log messages.
44+
object_kind = "Pod"
45+
4046
def __init__(self, kernel_manager: RemoteKernelManager, proxy_config: dict):
4147
"""Initialize the proxy."""
4248
super().__init__(kernel_manager, proxy_config)
@@ -91,19 +97,37 @@ def get_container_status(self, iteration: int | None) -> str | None:
9197

9298
if iteration: # only log if iteration is not None (otherwise poll() is too noisy)
9399
self.log.debug(
94-
"{}: Waiting to connect to k8s pod in namespace '{}'. "
95-
"Name: '{}', Status: '{}', Pod IP: '{}', KernelID: '{}'".format(
96-
iteration,
97-
self.kernel_namespace,
98-
self.container_name,
99-
pod_status,
100-
self.assigned_ip,
101-
self.kernel_id,
102-
)
100+
f"{iteration}: Waiting to connect to k8s {self.object_kind.lower()} in "
101+
f"namespace '{self.kernel_namespace}'. Name: '{self.container_name}', "
102+
f"Status: '{pod_status}', Pod IP: '{self.assigned_ip}', KernelID: '{self.kernel_id}'"
103103
)
104104

105105
return pod_status
106106

107+
def delete_managed_object(self, termination_stati: list[str]) -> bool:
108+
"""Deletes the object managed by this process-proxy
109+
110+
A return value of True indicates the object is considered deleted,
111+
otherwise a False or None value is returned.
112+
113+
Note: the caller is responsible for handling exceptions.
114+
"""
115+
body = client.V1DeleteOptions(grace_period_seconds=0, propagation_policy="Background")
116+
117+
# Deleting a Pod will return a v1.Pod if found and its status will be a PodStatus containing
118+
# a phase string property
119+
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podstatus-v1-core
120+
v1_pod = client.CoreV1Api().delete_namespaced_pod(
121+
namespace=self.kernel_namespace, body=body, name=self.container_name
122+
)
123+
status = None
124+
if v1_pod and v1_pod.status:
125+
status = v1_pod.status.phase
126+
127+
result = status in termination_stati
128+
129+
return result
130+
107131
def terminate_container_resources(self) -> bool | None:
108132
"""Terminate any artifacts created on behalf of the container's lifetime."""
109133
# Kubernetes objects don't go away on their own - so we need to tear down the namespace
@@ -114,46 +138,36 @@ def terminate_container_resources(self) -> bool | None:
114138
# from the pod deletion API, since it's not necessarily reflective of the actual status.
115139

116140
result = False
117-
body = client.V1DeleteOptions(grace_period_seconds=0, propagation_policy="Background")
141+
termination_stati = ["Succeeded", "Failed", "Terminating", "Success"]
118142

119-
# Delete the pod then, if applicable, the namespace
143+
# Delete the managed object then, if applicable, the namespace
144+
object_type = self.object_kind
120145
try:
121-
object_name = "pod"
122-
status = None
123-
termination_stati = ["Succeeded", "Failed", "Terminating"]
124-
125-
# Deleting a Pod will return a v1.Pod if found and its status will be a PodStatus containing
126-
# a phase string property
127-
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podstatus-v1-core
128-
v1_pod = client.CoreV1Api().delete_namespaced_pod(
129-
namespace=self.kernel_namespace, body=body, name=self.container_name
130-
)
131-
if v1_pod and v1_pod.status:
132-
status = v1_pod.status.phase
133-
134-
if status in termination_stati:
135-
result = True
136-
146+
result = self.delete_managed_object(termination_stati)
137147
if not result:
138-
# If the status indicates the pod is not terminated, capture its current status.
148+
# If the status indicates the object is not terminated, capture its current status.
139149
# If None, update the result to True, else issue warning that it is not YET deleted
140150
# since we still have the hard termination sequence to occur.
141151
cur_status = self.get_container_status(None)
142152
if cur_status is None:
143153
result = True
144154
else:
145155
self.log.warning(
146-
f"Pod {self.kernel_namespace}.{self.container_name} is not yet deleted. "
147-
f"Current status is '{cur_status}'."
156+
f"{object_type} '{self.kernel_namespace}.{self.container_name}'"
157+
f" is not yet deleted. Current status is '{cur_status}'."
148158
)
149159

150160
if self.delete_kernel_namespace and not self.kernel_manager.restarting:
151-
object_name = "namespace"
161+
object_type = "Namespace"
152162
# Status is a return value for calls that don't return other objects.
153163
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#status-v1-meta
164+
body = client.V1DeleteOptions(
165+
grace_period_seconds=0, propagation_policy="Background"
166+
)
154167
v1_status = client.CoreV1Api().delete_namespace(
155168
name=self.kernel_namespace, body=body
156169
)
170+
status = None
157171
if v1_status:
158172
status = v1_status.status
159173

@@ -169,25 +183,23 @@ def terminate_container_resources(self) -> bool | None:
169183

170184
except Exception as err:
171185
if isinstance(err, client.rest.ApiException) and err.status == 404:
172-
result = True # okay if its not found
186+
result = True # okay if it's not found
173187
else:
174-
self.log.warning(f"Error occurred deleting {object_name}: {err}")
188+
self.log.warning(f"Error occurred deleting {object_type.lower()}: {err}")
175189

176190
if result:
177191
self.log.debug(
178-
"KubernetesProcessProxy.terminate_container_resources, pod: {}.{}, kernel ID: {} has "
179-
"been terminated.".format(
180-
self.kernel_namespace, self.container_name, self.kernel_id
181-
)
192+
f"KubernetesProcessProxy.terminate_container_resources, "
193+
f"{self.object_kind}: {self.kernel_namespace}.{self.container_name}, "
194+
f"kernel ID: {self.kernel_id} has been terminated."
182195
)
183196
self.container_name = None
184197
result = None # maintain jupyter contract
185198
else:
186199
self.log.warning(
187-
"KubernetesProcessProxy.terminate_container_resources, pod: {}.{}, kernel ID: {} has "
188-
"not been terminated.".format(
189-
self.kernel_namespace, self.container_name, self.kernel_id
190-
)
200+
"KubernetesProcessProxy.terminate_container_resources, "
201+
f"{self.object_kind}: {self.kernel_namespace}.{self.container_name}, "
202+
f"kernel ID: {self.kernel_id} has not been terminated."
191203
)
192204

193205
# Check if there's a kernel pod template file for this kernel and silently delete it.

enterprise_gateway/services/processproxies/spark_operator.py

Lines changed: 7 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -3,48 +3,22 @@
33
# Distributed under the terms of the Modified BSD License.
44
from __future__ import annotations
55

6-
import os
7-
86
from ..kernels.remotemanager import RemoteKernelManager
9-
from .crd import CustomResourceProcessProxy, client
10-
11-
enterprise_gateway_namespace = os.environ.get("EG_NAMESPACE", "default")
7+
from .crd import CustomResourceProcessProxy
128

139

1410
class SparkOperatorProcessProxy(CustomResourceProcessProxy):
1511
"""Spark operator process proxy."""
1612

13+
# Identifies the kind of object being managed by this process proxy.
14+
# For these values we will prefer the values found in the 'kind' field
15+
# of the object's metadata. This attribute is strictly used to provide
16+
# context to log messages.
17+
object_kind = "SparkApplication"
18+
1719
def __init__(self, kernel_manager: RemoteKernelManager, proxy_config: dict):
1820
"""Initialize the proxy."""
1921
super().__init__(kernel_manager, proxy_config)
2022
self.group = "sparkoperator.k8s.io"
2123
self.version = "v1beta2"
2224
self.plural = "sparkapplications"
23-
24-
def get_container_status(self, iteration: int) -> str:
25-
"""Get the container status for a given iteration."""
26-
pod_status = pod_info = None
27-
28-
try:
29-
custom_resource = client.CustomObjectsApi().get_namespaced_custom_object(
30-
self.group,
31-
self.version,
32-
self.kernel_namespace,
33-
self.plural,
34-
self.kernel_resource_name,
35-
)
36-
37-
if custom_resource:
38-
pod_name = custom_resource["status"]["driverInfo"]["podName"]
39-
pod_info = client.CoreV1Api().read_namespaced_pod(pod_name, self.kernel_namespace)
40-
except Exception:
41-
pass
42-
43-
if pod_info and pod_info.status:
44-
pod_status = pod_info.status.phase
45-
if pod_status == "Running" and self.assigned_host == "":
46-
self.assigned_ip = pod_info.status.pod_ip
47-
self.assigned_host = pod_info.metadata.name
48-
self.assigned_node_ip = pod_info.status.host_ip
49-
50-
return pod_status

etc/kernel-launchers/operators/scripts/sparkoperator.k8s.io-v1beta2.yaml.j2

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,10 @@ spec:
2929
coreLimit: 1000m
3030
memory: 1g
3131
executor:
32+
labels:
33+
kernel_id: "{{ kernel_id }}"
34+
app: enterprise-gateway
35+
component: worker
3236
image: {{ kernel_executor_image }}
3337
instances: 2
3438
cores: 1

etc/kubernetes/helm/enterprise-gateway/values.yaml

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,7 @@ deployment:
6262
tolerations: []
6363
affinity: {}
6464
nodeSelector: {}
65-
extraEnv: {}
65+
extraEnv: {
6666
# SOME_ENV_VAR_WITH_SIMPLE_VALUE: "example"
6767
# SOME_ENV_VAR_WITH_LONG_VALUE: >
6868
# 'this example',
@@ -72,6 +72,8 @@ deployment:
7272
# this example
7373
# will preserve
7474
# line breaks
75+
EG_INHERITED_ENVS: "PATH"
76+
}
7577

7678
# Log output level.
7779
logLevel: DEBUG

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ dependencies = [
3333
"paramiko>=2.11",
3434
"pexpect>=4.8.0",
3535
"pycryptodomex>=3.9.7",
36-
"pyzmq>=17.0",
36+
"pyzmq>=17.0,<25.0", # Pyzmq 25 removes deprecated code that jupyter_client 6 uses, remove if v6 gets updated
3737
"requests>=2.14.2",
3838
"tornado>=6.1",
3939
"traitlets>=5.3.0",

0 commit comments

Comments
 (0)