溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么自定義JDBCRDD的分區(qū)

發(fā)布時(shí)間:2021-12-22 11:31:48 來(lái)源:億速云 閱讀:103 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要講解了“怎么自定義JDBCRDD的分區(qū)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“怎么自定義JDBCRDD的分區(qū)”吧!

1,JDBCRDD使用

val data = new JdbcRDD(sc, getConnection

, "SELECT id,aa FROM bbb where ? <= ID AND ID <= ?", lowerBound = 3, upperBound =5, numPartitions = 1, mapRow = extractValues)

參數(shù)解釋:

1,sparkcontext。

2,一個(gè)創(chuàng)建鏈接的函數(shù)。

3,sql。必須有? <= ID AND ID <= ?。

4,要取數(shù)據(jù)的id最小行。

5,要取數(shù)據(jù)的id最大行號(hào)。

6,分區(qū)數(shù)。

7,一個(gè)將ResultSet轉(zhuǎn)化為需要類(lèi)型的方法。

2,JdbcRDD的getPartition方法

override def getPartitions: Array[Partition] = {
 // bounds are inclusive, hence the + 1 here and - 1 on end
 val length = BigInt(1) + upperBound - lowerBound
 (0 until numPartitions).map(i => {
   val start = lowerBound + ((i * length) / numPartitions)
   val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
   new JdbcPartition(i, start.toLong, end.toLong)
 }).toArray
}

3,JdbcRDD的compute方法

就是一個(gè)通過(guò)jdbc獲取指定范圍數(shù)據(jù)的過(guò)程。

val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()

4,重寫(xiě)JDBC方法

重寫(xiě)分區(qū)的方法即可。

如:

CustomizedJdbcRDD[T: ClassTag](
                                     sc: SparkContext,
                                     getConnection: () => Connection,
                                     sql: String,
                                     getCustomizedPartitions: () => Array[Partition],
                                     prepareStatement: (PreparedStatement, CustomizedJdbcPartition) => PreparedStatement,
                                     mapRow: (ResultSet) => T = CustomizedJdbcRDD.resultSetToObjectArray _)

同時(shí)把getPartition方法重寫(xiě)為:

override def getPartitions: Array[Partition] = {
 getCustomizedPartitions();
}

感謝各位的閱讀,以上就是“怎么自定義JDBCRDD的分區(qū)”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)怎么自定義JDBCRDD的分區(qū)這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI