This article introduces how to use the PartitionManager partition manager. Many readers have questions about how PartitionManager actually works, so this article walks through the class step by step with a simple, practical reading of the source. Hopefully it helps resolve those doubts — follow along below!
Reading background: a basic familiarity with Java inner classes
Reading goal: understand how Kafka partitions are managed inside the Storm interface
Main topic: a thorough walkthrough of the entire PartitionManager workflow
package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;

/**
 * The partition manager.
 *
 * @author Yin Shuai
 */
public class PartitionManager {

    public static final Logger LOG = LoggerFactory
            .getLogger(PartitionManager.class);

    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    /**
     * A Kafka message ID that wraps a partition and an offset.
     *
     * @author Yin Shuai
     */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The offset emitted up to (i.e. the next offset to fetch)
    Long _emittedToOffset;
    // Offsets that have been emitted but not yet acked
    SortedSet<Long> _pending = new TreeSet<Long>();
    // The offset committed (to ZooKeeper) so far
    Long _committedTo;
    // Messages waiting to be emitted
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
    // The partition being managed
    Partition _partition;
    // The Storm spout configuration
    SpoutConfig _spoutConfig;
    // The topology instance ID
    String _topologyInstanceId;
    // The underlying Kafka SimpleConsumer
    SimpleConsumer _consumer;
    // Dynamic per-partition connections
    DynamicPartitionConnections _connections;
    // ZkState: maintains state in ZooKeeper
    ZkState _state;
    // The Storm configuration
    Map _stormConf;

    @SuppressWarnings("unchecked")
    public PartitionManager(DynamicPartitionConnections connections,
            String topologyInstanceId, ZkState state, Map stormConf,
            SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> " + json);
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json
                        .get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) {
            // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId)
                && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo
                    + "; old topology_id: " + jsonTopologyId
                    + " - new topology_id: " + topologyInstanceId);
        }

        LOG.info("Starting " + _partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax",
                _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean",
                _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount",
                _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount",
                _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    // Returns NO_EMITTED when it has reached the end of the current batch
    public EmitState next(SpoutOutputCollector collector) {
        // The emit buffer is empty, so fill it first
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            // Retrieve and remove the first element of the list
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            // Nothing left to emit
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            // Generate the tuples for this message
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(
                    _spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition,
                            toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    /**
     * The fill step: this is where the data to be emitted is actually decided.
     */
    private void fill() {
        long start = System.nanoTime();
        // Fetch the message set
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
                _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);

        if (numMessages > 0) {
            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
        }
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(),
                    _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
        if (numMessages > 0) {
            LOG.info("Added " + numMessages + " messages from: " + _partition
                    + " to internal buffers");
        }
    }

    private int countMessages(ByteBufferMessageSet messageSet) {
        int counter = 0;
        for (MessageAndOffset messageAndOffset : messageSet) {
            counter = counter + 1;
        }
        return counter;
    }

    public void ack(Long offset) {
        _pending.remove(offset);
    }

    public void fail(Long offset) {
        // TODO: should it use in-memory ack set to skip anything that's been
        // acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

    public void commit() {
        // The most recently completed offset
        long lastCompletedOffset = lastCompletedOffset();
        // Write the latest completed offset for this partition and topology to ZooKeeper
        if (lastCompletedOffset != lastCommittedOffset()) {
            LOG.info("Writing last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);
            Map<Object, Object> data = ImmutableMap
                    .builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            // Write it to ZooKeeper as JSON
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
            LOG.info("Wrote last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);
        } else {
            LOG.info("No new offset for " + _partition + " for topology: "
                    + _topologyInstanceId);
        }
    }

    // The ZooKeeper path where offsets are committed
    private String committedPath() {
        return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/"
                + _partition.getId();
    }

    // Query the latest offset for this partition
    public long queryPartitionOffsetLatestTime() {
        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
                _partition.partition, OffsetRequest.LatestTime());
    }

    // The last committed offset
    public long lastCommittedOffset() {
        return _committedTo;
    }

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

    // The partition being managed
    public Partition getPartition() {
        return _partition;
    }

    public void close() {
        _connections.unregister(_partition.host, _partition.partition);
    }
}
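Before dissecting the pieces, it helps to see how this class is driven from the outside. Below is a minimal sketch — not the actual KafkaSpout source — of how a spout's nextTuple() could consume the three EmitState values returned by next(). The fields _coordinator, _currPartitionIndex and _collector, and the getMyManagedPartitions() accessor, are assumptions for illustration.

// Hedged sketch of the caller's loop: rotate across PartitionManagers,
// stay on a partition while it still has buffered messages, and stop
// the call as soon as something was emitted.
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {
        _currPartitionIndex = _currPartitionIndex % managers.size();
        EmitState state = managers.get(_currPartitionIndex).next(_collector);
        if (state != EmitState.EMITTED_MORE_LEFT) {
            // This partition's buffer is drained; move on to the next one
            _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
        }
        if (state != EmitState.NO_EMITTED) {
            // Something was emitted during this call, so stop here
            break;
        }
    }
}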
1: PartitionManager encapsulates a static class, KafkaMessageId, which wraps a specific partition together with an offset:
static class KafkaMessageId {
    public Partition partition;
    public long offset;

    public KafkaMessageId(Partition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}
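KafkaMessageId is what next() passes to collector.emit() as the message ID, so Storm's at-least-once machinery can route acks and fails back to the right partition and offset. A sketch of what the spout's callbacks could look like — _partitionManagers is a hypothetical Map<Partition, PartitionManager> for illustration, not part of the class above:

@Override
public void ack(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager pm = _partitionManagers.get(id.partition);
    if (pm != null) {
        pm.ack(id.offset);   // removes the offset from _pending
    }
}

@Override
public void fail(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager pm = _partitionManagers.get(id.partition);
    if (pm != null) {
        pm.fail(id.offset);  // rewinds _emittedToOffset so the message is re-fetched
    }
}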
2: PartitionManager also holds the following instance variables:
2.1 _pending — offsets that have been emitted but not yet acked
2.2 _committedTo — the offset committed (to ZooKeeper) so far
2.3 _waitingToEmit — messages waiting to be emitted
2.4 _partition — the specific partition being managed
Here _waitingToEmit is a LinkedList<MessageAndRealOffset>; a worked example of how these fields interact is shown below.
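The interplay of these fields is what decides what commit() may write to ZooKeeper: only the smallest un-acked offset is safe to commit. A small self-contained demonstration of exactly the rule in lastCompletedOffset() above (the offset values are made up for illustration):

import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

public class OffsetBookkeepingDemo {
    public static void main(String[] args) {
        // fill() fetched offsets 5, 6, 7: each is added to _pending before emission
        SortedSet<Long> pending = new TreeSet<Long>(Arrays.asList(5L, 6L, 7L));
        long emittedToOffset = 8L;   // the next offset to fetch

        pending.remove(5L);          // ack(5)
        pending.remove(7L);          // ack(7)

        // Same rule as lastCompletedOffset(): the smallest un-acked offset caps the commit
        long lastCompleted = pending.isEmpty() ? emittedToOffset : pending.first();
        System.out.println(lastCompleted);   // 6 — offset 7 is acked but not yet committable,
                                             // because 6 might still fail and need a replay
    }
}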
3: When a PartitionManager is initialized, the parameters passed in are:
topologyInstanceId
DynamicPartitionConnections
ZkState
Map (the Storm configuration)
SpoutConfig
Partition
Note that the SimpleConsumer object is not passed in directly: the constructor obtains it from DynamicPartitionConnections by registering via its register method (see the wiring sketch below).
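A minimal wiring sketch of those six arguments, matching the constructor signature in the class above. The method parameters stand in for objects the spout's open() would already hold; the helper itself is hypothetical, not prescribed by the class:

// Sketch: where each constructor argument typically comes from.
PartitionManager createManager(DynamicPartitionConnections connections,
                               ZkState state,
                               Map stormConf,
                               SpoutConfig spoutConfig,
                               Partition partition) {
    // A fresh UUID per topology run: this is what the constructor compares
    // against the "topology.id" stored in ZooKeeper to detect a redeploy.
    String topologyInstanceId = java.util.UUID.randomUUID().toString();

    // The constructor itself calls connections.register(partition.host,
    // partition.partition) to obtain its SimpleConsumer — none is passed in.
    return new PartitionManager(connections, topologyInstanceId, state,
            stormConf, spoutConfig, partition);
}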
This concludes the walkthrough of how to use the PartitionManager partition manager; hopefully it has resolved your doubts. Pairing theory with practice is the best way to learn, so go try it out! To keep learning more on this topic, please keep following the Yisu Cloud (億速云) website, where more practical articles are on the way!