Example of running a Celery task that includes a DB transaction in an async FastAPI app #7579
-
Is there any working example of running a Celery task that performs DB transactions (async DB transactions)? Note: I'm using MongoDB (Motor) for the DB utilities. This URL is a well-documented example of running FastAPI with MongoDB asynchronously. I've checked all the project template generators as well; none of them has an example of a Celery task interacting with DB transactions. (I'd be happy to contribute one once these issues are resolved.)

I've found that there is no support for async functions in the current (stable) Celery release. Until then, it would be a massive rewrite for me if I wanted to use FastAPI with Celery (I'd need to drop MongoDB's async driver). Is there a better way to deal with this situation, so that FastAPI can have a few endpoints working with MongoDB (Motor) transactions while the endpoints that trigger Celery tasks consume the DB functions synchronously?

In a nutshell, I'm asking because I'm getting the exception below in Celery tasks that depend on DB calls (MongoDB Motor).
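A minimal sketch of the underlying issue and the common workaround, with a stand-in coroutine in place of a real Motor call (all names here are hypothetical): Motor clients bind to an event loop, while a Celery task body is a plain sync function with no running loop, so the usual bridge is to run the coroutine to completion inside the task, e.g. with `asyncio.run`:

```python
import asyncio

# Hypothetical async DB call standing in for a Motor operation.
# (motor.motor_asyncio.AsyncIOMotorClient behaves similarly: it is tied
# to an event loop, so it cannot simply be awaited from sync code.)
async def save_user(name: str) -> dict:
    await asyncio.sleep(0)  # pretend network round-trip to the DB
    return {"name": name, "saved": True}

# A Celery task is a plain sync function, so bridge into asyncio here.
def celery_style_task(name: str) -> dict:
    # asyncio.run creates a fresh event loop, runs the coroutine
    # to completion, then closes the loop.
    return asyncio.run(save_user(name))

print(celery_style_task("alice"))  # {'name': 'alice', 'saved': True}
```

The caveat is that any client object created on one loop (e.g. at module import time) cannot be reused on the fresh loop, so the connection has to be created inside the coroutine.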
-
Can you provide a code sample for this problem?
-
@retnikt I've managed to run a Celery task with DB transactions in an async fashion, though it was not at all a straightforward solution. I have to do some cleanup in the code and will share it after that. I'll create a Git repo with detailed information on using Celery tasks + scheduling + FastAPI in the next few days.
-
Thanks for reporting back and closing the issue. 👍 If you're using async functions for distributed background tasks, I would try ARQ 😉
-
@tiangolo I'd already tried ARQ, but it can't handle too much load; it failed my load testing. Celery was the only solution in my case, though ARQ works pretty well if you don't have a very complicated task to perform. Nevertheless, Celery coroutine support is in the pipeline and will be released in Celery 5.0, as per the GitHub discussion. The code is almost ready for it, but I'd reported an issue with it.
-
@codesutras I am facing the same issue these days. In my case I have a function that does some image processing and then updates the database, and I am trying to figure out how to decorate this function as a Celery task, as the function contains
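For the image-processing-then-update shape described above, one common pattern is to do the blocking work first and bridge into asyncio only for the DB write. A stand-alone sketch (all names hypothetical; trivial stand-ins for the real processing and DB update):

```python
import asyncio

def process_image(path: str) -> int:
    # CPU-bound work; stand-in for real image processing
    return len(path)  # pretend "result" derived from the image

async def update_db(record_id: int, value: int) -> dict:
    await asyncio.sleep(0)  # stand-in for an async DB write
    return {"id": record_id, "value": value}

def image_task(record_id: int, path: str) -> dict:
    # Sync Celery-style entry point: run the blocking work directly,
    # then run the async DB update to completion on a fresh loop.
    value = process_image(path)
    return asyncio.run(update_db(record_id, value))

print(image_task(7, "cat.png"))  # {'id': 7, 'value': 7}
```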
-
@ZhengRui could you find any solution to this?
-
celery app file (`worker.py`):

```python
from celery import Celery

app = Celery('app', backend='rpc://', broker='amqp://localhost')
```

task definition file:

```python
import asyncio

from database import Database

from ..worker import app

db = Database('postgresql://user:password@localhost:port/db')


async def some_task(param1: str, param2: str, current_task_ref):
    await db.connect()
    # interact with db
    tot = int(1e4)  # range() needs an int
    for i in range(tot):
        current_task_ref.update_state(state='PROGRESSING', meta={'current': i + 1, 'total': tot})
    await db.close()
    return {'current': tot, 'total': tot}


@app.task(bind=True, acks_late=True)
def some_task_t(self, *args, **kwargs):
    # Celery tasks are sync functions; run the coroutine to completion here
    return asyncio.run(some_task(*args, **kwargs, current_task_ref=self))
```

a file to expose to FastAPI (`tq.py`):

```python
from celery.result import AsyncResult

from .tasks.default import (
    some_task_t,
    another_task_t,
)


def get_task_progress(task_id):
    result = AsyncResult(task_id)
    return result.info
```

in your FastAPI routes:

```python
import tq


@r.post('/sometask')
async def some_task_r(
    param1: str,
    param2: str,
    db: Database = Depends(get_db),
):
    task = tq.some_task_t.delay(
        param1,
        param2,
    )
    return task.task_id


@r.get('/sometask/progress')
async def get_some_task_progress_r(
    task_id: str,
):
    return tq.get_task_progress(task_id)
```

when the RabbitMQ server and Postgres database are ready, start the Celery worker:
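On the client side of this setup, `result.info` for a task that reports progress via `update_state(meta=...)` is simply the latest meta dict. A small helper can turn it into a percentage; this is a stand-alone sketch where `FakeAsyncResult` is a hypothetical stand-in for `celery.result.AsyncResult`:

```python
# Stand-in for celery.result.AsyncResult: .info holds the latest meta dict
class FakeAsyncResult:
    def __init__(self, info):
        self.info = info

def progress_percent(result) -> float:
    """Compute completion percentage from the task's progress meta."""
    meta = result.info or {}          # .info can be None before the first update
    total = meta.get("total") or 1    # avoid division by zero
    return 100.0 * meta.get("current", 0) / total

r = FakeAsyncResult({"current": 250, "total": 1000})
print(progress_percent(r))  # 25.0
```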
-
@ZhengRui thanks a lot for this.
-
@ZhengRui I tried the above and I get this error:
This is my app.py:
-
Below is detailed code showing how I dealt with this. Python's async/await is still very weird to me, so feel free to correct it. My Celery app was a separate service running in Docker.

```python
import asyncio
import threading

from celery.signals import worker_init, worker_shutdown

from utils import init_cache, close_cache

thread_local = threading.local()


async def fetch_user_api():
    # Saving api call in redis cache
    ...


async def update_all_users():
    await fetch_user_api()


def get_or_create_event_loop():
    # Reuse one event loop per worker thread instead of creating
    # a new one for every task invocation
    if not hasattr(thread_local, "loop") or thread_local.loop.is_closed():
        thread_local.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(thread_local.loop)
    return thread_local.loop


@worker_init.connect
def on_worker_init(*args, **kwargs):
    loop = get_or_create_event_loop()
    loop.run_until_complete(init_cache())
    print("Cache initialized!")


@worker_shutdown.connect
def on_worker_shutdown(*args, **kwargs):
    loop = get_or_create_event_loop()
    loop.run_until_complete(close_cache())
    loop.close()
    print("Cache closed!")


# Task runs every 5 mins; celery_app is this service's Celery() instance
@celery_app.task(acks_late=True)
def periodic_update():
    loop = get_or_create_event_loop()
    loop.run_until_complete(update_all_users())
```
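The per-thread-loop pattern above can be exercised without Celery; a stand-alone sketch (pure stdlib, `fake_update` is a hypothetical stand-in for the real async work):

```python
import asyncio
import threading

thread_local = threading.local()

def get_or_create_event_loop():
    # Reuse one event loop per thread instead of creating a new one
    # for every call; recreate only if the old loop was closed.
    if not hasattr(thread_local, "loop") or thread_local.loop.is_closed():
        thread_local.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(thread_local.loop)
    return thread_local.loop

async def fake_update():
    await asyncio.sleep(0)  # stand-in for the real async work
    return "updated"

def sync_task():
    # Sync entry point (the shape a Celery task body would have)
    loop = get_or_create_event_loop()
    return loop.run_until_complete(fake_update())

first_loop = get_or_create_event_loop()
print(sync_task())                               # updated
print(first_loop is get_or_create_event_loop())  # True: same loop reused
```

Keeping the loop alive between task invocations is what lets connections initialized in `worker_init` (the cache above) be reused by later tasks on the same thread.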