您好,登錄后才能下訂單哦!
MapReduce如何實(shí)現(xiàn)Reduce端重分區(qū)Join操作優(yōu)化,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
一、重分區(qū)Join操作(Reduce端)
本文介紹的第一種方法是最基本的重分區(qū)Join操作,該方法允許執(zhí)行內(nèi)部和外部Join。開始之前,我們先搞清楚要解決的問(wèn)題是將大型數(shù)據(jù)集Join在一起,我們選用的解決方案是Reduce端重分區(qū)Join。該方法是一種Reduce端Join實(shí)現(xiàn),利用MapReduce的sortmerge將記錄組合在一起,作為單個(gè)MapReduce作業(yè)實(shí)現(xiàn),可支持N路連接,其中N是要連接的數(shù)據(jù)集數(shù)量。
Map端負(fù)責(zé)從數(shù)據(jù)集中讀取數(shù)據(jù),確定每個(gè)Join操作的value,并將該value的key輸出,輸出key包含在reducer中并將數(shù)據(jù)集組合在一起以生成最終結(jié)果。
單個(gè)reducer調(diào)用接收map函數(shù)Join操作發(fā)出的Key對(duì)應(yīng)的所有值,并將數(shù)據(jù)分N個(gè)分區(qū),其中N是要連接的數(shù)據(jù)集數(shù)量。reducer讀取連接value的所有輸入并將它們分區(qū)到內(nèi)存中,然后跨所有分區(qū)執(zhí)行笛卡爾積,并發(fā)出每個(gè)Join操作的結(jié)果。
圖6.10 重分區(qū)Join操作的基本MapReduce實(shí)現(xiàn)
MapReduce代碼要支持這種技術(shù),需要滿足以下條件:
支持多個(gè)map類,每個(gè)map類處理不同的輸入數(shù)據(jù)集,這是通過(guò)使用MultipleInputs類完成的。
需要一種方法來(lái)標(biāo)記mapper發(fā)出的記錄,以便可以與其原點(diǎn)的數(shù)據(jù)集相關(guān)聯(lián),本文將使用htuple項(xiàng)目處理MapReduce中的數(shù)據(jù)。
重分區(qū)Join操作的代碼如下:
可以使用以下命令運(yùn)行作業(yè)并查看輸出:
總結(jié)
Hadoop捆綁了一個(gè)hadoop-datajoin模塊,這是一個(gè)重分區(qū)Join操作框架,包括用于處理多個(gè)輸入數(shù)據(jù)集和執(zhí)行Join操作的管道。上述操作示例及hadoop-datajoin代碼是重分區(qū)Join的最基本形式,兩者都要求在執(zhí)行笛卡爾積之前將連接key的所有數(shù)據(jù)加載到內(nèi)存中,但如果連接key的基數(shù)大于可用內(nèi)存,那么,這種方法就不太適用。下一個(gè)技術(shù)將著眼解決此問(wèn)題。
二、優(yōu)化重分區(qū)Join操作
舊版重分區(qū)Join操作實(shí)現(xiàn)會(huì)浪費(fèi)大量空間,需要將給定key的所有value加載到內(nèi)存中才能執(zhí)行多路連接,將較小的數(shù)據(jù)集加載到內(nèi)存中才能迭代更大的數(shù)據(jù)集,沿途執(zhí)行Join更有效。
我們希望在MapReduce中執(zhí)行重分區(qū)Join,且無(wú)需緩存reducer中的所有記錄。優(yōu)化后的重分區(qū)Join框架將僅緩存要連接的其中一個(gè)數(shù)據(jù)集,以減少reducer中緩存的數(shù)據(jù)量。此優(yōu)化僅緩存來(lái)自兩個(gè)數(shù)據(jù)集中較小者的記錄,以減少緩存所有記錄的內(nèi)存開銷,圖6.11顯示了改進(jìn)的重分區(qū)Join實(shí)現(xiàn)。
圖6.11 重分區(qū)Join操作優(yōu)化MapReduce實(shí)現(xiàn)
該技術(shù)與舊版相比存在一定差異,此處使用輔助排序確保來(lái)自較小數(shù)據(jù)集的所有記錄在較大數(shù)據(jù)集的記錄之前到達(dá)reducer,以此來(lái)盡可能減少reducer中要緩存的數(shù)據(jù)量。此外,mapper會(huì)發(fā)出需要進(jìn)行Join操作的用戶名元組的key以及標(biāo)識(shí)原始數(shù)據(jù)集的字段。
以下代碼顯示了一個(gè)新的枚舉,顯示了用戶mapper如何填充元組字段:
需要更新MapReduce驅(qū)動(dòng)程序代碼以指示元組中的哪些字段應(yīng)用于排序、分區(qū)和分組:
分區(qū)程序應(yīng)僅基于用戶名進(jìn)行分區(qū),以便用戶的所有記錄都到達(dá)同一個(gè)reducer。
排序應(yīng)使用用戶名和數(shù)據(jù)集指示符,以便首先排序較小的數(shù)據(jù)集(由于USERS常量小于USER_LOGS常量,導(dǎo)致用戶記錄在用戶登錄之前排序)。
分組應(yīng)對(duì)用戶進(jìn)行分組,以便將兩個(gè)數(shù)據(jù)集都流式傳輸?shù)酵粋€(gè)reducer調(diào)用:
最后,我們要修改reducer以緩存?zhèn)魅氲挠脩粲涗洠缓髮⑵渑c用戶日志Join:
可以使用以下命令來(lái)運(yùn)行作業(yè)并查看輸出:
Hive
在執(zhí)行重分區(qū)Join操作時(shí),Hive可支持類似優(yōu)化。Hive可緩存Join鍵的所有數(shù)據(jù)集,然后流式傳輸大型數(shù)據(jù)集,使其不需要存儲(chǔ)在內(nèi)存中。假定在查詢時(shí),Hive最后指定的數(shù)據(jù)集最大。想象一下,你有兩個(gè)名為users和user_logs的表,而user_logs要大得多。要連接這些表,我們需要確保user_logs表被引用為查詢中的最后一個(gè):
如果不想重新查詢,可以使用STREAMTABLE提示告訴Hive哪個(gè)表更大:
總結(jié)
此操作實(shí)現(xiàn)通過(guò)僅緩沖較小數(shù)據(jù)集的value來(lái)改進(jìn)早期技術(shù),但它仍然存在數(shù)據(jù)在map和reducer之間的傳輸問(wèn)題,這是一個(gè)昂貴的網(wǎng)絡(luò)成本。此外,舊版可以支持N路連接,但是這種實(shí)現(xiàn)僅支持雙向連接。
三、使用Bloom過(guò)濾器來(lái)減少混洗數(shù)據(jù)
如果希望根據(jù)某些謂詞對(duì)數(shù)據(jù)子集執(zhí)行Join操作,例如“僅限居住在加利福尼亞地區(qū)的用戶”。到目前為止,我們還必須在reducer中執(zhí)行過(guò)濾器才可以實(shí)現(xiàn)這一目的 ,因?yàn)橹挥幸粋€(gè)數(shù)據(jù)集存放了有關(guān)狀態(tài)的詳細(xì)信息——用戶日志沒(méi)有該信息。接下來(lái),我將介紹如何在map端使用Bloom過(guò)濾器,這會(huì)對(duì)作業(yè)執(zhí)行時(shí)間產(chǎn)生很大影響。我要解決的問(wèn)題是在重分區(qū)Join操作中過(guò)濾數(shù)據(jù),但要將該過(guò)濾器推送到mapper。一個(gè)可行的解決方案是使用預(yù)處理作業(yè)創(chuàng)建Bloom過(guò)濾器,然后在重分區(qū)作業(yè)中加載Bloom過(guò)濾器以過(guò)濾mapper中的記錄。
Bloom過(guò)濾器是一種非常有用的隨機(jī)數(shù)據(jù)結(jié)構(gòu),它利用位數(shù)組簡(jiǎn)潔表明集合,并能判斷一個(gè)元素是否屬于該集合。然而,與Java中的HashSet相比,Bloom需要的內(nèi)存要少得多,因此它們非常適合處理大型數(shù)據(jù)集。此解決方案有兩個(gè)步驟,一是運(yùn)行作業(yè)來(lái)生成Bloom過(guò)濾器,該過(guò)濾器將對(duì)用戶數(shù)據(jù)進(jìn)行操作,并由居住在加利福尼亞地區(qū)的用戶填充;二是在重分區(qū)Join操作中使用此Bloom過(guò)濾器丟棄不需要的用戶,該過(guò)程需要Bloom過(guò)濾器的原因是用戶日志的mapper沒(méi)有狀態(tài)的詳細(xì)信息。
圖6.12 在重分區(qū)Join中使用Bloom過(guò)濾器的兩步過(guò)程
第1步:創(chuàng)建Bloom過(guò)濾器
第一個(gè)作業(yè)是創(chuàng)建Bloom過(guò)濾器,其中包含加利福尼亞州的用戶名。mapper生成中間Bloom過(guò)濾器,reducer將其組合成一個(gè)Bloom過(guò)濾器,作業(yè)輸出是包含序列化Bloom過(guò)濾器的Avro文件:
第2步:重分區(qū)Join
重分區(qū)Join與上文提到的唯一區(qū)別是mapper加載第一步中生成的Bloom過(guò)濾器,并且在處理map記錄時(shí),執(zhí)行針對(duì)Bloom過(guò)濾器的元素審查以確定是否應(yīng)將記錄發(fā)送給reducer。以下代碼顯示了兩件事:一般化Bloom過(guò)濾器加載、抽象mapper以及支持兩個(gè)Join數(shù)據(jù)集的子類:
以下命令運(yùn)行兩個(gè)作業(yè)并轉(zhuǎn)儲(chǔ)Join輸出:
總結(jié)
該技術(shù)提出了一種在兩個(gè)數(shù)據(jù)集上執(zhí)行map端過(guò)濾的有效方法,以最小化mapper和reducer之間的網(wǎng)絡(luò)I/O。作為shuffle的一部分,它還減少了mapper和reducer的磁盤溢出數(shù)據(jù)量。過(guò)濾器通常是加速和優(yōu)化作業(yè)最簡(jiǎn)單有效的方法,重分區(qū)Join也同樣適用于其他MapReduce作業(yè)。
四、reducer端Join操作可能發(fā)生數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是實(shí)際操作中很容易碰到的問(wèn)題,可能存在兩種類型的數(shù)據(jù)傾斜:
高Join-key基數(shù),其中有一些連接key在一個(gè)或兩個(gè)數(shù)據(jù)集中具有大量記錄,我把這種稱之為join-product偏差。
糟糕的散列分區(qū),少數(shù)reducer在總記錄數(shù)中占很大比例,我將此稱為散列分區(qū)傾斜。
五、加入具有高連接密鑰基數(shù)的大型數(shù)據(jù)集
這種技術(shù)解決了join-product的傾斜問(wèn)題,下一個(gè)技術(shù)檢查了散列分區(qū)偏差?,F(xiàn)在面臨的問(wèn)題是某些連接key是高基數(shù)的,這會(huì)導(dǎo)致某些reducer在嘗試緩存這些key時(shí)耗盡內(nèi)存。我們可以過(guò)濾掉這些key并將它們單獨(dú)連接或?qū)⑵湟绯龅絩educer中并安排后續(xù)作業(yè)Join。
如果提前知道了哪些Key是高基數(shù)的,則可以將其分成單獨(dú)的Join作業(yè),如果不確定高基數(shù)Key是哪些,則可能需要在reducer中構(gòu)建智能檢測(cè)并將其寫入副本文件,該文件由后續(xù)作業(yè)Join,如圖6.14所示。
圖6.13 提前知道高基數(shù)密鑰時(shí)處理傾斜
圖6.14 提前知道高基數(shù)密鑰處理時(shí)的偏差
Hive
Hive支持類似于第二種方法的偏斜緩解策略,運(yùn)行作業(yè)之前可指定以下配置啟用:
可以選擇設(shè)置一些其他配置來(lái)控制在高基數(shù)key上運(yùn)行的map端連接:
最后,如果在SQL中使用GROUP BY,可能還需要考慮啟用以下配置來(lái)處理分組數(shù)據(jù)中的偏差:
總結(jié)
此技術(shù)假設(shè)給定的Join鍵,只有一個(gè)數(shù)據(jù)集具有高基數(shù)出現(xiàn),因此可緩存較小數(shù)據(jù)集的map端連接。如果兩個(gè)數(shù)據(jù)集都是高基數(shù)的,那么將面臨一個(gè)昂貴的笛卡爾積運(yùn)算,執(zhí)行起來(lái)會(huì)很慢,因?yàn)樗贿m合MapReduce的工作方式(這意味著它本身不可拆分和可并行化)。在這種情況下,我們應(yīng)該重新檢查是否有任何技術(shù)(如過(guò)濾或投影)可幫助減少執(zhí)行join所需的時(shí)間。
六、處理由散列分區(qū)生成的偏差
MapReduce的默認(rèn)分區(qū)程序是一個(gè)散列分區(qū)程序,接受每個(gè)map輸出key的散列,并對(duì)reducer數(shù)量建模,以確定key被發(fā)送到哪個(gè)reducer。散列分區(qū)程序可以很好地用作通用分區(qū)程序,但是有些數(shù)據(jù)集可能會(huì)導(dǎo)致散列分區(qū)程序因一些不成比例的密鑰散列到同一個(gè)reducer而使其重載。與大多數(shù)reducer相比,這些reducer需要更長(zhǎng)時(shí)間才能完成。此外,當(dāng)檢查straggler reducer計(jì)數(shù)器時(shí),會(huì)注意到發(fā)送給落后者的組數(shù)遠(yuǎn)遠(yuǎn)高于已完成的其他組。
區(qū)分高基數(shù)key與散列分區(qū)引起的偏差可以使用MapReduce reducer來(lái)識(shí)別數(shù)據(jù)傾斜類型。由性能較差的哈希分區(qū)器引入的偏差將具有更多的組(唯一密鑰)發(fā)送到這些reducer,而導(dǎo)致傾斜的高基數(shù)密鑰可以通過(guò)所有reducer中大致相等數(shù)量的組來(lái)證明,傾斜越多,reducer的記錄數(shù)量越多。
我們要解決的問(wèn)題是reducer端連接需要很長(zhǎng)時(shí)間才能完成,而落后的組需要比大多數(shù)reducer更長(zhǎng)時(shí)間。使用范圍分區(qū)程序或編寫自定義分區(qū)程序,將偏移的key集中到一組reducer。此解決方案的目標(biāo)是省去默認(rèn)的散列分區(qū)程序,并將其替換為可以更好處理數(shù)據(jù)傾斜的內(nèi)容,本文提供兩個(gè)選項(xiàng)可供探索:
使用與Hadoop捆綁在一起的sampler和TotalOrderPartitioner,將散列分區(qū)程序替換為范圍分區(qū)程序。
編寫自定義分區(qū)程序,將具有數(shù)據(jù)傾斜的key路由到為傾斜key保留的Reducer。
范圍分區(qū)法
范圍分區(qū)根據(jù)預(yù)定義值分配map輸出,其中每個(gè)map接收該范圍內(nèi)的所有reducer,這正是TotalOrderPartitioner的工作原理。實(shí)際上,TeraSort使用TotalOrderPartitioner在所有Reducer之間均勻分布,以最大限度減少數(shù)據(jù)傾斜。TotalOrderPartitioner附帶采樣器,可對(duì)輸入數(shù)據(jù)進(jìn)行采樣并將其寫入HDFS,然后在分區(qū)時(shí)由TotalOrderPartitioner使用。
自定義分區(qū)法
如果已經(jīng)知道哪些Key顯示數(shù)據(jù)傾斜,并且該組Key是靜態(tài)的,則可以編寫自定義分區(qū)程序以將這些高基數(shù)key推送到一組reducer。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。