您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么使用sharding-jdbc讀寫分離”,在日常操作中,相信很多人在怎么使用sharding-jdbc讀寫分離問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么使用sharding-jdbc讀寫分離”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
核心概念
主庫:添加、更新以及刪除數(shù)據(jù)操作
從庫:查詢數(shù)據(jù)操作所使用的數(shù)據(jù)庫,可支持多從庫
一主多從讀寫分離,多主多從需用使用sharding
源碼分析
1.啟動入口:
public class JavaConfigurationExample { // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES; // private static ShardingType shardingType = ShardingType.SHARDING_TABLES; // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES_AND_TABLES; private static ShardingType shardingType = ShardingType.MASTER_SLAVE; // private static ShardingType shardingType = ShardingType.SHARDING_MASTER_SLAVE; // private static ShardingType shardingType = ShardingType.SHARDING_VERTICAL; public static void main(final String[] args) throws SQLException { DataSource dataSource = DataSourceFactory.newInstance(shardingType); CommonService commonService = getCommonService(dataSource); commonService.initEnvironment(); commonService.processSuccess(); commonService.cleanEnvironment(); } private static CommonService getCommonService(final DataSource dataSource) { return new CommonServiceImpl(new OrderRepositoryImpl(dataSource), new OrderItemRepositoryImpl(dataSource)); } }
2.以sharding-jdbc為例,配置主從讀寫分離代碼如下:
@Override public DataSource getDataSource() throws SQLException { //主從配置 MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(/*主從命名*/"demo_ds_master_slave", /*主庫*/"demo_ds_master", /*從庫*/Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1")); //打印sql Properties props = new Properties(); props.put("sql.show", true); //創(chuàng)建MasterSlaveDataSource數(shù)據(jù)源 return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, props); } private Map<String, DataSource> createDataSourceMap() { Map<String, DataSource> result = new HashMap<>(); //主庫 result.put("demo_ds_master", DataSourceUtil.createDataSource("demo_ds_master")); //兩從庫 result.put("demo_ds_slave_0", DataSourceUtil.createDataSource("demo_ds_slave_0")); result.put("demo_ds_slave_1", DataSourceUtil.createDataSource("demo_ds_slave_1")); return result; }
創(chuàng)建sharding主從數(shù)據(jù)源MasterSlaveDataSource
public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException { super(dataSourceMap); //緩存mysql元數(shù)據(jù) cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); //主從規(guī)則配置 this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig); //主從sql解析 parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType()); shardingProperties = new ShardingProperties(null == props ? new Properties() : props); }
3.執(zhí)行insert插入方法
@Override public Long insert(final Order order) throws SQLException { String sql = "INSERT INTO t_order (user_id, status) VALUES (?, ?)"; //獲取MasterSlaveDataSource數(shù)據(jù)源連接,同時創(chuàng)建MasterSlavePreparedStatement //這里有兩個Statement分別含義 //1.MasterSlaveStatement:執(zhí)行sql時候才路由 //2.MasterSlavePreparedStatement:創(chuàng)建Statement時就路由 //Statement.RETURN_GENERATED_KEYS 自動生成主鍵并返回生成的主鍵 try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setInt(1, order.getUserId()); preparedStatement.setString(2, order.getStatus()); //MasterSlavePreparedStatement執(zhí)行sql preparedStatement.executeUpdate(); try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) { if (resultSet.next()) { order.setOrderId(resultSet.getLong(1)); } } } return order.getOrderId(); }
獲取數(shù)據(jù)庫連接MasterSlaveConnection->AbstractConnectionAdapter#getConnection
/** * Get database connection. * * @param dataSourceName data source name * @return database connection * @throws SQLException SQL exception */ //MEMORY_STRICTLY:Proxy會保持一個數(shù)據(jù)庫中所有被路由到的表的連接,這種方式的好處是利用流式ResultSet來節(jié)省內(nèi)存 // //CONNECTION_STRICTLY:代理在取出ResultSet中的所有數(shù)據(jù)后會釋放連接,同時,內(nèi)存的消耗將會增加 // public final Connection getConnection(final String dataSourceName) throws SQLException { return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0); } /** * Get database connections. * * @param connectionMode connection mode * @param dataSourceName data source name * @param connectionSize size of connection list to be get * @return database connections * @throws SQLException SQL exception */ public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { //獲取數(shù)據(jù)源 DataSource dataSource = getDataSourceMap().get(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName); Collection<Connection> connections; //并發(fā)從cache中獲取連接 synchronized (cachedConnections) { connections = cachedConnections.get(dataSourceName); } List<Connection> result; //如果cache中連接數(shù)大于指定連接數(shù)時,返回指定連接數(shù)量 if (connections.size() >= connectionSize) { result = new ArrayList<>(connections).subList(0, connectionSize); } else if (!connections.isEmpty()) { result = new ArrayList<>(connectionSize); result.addAll(connections); //創(chuàng)建缺少的指定連接數(shù) List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size()); result.addAll(newConnections); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, newConnections); } } else { result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize)); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, result); } } return result; } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException { //為1時不存在并發(fā)獲取連接情況,直接返回單個連接 if (1 == connectionSize) { return Collections.singletonList(createConnection(dataSourceName, dataSource)); } //TODO 不處理并發(fā) if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) { return createConnections(dataSourceName, dataSource, connectionSize); } //并發(fā) synchronized (dataSource) { return createConnections(dataSourceName, dataSource, connectionSize); } } private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException { List<Connection> result = new ArrayList<>(connectionSize); for (int i = 0; i < connectionSize; i++) { try { result.add(createConnection(dataSourceName, dataSource)); } catch (final SQLException ex) { for (Connection each : result) { each.close(); } throw new SQLException(String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex); } } return result; } private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException { //判斷是否是sharding事物 Connection result = isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection(); replayMethodsInvocation(result); return result; }
預(yù)準(zhǔn)備路由并緩存Statement
public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException { this.connection = connection; //創(chuàng)建router對象 masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); //緩存路由后的Statement,useCache緩存解析后的sql Statement for (String each : masterSlaveRouter.route(sql, true)) { //獲取數(shù)據(jù)庫連接 PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys); routedStatements.add(preparedStatement); } }
執(zhí)行MasterSlaveRouter#route方法獲取路由庫
public Collection<String> route(final String sql, final boolean useCache) { //解析sql,這里不分析sql如何使用antlr4解析 Collection<String> result = route(parseEngine.parse(sql, useCache)); //是否打印sql if (showSQL) { SQLLogger.logSQL(sql, result); } return result; } private Collection<String> route(final SQLStatement sqlStatement) { //判斷是否master if (isMasterRoute(sqlStatement)) { //設(shè)置當(dāng)前線程是否允許訪問主庫 MasterVisitedManager.setMasterVisited(); //返回主庫 return Collections.singletonList(masterSlaveRule.getMasterDataSourceName()); } //根據(jù)配置的算法獲取從庫,兩種算法: //1、隨機 //2、輪詢 return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()))); }
執(zhí)行MasterSlavePreparedStatement#executeUpdate
@Override public int executeUpdate() throws SQLException { int result = 0; //從本地緩存遍歷執(zhí)行 for (PreparedStatement each : routedStatements) { result += each.executeUpdate(); } return result; }
4.獲取從庫算法策略
隨機算法
@Getter @Setter public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm { private Properties properties = new Properties(); @Override public String getType() { return "RANDOM"; } @Override public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) { //從slave.size()中獲取一個隨機數(shù) return slaveDataSourceNames.get(new Random().nextInt(slaveDataSourceNames.size())); } }
輪詢算法
@Getter @Setter public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm { //并發(fā)map private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>(); private Properties properties = new Properties(); @Override public String getType() { return "ROUND_ROBIN"; } @Override public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) { //查看對應(yīng)名稱的計數(shù)器,沒有則初始化一個 AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0); COUNTS.putIfAbsent(name, count); // 采用cas輪詢,如果計數(shù)器長到slave.size(),那么歸零(防止計數(shù)器不斷增長下去) count.compareAndSet(slaveDataSourceNames.size(), 0); //絕對值,計數(shù)器%slave.size()取模 return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size()); } }
默認(rèn)算法
SPI擴展機制,load加載第一個算法作為默認(rèn)算法;ss默認(rèn)是隨機
到此,關(guān)于“怎么使用sharding-jdbc讀寫分離”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。