BUG: Dask: test_case_when() randomly fails on Windows with CancelledError #7148

Open
3 tasks done
AndreyPavlenko opened this issue Apr 4, 2024 · 0 comments
Labels
bug 🦗 Something isn't working Dask ⚡ Issues related to the Dask engine

@AndreyPavlenko
Collaborator

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

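Run `test_case_when()` with the Dask engine on Windows. The failure is intermittent, so the test may need several runs before it reproduces.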

Issue Description

See #6972 for details.

Expected Behavior

The test should pass on every run.

Error Logs

self = <Client: 'tcp://127.0.0.1:61644' processes=4 threads=4, memory=16.00 GiB>
futures = [<Future: cancelled, key: lambda-03566af00f50e1eaeaefe1cde41abeea>]
errors = 'raise', direct = False, local_worker = None

    async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
        unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
        mismatched_futures = [f for f in future_set if f.client is not self]
        if mismatched_futures:
            raise ValueError(
                "Cannot gather Futures created by another client. "
                f"These are the {len(mismatched_futures)} (out of {len(futures)}) "
                f"mismatched Futures and their client IDs (this client is {self.id}): "
                f"{ {f: f.client.id for f in mismatched_futures} }"  # noqa: E201, E202
            )
        keys = [future.key for future in future_set]
        bad_data = dict()
        data = {}
    
        if direct is None:
            direct = self.direct_to_workers
        if direct is None:
            try:
                w = get_worker()
            except Exception:
                direct = False
            else:
                if w.scheduler.address == self.scheduler.address:
                    direct = True
    
        async def wait(k):
            """Want to stop the All(...) early if we find an error"""
            try:
                st = self.futures[k]
            except KeyError:
                raise AllExit()
            else:
                await st.wait()
            if st.status != "finished" and errors == "raise":
                raise AllExit()
    
        while True:
            logger.debug("Waiting on futures to clear before gather")
    
            with suppress(AllExit):
                await distributed.utils.All(
                    [wait(key) for key in keys if key in self.futures],
                    quiet_exceptions=AllExit,
                )
    
            failed = ("error", "cancelled")
    
            exceptions = set()
            bad_keys = set()
            for key in keys:
                if key not in self.futures or self.futures[key].status in failed:
                    exceptions.add(key)
                    if errors == "raise":
                        try:
                            st = self.futures[key]
                            exception = st.exception
                            traceback = st.traceback
                        except (KeyError, AttributeError):
                            exc = CancelledError(key)
                        else:
                            raise exception.with_traceback(traceback)
>                       raise exc
E                       concurrent.futures._base.CancelledError: lambda-03566af00f50e1eaeaefe1cde41abeea

C:\Miniconda3\envs\modin\lib\site-packages\distributed\client.py:2233: CancelledError
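The traceback ends in the failure branch of `Client._gather`: once waiting finishes, any key whose future is missing from `self.futures` or whose status is `"error"` or `"cancelled"` is inspected. An errored task re-raises its own exception. A key that has been released (`KeyError`), or a cancelled state with no stored exception (`AttributeError`), surfaces as a bare `CancelledError(key)`. That is the case here, which means the log gives no underlying cause. The following is a simplified, stdlib-only sketch of that branch for illustration. `FakeState` and `gather_sketch` are hypothetical stand-ins rather than distributed's API, and the sketch omits the async wait loop and the `direct`/worker handling.

```python
from concurrent.futures import CancelledError
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class FakeState:
    """Hypothetical stand-in for a distributed future state (only the fields the check reads)."""
    status: str                                 # e.g. "finished", "error", "cancelled"
    result: object = None
    # In distributed, a cancelled state has no exception attribute (AttributeError);
    # None models that here.
    exception: Optional[BaseException] = None


FAILED = ("error", "cancelled")


def gather_sketch(futures: Dict[str, FakeState], keys: List[str]) -> list:
    """Simplified model of the errors="raise" branch of Client._gather."""
    for key in keys:
        st = futures.get(key)
        if st is None or st.status in FAILED:
            if st is not None and st.exception is not None:
                # A task that errored re-raises its own exception.
                raise st.exception
            # A released key, or a cancelled future with no stored exception,
            # becomes a bare CancelledError carrying only the key.
            raise CancelledError(key)
    return [futures[k].result for k in keys]


if __name__ == "__main__":
    futures = {
        "ok": FakeState("finished", result=1),
        "lambda-03566af00f50e1eaeaefe1cde41abeea": FakeState("cancelled"),
    }
    try:
        gather_sketch(futures, ["ok", "lambda-03566af00f50e1eaeaefe1cde41abeea"])
    except CancelledError as e:
        print("CancelledError:", e)
```

Because the bare `CancelledError` carries only the key, diagnosing the failure means finding out why the future was cancelled or released before `gather` ran (for example, from scheduler or worker logs) rather than relying on this traceback.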

Installed Versions

23c1ec0
