您好,登錄后才能下訂單哦!
這篇“Druid核心源碼分析DruidDataSource”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“Druid核心源碼分析DruidDataSource”文章吧。
druid連接池支持的所有連接參數(shù)可在類com.alibaba.druid.pool.DruidDataSourceFactory
中查看。
配置讀取代碼:
public void configFromPropety(Properties properties) { //這方法太長(zhǎng),自己看源碼去吧,就是讀讀屬性。。。。 }
整體代碼比較簡(jiǎn)單,就是把配置內(nèi)容,讀取到dataSource。
首先是簡(jiǎn)單的判斷,加鎖:
if (inited) { //已經(jīng)被初始化好了,直接return return; } // bug fixed for dead lock, for issue #2980 DruidDriver.getInstance(); /**控制創(chuàng)建移除連接的鎖,并且通過條件去控制一個(gè)連接的生成消費(fèi)**/ // public DruidAbstractDataSource(boolean lockFair){ // lock = new ReentrantLock(lockFair); // // notEmpty = lock.newCondition(); // empty = lock.newCondition(); // } final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException("interrupt", e); }
之后會(huì)更新一些JMX的監(jiān)控指標(biāo):
//一些jmx監(jiān)控指標(biāo) this.connectionIdSeedUpdater.addAndGet(this, delta); this.statementIdSeedUpdater.addAndGet(this, delta); this.resultSetIdSeedUpdater.addAndGet(this, delta); this.transactionIdSeedUpdater.addAndGet(this, delta);
druid的監(jiān)控指標(biāo)都是通過jmx實(shí)現(xiàn)的。
解析連接串:
if (this.jdbcUrl != null) { //解析連接串 this.jdbcUrl = this.jdbcUrl.trim(); initFromWrapDriverUrl(); }
initFromWrapDriverUrl
方法,除了從jdbc url中解析出連接和驅(qū)動(dòng)信息,后面還把filters的名字,解析成了對(duì)應(yīng)的filter類。
private void initFromWrapDriverUrl() throws SQLException { if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) { return; } DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null); this.driverClass = config.getRawDriverClassName(); LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'"); this.jdbcUrl = config.getRawUrl(); if (this.name == null) { this.name = config.getName(); } for (Filter filter : config.getFilters()) { addFilter(filter); } }
之后在init方法里面,會(huì)進(jìn)行filters的初始化:
//初始化filter 屬性 for (Filter filter : filters) { filter.init(this); }
之后解析數(shù)據(jù)庫(kù)類型:
if (this.dbTypeName == null || this.dbTypeName.length() == 0) { this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null); }
注意枚舉值: com.alibaba.druid.DbType
,這個(gè)里面包含了目前durid連接池支持的所有數(shù)據(jù)源 類型,另外,druid還額外提供了一些驅(qū)動(dòng)類,例如:
elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver
clickhouse還提供了負(fù)載均衡的驅(qū)動(dòng)類:
com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver
。
在回到init方法,之后是一堆參數(shù)解析,不再說,跳過了。 之后是通過SPI加載自定義的filter:
private void initFromSPIServiceLoader() { if (loadSpifilterSkip) { return; } if (autoFilters == null) { List<Filter> filters = new ArrayList<Filter>(); ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class); for (Filter filter : autoFilterLoader) { AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class); if (autoLoad != null && autoLoad.value()) { filters.add(filter); } } autoFilters = filters; } for (Filter filter : autoFilters) { if (LOG.isInfoEnabled()) { LOG.info("load filter from spi :" + filter.getClass().getName()); } addFilter(filter); } }
注意自定義的filter,要使用com.alibaba.druid.filter.AutoLoad
。
解析驅(qū)動(dòng):
protected void resolveDriver() throws SQLException { if (this.driver == null) { if (this.driverClass == null || this.driverClass.isEmpty()) { this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl); } if (MockDriver.class.getName().equals(driverClass)) { driver = MockDriver.instance; } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) { Properties info = new Properties(); info.put("user", username); info.put("password", password); info.putAll(connectProperties); driver = new BalancedClickhouseDriver(jdbcUrl, info); } else { if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) { throw new SQLException("url not set"); } driver = JdbcUtils.createDriver(driverClassLoader, driverClass); } } else { if (this.driverClass == null) { this.driverClass = driver.getClass().getName(); } } }
其中durid自己的mock驅(qū)動(dòng)和clickhouse的負(fù)載均衡的驅(qū)動(dòng),特殊判斷了下,其他走的都是class forname.
之后是exception sorter和checker的一些東西,跟主線劇情關(guān)系不大,skip.
之后是一些初始化JdbcDataSourceStat
,沒啥東西。
之后是核心:
connections = new DruidConnectionHolder[maxActive]; //連接數(shù)組 evictConnections = new DruidConnectionHolder[maxActive]; //銷毀的連接數(shù)組 keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活躍可用的數(shù)組
dataSource的連接,都被包裝在類DruidConnectionHolder
中,之后是一個(gè)同步去初始化連接還是異步去初始化的連接,總之,是去初始化 連接的過程:
if (createScheduler != null && asyncInit) { for (int i = 0; i < initialSize; ++i) { submitCreateTask(true); } } else if (!asyncInit) { // init connections while (poolingCount < initialSize) { try { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount++] = holder; } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); if (initExceptionThrow) { connectError = ex; break; } else { Thread.sleep(3000); } } } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } }
初始化的連接個(gè)數(shù)為連接串里面配置的initialSize
.
核心初始化方法com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection()
,在這方法里面,會(huì)拿用戶名密碼,之后執(zhí)行真正的獲取connection:
public Connection createPhysicalConnection(String url, Properties info) throws SQLException { Connection conn; if (getProxyFilters().size() == 0) { conn = getDriver().connect(url, info); } else { conn = new FilterChainImpl(this).connection_connect(info); } createCountUpdater.incrementAndGet(this); return conn; }
注意,如果配置了filters,則所有操作,都會(huì)在操作前執(zhí)行filter處理鏈。
public ConnectionProxy connection_connect(Properties info) throws SQLException { if (this.pos < filterSize) { return nextFilter() .connection_connect(this, info); } Driver driver = dataSource.getRawDriver(); String url = dataSource.getRawJdbcUrl(); Connection nativeConnection = driver.connect(url, info); if (nativeConnection == null) { return null; } return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId()); }
再回到主流程init方法,connections
數(shù)組初始化完成之后, 開啟額外線程:
createAndLogThread(); //打印連接信息 createAndStartCreatorThread(); //創(chuàng)建連接線程 createAndStartDestroyThread(); //銷毀連接線程
先看注釋,具體里面的內(nèi)容后面單獨(dú)拉出來(lái)講。
之后:
initedLatch.await(); //初始化 latch -1 init = true; //標(biāo)記已經(jīng)初始化完成 initedTime = new Date(); //時(shí)間 registerMbean(); //為datasource 注冊(cè)jmx監(jiān)控指標(biāo)
最后的最后,如果配置了keepAlive:
if (keepAlive) { // async fill to minIdle if (createScheduler != null) { for (int i = 0; i < minIdle; ++i) { submitCreateTask(true); } } else { this.emptySignal(); } }
這時(shí)候,會(huì)根據(jù)配置的活躍連接數(shù)minIdle
,去給datasource的連接,做個(gè)保持活躍連接個(gè)數(shù),具體后面再說。
首先,使用數(shù)組作為連接的容器,對(duì)于真實(shí)連接的加入和移除,使用lock就行同步,另外,在加入和移除連接時(shí)候,對(duì)比生產(chǎn)消費(fèi)模型,通過lock上的條件,來(lái)通知是否可以獲取或者加入連接。
public DruidAbstractDataSource(boolean lockFair){ lock = new ReentrantLock(lockFair); notEmpty = lock.newCondition(); //非空,有連接 empty = lock.newCondition(); //空的 }
另外,默認(rèn)的fairlock為false
public DruidDataSource(){ this(false); } public DruidDataSource(boolean fairLock){ super(fairLock); configFromPropety(System.getProperties()); }
在線程com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread
中:
if (emptyWait) { // 必須存在線程等待,才創(chuàng)建連接 if (poolingCount >= notEmptyWaitThreadCount // && (!(keepAlive && activeCount + poolingCount < minIdle)) && !isFailContinuous() ) { empty.await(); } // 防止創(chuàng)建超過maxActive數(shù)量的連接 if (activeCount + poolingCount >= maxActive) { empty.await(); continue; } }
必須存在線程等待獲取連接時(shí)候,才能創(chuàng)建連接,并且要保持總的連接數(shù),不能超過配置的最大連接。
創(chuàng)建完連接之后,執(zhí)行 notEmpty.signalAll();
通知消費(fèi)者。
外層代碼:
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
忽略掉filter chain,其實(shí)最后執(zhí)行的還是com.alibaba.druid.pool.DruidDataSource#getConnectionDirect
:
方法內(nèi)部:
poolableConnection = getConnectionInternal(maxWaitMillis);
1 , 連接不足,需要直接去創(chuàng)建新的,跟我們初始化一樣
2,從connections里面拿
if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); }
其中,maxWait默認(rèn)為-1,配置在init里面:
String property = properties.getProperty("druid.maxWait"); if (property != null && property.length() > 0) { try { int value = Integer.parseInt(property); this.setMaxWait(value); } catch (NumberFormatException e) { LOG.error("illegal property 'druid.maxWait'", e); } }
這個(gè)用于配置拿連接時(shí)候,是否在這個(gè)時(shí)間上進(jìn)行等待,默認(rèn)是否,即一直等到拿到連接為止。
直接看下阻塞拿的過程:
DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { //沒連接了 while (poolingCount == 0) { //暗示下創(chuàng)建線程沒連接了 emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { //傻等著創(chuàng)建或者回收,能給整出來(lái)點(diǎn)兒連接 notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } notEmptyWaitCount++; if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } //拿數(shù)組的最后一個(gè)連接 decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; }
protected void createAndStartDestroyThread() { destroyTask = new DestroyTask(); //自定義配置銷毀 ,適用于連接數(shù)非常多的 情況 if (destroyScheduler != null) { long period = timeBetweenEvictionRunsMillis; if (period <= 0) { period = 1000; } destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period, TimeUnit.MILLISECONDS); initedLatch.countDown(); return; } String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this); //單線程銷毀 destroyConnectionThread = new DestroyConnectionThread(threadName); destroyConnectionThread.start(); }
實(shí)際的銷毀:
public class DestroyTask implements Runnable { public DestroyTask() { } @Override public void run() { shrink(true, keepAlive); if (isRemoveAbandoned()) { removeAbandoned(); } } }
最終 執(zhí)行的還是 shrink
方法。
public void shrink(boolean checkTime, boolean keepAlive) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return; } boolean needFill = false; int evictCount = 0; int keepAliveCount = 0; int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink; fatalErrorCountLastShrink = fatalErrorCount; try { if (!inited) { return; } final int checkCount = poolingCount - minIdle; //需要檢測(cè)連接的數(shù)量 final long currentTimeMillis = System.currentTimeMillis(); for (int i = 0; i < poolingCount; ++i) { //檢測(cè)目前connections數(shù)組中的連接 DruidConnectionHolder connection = connections[i]; if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) { keepAliveConnections[keepAliveCount++] = connection; continue; } if (checkTime) { //是否設(shè)置了物理連接的超時(shí)時(shí)間phyTimoutMills。假如設(shè)置了該時(shí)間, // 判斷連接時(shí)間存活時(shí)間是否已經(jīng)超過phyTimeoutMills,是則放入evictConnections中 if (phyTimeoutMillis > 0) { long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue; } } long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;//獲取連接空閑時(shí)間 //如果某條連接空閑時(shí)間小于minEvictableIdleTimeMillis,則不用繼續(xù)檢查剩下的連接了 if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break; } if (idleMillis >= minEvictableIdleTimeMillis) { // check checkTime is silly code //檢測(cè)檢查了幾個(gè)連接了 if (checkTime && i < checkCount) { //超時(shí)了 evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { //超時(shí)了 evictConnections[evictCount++] = connection; continue; } } if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { //配置了keepAlive,并且在存活時(shí)間內(nèi),放到keepAlive數(shù)組 keepAliveConnections[keepAliveCount++] = connection; } } else { //不需要檢查時(shí)間的,直接移除 if (i < checkCount) { evictConnections[evictCount++] = connection; } else { break; } } } int removeCount = evictCount + keepAliveCount; //移除了幾個(gè) //由于使用connections連接時(shí)候,都是取后面的,后面 的是最新的連接,只考慮前面過期就行,所以只需要挪動(dòng)前面的連接 if (removeCount > 0) { System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); poolingCount -= removeCount; } keepAliveCheckCount += keepAliveCount; if (keepAlive && poolingCount + activeCount < minIdle) { //不夠核心的活躍連接時(shí)候,需要去創(chuàng)建啦 needFill = true; } } finally { lock.unlock(); } if (evictCount > 0) { for (int i = 0; i < evictCount; ++i) { //銷毀連接 DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } Arrays.fill(evictConnections, null); } if (keepAliveCount > 0) { // keep order for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } // skip } boolean discard = !validate; //沒通過validate if (validate) { //通過keep alive檢查,更新時(shí)間 holer.lastKeepTimeMillis = System.currentTimeMillis(); //這里還會(huì)嘗試放回connections數(shù)組 boolean putOk = put(holer, 0L, true); if (!putOk) { //沒放入,標(biāo)記要丟棄了 discard = true; } } if (discard) { try { connection.close(); } catch (Exception e) { // skip } lock.lock(); try { discardCount++; if (activeCount + poolingCount <= minIdle) { //發(fā)信號(hào)讓創(chuàng)建線程去創(chuàng)建 emptySignal(); } } finally { lock.unlock(); } } } this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); Arrays.fill(keepAliveConnections, null); } if (needFill) { //又要去創(chuàng)建了 lock.lock(); try { int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); for (int i = 0; i < fillCount; ++i) { emptySignal(); } } finally { lock.unlock(); } } else if (onFatalError || fatalErrorIncrement > 0) { lock.lock(); try { emptySignal(); } finally { lock.unlock(); } } }
工具數(shù)組evictConnections
,keepAliveConnections
用完即被置空,老工具人了。
一波操作下來(lái),完成了對(duì)connections數(shù)組的大清洗。
以上就是關(guān)于“Druid核心源碼分析DruidDataSource”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。