溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

PartitionManager分區(qū)管理器怎么使用

發(fā)布時(shí)間:2021-12-09 16:18:44 來源:億速云 閱讀:127 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“PartitionManager分區(qū)管理器怎么使用”,在日常操作中,相信很多人在PartitionManager分區(qū)管理器怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”PartitionManager分區(qū)管理器怎么使用”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

        閱讀背景:對(duì)于java內(nèi)部類有一個(gè)粗淺的認(rèn)識(shí)

        閱讀目的:了解kafka 分區(qū)是如何在Storm接口之中進(jìn)行管理的

        最終主題:詳盡的梳理PartitionManager的整個(gè)過程

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;

import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * 分區(qū)的管理器
 * 
 * @author Yin Shuai
 * 
 */
public class PartitionManager {
	public static final Logger LOG = LoggerFactory
			.getLogger(PartitionManager.class);
	private final CombinedMetric _fetchAPILatencyMax;
	private final ReducedMetric _fetchAPILatencyMean;
	private final CountMetric _fetchAPICallCount;
	private final CountMetric _fetchAPIMessageCount;

	/**
	 * kafka MessageID 封裝了 partition 和offset
	 * 
	 * @author Yin Shuai
	 */
	static class KafkaMessageId {
		public Partition partition;
		public long offset;

		public KafkaMessageId(Partition partition, long offset) {
			this.partition = partition;
			this.offset = offset;
		}
	}

	// 被發(fā)送的偏移量
	Long _emittedToOffset;

	SortedSet<Long> _pending = new TreeSet<Long>();

	// 已經(jīng)提交的
	Long _committedTo;

	// 等待去發(fā)射
	LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

	// 分區(qū)
	Partition _partition;

	// Storm Spout的配置文件
	SpoutConfig _spoutConfig;

	// topology 的實(shí)例ID
	String _topologyInstanceId;

	// kafka 底層的消費(fèi)者ID
	SimpleConsumer _consumer;

	// 動(dòng)態(tài)的分區(qū)Connection
	DynamicPartitionConnections _connections;

	//ZKState 狀態(tài)的維護(hù)
	ZkState _state;

	//Storm的配置文件
	Map _stormConf;

	//
	@SuppressWarnings("unchecked")
	public PartitionManager(DynamicPartitionConnections connections,
			String topologyInstanceId, ZkState state, Map stormConf,
			SpoutConfig spoutConfig, Partition id) {
		_partition = id;
		_connections = connections;
		_spoutConfig = spoutConfig;
		_topologyInstanceId = topologyInstanceId;
		_consumer = connections.register(id.host, id.partition);
		_state = state;
		_stormConf = stormConf;

		String jsonTopologyId = null;
		Long jsonOffset = null;
		String path = committedPath();
		try {
			
			Map<Object, Object> json = _state.readJSON(path);
			LOG.info("Read partition information from: " + path + " --> "
					+ json);
			if (json != null) {
				jsonTopologyId = (String) ((Map<Object, Object>) json
						.get("topology")).get("id");
				jsonOffset = (Long) json.get("offset");
			}
		} catch (Throwable e) {
			LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
		}

		if (jsonTopologyId == null || jsonOffset == null) { // failed to parse
															// JSON?
			_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
					id.partition, spoutConfig);
			LOG.info("No partition information found, using configuration to determine offset");
		} else if (!topologyInstanceId.equals(jsonTopologyId)
				&& spoutConfig.forceFromStart) {
			_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
					id.partition, spoutConfig.startOffsetTime);
			LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
		} else {
			_committedTo = jsonOffset;
			LOG.info("Read last commit offset from zookeeper: " + _committedTo
					+ "; old topology_id: " + jsonTopologyId
					+ " - new topology_id: " + topologyInstanceId);
		}

		LOG.info("Starting " + _partition + " from offset " + _committedTo);
		_emittedToOffset = _committedTo;

		_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
		_fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
		_fetchAPICallCount = new CountMetric();
		_fetchAPIMessageCount = new CountMetric();
	}

	public Map getMetricsDataMap() {
		Map ret = new HashMap();
		ret.put(_partition + "/fetchAPILatencyMax",
				_fetchAPILatencyMax.getValueAndReset());
		ret.put(_partition + "/fetchAPILatencyMean",
				_fetchAPILatencyMean.getValueAndReset());
		ret.put(_partition + "/fetchAPICallCount",
				_fetchAPICallCount.getValueAndReset());
		ret.put(_partition + "/fetchAPIMessageCount",
				_fetchAPIMessageCount.getValueAndReset());
		return ret;
	}

	// returns false if it's reached the end of current batch
	public EmitState next(SpoutOutputCollector collector) {
		
		//等待去發(fā)送的 為空了。
		if (_waitingToEmit.isEmpty()) {
			// 開始裝載
			fill();
		}
		
		while (true) {
			
			//檢索并移除List中間的第一個(gè)元素
			MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
			
			//要發(fā)送的為空的時(shí)候, 沒有發(fā)生的
			if (toEmit == null) {
				return EmitState.NO_EMITTED;
			}
			
			// 這里的tups
			Iterable<List<Object>> tups = KafkaUtils.generateTuples(
					_spoutConfig, toEmit.msg);
			if (tups != null) {
				for (List<Object> tup : tups) {
					collector.emit(tup, new KafkaMessageId(_partition,
							toEmit.offset));
				}
				break;
			} else {
				ack(toEmit.offset);
			}
		}
		if (!_waitingToEmit.isEmpty()) {
			return EmitState.EMITTED_MORE_LEFT;
		} else {
			return EmitState.EMITTED_END;
		}
	}

	
	/**
	 * 填充的行為
	 * 這里真正的決定了你有哪些數(shù)據(jù)需要去發(fā)送
	 */
	private void fill() {
		long start = System.nanoTime();
		
		
		/*
		 *  拿到MessageSet
		 */
		ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
				_consumer, _partition, _emittedToOffset);
	
		long end = System.nanoTime();
	
		long millis = (end - start) / 1000000;
		
		_fetchAPILatencyMax.update(millis);
		_fetchAPILatencyMean.update(millis);
		_fetchAPICallCount.incr();
		
		int numMessages = countMessages(msgs);
		
		_fetchAPIMessageCount.incrBy(numMessages);

		if (numMessages > 0) {
			LOG.info("Fetched " + numMessages + " messages from: " + _partition);
		}
		
		for (MessageAndOffset msg : msgs) {
			_pending.add(_emittedToOffset);
			_waitingToEmit.add(new MessageAndRealOffset(msg.message(),
					_emittedToOffset));
			_emittedToOffset = msg.nextOffset();
		}
		
		if (numMessages > 0) {
			LOG.info("Added " + numMessages + " messages from: " + _partition
					+ " to internal buffers");
		}
	}

	private int countMessages(ByteBufferMessageSet messageSet) {
		int counter = 0;
		for (MessageAndOffset messageAndOffset : messageSet) {
			counter = counter + 1;
		}
		return counter;
	}

	public void ack(Long offset) {
		_pending.remove(offset);
	}

	public void fail(Long offset) {
		// TODO: should it use in-memory ack set to skip anything that's been
		// acked but not committed???
		// things might get crazy with lots of timeouts
		if (_emittedToOffset > offset) {
			_emittedToOffset = offset;
			_pending.tailSet(offset).clear();
		}
	}

	public void commit() {
		
		// 最新完成的偏移量
		long lastCompletedOffset = lastCompletedOffset();
		
		//寫最新的完全的偏移量到zk,的某個(gè)分區(qū),到某一個(gè)topology
		if (lastCompletedOffset != lastCommittedOffset()) {
			LOG.info("Writing last completed offset (" + lastCompletedOffset
					+ ") to ZK for " + _partition + " for topology: "
					+ _topologyInstanceId);
			
			
			Map<Object, Object> data = ImmutableMap
					.builder()
					.put("topology",
							ImmutableMap.of("id", _topologyInstanceId, "name",
									_stormConf.get(Config.TOPOLOGY_NAME)))
					.put("offset", lastCompletedOffset)
					.put("partition", _partition.partition)
					.put("broker",
							ImmutableMap.of("host", _partition.host.host,
									"port", _partition.host.port))
					.put("topic", _spoutConfig.topic).build();
			
			// 直接JSON 寫入
			_state.writeJSON(committedPath(), data);
			
			_committedTo = lastCompletedOffset;
			
			LOG.info("Wrote last completed offset (" + lastCompletedOffset
					+ ") to ZK for " + _partition + " for topology: "
					+ _topologyInstanceId);
		} else {
			LOG.info("No new offset for " + _partition + " for topology: "
					+ _topologyInstanceId);
		}
	}
	
	//提交的路徑
	private String committedPath() {
		return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/"
				+ _partition.getId();
	}

	//拿到最新的分區(qū)便宜量
	public long queryPartitionOffsetLatestTime() {
		return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
				_partition.partition, OffsetRequest.LatestTime());
	}

	//最新的提交的便宜量
	public long lastCommittedOffset() {
		return _committedTo;
	}

	public long lastCompletedOffset() {
		if (_pending.isEmpty()) {
			return _emittedToOffset;
		} else {
			return _pending.first();
		}
	}

	//拿到最新的分區(qū)
	public Partition getPartition() {
		return _partition;
	}

	public void close() {
		_connections.unregister(_partition.host, _partition.partition);
	}
}

     1 PartitionManager封裝了一個(gè)Static 的class  kafkaMessageId,并且封裝了某個(gè)分區(qū)和偏移量

static class KafkaMessageId {
		public Partition partition;
		public long offset;

		public KafkaMessageId(Partition partition, long offset) {
			this.partition = partition;
			this.offset = offset;
		}
	}

    2:   在PartitionManager中同時(shí)持有了一下的實(shí)例變量:

                    2.1 已經(jīng)發(fā)射的數(shù)據(jù) pending

                    2.2 已經(jīng)提交的 committedTo      

                    2.3 等待去發(fā)射的 _waitingToEmit

                    2.4 具體的分區(qū) _partition

      其中   _waitingToEmit 是一個(gè)LinkedList<MessageAndRealOffset>

   3 : PartitionManager 在初始化的時(shí)候,需要傳遞的參數(shù)是

                      topologyInstanceId

                      DynamicPartitionConnections

                      ZkState

                      Map

                      SpoutConfig

                      Partition

                      SimpleConsumer  對(duì)象,SimpleConsumer對(duì)象將在 DynamicPartitionConnections中

      通過register的方法進(jìn)行注冊(cè)

到此,關(guān)于“PartitionManager分區(qū)管理器怎么使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI