Using Flox with cubed #224

Open
TomNicholas opened this issue Mar 13, 2023 · 24 comments · May be fixed by #249

@TomNicholas
Contributor

TomNicholas commented Mar 13, 2023

What would need to happen to use flox with cubed instead of dask?

(I see this question as part of fully solving pydata/xarray#6807.)

In the code I see blockwise being used, but also dask.array.reductions._tree_reduce and custom HighLevelGraph objects being created. Is there any combination of arguments to flox that only uses blockwise? Could more of flox be made to work via the blockwise abstraction? Should we add ._tree_reduce to our list of useful computation patterns for chunked arrays that includes blockwise, map_blocks, and apply_gufunc?

Sorry if we already discussed this elsewhere.

EDIT: cc @tomwhite

@dcherian
Collaborator

Should we add ._tree_reduce to our list of useful computation patterns for chunked arrays that includes blockwise, map_blocks, and apply_gufunc

It's implemented in both cubed and dask.array as reduction (https://tom-e-white.com/cubed/operations.html#reduction-and-arg-reduction, https://docs.dask.org/en/stable/generated/dask.array.reduction.html).

We should be able to "just" change _tree_reduce(...) to reduction(chunk=identity, ...) where

def identity(x):
    return x

With this, method="blockwise" and method="map-reduce" will work. method="cohorts" will take some thinking: right now it's built around dask's .blocks accessor, so perhaps cubed could add that.
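
A self-contained toy of that pattern with dask (a sketch only: np.sum stands in for flox's real combine/aggregate, and note that dask's reduction passes axis and keepdims through to the chunk function, so identity has to accept them):

import numpy as np
import dask.array

def identity(x, axis=None, keepdims=True):
    # dask passes axis/keepdims into the chunk step; a pure identity ignores them
    return x

x = dask.array.ones((4, 6), chunks=(2, 3))  # stand-in for the blockwise output

reduced = dask.array.reduction(
    x,
    chunk=identity,    # the real blockwise step was already applied upstream
    combine=np.sum,    # placeholder; flox uses its own combine
    aggregate=np.sum,  # placeholder; flox uses its own aggregate
    dtype=x.dtype,
)
print(reduced.compute())  # 24.0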

@TomNicholas
Contributor Author

TomNicholas commented Jun 24, 2023

So I tried to hack flox into accepting cubed.Array objects in #249, to see what exactly is missing to make it work. I'm running the example in cubed-dev/cubed#223. (Also requires pydata/xarray#7941)

For method='cohorts'

I got to the point of cubed needing to implement .blocks, as expected.

File ~/Documents/Work/Code/flox/flox/core.py:1454, in dask_groupby_agg(array, by, agg, expected_groups, axis, fill_value, method, reindex, engine, sort, chunks_cohorts)
   1452 for blks, cohort in chunks_cohorts.items():
   1453     index = pd.Index(cohort)
-> 1454     subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :])
   1455     reindexed = dask.array.map_blocks(
   1456         reindex_intermediates, subset, agg=agg, unique_groups=index, meta=subset._meta
   1457     )
   1458     # now that we have reindexed, we can set reindex=True explicitlly

AttributeError: 'Array' object has no attribute 'blocks'

For method='map-reduce'

it seems cubed might need to implement concatenate as an argument to blockwise, as concatenate=False is always passed by flox:

File ~/Documents/Work/Code/flox/flox/core.py:1432, in dask_groupby_agg(array, by, agg, expected_groups, axis, fill_value, method, reindex, engine, sort, chunks_cohorts)
   1426 # Each chunk of `reduced`` is really a dict mapping
   1427 # 1. reduction name to array
   1428 # 2. "groups" to an array of group labels
   1429 # Note: it does not make sense to interpret axis relative to
   1430 # shape of intermediate results after the blockwise call
   1431 if method == "map-reduce":
-> 1432     reduced = tree_reduce(
   1433         intermediate,
   1434         combine=partial(combine, agg=agg),
   1435         aggregate=partial(aggregate, expected_groups=expected_groups, reindex=reindex),
   1436     )
   1437     if is_duck_dask_array(by_input) and expected_groups is None:
   1438         groups = _extract_unknown_groups(reduced, dtype=by.dtype)

TypeError: reduction() got an unexpected keyword argument 'concatenate'

For method='blockwise'

an error from what looks like a potential bug inside cubed.rechunk (from the call to rechunk_for_blockwise inside flox.core.groupby_reduce):

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:124, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    121     target_chunks = source_chunks
    123 if isinstance(target_chunks, dict):
--> 124     array_dims = _get_dims_from_zarr_array(source_array)
    125     try:
    126         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

AttributeError: 'LazyZarrArray' object has no attribute 'attrs'

@dcherian
Collaborator

It seems cubed might need to implement concatenate as an argument to blockwise, as concatenate=False is always passed by flox

Could delete it. The default in dask is None, so that line might not be doing anything.

@TomNicholas
Contributor Author

The default in dask is None

Is it? Looks like it's True to me?

Commenting out the concatenate kwarg in every place it occurs allows me to build a nice-looking graph for groupby using map-reduce:

[image: the resulting Cubed plan graph for the map-reduce groupby]

This is for 120 chunks, using

mean = ds.groupby("time.dayofyear").mean(skipna=False, method='map-reduce')  # skipna=False to avoid eager load, see xarray issue #7243

Notice also that because this is the map-reduce strategy, the output object has only one chunk, meaning the Spec's allowed memory has to be large.

This doesn't actually execute though (unsurprisingly); .compute fails with:

--------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:382, in Retrying.__call__(self, fn, *args, **kwargs)
    381 try:
--> 382     result = fn(*args, **kwargs)
    383 except BaseException:  # noqa: B902

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:10, in exec_stage_func(func, *args, **kwargs)
      8 @retry(stop=stop_after_attempt(3))
      9 def exec_stage_func(func, *args, **kwargs):
---> 10     return func(*args, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:67, in apply_blockwise(out_key, config)
     65     args.append(arg)
---> 67 result = config.function(*args)
     68 if isinstance(result, dict):  # structured array with named fields

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:259, in fuse.<locals>.fused_func(*args)
    258 def fused_func(*args):
--> 259     return pipeline2.config.function(pipeline1.config.function(*args))

TypeError: <lambda>() got an unexpected keyword argument 'axis'

Because of the way cubed retries things, the traceback doesn't give me any more useful context than this, though. Is there a way to resurface the useful part of the traceback @tomwhite?

@tomwhite
Contributor

Because of the way cubed retries things, the traceback doesn't give me any more useful context than this, though. Is there a way to resurface the useful part of the traceback @tomwhite?

This might help: https://tenacity.readthedocs.io/en/latest/index.html#error-handling. If so, we should add reraise=True everywhere in Cubed.
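
For reference, the relevant pattern — this is what Cubed's exec_stage_func wrapper would become:

from tenacity import retry, stop_after_attempt

# reraise=True makes tenacity re-raise the original exception (with its
# traceback) after the final attempt, instead of wrapping it in a RetryError
@retry(reraise=True, stop=stop_after_attempt(3))
def exec_stage_func(func, *args, **kwargs):
    return func(*args, **kwargs)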

@TomNicholas
Contributor Author

Thanks for the rapid reply!

I tried adding that into cubed's python executor but the traceback still has me none the wiser as to where axis is being passed in erroneously.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[17], line 1
----> 1 result['asn'].compute()

File ~/Documents/Work/Code/xarray/xarray/core/dataarray.py:1102, in DataArray.compute(self, **kwargs)
   1083 """Manually trigger loading of this array's data from disk or a
   1084 remote source into memory and return a new array. The original is
   1085 left unaltered.
   (...)
   1099 dask.compute
   1100 """
   1101 new = self.copy(deep=False)
-> 1102 return new.load(**kwargs)

File ~/Documents/Work/Code/xarray/xarray/core/dataarray.py:1076, in DataArray.load(self, **kwargs)
   1058 def load(self: T_DataArray, **kwargs) -> T_DataArray:
   1059     """Manually trigger loading of this array's data from disk or a
   1060     remote source into memory and return this array.
   1061 
   (...)
   1074     dask.compute
   1075     """
-> 1076     ds = self._to_temp_dataset().load(**kwargs)
   1077     new = self._from_temp_dataset(ds)
   1078     self._variable = new._variable

File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:792, in Dataset.load(self, **kwargs)
    789 chunkmanager = get_chunked_array_type(*lazy_data.values())
    791 # evaluate all the chunked arrays simultaneously
--> 792 evaluated_data = chunkmanager.compute(*lazy_data.values(), **kwargs)
    794 for k, data in zip(lazy_data, evaluated_data):
    795     self.variables[k].data = data

File ~/Documents/Work/Code/cubed-xarray/cubed_xarray/cubedmanager.py:69, in CubedManager.compute(self, *data, **kwargs)
     66 def compute(self, *data: "CubedArray", **kwargs) -> tuple[np.ndarray, ...]:
     67     from cubed import compute
---> 69     return compute(*data, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/core/array.py:410, in compute(executor, callbacks, optimize_graph, resume, *arrays, **kwargs)
    407         executor = PythonDagExecutor()
    409 _return_in_memory_array = kwargs.pop("_return_in_memory_array", True)
--> 410 plan.execute(
    411     executor=executor,
    412     callbacks=callbacks,
    413     optimize_graph=optimize_graph,
    414     resume=resume,
    415     array_names=[a.name for a in arrays],
    416     **kwargs,
    417 )
    419 if _return_in_memory_array:
    420     return tuple(a._read_stored() for a in arrays)

File ~/Documents/Work/Code/cubed/cubed/core/plan.py:202, in Plan.execute(self, executor, callbacks, optimize_graph, resume, array_names, **kwargs)
    197 if callbacks is not None:
    198     [
    199         callback.on_compute_start(dag, resume=resume)
    200         for callback in callbacks
    201     ]
--> 202 executor.execute_dag(
    203     dag,
    204     callbacks=callbacks,
    205     array_names=array_names,
    206     resume=resume,
    207     **kwargs,
    208 )
    209 if callbacks is not None:
    210     [callback.on_compute_end(dag) for callback in callbacks]

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:22, in PythonDagExecutor.execute_dag(self, dag, callbacks, array_names, resume, **kwargs)
     20 if stage.mappable is not None:
     21     for m in stage.mappable:
---> 22         exec_stage_func(stage.function, m, config=pipeline.config)
     23         if callbacks is not None:
     24             event = TaskEndEvent(array_name=name)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:289, in BaseRetrying.wraps.<locals>.wrapped_f(*args, **kw)
    287 @functools.wraps(f)
    288 def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
--> 289     return self(f, *args, **kw)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:379, in Retrying.__call__(self, fn, *args, **kwargs)
    377 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
    378 while True:
--> 379     do = self.iter(retry_state=retry_state)
    380     if isinstance(do, DoAttempt):
    381         try:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:325, in BaseRetrying.iter(self, retry_state)
    323     retry_exc = self.retry_error_cls(fut)
    324     if self.reraise:
--> 325         raise retry_exc.reraise()
    326     raise retry_exc from fut.exception()
    328 if self.wait:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:158, in RetryError.reraise(self)
    156 def reraise(self) -> t.NoReturn:
    157     if self.last_attempt.failed:
--> 158         raise self.last_attempt.result()
    159     raise self

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)
    443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:382, in Retrying.__call__(self, fn, *args, **kwargs)
    380 if isinstance(do, DoAttempt):
    381     try:
--> 382         result = fn(*args, **kwargs)
    383     except BaseException:  # noqa: B902
    384         retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:10, in exec_stage_func(func, *args, **kwargs)
      8 @retry(reraise=True, stop=stop_after_attempt(3))
      9 def exec_stage_func(func, *args, **kwargs):
---> 10     return func(*args, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:67, in apply_blockwise(out_key, config)
     64     arg = arr[chunk_key]
     65     args.append(arg)
---> 67 result = config.function(*args)
     68 if isinstance(result, dict):  # structured array with named fields
     69     for k, v in result.items():

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:259, in fuse.<locals>.fused_func(*args)
    258 def fused_func(*args):
--> 259     return pipeline2.config.function(pipeline1.config.function(*args))

TypeError: <lambda>() got an unexpected keyword argument 'axis'

@dcherian
Collaborator

Your chunk lambda identity function probably needs to accept axis and keepdims
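
Something like this (sketch):

def identity(x, axis=None, keepdims=None):
    # reduction passes axis/keepdims to its chunk function; identity can ignore them
    return x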

@tomwhite
Contributor

The default in dask is None

Is it? Looks like it's True to me?

It's confusing because the default for concatenate in blockwise is None, but in reduction it's True.

Perhaps we could add a concatenate parameter to Cubed's reduction that only accepts None or False, and raises an exception for True?
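
Roughly (a sketch of the proposed guard only; the parameter names are illustrative, not Cubed's actual reduction signature):

def reduction(x, func, *, concatenate=None, **kwargs):
    # Cubed never concatenates intermediate blocks, so accept only the
    # dask-compatible no-op values (None/False) and reject True loudly
    if concatenate:
        raise NotImplementedError("concatenate=True is not supported by Cubed")
    ...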

@tomwhite
Contributor

For method='blockwise'

an error from what looks like a potential bug inside cubed.rechunk (from the call to rechunk_for_blockwise inside flox.core.groupby_reduce):

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:124, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    121     target_chunks = source_chunks
    123 if isinstance(target_chunks, dict):
--> 124     array_dims = _get_dims_from_zarr_array(source_array)
    125     try:
    126         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

AttributeError: 'LazyZarrArray' object has no attribute 'attrs'

I've opened cubed-dev/cubed#226 to fix this.

@tomwhite
Contributor

For method='cohorts'

I got to the point of cubed needing to implement .blocks, as expected.

From what I can tell, Flox only uses the shape of blocks, not the values themselves. So would it be possible to implement this using array.chunks?
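
e.g. (sketch, shown with dask since the semantics of .chunks match):

import dask.array

array = dask.array.ones((4, 6), chunks=(2, 3))
numblocks = tuple(len(c) for c in array.chunks)  # the block-grid shape flox needs
assert numblocks == array.blocks.shape == (2, 2)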

@dcherian
Collaborator

Flox only uses the shape of blocks, not the values themselves. So would it be possible to implement this using array.chunks

Maybe, but I don't know that it's worth it. cohorts currently uses a lot of internal dask implementation details and doesn't really work that well despite all the effort.

Perhaps with cubed we should just always have method="map-reduce" since memory use is less of a concern.

@TomNicholas
Contributor Author

Perhaps with cubed we should just always have method="map-reduce" since memory use is less of a concern.

Right now it would just be nice to make a groupby work on at least one case to test potential performance.

Your chunk lambda identity function probably needs to accept axis and keepdims

^ Thanks - changing this leads to some kind of cubed error:

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:70, in apply_blockwise(out_key, config)
     69     for k, v in result.items():
---> 70         config.write.open().set_basic_selection(out_chunk_key, v, fields=k)
     71 else:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1486, in Array.set_basic_selection(self, selection, value, fields)
   1485 else:
-> 1486     return self._set_basic_selection_nd(selection, value, fields=fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1790, in Array._set_basic_selection_nd(self, selection, value, fields)
   1788 indexer = BasicIndexer(selection, self)
-> 1790 self._set_selection(indexer, value, fields=fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1802, in Array._set_selection(self, indexer, value, fields)
   1792 def _set_selection(self, indexer, value, fields=None):
   1793 
   1794     # We iterate over all chunks which overlap the selection and thus contain data
   (...)
   1800 
   1801     # check fields are sensible
-> 1802     check_fields(fields, self._dtype)
   1803     fields = check_no_multi_fields(fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/indexing.py:854, in check_fields(fields, dtype)
    853 if dtype.names is None:
--> 854     raise IndexError("invalid 'fields' argument, array does not have any fields")
    855 try:

IndexError: invalid 'fields' argument, array does not have any fields

@tomwhite
Contributor

changing this leads to some kind of cubed error

It's quite hard to debug without the code, but it's worth trying to turn off graph optimization (fuse) by passing optimize_graph=False to compute() to check that the problem doesn't lie there.
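
i.e. something like:

result['asn'].compute(optimize_graph=False)  # disable fusion to narrow down the error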

I can also take a closer look if there's some code you are able to share.

@TomNicholas
Contributor Author

I can also take a closer look if there's some code you are able to share.

https://gist.github.com/TomNicholas/c50b89eeb3ec9e8e49368f811689fd65

@tomwhite
Contributor

Thanks for the notebook @TomNicholas. I tried running it, and managed to reproduce the problem.

I think what's happening is that Dask doesn't care about the dtype or shape of intermediate values in the reduction (see e.g. this comment), whereas Cubed does, since it is materializing the intermediates as Zarr arrays, so the dtypes and shapes must be correct.

Another problem is that Dask's _tree_reduce and Cubed's reduction have slightly different semantics, which may also be causing problems.

I'm going to have a look at seeing what it would take to get a Flox unit test running with Cubed.

@dcherian
Collaborator

I think what's happening is that Dask doesn't care about the dtype or shape of intermediate values in the reduction (see e.g. this comment), whereas Cubed does, since it is materializing the intermediates as Zarr arrays, so the dtypes and shapes must be correct.

We would need to migrate to using structured arrays as the intermediates (which dask now supports, IIRC), or totally simplify by making the intermediates simple arrays. These would be very major changes; I would just move on with the blogpost.
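
For concreteness, "structured arrays as the intermediates" would look something like this (a sketch only; the field names are illustrative, not flox's actual intermediate layout):

import numpy as np

# per-block intermediates for a mean, packed into one structured array
values = np.array([1.0, 2.0, 3.0, 4.0])
labels = np.array([0, 1, 0, 1])  # group label for each element
n_groups = 2

intermediate = np.zeros(n_groups, dtype=[("sum", "f8"), ("count", "i8")])
np.add.at(intermediate["sum"], labels, values)
np.add.at(intermediate["count"], labels, 1)
# combine step: intermediates from two blocks just add field-wise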

@tomwhite
Contributor

I just reached the same conclusion after looking at it for a while!

@dcherian
Collaborator

Sorry! I did have the same realization many months ago but didn't write it down and totally forgot about it!

@tomwhite
Contributor

No problem! It was quite instructive looking through the code.

A couple of things I noticed:

  • One reason that blockwise is used rather than reduction to apply the initial chunk function is that there is a by grouping array that must be co-processed (and reduction can't do that).
  • The intermediate values include a groups value in the dictionary. For the case we are looking at here, I think the groups are known, so they don't need to be passed around like this. Indeed, they wouldn't really fit in a structured array, as they have a different shape to the data being reduced.

@dcherian
Collaborator

dcherian commented Jan 2, 2024

I've been thinking about this some. I think blockwise and map-reduce can be made to work out of the box, if we adjust the intermediates to be structured arrays.

For "cohorts", I think we'll want to think about what makes sense for cubed. The current implementation really adapts to dask's preferences at the moment by (practically speaking) choosing to shuffle chunks, instead of subsets of a chunk. Perhaps for cubed a more explicit shuffle makes more sense: calculate the blockwise intermediates, but write them to multiple intermediate zarrs (one per cohort), and then tree-reduce those independently.

In any case, the fundamental idea is that some group labels tend to occur together (particularly for quasi-periodic time groupings like groupby("time.hour") and groupby("time.dayofyear")), and we should be able to use that to reduce the number of reduction steps and communication overhead.

PS: find_group_cohorts is now substantially improved, and does not use the .blocks property any more.

@tomwhite
Contributor

tomwhite commented Jan 2, 2024

I think blockwise and map-reduce can be made to work out of the box, if we adjust the intermediates to be structured arrays.

Great! Does Flox need to use the Array API for this to work, or is that not needed?

Perhaps for cubed a more explicit shuffle makes more sense: calculate the blockwise intermediates, but write them to multiple intermediate zarrs (one per cohort), and then tree-reduce those independently.

BTW I'm planning on adding tree_reduce to Cubed as a part of the optimization work (cubed-dev/cubed#339), which hopefully will make this easier.

@dcherian
Collaborator

dcherian commented Jan 2, 2024

Does Flox need to use the Array API for this to work, or is that not needed?

Not needed AFAICT. I'm also OK with an if ... else ... here.

adding tree_reduce to Cubed

Aside from algorithmic changes, isn't this just reduction with chunk=lambda x: x?
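
i.e. (a sketch, shown with dask's reduction; Cubed's would be analogous):

from functools import partial
from dask.array import reduction

# the lambda takes axis/keepdims because reduction passes them through
tree_reduce = partial(reduction, chunk=lambda x, axis=None, keepdims=None: x)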

PS: It'd be nice if both dask and cubed allowed chunk=None to skip the blockwise step, since that could be arbitrarily complicated.

@tomwhite
Contributor

tomwhite commented Jan 3, 2024

Aside from algorithmic changes, isn't this just reduction with chunk=lambda x: x?

Yes

@dcherian
Collaborator

dcherian commented Apr 30, 2024

FYI my original proposal for what is now "cohorts" was quite different. The idea came to me as "shuffling chunks" so that members of a group would end up in nearby chunks, making map-reduce very effective at reducing data early on.

https://gist.github.com/dcherian/6ccd76d2a6eaadb7844d61d197a8b3db

If cubed can do the 'block shuffling' without actually rewriting data, it might still be a good idea. The downside is that you may end up with massive chunks at the end of the reduction. For example, consider doing ds.groupby("time.dayofyear") when ds has chunksize=1 along time: you'll end up with output chunksizes 365x bigger :D. This is why the current idea is more of "index out blocks with the members from the same groups", reduce, then concatenate.

Just thought I'd bring it up since you are reimplementing these ideas.

EDIT: flox today can quickly detect when the chunking is "perfect" for this kind of shuffling.

no_overlapping_cohorts = (np.bincount(np.concatenate(tuple(chunks_cohorts.keys()))) == 1).all()
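# i.e. every block index appears in exactly one cohort, so no cohorts overlap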
