
issues with dask interpolate #185

Open
knaaptime opened this issue Sep 10, 2023 · 5 comments

@knaaptime
Member

knaaptime commented Sep 10, 2023

I've finally had some time to take a closer look at the dask backend, so I'm starting a thread here to triage some thoughts. In this gist I'm looking at timing for some pretty big datasets (source=2.7m, target=137k). tl;dr:

  • the single-core interpolation takes 4-4.5 minutes
  • the parallelized version takes 3.5 minutes
  • the dask-based version takes 1.5 minutes
    • (that's the generic scheduler without any configuration. If I try to use distributed, everything crashes)

So this is really nice when you've got a large enough dataset, and it will be great to get it fleshed out for the other variable types. Now that I've interacted with the code a little more:

  • I'm not sure what the dtype error is in the currently failing tests, but I'm not seeing it locally
  • why do categoricals need to be categorical dtype? Usually they are just strings, so this line can be a little confusing when you get back AttributeError: Can only use .cat accessor with a 'category' dtype. If dask absolutely requires a categorical dtype, we should do a check or conversion (a rough sketch of such a guard is below this list)
    • what's the purpose of category_vars? The categorical variables should already come back formatted like that, so this block is basically just doing the same thing but forcing categorical instead of using unique
  • if not given, id_col should probably default to the target gdf's index
    • we probably want to drop the dask index ('hilbert_distance' in the current example) that comes back from area_interpolate_dask and instead set the index to id_col
  • the real work here is still done by the area_interpolate function, and dask is acting as a scheduler (basically a drop-in replacement for joblib in the parallelized implementation). So all actual computation of extensive/intensive/categorical happens in the workers and just needs to be stuck back together at the end. So shouldn't we have a single aggregation instead of multiple based on variable type? The weighting and summations should've already happened by this point, so everything should get the same groupby/apply?
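
For the categorical point specifically, the kind of guard I have in mind looks something like this (the function and argument names are just placeholders, not the actual signature):

```python
import pandas as pd

def coerce_categoricals(source_df, categorical_variables):
    """Cast plain string columns to 'category' so dask's .cat accessor works.

    `source_df` and `categorical_variables` stand in for whatever the dask
    interpolation function actually receives.
    """
    out = source_df.copy()
    for col in categorical_variables:
        if not isinstance(out[col].dtype, pd.CategoricalDtype):
            out[col] = out[col].astype("category")
    return out
```

and, on the way out, the result could drop the 'hilbert_distance' index and set id_col as the index instead.
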
@knaaptime
Member Author

knaaptime commented Sep 11, 2023

A couple of updates after looking at this a bit.

On the same dataset in the gist, _area_tables_binning takes 4 minutes, which confirms it's the construction of the area-of-intersection sparse matrix that consumes all the time when n gets large (so yeah, we should think about dask as a replacement for joblib in _area_tables_binning_parallel; operations on the resulting sparse matrix should all be fast).

It's still not clear to me why extensive and intensive aren't working, because we should be able to aggregate the results from area_tables_interpolate from the distributed dask dataframes the same way we do for categorical. The only difference between categorical and intensive/extensive in the single-core implementation is that categorical just does a row-sum across the table (masked by the different categorical values), whereas extensive/intensive first do a row-transform, then a dot product with the extensive/intensive variables. Since the intersection table is correct (otherwise categorical couldn't be correct), it's something about the math that's off, and my best guess is the row-transform (maybe we're missing important neighbors in other partitions?). I dunno.
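
To make that difference concrete, here's a toy version of the two patterns as I read them (not the actual tobler code; the axes and normalisation are just my assumptions):

```python
import numpy as np
from scipy import sparse

# toy source-by-target intersection-area table (3 source rows, 2 target columns)
table = sparse.csr_matrix(np.array([[1.0, 1.0],
                                    [0.0, 2.0],
                                    [3.0, 0.0]]))

# categorical: a masked sum down each target column, no row-transform needed
cats = np.array(["a", "b", "a"])
tgt_area = np.asarray(table.sum(axis=0)).ravel()
for c in ["a", "b"]:
    mask = (cats == c).astype(float)       # 1 for source rows in category c
    print(c, (table.T @ mask) / tgt_area)  # share of each target's covered area in c

# extensive: row-transform (divide by source area) first, then a dot product
src_area = np.asarray(table.sum(axis=1)).ravel()
weights = sparse.diags(1.0 / src_area) @ table  # each row now sums to 1
vals = np.array([10.0, 4.0, 6.0])               # extensive variable on the sources
print(weights.T @ vals)                         # apportioned totals on targets: [11., 9.]
```

The point being that once the table is correct, both of these are cheap; it's only the row-transform that needs to see whole rows.
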

Either way, if we use dask to distribute building the values of the AoI table (i.e. replace _area_tables_binning_parallel, moving the parallelization down a level from where it is now), I think it would solve that issue anyway. Currently, we get a speedup with dask because we're breaking the geodataframes into spatial partitions and building several AoI tables, operating on them, and aggregating the results. But I think we need to (use dask to) break the pieces apart to build a single 'table' (sparse AoI matrix), then resume business as usual. Like with the new libpysal.Graph, I think maybe the way to handle this is to build the AoI table as an adjlist in the distributed dask df, then bring the chunks back, pivot to sparse, and send the table along.
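
Roughly what I have in mind, as a sketch only (dask.delayed for clarity, made-up column names, and `src`/`tgt` assumed to be GeoDataFrames in a shared projected CRS):

```python
import dask
import geopandas as gpd
import numpy as np
import pandas as pd

@dask.delayed
def chunk_adjlist(src_chunk, tgt):
    # assumes both frames carry their original index as 'src_idx' / 'tgt_idx' columns
    inter = gpd.overlay(src_chunk, tgt, how="intersection", keep_geom_type=True)
    return pd.DataFrame({
        "src_idx": inter["src_idx"],
        "tgt_idx": inter["tgt_idx"],
        "area": inter.geometry.area,
    })

# chunk the source frame (ideally along the spatial-shuffle order) and let the
# dask scheduler farm the overlays out to workers
positions = np.array_split(np.arange(len(src)), 8)
tasks = [chunk_adjlist(src.iloc[pos], tgt) for pos in positions]
adjlist = pd.concat(dask.compute(*tasks), ignore_index=True)  # single pandas adjacency list
```

The real version would obviously reuse the spatial partitioning we already have rather than naive splits.
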

what's the purpose of category_vars? The categorical variables should already come back formatted like that, so this block is basically just doing the same thing but forcing categorical instead of using unique

OK, I see dask needs the metadata ahead of time, so we need to create the new columns, but I'm still not sure we need to cast as categorical instead of looking at the unique values and allowing string dtype?
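
For reference, this is the dask behaviour in question, if I'm reading the docs right (minimal example, nothing tobler-specific):

```python
import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"land_use": ["res", "com", "res", "ind"]})
ddf = dd.from_pandas(pdf, npartitions=2)

# a lazy cast leaves the categories "unknown", so dask can't enumerate the
# per-category output columns while it is building the graph
lazy_cat = ddf["land_use"].astype("category")
print(lazy_cat.cat.known)        # False

# making them known forces a full pass over the data
known_cat = lazy_cat.cat.as_known()
print(known_cat.cat.categories)  # populated only after that pass

# whereas declaring the categories up front (what category_vars does) is free
declared = ddf["land_use"].astype(pd.CategoricalDtype(["res", "com", "ind"]))
print(declared.cat.known)        # True
```
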

@knaaptime
Member Author

cc @martinfleis @ljwolf @sjsrey in case anyone else has thoughts

@darribas
Member

darribas commented Sep 11, 2023

Thanks very much for chipping in @knaaptime !!!

A few thoughts and follow-ups:

  • On the categorical vars, yes, it's basically what you say: it's very expensive for Dask to do a full pass just to get the list of categories. Also, remember Dask is not only parallel but also lazy, so you need to be able to describe the full graph before computing. If you don't know the number (and names) of the categories, you can't build the full computation graph (e.g., you don't know the number or names of the columns in the output table). Lazy also means you can't assume the data is in memory, so a full pass with unique() can be extremely expensive. More generally, this point about lazy computation is a reasonable headache to keep in mind when writing Dask code :-)
  • I'd be curious to see more about when distributed crashes. For local computation, it brings the added value of being able to use the dashboard and tweak the number of workers, which is nice, but the real value is allowing you to run your code on a Dask cluster (e.g., homegrown or somewhere like Coiled). I've looked into the code and the error doesn't ring any bells; it appears it has a hard time serialising something on the table (numpy int?)...
  • On the dtype issue, I did a bit of googling and nothing obvious came up; my sense is there may have been a change in the dtypes Dask returns, or in how the checker considers differences in the test. I haven't tested it myself yet, but my hope is the actual values are correct. I'd love to hear more from @martinfleis in case this has come up in other libs and/or he's aware of a solution.
  • hilbert_distance is technically not a Dask creation but the output of the spatial shuffle, and it contains information on the geo-packing (essentially, as I understand it, these algorithms sort observations so that each one is placed next to others that are geographically nearby), so dropping this information may preclude using it in useful contexts in the future. Having said that, if we only drop it on the output table, that may be fine, as it presumably stays on the input table we use.
  • On the interpolation for the other variable types, the biggie here:
    • I'm also still not 100% sure why (at least) extensive doesn't work, but after looking at the single-core code, my conclusion was that the weighting done when row standardising (L. 316-321) requires knowing all the values, so if we weight on a chunk-by-chunk basis within each worker, those weights will be off (toy illustration below this list).
    • My conclusion then was that, if that is the case, we're a bit screwed for making it run fast on Dask, because we then have to re-write the algorithm in Dask rather than use it as a scheduler of chunks, and the bringing together later on requires a bunch of inter-worker computation, which is very, very slow. All of this made me lose excitement and I parked it :-)
    • Having said that, I may be missing a bit that would still let us speed up the single-machine implementation, or I may just be wrong in the diagnosis and there's still a way of interpolating extensive/intensive vars fully on a chunk-by-chunk basis.
    • What I think I'll do is dig up the experimental code I wrote for this and add it as a PR, at least for folks to look into, and maybe we can find a way? UPDATE: see WIP Native Dask implementation for area interpolation (Do not merge) #187
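
A toy numpy illustration of the row-standardisation concern (not the actual tobler code), just to show why per-chunk row sums can't stand in for the global ones:

```python
import numpy as np

# toy intersection-area table: rows = source polygons, cols = target polygons
table = np.array([[2.0, 1.0, 1.0],
                  [0.0, 3.0, 1.0]])

# correct weights: divide each row by its *global* sum
global_weights = table / table.sum(axis=1, keepdims=True)

# chunk-wise weights: split the targets into two partitions and standardise separately
left, right = table[:, :2], table[:, 2:]
chunk_weights = np.hstack([
    left / left.sum(axis=1, keepdims=True),
    right / right.sum(axis=1, keepdims=True),
])

print(global_weights.sum(axis=1))  # [1. 1.]  rows sum to one, as intended
print(chunk_weights.sum(axis=1))   # [2. 2.]  each chunk re-normalises, so totals get inflated
```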

@knaaptime
Member Author

I'm also still not 100% sure why (at least) extensive doesn't work, but after looking at the single-core code, my conclusion was that the weighting done when row standardising (L. 316-321) requires knowing all the values, so if we weight on a chunk-by-chunk basis within each worker, those weights will be off.
My conclusion then was that, if that is the case, we're a bit screwed for making it run fast on Dask, because we then have to re-write the algorithm in Dask rather than use it as a scheduler of chunks,

exactly. So we reimplement _area_tables_binning using dask as the backend instead of joblib, but that's basically just a matter of moving the logic over (as you've already done in #187)

Having said that, I may be missing a bit that would still let us speed up the single-machine implementation, or I may just be wrong in the diagnosis and there's still a way of interpolating extensive/intensive vars fully on a chunk-by-chunk basis.

I don't think we want to interpolate the chunks. The tests make clear that all the expensive computation is in building the adjacency table, while multiplying through it is basically costless. Conversely, it is very expensive to try to send the multiplication through the distributed chunks. So in #187, let's just return the adjacency list, convert it to a (pandas, not dask) MultiIndex series with the sparse dtype, then dump that out to sparse. Then we pick up from right there, having already handled the expensive part.
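
i.e., once we have a plain pandas adjacency list back from the workers, something like this (column names illustrative):

```python
import pandas as pd

# adjlist: columns ['src_idx', 'tgt_idx', 'area'], one row per intersecting pair
s = (
    adjlist.set_index(["src_idx", "tgt_idx"])["area"]
    .astype(pd.SparseDtype(float))
)

# MultiIndex sparse series -> scipy COO matrix, plus the row/column label mappings
table, src_labels, tgt_labels = s.sparse.to_coo(
    row_levels=["src_idx"], column_levels=["tgt_idx"], sort_labels=True
)
table = table.tocsr()  # the same kind of AoI matrix the single-core path builds
```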

@knaaptime
Member Author

@darribas can you take a look at what's going wrong with the dask interpolator? I want to cut a new release this month but don't want to do so until everything is passing. If you don't have time, I'll make the dask stuff private until it's ready.
