溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

基于spark的GraphX如何使用

發(fā)布時(shí)間:2021-12-16 14:51:11 來源:億速云 閱讀:213 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“基于spark的GraphX如何使用”,在日常操作中,相信很多人在基于spark的GraphX如何使用問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”基于spark的GraphX如何使用”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

基于Spark的GraphX.pptx

1. Property Graph:用戶定義的有向圖,圖中的每個(gè)頂點(diǎn)和每條邊都附加一個(gè)用戶定義的對(duì)象,允許在兩個(gè)頂點(diǎn)之間并行存在多條邊。每個(gè)頂點(diǎn)都具有一個(gè)64位的唯一標(biāo)識(shí)(VertexID),GraphX并不強(qiáng)制VertexID有序。每條邊則由起始和終止VertexID標(biāo)識(shí)。

Graph具有兩個(gè)參數(shù)化的類型:Vertex(VD)和Edge(ED),分別對(duì)應(yīng)附加在頂點(diǎn)和邊上的對(duì)象。當(dāng)VD和ED為基本的數(shù)據(jù)類型時(shí),Graph將把它們保存在數(shù)組中。

Graph和RDD一樣(spark的基本數(shù)據(jù)類型,Resilient Distributed Dataset),創(chuàng)建之后不可再改變,分布式存儲(chǔ)在集群上,并且具有容錯(cuò)能力。對(duì)圖中結(jié)構(gòu)和值的改變,都將需要產(chǎn)生一個(gè)新的Graph對(duì)象,新的Graph將與之前的Graph共享大部分?jǐn)?shù)據(jù)結(jié)構(gòu)。Graph通過頂點(diǎn)分割方法,分割在不同的機(jī)器上。任何數(shù)據(jù)分片所在機(jī)器的失敗都將引發(fā)該數(shù)據(jù)分片在其它機(jī)器上重新創(chuàng)建。

邏輯上Graph包含VertexRDD和EdgeRDD,即:

    class Graph[VD,ED] {

    val vertices: VertexRDD[VD]

    val edges: EdgeRDD[ED,VD]

    }

    其中,VertexRDD[VD]和EdgeRDD[ED,VD]分別是RDD[VertexID,VD]和RDD[Edge[ED]]經(jīng)過優(yōu)化(extends)后的版本,提供了圖計(jì)算相關(guān)功能,并做了內(nèi)部?jī)?yōu)化。

    2. Graph類的成員變量


    class Graph[VD, ED] {
      //Graph的基本信息:邊數(shù),頂點(diǎn)數(shù),入度,出度,度
      val numEdges: Long
      val numVertices: Long
      val inDegrees: VertexRDD[Int]
      val outDegrees: VertexRDD[Int]
      val degrees: VertexRDD[Int]
      //Graph的頂點(diǎn)RDD,邊RDD,以及三元組RDD
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED, VD]
      val triplets: RDD[EdgeTriplet[VD, ED]]
     }

  1. class Graph[VD, ED] {
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
    }
    上面的每個(gè)操作都將改變Graph中vertex和edge特性,并產(chǎn)生一個(gè)新的Graph

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse操作返回一個(gè)愿圖中邊的方向反轉(zhuǎn)的新圖。由于該操作沒有改變頂點(diǎn)和邊的特性,所以不需要數(shù)據(jù)的移動(dòng)

subgraph操作返回連接的點(diǎn)和邊滿足vpred和epred構(gòu)成的子圖

mask操作返回兩個(gè)圖相交的子圖,groupEdges操作合并重復(fù)的邊

5. 連接操作

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]}

joinVertices操作,連接頂點(diǎn)和輸入的RDD,然后對(duì)連接得到的頂點(diǎn),應(yīng)用用戶定義的map函數(shù),若在RDD中沒有匹配連接的頂點(diǎn),則保持頂點(diǎn)原有的值不變

outerjoinVertices類似于joinVertices,只是用戶定義的map函數(shù)應(yīng)用于所有的頂點(diǎn),且可以改變頂點(diǎn)的類型

其中f(a)(b)的寫法類似于f(a,b),只是參數(shù)b的類型取決于a

6. 鄰域聚合

GraphX中,經(jīng)過深度優(yōu)化的核心聚合操作是mapReduceTriplets

class Graph[VD, ED] {
  def mapReduceTriplets[A](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      reduce: (A, A) => A)
    : VertexRDD[A]}

mapReduceTriplets接收一個(gè)用戶定義的map函數(shù),應(yīng)用于Graph的每一個(gè)三元組,產(chǎn)生消息(message)給三元組中的任意頂點(diǎn)。為了便于預(yù)先聚合優(yōu)化,暫時(shí)只支持給其中一個(gè)頂點(diǎn)發(fā)送消息。隨后,用戶定義的reduce函數(shù)結(jié)合發(fā)送給每一個(gè)頂點(diǎn)的消息。最終返回VertexRDD[A],沒有收到消息的頂點(diǎn)不包含在該結(jié)果之中

mapRedeceTriplets還包含一個(gè)可選的參數(shù):activeSetOpt,指定執(zhí)行map操作的頂點(diǎn)集合

7. 在spark中,RDD默認(rèn)是不會(huì)一直保存在內(nèi)存中的,為了避免重復(fù)計(jì)算,需要顯式的指定:Graph.cache(),顯式指定保存在內(nèi)存中的RDD只有在系統(tǒng)內(nèi)存不足時(shí),才會(huì)強(qiáng)制采用LRU(least recently uesd)方式調(diào)出內(nèi)存。然而,對(duì)于迭代計(jì)算則應(yīng)該uncaching迭代產(chǎn)生的中間數(shù)據(jù),因此,在進(jìn)行圖的迭代計(jì)算時(shí),推薦采用Pregel API,它會(huì)自動(dòng)的unpersist不需要的中間結(jié)果。

8. GraphX Pregel API

圖天然就是一個(gè)遞歸的數(shù)據(jù)結(jié)構(gòu),圖中頂點(diǎn)的特性取決于它們鄰域頂點(diǎn)的特性,反過來又影響其鄰域頂點(diǎn)的特性。因此,很多重要的圖算法都需要迭代計(jì)算每個(gè)頂點(diǎn)的特性,直到收斂。GraphX提供類似于Pregel的操作,其是Google Pregel和GraphLab框架抽象的結(jié)合。

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,    //初始消息,最大迭代次數(shù),消息傳遞方向
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

9. 創(chuàng)建Graph

GraphX提供了根據(jù)頂點(diǎn)和邊RDD或者從磁盤上創(chuàng)建圖的方法。默認(rèn)情況下,圖構(gòu)建器不會(huì)重新分割圖的邊,即邊將留在它們起始分片所在機(jī)器。然而,Graph.groupEdges要求將圖重新分片,因?yàn)樵摬僮骷僭O(shè)相同的邊處在相同的分片中。所以需先調(diào)用Graph.partitionBy操作。

GraphLoader.edgeListFile操作,從磁盤加載圖,解析sourceVD destinationVD,跳過#開始的注釋行,頂點(diǎn)值默認(rèn)為1

10.VertexRDD和EdgeRDD

GraphX提供Graph的VertexRDD和EdgeRDD,由于GraphX對(duì)頂點(diǎn)和邊的數(shù)據(jù)結(jié)構(gòu)進(jìn)行了優(yōu)化,因此還提供一些額外的功能。Vertex[A]繼承自RDD[VertexID,A],并且約束VertexID只能出現(xiàn)一次,采用哈希表的方式存儲(chǔ)頂點(diǎn)屬性A。EdgeRDD繼承自RDD[Edge[ED]]依據(jù)策略PartitionStrategy,將邊保存在分塊中。在每個(gè)分塊中,邊的結(jié)構(gòu)和屬性保存在不同的結(jié)構(gòu)中。

到此,關(guān)于“基于spark的GraphX如何使用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI