Skip to content

Commit 5644984

Browse files
authored
Create scheduler pod through a Deployment (#711)
1 parent 6970a8c commit 5644984

File tree

4 files changed

+115
-67
lines changed

4 files changed

+115
-67
lines changed

dask_kubernetes/common/networking.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,14 @@ async def get_scheduler_address(
196196
async def wait_for_scheduler(api, cluster_name, namespace, timeout=None):
197197
pod_start_time = None
198198
while True:
199+
async with kubernetes.client.api_client.ApiClient() as api_client:
200+
k8s_api = kubernetes.client.CoreV1Api(api_client)
201+
pods = await k8s_api.list_namespaced_pod(
202+
namespace=namespace,
203+
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={cluster_name}",
204+
)
199205
pod = await Pod.objects(api, namespace=namespace).get_by_name(
200-
cluster_name + "-scheduler"
206+
pods.items[0].metadata.name
201207
)
202208
phase = pod.obj["status"]["phase"]
203209
if phase == "Running":

dask_kubernetes/operator/controller/controller.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,22 +60,34 @@ def _get_labels(meta):
6060
}
6161

6262

63-
def build_scheduler_pod_spec(cluster_name, spec, annotations, labels):
63+
def build_scheduler_deployment_spec(
64+
cluster_name, namespace, pod_spec, annotations, labels
65+
):
6466
labels.update(
6567
**{
6668
"dask.org/cluster-name": cluster_name,
6769
"dask.org/component": "scheduler",
6870
"sidecar.istio.io/inject": "false",
6971
}
7072
)
73+
metadata = {
74+
"name": f"{cluster_name}-scheduler",
75+
"labels": labels,
76+
"annotations": annotations,
77+
}
78+
spec = {}
79+
spec["replicas"] = 1
80+
spec["selector"] = {
81+
"matchLabels": labels,
82+
}
83+
spec["template"] = {
84+
"metadata": metadata,
85+
"spec": pod_spec,
86+
}
7187
return {
72-
"apiVersion": "v1",
73-
"kind": "Pod",
74-
"metadata": {
75-
"name": f"{cluster_name}-scheduler",
76-
"labels": labels,
77-
"annotations": annotations,
78-
},
88+
"apiVersion": "apps/v1",
89+
"kind": "Deployment",
90+
"metadata": metadata,
7991
"spec": spec,
8092
}
8193

@@ -270,21 +282,21 @@ async def daskcluster_create_components(
270282
annotations.update(**scheduler_spec["metadata"]["annotations"])
271283
if "labels" in scheduler_spec["metadata"]:
272284
labels.update(**scheduler_spec["metadata"]["labels"])
273-
data = build_scheduler_pod_spec(
274-
name, scheduler_spec.get("spec"), annotations, labels
285+
data = build_scheduler_deployment_spec(
286+
name, namespace, scheduler_spec.get("spec"), annotations, labels
275287
)
276288
kopf.adopt(data)
277289
pod = await api.list_namespaced_pod(
278290
namespace=namespace,
279291
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
280292
)
281293
if not pod.items:
282-
await api.create_namespaced_pod(
294+
await kubernetes.client.AppsV1Api(api_client).create_namespaced_deployment(
283295
namespace=namespace,
284296
body=data,
285297
)
286298
logger.info(
287-
f"Scheduler pod {data['metadata']['name']} created in {namespace}."
299+
f"Scheduler deployment {data['metadata']['name']} created in {namespace}."
288300
)
289301

290302
data = build_scheduler_service_spec(
@@ -295,7 +307,7 @@ async def daskcluster_create_components(
295307
namespace=namespace,
296308
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
297309
)
298-
if not pod.items:
310+
if not service.items:
299311
await api.create_namespaced_service(
300312
namespace=namespace,
301313
body=data,
@@ -862,8 +874,12 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
862874

863875
pod_ready = False
864876
try:
877+
pods = await coreapi.list_namespaced_pod(
878+
namespace=namespace,
879+
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={spec['cluster']}",
880+
)
865881
scheduler_pod = await coreapi.read_namespaced_pod(
866-
f"{spec['cluster']}-scheduler", namespace
882+
pods.items[0].metadata.name, namespace
867883
)
868884
if scheduler_pod.status.phase == "Running":
869885
pod_ready = True

dask_kubernetes/operator/controller/tests/test_controller.py

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,19 @@ def test_operator_plugins(kopf_runner):
104104
async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
105105
with kopf_runner as runner:
106106
async with gen_cluster() as (cluster_name, ns):
107-
scheduler_pod_name = "simple-scheduler"
107+
scheduler_deployment_name = "simple-scheduler"
108108
worker_pod_name = "simple-default-worker"
109109
service_name = "simple-scheduler"
110110

111-
while scheduler_pod_name not in k8s_cluster.kubectl(
112-
"get", "pods", "-n", ns
111+
while scheduler_deployment_name not in k8s_cluster.kubectl(
112+
"get", "deployments", "-n", ns
113113
):
114114
await asyncio.sleep(0.1)
115115
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
116116
await asyncio.sleep(0.1)
117117
while worker_pod_name not in k8s_cluster.kubectl("get", "pods", "-n", ns):
118118
await asyncio.sleep(0.1)
119-
k8s_cluster.kubectl(
120-
"wait",
121-
"-n",
122-
ns,
123-
"pods",
124-
"--for=condition=Ready",
125-
scheduler_pod_name,
126-
"--timeout=120s",
127-
)
119+
128120
with k8s_cluster.port_forward(
129121
f"service/{service_name}", 8786, "-n", ns
130122
) as port:
@@ -256,26 +248,18 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
256248
async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster):
257249
with kopf_runner as runner:
258250
async with gen_cluster() as (cluster_name, ns):
259-
scheduler_pod_name = "simple-scheduler"
251+
scheduler_deployment_name = "simple-scheduler"
260252
worker_pod_name = "simple-default-worker"
261253
service_name = "simple-scheduler"
262-
while scheduler_pod_name not in k8s_cluster.kubectl(
263-
"get", "pods", "-n", ns
254+
while scheduler_deployment_name not in k8s_cluster.kubectl(
255+
"get", "deployments", "-n", ns
264256
):
265257
await asyncio.sleep(0.1)
266258
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
267259
await asyncio.sleep(0.1)
268260
while worker_pod_name not in k8s_cluster.kubectl("get", "pods", "-n", ns):
269261
await asyncio.sleep(0.1)
270-
k8s_cluster.kubectl(
271-
"wait",
272-
"pods",
273-
"-n",
274-
ns,
275-
"--for=condition=Ready",
276-
scheduler_pod_name,
277-
"--timeout=120s",
278-
)
262+
279263
with k8s_cluster.port_forward(
280264
f"service/{service_name}", 8786, "-n", ns
281265
) as port:
@@ -311,26 +295,18 @@ async def test_scalesimplecluster_from_cluster_spec(
311295
):
312296
with kopf_runner as runner:
313297
async with gen_cluster() as (cluster_name, ns):
314-
scheduler_pod_name = "simple-scheduler"
298+
scheduler_deployment_name = "simple-scheduler"
315299
worker_pod_name = "simple-default-worker"
316300
service_name = "simple-scheduler"
317-
while scheduler_pod_name not in k8s_cluster.kubectl(
301+
while scheduler_deployment_name not in k8s_cluster.kubectl(
318302
"get", "pods", "-n", ns
319303
):
320304
await asyncio.sleep(0.1)
321305
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
322306
await asyncio.sleep(0.1)
323307
while worker_pod_name not in k8s_cluster.kubectl("get", "pods", "-n", ns):
324308
await asyncio.sleep(0.1)
325-
k8s_cluster.kubectl(
326-
"wait",
327-
"pods",
328-
"-n",
329-
ns,
330-
"--for=condition=Ready",
331-
scheduler_pod_name,
332-
"--timeout=120s",
333-
)
309+
334310
with k8s_cluster.port_forward(
335311
f"service/{service_name}", 8786, "-n", ns
336312
) as port:
@@ -360,6 +336,44 @@ async def test_scalesimplecluster_from_cluster_spec(
360336
await client.wait_for_workers(3)
361337

362338

339+
@pytest.mark.asyncio
340+
async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster):
341+
with kopf_runner as runner:
342+
async with gen_cluster() as (cluster_name, ns):
343+
scheduler_deployment_name = "simple-scheduler"
344+
worker_pod_name = "simple-default-worker"
345+
service_name = "simple-scheduler"
346+
while scheduler_deployment_name not in k8s_cluster.kubectl(
347+
"get", "pods", "-n", ns
348+
):
349+
await asyncio.sleep(0.1)
350+
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
351+
await asyncio.sleep(0.1)
352+
while worker_pod_name not in k8s_cluster.kubectl("get", "pods", "-n", ns):
353+
await asyncio.sleep(0.1)
354+
k8s_cluster.kubectl(
355+
"delete",
356+
"pods",
357+
"-l",
358+
"dask.org/cluster-name=simple,dask.org/component=scheduler",
359+
"-n",
360+
ns,
361+
)
362+
k8s_cluster.kubectl(
363+
"wait",
364+
"--for=condition=Ready",
365+
"-l",
366+
"dask.org/cluster-name=simple,dask.org/component=scheduler",
367+
"pod",
368+
"-n",
369+
ns,
370+
"--timeout=60s",
371+
)
372+
assert scheduler_deployment_name in k8s_cluster.kubectl(
373+
"get", "pods", "-n", ns
374+
)
375+
376+
363377
def _get_job_status(k8s_cluster, ns):
364378
return json.loads(
365379
k8s_cluster.kubectl(

dask_kubernetes/operator/kubecluster/kubecluster.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -374,11 +374,15 @@ async def _create_cluster(self):
374374
)
375375
except CrashLoopBackOffError as e:
376376
logs = await self._get_logs()
377+
pods = await core_api.list_namespaced_pod(
378+
namespace=self.namespace,
379+
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
380+
)
377381
await self._close()
378382
raise SchedulerStartupError(
379383
"Scheduler failed to start.",
380384
"Scheduler Pod logs:",
381-
logs[self.name + "-scheduler"],
385+
logs[pods.items[0].metadata.name],
382386
) from e
383387
self._log("Waiting for scheduler service")
384388
await wait_for_service(core_api, f"{self.name}-scheduler", self.namespace)
@@ -493,22 +497,30 @@ async def _watch_component_status(self):
493497

494498
# Get Scheduler Pod status
495499
with suppress(pykube.exceptions.ObjectDoesNotExist):
496-
pod = await Pod.objects(
497-
self.k8s_api, namespace=self.namespace
498-
).get_by_name(self.name + "-scheduler")
499-
phase = pod.obj["status"]["phase"]
500-
if phase == "Running":
501-
conditions = {
502-
c["type"]: c["status"] for c in pod.obj["status"]["conditions"]
503-
}
504-
if "Ready" not in conditions or conditions["Ready"] != "True":
505-
phase = "Health Checking"
506-
if "containerStatuses" in pod.obj["status"]:
507-
for container in pod.obj["status"]["containerStatuses"]:
508-
if "waiting" in container["state"]:
509-
phase = container["state"]["waiting"]["reason"]
510-
511-
self._startup_component_status["schedulerpod"] = phase
500+
async with kubernetes.client.api_client.ApiClient() as api_client:
501+
core_api = kubernetes.client.CoreV1Api(api_client)
502+
pods = await core_api.list_namespaced_pod(
503+
namespace=self.namespace,
504+
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
505+
)
506+
if pods.items:
507+
pod = await Pod.objects(
508+
self.k8s_api, namespace=self.namespace
509+
).get_by_name(pods.items[0].metadata.name)
510+
phase = pod.obj["status"]["phase"]
511+
if phase == "Running":
512+
conditions = {
513+
c["type"]: c["status"]
514+
for c in pod.obj["status"]["conditions"]
515+
}
516+
if "Ready" not in conditions or conditions["Ready"] != "True":
517+
phase = "Health Checking"
518+
if "containerStatuses" in pod.obj["status"]:
519+
for container in pod.obj["status"]["containerStatuses"]:
520+
if "waiting" in container["state"]:
521+
phase = container["state"]["waiting"]["reason"]
522+
523+
self._startup_component_status["schedulerpod"] = phase
512524

513525
# Get Scheduler Service status
514526
with suppress(pykube.exceptions.ObjectDoesNotExist):

0 commit comments

Comments
 (0)