溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

二、spark--spark core原理與使用

發(fā)布時(shí)間:2020-06-14 17:12:11 來(lái)源:網(wǎng)絡(luò) 閱讀:356 作者:隔壁小白 欄目:大數(shù)據(jù)

[TOC]

一、spark中一些基本術(shù)語(yǔ)

RDD:彈性分布式數(shù)據(jù)集,是spark的核心重點(diǎn)
算子:操作RDD的一些函數(shù)
application:用戶的寫的spark程序(DriverProgram + ExecutorProgram)
job:一個(gè)action類算子觸發(fā)的操作
stage:一組任務(wù),會(huì)根據(jù)依賴關(guān)系將job劃分成若干個(gè)stage
task:同一個(gè)stage內(nèi)部有多個(gè)同樣操作的task(但處理的數(shù)據(jù)不同),是集群中最小的執(zhí)行單元

可能說(shuō)完這些概念,其實(shí)還是不太懂,沒(méi)關(guān)系,這只是先有點(diǎn)印象

二、RDD基本原理和使用

2.1 什么是RDD

? RDD,全稱:Resilient Distributed Dataset,也就是彈性分布式數(shù)據(jù)集。是spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性??赡苓@還不清晰,我舉個(gè)例子:
假設(shè)我使用sc.textFile(xxxx)從hdfs中讀取一個(gè)文件的數(shù)據(jù),那么文件的數(shù)據(jù)就相當(dāng)于一個(gè)RDD,但是事實(shí)上這個(gè)文件的數(shù)據(jù)在處理時(shí)是處于多個(gè)不同的worker節(jié)點(diǎn)上的,但是在邏輯上,在這個(gè)spark集群,這些數(shù)據(jù)都是屬于一個(gè)RDD中的。這也就是為什么說(shuō)RDD是個(gè)邏輯概念,它是整個(gè)集群的一個(gè)抽象對(duì)象,分布在集群中。由此看出,RDD是spark實(shí)現(xiàn)數(shù)據(jù)分布式計(jì)算處理的關(guān)鍵。例如:
二、spark--spark core原理與使用

? 圖 2.1 RDD原理

2.2 RDD的屬性

關(guān)于RDD的屬性,源碼中有一段注釋,如下:

* Internally, each RDD is characterized by five main properties:
*  - A list of partitions
1、是一組分區(qū)
理解:RDD是由分區(qū)組成的,每個(gè)分區(qū)運(yùn)行在不同的Worker上,通過(guò)這種方式,實(shí)現(xiàn)分布式計(jì)算,即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

*  - A function for computing each split
2、split理解為分區(qū)
在RDD中,有一系列函數(shù),用于處理計(jì)算每個(gè)分區(qū)中的數(shù)據(jù)。這里把函數(shù)叫做算子。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
算子類型:
transformation   action

*  - A list of dependencies on other RDDs
3、RDD之間存在依賴關(guān)系。窄依賴,寬依賴。
需要用依賴關(guān)系來(lái)劃分Stage,任務(wù)是按照Stage來(lái)執(zhí)行的。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
4、可以自動(dòng)以分區(qū)規(guī)則來(lái)創(chuàng)建RDD
創(chuàng)建RDD時(shí),可以指定分區(qū),也可以自定義分區(qū)規(guī)則。
當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
*    an HDFS file)
5、優(yōu)先選擇離文件位置近的節(jié)點(diǎn)來(lái)執(zhí)行任務(wù)。
移動(dòng)計(jì)算,不移動(dòng)數(shù)據(jù)
這點(diǎn)需要解釋下:一般來(lái)說(shuō)spark是構(gòu)建在hdfs之上,從hdfs中讀取數(shù)據(jù)進(jìn)行處理的。而hdfs是一個(gè)分布式存儲(chǔ),比如有A、B、C三個(gè)datanode,假設(shè)spark要處理的數(shù)據(jù)剛好存儲(chǔ)在C節(jié)點(diǎn)上。如果spark此時(shí)將任務(wù)放在B節(jié)點(diǎn)或者A節(jié)點(diǎn)上執(zhí)行,那么就得先從C節(jié)點(diǎn)讀取數(shù)據(jù),然后經(jīng)過(guò)網(wǎng)絡(luò)傳輸?shù)紸或B節(jié)點(diǎn),然后才能處理,這其實(shí)是很耗費(fèi)性能。而這里spark的意思是優(yōu)先在離處理數(shù)據(jù)比較近的節(jié)點(diǎn)上執(zhí)行任務(wù),也就是優(yōu)先在C節(jié)點(diǎn)上執(zhí)行任務(wù)。這就可以節(jié)省數(shù)據(jù)傳輸所耗費(fèi)的時(shí)間和性能。也就是移動(dòng)計(jì)算而不移動(dòng)數(shù)據(jù)。

2.3 創(chuàng)建RDD

創(chuàng)建RDD首先需要?jiǎng)?chuàng)建 SparkContext對(duì)象:
//創(chuàng)建spark配置文件對(duì)象.設(shè)置app名稱,master地址,local表示為本地模式。
//如果是提交到集群中,通常不指定。因?yàn)榭赡茉诙鄠€(gè)集群匯上跑,寫死不方便
val conf = new SparkConf().setAppName("wordCount").setMaster("local")
//創(chuàng)建spark context對(duì)象
val sc = new SparkContext(conf)

通過(guò)sc.parallelize() 創(chuàng)建RDD:

sc.parallelize(seq,numPartitions)
seq:為序列對(duì)象,如list,array等
numPartitions:分區(qū)個(gè)數(shù),可以不指定,默認(rèn)是2

例子:
val rdd1 = sc.parallelize(Array(1,2,3,4,5),3)
rdd1.partitions.length

通過(guò)外部數(shù)據(jù)源創(chuàng)建

val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")

2.4 算子類型

算子有分為transformation和action類型,
transformation:

延遲計(jì)算,lazy懶值,不會(huì)觸發(fā)計(jì)算。只有遇到action算子才會(huì)觸發(fā)計(jì)算。它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行

action:

和transformation類似,但是是直接觸發(fā)計(jì)算的,不會(huì)等待

2.5 transformation算子

這里為了方便講解,實(shí)現(xiàn)創(chuàng)建一個(gè)rdd,都使用spark-shell中進(jìn)行演示:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,8,34,100,79))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2.5.1 map(func)

map[U](f:T=>U)()
參數(shù)是一個(gè)函數(shù),并且要求函數(shù)參數(shù)是單個(gè),返回值也是單個(gè)。用函數(shù)處理傳入的數(shù)據(jù),然后返回處理后的數(shù)據(jù)

例子:
//這里傳入一個(gè)匿名函數(shù),將rdd1中的每個(gè)值*2,并返回處理的新數(shù)組
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

//這里collect是一個(gè)action算子,觸發(fā)計(jì)算并打印結(jié)果
scala> rdd2.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 200, 158)

2.5.2 filter

filter(f:T=>boolean)
參數(shù)是一個(gè)判斷函數(shù),判斷傳入的參數(shù),然后返回true還是false,常用來(lái)過(guò)濾數(shù)據(jù)。最后將true的數(shù)據(jù)返回

例子:
//過(guò)濾出大于20的數(shù)據(jù)
scala> rdd2.filter(_>20).collect
res4: Array[Int] = Array(68, 200, 158)

2.5.3 flatMap

flatMap(f:T=>U)
先map后flat,flat就是將多個(gè)列表等對(duì)象展開(kāi)合并成一個(gè)大列表。并返回處理后的數(shù)據(jù)。這個(gè)函數(shù)一般用來(lái)處理多個(gè)一個(gè)列表中還包含多個(gè)列表的情況

例子:
scala> val rdd4 = sc.parallelize(Array("a b c","d e f","x y z"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24

//處理邏輯是:將array中每個(gè)字符串按空格切割,然后生成多個(gè)array,接著將多個(gè)array展開(kāi)合并一個(gè)新的array
scala> val rdd5 = rdd4.flatMap(_.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at flatMap at <console>:26

scala> rdd5.collect
res5: Array[String] = Array(a, b, c, d, e, f, x, y, z)

2.5.4 集合操作

union(otherDataset) 并集
intersection(otherDataset) 交集
distinct([numTasks]))去重

例子:
scala> val rdd6 = sc.parallelize(List(5,6,7,8,9,10))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> val rdd7 = sc.parallelize(List(1,2,3,4,5,6))
rdd7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

//并集
scala> val rdd8 = rdd6.union(rdd7)
rdd8: org.apache.spark.rdd.RDD[Int] = UnionRDD[22] at union at <console>:28

scala> rdd8.collect
res6: Array[Int] = Array(5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6)

//去重
scala> rdd8.distinct.collect
res7: Array[Int] = Array(4, 8, 1, 9, 5, 6, 10, 2, 7, 3)                         
//交集
scala> val rdd9 = rdd6.intersection(rdd7)
rdd9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at intersection at <console>:28

scala> rdd9.collect
res8: Array[Int] = Array(6, 5)

2.5.5 分組操作

groupByKey([numTasks]):只是將同一key的進(jìn)行分組聚合
reduceByKey(f:(V,V)=>V, [numTasks]) 首先是將同一key的KV進(jìn)行聚合,然后對(duì)value進(jìn)行操作。

scala> val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("Jerry",1000),("Tom",3000),("Mike",2000)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[34] at union at <console>:28

scala> rdd3.collect
res9: Array[(String, Int)] = Array((Tom,1000), (Jerry,3000), (Mary,2000), (Jerry,1000), (Tom,3000), (Mike,2000))

scala> val rdd4 = rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[35] at groupByKey at <console>:30

//分組
scala> rdd4.collect
res10: Array[(String, Iterable[Int])] = 
Array(
(Tom,CompactBuffer(1000, 3000)), 
(Jerry,CompactBuffer(3000, 1000)), 
(Mike,CompactBuffer(2000)), 
(Mary,CompactBuffer(2000)))

注意:使用分組函數(shù)時(shí),不推薦使用groupByKey,因?yàn)樾阅懿缓?,官方推薦reduceByKey
//分組并聚合
scala> rdd3.reduceByKey(_+_).collect
res11: Array[(String, Int)] = Array((Tom,4000), (Jerry,4000), (Mike,2000), (Mary,2000))

2.5.6 cogroup

這個(gè)函數(shù)的功能不太好總結(jié),直接看例子吧
scala> val rdd1 = sc.parallelize(List(("Tom",1),("Tom",2),("jerry",1),("Mike",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry",2),("Tom",1),("Bob",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[40] at cogroup at <console>:28

scala> rdd3.collect
res12: Array[(String, (Iterable[Int], Iterable[Int]))] = 
Array(
(Tom,(CompactBuffer(1, 2),CompactBuffer(1))), 
(Mike,(CompactBuffer(2),CompactBuffer())), 
(jerry,(CompactBuffer(1),CompactBuffer(2))), 
(Bob,(CompactBuffer(),CompactBuffer(2))))

2.5.7 排序

sortByKey(acsending:true/false) 根據(jù)KV中的key排序
sortBy(f:T=>U,acsending:true/false) 一般排序,且是對(duì)處理后的數(shù)據(jù)進(jìn)行排序,可以用來(lái)給KV的,按照value進(jìn)行排序

例子:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,8,34,100,79))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> rdd2.collect
res0: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 200, 158)

scala> rdd2.sortBy(x=>x,true)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:29

scala> rdd2.sortBy(x=>x,true).collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 16, 68, 158, 200)                      

scala> rdd2.sortBy(x=>x,false).collect
res3: Array[Int] = Array(200, 158, 68, 16, 10, 8, 6, 4, 2)

另外一個(gè)例子:

需求:
我們想按照KV中的value進(jìn)行排序,但是SortByKey按照key排序的。

做法一:
1、第一步交換,把key value交換,然后調(diào)用sortByKey
2、KV再次調(diào)換位置
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",1),("kitty",2),("bob",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("jerry",2),("tom",3),("kitty",5),("bob",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> val rdd3 = rdd1 union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[44] at union at <console>:28

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[45] at reduceByKey at <console>:30

scala> rdd4.collect
res13: Array[(String, Int)] = Array((bob,3), (tom,4), (jerry,3), (kitty,7))

//調(diào)換位置再排序,然后再調(diào)回來(lái)
scala> val rdd5 = rdd4.map(t => (t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[50] at map at <console>:32

scala> rdd5.collect
res14: Array[(String, Int)] = Array((kitty,7), (tom,4), (bob,3), (jerry,3)) 

做法二:
直接使用sortBy 這個(gè)算子,可以直接按照value排序

2.6 action算子

reduce

類似前面的reduceByKey,但是用于非KV的數(shù)據(jù)合并,且是action算子

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:24

scala> val rdd2 = rdd1.reduce(_+_)
rdd2: Int = 15

還有一些action算子:

reduce(func)    通過(guò)func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是可交換且可并聯(lián)的
collect()   在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素。通常只是用于觸發(fā)計(jì)算
count()  返回RDD的元素個(gè)數(shù)
first()   返回RDD的第一個(gè)元素(類似于take(1))
take(n)   返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子
takeOrdered(n, [ordering])  ,返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組,并排序
saveAsTextFile(path)    將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path)    將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。
saveAsObjectFile(path)  
countByKey()    針對(duì)(K,V)類型的RDD,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)。
foreach(func)   在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。

2.7 RDD的緩存特性

RDD也存在緩存機(jī)制,也就是將RDD緩存到內(nèi)存或者磁盤中,不用重復(fù)計(jì)算。
這里涉及到幾個(gè)算子:

cache()   標(biāo)識(shí)該rdd可以被緩存,默認(rèn)緩存在內(nèi)存中,底層其實(shí)是調(diào)用persist() 
persist() 標(biāo)識(shí)該rdd可以被緩存,默認(rèn)緩存在內(nèi)存中
persist(newLevel : org.apache.spark.storage.StorageLevel) 和上面類似,但是可以指定緩存的位置

可以緩存的位置有:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

基本就分為兩類:
純內(nèi)存緩存
純磁盤緩存
磁盤+內(nèi)存緩存

一般來(lái)說(shuō),直接是默認(rèn)的位置,也就是緩存在內(nèi)存中性能較好,但是會(huì)耗費(fèi)很多內(nèi)存,這點(diǎn)要注意,如無(wú)需要,就不要緩存了。
舉例:

讀取一個(gè)大文件,統(tǒng)計(jì)行數(shù)

scala> val rdd1 = sc.textFile("hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD[52] at textFile at <console>:24

scala> rdd1.count
res15: Long = 923452 
觸發(fā)計(jì)算,統(tǒng)計(jì)行數(shù)

scala> rdd1.cache
res16: rdd1.type = hdfs://192.168.109.132:8020/tmp_files/test_Cache.txt MapPartitionsRDD[52] at textFile at <console>:24
標(biāo)識(shí)這個(gè)RDD可以被緩存,不會(huì)觸發(fā)計(jì)算

scala> rdd1.count
res17: Long = 923452 
觸發(fā)計(jì)算,并把結(jié)果進(jìn)行緩存

scala> rdd1.count
res18: Long = 923452
直接從緩存中讀取數(shù)據(jù)。

要注意一個(gè)點(diǎn):調(diào)用cache方法的時(shí)候,只是說(shuō)標(biāo)識(shí)該rdd在后續(xù)觸發(fā)計(jì)算的時(shí)候,結(jié)果可以被緩存,而不是說(shuō)當(dāng)前rdd就被緩存了,這點(diǎn)要分清楚

2.8 RDD的容錯(cuò)機(jī)制--checkpoint

? spark在計(jì)算時(shí),中間涉及到RDD的多個(gè)轉(zhuǎn)換過(guò)程,如果這時(shí)候RDD的某個(gè)分區(qū)計(jì)算故障,導(dǎo)致結(jié)果丟失了。最簡(jiǎn)單的辦法自然是從頭開(kāi)始重新計(jì)算,但是這樣很浪費(fèi)時(shí)間。而checkpoint就是在觸發(fā)計(jì)算的時(shí)候,對(duì)RDD進(jìn)行檢查點(diǎn)狀態(tài)保存,如果后面的計(jì)算出錯(cuò)了,還可以從檢查點(diǎn)開(kāi)始重新計(jì)算。
? checkpoint一般是在具有容錯(cuò)能力,高可靠的文件系統(tǒng)上(比如HDFS, S3等)設(shè)置一個(gè)檢查點(diǎn)路徑,用于保存檢查點(diǎn)數(shù)據(jù)。出錯(cuò)的時(shí)候直接從檢查點(diǎn)目錄讀取。有本地目錄和遠(yuǎn)程目錄兩種模式。

2.8.1 本地目錄

這種模式下,要求運(yùn)行在本地模式下,不能用在集群模式下,一般用于測(cè)試開(kāi)發(fā)

sc.setCheckpointDir(本地路徑)   設(shè)置本地checkpoint路徑
rdd1.checkpoint    設(shè)置檢查點(diǎn)
rdd1.count         遇到action類算子,觸發(fā)計(jì)算,就會(huì)在checkpoint目錄生成檢查點(diǎn)

2.8.2 遠(yuǎn)程目錄(hdfs為例)

這種模式要求運(yùn)行在集群模式下,用于生產(chǎn)環(huán)境

scala> sc.setCheckpointDir("hdfs://192.168.109.132:8020/sparkckpt0619")

scala> rdd1.checkpoint

scala> rdd1.count
res22: Long = 923452

用法都是類似,只是目錄不一樣

要注意,使用checkpoint的時(shí)候,源碼中有一段話如下:

this function must be called before any job has been executed on this RDD. It is strongly recommend that  this rdd is
persisted in memory,otherwise saving it on a file will require recomputation.

大致意思就是:
要在開(kāi)始計(jì)算前就調(diào)用這個(gè)方法,也就是action算子之前。而且最好設(shè)置這個(gè)rdd緩存在內(nèi)存中,否則保存檢查點(diǎn)的時(shí)候,需要重新計(jì)算一次。

2.9 RDD中的依賴與stage原理

2.9.1 RDD依賴

? 這個(gè)是RDD運(yùn)行原理中的一個(gè)重點(diǎn)概念。
? 首先要說(shuō)說(shuō)依賴,依賴的意思就是RDD之間是有依賴關(guān)系的,因?yàn)閟park計(jì)算過(guò)程中涉及到多個(gè)RDD的轉(zhuǎn)換。RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。看圖
二、spark--spark core原理與使用

? 圖2.2 RDD的寬窄依賴

寬依賴:
一個(gè)父RDD的partition被多個(gè)子RDD的partition依賴。其實(shí)就是將父RDD的數(shù)據(jù)進(jìn)行shuffle的過(guò)程,因?yàn)楦窻DD一個(gè)partition被多個(gè)RDD的partition依賴,意味著需要將父RDD的數(shù)據(jù)打亂分配給多個(gè)RDD,打亂的過(guò)程其實(shí)就是shuffle。一般實(shí)際情況是多個(gè)父RDD的partition和多個(gè)子RDD的partition互相交錯(cuò)的依賴。

窄依賴:
一個(gè)父RDD的partition最多被一個(gè)子RDD的partition依賴

2.9.2 stage劃分

二、spark--spark core原理與使用

? 圖2.3 RDD依賴

? DAG(Directed Acyclic Graph)叫做有向無(wú)環(huán)圖,原始的RDD通過(guò)一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage。寬窄依賴的作用就是用劃分stage,stage之間是寬依賴,stage內(nèi)部是窄依賴。
? 對(duì)于窄依賴,由于父和子RDD的partition都是一對(duì)一的依賴關(guān)系,所以可以父和子的轉(zhuǎn)換可以放在一個(gè)task中執(zhí)行,例如上面的task0,CDF都是窄依賴,所以CDF直接的轉(zhuǎn)換是可以放在一個(gè)task中執(zhí)行的。一個(gè)stage內(nèi)部都是窄依賴
? 對(duì)于寬依賴,由于有shuffle的存在,那么就要求所有父RDD都處理完成后,才能執(zhí)行執(zhí)行shuffle,接著子RDD才能進(jìn)行處理。由于shuffle的存在,導(dǎo)致task任務(wù)鏈必定不是連續(xù)的了,需要重新開(kāi)始規(guī)劃task任務(wù)鏈,所以寬依賴是劃分stage的依據(jù)。
? 再往深的說(shuō),為什么要?jiǎng)澐謘tage?
? 根據(jù)寬依賴劃分了stage之后,因?yàn)閷捯蕾嚨膕huffle,所以導(dǎo)致task鏈?zhǔn)菬o(wú)法連續(xù)。而劃分stage就是讓一個(gè)stage內(nèi)部只有窄依賴,窄依賴是一對(duì)一的關(guān)系,那么task鏈就是連續(xù)的,沒(méi)有shuffle,就比如上面task0中,C->D->F,中的一個(gè)分區(qū),轉(zhuǎn)換過(guò)程都是一對(duì)一的,所以是一個(gè)連續(xù)的task鏈,放在一個(gè)task中,而另外一個(gè)分區(qū)類似,就放在task1中。因?yàn)镕->G是寬依賴,需要shuffle,所以task鏈無(wú)法連續(xù)。像這種一條線把RDD轉(zhuǎn)換邏輯串起來(lái),直到遇到寬依賴,就是一個(gè)task,而一個(gè)task處理的實(shí)際上是一個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)換過(guò)程。而在spark中,task是最小的調(diào)度單位,spark會(huì)將task分配到離分區(qū)數(shù)據(jù)近的worker節(jié)點(diǎn)上執(zhí)行。所以其實(shí)spark調(diào)度是task。
? 那么回到最開(kāi)始的問(wèn)題上,為什么要?jiǎng)澐謘tage,因?yàn)楦鶕?jù)寬窄依賴劃分出stage之后,stage內(nèi)部就可以很方便劃分出一個(gè)個(gè)task,每個(gè)task處理一個(gè)分區(qū)的數(shù)據(jù),然后spark就將task調(diào)度到對(duì)應(yīng)的worker節(jié)點(diǎn)上執(zhí)行。所以從劃分stage到劃分task,核心就在于實(shí)現(xiàn)并行計(jì)算。
? 所以,最后就是一句話,劃分stage的目的是為了更方便的劃分task

2.9.3 RDD存儲(chǔ)的是什么?

? 說(shuō)到這里,其實(shí)我們想到一個(gè)問(wèn)題,RDD里面存儲(chǔ)的是數(shù)據(jù)嗎?實(shí)際上并不是,它存儲(chǔ)的實(shí)際上對(duì)數(shù)據(jù)的轉(zhuǎn)換鏈,說(shuō)的具體點(diǎn)是對(duì)分區(qū)的轉(zhuǎn)換鏈,也就是task中包含的算子。而當(dāng)劃分stage,接著劃分task之后,一個(gè)task內(nèi)部有什么算子就已經(jīng)很清楚了,接著就是把計(jì)算任務(wù)發(fā)送到worker節(jié)點(diǎn)執(zhí)行。這種計(jì)算我們稱為 pipeline計(jì)算模式,算子就是在管道中的。
? 由此,其實(shí)RDD叫做彈性分布式數(shù)據(jù)集,并不是說(shuō)它存儲(chǔ)數(shù)據(jù),而是存儲(chǔ)了操作數(shù)據(jù)的方法函數(shù),也就是算子。

2.10 RDD高級(jí)算子

2.10.1 mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ? Iterator[U])

參數(shù)說(shuō)明:
f是一個(gè)函數(shù)參數(shù),需要自定義。
f 接收兩個(gè)參數(shù),第一個(gè)參數(shù)是Int,代表分區(qū)號(hào)。第二個(gè)Iterator[T]代表該分區(qū)中的所有元素。

通過(guò)這兩個(gè)參數(shù),可以定義處理分區(qū)的函數(shù)。
Iterator[U] : 操作完成后,返回的結(jié)果。

舉例:
將每個(gè)分區(qū)中的元素,包括分區(qū)號(hào),直接打印出來(lái)。

//先創(chuàng)建一個(gè)rdd,指定分區(qū)數(shù)為3
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def fun1(index:Int,iter:Iterator[Int]):Iterator[String]={
| iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res0: Array[String] = Array(
[PartId: 0 , value = 1 ], [PartId: 0 , value = 2 ], 
[PartId: 1 , value = 3 ], [PartId: 1 , value = 4 ], [PartId: 1 , value = 5 ], 
[PartId: 2 , value = 6 ], [PartId: 2 , value = 7 ], [PartId: 2 , value = 8 ]
)

2.10.2 aggregate

聚合操作,類似于分組(group by).
但是aggregate是先局部聚合(類似于mr中的combine),然后再全局聚合。性能比直接使用reduce算子要好,因?yàn)閞educe是直接全局聚合的。

def aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)
參數(shù)說(shuō)明:
zeroValue:初始值,這個(gè)初始值會(huì)加入到每一個(gè)分區(qū)中,最后也會(huì)加入到全局操作中
seqOp: (U, T) ? U:局部聚合操作函數(shù)
combOp: (U, U) ? U:全局聚合操作函數(shù)

=================================================
例子1:
初始值是10
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

//打印看看分區(qū)情況
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res7: Array[String] = Array([PartId: 0 , value = 1 ], [PartId: 0 , value = 2 ], [PartId: 1 , value = 3 ], [PartId: 1 , value = 4 ], [PartId: 1 , value = 5 ])

//求出每個(gè)分區(qū)的最大值,最后得出每個(gè)分區(qū)最大值,然后全局之后將每個(gè)最大值相加
scala> rdd2.aggregate(10)(max(_,_),_+_)
res8: Int = 30

為什么這里是10呢?
初始值是10 代表每個(gè)分區(qū)都多了一個(gè)10
局部操作,每個(gè)分區(qū)最大值都是10
全局操作,也多一個(gè)10 即 10(局部最大) + 10(局部最大) + 10(全局操作默認(rèn)值) = 30

=================================================
例子2:
使用aggregate將所有分區(qū)全局?jǐn)?shù)據(jù)求和,有兩種方式:
1、reduce(_+_)
2、aggregate(0)(_+_,_+_)

2.10.3 aggregateByKey

類似于aggregate操作,區(qū)別:操作的 <key value> 的數(shù)據(jù),只操作同一key的中的value。是將同一key的KV先局部分組,然后對(duì)value聚合。然后再全局分組,再對(duì)value聚合。

 aggregateByKey和reduceByKey實(shí)現(xiàn)的類似的功能,但是效率比reduceByKey高

例子:
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
def fun1(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={
iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
}

pairRDD.mapPartitionsWithIndex(fun1).collect

scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at parallelize at <console>:27

scala> def fun1(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={
| iter.toList.map( x => "[PartId: " + index + " , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> pairRDD.mapPartitionsWithIndex(fun1).collect
res31: Array[String] = Array(
[PartId: 0 , value = (cat,2) ], [PartId: 0 , value = (cat,5) ], [PartId: 0 , value = (mouse,4) ],
[PartId: 1 , value = (cat,12) ], [PartId: 1 , value = (dog,12) ], [PartId: 1 , value = (mouse,2)
])

需求:
找到每個(gè)分區(qū)中,動(dòng)物最多的動(dòng)物,進(jìn)行就和
pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

0:(cat,2)和(cat,5) --> (cat,5)  (mouse,4)
1:(cat,12)   (dog,12)   (mouse,2)

求和:(cat,17)  (mouse,6)   (dog,12) 

2.10.4 coalesce 和 repartition

這兩者都用于重分區(qū)
 repartition(numPartition)  指定重分區(qū)個(gè)數(shù),一定會(huì)發(fā)生shuffle
 coalesce(numPartition,shuffleOrNot) 指定重分區(qū)個(gè)數(shù),默認(rèn)不會(huì)發(fā)生shuffle,可以指定shuffle

要看更多算子的用法,<http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html>
寫的很詳細(xì)

2.11 分區(qū)

spark自帶了兩個(gè)分區(qū)類:
HashPartitioner:這個(gè)是默認(rèn)的partitioner,在一些涉及到shuffle的算子會(huì)用到。在一些可以指定最小分區(qū)數(shù)量的算子中,就會(huì)涉及到分區(qū)。這些分區(qū)只能用于KV對(duì)
RangePartitioner:按照key的范圍進(jìn)行分區(qū),比如1~100,101~200分別是不同分區(qū)的
用戶也可以自己自定義分區(qū),步驟如下:
1、先繼承Partitioner類,里面寫分區(qū)邏輯,形成一個(gè)新的分區(qū)類
2、rdd.partitionBy(new partiotionerClassxxx())
例子:

數(shù)據(jù)格式如下:
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242

需求:
將同一個(gè)頁(yè)面的訪問(wèn)日志各自寫到一個(gè)單獨(dú)的文件中 

代碼:
package SparkExer

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable

/**
  * 自定義分區(qū):
  * 1、先繼承Partitioner類,里面寫分區(qū)邏輯,形成一個(gè)新的分區(qū)類
  * 2、rdd.partitionBy(new partiotionerClassxxx())
  */
object CustomPart {
  def main(args: Array[String]): Unit = {
    //指定hadoop的家目錄,寫入文件到本地時(shí)需要用到hadoop的一些包
    System.setProperty("hadoop.home.dir","F:\\hadoop-2.7.2")

    val conf = new SparkConf().setAppName("Tomcat Log Partitioner").setMaster("local")
    val sc = new SparkContext(conf)
    //切割文件
    val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt").map(
      line => {
        val jspName = line.split(" ")(6)
        (jspName,line)
      }
    )

    //提取出所有key,也就是網(wǎng)頁(yè)名
    val rdd2 = rdd1.map(_._1).distinct().collect()
    //分區(qū)
    val rdd3 = rdd1.partitionBy(new TomcatWebPartitioner(rdd2))
    //將分區(qū)數(shù)據(jù)寫到文件中
    rdd3.saveAsTextFile("G:\\test\\tomcat_localhost")
  }
}

class TomcatWebPartitioner(jspList:Array[String]) extends Partitioner{
  private val listMap = new mutable.HashMap[String,Int]()
  var partitionNum = 0

  //根據(jù)網(wǎng)頁(yè)名稱,規(guī)劃整個(gè)分區(qū)個(gè)數(shù)
  for (s<-jspList) {
    listMap.put(s, partitionNum)
    partitionNum += 1
  }

  //返回分區(qū)總個(gè)數(shù)
  override def numPartitions: Int = listMap.size

  //按照key返回某個(gè)分區(qū)號(hào)
  override def getPartition(key: Any): Int = listMap.getOrElse(key.toString, 0)
}

2.12 序列化問(wèn)題

? 首先我們知道一點(diǎn),一個(gè)spark程序其實(shí)是分為兩部分的,一部分driver,它也是在executor中運(yùn)行的,另一部分則是普通的executor,用于運(yùn)行操作RDD的task的。所以其實(shí)也可以看出,只有是對(duì)RDD操作的代碼才會(huì)進(jìn)行分布式運(yùn)行,分配到多個(gè)executor中運(yùn)行,但是不屬于RDD的代碼是不會(huì)的,它僅僅是在driver中執(zhí)行。這就是關(guān)鍵了。
例子:

object test {
    val sc = new SparkContext()
    print("xxxx1")

    val rdd1 = sc.textFile(xxxx)
    rdd1.map(print(xxx2)) 

}

例如上面的例子,rdd1中的print(xxx2)會(huì)在多個(gè)executor中執(zhí)行,因?yàn)樗窃趓dd內(nèi)部執(zhí)行,而外面的print("xxxx1")只會(huì)在driver中執(zhí)行,也沒(méi)有實(shí)現(xiàn)序列化,所以實(shí)際上也不可能通過(guò)網(wǎng)絡(luò)傳遞。所以這種區(qū)別一定要了解。由此我們可以知道,如果變量什么的不是在rdd內(nèi)部的話,是不可能被多個(gè)executor上的程序獲得的。但是如果我們想這樣呢?而且是不需要定義在rdd內(nèi)部。那么就得用到下面的共享變量了

2.13 spark中廣播變量(共享變量)

廣播變量就是可以實(shí)現(xiàn)將driver中的變量給在不同的executor中運(yùn)行的rdd算子調(diào)用,而且無(wú)需再rdd算子內(nèi)部定義。常見(jiàn)的比如連接mysql等數(shù)據(jù)庫(kù)的連接對(duì)象,可以設(shè)置為廣播變量,這樣就可以只創(chuàng)建一個(gè)連接了。
用法例子:

//定義共享變量,用于共享從mongodb讀取的數(shù)據(jù),需要將數(shù)據(jù)封裝成 map(mid1,[map(mid2,score),map(mid3,score)....])的形式

    val moviesRecsMap = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIES_RECS)
      .format("com.mongodb.spark.sql")
      .load().as[MoviesRecs].rdd.map(item=> {
      (item.mid, item.recs.map(itemRecs=>(itemRecs.mid,itemRecs.socre)).toMap)
    }).collectAsMap()

    這是關(guān)鍵的一步,就是廣播變量出去
    //將此變量廣播,后面就可以在任意一個(gè)executor中調(diào)用了
    val moviesRecsMapBroadcast = spark.sparkContext.broadcast(moviesRecsMap)
    //因?yàn)槭菓兄导虞d,所以需要手動(dòng)調(diào)用一次才會(huì)真正廣播出去
    moviesRecsMapBroadcast.id

三、spark小案例

3.1 統(tǒng)計(jì)訪問(wèn)量前N名的網(wǎng)站頁(yè)面

需求:根據(jù)網(wǎng)站訪問(wèn)日志統(tǒng)計(jì)出訪問(wèn)量前N位的網(wǎng)頁(yè)名稱
數(shù)據(jù)格式如下:
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:02 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242

代碼:
package SparkExer

import org.apache.spark.{SparkConf, SparkContext}

/**
  * 分析tomcat日志
  * 日志例子:
  * 192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
  *
  * 統(tǒng)計(jì)每個(gè)頁(yè)面的訪問(wèn)量
  */
object TomcatLog {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Tomcat Log analysis").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
      .map(_.split(" ")(6))
      .map((_,1))
      .reduceByKey(_+_)
      .map(t=>(t._2,t._1))
      .sortByKey(false)
      .map(t=>(t._2,t._1))
      .collect()
    //也可以使用 sortBy(_._2,false)直接根據(jù)value進(jìn)行排序

    //取出rdd中的前N個(gè)數(shù)據(jù)
    rdd1.take(2).foreach(x=>println(x._1 + ":" + x._2))
    println("=========================================")
    //取出rdd中的后N個(gè)數(shù)據(jù)
    rdd1.takeRight(2).foreach(x=>println(x._1 + ":" + x._2))
    sc.stop()
  }
}

3.2 自定義分區(qū)例子

請(qǐng)看前面2.11中分區(qū)的例子

3.3 spark連接mysql

package SparkExer

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

object SparkConMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
      .map(_.split(" ")(6))

    rdd1.foreach(l=>{
      //jdbc操作需要包含在rdd中才能被所有worker上的executor調(diào)用,也就是借用rdd實(shí)現(xiàn)序列化
      val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8"
      var conn:Connection = null
      //sql語(yǔ)句編輯對(duì)象
      var ps:PreparedStatement = null

      conn = DriverManager.getConnection(jdbcUrl, "root", "wjt86912572")
      //?是占位符,后面需要ps1.setxxx(rowkey,value)的形式填充值進(jìn)去的,按先后順序
      ps = conn.prepareStatement("insert into customer values (?,?)")
      ps.setString(1,l)
      ps.setInt(2,1)

    })
  }
}

注意:
spark操作jdbc時(shí),如果直接使用jdbc操作數(shù)據(jù)庫(kù),會(huì)有序列化的問(wèn)題。
因?yàn)樵趕park分布式框架中,所有操作RDD的對(duì)象應(yīng)該是屬于RDD內(nèi)部的,
才有可能在整個(gè)分布式集群中使用。也就是需要序列化。
通俗來(lái)說(shuō):5個(gè)worker共享一個(gè)jdbc連接對(duì)象,和5個(gè)worker每個(gè)單獨(dú)創(chuàng)建一個(gè)連接對(duì)象的區(qū)別
所以在定義jdbc連接對(duì)象時(shí),需要在RDD內(nèi)部定義

上面的方式過(guò)于繁瑣,而且每個(gè)數(shù)據(jù)都會(huì)新建一個(gè)jdbc連接對(duì)象
優(yōu)化:使用rdd1.foreachPartition()來(lái)對(duì)每個(gè)分區(qū)操作,而不是對(duì)每條數(shù)據(jù)操作
這樣可以通過(guò)只為每個(gè)分區(qū)創(chuàng)建一個(gè)jdbc連接對(duì)象來(lái)節(jié)省數(shù)據(jù)庫(kù)資源

package SparkExer

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

object SparkConMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("G:\\test\\tomcat_localhost_access_log.2017-07-30.txt")
      .map(_.split(" ")(6))

    rdd1.foreachPartition(updateMysql)
    /**
      * 上面的方式過(guò)于繁瑣,而且每個(gè)數(shù)據(jù)都會(huì)新建一個(gè)jdbc連接對(duì)象
      * 優(yōu)化:使用rdd1.foreachPartition()來(lái)對(duì)每個(gè)分區(qū)操作,而不是對(duì)每條數(shù)據(jù)操作
      * 這樣可以通過(guò)只為每個(gè)分區(qū)創(chuàng)建一個(gè)jdbc連接對(duì)象來(lái)節(jié)省數(shù)據(jù)庫(kù)資源
      */

  }

  def updateMysql(it:Iterator[String]) = {
    val jdbcUrl = "jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8"
    var conn:Connection = null
    //sql語(yǔ)句編輯對(duì)象
    var ps:PreparedStatement = null

    conn = DriverManager.getConnection(jdbcUrl, "root", "wjt86912572")
    //conn.createStatement()

    //ps = conn.prepareStatement("select * from customer")
    //?是占位符,后面需要ps1.setxxx(rowkey,value)的形式填充值進(jìn)去的,按先后順序
    ps = conn.prepareStatement("insert into customer values (?,?)")
    it.foreach(data=>{
      ps.setString(1,data)
      ps.setInt(2,1)
      ps.executeUpdate()
    })
    ps.close()
    conn.close()
  }
}

另外一種連接mysql的方式就是通過(guò)JdbcRDD對(duì)象去連接

package SparkExer

import java.sql.DriverManager

import org.apache
import org.apache.spark
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MysqlJDBCRdd {
  def main(args: Array[String]): Unit = {
    val conn = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://bigdata121:3306/test?serverTimezone=UTC&characterEncoding=utf-8",
      "root",
      "wjt86912572")
    }
    val conf = new SparkConf().setAppName("Tomcat Log To Mysql").setMaster("local")
    val sc = new SparkContext(conf)
    //創(chuàng)建jdbcrdd對(duì)象
    val mysqlRdd = new JdbcRDD(sc,conn,"select * from customer where id>? and id<?", 2,7,2,r=> {
      r.getString(2)
    })

  }
}

這個(gè)對(duì)象的使用局限性很大,只能用于select,而且必須傳入where中的兩個(gè)限制值,還要指定分區(qū)

四、 shuffle問(wèn)題

4.1 shuffle導(dǎo)致的數(shù)據(jù)傾斜分析

https://www.cnblogs.com/diaozhaojian/p/9635829.html

1、數(shù)據(jù)傾斜原理
(1)在進(jìn)行shuffle的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來(lái)進(jìn)行處理,此時(shí)如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜。
(2)由于shuffle之后的分區(qū)規(guī)則,導(dǎo)致某個(gè)分區(qū)數(shù)據(jù)量過(guò)多,導(dǎo)致數(shù)據(jù)傾斜  

2、數(shù)據(jù)傾斜問(wèn)題發(fā)現(xiàn)與定位
   通過(guò)Spark Web UI來(lái)查看當(dāng)前運(yùn)行的stage各個(gè)task分配的數(shù)據(jù)量,從而進(jìn)一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
    知道數(shù)據(jù)傾斜發(fā)生在哪一個(gè)stage之后,接著我們就需要根據(jù)stage劃分原理,推算出來(lái)發(fā)生傾斜的那個(gè)stage對(duì)應(yīng)代碼中的哪一部分,這部分代碼中肯定會(huì)有一個(gè)shuffle類算子。通過(guò)countByKey查看各個(gè)key的分布。

3、數(shù)據(jù)傾斜解決方案
過(guò)濾少數(shù)導(dǎo)致傾斜的key
提高shuffle操作的并行度
局部聚合和全局聚合

4.2 shuffle類算子

1、去重:
def distinct()
def distinct(numPartitions: Int)

2、聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

3、排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

4、重分區(qū)

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

5、集合或者表操作
def intersection(other: RDD[T]): RDD[T]

def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

def intersection(other: RDD[T], numPartitions: Int): RDD[T]

def subtract(other: RDD[T], numPartitions: Int): RDD[T]

def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]

def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]

def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI