您好,登錄后才能下訂單哦!
這篇“Python運行時修改業(yè)務(wù)SQL代碼怎么寫”文章的知識點大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Python運行時修改業(yè)務(wù)SQL代碼怎么寫”文章吧。
最近項目在準備搞SASS化,SASS化有一個特點就是多租戶,且每個租戶之間的數(shù)據(jù)都要隔離,對于數(shù)據(jù)庫的隔離方案常見的有數(shù)據(jù)庫隔離,表隔離,字段隔離,目前我只用到表隔離和字段隔離(數(shù)據(jù)庫隔離的原理也是差不多)。 對于字段隔離比較簡單,就是查詢條件不同而已,比如像下面的SQL查詢:
SELECT * FROM t_demo WHERE tenant_id='xxx' AND is_del=0
但是為了嚴謹,需求上需要在執(zhí)行SQL之前檢查對應(yīng)的表是否帶上tenant_id
的查詢字段。
對于表隔離就麻煩了一些,他需要做到在運行的時候根據(jù)對應(yīng)的租戶ID來處理某個數(shù)據(jù)表,舉個例子,假如有下面這樣的一條SQL查詢:
SELECT * FROM t_demo WHERE is_del=0
在遇到租戶A時,SQL查詢將變?yōu)椋?/strong>
SELECT * FROM t_demo_a WHERE is_del=0
在遇到租戶B時,SQL查詢將變?yōu)椋?/strong>
SELECT * FROM t_demo_b WHERE is_del=0
如果商戶數(shù)量固定時,一般在代碼里編寫if-else
來判斷就可以了,但是常見的SASS化應(yīng)用的商戶是會一直新增的,那么對于這個SQL邏輯就會變成這樣:
def sql_handle(tenant_id: str): table_name: str = f"t_demo_{tenant_id}" sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
但是這有幾個問題,對于ORM來說,一開始只創(chuàng)建一個t_demo
對應(yīng)的表對象就可以了,現(xiàn)在卻要根據(jù)多個商戶創(chuàng)建多個表對象,這是不現(xiàn)實的,其次如果是裸寫SQL,一般會使用IDE的檢查,而對于這樣的SQL:
sql: str = f"SELECT * FROM {table_name} WHERE is_del=0"
IDE是沒辦法進行檢查的,當然還有一個最為嚴重的問題,就是當前的項目已經(jīng)非常龐大了,如果每個相關(guān)表的調(diào)用都進行適配更改的話,那工程量就非常龐大了,所以最好的方案就是在引擎庫得到用戶傳過來的SQL語句后且還沒發(fā)送到MySQL
服務(wù)器之前自動的根據(jù)商戶ID更改SQL, 而要達到這樣的效果,就必須侵入到我們使用的MySQL
的引擎庫,修改里面的方法來兼容我們的需求。
不管是使用
dbutils
還是sqlalchemy
,都可以指定一個引擎庫,目前常用的引擎庫是pymysql
,所以下文都將以pymysql
為例進行闡述。
由于必須侵入到我們使用的引擎庫,所以我們應(yīng)該先判斷我們需要修改引擎庫的哪個方法,在經(jīng)過源碼閱讀后,我判定只要更改pymysql.cursors.Cursor
的mogrify
方法:
def mogrify(self, query, args=None): """ Returns the exact string that is sent to the database by calling the execute() method. This method follows the extension to the DB API 2.0 followed by Psycopg. """ conn = self._get_db() if args is not None: query = query % self._escape_args(args, conn) return query
這個方法的作用就是把用戶傳過來的SQL和參數(shù)進行整合,生成一個最終的SQL,剛好符合我們的需求,于是可以通過繼承的思路來創(chuàng)建一個新的屬于我們自己的Cursor
類:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: # 在此可以編寫處理還合成的SQL邏輯 mogrify_sql: str = super().mogrify(query, args) # 在此可以編寫處理合成后的SQL邏輯 return mogrify_sql class DictCursor(pymysql.cursors.DictCursorMixin, Cursor): """A cursor which returns results as a dictionary""" # 直接修改Cursor類的`mogrify`方法并不會影響到`DictCursor`類,所以我們也要創(chuàng)建一個新的`Cursor`類。
創(chuàng)建好了Cursor
類后,就需要考慮如何在pymysql
中應(yīng)用我們自定義的Cursor
類了,一般的Mysql
連接庫都支持我們傳入自定義的Cursor
類,比如pymysql
:
import pymysql.cursors # Connect to the database connection = pymysql.connect( host='localhost', user='user', password='passwd', database='db', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor )
我們可以通過cursorclass
來指定我們的Cursor
類,如果使用的庫不支持或者是其它原因則需要使用猴子補丁的方法,具體的使用方法見Python探針完成調(diào)用庫的數(shù)據(jù)提取。
現(xiàn)在我們已經(jīng)搞定了在何處修改SQL的問題了,接下來就要思考如何在mogrify
方法獲取到商戶ID以及那些表要進行替換,一般我們在進行一段代碼調(diào)用時,有兩種傳參數(shù)的方法, 一種是傳數(shù)組類型的參數(shù):
with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))
一種是傳字典類型的參數(shù):
with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%(is_del)s", {"is_del": 0})
目前大多數(shù)的項目都存在這兩種類型的編寫習(xí)慣,而引擎庫在執(zhí)行execute
時會經(jīng)過處理后才把參數(shù)sql
和args
傳給了mogrify
,如果我們是使用字典類型的參數(shù),那么可以在里面嵌入我們需要的參數(shù),并在mogrify
里面提取出來,但是使用了數(shù)組類型的參數(shù)或者是ORM庫的話就比較難傳遞參數(shù)給mogrify
方法了,這時可以通過context
隱式的把參數(shù)傳給mogrify
方法,具體的分析和原理可見:python如何使用contextvars模塊源碼分析。
context
的使用方法很簡單, 首先是創(chuàng)建一個context
封裝的類:
from contextvars import ContextVar, Token from typing import Any, Dict, Optional, Set context: ContextVar[Dict[str, Any]] = ContextVar("context", default={}) class Context(object): """基礎(chǔ)的context調(diào)用,支持Type Hints檢查""" tenant_id: str replace_table_set: Set[str] def __getattr__(self, key: str) -> Any: value: Any = context.get().get(key) return value def __setattr__(self, key: str, value: Any) -> None: context.get()[key] = value class WithContext(Context): """簡單的處理reset token邏輯,和context管理,只用在業(yè)務(wù)代碼""" def __init__(self) -> None: self._token: Optional[Token] = None def __enter__(self) -> "WithContext": self._token = context.set({}) return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self._token: context.reset(self._token) self._token = None
接下來在業(yè)務(wù)代碼中,通過context傳入當前業(yè)務(wù)對應(yīng)的參數(shù):
with WithContext as context: context.tenant_id = "xxx" context.replace_table_set = {"t_demo"} with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_demo WHERE is_del=%s", (0, ))
然后在mogrify
中通過調(diào)用context
即可獲得對應(yīng)的參數(shù)了:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str] = context.replace_table_set # 在此可以編寫處理還合成的SQL邏輯 mogrify_sql: str = super().mogrify(query, args) # 在此可以編寫處理合成后的SQL邏輯 return mogrify_sql
現(xiàn)在,萬事俱備,只剩下修改SQL的邏輯,之前在做別的項目的時候,建的表都是十分的規(guī)范,它們是以t_xxx
的格式給表命名,這樣一來替換表名十分方便,只要進行兩次替換就可以兼容大多數(shù)情況了,代碼如下:
import pymysql class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id replace_table_set: Set[str] = context.replace_table_set # 簡單示例,實際上正則的效率會更好 for replace_table in replace_table_set: if replace_table in query: # 替換表名 query = query.replace(f" {replace_table} ", f" {replace_table}_{tenant_id} ") # 替換查詢條件中帶有表名的 query = query.replace(f" {replace_table}.", f" {replace_table}_{tenant_id}.") mogrify_sql: str = super().mogrify(query, args) # 在此可以編寫處理合成后的SQL邏輯 return mogrify_sql
但是現(xiàn)在項目的SQL規(guī)范并不是很好,有些表名還是MySQL
的關(guān)鍵字,所以靠簡單的替換是行不通的,同時這個需求中,一些表只需要字段隔離,需要確保有帶上對應(yīng)的字段查詢,這就意味著必須有一個庫可以來解析SQL
,并返回一些數(shù)據(jù)使我們可以比較方便的知道SQL
中哪些是表名,哪些是查詢字段了。
目前在Python中有一個比較知名的SQL
解析庫--sqlparse,它可以通過解析引擎把SQL解析成一個Python對象
,之后我們就可以通過一些語法來判斷哪些是SQL
關(guān)鍵字, 哪些是表名,哪些是查詢條件等等。但是這個庫只實現(xiàn)一些底層的API,我們需要對他和SQL比較了解之后才能實現(xiàn)一些比較完備的功能,比如下面3種常見的SQL:
SELECT * FROM t_demo SELECT * FROM t_demo as demo SELECT * FROM t_other as other LEFT JOIN t_demo demo on demo.xxx==other.xxx
如果我們要通過sqlparse
來提取表名的話就需要處理這3種情況,而我們?nèi)绻恳粋€情況都編寫出來的話,那將會非常費心費力,同時也可能存在遺漏的情況,這時就需要用到另外一個庫--sql_metadata,這個庫是基于sqlparse
和正則的解析庫,同時提供了大量的常見使用方法的封裝,我們通過直接調(diào)用對應(yīng)的函數(shù)就能知道SQL
中有哪些表名,查詢字段是什么了。
目前已知這個庫有一個缺陷,就是會自動去掉字段的符號, 比如表名為關(guān)鍵字時,我們需要使用`符號把它包起來:
SELECT * FROM `case`
但在經(jīng)過sql_metadata
解析后得到的表名是case
而不是`case`,需要人為的處理,但是我并不覺得這是一個BUG,自己不按規(guī)范創(chuàng)建表,能怪誰呢。
接下來就可以通過sql_metadata
的方法來實現(xiàn)我需要的功能了,在根據(jù)需求修改后,代碼長這樣(說明見注釋):
from typing import Dict, Set, Tuple, Union import pymysql import sql_metadata class Cursor(pymysql.cursors.Cursor): def mogrify(self, query: str, args: Union[None, list, dict, tuple] = None) -> str: tenant_id: str = context.tenant_id # 生成一個解析完成的SQL對象 sql_parse: sql_metadata.Parser = sql_metadata.Parser(query) # 新加的一個屬性,這里存下需要校驗查詢條件的表名 check_flag = False where_table_set: Set[str] = context.where_table_set # 該方法會獲取到SQL對應(yīng)的table,返回的是一個table的數(shù)組 for table_name in sql_parse.tables: if table_name in where_table_set: if sql_parse.columns_dict: # 該方法會返回SQL對應(yīng)的字段,其中分為select, join, where等,這里只用到了where for where_column in sql_parse.columns_dict.get("where", []): # 如果連表,里面存的是類似于t_demo.tenant_id,所以要兼容這一個情況 if "tenant_id" in where_column.lower().split("."): check_flag = True break if not check_flag: # 檢查不通過就拋錯 raise RuntimeError() # 更換表名的邏輯 replace_table_set: Set[str] = context.replace_table_set new_query: str = query for table_name in sql_parse.tables: if table_name in replace_table_set: new_query = "" # tokens存放著解析完的數(shù)據(jù),比如SELECT * FROM t_demo解析后是 # [SELECT, *, FROM, t_demo]四個token for token in sql_parse.tokens: # 判斷token是否是表名 if token.is_potential_table_name: # 提取規(guī)范的表名 parse_table_name: str = token.stringified_token.strip() if parse_table_name in replace_table_set: new_table_name: str = f" {parse_table_name}_{tenant_id}" # next_token代表SQL的下一個字段 if token.next_token.normalized != "AS": # 如果當前表沒有設(shè)置別名 # 通過AS把替換前的表名設(shè)置為新表名的別名,這樣一來后面的表名即使沒進行更改,也是能讀到對應(yīng)商戶ID的表 new_table_name += f" AS {parse_table_name}" query += new_table_name continue # 通過stringified_token獲取的數(shù)據(jù)會自動帶空格,比如`FROM`得到的會是` FROM`,這樣拼接的時候就不用考慮是否加空格了 new_query += token.stringified_token mogrify_sql: str = super().mogrify(new_query, args) # 在此可以編寫處理合成后的SQL邏輯 return mogrify_sql
這份代碼十分簡單,它只做簡單介紹,事實上這段邏輯會應(yīng)用到所有的SQL
查詢中,我們應(yīng)該要保證這段代碼是沒問題的,同時不要有太多的性能浪費,所以在使用的時候要考慮到代碼拆分和優(yōu)化。 比如在使用的過程中可以發(fā)現(xiàn),我們的SQL
轉(zhuǎn)換和檢查都是在父類的Cursor.mogrify
之前進行的,這就意味著不管我們代碼邏輯里cursor.execute
傳的參數(shù)是什么,對于同一個代碼邏輯來說,傳過來的query
值是保持不變的,比如下面的代碼:
def get_user_info(uid: str) -> Dict[str, Any]: with conn.cursor() as cursor: cursor.execute("SELECT * FROM t_user WHERE uid=%(uid)s", {"uid": uid}) return cursor.fetchone() or {}
這段代碼中傳到Cursor.mogrify
的query永遠為SELECT * FROM t_user WHERE uid=%(uid)s,有變化的只是args中uid的不同。 有了這樣的一個前提條件,那么我們就可以把query
的校驗結(jié)果和轉(zhuǎn)換結(jié)果緩存下來,減少每次都需要解析SQL
再校驗造成的性能浪費。至于如何實現(xiàn)緩存則需要根據(jù)自己的項目來決定,比如項目中只有幾百個SQL
執(zhí)行,那么直接用Python
的dict
來存放就可以了,如果項目中執(zhí)行的SQL
很多,同時有些執(zhí)行的頻率非常的高,有些執(zhí)行的頻率非常的低,那么可以考慮使用LRU
來緩存。
以上就是關(guān)于“Python運行時修改業(yè)務(wù)SQL代碼怎么寫”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對大家有幫助,若想了解更多相關(guān)的知識內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。