您好,登錄后才能下訂單哦!
一般我們常見的RPC框架都包含如下三個部分:
注冊中心,用于服務(wù)端注冊遠程服務(wù)以及客戶端發(fā)現(xiàn)服務(wù)
服務(wù)端,對外提供后臺服務(wù),將自己的服務(wù)信息注冊到注冊中心
客戶端,從注冊中心獲取遠程服務(wù)的注冊信息,然后進行遠程過程調(diào)用
上面提到的注冊中心其實屬于服務(wù)治理,即使沒有注冊中心,RPC的功能也是完整的。之前我大多接觸的是基于zookeeper的注冊中心,這里基于consul來實現(xiàn)注冊中心的基本功能。
Raft相比Paxos直接
此外不多描述,還沒研究raft
支持數(shù)據(jù)中心,可以用來解決單點故障之類的問題
集成相比zookeeper更加簡單(代碼量少,邏輯清晰簡單)
支持健康檢查,支持http以及tcp
自帶UI管理功能,不需要額外第三方支持。(zookeeper需要單獨部署zkui之類的第三方工具)
支持key/value存儲
啟動consul之后訪問管理頁面
提取出服務(wù)注冊與服務(wù)發(fā)現(xiàn)兩個接口,然后使用Consul實現(xiàn),這里主要通過consul-client來實現(xiàn)(也可以是consul-api),需要在pom中引入:
<dependency> <groupId>com.orbitz.consul</groupId> <artifactId>consul-client</artifactId> <version>0.14.1</version> </dependency>
RegistryService
提供服務(wù)的注冊與刪除功能
public interface RegistryService { void register(RpcURL url); void unregister(RpcURL url); }
AbstractConsulService
consul的基類,用于構(gòu)建Consl對象,服務(wù)于服務(wù)端以及客戶端。
public class AbstractConsulService { private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class); protected final static String CONSUL_NAME="consul_node_jim"; protected final static String CONSUL_ID="consul_node_id"; protected final static String CONSUL_TAGS="v3"; protected final static String CONSUL_HEALTH_INTERVAL="1s"; protected Consul buildConsul(String registryHost, int registryPort){ return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build(); } }
ConsulRegistryService
服務(wù)注冊實現(xiàn)類,在注冊服務(wù)的同時,指定了健康檢查。
服務(wù)的刪除暫時未實現(xiàn)
public class ConsulRegistryService extends AbstractConsulService implements RegistryService { private final static int CONSUL_CONNECT_PERIOD=1*1000; @Override public void register(RpcURL url) { Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort()); AgentClient agent = consul.agentClient(); ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build(); ImmutableRegistration.Builder builder = ImmutableRegistration.builder(); builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check); agent.register(builder.build()); } @Override public void unregister(RpcURL url) { } }
由于我實現(xiàn)的RPC是基于TCP的,所以服務(wù)注冊的健康檢查也指定為TCP,consul會按指定的IP以及端口建立連接以此判斷服務(wù)的健康狀態(tài)。如果是http,則需要調(diào)用http方法,同時指定健康檢查地址。
ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();
后臺的監(jiān)控信息如下:
雖然只是指定了TCP,可能出于某種機制后臺依然會發(fā)起HTTP的健康檢查請求,上圖第一條請求日志。
DiscoveryService
獲取所有注冊的有效的服務(wù)信息。
public interface DiscoveryService { List<RpcURL> getUrls(String registryHost, int registryPort); }
ConsulDiscoveryService
首先是獲取有效的服務(wù)列表:
List<RpcURL> urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){ RpcURL url=new RpcURL(); url.setHost(serviceHealth.getService().getAddress()); url.setPort(serviceHealth.getService().getPort()); urls.add(url); }
服務(wù)更新監(jiān)聽,當(dāng)可用服務(wù)列表發(fā)現(xiàn)變化時需要通知調(diào)用端。
try { ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name); serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() { @Override public void notify(Map<ServiceHealthKey, ServiceHealth> map) { logger.info("serviceHealthCache.addListener notify"); RpcClientInvokerCache.clear(); } }); serviceHealthCache.start(); } catch (Exception e) { logger.info("serviceHealthCache.start error:",e); }
由于之前對客戶端的Invoker有緩存,所以當(dāng)服務(wù)列表有變化時需要對緩存信息進行更新。
這里簡單的直接對緩存做清除處理,其實好一點的方法應(yīng)該只對有變化的做處理。
RpcClientInvokerCache
對客戶端實例化后的Invoker的緩存類
public class RpcClientInvokerCache { private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>(); public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){ return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone(); } public static void addHandler(RpcClientInvoker handler) { CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone(); newHandlers.add(handler); connectedHandlers=newHandlers; } public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){ return connectedHandlers; } public static RpcClientInvoker get(int i){ return connectedHandlers.get(i); } public static int size(){ return connectedHandlers.size(); } public static void clear(){ CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone(); newHandlers.clear(); connectedHandlers=newHandlers; } }
負載均衡
當(dāng)同一個接口有多個服務(wù)同時提供服務(wù)時,客戶端需要有一定的負載均衡機制去決策將客戶端的請求分配給哪一臺服務(wù)器,這里實現(xiàn)一個簡易的輪詢實現(xiàn)方式。請求次數(shù)累加,累加的值與服務(wù)列表的大小做取模操作。
代碼中取服務(wù)列表的方法有小問題,未按接口信息取,后續(xù)再完成
public class RoundRobinLoadbalanceService implements LoadbalanceService { private AtomicInteger roundRobin = new AtomicInteger(0); private static final int MAX_VALUE=1000; private static final int MIN_VALUE=1; private AtomicInteger getRoundRobinValue(){ if(this.roundRobin.getAndAdd(1)>MAX_VALUE){ this.roundRobin.set(MIN_VALUE); } return this.roundRobin; } @Override public int index(int size) { return (this.getRoundRobinValue().get() + size) % size; } }
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。