在FastAPI中實(shí)現(xiàn)背景任務(wù)可以使用Python的asyncio
庫(kù)來(lái)實(shí)現(xiàn)。以下是一個(gè)簡(jiǎn)單的示例代碼:
from fastapi import BackgroundTasks, FastAPI
import asyncio
app = FastAPI()
def background_task():
# 模擬一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)
asyncio.sleep(5)
print("Background task completed")
@app.post("/send-notification/{message}")
async def send_notification(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(background_task)
return {"message": f"Notification '{message}' sent in the background"}
在上面的示例中,我們定義了一個(gè)背景任務(wù)background_task
,它模擬了一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)。然后我們定義了一個(gè)路由/send-notification/{message}
,當(dāng)用戶(hù)訪(fǎng)問(wèn)這個(gè)路由時(shí),會(huì)觸發(fā)發(fā)送通知的操作,并將背景任務(wù)background_task
添加到BackgroundTasks
中。這樣在接收到請(qǐng)求后,就會(huì)異步執(zhí)行這個(gè)背景任務(wù),不會(huì)阻塞主線(xiàn)程。
請(qǐng)注意,需要在啟動(dòng)應(yīng)用程序時(shí)運(yùn)行uvicorn
服務(wù)器時(shí)添加--reload
參數(shù),以便在代碼更改時(shí)重新加載應(yīng)用程序。