Sgkit conversion for 1M samples taking too long #35

Closed
jeromekelleher opened this issue Sep 4, 2023 · 48 comments · May be fixed by sgkit-dev/sgkit#1138
Comments

@jeromekelleher
Collaborator

Running the sgkit conversion for 1M samples is taking weeks, and it looks like it's spilling to disk a lot. It needs some tweaking, so that it'll run on my server (256G of RAM).

Any pointers on what to do here @tomwhite, @benjeffery? Code is here: https://github.com/pystatgen/sgkit-publication/blob/c9fe141764abd5b016d39d09c90f6b99b0d30a3c/src/convert.py#L27

@jeromekelleher
Collaborator Author

There's a lot of this warning:

2023-09-04 14:11:23,828 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 26.06 GiB -- Worker memory limit: 31.48 GiB

@jeromekelleher
Collaborator Author

I thought I had tried some of the more obvious things like tweaking the temp sizes, but maybe not.

@jeromekelleher
Collaborator Author

Aha, I did in #36

@benjeffery
Contributor

So there are three chunk lengths to set here. I explain them in https://github.com/pystatgen/sgkit/pull/1044#issuecomment-1458068866; the upshot is that read_chunk_length is the one that impacts dask worker memory.

@jeromekelleher
Collaborator Author

jeromekelleher commented Sep 5, 2023

Any suggestions for starting point values for 1M samples and 256G of RAM?

@benjeffery
Contributor

It's hard as it is VCF-content dependent, but you should find that halving read_chunk_length roughly halves the peak RAM. So use your current usage and the default value (chunk_length) to pick one.
For GeL I am using 10_000, which for 80,000 diploids uses about 16GB.
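
A rough back-of-envelope from those numbers (a sketch only: it assumes peak worker RAM scales linearly with both sample count and read_chunk_length, and the 25 GB per-worker budget is just an illustrative figure):

    # GeL reference point from the comment above
    gel_ram_gb, gel_samples, gel_read_chunk = 16, 80_000, 10_000
    # target dataset size and an illustrative per-worker memory budget
    target_samples, worker_budget_gb = 1_000_000, 25

    est_ram_gb = gel_ram_gb * target_samples / gel_samples                 # ~200 GB at read_chunk_length=10_000
    read_chunk_estimate = gel_read_chunk * worker_budget_gb / est_ram_gb   # ~1_250 variants per read chunk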

@jeromekelleher
Collaborator Author

Currently I have this:

    client = dask.distributed.Client(n_workers=12, threads_per_worker=1)
    print(client)
    # Tried this, doesn't work:
    # with dask.diagnostics.ProgressBar():
    # Need temp_chunk_length for 1M samples.
    sgkit.io.vcf.vcf_to_zarr(infile, outfile, temp_chunk_length=100,
            read_chunk_length=1000, tempdir="data/tmp/")

which seems to be fitting into the available RAM fairly comfortably. However, it's producing gajillions of temporary files (to the point where I can't even count them all in a reasonable time). Currently there are 457 "part" zarrs, and each of these has tens of thousands of tiny files. E.g.:

jk@holly$ ls -lh data/tmp/vcf_to_zarr_992618a2-64ca-4b20-b567-11888fc3a0a7/chr21_n1000000.bcf/part-365.zarr/call_genotype | head
total 137M
-rw-r--r-- 1 jk jk 520 Sep  7 05:05 0.0.0
-rw-r--r-- 1 jk jk 506 Sep  7 05:05 0.1.0
-rw-r--r-- 1 jk jk 474 Sep  7 05:05 0.10.0
-rw-r--r-- 1 jk jk 460 Sep  7 05:05 0.100.0
-rw-r--r-- 1 jk jk 475 Sep  7 05:05 0.101.0
-rw-r--r-- 1 jk jk 462 Sep  7 05:05 0.102.0
-rw-r--r-- 1 jk jk 466 Sep  7 05:05 0.103.0
-rw-r--r-- 1 jk jk 493 Sep  7 05:05 0.104.0
-rw-r--r-- 1 jk jk 492 Sep  7 05:05 0.105.0
$ ls -lh data/tmp/vcf_to_zarr_992618a2-64ca-4b20-b567-11888fc3a0a7/chr21_n1000000.bcf/part-365.zarr/call_genotype | wc -l
35001

Because each of these files is < 4K, we're using up a huge amount of temporary storage space.

Any guidance here @tomwhite @benjeffery? I think we really need to make this a bit easier, as our key application is working on large datasets.

@jeromekelleher
Collaborator Author

It's hard as it is VCF-content dependent,

Surely knowing the number of variants and number of samples will give a reasonable first approximation?

@benjeffery
Contributor

You can reduce the number of files by using a larger temp_chunk_length; this shouldn't increase peak RAM much. You can probably go to at least 10_000. I agree this isn't easy, hence this issue: https://github.com/pystatgen/sgkit/issues/1123
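
A rough sketch of why the file count blows up, using the numbers above (assumptions: ~3,500 variants per part, temp_chunk_length=100, and the default 1,000-sample chunk width in the temp store):

    variants_per_part = 3_500      # implied by the 35 variant chunks seen in the listing above
    temp_chunk_length = 100
    samples = 1_000_000
    sample_chunk_width = 1_000     # assumed default chunking of the temp store
    files_per_part = (variants_per_part // temp_chunk_length) * (samples // sample_chunk_width)
    # 35 * 1000 = 35,000 tiny chunk files per part for call_genotype alone;
    # raising temp_chunk_length to 10_000 would cut this by 100x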

@jeromekelleher
Collaborator Author

Latest update: having too small a read chunk length seems to make it very slow: https://github.com/pystatgen/sgkit/issues/1128

@jeromekelleher
Collaborator Author

Latest 😢

2023-09-08 23:45:03,560 - distributed.worker.memory - WARNING - Worker is at 66% memory usage. Resuming worker. Process memory: 16.67 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:03,573 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 22.32 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:04,085 - distributed.worker.memory - WARNING - Worker is at 65% memory usage. Resuming worker. Process memory: 16.46 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:04,708 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:36717 (pid=433010) exceeded 95% memory budget. Restarting...
2023-09-08 23:45:05,676 - distributed.nanny - WARNING - Restarting worker
2023-09-08 23:45:06,811 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 17.77 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:08,610 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 20.20 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:11,187 - distributed.worker.memory - WARNING - Worker is at 77% memory usage. Resuming worker. Process memory: 19.52 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:13,603 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 19.52 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:16,911 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 20.21 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:27,009 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 20.21 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:37,362 - distributed.worker.memory - WARNING - Worker is at 65% memory usage. Resuming worker. Process memory: 16.47 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:38,310 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 17.65 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:49,215 - distributed.worker - WARNING - Compute Failed
Key:       vcf_to_zarr_sequential-00fa8da8-4d3a-4999-9123-e050482262e5
Function:  execute_task
args:      ((<function apply at 0x7f90536be550>, <function vcf_to_zarr_sequential at 0x7f90425be940>, ['data/chr21_n1000000.bcf'], (<class 'dict'>, [['output', <fsspec.mapping.FSMap object at 0x7f904182aa60>], ['region', '1:20398081-20414464'], ['chunk_length', 500], ['chunk_width', 1000], ['read_chunk_length', 2000], ['compressor', Blosc(cname='zstd', clevel=7, shuffle=AUTOSHUFFLE, blocksize=0)], ['encoding', None], ['ploidy', 2], ['mixed_ploidy', False], ['truncate_calls', False], ['max_alt_alleles', 3], ['fields', None], ['exclude_fields', None], ['field_defs', None]])))
kwargs:    {}
Exception: 'ValueError("append_dim=\'variants\' does not match any existing dataset dimensions {}")'

2023-09-08 23:45:52,200 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 17.74 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:45:53,629 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 20.16 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:02,456 - distributed.worker.memory - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 20.07 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:02,475 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 20.07 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:04,199 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 20.19 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:09,237 - distributed.worker.memory - WARNING - Worker is at 79% memory usage. Resuming worker. Process memory: 20.07 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:09,398 - distributed.worker.memory - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 20.24 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:12,499 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 23.82 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:22,600 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 23.81 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:32,699 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 23.82 GiB -- Worker memory limit: 25.18 GiB
2023-09-08 23:46:36,115 - distributed.worker - WARNING - Compute Failed
Key:       vcf_to_zarr_sequential-047a1607-b15d-4a24-83d8-349908bc0cbc
Function:  execute_task
args:      ((<function apply at 0x7fdbd7827550>, <function vcf_to_zarr_sequential at 0x7fdbc6726940>, ['data/chr21_n1000000.bcf'], (<class 'dict'>, [['output', <fsspec.mapping.FSMap object at 0x7fdbb6f005b0>], ['region', '1:37470209-37486592'], ['chunk_length', 500], ['chunk_width', 1000], ['read_chunk_length', 2000], ['compressor', Blosc(cname='zstd', clevel=7, shuffle=AUTOSHUFFLE, blocksize=0)], ['encoding', None], ['ploidy', 2], ['mixed_ploidy', False], ['truncate_calls', False], ['max_alt_alleles', 3], ['fields', None], ['exclude_fields', None], ['field_defs', None]])))
kwargs:    {}
Exception: 'ValueError("append_dim=\'variants\' does not match any existing dataset dimensions {}")'

2023-09-08 23:46:42,825 - distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: 23.81 GiB -- Worker memory limit: 25.18 GiB

I think that failed after about 12 hours.

This is the code:

   sgkit.io.vcf.vcf_to_zarr(infile, outfile,
            temp_chunk_length=500,
            read_chunk_length=2000,
            chunk_length=10_000,
            tempdir="tmp/")

We must be doing something wrong here; this should be more robust...

@tomwhite
Collaborator

I'm not sure if this is the problem you are hitting, but I would expect

read_chunk_length <= temp_chunk_length <= chunk_length

This is because read_chunk_length is the number of variants (VCF rows) to read from the VCF file, and it includes all samples and all fields, while temp_chunk_length is the (variant) chunk size of the intermediate Zarr store, which includes all samples but only the fields being written and is column-oriented so should be more compressible than VCF.

Also chunk_length can be a lot bigger than temp_chunk_length since the sample chunking (chunk_width) is being taken into account at this point. You might want to increase chunk_width too (to 10,000?) - the default is 1000, which with 1M samples means you'll have 1000 chunks in the samples dimension, which is quite a lot.
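
For example, one setting that satisfies that ordering (illustrative values only, not a recommendation):

    sgkit.io.vcf.vcf_to_zarr(
        infile, outfile,
        read_chunk_length=1_000,   # variants read from the VCF at a time (all samples, all fields)
        temp_chunk_length=5_000,   # variant chunk size of the intermediate Zarr parts
        chunk_length=10_000,       # variant chunk size of the final Zarr store
        chunk_width=10_000,        # sample chunk size of the final Zarr store
        tempdir="data/tmp/",
    )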

@tomwhite
Collaborator

Another thing that is useful when debugging is to break vcf_to_zarr into its two constituent operations, vcf_to_zarrs (plural) and concat_zarrs, as this can help identify where the problem is. See https://pystatgen.github.io/sgkit/latest/vcf.html#vcf-low-level-operation

Confusingly, chunk_length in vcf_to_zarrs is the same as temp_chunk_length in vcf_to_zarr!
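
A rough sketch of that two-step flow (a sketch only; argument names follow the discussion here, so check the linked docs for the exact signatures and defaults):

    import sgkit.io.vcf as sgvcf

    part_urls = sgvcf.vcf_to_zarrs(
        infile,                     # e.g. "data/chr21_n1000000.bcf"
        "data/tmp/parts",           # directory to hold the per-region part Zarrs
        target_part_size="100MB",   # how the input is split into parts/regions
        chunk_length=1_000,         # note: plays the role of temp_chunk_length in vcf_to_zarr
    )
    sgvcf.concat_zarrs(part_urls, outfile)  # concatenate the parts into the final store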

@jeromekelleher
Collaborator Author

After a discussion with @benjeffery, we tried this:

  sgkit.io.vcf.vcf_to_zarr(infile,
            outfile,
            target_part_size="100MB",
            chunk_width=10_000,
            chunk_length=2000,
            temp_chunk_length=1000,
            read_chunk_length=500,
            tempdir="tmp/")

which seemed to be working OK for a few days. However, the memory usage seems to keep creeping up, and it looks like an error happened about 2/3 of the way through the VCF reading phase and it seems to have restarted the whole thing. (I see a few "erred" blocks when I look at the graph.)

I guess I just have to try using fewer threads? I'm currently using 10 threads on a machine with 256G of RAM.

@benjeffery
Contributor

It would be good to know what the errors are that caused those tasks to fail, if you have the worker logs.

@jeromekelleher
Collaborator Author

I should do - how do I get them?

@tomwhite
Collaborator

Rather than using fewer threads, would it be possible to run with more resources on a cluster? A computation that takes days is more painful to debug.

What is the size of the VCF in bytes, and number of variants? Is it synthetic?

Regarding the memory usage creeping up, there is a recent Dask issue (hard to say if this is related): dask/distributed#8164. And regarding the computation being restarted, there's some recent discussion here: dask/distributed#8182 (again, not sure how relevant this is).

@jeromekelleher
Collaborator Author

jeromekelleher commented Sep 15, 2023

Thanks @tomwhite. I don't really have time to get this set up on a cluster I'm afraid. I'm trying again with 8 threads and seeing if it gets through over the weekend.

The VCF is a 1M-sample subset of the simulated French-Canadian dataset from this paper

It has 7.2M variants and the BCF file is 52G. This is a simulation of chromosome 21 - a big chromosome would be at least 10X larger.

@benjeffery
Contributor

For the UKB and GeL work I've been using 200-400 threads and it still takes hours, for 10% of the samples you have. With 8 threads I'd think it would take more than a weekend. Setting up the cluster shouldn't be too hard, using https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SGECluster.html#dask_jobqueue.SGECluster
As I've done this at GeL and UKB, I'm happy to do it on BMRC if we have a project code? Or is that an admin headache?

@jeromekelleher
Collaborator Author

It's 10% through after a few hours, and there's no sign of memory pressure, so this really should work.

I think it's important to not just throw a cluster at this - these are more than adequate computational resources and we should be able to do the conversion. We'll put people off sgkit before they start if they can't convert their VCFs without a cluster.

@jeromekelleher
Collaborator Author

Here's a screenshot of the Dask dashboard after running over the weekend. What seems to be happening is that the workers keep running tasks (each taking about 20 minutes), slowly increasing their memory usage until eventually getting killed.

[screenshot: workers]

Which would be fine, except it seems that the computation ends up starting from scratch each time. The transition log seems to imply that tasks are getting done multiple times:

Screenshot from 2023-09-18 13-27-40

So, it seems like there are a couple of problems here:

  1. Memory usage keeps growing (possibly one of the linked upstream issues)
  2. We're redoing tasks unnecessarily when a worker dies

Any ideas on why (2) is happening?

@benjeffery
Contributor

We had a chat about this, but I think it's worth recording it here.
The workers are getting killed due to memory exhaustion. As they are not shutting down gracefully they are not copying their in-memory result objects to another node. Dask sees this as the task being lost and therefore needing to be redone.

We've configured the workers to restart every hour with the lifetime argument - this should limit the memory usage and let the workers restart gracefully by copying their results to another node.

@jeromekelleher
Collaborator Author

jeromekelleher commented Sep 19, 2023

Hmm, trying to expire the workers didn't work. Here are some snippets from the log:

distributed.client.AllExit
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap                
    self.run()                                 
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run                       
    self._target(*self._args, **self._kwargs)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/process.py", line 202, in _run 
    target(*args, **kwargs)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 1001, in _run  
    asyncio_run(run(), loop_factory=get_loop_factory())                                        
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/compatibility.py", line 239, in asyncio_run
    _cancel_all_tasks(loop)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/compatibility.py", line 255, in _cancel_all_tasks
    loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))                
  File "/usr/lib/python3.9/asyncio/base_events.py", line 629, in run_until_complete            
    self.run_forever()                         
  File "/usr/lib/python3.9/asyncio/base_events.py", line 596, in run_forever                   
    self._run_once()                           
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once                    
    handle._run()                              
  File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run                                
    self._context.run(self._callback, *self._args)                                             
  File "/home/jk/.local/lib/python3.9/site-packages/tornado/ioloop.py", line 685, in <lambda>  
    lambda f: self._run_callback(functools.partial(callback, future))                          
  File "/home/jk/.local/lib/python3.9/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callback()                           
  File "/home/jk/.local/lib/python3.9/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
    future.result()                            
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/compatibility.py", line 236, in asyncio_run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 629, in run_until_complete            
    self.run_forever()                         
  File "/usr/lib/python3.9/asyncio/base_events.py", line 596, in run_forever                   
    self._run_once()                           
  File "/usr/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once                    
    handle._run()                              
  File "/usr/lib/python3.9/asyncio/events.py", line 80, in _run                                
    self._context.run(self._callback, *self._args)                                             
  File "/home/jk/.local/lib/python3.9/site-packages/tornado/ioloop.py", line 919, in _run      
    val = self.callback()
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/system_monitor.py", line 160, in update
    net_ioc = psutil.net_io_counters()
  File "/usr/lib/python3/dist-packages/psutil/__init__.py", line 2109, in net_io_counters      
    rawdict = _psplatform.net_io_counters()
  File "/usr/lib/python3/dist-packages/psutil/_pslinux.py", line 1034, in net_io_counters      
    with open_text("%s/net/dev" % get_procfs_path()) as f:                                     
  File "/usr/lib/python3/dist-packages/psutil/_common.py", line 750, in get_procfs_path        
    return sys.modules['psutil'].PROCFS_PATH
KeyboardInterrupt                              
2023-09-18 13:39:13,819 - distributed.nanny - WARNING - Worker process still alive after 3.199998474121094 seconds, killing
2023-09-18 13:39:13,819 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2023-09-18 13:39:13,819 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 13:39:13,820 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
...
Task exception was never retrieved
future: <Task finished name='Task-592' coro=<Client._gather.<locals>.wait() done, defined at /home/jk/.local/lib/python3.9/site-packages/distributed/client.py:2232> exception=AllExit()>
Traceback (most recent call last):
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/client.py", line 2241, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-595' coro=<Client._gather.<locals>.wait() done, defined at /home/jk/.local/lib/python3.9/site-packages/distributed/client.py:2232> exception=AllExit()>
Traceback (most recent call last):
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/client.py", line 2241, in wait
    raise AllExit()
distributed.client.AllExit
make: *** [Makefile:26: data/chr21_n1000000.sgz] Error 1

jk@holly$ make data/chr21_n1000000.sgz
python3 ../src/convert.py vcf-to-sgkit-zarr data/chr21_n1000000.bcf data/chr21_n1000000.sgz
<Client: 'tcp://127.0.0.1:45527' processes=16 threads=16, memory=251.84 GiB>
2023-09-18 15:17:44,691 - distributed.nanny - WARNING - Worker process still alive after 3.199998474121094 seconds, killing
2023-09-18 15:19:15,373 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:20:03,260 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2023-09-18 15:20:21,574 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2023-09-18 15:20:41,689 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2023-09-18 15:20:58,999 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2023-09-18 15:21:09,093 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:21:40,610 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2023-09-18 15:21:51,262 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:21:54,438 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing
2023-09-18 15:23:29,914 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:24:03,211 - distributed.nanny - WARNING - Worker process still alive after 3.1999989318847657 seconds, killing
2023-09-18 15:24:07,157 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:24:11,903 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2023-09-18 15:24:55,601 - distributed.nanny - WARNING - Worker process still alive after 3.1999990844726565 seconds, killing
2023-09-18 15:26:47,883 - distributed.active_memory_manager - WARNING - Tried retiring worker tcp://127.0.0.1:40455, but 68 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.
2023-09-18 15:27:06,096 - distributed.nanny - WARNING - Worker process still alive after 3.1999992370605472 seconds, killing

This was the code:

 client = dask.distributed.Client(
            n_workers=16, threads_per_worker=1,
            lifetime="100m", lifetime_stagger="5m",
            lifetime_restart=True)

@jeromekelleher
Collaborator Author

It does seem like the tasks never getting "done" here is the major problem?

@benjeffery
Contributor

What was progress like? Were any tasks getting completed? It's odd that "68 tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired." - I don't understand that. I'm running the worker with memray at the moment to try and figure out:
a) where the peak RAM moment is and what drives it
b) what is accumulating over many tasks

@jeromekelleher
Collaborator Author

I don't think it got anywhere; it died on the first attempt to turn over the workers, AFAICT.

@benjeffery
Contributor

Screenshot from 2023-09-19 13-06-00
I can see memory usage going up as tasks are processed - digging into what objects those are.

@benjeffery
Contributor

benjeffery commented Sep 19, 2023

Here are two flamegraphs of retained memory objects, one after a single VCF parse and the second after five in a row.
Screenshot from 2023-09-19 14-24-00
Screenshot from 2023-09-19 14-24-37

The horizontal axis is the amount of memory, not time. By comparing the single run to the five-in-a-row run we can see which call stacks are holding more RAM in the latter.

From this it is clear that dask profiling is taking some of this RAM (about 100MB in 40min). You can disable it by running dask.config.set({'scheduler-profiler': None}) before doing anything else, and I'd advise doing that. I'd be interested to know why profiling is on by default in dask, as it doesn't make sense to me.
Jerome's workers were dying after a few hours, so at that point profile data would be maybe 500MB? Doesn't explain everything.

The other place that is holding RAM is the cyvcf2 parser - in this case 80MB, or about 16MB per full parse. I'm now writing a minimal standalone parser to see if that leaks. My guess is that this leak is proportional to some dimension of the data, and that's why Jerome is hitting issues with these large files.

@benjeffery
Contributor

benjeffery commented Sep 19, 2023

Getting memray to detail the stack down into native non-Python call frames below cyvcf2 shows one called bcf_hdr_add_sample_len, which implies this leak is proportional to the number of samples.

Running:

import sys

from cyvcf2 import VCF

def doit():
    # open and immediately close a VCF; nothing should be retained afterwards
    vcf = VCF(sys.argv[1])
    vcf.close()
    del vcf

for i in range(500):
    doit()

Results in growing memory usage. Tracking down the leak in cyvcf2/htslib.

@benjeffery
Contributor

Found it. See https://github.com/pystatgen/sgkit/pull/1129

@jeromekelleher try pip install git+https://github.com/benjeffery/sgkit.git@memleak along with dask.config.set({'scheduler-profiler': None})

@savitakartik I think this may explain why your larger chromosomes were failing.

@benjeffery
Contributor

On testing, memory growth is reduced but not eliminated. It looks like there is an additional leak in cyvcf2.

@jeromekelleher
Collaborator Author

That's what I'm observing as well. Currently half-way through converting the VCF chunks with no errors, but memory is growing.

Any theories as to why Dask is repeating the already-done tasks? This seems at least as bad as the memory leak.

@benjeffery
Contributor

Any theories as to why Dask is repeating the already-done tasks? This seems at least as bad as the memory leak.

I thought that was due to workers getting killed and the (tiny) result object being lost. We might be able to get around that with dask.graph_manipulation.checkpoint(split_every=64), but I have not yet fully understood the implications of that.

I have confirmed another leak in cyvcf2 in the vcf.raw_header method - working up a PR now.

@benjeffery
Contributor

brentp/cyvcf2#281

@jeromekelleher
Collaborator Author

That's what I'm observing as well. Currently half-way through converting the VCF chunks with no errors, but memory is growing.

And it's still failing in the same way :sadpanda:

We just got a bit further this time.

@jeromekelleher
Collaborator Author

The non-recovery bit is the killer here - memory leaks are bad, but Dask should let us recover from this.

I wonder if it's worth inspecting the dask graph we're getting when we call compute here?

Maybe dask is doing something a bit weird when aggregating all the tasks, and we should explicitly create a single root task instead? Or perhaps setting optimize_graph=False when calling compute()?

@benjeffery
Contributor

Yes, I'm still seeing a doubling of RAM used after 90min of parsing:
Screenshot from 2023-09-21 09-49-19
About half of this looks to be profiling-related, even though I have run the command that is supposed to disable it. The other half seems to be stuff that Python is holding onto in sgkit code.

I'll have a quick look at both of these - but as you say the main problem is that the process should be resilient to worker loss, and it isn't. The problem is that dask wants all the result objects to exist at the end of compute - this phase of the process has no need for any result object, so I'll look into ways to tell dask this.

@tomwhite
Collaborator

I wonder if it's worth inspecting the dask graph we're getting when we call compute here?

There's no harm in inspecting the graph, but this is embarrassingly parallel so should just be a bunch of distinct nodes.

Another way to make progress (and separate the Dask problem from the memory leak problem) would be to adapt the code in https://github.com/pystatgen/sgkit/issues/1130 to run using a ProcessPoolExecutor.

@jeromekelleher
Collaborator Author

Would that be resilient to failures due to memory leaks @tomwhite? I guess we could use max_tasks_per_child to limit the number of tasks per worker to something small, and work around it that way?

@tomwhite
Collaborator

It wouldn't be resilient to memory leaks, so setting max_tasks_per_child is a good idea. As long as the memory usage for processing a single task is under control then this should work ok.
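
Roughly, that could look like this (a sketch only: convert_region is a hypothetical stand-in for the per-region conversion function in the linked issue, and ProcessPoolExecutor's max_tasks_per_child requires Python 3.11+):

    from concurrent.futures import ProcessPoolExecutor, as_completed

    def convert_all(regions, n_procs=16):
        # one task per process before the worker is recycled, so any per-process
        # memory leak is bounded to a single region's parse
        with ProcessPoolExecutor(max_workers=n_procs, max_tasks_per_child=1) as pool:
            futures = {pool.submit(convert_region, region): region for region in regions}
            for fut in as_completed(futures):
                fut.result()  # re-raise any failure, identifiable via futures[fut]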

@benjeffery
Contributor

I think a pragmatic solution here might be to let dask redo the tasks, but have the task itself realise that the zarr chunk has already been written and to return immediately. This could be done by writing a "success" file just after the zarr chunk is written out, then checking for its existence before starting the parse. I'll code up a concept PR for this.
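
In rough pseudocode, the idea is something like this (a sketch only, not the actual PR; write_part_zarr stands in for the real per-part parse and write):

    from pathlib import Path

    def parse_part(region, part_path):
        done_marker = Path(part_path) / ".success"
        if done_marker.exists():
            return                          # an earlier attempt already wrote this chunk; skip re-parsing
        write_part_zarr(region, part_path)  # the expensive VCF parse and Zarr write
        done_marker.touch()                 # record completion only after the write has finished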

@benjeffery
Contributor

benjeffery commented Sep 21, 2023

OK @jeromekelleher, please try pip install git+https://github.com/benjeffery/sgkit.git@skip-done-parse, which has https://github.com/pystatgen/sgkit/pull/1132 in it. This PR prevents double-parsing of chunks.

@tomwhite
Collaborator

I think a pragmatic solution here might be to let dask redo the tasks, but have the task itself realise that the zarr chunk has already been written and to return immediately. This could be done by writing a "success" file just after the zarr chunk is written out, then checking for its existence before starting the parse. I'll code up a concept PR for this.

I'm ok with this as a way to move forward, but it seems that there is still a major bug lurking (possibly in Dask) that needs getting to the bottom of.

@jeromekelleher
Collaborator Author

I finally got this to work 🎉

I thought initially that it had failed again, given the output:


Traceback (most recent call last):
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 599, in close
    await self.kill(timeout=timeout, reason=reason)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 382, in kill
    await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 855, in kill
    await process.join(max(0, deadline - time()))
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/process.py", line 330, in join
    await wait_for(asyncio.shield(self._exit_future), timeout)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/utils.py", line 1927, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
2023-09-22 10:31:44,737 - distributed.nanny - ERROR - Error in Nanny killing Worker subprocess
Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 599, in close
    await self.kill(timeout=timeout, reason=reason)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 382, in kill
    await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/nanny.py", line 855, in kill
    await process.join(max(0, deadline - time()))
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/process.py", line 330, in join
    await wait_for(asyncio.shield(self._exit_future), timeout)
  File "/home/jk/.local/lib/python3.9/site-packages/distributed/utils.py", line 1927, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
2023-09-22 10:31:44,746 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487083 parent=485970 started daemon>
2023-09-22 10:31:44,746 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487072 parent=485970 started daemon>
2023-09-22 10:31:44,746 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487050 parent=485970 started daemon>
2023-09-22 10:31:44,746 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487041 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487030 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487021 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487012 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=487000 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486989 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486971 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486959 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486931 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486928 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486919 parent=485970 started daemon>
2023-09-22 10:31:44,747 - distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=486906 parent=485970 started daemon>
2023-09-22 10:31:45,315 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 486989 exit status was already read will report exitcode 255
2023-09-22 10:31:45,330 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 486919 exit status was already read will report exitcode 255

but it seems that the process actually exited successfully and we have an sgkit dataset of the right size.

Definitely room for improvement here...

@tomwhite
Collaborator

@benjeffery I wonder if the suggestions here would help with the side effects problem: dask/distributed#8164 (comment)

@benjeffery
Contributor

Matt Rocklin over at dask/dask#10654 suggests using fire_and_forget. This prevents work from being redone, but it also returns immediately - which we can't have, as we need to wait for all the chunks to be parsed. Adding a wait(futures) causes the work to be redone again. Looking into how to wait but not have the futures redone.
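
For reference, the pattern under discussion looks roughly like this (a sketch; parse_part and regions are placeholders, not sgkit API):

    from dask.distributed import Client, fire_and_forget, wait

    client = Client(n_workers=16, threads_per_worker=1)
    futures = [client.submit(parse_part, region, pure=False) for region in regions]
    fire_and_forget(futures)  # tell the scheduler to keep the tasks alive even if the futures are dropped
    wait(futures)             # but holding and waiting on the futures like this is what re-triggers the redo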

@jeromekelleher
Collaborator Author

I think we should close this issue here because it's an sgkit problem, not an issue specific to this repo.

Have we captured the context required in sgkit issues @benjeffery?

@benjeffery
Contributor

I've written it up at https://github.com/pystatgen/sgkit/issues/1154 as there was no issue on sgkit specifically for this.
