ipyparallel raises concurrent.futures._base.InvalidStateError: CANCELLED #289

Open
basnijholt opened this issue Aug 18, 2020 · 1 comment

Running the following script in Python 3.8

#!/usr/bin/env python3
# ipcluster start --n=10 --profile=test --cluster-id=''
# python run_learner.py --n=10 --profile=test
from adaptive_scheduler.utils import connect_to_ipyparallel
from adaptive import Runner, Learner1D

if __name__ == "__main__":  # ← use this, see warning @ https://bit.ly/2HAk0GG

    def peak(x, a=0.01):
        return x + a**2 / (a**2 + x**2)

    learner = Learner1D(peak, bounds=(-1, 1))
    executor = connect_to_ipyparallel(profile="test", n=10)
    runner = Runner(learner, goal=lambda l: l.loss() < 0.01, executor=executor)
    runner.ioloop.run_until_complete(runner.task)

raises the following error:

Connected to 10 out of 10 engines after 0 seconds.
exception calling callback for <Future at 0x2ac131ebd220 state=finished returned list>
Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 230, in _resolve_result
    self.set_result(self._reconstruct_result(results))
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 524, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 233, in _resolve_result
    self.set_exception(e)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: peak:finished>
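
For context, the final frames of the traceback can be reproduced with the standard library alone: since Python 3.8, calling `set_result` (or `set_exception`) on a `concurrent.futures.Future` that has already been cancelled raises `InvalidStateError`. The sketch below is only an illustration of that behaviour. The helper name `set_result_after_cancel` is made up for this example, and the sketch does not involve ipyparallel or its `AsyncResult` class.

```python
from concurrent.futures import Future, InvalidStateError


def set_result_after_cancel():
    """Cancel a pending future, then try to resolve it (hypothetical helper)."""
    fut = Future()
    cancelled = fut.cancel()  # a pending future can still be cancelled
    try:
        fut.set_result(42)  # raises on Python 3.8+ because the future is done
    except InvalidStateError as exc:
        return cancelled, str(exc)
    return cancelled, None


if __name__ == "__main__":
    print(set_result_after_cancel())
```

The message carried by the exception starts with the future's state, which matches the `CANCELLED: <...>` text in the traceback above.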

I am working on a more minimal example, but I will leave this here for now.

@basnijholt changed the title from "ipyparallel concurrent.futures._base.InvalidStateError: CANCELLED" to "ipyparallel raises concurrent.futures._base.InvalidStateError: CANCELLED" on Aug 18, 2020

basnijholt commented Aug 18, 2020

A more minimal example that triggers the same error:

#!/usr/bin/env python3

# ipython profile create test
# ipcluster start --n=10 --profile=test --cluster-id=''
# python fail.py --n=10 --profile=test

import asyncio
import time
from random import random

from ipyparallel import Client
from ipyparallel.error import NoEnginesRegistered


def f(x):
    return x


def connect_to_ipyparallel(profile, n):
    client = Client(profile=profile)
    while True:
        try:
            dview = client[:]
            if len(dview) == n:
                dview.use_dill()
                return client
        except NoEnginesRegistered:
            pass  # no engines registered yet
        time.sleep(0.1)  # poll again shortly, also while only some engines are up


async def _run(loop, executor, ncores):
    pending = set()
    done = set()

    for _ in range(10):  # do some loops that submit futures
        while len(pending) + len(done) <= ncores:
            fut = loop.run_in_executor(executor, f, random())
            pending.add(fut)
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

        for fut in done:
            fut.result()  # process results that are done

    for fut in pending:  # cancel the results that are pending
        fut.cancel()

    return done


def do(loop, executor, ncores):
    coro = _run(loop, executor, ncores)
    task = loop.create_task(coro)
    loop.run_until_complete(task)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ncores = 10
    executor = connect_to_ipyparallel(profile="test", n=ncores).executor()

    for i in range(10):
        do(loop, executor, ncores)
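
A possible reading of why the `fut.cancel()` calls matter, sketched with the standard library only: `loop.run_in_executor` returns an asyncio future that wraps the executor's `concurrent.futures.Future` (via `asyncio.wrap_future`), and cancelling the asyncio future also cancels the wrapped one if it is still pending. A result that arrives afterwards can then no longer be set. In the sketch below, the plain `Future` named `inner` merely stands in for whatever the ipyparallel executor returns, and `cancel_wrapped_future` is a made-up name. Whether ipyparallel hits exactly this path is an assumption, not something confirmed in this issue.

```python
import asyncio
from concurrent.futures import Future, InvalidStateError


async def cancel_wrapped_future():
    """Cancel the asyncio wrapper, then deliver a late result (hypothetical helper)."""
    inner = Future()  # stand-in for the future created by executor.submit()
    outer = asyncio.wrap_future(inner)  # the kind of future run_in_executor returns
    outer.cancel()  # same call as fut.cancel() in the example above
    await asyncio.sleep(0)  # give the loop one pass to propagate the cancellation
    try:
        inner.set_result("late result")  # the executor finishing after the cancel
    except InvalidStateError as exc:
        return inner.cancelled(), str(exc)
    return inner.cancelled(), None


if __name__ == "__main__":
    print(asyncio.run(cancel_wrapped_future()))
```

If this is indeed the mechanism, the error would depend on timing: only futures that are still pending on the client side when they are cancelled would be affected.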
