溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何實(shí)現(xiàn)Python重試超時(shí)裝飾器

發(fā)布時(shí)間:2023-05-04 09:51:16 來(lái)源:億速云 閱讀:97 作者:iii 欄目:開發(fā)技術(shù)

本文小編為大家詳細(xì)介紹“如何實(shí)現(xiàn)Python重試超時(shí)裝飾器”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“如何實(shí)現(xiàn)Python重試超時(shí)裝飾器”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

    一、前言

    在寫業(yè)務(wù)代碼時(shí)候,有許多場(chǎng)景需要重試某塊業(yè)務(wù)邏輯,例如網(wǎng)絡(luò)請(qǐng)求、購(gòu)物下單等,希望發(fā)生異常的時(shí)候多重試幾次。

    二、簡(jiǎn)單分析

    一個(gè)重試裝飾器,最重要的就是發(fā)生意外異常處理失敗自動(dòng)重試,有如下幾點(diǎn)需要注意

    • 失敗不能一直重試,因?yàn)榭赡軙?huì)出現(xiàn)死循環(huán)浪費(fèi)資源,因此需要有 最大重試次數(shù) 或者 最大超時(shí)時(shí)間

    • 不能重試太頻繁,因?yàn)樘l繁容易導(dǎo)致重試次數(shù)很快用完,卻沒(méi)有成功響應(yīng),需要有 重試時(shí)間間隔 來(lái)限制,有時(shí)可以加大成功概率,例如網(wǎng)絡(luò)請(qǐng)求時(shí)有一段時(shí)間是堵塞的,或者對(duì)方服務(wù)負(fù)載太高導(dǎo)致一段時(shí)間無(wú)法響應(yīng)等。

    簡(jiǎn)單分析完,我們的重試裝飾器,就要支持可配置最大重試次數(shù)、最大超時(shí)時(shí)間、重試間隔,所以裝飾器就要設(shè)計(jì)成帶參數(shù)裝飾器。

    三、代碼模擬實(shí)現(xiàn)

    重試裝飾器-初版

    分析完畢后,看看第一版的裝飾器

    import time
    from functools import wraps
    def task_retry(max_retry_count: int = 5, time_interval: int = 2):
        """
        任務(wù)重試裝飾器
        Args:
            max_retry_count: 最大重試次數(shù) 默認(rèn)5次
            time_interval: 每次重試間隔 默認(rèn)2s
        """
        def _task_retry(task_func):
            @wraps(task_func)
            def wrapper(*args, **kwargs):
                # 函數(shù)循環(huán)重試
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    try:
                        task_result = task_func(*args, **kwargs)
                        return task_result
                    except Exception as e:
                        print(f"fail {str(e)}")
                        time.sleep(time_interval)
            return wrapper
        return _task_retry

    裝飾器內(nèi)部閉包,就簡(jiǎn)單通過(guò) for 循環(huán) 來(lái)執(zhí)行指定重試次數(shù),成功獲取結(jié)果就直接 return 返回,發(fā)生異常則睡眠配置重試間隔時(shí)間后繼續(xù)循環(huán)

    寫個(gè)例子來(lái)模擬測(cè)試下看看效果

    沒(méi)有異常正常執(zhí)行,在函數(shù)中模擬一個(gè)異常來(lái)進(jìn)行重試看看

    @task_retry(max_retry_count=3, time_interval=1)
    def user_place_order():
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    ret = user_place_order()
    print("user place order ret", ret)
    >>>out
    fail division by zero
    execute count 2
    fail division by zero
    execute count 3
    fail division by zero
    user place order ret None

    可以看到 user_place_order 函數(shù)執(zhí)行了三遍,都發(fā)生了除零異常,最后超過(guò)最大執(zhí)行次數(shù),返回了 None 值,我們可以在主邏輯中來(lái)判斷返回值是否為 None 來(lái)進(jìn)行超過(guò)最大重試次數(shù)失敗的業(yè)務(wù)邏輯處理

    ret = user_place_order()
    print("user place order ret", ret)
    
    if not ret:
        print("user place order failed")
        ...

    重試裝飾器-改進(jìn)版

    現(xiàn)在只能配置 最大重試次數(shù) 沒(méi)有最大超時(shí)時(shí)間,有時(shí)候我們想不但有重試,還得在規(guī)定時(shí)間內(nèi)完成,不想浪費(fèi)太多試錯(cuò)時(shí)間。所以增加一個(gè) 最大超時(shí)時(shí)間配置選項(xiàng)默認(rèn)為None,有值時(shí)超過(guò)最大超時(shí)時(shí)間退出重試。

    def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
        """
        任務(wù)重試裝飾器
        Args:
            max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
            time_interval: 每次重試間隔 默認(rèn) 2s
            max_timeout: 最大超時(shí)時(shí)間,單位s 默認(rèn)為 None,
        """
    
        def _task_retry(task_func):
    
            @wraps(task_func)
            def wrapper(*args, **kwargs):
                # 函數(shù)循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time > max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                        return
    
                    try:
                        return task_func(*args, **kwargs)
                    except Exception as e:
                        print(f"fail {str(e)}")
                        time.sleep(time_interval)
    
            return wrapper
    
        return _task_retry

    看看效果

    # 超時(shí)
    @task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
    def user_place_order():
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    >>>out
    execute count 1
    fail division by zero
    execute count 2
    fail division by zero
    execute count 3
    execute timeout, use time 2.010528802871704s, max timeout 2
    user place order ret None
    # 超過(guò)最大重試次數(shù)
    @task_retry(max_retry_count=3, time_interval=1)
    def user_place_order():
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    >>>out
    execute count 1
    fail division by zero
    execute count 2
    fail division by zero
    execute count 3
    fail division by zero
    user place order ret None
    # 正常
    @task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
    def user_place_order():
        # a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    >>>out
    execute count 1
    user place order success
    user place order ret {'code': 0, 'msg': 'ok'}

    重試裝飾器-加強(qiáng)版

    到這重試裝飾器基本功能就實(shí)現(xiàn)了,但還可以加強(qiáng),Python現(xiàn)在支持 async 異步方式寫法,因此要是可以兼容異步寫法那就更好了。先看看裝飾異步函數(shù)會(huì)是什么樣的效果

    import time
    import asyncio
    import functools
    def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
        """
        任務(wù)重試裝飾器
        Args:
            max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
            time_interval: 每次重試間隔 默認(rèn) 2s
            max_timeout: 最大超時(shí)時(shí)間,單位s 默認(rèn)為 None,
        """
        def _task_retry(task_func):
            @wraps(task_func)
            def wrapper(*args, **kwargs):
                # 函數(shù)循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time > max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                        return
                    try:
                        return task_func(*args, **kwargs)
                    except Exception as e:
                        print(f"fail {str(e)}")
                        time.sleep(time_interval)
            return wrapper
        return _task_retry
    @task_retry(max_retry_count=3, time_interval=1, max_timeout=2)
    def user_place_order():
        # a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    @task_retry(max_retry_count=3, time_interval=2, max_timeout=5)
    async def user_place_order_async():
        """異步函數(shù)重試案例"""
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    async def main():
        # 同步案例
        # ret = user_place_order()
        # print(f"user place order ret {ret}")
        # 異步案例
        ret = await user_place_order_async()
        print(f"user place order ret {ret}")
    if __name__ == '__main__':
        asyncio.run(main())
    # 正常時(shí)候
    execute count 1
    user place order success
    user place order ret {'code': 0, 'msg': 'ok'}
    # 異常時(shí)候
    >>>out
    execute count 1
    Traceback (most recent call last):
      File "G:/code/python/py-tools/decorator/base.py", line 138, in <module>
        asyncio.run(main())
      File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete
        return future.result()
      File "G:/code/python/py-tools/decorator/base.py", line 133, in main
        ret = await user_place_order_async()
      File "G:/code/python/py-tools/decorator/base.py", line 121, in user_place_order_async
        a = 1 / 0
    ZeroDivisionError: division by zero
    Process finished with exit code 1

    發(fā)現(xiàn)發(fā)生異常的時(shí)候并沒(méi)有重試,為什么呢?其實(shí)在執(zhí)行 task_func() 它并沒(méi)有真正的執(zhí)行內(nèi)部邏輯,而是返回一個(gè) coroutine 協(xié)程對(duì)象,并不會(huì)報(bào)異常,所以再裝飾器中執(zhí)行一遍就成功就出來(lái)了,外面 ret = await user_place_order_async(), 后才真正的等待執(zhí)行,然后執(zhí)行函數(shù)內(nèi)的邏輯再報(bào)異常就沒(méi)有捕獲到。我們可以打斷點(diǎn)驗(yàn)證下

    如何實(shí)現(xiàn)Python重試超時(shí)裝飾器

    這樣裝飾器就不支持異步函數(shù)的重試,需要加強(qiáng)它,可以使用 asyncio.iscoroutinefunction() 來(lái)進(jìn)行異步函數(shù)的判斷, 然后再加一個(gè)異步函數(shù)的閉包就可以實(shí)現(xiàn)異步、同步函數(shù)都兼容的重試裝飾器。

    def task_retry(max_retry_count: int = 5, time_interval: int = 2, max_timeout: int = None):
        """
        任務(wù)重試裝飾器
        Args:
            max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
            time_interval: 每次重試間隔 默認(rèn) 2s
            max_timeout: 最大超時(shí)時(shí)間,單位s 默認(rèn)為 None,
        """
    
        def _task_retry(task_func):
    
            @functools.wraps(task_func)
            def sync_wrapper(*args, **kwargs):
                # 同步循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time &gt; max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                        return
    
                    try:
                        task_ret = task_func(*args, **kwargs)
                        return task_ret
                    except Exception as e:
                        print(f"fail {str(e)}")
                        time.sleep(time_interval)
    
            @functools.wraps(task_func)
            async def async_wrapper(*args, **kwargs):
                # 異步循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time &gt; max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        print(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
                        return
    
                    try:
                        return await task_func(*args, **kwargs)
                    except Exception as e:
                        print(f"fail {str(e)}")
                        await asyncio.sleep(time_interval)
    
            # 異步函數(shù)判斷
            wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
            return wrapper_func
    
        return _task_retry

    注意時(shí)間等待 await asyncio.sleep(time_interval) 會(huì)導(dǎo)致函數(shù)掛起,程序不會(huì)在這里等待,而是去事件循環(huán)loop中執(zhí)行其他的已經(jīng)就緒的任務(wù),如果其他函數(shù)運(yùn)行時(shí)間太久了才切換回來(lái),會(huì)導(dǎo)致時(shí)間超時(shí),換成 time.sleep()的話其實(shí)也沒(méi)有用,如果函數(shù)內(nèi)部還有異步函數(shù)執(zhí)行還是會(huì)切換出去,因此異步的時(shí)候感覺(jué)超時(shí)參數(shù)意義不大。

    模擬測(cè)試下

    @task_retry(max_retry_count=5, time_interval=2, max_timeout=5)
    async def user_place_order_async():
        """異步函數(shù)重試案例"""
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    
    
    async def io_test():
        """模擬io阻塞"""
        print("io test start")
        time.sleep(3)
        print("io test end")
        return "io test end"
    
    
    async def main():
        # 同步案例
        # ret = user_place_order()
        # print(f"user place order ret {ret}")
    
        # 異步案例
        # ret = await user_place_order_async()
        # print(f"user place order ret {ret}")
    
        # 并發(fā)異步
        order_ret, io_ret = await asyncio.gather(
            user_place_order_async(),
            io_test(),
        )
        print(f"io ret {io_ret}")
        print(f"user place order ret {order_ret}")
    
    
    if __name__ == '__main__':
        asyncio.run(main())
        
        
     &gt;&gt;&gt;out
    execute count 1
    fail division by zero
    io test start
    io test end
    execute count 2
    fail division by zero
    execute count 3
    execute timeout, use time 5.015768527984619s, max timeout 5
    io ret io test end
    user place order ret None

    可以看出執(zhí)行一遍后自動(dòng)切換到了 io_test 中執(zhí)行由于 io test 中的 time.sleep(3) 會(huì)導(dǎo)致整個(gè)線程阻塞,一定要等到io_test執(zhí)行完后才會(huì)切換回去,然后再執(zhí)行兩遍就超時(shí)了,你可能會(huì)說(shuō)都用異步的庫(kù),是的異步的庫(kù)是可以加速,但我想表達(dá)就是這時(shí)候統(tǒng)計(jì)的耗時(shí)是整個(gè)程序的而不是單獨(dú)一個(gè)函數(shù)的。大家可以在評(píng)論區(qū)幫我想想有沒(méi)有其他的方法,要么就不要用這個(gè)超時(shí)參數(shù)。

    可以兼容異步函數(shù)、然后超時(shí)參數(shù)可以不配置,影響不大,O(&cap;_&cap;)O~

    重試裝飾器-最終版

    最終版就是利用拋異常的方式來(lái)結(jié)束超過(guò)最大重試次數(shù)、最大超時(shí),而不是直接返回None,然后再添加一個(gè)可配置捕獲指定異常的參數(shù),當(dāng)發(fā)生特定異常的時(shí)候才重試。

    import time
    import asyncio
    import functools
    from typing import Type
    
    
    class MaxRetryException(Exception):
        """最大重試次數(shù)異常"""
        pass
    
    
    class MaxTimeoutException(Exception):
        """最大超時(shí)異常"""
        pass
    
    
    def task_retry(
            max_retry_count: int = 5,
            time_interval: int = 2,
            max_timeout: int = None,
            catch_exc: Type[BaseException] = Exception
    ):
        """
        任務(wù)重試裝飾器
        Args:
            max_retry_count: 最大重試次數(shù) 默認(rèn) 5 次
            time_interval: 每次重試間隔 默認(rèn) 2s
            max_timeout: 最大超時(shí)時(shí)間,單位s 默認(rèn)為 None,
            catch_exc: 指定捕獲的異常類用于特定的異常重試 默認(rèn)捕獲 Exception
        """
    
        def _task_retry(task_func):
    
            @functools.wraps(task_func)
            def sync_wrapper(*args, **kwargs):
                # 函數(shù)循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time &gt; max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
    
                    try:
                        task_ret = task_func(*args, **kwargs)
                        return task_ret
                    except catch_exc as e:
                        print(f"fail {str(e)}")
                        time.sleep(time_interval)
                else:
                    # 超過(guò)最大重試次數(shù), 拋異常終止
                    raise MaxRetryException(f"超過(guò)最大重試次數(shù)失敗, max_retry_count {max_retry_count}")
    
            @functools.wraps(task_func)
            async def async_wrapper(*args, **kwargs):
                # 異步循環(huán)重試
                start_time = time.time()
                for retry_count in range(max_retry_count):
                    print(f"execute count {retry_count + 1}")
                    use_time = time.time() - start_time
                    if max_timeout and use_time &gt; max_timeout:
                        # 超出最大超時(shí)時(shí)間
                        raise MaxTimeoutException(f"execute timeout, use time {use_time}s, max timeout {max_timeout}")
    
                    try:
                        return await task_func(*args, **kwargs)
                    except catch_exc as e:
                        print(f"fail {str(e)}")
                        await asyncio.sleep(time_interval)
                else:
                    # 超過(guò)最大重試次數(shù), 拋異常終止
                    raise MaxRetryException(f"超過(guò)最大重試次數(shù)失敗, max_retry_count {max_retry_count}")
    
            # 異步函數(shù)判斷
            wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
            return wrapper_func
    
        return _task_retry
    
    
    @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError,max_timeout=5)
    def user_place_order():
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    
    
    @task_retry(max_retry_count=5, time_interval=2, max_timeout=5)
    async def user_place_order_async():
        """異步函數(shù)重試案例"""
        a = 1 / 0
        print("user place order success")
        return {"code": 0, "msg": "ok"}
    
    
    async def io_test():
        """模擬io阻塞"""
        print("io test start")
        time.sleep(3)
        print("io test end")
        return "io test end"
    
    
    async def main():
        # 同步案例
        try:
            ret = user_place_order()
            print(f"user place order ret {ret}")
        except MaxRetryException as e:
            # 超過(guò)最大重試次數(shù)處理
            print("MaxRetryException", e)
        except MaxTimeoutException as e:
            # 超過(guò)最大超時(shí)處理
            print("MaxTimeoutException", e)
    
        # 異步案例
        # ret = await user_place_order_async()
        # print(f"user place order ret {ret}")
    
        # 并發(fā)異步
        # order_ret, io_ret = await asyncio.gather(
        #     user_place_order_async(),
        #     io_test(),
        # )
        # print(f"io ret {io_ret}")
        # print(f"user place order ret {order_ret}")
    
    
    if __name__ == '__main__':
        asyncio.run(main())

    測(cè)試捕獲指定異常

    # 指定捕獲除零錯(cuò)誤,正常捕獲重試
    @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError)
    def user_place_order():
        a = 1 / 0
        # a = []
        # b = a[0]
        print("user place order success")
        return {"code": 0, "msg": "ok"}
     
    
    # out
    execute count 1
    fail division by zero
    execute count 2
    fail division by zero
    execute count 3
    fail division by zero
    MaxRetryException 超過(guò)最大重試次數(shù)失敗, max_retry_count 3
    
    
    # 指定捕獲除零錯(cuò)誤,報(bào)索引越界錯(cuò)誤,未正常捕獲重試,直接退出
    @task_retry(max_retry_count=3, time_interval=1, catch_exc=ZeroDivisionError)
    def user_place_order():
        # a = 1 / 0
        a = []
        b = a[0]
        print("user place order success")
        return {"code": 0, "msg": "ok"}
     
    
    # out
    Traceback (most recent call last):
      File "G:/code/python/py-tools/decorator/base.py", line 184, in &lt;module&gt;
        asyncio.run(main())
      File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "G:\softs\DevEnv\python-3.7.9\lib\asyncio\base_events.py", line 587, in run_until_complete
        return future.result()
      File "G:/code/python/py-tools/decorator/base.py", line 161, in main
        ret = user_place_order()
      File "G:/code/python/py-tools/decorator/base.py", line 97, in sync_wrapper
        task_ret = task_func(*args, **kwargs)
      File "G:/code/python/py-tools/decorator/base.py", line 137, in user_place_order
        b = a[0]
    IndexError: list index out of range
    
    Process finished with exit code 1

    修改記錄

    • 把重試?yán)锏某瑫r(shí)計(jì)算單獨(dú)抽離出來(lái),這樣功能不會(huì)太藕合,分兩個(gè)裝飾實(shí)現(xiàn)

    def set_timeout(timeout: int, use_signal=False):
        """
        超時(shí)處理裝飾器
        Args:
            timeout: 超時(shí)時(shí)間,單位秒
            use_signal: 使用信號(hào)量機(jī)制只能在 unix內(nèi)核上使用,默認(rèn)False
        Raises:
            TimeoutException
        """
        def _timeout(func: Callable):
            def _handle_timeout(signum, frame):
                raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                # 同步函數(shù)處理超時(shí)
                if use_signal:
                    # 使用信號(hào)量計(jì)算超時(shí)
                    signal.signal(signal.SIGALRM, _handle_timeout)
                    signal.alarm(timeout)
                    try:
                        return func(*args, **kwargs)
                    finally:
                        signal.alarm(0)
                else:
                    # 使用線程
                    with ThreadPoolExecutor() as executor:
                        future = executor.submit(func, *args, **kwargs)
                        try:
                            return future.result(timeout)
                        except TimeoutError:
                            raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                # 異步函數(shù)處理超時(shí)
                try:
                    ret = await asyncio.wait_for(func(*args, **kwargs), timeout)
                    return ret
                except asyncio.TimeoutError:
                    raise MaxTimeoutException(f"Function timed out after {timeout} seconds")
            return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
        return _timeout
    def retry(
            max_count: int = 5,
            interval: int = 2,
            catch_exc: Type[BaseException] = Exception
    ):
        """
        重試裝飾器
        Args:
            max_count: 最大重試次數(shù) 默認(rèn) 5 次
            interval: 每次異常重試間隔 默認(rèn) 2s
            catch_exc: 指定捕獲的異常類用于特定的異常重試 默認(rèn)捕獲 Exception
        Raises:
            MaxRetryException
        """
        def _retry(task_func):
            @functools.wraps(task_func)
            def sync_wrapper(*args, **kwargs):
                # 函數(shù)循環(huán)重試
                for retry_count in range(max_count):
                    logger.info(f"{task_func} execute count {retry_count + 1}")
                    try:
                        return task_func(*args, **kwargs)
                    except catch_exc:
                        logger.error(f"fail {traceback.print_exc()}")
                        if retry_count < max_count - 1:
                            # 最后一次異常不等待
                            time.sleep(interval)
                # 超過(guò)最大重試次數(shù), 拋異常終止
                raise MaxRetryException(f"超過(guò)最大重試次數(shù)失敗, max_retry_count {max_count}")
            @functools.wraps(task_func)
            async def async_wrapper(*args, **kwargs):
                # 異步循環(huán)重試
                for retry_count in range(max_count):
                    logger.info(f"{task_func} execute count {retry_count + 1}")
                    try:
                        return await task_func(*args, **kwargs)
                    except catch_exc as e:
                        logger.error(f"fail {str(e)}")
                        if retry_count < max_count - 1:
                            await asyncio.sleep(interval)
                # 超過(guò)最大重試次數(shù), 拋異常終止
                raise MaxRetryException(f"超過(guò)最大重試次數(shù)失敗, max_retry_count {max_count}")
            # 異步函數(shù)判斷
            wrapper_func = async_wrapper if asyncio.iscoroutinefunction(task_func) else sync_wrapper
            return wrapper_func
        return _retry

    讀到這里,這篇“如何實(shí)現(xiàn)Python重試超時(shí)裝飾器”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI