FastAPI中怎么使用WebSocket

小億
391
2024-05-11 11:35:52

在 FastAPI 中使用 WebSocket 非常簡(jiǎn)單。首先需要導(dǎo)入 WebSocket 類和 WebSocketDisconnect 異常類,然后在路由函數(shù)中添加一個(gè) WebSocket 參數(shù)來(lái)處理 WebSocket 連接。

以下是一個(gè)簡(jiǎn)單的例子:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message text was: {data}")

在這個(gè)例子中,我們創(chuàng)建了一個(gè) WebSocket 端點(diǎn) /ws,當(dāng)有客戶端連接時(shí),會(huì)調(diào)用 websocket_endpoint 函數(shù)處理連接。在函數(shù)中我們首先調(diào)用 await websocket.accept() 來(lái)接受連接,并開始一個(gè)無(wú)限循環(huán)來(lái)接收和發(fā)送消息。

當(dāng)客戶端發(fā)送消息時(shí),我們通過 await websocket.receive_text() 方法來(lái)接收消息,并通過 await websocket.send_text() 方法來(lái)發(fā)送消息給客戶端。

在 FastAPI 中使用 WebSocket 很容易,你可以根據(jù)自己的需求來(lái)處理 WebSocket 連接和消息。

0