[Bug]: Using flask restful server and got [Session and connector has to use same event loop] error #13426

Open
mglslg opened this issue May 11, 2024 · 9 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

mglslg commented May 11, 2024

Bug Description

This is ElasticsearchStore code:

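import logging

import nest_asyncio
from flask import Flask, jsonify
from llama_index.core import (
    PromptHelper,
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.core.chat_engine.types import ChatMode
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.vector_stores.elasticsearch import ElasticsearchStore

logger = logging.getLogger(__name__)

app = Flask(__name__)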

es_store = ElasticsearchStore(
    index_name="sk_test_index",
    es_url="https://xxx.com:9200",  # found within the deployment page
    es_user="elastic",
    es_password="xxx"  # provided when creating deployment. Alternatively can reset password.
)

storage_context = StorageContext.from_defaults(vector_store=es_store)

prompt_helper = PromptHelper(
    context_window=3000,
    num_output=256,
    chunk_overlap_ratio=0.1,
    chunk_size_limit=None
)
Settings.prompt_helper = prompt_helper
Settings.chunk_size = 512

documents = SimpleDirectoryReader("./data/file").load_data()
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)  

This is restful api code:

@app.route('/chat', methods=['POST'])
def chat():
    logger.info("Received a request")

    nest_asyncio.apply()

    memory = ChatMemoryBuffer.from_defaults(token_limit=1500)

    chat_engine = index.as_chat_engine(
        chat_mode=ChatMode.CONTEXT,
        memory=memory,
        system_prompt=(
            "You are a helpful assistant"
        )
    )

    resp = chat_engine.chat("Who are you")

    return jsonify(resp)

And I got this error below:

File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/callbacks/utils.py", line 41, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/chat_engine/context.py", line 162, in chat
    context_str_template, nodes = self._generate_context(message)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/chat_engine/context.py", line 109, in _generate_context
    nodes = self._retriever.retrieve(message)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 146, in wrapper
    self.span_drop(id_=id_, bound_args=bound_args, instance=instance, err=e)
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 98, in span_drop
    h.span_drop(
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/span_handlers/base.py", line 77, in span_drop
    span = self.prepare_to_drop_span(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/span_handlers/null.py", line 71, in prepare_to_drop_span
    raise err
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 144, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/base/base_retriever.py", line 238, in retrieve
    nodes = self._retrieve(query_bundle)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 146, in wrapper
    self.span_drop(id_=id_, bound_args=bound_args, instance=instance, err=e)
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 98, in span_drop
    h.span_drop(
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/span_handlers/base.py", line 77, in span_drop
    span = self.prepare_to_drop_span(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/span_handlers/null.py", line 71, in prepare_to_drop_span
    raise err
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 144, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py", line 101, in _retrieve
    return self._get_nodes_with_embeddings(query_bundle)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/core/indices/vector_store/retrievers/retriever.py", line 176, in _get_nodes_with_embeddings
    query_result = self._vector_store.query(query, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/vector_stores/elasticsearch/base.py", line 502, in query
    return asyncio.get_event_loop().run_until_complete(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/asyncio/futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/asyncio/tasks.py", line 277, in __step
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/vector_stores/elasticsearch/base.py", line 574, in aquery
    response = await client.search(
               ^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elasticsearch/_async/client/__init__.py", line 4114, in search
    return await self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elasticsearch/_async/client/_base.py", line 271, in perform_request
    response = await self._perform_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elasticsearch/_async/client/_base.py", line 316, in _perform_request
    meta, resp_body = await self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elastic_transport/_async_transport.py", line 264, in perform_request
    resp = await node.perform_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elastic_transport/_node/_http_aiohttp.py", line 136, in perform_request
    self._create_aiohttp_session()
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/elastic_transport/_node/_http_aiohttp.py", line 252, in _create_aiohttp_session
    self.session = aiohttp.ClientSession(
                   ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/aiohttp/client.py", line 255, in __init__
    raise RuntimeError("Session and connector has to use same event loop")

How can I solve this?

Version

0.10.19

Steps to Reproduce

Start a Flask service, then use a REST request to query an index bound to Elasticsearch.

Relevant Logs/Tracebacks

No response

@mglslg mglslg added bug Something isn't working triage Issue needs to be triaged/prioritized labels May 11, 2024

dosubot bot commented May 11, 2024

Hey there, @mglslg! 😄 Long time, no bug, huh? It's always a pleasure to see your name pop up, even if it's accompanied by a tricky issue. I'm diving into this headfirst to untangle it for you, so hang tight! Will get back to you with a full answer shortly.

@logan-markewich
Collaborator

My first suggestion is to rewrite your API endpoint using actual async, but I'm not sure how async works in Flask (I typically use FastAPI).

Alternatively, this seems like an internal issue in the Elasticsearch client.

@mglslg
Author

mglslg commented May 11, 2024

@logan-markewich Thank you for your help :) Yes, it is indeed an issue with Elasticsearch itself, but I also tried using a custom-created synchronous Elasticsearch client and it still doesn't work. Then I added nest_asyncio.apply() in the REST API code, but it still throws the same error. Currently, I am not very keen on switching the app to another async-supported framework. Is there any other way to solve this issue?


dosubot bot commented May 11, 2024

To address the "RuntimeError: Session and connector has to use same event loop" error in your Flask application without switching to an async-supported framework, you'll need to ensure that your Elasticsearch queries are executed in a synchronous manner. This involves avoiding direct use of asynchronous methods within Flask route handlers, which are not compatible with Flask's synchronous nature.

Here's a strategy to adapt your Elasticsearch operations to be synchronous within Flask:

  1. Synchronous Elasticsearch Operations: Ensure that all interactions with Elasticsearch from your Flask application are done synchronously. If your ElasticsearchStore or any related component uses asynchronous calls, you should modify these to be synchronous or use synchronous equivalents if available.

  2. Adapting Asynchronous Calls: If you have asynchronous methods that need to be called from Flask routes, consider wrapping these calls to run them synchronously. However, be cautious: calling asyncio.run() inside a Flask route creates and tears down a new event loop on every request, which adds overhead and can break any client object that is still bound to an earlier loop. It's generally better to redesign the async parts to be compatible with Flask's synchronous execution model.

  3. Example Adaptation: If your ElasticsearchStore class or any method you're using to interact with Elasticsearch is designed to be asynchronous, you'll need to either provide a synchronous interface for it or ensure that its asynchronous operations are properly managed when called from Flask routes.

By focusing on ensuring that all Elasticsearch queries and related operations are executed in a synchronous context within your Flask application, you can avoid the event loop error and ensure compatibility with Flask's execution model.

About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

@mglslg
Author

mglslg commented May 11, 2024

@dosubot Then how can I specify using a synchronous Elasticsearch client in LlamaIndex?


dosubot bot commented May 11, 2024

To use a synchronous Elasticsearch client in LlamaIndex within a Flask application, you can adapt asynchronous operations to run synchronously. This involves wrapping asynchronous method calls with asyncio.get_event_loop().run_until_complete(). Here's how you can apply this approach:

  1. Wrap Asynchronous Calls: For any asynchronous operation you need to perform with Elasticsearch in Flask, use asyncio.get_event_loop().run_until_complete() to execute these operations synchronously. This method allows asynchronous functions to be awaited and their results returned just like synchronous functions.

  2. Example Implementation:

    • When adding nodes to Elasticsearch, wrap the call like so:
      asyncio.get_event_loop().run_until_complete(self.async_add(nodes))
    • Similarly, for deleting nodes:
      asyncio.get_event_loop().run_until_complete(self.adelete(ref_doc_id))
  3. Integrate with Flask: Ensure that nest_asyncio.apply() is called in your Flask application setup to patch the event loop for compatibility with synchronous Flask routes.

By following these steps, you can maintain your Flask application's synchronous execution model while performing asynchronous Elasticsearch operations under the hood. This method leverages the power of asynchronous IO with Elasticsearch without needing to switch your entire application to an asynchronous framework.
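Note that the traceback in the original report already goes through `asyncio.get_event_loop().run_until_complete(...)` inside the store, so wrapping alone may not be enough. The sketch below is a stdlib-only illustration of the likely failure mode, assuming each request ends up on a different event loop than the one a shared, loop-bound object was first used on. `Connector`, `open_session`, and `shared` are hypothetical stand-ins, not aiohttp or LlamaIndex code.

```python
import asyncio
import threading


class Connector:
    """Hypothetical stand-in for a connector that remembers its event loop."""

    def __init__(self, loop):
        self.loop = loop


def open_session(connector, loop):
    """Mimics the kind of check that produced the error in the traceback."""
    if connector.loop is not loop:
        raise RuntimeError("Session and connector has to use same event loop")
    return "session ok"


shared = {}  # module-level state, like a store built once at import time


def handle_request(results):
    """Each request thread runs the coroutine on its own, fresh event loop."""
    loop = asyncio.new_event_loop()

    async def query():
        running = asyncio.get_running_loop()
        # Only the first request stores a connector; later ones reuse it.
        connector = shared.setdefault("connector", Connector(running))
        return open_session(connector, running)

    try:
        results.append(loop.run_until_complete(query()))
    except RuntimeError as exc:
        results.append(f"error: {exc}")
    finally:
        loop.close()


results = []
for _ in range(2):  # two sequential "requests", each in its own thread
    worker = threading.Thread(target=handle_request, args=(results,))
    worker.start()
    worker.join()
```

The first simulated request succeeds because it creates the connector on its own loop; the second fails because it reuses that connector from a new loop.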


@mglslg
Author

mglslg commented May 11, 2024

@dosubot I tried to use a synchronous client like this:

es_client=elasticsearch.Elasticsearch(**connection_params)

es_store = ElasticsearchStore(
    index_name="sk_test_index",
    es_client=es_client
)

But got this exception:

  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/vector_stores/elasticsearch/base.py", line 351, in add
    return asyncio.get_event_loop().run_until_complete(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/asyncio/futures.py", line 203, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/asyncio/tasks.py", line 277, in __step
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/vector_stores/elasticsearch/base.py", line 383, in async_add
    await self._create_index_if_not_exists(
  File "/opt/miniconda3/envs/inc-cyjr-oms-sk-gpt/lib/python3.11/site-packages/llama_index/vector_stores/elasticsearch/base.py", line 281, in _create_index_if_not_exists
    if await self.client.indices.exists(index=index_name):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object HeadApiResponse can't be used in 'await' expression

So, does LlamaIndex not support a synchronous ES client?


dosubot bot commented May 11, 2024

It seems like the core of the issue you're facing with LlamaIndex and the synchronous Elasticsearch client revolves around the execution context—specifically, the mix-up between synchronous and asynchronous operations. LlamaIndex is designed to support both synchronous and asynchronous operations, which gives you the flexibility to choose the approach that best fits your application's architecture. However, this flexibility also requires careful management of execution contexts to avoid errors.

From the error message you shared, it appears that the store's asynchronous code path is awaiting the result of a synchronous client call. The TypeError: object HeadApiResponse can't be used in 'await' expression indicates that self.client.indices.exists(...) returned a plain HeadApiResponse from the synchronous client, which _create_index_if_not_exists then tried to await.

To resolve this issue, you should ensure that your usage of Elasticsearch operations through LlamaIndex aligns with the execution model of your Flask application. If your Flask application is designed to operate synchronously, you should use LlamaIndex's synchronous methods (e.g., add, query) without await. Conversely, if you're working within an asynchronous context or adapting parts of your application to be asynchronous, you should use the asynchronous counterparts of these methods (e.g., async_add, aquery) and ensure they are awaited properly.

Here's a quick recap of the approach you should take:

  • For synchronous operations: Directly call the method without using await. This is suitable for Flask routes that are designed to operate synchronously.
  • For asynchronous operations: Use await with the method call, ensuring that the Flask route or the context in which this operation is performed supports asynchronous execution.

This distinction is crucial for avoiding errors related to execution context mismatches. If your application primarily uses Flask in a synchronous manner and you wish to keep it that way, focusing on the synchronous methods provided by LlamaIndex and ensuring any asynchronous operations are properly adapted to fit this model will be key.

Remember, the goal is to match your operations with the execution context of your application. Misalignments between synchronous and asynchronous contexts are common sources of errors when integrating asynchronous capabilities into traditionally synchronous frameworks like Flask.
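The mismatch behind the `TypeError` can be illustrated with plain Python: awaiting the return value of a synchronous method fails, while awaiting the return value of an `async` method works. This is a minimal sketch with hypothetical classes (`SyncIndices`, `AsyncIndices`) standing in for the two kinds of Elasticsearch client. `tolerant_exists` shows one generic way to accept both and is not how LlamaIndex handles it.

```python
import asyncio
import inspect


class SyncIndices:
    """Hypothetical stand-in for a synchronous client's indices API."""

    def exists(self, index):
        return True


class AsyncIndices:
    """Hypothetical stand-in for an asynchronous client's indices API."""

    async def exists(self, index):
        return True


async def naive_exists(indices, index):
    # Works only when exists() returns an awaitable.
    return await indices.exists(index)


async def tolerant_exists(indices, index):
    # Await only when the call actually returned an awaitable.
    result = indices.exists(index)
    if inspect.isawaitable(result):
        result = await result
    return result


def try_naive(indices):
    """Run naive_exists and report a TypeError instead of raising it."""
    try:
        return asyncio.run(naive_exists(indices, "sk_test_index"))
    except TypeError as exc:
        return f"TypeError: {exc}"
```

Calling `try_naive(SyncIndices())` reports a `TypeError` of the same kind as in the traceback, which suggests that in this version the store expects an asynchronous client to be passed as `es_client`.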


@minglu7
Contributor

minglu7 commented Jun 1, 2024

Every time you use es_vector_store, recreate the object instead of getting it from the context.
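A rough stdlib sketch of that suggestion follows, assuming the store's client binds to whichever event loop first uses it. `LoopBoundStore` and `make_store` are hypothetical. In the issue's code the factory would presumably construct `ElasticsearchStore(...)` inside the view and rebuild the index from it, which is not shown or verified here.

```python
import asyncio


class LoopBoundStore:
    """Hypothetical stand-in for a vector store whose client binds to a loop."""

    def __init__(self):
        self._loop = None

    async def aquery(self, text):
        running = asyncio.get_running_loop()
        if self._loop is None:
            self._loop = running  # bind to the loop on first use
        if self._loop is not running:
            raise RuntimeError("store used from a different event loop")
        return f"results for {text!r}"


def make_store():
    """Factory called once per request instead of sharing one instance."""
    return LoopBoundStore()


def handle_request(text):
    # A fresh store per request is created and used on the same loop.
    store = make_store()
    return asyncio.run(store.aquery(text))
```

The trade-off is that every request pays for a new client and connection setup, so this is more of a workaround than a fix.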


3 participants