您好,登錄后才能下訂單哦!
如何將kafka中的數(shù)據(jù)快速導(dǎo)入Hadoop,很多新手對此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
Kafka是一個(gè)分布式發(fā)布—訂閱系統(tǒng),由于其強(qiáng)大的分布式和性能特性,迅速成為數(shù)據(jù)管道的關(guān)鍵部分。它可完成許多工作,例如消息傳遞、指標(biāo)收集、流處理和日志聚合。Kafka的另一個(gè)有效用途是將數(shù)據(jù)導(dǎo)入Hadoop。使用Kafka的關(guān)鍵原因是它將數(shù)據(jù)生產(chǎn)者和消費(fèi)者分離,允許擁有多個(gè)獨(dú)立的生產(chǎn)者(可能由不同的開發(fā)團(tuán)隊(duì)編寫)。同樣,還有多個(gè)獨(dú)立的消費(fèi)者(也可能由不同的團(tuán)隊(duì)編寫)。此外,消費(fèi)者可以是實(shí)時(shí)/同步或批量/離線/異步。當(dāng)對比RabbitMQ等其他pub-sub工具時(shí),后一個(gè)屬性有很大區(qū)別。
要使用Kafka,有一些需要理解的概念:
topic—topic是相關(guān)消息的訂閱源;
分區(qū)—每個(gè)topic由一個(gè)或多個(gè)分區(qū)組成,這些分區(qū)是由日志文件支持的有序消息隊(duì)列;
生產(chǎn)者和消費(fèi)者—生產(chǎn)者和消費(fèi)者將消息寫入分區(qū)并從分區(qū)讀取。
Brokers—Brokers是管理topic和分區(qū)并為生產(chǎn)者和消費(fèi)者請求提供服務(wù)的Kafka流程。
Kafka不保證對topic的“完全”排序,只保證組成topic的各個(gè)分區(qū)是有序的。消費(fèi)者應(yīng)用程序可以根據(jù)需要強(qiáng)制執(zhí)行對“全局”topic排序。
圖5.14 顯示了Kafka的概念模型
圖5.15 顯示了如何在Kafka部署分發(fā)分區(qū)的示例
為了支持容錯(cuò),可以復(fù)制topic,這意味著每個(gè)分區(qū)可以在不同主機(jī)上具有可配置數(shù)量的副本。這提供了更高的容錯(cuò)能力,這意味著單個(gè)服務(wù)器死亡對數(shù)據(jù)或生產(chǎn)者和消費(fèi)者的可用性來說不是災(zāi)難性的。
此處采用Kafka版本0.8和Camus的0.8.X。
實(shí)踐:使用Camus將Avro數(shù)據(jù)從Kafka復(fù)制到HDFS
該技巧在已經(jīng)將數(shù)據(jù)流入Kafka用于其他目的并且希望將數(shù)據(jù)置于HDFS中的情況下非常有用。
問題
希望使用Kafka作為數(shù)據(jù)傳遞機(jī)制來將數(shù)據(jù)導(dǎo)入HDFS。
解決方案
使用LinkedIn開發(fā)的解決方案Camus將Kafka中的數(shù)據(jù)復(fù)制到HDFS。
討論
Camus是LinkedIn開發(fā)的一個(gè)開源項(xiàng)目。Kafka在LinkedIn大量部署,而Camus則用作將數(shù)據(jù)從Kafka復(fù)制到HDFS。
開箱即用,Camus支持Kafka中的兩種數(shù)據(jù)格式:JSON和Avro。在這種技術(shù)中,我們將通過Camus使用Avro數(shù)據(jù)。Camus對Avro的內(nèi)置支持要求Kafka發(fā)布者以專有方式編寫Avro數(shù)據(jù),因此對于這種技術(shù),我們假設(shè)希望在Kafka中使用vanilla序列化數(shù)據(jù)。
讓這項(xiàng)技術(shù)發(fā)揮作用需要完成三個(gè)部分的工作:首先要將一些Avro數(shù)據(jù)寫入Kafka,然后編寫一個(gè)簡單的類來幫助Camus反序列化Avro數(shù)據(jù),最后運(yùn)行一個(gè)Camus作業(yè)來執(zhí)行數(shù)據(jù)導(dǎo)入。
為了把Avro記錄寫入Kafka,在以下代碼中,需要通過配置必需的Kafka屬性來設(shè)置Kafka生成器,從文件加載一些Avro記錄,并將它們寫出到Kafka:
可以使用以下命令將樣本數(shù)據(jù)加載到名為test的Kafka的topic中:
Kafka控制臺使用者可用于驗(yàn)證數(shù)據(jù)是否已寫入Kafka,這會將二進(jìn)制Avro數(shù)據(jù)轉(zhuǎn)儲到控制臺:
完成后,編寫一些Camus代碼,以便可以在Camus中閱讀這些Avro記錄。
實(shí)踐:編寫Camus和模式注冊表
首先,需要了解三種Camus概念:
解碼器—解碼器的工作是將從Kafka提取的原始數(shù)據(jù)轉(zhuǎn)換為Camus格式。
編碼器—編碼器將解碼數(shù)據(jù)序列化為將存儲在HDFS中的格式。
Schema注冊表—提供有關(guān)正在編碼的Avro數(shù)據(jù)的schema信息。
正如前面提到的,Camus支持Avro數(shù)據(jù),但確實(shí)需要Kafka生產(chǎn)者使用Camus KafkaAvroMessageEncoder類來編寫數(shù)據(jù),該類為Avro序列化二進(jìn)制數(shù)據(jù)添加了部分專有數(shù)據(jù),可能是因?yàn)镃amus中的解碼器可以驗(yàn)證它是由該類編寫的。
在此示例中,使用 Avro serialization進(jìn)行序列化,因此需要編寫自己的解碼器。幸運(yùn)的是,這很簡單:
你可能已經(jīng)注意到我們在Kafka中寫了一個(gè)特定的Avro記錄,但在Camus中我們將該記錄讀作通用的Avro記錄,而不是特定的Avro記錄,這是因?yàn)镃amusWrapper類僅支持通用Avro記錄。否則,特定的Avro記錄可以更簡單地使用,因?yàn)榭梢允褂蒙傻拇a并具有隨之而來的所有安全特征。
CamusWrapper對象是從Kafka提取的數(shù)據(jù)。此類存在的原因是允許將元數(shù)據(jù)粘貼到envelope中,例如時(shí)間戳,服務(wù)器名稱和服務(wù)詳細(xì)信息。強(qiáng)烈建議使用的任何數(shù)據(jù)都有一些與每條記錄相關(guān)的有意義的時(shí)間戳(通常這將是創(chuàng)建或生成記錄的時(shí)間)。然后,可以使用接受時(shí)間戳作為參數(shù)的CamusWrapper構(gòu)造函數(shù):
public CamusWrapper(R record, long timestamp) { ... }
如果未設(shè)置時(shí)間戳,則Camus將在創(chuàng)建包裝器時(shí)創(chuàng)建新的時(shí)間戳。在確定輸出記錄的HDFS位置時(shí),在Camus中使用此時(shí)間戳和其他元數(shù)據(jù)。
接下來,需要編寫一個(gè)schema注冊表,以便Camus Avro編碼器知道正在寫入HDFS的Avro記錄的schema詳細(xì)信息。注冊架構(gòu)時(shí),還要指定從中拉出Avro記錄的Kafka的topic名稱:
運(yùn)行Camus
Camus在Hadoop集群上作為MapReduce作業(yè)運(yùn)行,希望在該集群中導(dǎo)入Kafka數(shù)據(jù)。需要向Camus提供一堆屬性,可以使用命令行或者使用屬性文件來執(zhí)行此操作,我們將使用此技術(shù)的屬性文件:
從屬性中可以看出,無需明確告訴Camus要導(dǎo)入哪些topic。Camus自動與Kafka通信以發(fā)現(xiàn)topic(和分區(qū))以及當(dāng)前的開始和結(jié)束偏移。
如果想要精確控制導(dǎo)入的topic,可以分別使用kafka.whitelist.topics和kafka.blacklist.topics列舉白名單(限制topic)和黑名單(排除topic),可以使用逗號作為分隔符指定多個(gè)topic,還支持正則表達(dá)式,如以下示例所示,其匹配topic的“topic1”或以“abc”開頭,后跟一個(gè)或多個(gè)數(shù)字的任何topic,可以使用與value完全相同的語法指定黑名單:
kafka.whitelist.topics=topic1,abc[0-9]+
一旦屬性全部設(shè)置完畢,就可以運(yùn)行Camus作業(yè)了:
這將導(dǎo)致Avro數(shù)據(jù)在HDFS中著陸。我們來看看HDFS中的內(nèi)容:
第一個(gè)文件包含已導(dǎo)入的數(shù)據(jù),其他供Camus管理。
可以使用AvroDump實(shí)用程序查看HDFS中的數(shù)據(jù)文件:
那么,當(dāng)Camus工作正在運(yùn)行時(shí)究竟發(fā)生了什么? Camus導(dǎo)入過程作為MapReduce作業(yè)執(zhí)行,如圖5.16所示。
隨著MapReduce中的Camus任務(wù)成功,Camus OutputCommitter(允許在任務(wù)完成時(shí)執(zhí)行自定義工作的MapReduce構(gòu)造)以原子方式將任務(wù)的數(shù)據(jù)文件移動到目標(biāo)目錄。OutputCommitter還為任務(wù)正在處理的所有分區(qū)創(chuàng)建偏移文件,同一作業(yè)中的其他任務(wù)可能會失敗,但這不會影響成功任務(wù)的狀態(tài)——成功任務(wù)的數(shù)據(jù)和偏移輸出仍然存在,因此后續(xù)的Camus執(zhí)行將從最后一個(gè)已知的成功狀態(tài)恢復(fù)處理。
接下來,讓我們看看Camus導(dǎo)入數(shù)據(jù)的位置以及如何控制行為。
數(shù)據(jù)分區(qū)
之前,我們看到了Camus導(dǎo)入位于Kafka的Avro數(shù)據(jù),讓我們仔細(xì)看看HDFS路徑結(jié)構(gòu),如圖5.17所示,看看可以做些什么來確定位置。
圖5.17 在HDFS中解析導(dǎo)出數(shù)據(jù)的Camus輸出路徑
路徑的日期/時(shí)間由從CamusWrapper中提取的時(shí)間戳確定,可以從MessageDecoder中的Kafka記錄中提取時(shí)間戳,并將它們提供給CamusWrapper,這將允許按照有意義的日期對數(shù)據(jù)進(jìn)行分區(qū),而不是默認(rèn)值,這只是在MapReduce中讀取Kafka記錄的時(shí)間。
Camus支持可插拔分區(qū)程序,允許控制圖5.18所示路徑的一部分。
圖5.18 Camus分區(qū)路徑
Camus Partitioner接口提供了兩種必須實(shí)現(xiàn)的方法:
例如,自定義分區(qū)程序可創(chuàng)建用于Hive分區(qū)的路徑。
Camus提供了一個(gè)完整的解決方案,可以在HDFS中從Kafka獲取數(shù)據(jù),并在出現(xiàn)問題時(shí)負(fù)責(zé)維護(hù)狀態(tài)和進(jìn)行錯(cuò)誤處理。通過將其與Azkaban或Oozie集成,可以輕松實(shí)現(xiàn)自動化,并根據(jù)消息時(shí)間組織HDFS數(shù)據(jù)執(zhí)行簡單的數(shù)據(jù)管理。值得一提的是,當(dāng)涉及到ETL時(shí),與Flume相比,它的功能是無懈可擊的。
Kafka捆綁了一種將數(shù)據(jù)導(dǎo)入HDFS的機(jī)制。它有一個(gè)KafkaETLInputFormat輸入格式類,可用于在MapReduce作業(yè)中從Kafka提取數(shù)據(jù)。要求編寫MapReduce作業(yè)以執(zhí)行導(dǎo)入,但優(yōu)點(diǎn)是可以直接在MapReduce流中使用數(shù)據(jù),而不是將HDFS用作數(shù)據(jù)的中間存儲。
看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進(jìn)一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。