How do I start Dask from within an MPI4Py workflow #28

@mrocklin

I'm sitting with @zonca and he's asking how to start a Dask application from within an mpi4py application. I'll give a brief explanation, and then some code snippets.

You can start a scheduler, client, and workers from within your Python script after you do other work. For example, you might call a barrier, then start a scheduler on rank 0, a client on rank 1 (along with a bunch of other dask array/dataframe/delayed code), and workers on all the other ranks.
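To make that layout concrete, here is a minimal sketch of the rank-to-role mapping. The helper name `role_for_rank` is hypothetical and only illustrates the split described above. In a real job the rank would come from mpi4py (`MPI.COMM_WORLD.Get_rank()`), and each branch would start the corresponding Dask object instead of returning a string.

```python
def role_for_rank(rank):
    """Map an MPI rank to the Dask role it plays (hypothetical helper)."""
    if rank == 0:
        return "scheduler"
    if rank == 1:
        return "client"
    return "worker"


# In a real MPI job you would get the rank from mpi4py, roughly:
#   from mpi4py import MPI
#   comm = MPI.COMM_WORLD
#   comm.Barrier()            # wait until the earlier MPI work is finished
#   rank = comm.Get_rank()
# Here we only print the layout for an assumed 5-rank job.
for rank in range(5):
    print(rank, role_for_rank(rank))
```

With this split, a job needs at least three ranks before any worker exists, which is why the worker ranks start at 2.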

There are docs on how to start dask schedulers and workers from Python here: https://docs.dask.org/en/latest/setup/python-advanced.html .

Then @zonca asks

Well how do I get my data from the existing process?

There are many ways to do that, but a simple (if perhaps inelegant) way would be to attach the data on each process to the worker started on that process, and then run a task on that worker that returns the data, something like the following:

# on worker
from dask.distributed import Worker

w = Worker('tcp://127.0.0.1:8786', name='rank-' + str(rank))  # rank is an int, so convert it
w.my_special_data = data  # <--- we add this line to the docs mentioned above
w.start()

# on client
from dask.distributed import get_worker

def get_local_data():
    worker = get_worker()  # get the worker object from which this task is run
    return worker.my_special_data

# run that function on every rank that holds a worker
futures = [client.submit(get_local_data, pure=False, workers=['rank-' + str(rank)]) for rank in range(2, comm_size)]

Then you can do whatever you like with those futures.
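One option is simply to gather the results back to the client process. The sketch below wraps the submit-and-gather steps in one function. It assumes `client` is an already connected `dask.distributed.Client` and that the workers were named `'rank-N'` as in the snippet above. The names `collect_rank_data` and `first_worker_rank` are hypothetical and not part of Dask.

```python
def get_local_data():
    # Imported lazily so the lookup happens on the worker running the task.
    from dask.distributed import get_worker

    worker = get_worker()  # the worker object this task is running on
    return worker.my_special_data


def collect_rank_data(client, comm_size, first_worker_rank=2):
    """Fetch my_special_data from every worker rank, keyed by worker name."""
    names = ["rank-" + str(rank) for rank in range(first_worker_rank, comm_size)]
    # pure=False gives each submission its own key, so every worker
    # really runs the function instead of sharing one cached result.
    futures = [
        client.submit(get_local_data, pure=False, workers=[name])
        for name in names
    ]
    # gather returns results in the same order as the futures.
    return dict(zip(names, client.gather(futures)))
```

Calling `collect_rank_data(client, comm_size)` on the client rank pulls every rank's data into a single process, so this only makes sense when the combined data fits in memory there. Otherwise, keep working with the futures directly.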

Then @zonca asks

Great, well then how do I cleanly shut down my Dask workers and continue on with my MPI execution?

I thought that calling client.retire_workers() would do this, but apparently it doesn't. It looks like the Worker shuts down but the Tornado IOLoop keeps running, which blocks the process. We can probably add a keyword argument to some method to improve this, or you can probably get around it by calling something like:

import tornado.ioloop

client.run(lambda: tornado.ioloop.IOLoop.current().stop())

Though this is somewhat rude to the workers :)
