您好,登錄后才能下訂單哦!
sharding中怎么執(zhí)行jdbc,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
內(nèi)存限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的數(shù)據(jù)庫連接數(shù)量不做限制。如果實際執(zhí)行的SQL需要對某數(shù)據(jù)庫實例中的200張表做操作,則對每張表創(chuàng)建一個新的數(shù)據(jù)庫連接,并通過多線程的方式并發(fā)處理,以達成執(zhí)行效率最大化。并且在SQL滿足條件情況下,優(yōu)先選擇流式歸并,以防止出現(xiàn)內(nèi)存溢出或避免頻繁垃圾回收情況
連接限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的數(shù)據(jù)庫連接數(shù)量。如果實際執(zhí)行的SQL需要對某數(shù)據(jù)庫實例中的200張表做操作,那么只會創(chuàng)建唯一的數(shù)據(jù)庫連接,并對其200張表串行處理。如果一次操作中的分片散落在不同的數(shù)據(jù)庫,仍然采用多線程處理對不同庫的操作,但每個庫的每次操作仍然只創(chuàng)建一個唯一的數(shù)據(jù)庫連接。這樣即可以防止對一次請求對數(shù)據(jù)庫連接占用過多所帶來的問題。該模式始終選擇內(nèi)存歸并
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個簡單查詢語句,來分析ss大致如何來執(zhí)行sql,根據(jù)分片改寫后的sql,應(yīng)該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執(zhí)行
1.初始化PreparedStatementExecutor#init,封裝Statement執(zhí)行單元
public final class PreparedStatementExecutor extends AbstractStatementExecutor { @Getter private final boolean returnGeneratedKeys; public PreparedStatementExecutor( final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) { super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection); this.returnGeneratedKeys = returnGeneratedKeys; } /** * Initialize executor. * * @param routeResult route result * @throws SQLException SQL exception */ public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement()); //添加路由單元,即數(shù)據(jù)源對應(yīng)的sql單元 getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); //緩存statement、參數(shù) cacheStatements(); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException { //執(zhí)行封裝Statement執(zhí)行單元 return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() { @Override public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode); } }); } @SuppressWarnings("MagicConstant") private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException { return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability()); } ... ... }
2.執(zhí)行封裝Statement執(zhí)行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups
@RequiredArgsConstructor public final class SQLExecutePrepareTemplate { private final int maxConnectionsSizePerQuery; /** * Get execute unit groups. * * @param routeUnits route units * @param callback SQL execute prepare callback * @return statement execute unit groups * @throws SQLException SQL exception */ public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(routeUnits, callback); } private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups( final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { //數(shù)據(jù)源對應(yīng)sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?] Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits); Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) { //添加分片執(zhí)行組 result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) { Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1); for (RouteUnit each : routeUnits) { if (!result.containsKey(each.getDataSourceName())) { result.put(each.getDataSourceName(), new LinkedList<SQLUnit>()); } result.get(each.getDataSourceName()).add(each.getSqlUnit()); } return result; } private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups( final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>(); //在maxConnectionSizePerQuery允許的范圍內(nèi),當(dāng)一個連接需要執(zhí)行的請求數(shù)量大于1時,意味著當(dāng)前的數(shù)據(jù)庫連接無法持有相應(yīng)的數(shù)據(jù)結(jié)果集,則必須采用內(nèi)存歸并; //反之,當(dāng)一個連接需要執(zhí)行的請求數(shù)量等于1時,意味著當(dāng)前的數(shù)據(jù)庫連接可以持有相應(yīng)的數(shù)據(jù)結(jié)果集,則可以采用流式歸并 //TODO 場景:在不分庫只分表的情況下,會存在一個數(shù)據(jù)源對應(yīng)多個sql單元的情況 //計算所需要的分區(qū)大小 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); //按照分區(qū)大小進行分區(qū) //事例: //sqlUnits = [1, 2, 3, 4, 5] //desiredPartitionSize = 2 //則結(jié)果為:[[1, 2], [3,4], [5]] List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); //maxConnectionsSizePerQuery該參數(shù)表示一次查詢時每個數(shù)據(jù)庫所允許使用的最大連接數(shù) //根據(jù)maxConnectionsSizePerQuery來判斷使用連接模式 //CONNECTION_STRICTLY 連接限制模式 //MEMORY_STRICTLY 內(nèi)存限制模式 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; //獲取分區(qū)大小的連接 List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; //遍歷分區(qū),將分區(qū)好的sql單元放到指定連接執(zhí)行 for (List<SQLUnit> each : sqlUnitPartitions) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; } private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException { List<StatementExecuteUnit> result = new LinkedList<>(); //遍歷sql單元 for (SQLUnit each : sqlUnitGroup) { //回調(diào),創(chuàng)建statement執(zhí)行單元 result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode)); } //封裝成分片執(zhí)行組 return new ShardingExecuteGroup<>(result); } }
1.執(zhí)行查詢sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... /** * Execute query. * * @return result set list * @throws SQLException SQL exception */ public List<QueryResult> executeQuery() throws SQLException { //獲取當(dāng)前是否異常值 final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); //創(chuàng)建回調(diào)實例 //執(zhí)行SQLExecuteCallback的execute方法 SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) { @Override protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(statement, connectionMode); } }; return executeCallback(executeCallback); } ... ... protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException { List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); //執(zhí)行完后刷新分片元數(shù)據(jù),比如創(chuàng)建表、修改表etc. refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement); return result; } ... ... }
2.通過線程池分組執(zhí)行,并回調(diào)callback
@RequiredArgsConstructor public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> { //數(shù)據(jù)庫類型 private final DatabaseType databaseType; //是否異常 private final boolean isExceptionThrown; @Override public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException { Collection<T> result = new LinkedList<>(); //遍歷statement執(zhí)行單元 for (StatementExecuteUnit each : statementExecuteUnits) { //執(zhí)行添加返回結(jié)果T result.add(execute0(each, isTrunkThread, shardingExecuteDataMap)); } return result; } private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException { //設(shè)置當(dāng)前線程是否異常 ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown); //根據(jù)url獲取數(shù)據(jù)源元數(shù)據(jù) DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL()); //sql執(zhí)行鉤子 SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook(); try { sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap); //執(zhí)行sql T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode()); sqlExecutionHook.finishSuccess(); return result; } catch (final SQLException ex) { sqlExecutionHook.finishFailure(ex); ExecutorExceptionHandler.handleException(ex); return null; } } protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException; }
3.執(zhí)行executeSQL,調(diào)用第三步的callback中的executeSQL,封裝ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor { ... ... private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException { PreparedStatement preparedStatement = (PreparedStatement) statement; ResultSet resultSet = preparedStatement.executeQuery(); ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule(); //緩存resultSet getResultSets().add(resultSet); //判斷ConnectionMode //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用內(nèi)存MemoryQueryResult return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) : new MemoryQueryResult(resultSet, shardingRule); } ... ... }
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。