溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

flume+kafka+storm運行的示例分析

發(fā)布時間:2021-12-10 13:51:34 來源:億速云 閱讀:294 作者:小新 欄目:云計算

這篇文章主要介紹flume+kafka+storm運行的示例分析,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

概述

在基于Hadoop平臺的很多應(yīng)用場景中,我們需要對數(shù)據(jù)進行離線和實時分析,離線分析可以很容易地借助于Hive或者mr來實現(xiàn)統(tǒng)計分析,但是對于實時的需求Hive和mr就不合適了。實時應(yīng)用場景可以使用Storm,它是一個實時處理系統(tǒng),它為實時處理類應(yīng)用提供了一個計算模型,可以很容易地進行編程處理。為了統(tǒng)一離線和實時計算,一般情況下,我們都希望將離線和實時計算的數(shù)據(jù)源的集合統(tǒng)一起來作為輸入,然后將數(shù)據(jù)的流向分別經(jīng)由實時系統(tǒng)和離線分析系統(tǒng),分別進行分析處理,這時我們可以考慮將數(shù)據(jù)源(如使用Flume收集日志)直接連接一個消息中間件,如Kafka,可以整合Flume+Kafka,F(xiàn)lume作為消息的Producer,生產(chǎn)的消息數(shù)據(jù)(日志數(shù)據(jù)、業(yè)務(wù)請求數(shù)據(jù)等等)發(fā)布到Kafka中,然后通過訂閱的方式,使用Storm的Topology作為消息的Consumer,在Storm集群中分別進行如下兩個需求場景的處理:
直接使用Storm的Topology對數(shù)據(jù)進行實時分析處理
整合Storm+HDFS,將消息處理后寫入HDFS進行離線分析處理

flume+kafka+storm運行的示例分析

flume+kafka+storm相結(jié)合,此時,flume作為數(shù)據(jù)來源收集數(shù)據(jù),kafka作為消息隊列,起緩沖作用,storm從kafka拉取數(shù)據(jù)分析處理。做軟件開發(fā)的都知道模塊化思想,這樣設(shè)計的原因有兩方面:
一方面是可以模塊化,功能劃分更加清晰,從“數(shù)據(jù)采集--數(shù)據(jù)接入--流式計算--數(shù)據(jù)輸出/存儲”

flume+kafka+storm運行的示例分析

1).數(shù)據(jù)采集
負責(zé)從各節(jié)點上實時采集數(shù)據(jù),選用cloudera的flume來實現(xiàn)
2).數(shù)據(jù)接入
由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,選用apache的kafka
3).流式計算
對采集到的數(shù)據(jù)進行實時分析,選用apache的storm
4).數(shù)據(jù)輸出
對分析后的結(jié)果持久化,暫定用mysql
另一方面是模塊化之后,假如當(dāng)Storm掛掉了之后,數(shù)據(jù)采集和數(shù)據(jù)接入還是繼續(xù)在跑著,數(shù)據(jù)不會丟失,storm起來之后可以繼續(xù)進行流式計算;

數(shù)據(jù)來源flume

Kafka生產(chǎn)的數(shù)據(jù),是由Flume的Sink提供的,這里我們需要用到Flume集群,通過Flume集群將Agent的日志收集分發(fā)到 Kafka。我們根據(jù)情況選擇合適的source,這里我用的是exec,channel是memory,sink當(dāng)然就是kafkasink。詳細配置如下:

flume+kafka+storm運行的示例分析

flume到kafka

flume到kafka的傳輸過程如下圖:

flume+kafka+storm運行的示例分析

kafka的配置跟之前搭建的沒有什么改動。

測試flume到kafka

flume和kafka配置好以后,先啟動flume集群,這里是后臺運行:

flume-ng agent -n agent -c /usr/local/apache-flume-1.6.0-bin/conf  -f /usr/local/apache-flume-1.6.0-bin/conf/flume-test.conf -Dflume.root.logger=DEBUG,console &

然后啟動zookeeper:

./zkServer.sh start

接著啟動kafka集群,這里是后臺運行:

./kafka-server-start.sh ../config/server.properties &

然后向監(jiān)控的文件里輸入數(shù)據(jù):

echo 'hello world' >> topic-test.txt

接著在kafka集群上創(chuàng)建消費者,測試flume到kafka是否聯(lián)通,當(dāng)然也可以使用kafka監(jiān)控工具查看:

我們可以事先創(chuàng)建好topic,當(dāng)然我們也可以自動創(chuàng)建topic,設(shè)置kafka auto.create.topics.enable屬性為true,默認就為true。

./kafka-console-consumer.sh  --zookeeper master:2181 --from-beginning --topic topic1
這邊輸出'hello world'則表明flume到kafka連接成功。

 storm讀取kafka數(shù)據(jù)分析編程

首先搭建好storm集群,啟動nimbus、supervisor、ui

然后topology編程,我這里是java編程的一個小例子:

主類

package com.kafka_storm;
import java.util.HashMap;
import java.util.Map;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
public class StormKafkaTopo {   
    public static void main(String[] args) throws Exception { 
     // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("master:2181");
        // 配置Kafka訂閱的Topic,以及zookeeper中數(shù)據(jù)節(jié)點目錄和名字
        //這里需要注意的是,spout會根據(jù)config的后面兩個參數(shù)在zookeeper上為每個kafka分區(qū)創(chuàng)建保存讀取偏移的節(jié)點,如:/zkroot/topo/partition_0。
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "topic1", "/zkkafkaspout" , "kafkaspout");
        
        // 配置KafkaBolt中的kafka.broker.properties(可以參考kafka java編程)
        Config conf = new Config();  
        Map<String, String> map = new HashMap<String, String>(); 
        // 配置Kafka broker地址       
        map.put("metadata.broker.list", "master:9092");
        // serializer.class為消息的序列化類
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // 配置KafkaBolt生成的topic
        conf.put("topic", "topic2");
         
        //默認情況下,spout下會發(fā)射域名為bytes的binary數(shù)據(jù),如果有需要,可以通過設(shè)置schema進行修改。
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  
        TopologyBuilder builder = new TopologyBuilder();   
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 
        builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");        
 
        if (args != null && args.length > 0) {  
            conf.setNumWorkers(3);  
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
        } else {  
   
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("Topo", conf, builder.createTopology());  
            Utils.sleep(100000);  
            cluster.killTopology("Topo");  
            cluster.shutdown();  
        }  
    }  
}

消息處理

package com.kafka_storm;
import java.io.UnsupportedEncodingException;
import java.util.List;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * 使用KafkaSpout時需要子集實現(xiàn)Scheme接口,它主要負責(zé)從消息流中解析出需要的數(shù)據(jù)
 * @author lenovo
 *
 */
public class MessageScheme implements Scheme { 
     
    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#deserialize(byte[])
     */
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8"); 
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {  
          
        }
        return null;
    }
     
     
    /* (non-Javadoc)
     * @see backtype.storm.spout.Scheme#getOutputFields()
     */
    public Fields getOutputFields() {
        // TODO Auto-generated method stub
        return new Fields("msg");  
    }  
}

bolt

package com.kafka_storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class SenqueceBolt extends BaseBasicBolt{
     
    /* (non-Javadoc)
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        // TODO Auto-generated method stub
         String word = (String) input.getValue(0);  
         String out = "I'm " + word +  "!";  
         System.out.println("out=" + out);
         collector.emit(new Values(out));
    }
     
    /* (non-Javadoc)
     * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

在集群上運行

我們要將引入的第三方包全部放到storm的lib包下面,包括kafka、zookeeper的,否則會報缺失jar包的錯

storm jar StormKafkaDemo.jar com.kafka_storm.StormKafkaTopo StormKafkaDemo

開始總體測試:

向flume監(jiān)控的文件輸入數(shù)據(jù),在storm的log日志里查看輸出,當(dāng)然我們也可以在kafka里查看,因為我將結(jié)果輸出到kafka里了,topic為topic2。

日志里結(jié)果如下:

flume+kafka+storm運行的示例分析

以上是“flume+kafka+storm運行的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對大家有幫助,更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI