溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Ribbon中怎么使用 LoadBalancer 實現負載均衡

發(fā)布時間:2021-06-18 15:18:25 來源:億速云 閱讀:343 作者:Leah 欄目:大數據

這篇文章給大家介紹Ribbon中怎么使用 LoadBalancer 實現負載均衡,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。


Ribbon 負載均衡器

Ribbon 的負載均衡器是通過 LoadBalancerClient 來實現的,在應用啟動的時候,LoadBalancerClient 默認會從 EurekaClient 獲取服務列表,并將服務注冊列表緩存在本地,當調用 LoadBalancerClient 的 choose() 方法的時候, 根據負載均衡策略 IRule 來選擇一個可用的服務,從而實現負載均衡。

當然,LoadBalancerClient 也可以不從 EurekaClient 中獲取服務列表,這是需要自己維護一個服務注冊列表信息,具體操作如下:

ribbon:
  eureka:
    enabled: false
  
stores:
  ribbon:
    listOfServers: baidu.com, google.com

Ribbon 負載均衡器流程圖

Ribbon中怎么使用 LoadBalancer 實現負載均衡

主要流程:

    1. 當應用啟動的時候,ILoadBalancer 從 EurekaClient 獲取服務列表
    2. 然后每 10 秒 向 EurekaClient 發(fā)送一次心跳檢測,如果注冊列表發(fā)生了變化,則更新獲取重新獲取
    3. LoadBalancerClient 調用 choose() 方法來選擇服務的時候,會調用 ILoadBalancer 的  chooseServer() 來獲取一個可以的服務,
    4. 在 ILoadBalancer 進行獲取服務的時候,會根據負載均衡策略 IRule 來進行選擇
    5. 返回可用的服務

Ribbon 負載均衡器實現原理

下面就來看看每個類的實現原理

RibbonLoadBalancerClient

RibbonLoadBalancerClient 它是 Ribbon 負載均衡實現的一個重要的類,最終的負載均衡的請求處理都由它來執(zhí)行,先來看下它的類圖:

Ribbon中怎么使用 LoadBalancer 實現負載均衡

它實現了 LoadBalancerClient 接口,而 LoadBalancerClient 接口實現了 ServiceInstanceChooser 接口:

ServiceInstanceChooser

該接口用來從負載均衡器中獲取一個可用的服務,只有一個方法:

public interface ServiceInstanceChooser {
	/**
	 * @param serviceId:服務ID
	 * @return 可用的服務實例
	 */
	ServiceInstance choose(String serviceId);
}

LoadBalancerClient

表示負載均衡的客戶端,是一個接口,繼承了 ServiceInstanceChooser 接口 ,共有三個方法:

public interface LoadBalancerClient extends ServiceInstanceChooser {
	/**
	 * 執(zhí)行請求
	 * @param serviceId :用于查找 LoadBalancer的服務ID
	 * @param request:允許實現執(zhí)行前后操作
	 */
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 執(zhí)行請求
	 * @param serviceId :用于查找 LoadBalancer的服務ID
	 * @param serviceInstance :執(zhí)行請求的服務
	 * @param request:允許實現執(zhí)行前后操作
	 */
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * 創(chuàng)建具有真實主機和端口的正確URI,有些系統使用帶有邏輯服務名的URL作為主機,調用該方法將會使用 host:port 來替換邏輯服務名
	 * @param instance :用于重建URI的服務實例
	 * @param original :具有邏輯服務名的URL
	 * @return A reconstructed URI.
	 */
	URI reconstructURI(ServiceInstance instance, URI original);
}

RibbonLoadBalancerClient 實現如下:

主要看下從 ServiceInstanceChooser,LoadBalancerClient 中實現的接口方法

public class RibbonLoadBalancerClient implements LoadBalancerClient {

    // 工廠:主要用來創(chuàng)建客戶端,創(chuàng)建負載均衡器,進行客戶端配置等
    // 對于每一個客戶端名稱都會創(chuàng)建一個Spring ApplicationContext,可以從中獲取需要的bean
	private SpringClientFactory clientFactory;

	protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}
    ..................
}


public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

    // 獲取客戶端
	public <C extends IClient<?, ?>> C getClient(String name, Class<C> clientClass) {
		return getInstance(name, clientClass);
	}

	// 獲取負載均衡器
	public ILoadBalancer getLoadBalancer(String name) {
		return getInstance(name, ILoadBalancer.class);
	}

	//獲取客戶端配置
	public IClientConfig getClientConfig(String name) {
		return getInstance(name, IClientConfig.class);
	}

	// 獲取 RibbonLoadBalancerContext
	public RibbonLoadBalancerContext getLoadBalancerContext(String serviceId) {
		return getInstance(serviceId, RibbonLoadBalancerContext.class);
	}
    
    // 獲取對應的bean
	public <T> T getInstance(String name, Class<T> type) {
		AnnotationConfigApplicationContext context = getContext(name);
		.....
		return context.getBean(type);
	    .....
	}
}

choose() 方法

該方法主要用來獲取一個可用的服務實例

	public ServiceInstance choose(String serviceId, Object hint) {
		Server server = getServer(getLoadBalancer(serviceId), hint);
		if (server == null) {
			return null;
		}
        // RibbonServer 實現了 ServiceInstance
		return new RibbonServer(serviceId, server, isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
	}
   
    // 根據服務ID獲取負載均衡器,會調用 SpringClientFactory 的方法進行獲取
	protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}
   
    // 根據負載均衡器來獲取可用的服務
	protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
		if (loadBalancer == null) {
			return null;
		}
		return loadBalancer.chooseServer(hint != null ? hint : "default");
	}

最后會調用 ILoadBalancer.chooseServer 來獲取可用服務,后面再來說 ILoadBalancer 。

execute() 方法

該方法執(zhí)行請求

	public <T> T execute(String serviceId, ServiceInstance serviceInstance,
			LoadBalancerRequest<T> request) throws IOException {
		Server server = null;
		if (serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer) serviceInstance).getServer();
		}

		RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);

        // 狀態(tài)記錄器,記錄著服務的狀態(tài)
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

        ...........
	    T returnVal = request.apply(serviceInstance);
		statsRecorder.recordStats(returnVal);
		return returnVal;
        ...........
	}

    // apply 方法調用如下,最終返回 ClientHttpResponse
	public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request,
			final byte[] body, final AsyncClientHttpRequestExecution execution)
			throws IOException {
		final URI originalUri = request.getURI();
		String serviceName = originalUri.getHost();
		return this.loadBalancer.execute(serviceName,
				new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() {
					@Override
					public ListenableFuture<ClientHttpResponse> apply(
							final ServiceInstance instance) throws Exception {
						HttpRequest serviceRequest = new ServiceRequestWrapper(request,
								instance, AsyncLoadBalancerInterceptor.this.loadBalancer);
						return execution.executeAsync(serviceRequest, body);
					}

				});
	}

以上就是負載均衡器流程圖左邊部分的原理,接下來看下右邊的部分

ILoadBalancer

通過上面的分析,負載均衡器獲取一個可用的服務,最終會調用 ILoadBalancer 的 chooseServer 方法,下面就來看下 ILoadBalancer 的實現原理

首先來看下 ILoadBalancer 的整體類圖:

Ribbon中怎么使用 LoadBalancer 實現負載均衡

在上面的類圖中,主要的邏輯實在 BaseLoadBalancer 中實現,而 DynamicServerListLoadBalancer 主要提供動態(tài)獲取服務列表的能力。

ILoadBalancer

首先來看下 ILoadBalancer,它表示一個負載均衡器接口,

public interface ILoadBalancer {
	// 添加服務
	public void addServers(List<Server> newServers);
	
	//獲取服務
	public Server chooseServer(Object key);
	
	//標記某個服務下線
	public void markServerDown(Server server);

	//獲取狀態(tài)為UP的所有可用服務列表
    public List<Server> getReachableServers();

    //獲取所有服務列表,包括可用的和不可用的
	public List<Server> getAllServers();
}

AbstractLoadBalancer

實現 ILoadBalancer 接口,提供一些默認實現

public abstract class AbstractLoadBalancer implements ILoadBalancer {
    public enum ServerGroup{ALL, STATUS_UP, STATUS_NOT_UP}

    public Server chooseServer() {
    	return chooseServer(null);
    }
    public abstract List<Server> getServerList(ServerGroup serverGroup);
    // 獲取狀態(tài)
    public abstract LoadBalancerStats getLoadBalancerStats();    
}

IClientConfigAware

客戶端配置

public interface IClientConfigAware {
    public abstract void initWithNiwsConfig(IClientConfig clientConfig);
}

BaseLoadBalancer

負載均衡器的主要實現邏輯,在該類中,會根據負載均衡策略 IRule 來獲取可用的服務,會通過 IPing 來檢測服務的可用性,此外,該類中從 EurkaClient 獲取到服務列表后,會在該類中保存下來,會維護所有的服務列表和可用的服務列表。

首先來看下它的一些屬性,然后再來看每個對應的方法

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {

    // 默認的負載均衡策略:輪詢選擇服務實例
    private final static IRule DEFAULT_RULE = new RoundRobinRule();
    protected IRule rule = DEFAULT_RULE;

    // 默認 ping 的策略,會調用 IPing 來檢測服務是否可用
    private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
    protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
    protected IPing ping = null;

    // 所有服務列表
    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    // 狀態(tài)為 up 的服務列表
    protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());

    // 鎖
    protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
    protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

    // 定時任務,去 ping 服務是否可用
    protected Timer lbTimer = null;
    // ping 的時間間隔,10秒
    protected int pingIntervalSeconds = 10;
    // ping 的最大次數
    protected int maxTotalPingTimeSeconds = 5;

    // 負載均衡器的狀態(tài)
    protected LoadBalancerStats lbStats;
    // 客戶端配置
    private IClientConfig config;
    
    // 服務列表變化監(jiān)聽器
    private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
    // 服務狀態(tài)變化監(jiān)聽器
    private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

    // 構造方法,使用默認的配置來創(chuàng)建負載均衡器,還有其他重載的構造方法,可用根據需要來創(chuàng)建負載均衡器
    public BaseLoadBalancer() {
        this.name = DEFAULT_NAME;
        this.ping = null;
        setRule(DEFAULT_RULE);
        setupPingTask();
        lbStats = new LoadBalancerStats(DEFAULT_NAME);
    }

   .....................
}

在上面的屬性中,Ribbon 提供了一些默認的配置:

IClientConfig 表示客戶端的配置,實現類為 DefaultClientConfigImpl,在該類中配置了默認的值,:

public class DefaultClientConfigImpl implements IClientConfig {

    // ping 的默認策略 DummyPing
	public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();
    public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
    public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
    public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;
    public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;
  
   .............................................
}

IRule 表示 負載均衡策略,即如何去選擇服務實例,默認為  RoundRobinRule,即通過輪詢的方式選擇服務。Ribbon 默認提供的有 7 種。

IPing 表示檢測服務是否可用策略,Ribbon 也提供了很多策略,共有 5 種,默認為 DummyPing

關于 IRule 和 IPing 的策略,后面會專門進行研究。

在 BaseLoadBalancer 中,除了提供一個無參的構造方法(使用的是默認的配置)外,還提供了很多重載的構造方法,下面來看下根據客戶端的配置來創(chuàng)建BaseLoadBalancer :

// 根據客戶端配置來創(chuàng)建 BaseLoadBalancer
public BaseLoadBalancer(IClientConfig config) {
	initWithNiwsConfig(config);
}

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
	// 負載均衡策略
	String ruleClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName);
	// ping 策略
	String pingClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName);
	IRule rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, clientConfig);
	IPing ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(pingClassName, clientConfig);
	// 狀態(tài)
	LoadBalancerStats stats = createLoadBalancerStatsFromConfig(clientConfig);
	// 初始化配置
	initWithConfig(clientConfig, rule, ping, stats);
}

void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
	this.config = clientConfig;
	String clientName = clientConfig.getClientName();
	this.name = clientName;
	
	// ping 的周期
	int pingIntervalTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));
	// 最大 ping 的次數
	int maxTotalPingTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2")));
	
	setPingInterval(pingIntervalTime);
	setMaxTotalPingTime(maxTotalPingTime);
	setRule(rule);
	setPing(ping);

	setLoadBalancerStats(stats);
	rule.setLoadBalancer(this);
	if (ping instanceof AbstractLoadBalancerPing) {
		((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
	}
	.................
	// 注冊監(jiān)控/可忽略
	init();
}

在上面的構造方法中,可用根據客戶端配置的信息來創(chuàng)建一個BaseLoadBalancer,如客戶端可以配置負載均衡策略,ping的策略,ping的時間間隔和最大次數等。

判斷服務的可用性

在 Ribbon 中,負載均衡器多久才去更新獲取服務列表呢?在 BaseLoadBalancer 類中,有一個 setupPingTask 方法,在該方法內部,會創(chuàng)建 PingTask 定時任務去檢測服務的可用性,而 PingTask 又會創(chuàng)建 Pinger 對象,在 Pinger 對象的 runPinger() 方法中,會根據ping策略即 pingerStrategy 的 pingServers(ping, allServer) 來獲取服務的可用性,主要邏輯如下:

void setupPingTask() {
	if (canSkipPing()) {
		return;
	}
	// 如果已經有了定時任務,則取消
	if (lbTimer != null) {
		lbTimer.cancel();
	}
	// 第二個參數為true,表示它是一個deamon線程
	lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
	// 創(chuàng)建 PingTask, 它繼承于 TimerTask,定時執(zhí)行 run 方法
	lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
	......
}
class PingTask extends TimerTask {
	public void run() {
		// 默認 pingStrategy = new SerialPingStrategy()
		new Pinger(pingStrategy).runPinger();
	}
}

public void runPinger() throws Exception {
	// 如果正在ping,則返回
	if (!pingInProgress.compareAndSet(false, true)) { 
		return; // Ping in progress - nothing to do
	}
	// 所有的服務,包括不可用的服務
	Server[] allServers = null;
	boolean[] results = null;

	Lock allLock = null;
	Lock upLock = null;

	try {

		allLock = allServerLock.readLock();
		allLock.lock();
		allServers = allServerList.toArray(new Server[allServerList.size()]);
		allLock.unlock();
		// 所有服務的數量
		int numCandidates = allServers.length;
		// 所有服務ping的結果
		results = pingerStrategy.pingServers(ping, allServers);

		// 狀態(tài)可用的服務列表 
		final List<Server> newUpList = new ArrayList<Server>();
		// 狀態(tài)改變的服務列表
		final List<Server> changedServers = new ArrayList<Server>();

		for (int i = 0; i < numCandidates; i++) {
			// 最新的狀態(tài)
			boolean isAlive = results[i];
			Server svr = allServers[i];
			// 老的狀態(tài)
			boolean oldIsAlive = svr.isAlive();
			// 更新狀態(tài)
			svr.setAlive(isAlive);
			
			// 如果狀態(tài)改變了,則放到集合中,會進行重新拉取
			if (oldIsAlive != isAlive) {
				changedServers.add(svr);
			}
			// 狀態(tài)可用的服務
			if (isAlive) {
				newUpList.add(svr);
			}
		}
		upLock = upServerLock.writeLock();
		upLock.lock();
		upServerList = newUpList;
		upLock.unlock();
		// 變態(tài)改變監(jiān)聽器
		notifyServerStatusChangeListener(changedServers);
	} finally {
		// ping 完成
		pingInProgress.set(false);
	}
}

// 檢測服務的狀態(tài)
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
	int numCandidates = servers.length;
	boolean[] results = new boolean[numCandidates];

	for (int i = 0; i < numCandidates; i++) {
		results[i] = false;
		if (ping != null) {
			results[i] = ping.isAlive(servers[i]);
		}
	}
	return results;
}

在上面的邏輯中,Ribbon 每10秒向 EurekaClient 發(fā)送 ping 來判斷服務的可用性,如果服務的可用性發(fā)生了改變或服務的數量和之前的不一致,則會更新或重新拉取服務。有了這些服務之后,會根據負載均衡策略 IRule 來選擇一個可用的服務。

根據負載均衡策略 IRule 來選擇一個可用的服務

在前文說到 Ribbon 客戶端 RibbonLoadBalancerClient 選擇服務的時候,最終會調用  ILoadBalancer.chooseServer 來選擇服務,接下來就來看下這個方法:

public Server chooseServer(Object key) {
	.......
    //rule= new RoundRobinRule()
	return rule.choose(key);
	....
}

關于 Ribbon 的負載均衡策略 IRule, Ribbon 提供了 7 種,后面再來分析,現在只需要知道通過 IRule 來選擇服務就可以了。

初始化獲取所有服務列表

在上面的分析中,Ribbon 會每10秒定時的去檢測服務的可用性,如果服務狀態(tài)發(fā)生了變化則重新獲取,之后,再根據負載均衡策略 IRule 來選擇一個可用的服務;但是,在初始化的時候,是從哪里獲取服務列表呢?下面就來分析這個問題

 BaseLoadBalancer 有個子類 DynamicServerListLoadBalancer,它具有使用動態(tài)源獲取服務器列表的功能。即服務器列表在運行時可能會更改。此外,還可以通過條件來過濾掉不符合所需條件的服務。

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
	// 是否正在進行服務列表的更新
    protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
	// 服務列表
    volatile ServerList<T> serverListImpl;
	// 服務過濾器
    volatile ServerListFilter<T> filter;
}

1. 初始化時獲取所有服務

在 DynamicServerListLoadBalancer 中,有個 restOfInit 方法,在初始化時進行調用,在該方法中,會從 Eureka 客戶端中拉取所有的服務列表:

void restOfInit(IClientConfig clientConfig) {
	.............
	updateListOfServers();
	........
}

public void updateListOfServers() {
	List<T> servers = new ArrayList<T>();
	if (serverListImpl != null) {
		// 獲取所有服務列表
		servers = serverListImpl.getUpdatedListOfServers();
		// 根據條件過濾服務
		if (filter != null) {
			servers = filter.getFilteredListOfServers(servers);
		}
	}
	updateAllServerList(servers);
}

protected void updateAllServerList(List<T> ls) {
	if (serverListUpdateInProgress.compareAndSet(false, true)) {
		try {
			for (T s : ls) {
				s.setAlive(true); // 狀態(tài)設置為可用
			}
			setServersList(ls);
			super.forceQuickPing(); // 強制檢測服務狀態(tài)
		} finally {
			serverListUpdateInProgress.set(false);
		}
	}
}

 獲取所有服務列表 servers = serverListImpl.getUpdatedListOfServers(); 最終會調用  DiscoveryEnabledNIWSServerList 的方法:

servers = serverListImpl.getUpdatedListOfServers();

public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
	return obtainServersViaDiscovery();
}

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
	List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
	........
	// 通過 eurekaClient 來獲取注冊的服務列表 
	EurekaClient eurekaClient = eurekaClientProvider.get();
	if (vipAddresses!=null){
		for (String vipAddress : vipAddresses.split(",")) {
		
			List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
			for (InstanceInfo ii : listOfInstanceInfo) {
				if (ii.getStatus().equals(InstanceStatus.UP)) {
					.....
					DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
					serverList.add(des);
				}
			}
			......
		}
	}
	return serverList;
}

通過上面方法的分析,Ribbon 最終會通過 EurekaClient 來獲取服務列表的,而 EurekaClient 的實現類是 DiscoveryClient,而在 Eureka 中,DiscoveryClient 類具有服務的注冊,發(fā)現,續(xù)約,獲取服務列表等功能。

此外,該類中還可以通過過濾器來獲取不符合條件的服務。

以上就是 Ribbon 負載均衡器的一個實現原理。最后再來看下流程圖,加深印象:

Ribbon中怎么使用 LoadBalancer 實現負載均衡

關于Ribbon中怎么使用 LoadBalancer 實現負載均衡就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI