溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么在Python中異步操作數(shù)據(jù)庫(kù)

發(fā)布時(shí)間:2023-05-09 10:34:42 來(lái)源:億速云 閱讀:128 作者:iii 欄目:編程語(yǔ)言

本篇內(nèi)容介紹了“怎么在Python中異步操作數(shù)據(jù)庫(kù)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

異步操作 MySQL

異步操作 MySQL 的話,需要使用一個(gè) aiomysql,直接 pip install aiomysql 即可。

aiomysql 底層依賴于 pymysql,所以 aiomysql 并沒(méi)有單獨(dú)實(shí)現(xiàn)相應(yīng)的連接驅(qū)動(dòng),而是在 pymysql 之上進(jìn)行了封裝。

查詢記錄

下面先來(lái)看看如何查詢記錄。

import asyncio
import aiomysql.sa as aio_sa
async def main():
# 創(chuàng)建一個(gè)異步引擎
engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10)
# 通過(guò) engine.acquire() 獲取一個(gè)連接
async with engine.acquire() as conn:
# 異步執(zhí)行, 返回一個(gè)對(duì)象
result = await conn.execute("SELECT * FROM girl")
# 通過(guò) await result.fetchone() 可以獲取滿足條件的第一條記錄, 一個(gè)對(duì)象
data = await result.fetchone()
# 可以將對(duì)象想象成一個(gè)字典
print(data.keys())# KeysView((1, '古明地覺(jué)', 16, '地靈殿'))
print(list(data.keys()))# ['id', 'name', 'age', 'place']
print(data.values())# ValuesView((1, '古明地覺(jué)', 16, '地靈殿'))
print(list(data.values()))# [1, '古明地覺(jué)', 16, '地靈殿']
print(data.items())# ItemsView((1, '古明地覺(jué)', 16, '地靈殿'))
print(list(data.items()))# [('id', 1), ('name', '古明地覺(jué)'), ('age', 16), ('place', '地靈殿')]
# 直接轉(zhuǎn)成字典也是可以的
print(dict(data))# {'id': 1, 'name': '古明地覺(jué)', 'age': 16, 'place': '地靈殿'}
# 最后別忘記關(guān)閉引擎, 當(dāng)然你在創(chuàng)建引擎的時(shí)候也可以通過(guò) async with aio_sa.create_engine 的方式創(chuàng)建
# async with 語(yǔ)句結(jié)束后會(huì)自動(dòng)執(zhí)行下面兩行代碼
engine.close()
await engine.wait_closed()
 loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

怎么樣,是不是很簡(jiǎn)單呢,和同步庫(kù)的操作方式其實(shí)是類似的。但是很明顯,我們?cè)讷@取記錄的時(shí)候不會(huì)只獲取一條,而是會(huì)獲取多條,獲取多條的話使用 await result.fetchall() 即可。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通過(guò)異步上下文管理器的方式創(chuàng)建, 會(huì)自動(dòng)幫我們關(guān)閉引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 此時(shí)的 data 是一個(gè)列表, 列表里面是對(duì)象
data = await result.fetchall()
# 將里面的元素轉(zhuǎn)成字典
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地覺(jué)', 'place': '地靈殿'},
 {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
 {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

除了 fetchone、fetchall 之外,還有一個(gè) fetchmany,可以獲取指定記錄的條數(shù)。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
async def main():
# 通過(guò)異步上下文管理器的方式創(chuàng)建, 會(huì)自動(dòng)幫我們關(guān)閉引擎
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
result = await conn.execute("SELECT * FROM girl")
# 默認(rèn)是獲取一條, 得到的仍然是一個(gè)列表
data = await result.fetchmany(2)
pprint(list(map(dict, data)))
"""
[{'age': 16, 'id': 1, 'name': '古明地覺(jué)', 'place': '地靈殿'},
 {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上就是通過(guò) aiomysql 查詢數(shù)據(jù)庫(kù)中的記錄,沒(méi)什么難度。但是值得一提的是,await conn.execute 里面除了可以傳遞一個(gè)原生的 SQL 語(yǔ)句之外,我們還可以借助 SQLAlchemy。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
result = await conn.execute(sql)
data = await result.fetchall()
pprint(list(map(dict, data)))
"""
[{'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
 {'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
添加記錄

然后是添加記錄,我們同樣可以借助 SQLAlchemy 幫助我們拼接 SQL 語(yǔ)句。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
# 我們還需要?jiǎng)?chuàng)建一個(gè) SQLAlchemy 中的引擎, 然后將表反射出來(lái)
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True
insert_sql = tbl.insert().values(
[{"name": "十六夜咲夜", "age": 17, "place": "紅魔館"},
 {"name": "琪露諾", "age": 60, "place": "霧之湖"}])
# 注意: 執(zhí)行的執(zhí)行必須開(kāi)啟一個(gè)事務(wù), 否則數(shù)據(jù)是不會(huì)進(jìn)入到數(shù)據(jù)庫(kù)中的
async with conn.begin():
# 同樣會(huì)返回一個(gè)對(duì)象
# 盡管我們插入了多條, 但只會(huì)返回最后一條的插入信息
result = await conn.execute(insert_sql)
# 返回最后一條記錄的自增 id
print(result.lastrowid)
# 影響的行數(shù)
print(result.rowcount)
# 重新查詢, 看看記錄是否進(jìn)入到數(shù)據(jù)庫(kù)中
async with engine.acquire() as conn:
data = await (await conn.execute("select * from girl")).fetchall()
data = list(map(dict, data))
pprint(data)
"""
[{'age': 16, 'id': 1, 'name': '古明地覺(jué)', 'place': '地靈殿'},
 {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
 {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'},
 {'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '紅魔館'},
 {'age': 60, 'id': 17, 'name': '琪露諾', 'place': '霧之湖'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

還是很方便的,但是插入多條記錄的話只會(huì)返回插入的最后一條記錄的信息,所以如果你希望獲取每一條的信息,那么就一條一條插入。

修改記錄

修改記錄和添加記錄是類似的,我們來(lái)看一下。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
update_sql = tbl.update().where(text("name = '古明地覺(jué)'")).values({"place": "東方地靈殿"})
# 同樣需要開(kāi)啟一個(gè)事務(wù)
async with conn.begin():
result = await conn.execute(update_sql)
print(result.lastrowid)# 0
print(result.rowcount) # 1
# 查詢結(jié)果
async with engine.acquire() as conn:
data = await (await conn.execute("select * from girl where name = '古明地覺(jué)'")).fetchall()
data = list(map(dict, data))
pprint(data)
"""
[{'age': 16, 'id': 1, 'name': '古明地覺(jué)', 'place': '東方地靈殿'}]
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

可以看到,記錄被成功的修改了。

刪除記錄

刪除記錄就更簡(jiǎn)單了,直接看代碼。

import asyncio
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text
async def main():
async with aio_sa.create_engine(host="xx.xx.xx.xxx",
port=3306,
user="root",
password="root",
db="_hanser",
connect_timeout=10) as engine:
async with engine.acquire() as conn:
s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
update_sql = tbl.delete()# 全部刪除
# 同樣需要開(kāi)啟一個(gè)事務(wù)
async with conn.begin():
result = await conn.execute(update_sql)
# 返回最后一條記錄的自增 id, 我們之前修改了 id = 0 記錄, 所以它跑到最后了
print(result.lastrowid)# 0
# 受影響的行數(shù)
print(result.rowcount) # 6
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

此時(shí)數(shù)據(jù)庫(kù)中的記錄已經(jīng)全部被刪除了。

整體來(lái)看還是比較簡(jiǎn)單的,并且支持的功能也比較全面。

異步操作 PostgreSQL

異步操作 PostgreSQL 的話,我們有兩個(gè)選擇,一個(gè)是 asyncpg 庫(kù),另一個(gè)是 aiopg 庫(kù)。

asyncpg 是自己實(shí)現(xiàn)了一套連接驅(qū)動(dòng),而 aiopg 則是對(duì) psycopg2 進(jìn)行了封裝,個(gè)人更推薦 asyncpg,性能和活躍度都比 aiopg 要好。

下面來(lái)看看如何使用 asyncpg,首先是安裝,直接 pip install asyncpg 即可。

查詢記錄

首先是查詢記錄。

import asyncio
from pprint import pprint
import asyncpg
async def main():
# 創(chuàng)建連接數(shù)據(jù)庫(kù)的驅(qū)動(dòng)
conn = await asyncpg.connect(host="localhost",
 port=5432,
 user="postgres",
 password="zgghyys123",
 database="postgres",
 timeout=10)
# 除了上面的方式,還可以使用類似于 SQLAlchemy 的方式創(chuàng)建
# await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# 調(diào)用 await conn.fetchrow 執(zhí)行 select 語(yǔ)句,獲取滿足條件的單條記錄
# 調(diào)用 await conn.fetch 執(zhí)行 select 語(yǔ)句,獲取滿足條件的全部記錄
row1 = await conn.fetchrow("select * from girl")
row2 = await conn.fetch("select * from girl")
# 返回的是一個(gè) Record 對(duì)象,這個(gè) Record 對(duì)象等于將返回的記錄進(jìn)行了一個(gè)封裝
# 至于怎么用后面會(huì)說(shuō)
print(row1)#pprint(row2)
"""
[,,]
"""
# 關(guān)閉連接
await conn.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上我們演示了如何使用 asyncpg 來(lái)獲取數(shù)據(jù)庫(kù)中的記錄,我們看到執(zhí)行 select 語(yǔ)句的話,我們可以使用 conn.fetchrow(query) 來(lái)獲取滿足條件的單條記錄,conn.fetch(query) 來(lái)獲取滿足條件的所有記錄。

Record 對(duì)象

我們說(shuō)使用 conn.fetchone 查詢得到的是一個(gè) Record 對(duì)象,使用 conn.fetch 查詢得到的是多個(gè) Record 對(duì)象組成的列表,那么這個(gè) Rcord 對(duì)象怎么用呢?

import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
row = await conn.fetchrow("select * from girl")
print(type(row))#print(row)## 這個(gè) Record 對(duì)象可以想象成一個(gè)字典
# 我們可以將返回的字段名作為 key, 通過(guò)字典的方式進(jìn)行獲取
print(row["id"], row["name"])# 1 古明地覺(jué)
# 除此之外,還可以通過(guò) get 獲取,獲取不到的時(shí)候會(huì)返回默認(rèn)值
print(row.get("id"), row.get("name"))# 1 古明地覺(jué)
print(row.get("xxx"), row.get("xxx", "不存在的字段"))# None 不存在的字段
# 除此之外還可以調(diào)用 keys、values、items,這個(gè)不用我說(shuō),都應(yīng)該知道意味著什么
# 只不過(guò)返回的是一個(gè)迭代器
print(row.keys())#print(row.values())#print(row.items())## 我們需要轉(zhuǎn)成列表或者元組
print(list(row.keys()))# ['id', 'name', 'age', 'place']
print(list(row.values()))# [1, '古明地覺(jué)', 16, '地靈殿']
print(dict(row.items()))# {'id': 1, 'name': '古明地覺(jué)', 'age': 16, 'place': '地靈殿'}
print(dict(row))# {'id': 1, 'name': '古明地覺(jué)', 'age': 16, 'place': '地靈殿'}
# 關(guān)閉連接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

當(dāng)然我們也可以借助 SQLAlchemy 幫我們拼接 SQL 語(yǔ)句。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
# 我們不能直接傳遞一個(gè) Select 對(duì)象, 而是需要將其轉(zhuǎn)成原生的字符串才可以
rows = await conn.fetch(str(sql))
pprint(list(map(dict, rows)))
"""
[{'id': 2, 'name': '椎名真白', 'place': '櫻花莊'},
 {'id': 3, 'name': '古明地戀', 'place': '地靈殿'}]
"""
# 關(guān)閉連接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

此外,conn.fetch 里面還支持占位符,使用百分號(hào)加數(shù)字的方式,舉個(gè)例子:

import asyncio
from pprint import pprint
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
rows = await conn.fetch("select * from girl where id != $1", 1)
pprint(list(map(dict, rows)))
"""
[{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '櫻花莊'},
 {'age': 15, 'id': 3, 'name': '古明地戀', 'place': '地靈殿'}]
"""
# 關(guān)閉連接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())
  • 還是推薦使用 SQLAlchemy 的方式,這樣更加方便一些,就像 aiomysql 一樣。但是對(duì)于 asyncpg 而言,實(shí)際上接收的是一個(gè)原生的 SQL 語(yǔ)句,是一個(gè)字符串,因此它不能像 aiomysql 一樣自動(dòng)識(shí)別 Select 對(duì)象,我們還需要手動(dòng)將其轉(zhuǎn)成字符串。而且這樣還存在一個(gè)問(wèn)題,至于是什么我們下面介紹添加記錄的時(shí)候說(shuō)。

添加記錄

然后是添加記錄,我們看看如何往庫(kù)里面添加數(shù)據(jù)。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# 執(zhí)行 insert 語(yǔ)句我們可以使用 execute
row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",
 '十六夜咲夜', 17, '紅魔館')
pprint(row)# INSERT 0 1
pprint(type(row))#await conn.close()
if __name__ == '__main__':
asyncio.run(main())

通過(guò) execute 可以插入單條記錄,同時(shí)返回相關(guān)信息,但是說(shuō)實(shí)話這個(gè)信息沒(méi)什么太大用。除了 execute 之外,還有 executemany,用來(lái)執(zhí)行多條插入語(yǔ)句。

import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
# executemany:第一條參數(shù)是一個(gè)模板,第二條命令是包含多個(gè)元組的列表
# 執(zhí)行多條記錄的話,返回的結(jié)果為 None
rows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)",
[('十六夜咲夜', 17, '紅魔館'), ('琪露諾', 60, '霧之湖')])
print(rows)# None
# 關(guān)閉連接
await conn.close()
if __name__ == '__main__':
asyncio.run(main())

注意:如果是執(zhí)行大量 insert 語(yǔ)句的話,那么 executemany 要比 execute 快很多,但是 executemany 不具備事務(wù)功。

“怎么在Python中異步操作數(shù)據(jù)庫(kù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI