您好,登錄后才能下訂單哦!
本文章向大家介紹怎么在Python中利用 Asyncio模塊實(shí)現(xiàn)一個(gè)生產(chǎn)消費(fèi)者模型的基本知識(shí)點(diǎn)總結(jié)和需要注意事項(xiàng),具有一定的參考價(jià)值,需要的朋友可以參考一下。
Python主要應(yīng)用于:1、Web開(kāi)發(fā);2、數(shù)據(jù)科學(xué)研究;3、網(wǎng)絡(luò)爬蟲(chóng);4、嵌入式應(yīng)用開(kāi)發(fā);5、游戲開(kāi)發(fā);6、桌面應(yīng)用開(kāi)發(fā)。
asyncio的關(guān)鍵字說(shuō)明
event_loop事件循環(huán):程序開(kāi)啟一個(gè)無(wú)限循環(huán),把一些函數(shù)注冊(cè)到事件循環(huán)上,當(dāng)滿足事件發(fā)生的時(shí)候,調(diào)用相應(yīng)的協(xié)程函數(shù)
coroutine協(xié)程:協(xié)程對(duì)象,指一個(gè)使用async關(guān)鍵字定義的函數(shù),它的調(diào)用不會(huì)立即執(zhí)行函數(shù),而是會(huì)返回一個(gè)協(xié)程對(duì)象,協(xié)程對(duì)象需要注冊(cè)到事件循環(huán),由事件循環(huán)調(diào)用。
task任務(wù):一個(gè)協(xié)程對(duì)象就是一個(gè)原生可以掛起的函數(shù),任務(wù)則是對(duì)協(xié)程進(jìn)一步封裝,其中包含了任務(wù)的各種狀態(tài)
future:代表將來(lái)執(zhí)行或沒(méi)有執(zhí)行的任務(wù)結(jié)果。它和task上沒(méi)有本質(zhì)上的區(qū)別
async/await關(guān)鍵字:async定義一個(gè)協(xié)程,await用于掛起阻塞的異步調(diào)用接口,在python3.4是使用asyncio.coroutine/yield from
在設(shè)計(jì)模式中,生產(chǎn)消費(fèi)者模型占有非常重要的地位,這個(gè)模型在現(xiàn)實(shí)世界中也有很多有意思的對(duì)應(yīng)場(chǎng)景,比如做包子的人和吃包子的人,當(dāng)兩者速度不匹配時(shí),就需要有一個(gè)模型來(lái)做匹配(偶合),實(shí)現(xiàn)做的包子都會(huì)依次消費(fèi)掉。
import asyncio class ConsumerProducerModel: def __init__(self, producer, consumer, queue=asyncio.Queue(), plate_size=6): # the plate holds 6pcs bread self.queue = queue self.producer = producer self.consumer = consumer self.plate_size = plate_size async def produce_bread(self): for i in range(self.plate_size): bread = f"bread {i}" await asyncio.sleep(0.5) # bread makes faster, 0.5s/pc await self.queue.put(bread) print(f'{self.producer} makes {bread}') async def consume_bread(self): while True: bread = await self.queue.get() await asyncio.sleep(1) # eat slower, 1s/pc print(f'{self.consumer} eats {bread}') self.queue.task_done() async def main(): queue = asyncio.Queue() cp1 = ConsumerProducerModel("John", "Grace", queue) # group 1 cp2 = ConsumerProducerModel("Mike", "Lucy", queue) # group 2 producer_1 = cp1.produce_bread() producer_2 = cp2.produce_bread() consumer_1 = asyncio.ensure_future(cp1.consume_bread()) consumer_2 = asyncio.ensure_future(cp2.consume_bread()) await asyncio.gather(*[producer_1, producer_2]) await queue.join() consumer_1.cancel() consumer_2.cancel() if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
生產(chǎn)消費(fèi)者模型可以使用多線程和隊(duì)列來(lái)實(shí)現(xiàn),這里選擇協(xié)程不僅是因?yàn)樾阅懿诲e(cuò),而且整個(gè)下來(lái)邏輯清晰:
1. 先定義初始化的東西,要有個(gè)隊(duì)列,要有生產(chǎn)者,要有消費(fèi)者,要有裝面包的盤(pán)子大??;
2. 生產(chǎn)者:根據(jù)盤(pán)子大小生產(chǎn)出對(duì)應(yīng)的東西(面包),將東西放入盤(pán)子(queue);
3. 消費(fèi)者:從盤(pán)子上取東西,每次取東西都是一個(gè)任務(wù),每次任務(wù)完成,就標(biāo)記為task_done(調(diào)用函數(shù))。在這個(gè)層面,一直循環(huán);
4. 主邏輯:實(shí)例化生產(chǎn)消費(fèi)者模型對(duì)象,創(chuàng)建生產(chǎn)者協(xié)程,創(chuàng)建任務(wù)(ensure_future),收集協(xié)程結(jié)果,等待所有線程結(jié)束(join),手動(dòng)取消兩個(gè)消費(fèi)者協(xié)程;
5. 運(yùn)行:首先創(chuàng)建事件循環(huán),然后進(jìn)入主邏輯,直到完成,關(guān)閉循環(huán)。
以上就是小編為大家?guī)?lái)的怎么在Python中利用 Asyncio模塊實(shí)現(xiàn)一個(gè)生產(chǎn)消費(fèi)者模型的全部?jī)?nèi)容了,希望大家多多支持億速云!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。