您好,登錄后才能下訂單哦!
[TOC]
主要是監(jiān)聽網(wǎng)絡(luò)端口中的數(shù)據(jù),并實時進(jìn)行wc的計算。
測試代碼如下:
package cn.xpleaf.bigdata.spark.java.streaming.p1;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
/**
* 使用Java開發(fā)SparkStreaming的第一個應(yīng)用程序
*
* 用于監(jiān)聽網(wǎng)絡(luò)socket中的一個端口,實時獲取對應(yīng)的文本內(nèi)容
* 計算文本內(nèi)容中的每一個單詞出現(xiàn)的次數(shù)
*/
public class _01SparkStreamingNetWorkWCOps {
public static void main(String[] args) {
if(args == null || args.length < 2) {
System.err.println("Parameter Errors! Usage: <hostname> <port>");
System.exit(-1);
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
SparkConf conf = new SparkConf()
.setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName())
/*
* 設(shè)置為local是無法計算數(shù)據(jù),但是能夠接收數(shù)據(jù)
* 設(shè)置為local[2]是既可以計算數(shù)據(jù),也可以接收數(shù)據(jù)
* 當(dāng)master被設(shè)置為local的時候,只有一個線程,且只能被用來接收外部的數(shù)據(jù),所以不能夠進(jìn)行計算,如此便不會做對應(yīng)的輸出
* 所以在使用的本地模式時,同時是監(jiān)聽網(wǎng)絡(luò)socket數(shù)據(jù),線程個數(shù)必須大于等于2
*/
.setMaster("local[2]");
/**
* 第二個參數(shù):Duration是SparkStreaming用于進(jìn)行采集多長時間段內(nèi)的數(shù)據(jù)將其拆分成一個個batch
* 該例表示每隔2秒采集一次數(shù)據(jù),將數(shù)據(jù)打散成一個個batch(其實就是SparkCore中的一個個RDD)
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
String hostname = args[0].trim();
int port = Integer.valueOf(args[1].trim());
JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);// 默認(rèn)的持久化級別StorageLevel.MEMORY_AND_DISK_SER_2
JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
return new Tuple2<String, Integer>(word, 1);
});
JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
retDStream.print();
// 啟動流式計算
jsc.start();
// 等待執(zhí)行結(jié)束
jsc.awaitTermination();
System.out.println("結(jié)束了沒有呀,哈哈哈~");
jsc.close();
}
}
啟動程序,同時在主機(jī)上使用nc命令進(jìn)行操作:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
輸出結(jié)果如下:
-------------------------------------------
Time: 1525929096000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)
同時也可以在Spark UI上查看相應(yīng)的作業(yè)執(zhí)行情況:
可以看到,每2秒就會執(zhí)行一次計算,即每隔2秒采集一次數(shù)據(jù),將數(shù)據(jù)打散成一個個batch(其實就是SparkCore中的一個個RDD)。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object _01SparkStreamingNetWorkOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監(jiān)聽的網(wǎng)絡(luò)socket的主機(jī)名或ip地址
|port: 監(jiān)聽的網(wǎng)絡(luò)socket的端口
""".stripMargin)
System.exit(-1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val hostname = args(0).trim
val port = args(1).trim.toInt
val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
val wordsDStream:DStream[String] = linesDStream.flatMap({case line => line.split(" ")})
val pairsDStream:DStream[(String, Integer)] = wordsDStream.map({case word => (word, 1)})
val retDStream:DStream[(String, Integer)] = pairsDStream.reduceByKey{case (v1, v2) => v1 + v2}
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop() // stop中的boolean參數(shù),設(shè)置為true,關(guān)閉該ssc對應(yīng)的SparkContext,默認(rèn)為false,只關(guān)閉自身
}
}
啟動程序,同時在主機(jī)上使用nc命令進(jìn)行操作:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
輸出結(jié)果如下:
-------------------------------------------
Time: 1525929574000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)
1、在Spark中有兩種創(chuàng)建StreamingContext的方式
1)根據(jù)SparkConf進(jìn)行創(chuàng)建
val conf = new SparkConf().setAppName(appname).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(10));
2)還可以根據(jù)SparkContext進(jìn)行創(chuàng)建
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10));
appname,是用來在Spark UI上顯示的應(yīng)用名稱。master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]。
2、batch interval:Seconds(10)可以根據(jù)我們自己應(yīng)用程序的情況進(jìn)行不同的設(shè)置。
一、一個StreamingContext定義之后,必須執(zhí)行以下程序進(jìn)行實時計算的執(zhí)行
1、創(chuàng)建輸入DStream來創(chuàng)建輸入不同的數(shù)據(jù)源。
2、對DStream定義transformation和output等各種算子操作,來定義我們需要的各種實時計算邏輯。
3、調(diào)用StreamingContext的start()方法,進(jìn)行啟動我們的實時處理數(shù)據(jù)。
4、調(diào)用StreamingContext的awaitTermination()方法,來等待應(yīng)用程序的終止??梢允褂肅TRL+C手動停止,或者就是讓它持續(xù)不斷的運(yùn)行進(jìn)行計算。
5、也可以通過調(diào)用StreamingContext的stop()方法,來停止應(yīng)用程序。
二、備注(十分重要)
1、只要我們一個StreamingContext啟動之后,我們就不能再往這個Application其中添加任何計算邏輯了。比如執(zhí)行start()方法之后,還給某個DStream執(zhí)行一個算子,這是不允許的。
2、一個StreamingContext停止之后,是肯定不能夠重啟的。調(diào)用stop()之后,不能再調(diào)用start()
3、必須保證一個JVM同時只能有一個StreamingContext啟動。在你的應(yīng)用程序中,不能創(chuàng)建兩個StreamingContext。
4、調(diào)用stop()方法時,會同時停止內(nèi)部的SparkContext,如果不希望如此,還希望后面繼續(xù)使用SparkContext創(chuàng)建其他類型的Context,比如SQLContext,那么就用stop(false)。
5、一個SparkContext可以創(chuàng)建多個StreamingContext,只要上一個先用stop(false)停止,再創(chuàng)建下一個即可。(注意與第2點的區(qū)別,這里是再創(chuàng)建了一個StreamingContext)
輸入DStream代表了來自數(shù)據(jù)源的輸入數(shù)據(jù)流。我們之前做過了一些例子,比如從文件讀取、從TCP、從HDFS讀取等。每個DSteam都會綁定一個Receiver對象,該對象是一個關(guān)鍵的核心組件,用來從我們的各種數(shù)據(jù)源接受數(shù)據(jù),并將其存儲在Spark的內(nèi)存當(dāng)中,這個內(nèi)存的StorageLevel,我們可以自己進(jìn)行指定,老師在以后的例子中會講解這部分。
Spark Streaming提供了兩種內(nèi)置的數(shù)據(jù)源支持:
1、基礎(chǔ)數(shù)據(jù)源:SSC API中直接提供了對這些數(shù)據(jù)源的支持,比如文件、tcp socket、Akka Actor等。
2、高級數(shù)據(jù)源:比如Kafka、Flume、Kinesis和Twitter等數(shù)據(jù)源,要引入第三方的JAR來完成我們的工作。
3、自定義數(shù)據(jù)源:比如我們的ZMQ、RabbitMQ、ActiveMQ等任何格式的自定義數(shù)據(jù)源。關(guān)于自定義數(shù)據(jù)源,老師在講解最后一個項目的時候,會使用此自定義數(shù)據(jù)源如果從ZMQ中讀取數(shù)據(jù)。官方提供的Spark-ZMQ是基于zmq2.0版本的,因為現(xiàn)在的 生產(chǎn)環(huán)境都是基于ZMQ4以上的版本了,所以必須自己定義并實現(xiàn)了一個自定義的receiver機(jī)制。
1、如果我們想要在我們的Spark Streaming應(yīng)用中并行讀取N多數(shù)據(jù)的話,我們可以啟動創(chuàng)建多個DStream。這樣子就會創(chuàng)建多個Receiver,老師最多的一個案例是啟動了128個Receive,每個receiver每秒的數(shù)據(jù)是1000W以上。
2、但是要注意的是,我們Spark Streaming Application的Executor進(jìn)程,是個長時間運(yùn)行的一個進(jìn)程,因此它會獨占分給他的cpu core。所以它只能自己處理這件事情了,不能再干其他活了。
3、使用本地模式local運(yùn)行我們的Spark Streaming程序時,絕對不能使用local或者 local[1]的模式。因為Spark Streaming運(yùn)行的時候,必須要至少要有2個線程。如果只給了一條的話,Spark Streaming Application程序會直接hang在哪兒。 兩條線程的一條用來分配給Receiver接收數(shù)據(jù),另外一條線程用來處理接受到的數(shù)據(jù)。因此我們想要進(jìn)行本地測試的話,必須滿足local[N],這個N一定要大于2
4、如果我們想要在我們的Spark進(jìn)群上運(yùn)行的話,那么首先,必須要求我們的集群每個節(jié)點上,有>1個cpu core。其次,給Spark Streaming的每個executor分配的core,必須>1,這樣,才能保證分配到executor上運(yùn)行的輸入DStream,兩條線程并行,一條運(yùn)行Receiver,接收數(shù)據(jù);一條處理數(shù)據(jù)。否則的話,只會接收數(shù)據(jù),不會處理數(shù)據(jù)。
基于HDFS文件的實時計算,其實就是監(jiān)控我們的一個HDFS目錄,只要其中有新文件出現(xiàn),就實時處理。相當(dāng)于處理實時的文件流。
===》Spark Streaming會監(jiān)視指定的HDFS目錄,并且處理出現(xiàn)在目錄中的文件。
1)在HDFS中的所有目錄下的文件,必須滿足相同的格式,不然的話,不容易處理。必須使用移動或者重命名的方式,將文件移入目錄。一旦處理之后,文件的內(nèi)容及時改變,也不會再處理了。
2)基于HDFS的數(shù)據(jù)結(jié)源讀取是沒有receiver的,因此不會占用一個cpu core。
3)實際上在下面的測試案例中,一直也沒有效果,也就是監(jiān)聽不到HDFS中的文件,本地文件也沒有效果;
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* SparkStreaming監(jiān)聽hdfs的某一個目錄的變化(新增文件)
*/
object _02SparkStreamingHDFSOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val linesDStream:DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/")
// val linesDStream:DStream[String] = ssc.textFileStream("D:/data/spark/streaming")
linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
1、利用Kafka的Receiver方式進(jìn)行集成
2、利用Kafka的Direct方式進(jìn)行集成
Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式-Receiver與Direct的方式,可以從代碼中簡單理解成Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節(jié)點上獲取數(shù)據(jù)了。
這種方式使用Receiver來獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來實現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動的job會去處理那些數(shù)據(jù)。然而,在默認(rèn)的配置下,這種方式可能會因為底層的失敗而丟失數(shù)據(jù)。如果要啟用高可靠機(jī)制,讓數(shù)據(jù)零丟失,就必須啟用Spark Streaming的預(yù)寫日志機(jī)制(Write Ahead Log,WAL)。該機(jī)制會同步地將接收到的Kafka數(shù)據(jù)寫入分布式文件系統(tǒng)(比如HDFS)上的預(yù)寫日志中。所以,即使底層節(jié)點出現(xiàn)了失敗,也可以使用預(yù)寫日志中的數(shù)據(jù)進(jìn)行恢復(fù)。
補(bǔ)充說明:
(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關(guān)系的。所以,在KafkaUtils.createStream()中,提高partition的數(shù)量,只會增加一個Receiver中,讀取partition的線程的數(shù)量。不會增加Spark處理數(shù)據(jù)的并行度。
(2)、可以創(chuàng)建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數(shù)據(jù)。
(3)、如果基于容錯的文件系統(tǒng),比如HDFS,啟用了預(yù)寫日志機(jī)制,接收到的數(shù)據(jù)都會被復(fù)制一份到預(yù)寫日志中。因此,在KafkaUtils.createStream()中,設(shè)置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
啟動kafka服務(wù)
kafka-server-start.sh -daemon config/server.properties
創(chuàng)建topic
kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
列舉kafka中已經(jīng)創(chuàng)建的topic
kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
列舉每個節(jié)點都保護(hù)那些topic、Partition
kafka-topics.sh --describe --zookeeper uplooking01:2181, uplooking02:2181, uplooking03:21821 --topic spark-kafka
leader:負(fù)責(zé)處理消息的讀和寫,leader是從所有節(jié)點中隨機(jī)選擇的.
replicas:列出了所有的副本節(jié)點,不管節(jié)點是否在服務(wù)中.
isr:是正在服務(wù)中的節(jié)點.
產(chǎn)生數(shù)據(jù)
kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
消費數(shù)據(jù)
kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Kafka和SparkStreaming基于Receiver的模式集成
*/
object _03SparkStreamingKafkaReceiverOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件保存到hdfs中
ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka") // checkpoint文件保存到本地文件系統(tǒng)
/**
* 使用Kafka Receiver的方式,來創(chuàng)建的輸入DStream,需要使用SparkStreaming提供的Kafka整合API
* KafkaUtils
*/
val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
val groupId = "kafka-receiver-group-id"
val topics:Map[String, Int] = Map("spark-kafka"->3)
// ReceiverInputDStream中的key就是當(dāng)前一條數(shù)據(jù)在kafka中的key,value就是該條數(shù)據(jù)對應(yīng)的value
val linesDStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
在kafka中生產(chǎn)數(shù)據(jù):
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
輸出結(jié)果如下:
-------------------------------------------
Time: 1525965130000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)
在上面的代碼中,還啟用了Spark Streaming的預(yù)寫日志機(jī)制(Write Ahead Log,WAL)。
如果數(shù)據(jù)保存在本地文件系統(tǒng),則如下:
如果數(shù)據(jù)保存在HDFS中,則如下:
(1)Direct的方式是會直接操作kafka底層的元數(shù)據(jù)信息,這樣如果計算失敗了,可以把數(shù)據(jù)重新讀一下,重新處理。即數(shù)據(jù)一定會被處理。拉數(shù)據(jù),是RDD在執(zhí)行的時候直接去拉數(shù)據(jù)。
(2)由于直接操作的是kafka,kafka就相當(dāng)于你底層的文件系統(tǒng)。這個時候能保證嚴(yán)格的事務(wù)一致性,即一定會被處理,而且只會被處理一次。而Receiver的方式則不能保證,因為Receiver和ZK中的數(shù)據(jù)可能不同步,Spark Streaming可能會重復(fù)消費數(shù)據(jù),這個調(diào)優(yōu)可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,spark streaming自己負(fù)責(zé)追蹤消費這個數(shù)據(jù)的偏移量或者offset,并且自己保存到checkpoint,所以它的數(shù)據(jù)一定是同步的,一定不會被重復(fù)。即使重啟也不會重復(fù),因為checkpoint了,但是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎么解決呢?升級的時候讀取我指定的備份就可以了,即手動的指定checkpoint也是可以的,這就再次完美的確保了事務(wù)性,有且僅有一次的事務(wù)機(jī)制。那么怎么手動checkpoint呢?構(gòu)建SparkStreaming的時候,有g(shù)etorCreate這個api,它就會獲取checkpoint的內(nèi)容,具體指定下這個checkpoint在哪就好了。
(3)由于底層是直接讀數(shù)據(jù),沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數(shù)據(jù)的時候,我們會使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數(shù)據(jù)。這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,如果你要讀取多個partition,Spark也會創(chuàng)建RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關(guān)系的。這個優(yōu)勢是你的RDD,其實本質(zhì)上講在底層讀取kafka的時候,kafka的partition就相當(dāng)于原先hdfs上的一個block。這就符合了數(shù)據(jù)本地性。RDD和kafka數(shù)據(jù)都在這邊。所以讀數(shù)據(jù)的地方,處理數(shù)據(jù)的地方和驅(qū)動數(shù)據(jù)處理的程序都在同樣的機(jī)器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對一的,想提高并行度就會比較麻煩。提高并行度還是repartition,即重新分區(qū),因為產(chǎn)生shuffle,很耗時。這個問題,以后也許新版本可以自由配置比例,不是一對一。因為提高并行度,可以更好的利用集群的計算資源,這是很有意義的。
(4)不需要開啟wal機(jī)制,從數(shù)據(jù)零丟失的角度來看,極大的提升了效率,還至少能節(jié)省一倍的磁盤空間。從kafka獲取數(shù)據(jù),比從hdfs獲取數(shù)據(jù),因為zero copy的方式,速度肯定更快。
從高層次的角度看,之前的和Kafka集成方案(reciever方法)使用WAL工作方式如下:
1)運(yùn)行在Spark workers/executors上的Kafka Receivers連續(xù)不斷地從Kafka中讀取數(shù)據(jù),其中用到了Kafka中高層次的消費者API。
2)接收到的數(shù)據(jù)被存儲在Spark workers/executors中的內(nèi)存,同時也被寫入到WAL中。只有接收到的數(shù)據(jù)被持久化到log中,Kafka Receivers才會去更新Zookeeper中Kafka的偏移量。
3)接收到的數(shù)據(jù)和WAL存儲位置信息被可靠地存儲,如果期間出現(xiàn)故障,這些信息被用來從錯誤中恢復(fù),并繼續(xù)處理數(shù)據(jù)。
為了構(gòu)建這個系統(tǒng),新引入的Direct API采用完全不同于Receivers和WALs的處理方式。它不是啟動一個Receivers來連續(xù)不斷地從Kafka中接收數(shù)據(jù)并寫入到WAL中,而是簡單地給出每個batch區(qū)間需要讀取的偏移量位置,最后,每個batch的Job被運(yùn)行,那些對應(yīng)偏移量的數(shù)據(jù)在Kafka中已經(jīng)準(zhǔn)備好了。這些偏移量信息也被可靠地存儲(checkpoint),在從失敗中恢復(fù)
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Kafka和SparkStreaming基于Direct的模式集成
*
* 在公司中使用Kafka-Direct方式
*/
object _04SparkStreamingKafkaDirectOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件也是可以保存到hdfs中的,不過必要性不大了,對于direct的方式來說
val kafkaParams:Map[String, String] = Map("metadata.broker.list"-> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
val topics:Set[String] = Set("spark-kafka")
val linesDStream:InputDStream[(String, String)] = KafkaUtils.
// 參數(shù)分別為:key類型,value類型,key的×××,value的×××
createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
生產(chǎn)數(shù)據(jù):
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
輸出結(jié)果如下:
-------------------------------------------
Time: 1525966750000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* SparkStreaming自定義Receiver
* 通過模擬Network來學(xué)習(xí)自定義Receiver
*
* 自定義的步驟:
* 1.創(chuàng)建一個類繼承一個類或者實現(xiàn)某個接口
* 2.復(fù)寫啟動的個別方法
* 3.進(jìn)行注冊調(diào)用
*/
object _05SparkStreamingCustomReceiverOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監(jiān)聽的網(wǎng)絡(luò)socket的主機(jī)名或ip地址
|port: 監(jiān)聽的網(wǎng)絡(luò)socket的端口
""".stripMargin)
System.exit(-1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val hostname = args(0).trim
val port = args(1).trim.toInt
val linesDStream:ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port))
val retDStream:DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
/**
* 自定義receiver
*/
class MyNetWorkReceiver(storageLevel:StorageLevel) extends Receiver[String](storageLevel) {
private var hostname:String = _
private var port:Int = _
def this(hostname:String, port:Int, storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) {
this(storageLevel)
this.hostname = hostname
this.port = port
}
/**
* 啟動及其初始化receiver資源
*/
override def onStart(): Unit = {
val thread = new Thread() {
override def run(): Unit = {
receive()
}
}
thread.setDaemon(true) // 設(shè)置成為后臺線程
thread.start()
}
// 接收數(shù)據(jù)的核心api 讀取網(wǎng)絡(luò)socket中的數(shù)據(jù)
def receive(): Unit = {
val socket = new Socket(hostname, port)
val ins = socket.getInputStream()
val br = new BufferedReader(new InputStreamReader(ins))
var line:String = null
while((line = br.readLine()) != null) {
store(line)
}
ins.close()
socket.close()
}
override def onStop(): Unit = {
}
}
啟動nc,并輸入數(shù)據(jù):
[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me
輸出結(jié)果如下:
(hello,3)
(me,1)
(you,1)
(he,1)
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。