溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Spark Connector Reader 原理與實(shí)踐是怎樣的

發(fā)布時(shí)間:2021-12-03 19:40:58 來(lái)源:億速云 閱讀:170 作者:柒染 欄目:大數(shù)據(jù)

本篇文章為大家展示了Spark Connector Reader 原理與實(shí)踐是怎樣的,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

下面主要講述如何利用 Spark Connector 進(jìn)行 Nebula Graph 數(shù)據(jù)的讀取。

Spark Connector 簡(jiǎn)介

Spark Connector 是一個(gè) Spark 的數(shù)據(jù)連接器,可以通過(guò)該連接器進(jìn)行外部數(shù)據(jù)系統(tǒng)的讀寫(xiě)操作,Spark Connector 包含兩部分,分別是 Reader 和 Writer,而本文側(cè)重介紹 Spark Connector Reader,Writer 部分將在下篇和大家詳聊。

Spark Connector Reader 原理

Spark Connector Reader 是將 Nebula Graph 作為 Spark 的擴(kuò)展數(shù)據(jù)源,從 Nebula Graph 中將數(shù)據(jù)讀成 DataFrame,再進(jìn)行后續(xù)的 map、reduce 等操作。

Spark SQL 允許用戶自定義數(shù)據(jù)源,支持對(duì)外部數(shù)據(jù)源進(jìn)行擴(kuò)展。通過(guò) Spark SQL 讀取的數(shù)據(jù)格式是以命名列方式組織的分布式數(shù)據(jù)集 DataFrame,Spark SQL 本身也提供了眾多 API 方便用戶對(duì) DataFrame 進(jìn)行計(jì)算和轉(zhuǎn)換,能對(duì)多種數(shù)據(jù)源使用 DataFrame 接口。

Spark 調(diào)用外部數(shù)據(jù)源包的是 org.apache.spark.sql,首先了解下 Spark SQL 提供的的擴(kuò)展數(shù)據(jù)源相關(guān)的接口。

Basic Interfaces

  • BaseRelation:表示具有已知 Schema 的元組集合。所有繼承 BaseRelation 的子類都必須生成 StructType 格式的 Schema。換句話說(shuō),BaseRelation 定義了從數(shù)據(jù)源中讀取的數(shù)據(jù)在 Spark SQL 的 DataFrame 中存儲(chǔ)的數(shù)據(jù)格式的。

  • RelationProvider:獲取參數(shù)列表,根據(jù)給定的參數(shù)返回一個(gè)新的 BaseRelation。

  • DataSourceRegister:注冊(cè)數(shù)據(jù)源的簡(jiǎn)寫(xiě),在使用數(shù)據(jù)源時(shí)不用寫(xiě)數(shù)據(jù)源的全限定類名,而只需要寫(xiě)自定義的 shortName 即可。

Providers

  • RelationProvider:從指定數(shù)據(jù)源中生成自定義的 relation。 createRelation()  會(huì)基于給定的 Params 參數(shù)生成新的 relation。

  • SchemaRelationProvider:可以基于給定的 Params 參數(shù)和給定的 Schema 信息生成新的 Relation。

RDD

  • RDD[InternalRow]: 從數(shù)據(jù)源中 Scan 出來(lái)后需要構(gòu)造成 RDD[Row]

要實(shí)現(xiàn)自定義 Spark 外部數(shù)據(jù)源,需要根據(jù)數(shù)據(jù)源自定義上述部分方法。

在 Nebula Graph 的 Spark Connector 中,我們實(shí)現(xiàn)了將 Nebula Graph 作為 Spark SQL 的外部數(shù)據(jù)源,通過(guò) sparkSession.read 形式進(jìn)行數(shù)據(jù)的讀取。該功能實(shí)現(xiàn)的類圖展示如下:

Spark Connector Reader 原理與實(shí)踐是怎樣的

  1. 定義數(shù)據(jù)源 NebulaRelatioProvider,繼承 RelationProvider 進(jìn)行 relation 自定義,繼承 DataSourceRegister 進(jìn)行外部數(shù)據(jù)源的注冊(cè)。

  2. 定義 NebulaRelation 定義 Nebula Graph 的數(shù)據(jù) Schema 和數(shù)據(jù)轉(zhuǎn)換方法。在 getSchema() 方法中連接 Nebula Graph 的 Meta 服務(wù)獲取配置的返回字段對(duì)應(yīng)的 Schema 信息。

  3. 定義 NebulaRDD 進(jìn)行 Nebula Graph 數(shù)據(jù)的讀取。 compute() 方法中定義如何讀取 Nebula Graph 數(shù)據(jù),主要涉及到進(jìn)行 Nebula Graph 數(shù)據(jù) Scan、將讀到的 Nebula Graph Row 數(shù)據(jù)轉(zhuǎn)換為 Spark 的 InternalRow 數(shù)據(jù),以 InternalRow 組成 RDD 的一行,其中每一個(gè) InternalRow 表示 Nebula Graph 中的一行數(shù)據(jù),最終通過(guò)分區(qū)迭代的形式將 Nebula Graph 所有數(shù)據(jù)讀出組裝成最終的 DataFrame 結(jié)果數(shù)據(jù)。

Spark Connector Reader 實(shí)踐

Spark Connector 的 Reader 功能提供了一個(gè)接口供用戶編程進(jìn)行數(shù)據(jù)讀取。一次讀取一個(gè)點(diǎn)/邊類型的數(shù)據(jù),讀取結(jié)果為 DataFrame。

下面開(kāi)始實(shí)踐,拉取 GitHub 上 Spark Connector 代碼:

git clone -b v1.0 git@github.com:vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true

將編譯打成的包 copy 到本地 Maven 庫(kù)。

應(yīng)用示例如下:

  1. 在 mvn 項(xiàng)目的 pom 文件中加入 nebula-spark 依賴

<dependency>
  <groupId>com.vesoft</groupId>
  <artifactId>nebula-spark</artifactId>
  <version>1.1.0</version>
</dependency>
  1. 在 Spark 程序中讀取 Nebula Graph 數(shù)據(jù):

// 讀取 Nebula Graph 點(diǎn)數(shù)據(jù)
val vertexDataset: Dataset[Row] =
      spark.read
        .nebula("127.0.0.1:45500", "spaceName", "100")
        .loadVerticesToDF("tag", "field1,field2")
vertexDataset.show()
        
// 讀取 Nebula Graph 邊數(shù)據(jù)
val edgeDataset: Dataset[Row] =
      spark.read
        .nebula("127.0.0.1:45500", "spaceName", "100")
        .loadEdgesToDF("edge", "*")
edgeDataset.show()

配置說(shuō)明:

  • nebula(address: String, space: String, partitionNum: String)

address:可以配置多個(gè)地址,以英文逗號(hào)分割,如“ip1:45500,ip2:45500”
space: Nebula Graph 的 graphSpace
partitionNum: 設(shè)定spark讀取Nebula時(shí)的partition數(shù),盡量使用創(chuàng)建 Space 時(shí)指定的 Nebula Graph 中的 partitionNum,可確保一個(gè)Spark的partition讀取Nebula Graph一個(gè)part的數(shù)據(jù)。
  • loadVertices(tag: String, fields: String)

tag:Nebula Graph 中點(diǎn)的 Tag
fields:該 Tag 中的字段,,多字段名以英文逗號(hào)分隔。表示只讀取 fields 中的字段,* 表示讀取全部字段
  • loadEdges(edge: String, fields: String)

edge:Nebula Graph 中邊的 Edge
fields:該 Edge 中的字段,多字段名以英文逗號(hào)分隔。表示只讀取 fields 中的字段,* 表示讀取全部字段

上述內(nèi)容就是Spark Connector Reader 原理與實(shí)踐是怎樣的,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI