您好,登錄后才能下訂單哦!
這篇文章主要介紹“Python怎么封裝數(shù)據(jù)庫連接池”,在日常操作中,相信很多人在Python怎么封裝數(shù)據(jù)庫連接池問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Python怎么封裝數(shù)據(jù)庫連接池”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
前言:
線程安全問題:當(dāng)2個(gè)線程同時(shí)用到線程池時(shí),會(huì)同時(shí)創(chuàng)建2個(gè)線程池。如果多個(gè)線程,錯(cuò)開用到線程池,就只會(huì)創(chuàng)建一個(gè)線程池,會(huì)共用一個(gè)線程池。我用的注解方式的單例模式,感覺就是這個(gè)注解的單例方式,解決了多線程問題,但是沒解決線程安全問題,需要優(yōu)化這個(gè)單例模式。
主要通過 PooledDB 模塊實(shí)現(xiàn)。
db_config.py
# -*- coding: UTF-8 -*- import pymysql # 數(shù)據(jù)庫信息 DB_TEST_HOST = "127.0.0.1" DB_TEST_PORT = 3308 DB_TEST_DBNAME = "bt" DB_TEST_USER = "root" DB_TEST_PASSWORD = "123456" # 數(shù)據(jù)庫連接編碼 DB_CHARSET = "utf8" # mincached : 啟動(dòng)時(shí)開啟的閑置連接數(shù)量(缺省值 0 開始時(shí)不創(chuàng)建連接) DB_MIN_CACHED = 5 # maxcached : 連接池中允許的閑置的最多連接數(shù)量(缺省值 0 代表不閑置連接池大小) DB_MAX_CACHED = 0 # maxshared : 共享連接數(shù)允許的最大數(shù)量(缺省值 0 代表所有連接都是專用的)如果達(dá)到了最大數(shù)量,被請求為共享的連接將會(huì)被共享使用 DB_MAX_SHARED = 5 # maxconnecyions : 創(chuàng)建連接池的最大數(shù)量(缺省值 0 代表不限制) DB_MAX_CONNECYIONS = 300 # blocking : 設(shè)置在連接池達(dá)到最大數(shù)量時(shí)的行為(缺省值 0 或 False 代表返回一個(gè)錯(cuò)誤<toMany......> 其他代表阻塞直到連接數(shù)減少,連接被分配) DB_BLOCKING = True # maxusage : 單個(gè)連接的最大允許復(fù)用次數(shù)(缺省值 0 或 False 代表不限制的復(fù)用).當(dāng)達(dá)到最大數(shù)時(shí),連接會(huì)自動(dòng)重新連接(關(guān)閉和重新打開) DB_MAX_USAGE = 0 # setsession : 一個(gè)可選的SQL命令列表用于準(zhǔn)備每個(gè)會(huì)話,如["set datestyle to german", ...] DB_SET_SESSION = None # creator : 使用連接數(shù)據(jù)庫的模塊 DB_CREATOR = pymysql
設(shè)置連接池最大最小為5個(gè)。則啟動(dòng)連接池時(shí),就會(huì)建立5個(gè)連接。
singleton.py
#單例模式函數(shù),用來修飾類 def singleton(cls,*args,**kw): instances = {} def _singleton(): if cls not in instances: instances[cls] = cls(*args,**kw) return instances[cls] return _singleton
db_dbutils_init.py
from dbutils.pooled_db import PooledDB import db_config as config # import random from singleton import singleton """ @功能:創(chuàng)建數(shù)據(jù)庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內(nèi)部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創(chuàng)建數(shù)據(jù)庫連接conn和游標(biāo)cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創(chuàng)建數(shù)據(jù)庫連接池 def __getconn(self): if self.__pool is None: # i = random.randint(1, 100) # print("創(chuàng)建線程池的數(shù)量"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關(guān)閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個(gè)連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實(shí)例化 @singleton def get_my_connection(): return MyConnectionPool()
mysqlhelper.py
import time from db_dbutils_init import get_my_connection """執(zhí)行語句查詢有結(jié)果返回結(jié)果沒有返回0;增/刪/改返回變更數(shù)據(jù)條數(shù),沒有返回0""" class MySqLHelper(object): def __init__(self): self.db = get_my_connection() # 從數(shù)據(jù)池中獲取連接 # # def __new__(cls, *args, **kwargs): # if not hasattr(cls, 'inst'): # 單例 # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs) # return cls.inst # 封裝執(zhí)行命令 def execute(self, sql, param=None, autoclose=False): """ 【主要判斷是否有參數(shù)和是否執(zhí)行完就釋放連接】 :param sql: 字符串類型,sql語句 :param param: sql語句中要替換的參數(shù)"select %s from tab where id=%s" 其中的%s就是參數(shù) :param autoclose: 是否關(guān)閉連接 :return: 返回連接conn和游標(biāo)cursor """ cursor, conn = self.db.getconn() # 從連接池獲取連接 count = 0 try: # count : 為改變的數(shù)據(jù)條數(shù) if param: count = cursor.execute(sql, param) else: count = cursor.execute(sql) conn.commit() if autoclose: self.close(cursor, conn) except Exception as e: pass return cursor, conn, count # 釋放連接 def close(self, cursor, conn): """釋放連接歸還給連接池""" cursor.close() conn.close() # 查詢所有 def selectall(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchall() return res except Exception as e: print(e) self.close(cursor, conn) return count # 查詢單條 def selectone(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchone() self.close(cursor, conn) return res except Exception as e: print("error_msg:", e.args) self.close(cursor, conn) return count # 增加 def insertone(self, sql, param): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) # _id = cursor.lastrowid() # 獲取當(dāng)前插入數(shù)據(jù)的主鍵id,該id應(yīng)該為自動(dòng)生成為好 conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 增加多行 def insertmany(self, sql, param): """ :param sql: :param param: 必須是元組或列表[(),()]或((),()) :return: """ cursor, conn, count = self.db.getconn() try: cursor.executemany(sql, param) conn.commit() return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 刪除 def delete(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 更新 def update(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # if __name__ == '__main__': # db = MySqLHelper() # sql = "SELECT SLEEP(10)" # db.execute(sql) # time.sleep(20) # TODO 查詢單條 # sql1 = 'select * from userinfo where name=%s' # args = 'python' # ret = db.selectone(sql=sql1, param=args) # print(ret) # (None, b'python', b'123456', b'0') # TODO 增加單條 # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)' # ret = db.insertone(sql2, ('1', '2', '1', '2', '2')) # print(ret) # TODO 增加多條 # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)' # li = li = [ # ('分省', '123'), # ('到達(dá)','456') # ] # ret = db.insertmany(sql3,li) # print(ret) # TODO 刪除 # sql4 = 'delete from userinfo WHERE name=%s' # args = 'xxxx' # ret = db.delete(sql4, args) # print(ret) # TODO 更新 # sql5 = r'update userinfo set password=%s WHERE name LIKE %s' # args = ('993333993', '%old%') # ret = db.update(sql5, args) # print(ret)
修改 db_dbutils_init.py 文件,在創(chuàng)建連接池def __getconn(self):方法下,加一個(gè)打印隨機(jī)數(shù),方便將來我們定位是否時(shí)單例的線程池。
修改后的db_dbutils_init.py 文件:
from dbutils.pooled_db import PooledDB import db_config as config import random from singleton import singleton """ @功能:創(chuàng)建數(shù)據(jù)庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內(nèi)部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創(chuàng)建數(shù)據(jù)庫連接conn和游標(biāo)cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創(chuàng)建數(shù)據(jù)庫連接池 def __getconn(self): if self.__pool is None: i = random.randint(1, 100) print("線程池的隨機(jī)數(shù)"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關(guān)閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個(gè)連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實(shí)例化 @singleton def get_my_connection(): return MyConnectionPool()
開始測試:
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db = MySqLHelper() db.execute(sql) db.execute(sql1) time.sleep(20)
在數(shù)據(jù)庫中,使用 show processlist;
show processlist;
當(dāng)執(zhí)行第一個(gè)sql時(shí)。數(shù)據(jù)庫連接顯示。
當(dāng)執(zhí)行第二個(gè)sql時(shí)。數(shù)據(jù)庫連接顯示:
當(dāng)執(zhí)行完sql,程序sleep時(shí)。數(shù)據(jù)庫連接顯示:
程序打印結(jié)果:
線程池的隨機(jī)數(shù)43
由以上可以得出結(jié)論:
線程池啟動(dòng)后,生成了5個(gè)連接。執(zhí)行第一個(gè)sql時(shí),使用了1個(gè)連接。執(zhí)行完第一個(gè)sql后,使用了另外1個(gè)連接。 這是一個(gè)線性的,線程池中一共5個(gè)連接,但是每次執(zhí)行,只使用了其中一個(gè)。
有個(gè)疑問,連接池如果不支持并發(fā)是不是就毫無意義?
如上,雖然開了線程池5個(gè)連接,但是每次執(zhí)行sql,只用到了一個(gè)連接。那為何不設(shè)置線程池大小為1呢?設(shè)置線程池大小的意義何在呢?(如果在非并發(fā)的場景下,是不是設(shè)置大小無意義?)
相比于不用線程池的優(yōu)點(diǎn):
如果不用線程池,則每次執(zhí)行一個(gè)sql都要?jiǎng)?chuàng)建、斷開連接。 像我們這樣使用連接池,不用反復(fù)創(chuàng)建、斷開連接,拿現(xiàn)成的連接直接用就好了。
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() db1 = MySqLHelper() sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db.execute(sql) db1.execute(sql1) time.sleep(20)
第一個(gè)實(shí)例db,執(zhí)行sql。線程池啟動(dòng)了5個(gè)連接
第二個(gè)實(shí)例db1,執(zhí)行sql:
程序睡眠時(shí),一共5個(gè)線程池:
打印結(jié)果:
結(jié)果證明:
雖然我們依次創(chuàng)建了2個(gè)實(shí)例,但是(1)創(chuàng)建線程池的打印結(jié)果,只打印1次,且從始至終,線程池一共只啟動(dòng)了5個(gè)連接,且連接的id沒有發(fā)生改變,說明一直是這5個(gè)連接。
證明,我們雖然創(chuàng)建了2個(gè)實(shí)例,但是這2個(gè)實(shí)例其實(shí)是一個(gè)實(shí)例。(單例模式是生效的)
import threading from mysqlhelper import MySqLHelper import time def sl1(): time.sleep(2) db = MySqLHelper() sql = "SELECT SLEEP(6)" db.execute(sql) def sl2(): time.sleep(4) db = MySqLHelper() sql = "SELECT SLEEP(15)" db.execute(sql) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=sl1) threads.append(t1) t2 = threading.Thread(target=sl2) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
2個(gè)線程間隔了2秒。
觀察數(shù)據(jù)庫的連接數(shù)量:
打印結(jié)果:
在并發(fā)執(zhí)行2個(gè)sql時(shí),共用了這5個(gè)連接,且打印結(jié)果只打印了一次,說明雖然并發(fā)創(chuàng)建了2次實(shí)例,但真正只創(chuàng)建了一個(gè)連接池。
import threading from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() sql = "SELECT SLEEP(6)" sql1 = "SELECT SLEEP(15)" threads = [] t1 = threading.Thread(target=db.execute, args=(sql,)) threads.append(t1) t2 = threading.Thread(target=db.execute, args=(sql1,)) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
觀察數(shù)據(jù)庫連接 :
打印結(jié)果:
結(jié)果表明:
終端打印了2次,數(shù)據(jù)庫建立了10個(gè)連接,說明創(chuàng)建了2個(gè)線程池。這樣的單例模式,存在線程安全問題。
到此,關(guān)于“Python怎么封裝數(shù)據(jù)庫連接池”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。