
Celery with a Redis cluster, an exception is raised: ResponseError("'BRPOP' command keys must be in the same slot") #4472

Open · vicwang3 opened this issue May 17, 2024 · 1 comment · May be fixed by #4478
Labels: 🐞 bug Something isn't working

Self Checks

  • This is only for bug reports; if you would like to ask a question, please head to Discussions.
  • I have searched for existing issues, including closed ones.
  • I confirm that I am using English to submit this report (I have read and agree to the Language Policy).
  • Please do not modify this template :) and fill in all the required fields.

Dify version

0.6.8

Cloud or Self Hosted

Self Hosted (Source)

Steps to reproduce

  1. set CELERY_BROKER_URL=redis://:password@host:port/db
  2. celery -A app.celery worker -P gevent -c 1 -Q dataset,generation,mail --loglevel INFO

✔️ Expected Behavior

 -------------- celery@TAKATOST.lan v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2023-07-31 12:58:08
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         app:0x7fb568572a10
- ** ---------- .> transport:   redis://:**@localhost:6379/1
- ** ---------- .> results:     postgresql://postgres:**@localhost:5432/dify
- *** --- * --- .> concurrency: 1 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> dataset      exchange=dataset(direct) key=dataset
                .> generation   exchange=generation(direct) key=generation
                .> mail         exchange=mail(direct) key=mail

[tasks]
. tasks.add_document_to_index_task.add_document_to_index_task
. tasks.clean_dataset_task.clean_dataset_task
. tasks.clean_document_task.clean_document_task
. tasks.clean_notion_document_task.clean_notion_document_task
. tasks.create_segment_to_index_task.create_segment_to_index_task
. tasks.deal_dataset_vector_index_task.deal_dataset_vector_index_task
. tasks.document_indexing_sync_task.document_indexing_sync_task
. tasks.document_indexing_task.document_indexing_task
. tasks.document_indexing_update_task.document_indexing_update_task
. tasks.enable_segment_to_index_task.enable_segment_to_index_task
. tasks.generate_conversation_summary_task.generate_conversation_summary_task
. tasks.mail_invite_member_task.send_invite_member_mail_task
. tasks.remove_document_from_index_task.remove_document_from_index_task
. tasks.remove_segment_from_index_task.remove_segment_from_index_task
. tasks.update_segment_index_task.update_segment_index_task
. tasks.update_segment_keyword_index_task.update_segment_keyword_index_task

[2023-07-31 12:58:08,831: INFO/MainProcess] Connected to redis://:@localhost:6379/1
[2023-07-31 12:58:08,840: INFO/MainProcess] mingle: searching for neighbors
[2023-07-31 12:58:09,873: INFO/MainProcess] mingle: all alone
[2023-07-31 12:58:09,886: INFO/MainProcess] pidbox: Connected to redis://:@localhost:6379/1.
[2023-07-31 12:58:09,890: INFO/MainProcess] celery@TAKATOST.lan ready.

❌ Actual Behavior

[2024-05-17 06:19:34,129: INFO/MainProcess] Connected to redis://:**@host:6379/1
[2024-05-17 06:19:34,417: INFO/MainProcess] mingle: searching for neighbors
[2024-05-17 06:19:35,437: CRITICAL/MainProcess] Unrecoverable error: ResponseError("'BRPOP' command keys must in same slot")
Traceback (most recent call last):
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/worker/consumer/mingle.py", line 37, in start
    self.sync(c)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/worker/consumer/mingle.py", line 41, in sync
    replies = self.send_hello(c)
              ^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/worker/consumer/mingle.py", line 54, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/app/control.py", line 389, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/app/control.py", line 106, in _request
    return self._prepare(self.app.control.broadcast(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/celery/app/control.py", line 776, in broadcast
    return self.mailbox(conn)._broadcast(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/pidbox.py", line 346, in _broadcast
    return self._collect(reply_ticket, limit=limit,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/pidbox.py", line 388, in _collect
    self.connection.drain_events(timeout=timeout)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 997, in drain_events
    get(self._deliver, timeout=timeout)
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/redis.py", line 591, in get
    ret = self.handle_event(fileno, event)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/redis.py", line 573, in handle_event
    return self.on_readable(fileno), self
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/redis.py", line 569, in on_readable
    chan.handlers[type]()
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/redis.py", line 962, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/kombu/transport/redis.py", line 247, in parse_response
    ret = super().parse_response(connection, command_name, **options)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/redis/client.py", line 562, in parse_response
    response = connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/wangvic/miniconda3/lib/python3.11/site-packages/redis/connection.py", line 536, in read_response
    raise response
redis.exceptions.ResponseError: 'BRPOP' command keys must in same slot
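
The failing frame is kombu's _brpop_read: kombu's Redis transport polls all of the worker's active queues with a single multi-key BRPOP, and a Redis cluster (or a cluster proxy) rejects multi-key commands whose keys hash to different slots. As a minimal sketch, the same failure can be reproduced outside Celery with a plain redis-py client (the connection details below are placeholders):

import redis

# kombu sends the equivalent of: BRPOP dataset generation mail <timeout>.
# A single Redis node serves this fine; a cluster rejects it because
# 'dataset', 'generation' and 'mail' typically hash to different slots.
r = redis.Redis(host='host', port=6379, password='password')
r.brpop(['dataset', 'generation', 'mail'], timeout=1)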


dosubot bot commented May 17, 2024

The error you're encountering is due to Redis Cluster's requirement that all keys involved in a multi-key operation like BRPOP must be in the same hash slot. To resolve this, you can use "hash tags" in your queue names to ensure they're assigned to the same slot.

For example, rename your queues to include a common hash tag: {myapp}:dataset, {myapp}:generation, {myapp}:mail. Because Redis Cluster hashes only the substring inside the braces, all three queue keys land in the same slot, and the multi-key BRPOP no longer triggers the ResponseError.
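
To see why the shared tag helps, here is a small self-contained sketch of Redis Cluster's slot calculation (CRC16/XMODEM of the key, or of the hash tag when one is present, modulo 16384); the {myapp} names are the hypothetical ones from above:

def crc16_xmodem(data: bytes) -> int:
    # CRC-16/XMODEM (polynomial 0x1021, initial value 0), as used by Redis Cluster.
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            crc = ((crc << 1) ^ 0x1021) & 0xFFFF if crc & 0x8000 else (crc << 1) & 0xFFFF
    return crc

def key_slot(key: str) -> int:
    # If the key contains a non-empty {...} section, only that substring is hashed.
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end > start + 1:
            key = key[start + 1:end]
    return crc16_xmodem(key.encode()) % 16384

# All three names hash only "myapp", so they share one slot,
# and a multi-key BRPOP across them is legal.
assert key_slot('{myapp}:dataset') == key_slot('{myapp}:generation') == key_slot('{myapp}:mail')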

Here's how you might configure Celery with these names:

from kombu import Exchange, Queue

# The {myapp} hash tag keeps every queue key in one cluster slot.
app.conf.task_default_queue = '{myapp}:default'
app.conf.task_queues = (
    Queue('{myapp}:default',
          exchange=Exchange('{myapp}:default', type='direct'),
          routing_key='{myapp}:default'),
)

Make sure all producers and consumers use the updated queue names. This approach requires changes in your codebase where the queue names are defined and used. For more details, refer to the Redis Cluster specification and the Celery documentation on routing and queues.
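
With that change, the worker command from the reproduction steps would point at the renamed queues (quoted so the shell leaves the braces alone); the {myapp} tag is again only illustrative:

celery -A app.celery worker -P gevent -c 1 -Q '{myapp}:dataset,{myapp}:generation,{myapp}:mail' --loglevel INFO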


dosubot bot added the 🐞 bug (Something isn't working) label on May 17, 2024