From f83eeef74530e87bdb7898fb3565fe2f67f08500 Mon Sep 17 00:00:00 2001 From: Vojtech Tuma Date: Wed, 10 Jul 2024 19:37:21 +0200 Subject: [PATCH 1/2] Add no-clients-timeout option Automatically shut down the cluster when no client heartbeats are received and no computation is running or scheduled. Similar to the existing idle timeout, but more convenient for ad hoc clusters --- distributed/distributed-schema.yaml | 8 ++++++ distributed/distributed.yaml | 1 + distributed/scheduler.py | 41 +++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 45534e9be80..cf70ffdab61 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -91,6 +91,14 @@ properties: Works in conjunction with idle-timeout. + no-clients-timeout: + type: + - string + - "null" + description: | + Shut down the scheduler after this duration if no client heartbeats are received, + and there are no running or queued tasks. 
+ work-stealing: type: boolean description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index a7ec037e27c..4d77990357e 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -19,6 +19,7 @@ distributed: events-cleanup-delay: 1h idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" no-workers-timeout: null # Shut down if there are tasks but no workers to process them + no-clients-timeout: null # Shut down after this duration of client inactivity, similar to idle-timeout work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 0273d333da3..180a7aadd44 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3582,6 +3582,8 @@ class Scheduler(SchedulerState, ServerNode): idle_timeout: float | None _no_workers_since: float | None # Note: not None iff there are pending tasks no_workers_timeout: float | None + _no_clients_since: float + no_clients_timeout: float | None def __init__( self, @@ -3597,6 +3599,7 @@ def __init__( security=None, worker_ttl=None, idle_timeout=None, + no_clients_timeout=None, interface=None, host=None, port=0, @@ -3656,6 +3659,11 @@ def __init__( dask.config.get("distributed.scheduler.no-workers-timeout") ) self._no_workers_since = None + self.no_clients_timeout = parse_timedelta( + no_clients_timeout + or dask.config.get("distributed.scheduler.no-clients-timeout") + ) + self._no_clients_since = time() self.time_started = self.idle_since # compatibility for dask-gateway self._replica_lock = RLock() @@ -3934,6 +3942,9 @@ async def post(self): pc = PeriodicCallback(self._check_no_workers, 250) self.periodic_callbacks["no-workers-timeout"] = pc + pc = PeriodicCallback(self._check_no_clients, 250) + 
self.periodic_callbacks["no-clients-timeout"] = pc + if extensions is None: extensions = DEFAULT_EXTENSIONS.copy() if not dask.config.get("distributed.scheduler.work-stealing"): @@ -8521,6 +8532,36 @@ def check_idle(self) -> float | None: self._ongoing_background_tasks.call_soon(self.close) return self.idle_since + def _check_no_clients(self) -> None: + """Shut down the scheduler if there are no running tasks / no tasks ready to + run, and there hasn't been any client connection for + `distributed.scheduler.no-clients-timeout`.""" + if self.status in (Status.closing, Status.closed): + return # pragma: nocover + + if ( + self.queued + or self.unrunnable + or any(ws.processing for ws in self.workers.values()) + ): + return + + if self.jupyter: + self._no_clients_since = max( + self._no_clients_since, + self._jupyter_server_application.web_app.last_activity().timestamp(), + ) + if self.clients: + clients_hb = max(c.last_seen for c in self.clients.values()) + self._no_clients_since = max(self._no_clients_since, clients_hb) + + if ( + self.no_clients_timeout + and time() > self._no_clients_since + self.no_clients_timeout + ): + logger.info("No tasks and no client heartbeat; shutting scheduler down") + self._ongoing_background_tasks.call_soon(self.close) + def _check_no_workers(self) -> None: """Shut down the scheduler if there have been tasks ready to run which have nowhere to run for `distributed.scheduler.no-workers-timeout`, and there From 6a6b4454844d1d33b913f447404eb958f77d8f65 Mon Sep 17 00:00:00 2001 From: Vojtech Tuma Date: Thu, 11 Jul 2024 14:08:01 +0200 Subject: [PATCH 2/2] Add no-clients-timeout test --- distributed/tests/test_scheduler.py | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 9aaea1288a4..305364021d4 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2541,6 +2541,36 @@ async def 
test_no_workers_timeout_processing(c, s, a, b): await asyncio.sleep(0.01) +@pytest.mark.slow +@gen_cluster(client=True) +async def test_no_clients_timeout(c, s, a, b): + """Test that with `no_clients_timeout` set, a submitted future still completes, + and that afterwards the cluster shuts down with the expected log message.""" + + beginning = time() + future = c.submit(slowinc, 1) + while not s.tasks: + await asyncio.sleep(0.01) + s.no_clients_timeout = 1.0 + pc = PeriodicCallback(s._check_no_clients, 10) + pc.start() + await future + + with captured_logger("distributed.scheduler") as logs: + start = time() + while s.status != Status.closed: + await asyncio.sleep(0.01) + assert time() < start + 3 + + start = time() + while not (a.status == Status.closed and b.status == Status.closed): + await asyncio.sleep(0.01) + assert time() < start + 1 + + assert "No tasks and no client heartbeat" in logs.getvalue() + pc.stop() + + @gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"}) async def test_bandwidth(c, s, a, b): start = s.bandwidth