Problems when running PyWPS with Dask in async and sync modes #627

Open
cjauvin opened this issue Sep 22, 2021 · 2 comments
cjauvin commented Sep 22, 2021

In the context of PAVICS, we are observing that the PyWPS process queues of our services (which are backed by a Postgres database) regularly fill up with stuck processes: they stay in the "started" state but never progress in terms of their completion percentage.

Our first assumption was that the problem was at the PyWPS level, but after a careful analysis I have established that it is rather related to Dask, xarray and netCDF, which are used by our processes.

A process gets stuck when it is called in async mode following a (successful) process that was called in sync mode, by the same server instance. This is important because a series of exclusively sync calls, or alternatively a series of exclusively async calls, does NOT trigger the problem. It is only when the server has to operate in both modes, with code using the libraries mentioned above, that the issue arises.

To be more precise, I have been able to reproduce and isolate two different types of crash in our environment, and I'm not certain to what extent they are related. One thing is clear: they are both triggered by the same sync + async sequence described above.

The first problem requires that the server run through gunicorn; it hangs exactly here, when trying to save a NetCDF file: https://github.com/pydata/xarray/blob/7a65d59637efb0c78d5f657f3178ffb332a54dfb/xarray/backends/common.py#L157.

After investigating this problem for a while, and because by then the principal suspect was clearly Dask (which makes sense, given that this whole setup uses no less than THREE different layers of multiprocessing: gunicorn, PyWPS and Dask!), I had the idea of refactoring our Dask scheduler setup. In our current code we use the default scheduler, so I simply introduced a dask.distributed.Client, connecting to an externally running Dask scheduler, at the location in the code that seems to make the most sense for this: the beginning of the PyWPS process _handler. This experiment led to the discovery of a second problem, which is described in dask/distributed#5334, because it is easier to isolate and reproduce there.
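
For reference, here is a minimal sketch of that experiment (the process name, inputs and scheduler address are illustrative assumptions, not our actual PAVICS code):

```python
# Hypothetical sketch: a PyWPS process that connects to an externally running
# Dask scheduler at the top of its _handler, instead of using the default scheduler.
from dask.distributed import Client
from pywps import ComplexInput, ComplexOutput, Format, Process

NETCDF = Format("application/x-netcdf")


class MeanOverTime(Process):
    def __init__(self):
        super().__init__(
            self._handler,
            identifier="mean_over_time",
            title="Temporal mean (Dask client experiment)",
            inputs=[ComplexInput("dataset", "Input NetCDF", supported_formats=[NETCDF])],
            outputs=[ComplexOutput("output", "Output NetCDF", supported_formats=[NETCDF])],
        )

    def _handler(self, request, response):
        import xarray as xr

        # Connect to the external scheduler; the xarray/dask computations below are
        # then executed on that cluster rather than in local worker processes.
        client = Client("tcp://dask-scheduler:8786")  # address is an assumption
        try:
            ds = xr.open_dataset(request.inputs["dataset"][0].file, chunks={"time": 100})
            ds.mean("time").to_netcdf("output.nc")
            response.outputs["output"].file = "output.nc"
        finally:
            client.close()
        return response
```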

Given that it is related to other similar existing issues in the dask.distributed project, this second problem has a potential fix: setting the multiprocessing start method to spawn instead of the default fork (at least on Linux). This however leads to pickle-related problems, because the process and the WPS request and response classes contain hard-to-serialize objects. Among other things, I have tried replacing pickle with dill, which is supposedly more powerful, to no avail.
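
For context, the potential fix amounts to something like this (where exactly it would be called inside PyWPS is an open question; this only shows the mechanism):

```python
# Sketch of the potential fix: force the "spawn" start method so that child
# processes do not inherit the parent's Dask/netCDF state, at the cost of having
# to pickle everything that is sent to them.
import multiprocessing

if __name__ == "__main__":
    # Must be called once, early, before any Process or Pool is created.
    multiprocessing.set_start_method("spawn")
```

With spawn, everything passed to the child process must be picklable, which is exactly where the request/response serialization problem described above shows up.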

So in the end we are stuck with these problems (or maybe it is a single underlying problem?), and my only mitigation for now is to make birdy, the client we use to talk to our WPS processes, async-only, which is clearly not an ideal solution.

@cehbrecht cehbrecht added the bug label Sep 23, 2021
@cehbrecht (Collaborator)

In our Rook WPS we are using clisops with xarray and dask. In our production deployment we run async processes on a Slurm cluster, which gets around the multiprocessing in pywps. With Slurm it seems to work OK ... I have not seen stuck jobs ...

There is also still another issue with multiprocessing, see #508.

@huard (Collaborator) commented May 11, 2022


@cehbrecht I suggest adding a new dask processing backend to see if it solves some of our issues.

The rough plan would be to have a new mode option and new config values:

```ini
[processing]
mode = "dask"
dask_n_workers = "10"
dask_scheduler = "http://xxx.xxx.x.x"
```

At initialization, pywps would connect to the scheduler if it exists, otherwise create a new one. Then a new DaskProcessing class would instruct PyWPS on how to start and cancel processes using the dask backend.
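
To make this concrete, a rough sketch of what the backend could look like (the class name and config keys follow the proposal above; the Processing base class and Job interface are assumptions modelled on the existing pywps.processing backends):

```python
# Sketch only: DaskProcessing and the config keys come from the proposal above;
# the Processing/Job interfaces are assumptions based on the existing
# pywps.processing backends.
from dask.distributed import Client, LocalCluster
from pywps import configuration
from pywps.processing.basic import Processing


class DaskProcessing(Processing):
    """Start and cancel PyWPS jobs on a Dask cluster."""

    def start(self):
        scheduler = configuration.get_config_value("processing", "dask_scheduler")
        if scheduler:
            client = Client(scheduler)  # connect to an existing scheduler
        else:
            n_workers = int(configuration.get_config_value("processing", "dask_n_workers") or 4)
            client = Client(LocalCluster(n_workers=n_workers))  # otherwise create a new one
        # The job (process + request + response) must be serializable to be sent
        # to the workers -- the same constraint hit with "spawn" above.
        self._future = client.submit(self.job.run)

    def cancel(self):
        if getattr(self, "_future", None) is not None:
            self._future.cancel()
```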
