Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _background_send(self):
# and we're using `yield` and not `await`, the `write` coroutine object
# will not actually have been awaited, and it will remain sitting around
# for someone to retrieve it. At interpreter exit, this will warn
# sommething like `RuntimeWarning: coroutine 'TCP.write' was never
# something like `RuntimeWarning: coroutine 'TCP.write' was never
# awaited`. By using the `closing` contextmanager, the `write` coroutine
# object is always cleaned up, even if `yield` raises `GeneratorExit`.
with contextlib.closing(
Expand Down
4 changes: 2 additions & 2 deletions distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ class CustomBase(BaseException):
async def test_comm_closed_on_write_error(tcp, exc_type):
# Internal errors from comm.stream.write, such as
# BufferError should lead to the stream being closed
# and not re-used. See GitHub #4133
# and not reused. See GitHub #4133

reader, writer = await get_tcp_comm_pair()

Expand Down Expand Up @@ -1365,7 +1365,7 @@ def test_register_backend_entrypoint(tmp_path):


class OpaqueList(list):
"""Don't let the serialization layer travese this object"""
"""Don't let the serialization layer traverse this object"""

pass

Expand Down
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,7 @@ def reuse(self, addr: str, comm: Comm) -> None:
"""
Reuse an open communication to the given address. For internal use.
"""
# if the pool is asked to re-use a comm it does not know about, ignore
# if the pool is asked to reuse a comm it does not know about, ignore
# this comm: just close it.
if comm not in self.occupied[addr]:
IOLoop.current().add_callback(comm.close)
Expand Down
2 changes: 1 addition & 1 deletion distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2794,7 +2794,7 @@ def update(self):
nodes_data["color"].append(color_of(tg.prefix.name))
nodes_data["tot_tasks"].append(tot_tasks)

# memory alpha factor by 0.4 if not get's too dark
# memory alpha factor by 0.4 if not gets too dark
nodes_data["mem_alpha"].append(
(tg.states["memory"] / sum(tg.states.values())) * 0.4
)
Expand Down
2 changes: 1 addition & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ distributed:
lifetime:
duration: null # Time after which to gracefully shutdown the worker
stagger: 0 seconds # Random amount by which to stagger lifetimes
restart: False # Do we ressurrect the worker after the lifetime deadline?
restart: False # Do we resurrect the worker after the lifetime deadline?

profile:
enabled: True # Whether or not to enable profiling
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4286,7 +4286,7 @@ async def start_unsafe(self) -> Self:
try:
link = format_dashboard_link(addr, server.port)
# formatting dashboard link can fail if distributed.dashboard.link
# refers to non-existant env vars.
# refers to non-existent env vars.
except KeyError as e:
logger.warning(
f"Failed to format dashboard link, unknown value: {e}"
Expand Down
2 changes: 1 addition & 1 deletion distributed/semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def __init__(
if lease_timeout == "inf":
return

## Below is all code for the lease timout validation
## Below is all code for the lease timeout validation

lease_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.locks.lease-timeout"),
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def f(ev):
# another worker with a smaller delay. The key is still the same
fut = c.submit(inc, 1, key="f1", workers=[b.worker_address])
# then, a must switch the execute to fetch. Instead of doing so, it will
# simply re-use the currently computing result.
# simply reuse the currently computing result.
fut = c.submit(inc, fut, workers=[a.worker_address], key="f2")
await wait_for_state("f2", "waiting", a)
await ev.set()
Expand Down
8 changes: 4 additions & 4 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3118,8 +3118,8 @@ async def test_compute_partially_forgotten(c, s, *workers, validate, swap_keys):
s.validate = False
# (CPython impl detail)
# While it is not possible to know what the iteration order of a set will
# be, it is determinisitic and only depends on the hash of the inserted
# elements. Therefore, converting the set to a list will alway yield the
# be, it is deterministic and only depends on the hash of the inserted
# elements. Therefore, converting the set to a list will always yield the
# same order. We're initializing the keys in this very specific order to
# ensure that the scheduler internally arranges the keys in this way

Expand Down Expand Up @@ -4733,7 +4733,7 @@ async def test_retire_workers(c, s, a, b):
info = await c.retire_workers(workers=[a.address], close_workers=True)

# Deployment tooling is sometimes relying on this information to be returned
# This represents WorkerState.idenity() right now but may be slimmed down in
# This represents WorkerState.identity() right now but may be slimmed down in
# the future
assert info
assert info[a.address]
Expand Down Expand Up @@ -8301,7 +8301,7 @@ def test_worker_clients_do_not_claim_ownership_of_serialize_futures(
# different process such that we cannot rely on msg ordering
#
# 1. The alive futures have to be serialized as part of the submit payload
# 2. The future arive at the worker but the worker client doesn't claim
# 2. The future arrive at the worker but the worker client doesn't claim
# ownership
# 3. We're deleting them and the cancellation goes through to the scheduler
# such that it can release the futures
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_failed_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ async def test_broken_worker_during_computation(c, s, a, b):
)

await asyncio.sleep(random.random() / 20)
with suppress(CommClosedError): # comm will be closed abrupty
with suppress(CommClosedError): # comm will be closed abruptly
await c.run(os._exit, 1, workers=[n.worker_address])

await asyncio.sleep(random.random() / 20)
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def _retrieve_annotations():
# The current implementation does in fact not use the same channel due to
# serialization issue (including Futures in BatchedSend appends them to the
# "recent messages" log which screws with the refcounting) but ensure that
# all queued up messages are flushed and received by the schduler befure
# all queued up messages are flushed and received by the scheduler before
# publishing
future = c.submit(_retrieve_annotations, resources=RESOURCES, pure=False)

Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4724,7 +4724,7 @@ def assert_rootish():
# can finish the task since otherwise the worker would recognize it as
# cancelled and would forget about it. We emulate this behavior by blocking
# the outgoing scheduler stream until that happens, i.e. this introduces
# artifical latency
# artificial latency
with freeze_batched_send(s.stream_comms[a.address]):
del f1
while any(k in s.tasks for k in keys):
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def test_allow_tasks_stolen_before_first_completes(c, s, a, b):
# same task prefix. This ensures that we have tasks queued up but all of
# them are still classified as unknown.
# The lock allows us to control the duration of the first task without
# delaying test runtime or flakyness
# delaying test runtime or flakiness
def blocked_task(x, lock):
if x == 0:
with lock:
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ def test_raises_with_cause():
raise RuntimeError("foo") from ValueError("bar")

# we're trying to stick to pytest semantics
# If the exception types don't match, raise the first exception that doesnt' match
# If the exception types don't match, raise the first exception that doesn't match
# If the text doesn't match, raise an assert

with pytest.raises(OSError):
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_worker_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def test_task_lifecycle(c, s, a, b):
)

del x, y, z
await async_poll_for(lambda: not a.state.tasks, timeout=5) # For hygene only
await async_poll_for(lambda: not a.state.tasks, timeout=5) # For hygiene only

# Note: use set instead of list to account for rare, but harmless, race conditions
expect = {
Expand Down
2 changes: 1 addition & 1 deletion distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ class LoopRunner:
Parameters
----------
loop: IOLoop (optional)
If given, this loop will be re-used, otherwise an appropriate one
If given, this loop will be reused, otherwise an appropriate one
will be looked up or created.
asynchronous: boolean (optional, default False)
If false (the default), the loop is meant to run in a separate
Expand Down
2 changes: 1 addition & 1 deletion distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def nodebug_setup_module(module):

def nodebug_teardown_module(module):
"""
A teardown_module() that you can install in a test module to reenable
A teardown_module() that you can install in a test module to re-enable
debug facilities.
"""
if module._old_asyncio_debug is not None:
Expand Down
Loading