溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm筆記整理(二):Storm本地開發(fā)案例—總和計算與單詞統(tǒng)計

發(fā)布時間:2020-08-18 16:18:06 來源:網(wǎng)絡 閱讀:7798 作者:xpleaf 欄目:大數(shù)據(jù)

[TOC]


概述

在Strom的API中提供了LocalCluster對象,這樣在不用搭建Storm環(huán)境或者Storm集群的情況下也能夠開發(fā)Storm的程序,非常方便。

基于Maven構建工程項目,其所需要的依賴如下:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
</dependency>

Storm本地開發(fā)案例1:總和計算

需求分析

需求如下:

數(shù)據(jù)源不斷產(chǎn)生遞增數(shù)字,對產(chǎn)生的數(shù)字累加求和

分析如下:

Strom的Topology包含Spout和Bolt兩種節(jié)點類型,在這個案例中,可以使用Spout來對數(shù)據(jù)源進行處理(模擬產(chǎn)生數(shù)據(jù)),
然后將其發(fā)送到計算和的Bolt中,所以實際上這里只需要使用一個Spout節(jié)點和一個Bolt節(jié)點就可以了。

程序開發(fā)

在理解了Storm的設計思想后,將其與MapReduce的設計思想進行對比,再看下面的程序代碼其實是非常好理解的。

OrderSpout
/**
     * 數(shù)據(jù)源
     */
static class OrderSpout extends BaseRichSpout {

    private Map conf;   // 當前組件配置信息
    private TopologyContext context;    // 當前組件上下文對象
    private SpoutOutputCollector collector; // 發(fā)送tuple的組件

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    /**
         * 接收數(shù)據(jù)的核心方法
         */
    @Override
    public void nextTuple() {
        long num = 0;
        while (true) {
            num++;
            StormUtil.sleep(1000);
            System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產(chǎn)生的訂單金額:" + num);
            this.collector.emit(new Values(num));
        }
    }

    /**
         * 是對發(fā)送出去的數(shù)據(jù)的描述schema
         */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("order_cost"));
    }
}
SumBolt
private Long sumOrderCost = 0L;

/**
     * 計算和的Bolt節(jié)點
     */
static class SumBolt extends BaseRichBolt {

    private Map conf;   // 當前組件配置信息
    private TopologyContext context;    // 當前組件上下文對象
    private OutputCollector collector; // 發(fā)送tuple的組件

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    private Long sumOrderCost = 0L;

    /**
         * 處理數(shù)據(jù)的核心方法
         */
    @Override
    public void execute(Tuple input) {
        Long orderCost = input.getLongByField("order_cost");
        sumOrderCost += orderCost;

        System.out.println("商城網(wǎng)站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
        StormUtil.sleep(1000);
    }

    /**
         * 如果當前bolt為最后一個處理單元,該方法可以不用管
         */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
StormLocalSumTopology

/**
 * 1°、實現(xiàn)數(shù)字累加求和的案例:數(shù)據(jù)源不斷產(chǎn)生遞增數(shù)字,對產(chǎn)生的數(shù)字累加求和。
 * <p>
 * Storm組件:Spout、Bolt、數(shù)據(jù)是Tuple,使用main中的Topology將spout和bolt進行關聯(lián)
 * MapReduce的組件:Mapper和Reducer、數(shù)據(jù)是Writable,通過一個main中的job將二者關聯(lián)
 * <p>
 * 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現(xiàn)任何功能。
 *                        我們稱這為適配器模式
 */
public class StormLocalSumTopology {
    /**
     * 構建拓撲,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * 設置spout和bolt的dag(有向無環(huán)圖)
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt())
                .shuffleGrouping("id_order_spout"); // 通過不同的數(shù)據(jù)流轉(zhuǎn)方式,來指定數(shù)據(jù)的上游組件
        // 使用builder構建topology
        StormTopology topology = builder.createTopology();
        // 啟動topology
        LocalCluster localCluster = new LocalCluster(); // 本地開發(fā)模式,創(chuàng)建的對象為LocalCluster
        String topologyName = StormLocalSumTopology.class.getSimpleName();  // 拓撲的名稱
        Config config = new Config();   // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
        localCluster.submitTopology(topologyName, config, topology);
    }
}

需要說明的是,Spout和Bolt的類都作為StormLocalSumTopology的靜態(tài)成員變量,這樣做是為了開發(fā)的方便,當然實際上也可以將其單獨作為一個文件。

測試

執(zhí)行主函數(shù),其輸出如下:

當前時間20180412213836產(chǎn)生的訂單金額:1
商城網(wǎng)站到目前20180412213836的商品總交易額1
當前時間20180412213837產(chǎn)生的訂單金額:2
商城網(wǎng)站到目前20180412213837的商品總交易額3
當前時間20180412213838產(chǎn)生的訂單金額:3
商城網(wǎng)站到目前20180412213838的商品總交易額6
......

Storm本地開發(fā)案例2:單詞統(tǒng)計

需求分析

需求如下:

監(jiān)控一個目錄下的文件,當發(fā)現(xiàn)有新文件的時候,把文件讀取過來,解析文件中的內(nèi)容,統(tǒng)計單詞出現(xiàn)的總次數(shù)

分析如下:

可以設置三個節(jié)點:
Spout:用于持續(xù)讀取目錄下需要被監(jiān)聽(通過后綴名標識)的文件,并且將每一行輸出到下一個Bolt中
        (類似于MapReduce中的FileInputFormat)
Bolt1:讀取行,并解析其中的單詞,將每個單詞輸出到下一個Bolt中
        (類似于MapReduce中的Mapper)
Bolt2:讀取單詞,進行統(tǒng)計計算
        (類似于MapReduce中的Reducer)

程序開發(fā)

FileSpout

/**
     * Spout,獲取數(shù)據(jù)源,這里是持續(xù)讀取某一目錄下的文件,并將每一行輸出到下一個Bolt中
     */
static class FileSpout extends BaseRichSpout {
    private Map conf;   // 當前組件配置信息
    private TopologyContext context;    // 當前組件上下文對象
    private SpoutOutputCollector collector; // 發(fā)送tuple的組件

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        File directory = new File("D:/data/storm");
        // 第二個參數(shù)extensions的意思就是,只采集某些后綴名的文件
        Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
        for (File file : files) {
            try {
                List<String> lines = FileUtils.readLines(file, "utf-8");
                for(String line : lines) {
                    this.collector.emit(new Values(line));
                }
                // 當前文件被消費之后,需要重命名,同時為了防止相同文件的加入,重命名后的文件加了一個隨機的UUID,或者加入時間戳也可以的
                File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
                FileUtils.moveFile(file, destFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
SplitBolt

/**
     * Bolt節(jié)點,將接收到的每一行數(shù)據(jù)切割為一個個單詞并發(fā)送到下一個節(jié)點
     */
static class SplitBolt extends BaseRichBolt {

    private Map conf;   // 當前組件配置信息
    private TopologyContext context;    // 當前組件上下文對象
    private OutputCollector collector; // 發(fā)送tuple的組件

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word,1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
WCBolt

/**
     * Bolt節(jié)點,執(zhí)行單詞統(tǒng)計計算
     */
static class WCBolt extends BaseRichBolt {

    private Map conf;   // 當前組件配置信息
    private TopologyContext context;    // 當前組件上下文對象
    private OutputCollector collector; // 發(fā)送tuple的組件

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    private Map<String, Integer> map = new HashMap<>();

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");
        /*if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }*/
        map.put(word, map.getOrDefault(word, 0) + 1);

        System.out.println("====================================");
        map.forEach((k ,v)->{
            System.out.println(k + ":::" +v);
        });
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
StormLocalWordCountTopology

/**
 * 2°、單詞計數(shù):監(jiān)控一個目錄下的文件,當發(fā)現(xiàn)有新文件的時候,
        把文件讀取過來,解析文件中的內(nèi)容,統(tǒng)計單詞出現(xiàn)的總次數(shù)
        E:\data\storm
 */
public class StormLocalWordCountTopology {

    /**
     * 構建拓撲,組裝Spout和Bolt節(jié)點,相當于在MapReduce中構建Job
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // dag
        builder.setSpout("id_file_spout", new FileSpout());
        builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
        builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");

        StormTopology stormTopology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        String topologyName = StormLocalWordCountTopology.class.getSimpleName();
        Config config = new Config();
        cluster.submitTopology(topologyName, config, stormTopology);
    }
}

測試

執(zhí)行程序后,往目標目錄中添加.txt文件,程序輸出如下:

====================================
hello:::1
====================================
hello:::1
you:::1
====================================
hello:::2
you:::1
====================================
hello:::2
he:::1
you:::1
====================================
hello:::3
he:::1
you:::1
====================================
me:::1
hello:::3
he:::1
you:::1

Storm名詞術語解釋

在編寫了Storm的程序后,再來看看其相關的術語就容易理解很多了。

  • Topology
Topology用于封裝一個實時計算應用程序的邏輯,類似于Hadoop的MapReduce Job
  • Stream消息流
Stream 消息流,是一個沒有邊界的tuple序列,這些tuples會被以一種分布式的方式并行地創(chuàng)建和處理
  • Spouts消息源
Spouts 消息源,是消息生產(chǎn)者,他會從一個外部源讀取數(shù)據(jù)并向topology里面面發(fā)出消息:tuple
  • Bolts消息處理者
Bolts 消息處理者,所有的消息處理邏輯被封裝在bolts里面,處理輸入的數(shù)據(jù)流并產(chǎn)生新的輸出數(shù)據(jù)流,
可執(zhí)行過濾,聚合,查詢數(shù)據(jù)庫等操作
  • Task
Task 每一個Spout和Bolt會被當作很多task在整個集群里面執(zhí)行,每一個task對應到一個線程.
  • Stream groupings 消息分發(fā)策略
Stream groupings 消息分發(fā)策略,定義一個Topology的其中一步是定義每個tuple接受什么樣的流作為輸入,
stream grouping就是用來定義一個stream應該如何分配給Bolts們.
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI