溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

storm+kafka集成簡(jiǎn)單應(yīng)用

發(fā)布時(shí)間:2020-09-09 02:05:27 來源:網(wǎng)絡(luò) 閱讀:3851 作者:choulanlan 欄目:大數(shù)據(jù)

    這兩天公司要學(xué)習(xí)kafka,結(jié)合之前的storm,做了一個(gè)簡(jiǎn)單的集成,之前也參考了網(wǎng)上的例子一些例子,發(fā)現(xiàn)或多或少都有一些問題。所以自己做了一個(gè)。


    這個(gè)是網(wǎng)上其他人遇到的問題,給摘錄一下,防止以后自己和大家出現(xiàn):


基本場(chǎng)景是應(yīng)用出現(xiàn)錯(cuò)誤,發(fā)送日志到kafka的某個(gè)topic,storm訂閱該topic,然后進(jìn)行后續(xù)處理。場(chǎng)景非常簡(jiǎn)單,但是在學(xué)習(xí)過程中,遇到一個(gè)奇怪的異常情況:使用KafkaSpout讀取topic數(shù)據(jù)時(shí),沒有向ZK寫offset數(shù)據(jù),致使每次都從頭開始讀取。糾結(jié)了兩天,終于碰巧找到原因:應(yīng)該使用BaseBasicBolt作為bolt的父類,而不是BaseRichBolt。

        

基本訂閱 :

基本場(chǎng)景:訂閱kafka的某個(gè)topic,然后在讀取的消息前加上自定義的字符串,然后寫回到kafka另外一個(gè)topic。  從Kafka讀取數(shù)據(jù)的Spout使用storm.kafka.KafkaSpout,向Kafka寫數(shù)據(jù)的Bolt使用storm.kafka.bolt.KafkaBolt。中間進(jìn)行進(jìn)行數(shù)據(jù)處理的Bolt定義為TopicMsgBolt。

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.Properties;

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        // 配置Kafka訂閱的Topic,以及zookeeper中數(shù)據(jù)節(jié)點(diǎn)目錄和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root1", "topicMsgTopology");
        // 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();
        Properties props = new Properties();
        // 配置Kafka broker地址
        props.put("metadata.broker.list", "localhost:9092");
        // serializer.class為消息的序列化類
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // 配置KafkaBolt生成的topic
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", (IBasicBolt) new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}


storm.kafka.ZkHosts構(gòu)造方法的參數(shù)是zookeeper標(biāo)準(zhǔn)配置地址的形式

storm.kafka.SpoutConfig構(gòu)造方法第一個(gè)參數(shù)為上述的storm.kafka.ZkHosts對(duì)象,第二個(gè)為待訂閱的topic名稱,第三個(gè)參數(shù)zkRoot為寫讀取topic時(shí)的偏移量offset數(shù)據(jù)的節(jié)點(diǎn)(zk node),第四個(gè)參數(shù)為該節(jié)點(diǎn)上的次級(jí)節(jié)點(diǎn)名(有個(gè)地方說這個(gè)是spout的id)。  backtype.storm.Config對(duì)象是配置storm的topology(拓?fù)洌┧枰幕A(chǔ)配置。  backtype.storm.spout.SchemeAsMultiScheme的構(gòu)造方法輸入的參數(shù)是訂閱kafka數(shù)據(jù)的處理參數(shù),這里的MessageScheme是自定義的,代碼如下:

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

MessageScheme類中g(shù)etOutputFields方法是KafkaSpout向后發(fā)送tuple(storm傳輸數(shù)據(jù)的最小結(jié)構(gòu))的名字,需要與接收數(shù)據(jù)的Bolt中統(tǒng)一(在這個(gè)例子中可以不統(tǒng)一,因?yàn)楹竺嬷苯尤〉?條數(shù)據(jù),但是在wordCount的那個(gè)例子中就需要統(tǒng)一了)。  TopicMsgBolt類是從storm.kafka.KafkaSpout接收數(shù)據(jù)的Bolt,對(duì)接收到的數(shù)據(jù)進(jìn)行處理,然后向后傳輸給storm.kafka.bolt.KafkaBolt。代碼如下:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

此處需要特別注意的是,要使用backtype.storm.topology.base.BaseBasicBolt對(duì)象作為父類,否則不會(huì)在zk記錄偏移量offset數(shù)據(jù)。 需要編寫的代碼已完成,接下來就是在搭建好的storm、kafka中進(jìn)行測(cè)試:

# 創(chuàng)建topic./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic msgTopic2

接下來需要分別對(duì)msgTopic1、msgTopic2啟動(dòng)producer(生產(chǎn)者)與consumer(消費(fèi)者)

# 對(duì)msgTopic1啟動(dòng)producer,用于發(fā)送數(shù)據(jù) ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic msgTopic1 
# 對(duì)msgTopic2啟動(dòng)consumer,用于查看發(fā)送數(shù)據(jù)的處理結(jié)果 ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic msgTopic2 --from-beginning

執(zhí)行storm的jar命令運(yùn)行程序:

storm jar stormkafka.jar stormkafka1.TopicMsgTopology


待對(duì)應(yīng)的worker啟動(dòng)好之后,就可以在msgTopic1的producer對(duì)應(yīng)終端輸入數(shù)據(jù),然后在msgTopic2的consumer對(duì)應(yīng)終端查看輸出結(jié)果了。  


有幾點(diǎn)需要注意的:  必須先創(chuàng)建msgTopic1、msgTopic2兩個(gè)topic; 定義的bolt必須使用BaseBasicBolt作為父類,不能夠使用BaseRichBolt,否則無法記錄偏移量; zookeeper最好使用至少三個(gè)節(jié)點(diǎn)的分布式模式或偽分布式模式,否則會(huì)出現(xiàn)一些異常情況; 在整個(gè)storm下,spout、bolt的id必須唯一,否則會(huì)出現(xiàn)異常。 TopicMsgBolt類作為storm.kafka.bolt.KafkaBolt前的最后一個(gè)Bolt,需要將輸出數(shù)據(jù)名稱定義為message,否則KafkaBolt無法接收數(shù)據(jù)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI