溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Druid核心源碼分析DruidDataSource

發(fā)布時(shí)間:2023-03-31 15:08:40 來(lái)源:億速云 閱讀:132 作者:iii 欄目:開發(fā)技術(shù)

這篇“Druid核心源碼分析DruidDataSource”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“Druid核心源碼分析DruidDataSource”文章吧。

配置讀取

druid連接池支持的所有連接參數(shù)可在類com.alibaba.druid.pool.DruidDataSourceFactory中查看。

配置讀取代碼:

 public void configFromPropety(Properties properties) {
        //這方法太長(zhǎng),自己看源碼去吧,就是讀讀屬性。。。。
    }

整體代碼比較簡(jiǎn)單,就是把配置內(nèi)容,讀取到dataSource。

連接池初始化

首先是簡(jiǎn)單的判斷,加鎖:

if (inited) {
            //已經(jīng)被初始化好了,直接return
            return;
        }
        // bug fixed for dead lock, for issue #2980
        DruidDriver.getInstance();
        /**控制創(chuàng)建移除連接的鎖,并且通過條件去控制一個(gè)連接的生成消費(fèi)**/
        // public DruidAbstractDataSource(boolean lockFair){
        //        lock = new ReentrantLock(lockFair);
        //
        //        notEmpty = lock.newCondition();
        //        empty = lock.newCondition();
        //    }
        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

之后會(huì)更新一些JMX的監(jiān)控指標(biāo):

//一些jmx監(jiān)控指標(biāo)
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);

druid的監(jiān)控指標(biāo)都是通過jmx實(shí)現(xiàn)的。

解析連接串:

 if (this.jdbcUrl != null) {
                //解析連接串
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

initFromWrapDriverUrl方法,除了從jdbc url中解析出連接和驅(qū)動(dòng)信息,后面還把filters的名字,解析成了對(duì)應(yīng)的filter類。

  private void initFromWrapDriverUrl() throws SQLException {
        if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
            return;
        }
        DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
        this.driverClass = config.getRawDriverClassName();
        LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");
        this.jdbcUrl = config.getRawUrl();
        if (this.name == null) {
            this.name = config.getName();
        }
        for (Filter filter : config.getFilters()) {
            addFilter(filter);
        }
    }

之后在init方法里面,會(huì)進(jìn)行filters的初始化:

 //初始化filter 屬性
            for (Filter filter : filters) {
                filter.init(this);
            }

之后解析數(shù)據(jù)庫(kù)類型:

 if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
                this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
            }

注意枚舉值: com.alibaba.druid.DbType,這個(gè)里面包含了目前durid連接池支持的所有數(shù)據(jù)源 類型,另外,druid還額外提供了一些驅(qū)動(dòng)類,例如:

 elastic_search  (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver

clickhouse還提供了負(fù)載均衡的驅(qū)動(dòng)類:

com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver 。

在回到init方法,之后是一堆參數(shù)解析,不再說,跳過了。 之后是通過SPI加載自定義的filter:

  private void initFromSPIServiceLoader() {
        if (loadSpifilterSkip) {
            return;
        }
        if (autoFilters == null) {
            List<Filter> filters = new ArrayList<Filter>();
            ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
            for (Filter filter : autoFilterLoader) {
                AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
                if (autoLoad != null && autoLoad.value()) {
                    filters.add(filter);
                }
            }
            autoFilters = filters;
        }
        for (Filter filter : autoFilters) {
            if (LOG.isInfoEnabled()) {
                LOG.info("load filter from spi :" + filter.getClass().getName());
            }
            addFilter(filter);
        }
    }

注意自定義的filter,要使用com.alibaba.druid.filter.AutoLoad。

解析驅(qū)動(dòng):

  protected void resolveDriver() throws SQLException {
        if (this.driver == null) {
            if (this.driverClass == null || this.driverClass.isEmpty()) {
                this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
            }
            if (MockDriver.class.getName().equals(driverClass)) {
                driver = MockDriver.instance;
            } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
                Properties info = new Properties();
                info.put("user", username);
                info.put("password", password);
                info.putAll(connectProperties);
                driver = new BalancedClickhouseDriver(jdbcUrl, info);
            } else {
                if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                    throw new SQLException("url not set");
                }
                driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
            }
        } else {
            if (this.driverClass == null) {
                this.driverClass = driver.getClass().getName();
            }
        }
    }

其中durid自己的mock驅(qū)動(dòng)和clickhouse的負(fù)載均衡的驅(qū)動(dòng),特殊判斷了下,其他走的都是class forname.

之后是exception sorter和checker的一些東西,跟主線劇情關(guān)系不大,skip.

之后是一些初始化JdbcDataSourceStat,沒啥東西。

之后是核心:

  connections = new DruidConnectionHolder[maxActive];  //連接數(shù)組
            evictConnections = new DruidConnectionHolder[maxActive]; //銷毀的連接數(shù)組
            keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活躍可用的數(shù)組

dataSource的連接,都被包裝在類DruidConnectionHolder中,之后是一個(gè)同步去初始化連接還是異步去初始化的連接,總之,是去初始化 連接的過程:

if (createScheduler != null && asyncInit) {
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
                // init connections
                while (poolingCount < initialSize) {
                    try {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }
                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

初始化的連接個(gè)數(shù)為連接串里面配置的initialSize.

核心初始化方法com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection(),在這方法里面,會(huì)拿用戶名密碼,之后執(zhí)行真正的獲取connection:

 public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
        Connection conn;
        if (getProxyFilters().size() == 0) {
            conn = getDriver().connect(url, info);
        } else {
            conn = new FilterChainImpl(this).connection_connect(info);
        }
        createCountUpdater.incrementAndGet(this);
        return conn;
    }

注意,如果配置了filters,則所有操作,都會(huì)在操作前執(zhí)行filter處理鏈。

 public ConnectionProxy connection_connect(Properties info) throws SQLException {
        if (this.pos &lt; filterSize) {
            return nextFilter()
                    .connection_connect(this, info);
        }
        Driver driver = dataSource.getRawDriver();
        String url = dataSource.getRawJdbcUrl();
        Connection nativeConnection = driver.connect(url, info);
        if (nativeConnection == null) {
            return null;
        }
        return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
    }

再回到主流程init方法,connections數(shù)組初始化完成之后, 開啟額外線程:

     createAndLogThread();  //打印連接信息
            createAndStartCreatorThread(); //創(chuàng)建連接線程
            createAndStartDestroyThread(); //銷毀連接線程

先看注釋,具體里面的內(nèi)容后面單獨(dú)拉出來(lái)講。

之后:

 initedLatch.await(); //初始化 latch -1
            init = true;  //標(biāo)記已經(jīng)初始化完成
            initedTime = new Date(); //時(shí)間
            registerMbean(); //為datasource 注冊(cè)jmx監(jiān)控指標(biāo)

最后的最后,如果配置了keepAlive:

if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i &lt; minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

這時(shí)候,會(huì)根據(jù)配置的活躍連接數(shù)minIdle,去給datasource的連接,做個(gè)保持活躍連接個(gè)數(shù),具體后面再說。

連接池使用的核心邏輯

首先,使用數(shù)組作為連接的容器,對(duì)于真實(shí)連接的加入和移除,使用lock就行同步,另外,在加入和移除連接時(shí)候,對(duì)比生產(chǎn)消費(fèi)模型,通過lock上的條件,來(lái)通知是否可以獲取或者加入連接。

 public DruidAbstractDataSource(boolean lockFair){
        lock = new ReentrantLock(lockFair);
        notEmpty = lock.newCondition();  //非空,有連接
        empty = lock.newCondition(); //空的
    }

另外,默認(rèn)的fairlock為false

  public DruidDataSource(){
        this(false);
    }
    public DruidDataSource(boolean fairLock){
        super(fairLock);
        configFromPropety(System.getProperties());
    }

創(chuàng)建連接

在線程com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread中:

 if (emptyWait) {
                        // 必須存在線程等待,才創(chuàng)建連接
                        if (poolingCount >= notEmptyWaitThreadCount //
                                && (!(keepAlive && activeCount + poolingCount < minIdle))
                                && !isFailContinuous()
                        ) {
                            empty.await();
                        }
                        // 防止創(chuàng)建超過maxActive數(shù)量的連接
                        if (activeCount + poolingCount >= maxActive) {
                            empty.await();
                            continue;
                        }
                    }

必須存在線程等待獲取連接時(shí)候,才能創(chuàng)建連接,并且要保持總的連接數(shù),不能超過配置的最大連接。

創(chuàng)建完連接之后,執(zhí)行 notEmpty.signalAll();通知消費(fèi)者。

獲取連接

外層代碼:

 @Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }
    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();
        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

忽略掉filter chain,其實(shí)最后執(zhí)行的還是com.alibaba.druid.pool.DruidDataSource#getConnectionDirect

方法內(nèi)部:

   poolableConnection = getConnectionInternal(maxWaitMillis);
  • 1 , 連接不足,需要直接去創(chuàng)建新的,跟我們初始化一樣

  • 2,從connections里面拿

 if (maxWait &gt; 0) {
                    holder = pollLast(nanos);
                } else {
                    holder = takeLast();
                }

其中,maxWait默認(rèn)為-1,配置在init里面:

 String property = properties.getProperty("druid.maxWait");
            if (property != null && property.length() > 0) {
                try {
                    int value = Integer.parseInt(property);
                    this.setMaxWait(value);
                } catch (NumberFormatException e) {
                    LOG.error("illegal property 'druid.maxWait'", e);
                }
            }

這個(gè)用于配置拿連接時(shí)候,是否在這個(gè)時(shí)間上進(jìn)行等待,默認(rèn)是否,即一直等到拿到連接為止。

直接看下阻塞拿的過程:

 DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
        try {
            //沒連接了
            while (poolingCount == 0) {
                //暗示下創(chuàng)建線程沒連接了
                emptySignal(); // send signal to CreateThread create connection
                if (failFast &amp;&amp; isFailContinuous()) {
                    throw new DataSourceNotAvailableException(createError);
                }
                notEmptyWaitThreadCount++;
                if (notEmptyWaitThreadCount &gt; notEmptyWaitThreadPeak) {
                    notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
                }
                try {
                    //傻等著創(chuàng)建或者回收,能給整出來(lái)點(diǎn)兒連接
                    notEmpty.await(); // signal by recycle or creator
                } finally {
                    notEmptyWaitThreadCount--;
                }
                notEmptyWaitCount++;
                if (!enable) {
                    connectErrorCountUpdater.incrementAndGet(this);
                    if (disableException != null) {
                        throw disableException;
                    }
                    throw new DataSourceDisableException();
                }
            }
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            notEmptySignalCount++;
            throw ie;
        }
        //拿數(shù)組的最后一個(gè)連接
        decrementPoolingCount();
        DruidConnectionHolder last = connections[poolingCount];
        connections[poolingCount] = null;
        return last;
    }

連接回收

 protected void createAndStartDestroyThread() {
        destroyTask = new DestroyTask();
	//自定義配置銷毀 ,適用于連接數(shù)非常多的 情況
        if (destroyScheduler != null) {
            long period = timeBetweenEvictionRunsMillis;
            if (period &lt;= 0) {
                period = 1000;
            }
            destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                          TimeUnit.MILLISECONDS);
            initedLatch.countDown();
            return;
        }
        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        //單線程銷毀 
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
    }

實(shí)際的銷毀:

 public class DestroyTask implements Runnable {
        public DestroyTask() {
        }
        @Override
        public void run() {
            shrink(true, keepAlive);
            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }
    }

最終 執(zhí)行的還是 shrink方法。

   public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }
        boolean needFill = false;
        int evictCount = 0;
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;
        try {
            if (!inited) {
                return;
            }
            final int checkCount = poolingCount - minIdle; //需要檢測(cè)連接的數(shù)量
            final long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i < poolingCount; ++i) { //檢測(cè)目前connections數(shù)組中的連接
                DruidConnectionHolder connection = connections[i];
                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis))  {
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }
                if (checkTime) {
                    //是否設(shè)置了物理連接的超時(shí)時(shí)間phyTimoutMills。假如設(shè)置了該時(shí)間,
                    // 判斷連接時(shí)間存活時(shí)間是否已經(jīng)超過phyTimeoutMills,是則放入evictConnections中
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;//獲取連接空閑時(shí)間
                    //如果某條連接空閑時(shí)間小于minEvictableIdleTimeMillis,則不用繼續(xù)檢查剩下的連接了
                    if (idleMillis < minEvictableIdleTimeMillis
                            && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }
                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        // check checkTime is silly code
                        //檢測(cè)檢查了幾個(gè)連接了
                        if (checkTime && i < checkCount) {
                            //超時(shí)了
                            evictConnections[evictCount++] = connection;
                            continue;
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            //超時(shí)了
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        //配置了keepAlive,并且在存活時(shí)間內(nèi),放到keepAlive數(shù)組
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    //不需要檢查時(shí)間的,直接移除
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }
            int removeCount = evictCount + keepAliveCount; //移除了幾個(gè)
            //由于使用connections連接時(shí)候,都是取后面的,后面 的是最新的連接,只考慮前面過期就行,所以只需要挪動(dòng)前面的連接
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;
            if (keepAlive && poolingCount + activeCount < minIdle) {
                //不夠核心的活躍連接時(shí)候,需要去創(chuàng)建啦
                needFill = true;
            }
        } finally {
            lock.unlock();
        }
        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                //銷毀連接
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }
        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();
                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }
                boolean discard = !validate; //沒通過validate
                if (validate) {
                    //通過keep alive檢查,更新時(shí)間
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    //這里還會(huì)嘗試放回connections數(shù)組
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        //沒放入,標(biāo)記要丟棄了
                        discard = true;
                    }
                }
                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }
                    lock.lock();
                    try {
                        discardCount++;
                        if (activeCount + poolingCount <= minIdle) {
                            //發(fā)信號(hào)讓創(chuàng)建線程去創(chuàng)建
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }
        if (needFill) {
            //又要去創(chuàng)建了
            lock.lock();
            try {
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }

工具數(shù)組evictConnections,keepAliveConnections 用完即被置空,老工具人了。

一波操作下來(lái),完成了對(duì)connections數(shù)組的大清洗。

以上就是關(guān)于“Druid核心源碼分析DruidDataSource”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI