溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Storm如何和Kafka進(jìn)行整合

發(fā)布時間:2021-11-24 15:48:36 來源:億速云 閱讀:173 作者:柒染 欄目:云計算

這篇文章將為大家詳細(xì)講解有關(guān)Storm如何和Kafka進(jìn)行整合,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

 對于Storm 如何和Kafka進(jìn)行整合

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

/**
 * @author Yin Shuai
 */

public class KafkaSpout extends BaseRichSpout {

	public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

	/**
	 * 內(nèi)部類,Message和Offset的偏移量對象
	 * 
	 * @author Yin Shuai
	 */

	public static class MessageAndRealOffset {
		public Message msg;
		public long offset;

		public MessageAndRealOffset(Message msg, long offset) {
			this.msg = msg;
			this.offset = offset;
		}
	}

	/**
	 * 發(fā)射的枚舉類
	 * @author Yin Shuai
	 */
	static enum EmitState {
		EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
	}

	String _uuid = UUID.randomUUID().toString();
	
	SpoutConfig _spoutConfig;
	
	SpoutOutputCollector _collector;

	// 分區(qū)的協(xié)調(diào)器,getMyManagedPartitions 拿到我所管理的分區(qū)
	PartitionCoordinator _coordinator;

	// 動態(tài)的分區(qū)鏈接:保存到kafka各個節(jié)點的連接,以及負(fù)責(zé)的topic的partition號碼
	DynamicPartitionConnections _connections;

	// 提供了從zookeeper讀寫kafka 消費者信息的功能
	ZkState _state;

	// 上次更新的毫秒數(shù)
	long _lastUpdateMs = 0;

	// 當(dāng)前的分區(qū)
	int _currPartitionIndex = 0;

	public KafkaSpout(SpoutConfig spoutConf) {
		_spoutConfig = spoutConf;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void open(Map conf, final TopologyContext context,
			final SpoutOutputCollector collector) {
		_collector = collector;

		List<String> zkServers = _spoutConfig.zkServers;

		// 初始化的時候如果zkServers 為空,那么初始化 默認(rèn)的配置Zookeeper
		if (zkServers == null) {

			zkServers = new ArrayList<String>() {

				{
					add("192.168.50.144");
					add("192.168.50.169");
					add("192.168.50.207");
				}
			};

			// zkServers =
			// (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
			System.out.println(" 使用的是Storm默認(rèn)配置的Zookeeper List : " + zkServers);

		}

		Integer zkPort = _spoutConfig.zkPort;

		// 在這里我們也同時 來檢查zookeeper的端口是否為空
		if (zkPort == null) {

			zkPort = 2181;
			// zkPort = ((Number)
			// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
		}

		Map stateConf = new HashMap(conf);

		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);

		// 通過保存的配置文件,我們持有了一個zookeeper的state,支持節(jié)點內(nèi)容的創(chuàng)建和刪除
		_state = new ZkState(stateConf);

		// 對于連接的維護(hù)
		_connections = new DynamicPartitionConnections(_spoutConfig,
				KafkaUtils.makeBrokerReader(conf, _spoutConfig));

		// using TransactionalState like this is a hack
		// 拿到總共的任務(wù)次數(shù)

		int totalTasks = context
				.getComponentTasks(context.getThisComponentId()).size();

		// 判斷當(dāng)前的主機(jī)是否是靜態(tài)的statichost
		if (_spoutConfig.hosts instanceof StaticHosts) {
			_coordinator = new StaticCoordinator(_connections, conf,
					_spoutConfig, _state, context.getThisTaskIndex(),
					totalTasks, _uuid);

			// 當(dāng)你拿到的spoutConfig是zkhost的時候
		} else {
			_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
					_state, context.getThisTaskIndex(), totalTasks, _uuid);
		}

		context.registerMetric("kafkaOffset", new IMetric() {
			KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
					_spoutConfig.topic, _connections);

			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Set<Partition> latestPartitions = new HashSet();
				for (PartitionManager pm : pms) {
					latestPartitions.add(pm.getPartition());
				}
				_kafkaOffsetMetric.refreshPartitions(latestPartitions);
				for (PartitionManager pm : pms) {
					_kafkaOffsetMetric.setLatestEmittedOffset(
							pm.getPartition(), pm.lastCompletedOffset());
				}
				return _kafkaOffsetMetric.getValueAndReset();
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);

		context.registerMetric("kafkaPartition", new IMetric() {
			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Map concatMetricsDataMaps = new HashMap();
				for (PartitionManager pm : pms) {
					concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
				}
				return concatMetricsDataMaps;
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);
	}

	@Override
	public void close() {
		_state.close();
	}

	@Override
	public void nextTuple() {
		// Storm-spout 是從kafka 消費數(shù)據(jù),把 kafka 的 consumer
		// 當(dāng)成是一個spout,并且向其他的bolt的發(fā)送數(shù)據(jù)

		// 拿到當(dāng)前我管理的這些PartitionsManager
		List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
		for (int i = 0; i < managers.size(); i++) {

			// 對于每一個分區(qū)的 PartitionManager

			// in case the number of managers decreased
			// 當(dāng)前的分區(qū)

			_currPartitionIndex = _currPartitionIndex % managers.size();

			// 拿到當(dāng)前的分區(qū),并且發(fā)送,這里把SpoutOutputCollector傳遞進(jìn)去了,由他發(fā)射元祖
			EmitState state = managers.get(_currPartitionIndex)
					.next(_collector);

			// 如果發(fā)送狀態(tài)為:發(fā)送-還有剩余
			if (state != EmitState.EMITTED_MORE_LEFT) {
				_currPartitionIndex = (_currPartitionIndex + 1)
						% managers.size();
			}

			// 如果發(fā)送的狀態(tài)為: 發(fā)送-沒有剩余
			if (state != EmitState.NO_EMITTED) {
				break;
			}
		}

		long now = System.currentTimeMillis();
		if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
			commit();
		}
	}

	@Override
	public void ack(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.ack(id.offset);
		}
	}

	@Override
	public void fail(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.fail(id.offset);
		}
	}

	@Override
	public void deactivate() {
		// 停止工作
		commit();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		System.out.println(_spoutConfig.scheme.getOutputFields());
		declarer.declare(_spoutConfig.scheme.getOutputFields());
	}

	private void commit() {
		_lastUpdateMs = System.currentTimeMillis();
		for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
			manager.commit();
		}
	}

}

       在粗淺的代碼閱讀之后,在這里進(jìn)行詳細(xì)的分析:

      1  KafkaSpout之中持有了一個 MessageAndRealOffset 的內(nèi)部類

public static class MessageAndRealOffset
{
    public Message msg;
    
    public long offset;
    
    public MessageAndRealOffset(Message msg,long offset)
    {
        this.msg = msg;
        this.offset = offset;
    }
}

    2 在Spout之中我們還持有了一個PartitionCoordinator的分區(qū)協(xié)調(diào)器,默認(rèn)的情況我們實例化的對象

是ZKCoordinator

    

關(guān)于Storm如何和Kafka進(jìn)行整合就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI