Merged
41 changes: 0 additions & 41 deletions .flake8

This file was deleted.

21 changes: 4 additions & 17 deletions .pre-commit-config.yaml
@@ -1,35 +1,22 @@
# NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
# dependency. Make sure to update flake8-bugbear manually on a regular basis.
repos:
  - repo: https://github.com/MarcoGorelli/absolufy-imports
    rev: v0.3.1
    hooks:
      - id: absolufy-imports
        name: absolufy-imports
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.14.5
    hooks:
      - id: ruff-check
  - repo: https://github.com/pycqa/isort
    rev: 5.13.2
    hooks:
      - id: isort
        language_version: python3
  - repo: https://github.com/asottile/pyupgrade
    rev: v3.17.0
    hooks:
      - id: pyupgrade
        args:
          - --py39-plus
  - repo: https://github.com/psf/black-pre-commit-mirror
    rev: 25.11.0
    hooks:
      - id: black
  - repo: https://github.com/pycqa/flake8
    rev: 7.1.1
    hooks:
      - id: flake8
        language_version: python3
        additional_dependencies:
          # NOTE: autoupdate does not pick up flake8-bugbear since it is a transitive
          # dependency. Make sure to update flake8-bugbear manually on a regular basis.
          - flake8-bugbear==24.8.19
  - repo: https://github.com/codespell-project/codespell
    rev: v2.3.0
    hooks:
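
Note: this hunk consolidates several legacy lint hooks into astral-sh's ruff-check, which reimplements flake8/flake8-bugbear-style checks (its B rules), pyupgrade's rewrites (UP rules), and isort-style import sorting (I rules); the deleted .flake8 config above goes with the same migration. A hedged sketch of the kind of rewrite these rules enforce (illustrative code, not from this PR):

    version = "2024.1.0"  # hypothetical value
    # Before, flagged by ruff's UP032 rule (formerly pyupgrade's job):
    banner = "Dask.distributed v{}".format(version)
    # After the autofix, the equivalent f-string:
    banner = f"Dask.distributed v{version}"
    print(banner)
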
4 changes: 2 additions & 2 deletions distributed/_concurrent_futures_thread.py
@@ -41,9 +41,9 @@ def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
    for _, q in items:
        q.put(None)
    for t, q in items:
    for t, _ in items:
        t.join()


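
Note: the loop rewrites above rename the unused half of each unpacked pair to _, the conventional "intentionally unused" name that linters such as ruff leave alone in their unused-variable checks. A minimal sketch of the idiom (hypothetical data):

    threads_queues = {"worker-1": ["task"], "worker-2": []}
    for _, q in threads_queues.items():  # key unused while draining queues
        q.clear()
    for t, _ in threads_queues.items():  # queue unused while reporting
        print(t, "drained")
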
3 changes: 1 addition & 2 deletions distributed/active_memory_manager.py
@@ -436,8 +436,7 @@ class Suggestion(NamedTuple):


if TYPE_CHECKING:
    # TODO import from typing (requires Python >=3.10)
    from typing_extensions import TypeAlias
    from typing import TypeAlias

    SuggestionGenerator: TypeAlias = Generator[
        Suggestion, Union["scheduler_module.WorkerState", None], None
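
Note: TypeAlias has been importable from the standard typing module since Python 3.10, so the typing_extensions fallback and its TODO comment can go once 3.10 is the minimum supported version, which this change implies. A short sketch (the alias below is illustrative, not from the PR):

    from typing import TypeAlias  # stdlib as of Python 3.10

    WorkerName: TypeAlias = str
    name: WorkerName = "worker-1"
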
2 changes: 1 addition & 1 deletion distributed/cfexecutor.py
@@ -30,7 +30,7 @@ def _cascade_future(future, cf_future):
        try:
            typ, exc, tb = result
            raise exc.with_traceback(tb)
        except BaseException as exc:  # noqa: B036
        except BaseException as exc:
            cf_future.set_exception(exc)


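
Note: the broad except here is deliberate: when re-raising a remote exception in order to copy it into a concurrent.futures future, even BaseException subclasses such as KeyboardInterrupt must be recorded on the future rather than escape. With the flake8-bugbear hook gone (or its B036 rule not enabled under ruff), the suppression comment is dead. A self-contained sketch of the pattern, with hypothetical names:

    from concurrent.futures import Future

    def cascade_exception(exc: BaseException, cf_future: Future) -> None:
        try:
            raise exc
        except BaseException as caught:  # intentionally broad: forward everything
            cf_future.set_exception(caught)

    f: Future = Future()
    cascade_exception(ValueError("boom"), f)
    print(f.exception())  # ValueError('boom')
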
6 changes: 1 addition & 5 deletions distributed/cli/dask_ssh.py
@@ -189,11 +189,7 @@ def main(
    import distributed

    print("\n---------------------------------------------------------------")
    print(
        " Dask.distributed v{version}\n".format(
            version=distributed.__version__
        )
    )
    print(f" Dask.distributed v{distributed.__version__}\n")
    print(f"Worker nodes: {len(hostnames)}")
    for i, host in enumerate(hostnames):
        print(f" {i}: {host}")
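
Note: the hunk above is typical of this PR's str.format-to-f-string conversions: a multi-line .format() call with a named placeholder collapses into a single f-string. A hedged before/after sketch with an illustrative stand-in value:

    __version__ = "2024.1.0"  # hypothetical stand-in for distributed.__version__
    # Before: named placeholder filled via .format()
    print("Dask.distributed v{version}\n".format(version=__version__))
    # After: the placeholder expression moves inline
    print(f"Dask.distributed v{__version__}\n")
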
27 changes: 14 additions & 13 deletions distributed/client.py
@@ -17,7 +17,14 @@
import warnings
import weakref
from collections import defaultdict
from collections.abc import Collection, Coroutine, Iterable, Iterator, Sequence
from collections.abc import (
    Callable,
    Collection,
    Coroutine,
    Iterable,
    Iterator,
    Sequence,
)
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures._base import DoneAndNotDoneFutures
from contextlib import asynccontextmanager, contextmanager, suppress
@@ -29,7 +36,6 @@
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    ClassVar,
    Generic,
    Literal,
@@ -125,7 +131,7 @@
from distributed.worker import get_client, get_worker, secede

if TYPE_CHECKING:
    from typing_extensions import TypeAlias
    from typing import TypeAlias

logger = logging.getLogger(__name__)

@@ -1121,9 +1127,7 @@ def __init__(
            security = getattr(self.cluster, "security", None)
        elif address is not None and not isinstance(address, str):
            raise TypeError(
                "Scheduler address must be a string or a Cluster instance, got {}".format(
                    type(address)
                )
                f"Scheduler address must be a string or a Cluster instance, got {type(address)}"
            )

        # If connecting to an address and no explicit security is configured, attempt
@@ -1375,10 +1379,7 @@ def __repr__(self):
            return text

        elif self.scheduler is not None:
            return "<{}: scheduler={!r}>".format(
                self.__class__.__name__,
                self.scheduler.address,
            )
            return f"<{self.__class__.__name__}: scheduler={self.scheduler.address!r}>"
        else:
            return f"<{self.__class__.__name__}: No scheduler connected>"

@@ -2368,7 +2369,7 @@ async def _gather(self, futures, errors="raise", direct=None, local_worker=None)
"Cannot gather Futures created by another client. "
f"These are the {len(mismatched_futures)} (out of {len(futures)}) "
f"mismatched Futures and their client IDs (this client is {self.id}): "
f"{ {f: f.client.id for f in mismatched_futures} }" # noqa: E201, E202
f"{ {f: f.client.id for f in mismatched_futures} }"
)
keys = [future.key for future in future_set]
bad_data = dict()
@@ -5892,8 +5893,8 @@ def count(self):
        return len(self.futures) + len(self.queue.queue)

    def __repr__(self):
        return "<as_completed: waiting={} done={}>".format(
            len(self.futures), len(self.queue.queue)
        return (
            f"<as_completed: waiting={len(self.futures)} done={len(self.queue.queue)}>"
        )

    def __iter__(self):
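
Note: the import reshuffle running through client.py (and several files below) is the same modernisation everywhere: abstract types such as Callable now come from collections.abc, since their typing aliases are deprecated and the collections.abc generics have been valid in annotations since Python 3.9. A short sketch (the helper is illustrative, not from the PR):

    from collections.abc import Callable  # preferred over typing.Callable

    def apply_twice(fn: Callable[[int], int], x: int) -> int:
        return fn(fn(x))

    print(apply_twice(lambda n: n + 1, 40))  # 42
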
5 changes: 2 additions & 3 deletions distributed/dashboard/components/nvml.py
@@ -151,9 +151,8 @@ def update(self):
"escaped_worker": [url_escape(w) for w in worker],
}

self.memory_figure.title.text = "GPU Memory: {} / {}".format(
format_bytes(sum(memory)),
format_bytes(memory_total),
self.memory_figure.title.text = (
f"GPU Memory: {format_bytes(sum(memory))} / {format_bytes(memory_total)}"
)
self.memory_figure.x_range.end = memory_max

6 changes: 3 additions & 3 deletions distributed/deploy/adaptive.py
@@ -1,10 +1,10 @@
from __future__ import annotations

import logging
from collections.abc import Hashable
from collections.abc import Callable, Hashable
from datetime import timedelta
from inspect import isawaitable
from typing import TYPE_CHECKING, Any, Callable, Literal, cast
from typing import TYPE_CHECKING, Any, Literal, cast

from tornado.ioloop import IOLoop

@@ -18,7 +18,7 @@
from distributed.utils import log_errors

if TYPE_CHECKING:
    from typing_extensions import TypeAlias
    from typing import TypeAlias

    import distributed
    from distributed.deploy.cluster import Cluster
28 changes: 8 additions & 20 deletions distributed/deploy/old_ssh.py
@@ -2,7 +2,6 @@

import logging
import os
import socket
import sys
import traceback
import warnings
@@ -136,7 +135,7 @@ def read_from_stdout():
                    )
                )
                line = stdout.readline()
        except (PipeTimeout, socket.timeout):
        except (TimeoutError, PipeTimeout):
            pass

    def read_from_stderr():
@@ -155,7 +154,7 @@ def read_from_stderr():
                    + bcolors.ENDC
                )
                line = stderr.readline()
        except (PipeTimeout, socket.timeout):
        except (TimeoutError, PipeTimeout):
            pass

    def communicate():
@@ -208,16 +207,12 @@ def communicate():
def start_scheduler(
    logdir, addr, port, ssh_username, ssh_port, ssh_private_key, remote_python=None
):
    cmd = "{python} -m distributed.cli.dask_scheduler --port {port}".format(
        python=remote_python or sys.executable, port=port
    )
    cmd = f"{remote_python or sys.executable} -m distributed.cli.dask_scheduler --port {port}"

    # Optionally re-direct stdout and stderr to a logfile
    if logdir is not None:
        cmd = f"mkdir -p {logdir} && {cmd}"
        cmd += "&> {logdir}/dask_scheduler_{addr}:{port}.log".format(
            addr=addr, port=port, logdir=logdir
        )
        cmd += f"&> {logdir}/dask_scheduler_{addr}:{port}.log"

    # Format output labels we can prepend to each line of output, and create
    # a 'status' key to keep track of jobs that terminate prematurely.
@@ -297,16 +292,11 @@ def start_worker(
    )

    if local_directory is not None:
        cmd += " --local-directory {local_directory}".format(
            local_directory=local_directory
        )
        cmd += f" --local-directory {local_directory}"

    # Optionally redirect stdout and stderr to a logfile
    if logdir is not None:
        cmd = f"mkdir -p {logdir} && {cmd}"
        cmd += "&> {logdir}/dask_scheduler_{addr}.log".format(
            addr=worker_addr, logdir=logdir
        )
        cmd = f"mkdir -p {logdir} && {cmd}&> {logdir}/dask_scheduler_{worker_addr}.log"

    label = f"worker {worker_addr}"

@@ -401,10 +391,8 @@ def __init__(
"dask-ssh_" + datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S"),
)
print(
bcolors.WARNING + "Output will be redirected to logfiles "
'stored locally on individual worker nodes under "{logdir}".'.format(
logdir=logdir
)
bcolors.WARNING
+ f'Output will be redirected to logfiles stored locally on individual worker nodes under "{logdir}".'
+ bcolors.ENDC
)
self.logdir = logdir
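
Note: since Python 3.10, socket.timeout is an alias of the builtin TimeoutError, so catching TimeoutError here covers the socket timeouts the old code caught explicitly, and the socket import becomes unused, hence its removal at the top of the file. A one-line check:

    import socket

    assert socket.timeout is TimeoutError  # true on Python >= 3.10
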
16 changes: 4 additions & 12 deletions distributed/deploy/ssh.py
@@ -137,15 +137,11 @@ async def start(self):

        result = await self.connection.run("uname")
        if result.exit_status == 0:
            set_env = 'env DASK_INTERNAL_INHERIT_CONFIG="{}"'.format(
                dask.config.serialize(dask.config.global_config)
            )
            set_env = f'env DASK_INTERNAL_INHERIT_CONFIG="{dask.config.serialize(dask.config.global_config)}"'
        else:
            result = await self.connection.run("cmd /c ver")
            if result.exit_status == 0:
                set_env = "set DASK_INTERNAL_INHERIT_CONFIG={} &&".format(
                    dask.config.serialize(dask.config.global_config)
                )
                set_env = f"set DASK_INTERNAL_INHERIT_CONFIG={dask.config.serialize(dask.config.global_config)} &&"
            else:
                raise Exception(
                    "Worker failed to set DASK_INTERNAL_INHERIT_CONFIG variable "
@@ -237,15 +233,11 @@ async def start(self):

        result = await self.connection.run("uname")
        if result.exit_status == 0:
            set_env = 'env DASK_INTERNAL_INHERIT_CONFIG="{}"'.format(
                dask.config.serialize(dask.config.global_config)
            )
            set_env = f'env DASK_INTERNAL_INHERIT_CONFIG="{dask.config.serialize(dask.config.global_config)}"'
        else:
            result = await self.connection.run("cmd /c ver")
            if result.exit_status == 0:
                set_env = "set DASK_INTERNAL_INHERIT_CONFIG={} &&".format(
                    dask.config.serialize(dask.config.global_config)
                )
                set_env = f"set DASK_INTERNAL_INHERIT_CONFIG={dask.config.serialize(dask.config.global_config)} &&"
            else:
                raise Exception(
                    "Scheduler failed to set DASK_INTERNAL_INHERIT_CONFIG variable "
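
Note: DASK_INTERNAL_INHERIT_CONFIG ships the local dask configuration to the remote process as one serialized string; dask.config.serialize and dask.config.deserialize do the packing and unpacking. A hedged round-trip sketch with illustrative config values:

    import dask.config

    blob = dask.config.serialize({"distributed": {"comm": {"retry": {"count": 3}}}})
    restored = dask.config.deserialize(blob)
    assert restored["distributed"]["comm"]["retry"]["count"] == 3
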
4 changes: 2 additions & 2 deletions distributed/diagnostics/plugin.py
@@ -11,8 +11,8 @@
import tempfile
import uuid
import zipfile
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any, Callable, ClassVar
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, ClassVar

from dask.typing import Key
from dask.utils import _deprecated_kwarg, funcname, tmpfile
2 changes: 1 addition & 1 deletion distributed/diagnostics/progressbar.py
@@ -5,9 +5,9 @@
import sys
import warnings
import weakref
from collections.abc import Callable
from contextlib import suppress
from timeit import default_timer
from typing import Callable

from tlz import valmap
from tornado.ioloop import IOLoop
3 changes: 2 additions & 1 deletion distributed/gc.py
@@ -4,7 +4,8 @@
import logging
import threading
from collections import deque
from typing import Callable, Final
from collections.abc import Callable
from typing import Final

import psutil
