溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Python中asyncio與aiohttp入門(mén)教程

發(fā)布時(shí)間:2020-09-29 15:41:21 來(lái)源:腳本之家 閱讀:372 作者:數(shù)據(jù)架構(gòu)師 欄目:開(kāi)發(fā)技術(shù)

很多朋友對(duì)異步編程都處于“聽(tīng)說(shuō)很強(qiáng)大”的認(rèn)知狀態(tài)。鮮有在生產(chǎn)項(xiàng)目中使用它。而使用它的同學(xué),則大多數(shù)都停留在知道如何使用 Tornado、Twisted、Gevent 這類(lèi)異步框架上,出現(xiàn)各種古怪的問(wèn)題難以解決。而且使用了異步框架的部分同學(xué),由于用法不對(duì),感覺(jué)它并沒(méi)牛逼到哪里去,所以很多同學(xué)做 Web 后端服務(wù)時(shí)還是采用 Flask、Django等傳統(tǒng)的非異步框架。

從上兩屆 PyCon 技術(shù)大會(huì)看來(lái),異步編程已經(jīng)成了 Python 生態(tài)下一階段的主旋律。如新興的 Go、Rust、Elixir 等編程語(yǔ)言都將其支持異步和高并發(fā)作為主要“賣(mài)點(diǎn)”,技術(shù)變化趨勢(shì)如此。Python 生態(tài)為不落人后,從2013年起由 Python 之父 Guido 親自操刀主持了Tulip(asyncio)項(xiàng)目的開(kāi)發(fā)。

異步io的好處在于避免的線(xiàn)程的開(kāi)銷(xiāo)和切換,而且我們都知道python其實(shí)是沒(méi)有多線(xiàn)程的,只是通過(guò)底層線(xiàn)層鎖實(shí)現(xiàn)的多線(xiàn)程。另一個(gè)好處在于避免io操作(包含網(wǎng)絡(luò)傳輸)的堵塞時(shí)間。

asyncio可以實(shí)現(xiàn)單線(xiàn)程并發(fā)IO操作。如果僅用在客戶(hù)端,發(fā)揮的威力不大。如果把a(bǔ)syncio用在服務(wù)器端,例如Web服務(wù)器,由于HTTP連接就是IO操作,因此可以用單線(xiàn)程+coroutine實(shí)現(xiàn)多用戶(hù)的高并發(fā)支持。

asyncio實(shí)現(xiàn)了TCP、UDP、SSL等協(xié)議,aiohttp則是基于asyncio實(shí)現(xiàn)的HTTP框架。

  • 對(duì)于異步io你需要知道的重點(diǎn),要注意的是,await語(yǔ)法只能出現(xiàn)在通過(guò)async修飾的函數(shù)中,否則會(huì)報(bào)SyntaxError錯(cuò)誤。而且await后面的對(duì)象需要是一個(gè)Awaitable,或者實(shí)現(xiàn)了相關(guān)的協(xié)議。

注意:

  • 所有需要異步執(zhí)行的函數(shù),都需要asyncio中的輪訓(xùn)器去輪訓(xùn)執(zhí)行,如果函數(shù)阻塞,輪訓(xùn)器就會(huì)去執(zhí)行下一個(gè)函數(shù)。所以所有需要異步執(zhí)行的函數(shù)都需要加入到這個(gè)輪訓(xùn)器中。

asyncio

asyncio的基本概念asyncio是在python3.4中被引進(jìn)的異步IO庫(kù)。你也可以通過(guò)python3.3的pypi來(lái)安裝它。它相當(dāng)?shù)膹?fù)雜,而且我不會(huì)介紹太多的細(xì)節(jié)。相反,我將會(huì)解釋你需要知道些什么,以利用它來(lái)寫(xiě)異步的代碼。簡(jiǎn)而言之,有兩件事情你需要知道:協(xié)同程序和事件循環(huán)。協(xié)同程序像是方法,但是它們可以在代碼中的特定點(diǎn)暫停和繼續(xù)。當(dāng)在等待一個(gè)IO(比如一個(gè)HTTP請(qǐng)求),同時(shí)執(zhí)行另一個(gè)請(qǐng)求的時(shí)候,可以用來(lái)暫停一個(gè)協(xié)同程序。

例如:

import requests
import time
import asyncio
# 創(chuàng)建一個(gè)異步函數(shù)
async def task_func():
  await asyncio.sleep(1)
  resp = requests.get('http://192.168.2.177:5002/')
  print('2222222',time.time(),resp.text)
async def main(loop):
  loop=asyncio.get_event_loop()  # 獲取全局輪訓(xùn)器
  task = loop.create_task(task_func()) # 在全局輪訓(xùn)器加入?yún)f(xié)成,只有加入全局輪訓(xùn)器才能被監(jiān)督執(zhí)行
  await asyncio.sleep(2)  # 等待兩秒為了不要立即執(zhí)行event_loop.close(),項(xiàng)目中event_loop應(yīng)該是永不停歇的
  print('11111111111',time.time())
event_loop = asyncio.get_event_loop()
try:
  event_loop.run_until_complete(main(event_loop))
finally:
  event_loop.close()  # 當(dāng)輪訓(xùn)器關(guān)閉以后,所有沒(méi)有執(zhí)行完成的協(xié)成將全部關(guān)閉

aiohttp服務(wù)器

下面是aiohttp作為服務(wù)器端的一個(gè)簡(jiǎn)單的demo。

#!/usr/bin/env python3
import argparse
from aiohttp import web
import asyncio
import base64
import logging
import uvloop
import time,datetime
import json
import requests
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
routes = web.RouteTableDef()
@routes.get('/')
async def hello(request):
  return web.Response(text="Hello, world")
# 定義一個(gè)路由映射,接收網(wǎng)址參數(shù),post方式
@routes.post('/demo1/{name}')
async def demo1(request):  # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。
  name = request.match_info.get('name', "Anonymous") # 獲取name
  print(datetime.datetime.now())  # 觸發(fā)視圖函數(shù)的時(shí)間
  data = await request.post()  # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)
  print(datetime.datetime.now())  # 接收post數(shù)據(jù)完成的時(shí)間
  logging.info('safety dect request start %s' % datetime.datetime.now())
  result = {'name':name,'key':data['key']}
  logging.info('safety dect request finish %s, %s' % (datetime.datetime.now(),json.dumps(result)))
  return web.json_response(result)
# 定義一個(gè)路由映射,設(shè)計(jì)到io操作
@routes.post('/demo2')
async def demo2(request):  # 異步監(jiān)聽(tīng),只要一有握手就開(kāi)始觸發(fā),此時(shí)網(wǎng)址參數(shù)中的name就已經(jīng)知道了,但是前端可能還沒(méi)有完全post完數(shù)據(jù)。
  data = await request.post()  # 等待post數(shù)據(jù)完成接收,只有接收完成才能進(jìn)行后續(xù)操作.data['key']獲取參數(shù)
  logging.info('safety dect request start %s' % datetime.datetime.now())
  res = requests.post('http://www.baidu.com')  # 網(wǎng)路id,會(huì)自動(dòng)切換到其他協(xié)成上
  logging.info('safety dect request finish %s' % res.test)
  return web.Response(text="welcome")
if __name__ == '__main__':
  logging.info('server start')
  app = web.Application()
  app.add_routes(routes)
  web.run_app(app,host='0.0.0.0',port=8080)
  logging.info('server close')

aiohttp客戶(hù)端

aiohttp的另一個(gè)主要作用是作為異步客戶(hù)端,用來(lái)解決高并發(fā)請(qǐng)求的情況。比如現(xiàn)在我要模擬一個(gè)高并發(fā)請(qǐng)求來(lái)測(cè)試我的服務(wù)器負(fù)載情況。所以需要在python里模擬高并發(fā)。高并發(fā)可以有多種方式,比如多線(xiàn)程,但是由于python本質(zhì)上是沒(méi)有多線(xiàn)程的,通過(guò)底層線(xiàn)程鎖實(shí)現(xiàn)的多線(xiàn)程。在模型高并發(fā)時(shí),具有線(xiàn)程切換和線(xiàn)程開(kāi)銷(xiāo)的損耗。所以我們就可以使用多協(xié)成來(lái)實(shí)現(xiàn)高并發(fā)。

我們就可以使用aiohttp來(lái)模擬高并發(fā)客戶(hù)端。demo如下,用來(lái)模擬多個(gè)客戶(hù)端向指定服務(wù)器post圖片。

# 異步并發(fā)客戶(hù)端
class Asyncio_Client(object):
  def __init__(self):
    self.loop=asyncio.get_event_loop()
    self.tasks=[]
  # 將異步函數(shù)介入任務(wù)列表。后續(xù)參數(shù)直接傳給異步函數(shù)
  def set_task(self,task_fun,num,*args):
    for i in range(num):
      self.tasks.append(task_fun(*args))
  # 運(yùn)行,獲取返回結(jié)果
  def run(self):
    back=[]
    try:
      f = asyncio.wait(self.tasks)  # 創(chuàng)建future
      self.loop.run_until_complete(f) # 等待future完成
    finally:
      pass
# 服務(wù)器高并發(fā)壓力測(cè)試
class Test_Load():
  total_time=0 # 總耗時(shí)
  total_payload=0 # 總負(fù)載
  total_num=0 # 總并發(fā)數(shù)
  all_time=[]
  # 創(chuàng)建一個(gè)異步任務(wù),本地測(cè)試,所以post和接收幾乎不損耗時(shí)間,可以等待完成,主要耗時(shí)為算法模塊
  async def task_func1(self,session):
    begin = time.time()
    # print('開(kāi)始發(fā)送:', begin)
    file=open(self.image, 'rb')
    fsize = os.path.getsize(self.image)
    self.total_payload+=fsize/(1024*1024)
    data = {"image_id": "2", 'image':file}
    r = await session.post(self.url,data=data) #只post,不接收
    result = await r.json()
    self.total_num+=1
    # print(result)
    end = time.time()
    # print('接收完成:', end,',index=',self.total_num)
    self.all_time.append(end-begin)
  # 負(fù)載測(cè)試
  def test_safety(self):
    print('test begin')
    async_client = Asyncio_Client() # 創(chuàng)建客戶(hù)端
    session = aiohttp.ClientSession()
    for i in range(10): # 執(zhí)行10次
      self.all_time=[]
      self.total_num=0
      self.total_payload=0
      self.image = 'xxxx.jpg' # 設(shè)置測(cè)試nayizhang
      print('測(cè)試圖片:', self.image)
      begin = time.time()
      async_client.set_task(self.task_func1,self.num,session) # 設(shè)置并發(fā)任務(wù)
      async_client.run()  # 執(zhí)行任務(wù)
      end=time.time()
      self.all_time.sort(reverse=True)
      print(self.all_time)
      print('并發(fā)數(shù)量(個(gè)):',self.total_num)
      print('總耗時(shí)(s):',end-begin)
      print('最大時(shí)延(s):',self.all_time[0])
      print('最小時(shí)延(s):', self.all_time[len(self.all_time)-1])
      print('top-90%時(shí)延(s):', self.all_time[int(len(self.all_time)*0.1)])
      print('平均耗時(shí)(s/個(gè)):',sum(self.all_time)/self.total_num)
      print('支持并發(fā)率(個(gè)/s):',self.total_num/(end-begin))
      print('總負(fù)載(MB):',self.total_payload)
      print('吞吐率(MB/S):',self.total_payload/(end-begin))  # 吞吐率受上行下行帶寬,服務(wù)器帶寬,服務(wù)器算法性能諸多影響
      time.sleep(3)
    session.close()
    print('test finish')

aiohttp服務(wù)器mvc(靜態(tài)網(wǎng)頁(yè),模板,數(shù)據(jù)庫(kù),log)

aiohttp之添加靜態(tài)資源路徑

所謂靜態(tài)資源,是指圖片、js、css等文件。

以一個(gè)小項(xiàng)目來(lái)說(shuō)明,下面是項(xiàng)目的目錄結(jié)構(gòu):

.
├── static
│  ├── css
│  │  ├── base.css
│  │  ├── bootstrap.min.css
│  │  └── font-awesome.min.css
│  ├── font
│  │  ├── FontAwesome.otf
│  │  ├── fontawesome-webfont.eot
│  │  ├── fontawesome-webfont.svg
│  │  ├── fontawesome-webfont.ttf
│  │  └── fontawesome-webfont.woff
│  └── index.html
└── proxy_server.py

在proxy_server.py給2個(gè)靜態(tài)文件目錄static/css和static/font添加路由:

app.router.add_static('/css/',
            path='static/css',
            name='css')
app.router.add_static('/font/',
            path='static/font',
            name='font')

必需的2個(gè)參數(shù):

prefix:是靜態(tài)文件的url的前綴,以/開(kāi)始,在瀏覽器地址欄上顯示在網(wǎng)站host之后,也用于index.html靜態(tài)頁(yè)面進(jìn)行引用
path:靜態(tài)文件目錄的路徑,可以是相對(duì)路徑,上面代碼使用的static/css就是相對(duì)路徑——相對(duì)于proxy_server.py所在路徑。

Python中asyncio與aiohttp入門(mén)教程

加載的是index.html,下面是它引用靜態(tài)資源的代碼:

<!-- Bootstrap CSS -->
<link href="css/bootstrap.min.css" rel="external nofollow" rel="stylesheet">
<!-- Base CSS -->
<link href="css/base.css" rel="external nofollow" rel="stylesheet">
<!-- FA CSS -->
<link href="css/font-awesome.min.css" rel="external nofollow" rel="stylesheet">

添加font的路徑是因?yàn)?font-awesome.min.css需要使用:

如果修改前綴:

 app.router.add_static('/css2017/',
            path='static/css',
            name='css')

雖然目錄本身還是css,但通過(guò)add_static已經(jīng)將它視為了css2017,在文件和瀏覽器中要想鏈接到css下的文件,必須使用css2017/xx.css來(lái)鏈接。

此外,如果加上show_index=True,就可以顯示靜態(tài)資源的目錄索引了——默認(rèn)是禁止訪問(wèn)的:

app.router.add_static('/css2017/',
            path='static/css',
            name='css',
            show_index=True)

Python中asyncio與aiohttp入門(mén)教程

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)億速云的支持。如果你想了解更多相關(guān)內(nèi)容請(qǐng)查看下面相關(guān)鏈接

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI