This article covers the JOIN strategies that Spark provides and how Spark chooses among them.
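JOIN is one of the most common data processing operations. As a unified big data processing engine, Spark supports a rich set of JOIN scenarios.
The sizes of the datasets being joined directly affect both which JOIN mechanism Spark selects and how efficiently the JOIN runs.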
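A JOIN condition involves logical comparisons between fields. By condition, JOINs fall into two broad categories: equi-joins and non-equi joins. An equi-join involves one or more equality conditions that must all hold at the same time, and each equality condition is applied between attributes of the two input datasets. When any other operator is used (the join operator is not =), the JOIN is called a non-equi join.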
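After the join condition has been applied between the records of the input datasets, the JOIN type determines the result of the operation. The main JOIN types are inner join, outer join (left, right, or full), semi join, and cross join.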
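Spark provides five JOIN strategies to execute a JOIN: Shuffle Hash Join, Broadcast Hash Join, Sort Merge Join, Cartesian Join, and Broadcast Nested Loop Join.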
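Shuffle Hash Join is an option when the tables being joined are fairly large. It repartitions the large table by the JOIN key so that all records with the same JOIN key are sent to the same partition.
Shuffle Hash Join consists of two basic steps:
1. Both tables in the JOIN are repartitioned by the join key. This step involves a shuffle, whose purpose is to send records with the same join key to the same partition so that the join can be done inside each partition.
2. Within each partition produced by the shuffle, the smaller table's data is built into a hash table, which is then probed with the join key of each record from the larger table.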
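The two steps of Shuffle Hash Join can be sketched in plain Scala without Spark. This is a minimal illustration, not Spark's implementation: the names `ShuffleHashJoinSketch`, `shuffle`, and `join` are hypothetical, tables are modeled as in-memory sequences of key/value pairs, and only an inner equi-join is covered.

```scala
object ShuffleHashJoinSketch {
  // Step 1 (hypothetical helper): route each record to a partition by hashing its
  // join key, so equal keys from either table land in the same partition.
  def shuffle[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }

  // Step 2: per partition, build a hash table from the smaller side and probe it
  // with every record of the larger side (inner equi-join only).
  def join[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)], numPartitions: Int): Seq[(K, A, B)] = {
    val smallParts = shuffle(small, numPartitions)
    val largeParts = shuffle(large, numPartitions)
    (0 until numPartitions).flatMap { p =>
      val hashTable: Map[K, Seq[A]] =
        smallParts.getOrElse(p, Seq.empty)
          .groupBy(_._1)
          .map { case (k, rows) => k -> rows.map(_._2) }
      for {
        (k, b) <- largeParts.getOrElse(p, Seq.empty)
        a <- hashTable.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }
}
```

Calling `ShuffleHashJoinSketch.join(users, orders, 3)` on two small sequences returns one tuple per matching pair of rows. Keys that appear on only one side produce no output.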
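Broadcast Hash Join is also called a map-side join. When one of the tables is small, Broadcast Hash Join is usually the right choice: it avoids the cost of a shuffle, which is expensive in Spark, and so improves JOIN performance. A typical case is joining a fact table with a dimension table. The dimension table is usually small, so it can be broadcast. Before a broadcast join, Spark first sends the data from the executors to the driver, and the driver then broadcasts it to the executors. If the data to broadcast is large, the driver can run out of memory (OOM).
Broadcast Hash Join consists of two phases:
1. Broadcast phase: the small table is broadcast to the executors and cached there.
2. Hash join phase: each executor performs a hash join against its local copy of the small table.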
// Size check in Spark's BroadcastExchangeExec: a broadcast table must be smaller than 8 GB.
longMetric("dataSize") += dataSize
if (dataSize >= (8L << 30)) {
  throw new SparkException(
    s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
}
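The broadcast-then-hash-join flow can be sketched in plain Scala as follows. This is only an illustration under stated assumptions: `BroadcastHashJoinSketch`, `buildBroadcast`, and `estimatedBytes` are hypothetical names, the size cap simply mirrors the 10 MB default of spark.sql.autoBroadcastJoinThreshold, and the large table is modeled as a sequence of partitions.

```scala
object BroadcastHashJoinSketch {
  // Assumed cap for illustration, mirroring the 10 MB default of
  // spark.sql.autoBroadcastJoinThreshold.
  val thresholdBytes: Long = 10L * 1024 * 1024

  // Broadcast phase: turn the small table into one hash table that every
  // partition of the large table can read.
  def buildBroadcast[K, A](small: Seq[(K, A)], estimatedBytes: Long): Map[K, Seq[A]] = {
    require(estimatedBytes <= thresholdBytes,
      s"table of $estimatedBytes bytes is too large to broadcast")
    small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
  }

  // Hash join phase: each partition of the large table probes the broadcast
  // table locally, so the large table is never shuffled.
  def join[K, A, B](
      broadcast: Map[K, Seq[A]],
      largePartitions: Seq[Seq[(K, B)]]): Seq[Seq[(K, A, B)]] =
    largePartitions.map { partition =>
      for {
        (k, b) <- partition
        a <- broadcast.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
}
```

The output keeps the partitioning of the large table, which reflects the point of the strategy: only the small table moves.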
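Sort Merge Join is Spark's default JOIN mechanism. It is controlled by the parameter spark.sql.join.preferSortMergeJoin, which defaults to true, meaning that Sort Merge Join is preferred. It is generally used when joining two large tables. Sort Merge Join can reduce data transfer in the cluster and, unlike a hash join, does not need to load all of the data into memory first, but the data must be sorted by the join key before the JOIN.
Sort Merge Join consists of three phases:
1. Shuffle phase: both tables are repartitioned by the join key.
2. Sort phase: the data in each partition is sorted by the join key.
3. Merge phase: the sorted partitions of the two tables are merged, matching rows whose join keys are equal.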
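The sort and merge phases can be sketched for a single pair of partitions in plain Scala. This is a minimal sketch, not Spark's code: `SortMergeJoinSketch` is a hypothetical name, the shuffle is assumed to have already placed matching keys in the same partition, and only an inner equi-join is shown.

```scala
object SortMergeJoinSketch {
  // Inner equi-join of one pair of co-partitioned inputs
  // (the shuffle phase is assumed to be done already).
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])(
      implicit ord: Ordering[K]): Seq[(K, A, B)] = {
    // Sort phase: order both sides by the join key.
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    // Merge phase: advance the side with the smaller key; on equal keys,
    // emit the cross product of the two runs that share that key.
    while (i < l.length && j < r.length) {
      val cmp = ord.compare(l(i)._1, r(j)._1)
      if (cmp < 0) i += 1
      else if (cmp > 0) j += 1
      else {
        val key = l(i)._1
        val leftRun = l.drop(i).takeWhile(row => ord.equiv(row._1, key))
        val rightRun = r.drop(j).takeWhile(row => ord.equiv(row._1, key))
        for (a <- leftRun; b <- rightRun) out += ((key, a._2, b._2))
        i += leftRun.length
        j += rightRun.length
      }
    }
    out.result()
  }
}
```

Because both inputs are sorted, each side is scanned once and no hash table of a whole table is held in memory, which is why this strategy suits two large tables.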
If the two tables in a join have no join key (no ON condition), Spark produces a Cartesian product join. The number of rows in the result is the product of the row counts of the two tables.
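As a small illustration of why the result size is the product of the two row counts, a Cartesian product can be written in plain Scala like this (`CartesianProductSketch` is a hypothetical name, and tables are modeled as in-memory sequences):

```scala
object CartesianProductSketch {
  // Pair every row of the left table with every row of the right table.
  def cartesian[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    for (a <- left; b <- right) yield (a, b)
}
```

Joining a 3-row table with a 2-row table this way yields 6 rows, so the cost grows quickly as either side gets larger.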
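Broadcast Nested Loop Join is the strategy Spark falls back to when no other JOIN mechanism is suitable. The priority order is: Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join > Broadcast Nested Loop Join.
Between Cartesian Join and Broadcast Nested Loop Join, for a non-equi join Spark prefers Broadcast Nested Loop Join when one of the tables can be broadcast. Otherwise, if the join is an inner join, it chooses Cartesian Join.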
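The idea behind Broadcast Nested Loop Join can be sketched in plain Scala: one side is copied to every partition of the other side, and every pair of rows is tested against an arbitrary condition. This is a simplified sketch with hypothetical names (`BroadcastNestedLoopJoinSketch`, `streamedPartitions`), and it covers only the inner-join case.

```scala
object BroadcastNestedLoopJoinSketch {
  // The broadcast side is available in full to every partition of the streamed
  // side; each streamed row is compared with every broadcast row using an
  // arbitrary (possibly non-equi) condition.
  def join[A, B](streamedPartitions: Seq[Seq[A]], broadcast: Seq[B])(
      condition: (A, B) => Boolean): Seq[(A, B)] =
    streamedPartitions.flatMap { partition =>
      for {
        a <- partition
        b <- broadcast
        if condition(a, b)
      } yield (a, b)
    }
}
```

Since the condition is an arbitrary predicate, no hashing or sorting on a key is possible, which is why this strategy works for any join condition but compares every pair of rows.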
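For an equi-join without a hint, Spark applies the following rules in order:
1. If the join type supports it and one of the tables is small enough to broadcast (below spark.sql.autoBroadcastJoinThreshold, 10 MB by default), choose broadcast hash join.
2. If spark.sql.join.preferSortMergeJoin is set to false and one table is small enough to build a hash map, choose shuffle hash join.
3. If the join keys are sortable, choose sort-merge join.
4. If the join is an inner join, choose cartesian join.
5. Otherwise, fall back to broadcast nested loop join. It may cause an OOM, but no other strategy is available.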
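The rule order for an equi-join without a hint can be restated as a small pure function. This is a sketch only: `EquiJoinSelectionSketch` and `JoinInfo` are hypothetical names, and each Boolean field stands in for a check that Spark performs on the actual plan and configuration.

```scala
object EquiJoinSelectionSketch {
  sealed trait Strategy
  case object BroadcastHashJoin extends Strategy
  case object ShuffleHashJoin extends Strategy
  case object SortMergeJoin extends Strategy
  case object CartesianProduct extends Strategy
  case object BroadcastNestedLoopJoin extends Strategy

  // Hypothetical summary of the facts each rule looks at.
  final case class JoinInfo(
      canBroadcast: Boolean,         // join type allows it and one side is under the threshold
      preferSortMergeJoin: Boolean,  // value of spark.sql.join.preferSortMergeJoin
      canBuildLocalHashMap: Boolean, // one side is small enough to build a hash map
      keysOrderable: Boolean,        // join keys can be sorted
      isInnerLike: Boolean)          // inner (or cross) join

  // Apply the five rules in order; the first one that matches wins.
  def select(j: JoinInfo): Strategy =
    if (j.canBroadcast) BroadcastHashJoin
    else if (!j.preferSortMergeJoin && j.canBuildLocalHashMap) ShuffleHashJoin
    else if (j.keysOrderable) SortMergeJoin
    else if (j.isInnerLike) CartesianProduct
    else BroadcastNestedLoopJoin
}
```

With the default `preferSortMergeJoin = true`, rule 2 is skipped, so a join of two large tables with sortable keys ends up as a sort-merge join.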
For a non-equi join with a hint:
1. broadcast hint: choose broadcast nested loop join.
2. shuffle replicate NL hint: if the join is an inner join, choose cartesian product join.
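For a non-equi join without a hint:
1. If one table is small enough to be broadcast, choose broadcast nested loop join.
2. If the join is an inner join, choose cartesian product join.
3. Otherwise, fall back to broadcast nested loop join. It may cause an OOM, but no other strategy is available.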
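The non-equi rules, with and without a hint, can be condensed into one function in the same spirit. Again this is a sketch under assumptions: `NonEquiJoinSelectionSketch`, `Hint`, and the Boolean parameters are hypothetical stand-ins for the checks Spark makes on the real plan.

```scala
object NonEquiJoinSelectionSketch {
  sealed trait Strategy
  case object CartesianProduct extends Strategy
  case object BroadcastNestedLoopJoin extends Strategy

  sealed trait Hint
  case object BroadcastHint extends Hint
  case object ShuffleReplicateNLHint extends Hint

  def select(hint: Option[Hint], canBroadcastBySize: Boolean, isInnerLike: Boolean): Strategy =
    hint match {
      // Hints are considered first.
      case Some(BroadcastHint) => BroadcastNestedLoopJoin
      case Some(ShuffleReplicateNLHint) if isInnerLike => CartesianProduct
      // No hint, or the hint does not apply: fall through to the size-based rules.
      case _ =>
        if (canBroadcastBySize) BroadcastNestedLoopJoin
        else if (isInnerLike) CartesianProduct
        else BroadcastNestedLoopJoin
    }
}
```

Note that a shuffle replicate NL hint on a join that is not an inner join is not applicable, so the function falls through to the rules used when no hint is given.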
object JoinSelection extends Strategy
  with PredicateHelper
  with JoinSelectionHelper {

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

    case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
      def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
        getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
          buildSide =>
            Seq(joins.BroadcastHashJoinExec(
              leftKeys,
              rightKeys,
              joinType,
              buildSide,
              nonEquiCond,
              planLater(left),
              planLater(right)))
        }
      }

      def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
        getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
          buildSide =>
            Seq(joins.ShuffledHashJoinExec(
              leftKeys,
              rightKeys,
              joinType,
              buildSide,
              nonEquiCond,
              planLater(left),
              planLater(right)))
        }
      }

      def createSortMergeJoin() = {
        if (RowOrdering.isOrderable(leftKeys)) {
          Some(Seq(joins.SortMergeJoinExec(
            leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
        } else {
          None
        }
      }

      def createCartesianProduct() = {
        if (joinType.isInstanceOf[InnerLike]) {
          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
        } else {
          None
        }
      }

      def createJoinWithoutHint() = {
        createBroadcastHashJoin(false)
          .orElse {
            if (!conf.preferSortMergeJoin) {
              createShuffleHashJoin(false)
            } else {
              None
            }
          }
          .orElse(createSortMergeJoin())
          .orElse(createCartesianProduct())
          .getOrElse {
            val buildSide = getSmallerSide(left, right)
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
          }
      }

      createBroadcastHashJoin(true)
        .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None }
        .orElse(createShuffleHashJoin(true))
        .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
        .getOrElse(createJoinWithoutHint())

    // Non-equi join: no equi-join keys could be extracted.
    case logical.Join(left, right, joinType, condition, hint) =>
      val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
        getSmallerSide(left, right)
      } else {
        // Broadcast the left side for a right join and the right side for a left join.
        if (canBuildLeft(joinType)) BuildLeft else BuildRight
      }

      def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
        val maybeBuildSide = if (buildLeft && buildRight) {
          Some(desiredBuildSide)
        } else if (buildLeft) {
          Some(BuildLeft)
        } else if (buildRight) {
          Some(BuildRight)
        } else {
          None
        }

        maybeBuildSide.map { buildSide =>
          Seq(joins.BroadcastNestedLoopJoinExec(
            planLater(left), planLater(right), buildSide, joinType, condition))
        }
      }

      def createCartesianProduct() = {
        if (joinType.isInstanceOf[InnerLike]) {
          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
        } else {
          None
        }
      }

      def createJoinWithoutHint() = {
        createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
          .orElse(createCartesianProduct())
          .getOrElse {
            Seq(joins.BroadcastNestedLoopJoinExec(
              planLater(left), planLater(right), desiredBuildSide, joinType, condition))
          }
      }

      createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
        .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
        .getOrElse(createJoinWithoutHint())

    case _ => Nil
  }
}