溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Storm-kafka中如何理解ZkCoordinator的過程

發(fā)布時(shí)間:2021-11-24 15:51:35 來源:億速云 閱讀:121 作者:柒染 欄目:云計(jì)算

Storm-kafka中如何理解ZkCoordinator的過程,針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

梳理ZkCoordinator的過程

package com.mixbox.storm.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;

import java.util.*;

import static com.mixbox.storm.kafka.KafkaUtils.taskId;

/**
 * 
 * 
 * ZKCoordinator 協(xié)調(diào)器
 * 
 * @author Yin Shuai
 */

public class ZkCoordinator implements PartitionCoordinator {
	public static final Logger LOG = LoggerFactory
			.getLogger(ZkCoordinator.class);

	SpoutConfig _spoutConfig;

	int _taskIndex;

	int _totalTasks;
	
	String _topologyInstanceId;
	
	// 每一個(gè)分區(qū)對(duì)應(yīng)著一個(gè)分區(qū)管理器
	Map<Partition, PartitionManager> _managers = new HashMap();
	
	//緩存的List
	List<PartitionManager> _cachedList;

	//上次刷新的時(shí)間
	Long _lastRefreshTime = null;
	
	//刷新頻率 毫秒
	int _refreshFreqMs;

	//動(dòng)態(tài)分區(qū)連接
	DynamicPartitionConnections _connections;
	
	//動(dòng)態(tài)BrokersReader
	DynamicBrokersReader _reader;
	
	
	ZkState _state;
	
	Map _stormConf;

	/**
	 * 
	 * @param connections
	 *            動(dòng)態(tài)的 分區(qū)連接
	 * @param stormConf
	 *            Storm的配置文件
	 * @param spoutConfig
	 *            Storm sput的配置文件
	 * @param state
	 *            對(duì)于ZKState的連接
	 * @param taskIndex
	 *            任務(wù)
	 * @param totalTasks
	 *            總共的任務(wù)
	 * @param topologyInstanceId
	 *            拓?fù)涞膶?shí)例ID
	 */
	public ZkCoordinator(DynamicPartitionConnections connections,
			Map stormConf, SpoutConfig spoutConfig, ZkState state,
			int taskIndex, int totalTasks, String topologyInstanceId) {
		this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks,
				topologyInstanceId, buildReader(stormConf, spoutConfig));
	}

	public ZkCoordinator(DynamicPartitionConnections connections,
			Map stormConf, SpoutConfig spoutConfig, ZkState state,
			int taskIndex, int totalTasks, String topologyInstanceId,
			DynamicBrokersReader reader) {
		_spoutConfig = spoutConfig;
		_connections = connections;
		_taskIndex = taskIndex;
		_totalTasks = totalTasks;
		_topologyInstanceId = topologyInstanceId;
		_stormConf = stormConf;
		_state = state;
		ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
		_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
		_reader = reader;
	}

	/**
	 * @param stormConf
	 * @param spoutConfig
	 * @return
	 */
	private static DynamicBrokersReader buildReader(Map stormConf,
			SpoutConfig spoutConfig) {
		ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
		return new DynamicBrokersReader(stormConf, hosts.brokerZkStr,
				hosts.brokerZkPath, spoutConfig.topic);
	}

	@Override
	public List<PartitionManager> getMyManagedPartitions() {
		if (_lastRefreshTime == null
				|| (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
			refresh();
			_lastRefreshTime = System.currentTimeMillis();
		}
		return _cachedList;
	}

	/**
	 * 簡單的刷新的行為
	 * 
	 */
	void refresh() {
		try {

			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "Refreshing partition manager connections");

			// 拿到所有的分區(qū)信息
			GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();

			// 拿到自己任務(wù)的所有分區(qū)
			List<Partition> mine = KafkaUtils.calculatePartitionsForTask(
					brokerInfo, _totalTasks, _taskIndex);

			// 拿到當(dāng)前任務(wù)的分區(qū)
			Set<Partition> curr = _managers.keySet();

			// 構(gòu)造一個(gè)集合
			Set<Partition> newPartitions = new HashSet<Partition>(mine);

			// 在new分區(qū)中,移除掉所有 自己擁有的分區(qū)
			newPartitions.removeAll(curr);

			// 要?jiǎng)h除的分區(qū)
			Set<Partition> deletedPartitions = new HashSet<Partition>(curr);

			//
			deletedPartitions.removeAll(mine);

			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "Deleted partition managers: "
					+ deletedPartitions.toString());

			for (Partition id : deletedPartitions) {
				PartitionManager man = _managers.remove(id);
				man.close();
			}
			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "New partition managers: " + newPartitions.toString());

			for (Partition id : newPartitions) {
				PartitionManager man = new PartitionManager(_connections,
						_topologyInstanceId, _state, _stormConf, _spoutConfig,
						id);
				_managers.put(id, man);
			}

		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		_cachedList = new ArrayList<PartitionManager>(_managers.values());
		LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
	}

	@Override
	public PartitionManager getManager(Partition partition) {
		return _managers.get(partition);
	}
}

   1 : 首先 ZKCoorDinator 實(shí)現(xiàn) PartitionCoordinator的接口

package com.mixbox.storm.kafka;

import java.util.List;

/**
 * @author Yin Shuai
 */
public interface PartitionCoordinator {
	
	/**
	 * 拿到我管理的分區(qū)列表  List{PartitionManager}
	 * @return
	 */
	List<PartitionManager> getMyManagedPartitions();

	
	/**
	 * @param 依據(jù)制定的分區(qū)partition,去getManager
	 * @return
	 */
	PartitionManager getManager(Partition partition);
}

         第一個(gè)方法拿到所有的   PartitionManager

         第二個(gè)方法依據(jù)特定的   Partition去得到一個(gè)分區(qū)管理器

關(guān)于 Storm-kafka中如何理解ZkCoordinator的過程問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI