溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

sharding中怎么執(zhí)行jdbc

發(fā)布時間:2021-07-19 15:04:25 來源:億速云 閱讀:126 作者:Leah 欄目:大數(shù)據(jù)

sharding中怎么執(zhí)行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

  • 內(nèi)存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的數(shù)據(jù)庫連接數(shù)量不做限制。如果實際執(zhí)行的SQL需要對某數(shù)據(jù)庫實例中的200張表做操作,則對每張表創(chuàng)建一個新的數(shù)據(jù)庫連接,并通過多線程的方式并發(fā)處理,以達成執(zhí)行效率最大化。并且在SQL滿足條件情況下,優(yōu)先選擇流式歸并,以防止出現(xiàn)內(nèi)存溢出或避免頻繁垃圾回收情況

  • 連接限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的數(shù)據(jù)庫連接數(shù)量。如果實際執(zhí)行的SQL需要對某數(shù)據(jù)庫實例中的200張表做操作,那么只會創(chuàng)建唯一的數(shù)據(jù)庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數(shù)據(jù)庫,仍然采用多線程處理對不同庫的操作,但每個庫的每次操作仍然只創(chuàng)建一個唯一的數(shù)據(jù)庫連接。這樣即可以防止對一次請求對數(shù)據(jù)庫連接占用過多所帶來的問題。該模式始終選擇內(nèi)存歸并

case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個簡單查詢語句,來分析ss大致如何來執(zhí)行sql,根據(jù)分片改寫后的sql,應(yīng)該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執(zhí)行

準備階段

1.初始化PreparedStatementExecutor#init,封裝Statement執(zhí)行單元

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        //添加路由單元,即數(shù)據(jù)源對應(yīng)的sql單元
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        //緩存statement、參數(shù)
        cacheStatements();
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        //執(zhí)行封裝Statement執(zhí)行單元
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }

   ... ...   
}

2.執(zhí)行封裝Statement執(zhí)行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups

@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

    private final int maxConnectionsSizePerQuery;

    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //數(shù)據(jù)源對應(yīng)sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
        Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();

        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            //添加分片執(zhí)行組
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }

    private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            if (!result.containsKey(each.getDataSourceName())) {
                result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
            }
            result.get(each.getDataSourceName()).add(each.getSqlUnit());
        }
        return result;
    }

    private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
            final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //在maxConnectionSizePerQuery允許的范圍內(nèi),當(dāng)一個連接需要執(zhí)行的請求數(shù)量大于1時,意味著當(dāng)前的數(shù)據(jù)庫連接無法持有相應(yīng)的數(shù)據(jù)結(jié)果集,則必須采用內(nèi)存歸并;
        //反之,當(dāng)一個連接需要執(zhí)行的請求數(shù)量等于1時,意味著當(dāng)前的數(shù)據(jù)庫連接可以持有相應(yīng)的數(shù)據(jù)結(jié)果集,則可以采用流式歸并

        //TODO 場景:在不分庫只分表的情況下,會存在一個數(shù)據(jù)源對應(yīng)多個sql單元的情況

        //計算所需要的分區(qū)大小
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //按照分區(qū)大小進行分區(qū)
        //事例:
        //sqlUnits = [1, 2, 3, 4, 5]
        //desiredPartitionSize = 2
        //則結(jié)果為:[[1, 2], [3,4], [5]]
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //maxConnectionsSizePerQuery該參數(shù)表示一次查詢時每個數(shù)據(jù)庫所允許使用的最大連接數(shù)
        //根據(jù)maxConnectionsSizePerQuery來判斷使用連接模式
        //CONNECTION_STRICTLY 連接限制模式
        //MEMORY_STRICTLY 內(nèi)存限制模式
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //獲取分區(qū)大小的連接
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        //遍歷分區(qū),將分區(qū)好的sql單元放到指定連接執(zhí)行
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

    private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, 
                                                                          final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> result = new LinkedList<>();
        //遍歷sql單元
        for (SQLUnit each : sqlUnitGroup) {
            //回調(diào),創(chuàng)建statement執(zhí)行單元
            result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
        }
        //封裝成分片執(zhí)行組
        return new ShardingExecuteGroup<>(result);
    }
}
執(zhí)行階段

1.執(zhí)行查詢sql

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        //獲取當(dāng)前是否異常值
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        //創(chuàng)建回調(diào)實例
        //執(zhí)行SQLExecuteCallback的execute方法
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {

            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }

    ... ...

   protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        //執(zhí)行完后刷新分片元數(shù)據(jù),比如創(chuàng)建表、修改表etc.
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }

    ... ...
}

2.通過線程池分組執(zhí)行,并回調(diào)callback

@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {
    //數(shù)據(jù)庫類型
    private final DatabaseType databaseType;
    //是否異常
    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        //遍歷statement執(zhí)行單元
        for (StatementExecuteUnit each : statementExecuteUnits) {
            //執(zhí)行添加返回結(jié)果T
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        //設(shè)置當(dāng)前線程是否異常
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        //根據(jù)url獲取數(shù)據(jù)源元數(shù)據(jù)
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        //sql執(zhí)行鉤子
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            //執(zhí)行sql
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}

3.執(zhí)行executeSQL,調(diào)用第三步的callback中的executeSQL,封裝ResultSet

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        //緩存resultSet
        getResultSets().add(resultSet);
        //判斷ConnectionMode
        //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內(nèi)存MemoryQueryResult
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) 
                : new MemoryQueryResult(resultSet, shardingRule);
    }

    ... ...
}

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI