溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

使用python怎么實(shí)現(xiàn)一個(gè)重試裝飾器

發(fā)布時(shí)間:2021-04-20 17:28:34 來源:億速云 閱讀:273 作者:Leah 欄目:開發(fā)技術(shù)

今天就跟大家聊聊有關(guān)使用python怎么實(shí)現(xiàn)一個(gè)重試裝飾器,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

python是什么意思

Python是一種跨平臺(tái)的、具有解釋性、編譯性、互動(dòng)性和面向?qū)ο蟮哪_本語言,其最初的設(shè)計(jì)是用于編寫自動(dòng)化腳本,隨著版本的不斷更新和新功能的添加,常用于用于開發(fā)獨(dú)立的項(xiàng)目和大型項(xiàng)目。

import time
from functools import wraps

__author__ = 'blackmatrix'


"""
在函數(shù)執(zhí)行出現(xiàn)異常時(shí)自動(dòng)重試的簡單裝飾器
"""


class StopRetry(Exception):

 def __repr__(self):
  return 'retry stop'


def retry(max_retries: int =5, delay: (int, float) =0, step: (int, float) =0,
   exceptions: (BaseException, tuple, list) =BaseException,
   sleep=time.sleep, callback=None, validate=None):
 """
 函數(shù)執(zhí)行出現(xiàn)異常時(shí)自動(dòng)重試的簡單裝飾器。
 :param max_retries: 最多重試次數(shù)。
 :param delay: 每次重試的延遲,單位秒。
 :param step: 每次重試后延遲遞增,單位秒。
 :param exceptions: 觸發(fā)重試的異常類型,單個(gè)異常直接傳入異常類型,多個(gè)異常以tuple或list傳入。
 :param sleep: 實(shí)現(xiàn)延遲的方法,默認(rèn)為time.sleep。
 在一些異步框架,如tornado中,使用time.sleep會(huì)導(dǎo)致阻塞,可以傳入自定義的方法來實(shí)現(xiàn)延遲。
 自定義方法函數(shù)簽名應(yīng)與time.sleep相同,接收一個(gè)參數(shù),為延遲執(zhí)行的時(shí)間。
 :param callback: 回調(diào)函數(shù),函數(shù)簽名應(yīng)接收一個(gè)參數(shù),每次出現(xiàn)異常時(shí),會(huì)將異常對(duì)象傳入。
 可用于記錄異常日志,中斷重試等。
 如回調(diào)函數(shù)正常執(zhí)行,并返回True,則表示告知重試裝飾器異常已經(jīng)處理,重試裝飾器終止重試,并且不會(huì)拋出任何異常。
 如回調(diào)函數(shù)正常執(zhí)行,沒有返回值或返回除True以外的結(jié)果,則繼續(xù)重試。
 如回調(diào)函數(shù)拋出異常,則終止重試,并將回調(diào)函數(shù)的異常拋出。
 :param validate: 驗(yàn)證函數(shù),用于驗(yàn)證執(zhí)行結(jié)果,并確認(rèn)是否繼續(xù)重試。
 函數(shù)簽名應(yīng)接收一個(gè)參數(shù),每次被裝飾的函數(shù)完成且未拋出任何異常時(shí),調(diào)用驗(yàn)證函數(shù),將執(zhí)行的結(jié)果傳入。
 如驗(yàn)證函數(shù)正常執(zhí)行,且返回False,則繼續(xù)重試,即使被裝飾的函數(shù)完成且未拋出任何異常。
 如回調(diào)函數(shù)正常執(zhí)行,沒有返回值或返回除False以外的結(jié)果,則終止重試,并將函數(shù)執(zhí)行結(jié)果返回。
 如驗(yàn)證函數(shù)拋出異常,且異常屬于被重試裝飾器捕獲的類型,則繼續(xù)重試。
 如驗(yàn)證函數(shù)拋出異常,且異常不屬于被重試裝飾器捕獲的類型,則將驗(yàn)證函數(shù)的異常拋出。
 :return: 被裝飾函數(shù)的執(zhí)行結(jié)果。
 """
 def wrapper(func):
  @wraps(func)
  def _wrapper(*args, **kwargs):
   nonlocal delay, step, max_retries
   func_ex = StopRetry
   while max_retries > 0:
    try:
     result = func(*args, **kwargs)
     # 驗(yàn)證函數(shù)返回False時(shí),表示告知裝飾器驗(yàn)證不通過,繼續(xù)重試
     if callable(validate) and validate(result) is False:
      continue
     else:
      return result
    except exceptions as ex:
     # 回調(diào)函數(shù)返回True時(shí),表示告知裝飾器異常已經(jīng)處理,終止重試
     if callable(callback) and callback(ex) is True:
      return
     func_ex = ex
    finally:
     max_retries -= 1
     if delay > 0 or step > 0:
      sleep(delay)
      delay += step
   else:
    raise func_ex
  return _wrapper
 return wrapper


if __name__ == '__main__':
 pass

看完上述內(nèi)容,你們對(duì)使用python怎么實(shí)現(xiàn)一個(gè)重試裝飾器有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI