在 FastAPI 中使用 WebSocket 非常簡(jiǎn)單。首先需要導(dǎo)入 WebSocket 類和 WebSocketDisconnect 異常類,然后在路由函數(shù)中添加一個(gè) WebSocket 參數(shù)來(lái)處理 WebSocket 連接。
以下是一個(gè)簡(jiǎn)單的例子:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
在這個(gè)例子中,我們創(chuàng)建了一個(gè) WebSocket 端點(diǎn) /ws
,當(dāng)有客戶端連接時(shí),會(huì)調(diào)用 websocket_endpoint
函數(shù)處理連接。在函數(shù)中我們首先調(diào)用 await websocket.accept()
來(lái)接受連接,并開始一個(gè)無(wú)限循環(huán)來(lái)接收和發(fā)送消息。
當(dāng)客戶端發(fā)送消息時(shí),我們通過 await websocket.receive_text()
方法來(lái)接收消息,并通過 await websocket.send_text()
方法來(lái)發(fā)送消息給客戶端。
在 FastAPI 中使用 WebSocket 很容易,你可以根據(jù)自己的需求來(lái)處理 WebSocket 連接和消息。