溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

發(fā)布時間:2020-08-31 13:02:48 來源:網(wǎng)絡(luò) 閱讀:7972 作者:xpleaf 欄目:大數(shù)據(jù)

[TOC]


實時WordCount案例

主要是監(jiān)聽網(wǎng)絡(luò)端口中的數(shù)據(jù),并實時進(jìn)行wc的計算。

Java版

測試代碼如下:

package cn.xpleaf.bigdata.spark.java.streaming.p1;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * 使用Java開發(fā)SparkStreaming的第一個應(yīng)用程序
 *
 * 用于監(jiān)聽網(wǎng)絡(luò)socket中的一個端口,實時獲取對應(yīng)的文本內(nèi)容
 * 計算文本內(nèi)容中的每一個單詞出現(xiàn)的次數(shù)
 */
public class _01SparkStreamingNetWorkWCOps {
    public static void main(String[] args) {
        if(args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <hostname> <port>");
            System.exit(-1);
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

        SparkConf conf = new SparkConf()
                .setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName())
        /*
         *  設(shè)置為local是無法計算數(shù)據(jù),但是能夠接收數(shù)據(jù)
         *  設(shè)置為local[2]是既可以計算數(shù)據(jù),也可以接收數(shù)據(jù)
         *      當(dāng)master被設(shè)置為local的時候,只有一個線程,且只能被用來接收外部的數(shù)據(jù),所以不能夠進(jìn)行計算,如此便不會做對應(yīng)的輸出
         *      所以在使用的本地模式時,同時是監(jiān)聽網(wǎng)絡(luò)socket數(shù)據(jù),線程個數(shù)必須大于等于2
         */
                .setMaster("local[2]");
        /**
         * 第二個參數(shù):Duration是SparkStreaming用于進(jìn)行采集多長時間段內(nèi)的數(shù)據(jù)將其拆分成一個個batch
         * 該例表示每隔2秒采集一次數(shù)據(jù),將數(shù)據(jù)打散成一個個batch(其實就是SparkCore中的一個個RDD)
         */
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));

        String hostname = args[0].trim();
        int port = Integer.valueOf(args[1].trim());

        JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);// 默認(rèn)的持久化級別StorageLevel.MEMORY_AND_DISK_SER_2

        JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {

                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        });

        JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        retDStream.print();

        // 啟動流式計算
        jsc.start();
        // 等待執(zhí)行結(jié)束
        jsc.awaitTermination();
        System.out.println("結(jié)束了沒有呀,哈哈哈~");
        jsc.close();
    }
}

啟動程序,同時在主機(jī)上使用nc命令進(jìn)行操作:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結(jié)果如下:

-------------------------------------------
Time: 1525929096000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)

同時也可以在Spark UI上查看相應(yīng)的作業(yè)執(zhí)行情況:

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

可以看到,每2秒就會執(zhí)行一次計算,即每隔2秒采集一次數(shù)據(jù),將數(shù)據(jù)打散成一個個batch(其實就是SparkCore中的一個個RDD)。

Scala版

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object _01SparkStreamingNetWorkOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                |hostname: 監(jiān)聽的網(wǎng)絡(luò)socket的主機(jī)名或ip地址
                |port:    監(jiān)聽的網(wǎng)絡(luò)socket的端口
              """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
                            .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
                            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
        val wordsDStream:DStream[String] = linesDStream.flatMap({case line => line.split(" ")})
        val pairsDStream:DStream[(String, Integer)] = wordsDStream.map({case word => (word, 1)})
        val retDStream:DStream[(String, Integer)] = pairsDStream.reduceByKey{case (v1, v2) => v1 + v2}

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()  // stop中的boolean參數(shù),設(shè)置為true,關(guān)閉該ssc對應(yīng)的SparkContext,默認(rèn)為false,只關(guān)閉自身
    }
}

啟動程序,同時在主機(jī)上使用nc命令進(jìn)行操作:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結(jié)果如下:

-------------------------------------------
Time: 1525929574000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)

StreamingContext和DStream詳解

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

StreamingContext的創(chuàng)建方式

1、在Spark中有兩種創(chuàng)建StreamingContext的方式

1)根據(jù)SparkConf進(jìn)行創(chuàng)建

val conf = new SparkConf().setAppName(appname).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(10));

2)還可以根據(jù)SparkContext進(jìn)行創(chuàng)建

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10));

appname,是用來在Spark UI上顯示的應(yīng)用名稱。master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]。

2、batch interval:Seconds(10)可以根據(jù)我們自己應(yīng)用程序的情況進(jìn)行不同的設(shè)置。

StreamingContext的創(chuàng)建、啟動和銷毀

一、一個StreamingContext定義之后,必須執(zhí)行以下程序進(jìn)行實時計算的執(zhí)行

1、創(chuàng)建輸入DStream來創(chuàng)建輸入不同的數(shù)據(jù)源。

2、對DStream定義transformation和output等各種算子操作,來定義我們需要的各種實時計算邏輯。

3、調(diào)用StreamingContext的start()方法,進(jìn)行啟動我們的實時處理數(shù)據(jù)。

4、調(diào)用StreamingContext的awaitTermination()方法,來等待應(yīng)用程序的終止??梢允褂肅TRL+C手動停止,或者就是讓它持續(xù)不斷的運(yùn)行進(jìn)行計算。

5、也可以通過調(diào)用StreamingContext的stop()方法,來停止應(yīng)用程序。

二、備注(十分重要)

1、只要我們一個StreamingContext啟動之后,我們就不能再往這個Application其中添加任何計算邏輯了。比如執(zhí)行start()方法之后,還給某個DStream執(zhí)行一個算子,這是不允許的。

2、一個StreamingContext停止之后,是肯定不能夠重啟的。調(diào)用stop()之后,不能再調(diào)用start()

3、必須保證一個JVM同時只能有一個StreamingContext啟動。在你的應(yīng)用程序中,不能創(chuàng)建兩個StreamingContext。

4、調(diào)用stop()方法時,會同時停止內(nèi)部的SparkContext,如果不希望如此,還希望后面繼續(xù)使用SparkContext創(chuàng)建其他類型的Context,比如SQLContext,那么就用stop(false)。

5、一個SparkContext可以創(chuàng)建多個StreamingContext,只要上一個先用stop(false)停止,再創(chuàng)建下一個即可。(注意與第2點的區(qū)別,這里是再創(chuàng)建了一個StreamingContext)

輸入DStream和Receiver

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

輸入DStream代表了來自數(shù)據(jù)源的輸入數(shù)據(jù)流。我們之前做過了一些例子,比如從文件讀取、從TCP、從HDFS讀取等。每個DSteam都會綁定一個Receiver對象,該對象是一個關(guān)鍵的核心組件,用來從我們的各種數(shù)據(jù)源接受數(shù)據(jù),并將其存儲在Spark的內(nèi)存當(dāng)中,這個內(nèi)存的StorageLevel,我們可以自己進(jìn)行指定,老師在以后的例子中會講解這部分。

Spark Streaming提供了兩種內(nèi)置的數(shù)據(jù)源支持:

1、基礎(chǔ)數(shù)據(jù)源:SSC API中直接提供了對這些數(shù)據(jù)源的支持,比如文件、tcp socket、Akka Actor等。

2、高級數(shù)據(jù)源:比如Kafka、Flume、Kinesis和Twitter等數(shù)據(jù)源,要引入第三方的JAR來完成我們的工作。

3、自定義數(shù)據(jù)源:比如我們的ZMQ、RabbitMQ、ActiveMQ等任何格式的自定義數(shù)據(jù)源。關(guān)于自定義數(shù)據(jù)源,老師在講解最后一個項目的時候,會使用此自定義數(shù)據(jù)源如果從ZMQ中讀取數(shù)據(jù)。官方提供的Spark-ZMQ是基于zmq2.0版本的,因為現(xiàn)在的 生產(chǎn)環(huán)境都是基于ZMQ4以上的版本了,所以必須自己定義并實現(xiàn)了一個自定義的receiver機(jī)制。

Spark Streaming的運(yùn)行機(jī)制local[*]分析

1、如果我們想要在我們的Spark Streaming應(yīng)用中并行讀取N多數(shù)據(jù)的話,我們可以啟動創(chuàng)建多個DStream。這樣子就會創(chuàng)建多個Receiver,老師最多的一個案例是啟動了128個Receive,每個receiver每秒的數(shù)據(jù)是1000W以上。

2、但是要注意的是,我們Spark Streaming Application的Executor進(jìn)程,是個長時間運(yùn)行的一個進(jìn)程,因此它會獨占分給他的cpu core。所以它只能自己處理這件事情了,不能再干其他活了。

3、使用本地模式local運(yùn)行我們的Spark Streaming程序時,絕對不能使用local或者 local[1]的模式。因為Spark Streaming運(yùn)行的時候,必須要至少要有2個線程。如果只給了一條的話,Spark Streaming Application程序會直接hang在哪兒。 兩條線程的一條用來分配給Receiver接收數(shù)據(jù),另外一條線程用來處理接受到的數(shù)據(jù)。因此我們想要進(jìn)行本地測試的話,必須滿足local[N],這個N一定要大于2

4、如果我們想要在我們的Spark進(jìn)群上運(yùn)行的話,那么首先,必須要求我們的集群每個節(jié)點上,有>1個cpu core。其次,給Spark Streaming的每個executor分配的core,必須>1,這樣,才能保證分配到executor上運(yùn)行的輸入DStream,兩條線程并行,一條運(yùn)行Receiver,接收數(shù)據(jù);一條處理數(shù)據(jù)。否則的話,只會接收數(shù)據(jù),不會處理數(shù)據(jù)。

DStream與HDFS集成

輸入DFStream基礎(chǔ)數(shù)據(jù)源

基于HDFS文件的實時計算,其實就是監(jiān)控我們的一個HDFS目錄,只要其中有新文件出現(xiàn),就實時處理。相當(dāng)于處理實時的文件流。

===》Spark Streaming會監(jiān)視指定的HDFS目錄,并且處理出現(xiàn)在目錄中的文件。

1)在HDFS中的所有目錄下的文件,必須滿足相同的格式,不然的話,不容易處理。必須使用移動或者重命名的方式,將文件移入目錄。一旦處理之后,文件的內(nèi)容及時改變,也不會再處理了。

2)基于HDFS的數(shù)據(jù)結(jié)源讀取是沒有receiver的,因此不會占用一個cpu core。

3)實際上在下面的測試案例中,一直也沒有效果,也就是監(jiān)聽不到HDFS中的文件,本地文件也沒有效果;

基于HDFS的實時WordCounter案例實戰(zhàn)

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming監(jiān)聽hdfs的某一個目錄的變化(新增文件)
  */
object _02SparkStreamingHDFSOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

        val linesDStream:DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/")
//        val linesDStream:DStream[String] = ssc.textFileStream("D:/data/spark/streaming")
        linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

DStream與Kafka集成(基于Receiver方式)

Spark與Kafka集成的方式

1、利用Kafka的Receiver方式進(jìn)行集成

2、利用Kafka的Direct方式進(jìn)行集成

Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式-Receiver與Direct的方式,可以從代碼中簡單理解成Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節(jié)點上獲取數(shù)據(jù)了。

基于Kafka的Receiver方式集成

這種方式使用Receiver來獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。然而,在默認(rèn)的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)。如果要啟用高可靠機(jī)制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機(jī)制(Write Ahead Log,WAL)。該機(jī)制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中。所以,即使底層節(jié)點出現(xiàn)了失敗,也可以使用預(yù)寫日志中的數(shù)據(jù)進(jìn)行恢復(fù)。

補(bǔ)充說明:

(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關(guān)系的。所以,在KafkaUtils.createStream()中,提高partition的數(shù)量,只會增加一個Receiver中,讀取partition的線程的數(shù)量。不會增加Spark處理數(shù)據(jù)的并行度。

(2)、可以創(chuàng)建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數(shù)據(jù)。

(3)、如果基于容錯的文件系統(tǒng),比如HDFS,啟用了預(yù)寫日志機(jī)制,接收到的數(shù)據(jù)都會被復(fù)制一份到預(yù)寫日志中。因此,在KafkaUtils.createStream()中,設(shè)置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。

與Kafka的集成--Maven

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.2</version>
</dependency>

Kafka啟動、驗證和測試

啟動kafka服務(wù)

kafka-server-start.sh -daemon config/server.properties

創(chuàng)建topic

kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3

列舉kafka中已經(jīng)創(chuàng)建的topic

kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

列舉每個節(jié)點都保護(hù)那些topic、Partition

kafka-topics.sh --describe --zookeeper uplooking01:2181, uplooking02:2181, uplooking03:21821 --topic spark-kafka
  leader:負(fù)責(zé)處理消息的讀和寫,leader是從所有節(jié)點中隨機(jī)選擇的.
  replicas:列出了所有的副本節(jié)點,不管節(jié)點是否在服務(wù)中.
  isr:是正在服務(wù)中的節(jié)點.

產(chǎn)生數(shù)據(jù)

kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092

消費數(shù)據(jù)

kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Kafka和SparkStreaming基于Receiver的模式集成
  */
object _03SparkStreamingKafkaReceiverOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
//        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka")   // checkpoint文件保存到hdfs中
        ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka")    // checkpoint文件保存到本地文件系統(tǒng)

        /**
          * 使用Kafka Receiver的方式,來創(chuàng)建的輸入DStream,需要使用SparkStreaming提供的Kafka整合API
          * KafkaUtils
          */
        val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
        val groupId = "kafka-receiver-group-id"
        val topics:Map[String, Int] = Map("spark-kafka"->3)
        // ReceiverInputDStream中的key就是當(dāng)前一條數(shù)據(jù)在kafka中的key,value就是該條數(shù)據(jù)對應(yīng)的value
        val linesDStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

在kafka中生產(chǎn)數(shù)據(jù):

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結(jié)果如下:

-------------------------------------------
Time: 1525965130000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)

在上面的代碼中,還啟用了Spark Streaming的預(yù)寫日志機(jī)制(Write Ahead Log,WAL)。

如果數(shù)據(jù)保存在本地文件系統(tǒng),則如下:

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

如果數(shù)據(jù)保存在HDFS中,則如下:

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

DStream與Kafka集成(基于Direct方式)

Spark和Kafka集成Direct的特點

(1)Direct的方式是會直接操作kafka底層的元數(shù)據(jù)信息,這樣如果計算失敗了,可以把數(shù)據(jù)重新讀一下,重新處理。即數(shù)據(jù)一定會被處理。拉數(shù)據(jù),是RDD在執(zhí)行的時候直接去拉數(shù)據(jù)。

(2)由于直接操作的是kafka,kafka就相當(dāng)于你底層的文件系統(tǒng)。這個時候能保證嚴(yán)格的事務(wù)一致性,即一定會被處理,而且只會被處理一次。而Receiver的方式則不能保證,因為Receiver和ZK中的數(shù)據(jù)可能不同步,Spark Streaming可能會重復(fù)消費數(shù)據(jù),這個調(diào)優(yōu)可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,spark streaming自己負(fù)責(zé)追蹤消費這個數(shù)據(jù)的偏移量或者offset,并且自己保存到checkpoint,所以它的數(shù)據(jù)一定是同步的,一定不會被重復(fù)。即使重啟也不會重復(fù),因為checkpoint了,但是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎么解決呢?升級的時候讀取我指定的備份就可以了,即手動的指定checkpoint也是可以的,這就再次完美的確保了事務(wù)性,有且僅有一次的事務(wù)機(jī)制。那么怎么手動checkpoint呢?構(gòu)建SparkStreaming的時候,有g(shù)etorCreate這個api,它就會獲取checkpoint的內(nèi)容,具體指定下這個checkpoint在哪就好了。

(3)由于底層是直接讀數(shù)據(jù),沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數(shù)據(jù)的時候,我們會使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數(shù)據(jù)。這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,如果你要讀取多個partition,Spark也會創(chuàng)建RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關(guān)系的。這個優(yōu)勢是你的RDD,其實本質(zhì)上講在底層讀取kafka的時候,kafka的partition就相當(dāng)于原先hdfs上的一個block。這就符合了數(shù)據(jù)本地性。RDD和kafka數(shù)據(jù)都在這邊。所以讀數(shù)據(jù)的地方,處理數(shù)據(jù)的地方和驅(qū)動數(shù)據(jù)處理的程序都在同樣的機(jī)器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對一的,想提高并行度就會比較麻煩。提高并行度還是repartition,即重新分區(qū),因為產(chǎn)生shuffle,很耗時。這個問題,以后也許新版本可以自由配置比例,不是一對一。因為提高并行度,可以更好的利用集群的計算資源,這是很有意義的。

(4)不需要開啟wal機(jī)制,從數(shù)據(jù)零丟失的角度來看,極大的提升了效率,還至少能節(jié)省一倍的磁盤空間。從kafka獲取數(shù)據(jù),比從hdfs獲取數(shù)據(jù),因為zero copy的方式,速度肯定更快。

Kafka Direct VS Receiver

從高層次的角度看,之前的和Kafka集成方案(reciever方法)使用WAL工作方式如下:

1)運(yùn)行在Spark workers/executors上的Kafka Receivers連續(xù)不斷地從Kafka中讀取數(shù)據(jù),其中用到了Kafka中高層次的消費者API。

2)接收到的數(shù)據(jù)被存儲在Spark workers/executors中的內(nèi)存,同時也被寫入到WAL中。只有接收到的數(shù)據(jù)被持久化到log中,Kafka Receivers才會去更新Zookeeper中Kafka的偏移量。

3)接收到的數(shù)據(jù)和WAL存儲位置信息被可靠地存儲,如果期間出現(xiàn)故障,這些信息被用來從錯誤中恢復(fù),并繼續(xù)處理數(shù)據(jù)。

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

  • 這個方法可以保證從Kafka接收的數(shù)據(jù)不被丟失。但是在失敗的情況下,有些數(shù)據(jù)很有可能會被處理不止一次!這種情況在一些接收到的數(shù)據(jù)被可靠地保存到WAL中,但是還沒有來得及更新Zookeeper中Kafka偏移量,系統(tǒng)出現(xiàn)故障的情況下發(fā)生。這導(dǎo)致數(shù)據(jù)出現(xiàn)不一致性:Spark Streaming知道數(shù)據(jù)被接收,但是Kafka那邊認(rèn)為數(shù)據(jù)還沒有被接收,這樣在系統(tǒng)恢復(fù)正常時,Kafka會再一次發(fā)送這些數(shù)據(jù)。
  • 這種不一致產(chǎn)生的原因是因為兩個系統(tǒng)無法對那些已經(jīng)接收到的數(shù)據(jù)信息保存進(jìn)行原子操作。為了解決這個問題,只需要一個系統(tǒng)來維護(hù)那些已經(jīng)發(fā)送或接收的一致性視圖,而且,這個系統(tǒng)需要擁有從失敗中恢復(fù)的一切控制權(quán)利?;谶@些考慮,社區(qū)決定將所有的消費偏移量信息只存儲在Spark Streaming中,并且使用Kafka的低層次消費者API來從任意位置恢復(fù)數(shù)據(jù)。

為了構(gòu)建這個系統(tǒng),新引入的Direct API采用完全不同于Receivers和WALs的處理方式。它不是啟動一個Receivers來連續(xù)不斷地從Kafka中接收數(shù)據(jù)并寫入到WAL中,而是簡單地給出每個batch區(qū)間需要讀取的偏移量位置,最后,每個batch的Job被運(yùn)行,那些對應(yīng)偏移量的數(shù)據(jù)在Kafka中已經(jīng)準(zhǔn)備好了。這些偏移量信息也被可靠地存儲(checkpoint),在從失敗中恢復(fù)

Spark Streaming筆記整理(二):案例、SSC、數(shù)據(jù)源與自定義Receiver

  • 需要注意的是,Spark Streaming可以在失敗以后重新從Kafka中讀取并處理那些數(shù)據(jù)段。然而,由于僅處理一次的語義,最后重新處理的結(jié)果和沒有失敗處理的結(jié)果是一致的。
  • 因此,Direct API消除了需要使用WAL和Receivers的情況,而且確保每個Kafka記錄僅被接收一次并被高效地接收。這就使得我們可以將Spark Streaming和Kafka很好地整合在一起??傮w來說,這些特性使得流處理管道擁有高容錯性,高效性,而且很容易地被使用。

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Kafka和SparkStreaming基于Direct的模式集成
  *
  * 在公司中使用Kafka-Direct方式
  */
object _04SparkStreamingKafkaDirectOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

//        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka")   // checkpoint文件也是可以保存到hdfs中的,不過必要性不大了,對于direct的方式來說

        val kafkaParams:Map[String, String] = Map("metadata.broker.list"-> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
        val topics:Set[String] = Set("spark-kafka")
        val linesDStream:InputDStream[(String, String)] = KafkaUtils.
            // 參數(shù)分別為:key類型,value類型,key的×××,value的×××
            createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

生產(chǎn)數(shù)據(jù):

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結(jié)果如下:

-------------------------------------------
Time: 1525966750000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)

自定義Receiver

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming自定義Receiver
  * 通過模擬Network來學(xué)習(xí)自定義Receiver
  *
  * 自定義的步驟:
  *     1.創(chuàng)建一個類繼承一個類或者實現(xiàn)某個接口
  *     2.復(fù)寫啟動的個別方法
  *     3.進(jìn)行注冊調(diào)用
  */
object _05SparkStreamingCustomReceiverOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: 監(jiān)聽的網(wǎng)絡(luò)socket的主機(jī)名或ip地址
                  |port:    監(jiān)聽的網(wǎng)絡(luò)socket的端口
                """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream:ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port))
        val retDStream:DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

/**
  * 自定義receiver
  */
class MyNetWorkReceiver(storageLevel:StorageLevel) extends Receiver[String](storageLevel) {

    private var hostname:String = _
    private var port:Int = _

    def this(hostname:String, port:Int, storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) {
        this(storageLevel)
        this.hostname = hostname
        this.port = port
    }

    /**
      * 啟動及其初始化receiver資源
      */
    override def onStart(): Unit = {
        val thread = new Thread() {
            override def run(): Unit = {
                receive()
            }
        }
        thread.setDaemon(true)  // 設(shè)置成為后臺線程
        thread.start()
    }

    // 接收數(shù)據(jù)的核心api 讀取網(wǎng)絡(luò)socket中的數(shù)據(jù)
    def receive(): Unit = {
        val socket = new Socket(hostname, port)
        val ins = socket.getInputStream()
        val br = new BufferedReader(new InputStreamReader(ins))
        var line:String = null
        while((line = br.readLine()) != null) {
            store(line)
        }
        ins.close()
        socket.close()
    }

    override def onStop(): Unit = {

    }
}

啟動nc,并輸入數(shù)據(jù):

[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me

輸出結(jié)果如下:

(hello,3)
(me,1)
(you,1)
(he,1)
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI