溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Python探針怎么完成調(diào)用庫的數(shù)據(jù)提取

發(fā)布時間:2022-05-11 13:43:20 來源:億速云 閱讀:150 作者:iii 欄目:開發(fā)技術(shù)

今天小編給大家分享一下Python探針怎么完成調(diào)用庫的數(shù)據(jù)提取的相關(guān)知識點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

1.簡單粗暴的方法--對mysql庫進(jìn)行封裝

要統(tǒng)計(jì)一個執(zhí)行過程, 就需要知道這個執(zhí)行過程的開始位置和結(jié)束位置, 所以最簡單粗暴的方法就是基于要調(diào)用的方法進(jìn)行封裝,在框架調(diào)用MySQL庫和MySQL庫中間實(shí)現(xiàn)一個中間層, 在中間層完成耗時統(tǒng)計(jì),如:

# 偽代碼
def my_execute(conn, sql, param):
 # 針對MySql庫的統(tǒng)計(jì)封裝組件
 with MyTracer(conn, sql, param):
     # 以下為正常使用MySql庫的代碼
with conn.cursor as cursor:
 cursor.execute(sql, param)
...

看樣子實(shí)現(xiàn)起來非常不錯, 而且更改非常方便, 但由于是在最頂層的API上進(jìn)行修改, 其實(shí)是非常不靈活的, 同時在cursor.execute里會進(jìn)行一些預(yù)操作, 如把sql和param進(jìn)行拼接, 調(diào)用nextset清除當(dāng)前游標(biāo)的數(shù)據(jù)等等。我們最后拿到的數(shù)據(jù)如時間耗時也是不準(zhǔn)確的, 同時也沒辦法得到一些詳細(xì)的元數(shù)據(jù), 如錯誤碼等等.

如果要拿到最直接有用的數(shù)據(jù),就只能去改源代碼, 然后再調(diào)用源代碼了, 但是如果每個庫都需要改源代碼才能統(tǒng)計(jì), 那也太麻煩了, 好在Python也提供了一些類似探針的接口, 可以通過探針把庫的源碼進(jìn)行替換完成我們的代碼.

2.Python的探針

在Python中可以通過sys.meta_path來實(shí)現(xiàn)import hook的功能, 當(dāng)執(zhí)行 import 相關(guān)操作時, 會根據(jù)sys.meta_path定義的對象對import相關(guān)庫進(jìn)行更改.sys.meta_path中的對象需要實(shí)現(xiàn)一個find_module方法, 這個find_module方法返回None或一個實(shí)現(xiàn)了load_module方法的對象, 我們可以通過這個對象, 針對一些庫在import時, 把相關(guān)的方法進(jìn)行替換, 簡單用法如下,通過hooktime.sleep讓他在sleep的時候能打印消耗的時間.

import importlib
import sys
from functools import wraps
def func_wrapper(func):
    """這里通過一個裝飾器來達(dá)到貍貓換太子和獲取數(shù)據(jù)的效果"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 記錄開始時間
        start = time.time()
        result = func(*args, **kwargs)
        # 統(tǒng)計(jì)消耗時間
        end = time.time()
        print(f"speed time:{end - start}")
        return result
    return wrapper
class MetaPathFinder:
    def find_module(self, fullname, path=None):
        # 執(zhí)行時可以看出來在import哪些模塊
        print(f'find module:{path}:{fullname}')
        return MetaPathLoader()
class MetaPathLoader:
    def load_module(self, fullname):
        # import的模塊都會存放在sys.modules里面, 通過判斷可以減少重復(fù)import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止遞歸調(diào)用
        finder = sys.meta_path.pop(0)
        # 導(dǎo)入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替換函數(shù)
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module
sys.meta_path.insert(0, MetaPathFinder())
if __name__ == '__main__':
    import time
    time.sleep(1)
# 輸出示例:
# find module:datetime
# find module:time
# load module:time
# find module:math
# find module:_datetime
# speed time:1.00073385238647468

3.制作探針模塊

了解完了主要流程后, 可以開始制作自己的探針模塊了, 由于示例只涉及到aiomysql模塊, 那么在MetaPathFinder.find_module中需要只對aiomysql模塊進(jìn)行處理, 其他的先忽略. 然后我們需要確定我們要把a(bǔ)iomysql的哪個功能給替換, 從業(yè)務(wù)上來說, 一般情況下我們只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany這幾個主要的操作,所以需要深入cursor看看如何去更改代碼, 后者重載哪個函數(shù).

先cursor.execute的源碼(cursor.executemanay也類似), 發(fā)現(xiàn)會先調(diào)用self.nextset的方法, 把上個請求的數(shù)據(jù)先拿完, 再合并sql語句, 最后通過self._query進(jìn)行查詢:

async def execute(self, query, args=None):
    """Executes the given operation
    Executes the given operation substituting any markers with
    the given parameters.
    For example, getting all rows where id is 5:
        cursor.execute("SELECT * FROM t1 WHERE id = %s", (5,))
    :param query: ``str`` sql statement
    :param args: ``tuple`` or ``list`` of arguments for sql query
    :returns: ``int``, number of rows that has been produced of affected
    """
    conn = self._get_db()

    while (await self.nextset()):
        pass

    if args is not None:
        query = query % self._escape_args(args, conn)

    await self._query(query)
    self._executed = query
    if self._echo:
        logger.info(query)
        logger.info("%r", args)
    return self._rowcount

再看cursor.fetchone的源碼(cursor.fetchall也類似), 發(fā)現(xiàn)其實(shí)是從緩存中獲取數(shù)據(jù),

這些數(shù)據(jù)在執(zhí)行cursor.execute中就已經(jīng)獲取了:

def fetchone(self):
    """Fetch the next row """
    self._check_executed()
    fut = self._loop.create_future()
    if self._rows is None or self._rownumber >= len(self._rows):
        fut.set_result(None)
        return fut
    result = self._rows[self._rownumber]
    self._rownumber += 1
    fut = self._loop.create_future()
    fut.set_result(result)
    return fut

綜合上面的分析, 我們只要對核心的方法self._query進(jìn)行重載即可拿到我們要的數(shù)據(jù), 從源碼中我們可以知道, 我們能獲取到傳入self._query的self和sql參數(shù), 根據(jù)self又能獲取到查詢的結(jié)果, 同時我們通過裝飾器能獲取到運(yùn)行的時間, 要的數(shù)據(jù)基本都到齊了,

按照思路修改后的代碼如下:

import importlib
import time
import sys
from functools import wraps
from typing import cast, Any, Callable, Optional, Tuple, TYPE_CHECKING
from types import ModuleType
if TYPE_CHECKING:
    import aiomysql
def func_wrapper(func: Callable):
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Any:
        start: float = time.time()
        func_result: Any = await func(*args, **kwargs)
        end: float = time.time()
        # 根據(jù)_query可以知道, 第一格參數(shù)是self, 第二個參數(shù)是sql
        self: aiomysql.Cursor = args[0]
        sql: str = args[1]
        # 通過self,我們可以拿到其他的數(shù)據(jù)
        db: str = self._connection.db
        user: str = self._connection.user
        host: str = self._connection.host
        port: str = self._connection.port
        execute_result: Tuple[Tuple] = self._rows
        # 可以根據(jù)自己定義的agent把數(shù)據(jù)發(fā)送到指定的平臺, 然后我們就可以在平臺上看到對應(yīng)的數(shù)據(jù)或進(jìn)行監(jiān)控了, 
        # 這里只是打印一部分?jǐn)?shù)據(jù)出來
        print({
            "sql": sql,
            "db": db,
            "user": user,
            "host": host,
            "port": port,
            "result": execute_result,
            "speed time": end - start
        })
        return func_result
    return cast(Callable, wrapper)
class MetaPathFinder:

    @staticmethod
    def find_module(fullname: str, path: Optional[str] = None) -> Optional["MetaPathLoader"]:
        if fullname == 'aiomysql':
            # 只有aiomysql才進(jìn)行hook
            return MetaPathLoader()
        else:
            return None
class MetaPathLoader:
    @staticmethod
    def load_module(fullname: str):
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止遞歸調(diào)用
        finder: "MetaPathFinder" = sys.meta_path.pop(0)
        # 導(dǎo)入 module
        module: ModuleType = importlib.import_module(fullname)
        # 針對_query進(jìn)行hook
        module.Cursor._query = func_wrapper(module.Cursor._query)
        sys.meta_path.insert(0, finder)
        return module
async def test_mysql() -> None:
    import aiomysql
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='123123', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

if __name__ == '__main__':
    sys.meta_path.insert(0, MetaPathFinder())
    import asyncio
    asyncio.run(test_mysql())
# 輸出示例:
# 可以看出sql語句與我們輸入的一樣, db, user, host, port等參數(shù)也是, 還能知道執(zhí)行的結(jié)果和運(yùn)行時間
# {'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}

這個例子看來很不錯, 但是需要在調(diào)用的入口處顯式調(diào)用該邏輯, 通常一個項(xiàng)目可能有幾個入口, 每個入口都顯示調(diào)用該邏輯會非常麻煩, 而且必須先調(diào)用我們的hook邏輯后才能import, 這樣就得訂好引入規(guī)范, 不然就可能出現(xiàn)部分地方hook不成功, 如果能把引入hook這個邏輯安排在解析器啟動后馬上執(zhí)行, 就可以完美地解決這個問題了. 查閱了一翻資料后發(fā)現(xiàn),python解釋器初始化的時候會自動import PYTHONPATH下存在的sitecustomize和usercustomize模塊, 我們只要創(chuàng)建該模塊, 并在模塊里面寫入我們的 替換函數(shù)即可。

.
├── __init__.py
├── hook_aiomysql.py
├── sitecustomize.py
└── test_auto_hook.py

hook_aiomysql.py是我們制作探針的代碼為例子, 而sitecustomize.py存放的代碼如下, 非常簡單, 就是引入我們的探針代碼, 并插入到sys.meta_path:

import sys
from hook_aiomysql import MetaPathFinder
sys.meta_path.insert(0, MetaPathFinder())

test_auto_hook.py則是測試代碼:

import asyncio
from hook_aiomysql import test_mysql
asyncio.run(test_mysql())

接下來只要設(shè)置PYTHONPATH并運(yùn)行我們的代碼即可(如果是項(xiàng)目的話一般交由superisor啟動,則可以在配置文件中設(shè)置好PYTHONPATH):

(.venv) ?  python_hook git:(master) ? export PYTHONPATH=.      
(.venv) ?  python_hook git:(master) ? python test_auto_hook.py 
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}

4.直接替換方法

可以看到上面的方法很好的運(yùn)行了, 而且可以很方便的嵌入到我們的項(xiàng)目中, 但是依賴與sitecustomize.py文件很難讓他抽離成一個第三方的庫, 如果要抽離成第三方的庫就得考慮看看有沒有其他的方法。在上面介紹MetaPathLoader時說到了sys.module, 在里面通過sys.modules來減少重復(fù)引入:

class MetaPathLoader:
    def load_module(self, fullname):
        # import的模塊都會存放在sys.modules里面, 通過判斷可以減少重復(fù)import
        if fullname in sys.modules:
            return sys.modules[fullname]
        # 防止遞歸調(diào)用
        finder = sys.meta_path.pop(0)
        # 導(dǎo)入 module
        module = importlib.import_module(fullname)
        if fullname == 'time':
            # 替換函數(shù)
            module.sleep = func_wrapper(module.sleep)
        sys.meta_path.insert(0, finder)
        return module

這個減少重復(fù)引入的原理是, 每次引入一個模塊后, 他就會存放在sys.modules, 如果是重復(fù)引入, 就會直接刷新成最新引入的模塊。上面之所以會考慮到減少重復(fù)import是因?yàn)槲覀儾粫诔绦蜻\(yùn)行時升級第三方庫的依賴。利用到我們可以不考慮重復(fù)引入同名不同實(shí)現(xiàn)的模塊, 以及sys.modules會緩存引入模塊的特點(diǎn), 我們可以把上面的邏輯簡化成引入模塊->替換當(dāng)前模塊方法為我們修改的hook方法。

import time
from functools import wraps
from typing import Any, Callable, Tuple, cast
import aiomysql
def func_wrapper(func: Callable):
    """和上面一樣的封裝函數(shù), 這里簡單略過"""
# 判斷是否hook過
_IS_HOOK: bool = False
# 存放原來的_query
_query: Callable = aiomysql.Cursor._query
# hook函數(shù)
def install_hook() -> None:
    _IS_HOOK = False
    if _IS_HOOK:
        return
    aiomysql.Cursor._query = func_wrapper(aiomysql.Cursor._query)
    _IS_HOOK = True
# 還原到原來的函數(shù)方法
def reset_hook() -> None:
    aiomysql.Cursor._query = _query
    _IS_HOOK = False

代碼簡單明了,接下來跑一跑剛才的測試:

import asyncio
import aiomysql
from demo import install_hook, reset_hook
async def test_mysql() -> None:
    pool: aiomysql.Pool = await aiomysql.create_pool(
        host='127.0.0.1', port=3306, user='root', password='', db='mysql'
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            (r,) = await cur.fetchone()
            assert r == 42
    pool.close()
    await pool.wait_closed()

print("install hook")
install_hook()
asyncio.run(test_mysql())
print("reset hook")
reset_hook()
asyncio.run(test_mysql())
print("end")

通過測試輸出可以發(fā)現(xiàn)我們的邏輯的正確的, install hook后能出現(xiàn)我們提取的元信息, 而reset后則不會打印原信息

install hook
{'sql': 'SELECT 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}
reset hook
end

以上就是“Python探針怎么完成調(diào)用庫的數(shù)據(jù)提取”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學(xué)習(xí)更多的知識,請關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI